broker.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. package logbroker
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "github.com/docker/go-events"
  8. "github.com/docker/swarmkit/api"
  9. "github.com/docker/swarmkit/ca"
  10. "github.com/docker/swarmkit/identity"
  11. "github.com/docker/swarmkit/log"
  12. "github.com/docker/swarmkit/manager/state/store"
  13. "github.com/docker/swarmkit/watch"
  14. "github.com/sirupsen/logrus"
  15. "golang.org/x/net/context"
  16. "google.golang.org/grpc/codes"
  17. "google.golang.org/grpc/status"
  18. )
  19. var (
  20. errAlreadyRunning = errors.New("broker is already running")
  21. errNotRunning = errors.New("broker is not running")
  22. )
  23. type logMessage struct {
  24. *api.PublishLogsMessage
  25. completed bool
  26. err error
  27. }
  28. // LogBroker coordinates log subscriptions to services and tasks. Clients can
  29. // publish and subscribe to logs channels.
  30. //
  31. // Log subscriptions are pushed to the work nodes by creating log subscsription
  32. // tasks. As such, the LogBroker also acts as an orchestrator of these tasks.
  33. type LogBroker struct {
  34. mu sync.RWMutex
  35. logQueue *watch.Queue
  36. subscriptionQueue *watch.Queue
  37. registeredSubscriptions map[string]*subscription
  38. subscriptionsByNode map[string]map[*subscription]struct{}
  39. pctx context.Context
  40. cancelAll context.CancelFunc
  41. store *store.MemoryStore
  42. }
  43. // New initializes and returns a new LogBroker
  44. func New(store *store.MemoryStore) *LogBroker {
  45. return &LogBroker{
  46. store: store,
  47. }
  48. }
  49. // Start starts the log broker
  50. func (lb *LogBroker) Start(ctx context.Context) error {
  51. lb.mu.Lock()
  52. defer lb.mu.Unlock()
  53. if lb.cancelAll != nil {
  54. return errAlreadyRunning
  55. }
  56. lb.pctx, lb.cancelAll = context.WithCancel(ctx)
  57. lb.logQueue = watch.NewQueue()
  58. lb.subscriptionQueue = watch.NewQueue()
  59. lb.registeredSubscriptions = make(map[string]*subscription)
  60. lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
  61. return nil
  62. }
  63. // Stop stops the log broker
  64. func (lb *LogBroker) Stop() error {
  65. lb.mu.Lock()
  66. defer lb.mu.Unlock()
  67. if lb.cancelAll == nil {
  68. return errNotRunning
  69. }
  70. lb.cancelAll()
  71. lb.cancelAll = nil
  72. lb.logQueue.Close()
  73. lb.subscriptionQueue.Close()
  74. return nil
  75. }
  76. func validateSelector(selector *api.LogSelector) error {
  77. if selector == nil {
  78. return status.Errorf(codes.InvalidArgument, "log selector must be provided")
  79. }
  80. if len(selector.ServiceIDs) == 0 && len(selector.TaskIDs) == 0 && len(selector.NodeIDs) == 0 {
  81. return status.Errorf(codes.InvalidArgument, "log selector must not be empty")
  82. }
  83. return nil
  84. }
  85. func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.LogSubscriptionOptions) *subscription {
  86. lb.mu.RLock()
  87. defer lb.mu.RUnlock()
  88. subscription := newSubscription(lb.store, &api.SubscriptionMessage{
  89. ID: identity.NewID(),
  90. Selector: selector,
  91. Options: options,
  92. }, lb.subscriptionQueue)
  93. return subscription
  94. }
  95. func (lb *LogBroker) getSubscription(id string) *subscription {
  96. lb.mu.RLock()
  97. defer lb.mu.RUnlock()
  98. subscription, ok := lb.registeredSubscriptions[id]
  99. if !ok {
  100. return nil
  101. }
  102. return subscription
  103. }
  104. func (lb *LogBroker) registerSubscription(subscription *subscription) {
  105. lb.mu.Lock()
  106. defer lb.mu.Unlock()
  107. lb.registeredSubscriptions[subscription.message.ID] = subscription
  108. lb.subscriptionQueue.Publish(subscription)
  109. for _, node := range subscription.Nodes() {
  110. if _, ok := lb.subscriptionsByNode[node]; !ok {
  111. // Mark nodes that won't receive the message as done.
  112. subscription.Done(node, fmt.Errorf("node %s is not available", node))
  113. } else {
  114. // otherwise, add the subscription to the node's subscriptions list
  115. lb.subscriptionsByNode[node][subscription] = struct{}{}
  116. }
  117. }
  118. }
  119. func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
  120. lb.mu.Lock()
  121. defer lb.mu.Unlock()
  122. delete(lb.registeredSubscriptions, subscription.message.ID)
  123. // remove the subscription from all of the nodes
  124. for _, node := range subscription.Nodes() {
  125. // but only if a node exists
  126. if _, ok := lb.subscriptionsByNode[node]; ok {
  127. delete(lb.subscriptionsByNode[node], subscription)
  128. }
  129. }
  130. subscription.Close()
  131. lb.subscriptionQueue.Publish(subscription)
  132. }
  133. // watchSubscriptions grabs all current subscriptions and notifies of any
  134. // subscription change for this node.
  135. //
  136. // Subscriptions may fire multiple times and the caller has to protect against
  137. // dupes.
  138. func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan events.Event, func()) {
  139. lb.mu.RLock()
  140. defer lb.mu.RUnlock()
  141. // Watch for subscription changes for this node.
  142. ch, cancel := lb.subscriptionQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
  143. s := event.(*subscription)
  144. return s.Contains(nodeID)
  145. }))
  146. // Grab current subscriptions.
  147. var subscriptions []*subscription
  148. for _, s := range lb.registeredSubscriptions {
  149. if s.Contains(nodeID) {
  150. subscriptions = append(subscriptions, s)
  151. }
  152. }
  153. return subscriptions, ch, cancel
  154. }
  155. func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
  156. lb.mu.RLock()
  157. defer lb.mu.RUnlock()
  158. return lb.logQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
  159. publish := event.(*logMessage)
  160. return publish.SubscriptionID == id
  161. }))
  162. }
  163. func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
  164. lb.mu.RLock()
  165. defer lb.mu.RUnlock()
  166. lb.logQueue.Publish(&logMessage{PublishLogsMessage: log})
  167. }
  168. // markDone wraps (*Subscription).Done() so that the removal of the sub from
  169. // the node's subscription list is possible
  170. func (lb *LogBroker) markDone(sub *subscription, nodeID string, err error) {
  171. lb.mu.Lock()
  172. defer lb.mu.Unlock()
  173. // remove the subscription from the node's subscription list, if it exists
  174. if _, ok := lb.subscriptionsByNode[nodeID]; ok {
  175. delete(lb.subscriptionsByNode[nodeID], sub)
  176. }
  177. // mark the sub as done
  178. sub.Done(nodeID, err)
  179. }
  180. // SubscribeLogs creates a log subscription and streams back logs
  181. func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api.Logs_SubscribeLogsServer) error {
  182. ctx := stream.Context()
  183. if err := validateSelector(request.Selector); err != nil {
  184. return err
  185. }
  186. lb.mu.Lock()
  187. pctx := lb.pctx
  188. lb.mu.Unlock()
  189. if pctx == nil {
  190. return errNotRunning
  191. }
  192. subscription := lb.newSubscription(request.Selector, request.Options)
  193. subscription.Run(pctx)
  194. defer subscription.Stop()
  195. log := log.G(ctx).WithFields(
  196. logrus.Fields{
  197. "method": "(*LogBroker).SubscribeLogs",
  198. "subscription.id": subscription.message.ID,
  199. },
  200. )
  201. log.Debug("subscribed")
  202. publishCh, publishCancel := lb.subscribe(subscription.message.ID)
  203. defer publishCancel()
  204. lb.registerSubscription(subscription)
  205. defer lb.unregisterSubscription(subscription)
  206. completed := subscription.Wait(ctx)
  207. for {
  208. select {
  209. case <-ctx.Done():
  210. return ctx.Err()
  211. case <-pctx.Done():
  212. return pctx.Err()
  213. case event := <-publishCh:
  214. publish := event.(*logMessage)
  215. if publish.completed {
  216. return publish.err
  217. }
  218. if err := stream.Send(&api.SubscribeLogsMessage{
  219. Messages: publish.Messages,
  220. }); err != nil {
  221. return err
  222. }
  223. case <-completed:
  224. completed = nil
  225. lb.logQueue.Publish(&logMessage{
  226. PublishLogsMessage: &api.PublishLogsMessage{
  227. SubscriptionID: subscription.message.ID,
  228. },
  229. completed: true,
  230. err: subscription.Err(),
  231. })
  232. }
  233. }
  234. }
  235. func (lb *LogBroker) nodeConnected(nodeID string) {
  236. lb.mu.Lock()
  237. defer lb.mu.Unlock()
  238. if _, ok := lb.subscriptionsByNode[nodeID]; !ok {
  239. lb.subscriptionsByNode[nodeID] = make(map[*subscription]struct{})
  240. }
  241. }
  242. func (lb *LogBroker) nodeDisconnected(nodeID string) {
  243. lb.mu.Lock()
  244. defer lb.mu.Unlock()
  245. for sub := range lb.subscriptionsByNode[nodeID] {
  246. sub.Done(nodeID, fmt.Errorf("node %s disconnected unexpectedly", nodeID))
  247. }
  248. delete(lb.subscriptionsByNode, nodeID)
  249. }
  250. // ListenSubscriptions returns a stream of matching subscriptions for the current node
  251. func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest, stream api.LogBroker_ListenSubscriptionsServer) error {
  252. remote, err := ca.RemoteNode(stream.Context())
  253. if err != nil {
  254. return err
  255. }
  256. lb.mu.Lock()
  257. pctx := lb.pctx
  258. lb.mu.Unlock()
  259. if pctx == nil {
  260. return errNotRunning
  261. }
  262. lb.nodeConnected(remote.NodeID)
  263. defer lb.nodeDisconnected(remote.NodeID)
  264. log := log.G(stream.Context()).WithFields(
  265. logrus.Fields{
  266. "method": "(*LogBroker).ListenSubscriptions",
  267. "node": remote.NodeID,
  268. },
  269. )
  270. subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions(remote.NodeID)
  271. defer subscriptionCancel()
  272. log.Debug("node registered")
  273. activeSubscriptions := make(map[string]*subscription)
  274. // Start by sending down all active subscriptions.
  275. for _, subscription := range subscriptions {
  276. select {
  277. case <-stream.Context().Done():
  278. return stream.Context().Err()
  279. case <-pctx.Done():
  280. return nil
  281. default:
  282. }
  283. if err := stream.Send(subscription.message); err != nil {
  284. log.Error(err)
  285. return err
  286. }
  287. activeSubscriptions[subscription.message.ID] = subscription
  288. }
  289. // Send down new subscriptions.
  290. for {
  291. select {
  292. case v := <-subscriptionCh:
  293. subscription := v.(*subscription)
  294. if subscription.Closed() {
  295. delete(activeSubscriptions, subscription.message.ID)
  296. } else {
  297. // Avoid sending down the same subscription multiple times
  298. if _, ok := activeSubscriptions[subscription.message.ID]; ok {
  299. continue
  300. }
  301. activeSubscriptions[subscription.message.ID] = subscription
  302. }
  303. if err := stream.Send(subscription.message); err != nil {
  304. log.Error(err)
  305. return err
  306. }
  307. case <-stream.Context().Done():
  308. return stream.Context().Err()
  309. case <-pctx.Done():
  310. return nil
  311. }
  312. }
  313. }
  314. // PublishLogs publishes log messages for a given subscription
  315. func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err error) {
  316. remote, err := ca.RemoteNode(stream.Context())
  317. if err != nil {
  318. return err
  319. }
  320. var currentSubscription *subscription
  321. defer func() {
  322. if currentSubscription != nil {
  323. lb.markDone(currentSubscription, remote.NodeID, err)
  324. }
  325. }()
  326. for {
  327. logMsg, err := stream.Recv()
  328. if err == io.EOF {
  329. return stream.SendAndClose(&api.PublishLogsResponse{})
  330. }
  331. if err != nil {
  332. return err
  333. }
  334. if logMsg.SubscriptionID == "" {
  335. return status.Errorf(codes.InvalidArgument, "missing subscription ID")
  336. }
  337. if currentSubscription == nil {
  338. currentSubscription = lb.getSubscription(logMsg.SubscriptionID)
  339. if currentSubscription == nil {
  340. return status.Errorf(codes.NotFound, "unknown subscription ID")
  341. }
  342. } else {
  343. if logMsg.SubscriptionID != currentSubscription.message.ID {
  344. return status.Errorf(codes.InvalidArgument, "different subscription IDs in the same session")
  345. }
  346. }
  347. // if we have a close message, close out the subscription
  348. if logMsg.Close {
  349. // Mark done and then set to nil so if we error after this point,
  350. // we don't try to close again in the defer
  351. lb.markDone(currentSubscription, remote.NodeID, err)
  352. currentSubscription = nil
  353. return nil
  354. }
  355. // Make sure logs are emitted using the right Node ID to avoid impersonation.
  356. for _, msg := range logMsg.Messages {
  357. if msg.Context.NodeID != remote.NodeID {
  358. return status.Errorf(codes.PermissionDenied, "invalid NodeID: expected=%s;received=%s", remote.NodeID, msg.Context.NodeID)
  359. }
  360. }
  361. lb.publish(logMsg)
  362. }
  363. }