123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- package manager
- import (
- "time"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/log"
- "github.com/docker/swarmkit/manager/state/raft"
- "github.com/docker/swarmkit/manager/state/store"
- "golang.org/x/net/context"
- )
const (
	// roleReconcileInterval is the period of the retry ticker that Run
	// keeps alive while any node is still waiting in the pending set.
	roleReconcileInterval = 5 * time.Second
)
- // roleManager reconciles the raft member list with desired role changes.
- type roleManager struct {
- ctx context.Context
- cancel func()
- store *store.MemoryStore
- raft *raft.Node
- doneChan chan struct{}
- // pending contains changed nodes that have not yet been reconciled in
- // the raft member list.
- pending map[string]*api.Node
- }
- // newRoleManager creates a new roleManager.
- func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager {
- ctx, cancel := context.WithCancel(context.Background())
- return &roleManager{
- ctx: ctx,
- cancel: cancel,
- store: store,
- raft: raftNode,
- doneChan: make(chan struct{}),
- pending: make(map[string]*api.Node),
- }
- }
- // Run is roleManager's main loop.
- // ctx is only used for logging.
- func (rm *roleManager) Run(ctx context.Context) {
- defer close(rm.doneChan)
- var (
- nodes []*api.Node
- ticker *time.Ticker
- tickerCh <-chan time.Time
- )
- watcher, cancelWatch, err := store.ViewAndWatch(rm.store,
- func(readTx store.ReadTx) error {
- var err error
- nodes, err = store.FindNodes(readTx, store.All)
- return err
- },
- api.EventUpdateNode{})
- defer cancelWatch()
- if err != nil {
- log.G(ctx).WithError(err).Error("failed to check nodes for role changes")
- } else {
- for _, node := range nodes {
- rm.pending[node.ID] = node
- rm.reconcileRole(ctx, node)
- }
- if len(rm.pending) != 0 {
- ticker = time.NewTicker(roleReconcileInterval)
- tickerCh = ticker.C
- }
- }
- for {
- select {
- case event := <-watcher:
- node := event.(api.EventUpdateNode).Node
- rm.pending[node.ID] = node
- rm.reconcileRole(ctx, node)
- if len(rm.pending) != 0 && ticker == nil {
- ticker = time.NewTicker(roleReconcileInterval)
- tickerCh = ticker.C
- }
- case <-tickerCh:
- for _, node := range rm.pending {
- rm.reconcileRole(ctx, node)
- }
- if len(rm.pending) == 0 {
- ticker.Stop()
- ticker = nil
- tickerCh = nil
- }
- case <-rm.ctx.Done():
- if ticker != nil {
- ticker.Stop()
- }
- return
- }
- }
- }
- func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) {
- if node.Role == node.Spec.DesiredRole {
- // Nothing to do.
- delete(rm.pending, node.ID)
- return
- }
- // Promotion can proceed right away.
- if node.Spec.DesiredRole == api.NodeRoleManager && node.Role == api.NodeRoleWorker {
- err := rm.store.Update(func(tx store.Tx) error {
- updatedNode := store.GetNode(tx, node.ID)
- if updatedNode == nil || updatedNode.Spec.DesiredRole != node.Spec.DesiredRole || updatedNode.Role != node.Role {
- return nil
- }
- updatedNode.Role = api.NodeRoleManager
- return store.UpdateNode(tx, updatedNode)
- })
- if err != nil {
- log.G(ctx).WithError(err).Errorf("failed to promote node %s", node.ID)
- } else {
- delete(rm.pending, node.ID)
- }
- } else if node.Spec.DesiredRole == api.NodeRoleWorker && node.Role == api.NodeRoleManager {
- // Check for node in memberlist
- member := rm.raft.GetMemberByNodeID(node.ID)
- if member != nil {
- // Quorum safeguard
- if !rm.raft.CanRemoveMember(member.RaftID) {
- // TODO(aaronl): Retry later
- log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", node.ID)
- return
- }
- rmCtx, rmCancel := context.WithTimeout(rm.ctx, 5*time.Second)
- defer rmCancel()
- if member.RaftID == rm.raft.Config.ID {
- // Don't use rmCtx, because we expect to lose
- // leadership, which will cancel this context.
- log.G(ctx).Info("demoted; transferring leadership")
- err := rm.raft.TransferLeadership(context.Background())
- if err == nil {
- return
- }
- log.G(ctx).WithError(err).Info("failed to transfer leadership")
- }
- if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
- // TODO(aaronl): Retry later
- log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", node.ID)
- }
- return
- }
- err := rm.store.Update(func(tx store.Tx) error {
- updatedNode := store.GetNode(tx, node.ID)
- if updatedNode == nil || updatedNode.Spec.DesiredRole != node.Spec.DesiredRole || updatedNode.Role != node.Role {
- return nil
- }
- updatedNode.Role = api.NodeRoleWorker
- return store.UpdateNode(tx, updatedNode)
- })
- if err != nil {
- log.G(ctx).WithError(err).Errorf("failed to demote node %s", node.ID)
- } else {
- delete(rm.pending, node.ID)
- }
- }
- }
- // Stop stops the roleManager and waits for the main loop to exit.
- func (rm *roleManager) Stop() {
- rm.cancel()
- <-rm.doneChan
- }
|