subscription.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package logbroker
  import (
	"fmt"
	"sort"
	"strings"
	"sync"

	events "github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/watch"
	"golang.org/x/net/context"
  )
  14. type subscription struct {
  15. mu sync.RWMutex
  16. wg sync.WaitGroup
  17. store *store.MemoryStore
  18. message *api.SubscriptionMessage
  19. changed *watch.Queue
  20. ctx context.Context
  21. cancel context.CancelFunc
  22. errors []error
  23. nodes map[string]struct{}
  24. pendingTasks map[string]struct{}
  25. }
  26. func newSubscription(store *store.MemoryStore, message *api.SubscriptionMessage, changed *watch.Queue) *subscription {
  27. return &subscription{
  28. store: store,
  29. message: message,
  30. changed: changed,
  31. nodes: make(map[string]struct{}),
  32. pendingTasks: make(map[string]struct{}),
  33. }
  34. }
  35. func (s *subscription) follow() bool {
  36. return s.message.Options != nil && s.message.Options.Follow
  37. }
  38. func (s *subscription) Contains(nodeID string) bool {
  39. s.mu.RLock()
  40. defer s.mu.RUnlock()
  41. _, ok := s.nodes[nodeID]
  42. return ok
  43. }
  44. func (s *subscription) Nodes() []string {
  45. s.mu.RLock()
  46. defer s.mu.RUnlock()
  47. nodes := make([]string, 0, len(s.nodes))
  48. for node := range s.nodes {
  49. nodes = append(nodes, node)
  50. }
  51. return nodes
  52. }
  53. func (s *subscription) Run(ctx context.Context) {
  54. s.ctx, s.cancel = context.WithCancel(ctx)
  55. if s.follow() {
  56. wq := s.store.WatchQueue()
  57. ch, cancel := state.Watch(wq, api.EventCreateTask{}, api.EventUpdateTask{})
  58. go func() {
  59. defer cancel()
  60. s.watch(ch)
  61. }()
  62. }
  63. s.match()
  64. }
  65. func (s *subscription) Stop() {
  66. if s.cancel != nil {
  67. s.cancel()
  68. }
  69. }
  70. func (s *subscription) Wait(ctx context.Context) <-chan struct{} {
  71. // Follow subscriptions never end
  72. if s.follow() {
  73. return nil
  74. }
  75. ch := make(chan struct{})
  76. go func() {
  77. defer close(ch)
  78. s.wg.Wait()
  79. }()
  80. return ch
  81. }
  82. func (s *subscription) Done(nodeID string, err error) {
  83. s.mu.Lock()
  84. defer s.mu.Unlock()
  85. if err != nil {
  86. s.errors = append(s.errors, err)
  87. }
  88. if s.follow() {
  89. return
  90. }
  91. if _, ok := s.nodes[nodeID]; !ok {
  92. return
  93. }
  94. delete(s.nodes, nodeID)
  95. s.wg.Done()
  96. }
  // Err summarizes everything that kept the subscription from delivering a
  // complete log stream. That means every error reported via Done, followed
  // by one entry per matched task that has not been scheduled onto a node.
  // It returns nil when there is nothing to report.
  //
  // Pending task IDs are sorted so the resulting message is deterministic. Map
  // iteration order is randomized.
  func (s *subscription) Err() error {
  	s.mu.RLock()
  	defer s.mu.RUnlock()

  	if len(s.errors) == 0 && len(s.pendingTasks) == 0 {
  		return nil
  	}

  	messages := make([]string, 0, len(s.errors)+len(s.pendingTasks))
  	for _, err := range s.errors {
  		messages = append(messages, err.Error())
  	}

  	pending := make([]string, 0, len(s.pendingTasks))
  	for t := range s.pendingTasks {
  		pending = append(pending, t)
  	}
  	sort.Strings(pending)
  	for _, t := range pending {
  		messages = append(messages, fmt.Sprintf("task %s has not been scheduled", t))
  	}

  	return fmt.Errorf("warning: incomplete log stream. some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", "))
  }
  112. func (s *subscription) Close() {
  113. s.mu.Lock()
  114. s.message.Close = true
  115. s.mu.Unlock()
  116. }
  117. func (s *subscription) Closed() bool {
  118. s.mu.RLock()
  119. defer s.mu.RUnlock()
  120. return s.message.Close
  121. }
  122. func (s *subscription) match() {
  123. s.mu.Lock()
  124. defer s.mu.Unlock()
  125. add := func(t *api.Task) {
  126. if t.NodeID == "" {
  127. s.pendingTasks[t.ID] = struct{}{}
  128. return
  129. }
  130. if _, ok := s.nodes[t.NodeID]; !ok {
  131. s.nodes[t.NodeID] = struct{}{}
  132. s.wg.Add(1)
  133. }
  134. }
  135. s.store.View(func(tx store.ReadTx) {
  136. for _, nid := range s.message.Selector.NodeIDs {
  137. s.nodes[nid] = struct{}{}
  138. }
  139. for _, tid := range s.message.Selector.TaskIDs {
  140. if task := store.GetTask(tx, tid); task != nil {
  141. add(task)
  142. }
  143. }
  144. for _, sid := range s.message.Selector.ServiceIDs {
  145. tasks, err := store.FindTasks(tx, store.ByServiceID(sid))
  146. if err != nil {
  147. log.L.Warning(err)
  148. continue
  149. }
  150. for _, task := range tasks {
  151. // if we're not following, don't add tasks that aren't running yet
  152. if !s.follow() && task.Status.State < api.TaskStateRunning {
  153. continue
  154. }
  155. add(task)
  156. }
  157. }
  158. })
  159. }
  // watch consumes task create/update events from ch until the subscription's
  // context is cancelled. It returns ctx.Err() in that case, or an error if
  // ch is closed.
  //
  // A task matters if its ID is in Selector.TaskIDs or its ServiceID is in
  // Selector.ServiceIDs. For such a task:
  //   - With no node, it is recorded as pending.
  //   - Otherwise it is removed from the pending set.
  //   - If its node is new to the subscription, the node is added and the
  //     subscription is published on s.changed.
  //
  // A closed ch used to surface as a nil event and trip the "invalid task"
  // panic. It now ends the watch with an error instead. The panic is kept
  // only for events of an unexpected type.
  func (s *subscription) watch(ch <-chan events.Event) error {
  	matchTasks := make(map[string]struct{}, len(s.message.Selector.TaskIDs))
  	for _, tid := range s.message.Selector.TaskIDs {
  		matchTasks[tid] = struct{}{}
  	}
  	matchServices := make(map[string]struct{}, len(s.message.Selector.ServiceIDs))
  	for _, sid := range s.message.Selector.ServiceIDs {
  		matchServices[sid] = struct{}{}
  	}

  	add := func(t *api.Task) {
  		s.mu.Lock()
  		defer s.mu.Unlock()

  		// Un-allocated task.
  		if t.NodeID == "" {
  			s.pendingTasks[t.ID] = struct{}{}
  			return
  		}
  		delete(s.pendingTasks, t.ID)
  		if _, ok := s.nodes[t.NodeID]; !ok {
  			s.nodes[t.NodeID] = struct{}{}
  			s.changed.Publish(s)
  		}
  	}

  	for {
  		var t *api.Task
  		select {
  		case <-s.ctx.Done():
  			return s.ctx.Err()
  		case event, ok := <-ch:
  			if !ok {
  				return fmt.Errorf("logbroker: task watch channel closed")
  			}
  			switch v := event.(type) {
  			case api.EventCreateTask:
  				t = v.Task
  			case api.EventUpdateTask:
  				t = v.Task
  			}
  		}

  		if t == nil {
  			panic("received invalid task from the watch queue")
  		}

  		// A task matching both by ID and by service only needs one update.
  		_, byTask := matchTasks[t.ID]
  		_, byService := matchServices[t.ServiceID]
  		if byTask || byService {
  			add(t)
  		}
  	}
  }