package logbroker import ( "fmt" "strings" "sync" events "github.com/docker/go-events" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager/state" "github.com/docker/swarmkit/manager/state/store" "github.com/docker/swarmkit/watch" "golang.org/x/net/context" ) type subscription struct { mu sync.RWMutex wg sync.WaitGroup store *store.MemoryStore message *api.SubscriptionMessage changed *watch.Queue ctx context.Context cancel context.CancelFunc errors []error nodes map[string]struct{} pendingTasks map[string]struct{} } func newSubscription(store *store.MemoryStore, message *api.SubscriptionMessage, changed *watch.Queue) *subscription { return &subscription{ store: store, message: message, changed: changed, nodes: make(map[string]struct{}), pendingTasks: make(map[string]struct{}), } } func (s *subscription) follow() bool { return s.message.Options != nil && s.message.Options.Follow } func (s *subscription) Contains(nodeID string) bool { s.mu.RLock() defer s.mu.RUnlock() _, ok := s.nodes[nodeID] return ok } func (s *subscription) Nodes() []string { s.mu.RLock() defer s.mu.RUnlock() nodes := make([]string, 0, len(s.nodes)) for node := range s.nodes { nodes = append(nodes, node) } return nodes } func (s *subscription) Run(ctx context.Context) { s.ctx, s.cancel = context.WithCancel(ctx) if s.follow() { wq := s.store.WatchQueue() ch, cancel := state.Watch(wq, api.EventCreateTask{}, api.EventUpdateTask{}) go func() { defer cancel() s.watch(ch) }() } s.match() } func (s *subscription) Stop() { if s.cancel != nil { s.cancel() } } func (s *subscription) Wait(ctx context.Context) <-chan struct{} { // Follow subscriptions never end if s.follow() { return nil } ch := make(chan struct{}) go func() { defer close(ch) s.wg.Wait() }() return ch } func (s *subscription) Done(nodeID string, err error) { s.mu.Lock() defer s.mu.Unlock() if err != nil { s.errors = append(s.errors, err) } if s.follow() { return } if _, ok := s.nodes[nodeID]; !ok { return } delete(s.nodes, nodeID) s.wg.Done() } func (s *subscription) Err() error { s.mu.RLock() defer s.mu.RUnlock() if len(s.errors) == 0 && len(s.pendingTasks) == 0 { return nil } messages := make([]string, 0, len(s.errors)) for _, err := range s.errors { messages = append(messages, err.Error()) } for t := range s.pendingTasks { messages = append(messages, fmt.Sprintf("task %s has not been scheduled", t)) } return fmt.Errorf("warning: incomplete log stream. some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", ")) } func (s *subscription) Close() { s.mu.Lock() s.message.Close = true s.mu.Unlock() } func (s *subscription) Closed() bool { s.mu.RLock() defer s.mu.RUnlock() return s.message.Close } func (s *subscription) match() { s.mu.Lock() defer s.mu.Unlock() add := func(t *api.Task) { if t.NodeID == "" { s.pendingTasks[t.ID] = struct{}{} return } if _, ok := s.nodes[t.NodeID]; !ok { s.nodes[t.NodeID] = struct{}{} s.wg.Add(1) } } s.store.View(func(tx store.ReadTx) { for _, nid := range s.message.Selector.NodeIDs { s.nodes[nid] = struct{}{} } for _, tid := range s.message.Selector.TaskIDs { if task := store.GetTask(tx, tid); task != nil { add(task) } } for _, sid := range s.message.Selector.ServiceIDs { tasks, err := store.FindTasks(tx, store.ByServiceID(sid)) if err != nil { log.L.Warning(err) continue } for _, task := range tasks { // if we're not following, don't add tasks that aren't running yet if !s.follow() && task.Status.State < api.TaskStateRunning { continue } add(task) } } }) } func (s *subscription) watch(ch <-chan events.Event) error { matchTasks := map[string]struct{}{} for _, tid := range s.message.Selector.TaskIDs { matchTasks[tid] = struct{}{} } matchServices := map[string]struct{}{} for _, sid := range s.message.Selector.ServiceIDs { matchServices[sid] = struct{}{} } add := func(t *api.Task) { s.mu.Lock() defer s.mu.Unlock() // Un-allocated task. if t.NodeID == "" { s.pendingTasks[t.ID] = struct{}{} return } delete(s.pendingTasks, t.ID) if _, ok := s.nodes[t.NodeID]; !ok { s.nodes[t.NodeID] = struct{}{} s.changed.Publish(s) } } for { var t *api.Task select { case <-s.ctx.Done(): return s.ctx.Err() case event := <-ch: switch v := event.(type) { case api.EventCreateTask: t = v.Task case api.EventUpdateTask: t = v.Task } } if t == nil { panic("received invalid task from the watch queue") } if _, ok := matchTasks[t.ID]; ok { add(t) } if _, ok := matchServices[t.ServiceID]; ok { add(t) } } }