package manager

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	gmetrics "github.com/docker/go-metrics"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/identity"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/allocator"
	"github.com/docker/swarmkit/manager/allocator/cnmallocator"
	"github.com/docker/swarmkit/manager/allocator/networkallocator"
	"github.com/docker/swarmkit/manager/controlapi"
	"github.com/docker/swarmkit/manager/dispatcher"
	"github.com/docker/swarmkit/manager/drivers"
	"github.com/docker/swarmkit/manager/health"
	"github.com/docker/swarmkit/manager/keymanager"
	"github.com/docker/swarmkit/manager/logbroker"
	"github.com/docker/swarmkit/manager/metrics"
	"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
	"github.com/docker/swarmkit/manager/orchestrator/global"
	"github.com/docker/swarmkit/manager/orchestrator/replicated"
	"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
	"github.com/docker/swarmkit/manager/resourceapi"
	"github.com/docker/swarmkit/manager/scheduler"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/raft/transport"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/manager/watchapi"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	gogotypes "github.com/gogo/protobuf/types"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	// defaultTaskHistoryRetentionLimit is the number of tasks to keep.
	defaultTaskHistoryRetentionLimit = 5
)

// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// Address to bind
	ListenAddr string

	// Address to advertise to remote nodes (optional).
	AdvertiseAddr string
}

// Config is used to tune the Manager.
type Config struct {
	SecurityConfig *ca.SecurityConfig

	// RootCAPaths is the path to which new root certs should be saved.
	RootCAPaths ca.CertPaths

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address.
	RemoteAPI *RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// ForceJoin causes us to invoke raft's Join RPC even if already part
	// of a cluster.
	ForceJoin bool

	// StateDir is the top-level state directory.
	StateDir string

	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the number of ticks needed without
	// a leader to trigger a new election.
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between each
	// heartbeat sent to other members for health-check purposes.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state. This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK). It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node.
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter

	// FIPS is a boolean stating whether the node is FIPS enabled - if this is the
	// first node in the cluster, this setting is used to set the cluster-wide mandatory
	// FIPS setting.
	FIPS bool

	// NetworkConfig stores network related config for the cluster.
	NetworkConfig *cnmallocator.NetworkConfig
}
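
// A minimal Config might be populated as in the sketch below. This is
// illustrative only: the SecurityConfig bootstrap and the concrete paths are
// assumptions made for the example, not values prescribed by this package.
//
//	cfg := &Config{
//		SecurityConfig: securityConfig, // obtained elsewhere via the ca package
//		StateDir:       "/var/lib/swarm-manager",
//		ControlAPI:     "/var/run/swarm-manager.sock",
//		RemoteAPI:      &RemoteAddrs{ListenAddr: "0.0.0.0:2377"},
//	}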

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
	config Config

	collector              *metrics.Collector
	caserver               *ca.Server
	dispatcher             *dispatcher.Dispatcher
	logbroker              *logbroker.LogBroker
	watchServer            *watchapi.Server
	replicatedOrchestrator *replicated.Orchestrator
	globalOrchestrator     *global.Orchestrator
	taskReaper             *taskreaper.TaskReaper
	constraintEnforcer     *constraintenforcer.ConstraintEnforcer
	scheduler              *scheduler.Scheduler
	allocator              *allocator.Allocator
	keyManager             *keymanager.KeyManager
	server                 *grpc.Server
	localserver            *grpc.Server
	raftNode               *raft.Node
	dekRotator             *RaftDEKManager
	roleManager            *roleManager

	cancelFunc context.CancelFunc

	// mu is a general mutex used to coordinate starting/stopping and
	// leadership events.
	mu sync.Mutex
	// addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI
	addrMu sync.Mutex

	started chan struct{}
	stopped bool

	remoteListener  chan net.Listener
	controlListener chan net.Listener
	errServe        chan error
}

var (
	leaderMetric gmetrics.Gauge
)

func init() {
	ns := gmetrics.NewNamespace("swarm", "manager", nil)
	leaderMetric = ns.NewGauge("leader", "Indicates if this manager node is a leader", "")
	gmetrics.Register(ns)
}

type closeOnceListener struct {
	once sync.Once
	net.Listener
}

func (l *closeOnceListener) Close() error {
	var err error
	l.once.Do(func() {
		err = l.Listener.Close()
	})
	return err
}
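
// Illustrative sketch of why closeOnceListener exists (see serveListener
// below): wrapping a listener makes a second Close a no-op, so the unix
// socket file is never removed twice.
//
//	wrapped := &closeOnceListener{Listener: l}
//	_ = wrapped.Close() // closes the underlying listener
//	_ = wrapped.Close() // no-op; the socket file is not touched again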

// New creates a Manager which has not started to accept requests yet.
func New(config *Config) (*Manager, error) {
	err := os.MkdirAll(config.StateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create state directory")
	}

	raftStateDir := filepath.Join(config.StateDir, "raft")
	err = os.MkdirAll(raftStateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create raft state directory")
	}

	raftCfg := raft.DefaultNodeConfig()

	if config.ElectionTick > 0 {
		raftCfg.ElectionTick = int(config.ElectionTick)
	}
	if config.HeartbeatTick > 0 {
		raftCfg.HeartbeatTick = int(config.HeartbeatTick)
	}

	dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter(), config.FIPS)
	if err != nil {
		return nil, err
	}

	newNodeOpts := raft.NodeOptions{
		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
		JoinAddr:        config.JoinRaft,
		ForceJoin:       config.ForceJoin,
		Config:          raftCfg,
		StateDir:        raftStateDir,
		ForceNewCluster: config.ForceNewCluster,
		TLSCredentials:  config.SecurityConfig.ClientTLSCreds,
		KeyRotator:      dekRotator,
		FIPS:            config.FIPS,
	}
	raftNode := raft.NewNode(newNodeOpts)

	// the interceptorWrappers are functions that wrap the prometheus grpc
	// interceptor, and add some code to log errors locally. one for stream
	// and one for unary. this is needed because the grpc unary interceptor
	// doesn't natively do chaining, you have to implement it in the caller.
	// note that even though these are logging errors, we're still using
	// debug level. returning errors from GRPC methods is common and expected,
	// and logging an ERROR every time a user mistypes a service name would
	// pollute the logs really fast.
	//
	// NOTE(dperny): Because of the fact that these functions are very simple
	// in their operation and have no side effects other than the log output,
	// they are not automatically tested. If you modify them later, make _sure_
	// that they are correct. If you add substantial side effects, abstract
	// these out and test them!
	unaryInterceptorWrapper := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// pass the call down into the grpc_prometheus interceptor
		resp, err := grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
		if err != nil {
			log.G(ctx).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling rpc")
		}
		return resp, err
	}

	streamInterceptorWrapper := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// we can't re-write a stream context, so don't bother creating a
		// sub-context like in unary methods
		// pass the call down into the grpc_prometheus interceptor
		err := grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler)
		if err != nil {
			log.G(ss.Context()).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling streaming rpc")
		}
		return err
	}

	opts := []grpc.ServerOption{
		grpc.Creds(config.SecurityConfig.ServerTLSCreds),
		grpc.StreamInterceptor(streamInterceptorWrapper),
		grpc.UnaryInterceptor(unaryInterceptorWrapper),
		grpc.MaxRecvMsgSize(transport.GRPCMaxMsgSize),
	}

	m := &Manager{
		config:          *config,
		caserver:        ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
		dispatcher:      dispatcher.New(),
		logbroker:       logbroker.New(raftNode.MemoryStore()),
		watchServer:     watchapi.NewServer(raftNode.MemoryStore()),
		server:          grpc.NewServer(opts...),
		localserver:     grpc.NewServer(opts...),
		raftNode:        raftNode,
		started:         make(chan struct{}),
		dekRotator:      dekRotator,
		remoteListener:  make(chan net.Listener, 1),
		controlListener: make(chan net.Listener, 1),
		errServe:        make(chan error, 2),
	}

	if config.ControlAPI != "" {
		m.config.ControlAPI = ""
		if err := m.BindControl(config.ControlAPI); err != nil {
			return nil, err
		}
	}

	if config.RemoteAPI != nil {
		m.config.RemoteAPI = nil
		// The context isn't used in this case (before (*Manager).Run).
		if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil {
			if config.ControlAPI != "" {
				l := <-m.controlListener
				l.Close()
			}
			return nil, err
		}
	}

	return m, nil
}
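
// Putting New, Run and Stop together, a caller drives the manager lifecycle
// roughly as sketched below. This is an illustrative sketch only; real
// callers (such as the swarmkit node) add error handling and shutdown
// coordination around these calls.
//
//	m, err := New(cfg)
//	if err != nil {
//		return err
//	}
//	runErr := make(chan error, 1)
//	go func() { runErr <- m.Run(ctx) }() // blocks until Stop or a fatal error
//	// ... later, on shutdown:
//	m.Stop(ctx, false)
//	<-runErr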

// BindControl binds a local socket for the control API.
func (m *Manager) BindControl(addr string) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.ControlAPI != "" {
		return errors.New("manager already has a control API address")
	}

	// don't create a socket directory if we're on windows. we use a named pipe
	if runtime.GOOS != "windows" {
		err := os.MkdirAll(filepath.Dir(addr), 0700)
		if err != nil {
			return errors.Wrap(err, "failed to create socket directory")
		}
	}

	l, err := xnet.ListenLocal(addr)

	// A unix socket may fail to bind if the file already
	// exists. Try replacing the file.
	if runtime.GOOS != "windows" {
		unwrappedErr := err
		if op, ok := unwrappedErr.(*net.OpError); ok {
			unwrappedErr = op.Err
		}
		if sys, ok := unwrappedErr.(*os.SyscallError); ok {
			unwrappedErr = sys.Err
		}
		if unwrappedErr == syscall.EADDRINUSE {
			os.Remove(addr)
			l, err = xnet.ListenLocal(addr)
		}
	}
	if err != nil {
		return errors.Wrap(err, "failed to listen on control API address")
	}

	m.config.ControlAPI = addr
	m.controlListener <- l
	return nil
}

// BindRemote binds a port for the remote API.
func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI != nil {
		return errors.New("manager already has remote API address")
	}

	// If an AdvertiseAddr was specified, we use that as our
	// externally-reachable address.
	advertiseAddr := addrs.AdvertiseAddr

	var advertiseAddrPort string
	if advertiseAddr == "" {
		// Otherwise, we know we are joining an existing swarm. Use a
		// wildcard address to trigger remote autodetection of our
		// address.
		var err error
		_, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr)
		if err != nil {
			return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr)
		}

		// Even with an IPv6 listening address, it's okay to use
		// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
		// be substituted with the actual source address.
		advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
	}

	l, err := net.Listen("tcp", addrs.ListenAddr)
	if err != nil {
		return errors.Wrap(err, "failed to listen on remote API address")
	}
	if advertiseAddrPort == "0" {
		advertiseAddr = l.Addr().String()
		addrs.ListenAddr = advertiseAddr
	}

	m.config.RemoteAPI = &addrs

	m.raftNode.SetAddr(ctx, advertiseAddr)
	m.remoteListener <- l

	return nil
}

// RemovedFromRaft returns a channel that's closed if the manager is removed
// from the raft cluster. This should be used to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
	return m.raftNode.RemovedFromRaft
}
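
// As the comment above suggests, a supervising caller typically watches this
// channel and shuts the manager down when it fires. Illustrative sketch (the
// choice of clearData here is an assumption, not mandated by this package):
//
//	select {
//	case <-m.RemovedFromRaft():
//		m.Stop(ctx, true) // this node was removed from the cluster
//	case <-ctx.Done():
//	}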

// Addr returns the TCP address on which the remote API listens.
func (m *Manager) Addr() string {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI == nil {
		return ""
	}
	return m.config.RemoteAPI.ListenAddr
}

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
	ctx, ctxCancel := context.WithCancel(parent)
	defer ctxCancel()

	m.cancelFunc = ctxCancel

	leadershipCh, cancel := m.raftNode.SubscribeLeadership()
	defer cancel()

	go m.handleLeadershipEvents(ctx, leadershipCh)

	authorize := func(ctx context.Context, roles []string) error {
		var (
			blacklistedCerts map[string]*api.BlacklistedCertificate
			clusters         []*api.Cluster
			err              error
		)

		m.raftNode.MemoryStore().View(func(readTx store.ReadTx) {
			clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
		})

		// Not having a cluster object yet means we can't check
		// the blacklist.
		if err == nil && len(clusters) == 1 {
			blacklistedCerts = clusters[0].BlacklistedCertificates
		}

		// Authorize the remote roles, ensure they can only be forwarded by managers
		_, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts)
		return err
	}

	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.config.PluginGetter, drivers.New(m.config.PluginGetter))
	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
	healthServer := health.NewHealthServer()
	localHealthServer := health.NewHealthServer()

	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
	authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
	authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
	authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
	authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize)
	authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize)
	authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize)

	proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)

	// The following local proxies are only wired up to receive requests
	// from a trusted local socket, and these requests don't use TLS,
	// therefore the requests they handle locally should bypass
	// authorization. When requests are proxied from these servers, they
	// are sent as requests from this manager rather than forwarded
	// requests (it has no TLS information to put in the metadata map).
	forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
	handleRequestLocally := func(ctx context.Context) (context.Context, error) {
		remoteAddr := "127.0.0.1:0"

		m.addrMu.Lock()
		if m.config.RemoteAPI != nil {
			if m.config.RemoteAPI.AdvertiseAddr != "" {
				remoteAddr = m.config.RemoteAPI.AdvertiseAddr
			} else {
				remoteAddr = m.config.RemoteAPI.ListenAddr
			}
		}
		m.addrMu.Unlock()

		creds := m.config.SecurityConfig.ClientTLSCreds

		nodeInfo := ca.RemoteNodeInfo{
			Roles:        []string{creds.Role()},
			Organization: creds.Organization(),
			NodeID:       creds.NodeID(),
			RemoteAddr:   remoteAddr,
		}

		return context.WithValue(ctx, ca.LocalRequestKey, nodeInfo), nil
	}
	localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyDispatcherAPI := api.NewRaftProxyDispatcherServer(m.dispatcher, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyNodeCAAPI := api.NewRaftProxyNodeCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(baseResourceAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)

	// Everything registered on m.server should be an authenticated
	// wrapper, or a proxy wrapping an authenticated wrapper!
	api.RegisterCAServer(m.server, proxyCAAPI)
	api.RegisterNodeCAServer(m.server, proxyNodeCAAPI)
	api.RegisterRaftServer(m.server, authenticatedRaftAPI)
	api.RegisterHealthServer(m.server, authenticatedHealthAPI)
	api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
	api.RegisterControlServer(m.server, authenticatedControlAPI)
	api.RegisterWatchServer(m.server, authenticatedWatchAPI)
	api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
	api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
	api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
	api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
	grpc_prometheus.Register(m.server)

	api.RegisterControlServer(m.localserver, localProxyControlAPI)
	api.RegisterWatchServer(m.localserver, m.watchServer)
	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
	api.RegisterHealthServer(m.localserver, localHealthServer)
	api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
	api.RegisterCAServer(m.localserver, localProxyCAAPI)
	api.RegisterNodeCAServer(m.localserver, localProxyNodeCAAPI)
	api.RegisterResourceAllocatorServer(m.localserver, localProxyResourceAPI)
	api.RegisterLogBrokerServer(m.localserver, localProxyLogBrokerAPI)
	grpc_prometheus.Register(m.localserver)

	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

	if err := m.watchServer.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("watch server failed to start")
	}

	go m.serveListener(ctx, m.remoteListener)
	go m.serveListener(ctx, m.controlListener)

	defer func() {
		m.server.Stop()
		m.localserver.Stop()
	}()

	// Set the raft server as serving for the health server
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

	if err := m.raftNode.JoinAndStart(ctx); err != nil {
		// Don't block future calls to Stop.
		close(m.started)
		return errors.Wrap(err, "can't initialize raft node")
	}

	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

	// Start metrics collection.
	m.collector = metrics.NewCollector(m.raftNode.MemoryStore())
	go func(collector *metrics.Collector) {
		if err := collector.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("collector failed with an error")
		}
	}(m.collector)

	close(m.started)

	go func() {
		err := m.raftNode.Run(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("raft node stopped")
			m.Stop(ctx, false)
		}
	}()

	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
		return err
	}

	c, err := raft.WaitForCluster(ctx, m.raftNode)
	if err != nil {
		return err
	}
	raftConfig := c.Spec.Raft

	if err := m.watchForClusterChanges(ctx); err != nil {
		return err
	}

	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
		log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick)
	}
	if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick {
		log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick)
	}

	// wait for an error in serving.
	err = <-m.errServe
	m.mu.Lock()
	if m.stopped {
		m.mu.Unlock()
		return nil
	}
	m.mu.Unlock()
	m.Stop(ctx, false)

	return err
}

const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the manager's subsystems. If clearData is
// set, the raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
	log.G(ctx).Info("Stopping manager")
	// It's not safe to start shutting down while the manager is still
	// starting up.
	<-m.started

	// the mutex stops us from trying to stop while we're already stopping, or
	// from returning before we've finished stopping.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true

	srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
	go func() {
		m.server.GracefulStop()
		close(srvDone)
	}()
	go func() {
		m.localserver.GracefulStop()
		close(localSrvDone)
	}()

	m.raftNode.Cancel()

	if m.collector != nil {
		m.collector.Stop()
	}

	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.watchServer.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
	}
	if m.replicatedOrchestrator != nil {
		m.replicatedOrchestrator.Stop()
	}
	if m.globalOrchestrator != nil {
		m.globalOrchestrator.Stop()
	}
	if m.taskReaper != nil {
		m.taskReaper.Stop()
	}
	if m.constraintEnforcer != nil {
		m.constraintEnforcer.Stop()
	}
	if m.scheduler != nil {
		m.scheduler.Stop()
	}
	if m.roleManager != nil {
		m.roleManager.Stop()
	}
	if m.keyManager != nil {
		m.keyManager.Stop()
	}

	if clearData {
		m.raftNode.ClearData()
	}
	m.cancelFunc()
	<-m.raftNode.Done()

	timer := time.AfterFunc(stopTimeout, func() {
		m.server.Stop()
		m.localserver.Stop()
	})
	defer timer.Stop()
	// TODO: we're not waiting on ctx because it very well could be passed from Run,
	// which is already cancelled here. We need to refactor that.
	select {
	case <-srvDone:
		<-localSrvDone
	case <-localSrvDone:
		<-srvDone
	}

	log.G(ctx).Info("Manager shut down")
	// mutex is released and Run can return now
}

func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
	securityConfig := m.config.SecurityConfig
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"node.id":   nodeID,
		"node.role": ca.ManagerRole,
	})

	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
	for _, encryptionKey := range cluster.UnlockKeys {
		if encryptionKey.Subsystem == ca.ManagerRole {
			kekData.KEK = encryptionKey.Key
			break
		}
	}

	updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData)
	if err != nil {
		logger.WithError(err).Errorf("failed to re-encrypt TLS key with a new KEK")
		return err
	}
	if updated {
		logger.Debug("successfully rotated KEK")
	}
	if unlockedToLocked {
		// a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews;
		// don't wait because it might take a bit
		go func() {
			insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
			conn, err := grpc.Dial(
				m.config.ControlAPI,
				grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
				grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
				grpc.WithTransportCredentials(insecureCreds),
				grpc.WithDialer(
					func(addr string, timeout time.Duration) (net.Conn, error) {
						return xnet.DialTimeoutLocal(addr, timeout)
					}),
				grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
			)
			if err != nil {
				logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
				return
			}

			defer conn.Close()

			connBroker := connectionbroker.New(remotes.NewRemotes())
			connBroker.SetLocalConn(conn)
			if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker, m.config.RootCAPaths); err != nil {
				logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
			}
		}()
	}
	return nil
}

func (m *Manager) watchForClusterChanges(ctx context.Context) error {
	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
	var cluster *api.Cluster
	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
		func(tx store.ReadTx) error {
			cluster = store.GetCluster(tx, clusterID)
			if cluster == nil {
				return fmt.Errorf("unable to get current cluster")
			}
			return nil
		},
		api.EventUpdateCluster{
			Cluster: &api.Cluster{ID: clusterID},
			Checks:  []api.ClusterCheckFunc{api.ClusterCheckID},
		},
	)
	if err != nil {
		return err
	}
	if err := m.updateKEK(ctx, cluster); err != nil {
		return err
	}
	go func() {
		for {
			select {
			case event := <-clusterWatch:
				clusterEvent := event.(api.EventUpdateCluster)
				m.updateKEK(ctx, clusterEvent.Cluster)
			case <-ctx.Done():
				clusterWatchCancel()
				return
			}
		}
	}()
	return nil
}

// getLeaderNodeID is a small helper function returning a string with the
// leader's node ID. it is only used for logging, and should not be relied on
// to give a node ID for actual operational purposes (because it returns errors
// as nicely decorated strings)
func (m *Manager) getLeaderNodeID() string {
	// get the current leader ID. this variable tracks the leader *only* for
	// the purposes of logging leadership changes, and should not be relied on
	// for other purposes
	leader, leaderErr := m.raftNode.Leader()
	switch leaderErr {
	case raft.ErrNoRaftMember:
		// this is an unlikely case, but we have to handle it. this means this
		// node is not a member of the raft quorum. this won't look very pretty
		// in logs ("leadership changed from aslkdjfa to ErrNoRaftMember") but
		// it also won't be very common
		return "not yet part of a raft cluster"
	case raft.ErrNoClusterLeader:
		return "no cluster leader"
	default:
		id, err := m.raftNode.GetNodeIDByRaftID(leader)
		// the only possible error here is "ErrMemberUnknown"
		if err != nil {
			return "an unknown node"
		}
		return id
	}
}

// handleLeadershipEvents handles the is leader event or is follower event.
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
	// get the current leader and save it for logging leadership changes in
	// this loop
	oldLeader := m.getLeaderNodeID()
	for {
		select {
		case leadershipEvent := <-leadershipCh:
			m.mu.Lock()
			if m.stopped {
				m.mu.Unlock()
				return
			}
			newState := leadershipEvent.(raft.LeadershipState)

			if newState == raft.IsLeader {
				m.becomeLeader(ctx)
				leaderMetric.Set(1)
			} else if newState == raft.IsFollower {
				m.becomeFollower()
				leaderMetric.Set(0)
			}
			m.mu.Unlock()

			newLeader := m.getLeaderNodeID()
			// maybe we should use logrus fields for old and new leader, so
			// that users are better able to ingest leadership changes into log
			// aggregators?
			log.G(ctx).Infof("leadership changed from %v to %v", oldLeader, newLeader)
		case <-ctx.Done():
			return
		}
	}
}

// serveListener serves a listener for local and non local connections.
func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) {
	var l net.Listener
	select {
	case l = <-lCh:
	case <-ctx.Done():
		return
	}
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
		logrus.Fields{
			"proto": l.Addr().Network(),
			"addr":  l.Addr().String(),
		}))
	if _, ok := l.(*net.TCPListener); !ok {
		log.G(ctx).Info("Listening for local connections")
		// we need to disallow double closes because UnixListener.Close
		// can delete the unix-socket file of a newer listener. grpc calls
		// Close twice: in Serve and in Stop.
		m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
	} else {
		log.G(ctx).Info("Listening for connections")
		m.errServe <- m.server.Serve(l)
	}
}

// becomeLeader starts the subsystems that are run on the leader.
func (m *Manager) becomeLeader(ctx context.Context) {
	s := m.raftNode.MemoryStore()

	rootCA := m.config.SecurityConfig.RootCA()
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()

	raftCfg := raft.DefaultRaftConfig()
	raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick)
	raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick)

	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()

	initialCAConfig := ca.DefaultCAConfig()
	initialCAConfig.ExternalCAs = m.config.ExternalCAs

	var (
		unlockKeys []*api.EncryptionKey
		err        error
	)
	if m.config.AutoLockManagers {
		unlockKeys = []*api.EncryptionKey{{
			Subsystem: ca.ManagerRole,
			Key:       m.config.UnlockKey,
		}}
	}

	s.Update(func(tx store.Tx) error {
		// Add a default cluster object to the
		// store. Don't check the error because
		// we expect this to fail unless this
		// is a brand new cluster.
		clusterObj := defaultClusterObject(
			clusterID,
			initialCAConfig,
			raftCfg,
			api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers},
			unlockKeys,
			rootCA,
			m.config.FIPS,
			nil,
			0,
			0)

		// If DefaultAddrPool is valid, update the cluster object with the new value.
		// If VXLANUDPPort is not 0, update the cluster object with the new value.
		if m.config.NetworkConfig != nil {
			if m.config.NetworkConfig.DefaultAddrPool != nil {
				clusterObj.DefaultAddressPool = m.config.NetworkConfig.DefaultAddrPool
				clusterObj.SubnetSize = m.config.NetworkConfig.SubnetSize
			}

			if m.config.NetworkConfig.VXLANUDPPort != 0 {
				clusterObj.VXLANUDPPort = m.config.NetworkConfig.VXLANUDPPort
			}
		}
		err := store.CreateCluster(tx, clusterObj)

		if err != nil && err != store.ErrExist {
			log.G(ctx).WithError(err).Errorf("error creating cluster object")
		}

		// Add Node entry for ourself, if one
		// doesn't exist already.
		freshCluster := nil == store.CreateNode(tx, managerNode(nodeID, m.config.Availability, clusterObj.VXLANUDPPort))

		if freshCluster {
			// This is a fresh swarm cluster. Add to store now any initial
			// cluster resource, like the default ingress network which
			// provides the routing mesh for this cluster.
			log.G(ctx).Info("Creating default ingress network")
			if err := store.CreateNetwork(tx, newIngressNetwork()); err != nil {
				log.G(ctx).WithError(err).Error("failed to create default ingress network")
			}
		}
		// Create the static predefined networks (the bridge/host node-local
		// networks known to be present on each cluster node) if the store
		// does not already contain them. This is needed in order to allow
		// running services on the predefined docker networks like `bridge`
		// and `host`.
		for _, p := range allocator.PredefinedNetworks() {
			if err := store.CreateNetwork(tx, newPredefinedNetwork(p.Name, p.Driver)); err != nil && err != store.ErrNameConflict {
				log.G(ctx).WithError(err).Error("failed to create predefined network " + p.Name)
			}
		}
		return nil
	})

	m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s)
	m.constraintEnforcer = constraintenforcer.New(s)
	m.globalOrchestrator = global.NewGlobalOrchestrator(s)
	m.taskReaper = taskreaper.New(s)
	m.scheduler = scheduler.New(s)
	m.keyManager = keymanager.New(s, keymanager.DefaultConfig())
	m.roleManager = newRoleManager(s, m.raftNode)

	// TODO(stevvooe): Allocate a context that can be used to
	// shutdown underlying manager processes when leadership is
	// lost.

	// If DefaultAddrPool is nil, read the cluster object from the store and
	// check whether DefaultAddrPool info is stored there.
	// If VXLANUDPPort is 0, read it from the store - cluster object.
	if m.config.NetworkConfig == nil || m.config.NetworkConfig.DefaultAddrPool == nil || m.config.NetworkConfig.VXLANUDPPort == 0 {
		var cluster *api.Cluster
		s.View(func(tx store.ReadTx) {
			cluster = store.GetCluster(tx, clusterID)
		})
		if cluster.DefaultAddressPool != nil {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.DefaultAddrPool = append(m.config.NetworkConfig.DefaultAddrPool, cluster.DefaultAddressPool...)
			m.config.NetworkConfig.SubnetSize = cluster.SubnetSize
		}
		if cluster.VXLANUDPPort != 0 {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.VXLANUDPPort = cluster.VXLANUDPPort
		}
	}

	m.allocator, err = allocator.New(s, m.config.PluginGetter, m.config.NetworkConfig)
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to create allocator")
		// TODO(stevvooe): It doesn't seem correct here to fail
		// creating the allocator but then use it anyway.
	}

	if m.keyManager != nil {
		go func(keyManager *keymanager.KeyManager) {
			if err := keyManager.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("keymanager failed with an error")
			}
		}(m.keyManager)
	}

	go func(d *dispatcher.Dispatcher) {
		// Initialize the dispatcher.
		d.Init(m.raftNode, dispatcher.DefaultConfig(), drivers.New(m.config.PluginGetter), m.config.SecurityConfig)
		if err := d.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Dispatcher exited with an error")
		}
	}(m.dispatcher)

	if err := m.logbroker.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("LogBroker failed to start")
	}

	go func(server *ca.Server) {
		if err := server.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("CA signer exited with an error")
		}
	}(m.caserver)

	// Start all sub-components in separate goroutines.
	// TODO(aluzzardi): This should have some kind of error handling so that
	// any component that goes down would bring the entire manager down.
	if m.allocator != nil {
		go func(allocator *allocator.Allocator) {
			if err := allocator.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("allocator exited with an error")
			}
		}(m.allocator)
	}

	go func(scheduler *scheduler.Scheduler) {
		if err := scheduler.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("scheduler exited with an error")
		}
	}(m.scheduler)

	go func(constraintEnforcer *constraintenforcer.ConstraintEnforcer) {
		constraintEnforcer.Run()
	}(m.constraintEnforcer)

	go func(taskReaper *taskreaper.TaskReaper) {
		taskReaper.Run(ctx)
	}(m.taskReaper)

	go func(orchestrator *replicated.Orchestrator) {
		if err := orchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error")
		}
	}(m.replicatedOrchestrator)

	go func(globalOrchestrator *global.Orchestrator) {
		if err := globalOrchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("global orchestrator exited with an error")
		}
	}(m.globalOrchestrator)

	go func(roleManager *roleManager) {
		roleManager.Run(ctx)
	}(m.roleManager)
}

// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
		m.allocator = nil
	}

	m.constraintEnforcer.Stop()
	m.constraintEnforcer = nil

	m.replicatedOrchestrator.Stop()
	m.replicatedOrchestrator = nil

	m.globalOrchestrator.Stop()
	m.globalOrchestrator = nil

	m.taskReaper.Stop()
	m.taskReaper = nil

	m.scheduler.Stop()
	m.scheduler = nil

	m.roleManager.Stop()
	m.roleManager = nil

	if m.keyManager != nil {
		m.keyManager.Stop()
		m.keyManager = nil
	}
}

// defaultClusterObject creates a default cluster.
func defaultClusterObject(
	clusterID string,
	initialCAConfig api.CAConfig,
	raftCfg api.RaftConfig,
	encryptionConfig api.EncryptionConfig,
	initialUnlockKeys []*api.EncryptionKey,
	rootCA *ca.RootCA,
	fips bool,
	defaultAddressPool []string,
	subnetSize uint32,
	vxlanUDPPort uint32) *api.Cluster {
	var caKey []byte
	if rcaSigner, err := rootCA.Signer(); err == nil {
		caKey = rcaSigner.Key
	}

	return &api.Cluster{
		ID: clusterID,
		Spec: api.ClusterSpec{
			Annotations: api.Annotations{
				Name: store.DefaultClusterName,
			},
			Orchestration: api.OrchestrationConfig{
				TaskHistoryRetentionLimit: defaultTaskHistoryRetentionLimit,
			},
			Dispatcher: api.DispatcherConfig{
				HeartbeatPeriod: gogotypes.DurationProto(dispatcher.DefaultHeartBeatPeriod),
			},
			Raft:             raftCfg,
			CAConfig:         initialCAConfig,
			EncryptionConfig: encryptionConfig,
		},
		RootCA: api.RootCA{
			CAKey:      caKey,
			CACert:     rootCA.Certs,
			CACertHash: rootCA.Digest.String(),
			JoinTokens: api.JoinTokens{
				Worker:  ca.GenerateJoinToken(rootCA, fips),
				Manager: ca.GenerateJoinToken(rootCA, fips),
			},
		},
		UnlockKeys:         initialUnlockKeys,
		FIPS:               fips,
		DefaultAddressPool: defaultAddressPool,
		SubnetSize:         subnetSize,
		VXLANUDPPort:       vxlanUDPPort,
	}
}

// managerNode creates a new node with NodeRoleManager role.
func managerNode(nodeID string, availability api.NodeSpec_Availability, vxlanPort uint32) *api.Node {
	return &api.Node{
		ID: nodeID,
		Certificate: api.Certificate{
			CN:   nodeID,
			Role: api.NodeRoleManager,
			Status: api.IssuanceStatus{
				State: api.IssuanceStateIssued,
			},
		},
		Spec: api.NodeSpec{
			DesiredRole:  api.NodeRoleManager,
			Membership:   api.NodeMembershipAccepted,
			Availability: availability,
		},
		VXLANUDPPort: vxlanPort,
	}
}

// newIngressNetwork returns the network object for the default ingress
// network, the network which provides the routing mesh. The caller saves this
// object to the store once, at fresh cluster creation, and is expected to
// call this function inside a store update transaction.
func newIngressNetwork() *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Ingress: true,
			Annotations: api.Annotations{
				Name: "ingress",
			},
			DriverConfig: &api.Driver{},
			IPAM: &api.IPAMOptions{
				Driver:  &api.Driver{},
				Configs: []*api.IPAMConfig{},
			},
		},
	}
}

// Creates a network object representing one of the predefined networks
// known to be statically created on the cluster nodes. These objects
// are populated in the store at cluster creation solely in order to
// support running services on the nodes' predefined networks.
// External clients can filter these predefined networks by looking
// at the predefined label.
func newPredefinedNetwork(name, driver string) *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Annotations: api.Annotations{
				Name: name,
				Labels: map[string]string{
					networkallocator.PredefinedLabel: "true",
				},
			},
			DriverConfig: &api.Driver{Name: driver},
		},
	}
}