node.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854
  1. package node
  2. import (
  3. "crypto/tls"
  4. "encoding/json"
  5. "io/ioutil"
  6. "net"
  7. "os"
  8. "path/filepath"
  9. "reflect"
  10. "sort"
  11. "sync"
  12. "time"
  13. "github.com/Sirupsen/logrus"
  14. "github.com/boltdb/bolt"
  15. "github.com/docker/swarmkit/agent"
  16. "github.com/docker/swarmkit/agent/exec"
  17. "github.com/docker/swarmkit/api"
  18. "github.com/docker/swarmkit/ca"
  19. "github.com/docker/swarmkit/ioutils"
  20. "github.com/docker/swarmkit/log"
  21. "github.com/docker/swarmkit/manager"
  22. "github.com/docker/swarmkit/manager/encryption"
  23. "github.com/docker/swarmkit/manager/state/raft"
  24. "github.com/docker/swarmkit/remotes"
  25. "github.com/docker/swarmkit/xnet"
  26. "github.com/pkg/errors"
  27. "golang.org/x/net/context"
  28. "google.golang.org/grpc"
  29. "google.golang.org/grpc/credentials"
  30. )
  31. const stateFilename = "state.json"
  32. var (
  33. errNodeStarted = errors.New("node: already started")
  34. errNodeNotStarted = errors.New("node: not started")
  35. certDirectory = "certificates"
  36. // ErrInvalidUnlockKey is returned when we can't decrypt the TLS certificate
  37. ErrInvalidUnlockKey = errors.New("node is locked, and needs a valid unlock key")
  38. )
// Config provides values for a Node.
//
// New retains the pointer, and the node keeps reading these fields for its
// whole lifetime (in run, runAgent, runManager, loadSecurityConfig).
// NOTE(review): modifying a Config after passing it to New is presumably
// unsupported — confirm with callers.
type Config struct {
	// Hostname is the name of host for agent instance.
	Hostname string

	// JoinAddr specifies node that should be used for the initial connection to
	// other manager in cluster. This should be only one address and optional,
	// the actual remotes come from the stored state.
	// When set, New discards the peers loaded from the state file and only
	// observes this address, and loadSecurityConfig downloads the root CA from
	// the cluster instead of bootstrapping a new one.
	JoinAddr string

	// StateDir specifies the directory the node uses to keep the state of the
	// remote managers and certificates. New creates it (mode 0700) if missing;
	// it holds state.json, the certificates/ directory and worker/tasks.db.
	StateDir string

	// JoinToken is the token to be used on the first certificate request.
	// It is also used when downloading the root CA from the cluster.
	JoinToken string

	// ExternalCAs is a list of CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ForceNewCluster creates a new cluster from current raft state.
	// Like JoinAddr, it also makes New discard the peers from the state file.
	ForceNewCluster bool

	// ListenControlAPI specifies address the control API should listen on.
	// The node itself dials this address locally to health-check the
	// control API once a manager is running.
	ListenControlAPI string

	// ListenRemoteAPI specifies the address for the remote API that agents
	// and raft members connect to.
	ListenRemoteAPI string

	// AdvertiseRemoteAPI specifies the address that should be advertised
	// for connections to the remote API (including the raft service).
	AdvertiseRemoteAPI string

	// Executor specifies the executor to use for the agent.
	Executor exec.Executor

	// ElectionTick defines the amount of ticks needed without
	// leader to trigger a new election
	ElectionTick uint32

	// HeartbeatTick defines the amount of ticks between each
	// heartbeat sent to other members for health-check purposes
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not an unlock key will be generated
	// when bootstrapping a new cluster for the first time. It is also passed
	// on to the manager configuration.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting at rest. This
	// only applies to nodes that have already joined a cluster. When the node
	// bootstraps a brand-new cluster, this value is replaced (see
	// loadSecurityConfig).
	UnlockKey []byte
}
// Node implements the primary node functionality for a member of a swarm
// cluster. Node handles workloads and may also run as a manager.
//
// A Node is created with New, started with Start and shut down with Stop;
// Err reports why it stopped.
type Node struct {
	// The embedded RWMutex guards the state exposed through the accessor
	// methods: role, conn, nodeID, nodeMembership, agent and manager.
	sync.RWMutex

	config *Config
	// remotes is the set of known manager peers, persisted to the state file.
	remotes *persistentRemotes

	// role is the current TLS role (ca.WorkerRole or ca.ManagerRole).
	role string
	// roleCond is broadcast whenever role changes; its Locker is n.RLocker().
	roleCond *sync.Cond

	// conn is the current connection to the local control API, or nil.
	conn *grpc.ClientConn
	// connCond is broadcast whenever conn changes; its Locker is n.RLocker().
	connCond *sync.Cond

	nodeID         string
	nodeMembership api.NodeSpec_Membership

	// started is closed by the first call to Start.
	started   chan struct{}
	startOnce sync.Once
	// stopped is closed by Stop to ask run to shut down.
	stopped  chan struct{}
	stopOnce sync.Once

	ready                chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
	certificateRequested chan struct{} // closed once the response to the node's certificate issue request (carrying its node ID) has been received

	// closed is closed when run returns; err holds run's result and is only
	// meant to be read after closed has been closed.
	closed chan struct{}
	err    error

	agent   *agent.Agent
	manager *manager.Manager

	notifyNodeChange chan *api.Node // used to send role updates from the dispatcher api on promotion/demotion

	// unlockKey is the key used to decrypt the node's key at rest.
	// NOTE(review): written in loadSecurityConfig and read in runManager
	// without holding the mutex.
	unlockKey []byte
}
  105. // RemoteAPIAddr returns address on which remote manager api listens.
  106. // Returns nil if node is not manager.
  107. func (n *Node) RemoteAPIAddr() (string, error) {
  108. n.RLock()
  109. defer n.RUnlock()
  110. if n.manager == nil {
  111. return "", errors.Errorf("node is not manager")
  112. }
  113. addr := n.manager.Addr()
  114. if addr == "" {
  115. return "", errors.Errorf("manager addr is not set")
  116. }
  117. return addr, nil
  118. }
  119. // New returns new Node instance.
  120. func New(c *Config) (*Node, error) {
  121. if err := os.MkdirAll(c.StateDir, 0700); err != nil {
  122. return nil, err
  123. }
  124. stateFile := filepath.Join(c.StateDir, stateFilename)
  125. dt, err := ioutil.ReadFile(stateFile)
  126. var p []api.Peer
  127. if err != nil && !os.IsNotExist(err) {
  128. return nil, err
  129. }
  130. if err == nil {
  131. if err := json.Unmarshal(dt, &p); err != nil {
  132. return nil, err
  133. }
  134. }
  135. n := &Node{
  136. remotes: newPersistentRemotes(stateFile, p...),
  137. role: ca.WorkerRole,
  138. config: c,
  139. started: make(chan struct{}),
  140. stopped: make(chan struct{}),
  141. closed: make(chan struct{}),
  142. ready: make(chan struct{}),
  143. certificateRequested: make(chan struct{}),
  144. notifyNodeChange: make(chan *api.Node, 1),
  145. unlockKey: c.UnlockKey,
  146. }
  147. if n.config.JoinAddr != "" || n.config.ForceNewCluster {
  148. n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
  149. if n.config.JoinAddr != "" {
  150. n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, remotes.DefaultObservationWeight)
  151. }
  152. }
  153. n.roleCond = sync.NewCond(n.RLocker())
  154. n.connCond = sync.NewCond(n.RLocker())
  155. return n, nil
  156. }
  157. // Start starts a node instance.
  158. func (n *Node) Start(ctx context.Context) error {
  159. err := errNodeStarted
  160. n.startOnce.Do(func() {
  161. close(n.started)
  162. go n.run(ctx)
  163. err = nil // clear error above, only once.
  164. })
  165. return err
  166. }
  167. func (n *Node) run(ctx context.Context) (err error) {
  168. defer func() {
  169. n.err = err
  170. close(n.closed)
  171. }()
  172. ctx, cancel := context.WithCancel(ctx)
  173. defer cancel()
  174. ctx = log.WithModule(ctx, "node")
  175. go func() {
  176. select {
  177. case <-ctx.Done():
  178. case <-n.stopped:
  179. cancel()
  180. }
  181. }()
  182. securityConfig, err := n.loadSecurityConfig(ctx)
  183. if err != nil {
  184. return err
  185. }
  186. taskDBPath := filepath.Join(n.config.StateDir, "worker/tasks.db")
  187. if err := os.MkdirAll(filepath.Dir(taskDBPath), 0777); err != nil {
  188. return err
  189. }
  190. db, err := bolt.Open(taskDBPath, 0666, nil)
  191. if err != nil {
  192. return err
  193. }
  194. defer db.Close()
  195. forceCertRenewal := make(chan struct{})
  196. renewCert := func() {
  197. select {
  198. case forceCertRenewal <- struct{}{}:
  199. case <-ctx.Done():
  200. }
  201. }
  202. go func() {
  203. for {
  204. select {
  205. case <-ctx.Done():
  206. return
  207. case node := <-n.notifyNodeChange:
  208. // If the server is sending us a ForceRenewal State, renew
  209. if node.Certificate.Status.State == api.IssuanceStateRotate {
  210. renewCert()
  211. continue
  212. }
  213. n.Lock()
  214. // If we got a role change, renew
  215. lastRole := n.role
  216. role := ca.WorkerRole
  217. if node.Spec.Role == api.NodeRoleManager {
  218. role = ca.ManagerRole
  219. }
  220. if lastRole == role {
  221. n.Unlock()
  222. continue
  223. }
  224. // switch role to agent immediately to shutdown manager early
  225. if role == ca.WorkerRole {
  226. n.role = role
  227. n.roleCond.Broadcast()
  228. }
  229. n.Unlock()
  230. renewCert()
  231. }
  232. }
  233. }()
  234. updates := ca.RenewTLSConfig(ctx, securityConfig, n.remotes, forceCertRenewal)
  235. go func() {
  236. for {
  237. select {
  238. case certUpdate := <-updates:
  239. if certUpdate.Err != nil {
  240. logrus.Warnf("error renewing TLS certificate: %v", certUpdate.Err)
  241. continue
  242. }
  243. n.Lock()
  244. n.role = certUpdate.Role
  245. n.roleCond.Broadcast()
  246. n.Unlock()
  247. case <-ctx.Done():
  248. return
  249. }
  250. }
  251. }()
  252. role := n.role
  253. managerReady := make(chan struct{})
  254. agentReady := make(chan struct{})
  255. var managerErr error
  256. var agentErr error
  257. var wg sync.WaitGroup
  258. wg.Add(2)
  259. go func() {
  260. managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop
  261. wg.Done()
  262. cancel()
  263. }()
  264. go func() {
  265. agentErr = n.runAgent(ctx, db, securityConfig.ClientTLSCreds, agentReady)
  266. wg.Done()
  267. cancel()
  268. }()
  269. go func() {
  270. <-agentReady
  271. if role == ca.ManagerRole {
  272. <-managerReady
  273. }
  274. close(n.ready)
  275. }()
  276. wg.Wait()
  277. if managerErr != nil && managerErr != context.Canceled {
  278. return managerErr
  279. }
  280. if agentErr != nil && agentErr != context.Canceled {
  281. return agentErr
  282. }
  283. return err
  284. }
  285. // Stop stops node execution
  286. func (n *Node) Stop(ctx context.Context) error {
  287. select {
  288. case <-n.started:
  289. default:
  290. return errNodeNotStarted
  291. }
  292. // ask agent to clean up assignments
  293. n.Lock()
  294. if n.agent != nil {
  295. if err := n.agent.Leave(ctx); err != nil {
  296. log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
  297. }
  298. }
  299. n.Unlock()
  300. n.stopOnce.Do(func() {
  301. close(n.stopped)
  302. })
  303. select {
  304. case <-n.closed:
  305. return nil
  306. case <-ctx.Done():
  307. return ctx.Err()
  308. }
  309. }
  310. // Err returns the error that caused the node to shutdown or nil. Err blocks
  311. // until the node has fully shut down.
  312. func (n *Node) Err(ctx context.Context) error {
  313. select {
  314. case <-n.closed:
  315. return n.err
  316. case <-ctx.Done():
  317. return ctx.Err()
  318. }
  319. }
  320. func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportCredentials, ready chan<- struct{}) error {
  321. select {
  322. case <-ctx.Done():
  323. case <-n.remotes.WaitSelect(ctx):
  324. }
  325. if ctx.Err() != nil {
  326. return ctx.Err()
  327. }
  328. a, err := agent.New(&agent.Config{
  329. Hostname: n.config.Hostname,
  330. Managers: n.remotes,
  331. Executor: n.config.Executor,
  332. DB: db,
  333. NotifyNodeChange: n.notifyNodeChange,
  334. Credentials: creds,
  335. })
  336. if err != nil {
  337. return err
  338. }
  339. if err := a.Start(ctx); err != nil {
  340. return err
  341. }
  342. n.Lock()
  343. n.agent = a
  344. n.Unlock()
  345. defer func() {
  346. n.Lock()
  347. n.agent = nil
  348. n.Unlock()
  349. }()
  350. go func() {
  351. <-a.Ready()
  352. close(ready)
  353. }()
  354. // todo: manually call stop on context cancellation?
  355. return a.Err(context.Background())
  356. }
  357. // Ready returns a channel that is closed after node's initialization has
  358. // completes for the first time.
  359. func (n *Node) Ready() <-chan struct{} {
  360. return n.ready
  361. }
  362. // CertificateRequested returns a channel that is closed after node has
  363. // requested a certificate. After this call a caller can expect calls to
  364. // NodeID() and `NodeMembership()` to succeed.
  365. func (n *Node) CertificateRequested() <-chan struct{} {
  366. return n.certificateRequested
  367. }
  368. func (n *Node) setControlSocket(conn *grpc.ClientConn) {
  369. n.Lock()
  370. if n.conn != nil {
  371. n.conn.Close()
  372. }
  373. n.conn = conn
  374. n.connCond.Broadcast()
  375. n.Unlock()
  376. }
  377. // ListenControlSocket listens changes of a connection for managing the
  378. // cluster control api
  379. func (n *Node) ListenControlSocket(ctx context.Context) <-chan *grpc.ClientConn {
  380. c := make(chan *grpc.ClientConn, 1)
  381. n.RLock()
  382. conn := n.conn
  383. c <- conn
  384. done := make(chan struct{})
  385. go func() {
  386. select {
  387. case <-ctx.Done():
  388. n.connCond.Broadcast()
  389. case <-done:
  390. }
  391. }()
  392. go func() {
  393. defer close(c)
  394. defer close(done)
  395. defer n.RUnlock()
  396. for {
  397. if ctx.Err() != nil {
  398. return
  399. }
  400. if conn == n.conn {
  401. n.connCond.Wait()
  402. continue
  403. }
  404. conn = n.conn
  405. c <- conn
  406. }
  407. }()
  408. return c
  409. }
  410. // NodeID returns current node's ID. May be empty if not set.
  411. func (n *Node) NodeID() string {
  412. n.RLock()
  413. defer n.RUnlock()
  414. return n.nodeID
  415. }
  416. // NodeMembership returns current node's membership. May be empty if not set.
  417. func (n *Node) NodeMembership() api.NodeSpec_Membership {
  418. n.RLock()
  419. defer n.RUnlock()
  420. return n.nodeMembership
  421. }
  422. // Manager returns manager instance started by node. May be nil.
  423. func (n *Node) Manager() *manager.Manager {
  424. n.RLock()
  425. defer n.RUnlock()
  426. return n.manager
  427. }
  428. // Agent returns agent instance started by node. May be nil.
  429. func (n *Node) Agent() *agent.Agent {
  430. n.RLock()
  431. defer n.RUnlock()
  432. return n.agent
  433. }
  434. // Remotes returns a list of known peers known to node.
  435. func (n *Node) Remotes() []api.Peer {
  436. weights := n.remotes.Weights()
  437. remotes := make([]api.Peer, 0, len(weights))
  438. for p := range weights {
  439. remotes = append(remotes, p)
  440. }
  441. return remotes
  442. }
// loadSecurityConfig returns the node's TLS security configuration.
//
// It first tries the root CA and node certificate/key stored under
// <StateDir>/certificates. If that does not yield credentials (no local root
// CA, or the node certificate is missing), it does one of two things:
//   - bootstraps a new cluster root CA, when no JoinAddr is configured;
//   - downloads the root CA from the known remotes using the join token, when
//     JoinAddr is set and no local root CA exists.
//
// It then calls ca.LoadOrCreateSecurityConfig to obtain a node certificate.
// On success, n.role, n.nodeID and n.nodeMembership are set from the
// resulting credentials and roleCond waiters are woken.
//
// ErrInvalidUnlockKey is returned when the stored key cannot be decrypted
// with the configured unlock key. When bootstrapping a new cluster,
// n.unlockKey is replaced: it becomes nil, or a freshly generated key when
// AutoLockManagers is set.
func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, error) {
	paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
	var securityConfig *ca.SecurityConfig

	krw := ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
	if err := krw.Migrate(); err != nil {
		return nil, err
	}

	// Check if we already have valid certificates on disk.
	rootCA, err := ca.GetLocalRootCA(paths.RootCA)
	if err != nil && err != ca.ErrNoLocalRootCA {
		return nil, err
	}
	if err == nil {
		// Note: this err shadows the outer one; the outer err (from
		// GetLocalRootCA) is consulted again further below.
		clientTLSCreds, serverTLSCreds, err := ca.LoadTLSCreds(rootCA, krw)
		_, ok := errors.Cause(err).(ca.ErrInvalidKEK)
		switch {
		case err == nil:
			securityConfig = ca.NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds)
			log.G(ctx).Debug("loaded CA and TLS certificates")
		case ok:
			// The key on disk could not be decrypted with n.unlockKey.
			return nil, ErrInvalidUnlockKey
		case os.IsNotExist(err):
			// No node certificate yet: fall through and obtain one below.
			break
		default:
			return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
		}
	}

	if securityConfig == nil {
		if n.config.JoinAddr == "" {
			// if we're not joining a cluster, bootstrap a new one - and we have to set the unlock key
			// NOTE(review): this branch also runs when a local root CA exists
			// but the node certificate is missing, and CreateRootCA then writes
			// a new root CA to paths.RootCA — confirm this is intended.
			n.unlockKey = nil
			if n.config.AutoLockManagers {
				n.unlockKey = encryption.GenerateSecretKey()
			}
			krw = ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
			rootCA, err = ca.CreateRootCA(ca.DefaultRootCN, paths.RootCA)
			if err != nil {
				return nil, err
			}
			log.G(ctx).Debug("generated CA key and certificate")
		} else if err == ca.ErrNoLocalRootCA { // from previous error loading the root CA from disk
			rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.remotes)
			if err != nil {
				return nil, err
			}
			log.G(ctx).Debug("downloaded CA certificate")
		}
		// Otherwise (joining, local root CA present, node cert missing) the
		// rootCA loaded from disk above is reused.

		// Obtain new certs and setup TLS certificates renewal for this node:
		// - We call LoadOrCreateSecurityConfig which blocks until a valid certificate has been issued
		// - We retrieve the nodeID from LoadOrCreateSecurityConfig through the info channel. This allows
		// us to display the ID before the certificate gets issued (for potential approval).
		// - We wait for LoadOrCreateSecurityConfig to finish since we need a certificate to operate.
		// - Given a valid certificate, spin a renewal go-routine that will ensure that certificates stay
		// up to date.
		issueResponseChan := make(chan api.IssueNodeCertificateResponse, 1)
		go func() {
			// Publish the node ID/membership from the issuance response and
			// close n.certificateRequested, unless ctx is done first.
			select {
			case <-ctx.Done():
			case resp := <-issueResponseChan:
				log.G(log.WithModule(ctx, "tls")).WithFields(logrus.Fields{
					"node.id": resp.NodeID,
				}).Debugf("loaded TLS certificate")
				n.Lock()
				n.nodeID = resp.NodeID
				n.nodeMembership = resp.NodeMembership
				n.Unlock()
				close(n.certificateRequested)
			}
		}()

		// LoadOrCreateSecurityConfig is the point at which a new node joining a cluster will retrieve TLS
		// certificates and write them to disk
		// NOTE(review): ca.ManagerRole is passed regardless of the node's
		// intended role; the role actually used is read back from the
		// credentials below — confirm the parameter's meaning in package ca.
		securityConfig, err = ca.LoadOrCreateSecurityConfig(
			ctx, rootCA, n.config.JoinToken, ca.ManagerRole, n.remotes, issueResponseChan, krw)
		if err != nil {
			if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok {
				return nil, ErrInvalidUnlockKey
			}
			return nil, err
		}
	}

	// Adopt the identity carried by the credentials and wake role waiters.
	n.Lock()
	n.role = securityConfig.ClientTLSCreds.Role()
	n.nodeID = securityConfig.ClientTLSCreds.NodeID()
	n.nodeMembership = api.NodeMembershipAccepted
	n.roleCond.Broadcast()
	n.Unlock()

	return securityConfig, nil
}
  531. func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
  532. opts := []grpc.DialOption{}
  533. insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
  534. opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
  535. addr := n.config.ListenControlAPI
  536. opts = append(opts, grpc.WithDialer(
  537. func(addr string, timeout time.Duration) (net.Conn, error) {
  538. return xnet.DialTimeoutLocal(addr, timeout)
  539. }))
  540. conn, err := grpc.Dial(addr, opts...)
  541. if err != nil {
  542. return err
  543. }
  544. client := api.NewHealthClient(conn)
  545. for {
  546. resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "ControlAPI"})
  547. if err != nil {
  548. return err
  549. }
  550. if resp.Status == api.HealthCheckResponse_SERVING {
  551. break
  552. }
  553. time.Sleep(500 * time.Millisecond)
  554. }
  555. n.setControlSocket(conn)
  556. if ready != nil {
  557. close(ready)
  558. }
  559. return nil
  560. }
  561. func (n *Node) waitRole(ctx context.Context, role string) error {
  562. n.roleCond.L.Lock()
  563. if role == n.role {
  564. n.roleCond.L.Unlock()
  565. return nil
  566. }
  567. finishCh := make(chan struct{})
  568. defer close(finishCh)
  569. go func() {
  570. select {
  571. case <-finishCh:
  572. case <-ctx.Done():
  573. // call broadcast to shutdown this function
  574. n.roleCond.Broadcast()
  575. }
  576. }()
  577. defer n.roleCond.L.Unlock()
  578. for role != n.role {
  579. n.roleCond.Wait()
  580. select {
  581. case <-ctx.Done():
  582. if ctx.Err() != nil {
  583. return ctx.Err()
  584. }
  585. default:
  586. }
  587. }
  588. return nil
  589. }
  590. func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
  591. for {
  592. if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
  593. return err
  594. }
  595. remoteAddr, _ := n.remotes.Select(n.NodeID())
  596. m, err := manager.New(&manager.Config{
  597. ForceNewCluster: n.config.ForceNewCluster,
  598. RemoteAPI: manager.RemoteAddrs{
  599. ListenAddr: n.config.ListenRemoteAPI,
  600. AdvertiseAddr: n.config.AdvertiseRemoteAPI,
  601. },
  602. ControlAPI: n.config.ListenControlAPI,
  603. SecurityConfig: securityConfig,
  604. ExternalCAs: n.config.ExternalCAs,
  605. JoinRaft: remoteAddr.Addr,
  606. StateDir: n.config.StateDir,
  607. HeartbeatTick: n.config.HeartbeatTick,
  608. ElectionTick: n.config.ElectionTick,
  609. AutoLockManagers: n.config.AutoLockManagers,
  610. UnlockKey: n.unlockKey,
  611. })
  612. if err != nil {
  613. return err
  614. }
  615. done := make(chan struct{})
  616. var runErr error
  617. go func() {
  618. runErr = m.Run(context.Background())
  619. close(done)
  620. }()
  621. n.Lock()
  622. n.manager = m
  623. n.Unlock()
  624. connCtx, connCancel := context.WithCancel(ctx)
  625. go n.initManagerConnection(connCtx, ready)
  626. // this happens only on initial start
  627. if ready != nil {
  628. go func(ready chan struct{}) {
  629. select {
  630. case <-ready:
  631. addr, err := n.RemoteAPIAddr()
  632. if err != nil {
  633. log.G(ctx).WithError(err).Errorf("get remote api addr")
  634. } else {
  635. n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
  636. }
  637. case <-connCtx.Done():
  638. }
  639. }(ready)
  640. ready = nil
  641. }
  642. roleChanged := make(chan error)
  643. waitCtx, waitCancel := context.WithCancel(ctx)
  644. go func() {
  645. err := n.waitRole(waitCtx, ca.WorkerRole)
  646. roleChanged <- err
  647. }()
  648. select {
  649. case <-done:
  650. // Fail out if m.Run() returns error, otherwise wait for
  651. // role change.
  652. if runErr != nil && runErr != raft.ErrMemberRemoved {
  653. err = runErr
  654. } else {
  655. err = <-roleChanged
  656. }
  657. case err = <-roleChanged:
  658. }
  659. n.Lock()
  660. n.manager = nil
  661. n.Unlock()
  662. select {
  663. case <-done:
  664. case <-ctx.Done():
  665. err = ctx.Err()
  666. m.Stop(context.Background())
  667. <-done
  668. }
  669. connCancel()
  670. n.setControlSocket(nil)
  671. waitCancel()
  672. if err != nil {
  673. return err
  674. }
  675. }
  676. }
// persistentRemotes wraps a remotes.Remotes and writes its peer list, as
// JSON, to storePath after each modification made through Observe or Remove.
type persistentRemotes struct {
	// The embedded RWMutex serializes Observe/Remove (and thus save); its
	// read lock is the Locker of c and is held by WaitSelect while it waits.
	sync.RWMutex
	// c is broadcast by Observe so that WaitSelect callers retry Select.
	c *sync.Cond
	remotes.Remotes
	// storePath is the file the sorted peer list is written to.
	storePath string
	// lastSavedState lets save skip rewriting the file when the sorted peer
	// list has not changed.
	lastSavedState []api.Peer
}
  684. func newPersistentRemotes(f string, peers ...api.Peer) *persistentRemotes {
  685. pr := &persistentRemotes{
  686. storePath: f,
  687. Remotes: remotes.NewRemotes(peers...),
  688. }
  689. pr.c = sync.NewCond(pr.RLocker())
  690. return pr
  691. }
  692. func (s *persistentRemotes) Observe(peer api.Peer, weight int) {
  693. s.Lock()
  694. defer s.Unlock()
  695. s.Remotes.Observe(peer, weight)
  696. s.c.Broadcast()
  697. if err := s.save(); err != nil {
  698. logrus.Errorf("error writing cluster state file: %v", err)
  699. return
  700. }
  701. return
  702. }
  703. func (s *persistentRemotes) Remove(peers ...api.Peer) {
  704. s.Lock()
  705. defer s.Unlock()
  706. s.Remotes.Remove(peers...)
  707. if err := s.save(); err != nil {
  708. logrus.Errorf("error writing cluster state file: %v", err)
  709. return
  710. }
  711. return
  712. }
  713. func (s *persistentRemotes) save() error {
  714. weights := s.Weights()
  715. remotes := make([]api.Peer, 0, len(weights))
  716. for r := range weights {
  717. remotes = append(remotes, r)
  718. }
  719. sort.Sort(sortablePeers(remotes))
  720. if reflect.DeepEqual(remotes, s.lastSavedState) {
  721. return nil
  722. }
  723. dt, err := json.Marshal(remotes)
  724. if err != nil {
  725. return err
  726. }
  727. s.lastSavedState = remotes
  728. return ioutils.AtomicWriteFile(s.storePath, dt, 0600)
  729. }
  730. // WaitSelect waits until at least one remote becomes available and then selects one.
  731. func (s *persistentRemotes) WaitSelect(ctx context.Context) <-chan api.Peer {
  732. c := make(chan api.Peer, 1)
  733. s.RLock()
  734. done := make(chan struct{})
  735. go func() {
  736. select {
  737. case <-ctx.Done():
  738. s.c.Broadcast()
  739. case <-done:
  740. }
  741. }()
  742. go func() {
  743. defer s.RUnlock()
  744. defer close(c)
  745. defer close(done)
  746. for {
  747. if ctx.Err() != nil {
  748. return
  749. }
  750. p, err := s.Select()
  751. if err == nil {
  752. c <- p
  753. return
  754. }
  755. s.c.Wait()
  756. }
  757. }()
  758. return c
  759. }
  760. // sortablePeers is a sort wrapper for []api.Peer
  761. type sortablePeers []api.Peer
  762. func (sp sortablePeers) Less(i, j int) bool { return sp[i].NodeID < sp[j].NodeID }
  763. func (sp sortablePeers) Len() int { return len(sp) }
  764. func (sp sortablePeers) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] }