role_manager.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package manager
  2. import (
  3. "time"
  4. "github.com/docker/swarmkit/api"
  5. "github.com/docker/swarmkit/log"
  6. "github.com/docker/swarmkit/manager/state/raft"
  7. "github.com/docker/swarmkit/manager/state/store"
  8. "golang.org/x/net/context"
  9. )
  10. const roleReconcileInterval = 5 * time.Second
  11. // roleManager reconciles the raft member list with desired role changes.
  12. type roleManager struct {
  13. ctx context.Context
  14. cancel func()
  15. store *store.MemoryStore
  16. raft *raft.Node
  17. doneChan chan struct{}
  18. // pending contains changed nodes that have not yet been reconciled in
  19. // the raft member list.
  20. pending map[string]*api.Node
  21. }
  22. // newRoleManager creates a new roleManager.
  23. func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager {
  24. ctx, cancel := context.WithCancel(context.Background())
  25. return &roleManager{
  26. ctx: ctx,
  27. cancel: cancel,
  28. store: store,
  29. raft: raftNode,
  30. doneChan: make(chan struct{}),
  31. pending: make(map[string]*api.Node),
  32. }
  33. }
  34. // Run is roleManager's main loop.
  35. // ctx is only used for logging.
  36. func (rm *roleManager) Run(ctx context.Context) {
  37. defer close(rm.doneChan)
  38. var (
  39. nodes []*api.Node
  40. ticker *time.Ticker
  41. tickerCh <-chan time.Time
  42. )
  43. watcher, cancelWatch, err := store.ViewAndWatch(rm.store,
  44. func(readTx store.ReadTx) error {
  45. var err error
  46. nodes, err = store.FindNodes(readTx, store.All)
  47. return err
  48. },
  49. api.EventUpdateNode{})
  50. defer cancelWatch()
  51. if err != nil {
  52. log.G(ctx).WithError(err).Error("failed to check nodes for role changes")
  53. } else {
  54. for _, node := range nodes {
  55. rm.pending[node.ID] = node
  56. rm.reconcileRole(ctx, node)
  57. }
  58. if len(rm.pending) != 0 {
  59. ticker = time.NewTicker(roleReconcileInterval)
  60. tickerCh = ticker.C
  61. }
  62. }
  63. for {
  64. select {
  65. case event := <-watcher:
  66. node := event.(api.EventUpdateNode).Node
  67. rm.pending[node.ID] = node
  68. rm.reconcileRole(ctx, node)
  69. if len(rm.pending) != 0 && ticker == nil {
  70. ticker = time.NewTicker(roleReconcileInterval)
  71. tickerCh = ticker.C
  72. }
  73. case <-tickerCh:
  74. for _, node := range rm.pending {
  75. rm.reconcileRole(ctx, node)
  76. }
  77. if len(rm.pending) == 0 {
  78. ticker.Stop()
  79. ticker = nil
  80. tickerCh = nil
  81. }
  82. case <-rm.ctx.Done():
  83. if ticker != nil {
  84. ticker.Stop()
  85. }
  86. return
  87. }
  88. }
  89. }
  90. func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) {
  91. if node.Role == node.Spec.DesiredRole {
  92. // Nothing to do.
  93. delete(rm.pending, node.ID)
  94. return
  95. }
  96. // Promotion can proceed right away.
  97. if node.Spec.DesiredRole == api.NodeRoleManager && node.Role == api.NodeRoleWorker {
  98. err := rm.store.Update(func(tx store.Tx) error {
  99. updatedNode := store.GetNode(tx, node.ID)
  100. if updatedNode == nil || updatedNode.Spec.DesiredRole != node.Spec.DesiredRole || updatedNode.Role != node.Role {
  101. return nil
  102. }
  103. updatedNode.Role = api.NodeRoleManager
  104. return store.UpdateNode(tx, updatedNode)
  105. })
  106. if err != nil {
  107. log.G(ctx).WithError(err).Errorf("failed to promote node %s", node.ID)
  108. } else {
  109. delete(rm.pending, node.ID)
  110. }
  111. } else if node.Spec.DesiredRole == api.NodeRoleWorker && node.Role == api.NodeRoleManager {
  112. // Check for node in memberlist
  113. member := rm.raft.GetMemberByNodeID(node.ID)
  114. if member != nil {
  115. // Quorum safeguard
  116. if !rm.raft.CanRemoveMember(member.RaftID) {
  117. // TODO(aaronl): Retry later
  118. log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", node.ID)
  119. return
  120. }
  121. rmCtx, rmCancel := context.WithTimeout(rm.ctx, 5*time.Second)
  122. defer rmCancel()
  123. if member.RaftID == rm.raft.Config.ID {
  124. // Don't use rmCtx, because we expect to lose
  125. // leadership, which will cancel this context.
  126. log.G(ctx).Info("demoted; transferring leadership")
  127. err := rm.raft.TransferLeadership(context.Background())
  128. if err == nil {
  129. return
  130. }
  131. log.G(ctx).WithError(err).Info("failed to transfer leadership")
  132. }
  133. if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
  134. // TODO(aaronl): Retry later
  135. log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", node.ID)
  136. }
  137. return
  138. }
  139. err := rm.store.Update(func(tx store.Tx) error {
  140. updatedNode := store.GetNode(tx, node.ID)
  141. if updatedNode == nil || updatedNode.Spec.DesiredRole != node.Spec.DesiredRole || updatedNode.Role != node.Role {
  142. return nil
  143. }
  144. updatedNode.Role = api.NodeRoleWorker
  145. return store.UpdateNode(tx, updatedNode)
  146. })
  147. if err != nil {
  148. log.G(ctx).WithError(err).Errorf("failed to demote node %s", node.ID)
  149. } else {
  150. delete(rm.pending, node.ID)
  151. }
  152. }
  153. }
  154. // Stop stops the roleManager and waits for the main loop to exit.
  155. func (rm *roleManager) Stop() {
  156. rm.cancel()
  157. <-rm.doneChan
  158. }