manager.go

package manager

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	gmetrics "github.com/docker/go-metrics"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/identity"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/allocator"
	"github.com/docker/swarmkit/manager/allocator/cnmallocator"
	"github.com/docker/swarmkit/manager/allocator/networkallocator"
	"github.com/docker/swarmkit/manager/controlapi"
	"github.com/docker/swarmkit/manager/dispatcher"
	"github.com/docker/swarmkit/manager/drivers"
	"github.com/docker/swarmkit/manager/health"
	"github.com/docker/swarmkit/manager/keymanager"
	"github.com/docker/swarmkit/manager/logbroker"
	"github.com/docker/swarmkit/manager/metrics"
	"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
	"github.com/docker/swarmkit/manager/orchestrator/global"
	"github.com/docker/swarmkit/manager/orchestrator/replicated"
	"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
	"github.com/docker/swarmkit/manager/resourceapi"
	"github.com/docker/swarmkit/manager/scheduler"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/raft/transport"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/manager/watchapi"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	gogotypes "github.com/gogo/protobuf/types"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	// defaultTaskHistoryRetentionLimit is the number of tasks to keep.
	defaultTaskHistoryRetentionLimit = 5
)

// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// Address to bind
	ListenAddr string

	// Address to advertise to remote nodes (optional).
	AdvertiseAddr string
}

// Config is used to tune the Manager.
type Config struct {
	SecurityConfig *ca.SecurityConfig

	// RootCAPaths is the path to which new root certs should be saved.
	RootCAPaths ca.CertPaths

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address.
	RemoteAPI *RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// ForceJoin causes us to invoke raft's Join RPC even if already part
	// of a cluster.
	ForceJoin bool

	// StateDir is the top-level state directory
	StateDir string

	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the amount of ticks needed without
	// leader to trigger a new election
	ElectionTick uint32

	// HeartbeatTick defines the amount of ticks between each
	// heartbeat sent to other members for health-check purposes
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state. This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK). It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter

	// FIPS is a boolean stating whether the node is FIPS enabled - if this is the
	// first node in the cluster, this setting is used to set the cluster-wide mandatory
	// FIPS setting.
	FIPS bool

	// NetworkConfig stores network related config for the cluster
	NetworkConfig *cnmallocator.NetworkConfig
}

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
	config Config

	collector              *metrics.Collector
	caserver               *ca.Server
	dispatcher             *dispatcher.Dispatcher
	logbroker              *logbroker.LogBroker
	watchServer            *watchapi.Server
	replicatedOrchestrator *replicated.Orchestrator
	globalOrchestrator     *global.Orchestrator
	taskReaper             *taskreaper.TaskReaper
	constraintEnforcer     *constraintenforcer.ConstraintEnforcer
	scheduler              *scheduler.Scheduler
	allocator              *allocator.Allocator
	keyManager             *keymanager.KeyManager
	server                 *grpc.Server
	localserver            *grpc.Server
	raftNode               *raft.Node
	dekRotator             *RaftDEKManager
	roleManager            *roleManager

	cancelFunc context.CancelFunc

	// mu is a general mutex used to coordinate starting/stopping and
	// leadership events.
	mu sync.Mutex
	// addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI
	addrMu sync.Mutex

	started chan struct{}
	stopped bool

	remoteListener  chan net.Listener
	controlListener chan net.Listener
	errServe        chan error
}

var (
	leaderMetric gmetrics.Gauge
)

func init() {
	ns := gmetrics.NewNamespace("swarm", "manager", nil)
	leaderMetric = ns.NewGauge("leader", "Indicates if this manager node is a leader", "")
	gmetrics.Register(ns)
}

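// closeOnceListener wraps a net.Listener so that closing it more than once is
// a no-op. grpc calls Close on its listener both in Serve and in Stop, and a
// second Close on a unix socket listener could remove the socket file of a
// newer listener (see serveListener below).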
type closeOnceListener struct {
	once sync.Once
	net.Listener
}

func (l *closeOnceListener) Close() error {
	var err error
	l.once.Do(func() {
		err = l.Listener.Close()
	})
	return err
}

// New creates a Manager which has not started to accept requests yet.
func New(config *Config) (*Manager, error) {
	err := os.MkdirAll(config.StateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create state directory")
	}

	raftStateDir := filepath.Join(config.StateDir, "raft")
	err = os.MkdirAll(raftStateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create raft state directory")
	}

	raftCfg := raft.DefaultNodeConfig()

	if config.ElectionTick > 0 {
		raftCfg.ElectionTick = int(config.ElectionTick)
	}
	if config.HeartbeatTick > 0 {
		raftCfg.HeartbeatTick = int(config.HeartbeatTick)
	}

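	// The DEK rotator manages the data encryption key (DEK) used to encrypt
	// raft data on disk, re-encrypting it with a new KEK whenever the cluster
	// unlock key changes (see updateKEK below).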
	dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter(), config.FIPS)
	if err != nil {
		return nil, err
	}

	newNodeOpts := raft.NodeOptions{
		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
		JoinAddr:        config.JoinRaft,
		ForceJoin:       config.ForceJoin,
		Config:          raftCfg,
		StateDir:        raftStateDir,
		ForceNewCluster: config.ForceNewCluster,
		TLSCredentials:  config.SecurityConfig.ClientTLSCreds,
		KeyRotator:      dekRotator,
		FIPS:            config.FIPS,
	}
	raftNode := raft.NewNode(newNodeOpts)

	// the interceptorWrappers are functions that wrap the prometheus grpc
	// interceptor, and add some code to log errors locally. one for stream
	// and one for unary. this is needed because the grpc unary interceptor
	// doesn't natively do chaining, you have to implement it in the caller.
	// note that even though these are logging errors, we're still using
	// debug level. returning errors from GRPC methods is common and expected,
	// and logging an ERROR every time a user mistypes a service name would
	// pollute the logs really fast.
	//
	// NOTE(dperny): Because of the fact that these functions are very simple
	// in their operation and have no side effects other than the log output,
	// they are not automatically tested. If you modify them later, make _sure_
	// that they are correct. If you add substantial side effects, abstract
	// these out and test them!
	unaryInterceptorWrapper := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// pass the call down into the grpc_prometheus interceptor
		resp, err := grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
		if err != nil {
			log.G(ctx).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling rpc")
		}
		return resp, err
	}

	streamInterceptorWrapper := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// we can't re-write a stream context, so don't bother creating a
		// sub-context like in unary methods
		// pass the call down into the grpc_prometheus interceptor
		err := grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler)
		if err != nil {
			log.G(ss.Context()).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling streaming rpc")
		}
		return err
	}

	opts := []grpc.ServerOption{
		grpc.Creds(config.SecurityConfig.ServerTLSCreds),
		grpc.StreamInterceptor(streamInterceptorWrapper),
		grpc.UnaryInterceptor(unaryInterceptorWrapper),
		grpc.MaxRecvMsgSize(transport.GRPCMaxMsgSize),
	}

	m := &Manager{
		config:          *config,
		caserver:        ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
		dispatcher:      dispatcher.New(),
		logbroker:       logbroker.New(raftNode.MemoryStore()),
		watchServer:     watchapi.NewServer(raftNode.MemoryStore()),
		server:          grpc.NewServer(opts...),
		localserver:     grpc.NewServer(opts...),
		raftNode:        raftNode,
		started:         make(chan struct{}),
		dekRotator:      dekRotator,
		remoteListener:  make(chan net.Listener, 1),
		controlListener: make(chan net.Listener, 1),
		errServe:        make(chan error, 2),
	}

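	// BindControl and BindRemote refuse to run if an address is already set,
	// so clear the addresses copied from config and let the bind calls below
	// populate them.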
	if config.ControlAPI != "" {
		m.config.ControlAPI = ""
		if err := m.BindControl(config.ControlAPI); err != nil {
			return nil, err
		}
	}

	if config.RemoteAPI != nil {
		m.config.RemoteAPI = nil
		// The context isn't used in this case (before (*Manager).Run).
		if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil {
			if config.ControlAPI != "" {
				l := <-m.controlListener
				l.Close()
			}
			return nil, err
		}
	}

	return m, nil
}

// BindControl binds a local socket for the control API.
func (m *Manager) BindControl(addr string) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.ControlAPI != "" {
		return errors.New("manager already has a control API address")
	}

	// don't create a socket directory if we're on windows. we use a named pipe
	if runtime.GOOS != "windows" {
		err := os.MkdirAll(filepath.Dir(addr), 0700)
		if err != nil {
			return errors.Wrap(err, "failed to create socket directory")
		}
	}

	l, err := xnet.ListenLocal(addr)

	// A unix socket may fail to bind if the file already
	// exists. Try replacing the file.
	if runtime.GOOS != "windows" {
		unwrappedErr := err
		if op, ok := unwrappedErr.(*net.OpError); ok {
			unwrappedErr = op.Err
		}
		if sys, ok := unwrappedErr.(*os.SyscallError); ok {
			unwrappedErr = sys.Err
		}
		if unwrappedErr == syscall.EADDRINUSE {
			os.Remove(addr)
			l, err = xnet.ListenLocal(addr)
		}
	}
	if err != nil {
		return errors.Wrap(err, "failed to listen on control API address")
	}

	m.config.ControlAPI = addr
	m.controlListener <- l
	return nil
}

// BindRemote binds a port for the remote API.
func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI != nil {
		return errors.New("manager already has remote API address")
	}

	// If an AdvertiseAddr was specified, we use that as our
	// externally-reachable address.
	advertiseAddr := addrs.AdvertiseAddr

	var advertiseAddrPort string
	if advertiseAddr == "" {
		// Otherwise, we know we are joining an existing swarm. Use a
		// wildcard address to trigger remote autodetection of our
		// address.
		var err error
		_, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr)
		if err != nil {
			return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr)
		}

		// Even with an IPv6 listening address, it's okay to use
		// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
		// be substituted with the actual source address.
		advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
	}

	l, err := net.Listen("tcp", addrs.ListenAddr)
	if err != nil {
		return errors.Wrap(err, "failed to listen on remote API address")
	}
	if advertiseAddrPort == "0" {
		advertiseAddr = l.Addr().String()
		addrs.ListenAddr = advertiseAddr
	}

	m.config.RemoteAPI = &addrs

	m.raftNode.SetAddr(ctx, advertiseAddr)
	m.remoteListener <- l

	return nil
}

// RemovedFromRaft returns a channel that's closed if the manager is removed
// from the raft cluster. This should be used to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
	return m.raftNode.RemovedFromRaft
}

// Addr returns tcp address on which remote api listens.
func (m *Manager) Addr() string {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI == nil {
		return ""
	}
	return m.config.RemoteAPI.ListenAddr
}

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
	ctx, ctxCancel := context.WithCancel(parent)
	defer ctxCancel()

	m.cancelFunc = ctxCancel

	leadershipCh, cancel := m.raftNode.SubscribeLeadership()
	defer cancel()

	go m.handleLeadershipEvents(ctx, leadershipCh)

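	// authorize verifies that an incoming request comes from (or was forwarded
	// by) a node holding a manager certificate for this cluster's organization,
	// and that the certificate has not been blacklisted (e.g. because the node
	// was removed from the cluster).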
	authorize := func(ctx context.Context, roles []string) error {
		var (
			blacklistedCerts map[string]*api.BlacklistedCertificate
			clusters         []*api.Cluster
			err              error
		)

		m.raftNode.MemoryStore().View(func(readTx store.ReadTx) {
			clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
		})

		// Not having a cluster object yet means we can't check
		// the blacklist.
		if err == nil && len(clusters) == 1 {
			blacklistedCerts = clusters[0].BlacklistedCertificates
		}

		// Authorize the remote roles, ensure they can only be forwarded by managers
		_, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts)
		return err
	}

	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.config.PluginGetter, drivers.New(m.config.PluginGetter))
	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
	healthServer := health.NewHealthServer()
	localHealthServer := health.NewHealthServer()

	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
	authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
	authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
	authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
	authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize)
	authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize)
	authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize)

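	// The RaftProxy wrappers transparently forward a request to the current
	// raft leader when this node is not the leader itself, attaching the
	// caller's TLS info as forwarded metadata.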
	proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)

	// The following local proxies are only wired up to receive requests
	// from a trusted local socket, and these requests don't use TLS,
	// therefore the requests they handle locally should bypass
	// authorization. When requests are proxied from these servers, they
	// are sent as requests from this manager rather than forwarded
	// requests (it has no TLS information to put in the metadata map).
	forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
	handleRequestLocally := func(ctx context.Context) (context.Context, error) {
		remoteAddr := "127.0.0.1:0"

		m.addrMu.Lock()
		if m.config.RemoteAPI != nil {
			if m.config.RemoteAPI.AdvertiseAddr != "" {
				remoteAddr = m.config.RemoteAPI.AdvertiseAddr
			} else {
				remoteAddr = m.config.RemoteAPI.ListenAddr
			}
		}
		m.addrMu.Unlock()

		creds := m.config.SecurityConfig.ClientTLSCreds

		nodeInfo := ca.RemoteNodeInfo{
			Roles:        []string{creds.Role()},
			Organization: creds.Organization(),
			NodeID:       creds.NodeID(),
			RemoteAddr:   remoteAddr,
		}

		return context.WithValue(ctx, ca.LocalRequestKey, nodeInfo), nil
	}
	localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyDispatcherAPI := api.NewRaftProxyDispatcherServer(m.dispatcher, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyNodeCAAPI := api.NewRaftProxyNodeCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(baseResourceAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)

	// Everything registered on m.server should be an authenticated
	// wrapper, or a proxy wrapping an authenticated wrapper!
	api.RegisterCAServer(m.server, proxyCAAPI)
	api.RegisterNodeCAServer(m.server, proxyNodeCAAPI)
	api.RegisterRaftServer(m.server, authenticatedRaftAPI)
	api.RegisterHealthServer(m.server, authenticatedHealthAPI)
	api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
	api.RegisterControlServer(m.server, authenticatedControlAPI)
	api.RegisterWatchServer(m.server, authenticatedWatchAPI)
	api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
	api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
	api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
	api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
	grpc_prometheus.Register(m.server)

	api.RegisterControlServer(m.localserver, localProxyControlAPI)
	api.RegisterWatchServer(m.localserver, m.watchServer)
	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
	api.RegisterHealthServer(m.localserver, localHealthServer)
	api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
	api.RegisterCAServer(m.localserver, localProxyCAAPI)
	api.RegisterNodeCAServer(m.localserver, localProxyNodeCAAPI)
	api.RegisterResourceAllocatorServer(m.localserver, localProxyResourceAPI)
	api.RegisterLogBrokerServer(m.localserver, localProxyLogBrokerAPI)
	grpc_prometheus.Register(m.localserver)

	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

	if err := m.watchServer.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("watch server failed to start")
	}

	go m.serveListener(ctx, m.remoteListener)
	go m.serveListener(ctx, m.controlListener)

	defer func() {
		m.server.Stop()
		m.localserver.Stop()
	}()

	// Set the raft server as serving for the health server
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

	if err := m.raftNode.JoinAndStart(ctx); err != nil {
		// Don't block future calls to Stop.
		close(m.started)
		return errors.Wrap(err, "can't initialize raft node")
	}

	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

	// Start metrics collection.
	m.collector = metrics.NewCollector(m.raftNode.MemoryStore())
	go func(collector *metrics.Collector) {
		if err := collector.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("collector failed with an error")
		}
	}(m.collector)

	close(m.started)

	go func() {
		err := m.raftNode.Run(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("raft node stopped")
			m.Stop(ctx, false)
		}
	}()

	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
		return err
	}

	c, err := raft.WaitForCluster(ctx, m.raftNode)
	if err != nil {
		return err
	}
	raftConfig := c.Spec.Raft

	if err := m.watchForClusterChanges(ctx); err != nil {
		return err
	}

	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
		log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick)
	}
	if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick {
		log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick)
	}

	// wait for an error in serving.
	err = <-m.errServe
	m.mu.Lock()
	if m.stopped {
		m.mu.Unlock()
		return nil
	}
	m.mu.Unlock()
	m.Stop(ctx, false)

	return err
}

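// stopTimeout is how long Stop waits for the gRPC servers to drain
// gracefully before forcing them to stop.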
const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the manager's subsystems. If clearData is
// set, the raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
	log.G(ctx).Info("Stopping manager")
	// It's not safe to start shutting down while the manager is still
	// starting up.
	<-m.started

	// the mutex stops us from trying to stop while we're already stopping, or
	// from returning before we've finished stopping.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true

	srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
	go func() {
		m.server.GracefulStop()
		close(srvDone)
	}()
	go func() {
		m.localserver.GracefulStop()
		close(localSrvDone)
	}()

	m.raftNode.Cancel()

	if m.collector != nil {
		m.collector.Stop()
	}

	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.watchServer.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
	}
	if m.replicatedOrchestrator != nil {
		m.replicatedOrchestrator.Stop()
	}
	if m.globalOrchestrator != nil {
		m.globalOrchestrator.Stop()
	}
	if m.taskReaper != nil {
		m.taskReaper.Stop()
	}
	if m.constraintEnforcer != nil {
		m.constraintEnforcer.Stop()
	}
	if m.scheduler != nil {
		m.scheduler.Stop()
	}
	if m.roleManager != nil {
		m.roleManager.Stop()
	}
	if m.keyManager != nil {
		m.keyManager.Stop()
	}

	if clearData {
		m.raftNode.ClearData()
	}
	m.cancelFunc()
	<-m.raftNode.Done()

	timer := time.AfterFunc(stopTimeout, func() {
		m.server.Stop()
		m.localserver.Stop()
	})
	defer timer.Stop()
	// TODO: we're not waiting on ctx because it very well could be passed from Run,
	// which is already cancelled here. We need to refactor that.
	select {
	case <-srvDone:
		<-localSrvDone
	case <-localSrvDone:
		<-srvDone
	}

	log.G(ctx).Info("Manager shut down")
	// mutex is released and Run can return now
}

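// updateKEK checks the cluster object for a new manager unlock key (KEK) and,
// if it changed, re-encrypts the local TLS key and raft DEK with it. When the
// cluster transitions from unlocked to locked, it also kicks off a best-effort
// TLS certificate renewal over the local control socket.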
func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
	securityConfig := m.config.SecurityConfig
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"node.id":   nodeID,
		"node.role": ca.ManagerRole,
	})

	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
	for _, encryptionKey := range cluster.UnlockKeys {
		if encryptionKey.Subsystem == ca.ManagerRole {
			kekData.KEK = encryptionKey.Key
			break
		}
	}

	updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData)
	if err != nil {
		logger.WithError(err).Errorf("failed to re-encrypt TLS key with a new KEK")
		return err
	}
	if updated {
		logger.Debug("successfully rotated KEK")
	}
	if unlockedToLocked {
		// a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews;
		// don't wait because it might take a bit
		go func() {
			insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})

			conn, err := grpc.Dial(
				m.config.ControlAPI,
				grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
				grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
				grpc.WithTransportCredentials(insecureCreds),
				grpc.WithDialer(
					func(addr string, timeout time.Duration) (net.Conn, error) {
						return xnet.DialTimeoutLocal(addr, timeout)
					}),
			)
			if err != nil {
				logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
				return
			}

			defer conn.Close()

			connBroker := connectionbroker.New(remotes.NewRemotes())
			connBroker.SetLocalConn(conn)
			if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker, m.config.RootCAPaths); err != nil {
				logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
			}
		}()
	}
	return nil
}

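// watchForClusterChanges applies the current cluster's unlock key immediately
// and then watches the cluster object for updates, re-running updateKEK on
// every change until the context is cancelled.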
func (m *Manager) watchForClusterChanges(ctx context.Context) error {
	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
	var cluster *api.Cluster
	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
		func(tx store.ReadTx) error {
			cluster = store.GetCluster(tx, clusterID)
			if cluster == nil {
				return fmt.Errorf("unable to get current cluster")
			}
			return nil
		},
		api.EventUpdateCluster{
			Cluster: &api.Cluster{ID: clusterID},
			Checks:  []api.ClusterCheckFunc{api.ClusterCheckID},
		},
	)
	if err != nil {
		return err
	}
	if err := m.updateKEK(ctx, cluster); err != nil {
		return err
	}

	go func() {
		for {
			select {
			case event := <-clusterWatch:
				clusterEvent := event.(api.EventUpdateCluster)
				m.updateKEK(ctx, clusterEvent.Cluster)
			case <-ctx.Done():
				clusterWatchCancel()
				return
			}
		}
	}()

	return nil
}

// getLeaderNodeID is a small helper function returning a string with the
// leader's node ID. it is only used for logging, and should not be relied on
// to give a node ID for actual operational purposes (because it returns errors
// as nicely decorated strings)
func (m *Manager) getLeaderNodeID() string {
	// get the current leader ID. this variable tracks the leader *only* for
	// the purposes of logging leadership changes, and should not be relied on
	// for other purposes
	leader, leaderErr := m.raftNode.Leader()
	switch leaderErr {
	case raft.ErrNoRaftMember:
		// this is an unlikely case, but we have to handle it. this means this
		// node is not a member of the raft quorum. this won't look very pretty
		// in logs ("leadership changed from aslkdjfa to ErrNoRaftMember") but
		// it also won't be very common
		return "not yet part of a raft cluster"
	case raft.ErrNoClusterLeader:
		return "no cluster leader"
	default:
		id, err := m.raftNode.GetNodeIDByRaftID(leader)
		// the only possible error here is "ErrMemberUnknown"
		if err != nil {
			return "an unknown node"
		}
		return id
	}
}

// handleLeadershipEvents handles the is leader event or is follower event.
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
	// get the current leader and save it for logging leadership changes in
	// this loop
	oldLeader := m.getLeaderNodeID()
	for {
		select {
		case leadershipEvent := <-leadershipCh:
			m.mu.Lock()
			if m.stopped {
				m.mu.Unlock()
				return
			}
			newState := leadershipEvent.(raft.LeadershipState)

			if newState == raft.IsLeader {
				m.becomeLeader(ctx)
				leaderMetric.Set(1)
			} else if newState == raft.IsFollower {
				m.becomeFollower()
				leaderMetric.Set(0)
			}
			m.mu.Unlock()

			newLeader := m.getLeaderNodeID()
			// maybe we should use logrus fields for old and new leader, so
			// that users are better able to ingest leadership changes into log
			// aggregators?
			log.G(ctx).Infof("leadership changed from %v to %v", oldLeader, newLeader)
		case <-ctx.Done():
			return
		}
	}
}

// serveListener serves a listener for local and non local connections.
func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) {
	var l net.Listener
	select {
	case l = <-lCh:
	case <-ctx.Done():
		return
	}
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
		logrus.Fields{
			"proto": l.Addr().Network(),
			"addr":  l.Addr().String(),
		}))
	if _, ok := l.(*net.TCPListener); !ok {
		log.G(ctx).Info("Listening for local connections")
		// we need to disallow double closes because UnixListener.Close
		// can delete unix-socket file of newer listener. grpc calls
		// Close twice indeed: in Serve and in Stop.
		m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
	} else {
		log.G(ctx).Info("Listening for connections")
		m.errServe <- m.server.Serve(l)
	}
}

// becomeLeader starts the subsystems that are run on the leader.
func (m *Manager) becomeLeader(ctx context.Context) {
	s := m.raftNode.MemoryStore()

	rootCA := m.config.SecurityConfig.RootCA()
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()

	raftCfg := raft.DefaultRaftConfig()
	raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick)
	raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick)

	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()

	initialCAConfig := ca.DefaultCAConfig()
	initialCAConfig.ExternalCAs = m.config.ExternalCAs

	var (
		unlockKeys []*api.EncryptionKey
		err        error
	)
	if m.config.AutoLockManagers {
		unlockKeys = []*api.EncryptionKey{{
			Subsystem: ca.ManagerRole,
			Key:       m.config.UnlockKey,
		}}
	}

	s.Update(func(tx store.Tx) error {
		// Add a default cluster object to the
		// store. Don't check the error because
		// we expect this to fail unless this
		// is a brand new cluster.
		clusterObj := defaultClusterObject(
			clusterID,
			initialCAConfig,
			raftCfg,
			api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers},
			unlockKeys,
			rootCA,
			m.config.FIPS,
			nil,
			0,
			0)

		// If a default address pool was configured, update the cluster
		// object with it; likewise if a non-zero VXLANUDPPort was set.
		if m.config.NetworkConfig != nil {
			if m.config.NetworkConfig.DefaultAddrPool != nil {
				clusterObj.DefaultAddressPool = m.config.NetworkConfig.DefaultAddrPool
				clusterObj.SubnetSize = m.config.NetworkConfig.SubnetSize
			}
			if m.config.NetworkConfig.VXLANUDPPort != 0 {
				clusterObj.VXLANUDPPort = m.config.NetworkConfig.VXLANUDPPort
			}
		}

		err := store.CreateCluster(tx, clusterObj)
		if err != nil && err != store.ErrExist {
			log.G(ctx).WithError(err).Errorf("error creating cluster object")
		}

		// Add Node entry for ourself, if one
		// doesn't exist already.
		freshCluster := nil == store.CreateNode(tx, managerNode(nodeID, m.config.Availability, clusterObj.VXLANUDPPort))

		if freshCluster {
			// This is a fresh swarm cluster. Add to store now any initial
			// cluster resource, like the default ingress network which
			// provides the routing mesh for this cluster.
			log.G(ctx).Info("Creating default ingress network")
			if err := store.CreateNetwork(tx, newIngressNetwork()); err != nil {
				log.G(ctx).WithError(err).Error("failed to create default ingress network")
			}
		}
		// Create the static predefined networks (the bridge/host node-local
		// networks known to be present on every cluster node) if the store
		// does not already contain them. This is needed in order to allow
		// running services on the predefined docker networks like `bridge`
		// and `host`.
		for _, p := range allocator.PredefinedNetworks() {
			if err := store.CreateNetwork(tx, newPredefinedNetwork(p.Name, p.Driver)); err != nil && err != store.ErrNameConflict {
				log.G(ctx).WithError(err).Error("failed to create predefined network " + p.Name)
			}
		}
		return nil
	})

	m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s)
	m.constraintEnforcer = constraintenforcer.New(s)
	m.globalOrchestrator = global.NewGlobalOrchestrator(s)
	m.taskReaper = taskreaper.New(s)
	m.scheduler = scheduler.New(s)
	m.keyManager = keymanager.New(s, keymanager.DefaultConfig())
	m.roleManager = newRoleManager(s, m.raftNode)

	// TODO(stevvooe): Allocate a context that can be used to
	// shutdown underlying manager processes when leadership is
	// lost.

	// If DefaultAddrPool is nil, read the cluster object from the store and
	// check whether it carries DefaultAddrPool info; likewise, if
	// VXLANUDPPort is 0, read it from the stored cluster object.
	if m.config.NetworkConfig == nil || m.config.NetworkConfig.DefaultAddrPool == nil || m.config.NetworkConfig.VXLANUDPPort == 0 {
		var cluster *api.Cluster
		s.View(func(tx store.ReadTx) {
			cluster = store.GetCluster(tx, clusterID)
		})
		if cluster.DefaultAddressPool != nil {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.DefaultAddrPool = append(m.config.NetworkConfig.DefaultAddrPool, cluster.DefaultAddressPool...)
			m.config.NetworkConfig.SubnetSize = cluster.SubnetSize
		}
		if cluster.VXLANUDPPort != 0 {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.VXLANUDPPort = cluster.VXLANUDPPort
		}
	}

	m.allocator, err = allocator.New(s, m.config.PluginGetter, m.config.NetworkConfig)
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to create allocator")
		// TODO(stevvooe): It doesn't seem correct here to fail
		// creating the allocator but then use it anyway.
	}

	if m.keyManager != nil {
		go func(keyManager *keymanager.KeyManager) {
			if err := keyManager.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("keymanager failed with an error")
			}
		}(m.keyManager)
	}

	go func(d *dispatcher.Dispatcher) {
		// Initialize the dispatcher.
		d.Init(m.raftNode, dispatcher.DefaultConfig(), drivers.New(m.config.PluginGetter), m.config.SecurityConfig)
		if err := d.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Dispatcher exited with an error")
		}
	}(m.dispatcher)

	if err := m.logbroker.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("LogBroker failed to start")
	}

	go func(server *ca.Server) {
		if err := server.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("CA signer exited with an error")
		}
	}(m.caserver)

	// Start all sub-components in separate goroutines.
	// TODO(aluzzardi): This should have some kind of error handling so that
	// any component that goes down would bring the entire manager down.
	if m.allocator != nil {
		go func(allocator *allocator.Allocator) {
			if err := allocator.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("allocator exited with an error")
			}
		}(m.allocator)
	}

	go func(scheduler *scheduler.Scheduler) {
		if err := scheduler.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("scheduler exited with an error")
		}
	}(m.scheduler)

	go func(constraintEnforcer *constraintenforcer.ConstraintEnforcer) {
		constraintEnforcer.Run()
	}(m.constraintEnforcer)

	go func(taskReaper *taskreaper.TaskReaper) {
		taskReaper.Run(ctx)
	}(m.taskReaper)

	go func(orchestrator *replicated.Orchestrator) {
		if err := orchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error")
		}
	}(m.replicatedOrchestrator)

	go func(globalOrchestrator *global.Orchestrator) {
		if err := globalOrchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("global orchestrator exited with an error")
		}
	}(m.globalOrchestrator)

	go func(roleManager *roleManager) {
		roleManager.Run(ctx)
	}(m.roleManager)
}

// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
		m.allocator = nil
	}

	m.constraintEnforcer.Stop()
	m.constraintEnforcer = nil

	m.replicatedOrchestrator.Stop()
	m.replicatedOrchestrator = nil

	m.globalOrchestrator.Stop()
	m.globalOrchestrator = nil

	m.taskReaper.Stop()
	m.taskReaper = nil

	m.scheduler.Stop()
	m.scheduler = nil

	m.roleManager.Stop()
	m.roleManager = nil

	if m.keyManager != nil {
		m.keyManager.Stop()
		m.keyManager = nil
	}
}

// defaultClusterObject creates a default cluster.
func defaultClusterObject(
	clusterID string,
	initialCAConfig api.CAConfig,
	raftCfg api.RaftConfig,
	encryptionConfig api.EncryptionConfig,
	initialUnlockKeys []*api.EncryptionKey,
	rootCA *ca.RootCA,
	fips bool,
	defaultAddressPool []string,
	subnetSize uint32,
	vxlanUDPPort uint32) *api.Cluster {
	var caKey []byte
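	// Only include the CA key in the cluster object if this root CA has a
	// local signer.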
	if rcaSigner, err := rootCA.Signer(); err == nil {
		caKey = rcaSigner.Key
	}

	return &api.Cluster{
		ID: clusterID,
		Spec: api.ClusterSpec{
			Annotations: api.Annotations{
				Name: store.DefaultClusterName,
			},
			Orchestration: api.OrchestrationConfig{
				TaskHistoryRetentionLimit: defaultTaskHistoryRetentionLimit,
			},
			Dispatcher: api.DispatcherConfig{
				HeartbeatPeriod: gogotypes.DurationProto(dispatcher.DefaultHeartBeatPeriod),
			},
			Raft:             raftCfg,
			CAConfig:         initialCAConfig,
			EncryptionConfig: encryptionConfig,
		},
		RootCA: api.RootCA{
			CAKey:      caKey,
			CACert:     rootCA.Certs,
			CACertHash: rootCA.Digest.String(),
			JoinTokens: api.JoinTokens{
				Worker:  ca.GenerateJoinToken(rootCA, fips),
				Manager: ca.GenerateJoinToken(rootCA, fips),
			},
		},
		UnlockKeys:         initialUnlockKeys,
		FIPS:               fips,
		DefaultAddressPool: defaultAddressPool,
		SubnetSize:         subnetSize,
		VXLANUDPPort:       vxlanUDPPort,
	}
}

// managerNode creates a new node with NodeRoleManager role.
func managerNode(nodeID string, availability api.NodeSpec_Availability, vxlanPort uint32) *api.Node {
	return &api.Node{
		ID: nodeID,
		Certificate: api.Certificate{
			CN:   nodeID,
			Role: api.NodeRoleManager,
			Status: api.IssuanceStatus{
				State: api.IssuanceStateIssued,
			},
		},
		Spec: api.NodeSpec{
			DesiredRole:  api.NodeRoleManager,
			Membership:   api.NodeMembershipAccepted,
			Availability: availability,
		},
		VXLANUDPPort: vxlanPort,
	}
}

// newIngressNetwork returns the network object for the default ingress
// network, the network which provides the routing mesh. The caller saves this
// object to the store once, at fresh cluster creation; it is expected to be
// called inside a store update transaction.
func newIngressNetwork() *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Ingress: true,
			Annotations: api.Annotations{
				Name: "ingress",
			},
			DriverConfig: &api.Driver{},
			IPAM: &api.IPAMOptions{
				Driver: &api.Driver{},
				Configs: []*api.IPAMConfig{
					{
						Subnet: "10.255.0.0/16",
					},
				},
			},
		},
	}
}

// newPredefinedNetwork creates a network object representing one of the
// predefined networks known to be statically created on the cluster nodes.
// These objects are populated in the store at cluster creation solely in
// order to support running services on the nodes' predefined networks.
// External clients can filter these predefined networks by looking at the
// predefined label.
func newPredefinedNetwork(name, driver string) *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Annotations: api.Annotations{
				Name: name,
				Labels: map[string]string{
					networkallocator.PredefinedLabel: "true",
				},
			},
			DriverConfig: &api.Driver{Name: driver},
		},
	}
}