@@ -195,6 +195,9 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
 // Run runs dispatcher tasks which should be run on leader dispatcher.
 // Dispatcher can be stopped with cancelling ctx or calling Stop().
 func (d *Dispatcher) Run(ctx context.Context) error {
+	ctx = log.WithModule(ctx, "dispatcher")
+	log.G(ctx).Info("dispatcher starting")
+
 	d.taskUpdatesLock.Lock()
 	d.taskUpdates = make(map[string]*api.TaskStatus)
 	d.taskUpdatesLock.Unlock()
@@ -208,7 +211,6 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 		d.mu.Unlock()
 		return errors.New("dispatcher is already running")
 	}
-	ctx = log.WithModule(ctx, "dispatcher")
 	if err := d.markNodesUnknown(ctx); err != nil {
 		log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
 	}
@@ -310,8 +312,12 @@ func (d *Dispatcher) Stop() error {
 		d.mu.Unlock()
 		return errors.New("dispatcher is already stopped")
 	}
+
+	log := log.G(d.ctx).WithField("method", "(*Dispatcher).Stop")
+	log.Info("dispatcher stopping")
 	d.cancel()
 	d.mu.Unlock()
+
 	d.nodes.Clean()

 	d.processUpdatesLock.Lock()
@@ -361,13 +367,15 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
 			if node.Status.State == api.NodeStatus_DOWN {
 				nodeCopy := node
 				expireFunc := func() {
+					log.Infof(`moving tasks to "ORPHANED" state for node: %s`, nodeCopy.ID)
 					if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
-						log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
+						log.WithError(err).Errorf(`failed to move all tasks for node %s to "ORPHANED" state`, nodeCopy.ID)
 					}

 					d.downNodes.Delete(nodeCopy.ID)
 				}

+				log.Infof(`node %s was found to be down when marking nodes "unknown" on dispatcher start`, node.ID)
 				d.downNodes.Add(nodeCopy, expireFunc)
 				return nil
 			}
@@ -379,16 +387,16 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {

 			expireFunc := func() {
 				log := log.WithField("node", nodeID)
-				log.Debug("heartbeat expiration for unknown node")
+				log.Infof(`heartbeat expiration for node %s in state "unknown"`, nodeID)
 				if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
 					log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
 				}
 			}
 			if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
-				return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
+				return errors.Wrapf(err, `adding node %s in "unknown" state to node store failed`, nodeID)
 			}
 			if err := store.UpdateNode(tx, node); err != nil {
-				return errors.Wrap(err, "update failed")
+				return errors.Wrapf(err, "update for node %s failed", nodeID)
 			}
 			return nil
 		})
@@ -470,6 +478,7 @@ func nodeIPFromContext(ctx context.Context) (string, error) {

 // register is used for registration of node with particular dispatcher.
 func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
+	logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
 	// prevent register until we're ready to accept it
 	dctx, err := d.isRunningLocked()
 	if err != nil {
@@ -491,7 +500,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {

 	addr, err := nodeIPFromContext(ctx)
 	if err != nil {
-		log.G(ctx).WithError(err).Debug("failed to get remote node IP")
+		logLocal.WithError(err).Debug("failed to get remote node IP")
 	}

 	if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
@@ -499,13 +508,14 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
 	}

 	expireFunc := func() {
-		log.G(ctx).Debug("heartbeat expiration")
+		log.G(ctx).Debugf("heartbeat expiration for worker %s, setting worker status to NodeStatus_DOWN", nodeID)
 		if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
 			log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
 		}
 	}

 	rn := d.nodes.Add(node, expireFunc)
+	logLocal.Infof("worker %s was successfully registered", nodeID)

 	// NOTE(stevvooe): We need be a little careful with re-registration. The
 	// current implementation just matches the node id and then gives away the
@@ -1029,6 +1039,8 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {

 // markNodeNotReady sets the node state to some state other than READY
 func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
+	logLocal := log.G(d.ctx).WithField("method", "(*Dispatcher).markNodeNotReady")
+
 	dctx, err := d.isRunningLocked()
 	if err != nil {
 		return err
@@ -1048,6 +1060,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
 	}

 	expireFunc := func() {
+		log.G(dctx).Debugf(`worker %s timed out in "down" state, moving all tasks to "ORPHANED" state`, id)
 		if err := d.moveTasksToOrphaned(id); err != nil {
 			log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
 		}
@@ -1056,6 +1069,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
 	}

 	d.downNodes.Add(node, expireFunc)
+	logLocal.Debugf("added node %s to down nodes list", node.ID)

 	status := &api.NodeStatus{
 		State: state,
@@ -1080,6 +1094,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
 	if rn := d.nodes.Delete(id); rn == nil {
 		return errors.Errorf("node %s is not found in local storage", id)
 	}
+	logLocal.Debugf("deleted node %s from node store", node.ID)

 	return nil
 }
@@ -1094,6 +1109,8 @@ func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
 	}

 	period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)
+
+	log.G(ctx).WithField("method", "(*Dispatcher).Heartbeat").Debugf("received heartbeat from worker %v, expect next heartbeat in %v", nodeInfo, period)
 	return &api.HeartbeatResponse{Period: period}, err
 }

@@ -1206,6 +1223,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
 		}
 	}

+	log.Infof("dispatcher session dropped, marking node %s down", nodeID)
 	if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
 		log.WithError(err).Error("failed to remove node")
 	}
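
Note for reviewers: every added line leans on the context-scoped logging helpers from github.com/docker/swarmkit/log, where log.G(ctx) returns the *logrus.Entry carried by the context and log.WithModule stamps a "module" field into it once so all later log.G calls inherit it. Below is a minimal standalone sketch of that pattern, assuming those two helpers; the worker ID "node-1" is made up for illustration.

	package main

	import (
		"context"

		"github.com/docker/swarmkit/log" // provides G(ctx) and WithModule(ctx, module)
	)

	func main() {
		// Stamp the module once; every logger pulled from this ctx via
		// log.G inherits the "dispatcher" field. This is what Run does
		// at dispatcher startup in the diff above.
		ctx := log.WithModule(context.Background(), "dispatcher")
		log.G(ctx).Info("dispatcher starting")

		// Method-scoped entries mirror the logLocal pattern used in
		// register, markNodeNotReady, and Stop.
		logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
		logLocal.Infof("worker %s was successfully registered", "node-1")
	}

Deriving a field-scoped entry once and reusing it (logLocal) keeps the method name on every record without repeating WithField at each call site.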