|
@@ -2,6 +2,7 @@ package logbroker
|
|
|
|
|
|
import (
|
|
|
"errors"
|
|
|
+ "fmt"
|
|
|
"io"
|
|
|
"sync"
|
|
|
|
|
@@ -24,6 +25,12 @@ var (
|
|
|
errNotRunning = errors.New("broker is not running")
|
|
|
)
|
|
|
|
|
|
+type logMessage struct {
|
|
|
+ *api.PublishLogsMessage
|
|
|
+ completed bool
|
|
|
+ err error
|
|
|
+}
|
|
|
+
|
|
|
// LogBroker coordinates log subscriptions to services and tasks. Clients can
|
|
|
// publish and subscribe to logs channels.
|
|
|
//
|
|
@@ -35,6 +42,7 @@ type LogBroker struct {
|
|
|
subscriptionQueue *watch.Queue
|
|
|
|
|
|
registeredSubscriptions map[string]*subscription
|
|
|
+ connectedNodes map[string]struct{}
|
|
|
|
|
|
pctx context.Context
|
|
|
cancelAll context.CancelFunc
|
|
@@ -62,6 +70,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
|
|
|
lb.logQueue = watch.NewQueue()
|
|
|
lb.subscriptionQueue = watch.NewQueue()
|
|
|
lb.registeredSubscriptions = make(map[string]*subscription)
|
|
|
+ lb.connectedNodes = make(map[string]struct{})
|
|
|
lb.mu.Unlock()
|
|
|
|
|
|
select {
|
|
@@ -112,12 +121,30 @@ func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.Log
|
|
|
return subscription
|
|
|
}
|
|
|
|
|
|
+func (lb *LogBroker) getSubscription(id string) *subscription {
|
|
|
+ lb.mu.RLock()
|
|
|
+ defer lb.mu.RUnlock()
|
|
|
+
|
|
|
+ subscription, ok := lb.registeredSubscriptions[id]
|
|
|
+ if !ok {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return subscription
|
|
|
+}
|
|
|
+
|
|
|
func (lb *LogBroker) registerSubscription(subscription *subscription) {
|
|
|
lb.mu.Lock()
|
|
|
defer lb.mu.Unlock()
|
|
|
|
|
|
lb.registeredSubscriptions[subscription.message.ID] = subscription
|
|
|
lb.subscriptionQueue.Publish(subscription)
|
|
|
+
|
|
|
+ // Mark nodes that won't receive the message as done.
|
|
|
+ for _, node := range subscription.Nodes() {
|
|
|
+ if _, ok := lb.connectedNodes[node]; !ok {
|
|
|
+ subscription.Done(node, fmt.Errorf("node %s is not available", node))
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
|
|
@@ -160,7 +187,7 @@ func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
|
|
|
defer lb.mu.RUnlock()
|
|
|
|
|
|
return lb.logQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
|
|
|
- publish := event.(*api.PublishLogsMessage)
|
|
|
+ publish := event.(*logMessage)
|
|
|
return publish.SubscriptionID == id
|
|
|
}))
|
|
|
}
|
|
@@ -169,7 +196,7 @@ func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
|
|
|
lb.mu.RLock()
|
|
|
defer lb.mu.RUnlock()
|
|
|
|
|
|
- lb.logQueue.Publish(log)
|
|
|
+ lb.logQueue.Publish(&logMessage{PublishLogsMessage: log})
|
|
|
}
|
|
|
|
|
|
// SubscribeLogs creates a log subscription and streams back logs
|
|
@@ -190,7 +217,6 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
|
|
|
"subscription.id": subscription.message.ID,
|
|
|
},
|
|
|
)
|
|
|
-
|
|
|
log.Debug("subscribed")
|
|
|
|
|
|
publishCh, publishCancel := lb.subscribe(subscription.message.ID)
|
|
@@ -199,23 +225,50 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
|
|
|
lb.registerSubscription(subscription)
|
|
|
defer lb.unregisterSubscription(subscription)
|
|
|
|
|
|
+ completed := subscription.Wait(ctx)
|
|
|
for {
|
|
|
select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return ctx.Err()
|
|
|
+ case <-lb.pctx.Done():
|
|
|
+ return lb.pctx.Err()
|
|
|
case event := <-publishCh:
|
|
|
- publish := event.(*api.PublishLogsMessage)
|
|
|
+ publish := event.(*logMessage)
|
|
|
+ if publish.completed {
|
|
|
+ return publish.err
|
|
|
+ }
|
|
|
if err := stream.Send(&api.SubscribeLogsMessage{
|
|
|
Messages: publish.Messages,
|
|
|
}); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- case <-ctx.Done():
|
|
|
- return ctx.Err()
|
|
|
- case <-lb.pctx.Done():
|
|
|
- return nil
|
|
|
+ case <-completed:
|
|
|
+ completed = nil
|
|
|
+ lb.logQueue.Publish(&logMessage{
|
|
|
+ PublishLogsMessage: &api.PublishLogsMessage{
|
|
|
+ SubscriptionID: subscription.message.ID,
|
|
|
+ },
|
|
|
+ completed: true,
|
|
|
+ err: subscription.Err(),
|
|
|
+ })
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (lb *LogBroker) nodeConnected(nodeID string) {
|
|
|
+ lb.mu.Lock()
|
|
|
+ defer lb.mu.Unlock()
|
|
|
+
|
|
|
+ lb.connectedNodes[nodeID] = struct{}{}
|
|
|
+}
|
|
|
+
|
|
|
+func (lb *LogBroker) nodeDisconnected(nodeID string) {
|
|
|
+ lb.mu.Lock()
|
|
|
+ defer lb.mu.Unlock()
|
|
|
+
|
|
|
+ delete(lb.connectedNodes, nodeID)
|
|
|
+}
|
|
|
+
|
|
|
// ListenSubscriptions returns a stream of matching subscriptions for the current node
|
|
|
func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest, stream api.LogBroker_ListenSubscriptionsServer) error {
|
|
|
remote, err := ca.RemoteNode(stream.Context())
|
|
@@ -223,6 +276,9 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ lb.nodeConnected(remote.NodeID)
|
|
|
+ defer lb.nodeDisconnected(remote.NodeID)
|
|
|
+
|
|
|
log := log.G(stream.Context()).WithFields(
|
|
|
logrus.Fields{
|
|
|
"method": "(*LogBroker).ListenSubscriptions",
|
|
@@ -234,7 +290,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|
|
|
|
|
log.Debug("node registered")
|
|
|
|
|
|
- activeSubscriptions := make(map[string]struct{})
|
|
|
+ activeSubscriptions := make(map[string]*subscription)
|
|
|
+ defer func() {
|
|
|
+ // If the worker quits, mark all active subscriptions as finished.
|
|
|
+ for _, subscription := range activeSubscriptions {
|
|
|
+ subscription.Done(remote.NodeID, fmt.Errorf("node %s disconnected unexpectedly", remote.NodeID))
|
|
|
+ }
|
|
|
+ }()
|
|
|
|
|
|
// Start by sending down all active subscriptions.
|
|
|
for _, subscription := range subscriptions {
|
|
@@ -250,7 +312,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|
|
log.Error(err)
|
|
|
return err
|
|
|
}
|
|
|
- activeSubscriptions[subscription.message.ID] = struct{}{}
|
|
|
+ activeSubscriptions[subscription.message.ID] = subscription
|
|
|
}
|
|
|
|
|
|
// Send down new subscriptions.
|
|
@@ -261,12 +323,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|
|
|
|
|
if subscription.message.Close {
|
|
|
log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
|
|
|
+ delete(activeSubscriptions, subscription.message.ID)
|
|
|
} else {
|
|
|
// Avoid sending down the same subscription multiple times
|
|
|
if _, ok := activeSubscriptions[subscription.message.ID]; ok {
|
|
|
continue
|
|
|
}
|
|
|
- activeSubscriptions[subscription.message.ID] = struct{}{}
|
|
|
+ activeSubscriptions[subscription.message.ID] = subscription
|
|
|
log.WithField("subscription.id", subscription.message.ID).Debug("subscription added")
|
|
|
}
|
|
|
if err := stream.Send(subscription.message); err != nil {
|
|
@@ -282,12 +345,19 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|
|
}
|
|
|
|
|
|
// PublishLogs publishes log messages for a given subscription
|
|
|
-func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error {
|
|
|
+func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err error) {
|
|
|
remote, err := ca.RemoteNode(stream.Context())
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ var currentSubscription *subscription
|
|
|
+ defer func() {
|
|
|
+ if currentSubscription != nil {
|
|
|
+ currentSubscription.Done(remote.NodeID, err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
for {
|
|
|
log, err := stream.Recv()
|
|
|
if err == io.EOF {
|
|
@@ -301,6 +371,17 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error {
|
|
|
return grpc.Errorf(codes.InvalidArgument, "missing subscription ID")
|
|
|
}
|
|
|
|
|
|
+ if currentSubscription == nil {
|
|
|
+ currentSubscription = lb.getSubscription(log.SubscriptionID)
|
|
|
+ if currentSubscription == nil {
|
|
|
+ return grpc.Errorf(codes.NotFound, "unknown subscription ID")
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if log.SubscriptionID != currentSubscription.message.ID {
|
|
|
+ return grpc.Errorf(codes.InvalidArgument, "different subscription IDs in the same session")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Make sure logs are emitted using the right Node ID to avoid impersonation.
|
|
|
for _, msg := range log.Messages {
|
|
|
if msg.Context.NodeID != remote.NodeID {
|