|
@@ -14,6 +14,7 @@ import (
|
|
|
"github.com/docker/swarmkit/ca"
|
|
|
"github.com/docker/swarmkit/identity"
|
|
|
"github.com/docker/swarmkit/log"
|
|
|
+ "github.com/docker/swarmkit/manager/state/store"
|
|
|
"github.com/docker/swarmkit/watch"
|
|
|
"golang.org/x/net/context"
|
|
|
)
|
|
@@ -23,7 +24,7 @@ var (
|
|
|
errNotRunning = errors.New("broker is not running")
|
|
|
)
|
|
|
|
|
|
-// LogBroker coordinates log subscriptions to services and tasks. Çlients can
|
|
|
+// LogBroker coordinates log subscriptions to services and tasks. Clients can
|
|
|
// publish and subscribe to logs channels.
|
|
|
//
|
|
|
// Log subscriptions are pushed to the work nodes by creating log subscsription
|
|
@@ -33,15 +34,19 @@ type LogBroker struct {
|
|
|
logQueue *watch.Queue
|
|
|
subscriptionQueue *watch.Queue
|
|
|
|
|
|
- registeredSubscriptions map[string]*api.SubscriptionMessage
|
|
|
+ registeredSubscriptions map[string]*subscription
|
|
|
|
|
|
pctx context.Context
|
|
|
cancelAll context.CancelFunc
|
|
|
+
|
|
|
+ store *store.MemoryStore
|
|
|
}
|
|
|
|
|
|
// New initializes and returns a new LogBroker
|
|
|
-func New() *LogBroker {
|
|
|
- return &LogBroker{}
|
|
|
+func New(store *store.MemoryStore) *LogBroker {
|
|
|
+ return &LogBroker{
|
|
|
+ store: store,
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Run the log broker
|
|
@@ -56,7 +61,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
|
|
|
lb.pctx, lb.cancelAll = context.WithCancel(ctx)
|
|
|
lb.logQueue = watch.NewQueue()
|
|
|
lb.subscriptionQueue = watch.NewQueue()
|
|
|
- lb.registeredSubscriptions = make(map[string]*api.SubscriptionMessage)
|
|
|
+ lb.registeredSubscriptions = make(map[string]*subscription)
|
|
|
lb.mu.Unlock()
|
|
|
|
|
|
select {
|
|
@@ -94,36 +99,60 @@ func validateSelector(selector *api.LogSelector) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (lb *LogBroker) registerSubscription(subscription *api.SubscriptionMessage) {
|
|
|
+func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.LogSubscriptionOptions) *subscription {
|
|
|
+ lb.mu.RLock()
|
|
|
+ defer lb.mu.RUnlock()
|
|
|
+
|
|
|
+ subscription := newSubscription(lb.store, &api.SubscriptionMessage{
|
|
|
+ ID: identity.NewID(),
|
|
|
+ Selector: selector,
|
|
|
+ Options: options,
|
|
|
+ }, lb.subscriptionQueue)
|
|
|
+
|
|
|
+ return subscription
|
|
|
+}
|
|
|
+
|
|
|
+func (lb *LogBroker) registerSubscription(subscription *subscription) {
|
|
|
lb.mu.Lock()
|
|
|
defer lb.mu.Unlock()
|
|
|
|
|
|
- lb.registeredSubscriptions[subscription.ID] = subscription
|
|
|
+ lb.registeredSubscriptions[subscription.message.ID] = subscription
|
|
|
lb.subscriptionQueue.Publish(subscription)
|
|
|
}
|
|
|
|
|
|
-func (lb *LogBroker) unregisterSubscription(subscription *api.SubscriptionMessage) {
|
|
|
- subscription = subscription.Copy()
|
|
|
- subscription.Close = true
|
|
|
-
|
|
|
+func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
|
|
|
lb.mu.Lock()
|
|
|
defer lb.mu.Unlock()
|
|
|
|
|
|
- delete(lb.registeredSubscriptions, subscription.ID)
|
|
|
+ delete(lb.registeredSubscriptions, subscription.message.ID)
|
|
|
+ subscription.message.Close = true
|
|
|
lb.subscriptionQueue.Publish(subscription)
|
|
|
}
|
|
|
|
|
|
-func (lb *LogBroker) watchSubscriptions() ([]*api.SubscriptionMessage, chan events.Event, func()) {
|
|
|
+// watchSubscriptions grabs all current subscriptions and notifies of any
|
|
|
+// subscription change for this node.
|
|
|
+//
|
|
|
+// Subscriptions may fire multiple times and the caller has to protect against
|
|
|
+// dupes.
|
|
|
+func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan events.Event, func()) {
|
|
|
lb.mu.RLock()
|
|
|
defer lb.mu.RUnlock()
|
|
|
|
|
|
- subs := make([]*api.SubscriptionMessage, 0, len(lb.registeredSubscriptions))
|
|
|
- for _, sub := range lb.registeredSubscriptions {
|
|
|
- subs = append(subs, sub)
|
|
|
+ // Watch for subscription changes for this node.
|
|
|
+ ch, cancel := lb.subscriptionQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
|
|
|
+ s := event.(*subscription)
|
|
|
+ return s.Contains(nodeID)
|
|
|
+ }))
|
|
|
+
|
|
|
+ // Grab current subscriptions.
|
|
|
+ subscriptions := make([]*subscription, 0, len(lb.registeredSubscriptions))
|
|
|
+ for _, s := range lb.registeredSubscriptions {
|
|
|
+ if s.Contains(nodeID) {
|
|
|
+ subscriptions = append(subscriptions, s)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- ch, cancel := lb.subscriptionQueue.Watch()
|
|
|
- return subs, ch, cancel
|
|
|
+ return subscriptions, ch, cancel
|
|
|
}
|
|
|
|
|
|
func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
|
|
@@ -151,22 +180,20 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- subscription := &api.SubscriptionMessage{
|
|
|
- ID: identity.NewID(),
|
|
|
- Selector: request.Selector,
|
|
|
- Options: request.Options,
|
|
|
- }
|
|
|
+ subscription := lb.newSubscription(request.Selector, request.Options)
|
|
|
+ subscription.Run(lb.pctx)
|
|
|
+ defer subscription.Stop()
|
|
|
|
|
|
log := log.G(ctx).WithFields(
|
|
|
logrus.Fields{
|
|
|
"method": "(*LogBroker).SubscribeLogs",
|
|
|
- "subscription.id": subscription.ID,
|
|
|
+ "subscription.id": subscription.message.ID,
|
|
|
},
|
|
|
)
|
|
|
|
|
|
log.Debug("subscribed")
|
|
|
|
|
|
- publishCh, publishCancel := lb.subscribe(subscription.ID)
|
|
|
+ publishCh, publishCancel := lb.subscribe(subscription.message.ID)
|
|
|
defer publishCancel()
|
|
|
|
|
|
lb.registerSubscription(subscription)
|
|
@@ -202,11 +229,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|
|
"node": remote.NodeID,
|
|
|
},
|
|
|
)
|
|
|
- subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions()
|
|
|
+ subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions(remote.NodeID)
|
|
|
defer subscriptionCancel()
|
|
|
|
|
|
log.Debug("node registered")
|
|
|
|
|
|
+ activeSubscriptions := make(map[string]struct{})
|
|
|
+
|
|
|
// Start by sending down all active subscriptions.
|
|
|
for _, subscription := range subscriptions {
|
|
|
select {
|
|
@@ -217,19 +246,30 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|
|
default:
|
|
|
}
|
|
|
|
|
|
- if err := stream.Send(subscription); err != nil {
|
|
|
+ if err := stream.Send(subscription.message); err != nil {
|
|
|
log.Error(err)
|
|
|
return err
|
|
|
}
|
|
|
+ activeSubscriptions[subscription.message.ID] = struct{}{}
|
|
|
}
|
|
|
|
|
|
// Send down new subscriptions.
|
|
|
- // TODO(aluzzardi): We should filter by relevant tasks for this node rather
|
|
|
for {
|
|
|
select {
|
|
|
case v := <-subscriptionCh:
|
|
|
- subscription := v.(*api.SubscriptionMessage)
|
|
|
- if err := stream.Send(subscription); err != nil {
|
|
|
+ subscription := v.(*subscription)
|
|
|
+
|
|
|
+ if subscription.message.Close {
|
|
|
+ log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
|
|
|
+ } else {
|
|
|
+ // Avoid sending down the same subscription multiple times
|
|
|
+ if _, ok := activeSubscriptions[subscription.message.ID]; ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ activeSubscriptions[subscription.message.ID] = struct{}{}
|
|
|
+ log.WithField("subscription.id", subscription.message.ID).Debug("subscription added")
|
|
|
+ }
|
|
|
+ if err := stream.Send(subscription.message); err != nil {
|
|
|
log.Error(err)
|
|
|
return err
|
|
|
}
|