subscription.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package logbroker
  2. import (
  3. "context"
  4. "sync"
  5. events "github.com/docker/go-events"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/log"
  8. "github.com/docker/swarmkit/manager/state"
  9. "github.com/docker/swarmkit/manager/state/store"
  10. "github.com/docker/swarmkit/watch"
  11. )
  12. type subscription struct {
  13. mu sync.RWMutex
  14. store *store.MemoryStore
  15. message *api.SubscriptionMessage
  16. changed *watch.Queue
  17. ctx context.Context
  18. cancel context.CancelFunc
  19. nodes map[string]struct{}
  20. }
  21. func newSubscription(store *store.MemoryStore, message *api.SubscriptionMessage, changed *watch.Queue) *subscription {
  22. return &subscription{
  23. store: store,
  24. message: message,
  25. changed: changed,
  26. nodes: make(map[string]struct{}),
  27. }
  28. }
  29. func (s *subscription) Contains(nodeID string) bool {
  30. s.mu.RLock()
  31. defer s.mu.RUnlock()
  32. _, ok := s.nodes[nodeID]
  33. return ok
  34. }
  35. func (s *subscription) Run(ctx context.Context) {
  36. s.ctx, s.cancel = context.WithCancel(ctx)
  37. wq := s.store.WatchQueue()
  38. ch, cancel := state.Watch(wq, state.EventCreateTask{}, state.EventUpdateTask{})
  39. go func() {
  40. defer cancel()
  41. s.watch(ch)
  42. }()
  43. s.match()
  44. }
  45. func (s *subscription) Stop() {
  46. if s.cancel != nil {
  47. s.cancel()
  48. }
  49. }
  50. func (s *subscription) match() {
  51. s.mu.Lock()
  52. defer s.mu.Unlock()
  53. s.store.View(func(tx store.ReadTx) {
  54. for _, nid := range s.message.Selector.NodeIDs {
  55. s.nodes[nid] = struct{}{}
  56. }
  57. for _, tid := range s.message.Selector.TaskIDs {
  58. if task := store.GetTask(tx, tid); task != nil {
  59. s.nodes[task.NodeID] = struct{}{}
  60. }
  61. }
  62. for _, sid := range s.message.Selector.ServiceIDs {
  63. tasks, err := store.FindTasks(tx, store.ByServiceID(sid))
  64. if err != nil {
  65. log.L.Warning(err)
  66. continue
  67. }
  68. for _, task := range tasks {
  69. s.nodes[task.NodeID] = struct{}{}
  70. }
  71. }
  72. })
  73. }
  74. func (s *subscription) watch(ch <-chan events.Event) error {
  75. matchTasks := map[string]struct{}{}
  76. for _, tid := range s.message.Selector.TaskIDs {
  77. matchTasks[tid] = struct{}{}
  78. }
  79. matchServices := map[string]struct{}{}
  80. for _, sid := range s.message.Selector.ServiceIDs {
  81. matchServices[sid] = struct{}{}
  82. }
  83. add := func(nodeID string) {
  84. s.mu.Lock()
  85. defer s.mu.Unlock()
  86. if _, ok := s.nodes[nodeID]; !ok {
  87. s.nodes[nodeID] = struct{}{}
  88. s.changed.Publish(s)
  89. }
  90. }
  91. for {
  92. var t *api.Task
  93. select {
  94. case <-s.ctx.Done():
  95. return s.ctx.Err()
  96. case event := <-ch:
  97. switch v := event.(type) {
  98. case state.EventCreateTask:
  99. t = v.Task
  100. case state.EventUpdateTask:
  101. t = v.Task
  102. }
  103. }
  104. if t == nil {
  105. panic("received invalid task from the watch queue")
  106. }
  107. if _, ok := matchTasks[t.ID]; ok {
  108. add(t.NodeID)
  109. }
  110. if _, ok := matchServices[t.ServiceID]; ok {
  111. add(t.NodeID)
  112. }
  113. }
  114. }