dispatcher.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426
  1. package dispatcher
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/transport"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/go-events"
  13. "github.com/docker/swarmkit/api"
  14. "github.com/docker/swarmkit/api/equality"
  15. "github.com/docker/swarmkit/ca"
  16. "github.com/docker/swarmkit/log"
  17. "github.com/docker/swarmkit/manager/state"
  18. "github.com/docker/swarmkit/manager/state/store"
  19. "github.com/docker/swarmkit/remotes"
  20. "github.com/docker/swarmkit/watch"
  21. gogotypes "github.com/gogo/protobuf/types"
  22. "github.com/pkg/errors"
  23. "golang.org/x/net/context"
  24. )
const (
	// DefaultHeartBeatPeriod is used for setting default value in cluster config
	// and in case if cluster config is missing.
	DefaultHeartBeatPeriod = 5 * time.Second
	// defaultHeartBeatEpsilon is the extra tolerance around the heartbeat
	// period; it is handed to newNodeStore in New (see Config.HeartbeatEpsilon).
	defaultHeartBeatEpsilon = 500 * time.Millisecond
	// defaultGracePeriodMultiplier scales the heartbeat period into the
	// grace period used by the node store (passed to newNodeStore).
	defaultGracePeriodMultiplier = 3
	// defaultRateLimitPeriod is the minimum interval between session
	// registrations from a node with the same ID (see CheckRateLimit).
	defaultRateLimitPeriod = 8 * time.Second

	// maxBatchItems is the threshold of queued writes that should
	// trigger an actual transaction to commit them to the shared store.
	maxBatchItems = 10000

	// maxBatchInterval needs to strike a balance between keeping
	// latency low, and realizing opportunities to combine many writes
	// into a single transaction. A fraction of a second feels about
	// right.
	maxBatchInterval = 100 * time.Millisecond

	// modificationBatchLimit caps how many task events are folded into a
	// single snapshot sent on the Tasks stream before flushing.
	modificationBatchLimit = 100
	// batchingWaitTime is how long the Tasks stream waits after an event
	// for more events to arrive before sending a snapshot.
	batchingWaitTime = 100 * time.Millisecond

	// defaultNodeDownPeriod specifies the default time period we
	// wait before moving tasks assigned to down nodes to ORPHANED
	// state.
	defaultNodeDownPeriod = 24 * time.Hour
)
var (
	// ErrNodeAlreadyRegistered returned if node with same ID was already
	// registered with this dispatcher.
	ErrNodeAlreadyRegistered = errors.New("node already registered")
	// ErrNodeNotRegistered returned if node with such ID wasn't registered
	// with this dispatcher.
	ErrNodeNotRegistered = errors.New("node not registered")
	// ErrSessionInvalid returned when the session in use is no longer valid.
	// The node should re-register and start a new session.
	ErrSessionInvalid = errors.New("session invalid")
	// ErrNodeNotFound returned when the Node doesn't exist in raft.
	ErrNodeNotFound = errors.New("node not found")
)
// Config is configuration for Dispatcher. For default you should use
// DefaultConfig.
type Config struct {
	// HeartbeatPeriod is how often agents are expected to send heartbeats.
	HeartbeatPeriod time.Duration
	// HeartbeatEpsilon is the tolerance added around HeartbeatPeriod when
	// deciding whether a heartbeat is late.
	HeartbeatEpsilon time.Duration
	// RateLimitPeriod specifies how often node with same ID can try to register
	// new session.
	RateLimitPeriod time.Duration
	// GracePeriodMultiplier scales HeartbeatPeriod into the grace period
	// after which an unresponsive node is considered down.
	GracePeriodMultiplier int
}
  70. // DefaultConfig returns default config for Dispatcher.
  71. func DefaultConfig() *Config {
  72. return &Config{
  73. HeartbeatPeriod: DefaultHeartBeatPeriod,
  74. HeartbeatEpsilon: defaultHeartBeatEpsilon,
  75. RateLimitPeriod: defaultRateLimitPeriod,
  76. GracePeriodMultiplier: defaultGracePeriodMultiplier,
  77. }
  78. }
// Cluster is an interface which represents a raft cluster.
// manager/state/raft.Node implements it. This interface is needed only for
// easier unit-testing.
type Cluster interface {
	// GetMemberlist returns the current raft members keyed by raft ID.
	GetMemberlist() map[uint64]*api.RaftMember
	// SubscribePeers returns a channel of peer-set change events plus a
	// cancel function to stop the subscription.
	SubscribePeers() (chan events.Event, func())
	// MemoryStore returns the shared in-memory object store.
	MemoryStore() *store.MemoryStore
}
// nodeUpdate provides a new status and/or description to apply to a node
// object. Either field may be nil, meaning "leave unchanged" (see
// processUpdates).
type nodeUpdate struct {
	status      *api.NodeStatus
	description *api.NodeDescription
}
// Dispatcher is responsible for dispatching tasks and tracking agent health.
type Dispatcher struct {
	// mu guards the run state (ctx/cancel) and the manager/key caches below.
	mu sync.Mutex
	// wg tracks the Run loop so Stop can wait for it to exit.
	wg    sync.WaitGroup
	nodes *nodeStore
	store *store.MemoryStore
	// mgrQueue publishes manager-set changes to session streams.
	mgrQueue             *watch.Queue
	lastSeenManagers     []*api.WeightedPeer
	networkBootstrapKeys []*api.EncryptionKey
	// keyMgrQueue publishes network bootstrap key changes.
	keyMgrQueue *watch.Queue
	config      *Config
	cluster     Cluster
	// ctx/cancel are set while Run is active; isRunning checks them.
	ctx    context.Context
	cancel context.CancelFunc

	taskUpdates     map[string]*api.TaskStatus // indexed by task ID
	taskUpdatesLock sync.Mutex

	nodeUpdates     map[string]nodeUpdate // indexed by node ID
	nodeUpdatesLock sync.Mutex

	// downNodes tracks nodes observed as DOWN so their tasks can be moved
	// to ORPHANED after defaultNodeDownPeriod.
	downNodes *nodeStore

	// processUpdatesTrigger asks the Run loop to flush queued updates now.
	processUpdatesTrigger chan struct{}

	// for waiting for the next task/node batch update
	processUpdatesLock sync.Mutex
	processUpdatesCond *sync.Cond
}
  116. // New returns Dispatcher with cluster interface(usually raft.Node).
  117. func New(cluster Cluster, c *Config) *Dispatcher {
  118. d := &Dispatcher{
  119. nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
  120. downNodes: newNodeStore(defaultNodeDownPeriod, 0, 1, 0),
  121. store: cluster.MemoryStore(),
  122. cluster: cluster,
  123. taskUpdates: make(map[string]*api.TaskStatus),
  124. nodeUpdates: make(map[string]nodeUpdate),
  125. processUpdatesTrigger: make(chan struct{}, 1),
  126. config: c,
  127. }
  128. d.processUpdatesCond = sync.NewCond(&d.processUpdatesLock)
  129. return d
  130. }
  131. func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
  132. members := cluster.GetMemberlist()
  133. var mgrs []*api.WeightedPeer
  134. for _, m := range members {
  135. mgrs = append(mgrs, &api.WeightedPeer{
  136. Peer: &api.Peer{
  137. NodeID: m.NodeID,
  138. Addr: m.Addr,
  139. },
  140. // TODO(stevvooe): Calculate weight of manager selection based on
  141. // cluster-level observations, such as number of connections and
  142. // load.
  143. Weight: remotes.DefaultObservationWeight,
  144. })
  145. }
  146. return mgrs
  147. }
// Run runs dispatcher tasks which should be run on leader dispatcher.
// Dispatcher can be stopped with cancelling ctx or calling Stop().
func (d *Dispatcher) Run(ctx context.Context) error {
	d.mu.Lock()
	if d.isRunning() {
		d.mu.Unlock()
		return errors.New("dispatcher is already running")
	}
	ctx = log.WithModule(ctx, "dispatcher")
	// Best effort: failure to flip nodes to "unknown" is logged but does
	// not prevent the dispatcher from starting.
	if err := d.markNodesUnknown(ctx); err != nil {
		log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
	}
	// Read the initial cluster config (heartbeat period, network bootstrap
	// keys) and subscribe to subsequent cluster updates atomically.
	configWatcher, cancel, err := store.ViewAndWatch(
		d.store,
		func(readTx store.ReadTx) error {
			clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
			if err != nil {
				return err
			}
			// NOTE(review): err is necessarily nil at this point, so the
			// err == nil guard below is redundant (but harmless).
			if err == nil && len(clusters) == 1 {
				heartbeatPeriod, err := gogotypes.DurationFromProto(clusters[0].Spec.Dispatcher.HeartbeatPeriod)
				if err == nil && heartbeatPeriod > 0 {
					d.config.HeartbeatPeriod = heartbeatPeriod
				}
				if clusters[0].NetworkBootstrapKeys != nil {
					d.networkBootstrapKeys = clusters[0].NetworkBootstrapKeys
				}
			}
			return nil
		},
		state.EventUpdateCluster{},
	)
	if err != nil {
		d.mu.Unlock()
		return err
	}
	// set queues here to guarantee that Close will close them
	d.mgrQueue = watch.NewQueue()
	d.keyMgrQueue = watch.NewQueue()

	peerWatcher, peerCancel := d.cluster.SubscribePeers()
	defer peerCancel()
	d.lastSeenManagers = getWeightedPeers(d.cluster)

	defer cancel()
	// Setting d.ctx/d.cancel marks the dispatcher as running (see isRunning).
	d.ctx, d.cancel = context.WithCancel(ctx)
	ctx = d.ctx
	d.wg.Add(1)
	defer d.wg.Done()
	d.mu.Unlock()

	// publishManagers caches the new manager set and publishes it to
	// subscribed session streams.
	publishManagers := func(peers []*api.Peer) {
		var mgrs []*api.WeightedPeer
		for _, p := range peers {
			mgrs = append(mgrs, &api.WeightedPeer{
				Peer:   p,
				Weight: remotes.DefaultObservationWeight,
			})
		}
		d.mu.Lock()
		d.lastSeenManagers = mgrs
		d.mu.Unlock()
		d.mgrQueue.Publish(mgrs)
	}

	batchTimer := time.NewTimer(maxBatchInterval)
	defer batchTimer.Stop()

	// Main event loop: flush queued task/node updates either on demand
	// (processUpdatesTrigger) or periodically (batchTimer), and react to
	// peer-set and cluster-config changes until the context is canceled.
	for {
		select {
		case ev := <-peerWatcher:
			publishManagers(ev.([]*api.Peer))
		case <-d.processUpdatesTrigger:
			d.processUpdates(ctx)
			batchTimer.Reset(maxBatchInterval)
		case <-batchTimer.C:
			d.processUpdates(ctx)
			batchTimer.Reset(maxBatchInterval)
		case v := <-configWatcher:
			cluster := v.(state.EventUpdateCluster)
			d.mu.Lock()
			if cluster.Cluster.Spec.Dispatcher.HeartbeatPeriod != nil {
				// ignore error, since Spec has passed validation before
				heartbeatPeriod, _ := gogotypes.DurationFromProto(cluster.Cluster.Spec.Dispatcher.HeartbeatPeriod)
				if heartbeatPeriod != d.config.HeartbeatPeriod {
					// only call d.nodes.updatePeriod when heartbeatPeriod changes
					d.config.HeartbeatPeriod = heartbeatPeriod
					d.nodes.updatePeriod(d.config.HeartbeatPeriod, d.config.HeartbeatEpsilon, d.config.GracePeriodMultiplier)
				}
			}
			d.networkBootstrapKeys = cluster.Cluster.NetworkBootstrapKeys
			d.mu.Unlock()
			d.keyMgrQueue.Publish(cluster.Cluster.NetworkBootstrapKeys)
		case <-ctx.Done():
			return nil
		}
	}
}
// Stop stops dispatcher and closes all grpc streams.
func (d *Dispatcher) Stop() error {
	d.mu.Lock()
	if !d.isRunning() {
		d.mu.Unlock()
		return errors.New("dispatcher is already stopped")
	}
	// Canceling d.ctx makes isRunning() return false and unblocks the Run
	// loop and all streams selecting on dctx.Done().
	d.cancel()
	d.mu.Unlock()
	d.nodes.Clean()

	d.processUpdatesLock.Lock()
	// In case there are any waiters. There is no chance of any starting
	// after this point, because they check if the context is canceled
	// before waiting.
	d.processUpdatesCond.Broadcast()
	d.processUpdatesLock.Unlock()

	// Closing the queues terminates their subscribers' streams.
	d.mgrQueue.Close()
	d.keyMgrQueue.Close()

	// Wait for the Run loop to finish before returning.
	d.wg.Wait()

	return nil
}
  262. func (d *Dispatcher) isRunningLocked() (context.Context, error) {
  263. d.mu.Lock()
  264. if !d.isRunning() {
  265. d.mu.Unlock()
  266. return nil, grpc.Errorf(codes.Aborted, "dispatcher is stopped")
  267. }
  268. ctx := d.ctx
  269. d.mu.Unlock()
  270. return ctx, nil
  271. }
// markNodesUnknown moves all nodes (except those already DOWN) to the
// UNKNOWN state after a leadership change, giving each a heartbeat timer
// that demotes it to DOWN if it never re-registers. Nodes already DOWN are
// instead tracked in downNodes so their tasks can be orphaned after the
// down-node grace period.
func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
	log := log.G(ctx).WithField("method", "(*Dispatcher).markNodesUnknown")
	var nodes []*api.Node
	var err error
	d.store.View(func(tx store.ReadTx) {
		nodes, err = store.FindNodes(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "failed to get list of nodes")
	}
	_, err = d.store.Batch(func(batch *store.Batch) error {
		for _, n := range nodes {
			err := batch.Update(func(tx store.Tx) error {
				// check if node is still here
				node := store.GetNode(tx, n.ID)
				if node == nil {
					return nil
				}
				// do not try to resurrect down nodes
				if node.Status.State == api.NodeStatus_DOWN {
					// nodeCopy is a pointer copy of node, pinned for the
					// closure below (node is reassigned per iteration).
					nodeCopy := node
					expireFunc := func() {
						// After the down-node period, orphan the node's
						// tasks and stop tracking it.
						if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
							log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
						}

						d.downNodes.Delete(nodeCopy.ID)
					}

					d.downNodes.Add(nodeCopy, expireFunc)
					return nil
				}

				node.Status.State = api.NodeStatus_UNKNOWN
				node.Status.Message = `Node moved to "unknown" state due to leadership change in cluster`

				nodeID := node.ID

				// If the node does not heartbeat before the timer fires,
				// mark it DOWN.
				expireFunc := func() {
					log := log.WithField("node", nodeID)
					log.Debugf("heartbeat expiration for unknown node")
					if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
						log.WithError(err).Errorf(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
					}
				}
				if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
					return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
				}
				if err := store.UpdateNode(tx, node); err != nil {
					return errors.Wrap(err, "update failed")
				}
				return nil
			})
			if err != nil {
				// Per-node failures are logged but do not abort the batch;
				// the remaining nodes are still processed.
				log.WithField("node", n.ID).WithError(err).Errorf(`failed to move node to "unknown" state`)
			}
		}
		return nil
	})
	return err
}
  328. func (d *Dispatcher) isRunning() bool {
  329. if d.ctx == nil {
  330. return false
  331. }
  332. select {
  333. case <-d.ctx.Done():
  334. return false
  335. default:
  336. }
  337. return true
  338. }
// markNodeReady updates the description of a node, updates its address, and sets status to READY
// this is used during registration when a new node description is provided
// and during node updates when the node description changes
func (d *Dispatcher) markNodeReady(ctx context.Context, nodeID string, description *api.NodeDescription, addr string) error {
	d.nodeUpdatesLock.Lock()
	// Queue the READY status + description; the Run loop flushes it to the
	// store in processUpdates.
	d.nodeUpdates[nodeID] = nodeUpdate{
		status: &api.NodeStatus{
			State: api.NodeStatus_READY,
			Addr:  addr,
		},
		description: description,
	}
	numUpdates := len(d.nodeUpdates)
	d.nodeUpdatesLock.Unlock()

	// Node is marked ready. Remove the node from down nodes if it
	// is there.
	d.downNodes.Delete(nodeID)

	if numUpdates >= maxBatchItems {
		// Too many queued updates: nudge the Run loop to flush now rather
		// than waiting for the batch timer.
		select {
		case d.processUpdatesTrigger <- struct{}{}:
		case <-ctx.Done():
			return ctx.Err()
		}

	}

	// Wait until the node update batch happens before unblocking register.
	d.processUpdatesLock.Lock()
	// Check for cancellation before waiting: Stop() broadcasts the cond
	// only for waiters that got here first, so a canceled context must not
	// enter Wait (see the comment in Stop).
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	// NOTE(review): single Wait with no condition re-check loop — this
	// assumes every Broadcast corresponds to a completed batch (or to
	// Stop); confirm spurious early wakeups are acceptable to callers.
	d.processUpdatesCond.Wait()
	d.processUpdatesLock.Unlock()

	return nil
}
  374. // gets the node IP from the context of a grpc call
  375. func nodeIPFromContext(ctx context.Context) (string, error) {
  376. nodeInfo, err := ca.RemoteNode(ctx)
  377. if err != nil {
  378. return "", err
  379. }
  380. addr, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
  381. if err != nil {
  382. return "", errors.Wrap(err, "unable to get ip from addr:port")
  383. }
  384. return addr, nil
  385. }
  386. // register is used for registration of node with particular dispatcher.
  387. func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
  388. // prevent register until we're ready to accept it
  389. dctx, err := d.isRunningLocked()
  390. if err != nil {
  391. return "", err
  392. }
  393. if err := d.nodes.CheckRateLimit(nodeID); err != nil {
  394. return "", err
  395. }
  396. // TODO(stevvooe): Validate node specification.
  397. var node *api.Node
  398. d.store.View(func(tx store.ReadTx) {
  399. node = store.GetNode(tx, nodeID)
  400. })
  401. if node == nil {
  402. return "", ErrNodeNotFound
  403. }
  404. addr, err := nodeIPFromContext(ctx)
  405. if err != nil {
  406. log.G(ctx).Debugf(err.Error())
  407. }
  408. if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
  409. return "", err
  410. }
  411. expireFunc := func() {
  412. log.G(ctx).Debugf("heartbeat expiration")
  413. if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
  414. log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
  415. }
  416. }
  417. rn := d.nodes.Add(node, expireFunc)
  418. // NOTE(stevvooe): We need be a little careful with re-registration. The
  419. // current implementation just matches the node id and then gives away the
  420. // sessionID. If we ever want to use sessionID as a secret, which we may
  421. // want to, this is giving away the keys to the kitchen.
  422. //
  423. // The right behavior is going to be informed by identity. Basically, each
  424. // time a node registers, we invalidate the session and issue a new
  425. // session, once identity is proven. This will cause misbehaved agents to
  426. // be kicked when multiple connections are made.
  427. return rn.SessionID, nil
  428. }
// UpdateTaskStatus updates status of task. Node should send such updates
// on every status change of its tasks.
func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}
	nodeID := nodeInfo.NodeID
	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": r.SessionID,
		"method":       "(*Dispatcher).UpdateTaskStatus",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log := log.G(ctx).WithFields(fields)

	dctx, err := d.isRunningLocked()
	if err != nil {
		return nil, err
	}

	// The caller must hold a valid session for this node.
	if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		return nil, err
	}

	// Validate task updates
	// NOTE(review): this opens one read transaction per update; consider a
	// single View over all of r.Updates if this shows up in profiles.
	for _, u := range r.Updates {
		if u.Status == nil {
			log.WithField("task.id", u.TaskID).Warn("task report has nil status")
			continue
		}

		var t *api.Task
		d.store.View(func(tx store.ReadTx) {
			t = store.GetTask(tx, u.TaskID)
		})
		if t == nil {
			// Unknown tasks are skipped, not rejected: the task may have
			// been deleted while the report was in flight.
			log.WithField("task.id", u.TaskID).Warn("cannot find target task in store")
			continue
		}

		// Reject the whole request if any task belongs to another node.
		if t.NodeID != nodeID {
			err := grpc.Errorf(codes.PermissionDenied, "cannot update a task not assigned this node")
			log.WithField("task.id", u.TaskID).Error(err)
			return nil, err
		}
	}

	d.taskUpdatesLock.Lock()
	// Enqueue task updates
	for _, u := range r.Updates {
		if u.Status == nil {
			continue
		}
		d.taskUpdates[u.TaskID] = u.Status
	}

	numUpdates := len(d.taskUpdates)
	d.taskUpdatesLock.Unlock()

	if numUpdates >= maxBatchItems {
		// Ask the Run loop to flush immediately; drop the request if the
		// dispatcher is shutting down (updates will flush on next start).
		select {
		case d.processUpdatesTrigger <- struct{}{}:
		case <-dctx.Done():
		}
	}
	return nil, nil
}
// processUpdates drains the queued task and node updates and commits them
// to the store in a single batch. It always finishes by broadcasting
// processUpdatesCond so goroutines blocked in markNodeReady are released.
// Called only from the Run loop (on trigger or batch timer).
func (d *Dispatcher) processUpdates(ctx context.Context) {
	var (
		taskUpdates map[string]*api.TaskStatus
		nodeUpdates map[string]nodeUpdate
	)
	// Swap out the pending maps under their locks so producers can keep
	// queueing while we commit this batch.
	d.taskUpdatesLock.Lock()
	if len(d.taskUpdates) != 0 {
		taskUpdates = d.taskUpdates
		d.taskUpdates = make(map[string]*api.TaskStatus)
	}
	d.taskUpdatesLock.Unlock()

	d.nodeUpdatesLock.Lock()
	if len(d.nodeUpdates) != 0 {
		nodeUpdates = d.nodeUpdates
		d.nodeUpdates = make(map[string]nodeUpdate)
	}
	d.nodeUpdatesLock.Unlock()

	if len(taskUpdates) == 0 && len(nodeUpdates) == 0 {
		return
	}

	log := log.G(ctx).WithFields(logrus.Fields{
		"method": "(*Dispatcher).processUpdates",
	})

	_, err := d.store.Batch(func(batch *store.Batch) error {
		for taskID, status := range taskUpdates {
			err := batch.Update(func(tx store.Tx) error {
				logger := log.WithField("task.id", taskID)
				task := store.GetTask(tx, taskID)
				if task == nil {
					// Task was deleted between enqueue and commit; skip.
					logger.Errorf("task unavailable")
					return nil
				}

				logger = logger.WithField("state.transition", fmt.Sprintf("%v->%v", task.Status.State, status.State))

				if task.Status == *status {
					logger.Debug("task status identical, ignoring")
					return nil
				}

				// Task state may only move forward; ignore regressions.
				if task.Status.State > status.State {
					logger.Debug("task status invalid transition")
					return nil
				}

				task.Status = *status
				if err := store.UpdateTask(tx, task); err != nil {
					// Per-task failures are logged, not propagated, so the
					// rest of the batch still commits.
					logger.WithError(err).Error("failed to update task status")
					return nil
				}
				logger.Debug("task status updated")
				return nil
			})
			if err != nil {
				log.WithError(err).Error("dispatcher task update transaction failed")
			}
		}

		for nodeID, nodeUpdate := range nodeUpdates {
			err := batch.Update(func(tx store.Tx) error {
				logger := log.WithField("node.id", nodeID)
				node := store.GetNode(tx, nodeID)
				if node == nil {
					logger.Errorf("node unavailable")
					return nil
				}

				// Apply whichever parts of the update are present; a nil
				// field means "leave unchanged" (see nodeUpdate).
				if nodeUpdate.status != nil {
					node.Status.State = nodeUpdate.status.State
					node.Status.Message = nodeUpdate.status.Message
					if nodeUpdate.status.Addr != "" {
						node.Status.Addr = nodeUpdate.status.Addr
					}
				}
				if nodeUpdate.description != nil {
					node.Description = nodeUpdate.description
				}

				if err := store.UpdateNode(tx, node); err != nil {
					logger.WithError(err).Error("failed to update node status")
					return nil
				}
				logger.Debug("node status updated")
				return nil
			})
			if err != nil {
				log.WithError(err).Error("dispatcher node update transaction failed")
			}
		}

		return nil
	})
	if err != nil {
		log.WithError(err).Error("dispatcher batch failed")
	}

	// Release everyone waiting for this batch (see markNodeReady).
	d.processUpdatesCond.Broadcast()
}
// Tasks is a stream of tasks state for node. Each message contains full list
// of tasks which should be run on node, if task is not present in that list,
// it should be terminated.
func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServer) error {
	nodeInfo, err := ca.RemoteNode(stream.Context())
	if err != nil {
		return err
	}
	nodeID := nodeInfo.NodeID

	dctx, err := d.isRunningLocked()
	if err != nil {
		return err
	}

	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": r.SessionID,
		"method":       "(*Dispatcher).Tasks",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log.G(stream.Context()).WithFields(fields).Debugf("")

	// The caller must hold a valid session for this node.
	if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		return err
	}

	tasksMap := make(map[string]*api.Task)
	// Load the node's current tasks and subscribe to this node's task
	// events in one atomic step, so no event is lost between the two.
	nodeTasks, cancel, err := store.ViewAndWatch(
		d.store,
		func(readTx store.ReadTx) error {
			tasks, err := store.FindTasks(readTx, store.ByNodeID(nodeID))
			if err != nil {
				return err
			}
			for _, t := range tasks {
				tasksMap[t.ID] = t
			}
			return nil
		},
		state.EventCreateTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventUpdateTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventDeleteTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
	)
	if err != nil {
		return err
	}
	defer cancel()

	for {
		// Re-validate the session on every iteration so a revoked session
		// terminates the stream.
		if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
			return err
		}

		var tasks []*api.Task
		for _, t := range tasksMap {
			// dispatcher only sends tasks that have been assigned to a node
			if t != nil && t.Status.State >= api.TaskStateAssigned {
				tasks = append(tasks, t)
			}
		}

		if err := stream.Send(&api.TasksMessage{Tasks: tasks}); err != nil {
			return err
		}

		// bursty events should be processed in batches and sent out snapshot
		var (
			modificationCnt int
			batchingTimer   *time.Timer
			batchingTimeout <-chan time.Time
		)

	batchingLoop:
		for modificationCnt < modificationBatchLimit {
			select {
			case event := <-nodeTasks:
				switch v := event.(type) {
				case state.EventCreateTask:
					tasksMap[v.Task.ID] = v.Task
					modificationCnt++
				case state.EventUpdateTask:
					if oldTask, exists := tasksMap[v.Task.ID]; exists {
						// States ASSIGNED and below are set by the orchestrator/scheduler,
						// not the agent, so tasks in these states need to be sent to the
						// agent even if nothing else has changed.
						if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
							// this update should not trigger action at agent
							tasksMap[v.Task.ID] = v.Task
							continue
						}
					}
					tasksMap[v.Task.ID] = v.Task
					modificationCnt++
				case state.EventDeleteTask:
					delete(tasksMap, v.Task.ID)
					modificationCnt++
				}
				// (Re)arm the quiet-period timer: the snapshot is sent once
				// batchingWaitTime passes without a new event. The timer is
				// created lazily on the first event and reused afterwards.
				if batchingTimer != nil {
					batchingTimer.Reset(batchingWaitTime)
				} else {
					batchingTimer = time.NewTimer(batchingWaitTime)
					batchingTimeout = batchingTimer.C
				}
			case <-batchingTimeout:
				break batchingLoop
			case <-stream.Context().Done():
				return stream.Context().Err()
			case <-dctx.Done():
				return dctx.Err()
			}
		}

		if batchingTimer != nil {
			batchingTimer.Stop()
		}
	}
}
  693. // Assignments is a stream of assignments for a node. Each message contains
  694. // either full list of tasks and secrets for the node, or an incremental update.
  695. func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error {
  696. nodeInfo, err := ca.RemoteNode(stream.Context())
  697. if err != nil {
  698. return err
  699. }
  700. nodeID := nodeInfo.NodeID
  701. dctx, err := d.isRunningLocked()
  702. if err != nil {
  703. return err
  704. }
  705. fields := logrus.Fields{
  706. "node.id": nodeID,
  707. "node.session": r.SessionID,
  708. "method": "(*Dispatcher).Assignments",
  709. }
  710. if nodeInfo.ForwardedBy != nil {
  711. fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
  712. }
  713. log := log.G(stream.Context()).WithFields(fields)
  714. log.Debugf("")
  715. if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
  716. return err
  717. }
  718. var (
  719. sequence int64
  720. appliesTo string
  721. initial api.AssignmentsMessage
  722. )
  723. tasksMap := make(map[string]*api.Task)
  724. tasksUsingSecret := make(map[string]map[string]struct{})
  725. sendMessage := func(msg api.AssignmentsMessage, assignmentType api.AssignmentsMessage_Type) error {
  726. sequence++
  727. msg.AppliesTo = appliesTo
  728. msg.ResultsIn = strconv.FormatInt(sequence, 10)
  729. appliesTo = msg.ResultsIn
  730. msg.Type = assignmentType
  731. if err := stream.Send(&msg); err != nil {
  732. return err
  733. }
  734. return nil
  735. }
  736. // returns a slice of new secrets to send down
  737. addSecretsForTask := func(readTx store.ReadTx, t *api.Task) []*api.Secret {
  738. container := t.Spec.GetContainer()
  739. if container == nil {
  740. return nil
  741. }
  742. var newSecrets []*api.Secret
  743. for _, secretRef := range container.Secrets {
  744. secretID := secretRef.SecretID
  745. log := log.WithFields(logrus.Fields{
  746. "secret.id": secretID,
  747. "secret.name": secretRef.SecretName,
  748. })
  749. if len(tasksUsingSecret[secretID]) == 0 {
  750. tasksUsingSecret[secretID] = make(map[string]struct{})
  751. secret := store.GetSecret(readTx, secretID)
  752. if secret == nil {
  753. log.Debug("secret not found")
  754. continue
  755. }
  756. // If the secret was found, add this secret to
  757. // our set that we send down.
  758. newSecrets = append(newSecrets, secret)
  759. }
  760. tasksUsingSecret[secretID][t.ID] = struct{}{}
  761. }
  762. return newSecrets
  763. }
  764. // TODO(aaronl): Also send node secrets that should be exposed to
  765. // this node.
  766. nodeTasks, cancel, err := store.ViewAndWatch(
  767. d.store,
  768. func(readTx store.ReadTx) error {
  769. tasks, err := store.FindTasks(readTx, store.ByNodeID(nodeID))
  770. if err != nil {
  771. return err
  772. }
  773. for _, t := range tasks {
  774. // We only care about tasks that are ASSIGNED or
  775. // higher. If the state is below ASSIGNED, the
  776. // task may not meet the constraints for this
  777. // node, so we have to be careful about sending
  778. // secrets associated with it.
  779. if t.Status.State < api.TaskStateAssigned {
  780. continue
  781. }
  782. tasksMap[t.ID] = t
  783. taskChange := &api.AssignmentChange{
  784. Assignment: &api.Assignment{
  785. Item: &api.Assignment_Task{
  786. Task: t,
  787. },
  788. },
  789. Action: api.AssignmentChange_AssignmentActionUpdate,
  790. }
  791. initial.Changes = append(initial.Changes, taskChange)
  792. // Only send secrets down if these tasks are in < RUNNING
  793. if t.Status.State <= api.TaskStateRunning {
  794. newSecrets := addSecretsForTask(readTx, t)
  795. for _, secret := range newSecrets {
  796. secretChange := &api.AssignmentChange{
  797. Assignment: &api.Assignment{
  798. Item: &api.Assignment_Secret{
  799. Secret: secret,
  800. },
  801. },
  802. Action: api.AssignmentChange_AssignmentActionUpdate,
  803. }
  804. initial.Changes = append(initial.Changes, secretChange)
  805. }
  806. }
  807. }
  808. return nil
  809. },
  810. state.EventUpdateTask{Task: &api.Task{NodeID: nodeID},
  811. Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
  812. state.EventDeleteTask{Task: &api.Task{NodeID: nodeID},
  813. Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
  814. state.EventUpdateSecret{},
  815. state.EventDeleteSecret{},
  816. )
  817. if err != nil {
  818. return err
  819. }
  820. defer cancel()
  821. if err := sendMessage(initial, api.AssignmentsMessage_COMPLETE); err != nil {
  822. return err
  823. }
  824. for {
  825. // Check for session expiration
  826. if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
  827. return err
  828. }
  829. // bursty events should be processed in batches and sent out together
  830. var (
  831. update api.AssignmentsMessage
  832. modificationCnt int
  833. batchingTimer *time.Timer
  834. batchingTimeout <-chan time.Time
  835. updateTasks = make(map[string]*api.Task)
  836. updateSecrets = make(map[string]*api.Secret)
  837. removeTasks = make(map[string]struct{})
  838. removeSecrets = make(map[string]struct{})
  839. )
  840. oneModification := func() {
  841. modificationCnt++
  842. if batchingTimer != nil {
  843. batchingTimer.Reset(batchingWaitTime)
  844. } else {
  845. batchingTimer = time.NewTimer(batchingWaitTime)
  846. batchingTimeout = batchingTimer.C
  847. }
  848. }
  849. // Release the secrets references from this task
  850. releaseSecretsForTask := func(t *api.Task) bool {
  851. var modified bool
  852. container := t.Spec.GetContainer()
  853. if container == nil {
  854. return modified
  855. }
  856. for _, secretRef := range container.Secrets {
  857. secretID := secretRef.SecretID
  858. delete(tasksUsingSecret[secretID], t.ID)
  859. if len(tasksUsingSecret[secretID]) == 0 {
  860. // No tasks are using the secret anymore
  861. delete(tasksUsingSecret, secretID)
  862. removeSecrets[secretID] = struct{}{}
  863. modified = true
  864. }
  865. }
  866. return modified
  867. }
  868. // The batching loop waits for 50 ms after the most recent
  869. // change, or until modificationBatchLimit is reached. The
  870. // worst case latency is modificationBatchLimit * batchingWaitTime,
  871. // which is 10 seconds.
  872. batchingLoop:
  873. for modificationCnt < modificationBatchLimit {
  874. select {
  875. case event := <-nodeTasks:
  876. switch v := event.(type) {
  877. // We don't monitor EventCreateTask because tasks are
  878. // never created in the ASSIGNED state. First tasks are
  879. // created by the orchestrator, then the scheduler moves
  880. // them to ASSIGNED. If this ever changes, we will need
  881. // to monitor task creations as well.
  882. case state.EventUpdateTask:
  883. // We only care about tasks that are ASSIGNED or
  884. // higher.
  885. if v.Task.Status.State < api.TaskStateAssigned {
  886. continue
  887. }
  888. if oldTask, exists := tasksMap[v.Task.ID]; exists {
  889. // States ASSIGNED and below are set by the orchestrator/scheduler,
  890. // not the agent, so tasks in these states need to be sent to the
  891. // agent even if nothing else has changed.
  892. if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
  893. // this update should not trigger a task change for the agent
  894. tasksMap[v.Task.ID] = v.Task
  895. // If this task got updated to a final state, let's release
  896. // the secrets that are being used by the task
  897. if v.Task.Status.State > api.TaskStateRunning {
  898. // If releasing the secrets caused a secret to be
  899. // removed from an agent, mark one modification
  900. if releaseSecretsForTask(v.Task) {
  901. oneModification()
  902. }
  903. }
  904. continue
  905. }
  906. } else if v.Task.Status.State <= api.TaskStateRunning {
  907. // If this task wasn't part of the assignment set before, and it's <= RUNNING
  908. // add the secrets it references to the secrets assignment.
  909. // Task states > RUNNING are worker reported only, are never created in
  910. // a > RUNNING state.
  911. var newSecrets []*api.Secret
  912. d.store.View(func(readTx store.ReadTx) {
  913. newSecrets = addSecretsForTask(readTx, v.Task)
  914. })
  915. for _, secret := range newSecrets {
  916. updateSecrets[secret.ID] = secret
  917. }
  918. }
  919. tasksMap[v.Task.ID] = v.Task
  920. updateTasks[v.Task.ID] = v.Task
  921. oneModification()
  922. case state.EventDeleteTask:
  923. if _, exists := tasksMap[v.Task.ID]; !exists {
  924. continue
  925. }
  926. removeTasks[v.Task.ID] = struct{}{}
  927. delete(tasksMap, v.Task.ID)
  928. // Release the secrets being used by this task
  929. // Ignoring the return here. We will always mark
  930. // this as a modification, since a task is being
  931. // removed.
  932. releaseSecretsForTask(v.Task)
  933. oneModification()
  934. // TODO(aaronl): For node secrets, we'll need to handle
  935. // EventCreateSecret.
  936. case state.EventUpdateSecret:
  937. if _, exists := tasksUsingSecret[v.Secret.ID]; !exists {
  938. continue
  939. }
  940. log.Debugf("Secret %s (ID: %d) was updated though it was still referenced by one or more tasks",
  941. v.Secret.Spec.Annotations.Name, v.Secret.ID)
  942. case state.EventDeleteSecret:
  943. if _, exists := tasksUsingSecret[v.Secret.ID]; !exists {
  944. continue
  945. }
  946. log.Debugf("Secret %s (ID: %d) was deleted though it was still referenced by one or more tasks",
  947. v.Secret.Spec.Annotations.Name, v.Secret.ID)
  948. }
  949. case <-batchingTimeout:
  950. break batchingLoop
  951. case <-stream.Context().Done():
  952. return stream.Context().Err()
  953. case <-dctx.Done():
  954. return dctx.Err()
  955. }
  956. }
  957. if batchingTimer != nil {
  958. batchingTimer.Stop()
  959. }
  960. if modificationCnt > 0 {
  961. for id, task := range updateTasks {
  962. if _, ok := removeTasks[id]; !ok {
  963. taskChange := &api.AssignmentChange{
  964. Assignment: &api.Assignment{
  965. Item: &api.Assignment_Task{
  966. Task: task,
  967. },
  968. },
  969. Action: api.AssignmentChange_AssignmentActionUpdate,
  970. }
  971. update.Changes = append(update.Changes, taskChange)
  972. }
  973. }
  974. for id, secret := range updateSecrets {
  975. // If, due to multiple updates, this secret is no longer in use,
  976. // don't send it down.
  977. if len(tasksUsingSecret[id]) == 0 {
  978. // delete this secret for the secrets to be updated
  979. // so that deleteSecrets knows the current list
  980. delete(updateSecrets, id)
  981. continue
  982. }
  983. secretChange := &api.AssignmentChange{
  984. Assignment: &api.Assignment{
  985. Item: &api.Assignment_Secret{
  986. Secret: secret,
  987. },
  988. },
  989. Action: api.AssignmentChange_AssignmentActionUpdate,
  990. }
  991. update.Changes = append(update.Changes, secretChange)
  992. }
  993. for id := range removeTasks {
  994. taskChange := &api.AssignmentChange{
  995. Assignment: &api.Assignment{
  996. Item: &api.Assignment_Task{
  997. Task: &api.Task{ID: id},
  998. },
  999. },
  1000. Action: api.AssignmentChange_AssignmentActionRemove,
  1001. }
  1002. update.Changes = append(update.Changes, taskChange)
  1003. }
  1004. for id := range removeSecrets {
  1005. // If this secret is also being sent on the updated set
  1006. // don't also add it to the removed set
  1007. if _, ok := updateSecrets[id]; ok {
  1008. continue
  1009. }
  1010. secretChange := &api.AssignmentChange{
  1011. Assignment: &api.Assignment{
  1012. Item: &api.Assignment_Secret{
  1013. Secret: &api.Secret{ID: id},
  1014. },
  1015. },
  1016. Action: api.AssignmentChange_AssignmentActionRemove,
  1017. }
  1018. update.Changes = append(update.Changes, secretChange)
  1019. }
  1020. if err := sendMessage(update, api.AssignmentsMessage_INCREMENTAL); err != nil {
  1021. return err
  1022. }
  1023. }
  1024. }
  1025. }
  1026. func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {
  1027. _, err := d.store.Batch(func(batch *store.Batch) error {
  1028. var (
  1029. tasks []*api.Task
  1030. err error
  1031. )
  1032. d.store.View(func(tx store.ReadTx) {
  1033. tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID))
  1034. })
  1035. if err != nil {
  1036. return err
  1037. }
  1038. for _, task := range tasks {
  1039. if task.Status.State < api.TaskStateOrphaned {
  1040. task.Status.State = api.TaskStateOrphaned
  1041. }
  1042. if err := batch.Update(func(tx store.Tx) error {
  1043. err := store.UpdateTask(tx, task)
  1044. if err != nil {
  1045. return err
  1046. }
  1047. return nil
  1048. }); err != nil {
  1049. return err
  1050. }
  1051. }
  1052. return nil
  1053. })
  1054. return err
  1055. }
  1056. // markNodeNotReady sets the node state to some state other than READY
  1057. func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
  1058. dctx, err := d.isRunningLocked()
  1059. if err != nil {
  1060. return err
  1061. }
  1062. // Node is down. Add it to down nodes so that we can keep
  1063. // track of tasks assigned to the node.
  1064. var node *api.Node
  1065. d.store.View(func(readTx store.ReadTx) {
  1066. node = store.GetNode(readTx, id)
  1067. if node == nil {
  1068. err = fmt.Errorf("could not find node %s while trying to add to down nodes store", id)
  1069. }
  1070. })
  1071. if err != nil {
  1072. return err
  1073. }
  1074. expireFunc := func() {
  1075. if err := d.moveTasksToOrphaned(id); err != nil {
  1076. log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
  1077. }
  1078. d.downNodes.Delete(id)
  1079. }
  1080. d.downNodes.Add(node, expireFunc)
  1081. status := &api.NodeStatus{
  1082. State: state,
  1083. Message: message,
  1084. }
  1085. d.nodeUpdatesLock.Lock()
  1086. // pluck the description out of nodeUpdates. this protects against a case
  1087. // where a node is marked ready and a description is added, but then the
  1088. // node is immediately marked not ready. this preserves that description
  1089. d.nodeUpdates[id] = nodeUpdate{status: status, description: d.nodeUpdates[id].description}
  1090. numUpdates := len(d.nodeUpdates)
  1091. d.nodeUpdatesLock.Unlock()
  1092. if numUpdates >= maxBatchItems {
  1093. select {
  1094. case d.processUpdatesTrigger <- struct{}{}:
  1095. case <-dctx.Done():
  1096. }
  1097. }
  1098. if rn := d.nodes.Delete(id); rn == nil {
  1099. return errors.Errorf("node %s is not found in local storage", id)
  1100. }
  1101. return nil
  1102. }
  1103. // Heartbeat is heartbeat method for nodes. It returns new TTL in response.
  1104. // Node should send new heartbeat earlier than now + TTL, otherwise it will
  1105. // be deregistered from dispatcher and its status will be updated to NodeStatus_DOWN
  1106. func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
  1107. nodeInfo, err := ca.RemoteNode(ctx)
  1108. if err != nil {
  1109. return nil, err
  1110. }
  1111. period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)
  1112. return &api.HeartbeatResponse{Period: period}, err
  1113. }
  1114. func (d *Dispatcher) getManagers() []*api.WeightedPeer {
  1115. d.mu.Lock()
  1116. defer d.mu.Unlock()
  1117. return d.lastSeenManagers
  1118. }
  1119. func (d *Dispatcher) getNetworkBootstrapKeys() []*api.EncryptionKey {
  1120. d.mu.Lock()
  1121. defer d.mu.Unlock()
  1122. return d.networkBootstrapKeys
  1123. }
  1124. // Session is a stream which controls agent connection.
  1125. // Each message contains list of backup Managers with weights. Also there is
  1126. // a special boolean field Disconnect which if true indicates that node should
  1127. // reconnect to another Manager immediately.
  1128. func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
  1129. ctx := stream.Context()
  1130. nodeInfo, err := ca.RemoteNode(ctx)
  1131. if err != nil {
  1132. return err
  1133. }
  1134. nodeID := nodeInfo.NodeID
  1135. dctx, err := d.isRunningLocked()
  1136. if err != nil {
  1137. return err
  1138. }
  1139. var sessionID string
  1140. if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
  1141. // register the node.
  1142. sessionID, err = d.register(ctx, nodeID, r.Description)
  1143. if err != nil {
  1144. return err
  1145. }
  1146. } else {
  1147. sessionID = r.SessionID
  1148. // get the node IP addr
  1149. addr, err := nodeIPFromContext(stream.Context())
  1150. if err != nil {
  1151. log.G(ctx).Debugf(err.Error())
  1152. }
  1153. // update the node description
  1154. if err := d.markNodeReady(dctx, nodeID, r.Description, addr); err != nil {
  1155. return err
  1156. }
  1157. }
  1158. fields := logrus.Fields{
  1159. "node.id": nodeID,
  1160. "node.session": sessionID,
  1161. "method": "(*Dispatcher).Session",
  1162. }
  1163. if nodeInfo.ForwardedBy != nil {
  1164. fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
  1165. }
  1166. log := log.G(ctx).WithFields(fields)
  1167. var nodeObj *api.Node
  1168. nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
  1169. nodeObj = store.GetNode(readTx, nodeID)
  1170. return nil
  1171. }, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
  1172. Checks: []state.NodeCheckFunc{state.NodeCheckID}},
  1173. )
  1174. if cancel != nil {
  1175. defer cancel()
  1176. }
  1177. if err != nil {
  1178. log.WithError(err).Error("ViewAndWatch Node failed")
  1179. }
  1180. if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
  1181. return err
  1182. }
  1183. if err := stream.Send(&api.SessionMessage{
  1184. SessionID: sessionID,
  1185. Node: nodeObj,
  1186. Managers: d.getManagers(),
  1187. NetworkBootstrapKeys: d.getNetworkBootstrapKeys(),
  1188. }); err != nil {
  1189. return err
  1190. }
  1191. managerUpdates, mgrCancel := d.mgrQueue.Watch()
  1192. defer mgrCancel()
  1193. keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
  1194. defer keyMgrCancel()
  1195. // disconnectNode is a helper forcibly shutdown connection
  1196. disconnectNode := func() error {
  1197. // force disconnect by shutting down the stream.
  1198. transportStream, ok := transport.StreamFromContext(stream.Context())
  1199. if ok {
  1200. // if we have the transport stream, we can signal a disconnect
  1201. // in the client.
  1202. if err := transportStream.ServerTransport().Close(); err != nil {
  1203. log.WithError(err).Error("session end")
  1204. }
  1205. }
  1206. if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
  1207. log.WithError(err).Error("failed to remove node")
  1208. }
  1209. // still return an abort if the transport closure was ineffective.
  1210. return grpc.Errorf(codes.Aborted, "node must disconnect")
  1211. }
  1212. for {
  1213. // After each message send, we need to check the nodes sessionID hasn't
  1214. // changed. If it has, we will shut down the stream and make the node
  1215. // re-register.
  1216. node, err := d.nodes.GetWithSession(nodeID, sessionID)
  1217. if err != nil {
  1218. return err
  1219. }
  1220. var (
  1221. disconnect bool
  1222. mgrs []*api.WeightedPeer
  1223. netKeys []*api.EncryptionKey
  1224. )
  1225. select {
  1226. case ev := <-managerUpdates:
  1227. mgrs = ev.([]*api.WeightedPeer)
  1228. case ev := <-nodeUpdates:
  1229. nodeObj = ev.(state.EventUpdateNode).Node
  1230. case <-stream.Context().Done():
  1231. return stream.Context().Err()
  1232. case <-node.Disconnect:
  1233. disconnect = true
  1234. case <-dctx.Done():
  1235. disconnect = true
  1236. case ev := <-keyMgrUpdates:
  1237. netKeys = ev.([]*api.EncryptionKey)
  1238. }
  1239. if mgrs == nil {
  1240. mgrs = d.getManagers()
  1241. }
  1242. if netKeys == nil {
  1243. netKeys = d.getNetworkBootstrapKeys()
  1244. }
  1245. if err := stream.Send(&api.SessionMessage{
  1246. SessionID: sessionID,
  1247. Node: nodeObj,
  1248. Managers: mgrs,
  1249. NetworkBootstrapKeys: netKeys,
  1250. }); err != nil {
  1251. return err
  1252. }
  1253. if disconnect {
  1254. return disconnectNode()
  1255. }
  1256. }
  1257. }