allocator.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package allocator
  2. import (
  3. "sync"
  4. "github.com/docker/docker/pkg/plugingetter"
  5. "github.com/docker/go-events"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/manager/state"
  8. "github.com/docker/swarmkit/manager/state/store"
  9. "golang.org/x/net/context"
  10. )
// Allocator controls how the allocation stage in the manager is handled.
// An instance is created with New, driven by Run and shut down with Stop.
type Allocator struct {
	// The manager store.
	store *store.MemoryStore

	// taskBallot is the ballot used to synchronize across all allocators to
	// ensure all of them have completed their respective allocations so
	// that the task can be moved to ALLOCATED state.
	taskBallot *taskBallot

	// netCtx is the context that will be needed by the network allocator.
	// It is not set by New.
	netCtx *networkContext

	// stopChan signals to the allocator to stop running. Stop closes it and
	// Run blocks on it.
	stopChan chan struct{}

	// doneChan is closed when the allocator is finished running. Run closes
	// it on return and Stop waits on it.
	doneChan chan struct{}

	// pluginGetter provides access to docker's plugin inventory.
	pluginGetter plugingetter.PluginGetter
}
// taskBallot controls how the voting for task allocation is coordinated
// between different allocators. This is the only structure that will be
// written by all allocator goroutines concurrently, hence the embedded
// mutex: callers lock the ballot before touching voters or votes.
type taskBallot struct {
	sync.Mutex

	// voters is the list of registered voters who have to cast their vote
	// to indicate that their allocation is complete.
	voters []string

	// votes holds, keyed by task ID, the names of the voters that have
	// voted for that task so far.
	votes map[string][]string
}
// allocActor controls the various phases in the lifecycle of one kind of
// allocator: its event source, its initialization routine and its
// per-event handler.
type allocActor struct {
	// ch is the channel through which the allocator gets all the events
	// that it is interested in.
	ch chan events.Event

	// cancel unregisters the watcher.
	cancel func()

	// taskVoter is the task voter identity of the allocator. Run only
	// registers the actor as a voter when this is non-empty.
	taskVoter string

	// action is the routine which is called for every event that the
	// allocator received.
	action func(context.Context, events.Event)

	// init is the routine which is called during the initialization of
	// the allocator, before any event is dispatched to action.
	init func(ctx context.Context) error
}
  57. // New returns a new instance of Allocator for use during allocation
  58. // stage of the manager.
  59. func New(store *store.MemoryStore, pg plugingetter.PluginGetter) (*Allocator, error) {
  60. a := &Allocator{
  61. store: store,
  62. taskBallot: &taskBallot{
  63. votes: make(map[string][]string),
  64. },
  65. stopChan: make(chan struct{}),
  66. doneChan: make(chan struct{}),
  67. pluginGetter: pg,
  68. }
  69. return a, nil
  70. }
  71. // Run starts all allocator go-routines and waits for Stop to be called.
  72. func (a *Allocator) Run(ctx context.Context) error {
  73. // Setup cancel context for all goroutines to use.
  74. ctx, cancel := context.WithCancel(ctx)
  75. var wg sync.WaitGroup
  76. defer func() {
  77. cancel()
  78. wg.Wait()
  79. close(a.doneChan)
  80. }()
  81. var actors []func() error
  82. watch, watchCancel := state.Watch(a.store.WatchQueue(),
  83. api.EventCreateNetwork{},
  84. api.EventDeleteNetwork{},
  85. api.EventCreateService{},
  86. api.EventUpdateService{},
  87. api.EventDeleteService{},
  88. api.EventCreateTask{},
  89. api.EventUpdateTask{},
  90. api.EventDeleteTask{},
  91. api.EventCreateNode{},
  92. api.EventUpdateNode{},
  93. api.EventDeleteNode{},
  94. state.EventCommit{},
  95. )
  96. for _, aa := range []allocActor{
  97. {
  98. ch: watch,
  99. cancel: watchCancel,
  100. taskVoter: networkVoter,
  101. init: a.doNetworkInit,
  102. action: a.doNetworkAlloc,
  103. },
  104. } {
  105. if aa.taskVoter != "" {
  106. a.registerToVote(aa.taskVoter)
  107. }
  108. // Copy the iterated value for variable capture.
  109. aaCopy := aa
  110. actor := func() error {
  111. wg.Add(1)
  112. defer wg.Done()
  113. // init might return an allocator specific context
  114. // which is a child of the passed in context to hold
  115. // allocator specific state
  116. if err := aaCopy.init(ctx); err != nil {
  117. // Stop the watches for this allocator
  118. // if we are failing in the init of
  119. // this allocator.
  120. aa.cancel()
  121. return err
  122. }
  123. wg.Add(1)
  124. go func() {
  125. defer wg.Done()
  126. a.run(ctx, aaCopy)
  127. }()
  128. return nil
  129. }
  130. actors = append(actors, actor)
  131. }
  132. for _, actor := range actors {
  133. if err := actor(); err != nil {
  134. return err
  135. }
  136. }
  137. <-a.stopChan
  138. return nil
  139. }
  140. // Stop stops the allocator
  141. func (a *Allocator) Stop() {
  142. close(a.stopChan)
  143. // Wait for all allocator goroutines to truly exit
  144. <-a.doneChan
  145. }
  146. func (a *Allocator) run(ctx context.Context, aa allocActor) {
  147. for {
  148. select {
  149. case ev, ok := <-aa.ch:
  150. if !ok {
  151. return
  152. }
  153. aa.action(ctx, ev)
  154. case <-ctx.Done():
  155. return
  156. }
  157. }
  158. }
  159. func (a *Allocator) registerToVote(name string) {
  160. a.taskBallot.Lock()
  161. defer a.taskBallot.Unlock()
  162. a.taskBallot.voters = append(a.taskBallot.voters, name)
  163. }
  164. func (a *Allocator) taskAllocateVote(voter string, id string) bool {
  165. a.taskBallot.Lock()
  166. defer a.taskBallot.Unlock()
  167. // If voter has already voted, return false
  168. for _, v := range a.taskBallot.votes[id] {
  169. // check if voter is in x
  170. if v == voter {
  171. return false
  172. }
  173. }
  174. a.taskBallot.votes[id] = append(a.taskBallot.votes[id], voter)
  175. // We haven't gotten enough votes yet
  176. if len(a.taskBallot.voters) > len(a.taskBallot.votes[id]) {
  177. return false
  178. }
  179. nextVoter:
  180. for _, voter := range a.taskBallot.voters {
  181. for _, vote := range a.taskBallot.votes[id] {
  182. if voter == vote {
  183. continue nextVoter
  184. }
  185. }
  186. // Not every registered voter has registered a vote.
  187. return false
  188. }
  189. return true
  190. }