noderunner.go

package cluster

import (
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/executor/container"
	swarmapi "github.com/docker/swarmkit/api"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)
// nodeRunner implements a manager for a continuously running swarmkit node,
// restarting it with backoff delays if needed.
type nodeRunner struct {
	nodeState
	mu              sync.RWMutex
	done            chan struct{} // closed when swarmNode exits
	ready           chan struct{} // closed when swarmNode becomes active
	reconnectDelay  time.Duration
	config          nodeStartConfig

	repeatedRun     bool
	cancelReconnect func()
	stopping        bool
	cluster         *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct
}
// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string

	joinAddr        string
	forceNewCluster bool
	joinToken       string
	lockKey         []byte
	autolock        bool
}
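// Persistence sketch (editorial addition, not part of the original file):
// since only the exported fields survive JSON marshaling, a saved config
// could plausibly look like the following; the addresses are made up.
//
//	{
//	    "LocalAddr": "10.0.0.5",
//	    "RemoteAddr": "10.0.0.1:2377",
//	    "ListenAddr": "0.0.0.0:2377",
//	    "AdvertiseAddr": "10.0.0.5:2377"
//	}
//
// joinToken, lockKey, and the other unexported fields are skipped by
// encoding/json, so join secrets and the unlock key are not written to
// disk by this path.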
// Ready returns a channel that delivers the node's startup error if it
// exited before becoming active, and is closed without a value once the
// node is ready.
func (n *nodeRunner) Ready() chan error {
	c := make(chan error, 1)
	n.mu.RLock()
	ready, done := n.ready, n.done
	n.mu.RUnlock()
	go func() {
		select {
		case <-ready:
		case <-done:
		}
		select {
		case <-ready:
		default:
			// The node exited without ever becoming ready; report the
			// stored error to the caller.
			n.mu.RLock()
			c <- n.err
			n.mu.RUnlock()
		}
		close(c)
	}()
	return c
}
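// Usage sketch (editorial addition; "nr" and "conf" are hypothetical):
// callers typically start the node and then block on Ready to learn
// whether it came up.
//
//	if err := nr.Start(conf); err != nil {
//	    return err
//	}
//	if err := <-nr.Ready(); err != nil {
//	    return errors.Wrap(err, "swarm node exited before becoming ready")
//	}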
// Start starts a new swarmkit node with the given configuration, resetting
// the reconnect backoff to its initial value.
func (n *nodeRunner) Start(conf nodeStartConfig) error {
	n.mu.Lock()
	defer n.mu.Unlock()

	n.reconnectDelay = initialReconnectDelay

	return n.start(conf)
}
func (n *nodeRunner) start(conf nodeStartConfig) error {
	var control string
	if runtime.GOOS == "windows" {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(n.cluster.runtimeRoot, controlSocket)
	}

	node, err := swarmnode.New(&swarmnode.Config{
		Hostname:           n.cluster.config.Name,
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		JoinAddr:           conf.joinAddr,
		StateDir:           n.cluster.root,
		JoinToken:          conf.joinToken,
		Executor:           container.NewExecutor(n.cluster.config.Backend),
		HeartbeatTick:      1,
		ElectionTick:       3,
		UnlockKey:          conf.lockKey,
		AutoLockManagers:   conf.autolock,
	})
	if err != nil {
		return err
	}
	if err := node.Start(context.Background()); err != nil {
		return err
	}

	n.done = make(chan struct{})
	n.ready = make(chan struct{})
	n.swarmNode = node
	n.config = conf
	savePersistentState(n.cluster.root, conf)

	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		n.handleNodeExit(node)
		cancel()
	}()

	go n.handleReadyEvent(ctx, node, n.ready)
	go n.handleControlSocketChange(ctx, node)

	return nil
}
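// Lifecycle note (editorial addition): the three goroutines above are tied
// together through ctx. handleNodeExit blocks until the swarmkit node
// stops, then calls cancel(), which unblocks handleReadyEvent and ends the
// ListenControlSocket loop in handleControlSocketChange.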
func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) {
	for conn := range node.ListenControlSocket(ctx) {
		n.mu.Lock()
		if n.grpcConn != conn {
			if conn == nil {
				n.controlClient = nil
				n.logsClient = nil
			} else {
				n.controlClient = swarmapi.NewControlClient(conn)
				n.logsClient = swarmapi.NewLogsClient(conn)
			}
		}
		n.grpcConn = conn
		n.mu.Unlock()
		n.cluster.configEvent <- struct{}{}
	}
}
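// Note (editorial addition): ListenControlSocket yields a new connection
// whenever the control socket (re)connects and nil when it goes away; the
// branch above swaps the typed clients in lockstep so a nodeState snapshot
// never holds a client for a connection that has been torn down.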
func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) {
	select {
	case <-node.Ready():
		n.mu.Lock()
		n.err = nil
		n.mu.Unlock()
		close(ready)
	case <-ctx.Done():
	}
	n.cluster.configEvent <- struct{}{}
}
func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
	err := detectLockedError(node.Err(context.Background()))
	if err != nil {
		logrus.Errorf("cluster exited with error: %v", err)
	}
	n.mu.Lock()
	n.swarmNode = nil
	n.err = err
	close(n.done)
	select {
	case <-n.ready:
		n.enableReconnectWatcher()
	default:
		if n.repeatedRun {
			n.enableReconnectWatcher()
		}
	}
	n.repeatedRun = true
	n.mu.Unlock()
}
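// Restart-policy note (editorial addition): the select above schedules a
// reconnect only if the node had become ready at least once, or if this
// was already a restarted run. A node that fails on its very first start
// is not retried; the error is surfaced to the caller waiting on Ready.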
// Stop stops the current swarm node if it is running.
func (n *nodeRunner) Stop() error {
	n.mu.Lock()
	if n.cancelReconnect != nil { // between restarts
		n.cancelReconnect()
		n.cancelReconnect = nil
	}
	if n.swarmNode == nil {
		n.mu.Unlock()
		return nil
	}
	n.stopping = true
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		n.mu.Unlock()
		return err
	}
	n.mu.Unlock()
	<-n.done
	return nil
}
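// Note (editorial addition): n.done is closed by handleNodeExit, so the
// receive above guarantees the exit handler has finished before Stop
// returns. The wait happens after releasing n.mu because handleNodeExit
// takes the same lock before closing the channel.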
// State returns a snapshot of the current node state, with a
// LocalNodeState status derived from the error and ready channels.
func (n *nodeRunner) State() nodeState {
	if n == nil {
		return nodeState{status: types.LocalNodeStateInactive}
	}
	n.mu.RLock()
	defer n.mu.RUnlock()

	ns := n.nodeState

	if ns.err != nil || n.cancelReconnect != nil {
		if errors.Cause(ns.err) == ErrSwarmLocked {
			ns.status = types.LocalNodeStateLocked
		} else {
			ns.status = types.LocalNodeStateError
		}
	} else {
		select {
		case <-n.ready:
			ns.status = types.LocalNodeStateActive
		default:
			ns.status = types.LocalNodeStatePending
		}
	}

	return ns
}
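// Usage sketch (editorial addition; "nr" is a hypothetical *nodeRunner and
// the request is illustrative): callers should consult State and
// IsActiveManager before touching the gRPC clients.
//
//	ns := nr.State()
//	if !ns.IsActiveManager() {
//	    return errors.New("this node is not a swarm manager")
//	}
//	resp, err := ns.controlClient.ListNodes(context.Background(), &swarmapi.ListNodesRequest{})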
// enableReconnectWatcher schedules a restart of the node after the current
// backoff delay, doubling the delay up to maxReconnectDelay. It must be
// called with n.mu held.
func (n *nodeRunner) enableReconnectWatcher() {
	if n.stopping {
		return
	}
	n.reconnectDelay *= 2
	if n.reconnectDelay > maxReconnectDelay {
		n.reconnectDelay = maxReconnectDelay
	}
	logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
	delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
	n.cancelReconnect = cancel

	config := n.config
	go func() {
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			// The delay was canceled (e.g. by Stop) rather than elapsing.
			return
		}
		n.mu.Lock()
		defer n.mu.Unlock()
		if n.stopping {
			return
		}
		config.RemoteAddr = n.cluster.getRemoteAddress()
		config.joinAddr = config.RemoteAddr
		if err := n.start(config); err != nil {
			n.err = err
		}
	}()
}
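// Backoff sketch (editorial addition; the concrete values are assumptions,
// the real constants live elsewhere in this package): because the delay
// doubles before being clamped, a node with initialReconnectDelay = 2s and
// maxReconnectDelay = 30s would be restarted after roughly 4s, 8s, 16s,
// 30s, 30s, ... across repeated failures.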
// nodeState represents information about the current state of the cluster and
// provides access to the grpc clients.
type nodeState struct {
	swarmNode       *swarmnode.Node
	grpcConn        *grpc.ClientConn
	controlClient   swarmapi.ControlClient
	logsClient      swarmapi.LogsClient
	status          types.LocalNodeState
	actualLocalAddr string
	err             error
}
// IsActiveManager returns true if node is a manager ready to accept control
// requests. It is safe to access the client properties if this returns true.
func (ns nodeState) IsActiveManager() bool {
	return ns.controlClient != nil
}
// IsManager returns true if node is a manager.
func (ns nodeState) IsManager() bool {
	return ns.swarmNode != nil && ns.swarmNode.Manager() != nil
}
// NodeID returns node's ID or empty string if node is inactive.
func (ns nodeState) NodeID() string {
	if ns.swarmNode != nil {
		return ns.swarmNode.NodeID()
	}
	return ""
}
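// Usage sketch (editorial addition): the three predicates above form a
// hierarchy; a node can be a manager without being an *active* manager
// (for instance while the control connection is down), so code that needs
// the control API should check IsActiveManager rather than IsManager.
//
//	switch {
//	case ns.IsActiveManager():
//	    // safe to use ns.controlClient and ns.logsClient
//	case ns.IsManager():
//	    // manager role, but the control socket is not connected
//	case ns.NodeID() != "":
//	    // running, but as a worker
//	}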