package agent

import (
	"bytes"
	"fmt"
	"math/rand"
	"reflect"
	"sync"
	"time"

	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"golang.org/x/net/context"
)

const (
	initialSessionFailureBackoff = 100 * time.Millisecond
	maxSessionFailureBackoff     = 8 * time.Second
	nodeUpdatePeriod             = 20 * time.Second
)
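// These constants drive session retry and node refresh in run: the backoff
// starts at zero, grows by initialSessionFailureBackoff plus a doubling of the
// previous value after each session failure, and is capped at
// maxSessionFailureBackoff; the next session registration is then delayed by a
// random duration drawn from [0, backoff). nodeUpdatePeriod is the ticker
// interval for re-reading the node description.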

// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
	config *Config

	// The latest node object state from manager
	// for this node known to the agent.
	node *api.Node

	keys []*api.EncryptionKey

	sessionq chan sessionOperation
	worker   Worker

	started   chan struct{}
	startOnce sync.Once // start only once
	ready     chan struct{}
	leaving   chan struct{}
	leaveOnce sync.Once
	left      chan struct{} // closed after "run" processes "leaving" and will no longer accept new assignments
	stopped   chan struct{} // requests shutdown
	stopOnce  sync.Once     // only allow stop to be called once
	closed    chan struct{} // only closed in run
	err       error         // read only after closed is closed

	nodeUpdatePeriod time.Duration
}
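
// The channels above form the agent lifecycle: Start closes started and
// launches run; ready is closed once the first session registers; Leave closes
// leaving, and left is closed after run has removed all assignments; Stop
// closes stopped, and closed is closed when run returns.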

// New returns a new agent, ready for task dispatch.
func New(config *Config) (*Agent, error) {
	if err := config.validate(); err != nil {
		return nil, err
	}

	a := &Agent{
		config:           config,
		sessionq:         make(chan sessionOperation),
		started:          make(chan struct{}),
		leaving:          make(chan struct{}),
		left:             make(chan struct{}),
		stopped:          make(chan struct{}),
		closed:           make(chan struct{}),
		ready:            make(chan struct{}),
		nodeUpdatePeriod: nodeUpdatePeriod,
	}

	a.worker = newWorker(config.DB, config.Executor, a)
	return a, nil
}

// Start begins execution of the agent in the provided context, if not already
// started.
//
// Start returns an error if the agent has already started.
func (a *Agent) Start(ctx context.Context) error {
	err := errAgentStarted

	a.startOnce.Do(func() {
		close(a.started)
		go a.run(ctx)
		err = nil // clear error above, only once.
	})

	return err
}
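
// A minimal lifecycle sketch (hypothetical; assumes a populated *Config named
// cfg and a context ctx, with most error handling elided):
//
//	agent, err := New(cfg)
//	if err != nil {
//		return err
//	}
//	if err := agent.Start(ctx); err != nil {
//		return err
//	}
//	<-agent.Ready()       // first session has registered with a manager
//	defer agent.Stop(ctx) // blocks until full shutdown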

// Leave instructs the agent to leave the cluster. This method will shut down
// assignment processing and remove all assignments from the node.
// Leave blocks until the worker has finished closing all task managers or the
// agent is closed.
func (a *Agent) Leave(ctx context.Context) error {
	select {
	case <-a.started:
	default:
		return errAgentNotStarted
	}

	a.leaveOnce.Do(func() {
		close(a.leaving)
	})

	// Do not call Wait until we have confirmed that the agent is no longer
	// accepting assignments. Starting a worker might race with Wait.
	select {
	case <-a.left:
	case <-a.closed:
		return ErrClosed
	case <-ctx.Done():
		return ctx.Err()
	}

	// agent could be closed while Leave is in progress
	var err error
	ch := make(chan struct{})
	go func() {
		err = a.worker.Wait(ctx)
		close(ch)
	}()

	select {
	case <-ch:
		return err
	case <-a.closed:
		return ErrClosed
	}
}

// Stop shuts down the agent, blocking until full shutdown. If the agent is
// not started, Stop returns errAgentNotStarted.
func (a *Agent) Stop(ctx context.Context) error {
	select {
	case <-a.started:
	default:
		return errAgentNotStarted
	}

	a.stop()

	// wait till closed or context cancelled
	select {
	case <-a.closed:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// stop signals the agent shutdown process, returning true if this call was the
// first to actually shut down the agent.
func (a *Agent) stop() bool {
	var stopped bool
	a.stopOnce.Do(func() {
		close(a.stopped)
		stopped = true
	})

	return stopped
}

// Err returns the error that caused the agent to shut down, or nil. Err blocks
// until the agent has fully shut down.
func (a *Agent) Err(ctx context.Context) error {
	select {
	case <-a.closed:
		return a.err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Ready returns a channel that will be closed when the agent first becomes ready.
func (a *Agent) Ready() <-chan struct{} {
	return a.ready
}

func (a *Agent) run(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(a.closed) // full shutdown.

	ctx = log.WithModule(ctx, "agent")

	log.G(ctx).Debug("(*Agent).run")
	defer log.G(ctx).Debug("(*Agent).run exited")

	nodeTLSInfo := a.config.NodeTLSInfo

	// get the node description
	nodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo)
	if err != nil {
		log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: node description unavailable")
	}

	// nodeUpdateTicker is used to periodically check for updates to node description
	nodeUpdateTicker := time.NewTicker(a.nodeUpdatePeriod)
	defer nodeUpdateTicker.Stop()

	var (
		backoff       time.Duration
		session       = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
		registered    = session.registered
		ready         = a.ready // first session ready
		sessionq      chan sessionOperation
		leaving       = a.leaving
		subscriptions = map[string]context.CancelFunc{}
	)

	if err := a.worker.Init(ctx); err != nil {
		log.G(ctx).WithError(err).Error("worker initialization failed")
		a.err = err
		return // fatal?
	}
	defer a.worker.Close()

	// setup a reliable reporter to call back to us.
	reporter := newStatusReporter(ctx, a)
	defer reporter.Close()

	a.worker.Listen(ctx, reporter)

	updateNode := func() {
		// skip updating if the registration isn't finished
		if registered != nil {
			return
		}
		// get the current node description
		newNodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo)
		if err != nil {
			log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: updated node description unavailable")
		}

		// if newNodeDescription is nil, it will cause a panic when
		// trying to create a session. Typically this can happen
		// if the engine goes down
		if newNodeDescription == nil {
			return
		}

		// if the node description has changed, update it to the new one
		// and close the session. The old session will be stopped and a
		// new one will be created with the updated description
		if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
			nodeDescription = newNodeDescription
			// close the session
			log.G(ctx).Info("agent: found node update")

			if err := session.close(); err != nil {
				log.G(ctx).WithError(err).Error("agent: closing session failed")
			}
			sessionq = nil
			registered = nil
		}
	}
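
	// The loop below relies on the nil-channel idiom: receiving from a nil
	// channel blocks forever in a select, so setting sessionq, leaving, or
	// registered to nil disables the corresponding case until it becomes
	// relevant again (sessionq, for example, only becomes non-nil once a
	// session has registered).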
	for {
		select {
		case operation := <-sessionq:
			operation.response <- operation.fn(session)
		case <-leaving:
			leaving = nil

			// TODO(stevvooe): Signal to the manager that the node is leaving.

			// when leaving we remove all assignments.
			if err := a.worker.Assign(ctx, nil); err != nil {
				log.G(ctx).WithError(err).Error("failed removing all assignments")
			}

			close(a.left)
		case msg := <-session.assignments:
			// if we have left, accept no more assignments
			if leaving == nil {
				continue
			}

			switch msg.Type {
			case api.AssignmentsMessage_COMPLETE:
				// Need to assign secrets and configs before tasks,
				// because tasks might depend on new secrets or configs
				if err := a.worker.Assign(ctx, msg.Changes); err != nil {
					log.G(ctx).WithError(err).Error("failed to synchronize worker assignments")
				}
			case api.AssignmentsMessage_INCREMENTAL:
				if err := a.worker.Update(ctx, msg.Changes); err != nil {
					log.G(ctx).WithError(err).Error("failed to update worker assignments")
				}
			}
		case msg := <-session.messages:
			if err := a.handleSessionMessage(ctx, msg, nodeTLSInfo); err != nil {
				log.G(ctx).WithError(err).Error("session message handler failed")
			}
		case sub := <-session.subscriptions:
			if sub.Close {
				if cancel, ok := subscriptions[sub.ID]; ok {
					cancel()
				}
				delete(subscriptions, sub.ID)
				continue
			}

			if _, ok := subscriptions[sub.ID]; ok {
				// Duplicate subscription
				continue
			}

			subCtx, subCancel := context.WithCancel(ctx)
			subscriptions[sub.ID] = subCancel
			// TODO(dperny) we're tossing the error here, that seems wrong
			go a.worker.Subscribe(subCtx, sub)
		case <-registered:
			log.G(ctx).Debugln("agent: registered")
			if ready != nil {
				close(ready)
			}
			ready = nil
			registered = nil // we only care about this once per session
			backoff = 0      // reset backoff
			sessionq = a.sessionq
		case err := <-session.errs:
			// TODO(stevvooe): This may actually block if a session is closed
			// but no error was sent. This must be the only place
			// session.close is called in response to errors, for this to work.
			if err != nil {
				log.G(ctx).WithError(err).Error("agent: session failed")
				backoff = initialSessionFailureBackoff + 2*backoff
				if backoff > maxSessionFailureBackoff {
					backoff = maxSessionFailureBackoff
				}
			}

			if err := session.close(); err != nil {
				log.G(ctx).WithError(err).Error("agent: closing session failed")
			}
			sessionq = nil
			// if we're here before <-registered, do nothing for that event
			registered = nil
		case <-session.closed:
			log.G(ctx).Debugf("agent: rebuild session")

			// select a session registration delay from backoff range.
			delay := time.Duration(0)
			if backoff > 0 {
				delay = time.Duration(rand.Int63n(int64(backoff)))
			}
			session = newSession(ctx, a, delay, session.sessionID, nodeDescription)
			registered = session.registered
		case ev := <-a.config.NotifyTLSChange:
			// the TLS info has changed, so force a check to see if we need to restart the session
			if tlsInfo, ok := ev.(*api.NodeTLSInfo); ok {
				nodeTLSInfo = tlsInfo
				updateNode()
				nodeUpdateTicker.Stop()
				nodeUpdateTicker = time.NewTicker(a.nodeUpdatePeriod)
			}
		case <-nodeUpdateTicker.C:
			// periodically check to see whether the node information has changed, and if so, restart the session
			updateNode()
		case <-a.stopped:
			// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
			// this loop a few times.
			return
		case <-ctx.Done():
			if a.err == nil {
				a.err = ctx.Err()
			}
			session.close()
			return
		}
	}
}

func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage, nti *api.NodeTLSInfo) error {
	seen := map[api.Peer]struct{}{}
	for _, manager := range message.Managers {
		if manager.Peer.Addr == "" {
			continue
		}

		a.config.ConnBroker.Remotes().Observe(*manager.Peer, int(manager.Weight))
		seen[*manager.Peer] = struct{}{}
	}

	var changes *NodeChanges
	if message.Node != nil && (a.node == nil || !nodesEqual(a.node, message.Node)) {
		if a.config.NotifyNodeChange != nil {
			changes = &NodeChanges{Node: message.Node.Copy()}
		}
		a.node = message.Node.Copy()
		if err := a.config.Executor.Configure(ctx, a.node); err != nil {
			log.G(ctx).WithError(err).Error("node configure failed")
		}
	}
	if len(message.RootCA) > 0 && !bytes.Equal(message.RootCA, nti.TrustRoot) {
		if changes == nil {
			changes = &NodeChanges{RootCert: message.RootCA}
		} else {
			changes.RootCert = message.RootCA
		}
	}

	if changes != nil {
		a.config.NotifyNodeChange <- changes
	}

	// prune managers not in list.
	for peer := range a.config.ConnBroker.Remotes().Weights() {
		if _, ok := seen[peer]; !ok {
			a.config.ConnBroker.Remotes().Remove(peer)
		}
	}

	if message.NetworkBootstrapKeys == nil {
		return nil
	}
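
	// Replace the cached bootstrap keys wholesale if any incoming key carries
	// a Lamport time the agent has not seen before, then push the full set to
	// the executor.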
	for _, key := range message.NetworkBootstrapKeys {
		same := false
		for _, agentKey := range a.keys {
			if agentKey.LamportTime == key.LamportTime {
				same = true
			}
		}
		if !same {
			a.keys = message.NetworkBootstrapKeys
			if err := a.config.Executor.SetNetworkBootstrapKeys(a.keys); err != nil {
				panic(fmt.Errorf("configuring network key failed"))
			}
		}
	}

	return nil
}

type sessionOperation struct {
	fn       func(session *session) error
	response chan error
}
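
// Operations queued on sessionq are served by the run loop (its sessionq
// select case), so fn always executes against the current session. Because
// sessionq is only non-nil while a session is registered, withSession blocks
// until registration completes, the agent closes, or the context is done.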

// withSession runs fn with the current session.
func (a *Agent) withSession(ctx context.Context, fn func(session *session) error) error {
	response := make(chan error, 1)
	select {
	case a.sessionq <- sessionOperation{
		fn:       fn,
		response: response,
	}:
		select {
		case err := <-response:
			return err
		case <-a.closed:
			return ErrClosed
		case <-ctx.Done():
			return ctx.Err()
		}
	case <-a.closed:
		return ErrClosed
	case <-ctx.Done():
		return ctx.Err()
	}
}

// UpdateTaskStatus attempts to send a task status update over the current session,
// blocking until the operation is completed.
//
// If an error is returned, the operation should be retried.
func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
	log.G(ctx).WithField("task.id", taskID).Debug("(*Agent).UpdateTaskStatus")
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errs := make(chan error, 1)
	if err := a.withSession(ctx, func(session *session) error {
		go func() {
			err := session.sendTaskStatus(ctx, taskID, status)
			if err != nil {
				if err == errTaskUnknown {
					err = nil // dispatcher no longer cares about this task.
				} else {
					log.G(ctx).WithError(err).Error("closing session after fatal error")
					session.sendError(err)
				}
			} else {
				log.G(ctx).Debug("task status reported")
			}

			errs <- err
		}()

		return nil
	}); err != nil {
		return err
	}

	select {
	case err := <-errs:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Publisher returns a LogPublisher for the given subscription
// as well as a cancel function that should be called when the log stream
// is completed.
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, func(), error) {
	// TODO(stevvooe): The level of coordination here is WAY too much for logs.
	// These should only be best effort and really just buffer until a session is
	// ready. Ideally, they would use a separate connection completely.
	var (
		err       error
		publisher api.LogBroker_PublishLogsClient
	)

	err = a.withSession(ctx, func(session *session) error {
		publisher, err = api.NewLogBrokerClient(session.conn.ClientConn).PublishLogs(ctx)
		return err
	})
	if err != nil {
		return nil, nil, err
	}

	// make little closure for ending the log stream
	sendCloseMsg := func() {
		// send a close message, to tell the manager our logs are done
		publisher.Send(&api.PublishLogsMessage{
			SubscriptionID: subscriptionID,
			Close:          true,
		})
		// close the stream for real
		publisher.CloseSend()
	}

	return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
		select {
		case <-ctx.Done():
			sendCloseMsg()
			return ctx.Err()
		default:
		}

		return publisher.Send(&api.PublishLogsMessage{
			SubscriptionID: subscriptionID,
			Messages:       []api.LogMessage{message},
		})
	}), func() {
		sendCloseMsg()
	}, nil
}
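
// A hypothetical caller (names are illustrative; sub would come from a
// subscription delivered over the session):
//
//	pub, done, err := agent.Publisher(ctx, sub.ID)
//	if err != nil {
//		return err
//	}
//	defer done() // tells the manager the log stream is complete
//	// pub satisfies exec.LogPublisher; each published message is forwarded
//	// to the manager under the given subscription ID.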

// nodeDescriptionWithHostname retrieves the node description from the
// executor, overriding the hostname and TLS info when available.
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.NodeTLSInfo) (*api.NodeDescription, error) {
	desc, err := a.config.Executor.Describe(ctx)

	// Override hostname and TLS info
	if desc != nil {
		if a.config.Hostname != "" {
			desc.Hostname = a.config.Hostname
		}
		desc.TLSInfo = tlsInfo
	}
	return desc, err
}

// nodesEqual returns true if the node states are functionally equal, ignoring status,
// version and other superfluous fields.
//
// This is used to decide whether or not to propagate a node update to the executor.
func nodesEqual(a, b *api.Node) bool {
	a, b = a.Copy(), b.Copy()

	a.Status, b.Status = api.NodeStatus{}, api.NodeStatus{}
	a.Meta, b.Meta = api.Meta{}, api.Meta{}

	return reflect.DeepEqual(a, b)
}