// node.go
package node

import (
    "bytes"
    "context"
    "crypto/tls"
    "encoding/json"
    "io/ioutil"
    "math"
    "net"
    "os"
    "path/filepath"
    "reflect"
    "sort"
    "strings"
    "sync"
    "time"

    "github.com/docker/docker/pkg/plugingetter"
    "github.com/docker/go-metrics"
    "github.com/docker/libnetwork/drivers/overlay/overlayutils"
    "github.com/docker/swarmkit/agent"
    "github.com/docker/swarmkit/agent/exec"
    "github.com/docker/swarmkit/api"
    "github.com/docker/swarmkit/ca"
    "github.com/docker/swarmkit/ca/keyutils"
    "github.com/docker/swarmkit/connectionbroker"
    "github.com/docker/swarmkit/identity"
    "github.com/docker/swarmkit/ioutils"
    "github.com/docker/swarmkit/log"
    "github.com/docker/swarmkit/manager"
    "github.com/docker/swarmkit/manager/allocator/cnmallocator"
    "github.com/docker/swarmkit/manager/encryption"
    "github.com/docker/swarmkit/remotes"
    "github.com/docker/swarmkit/xnet"
    grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
    bolt "go.etcd.io/bbolt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/status"
)
const (
    stateFilename     = "state.json"
    roleChangeTimeout = 16 * time.Second
)

var (
    nodeInfo    metrics.LabeledGauge
    nodeManager metrics.Gauge

    errNodeStarted    = errors.New("node: already started")
    errNodeNotStarted = errors.New("node: not started")
    certDirectory     = "certificates"

    // ErrInvalidUnlockKey is returned when we can't decrypt the TLS certificate
    ErrInvalidUnlockKey = errors.New("node is locked, and needs a valid unlock key")

    // ErrMandatoryFIPS is returned when the cluster we are joining mandates FIPS, but we are running in non-FIPS mode
    ErrMandatoryFIPS = errors.New("node is not FIPS-enabled but cluster requires FIPS")
)

func init() {
    ns := metrics.NewNamespace("swarm", "node", nil)
    nodeInfo = ns.NewLabeledGauge("info", "Information related to the swarm", "",
        "swarm_id",
        "node_id",
    )
    nodeManager = ns.NewGauge("manager", "Whether this node is a manager or not", "")
    metrics.Register(ns)
}
// Config provides values for a Node.
type Config struct {
    // Hostname is the name of host for agent instance.
    Hostname string

    // JoinAddr specifies the node that should be used for the initial
    // connection to another manager in the cluster. This should be a single
    // address, and it is optional; the actual remotes come from the stored
    // state.
    JoinAddr string

    // StateDir specifies the directory the node uses to keep the state of the
    // remote managers and certificates.
    StateDir string

    // JoinToken is the token to be used on the first certificate request.
    JoinToken string

    // ExternalCAs is a list of CAs to which a manager node
    // will make certificate signing requests for node certificates.
    ExternalCAs []*api.ExternalCA

    // ForceNewCluster creates a new cluster from current raft state.
    ForceNewCluster bool

    // ListenControlAPI specifies the address the control API should listen on.
    ListenControlAPI string

    // ListenRemoteAPI specifies the address for the remote API that agents
    // and raft members connect to.
    ListenRemoteAPI string

    // AdvertiseRemoteAPI specifies the address that should be advertised
    // for connections to the remote API (including the raft service).
    AdvertiseRemoteAPI string

    // NetworkConfig stores network related config for the cluster
    NetworkConfig *cnmallocator.NetworkConfig

    // Executor specifies the executor to use for the agent.
    Executor exec.Executor

    // ElectionTick defines the number of ticks needed without
    // a leader to trigger a new election.
    ElectionTick uint32

    // HeartbeatTick defines the number of ticks between each
    // heartbeat sent to other members for health-check purposes.
    HeartbeatTick uint32

    // AutoLockManagers determines whether or not an unlock key will be generated
    // when bootstrapping a new cluster for the first time.
    AutoLockManagers bool

    // UnlockKey is the key to unlock a node - used for decrypting data at
    // rest. This only applies to nodes that have already joined a cluster.
    UnlockKey []byte

    // Availability allows a user to control the current scheduling status of a node.
    Availability api.NodeSpec_Availability

    // PluginGetter provides access to docker's plugin inventory.
    PluginGetter plugingetter.PluginGetter

    // FIPS is a boolean stating whether the node is FIPS enabled.
    FIPS bool
}
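// The following is an illustrative sketch, not part of the original swarmkit
// source: it shows how a caller might populate a minimal Config before
// constructing a Node. The executor and join token values are assumptions
// here; in practice they come from the integrating engine (for example,
// dockerd supplies its own exec.Executor implementation).
//
//    cfg := &node.Config{
//        Hostname:     "worker-01",
//        StateDir:     "/var/lib/swarm",
//        JoinAddr:     "10.0.0.1:2377",            // manager to contact on first join
//        JoinToken:    workerJoinToken,            // hypothetical token from the cluster
//        Executor:     containerExecutor,          // hypothetical exec.Executor implementation
//        Availability: api.NodeAvailabilityActive, // allow tasks to be scheduled here
//    }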
// Node implements the primary node functionality for a member of a swarm
// cluster. Node handles workloads and may also run as a manager.
type Node struct {
    sync.RWMutex
    config           *Config
    remotes          *persistentRemotes
    connBroker       *connectionbroker.Broker
    role             string
    roleCond         *sync.Cond
    conn             *grpc.ClientConn
    connCond         *sync.Cond
    nodeID           string
    started          chan struct{}
    startOnce        sync.Once
    stopped          chan struct{}
    stopOnce         sync.Once
    ready            chan struct{} // closed when agent has completed registration and manager (if enabled) is ready to receive control requests
    closed           chan struct{}
    err              error
    agent            *agent.Agent
    manager          *manager.Manager
    notifyNodeChange chan *agent.NodeChanges // used by the agent to relay node updates from the dispatcher Session stream to (*Node).run
    unlockKey        []byte
    vxlanUDPPort     uint32
}

type lastSeenRole struct {
    role api.NodeRole
}

// observe notes the latest value of this node's role, and returns true if it
// is the first value seen, or is different from the most recently seen value.
func (l *lastSeenRole) observe(newRole api.NodeRole) bool {
    changed := l.role != newRole
    l.role = newRole
    return changed
}
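// For illustration (not part of the original source): observe is what makes
// role handling "edge triggered" in the node-changes loop in (*Node).run, so
// a stream of identical role values triggers only one certificate renewal.
//
//    l := lastSeenRole{role: api.NodeRoleWorker}
//    l.observe(api.NodeRoleManager) // true: role changed
//    l.observe(api.NodeRoleManager) // false: same value, nothing to do
//    l.observe(api.NodeRoleWorker)  // true: changed back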
// RemoteAPIAddr returns the address on which the remote manager API listens.
// Returns an error if the node is not a manager.
func (n *Node) RemoteAPIAddr() (string, error) {
    n.RLock()
    defer n.RUnlock()
    if n.manager == nil {
        return "", errors.New("manager is not running")
    }
    addr := n.manager.Addr()
    if addr == "" {
        return "", errors.New("manager addr is not set")
    }
    return addr, nil
}

// New returns a new Node instance.
func New(c *Config) (*Node, error) {
    if err := os.MkdirAll(c.StateDir, 0700); err != nil {
        return nil, err
    }
    stateFile := filepath.Join(c.StateDir, stateFilename)
    dt, err := ioutil.ReadFile(stateFile)
    var p []api.Peer
    if err != nil && !os.IsNotExist(err) {
        return nil, err
    }
    if err == nil {
        if err := json.Unmarshal(dt, &p); err != nil {
            return nil, err
        }
    }
    n := &Node{
        remotes:          newPersistentRemotes(stateFile, p...),
        role:             ca.WorkerRole,
        config:           c,
        started:          make(chan struct{}),
        stopped:          make(chan struct{}),
        closed:           make(chan struct{}),
        ready:            make(chan struct{}),
        notifyNodeChange: make(chan *agent.NodeChanges, 1),
        unlockKey:        c.UnlockKey,
    }
    if n.config.JoinAddr != "" || n.config.ForceNewCluster {
        n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
        if n.config.JoinAddr != "" {
            n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, remotes.DefaultObservationWeight)
        }
    }
    n.connBroker = connectionbroker.New(n.remotes)
    n.roleCond = sync.NewCond(n.RLocker())
    n.connCond = sync.NewCond(n.RLocker())
    return n, nil
}
// BindRemote starts a listener that exposes the remote API.
func (n *Node) BindRemote(ctx context.Context, listenAddr string, advertiseAddr string) error {
    n.RLock()
    defer n.RUnlock()
    if n.manager == nil {
        return errors.New("manager is not running")
    }
    return n.manager.BindRemote(ctx, manager.RemoteAddrs{
        ListenAddr:    listenAddr,
        AdvertiseAddr: advertiseAddr,
    })
}

// Start starts a node instance.
func (n *Node) Start(ctx context.Context) error {
    err := errNodeStarted
    n.startOnce.Do(func() {
        close(n.started)
        go n.run(ctx)
        err = nil // clear the error above; this runs only once
    })
    return err
}
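// An illustrative sketch, not part of the original source: the typical
// lifecycle of a Node as seen by a caller, using New, Start, Ready, Stop, and
// Err from this file. Error handling is abbreviated, and cfg is assumed to be
// a populated *Config as sketched above.
//
//    n, err := node.New(cfg)
//    if err != nil {
//        return err
//    }
//    if err := n.Start(ctx); err != nil { // returns errNodeStarted on a second call
//        return err
//    }
//    select {
//    case <-n.Ready(): // agent registered (and manager ready, if applicable)
//    case <-ctx.Done():
//    }
//    // ... later: stop the node and collect the error that ended its run loop.
//    _ = n.Stop(context.Background())
//    err = n.Err(context.Background())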
func (n *Node) currentRole() api.NodeRole {
    n.Lock()
    currentRole := api.NodeRoleWorker
    if n.role == ca.ManagerRole {
        currentRole = api.NodeRoleManager
    }
    n.Unlock()
    return currentRole
}

// configVXLANUDPPort sets the VXLAN port in libnetwork.
func configVXLANUDPPort(ctx context.Context, vxlanUDPPort uint32) {
    if err := overlayutils.ConfigVXLANUDPPort(vxlanUDPPort); err != nil {
        log.G(ctx).WithError(err).Error("failed to configure VXLAN UDP port")
        return
    }
    logrus.Infof("initialized VXLAN UDP port to %d", vxlanUDPPort)
}
func (n *Node) run(ctx context.Context) (err error) {
    defer func() {
        n.err = err
        // close the n.closed channel to indicate that the Node has completely
        // terminated
        close(n.closed)
    }()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    ctx = log.WithModule(ctx, "node")

    // set up a goroutine to monitor the stop channel, and cancel the run
    // context when the node is stopped
    go func(ctx context.Context) {
        select {
        case <-ctx.Done():
        case <-n.stopped:
            cancel()
        }
    }(ctx)

    // First things first: get the SecurityConfig for this node. This includes
    // the certificate information and the root CA. It also returns a cancel
    // function. This is needed because the SecurityConfig is a live object,
    // and provides a watch queue so that callers can observe changes to the
    // security config. This watch queue has to be closed, which is done by the
    // secConfigCancel function.
    //
    // It's also noteworthy that loading the security config with the node's
    // loadSecurityConfig method has the side effect of setting the node's ID
    // and role fields, meaning it isn't until after that point that the node
    // knows its ID.
    paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
    securityConfig, secConfigCancel, err := n.loadSecurityConfig(ctx, paths)
    if err != nil {
        return err
    }
    defer secConfigCancel()

    // Now that we have the security config, we can get a TLSRenewer, which is
    // a live component handling certificate rotation.
    renewer := ca.NewTLSRenewer(securityConfig, n.connBroker, paths.RootCA)

    // Now that we have the security goop all loaded, we know the Node's ID and
    // can add that to our logging context.
    ctx = log.WithLogger(ctx, log.G(ctx).WithField("node.id", n.NodeID()))

    // Next, set up the task database. The task database is used by the agent
    // to keep a persistent local record of its tasks. Since every manager also
    // has an agent, every node needs a task database, so we do this regardless
    // of role.
    taskDBPath := filepath.Join(n.config.StateDir, "worker", "tasks.db")
    // Doing os.MkdirAll will create the necessary directory path for the task
    // database if it doesn't already exist, and if it does already exist, no
    // error will be returned, so we use this regardless of whether this node
    // is new or not.
    if err := os.MkdirAll(filepath.Dir(taskDBPath), 0777); err != nil {
        return err
    }
    db, err := bolt.Open(taskDBPath, 0666, nil)
    if err != nil {
        return err
    }
    defer db.Close()

    // agentDone is a channel that represents the agent having exited. We start
    // the agent in a goroutine a few blocks down, and before that goroutine
    // exits, it closes this channel to signal to the goroutine just below to
    // terminate.
    agentDone := make(chan struct{})
    // This goroutine is the node changes loop. The n.notifyNodeChange
    // channel is passed to the agent. When a new node object gets sent down
    // to the agent, it gets passed back up to this node object, so that we can
    // check if a role update or a root certificate rotation is required. This
    // handles root rotation, but the renewer handles regular certificate
    // rotation.
    go func() {
        // lastNodeDesiredRole is the last-seen value of Node.Spec.DesiredRole,
        // used to make role changes "edge triggered" and avoid renewal loops.
        lastNodeDesiredRole := lastSeenRole{role: n.currentRole()}
        for {
            select {
            case <-agentDone:
                return
            case nodeChanges := <-n.notifyNodeChange:
                if nodeChanges.Node != nil {
                    if nodeChanges.Node.VXLANUDPPort != 0 {
                        n.vxlanUDPPort = nodeChanges.Node.VXLANUDPPort
                        configVXLANUDPPort(ctx, n.vxlanUDPPort)
                    }
                    // This is a bit complex to be backward compatible with older CAs that
                    // don't support the Node.Role field. They only use what's presently
                    // called DesiredRole.
                    // 1) If DesiredRole changes, kick off a certificate renewal. The renewal
                    //    is delayed slightly to give Role time to change as well if this is
                    //    a newer CA. If the certificate we get back doesn't have the expected
                    //    role, we continue renewing with exponential backoff.
                    // 2) If the server is sending us IssuanceStateRotate, renew the cert as
                    //    requested by the CA.
                    desiredRoleChanged := lastNodeDesiredRole.observe(nodeChanges.Node.Spec.DesiredRole)
                    if desiredRoleChanged {
                        switch nodeChanges.Node.Spec.DesiredRole {
                        case api.NodeRoleManager:
                            renewer.SetExpectedRole(ca.ManagerRole)
                        case api.NodeRoleWorker:
                            renewer.SetExpectedRole(ca.WorkerRole)
                        }
                    }
                    if desiredRoleChanged || nodeChanges.Node.Certificate.Status.State == api.IssuanceStateRotate {
                        renewer.Renew()
                    }
                }
                if nodeChanges.RootCert != nil {
                    if bytes.Equal(nodeChanges.RootCert, securityConfig.RootCA().Certs) {
                        continue
                    }
                    newRootCA, err := ca.NewRootCA(nodeChanges.RootCert, nil, nil, ca.DefaultNodeCertExpiration, nil)
                    if err != nil {
                        log.G(ctx).WithError(err).Error("invalid new root certificate from the dispatcher")
                        continue
                    }
                    if err := securityConfig.UpdateRootCA(&newRootCA); err != nil {
                        log.G(ctx).WithError(err).Error("could not use new root CA from dispatcher")
                        continue
                    }
                    if err := ca.SaveRootCA(newRootCA, paths.RootCA); err != nil {
                        log.G(ctx).WithError(err).Error("could not save new root certificate from the dispatcher")
                        continue
                    }
                }
            }
        }
    }()
    // Now we're going to launch the main component goroutines: the Agent, the
    // Manager (maybe), and the certificate updates loop. We shouldn't exit
    // the node object until all 3 of these components have terminated, so we
    // create a waitgroup to block termination of the node until then.
    var wg sync.WaitGroup
    wg.Add(3)

    // These two blocks update some of the metrics settings.
    nodeInfo.WithValues(
        securityConfig.ClientTLSCreds.Organization(),
        securityConfig.ClientTLSCreds.NodeID(),
    ).Set(1)
    if n.currentRole() == api.NodeRoleManager {
        nodeManager.Set(1)
    } else {
        nodeManager.Set(0)
    }

    // We created the renewer way up when we were creating the SecurityConfig
    // at the beginning of run, but now we're ready to start receiving
    // CertificateUpdates, so we launch a goroutine to handle this. Updates is
    // a channel we iterate over containing the results of certificate
    // renewals.
    updates := renewer.Start(ctx)
    go func() {
        for certUpdate := range updates {
            if certUpdate.Err != nil {
                logrus.Warnf("error renewing TLS certificate: %v", certUpdate.Err)
                continue
            }
            // Set the new role, and notify our waiting role changing logic
            // that the role has changed.
            n.Lock()
            n.role = certUpdate.Role
            n.roleCond.Broadcast()
            n.Unlock()

            // Export the new role for metrics
            if n.currentRole() == api.NodeRoleManager {
                nodeManager.Set(1)
            } else {
                nodeManager.Set(0)
            }
        }
        wg.Done()
    }()

    // and, finally, start the two main components: the manager and the agent
    role := n.role

    // Channels to signal when these respective components are up and ready to
    // go.
    managerReady := make(chan struct{})
    agentReady := make(chan struct{})
    // these variables are defined in this scope so that they're closed over
    // by the respective goroutines below.
    var managerErr error
    var agentErr error
    go func() {
        // superviseManager is a routine that watches our manager role
        managerErr = n.superviseManager(ctx, securityConfig, paths.RootCA, managerReady, renewer) // store err and loop
        wg.Done()
        cancel()
    }()
    go func() {
        agentErr = n.runAgent(ctx, db, securityConfig, agentReady)
        wg.Done()
        cancel()
        close(agentDone)
    }()
    // This goroutine is what signals that the node has fully started, by
    // closing the n.ready channel. First, it waits for the agent to start.
    // Then, if this node is a manager, it will wait on either the manager
    // starting, or the node role changing. This ensures that if the node is
    // demoted before the manager starts, it doesn't get stuck.
    go func() {
        <-agentReady
        if role == ca.ManagerRole {
            workerRole := make(chan struct{})
            waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
            go func() {
                if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
                    close(workerRole)
                }
            }()
            select {
            case <-managerReady:
            case <-workerRole:
            }
            waitRoleCancel()
        }
        close(n.ready)
    }()

    // And, finally, we park and wait for the node to close up. If we get any
    // error other than context canceled, we return it.
    wg.Wait()
    if managerErr != nil && errors.Cause(managerErr) != context.Canceled {
        return managerErr
    }
    if agentErr != nil && errors.Cause(agentErr) != context.Canceled {
        return agentErr
    }
    // NOTE(dperny): we return err here, but the last time I can see err being
    // set is when we open the boltdb way up in this method, so I don't know
    // what returning err is supposed to do.
    return err
}
// Stop stops node execution.
func (n *Node) Stop(ctx context.Context) error {
    select {
    case <-n.started:
    default:
        return errNodeNotStarted
    }
    // ask agent to clean up assignments
    n.Lock()
    if n.agent != nil {
        if err := n.agent.Leave(ctx); err != nil {
            log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
        }
    }
    n.Unlock()

    n.stopOnce.Do(func() {
        close(n.stopped)
    })

    select {
    case <-n.closed:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// Err returns the error that caused the node to shut down, or nil. Err blocks
// until the node has fully shut down.
func (n *Node) Err(ctx context.Context) error {
    select {
    case <-n.closed:
        return n.err
    case <-ctx.Done():
        return ctx.Err()
    }
}
// runAgent starts the node's agent. When the agent has started, the provided
// ready channel is closed. When the agent exits, this will return the error
// that caused it.
func (n *Node) runAgent(ctx context.Context, db *bolt.DB, securityConfig *ca.SecurityConfig, ready chan<- struct{}) error {
    // First, get a channel for knowing when a remote peer has been selected.
    // The value received from remotesCh is ignored; we just need to know
    // when the peer is selected.
    remotesCh := n.remotes.WaitSelect(ctx)
    // then, we set up a new context to pass specifically to
    // ListenControlSocket, and start that method to wait on a connection on
    // the cluster control API.
    waitCtx, waitCancel := context.WithCancel(ctx)
    controlCh := n.ListenControlSocket(waitCtx)

    // The goal here is to wait until we have either a remote peer selected or
    // a connection to the control socket. These are both ways to connect the
    // agent to a manager, and we need to wait until one or the other is
    // available to start the agent.
waitPeer:
    for {
        select {
        case <-ctx.Done():
            break waitPeer
        case <-remotesCh:
            break waitPeer
        case conn := <-controlCh:
            // conn will probably be nil the first time we receive from this
            // channel, but only a non-nil conn represents an actual
            // connection.
            if conn != nil {
                break waitPeer
            }
        }
    }

    // We can stop listening for new control socket connections once we're
    // ready.
    waitCancel()

    // NOTE(dperny): not sure why we need to recheck the context here. I guess
    // it avoids a race if the context was canceled at the same time that a
    // connection or peer was available. I think it's just an optimization.
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    // Now we can go ahead and configure, create, and start the agent.
    secChangesCh, secChangesCancel := securityConfig.Watch()
    defer secChangesCancel()

    rootCA := securityConfig.RootCA()
    issuer := securityConfig.IssuerInfo()

    agentConfig := &agent.Config{
        Hostname:         n.config.Hostname,
        ConnBroker:       n.connBroker,
        Executor:         n.config.Executor,
        DB:               db,
        NotifyNodeChange: n.notifyNodeChange,
        NotifyTLSChange:  secChangesCh,
        Credentials:      securityConfig.ClientTLSCreds,
        NodeTLSInfo: &api.NodeTLSInfo{
            TrustRoot:           rootCA.Certs,
            CertIssuerPublicKey: issuer.PublicKey,
            CertIssuerSubject:   issuer.Subject,
        },
        FIPS: n.config.FIPS,
    }
    // if a join address has been specified, then if the agent fails to connect
    // due to a TLS error, fail fast - don't keep re-trying to join
    if n.config.JoinAddr != "" {
        agentConfig.SessionTracker = &firstSessionErrorTracker{}
    }

    a, err := agent.New(agentConfig)
    if err != nil {
        return err
    }
    if err := a.Start(ctx); err != nil {
        return err
    }

    n.Lock()
    n.agent = a
    n.Unlock()

    defer func() {
        n.Lock()
        n.agent = nil
        n.Unlock()
    }()

    // when the agent indicates that it is ready, we close the ready channel.
    go func() {
        <-a.Ready()
        close(ready)
    }()

    // todo: manually call stop on context cancellation?
    return a.Err(context.Background())
}
// Ready returns a channel that is closed after the node's initialization has
// completed for the first time.
func (n *Node) Ready() <-chan struct{} {
    return n.ready
}

func (n *Node) setControlSocket(conn *grpc.ClientConn) {
    n.Lock()
    if n.conn != nil {
        n.conn.Close()
    }
    n.conn = conn
    n.connBroker.SetLocalConn(conn)
    n.connCond.Broadcast()
    n.Unlock()
}
// ListenControlSocket listens for changes to the connection used for managing
// the cluster control API.
func (n *Node) ListenControlSocket(ctx context.Context) <-chan *grpc.ClientConn {
    c := make(chan *grpc.ClientConn, 1)
    n.RLock()
    conn := n.conn
    c <- conn
    done := make(chan struct{})
    go func() {
        select {
        case <-ctx.Done():
            n.connCond.Broadcast()
        case <-done:
        }
    }()
    go func() {
        defer close(c)
        defer close(done)
        defer n.RUnlock()
        for {
            select {
            case <-ctx.Done():
                return
            default:
            }
            if conn == n.conn {
                n.connCond.Wait()
                continue
            }
            conn = n.conn
            select {
            case c <- conn:
            case <-ctx.Done():
                return
            }
        }
    }()
    return c
}
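// An illustrative sketch, not part of the original source: a consumer of
// ListenControlSocket receives the current connection immediately (possibly
// nil), then each replacement connection until the context is canceled. The
// use of api.NewControlClient here is an assumption about how a received
// connection might be consumed.
//
//    connCh := n.ListenControlSocket(ctx)
//    for conn := range connCh {
//        if conn == nil {
//            continue // no local manager control socket yet
//        }
//        client := api.NewControlClient(conn)
//        _ = client // issue control API requests against the local manager
//    }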
// NodeID returns the current node's ID. May be empty if not set.
func (n *Node) NodeID() string {
    n.RLock()
    defer n.RUnlock()
    return n.nodeID
}

// Manager returns the manager instance started by the node. May be nil.
func (n *Node) Manager() *manager.Manager {
    n.RLock()
    defer n.RUnlock()
    return n.manager
}

// Agent returns the agent instance started by the node. May be nil.
func (n *Node) Agent() *agent.Agent {
    n.RLock()
    defer n.RUnlock()
    return n.agent
}

// IsStateDirty returns true if any objects have been added to raft which make
// the state "dirty". Currently, the existence of any object other than the
// default cluster or the local node implies a dirty state.
func (n *Node) IsStateDirty() (bool, error) {
    n.RLock()
    defer n.RUnlock()
    if n.manager == nil {
        return false, errors.New("node is not a manager")
    }
    return n.manager.IsStateDirty()
}

// Remotes returns a list of peers known to the node.
func (n *Node) Remotes() []api.Peer {
    weights := n.remotes.Weights()
    remotes := make([]api.Peer, 0, len(weights))
    for p := range weights {
        remotes = append(remotes, p)
    }
    return remotes
}

// isMandatoryFIPSClusterID returns whether the cluster ID in the given
// security config indicates that the cluster mandates FIPS mode. Such cluster
// IDs start with "FIPS." as a prefix.
func isMandatoryFIPSClusterID(securityConfig *ca.SecurityConfig) bool {
    return strings.HasPrefix(securityConfig.ClientTLSCreds.Organization(), "FIPS.")
}

// isMandatoryFIPSClusterJoinToken returns whether the given join token
// indicates that the cluster mandates FIPS mode.
func isMandatoryFIPSClusterJoinToken(joinToken string) bool {
    if parsed, err := ca.ParseJoinToken(joinToken); err == nil {
        return parsed.FIPS
    }
    return false
}

func generateFIPSClusterID() string {
    return "FIPS." + identity.NewID()
}
func (n *Node) loadSecurityConfig(ctx context.Context, paths *ca.SecurityConfigPaths) (*ca.SecurityConfig, func() error, error) {
    var (
        securityConfig *ca.SecurityConfig
        cancel         func() error
    )

    krw := ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{FIPS: n.config.FIPS})
    // if FIPS is required, we want to make sure our key is stored in PKCS8 format
    if n.config.FIPS {
        krw.SetKeyFormatter(keyutils.FIPS)
    }
    if err := krw.Migrate(); err != nil {
        return nil, nil, err
    }

    // Check if we already have a valid certificate on disk.
    rootCA, err := ca.GetLocalRootCA(paths.RootCA)
    if err != nil && err != ca.ErrNoLocalRootCA {
        return nil, nil, err
    }
    if err == nil {
        // if forcing a new cluster, we allow the certificates to be expired - a new set will be generated
        securityConfig, cancel, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
        if err != nil {
            _, isInvalidKEK := errors.Cause(err).(ca.ErrInvalidKEK)
            if isInvalidKEK {
                return nil, nil, ErrInvalidUnlockKey
            } else if !os.IsNotExist(err) {
                return nil, nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
            }
        }
    }

    if securityConfig == nil {
        if n.config.JoinAddr == "" {
            // if we're not joining a cluster, bootstrap a new one - and we have to set the unlock key
            n.unlockKey = nil
            if n.config.AutoLockManagers {
                n.unlockKey = encryption.GenerateSecretKey()
            }
            krw = ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{FIPS: n.config.FIPS})
            rootCA, err = ca.CreateRootCA(ca.DefaultRootCN)
            if err != nil {
                return nil, nil, err
            }
            if err := ca.SaveRootCA(rootCA, paths.RootCA); err != nil {
                return nil, nil, err
            }
            log.G(ctx).Debug("generated CA key and certificate")
        } else if err == ca.ErrNoLocalRootCA { // from the previous error loading the root CA from disk
            // if we are attempting to join another cluster, which has a FIPS join token, and we are not FIPS, error
            if n.config.JoinAddr != "" && isMandatoryFIPSClusterJoinToken(n.config.JoinToken) && !n.config.FIPS {
                return nil, nil, ErrMandatoryFIPS
            }
            rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.connBroker)
            if err != nil {
                return nil, nil, err
            }
            log.G(ctx).Debug("downloaded CA certificate")
        }

        // Obtain new certs and set up TLS certificate renewal for this node:
        // - If certificates weren't present on disk, we call CreateSecurityConfig, which blocks
        //   until a valid certificate has been issued.
        // - We wait for CreateSecurityConfig to finish since we need a certificate to operate.

        // Attempt to load certificate from disk
        securityConfig, cancel, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
        if err == nil {
            log.G(ctx).WithFields(logrus.Fields{
                "node.id": securityConfig.ClientTLSCreds.NodeID(),
            }).Debugf("loaded TLS certificate")
        } else {
            if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok {
                return nil, nil, ErrInvalidUnlockKey
            }
            log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target())

            // if we are attempting to join another cluster, which has a FIPS join token, and we are not FIPS, error
            if n.config.JoinAddr != "" && isMandatoryFIPSClusterJoinToken(n.config.JoinToken) && !n.config.FIPS {
                return nil, nil, ErrMandatoryFIPS
            }

            requestConfig := ca.CertificateRequestConfig{
                Token:        n.config.JoinToken,
                Availability: n.config.Availability,
                ConnBroker:   n.connBroker,
            }
            // If this is a new cluster, we want to name the cluster ID
            // "FIPS.<something>"
            if n.config.FIPS {
                requestConfig.Organization = generateFIPSClusterID()
            }
            securityConfig, cancel, err = rootCA.CreateSecurityConfig(ctx, krw, requestConfig)
            if err != nil {
                return nil, nil, err
            }
        }
    }

    if isMandatoryFIPSClusterID(securityConfig) && !n.config.FIPS {
        return nil, nil, ErrMandatoryFIPS
    }

    n.Lock()
    n.role = securityConfig.ClientTLSCreds.Role()
    n.nodeID = securityConfig.ClientTLSCreds.NodeID()
    n.roleCond.Broadcast()
    n.Unlock()

    return securityConfig, cancel, nil
}
func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
    opts := []grpc.DialOption{
        grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
        grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
        grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
    }
    insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
    opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
    addr := n.config.ListenControlAPI
    opts = append(opts, grpc.WithDialer(
        func(addr string, timeout time.Duration) (net.Conn, error) {
            return xnet.DialTimeoutLocal(addr, timeout)
        }))
    conn, err := grpc.Dial(addr, opts...)
    if err != nil {
        return err
    }
    client := api.NewHealthClient(conn)
    for {
        resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "ControlAPI"})
        if err != nil {
            return err
        }
        if resp.Status == api.HealthCheckResponse_SERVING {
            break
        }
        time.Sleep(500 * time.Millisecond)
    }
    n.setControlSocket(conn)
    if ready != nil {
        close(ready)
    }
    return nil
}
// waitRole takes a context and a role. It then blocks until the context is
// canceled or the node's role updates to the provided role. Returns nil when
// the node has acquired the provided role, or ctx.Err() if the context is
// canceled.
func (n *Node) waitRole(ctx context.Context, role string) error {
    n.roleCond.L.Lock()
    if role == n.role {
        n.roleCond.L.Unlock()
        return nil
    }
    finishCh := make(chan struct{})
    defer close(finishCh)
    go func() {
        select {
        case <-finishCh:
        case <-ctx.Done():
            // call broadcast to shut down this function
            n.roleCond.Broadcast()
        }
    }()
    defer n.roleCond.L.Unlock()
    for role != n.role {
        n.roleCond.Wait()
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
    }
    return nil
}
// runManager runs the manager on this node. It returns a boolean indicating
// whether the stoppage was due to a role change (or removal from raft), and
// an error indicating why the manager stopped.
func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, workerRole <-chan struct{}) (bool, error) {
    // First, set up this manager's advertise and listen addresses, if
    // provided. They might not be provided if this node is joining the cluster
    // instead of creating a new one.
    var remoteAPI *manager.RemoteAddrs
    if n.config.ListenRemoteAPI != "" {
        remoteAPI = &manager.RemoteAddrs{
            ListenAddr:    n.config.ListenRemoteAPI,
            AdvertiseAddr: n.config.AdvertiseRemoteAPI,
        }
    }

    joinAddr := n.config.JoinAddr
    if joinAddr == "" {
        remoteAddr, err := n.remotes.Select(n.NodeID())
        if err == nil {
            joinAddr = remoteAddr.Addr
        }
    }

    m, err := manager.New(&manager.Config{
        ForceNewCluster:  n.config.ForceNewCluster,
        RemoteAPI:        remoteAPI,
        ControlAPI:       n.config.ListenControlAPI,
        SecurityConfig:   securityConfig,
        ExternalCAs:      n.config.ExternalCAs,
        JoinRaft:         joinAddr,
        ForceJoin:        n.config.JoinAddr != "",
        StateDir:         n.config.StateDir,
        HeartbeatTick:    n.config.HeartbeatTick,
        ElectionTick:     n.config.ElectionTick,
        AutoLockManagers: n.config.AutoLockManagers,
        UnlockKey:        n.unlockKey,
        Availability:     n.config.Availability,
        PluginGetter:     n.config.PluginGetter,
        RootCAPaths:      rootPaths,
        FIPS:             n.config.FIPS,
        NetworkConfig:    n.config.NetworkConfig,
    })
    if err != nil {
        return false, err
    }

    // The done channel is used to signal that the manager has exited.
    done := make(chan struct{})
    // runErr is an error value set by the goroutine that runs the manager
    var runErr error

    // The context used to start this might have a logger associated with it
    // that we'd like to reuse, but we don't want to use that context, so we
    // pass to the goroutine only the logger, and create a new context with
    // that logger.
    go func(logger *logrus.Entry) {
        if err := m.Run(log.WithLogger(context.Background(), logger)); err != nil {
            runErr = err
        }
        close(done)
    }(log.G(ctx))

    // clearData is set in the select below, and is used to signal why the
    // manager is stopping, and indicate whether or not to delete raft data and
    // keys when stopping the manager.
    var clearData bool
    defer func() {
        n.Lock()
        n.manager = nil
        n.Unlock()
        m.Stop(ctx, clearData)
        <-done
        n.setControlSocket(nil)
    }()

    n.Lock()
    n.manager = m
    n.Unlock()

    connCtx, connCancel := context.WithCancel(ctx)
    defer connCancel()

    // launch a goroutine that will manage our local connection to the manager
    // from the agent. Remember the managerReady channel created way back in
    // run? This is actually where we close it. Not when the manager starts,
    // but when a connection to the control socket has been established.
    go n.initManagerConnection(connCtx, ready)

    // wait for manager stop or for role change
    // The manager can be stopped one of 4 ways:
    // 1. The manager may have errored out and returned an error, closing the
    //    done channel in the process
    // 2. The node may have been demoted to a worker. In this case, we're gonna
    //    have to stop the manager ourselves, setting clearData to true so the
    //    local raft data, certs, keys, etc, are nuked.
    // 3. The manager may have been booted from raft. This could happen if it's
    //    removed from the raft quorum but the role update hasn't registered
    //    yet. The fact that there is more than 1 code path to cause the
    //    manager to exit is a possible source of bugs.
    // 4. The context may have been canceled from above, in which case we
    //    should stop the manager ourselves, but indicate that this is NOT a
    //    demotion.
    select {
    case <-done:
        return false, runErr
    case <-workerRole:
        log.G(ctx).Info("role changed to worker, stopping manager")
        clearData = true
    case <-m.RemovedFromRaft():
        log.G(ctx).Info("manager removed from raft cluster, stopping manager")
        clearData = true
    case <-ctx.Done():
        return false, ctx.Err()
    }
    return clearData, nil
}
// superviseManager controls whether or not we are running a manager on this
// node.
func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, renewer *ca.TLSRenewer) error {
    // superviseManager is a loop, because we can come in and out of being a
    // manager, and need to appropriately handle that without disrupting the
    // node functionality.
    for {
        // if we're not a manager, we're just gonna park here and wait until we
        // are. For normal agent nodes, we'll stay here forever, as intended.
        if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
            return err
        }

        // Once we know we are a manager, we get ourselves ready for when we
        // lose that role. We create a channel to signal that we've become a
        // worker, and close it when n.waitRole completes.
        workerRole := make(chan struct{})
        waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
        go func() {
            if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
                close(workerRole)
            }
        }()

        // the ready channel passed to superviseManager is in turn passed down
        // to the runManager function. It's used to signal to the caller that
        // the manager has started.
        wasRemoved, err := n.runManager(ctx, securityConfig, rootPaths, ready, workerRole)
        if err != nil {
            waitRoleCancel()
            return errors.Wrap(err, "manager stopped")
        }

        // If the manager stopped running and our role is still
        // "manager", it's possible that the manager was demoted and
        // the agent hasn't realized this yet. We should wait for the
        // role to change instead of restarting the manager immediately.
        err = func() error {
            timer := time.NewTimer(roleChangeTimeout)
            defer timer.Stop()
            defer waitRoleCancel()

            select {
            case <-timer.C:
            case <-workerRole:
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }

            if !wasRemoved {
                log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
                return nil
            }

            // We need to be extra careful about restarting the
            // manager. It may cause the node to wrongly join under
            // a new Raft ID. Since we didn't see a role change
            // yet, force a certificate renewal. If the certificate
            // comes back with a worker role, we know we shouldn't
            // restart the manager. However, if we don't see
            // workerRole get closed, it means we didn't switch to
            // a worker certificate, either because we couldn't
            // contact a working CA, or because we've been
            // re-promoted. In this case, we must assume we were
            // re-promoted, and restart the manager.
            log.G(ctx).Warn("failed to get worker role after manager stop, forcing certificate renewal")

            // We can safely reset this timer without stopping/draining the timer
            // first because the only way the code has reached this point is if the timer
            // has already expired - if the role changed or the context were canceled,
            // then we would have returned already.
            timer.Reset(roleChangeTimeout)

            renewer.Renew()

            // Now that the renewal request has been sent to the
            // renewal goroutine, wait for a change in role.
            select {
            case <-timer.C:
                log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
            case <-workerRole:
            case <-ctx.Done():
                return ctx.Err()
            }
            return nil
        }()
        if err != nil {
            return err
        }

        // set ready to nil after the first time we've gone through this, as we
        // don't need to signal after the first time that the manager is ready.
        ready = nil
    }
}
// DowngradeKey reverts the node key to the older format so that the node can
// run on older versions of swarmkit.
func (n *Node) DowngradeKey() error {
    paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
    krw := ca.NewKeyReadWriter(paths.Node, n.config.UnlockKey, nil)
    return krw.DowngradeKey()
}
type persistentRemotes struct {
    sync.RWMutex
    c *sync.Cond
    remotes.Remotes
    storePath      string
    lastSavedState []api.Peer
}

func newPersistentRemotes(f string, peers ...api.Peer) *persistentRemotes {
    pr := &persistentRemotes{
        storePath: f,
        Remotes:   remotes.NewRemotes(peers...),
    }
    pr.c = sync.NewCond(pr.RLocker())
    return pr
}

func (s *persistentRemotes) Observe(peer api.Peer, weight int) {
    s.Lock()
    defer s.Unlock()
    s.Remotes.Observe(peer, weight)
    s.c.Broadcast()
    if err := s.save(); err != nil {
        logrus.Errorf("error writing cluster state file: %v", err)
    }
}

func (s *persistentRemotes) Remove(peers ...api.Peer) {
    s.Lock()
    defer s.Unlock()
    s.Remotes.Remove(peers...)
    if err := s.save(); err != nil {
        logrus.Errorf("error writing cluster state file: %v", err)
    }
}

func (s *persistentRemotes) save() error {
    weights := s.Weights()
    remotes := make([]api.Peer, 0, len(weights))
    for r := range weights {
        remotes = append(remotes, r)
    }
    sort.Sort(sortablePeers(remotes))
    if reflect.DeepEqual(remotes, s.lastSavedState) {
        return nil
    }
    dt, err := json.Marshal(remotes)
    if err != nil {
        return err
    }
    s.lastSavedState = remotes
    return ioutils.AtomicWriteFile(s.storePath, dt, 0600)
}
// WaitSelect waits until at least one remote becomes available and then selects one.
func (s *persistentRemotes) WaitSelect(ctx context.Context) <-chan api.Peer {
    c := make(chan api.Peer, 1)
    s.RLock()
    done := make(chan struct{})
    go func() {
        select {
        case <-ctx.Done():
            s.c.Broadcast()
        case <-done:
        }
    }()
    go func() {
        defer s.RUnlock()
        defer close(c)
        defer close(done)
        for {
            if ctx.Err() != nil {
                return
            }
            p, err := s.Select()
            if err == nil {
                c <- p
                return
            }
            s.c.Wait()
        }
    }()
    return c
}
// sortablePeers is a sort wrapper for []api.Peer.
type sortablePeers []api.Peer

func (sp sortablePeers) Less(i, j int) bool { return sp[i].NodeID < sp[j].NodeID }

func (sp sortablePeers) Len() int { return len(sp) }

func (sp sortablePeers) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] }
// firstSessionErrorTracker is a utility that helps determine whether the agent should exit after
// a TLS failure on establishing the first session. This should only happen if a join address
// is specified. If establishing the first session succeeds, but later on some session fails
// because of a TLS error, we don't want to exit the agent, because a previously successful
// session indicates that the TLS error may be a transient issue.
type firstSessionErrorTracker struct {
    mu               sync.Mutex
    pastFirstSession bool
    err              error
}

func (fs *firstSessionErrorTracker) SessionEstablished() {
    fs.mu.Lock()
    fs.pastFirstSession = true
    fs.mu.Unlock()
}

func (fs *firstSessionErrorTracker) SessionError(err error) {
    fs.mu.Lock()
    fs.err = err
    fs.mu.Unlock()
}

// SessionClosed returns an error if we haven't yet established a session, and
// we get a grpc error as a result of an X509 failure.
func (fs *firstSessionErrorTracker) SessionClosed() error {
    fs.mu.Lock()
    defer fs.mu.Unlock()

    // if we've successfully established at least 1 session, never return
    // errors
    if fs.pastFirstSession {
        return nil
    }

    // get the GRPC status from the error, because we only care about GRPC
    // errors
    grpcStatus, ok := status.FromError(fs.err)
    // if this isn't a GRPC error, it's not an error we return from this method
    if !ok {
        return nil
    }

    // NOTE(dperny, cyli): grpc does not expose the error type, which means we
    // have to do string matching to figure out if it's an x509 error.
    //
    // The error we're looking for has "connection error:", then says
    // "transport:" and finally has "x509:".
    // Specifically, the connection error description reads:
    //
    //   transport: authentication handshake failed: x509: certificate signed by unknown authority
    //
    // This string matching has caused trouble in the past. Specifically, at
    // some point between grpc versions 1.3.0 and 1.7.5, the string we were
    // matching changed from "transport: x509" to "transport: authentication
    // handshake failed: x509", which was an issue because we were matching for
    // the string "transport: x509:".
    //
    // In GRPC >= 1.10.x, transient errors like TLS errors became hidden by the
    // load balancing that GRPC does. In GRPC 1.11.x, they were exposed again
    // (usually) in RPC calls, but the error string then became:
    //
    //   rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: authentication handshake failed: x509: certificate signed by unknown authority"
    //
    // It also went from an Internal error to an Unavailable error. So we're
    // just going to search for the string "transport: authentication handshake
    // failed: x509:", since we want to fail for ALL x509 failures, not just
    // unknown authority errors.
    if !strings.Contains(grpcStatus.Message(), "connection error") ||
        !strings.Contains(grpcStatus.Message(), "transport: authentication handshake failed: x509:") {
        return nil
    }
    return fs.err
}