|
@@ -293,9 +293,16 @@ func (d *Dispatcher) Run(ctx context.Context) error {
|
|
|
publishManagers(ev.([]*api.Peer))
|
|
|
case <-d.processUpdatesTrigger:
|
|
|
d.processUpdates(ctx)
|
|
|
+ batchTimer.Stop()
|
|
|
+ // drain the timer, if it has already expired
|
|
|
+ select {
|
|
|
+ case <-batchTimer.C:
|
|
|
+ default:
|
|
|
+ }
|
|
|
batchTimer.Reset(maxBatchInterval)
|
|
|
case <-batchTimer.C:
|
|
|
d.processUpdates(ctx)
|
|
|
+ // batch timer has already expired, so no need to drain
|
|
|
batchTimer.Reset(maxBatchInterval)
|
|
|
case v := <-configWatcher:
|
|
|
cluster := v.(api.EventUpdateCluster)
|
|
@@ -416,7 +423,7 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
|
|
|
|
|
|
expireFunc := func() {
|
|
|
log := log.WithField("node", nodeID)
|
|
|
- log.Info(`heartbeat expiration for node %s in state "unknown"`, nodeID)
|
|
|
+ log.Infof(`heartbeat expiration for node %s in state "unknown"`, nodeID)
|
|
|
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
|
|
|
log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
|
|
|
}
|
|
@@ -537,7 +544,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
|
|
|
}
|
|
|
|
|
|
expireFunc := func() {
|
|
|
- log.G(ctx).Debug("heartbeat expiration for worker %s, setting worker status to NodeStatus_DOWN ", nodeID)
|
|
|
+ log.G(ctx).Debugf("heartbeat expiration for worker %s, setting worker status to NodeStatus_DOWN ", nodeID)
|
|
|
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
|
|
|
log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
|
|
|
}
|