broker.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. package logbroker
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "google.golang.org/grpc"
  8. "google.golang.org/grpc/codes"
  9. "github.com/Sirupsen/logrus"
  10. "github.com/docker/go-events"
  11. "github.com/docker/swarmkit/api"
  12. "github.com/docker/swarmkit/ca"
  13. "github.com/docker/swarmkit/identity"
  14. "github.com/docker/swarmkit/log"
  15. "github.com/docker/swarmkit/manager/state/store"
  16. "github.com/docker/swarmkit/watch"
  17. "golang.org/x/net/context"
  18. )
  // errAlreadyRunning is returned by Run when the broker has already been
// started and has not been stopped since.
var errAlreadyRunning = errors.New("broker is already running")

// errNotRunning is returned when an operation needs a running broker but the
// broker has not been started (or has already been stopped), e.g. by Stop.
var errNotRunning = errors.New("broker is not running")
  23. type logMessage struct {
  24. *api.PublishLogsMessage
  25. completed bool
  26. err error
  27. }
  28. // LogBroker coordinates log subscriptions to services and tasks. Clients can
  29. // publish and subscribe to logs channels.
  30. //
  31. // Log subscriptions are pushed to the work nodes by creating log subscsription
  32. // tasks. As such, the LogBroker also acts as an orchestrator of these tasks.
  33. type LogBroker struct {
  34. mu sync.RWMutex
  35. logQueue *watch.Queue
  36. subscriptionQueue *watch.Queue
  37. registeredSubscriptions map[string]*subscription
  38. subscriptionsByNode map[string]map[*subscription]struct{}
  39. pctx context.Context
  40. cancelAll context.CancelFunc
  41. store *store.MemoryStore
  42. }
  43. // New initializes and returns a new LogBroker
  44. func New(store *store.MemoryStore) *LogBroker {
  45. return &LogBroker{
  46. store: store,
  47. }
  48. }
  49. // Run the log broker
  50. func (lb *LogBroker) Run(ctx context.Context) error {
  51. lb.mu.Lock()
  52. if lb.cancelAll != nil {
  53. lb.mu.Unlock()
  54. return errAlreadyRunning
  55. }
  56. lb.pctx, lb.cancelAll = context.WithCancel(ctx)
  57. lb.logQueue = watch.NewQueue()
  58. lb.subscriptionQueue = watch.NewQueue()
  59. lb.registeredSubscriptions = make(map[string]*subscription)
  60. lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
  61. lb.mu.Unlock()
  62. select {
  63. case <-lb.pctx.Done():
  64. return lb.pctx.Err()
  65. }
  66. }
  67. // Stop stops the log broker
  68. func (lb *LogBroker) Stop() error {
  69. lb.mu.Lock()
  70. defer lb.mu.Unlock()
  71. if lb.cancelAll == nil {
  72. return errNotRunning
  73. }
  74. lb.cancelAll()
  75. lb.cancelAll = nil
  76. lb.logQueue.Close()
  77. lb.subscriptionQueue.Close()
  78. return nil
  79. }
  80. func validateSelector(selector *api.LogSelector) error {
  81. if selector == nil {
  82. return grpc.Errorf(codes.InvalidArgument, "log selector must be provided")
  83. }
  84. if len(selector.ServiceIDs) == 0 && len(selector.TaskIDs) == 0 && len(selector.NodeIDs) == 0 {
  85. return grpc.Errorf(codes.InvalidArgument, "log selector must not be empty")
  86. }
  87. return nil
  88. }
  89. func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.LogSubscriptionOptions) *subscription {
  90. lb.mu.RLock()
  91. defer lb.mu.RUnlock()
  92. subscription := newSubscription(lb.store, &api.SubscriptionMessage{
  93. ID: identity.NewID(),
  94. Selector: selector,
  95. Options: options,
  96. }, lb.subscriptionQueue)
  97. return subscription
  98. }
  99. func (lb *LogBroker) getSubscription(id string) *subscription {
  100. lb.mu.RLock()
  101. defer lb.mu.RUnlock()
  102. subscription, ok := lb.registeredSubscriptions[id]
  103. if !ok {
  104. return nil
  105. }
  106. return subscription
  107. }
  108. func (lb *LogBroker) registerSubscription(subscription *subscription) {
  109. lb.mu.Lock()
  110. defer lb.mu.Unlock()
  111. lb.registeredSubscriptions[subscription.message.ID] = subscription
  112. lb.subscriptionQueue.Publish(subscription)
  113. for _, node := range subscription.Nodes() {
  114. if _, ok := lb.subscriptionsByNode[node]; !ok {
  115. // Mark nodes that won't receive the message as done.
  116. subscription.Done(node, fmt.Errorf("node %s is not available", node))
  117. } else {
  118. // otherwise, add the subscription to the node's subscriptions list
  119. lb.subscriptionsByNode[node][subscription] = struct{}{}
  120. }
  121. }
  122. }
  123. func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
  124. lb.mu.Lock()
  125. defer lb.mu.Unlock()
  126. delete(lb.registeredSubscriptions, subscription.message.ID)
  127. // remove the subscription from all of the nodes
  128. for _, node := range subscription.Nodes() {
  129. // but only if a node exists
  130. if _, ok := lb.subscriptionsByNode[node]; ok {
  131. delete(lb.subscriptionsByNode[node], subscription)
  132. }
  133. }
  134. subscription.Close()
  135. lb.subscriptionQueue.Publish(subscription)
  136. }
  137. // watchSubscriptions grabs all current subscriptions and notifies of any
  138. // subscription change for this node.
  139. //
  140. // Subscriptions may fire multiple times and the caller has to protect against
  141. // dupes.
  142. func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan events.Event, func()) {
  143. lb.mu.RLock()
  144. defer lb.mu.RUnlock()
  145. // Watch for subscription changes for this node.
  146. ch, cancel := lb.subscriptionQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
  147. s := event.(*subscription)
  148. return s.Contains(nodeID)
  149. }))
  150. // Grab current subscriptions.
  151. var subscriptions []*subscription
  152. for _, s := range lb.registeredSubscriptions {
  153. if s.Contains(nodeID) {
  154. subscriptions = append(subscriptions, s)
  155. }
  156. }
  157. return subscriptions, ch, cancel
  158. }
  159. func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
  160. lb.mu.RLock()
  161. defer lb.mu.RUnlock()
  162. return lb.logQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
  163. publish := event.(*logMessage)
  164. return publish.SubscriptionID == id
  165. }))
  166. }
  167. func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
  168. lb.mu.RLock()
  169. defer lb.mu.RUnlock()
  170. lb.logQueue.Publish(&logMessage{PublishLogsMessage: log})
  171. }
  172. // markDone wraps (*Subscription).Done() so that the removal of the sub from
  173. // the node's subscription list is possible
  174. func (lb *LogBroker) markDone(sub *subscription, nodeID string, err error) {
  175. lb.mu.Lock()
  176. defer lb.mu.Unlock()
  177. // remove the subscription from the node's subscription list, if it exists
  178. if _, ok := lb.subscriptionsByNode[nodeID]; ok {
  179. delete(lb.subscriptionsByNode[nodeID], sub)
  180. }
  181. // mark the sub as done
  182. sub.Done(nodeID, err)
  183. }
  184. // SubscribeLogs creates a log subscription and streams back logs
  185. func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api.Logs_SubscribeLogsServer) error {
  186. ctx := stream.Context()
  187. if err := validateSelector(request.Selector); err != nil {
  188. return err
  189. }
  190. subscription := lb.newSubscription(request.Selector, request.Options)
  191. subscription.Run(lb.pctx)
  192. defer subscription.Stop()
  193. log := log.G(ctx).WithFields(
  194. logrus.Fields{
  195. "method": "(*LogBroker).SubscribeLogs",
  196. "subscription.id": subscription.message.ID,
  197. },
  198. )
  199. log.Debug("subscribed")
  200. publishCh, publishCancel := lb.subscribe(subscription.message.ID)
  201. defer publishCancel()
  202. lb.registerSubscription(subscription)
  203. defer lb.unregisterSubscription(subscription)
  204. completed := subscription.Wait(ctx)
  205. for {
  206. select {
  207. case <-ctx.Done():
  208. return ctx.Err()
  209. case <-lb.pctx.Done():
  210. return lb.pctx.Err()
  211. case event := <-publishCh:
  212. publish := event.(*logMessage)
  213. if publish.completed {
  214. return publish.err
  215. }
  216. if err := stream.Send(&api.SubscribeLogsMessage{
  217. Messages: publish.Messages,
  218. }); err != nil {
  219. return err
  220. }
  221. case <-completed:
  222. completed = nil
  223. lb.logQueue.Publish(&logMessage{
  224. PublishLogsMessage: &api.PublishLogsMessage{
  225. SubscriptionID: subscription.message.ID,
  226. },
  227. completed: true,
  228. err: subscription.Err(),
  229. })
  230. }
  231. }
  232. }
  233. func (lb *LogBroker) nodeConnected(nodeID string) {
  234. lb.mu.Lock()
  235. defer lb.mu.Unlock()
  236. if _, ok := lb.subscriptionsByNode[nodeID]; !ok {
  237. lb.subscriptionsByNode[nodeID] = make(map[*subscription]struct{})
  238. }
  239. }
  240. func (lb *LogBroker) nodeDisconnected(nodeID string) {
  241. lb.mu.Lock()
  242. defer lb.mu.Unlock()
  243. for sub := range lb.subscriptionsByNode[nodeID] {
  244. sub.Done(nodeID, fmt.Errorf("node %s disconnected unexpectedly", nodeID))
  245. }
  246. delete(lb.subscriptionsByNode, nodeID)
  247. }
  248. // ListenSubscriptions returns a stream of matching subscriptions for the current node
  249. func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest, stream api.LogBroker_ListenSubscriptionsServer) error {
  250. remote, err := ca.RemoteNode(stream.Context())
  251. if err != nil {
  252. return err
  253. }
  254. lb.nodeConnected(remote.NodeID)
  255. defer lb.nodeDisconnected(remote.NodeID)
  256. log := log.G(stream.Context()).WithFields(
  257. logrus.Fields{
  258. "method": "(*LogBroker).ListenSubscriptions",
  259. "node": remote.NodeID,
  260. },
  261. )
  262. subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions(remote.NodeID)
  263. defer subscriptionCancel()
  264. log.Debug("node registered")
  265. activeSubscriptions := make(map[string]*subscription)
  266. // Start by sending down all active subscriptions.
  267. for _, subscription := range subscriptions {
  268. select {
  269. case <-stream.Context().Done():
  270. return stream.Context().Err()
  271. case <-lb.pctx.Done():
  272. return nil
  273. default:
  274. }
  275. if err := stream.Send(subscription.message); err != nil {
  276. log.Error(err)
  277. return err
  278. }
  279. activeSubscriptions[subscription.message.ID] = subscription
  280. }
  281. // Send down new subscriptions.
  282. for {
  283. select {
  284. case v := <-subscriptionCh:
  285. subscription := v.(*subscription)
  286. if subscription.Closed() {
  287. delete(activeSubscriptions, subscription.message.ID)
  288. } else {
  289. // Avoid sending down the same subscription multiple times
  290. if _, ok := activeSubscriptions[subscription.message.ID]; ok {
  291. continue
  292. }
  293. activeSubscriptions[subscription.message.ID] = subscription
  294. }
  295. if err := stream.Send(subscription.message); err != nil {
  296. log.Error(err)
  297. return err
  298. }
  299. case <-stream.Context().Done():
  300. return stream.Context().Err()
  301. case <-lb.pctx.Done():
  302. return nil
  303. }
  304. }
  305. }
  306. // PublishLogs publishes log messages for a given subscription
  307. func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err error) {
  308. remote, err := ca.RemoteNode(stream.Context())
  309. if err != nil {
  310. return err
  311. }
  312. var currentSubscription *subscription
  313. defer func() {
  314. if currentSubscription != nil {
  315. lb.markDone(currentSubscription, remote.NodeID, err)
  316. }
  317. }()
  318. for {
  319. logMsg, err := stream.Recv()
  320. if err == io.EOF {
  321. return stream.SendAndClose(&api.PublishLogsResponse{})
  322. }
  323. if err != nil {
  324. return err
  325. }
  326. if logMsg.SubscriptionID == "" {
  327. return grpc.Errorf(codes.InvalidArgument, "missing subscription ID")
  328. }
  329. if currentSubscription == nil {
  330. currentSubscription = lb.getSubscription(logMsg.SubscriptionID)
  331. if currentSubscription == nil {
  332. return grpc.Errorf(codes.NotFound, "unknown subscription ID")
  333. }
  334. } else {
  335. if logMsg.SubscriptionID != currentSubscription.message.ID {
  336. return grpc.Errorf(codes.InvalidArgument, "different subscription IDs in the same session")
  337. }
  338. }
  339. // if we have a close message, close out the subscription
  340. if logMsg.Close {
  341. // Mark done and then set to nil so if we error after this point,
  342. // we don't try to close again in the defer
  343. lb.markDone(currentSubscription, remote.NodeID, err)
  344. currentSubscription = nil
  345. return nil
  346. }
  347. // Make sure logs are emitted using the right Node ID to avoid impersonation.
  348. for _, msg := range logMsg.Messages {
  349. if msg.Context.NodeID != remote.NodeID {
  350. return grpc.Errorf(codes.PermissionDenied, "invalid NodeID: expected=%s;received=%s", remote.NodeID, msg.Context.NodeID)
  351. }
  352. }
  353. lb.publish(logMsg)
  354. }
  355. }