package agent

import (
	"bytes"
	"fmt"
	"math/rand"
	"reflect"
	"sync"
	"time"

	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"golang.org/x/net/context"
)

const (
	initialSessionFailureBackoff = 100 * time.Millisecond
	maxSessionFailureBackoff     = 8 * time.Second
	nodeUpdatePeriod             = 20 * time.Second
)

// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
	config *Config

	// The latest node object state from manager
	// for this node known to the agent.
	node *api.Node

	keys []*api.EncryptionKey

	sessionq chan sessionOperation
	worker   Worker

	started   chan struct{}
	startOnce sync.Once // start only once
	ready     chan struct{}
	leaving   chan struct{}
	leaveOnce sync.Once
	left      chan struct{} // closed after "run" processes "leaving" and will no longer accept new assignments
	stopped   chan struct{} // requests shutdown
	stopOnce  sync.Once     // only allow stop to be called once
	closed    chan struct{} // only closed in run
	err       error         // read only after closed is closed

	nodeUpdatePeriod time.Duration
}

// New returns a new agent, ready for task dispatch.
func New(config *Config) (*Agent, error) {
	if err := config.validate(); err != nil {
		return nil, err
	}

	a := &Agent{
		config:           config,
		sessionq:         make(chan sessionOperation),
		started:          make(chan struct{}),
		leaving:          make(chan struct{}),
		left:             make(chan struct{}),
		stopped:          make(chan struct{}),
		closed:           make(chan struct{}),
		ready:            make(chan struct{}),
		nodeUpdatePeriod: nodeUpdatePeriod,
	}

	a.worker = newWorker(config.DB, config.Executor, a)
	return a, nil
}

// Start begins execution of the agent in the provided context, if not already
// started.
//
// Start returns an error if the agent has already started.
func (a *Agent) Start(ctx context.Context) error {
	err := errAgentStarted

	a.startOnce.Do(func() {
		close(a.started)
		go a.run(ctx)
		err = nil // clear error above, only once.
	})

	return err
}

// Leave instructs the agent to leave the cluster. This method will shut down
// assignment processing and remove all assignments from the node.
// Leave blocks until the worker has finished closing all task managers or the
// agent is closed.
func (a *Agent) Leave(ctx context.Context) error {
	select {
	case <-a.started:
	default:
		return errAgentNotStarted
	}

	a.leaveOnce.Do(func() {
		close(a.leaving)
	})

	// Do not call Wait until we have confirmed that the agent is no longer
	// accepting assignments. Starting a worker might race with Wait.
	select {
	case <-a.left:
	case <-a.closed:
		return ErrClosed
	case <-ctx.Done():
		return ctx.Err()
	}

	// agent could be closed while Leave is in progress
	var err error
	ch := make(chan struct{})
	go func() {
		err = a.worker.Wait(ctx)
		close(ch)
	}()

	select {
	case <-ch:
		return err
	case <-a.closed:
		return ErrClosed
	}
}

// Stop shuts down the agent, blocking until full shutdown. If the agent is
// not started, Stop returns errAgentNotStarted.
func (a *Agent) Stop(ctx context.Context) error {
	select {
	case <-a.started:
	default:
		return errAgentNotStarted
	}

	a.stop()

	// wait till closed or context cancelled
	select {
	case <-a.closed:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
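
// runAgentLifecycleSketch is an illustrative, hedged sketch of how the
// exported lifecycle methods above are intended to fit together
// (New -> Start -> Ready -> Leave -> Stop). It is not part of the original
// API surface of this package, and the error handling shown is only an
// assumption made for the example.
func runAgentLifecycleSketch(ctx context.Context, config *Config) error {
	agent, err := New(config)
	if err != nil {
		return err
	}
	if err := agent.Start(ctx); err != nil {
		return err
	}
	// Ready is closed once the first session has registered with a manager.
	select {
	case <-agent.Ready():
	case <-ctx.Done():
		return ctx.Err()
	}
	// Leave drains all assignments from the node before Stop shuts the
	// agent down completely.
	if err := agent.Leave(ctx); err != nil && err != ErrClosed {
		return err
	}
	return agent.Stop(ctx)
}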

// stop signals the agent shutdown process, returning true if this call was
// the first to actually shut down the agent.
func (a *Agent) stop() bool {
	var stopped bool
	a.stopOnce.Do(func() {
		close(a.stopped)
		stopped = true
	})

	return stopped
}

// Err returns the error that caused the agent to shut down, or nil. Err
// blocks until the agent is fully shut down.
func (a *Agent) Err(ctx context.Context) error {
	select {
	case <-a.closed:
		return a.err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Ready returns a channel that will be closed when the agent first becomes ready.
func (a *Agent) Ready() <-chan struct{} {
	return a.ready
}

func (a *Agent) run(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(a.closed) // full shutdown.

	ctx = log.WithModule(ctx, "agent")

	log.G(ctx).Debug("(*Agent).run")
	defer log.G(ctx).Debug("(*Agent).run exited")

	nodeTLSInfo := a.config.NodeTLSInfo

	// get the node description
	nodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo)
	if err != nil {
		log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: node description unavailable")
	}
	// nodeUpdateTicker is used to periodically check for updates to node description
	nodeUpdateTicker := time.NewTicker(a.nodeUpdatePeriod)
	defer nodeUpdateTicker.Stop()

	var (
		backoff       time.Duration
		session       = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
		registered    = session.registered
		ready         = a.ready // first session ready
		sessionq      chan sessionOperation
		leaving       = a.leaving
		subscriptions = map[string]context.CancelFunc{}
	)

	if err := a.worker.Init(ctx); err != nil {
		log.G(ctx).WithError(err).Error("worker initialization failed")
		a.err = err
		return // fatal?
	}
	defer a.worker.Close()

	// setup a reliable reporter to call back to us.
	reporter := newStatusReporter(ctx, a)
	defer reporter.Close()

	a.worker.Listen(ctx, reporter)

	updateNode := func() {
		// skip updating if the registration isn't finished
		if registered != nil {
			return
		}
		// get the current node description
		newNodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo)
		if err != nil {
			log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: updated node description unavailable")
		}

		// if newNodeDescription is nil, it will cause a panic when
		// trying to create a session. Typically this can happen
		// if the engine goes down
		if newNodeDescription == nil {
			return
		}

		// if the node description has changed, update it to the new one
		// and close the session. The old session will be stopped and a
		// new one will be created with the updated description
		if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
			nodeDescription = newNodeDescription
			// close the session
			log.G(ctx).Info("agent: found node update")

			if err := session.close(); err != nil {
				log.G(ctx).WithError(err).Error("agent: closing session failed")
			}
			sessionq = nil
			registered = nil
		}
	}

	for {
		select {
		case operation := <-sessionq:
			operation.response <- operation.fn(session)
		case <-leaving:
			leaving = nil

			// TODO(stevvooe): Signal to the manager that the node is leaving.

			// when leaving we remove all assignments.
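			// Passing a nil assignment set asks the worker to drop every
			// task, secret, and config it currently holds, which is how the
			// node is drained when it leaves the cluster.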
			if err := a.worker.Assign(ctx, nil); err != nil {
				log.G(ctx).WithError(err).Error("failed removing all assignments")
			}

			close(a.left)
		case msg := <-session.assignments:
			// if we have left, accept no more assignments
			if leaving == nil {
				continue
			}

			switch msg.Type {
			case api.AssignmentsMessage_COMPLETE:
				// Need to assign secrets and configs before tasks,
				// because tasks might depend on new secrets or configs
				if err := a.worker.Assign(ctx, msg.Changes); err != nil {
					log.G(ctx).WithError(err).Error("failed to synchronize worker assignments")
				}
			case api.AssignmentsMessage_INCREMENTAL:
				if err := a.worker.Update(ctx, msg.Changes); err != nil {
					log.G(ctx).WithError(err).Error("failed to update worker assignments")
				}
			}
		case msg := <-session.messages:
			if err := a.handleSessionMessage(ctx, msg, nodeTLSInfo); err != nil {
				log.G(ctx).WithError(err).Error("session message handler failed")
			}
		case sub := <-session.subscriptions:
			if sub.Close {
				if cancel, ok := subscriptions[sub.ID]; ok {
					cancel()
				}
				delete(subscriptions, sub.ID)
				continue
			}

			if _, ok := subscriptions[sub.ID]; ok {
				// Duplicate subscription
				continue
			}

			subCtx, subCancel := context.WithCancel(ctx)
			subscriptions[sub.ID] = subCancel
			// TODO(dperny) we're tossing the error here, that seems wrong
			go a.worker.Subscribe(subCtx, sub)
		case <-registered:
			log.G(ctx).Debugln("agent: registered")
			if ready != nil {
				close(ready)
			}
			ready = nil
			registered = nil // we only care about this once per session
			backoff = 0      // reset backoff
			sessionq = a.sessionq
		case err := <-session.errs:
			// TODO(stevvooe): This may actually block if a session is closed
			// but no error was sent. This must be the only place
			// session.close is called in response to errors, for this to work.
			if err != nil {
				log.G(ctx).WithError(err).Error("agent: session failed")
				backoff = initialSessionFailureBackoff + 2*backoff
				if backoff > maxSessionFailureBackoff {
					backoff = maxSessionFailureBackoff
				}
			}

			if err := session.close(); err != nil {
				log.G(ctx).WithError(err).Error("agent: closing session failed")
			}
			sessionq = nil
			// if we're here before <-registered, do nothing for that event
			registered = nil
		case <-session.closed:
			log.G(ctx).Debugf("agent: rebuild session")

			// select a session registration delay from backoff range.
			delay := time.Duration(0)
			if backoff > 0 {
				delay = time.Duration(rand.Int63n(int64(backoff)))
			}
			session = newSession(ctx, a, delay, session.sessionID, nodeDescription)
			registered = session.registered
		case ev := <-a.config.NotifyTLSChange:
			// the TLS info has changed, so force a check to see if we need to restart the session
			if tlsInfo, ok := ev.(*api.NodeTLSInfo); ok {
				nodeTLSInfo = tlsInfo
				updateNode()
				nodeUpdateTicker.Stop()
				nodeUpdateTicker = time.NewTicker(a.nodeUpdatePeriod)
			}
		case <-nodeUpdateTicker.C:
			// periodically check to see whether the node information has changed, and if so, restart the session
			updateNode()
		case <-a.stopped:
			// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
			// this loop a few times.
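			// Returning here runs the deferred reporter.Close, worker.Close,
			// and close(a.closed) calls above, which is what unblocks callers
			// waiting in Stop and Err.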
			return
		case <-ctx.Done():
			if a.err == nil {
				a.err = ctx.Err()
			}
			session.close()
			return
		}
	}
}

func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage, nti *api.NodeTLSInfo) error {
	seen := map[api.Peer]struct{}{}
	for _, manager := range message.Managers {
		if manager.Peer.Addr == "" {
			continue
		}

		a.config.ConnBroker.Remotes().Observe(*manager.Peer, int(manager.Weight))
		seen[*manager.Peer] = struct{}{}
	}

	var changes *NodeChanges
	if message.Node != nil && (a.node == nil || !nodesEqual(a.node, message.Node)) {
		if a.config.NotifyNodeChange != nil {
			changes = &NodeChanges{Node: message.Node.Copy()}
		}
		a.node = message.Node.Copy()
		if err := a.config.Executor.Configure(ctx, a.node); err != nil {
			log.G(ctx).WithError(err).Error("node configure failed")
		}
	}
	if len(message.RootCA) > 0 && !bytes.Equal(message.RootCA, nti.TrustRoot) {
		if changes == nil {
			changes = &NodeChanges{RootCert: message.RootCA}
		} else {
			changes.RootCert = message.RootCA
		}
	}

	if changes != nil {
		a.config.NotifyNodeChange <- changes
	}

	// prune managers not in list.
	for peer := range a.config.ConnBroker.Remotes().Weights() {
		if _, ok := seen[peer]; !ok {
			a.config.ConnBroker.Remotes().Remove(peer)
		}
	}

	if message.NetworkBootstrapKeys == nil {
		return nil
	}

	for _, key := range message.NetworkBootstrapKeys {
		same := false
		for _, agentKey := range a.keys {
			if agentKey.LamportTime == key.LamportTime {
				same = true
			}
		}
		if !same {
			a.keys = message.NetworkBootstrapKeys
			if err := a.config.Executor.SetNetworkBootstrapKeys(a.keys); err != nil {
				panic(fmt.Errorf("configuring network key failed"))
			}
		}
	}

	return nil
}

type sessionOperation struct {
	fn       func(session *session) error
	response chan error
}

// withSession runs fn with the current session.
func (a *Agent) withSession(ctx context.Context, fn func(session *session) error) error {
	response := make(chan error, 1)
	select {
	case a.sessionq <- sessionOperation{
		fn:       fn,
		response: response,
	}:
		select {
		case err := <-response:
			return err
		case <-a.closed:
			return ErrClosed
		case <-ctx.Done():
			return ctx.Err()
		}
	case <-a.closed:
		return ErrClosed
	case <-ctx.Done():
		return ctx.Err()
	}
}

// UpdateTaskStatus attempts to send a task status update over the current session,
// blocking until the operation is completed.
//
// If an error is returned, the operation should be retried.
func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
	log.G(ctx).WithField("task.id", taskID).Debug("(*Agent).UpdateTaskStatus")
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errs := make(chan error, 1)
	if err := a.withSession(ctx, func(session *session) error {
		go func() {
			err := session.sendTaskStatus(ctx, taskID, status)
			if err != nil {
				if err == errTaskUnknown {
					err = nil // dispatcher no longer cares about this task.
				} else {
					log.G(ctx).WithError(err).Error("closing session after fatal error")
					session.sendError(err)
				}
			} else {
				log.G(ctx).Debug("task status reported")
			}

			errs <- err
		}()

		return nil
	}); err != nil {
		return err
	}

	select {
	case err := <-errs:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
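
// reportTaskStatusUntilAccepted is an illustrative sketch, not part of the
// original file, of the retry contract documented on UpdateTaskStatus: when
// it returns an error, the caller is expected to retry. The fixed one-second
// pause between attempts is an assumption made only for this example.
func reportTaskStatusUntilAccepted(ctx context.Context, a *Agent, taskID string, status *api.TaskStatus) error {
	for {
		err := a.UpdateTaskStatus(ctx, taskID, status)
		if err == nil {
			return nil
		}
		log.G(ctx).WithError(err).WithField("task.id", taskID).Debug("retrying task status update")

		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}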

// Publisher returns a LogPublisher for the given subscription
// as well as a cancel function that should be called when the log stream
// is completed.
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, func(), error) {
	// TODO(stevvooe): The level of coordination here is WAY too much for logs.
	// These should only be best effort and really just buffer until a session is
	// ready. Ideally, they would use a separate connection completely.

	var (
		err       error
		publisher api.LogBroker_PublishLogsClient
	)

	err = a.withSession(ctx, func(session *session) error {
		publisher, err = api.NewLogBrokerClient(session.conn.ClientConn).PublishLogs(ctx)
		return err
	})
	if err != nil {
		return nil, nil, err
	}

	// make little closure for ending the log stream
	sendCloseMsg := func() {
		// send a close message, to tell the manager our logs are done
		publisher.Send(&api.PublishLogsMessage{
			SubscriptionID: subscriptionID,
			Close:          true,
		})
		// close the stream for real
		publisher.CloseSend()
	}

	return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
		select {
		case <-ctx.Done():
			sendCloseMsg()
			return ctx.Err()
		default:
		}

		return publisher.Send(&api.PublishLogsMessage{
			SubscriptionID: subscriptionID,
			Messages:       []api.LogMessage{message},
		})
	}), func() {
		sendCloseMsg()
	}, nil
}

// nodeDescriptionWithHostname retrieves the node description, and overrides the hostname if available
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.NodeTLSInfo) (*api.NodeDescription, error) {
	desc, err := a.config.Executor.Describe(ctx)

	// Override hostname and TLS info
	if desc != nil {
		if a.config.Hostname != "" {
			desc.Hostname = a.config.Hostname
		}
		desc.TLSInfo = tlsInfo
	}
	return desc, err
}

// nodesEqual returns true if the node states are functionally equal, ignoring status,
// version and other superfluous fields.
//
// This is used to decide whether or not to propagate a node update to the executor.
func nodesEqual(a, b *api.Node) bool {
	a, b = a.Copy(), b.Copy()

	a.Status, b.Status = api.NodeStatus{}, api.NodeStatus{}
	a.Meta, b.Meta = api.Meta{}, api.Meta{}

	return reflect.DeepEqual(a, b)
}
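
// publishSingleLogLine is an illustrative sketch, not part of the original
// file, showing how the LogPublisher and cancel function returned by
// Publisher are meant to be used together. The subscription ID and log line
// are placeholders, and only a minimal LogMessage is constructed for the
// example.
func publishSingleLogLine(ctx context.Context, a *Agent, subscriptionID string, line []byte) error {
	publisher, cancel, err := a.Publisher(ctx, subscriptionID)
	if err != nil {
		return err
	}
	// The cancel function sends the close message for the subscription and
	// closes the underlying stream, so it must run once the caller is done.
	defer cancel()

	return publisher.Publish(ctx, api.LogMessage{Data: line})
}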