@@ -125,12 +125,17 @@ type clusterUpdate struct {

// Dispatcher is responsible for dispatching tasks and tracking agent health.
type Dispatcher struct {
- // mu is a lock to provide mutually exclusive access to dispatcher fields
- // e.g. lastSeenManagers, networkBootstrapKeys, lastSeenRootCert etc.
+ // Mutex to synchronize access to dispatcher shared state, e.g. nodes,
+ // lastSeenManagers, networkBootstrapKeys etc.
+ // TODO(anshul): This can potentially be removed and rpcRW used in its place.
mu sync.Mutex
- // shutdownWait is used by stop() to wait for existing operations to finish.
- shutdownWait sync.WaitGroup
-
+ // WaitGroup to handle the case when Stop() gets called before Run()
+ // has finished initializing the dispatcher.
+ wg sync.WaitGroup
+ // This RWMutex synchronizes RPC handlers and the dispatcher's Stop().
+ // The RPC handlers take the read lock while Stop() takes the write lock
+ // and acts as a barrier to shutdown.
+ rpcRW sync.RWMutex
nodes *nodeStore
store *store.MemoryStore
lastSeenManagers []*api.WeightedPeer
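
The new rpcRW field is the heart of this change, so a standalone illustration may help. Below is a minimal sketch of the same barrier pattern, assuming nothing from swarmkit (server, handle, and stop are illustrative names): handlers hold the read lock for their full duration, and stop() takes the write lock only after cancelling the context, so it both waits out in-flight handlers and turns away new ones.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// server mimics the dispatcher's shutdown barrier: handlers hold the
// read lock for their whole duration; stop() takes the write lock, so
// it blocks until every in-flight handler has returned.
type server struct {
	rpcRW  sync.RWMutex
	ctx    context.Context
	cancel context.CancelFunc
}

func newServer() *server {
	ctx, cancel := context.WithCancel(context.Background())
	return &server{ctx: ctx, cancel: cancel}
}

// handle stands in for an RPC handler such as Heartbeat or Session.
func (s *server) handle() error {
	s.rpcRW.RLock()
	defer s.rpcRW.RUnlock()

	// Handlers that start after stop() released the write lock see the
	// cancelled context and fail instead of touching cleaned-up state.
	select {
	case <-s.ctx.Done():
		return errors.New("server is stopped")
	default:
	}
	// ... do the actual work under the read lock ...
	return nil
}

func (s *server) stop() {
	s.cancel() // wake up long-running handlers blocked on s.ctx

	s.rpcRW.Lock() // barrier: wait for in-flight handlers to drain
	// ... clean up shared state, e.g. the node store ...
	s.rpcRW.Unlock()
}

func main() {
	s := newServer()
	fmt.Println(s.handle()) // <nil>
	s.stop()
	fmt.Println(s.handle()) // server is stopped
}
```

Compared to the WaitGroup it replaces, the RWMutex has no Add-before-Wait ordering requirement: a handler either wins the read lock before stop() takes the write lock, or it blocks and then observes the cancelled context.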
@@ -253,11 +258,8 @@ func (d *Dispatcher) Run(ctx context.Context) error {
defer cancel()
d.ctx, d.cancel = context.WithCancel(ctx)
ctx = d.ctx
-
- // If Stop() is called, it should wait
- // for Run() to complete.
- d.shutdownWait.Add(1)
- defer d.shutdownWait.Done()
+ d.wg.Add(1)
+ defer d.wg.Done()
d.mu.Unlock()

publishManagers := func(peers []*api.Peer) {
@@ -320,15 +322,18 @@ func (d *Dispatcher) Stop() error {
return errors.New("dispatcher is already stopped")
}

- // Cancel dispatcher context.
- // This should also close the the streams in Tasks(), Assignments().
+ log := log.G(d.ctx).WithField("method", "(*Dispatcher).Stop")
+ log.Info("dispatcher stopping")
d.cancel()
d.mu.Unlock()

- // Wait for the RPCs that are in-progress to finish.
- d.shutdownWait.Wait()
-
+ // The active nodes list can be cleaned out only when all
+ // existing RPCs have finished.
+ // RPCs that start after rpcRW.Unlock() should find the context
+ // cancelled and should fail organically.
+ d.rpcRW.Lock()
d.nodes.Clean()
+ d.rpcRW.Unlock()

d.processUpdatesLock.Lock()
// In case there are any waiters. There is no chance of any starting
@@ -338,6 +343,14 @@ func (d *Dispatcher) Stop() error {
d.processUpdatesLock.Unlock()

d.clusterUpdateQueue.Close()
+
+ // TODO(anshul): This use of Wait() could be unsafe.
+ // According to Go's documentation on WaitGroup, calls to Add() with
+ // a positive delta that occur when the counter is zero must happen
+ // before a Wait().
+ // As written, the dispatcher's Stop() can race with Run().
+ d.wg.Wait()
+
return nil
}
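
The TODO above refers to the rule in the sync.WaitGroup documentation that calls to Add with a positive delta occurring when the counter is zero must happen before a Wait. A contrived sketch of how that bites here, using illustrative names rather than swarmkit code: if Stop() reaches Wait() before Run() has reached Add(1), Wait() returns immediately and shutdown "completes" while Run() is still initializing.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// A contrived reproduction of the hazard named in the TODO: if Stop()
// runs before Run() has called Add(1), Wait() returns at once and the
// shutdown finishes while Run() is still doing its work.
type dispatcher struct {
	wg sync.WaitGroup
}

func (d *dispatcher) Run() {
	// Imagine initialization happening here, before Add(1)...
	time.Sleep(10 * time.Millisecond)
	d.wg.Add(1) // too late if Stop() already called Wait()
	defer d.wg.Done()
	fmt.Println("Run: still working after Stop returned")
	time.Sleep(10 * time.Millisecond)
}

func (d *dispatcher) Stop() {
	d.wg.Wait() // sees counter == 0, returns immediately
	fmt.Println("Stop: returned, believing Run has finished")
}

func main() {
	d := &dispatcher{}
	go d.Run()
	d.Stop() // almost always wins the race: the counter is still zero
	time.Sleep(50 * time.Millisecond)
}
```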
@@ -485,13 +498,13 @@ func nodeIPFromContext(ctx context.Context) (string, error) {

// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
+ logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
+ // prevent registration until we're ready to accept it
dctx, err := d.isRunningLocked()
if err != nil {
return "", err
}

- logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
-
if err := d.nodes.CheckRateLimit(nodeID); err != nil {
return "", err
}
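
isRunningLocked() itself is not part of this diff. For orientation, here is a rough reconstruction of its shape from how the handlers use it; treat the details (the running check, locking, exact error) as assumptions rather than the file's actual code, though the Aborted status matches the Heartbeat hunk below.

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Minimal stand-in for the Dispatcher fields this sketch needs.
type Dispatcher struct {
	mu     sync.Mutex
	ctx    context.Context
	cancel context.CancelFunc
}

// isRunning: an assumed implementation, treating a nil or cancelled
// dispatcher context as "not running".
func (d *Dispatcher) isRunning() bool {
	if d.ctx == nil {
		return false
	}
	select {
	case <-d.ctx.Done():
		return false
	default:
	}
	return true
}

// isRunningLocked, reconstructed from how this diff uses it: check the
// running state under d.mu and hand back the dispatcher context, so
// the caller observes cancellation by Stop().
func (d *Dispatcher) isRunningLocked() (context.Context, error) {
	d.mu.Lock()
	if !d.isRunning() {
		d.mu.Unlock()
		return nil, status.Errorf(codes.Aborted, "dispatcher is stopped")
	}
	ctx := d.ctx
	d.mu.Unlock()
	return ctx, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	d := &Dispatcher{ctx: ctx, cancel: cancel}
	if _, err := d.isRunningLocked(); err != nil {
		panic(err)
	}
	d.cancel()
	_, err := d.isRunningLocked()
	fmt.Println(err) // rpc error: code = Aborted desc = dispatcher is stopped
}
```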
@@ -539,15 +552,8 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
// UpdateTaskStatus updates status of task. Node should send such updates
// on every status change of its tasks.
func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) {
- // shutdownWait.Add() followed by isRunning() to ensures that
- // if this rpc sees the dispatcher running,
- // it will already have called Add() on the shutdownWait wait,
- // which ensures that Stop() will wait for this rpc to complete.
- // Note that Stop() first does Dispatcher.ctx.cancel() followed by
- // shutdownWait.Wait() to make sure new rpc's don't start before waiting
- // for existing ones to finish.
- d.shutdownWait.Add(1)
- defer d.shutdownWait.Done()
+ d.rpcRW.RLock()
+ defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
if err != nil {
@@ -740,15 +746,8 @@ func (d *Dispatcher) processUpdates(ctx context.Context) {
// of tasks which should be run on node, if task is not present in that list,
// it should be terminated.
func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServer) error {
- // shutdownWait.Add() followed by isRunning() to ensures that
- // if this rpc sees the dispatcher running,
- // it will already have called Add() on the shutdownWait wait,
- // which ensures that Stop() will wait for this rpc to complete.
- // Note that Stop() first does Dispatcher.ctx.cancel() followed by
- // shutdownWait.Wait() to make sure new rpc's don't start before waiting
- // for existing ones to finish.
- d.shutdownWait.Add(1)
- defer d.shutdownWait.Done()
+ d.rpcRW.RLock()
+ defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
if err != nil {
@@ -873,15 +872,8 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
// Assignments is a stream of assignments for a node. Each message contains
// either full list of tasks and secrets for the node, or an incremental update.
func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error {
- // shutdownWait.Add() followed by isRunning() to ensures that
- // if this rpc sees the dispatcher running,
- // it will already have called Add() on the shutdownWait wait,
- // which ensures that Stop() will wait for this rpc to complete.
- // Note that Stop() first does Dispatcher.ctx.cancel() followed by
- // shutdownWait.Wait() to make sure new rpc's don't start before waiting
- // for existing ones to finish.
- d.shutdownWait.Add(1)
- defer d.shutdownWait.Done()
+ d.rpcRW.RLock()
+ defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
if err != nil {
@@ -1140,20 +1132,13 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
// Node should send new heartbeat earlier than now + TTL, otherwise it will
// be deregistered from dispatcher and its status will be updated to NodeStatus_DOWN
func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
- // shutdownWait.Add() followed by isRunning() to ensures that
- // if this rpc sees the dispatcher running,
- // it will already have called Add() on the shutdownWait wait,
- // which ensures that Stop() will wait for this rpc to complete.
- // Note that Stop() first does Dispatcher.ctx.cancel() followed by
- // shutdownWait.Wait() to make sure new rpc's don't start before waiting
- // for existing ones to finish.
- d.shutdownWait.Add(1)
- defer d.shutdownWait.Done()
-
- // isRunningLocked() is not needed since its OK if
- // the dispatcher context is cancelled while this call is in progress
- // since Stop() which cancels the dispatcher context will wait for
- // Heartbeat() to complete.
+ d.rpcRW.RLock()
+ defer d.rpcRW.RUnlock()
+
+ // It's OK to call isRunning() here instead of isRunningLocked()
+ // because of the rpcRW read lock above.
+ // TODO(anshul): other uses of isRunningLocked() can probably
+ // also be removed.
if !d.isRunning() {
return nil, status.Errorf(codes.Aborted, "dispatcher is stopped")
}
@@ -1192,15 +1177,8 @@ func (d *Dispatcher) getRootCACert() []byte {
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
- // shutdownWait.Add() followed by isRunning() to ensures that
- // if this rpc sees the dispatcher running,
- // it will already have called Add() on the shutdownWait wait,
- // which ensures that Stop() will wait for this rpc to complete.
- // Note that Stop() first does Dispatcher.ctx.cancel() followed by
- // shutdownWait.Wait() to make sure new rpc's don't start before waiting
- // for existing ones to finish.
- d.shutdownWait.Add(1)
- defer d.shutdownWait.Done()
+ d.rpcRW.RLock()
+ defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
if err != nil {
@@ -1208,6 +1186,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
}

ctx := stream.Context()
+
nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return err
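
One last ordering subtlety behind these hunks: Session, Tasks, and Assignments are streaming RPCs, so they now hold the read lock for the entire life of the stream. Stop() can still make progress only because it cancels the dispatcher context before taking the write lock; the streams observe the cancellation, return, and release their read locks. A self-contained sketch of that ordering (illustrative names, not swarmkit code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Illustrates why stop() must cancel the context *before* taking the
// write lock: a streaming handler holds the read lock until it
// observes cancellation.
type streamer struct {
	rpcRW  sync.RWMutex
	ctx    context.Context
	cancel context.CancelFunc
}

// stream stands in for a long-lived handler like Session or Tasks.
func (s *streamer) stream() error {
	s.rpcRW.RLock()
	defer s.rpcRW.RUnlock()
	for {
		select {
		case <-s.ctx.Done():
			return errors.New("dispatcher is stopped")
		case <-time.After(50 * time.Millisecond):
			// ... send the next update on the stream ...
		}
	}
}

func (s *streamer) stop() {
	s.cancel()     // without this line, Lock() below would block forever
	s.rpcRW.Lock() // waits for stream() to drop its read lock
	s.rpcRW.Unlock()
	fmt.Println("stopped cleanly")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &streamer{ctx: ctx, cancel: cancel}
	go s.stream()
	time.Sleep(10 * time.Millisecond) // let the stream take its read lock
	s.stop()
}
```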