dispatcher.go

  1. package dispatcher
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/transport"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/go-events"
  13. "github.com/docker/swarmkit/api"
  14. "github.com/docker/swarmkit/api/equality"
  15. "github.com/docker/swarmkit/ca"
  16. "github.com/docker/swarmkit/log"
  17. "github.com/docker/swarmkit/manager/state/store"
  18. "github.com/docker/swarmkit/remotes"
  19. "github.com/docker/swarmkit/watch"
  20. gogotypes "github.com/gogo/protobuf/types"
  21. "github.com/pkg/errors"
  22. "golang.org/x/net/context"
  23. )
  24. const (
  25. // DefaultHeartBeatPeriod is used as the default value in the cluster config
  26. // and as a fallback when the cluster config is missing.
  27. DefaultHeartBeatPeriod = 5 * time.Second
  28. defaultHeartBeatEpsilon = 500 * time.Millisecond
  29. defaultGracePeriodMultiplier = 3
  30. defaultRateLimitPeriod = 8 * time.Second
  31. // maxBatchItems is the threshold of queued writes that should
  32. // trigger an actual transaction to commit them to the shared store.
  33. maxBatchItems = 10000
  34. // maxBatchInterval needs to strike a balance between keeping
  35. // latency low, and realizing opportunities to combine many writes
  36. // into a single transaction. A fraction of a second feels about
  37. // right.
  38. maxBatchInterval = 100 * time.Millisecond
  39. modificationBatchLimit = 100
  40. batchingWaitTime = 100 * time.Millisecond
  41. // defaultNodeDownPeriod specifies the default time period we
  42. // wait before moving tasks assigned to down nodes to ORPHANED
  43. // state.
  44. defaultNodeDownPeriod = 24 * time.Hour
  45. )
  46. var (
  47. // ErrNodeAlreadyRegistered is returned if a node with the same ID was already
  48. // registered with this dispatcher.
  49. ErrNodeAlreadyRegistered = errors.New("node already registered")
  50. // ErrNodeNotRegistered is returned if a node with the given ID was not registered
  51. // with this dispatcher.
  52. ErrNodeNotRegistered = errors.New("node not registered")
  53. // ErrSessionInvalid is returned when the session in use is no longer valid.
  54. // The node should re-register and start a new session.
  55. ErrSessionInvalid = errors.New("session invalid")
  56. // ErrNodeNotFound is returned when the Node doesn't exist in raft.
  57. ErrNodeNotFound = errors.New("node not found")
  58. )
  59. // Config is the configuration for the Dispatcher. For defaults, use
  60. // DefaultConfig.
  61. type Config struct {
  62. HeartbeatPeriod time.Duration
  63. HeartbeatEpsilon time.Duration
  64. // RateLimitPeriod specifies how often a node with the same ID can try to register
  65. // a new session.
  66. RateLimitPeriod time.Duration
  67. GracePeriodMultiplier int
  68. }
  69. // DefaultConfig returns the default config for the Dispatcher.
  70. func DefaultConfig() *Config {
  71. return &Config{
  72. HeartbeatPeriod: DefaultHeartBeatPeriod,
  73. HeartbeatEpsilon: defaultHeartBeatEpsilon,
  74. RateLimitPeriod: defaultRateLimitPeriod,
  75. GracePeriodMultiplier: defaultGracePeriodMultiplier,
  76. }
  77. }
  78. // Cluster is an interface that represents the raft cluster. manager/state/raft.Node
  79. // implements it. This interface exists only to make unit testing easier.
  80. type Cluster interface {
  81. GetMemberlist() map[uint64]*api.RaftMember
  82. SubscribePeers() (chan events.Event, func())
  83. MemoryStore() *store.MemoryStore
  84. }
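// Since the interface above exists mainly for unit testing, a minimal test fake
// might look like the sketch below (illustrative only; fakeCluster is a
// hypothetical name, not part of this package):
//
//	type fakeCluster struct {
//		members map[uint64]*api.RaftMember
//		ms      *store.MemoryStore
//	}
//
//	func (c *fakeCluster) GetMemberlist() map[uint64]*api.RaftMember { return c.members }
//
//	func (c *fakeCluster) SubscribePeers() (chan events.Event, func()) {
//		ch := make(chan events.Event)
//		return ch, func() { close(ch) }
//	}
//
//	func (c *fakeCluster) MemoryStore() *store.MemoryStore { return c.ms }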
  85. // nodeUpdate provides a new status and/or description to apply to a node
  86. // object.
  87. type nodeUpdate struct {
  88. status *api.NodeStatus
  89. description *api.NodeDescription
  90. }
  91. // clusterUpdate is an object that stores an update to the cluster that should trigger
  92. // a new session message. These are pointers to indicate the difference between
  93. // "there is no update" and "update this to nil"
  94. type clusterUpdate struct {
  95. managerUpdate *[]*api.WeightedPeer
  96. bootstrapKeyUpdate *[]*api.EncryptionKey
  97. rootCAUpdate *[]byte
  98. }
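// For example (illustrative; `queue` stands for the dispatcher's clusterUpdateQueue,
// as used in Run below): publishing only a manager update leaves the other two
// pointers nil, which consumers treat as "no change", while a non-nil pointer
// wrapping a nil value means "clear this field":
//
//	mgrs := getWeightedPeers(cluster)
//	queue.Publish(clusterUpdate{managerUpdate: &mgrs})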
  99. // Dispatcher is responsible for dispatching tasks and tracking agent health.
  100. type Dispatcher struct {
  101. mu sync.Mutex
  102. wg sync.WaitGroup
  103. nodes *nodeStore
  104. store *store.MemoryStore
  105. lastSeenManagers []*api.WeightedPeer
  106. networkBootstrapKeys []*api.EncryptionKey
  107. lastSeenRootCert []byte
  108. config *Config
  109. cluster Cluster
  110. ctx context.Context
  111. cancel context.CancelFunc
  112. clusterUpdateQueue *watch.Queue
  113. taskUpdates map[string]*api.TaskStatus // indexed by task ID
  114. taskUpdatesLock sync.Mutex
  115. nodeUpdates map[string]nodeUpdate // indexed by node ID
  116. nodeUpdatesLock sync.Mutex
  117. downNodes *nodeStore
  118. processUpdatesTrigger chan struct{}
  119. // for waiting for the next task/node batch update
  120. processUpdatesLock sync.Mutex
  121. processUpdatesCond *sync.Cond
  122. }
  123. // New returns a Dispatcher backed by the given cluster interface (usually raft.Node).
  124. func New(cluster Cluster, c *Config) *Dispatcher {
  125. d := &Dispatcher{
  126. nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
  127. downNodes: newNodeStore(defaultNodeDownPeriod, 0, 1, 0),
  128. store: cluster.MemoryStore(),
  129. cluster: cluster,
  130. processUpdatesTrigger: make(chan struct{}, 1),
  131. config: c,
  132. }
  133. d.processUpdatesCond = sync.NewCond(&d.processUpdatesLock)
  134. return d
  135. }
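// A minimal wiring sketch (assumptions: `cluster` is some Cluster implementation,
// typically the manager's raft.Node; error handling is abbreviated). It shows how
// New, Run and Stop fit together:
//
//	d := New(cluster, DefaultConfig())
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go func() {
//		if err := d.Run(ctx); err != nil {
//			log.G(ctx).WithError(err).Error("dispatcher exited")
//		}
//	}()
//	// ... later, on leadership loss or shutdown ...
//	if err := d.Stop(); err != nil {
//		log.G(ctx).WithError(err).Error("failed to stop dispatcher")
//	}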
  136. func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
  137. members := cluster.GetMemberlist()
  138. var mgrs []*api.WeightedPeer
  139. for _, m := range members {
  140. mgrs = append(mgrs, &api.WeightedPeer{
  141. Peer: &api.Peer{
  142. NodeID: m.NodeID,
  143. Addr: m.Addr,
  144. },
  145. // TODO(stevvooe): Calculate weight of manager selection based on
  146. // cluster-level observations, such as number of connections and
  147. // load.
  148. Weight: remotes.DefaultObservationWeight,
  149. })
  150. }
  151. return mgrs
  152. }
  153. // Run runs the dispatcher tasks that should be run on the leader dispatcher.
  154. // The Dispatcher can be stopped by cancelling ctx or by calling Stop().
  155. func (d *Dispatcher) Run(ctx context.Context) error {
  156. d.taskUpdatesLock.Lock()
  157. d.taskUpdates = make(map[string]*api.TaskStatus)
  158. d.taskUpdatesLock.Unlock()
  159. d.nodeUpdatesLock.Lock()
  160. d.nodeUpdates = make(map[string]nodeUpdate)
  161. d.nodeUpdatesLock.Unlock()
  162. d.mu.Lock()
  163. if d.isRunning() {
  164. d.mu.Unlock()
  165. return errors.New("dispatcher is already running")
  166. }
  167. ctx = log.WithModule(ctx, "dispatcher")
  168. if err := d.markNodesUnknown(ctx); err != nil {
  169. log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
  170. }
  171. configWatcher, cancel, err := store.ViewAndWatch(
  172. d.store,
  173. func(readTx store.ReadTx) error {
  174. clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
  175. if err != nil {
  176. return err
  177. }
  178. if err == nil && len(clusters) == 1 {
  179. heartbeatPeriod, err := gogotypes.DurationFromProto(clusters[0].Spec.Dispatcher.HeartbeatPeriod)
  180. if err == nil && heartbeatPeriod > 0 {
  181. d.config.HeartbeatPeriod = heartbeatPeriod
  182. }
  183. if clusters[0].NetworkBootstrapKeys != nil {
  184. d.networkBootstrapKeys = clusters[0].NetworkBootstrapKeys
  185. }
  186. d.lastSeenRootCert = clusters[0].RootCA.CACert
  187. }
  188. return nil
  189. },
  190. api.EventUpdateCluster{},
  191. )
  192. if err != nil {
  193. d.mu.Unlock()
  194. return err
  195. }
  196. // set queue here to guarantee that Close will close it
  197. d.clusterUpdateQueue = watch.NewQueue()
  198. peerWatcher, peerCancel := d.cluster.SubscribePeers()
  199. defer peerCancel()
  200. d.lastSeenManagers = getWeightedPeers(d.cluster)
  201. defer cancel()
  202. d.ctx, d.cancel = context.WithCancel(ctx)
  203. ctx = d.ctx
  204. d.wg.Add(1)
  205. defer d.wg.Done()
  206. d.mu.Unlock()
  207. publishManagers := func(peers []*api.Peer) {
  208. var mgrs []*api.WeightedPeer
  209. for _, p := range peers {
  210. mgrs = append(mgrs, &api.WeightedPeer{
  211. Peer: p,
  212. Weight: remotes.DefaultObservationWeight,
  213. })
  214. }
  215. d.mu.Lock()
  216. d.lastSeenManagers = mgrs
  217. d.mu.Unlock()
  218. d.clusterUpdateQueue.Publish(clusterUpdate{managerUpdate: &mgrs})
  219. }
  220. batchTimer := time.NewTimer(maxBatchInterval)
  221. defer batchTimer.Stop()
  222. for {
  223. select {
  224. case ev := <-peerWatcher:
  225. publishManagers(ev.([]*api.Peer))
  226. case <-d.processUpdatesTrigger:
  227. d.processUpdates(ctx)
  228. batchTimer.Reset(maxBatchInterval)
  229. case <-batchTimer.C:
  230. d.processUpdates(ctx)
  231. batchTimer.Reset(maxBatchInterval)
  232. case v := <-configWatcher:
  233. cluster := v.(api.EventUpdateCluster)
  234. d.mu.Lock()
  235. if cluster.Cluster.Spec.Dispatcher.HeartbeatPeriod != nil {
  236. // ignore error, since Spec has passed validation before
  237. heartbeatPeriod, _ := gogotypes.DurationFromProto(cluster.Cluster.Spec.Dispatcher.HeartbeatPeriod)
  238. if heartbeatPeriod != d.config.HeartbeatPeriod {
  239. // only call d.nodes.updatePeriod when heartbeatPeriod changes
  240. d.config.HeartbeatPeriod = heartbeatPeriod
  241. d.nodes.updatePeriod(d.config.HeartbeatPeriod, d.config.HeartbeatEpsilon, d.config.GracePeriodMultiplier)
  242. }
  243. }
  244. d.lastSeenRootCert = cluster.Cluster.RootCA.CACert
  245. d.networkBootstrapKeys = cluster.Cluster.NetworkBootstrapKeys
  246. d.mu.Unlock()
  247. d.clusterUpdateQueue.Publish(clusterUpdate{
  248. bootstrapKeyUpdate: &cluster.Cluster.NetworkBootstrapKeys,
  249. rootCAUpdate: &cluster.Cluster.RootCA.CACert,
  250. })
  251. case <-ctx.Done():
  252. return nil
  253. }
  254. }
  255. }
  256. // Stop stops the dispatcher and closes all gRPC streams.
  257. func (d *Dispatcher) Stop() error {
  258. d.mu.Lock()
  259. if !d.isRunning() {
  260. d.mu.Unlock()
  261. return errors.New("dispatcher is already stopped")
  262. }
  263. d.cancel()
  264. d.mu.Unlock()
  265. d.nodes.Clean()
  266. d.processUpdatesLock.Lock()
  267. // In case there are any waiters. There is no chance of any starting
  268. // after this point, because they check if the context is canceled
  269. // before waiting.
  270. d.processUpdatesCond.Broadcast()
  271. d.processUpdatesLock.Unlock()
  272. d.clusterUpdateQueue.Close()
  273. d.wg.Wait()
  274. return nil
  275. }
  276. func (d *Dispatcher) isRunningLocked() (context.Context, error) {
  277. d.mu.Lock()
  278. if !d.isRunning() {
  279. d.mu.Unlock()
  280. return nil, grpc.Errorf(codes.Aborted, "dispatcher is stopped")
  281. }
  282. ctx := d.ctx
  283. d.mu.Unlock()
  284. return ctx, nil
  285. }
  286. func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
  287. log := log.G(ctx).WithField("method", "(*Dispatcher).markNodesUnknown")
  288. var nodes []*api.Node
  289. var err error
  290. d.store.View(func(tx store.ReadTx) {
  291. nodes, err = store.FindNodes(tx, store.All)
  292. })
  293. if err != nil {
  294. return errors.Wrap(err, "failed to get list of nodes")
  295. }
  296. err = d.store.Batch(func(batch *store.Batch) error {
  297. for _, n := range nodes {
  298. err := batch.Update(func(tx store.Tx) error {
  299. // check if node is still here
  300. node := store.GetNode(tx, n.ID)
  301. if node == nil {
  302. return nil
  303. }
  304. // do not try to resurrect down nodes
  305. if node.Status.State == api.NodeStatus_DOWN {
  306. nodeCopy := node
  307. expireFunc := func() {
  308. if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
  309. log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
  310. }
  311. d.downNodes.Delete(nodeCopy.ID)
  312. }
  313. d.downNodes.Add(nodeCopy, expireFunc)
  314. return nil
  315. }
  316. node.Status.State = api.NodeStatus_UNKNOWN
  317. node.Status.Message = `Node moved to "unknown" state due to leadership change in cluster`
  318. nodeID := node.ID
  319. expireFunc := func() {
  320. log := log.WithField("node", nodeID)
  321. log.Debug("heartbeat expiration for unknown node")
  322. if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
  323. log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
  324. }
  325. }
  326. if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
  327. return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
  328. }
  329. if err := store.UpdateNode(tx, node); err != nil {
  330. return errors.Wrap(err, "update failed")
  331. }
  332. return nil
  333. })
  334. if err != nil {
  335. log.WithField("node", n.ID).WithError(err).Error(`failed to move node to "unknown" state`)
  336. }
  337. }
  338. return nil
  339. })
  340. return err
  341. }
  342. func (d *Dispatcher) isRunning() bool {
  343. if d.ctx == nil {
  344. return false
  345. }
  346. select {
  347. case <-d.ctx.Done():
  348. return false
  349. default:
  350. }
  351. return true
  352. }
  353. // markNodeReady updates the description of a node, updates its address, and sets its status to READY.
  354. // It is used during registration, when a new node description is provided,
  355. // and during node updates, when the node description changes.
  356. func (d *Dispatcher) markNodeReady(ctx context.Context, nodeID string, description *api.NodeDescription, addr string) error {
  357. d.nodeUpdatesLock.Lock()
  358. d.nodeUpdates[nodeID] = nodeUpdate{
  359. status: &api.NodeStatus{
  360. State: api.NodeStatus_READY,
  361. Addr: addr,
  362. },
  363. description: description,
  364. }
  365. numUpdates := len(d.nodeUpdates)
  366. d.nodeUpdatesLock.Unlock()
  367. // Node is marked ready. Remove the node from down nodes if it
  368. // is there.
  369. d.downNodes.Delete(nodeID)
  370. if numUpdates >= maxBatchItems {
  371. select {
  372. case d.processUpdatesTrigger <- struct{}{}:
  373. case <-ctx.Done():
  374. return ctx.Err()
  375. }
  376. }
  377. // Wait until the node update batch happens before unblocking register.
  378. d.processUpdatesLock.Lock()
  379. defer d.processUpdatesLock.Unlock()
  380. select {
  381. case <-ctx.Done():
  382. return ctx.Err()
  383. default:
  384. }
  385. d.processUpdatesCond.Wait()
  386. return nil
  387. }
  388. // nodeIPFromContext gets the node IP from the context of a gRPC call.
  389. func nodeIPFromContext(ctx context.Context) (string, error) {
  390. nodeInfo, err := ca.RemoteNode(ctx)
  391. if err != nil {
  392. return "", err
  393. }
  394. addr, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
  395. if err != nil {
  396. return "", errors.Wrap(err, "unable to get ip from addr:port")
  397. }
  398. return addr, nil
  399. }
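// For example, if a node's gRPC connection comes from "10.0.0.1:43250",
// nodeIPFromContext returns "10.0.0.1": net.SplitHostPort splits off the
// ephemeral port and only the host part is kept.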
  400. // register registers a node with this particular dispatcher.
  401. func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
  402. // prevent register until we're ready to accept it
  403. dctx, err := d.isRunningLocked()
  404. if err != nil {
  405. return "", err
  406. }
  407. if err := d.nodes.CheckRateLimit(nodeID); err != nil {
  408. return "", err
  409. }
  410. // TODO(stevvooe): Validate node specification.
  411. var node *api.Node
  412. d.store.View(func(tx store.ReadTx) {
  413. node = store.GetNode(tx, nodeID)
  414. })
  415. if node == nil {
  416. return "", ErrNodeNotFound
  417. }
  418. addr, err := nodeIPFromContext(ctx)
  419. if err != nil {
  420. log.G(ctx).Debug(err.Error())
  421. }
  422. if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
  423. return "", err
  424. }
  425. expireFunc := func() {
  426. log.G(ctx).Debugf("heartbeat expiration")
  427. if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
  428. log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
  429. }
  430. }
  431. rn := d.nodes.Add(node, expireFunc)
  432. // NOTE(stevvooe): We need be a little careful with re-registration. The
  433. // current implementation just matches the node id and then gives away the
  434. // sessionID. If we ever want to use sessionID as a secret, which we may
  435. // want to, this is giving away the keys to the kitchen.
  436. //
  437. // The right behavior is going to be informed by identity. Basically, each
  438. // time a node registers, we invalidate the session and issue a new
  439. // session, once identity is proven. This will cause misbehaved agents to
  440. // be kicked when multiple connections are made.
  441. return rn.SessionID, nil
  442. }
  443. // UpdateTaskStatus updates the status of a task. A node should send such updates
  444. // on every status change of its tasks.
  445. func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) {
  446. nodeInfo, err := ca.RemoteNode(ctx)
  447. if err != nil {
  448. return nil, err
  449. }
  450. nodeID := nodeInfo.NodeID
  451. fields := logrus.Fields{
  452. "node.id": nodeID,
  453. "node.session": r.SessionID,
  454. "method": "(*Dispatcher).UpdateTaskStatus",
  455. }
  456. if nodeInfo.ForwardedBy != nil {
  457. fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
  458. }
  459. log := log.G(ctx).WithFields(fields)
  460. dctx, err := d.isRunningLocked()
  461. if err != nil {
  462. return nil, err
  463. }
  464. if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
  465. return nil, err
  466. }
  467. validTaskUpdates := make([]*api.UpdateTaskStatusRequest_TaskStatusUpdate, 0, len(r.Updates))
  468. // Validate task updates
  469. for _, u := range r.Updates {
  470. if u.Status == nil {
  471. log.WithField("task.id", u.TaskID).Warn("task report has nil status")
  472. continue
  473. }
  474. var t *api.Task
  475. d.store.View(func(tx store.ReadTx) {
  476. t = store.GetTask(tx, u.TaskID)
  477. })
  478. if t == nil {
  479. // Task may have been deleted
  480. log.WithField("task.id", u.TaskID).Debug("cannot find target task in store")
  481. continue
  482. }
  483. if t.NodeID != nodeID {
  484. err := grpc.Errorf(codes.PermissionDenied, "cannot update a task not assigned to this node")
  485. log.WithField("task.id", u.TaskID).Error(err)
  486. return nil, err
  487. }
  488. validTaskUpdates = append(validTaskUpdates, u)
  489. }
  490. d.taskUpdatesLock.Lock()
  491. // Enqueue task updates
  492. for _, u := range validTaskUpdates {
  493. d.taskUpdates[u.TaskID] = u.Status
  494. }
  495. numUpdates := len(d.taskUpdates)
  496. d.taskUpdatesLock.Unlock()
  497. if numUpdates >= maxBatchItems {
  498. select {
  499. case d.processUpdatesTrigger <- struct{}{}:
  500. case <-dctx.Done():
  501. }
  502. }
  503. return nil, nil
  504. }
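// From the agent side, a call to the method above would look roughly like this
// sketch (assumptions: `client` is an api.DispatcherClient and sessionID/taskID
// are valid; status fields other than State are omitted):
//
//	_, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
//		SessionID: sessionID,
//		Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
//			{
//				TaskID: taskID,
//				Status: &api.TaskStatus{State: api.TaskStateRunning},
//			},
//		},
//	})
//	if err != nil {
//		return err
//	}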
  505. func (d *Dispatcher) processUpdates(ctx context.Context) {
  506. var (
  507. taskUpdates map[string]*api.TaskStatus
  508. nodeUpdates map[string]nodeUpdate
  509. )
  510. d.taskUpdatesLock.Lock()
  511. if len(d.taskUpdates) != 0 {
  512. taskUpdates = d.taskUpdates
  513. d.taskUpdates = make(map[string]*api.TaskStatus)
  514. }
  515. d.taskUpdatesLock.Unlock()
  516. d.nodeUpdatesLock.Lock()
  517. if len(d.nodeUpdates) != 0 {
  518. nodeUpdates = d.nodeUpdates
  519. d.nodeUpdates = make(map[string]nodeUpdate)
  520. }
  521. d.nodeUpdatesLock.Unlock()
  522. if len(taskUpdates) == 0 && len(nodeUpdates) == 0 {
  523. return
  524. }
  525. log := log.G(ctx).WithFields(logrus.Fields{
  526. "method": "(*Dispatcher).processUpdates",
  527. })
  528. err := d.store.Batch(func(batch *store.Batch) error {
  529. for taskID, status := range taskUpdates {
  530. err := batch.Update(func(tx store.Tx) error {
  531. logger := log.WithField("task.id", taskID)
  532. task := store.GetTask(tx, taskID)
  533. if task == nil {
  534. // Task may have been deleted
  535. logger.Debug("cannot find target task in store")
  536. return nil
  537. }
  538. logger = logger.WithField("state.transition", fmt.Sprintf("%v->%v", task.Status.State, status.State))
  539. if task.Status == *status {
  540. logger.Debug("task status identical, ignoring")
  541. return nil
  542. }
  543. if task.Status.State > status.State {
  544. logger.Debug("task status invalid transition")
  545. return nil
  546. }
  547. task.Status = *status
  548. if err := store.UpdateTask(tx, task); err != nil {
  549. logger.WithError(err).Error("failed to update task status")
  550. return nil
  551. }
  552. logger.Debug("task status updated")
  553. return nil
  554. })
  555. if err != nil {
  556. log.WithError(err).Error("dispatcher task update transaction failed")
  557. }
  558. }
  559. for nodeID, nodeUpdate := range nodeUpdates {
  560. err := batch.Update(func(tx store.Tx) error {
  561. logger := log.WithField("node.id", nodeID)
  562. node := store.GetNode(tx, nodeID)
  563. if node == nil {
  564. logger.Errorf("node unavailable")
  565. return nil
  566. }
  567. if nodeUpdate.status != nil {
  568. node.Status.State = nodeUpdate.status.State
  569. node.Status.Message = nodeUpdate.status.Message
  570. if nodeUpdate.status.Addr != "" {
  571. node.Status.Addr = nodeUpdate.status.Addr
  572. }
  573. }
  574. if nodeUpdate.description != nil {
  575. node.Description = nodeUpdate.description
  576. }
  577. if err := store.UpdateNode(tx, node); err != nil {
  578. logger.WithError(err).Error("failed to update node status")
  579. return nil
  580. }
  581. logger.Debug("node status updated")
  582. return nil
  583. })
  584. if err != nil {
  585. log.WithError(err).Error("dispatcher node update transaction failed")
  586. }
  587. }
  588. return nil
  589. })
  590. if err != nil {
  591. log.WithError(err).Error("dispatcher batch failed")
  592. }
  593. d.processUpdatesCond.Broadcast()
  594. }
  595. // Tasks is a stream of task states for a node. Each message contains the full list
  596. // of tasks that should be run on the node; if a task is not present in that list,
  597. // it should be terminated.
  598. func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServer) error {
  599. nodeInfo, err := ca.RemoteNode(stream.Context())
  600. if err != nil {
  601. return err
  602. }
  603. nodeID := nodeInfo.NodeID
  604. dctx, err := d.isRunningLocked()
  605. if err != nil {
  606. return err
  607. }
  608. fields := logrus.Fields{
  609. "node.id": nodeID,
  610. "node.session": r.SessionID,
  611. "method": "(*Dispatcher).Tasks",
  612. }
  613. if nodeInfo.ForwardedBy != nil {
  614. fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
  615. }
  616. log.G(stream.Context()).WithFields(fields).Debugf("")
  617. if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
  618. return err
  619. }
  620. tasksMap := make(map[string]*api.Task)
  621. nodeTasks, cancel, err := store.ViewAndWatch(
  622. d.store,
  623. func(readTx store.ReadTx) error {
  624. tasks, err := store.FindTasks(readTx, store.ByNodeID(nodeID))
  625. if err != nil {
  626. return err
  627. }
  628. for _, t := range tasks {
  629. tasksMap[t.ID] = t
  630. }
  631. return nil
  632. },
  633. api.EventCreateTask{Task: &api.Task{NodeID: nodeID},
  634. Checks: []api.TaskCheckFunc{api.TaskCheckNodeID}},
  635. api.EventUpdateTask{Task: &api.Task{NodeID: nodeID},
  636. Checks: []api.TaskCheckFunc{api.TaskCheckNodeID}},
  637. api.EventDeleteTask{Task: &api.Task{NodeID: nodeID},
  638. Checks: []api.TaskCheckFunc{api.TaskCheckNodeID}},
  639. )
  640. if err != nil {
  641. return err
  642. }
  643. defer cancel()
  644. for {
  645. if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
  646. return err
  647. }
  648. var tasks []*api.Task
  649. for _, t := range tasksMap {
  650. // dispatcher only sends tasks that have been assigned to a node
  651. if t != nil && t.Status.State >= api.TaskStateAssigned {
  652. tasks = append(tasks, t)
  653. }
  654. }
  655. if err := stream.Send(&api.TasksMessage{Tasks: tasks}); err != nil {
  656. return err
  657. }
  658. // bursty events should be processed in batches and sent out as a snapshot
  659. var (
  660. modificationCnt int
  661. batchingTimer *time.Timer
  662. batchingTimeout <-chan time.Time
  663. )
  664. batchingLoop:
  665. for modificationCnt < modificationBatchLimit {
  666. select {
  667. case event := <-nodeTasks:
  668. switch v := event.(type) {
  669. case api.EventCreateTask:
  670. tasksMap[v.Task.ID] = v.Task
  671. modificationCnt++
  672. case api.EventUpdateTask:
  673. if oldTask, exists := tasksMap[v.Task.ID]; exists {
  674. // States ASSIGNED and below are set by the orchestrator/scheduler,
  675. // not the agent, so tasks in these states need to be sent to the
  676. // agent even if nothing else has changed.
  677. if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
  678. // this update should not trigger action at agent
  679. tasksMap[v.Task.ID] = v.Task
  680. continue
  681. }
  682. }
  683. tasksMap[v.Task.ID] = v.Task
  684. modificationCnt++
  685. case api.EventDeleteTask:
  686. delete(tasksMap, v.Task.ID)
  687. modificationCnt++
  688. }
  689. if batchingTimer != nil {
  690. batchingTimer.Reset(batchingWaitTime)
  691. } else {
  692. batchingTimer = time.NewTimer(batchingWaitTime)
  693. batchingTimeout = batchingTimer.C
  694. }
  695. case <-batchingTimeout:
  696. break batchingLoop
  697. case <-stream.Context().Done():
  698. return stream.Context().Err()
  699. case <-dctx.Done():
  700. return dctx.Err()
  701. }
  702. }
  703. if batchingTimer != nil {
  704. batchingTimer.Stop()
  705. }
  706. }
  707. }
  708. // Assignments is a stream of assignments for a node. Each message contains
  709. // either the full list of tasks and secrets for the node, or an incremental update.
  710. func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error {
  711. nodeInfo, err := ca.RemoteNode(stream.Context())
  712. if err != nil {
  713. return err
  714. }
  715. nodeID := nodeInfo.NodeID
  716. dctx, err := d.isRunningLocked()
  717. if err != nil {
  718. return err
  719. }
  720. fields := logrus.Fields{
  721. "node.id": nodeID,
  722. "node.session": r.SessionID,
  723. "method": "(*Dispatcher).Assignments",
  724. }
  725. if nodeInfo.ForwardedBy != nil {
  726. fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
  727. }
  728. log := log.G(stream.Context()).WithFields(fields)
  729. log.Debugf("")
  730. if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
  731. return err
  732. }
  733. var (
  734. sequence int64
  735. appliesTo string
  736. assignments = newAssignmentSet(log)
  737. )
  738. sendMessage := func(msg api.AssignmentsMessage, assignmentType api.AssignmentsMessage_Type) error {
  739. sequence++
  740. msg.AppliesTo = appliesTo
  741. msg.ResultsIn = strconv.FormatInt(sequence, 10)
  742. appliesTo = msg.ResultsIn
  743. msg.Type = assignmentType
  744. if err := stream.Send(&msg); err != nil {
  745. return err
  746. }
  747. return nil
  748. }
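// Illustration of the bookkeeping above: the first message goes out with
// AppliesTo "" and ResultsIn "1", the next with AppliesTo "1" and ResultsIn "2",
// and so on, so a receiver can verify that incremental updates are applied in
// order on top of the right base set.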
  749. // TODO(aaronl): Also send node secrets that should be exposed to
  750. // this node.
  751. nodeTasks, cancel, err := store.ViewAndWatch(
  752. d.store,
  753. func(readTx store.ReadTx) error {
  754. tasks, err := store.FindTasks(readTx, store.ByNodeID(nodeID))
  755. if err != nil {
  756. return err
  757. }
  758. for _, t := range tasks {
  759. assignments.addOrUpdateTask(readTx, t)
  760. }
  761. return nil
  762. },
  763. api.EventUpdateTask{Task: &api.Task{NodeID: nodeID},
  764. Checks: []api.TaskCheckFunc{api.TaskCheckNodeID}},
  765. api.EventDeleteTask{Task: &api.Task{NodeID: nodeID},
  766. Checks: []api.TaskCheckFunc{api.TaskCheckNodeID}},
  767. )
  768. if err != nil {
  769. return err
  770. }
  771. defer cancel()
  772. if err := sendMessage(assignments.message(), api.AssignmentsMessage_COMPLETE); err != nil {
  773. return err
  774. }
  775. for {
  776. // Check for session expiration
  777. if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
  778. return err
  779. }
  780. // bursty events should be processed in batches and sent out together
  781. var (
  782. modificationCnt int
  783. batchingTimer *time.Timer
  784. batchingTimeout <-chan time.Time
  785. )
  786. oneModification := func() {
  787. modificationCnt++
  788. if batchingTimer != nil {
  789. batchingTimer.Reset(batchingWaitTime)
  790. } else {
  791. batchingTimer = time.NewTimer(batchingWaitTime)
  792. batchingTimeout = batchingTimer.C
  793. }
  794. }
  795. // The batching loop waits for batchingWaitTime (100 ms) after the most recent
  796. // change, or until modificationBatchLimit is reached. The
  797. // worst case latency is modificationBatchLimit * batchingWaitTime,
  798. // which is 10 seconds.
  799. batchingLoop:
  800. for modificationCnt < modificationBatchLimit {
  801. select {
  802. case event := <-nodeTasks:
  803. switch v := event.(type) {
  804. // We don't monitor EventCreateTask because tasks are
  805. // never created in the ASSIGNED state. First tasks are
  806. // created by the orchestrator, then the scheduler moves
  807. // them to ASSIGNED. If this ever changes, we will need
  808. // to monitor task creations as well.
  809. case api.EventUpdateTask:
  810. d.store.View(func(readTx store.ReadTx) {
  811. if assignments.addOrUpdateTask(readTx, v.Task) {
  812. oneModification()
  813. }
  814. })
  815. case api.EventDeleteTask:
  816. if assignments.removeTask(v.Task) {
  817. oneModification()
  818. }
  819. // TODO(aaronl): For node secrets, we'll need to handle
  820. // EventCreateSecret.
  821. }
  822. case <-batchingTimeout:
  823. break batchingLoop
  824. case <-stream.Context().Done():
  825. return stream.Context().Err()
  826. case <-dctx.Done():
  827. return dctx.Err()
  828. }
  829. }
  830. if batchingTimer != nil {
  831. batchingTimer.Stop()
  832. }
  833. if modificationCnt > 0 {
  834. if err := sendMessage(assignments.message(), api.AssignmentsMessage_INCREMENTAL); err != nil {
  835. return err
  836. }
  837. }
  838. }
  839. }
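// A client-side sketch for consuming this stream (assumptions: `client` is an
// api.DispatcherClient and sessionID is valid; how the agent applies the message
// contents is elided):
//
//	stream, err := client.Assignments(ctx, &api.AssignmentsRequest{SessionID: sessionID})
//	if err != nil {
//		return err
//	}
//	for {
//		msg, err := stream.Recv()
//		if err != nil {
//			return err
//		}
//		switch msg.Type {
//		case api.AssignmentsMessage_COMPLETE:
//			// replace the full local assignment set
//		case api.AssignmentsMessage_INCREMENTAL:
//			// apply the delta on top of the set identified by msg.AppliesTo
//		}
//	}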
  840. func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {
  841. err := d.store.Batch(func(batch *store.Batch) error {
  842. var (
  843. tasks []*api.Task
  844. err error
  845. )
  846. d.store.View(func(tx store.ReadTx) {
  847. tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID))
  848. })
  849. if err != nil {
  850. return err
  851. }
  852. for _, task := range tasks {
  853. // Tasks running on an unreachable node need to be marked as
  854. // orphaned since we have no idea whether the task is still running
  855. // or not.
  856. //
  857. // This only applies for tasks that could have made progress since
  858. // the agent became unreachable (assigned<->running)
  859. //
  860. // Tasks in a final state (e.g. rejected) *cannot* have made
  861. // progress, therefore there's no point in marking them as orphaned
  862. if task.Status.State >= api.TaskStateAssigned && task.Status.State <= api.TaskStateRunning {
  863. task.Status.State = api.TaskStateOrphaned
  864. }
  865. if err := batch.Update(func(tx store.Tx) error {
  866. err := store.UpdateTask(tx, task)
  867. if err != nil {
  868. return err
  869. }
  870. return nil
  871. }); err != nil {
  872. return err
  873. }
  874. }
  875. return nil
  876. })
  877. return err
  878. }
  879. // markNodeNotReady sets the node state to some state other than READY
  880. func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
  881. dctx, err := d.isRunningLocked()
  882. if err != nil {
  883. return err
  884. }
  885. // Node is down. Add it to down nodes so that we can keep
  886. // track of tasks assigned to the node.
  887. var node *api.Node
  888. d.store.View(func(readTx store.ReadTx) {
  889. node = store.GetNode(readTx, id)
  890. if node == nil {
  891. err = fmt.Errorf("could not find node %s while trying to add to down nodes store", id)
  892. }
  893. })
  894. if err != nil {
  895. return err
  896. }
  897. expireFunc := func() {
  898. if err := d.moveTasksToOrphaned(id); err != nil {
  899. log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
  900. }
  901. d.downNodes.Delete(id)
  902. }
  903. d.downNodes.Add(node, expireFunc)
  904. status := &api.NodeStatus{
  905. State: state,
  906. Message: message,
  907. }
  908. d.nodeUpdatesLock.Lock()
  909. // pluck the description out of nodeUpdates. this protects against a case
  910. // where a node is marked ready and a description is added, but then the
  911. // node is immediately marked not ready. this preserves that description
  912. d.nodeUpdates[id] = nodeUpdate{status: status, description: d.nodeUpdates[id].description}
  913. numUpdates := len(d.nodeUpdates)
  914. d.nodeUpdatesLock.Unlock()
  915. if numUpdates >= maxBatchItems {
  916. select {
  917. case d.processUpdatesTrigger <- struct{}{}:
  918. case <-dctx.Done():
  919. }
  920. }
  921. if rn := d.nodes.Delete(id); rn == nil {
  922. return errors.Errorf("node %s is not found in local storage", id)
  923. }
  924. return nil
  925. }
  926. // Heartbeat is the heartbeat method for nodes. It returns a new TTL in the response.
  927. // A node should send its next heartbeat earlier than now + TTL, otherwise it will
  928. // be deregistered from the dispatcher and its status will be updated to NodeStatus_DOWN.
  929. func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
  930. nodeInfo, err := ca.RemoteNode(ctx)
  931. if err != nil {
  932. return nil, err
  933. }
  934. period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)
  935. return &api.HeartbeatResponse{Period: period}, err
  936. }
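// On the agent side, the heartbeat loop implied by the comment above could look
// like this sketch (assumptions: `client` is an api.DispatcherClient and
// sessionID is valid; a real agent would send the next heartbeat a bit before
// the returned period elapses):
//
//	for {
//		resp, err := client.Heartbeat(ctx, &api.HeartbeatRequest{SessionID: sessionID})
//		if err != nil {
//			return err
//		}
//		select {
//		case <-time.After(resp.Period):
//		case <-ctx.Done():
//			return ctx.Err()
//		}
//	}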
  937. func (d *Dispatcher) getManagers() []*api.WeightedPeer {
  938. d.mu.Lock()
  939. defer d.mu.Unlock()
  940. return d.lastSeenManagers
  941. }
  942. func (d *Dispatcher) getNetworkBootstrapKeys() []*api.EncryptionKey {
  943. d.mu.Lock()
  944. defer d.mu.Unlock()
  945. return d.networkBootstrapKeys
  946. }
  947. func (d *Dispatcher) getRootCACert() []byte {
  948. d.mu.Lock()
  949. defer d.mu.Unlock()
  950. return d.lastSeenRootCert
  951. }
  952. // Session is a stream that controls the agent connection.
  953. // Each message contains a list of backup Managers with weights. There is also
  954. // a special boolean field, Disconnect, which if true indicates that the node should
  955. // reconnect to another Manager immediately.
  956. func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
  957. ctx := stream.Context()
  958. nodeInfo, err := ca.RemoteNode(ctx)
  959. if err != nil {
  960. return err
  961. }
  962. nodeID := nodeInfo.NodeID
  963. dctx, err := d.isRunningLocked()
  964. if err != nil {
  965. return err
  966. }
  967. var sessionID string
  968. if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
  969. // register the node.
  970. sessionID, err = d.register(ctx, nodeID, r.Description)
  971. if err != nil {
  972. return err
  973. }
  974. } else {
  975. sessionID = r.SessionID
  976. // get the node IP addr
  977. addr, err := nodeIPFromContext(stream.Context())
  978. if err != nil {
  979. log.G(ctx).Debugf(err.Error())
  980. }
  981. // update the node description
  982. if err := d.markNodeReady(dctx, nodeID, r.Description, addr); err != nil {
  983. return err
  984. }
  985. }
  986. fields := logrus.Fields{
  987. "node.id": nodeID,
  988. "node.session": sessionID,
  989. "method": "(*Dispatcher).Session",
  990. }
  991. if nodeInfo.ForwardedBy != nil {
  992. fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
  993. }
  994. log := log.G(ctx).WithFields(fields)
  995. var nodeObj *api.Node
  996. nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
  997. nodeObj = store.GetNode(readTx, nodeID)
  998. return nil
  999. }, api.EventUpdateNode{Node: &api.Node{ID: nodeID},
  1000. Checks: []api.NodeCheckFunc{api.NodeCheckID}},
  1001. )
  1002. if cancel != nil {
  1003. defer cancel()
  1004. }
  1005. if err != nil {
  1006. log.WithError(err).Error("ViewAndWatch Node failed")
  1007. }
  1008. if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
  1009. return err
  1010. }
  1011. clusterUpdatesCh, clusterCancel := d.clusterUpdateQueue.Watch()
  1012. defer clusterCancel()
  1013. if err := stream.Send(&api.SessionMessage{
  1014. SessionID: sessionID,
  1015. Node: nodeObj,
  1016. Managers: d.getManagers(),
  1017. NetworkBootstrapKeys: d.getNetworkBootstrapKeys(),
  1018. RootCA: d.getRootCACert(),
  1019. }); err != nil {
  1020. return err
  1021. }
  1022. // disconnectNode is a helper that forcibly shuts down the connection.
  1023. disconnectNode := func() error {
  1024. // force disconnect by shutting down the stream.
  1025. transportStream, ok := transport.StreamFromContext(stream.Context())
  1026. if ok {
  1027. // if we have the transport stream, we can signal a disconnect
  1028. // in the client.
  1029. if err := transportStream.ServerTransport().Close(); err != nil {
  1030. log.WithError(err).Error("session end")
  1031. }
  1032. }
  1033. if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
  1034. log.WithError(err).Error("failed to remove node")
  1035. }
  1036. // still return an abort if the transport closure was ineffective.
  1037. return grpc.Errorf(codes.Aborted, "node must disconnect")
  1038. }
  1039. for {
  1040. // After each message send, we need to check that the node's sessionID hasn't
  1041. // changed. If it has, we will shut down the stream and make the node
  1042. // re-register.
  1043. node, err := d.nodes.GetWithSession(nodeID, sessionID)
  1044. if err != nil {
  1045. return err
  1046. }
  1047. var (
  1048. disconnect bool
  1049. mgrs []*api.WeightedPeer
  1050. netKeys []*api.EncryptionKey
  1051. rootCert []byte
  1052. )
  1053. select {
  1054. case ev := <-clusterUpdatesCh:
  1055. update := ev.(clusterUpdate)
  1056. if update.managerUpdate != nil {
  1057. mgrs = *update.managerUpdate
  1058. }
  1059. if update.bootstrapKeyUpdate != nil {
  1060. netKeys = *update.bootstrapKeyUpdate
  1061. }
  1062. if update.rootCAUpdate != nil {
  1063. rootCert = *update.rootCAUpdate
  1064. }
  1065. case ev := <-nodeUpdates:
  1066. nodeObj = ev.(api.EventUpdateNode).Node
  1067. case <-stream.Context().Done():
  1068. return stream.Context().Err()
  1069. case <-node.Disconnect:
  1070. disconnect = true
  1071. case <-dctx.Done():
  1072. disconnect = true
  1073. }
  1074. if mgrs == nil {
  1075. mgrs = d.getManagers()
  1076. }
  1077. if netKeys == nil {
  1078. netKeys = d.getNetworkBootstrapKeys()
  1079. }
  1080. if rootCert == nil {
  1081. rootCert = d.getRootCACert()
  1082. }
  1083. if err := stream.Send(&api.SessionMessage{
  1084. SessionID: sessionID,
  1085. Node: nodeObj,
  1086. Managers: mgrs,
  1087. NetworkBootstrapKeys: netKeys,
  1088. RootCA: rootCert,
  1089. }); err != nil {
  1090. return err
  1091. }
  1092. if disconnect {
  1093. return disconnectNode()
  1094. }
  1095. }
  1096. }
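// End-to-end, an agent drives the Session and Tasks streams roughly as in the
// sketch below (assumptions: `client` is an api.DispatcherClient and
// `description` is the node's api.NodeDescription; reconnection and error
// handling are elided):
//
//	session, err := client.Session(ctx, &api.SessionRequest{Description: description})
//	if err != nil {
//		return err
//	}
//	msg, err := session.Recv()
//	if err != nil {
//		return err
//	}
//	sessionID := msg.SessionID
//
//	tasks, err := client.Tasks(ctx, &api.TasksRequest{SessionID: sessionID})
//	if err != nil {
//		return err
//	}
//	for {
//		taskMsg, err := tasks.Recv()
//		if err != nil {
//			return err
//		}
//		// reconcile taskMsg.Tasks (the full desired set) with local state
//		_ = taskMsg
//	}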