dispatcher.go

package dispatcher

import (
	"fmt"
	"net"
	"strconv"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/transport"

	"github.com/Sirupsen/logrus"
	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/equality"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/protobuf/ptypes"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/watch"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)

const (
	// DefaultHeartBeatPeriod is used for setting the default value in the
	// cluster config, and in case the cluster config is missing.
	DefaultHeartBeatPeriod       = 5 * time.Second
	defaultHeartBeatEpsilon      = 500 * time.Millisecond
	defaultGracePeriodMultiplier = 3
	defaultRateLimitPeriod       = 8 * time.Second

	// maxBatchItems is the threshold of queued writes that should
	// trigger an actual transaction to commit them to the shared store.
	maxBatchItems = 10000

	// maxBatchInterval needs to strike a balance between keeping
	// latency low, and realizing opportunities to combine many writes
	// into a single transaction. A fraction of a second feels about
	// right.
	maxBatchInterval = 100 * time.Millisecond

	modificationBatchLimit = 100
	batchingWaitTime       = 100 * time.Millisecond
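
	// Worked example of how the batching knobs above interact (a sketch
	// of the intended behavior, not an API guarantee): with maxBatchItems
	// = 10000 and maxBatchInterval = 100ms, a quiet dispatcher commits
	// pending updates at most every 100ms, while a burst of 25000 updates
	// triggers two threshold-driven commits and leaves the remaining 5000
	// to the next timer tick.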

	// defaultNodeDownPeriod specifies the default time period we
	// wait before moving tasks assigned to down nodes to ORPHANED
	// state.
	defaultNodeDownPeriod = 24 * time.Hour
)

var (
	// ErrNodeAlreadyRegistered is returned if a node with the same ID was
	// already registered with this dispatcher.
	ErrNodeAlreadyRegistered = errors.New("node already registered")
	// ErrNodeNotRegistered is returned if a node with such an ID wasn't
	// registered with this dispatcher.
	ErrNodeNotRegistered = errors.New("node not registered")
	// ErrSessionInvalid is returned when the session in use is no longer
	// valid. The node should re-register and start a new session.
	ErrSessionInvalid = errors.New("session invalid")
	// ErrNodeNotFound is returned when the Node doesn't exist in raft.
	ErrNodeNotFound = errors.New("node not found")
)

// Config is configuration for Dispatcher. For defaults, use DefaultConfig.
type Config struct {
	HeartbeatPeriod  time.Duration
	HeartbeatEpsilon time.Duration
	// RateLimitPeriod specifies how often a node with the same ID can try
	// to register a new session.
	RateLimitPeriod       time.Duration
	GracePeriodMultiplier int
}

// DefaultConfig returns the default config for Dispatcher.
func DefaultConfig() *Config {
	return &Config{
		HeartbeatPeriod:       DefaultHeartBeatPeriod,
		HeartbeatEpsilon:      defaultHeartBeatEpsilon,
		RateLimitPeriod:       defaultRateLimitPeriod,
		GracePeriodMultiplier: defaultGracePeriodMultiplier,
	}
}

// Cluster is an interface representing a raft cluster.
// manager/state/raft.Node implements it. This interface exists only to make
// unit testing easier.
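// Because the dispatcher only reads the member list, subscribes to peer
// changes, and uses the shared MemoryStore, a test double that stubs these
// three methods is enough to drive it without a real raft cluster.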
type Cluster interface {
	GetMemberlist() map[uint64]*api.RaftMember
	SubscribePeers() (chan events.Event, func())
	MemoryStore() *store.MemoryStore
}

// nodeUpdate provides a new status and/or description to apply to a node
// object.
type nodeUpdate struct {
	status      *api.NodeStatus
	description *api.NodeDescription
}

// Dispatcher is responsible for dispatching tasks and tracking agent health.
type Dispatcher struct {
	mu                   sync.Mutex
	nodes                *nodeStore
	store                *store.MemoryStore
	mgrQueue             *watch.Queue
	lastSeenManagers     []*api.WeightedPeer
	networkBootstrapKeys []*api.EncryptionKey
	keyMgrQueue          *watch.Queue
	config               *Config
	cluster              Cluster
	ctx                  context.Context
	cancel               context.CancelFunc

	taskUpdates     map[string]*api.TaskStatus // indexed by task ID
	taskUpdatesLock sync.Mutex

	nodeUpdates     map[string]nodeUpdate // indexed by node ID
	nodeUpdatesLock sync.Mutex

	downNodes *nodeStore

	processUpdatesTrigger chan struct{}

	// for waiting for the next task/node batch update
	processUpdatesLock sync.Mutex
	processUpdatesCond *sync.Cond
}

// New returns a Dispatcher with the given cluster interface (usually
// raft.Node).
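//
// A minimal usage sketch (assuming a raft node that satisfies Cluster; Run
// should only be called on the leader):
//
//	d := New(raftNode, DefaultConfig())
//	go d.Run(ctx) // returns when ctx is cancelled or Stop is called
//	defer d.Stop()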
func New(cluster Cluster, c *Config) *Dispatcher {
	d := &Dispatcher{
		nodes:                 newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
		downNodes:             newNodeStore(defaultNodeDownPeriod, 0, 1, 0),
		store:                 cluster.MemoryStore(),
		cluster:               cluster,
		taskUpdates:           make(map[string]*api.TaskStatus),
		nodeUpdates:           make(map[string]nodeUpdate),
		processUpdatesTrigger: make(chan struct{}, 1),
		config:                c,
	}
	d.processUpdatesCond = sync.NewCond(&d.processUpdatesLock)
	return d
}

func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
	members := cluster.GetMemberlist()
	var mgrs []*api.WeightedPeer
	for _, m := range members {
		mgrs = append(mgrs, &api.WeightedPeer{
			Peer: &api.Peer{
				NodeID: m.NodeID,
				Addr:   m.Addr,
			},

			// TODO(stevvooe): Calculate weight of manager selection based on
			// cluster-level observations, such as number of connections and
			// load.
			Weight: remotes.DefaultObservationWeight,
		})
	}
	return mgrs
}

// Run runs dispatcher tasks which should be run on the leader dispatcher.
// The dispatcher can be stopped by cancelling ctx or by calling Stop().
func (d *Dispatcher) Run(ctx context.Context) error {
	d.mu.Lock()
	if d.isRunning() {
		d.mu.Unlock()
		return errors.New("dispatcher is already running")
	}
	ctx = log.WithModule(ctx, "dispatcher")
	if err := d.markNodesUnknown(ctx); err != nil {
		log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
	}
	configWatcher, cancel, err := store.ViewAndWatch(
		d.store,
		func(readTx store.ReadTx) error {
			clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
			if err != nil {
				return err
			}
			if err == nil && len(clusters) == 1 {
				heartbeatPeriod, err := ptypes.Duration(clusters[0].Spec.Dispatcher.HeartbeatPeriod)
				if err == nil && heartbeatPeriod > 0 {
					d.config.HeartbeatPeriod = heartbeatPeriod
				}
				if clusters[0].NetworkBootstrapKeys != nil {
					d.networkBootstrapKeys = clusters[0].NetworkBootstrapKeys
				}
			}
			return nil
		},
		state.EventUpdateCluster{},
	)
	if err != nil {
		d.mu.Unlock()
		return err
	}
	// set queues here to guarantee that Close will close them
	d.mgrQueue = watch.NewQueue()
	d.keyMgrQueue = watch.NewQueue()

	peerWatcher, peerCancel := d.cluster.SubscribePeers()
	defer peerCancel()
	d.lastSeenManagers = getWeightedPeers(d.cluster)
	defer cancel()

	d.ctx, d.cancel = context.WithCancel(ctx)
	d.mu.Unlock()

	publishManagers := func(peers []*api.Peer) {
		var mgrs []*api.WeightedPeer
		for _, p := range peers {
			mgrs = append(mgrs, &api.WeightedPeer{
				Peer:   p,
				Weight: remotes.DefaultObservationWeight,
			})
		}
		d.mu.Lock()
		d.lastSeenManagers = mgrs
		d.mu.Unlock()
		d.mgrQueue.Publish(mgrs)
	}
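
	// The main loop multiplexes peer membership changes, batched
	// task/node status commits (threshold- or timer-driven), and cluster
	// spec updates, until the dispatcher's context is cancelled.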
	batchTimer := time.NewTimer(maxBatchInterval)
	defer batchTimer.Stop()
	for {
		select {
		case ev := <-peerWatcher:
			publishManagers(ev.([]*api.Peer))
		case <-d.processUpdatesTrigger:
			d.processUpdates()
			batchTimer.Reset(maxBatchInterval)
		case <-batchTimer.C:
			d.processUpdates()
			batchTimer.Reset(maxBatchInterval)
		case v := <-configWatcher:
			cluster := v.(state.EventUpdateCluster)
			d.mu.Lock()
			if cluster.Cluster.Spec.Dispatcher.HeartbeatPeriod != nil {
				// ignore error, since Spec has passed validation before
				heartbeatPeriod, _ := ptypes.Duration(cluster.Cluster.Spec.Dispatcher.HeartbeatPeriod)
				if heartbeatPeriod != d.config.HeartbeatPeriod {
					// only call d.nodes.updatePeriod when heartbeatPeriod changes
					d.config.HeartbeatPeriod = heartbeatPeriod
					d.nodes.updatePeriod(d.config.HeartbeatPeriod, d.config.HeartbeatEpsilon, d.config.GracePeriodMultiplier)
				}
			}
			d.networkBootstrapKeys = cluster.Cluster.NetworkBootstrapKeys
			d.mu.Unlock()
			d.keyMgrQueue.Publish(cluster.Cluster.NetworkBootstrapKeys)
		case <-d.ctx.Done():
			return nil
		}
	}
}

// Stop stops the dispatcher and closes all grpc streams.
func (d *Dispatcher) Stop() error {
	d.mu.Lock()
	if !d.isRunning() {
		d.mu.Unlock()
		return errors.New("dispatcher is already stopped")
	}
	d.cancel()
	d.mu.Unlock()
	d.nodes.Clean()

	d.processUpdatesLock.Lock()
	// In case there are any waiters. There is no chance of any starting
	// after this point, because they check if the context is canceled
	// before waiting.
	d.processUpdatesCond.Broadcast()
	d.processUpdatesLock.Unlock()

	d.mgrQueue.Close()
	d.keyMgrQueue.Close()

	return nil
}

func (d *Dispatcher) isRunningLocked() error {
	d.mu.Lock()
	if !d.isRunning() {
		d.mu.Unlock()
		return grpc.Errorf(codes.Aborted, "dispatcher is stopped")
	}
	d.mu.Unlock()
	return nil
}

func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
	log := log.G(ctx).WithField("method", "(*Dispatcher).markNodesUnknown")
	var nodes []*api.Node
	var err error
	d.store.View(func(tx store.ReadTx) {
		nodes, err = store.FindNodes(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "failed to get list of nodes")
	}
	_, err = d.store.Batch(func(batch *store.Batch) error {
		for _, n := range nodes {
			err := batch.Update(func(tx store.Tx) error {
				// check if node is still here
				node := store.GetNode(tx, n.ID)
				if node == nil {
					return nil
				}
				// do not try to resurrect down nodes
				if node.Status.State == api.NodeStatus_DOWN {
					nodeCopy := node
					expireFunc := func() {
						if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
							log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
						}

						d.downNodes.Delete(nodeCopy.ID)
					}

					d.downNodes.Add(nodeCopy, expireFunc)
					return nil
				}

				node.Status.State = api.NodeStatus_UNKNOWN
				node.Status.Message = `Node moved to "unknown" state due to leadership change in cluster`

				nodeID := node.ID

				expireFunc := func() {
					log := log.WithField("node", nodeID)
					log.Debugf("heartbeat expiration for unknown node")
					if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
						log.WithError(err).Errorf(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
					}
				}
				if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
					return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
				}
				if err := store.UpdateNode(tx, node); err != nil {
					return errors.Wrap(err, "update failed")
				}
				return nil
			})
			if err != nil {
				log.WithField("node", n.ID).WithError(err).Errorf(`failed to move node to "unknown" state`)
			}
		}
		return nil
	})
	return err
}

func (d *Dispatcher) isRunning() bool {
	if d.ctx == nil {
		return false
	}
	select {
	case <-d.ctx.Done():
		return false
	default:
	}
	return true
}

// markNodeReady updates the description of a node, updates its address, and
// sets its status to READY. It is used during registration when a new node
// description is provided, and during node updates when the node description
// changes.
func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescription, addr string) error {
	d.nodeUpdatesLock.Lock()
	d.nodeUpdates[nodeID] = nodeUpdate{
		status: &api.NodeStatus{
			State: api.NodeStatus_READY,
			Addr:  addr,
		},
		description: description,
	}
	numUpdates := len(d.nodeUpdates)
	d.nodeUpdatesLock.Unlock()

	// Node is marked ready. Remove the node from down nodes if it
	// is there.
	d.downNodes.Delete(nodeID)

	if numUpdates >= maxBatchItems {
		select {
		case d.processUpdatesTrigger <- struct{}{}:
		case <-d.ctx.Done():
			return d.ctx.Err()
		}
	}

	// Wait until the node update batch happens before unblocking register.
	d.processUpdatesLock.Lock()
	select {
	case <-d.ctx.Done():
		return d.ctx.Err()
	default:
	}
	d.processUpdatesCond.Wait()
	d.processUpdatesLock.Unlock()

	return nil
}

// nodeIPFromContext gets the node IP from the context of a grpc call.
func nodeIPFromContext(ctx context.Context) (string, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return "", err
	}
	addr, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
	if err != nil {
		return "", errors.Wrap(err, "unable to get ip from addr:port")
	}
	return addr, nil
}

// register is used for registration of a node with a particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
	// prevent register until we're ready to accept it
	if err := d.isRunningLocked(); err != nil {
		return "", err
	}

	if err := d.nodes.CheckRateLimit(nodeID); err != nil {
		return "", err
	}

	// TODO(stevvooe): Validate node specification.
	var node *api.Node
	d.store.View(func(tx store.ReadTx) {
		node = store.GetNode(tx, nodeID)
	})
	if node == nil {
		return "", ErrNodeNotFound
	}

	addr, err := nodeIPFromContext(ctx)
	if err != nil {
		log.G(ctx).Debugf(err.Error())
	}

	if err := d.markNodeReady(nodeID, description, addr); err != nil {
		return "", err
	}

	expireFunc := func() {
		log.G(ctx).Debugf("heartbeat expiration")
		if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
			log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
		}
	}

	rn := d.nodes.Add(node, expireFunc)

	// NOTE(stevvooe): We need to be a little careful with re-registration. The
	// current implementation just matches the node id and then gives away the
	// sessionID. If we ever want to use sessionID as a secret, which we may
	// want to, this is giving away the keys to the kitchen.
	//
	// The right behavior is going to be informed by identity. Basically, each
	// time a node registers, we invalidate the session and issue a new
	// session, once identity is proven. This will cause misbehaved agents to
	// be kicked when multiple connections are made.
	return rn.SessionID, nil
}

// UpdateTaskStatus updates the status of a task. A node should send such
// updates on every status change of its tasks.
func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}
	nodeID := nodeInfo.NodeID
	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": r.SessionID,
		"method":       "(*Dispatcher).UpdateTaskStatus",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log := log.G(ctx).WithFields(fields)

	if err := d.isRunningLocked(); err != nil {
		return nil, err
	}

	if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		return nil, err
	}

	// Validate task updates
	for _, u := range r.Updates {
		if u.Status == nil {
			log.WithField("task.id", u.TaskID).Warn("task report has nil status")
			continue
		}

		var t *api.Task
		d.store.View(func(tx store.ReadTx) {
			t = store.GetTask(tx, u.TaskID)
		})
		if t == nil {
			log.WithField("task.id", u.TaskID).Warn("cannot find target task in store")
			continue
		}

		if t.NodeID != nodeID {
			err := grpc.Errorf(codes.PermissionDenied, "cannot update a task not assigned this node")
			log.WithField("task.id", u.TaskID).Error(err)
			return nil, err
		}
	}

	d.taskUpdatesLock.Lock()
	// Enqueue task updates
	for _, u := range r.Updates {
		if u.Status == nil {
			continue
		}
		d.taskUpdates[u.TaskID] = u.Status
	}

	numUpdates := len(d.taskUpdates)
	d.taskUpdatesLock.Unlock()

	if numUpdates >= maxBatchItems {
		select {
		case d.processUpdatesTrigger <- struct{}{}:
		case <-d.ctx.Done():
		}
	}
	return nil, nil
}

func (d *Dispatcher) processUpdates() {
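	// Swap the pending task and node update maps out under their locks,
	// so that nodes can keep enqueueing new updates while this batch is
	// being committed to the store.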
	var (
		taskUpdates map[string]*api.TaskStatus
		nodeUpdates map[string]nodeUpdate
	)
	d.taskUpdatesLock.Lock()
	if len(d.taskUpdates) != 0 {
		taskUpdates = d.taskUpdates
		d.taskUpdates = make(map[string]*api.TaskStatus)
	}
	d.taskUpdatesLock.Unlock()

	d.nodeUpdatesLock.Lock()
	if len(d.nodeUpdates) != 0 {
		nodeUpdates = d.nodeUpdates
		d.nodeUpdates = make(map[string]nodeUpdate)
	}
	d.nodeUpdatesLock.Unlock()

	if len(taskUpdates) == 0 && len(nodeUpdates) == 0 {
		return
	}

	log := log.G(d.ctx).WithFields(logrus.Fields{
		"method": "(*Dispatcher).processUpdates",
	})

	_, err := d.store.Batch(func(batch *store.Batch) error {
		for taskID, status := range taskUpdates {
			err := batch.Update(func(tx store.Tx) error {
				logger := log.WithField("task.id", taskID)
				task := store.GetTask(tx, taskID)
				if task == nil {
					logger.Errorf("task unavailable")
					return nil
				}

				logger = logger.WithField("state.transition", fmt.Sprintf("%v->%v", task.Status.State, status.State))

				if task.Status == *status {
					logger.Debug("task status identical, ignoring")
					return nil
				}

				if task.Status.State > status.State {
					logger.Debug("task status invalid transition")
					return nil
				}

				task.Status = *status
				if err := store.UpdateTask(tx, task); err != nil {
					logger.WithError(err).Error("failed to update task status")
					return nil
				}
				logger.Debug("task status updated")
				return nil
			})
			if err != nil {
				log.WithError(err).Error("dispatcher task update transaction failed")
			}
		}

		for nodeID, nodeUpdate := range nodeUpdates {
			err := batch.Update(func(tx store.Tx) error {
				logger := log.WithField("node.id", nodeID)
				node := store.GetNode(tx, nodeID)
				if node == nil {
					logger.Errorf("node unavailable")
					return nil
				}

				if nodeUpdate.status != nil {
					node.Status.State = nodeUpdate.status.State
					node.Status.Message = nodeUpdate.status.Message
					if nodeUpdate.status.Addr != "" {
						node.Status.Addr = nodeUpdate.status.Addr
					}
				}
				if nodeUpdate.description != nil {
					node.Description = nodeUpdate.description
				}

				if err := store.UpdateNode(tx, node); err != nil {
					logger.WithError(err).Error("failed to update node status")
					return nil
				}
				logger.Debug("node status updated")
				return nil
			})
			if err != nil {
				log.WithError(err).Error("dispatcher node update transaction failed")
			}
		}

		return nil
	})
	if err != nil {
		log.WithError(err).Error("dispatcher batch failed")
	}

	d.processUpdatesCond.Broadcast()
}

// Tasks is a stream of task states for a node. Each message contains the full
// list of tasks which should be run on the node; if a task is not present in
// that list, it should be terminated.
func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServer) error {
	nodeInfo, err := ca.RemoteNode(stream.Context())
	if err != nil {
		return err
	}
	nodeID := nodeInfo.NodeID

	if err := d.isRunningLocked(); err != nil {
		return err
	}

	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": r.SessionID,
		"method":       "(*Dispatcher).Tasks",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log.G(stream.Context()).WithFields(fields).Debugf("")

	if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		return err
	}

	tasksMap := make(map[string]*api.Task)
	nodeTasks, cancel, err := store.ViewAndWatch(
		d.store,
		func(readTx store.ReadTx) error {
			tasks, err := store.FindTasks(readTx, store.ByNodeID(nodeID))
			if err != nil {
				return err
			}
			for _, t := range tasks {
				tasksMap[t.ID] = t
			}
			return nil
		},
		state.EventCreateTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventUpdateTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventDeleteTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
	)
	if err != nil {
		return err
	}
	defer cancel()

	for {
		if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
			return err
		}

		var tasks []*api.Task
		for _, t := range tasksMap {
			// dispatcher only sends tasks that have been assigned to a node
			if t != nil && t.Status.State >= api.TaskStateAssigned {
				tasks = append(tasks, t)
			}
		}

		if err := stream.Send(&api.TasksMessage{Tasks: tasks}); err != nil {
			return err
		}

		// Bursty events should be processed in batches and sent out as a
		// snapshot.
		var (
			modificationCnt int
			batchingTimer   *time.Timer
			batchingTimeout <-chan time.Time
		)
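
		// The batching loop below resends the snapshot once events stop
		// arriving for batchingWaitTime (100ms), or once
		// modificationBatchLimit (100) changes have accumulated,
		// whichever comes first.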
	batchingLoop:
		for modificationCnt < modificationBatchLimit {
			select {
			case event := <-nodeTasks:
				switch v := event.(type) {
				case state.EventCreateTask:
					tasksMap[v.Task.ID] = v.Task
					modificationCnt++
				case state.EventUpdateTask:
					if oldTask, exists := tasksMap[v.Task.ID]; exists {
						// States ASSIGNED and below are set by the orchestrator/scheduler,
						// not the agent, so tasks in these states need to be sent to the
						// agent even if nothing else has changed.
						if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
							// this update should not trigger action at agent
							tasksMap[v.Task.ID] = v.Task
							continue
						}
					}
					tasksMap[v.Task.ID] = v.Task
					modificationCnt++
				case state.EventDeleteTask:
					delete(tasksMap, v.Task.ID)
					modificationCnt++
				}
				if batchingTimer != nil {
					batchingTimer.Reset(batchingWaitTime)
				} else {
					batchingTimer = time.NewTimer(batchingWaitTime)
					batchingTimeout = batchingTimer.C
				}
			case <-batchingTimeout:
				break batchingLoop
			case <-stream.Context().Done():
				return stream.Context().Err()
			case <-d.ctx.Done():
				return d.ctx.Err()
			}
		}

		if batchingTimer != nil {
			batchingTimer.Stop()
		}
	}
}

// Assignments is a stream of assignments for a node. Each message contains
// either the full list of tasks and secrets for the node, or an incremental
// update.
func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error {
	nodeInfo, err := ca.RemoteNode(stream.Context())
	if err != nil {
		return err
	}
	nodeID := nodeInfo.NodeID

	if err := d.isRunningLocked(); err != nil {
		return err
	}

	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": r.SessionID,
		"method":       "(*Dispatcher).Assignments",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log := log.G(stream.Context()).WithFields(fields)
	log.Debugf("")

	if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		return err
	}

	var (
		sequence  int64
		appliesTo string
		initial   api.AssignmentsMessage
	)
	tasksMap := make(map[string]*api.Task)
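	// tasksUsingSecret reference-counts secrets by the set of task IDs
	// that use them, so a secret is removed from the agent only when the
	// last task referencing it goes away.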
	tasksUsingSecret := make(map[string]map[string]struct{})
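
	// sendMessage stamps each outgoing message with a monotonically
	// increasing sequence: the initial COMPLETE message goes out with
	// AppliesTo="" and ResultsIn="1", and the first INCREMENTAL update
	// then carries AppliesTo="1" and ResultsIn="2", so the receiver can
	// verify which state an incremental update applies to.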
	sendMessage := func(msg api.AssignmentsMessage, assignmentType api.AssignmentsMessage_Type) error {
		sequence++
		msg.AppliesTo = appliesTo
		msg.ResultsIn = strconv.FormatInt(sequence, 10)
		appliesTo = msg.ResultsIn
		msg.Type = assignmentType

		if err := stream.Send(&msg); err != nil {
			return err
		}
		return nil
	}

	// addSecretsForTask returns a slice of new secrets to send down.
	addSecretsForTask := func(readTx store.ReadTx, t *api.Task) []*api.Secret {
		container := t.Spec.GetContainer()
		if container == nil {
			return nil
		}
		var newSecrets []*api.Secret
		for _, secretRef := range container.Secrets {
			// Empty ID prefix will return all secrets. Bail if there is no SecretID
			if secretRef.SecretID == "" {
				log.Debugf("invalid secret reference")
				continue
			}
			secretID := secretRef.SecretID
			log := log.WithFields(logrus.Fields{
				"secret.id":   secretID,
				"secret.name": secretRef.SecretName,
			})

			if len(tasksUsingSecret[secretID]) == 0 {
				tasksUsingSecret[secretID] = make(map[string]struct{})

				secrets, err := store.FindSecrets(readTx, store.ByIDPrefix(secretID))
				if err != nil {
					log.WithError(err).Errorf("error retrieving secret")
					continue
				}
				if len(secrets) != 1 {
					log.Debugf("secret not found")
					continue
				}

				// If the secret was found and there was one result
				// (there should never be more than one because of the
				// uniqueness constraint), add this secret to our
				// initial set that we send down.
				newSecrets = append(newSecrets, secrets[0])
			}
			tasksUsingSecret[secretID][t.ID] = struct{}{}
		}

		return newSecrets
	}

	// TODO(aaronl): Also send node secrets that should be exposed to
	// this node.
	nodeTasks, cancel, err := store.ViewAndWatch(
		d.store,
		func(readTx store.ReadTx) error {
			tasks, err := store.FindTasks(readTx, store.ByNodeID(nodeID))
			if err != nil {
				return err
			}

			for _, t := range tasks {
				// We only care about tasks that are ASSIGNED or
				// higher. If the state is below ASSIGNED, the
				// task may not meet the constraints for this
				// node, so we have to be careful about sending
				// secrets associated with it.
				if t.Status.State < api.TaskStateAssigned {
					continue
				}

				tasksMap[t.ID] = t
				taskChange := &api.AssignmentChange{
					Assignment: &api.Assignment{
						Item: &api.Assignment_Task{
							Task: t,
						},
					},
					Action: api.AssignmentChange_AssignmentActionUpdate,
				}
				initial.Changes = append(initial.Changes, taskChange)
				// Only send secrets down if these tasks are in <= RUNNING
				if t.Status.State <= api.TaskStateRunning {
					newSecrets := addSecretsForTask(readTx, t)
					for _, secret := range newSecrets {
						secretChange := &api.AssignmentChange{
							Assignment: &api.Assignment{
								Item: &api.Assignment_Secret{
									Secret: secret,
								},
							},
							Action: api.AssignmentChange_AssignmentActionUpdate,
						}

						initial.Changes = append(initial.Changes, secretChange)
					}
				}
			}
			return nil
		},
		state.EventUpdateTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventDeleteTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventUpdateSecret{},
		state.EventDeleteSecret{},
	)
	if err != nil {
		return err
	}
	defer cancel()

	if err := sendMessage(initial, api.AssignmentsMessage_COMPLETE); err != nil {
		return err
	}

	for {
		// Check for session expiration
		if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
			return err
		}

		// Bursty events should be processed in batches and sent out together
		var (
			update          api.AssignmentsMessage
			modificationCnt int
			batchingTimer   *time.Timer
			batchingTimeout <-chan time.Time
			updateTasks     = make(map[string]*api.Task)
			updateSecrets   = make(map[string]*api.Secret)
			removeTasks     = make(map[string]struct{})
			removeSecrets   = make(map[string]struct{})
		)

		oneModification := func() {
			modificationCnt++

			if batchingTimer != nil {
				batchingTimer.Reset(batchingWaitTime)
			} else {
				batchingTimer = time.NewTimer(batchingWaitTime)
				batchingTimeout = batchingTimer.C
			}
		}

		// releaseSecretsForTask releases the secret references held by a
		// task, and reports whether that removed any secret from the
		// assignment set.
		releaseSecretsForTask := func(t *api.Task) bool {
			var modified bool
			container := t.Spec.GetContainer()
			if container == nil {
				return modified
			}

			for _, secretRef := range container.Secrets {
				secretID := secretRef.SecretID
				delete(tasksUsingSecret[secretID], t.ID)
				if len(tasksUsingSecret[secretID]) == 0 {
					// No tasks are using the secret anymore
					delete(tasksUsingSecret, secretID)
					removeSecrets[secretID] = struct{}{}
					modified = true
				}
			}

			return modified
		}

		// The batching loop waits for batchingWaitTime (100 ms) after the
		// most recent change, or until modificationBatchLimit is reached.
		// The worst case latency is modificationBatchLimit * batchingWaitTime,
		// which is 10 seconds.
	batchingLoop:
		for modificationCnt < modificationBatchLimit {
			select {
			case event := <-nodeTasks:
				switch v := event.(type) {
				// We don't monitor EventCreateTask because tasks are
				// never created in the ASSIGNED state. First tasks are
				// created by the orchestrator, then the scheduler moves
				// them to ASSIGNED. If this ever changes, we will need
				// to monitor task creations as well.
				case state.EventUpdateTask:
					// We only care about tasks that are ASSIGNED or
					// higher.
					if v.Task.Status.State < api.TaskStateAssigned {
						continue
					}

					if oldTask, exists := tasksMap[v.Task.ID]; exists {
						// States ASSIGNED and below are set by the orchestrator/scheduler,
						// not the agent, so tasks in these states need to be sent to the
						// agent even if nothing else has changed.
						if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
							// this update should not trigger a task change for the agent
							tasksMap[v.Task.ID] = v.Task
							// If this task got updated to a final state, let's release
							// the secrets that are being used by the task
							if v.Task.Status.State > api.TaskStateRunning {
								// If releasing the secrets caused a secret to be
								// removed from an agent, mark one modification
								if releaseSecretsForTask(v.Task) {
									oneModification()
								}
							}
							continue
						}
					} else if v.Task.Status.State <= api.TaskStateRunning {
						// If this task wasn't part of the assignment set before, and it's
						// <= RUNNING, add the secrets it references to the secrets
						// assignment. Task states > RUNNING are worker-reported only;
						// tasks are never created in a > RUNNING state.
						var newSecrets []*api.Secret
						d.store.View(func(readTx store.ReadTx) {
							newSecrets = addSecretsForTask(readTx, v.Task)
						})
						for _, secret := range newSecrets {
							updateSecrets[secret.ID] = secret
						}
					}
					tasksMap[v.Task.ID] = v.Task
					updateTasks[v.Task.ID] = v.Task

					oneModification()
				case state.EventDeleteTask:
					if _, exists := tasksMap[v.Task.ID]; !exists {
						continue
					}

					removeTasks[v.Task.ID] = struct{}{}

					delete(tasksMap, v.Task.ID)

					// Release the secrets being used by this task.
					// Ignoring the return here. We will always mark
					// this as a modification, since a task is being
					// removed.
					releaseSecretsForTask(v.Task)

					oneModification()
				// TODO(aaronl): For node secrets, we'll need to handle
				// EventCreateSecret.
				case state.EventUpdateSecret:
					if _, exists := tasksUsingSecret[v.Secret.ID]; !exists {
						continue
					}
					log.Debugf("Secret %s (ID: %s) was updated though it was still referenced by one or more tasks",
						v.Secret.Spec.Annotations.Name, v.Secret.ID)

				case state.EventDeleteSecret:
					if _, exists := tasksUsingSecret[v.Secret.ID]; !exists {
						continue
					}
					log.Debugf("Secret %s (ID: %s) was deleted though it was still referenced by one or more tasks",
						v.Secret.Spec.Annotations.Name, v.Secret.ID)
				}
			case <-batchingTimeout:
				break batchingLoop
			case <-stream.Context().Done():
				return stream.Context().Err()
			case <-d.ctx.Done():
				return d.ctx.Err()
			}
		}

		if batchingTimer != nil {
			batchingTimer.Stop()
		}

		if modificationCnt > 0 {
			for id, task := range updateTasks {
				if _, ok := removeTasks[id]; !ok {
					taskChange := &api.AssignmentChange{
						Assignment: &api.Assignment{
							Item: &api.Assignment_Task{
								Task: task,
							},
						},
						Action: api.AssignmentChange_AssignmentActionUpdate,
					}

					update.Changes = append(update.Changes, taskChange)
				}
			}
			for id, secret := range updateSecrets {
				// If, due to multiple updates, this secret is no longer in use,
				// don't send it down.
				if len(tasksUsingSecret[id]) == 0 {
					// delete this secret from the secrets to be updated
					// so that deleteSecrets knows the current list
					delete(updateSecrets, id)
					continue
				}
				secretChange := &api.AssignmentChange{
					Assignment: &api.Assignment{
						Item: &api.Assignment_Secret{
							Secret: secret,
						},
					},
					Action: api.AssignmentChange_AssignmentActionUpdate,
				}

				update.Changes = append(update.Changes, secretChange)
			}
			for id := range removeTasks {
				taskChange := &api.AssignmentChange{
					Assignment: &api.Assignment{
						Item: &api.Assignment_Task{
							Task: &api.Task{ID: id},
						},
					},
					Action: api.AssignmentChange_AssignmentActionRemove,
				}

				update.Changes = append(update.Changes, taskChange)
			}
			for id := range removeSecrets {
				// If this secret is also being sent on the updated set,
				// don't also add it to the removed set.
				if _, ok := updateSecrets[id]; ok {
					continue
				}

				secretChange := &api.AssignmentChange{
					Assignment: &api.Assignment{
						Item: &api.Assignment_Secret{
							Secret: &api.Secret{ID: id},
						},
					},
					Action: api.AssignmentChange_AssignmentActionRemove,
				}

				update.Changes = append(update.Changes, secretChange)
			}

			if err := sendMessage(update, api.AssignmentsMessage_INCREMENTAL); err != nil {
				return err
			}
		}
	}
}

func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {
	_, err := d.store.Batch(func(batch *store.Batch) error {
		var (
			tasks []*api.Task
			err   error
		)
		d.store.View(func(tx store.ReadTx) {
			tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID))
		})
		if err != nil {
			return err
		}

		for _, task := range tasks {
			if task.Status.State < api.TaskStateOrphaned {
				task.Status.State = api.TaskStateOrphaned
			}

			if err := batch.Update(func(tx store.Tx) error {
				err := store.UpdateTask(tx, task)
				if err != nil {
					return err
				}

				return nil
			}); err != nil {
				return err
			}
		}

		return nil
	})

	return err
}

// markNodeNotReady sets the node state to some state other than READY.
func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
	if err := d.isRunningLocked(); err != nil {
		return err
	}

	// Node is down. Add it to down nodes so that we can keep
	// track of tasks assigned to the node.
	var (
		node *api.Node
		err  error
	)
	d.store.View(func(readTx store.ReadTx) {
		node = store.GetNode(readTx, id)
		if node == nil {
			err = fmt.Errorf("could not find node %s while trying to add to down nodes store", id)
		}
	})
	if err != nil {
		return err
	}

	expireFunc := func() {
		if err := d.moveTasksToOrphaned(id); err != nil {
			log.G(context.TODO()).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
		}

		d.downNodes.Delete(id)
	}

	d.downNodes.Add(node, expireFunc)

	status := &api.NodeStatus{
		State:   state,
		Message: message,
	}

	d.nodeUpdatesLock.Lock()
	// pluck the description out of nodeUpdates. this protects against a case
	// where a node is marked ready and a description is added, but then the
	// node is immediately marked not ready. this preserves that description
	d.nodeUpdates[id] = nodeUpdate{status: status, description: d.nodeUpdates[id].description}
	numUpdates := len(d.nodeUpdates)
	d.nodeUpdatesLock.Unlock()

	if numUpdates >= maxBatchItems {
		select {
		case d.processUpdatesTrigger <- struct{}{}:
		case <-d.ctx.Done():
		}
	}

	if rn := d.nodes.Delete(id); rn == nil {
		return errors.Errorf("node %s is not found in local storage", id)
	}

	return nil
}

// Heartbeat is the heartbeat method for nodes. It returns a new TTL in the
// response. A node should send its next heartbeat earlier than now + TTL,
// otherwise it will be deregistered from the dispatcher and its status will
// be updated to NodeStatus_DOWN.
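//
// The returned TTL is computed by the node store (not shown in this file);
// given the defaults above it is on the order of HeartbeatPeriod (5s), with
// HeartbeatEpsilon and GracePeriodMultiplier controlling the jitter and the
// expiration grace window.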
func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}

	period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)
	return &api.HeartbeatResponse{Period: *ptypes.DurationProto(period)}, err
}

func (d *Dispatcher) getManagers() []*api.WeightedPeer {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.lastSeenManagers
}

func (d *Dispatcher) getNetworkBootstrapKeys() []*api.EncryptionKey {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.networkBootstrapKeys
}

// Session is a stream which controls the agent connection. Each message
// contains a list of backup Managers with weights. There is also a special
// boolean field Disconnect which, if true, indicates that the node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
	ctx := stream.Context()
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return err
	}
	nodeID := nodeInfo.NodeID

	if err := d.isRunningLocked(); err != nil {
		return err
	}

	var sessionID string
	if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		// register the node.
		sessionID, err = d.register(ctx, nodeID, r.Description)
		if err != nil {
			return err
		}
	} else {
		sessionID = r.SessionID
		// get the node IP addr
		addr, err := nodeIPFromContext(stream.Context())
		if err != nil {
			log.G(ctx).Debugf(err.Error())
		}
		// update the node description
		if err := d.markNodeReady(nodeID, r.Description, addr); err != nil {
			return err
		}
	}

	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": sessionID,
		"method":       "(*Dispatcher).Session",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log := log.G(ctx).WithFields(fields)

	var nodeObj *api.Node
	nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
		nodeObj = store.GetNode(readTx, nodeID)
		return nil
	}, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
		Checks: []state.NodeCheckFunc{state.NodeCheckID}},
	)
	if cancel != nil {
		defer cancel()
	}

	if err != nil {
		log.WithError(err).Error("ViewAndWatch Node failed")
	}

	if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
		return err
	}

	if err := stream.Send(&api.SessionMessage{
		SessionID:            sessionID,
		Node:                 nodeObj,
		Managers:             d.getManagers(),
		NetworkBootstrapKeys: d.getNetworkBootstrapKeys(),
	}); err != nil {
		return err
	}

	managerUpdates, mgrCancel := d.mgrQueue.Watch()
	defer mgrCancel()
	keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
	defer keyMgrCancel()

	// disconnectNode is a helper that forcibly shuts down the connection.
	disconnectNode := func() error {
		// force disconnect by shutting down the stream.
		transportStream, ok := transport.StreamFromContext(stream.Context())
		if ok {
			// if we have the transport stream, we can signal a disconnect
			// in the client.
			if err := transportStream.ServerTransport().Close(); err != nil {
				log.WithError(err).Error("session end")
			}
		}

		if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
			log.WithError(err).Error("failed to remove node")
		}
		// still return an abort if the transport closure was ineffective.
		return grpc.Errorf(codes.Aborted, "node must disconnect")
	}

	for {
		// After each message send, we need to check that the node's
		// sessionID hasn't changed. If it has, we will shut down the
		// stream and make the node re-register.
		node, err := d.nodes.GetWithSession(nodeID, sessionID)
		if err != nil {
			return err
		}

		var (
			disconnect bool
			mgrs       []*api.WeightedPeer
			netKeys    []*api.EncryptionKey
		)

		select {
		case ev := <-managerUpdates:
			mgrs = ev.([]*api.WeightedPeer)
		case ev := <-nodeUpdates:
			nodeObj = ev.(state.EventUpdateNode).Node
		case <-stream.Context().Done():
			return stream.Context().Err()
		case <-node.Disconnect:
			disconnect = true
		case <-d.ctx.Done():
			disconnect = true
		case ev := <-keyMgrUpdates:
			netKeys = ev.([]*api.EncryptionKey)
		}
		if mgrs == nil {
			mgrs = d.getManagers()
		}
		if netKeys == nil {
			netKeys = d.getNetworkBootstrapKeys()
		}

		if err := stream.Send(&api.SessionMessage{
			SessionID:            sessionID,
			Node:                 nodeObj,
			Managers:             mgrs,
			NetworkBootstrapKeys: netKeys,
		}); err != nil {
			return err
		}
		if disconnect {
			return disconnectNode()
		}
	}
}