node.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859
  1. package node
  2. import (
  3. "crypto/tls"
  4. "encoding/json"
  5. "io/ioutil"
  6. "net"
  7. "os"
  8. "path/filepath"
  9. "reflect"
  10. "sort"
  11. "sync"
  12. "time"
  13. "github.com/Sirupsen/logrus"
  14. "github.com/boltdb/bolt"
  15. "github.com/docker/docker/pkg/plugingetter"
  16. "github.com/docker/swarmkit/agent"
  17. "github.com/docker/swarmkit/agent/exec"
  18. "github.com/docker/swarmkit/api"
  19. "github.com/docker/swarmkit/ca"
  20. "github.com/docker/swarmkit/ioutils"
  21. "github.com/docker/swarmkit/log"
  22. "github.com/docker/swarmkit/manager"
  23. "github.com/docker/swarmkit/manager/encryption"
  24. "github.com/docker/swarmkit/manager/state/raft"
  25. "github.com/docker/swarmkit/remotes"
  26. "github.com/docker/swarmkit/xnet"
  27. "github.com/pkg/errors"
  28. "golang.org/x/net/context"
  29. "google.golang.org/grpc"
  30. "google.golang.org/grpc/credentials"
  31. )
// stateFilename is the name of the file, inside the node's state directory,
// from which New loads the JSON-encoded list of known peers and to which
// persistentRemotes writes it back.
const stateFilename = "state.json"

var (
	// errNodeStarted is returned by Start when the node was already started.
	errNodeStarted = errors.New("node: already started")

	// errNodeNotStarted is returned by Stop when Start was never called.
	errNodeNotStarted = errors.New("node: not started")

	// certDirectory is the subdirectory of the state directory that is handed
	// to ca.NewConfigPaths when loading the security configuration.
	certDirectory = "certificates"

	// ErrInvalidUnlockKey is returned when we can't decrypt the TLS certificate
	ErrInvalidUnlockKey = errors.New("node is locked, and needs a valid unlock key")
)
// Config provides values for a Node.
type Config struct {
	// Hostname is the name of the host for the agent instance.
	Hostname string

	// JoinAddr specifies the address of a node to use for the initial
	// connection to another manager in the cluster. It is optional and should
	// be a single address. When it is set, New discards any remotes loaded
	// from the stored state and starts from this address alone; otherwise the
	// remotes come from the stored state.
	JoinAddr string

	// StateDir specifies the directory the node uses to keep the state of the
	// remote managers and certificates.
	StateDir string

	// JoinToken is the token to be used on the first certificate request.
	JoinToken string

	// ExternalCAs is a list of CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ForceNewCluster creates a new cluster from current raft state. When it
	// is set, New also discards any remotes loaded from the stored state.
	ForceNewCluster bool

	// ListenControlAPI specifies the address the control API should listen on.
	ListenControlAPI string

	// ListenRemoteAPI specifies the address for the remote API that agents
	// and raft members connect to.
	ListenRemoteAPI string

	// AdvertiseRemoteAPI specifies the address that should be advertised
	// for connections to the remote API (including the raft service).
	AdvertiseRemoteAPI string

	// Executor specifies the executor to use for the agent.
	Executor exec.Executor

	// ElectionTick defines the number of ticks without a leader that are
	// needed to trigger a new election.
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between each heartbeat sent
	// to other members for health-check purposes.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not an unlock key will be generated
	// when bootstrapping a new cluster for the first time
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting at rest. This
	// only applies to nodes that have already joined a cluster.
	UnlockKey []byte

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter
}
// Node implements the primary node functionality for a member of a swarm
// cluster. Node handles workloads and may also run as a manager.
type Node struct {
	// The embedded lock is held by the methods of Node when they read or
	// write role, conn, nodeID, nodeMembership, agent and manager.
	sync.RWMutex

	// config is the configuration passed to New.
	config *Config

	// remotes tracks the known peers and persists them to the state file.
	remotes *persistentRemotes

	// role is the node's current role (ca.WorkerRole or ca.ManagerRole).
	role string
	// roleCond is broadcast whenever role is updated; it is built on the
	// read side of the embedded lock.
	roleCond *sync.Cond

	// conn is the current connection to the local control API, or nil.
	conn *grpc.ClientConn
	// connCond is broadcast whenever conn is replaced; it is built on the
	// read side of the embedded lock.
	connCond *sync.Cond

	// nodeID and nodeMembership are filled in once a certificate has been
	// requested or loaded.
	nodeID         string
	nodeMembership api.NodeSpec_Membership

	// started is closed by the first call to Start.
	started   chan struct{}
	startOnce sync.Once

	// stopped is closed by the first call to Stop and makes run cancel its
	// context.
	stopped  chan struct{}
	stopOnce sync.Once

	// closed when agent has completed registration and manager(if enabled) is ready to receive control requests
	ready chan struct{}

	// closed when certificate issue request has been sent by node
	certificateRequested chan struct{}

	// closed is closed when run returns; err holds run's result and is
	// written just before closed is closed.
	closed chan struct{}
	err    error

	// agent and manager are the currently running instances, or nil.
	agent   *agent.Agent
	manager *manager.Manager

	// used to send role updates from the dispatcher api on promotion/demotion
	notifyNodeChange chan *api.Node

	// unlockKey is the key used to read and write the node's key material;
	// it starts as Config.UnlockKey and is replaced when a new cluster is
	// bootstrapped.
	unlockKey []byte
}
  108. // RemoteAPIAddr returns address on which remote manager api listens.
  109. // Returns nil if node is not manager.
  110. func (n *Node) RemoteAPIAddr() (string, error) {
  111. n.RLock()
  112. defer n.RUnlock()
  113. if n.manager == nil {
  114. return "", errors.Errorf("node is not manager")
  115. }
  116. addr := n.manager.Addr()
  117. if addr == "" {
  118. return "", errors.Errorf("manager addr is not set")
  119. }
  120. return addr, nil
  121. }
  122. // New returns new Node instance.
  123. func New(c *Config) (*Node, error) {
  124. if err := os.MkdirAll(c.StateDir, 0700); err != nil {
  125. return nil, err
  126. }
  127. stateFile := filepath.Join(c.StateDir, stateFilename)
  128. dt, err := ioutil.ReadFile(stateFile)
  129. var p []api.Peer
  130. if err != nil && !os.IsNotExist(err) {
  131. return nil, err
  132. }
  133. if err == nil {
  134. if err := json.Unmarshal(dt, &p); err != nil {
  135. return nil, err
  136. }
  137. }
  138. n := &Node{
  139. remotes: newPersistentRemotes(stateFile, p...),
  140. role: ca.WorkerRole,
  141. config: c,
  142. started: make(chan struct{}),
  143. stopped: make(chan struct{}),
  144. closed: make(chan struct{}),
  145. ready: make(chan struct{}),
  146. certificateRequested: make(chan struct{}),
  147. notifyNodeChange: make(chan *api.Node, 1),
  148. unlockKey: c.UnlockKey,
  149. }
  150. if n.config.JoinAddr != "" || n.config.ForceNewCluster {
  151. n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
  152. if n.config.JoinAddr != "" {
  153. n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, remotes.DefaultObservationWeight)
  154. }
  155. }
  156. n.roleCond = sync.NewCond(n.RLocker())
  157. n.connCond = sync.NewCond(n.RLocker())
  158. return n, nil
  159. }
  160. // Start starts a node instance.
  161. func (n *Node) Start(ctx context.Context) error {
  162. err := errNodeStarted
  163. n.startOnce.Do(func() {
  164. close(n.started)
  165. go n.run(ctx)
  166. err = nil // clear error above, only once.
  167. })
  168. return err
  169. }
  170. func (n *Node) run(ctx context.Context) (err error) {
  171. defer func() {
  172. n.err = err
  173. close(n.closed)
  174. }()
  175. ctx, cancel := context.WithCancel(ctx)
  176. defer cancel()
  177. ctx = log.WithModule(ctx, "node")
  178. go func() {
  179. select {
  180. case <-ctx.Done():
  181. case <-n.stopped:
  182. cancel()
  183. }
  184. }()
  185. securityConfig, err := n.loadSecurityConfig(ctx)
  186. if err != nil {
  187. return err
  188. }
  189. taskDBPath := filepath.Join(n.config.StateDir, "worker/tasks.db")
  190. if err := os.MkdirAll(filepath.Dir(taskDBPath), 0777); err != nil {
  191. return err
  192. }
  193. db, err := bolt.Open(taskDBPath, 0666, nil)
  194. if err != nil {
  195. return err
  196. }
  197. defer db.Close()
  198. forceCertRenewal := make(chan struct{})
  199. renewCert := func() {
  200. select {
  201. case forceCertRenewal <- struct{}{}:
  202. case <-ctx.Done():
  203. }
  204. }
  205. go func() {
  206. for {
  207. select {
  208. case <-ctx.Done():
  209. return
  210. case node := <-n.notifyNodeChange:
  211. // If the server is sending us a ForceRenewal State, renew
  212. if node.Certificate.Status.State == api.IssuanceStateRotate {
  213. renewCert()
  214. continue
  215. }
  216. n.Lock()
  217. // If we got a role change, renew
  218. lastRole := n.role
  219. role := ca.WorkerRole
  220. if node.Spec.Role == api.NodeRoleManager {
  221. role = ca.ManagerRole
  222. }
  223. if lastRole == role {
  224. n.Unlock()
  225. continue
  226. }
  227. // switch role to agent immediately to shutdown manager early
  228. if role == ca.WorkerRole {
  229. n.role = role
  230. n.roleCond.Broadcast()
  231. }
  232. n.Unlock()
  233. renewCert()
  234. }
  235. }
  236. }()
  237. updates := ca.RenewTLSConfig(ctx, securityConfig, n.remotes, forceCertRenewal)
  238. go func() {
  239. for {
  240. select {
  241. case certUpdate := <-updates:
  242. if certUpdate.Err != nil {
  243. logrus.Warnf("error renewing TLS certificate: %v", certUpdate.Err)
  244. continue
  245. }
  246. n.Lock()
  247. n.role = certUpdate.Role
  248. n.roleCond.Broadcast()
  249. n.Unlock()
  250. case <-ctx.Done():
  251. return
  252. }
  253. }
  254. }()
  255. role := n.role
  256. managerReady := make(chan struct{})
  257. agentReady := make(chan struct{})
  258. var managerErr error
  259. var agentErr error
  260. var wg sync.WaitGroup
  261. wg.Add(2)
  262. go func() {
  263. managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop
  264. wg.Done()
  265. cancel()
  266. }()
  267. go func() {
  268. agentErr = n.runAgent(ctx, db, securityConfig.ClientTLSCreds, agentReady)
  269. wg.Done()
  270. cancel()
  271. }()
  272. go func() {
  273. <-agentReady
  274. if role == ca.ManagerRole {
  275. <-managerReady
  276. }
  277. close(n.ready)
  278. }()
  279. wg.Wait()
  280. if managerErr != nil && managerErr != context.Canceled {
  281. return managerErr
  282. }
  283. if agentErr != nil && agentErr != context.Canceled {
  284. return agentErr
  285. }
  286. return err
  287. }
  288. // Stop stops node execution
  289. func (n *Node) Stop(ctx context.Context) error {
  290. select {
  291. case <-n.started:
  292. default:
  293. return errNodeNotStarted
  294. }
  295. // ask agent to clean up assignments
  296. n.Lock()
  297. if n.agent != nil {
  298. if err := n.agent.Leave(ctx); err != nil {
  299. log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
  300. }
  301. }
  302. n.Unlock()
  303. n.stopOnce.Do(func() {
  304. close(n.stopped)
  305. })
  306. select {
  307. case <-n.closed:
  308. return nil
  309. case <-ctx.Done():
  310. return ctx.Err()
  311. }
  312. }
  313. // Err returns the error that caused the node to shutdown or nil. Err blocks
  314. // until the node has fully shut down.
  315. func (n *Node) Err(ctx context.Context) error {
  316. select {
  317. case <-n.closed:
  318. return n.err
  319. case <-ctx.Done():
  320. return ctx.Err()
  321. }
  322. }
  323. func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportCredentials, ready chan<- struct{}) error {
  324. select {
  325. case <-ctx.Done():
  326. case <-n.remotes.WaitSelect(ctx):
  327. }
  328. if ctx.Err() != nil {
  329. return ctx.Err()
  330. }
  331. a, err := agent.New(&agent.Config{
  332. Hostname: n.config.Hostname,
  333. Managers: n.remotes,
  334. Executor: n.config.Executor,
  335. DB: db,
  336. NotifyNodeChange: n.notifyNodeChange,
  337. Credentials: creds,
  338. })
  339. if err != nil {
  340. return err
  341. }
  342. if err := a.Start(ctx); err != nil {
  343. return err
  344. }
  345. n.Lock()
  346. n.agent = a
  347. n.Unlock()
  348. defer func() {
  349. n.Lock()
  350. n.agent = nil
  351. n.Unlock()
  352. }()
  353. go func() {
  354. <-a.Ready()
  355. close(ready)
  356. }()
  357. // todo: manually call stop on context cancellation?
  358. return a.Err(context.Background())
  359. }
// Ready returns a channel that is closed after the node's initialization has
// completed for the first time.
func (n *Node) Ready() <-chan struct{} {
	return n.ready
}
// CertificateRequested returns a channel that is closed after the node has
// requested a certificate. Once the channel is closed, a caller can expect
// calls to NodeID() and NodeMembership() to succeed.
func (n *Node) CertificateRequested() <-chan struct{} {
	return n.certificateRequested
}
  371. func (n *Node) setControlSocket(conn *grpc.ClientConn) {
  372. n.Lock()
  373. if n.conn != nil {
  374. n.conn.Close()
  375. }
  376. n.conn = conn
  377. n.connCond.Broadcast()
  378. n.Unlock()
  379. }
  380. // ListenControlSocket listens changes of a connection for managing the
  381. // cluster control api
  382. func (n *Node) ListenControlSocket(ctx context.Context) <-chan *grpc.ClientConn {
  383. c := make(chan *grpc.ClientConn, 1)
  384. n.RLock()
  385. conn := n.conn
  386. c <- conn
  387. done := make(chan struct{})
  388. go func() {
  389. select {
  390. case <-ctx.Done():
  391. n.connCond.Broadcast()
  392. case <-done:
  393. }
  394. }()
  395. go func() {
  396. defer close(c)
  397. defer close(done)
  398. defer n.RUnlock()
  399. for {
  400. if ctx.Err() != nil {
  401. return
  402. }
  403. if conn == n.conn {
  404. n.connCond.Wait()
  405. continue
  406. }
  407. conn = n.conn
  408. c <- conn
  409. }
  410. }()
  411. return c
  412. }
  413. // NodeID returns current node's ID. May be empty if not set.
  414. func (n *Node) NodeID() string {
  415. n.RLock()
  416. defer n.RUnlock()
  417. return n.nodeID
  418. }
  419. // NodeMembership returns current node's membership. May be empty if not set.
  420. func (n *Node) NodeMembership() api.NodeSpec_Membership {
  421. n.RLock()
  422. defer n.RUnlock()
  423. return n.nodeMembership
  424. }
  425. // Manager returns manager instance started by node. May be nil.
  426. func (n *Node) Manager() *manager.Manager {
  427. n.RLock()
  428. defer n.RUnlock()
  429. return n.manager
  430. }
  431. // Agent returns agent instance started by node. May be nil.
  432. func (n *Node) Agent() *agent.Agent {
  433. n.RLock()
  434. defer n.RUnlock()
  435. return n.agent
  436. }
  437. // Remotes returns a list of known peers known to node.
  438. func (n *Node) Remotes() []api.Peer {
  439. weights := n.remotes.Weights()
  440. remotes := make([]api.Peer, 0, len(weights))
  441. for p := range weights {
  442. remotes = append(remotes, p)
  443. }
  444. return remotes
  445. }
// loadSecurityConfig produces the node's security configuration.
//
// It first tries to load the root CA and the node's TLS credentials from the
// certificate directory under the state directory. If that yields nothing, it
// either bootstraps a new root CA (no join address configured) or downloads
// the root CA from the remotes (join address configured and no local root
// CA), and then obtains a node certificate via ca.LoadOrCreateSecurityConfig.
//
// On success the node's role and ID are taken from the client TLS
// credentials, membership is set to accepted, and roleCond is broadcast.
// ErrInvalidUnlockKey is returned whenever the underlying failure is a
// ca.ErrInvalidKEK.
func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, error) {
	paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
	var securityConfig *ca.SecurityConfig

	krw := ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
	if err := krw.Migrate(); err != nil {
		return nil, err
	}

	// Check if we already have a valid certificates on disk.
	rootCA, err := ca.GetLocalRootCA(paths.RootCA)
	if err != nil && err != ca.ErrNoLocalRootCA {
		return nil, err
	}
	if err == nil {
		// NOTE: this err shadows the outer one, so the outer err (nil here)
		// is what the ErrNoLocalRootCA comparison further down sees.
		clientTLSCreds, serverTLSCreds, err := ca.LoadTLSCreds(rootCA, krw)
		_, ok := errors.Cause(err).(ca.ErrInvalidKEK)

		switch {
		case err == nil:
			securityConfig = ca.NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds)
			log.G(ctx).Debug("loaded CA and TLS certificates")
		case ok:
			return nil, ErrInvalidUnlockKey
		case os.IsNotExist(err):
			// No certificate on disk yet: fall through and obtain one below.
			break
		default:
			return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
		}
	}

	if securityConfig == nil {
		if n.config.JoinAddr == "" {
			// if we're not joining a cluster, bootstrap a new one - and we have to set the unlock key
			n.unlockKey = nil
			if n.config.AutoLockManagers {
				n.unlockKey = encryption.GenerateSecretKey()
			}
			// Recreate the key read/writer so it uses the new unlock key.
			krw = ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{})
			rootCA, err = ca.CreateRootCA(ca.DefaultRootCN, paths.RootCA)
			if err != nil {
				return nil, err
			}
			log.G(ctx).Debug("generated CA key and certificate")
		} else if err == ca.ErrNoLocalRootCA { // from previous error loading the root CA from disk
			rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.remotes)
			if err != nil {
				return nil, err
			}
			log.G(ctx).Debug("downloaded CA certificate")
		}

		// Obtain new certs and setup TLS certificates renewal for this node:
		// - We call LoadOrCreateSecurityConfig which blocks until a valid certificate has been issued
		// - We retrieve the nodeID from LoadOrCreateSecurityConfig through the info channel. This allows
		// us to display the ID before the certificate gets issued (for potential approval).
		// - We wait for LoadOrCreateSecurityConfig to finish since we need a certificate to operate.
		// - Given a valid certificate, spin a renewal go-routine that will ensure that certificates stay
		// up to date.
		issueResponseChan := make(chan api.IssueNodeCertificateResponse, 1)
		go func() {
			select {
			case <-ctx.Done():
			case resp := <-issueResponseChan:
				log.G(log.WithModule(ctx, "tls")).WithFields(logrus.Fields{
					"node.id": resp.NodeID,
				}).Debugf("loaded TLS certificate")
				// Publish the ID and membership before signalling, so that
				// readers woken by certificateRequested observe them.
				n.Lock()
				n.nodeID = resp.NodeID
				n.nodeMembership = resp.NodeMembership
				n.Unlock()
				close(n.certificateRequested)
			}
		}()

		// LoadOrCreateSecurityConfig is the point at which a new node joining a cluster will retrieve TLS
		// certificates and write them to disk
		securityConfig, err = ca.LoadOrCreateSecurityConfig(
			ctx, rootCA, n.config.JoinToken, ca.ManagerRole, n.remotes, issueResponseChan, krw)
		if err != nil {
			if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok {
				return nil, ErrInvalidUnlockKey
			}
			return nil, err
		}
	}

	n.Lock()
	n.role = securityConfig.ClientTLSCreds.Role()
	n.nodeID = securityConfig.ClientTLSCreds.NodeID()
	n.nodeMembership = api.NodeMembershipAccepted
	n.roleCond.Broadcast()
	n.Unlock()

	return securityConfig, nil
}
  534. func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
  535. opts := []grpc.DialOption{}
  536. insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
  537. opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
  538. addr := n.config.ListenControlAPI
  539. opts = append(opts, grpc.WithDialer(
  540. func(addr string, timeout time.Duration) (net.Conn, error) {
  541. return xnet.DialTimeoutLocal(addr, timeout)
  542. }))
  543. conn, err := grpc.Dial(addr, opts...)
  544. if err != nil {
  545. return err
  546. }
  547. client := api.NewHealthClient(conn)
  548. for {
  549. resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "ControlAPI"})
  550. if err != nil {
  551. return err
  552. }
  553. if resp.Status == api.HealthCheckResponse_SERVING {
  554. break
  555. }
  556. time.Sleep(500 * time.Millisecond)
  557. }
  558. n.setControlSocket(conn)
  559. if ready != nil {
  560. close(ready)
  561. }
  562. return nil
  563. }
  564. func (n *Node) waitRole(ctx context.Context, role string) error {
  565. n.roleCond.L.Lock()
  566. if role == n.role {
  567. n.roleCond.L.Unlock()
  568. return nil
  569. }
  570. finishCh := make(chan struct{})
  571. defer close(finishCh)
  572. go func() {
  573. select {
  574. case <-finishCh:
  575. case <-ctx.Done():
  576. // call broadcast to shutdown this function
  577. n.roleCond.Broadcast()
  578. }
  579. }()
  580. defer n.roleCond.L.Unlock()
  581. for role != n.role {
  582. n.roleCond.Wait()
  583. select {
  584. case <-ctx.Done():
  585. if ctx.Err() != nil {
  586. return ctx.Err()
  587. }
  588. default:
  589. }
  590. }
  591. return nil
  592. }
  593. func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
  594. for {
  595. if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
  596. return err
  597. }
  598. remoteAddr, _ := n.remotes.Select(n.NodeID())
  599. m, err := manager.New(&manager.Config{
  600. ForceNewCluster: n.config.ForceNewCluster,
  601. RemoteAPI: manager.RemoteAddrs{
  602. ListenAddr: n.config.ListenRemoteAPI,
  603. AdvertiseAddr: n.config.AdvertiseRemoteAPI,
  604. },
  605. ControlAPI: n.config.ListenControlAPI,
  606. SecurityConfig: securityConfig,
  607. ExternalCAs: n.config.ExternalCAs,
  608. JoinRaft: remoteAddr.Addr,
  609. StateDir: n.config.StateDir,
  610. HeartbeatTick: n.config.HeartbeatTick,
  611. ElectionTick: n.config.ElectionTick,
  612. AutoLockManagers: n.config.AutoLockManagers,
  613. UnlockKey: n.unlockKey,
  614. PluginGetter: n.config.PluginGetter,
  615. })
  616. if err != nil {
  617. return err
  618. }
  619. done := make(chan struct{})
  620. var runErr error
  621. go func() {
  622. runErr = m.Run(context.Background())
  623. close(done)
  624. }()
  625. n.Lock()
  626. n.manager = m
  627. n.Unlock()
  628. connCtx, connCancel := context.WithCancel(ctx)
  629. go n.initManagerConnection(connCtx, ready)
  630. // this happens only on initial start
  631. if ready != nil {
  632. go func(ready chan struct{}) {
  633. select {
  634. case <-ready:
  635. addr, err := n.RemoteAPIAddr()
  636. if err != nil {
  637. log.G(ctx).WithError(err).Errorf("get remote api addr")
  638. } else {
  639. n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
  640. }
  641. case <-connCtx.Done():
  642. }
  643. }(ready)
  644. ready = nil
  645. }
  646. roleChanged := make(chan error)
  647. waitCtx, waitCancel := context.WithCancel(ctx)
  648. go func() {
  649. err := n.waitRole(waitCtx, ca.WorkerRole)
  650. roleChanged <- err
  651. }()
  652. select {
  653. case <-done:
  654. // Fail out if m.Run() returns error, otherwise wait for
  655. // role change.
  656. if runErr != nil && runErr != raft.ErrMemberRemoved {
  657. err = runErr
  658. } else {
  659. err = <-roleChanged
  660. }
  661. case err = <-roleChanged:
  662. }
  663. n.Lock()
  664. n.manager = nil
  665. n.Unlock()
  666. select {
  667. case <-done:
  668. case <-ctx.Done():
  669. err = ctx.Err()
  670. m.Stop(context.Background())
  671. <-done
  672. }
  673. connCancel()
  674. n.setControlSocket(nil)
  675. waitCancel()
  676. if err != nil {
  677. return err
  678. }
  679. }
  680. }
// persistentRemotes wraps remotes.Remotes so that changes made through
// Observe and Remove are written to a state file, and so that callers can
// wait for a remote to become available.
type persistentRemotes struct {
	// The embedded lock serializes Observe and Remove; its read side backs
	// the condition variable c.
	sync.RWMutex
	// c is broadcast by Observe, and when a WaitSelect context ends.
	c *sync.Cond
	remotes.Remotes
	// storePath is the file the peer list is written to.
	storePath string
	// lastSavedState is the sorted peer list recorded by the last save; it
	// is used to skip writes when nothing changed.
	lastSavedState []api.Peer
}
  688. func newPersistentRemotes(f string, peers ...api.Peer) *persistentRemotes {
  689. pr := &persistentRemotes{
  690. storePath: f,
  691. Remotes: remotes.NewRemotes(peers...),
  692. }
  693. pr.c = sync.NewCond(pr.RLocker())
  694. return pr
  695. }
  696. func (s *persistentRemotes) Observe(peer api.Peer, weight int) {
  697. s.Lock()
  698. defer s.Unlock()
  699. s.Remotes.Observe(peer, weight)
  700. s.c.Broadcast()
  701. if err := s.save(); err != nil {
  702. logrus.Errorf("error writing cluster state file: %v", err)
  703. return
  704. }
  705. return
  706. }
  707. func (s *persistentRemotes) Remove(peers ...api.Peer) {
  708. s.Lock()
  709. defer s.Unlock()
  710. s.Remotes.Remove(peers...)
  711. if err := s.save(); err != nil {
  712. logrus.Errorf("error writing cluster state file: %v", err)
  713. return
  714. }
  715. return
  716. }
  717. func (s *persistentRemotes) save() error {
  718. weights := s.Weights()
  719. remotes := make([]api.Peer, 0, len(weights))
  720. for r := range weights {
  721. remotes = append(remotes, r)
  722. }
  723. sort.Sort(sortablePeers(remotes))
  724. if reflect.DeepEqual(remotes, s.lastSavedState) {
  725. return nil
  726. }
  727. dt, err := json.Marshal(remotes)
  728. if err != nil {
  729. return err
  730. }
  731. s.lastSavedState = remotes
  732. return ioutils.AtomicWriteFile(s.storePath, dt, 0600)
  733. }
// WaitSelect waits until at least one remote becomes available and then selects one.
//
// The returned channel receives the selected peer and is then closed. If ctx
// ends first, the channel is closed without a value.
func (s *persistentRemotes) WaitSelect(ctx context.Context) <-chan api.Peer {
	// Buffered so the single send below never blocks, even if the caller has
	// stopped listening.
	c := make(chan api.Peer, 1)
	// The read lock is taken here and released by the selecting goroutine
	// when it exits; in between it is only released while that goroutine is
	// parked in s.c.Wait.
	s.RLock()
	done := make(chan struct{})
	// Wake the selecting goroutine out of s.c.Wait when ctx ends.
	go func() {
		select {
		case <-ctx.Done():
			s.c.Broadcast()
		case <-done:
		}
	}()
	go func() {
		// Deferred calls run in reverse order: done is closed first, then c,
		// and the read lock is released last.
		defer s.RUnlock()
		defer close(c)
		defer close(done)
		for {
			if ctx.Err() != nil {
				return
			}
			p, err := s.Select()
			if err == nil {
				c <- p
				return
			}
			// Nothing selectable yet: wait for the next Observe (or for
			// cancellation) and try again.
			s.c.Wait()
		}
	}()
	return c
}
  764. // sortablePeers is a sort wrapper for []api.Peer
  765. type sortablePeers []api.Peer
  766. func (sp sortablePeers) Less(i, j int) bool { return sp[i].NodeID < sp[j].NodeID }
  767. func (sp sortablePeers) Len() int { return len(sp) }
  768. func (sp sortablePeers) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] }