123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- package logbroker
- import (
- "errors"
- "fmt"
- "io"
- "sync"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "github.com/Sirupsen/logrus"
- "github.com/docker/go-events"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/ca"
- "github.com/docker/swarmkit/identity"
- "github.com/docker/swarmkit/log"
- "github.com/docker/swarmkit/manager/state/store"
- "github.com/docker/swarmkit/watch"
- "golang.org/x/net/context"
- )
// Sentinel errors describing an invalid broker lifecycle state.
var (
	// errAlreadyRunning signals an attempt to start a broker that has
	// already been started.
	errAlreadyRunning = errors.New("broker is already running")

	// errNotRunning signals that the broker has not been started (or has
	// already been stopped).
	errNotRunning = errors.New("broker is not running")
)
- type logMessage struct {
- *api.PublishLogsMessage
- completed bool
- err error
- }
- // LogBroker coordinates log subscriptions to services and tasks. Clients can
- // publish and subscribe to logs channels.
- //
- // Log subscriptions are pushed to the work nodes by creating log subscsription
- // tasks. As such, the LogBroker also acts as an orchestrator of these tasks.
- type LogBroker struct {
- mu sync.RWMutex
- logQueue *watch.Queue
- subscriptionQueue *watch.Queue
- registeredSubscriptions map[string]*subscription
- subscriptionsByNode map[string]map[*subscription]struct{}
- pctx context.Context
- cancelAll context.CancelFunc
- store *store.MemoryStore
- }
- // New initializes and returns a new LogBroker
- func New(store *store.MemoryStore) *LogBroker {
- return &LogBroker{
- store: store,
- }
- }
- // Run the log broker
- func (lb *LogBroker) Run(ctx context.Context) error {
- lb.mu.Lock()
- if lb.cancelAll != nil {
- lb.mu.Unlock()
- return errAlreadyRunning
- }
- lb.pctx, lb.cancelAll = context.WithCancel(ctx)
- lb.logQueue = watch.NewQueue()
- lb.subscriptionQueue = watch.NewQueue()
- lb.registeredSubscriptions = make(map[string]*subscription)
- lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
- lb.mu.Unlock()
- select {
- case <-lb.pctx.Done():
- return lb.pctx.Err()
- }
- }
- // Stop stops the log broker
- func (lb *LogBroker) Stop() error {
- lb.mu.Lock()
- defer lb.mu.Unlock()
- if lb.cancelAll == nil {
- return errNotRunning
- }
- lb.cancelAll()
- lb.cancelAll = nil
- lb.logQueue.Close()
- lb.subscriptionQueue.Close()
- return nil
- }
- func validateSelector(selector *api.LogSelector) error {
- if selector == nil {
- return grpc.Errorf(codes.InvalidArgument, "log selector must be provided")
- }
- if len(selector.ServiceIDs) == 0 && len(selector.TaskIDs) == 0 && len(selector.NodeIDs) == 0 {
- return grpc.Errorf(codes.InvalidArgument, "log selector must not be empty")
- }
- return nil
- }
- func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.LogSubscriptionOptions) *subscription {
- lb.mu.RLock()
- defer lb.mu.RUnlock()
- subscription := newSubscription(lb.store, &api.SubscriptionMessage{
- ID: identity.NewID(),
- Selector: selector,
- Options: options,
- }, lb.subscriptionQueue)
- return subscription
- }
- func (lb *LogBroker) getSubscription(id string) *subscription {
- lb.mu.RLock()
- defer lb.mu.RUnlock()
- subscription, ok := lb.registeredSubscriptions[id]
- if !ok {
- return nil
- }
- return subscription
- }
- func (lb *LogBroker) registerSubscription(subscription *subscription) {
- lb.mu.Lock()
- defer lb.mu.Unlock()
- lb.registeredSubscriptions[subscription.message.ID] = subscription
- lb.subscriptionQueue.Publish(subscription)
- for _, node := range subscription.Nodes() {
- if _, ok := lb.subscriptionsByNode[node]; !ok {
- // Mark nodes that won't receive the message as done.
- subscription.Done(node, fmt.Errorf("node %s is not available", node))
- } else {
- // otherwise, add the subscription to the node's subscriptions list
- lb.subscriptionsByNode[node][subscription] = struct{}{}
- }
- }
- }
- func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
- lb.mu.Lock()
- defer lb.mu.Unlock()
- delete(lb.registeredSubscriptions, subscription.message.ID)
- // remove the subscription from all of the nodes
- for _, node := range subscription.Nodes() {
- // but only if a node exists
- if _, ok := lb.subscriptionsByNode[node]; ok {
- delete(lb.subscriptionsByNode[node], subscription)
- }
- }
- subscription.Close()
- lb.subscriptionQueue.Publish(subscription)
- }
- // watchSubscriptions grabs all current subscriptions and notifies of any
- // subscription change for this node.
- //
- // Subscriptions may fire multiple times and the caller has to protect against
- // dupes.
- func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan events.Event, func()) {
- lb.mu.RLock()
- defer lb.mu.RUnlock()
- // Watch for subscription changes for this node.
- ch, cancel := lb.subscriptionQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
- s := event.(*subscription)
- return s.Contains(nodeID)
- }))
- // Grab current subscriptions.
- var subscriptions []*subscription
- for _, s := range lb.registeredSubscriptions {
- if s.Contains(nodeID) {
- subscriptions = append(subscriptions, s)
- }
- }
- return subscriptions, ch, cancel
- }
- func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
- lb.mu.RLock()
- defer lb.mu.RUnlock()
- return lb.logQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
- publish := event.(*logMessage)
- return publish.SubscriptionID == id
- }))
- }
- func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
- lb.mu.RLock()
- defer lb.mu.RUnlock()
- lb.logQueue.Publish(&logMessage{PublishLogsMessage: log})
- }
- // markDone wraps (*Subscription).Done() so that the removal of the sub from
- // the node's subscription list is possible
- func (lb *LogBroker) markDone(sub *subscription, nodeID string, err error) {
- lb.mu.Lock()
- defer lb.mu.Unlock()
- // remove the subscription from the node's subscription list, if it exists
- if _, ok := lb.subscriptionsByNode[nodeID]; ok {
- delete(lb.subscriptionsByNode[nodeID], sub)
- }
- // mark the sub as done
- sub.Done(nodeID, err)
- }
- // SubscribeLogs creates a log subscription and streams back logs
- func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api.Logs_SubscribeLogsServer) error {
- ctx := stream.Context()
- if err := validateSelector(request.Selector); err != nil {
- return err
- }
- subscription := lb.newSubscription(request.Selector, request.Options)
- subscription.Run(lb.pctx)
- defer subscription.Stop()
- log := log.G(ctx).WithFields(
- logrus.Fields{
- "method": "(*LogBroker).SubscribeLogs",
- "subscription.id": subscription.message.ID,
- },
- )
- log.Debug("subscribed")
- publishCh, publishCancel := lb.subscribe(subscription.message.ID)
- defer publishCancel()
- lb.registerSubscription(subscription)
- defer lb.unregisterSubscription(subscription)
- completed := subscription.Wait(ctx)
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-lb.pctx.Done():
- return lb.pctx.Err()
- case event := <-publishCh:
- publish := event.(*logMessage)
- if publish.completed {
- return publish.err
- }
- if err := stream.Send(&api.SubscribeLogsMessage{
- Messages: publish.Messages,
- }); err != nil {
- return err
- }
- case <-completed:
- completed = nil
- lb.logQueue.Publish(&logMessage{
- PublishLogsMessage: &api.PublishLogsMessage{
- SubscriptionID: subscription.message.ID,
- },
- completed: true,
- err: subscription.Err(),
- })
- }
- }
- }
- func (lb *LogBroker) nodeConnected(nodeID string) {
- lb.mu.Lock()
- defer lb.mu.Unlock()
- if _, ok := lb.subscriptionsByNode[nodeID]; !ok {
- lb.subscriptionsByNode[nodeID] = make(map[*subscription]struct{})
- }
- }
- func (lb *LogBroker) nodeDisconnected(nodeID string) {
- lb.mu.Lock()
- defer lb.mu.Unlock()
- for sub := range lb.subscriptionsByNode[nodeID] {
- sub.Done(nodeID, fmt.Errorf("node %s disconnected unexpectedly", nodeID))
- }
- delete(lb.subscriptionsByNode, nodeID)
- }
- // ListenSubscriptions returns a stream of matching subscriptions for the current node
- func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest, stream api.LogBroker_ListenSubscriptionsServer) error {
- remote, err := ca.RemoteNode(stream.Context())
- if err != nil {
- return err
- }
- lb.nodeConnected(remote.NodeID)
- defer lb.nodeDisconnected(remote.NodeID)
- log := log.G(stream.Context()).WithFields(
- logrus.Fields{
- "method": "(*LogBroker).ListenSubscriptions",
- "node": remote.NodeID,
- },
- )
- subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions(remote.NodeID)
- defer subscriptionCancel()
- log.Debug("node registered")
- activeSubscriptions := make(map[string]*subscription)
- // Start by sending down all active subscriptions.
- for _, subscription := range subscriptions {
- select {
- case <-stream.Context().Done():
- return stream.Context().Err()
- case <-lb.pctx.Done():
- return nil
- default:
- }
- if err := stream.Send(subscription.message); err != nil {
- log.Error(err)
- return err
- }
- activeSubscriptions[subscription.message.ID] = subscription
- }
- // Send down new subscriptions.
- for {
- select {
- case v := <-subscriptionCh:
- subscription := v.(*subscription)
- if subscription.Closed() {
- delete(activeSubscriptions, subscription.message.ID)
- } else {
- // Avoid sending down the same subscription multiple times
- if _, ok := activeSubscriptions[subscription.message.ID]; ok {
- continue
- }
- activeSubscriptions[subscription.message.ID] = subscription
- }
- if err := stream.Send(subscription.message); err != nil {
- log.Error(err)
- return err
- }
- case <-stream.Context().Done():
- return stream.Context().Err()
- case <-lb.pctx.Done():
- return nil
- }
- }
- }
- // PublishLogs publishes log messages for a given subscription
- func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err error) {
- remote, err := ca.RemoteNode(stream.Context())
- if err != nil {
- return err
- }
- var currentSubscription *subscription
- defer func() {
- if currentSubscription != nil {
- lb.markDone(currentSubscription, remote.NodeID, err)
- }
- }()
- for {
- logMsg, err := stream.Recv()
- if err == io.EOF {
- return stream.SendAndClose(&api.PublishLogsResponse{})
- }
- if err != nil {
- return err
- }
- if logMsg.SubscriptionID == "" {
- return grpc.Errorf(codes.InvalidArgument, "missing subscription ID")
- }
- if currentSubscription == nil {
- currentSubscription = lb.getSubscription(logMsg.SubscriptionID)
- if currentSubscription == nil {
- return grpc.Errorf(codes.NotFound, "unknown subscription ID")
- }
- } else {
- if logMsg.SubscriptionID != currentSubscription.message.ID {
- return grpc.Errorf(codes.InvalidArgument, "different subscription IDs in the same session")
- }
- }
- // if we have a close message, close out the subscription
- if logMsg.Close {
- // Mark done and then set to nil so if we error after this point,
- // we don't try to close again in the defer
- lb.markDone(currentSubscription, remote.NodeID, err)
- currentSubscription = nil
- return nil
- }
- // Make sure logs are emitted using the right Node ID to avoid impersonation.
- for _, msg := range logMsg.Messages {
- if msg.Context.NodeID != remote.NodeID {
- return grpc.Errorf(codes.PermissionDenied, "invalid NodeID: expected=%s;received=%s", remote.NodeID, msg.Context.NodeID)
- }
- }
- lb.publish(logMsg)
- }
- }
|