allocator.go

package allocator

import (
	"net"
	"sync"

	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"golang.org/x/net/context"
)

// Allocator controls how the allocation stage in the manager is handled.
type Allocator struct {
	// The manager store.
	store *store.MemoryStore

	// the ballot used to synchronize across all allocators to ensure
	// all of them have completed their respective allocations so that the
	// task can be moved to ALLOCATED state.
	taskBallot *taskBallot

	// context holding the state needed by the network allocator.
	netCtx *networkContext

	// stopChan signals to the allocator to stop running.
	stopChan chan struct{}

	// doneChan is closed when the allocator is finished running.
	doneChan chan struct{}

	// pluginGetter provides access to docker's plugin inventory.
	pluginGetter plugingetter.PluginGetter

	// defaultAddrPool specifies the default subnet pool for global-scope networks.
	defaultAddrPool []*net.IPNet

	// subnetSize specifies the subnet size of the networks created from
	// the default subnet pool.
	subnetSize int
}

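// For illustration (values are placeholders, not defaults enforced here): a
// defaultAddrPool of 10.0.0.0/8 with a subnetSize of 24 means global-scope
// networks that do not specify their own subnet are carved out of 10.0.0.0/8
// as /24 subnets.
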
// taskBallot controls how the voting for task allocation is
// coordinated between different allocators. This is the only structure
// that is written by all allocator goroutines concurrently. Hence the
// mutex.
type taskBallot struct {
	sync.Mutex

	// List of registered voters who have to cast their vote to
	// indicate their allocation is complete.
	voters []string

	// List of votes collected for every task so far from different voters.
	votes map[string][]string
}

// allocActor controls the various phases in the lifecycle of one kind of allocator.
type allocActor struct {
	// Task voter identity of the allocator.
	taskVoter string

	// Action routine which is called for every event that the
	// allocator receives.
	action func(context.Context, events.Event)

	// Init routine which is called during the initialization of
	// the allocator.
	init func(ctx context.Context) error
}

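// Note: Run (below) drives each allocActor: init is called once while the
// store watch is being set up, and action is then called for every store
// event delivered on that watch (see (*Allocator).run).
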
// New returns a new instance of Allocator for use during allocation
// stage of the manager.
func New(store *store.MemoryStore, pg plugingetter.PluginGetter, defaultAddrPool []*net.IPNet, subnetSize int) (*Allocator, error) {
	a := &Allocator{
		store: store,
		taskBallot: &taskBallot{
			votes: make(map[string][]string),
		},
		stopChan:        make(chan struct{}),
		doneChan:        make(chan struct{}),
		pluginGetter:    pg,
		defaultAddrPool: defaultAddrPool,
		subnetSize:      subnetSize,
	}
	return a, nil
}

// Run starts all allocator go-routines and waits for Stop to be called.
func (a *Allocator) Run(ctx context.Context) error {
	// Setup cancel context for all goroutines to use.
	ctx, cancel := context.WithCancel(ctx)
	var (
		wg     sync.WaitGroup
		actors []func() error
	)

	defer func() {
		cancel()
		wg.Wait()
		close(a.doneChan)
	}()

	for _, aa := range []allocActor{
		{
			taskVoter: networkVoter,
			init:      a.doNetworkInit,
			action:    a.doNetworkAlloc,
		},
	} {
		if aa.taskVoter != "" {
			a.registerToVote(aa.taskVoter)
		}

		// Copy the iterated value so the closures below capture this
		// iteration's allocActor rather than the shared loop variable.
		aaCopy := aa
		actor := func() error {
			wg.Add(1)
			defer wg.Done()

			// init might return an allocator specific context
			// which is a child of the passed in context to hold
			// allocator specific state
			watch, watchCancel, err := a.init(ctx, &aaCopy)
			if err != nil {
				return err
			}

			wg.Add(1)
			go func(watch <-chan events.Event, watchCancel func()) {
				defer func() {
					wg.Done()
					watchCancel()
				}()
				a.run(ctx, aaCopy, watch)
			}(watch, watchCancel)
			return nil
		}

		actors = append(actors, actor)
	}

	for _, actor := range actors {
		if err := actor(); err != nil {
			return err
		}
	}

	<-a.stopChan
	return nil
}

// Stop stops the allocator
func (a *Allocator) Stop() {
	close(a.stopChan)
	// Wait for all allocator goroutines to truly exit
	<-a.doneChan
}

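// A minimal lifecycle sketch (illustrative only; memStore, ctx, and the
// address-pool arguments are placeholders supplied by the surrounding manager
// code):
//
//	a, err := New(memStore, nil, nil, 24)
//	if err != nil {
//		return err
//	}
//	go func() {
//		// Run blocks until Stop is called or an actor fails to initialize.
//		_ = a.Run(ctx)
//	}()
//	// ... later, during manager shutdown ...
//	a.Stop() // signals Run to return and waits for all goroutines to exit
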
func (a *Allocator) init(ctx context.Context, aa *allocActor) (<-chan events.Event, func(), error) {
	watch, watchCancel := state.Watch(a.store.WatchQueue(),
		api.EventCreateNetwork{},
		api.EventDeleteNetwork{},
		api.EventCreateService{},
		api.EventUpdateService{},
		api.EventDeleteService{},
		api.EventCreateTask{},
		api.EventUpdateTask{},
		api.EventDeleteTask{},
		api.EventCreateNode{},
		api.EventUpdateNode{},
		api.EventDeleteNode{},
		state.EventCommit{},
	)

	if err := aa.init(ctx); err != nil {
		watchCancel()
		return nil, nil, err
	}

	return watch, watchCancel, nil
}

func (a *Allocator) run(ctx context.Context, aa allocActor, watch <-chan events.Event) {
	for {
		select {
		case ev, ok := <-watch:
			if !ok {
				return
			}
			aa.action(ctx, ev)
		case <-ctx.Done():
			return
		}
	}
}

func (a *Allocator) registerToVote(name string) {
	a.taskBallot.Lock()
	defer a.taskBallot.Unlock()

	a.taskBallot.voters = append(a.taskBallot.voters, name)
}

func (a *Allocator) taskAllocateVote(voter string, id string) bool {
	a.taskBallot.Lock()
	defer a.taskBallot.Unlock()

	// If this voter has already voted for this task, return false.
	for _, v := range a.taskBallot.votes[id] {
		if v == voter {
			return false
		}
	}

	a.taskBallot.votes[id] = append(a.taskBallot.votes[id], voter)

	// We haven't gotten enough votes yet.
	if len(a.taskBallot.voters) > len(a.taskBallot.votes[id]) {
		return false
	}

nextVoter:
	for _, voter := range a.taskBallot.voters {
		for _, vote := range a.taskBallot.votes[id] {
			if voter == vote {
				continue nextVoter
			}
		}

		// Not every registered voter has registered a vote.
		return false
	}

	return true
}
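
// Illustrative voting flow, assuming two registered voters (the second voter
// name below is hypothetical; Run currently registers only networkVoter):
//
//	a.registerToVote("network")
//	a.registerToVote("storage")            // hypothetical second allocator
//	a.taskAllocateVote("network", "task1") // false: only 1 of 2 votes cast
//	a.taskAllocateVote("network", "task1") // false: duplicate vote ignored
//	a.taskAllocateVote("storage", "task1") // true: every voter has now voted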