package manager

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/cloudflare/cfssl/helpers"
	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	gmetrics "github.com/docker/go-metrics"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/identity"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/allocator"
	"github.com/docker/swarmkit/manager/allocator/networkallocator"
	"github.com/docker/swarmkit/manager/controlapi"
	"github.com/docker/swarmkit/manager/dispatcher"
	"github.com/docker/swarmkit/manager/drivers"
	"github.com/docker/swarmkit/manager/health"
	"github.com/docker/swarmkit/manager/keymanager"
	"github.com/docker/swarmkit/manager/logbroker"
	"github.com/docker/swarmkit/manager/metrics"
	"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
	"github.com/docker/swarmkit/manager/orchestrator/global"
	"github.com/docker/swarmkit/manager/orchestrator/replicated"
	"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
	"github.com/docker/swarmkit/manager/resourceapi"
	"github.com/docker/swarmkit/manager/scheduler"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/raft/transport"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/manager/watchapi"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	gogotypes "github.com/gogo/protobuf/types"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)
const (
	// defaultTaskHistoryRetentionLimit is the number of tasks to keep.
	defaultTaskHistoryRetentionLimit = 5
)

// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// Address to bind
	ListenAddr string

	// Address to advertise to remote nodes (optional).
	AdvertiseAddr string
}

// Config is used to tune the Manager.
type Config struct {
	SecurityConfig *ca.SecurityConfig

	// RootCAPaths is the path to which new root certs should be saved.
	RootCAPaths ca.CertPaths

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address.
	RemoteAPI *RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// ForceJoin causes us to invoke raft's Join RPC even if already part
	// of a cluster.
	ForceJoin bool

	// StateDir is the top-level state directory.
	StateDir string

	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the number of ticks needed without
	// a leader to trigger a new election.
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between each
	// heartbeat sent to other members for health-check purposes.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state. This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK). It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node.
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter
}
// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
	config Config

	collector              *metrics.Collector
	caserver               *ca.Server
	dispatcher             *dispatcher.Dispatcher
	logbroker              *logbroker.LogBroker
	watchServer            *watchapi.Server
	replicatedOrchestrator *replicated.Orchestrator
	globalOrchestrator     *global.Orchestrator
	taskReaper             *taskreaper.TaskReaper
	constraintEnforcer     *constraintenforcer.ConstraintEnforcer
	scheduler              *scheduler.Scheduler
	allocator              *allocator.Allocator
	keyManager             *keymanager.KeyManager
	server                 *grpc.Server
	localserver            *grpc.Server
	raftNode               *raft.Node
	dekRotator             *RaftDEKManager
	roleManager            *roleManager

	cancelFunc context.CancelFunc

	// mu is a general mutex used to coordinate starting/stopping and
	// leadership events.
	mu sync.Mutex
	// addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI
	addrMu sync.Mutex

	started chan struct{}
	stopped bool

	remoteListener  chan net.Listener
	controlListener chan net.Listener
	errServe        chan error
}

var (
	leaderMetric gmetrics.Gauge
)

func init() {
	ns := gmetrics.NewNamespace("swarm", "manager", nil)
	leaderMetric = ns.NewGauge("leader", "Indicates if this manager node is a leader", "")
	gmetrics.Register(ns)
}

type closeOnceListener struct {
	once sync.Once
	net.Listener
}

func (l *closeOnceListener) Close() error {
	var err error
	l.once.Do(func() {
		err = l.Listener.Close()
	})
	return err
}
// New creates a Manager which has not started to accept requests yet.
func New(config *Config) (*Manager, error) {
	err := os.MkdirAll(config.StateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create state directory")
	}

	raftStateDir := filepath.Join(config.StateDir, "raft")
	err = os.MkdirAll(raftStateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create raft state directory")
	}

	raftCfg := raft.DefaultNodeConfig()

	if config.ElectionTick > 0 {
		raftCfg.ElectionTick = int(config.ElectionTick)
	}
	if config.HeartbeatTick > 0 {
		raftCfg.HeartbeatTick = int(config.HeartbeatTick)
	}

	dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter())
	if err != nil {
		return nil, err
	}

	newNodeOpts := raft.NodeOptions{
		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
		JoinAddr:        config.JoinRaft,
		ForceJoin:       config.ForceJoin,
		Config:          raftCfg,
		StateDir:        raftStateDir,
		ForceNewCluster: config.ForceNewCluster,
		TLSCredentials:  config.SecurityConfig.ClientTLSCreds,
		KeyRotator:      dekRotator,
	}
	raftNode := raft.NewNode(newNodeOpts)

	opts := []grpc.ServerOption{
		grpc.Creds(config.SecurityConfig.ServerTLSCreds),
		grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
		grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
		grpc.MaxMsgSize(transport.GRPCMaxMsgSize),
	}

	m := &Manager{
		config:          *config,
		caserver:        ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
		dispatcher:      dispatcher.New(raftNode, dispatcher.DefaultConfig(), drivers.New(config.PluginGetter), config.SecurityConfig),
		logbroker:       logbroker.New(raftNode.MemoryStore()),
		watchServer:     watchapi.NewServer(raftNode.MemoryStore()),
		server:          grpc.NewServer(opts...),
		localserver:     grpc.NewServer(opts...),
		raftNode:        raftNode,
		started:         make(chan struct{}),
		dekRotator:      dekRotator,
		remoteListener:  make(chan net.Listener, 1),
		controlListener: make(chan net.Listener, 1),
		errServe:        make(chan error, 2),
	}

	if config.ControlAPI != "" {
		m.config.ControlAPI = ""
		if err := m.BindControl(config.ControlAPI); err != nil {
			return nil, err
		}
	}

	if config.RemoteAPI != nil {
		m.config.RemoteAPI = nil
		// The context isn't used in this case (before (*Manager).Run).
		if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil {
			if config.ControlAPI != "" {
				l := <-m.controlListener
				l.Close()
			}
			return nil, err
		}
	}

	return m, nil
}
// BindControl binds a local socket for the control API.
func (m *Manager) BindControl(addr string) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.ControlAPI != "" {
		return errors.New("manager already has a control API address")
	}

	// don't create a socket directory if we're on windows. we use a named pipe there.
	if runtime.GOOS != "windows" {
		err := os.MkdirAll(filepath.Dir(addr), 0700)
		if err != nil {
			return errors.Wrap(err, "failed to create socket directory")
		}
	}

	l, err := xnet.ListenLocal(addr)

	// A unix socket may fail to bind if the file already
	// exists. Try replacing the file.
	if runtime.GOOS != "windows" {
		unwrappedErr := err
		if op, ok := unwrappedErr.(*net.OpError); ok {
			unwrappedErr = op.Err
		}
		if sys, ok := unwrappedErr.(*os.SyscallError); ok {
			unwrappedErr = sys.Err
		}
		if unwrappedErr == syscall.EADDRINUSE {
			os.Remove(addr)
			l, err = xnet.ListenLocal(addr)
		}
	}
	if err != nil {
		return errors.Wrap(err, "failed to listen on control API address")
	}

	m.config.ControlAPI = addr
	m.controlListener <- l
	return nil
}
// BindRemote binds a port for the remote API.
func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI != nil {
		return errors.New("manager already has remote API address")
	}

	// If an AdvertiseAddr was specified, we use that as our
	// externally-reachable address.
	advertiseAddr := addrs.AdvertiseAddr

	var advertiseAddrPort string
	if advertiseAddr == "" {
		// Otherwise, we know we are joining an existing swarm. Use a
		// wildcard address to trigger remote autodetection of our
		// address.
		var err error
		_, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr)
		if err != nil {
			return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr)
		}

		// Even with an IPv6 listening address, it's okay to use
		// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
		// be substituted with the actual source address.
		advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
	}

	l, err := net.Listen("tcp", addrs.ListenAddr)
	if err != nil {
		return errors.Wrap(err, "failed to listen on remote API address")
	}
	if advertiseAddrPort == "0" {
		advertiseAddr = l.Addr().String()
		addrs.ListenAddr = advertiseAddr
	}

	m.config.RemoteAPI = &addrs

	m.raftNode.SetAddr(ctx, advertiseAddr)
	m.remoteListener <- l

	return nil
}

// RemovedFromRaft returns a channel that's closed if the manager is removed
// from the raft cluster. This should be used to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
	return m.raftNode.RemovedFromRaft
}

// Addr returns the TCP address on which the remote API listens.
func (m *Manager) Addr() string {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI == nil {
		return ""
	}
	return m.config.RemoteAPI.ListenAddr
}
// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
	ctx, ctxCancel := context.WithCancel(parent)
	defer ctxCancel()

	m.cancelFunc = ctxCancel

	leadershipCh, cancel := m.raftNode.SubscribeLeadership()
	defer cancel()

	go m.handleLeadershipEvents(ctx, leadershipCh)

	authorize := func(ctx context.Context, roles []string) error {
		var (
			blacklistedCerts map[string]*api.BlacklistedCertificate
			clusters         []*api.Cluster
			err              error
		)

		m.raftNode.MemoryStore().View(func(readTx store.ReadTx) {
			clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
		})

		// Not having a cluster object yet means we can't check
		// the blacklist.
		if err == nil && len(clusters) == 1 {
			blacklistedCerts = clusters[0].BlacklistedCertificates
		}

		// Authorize the remote roles, ensure they can only be forwarded by managers
		_, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts)
		return err
	}

	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.config.PluginGetter, drivers.New(m.config.PluginGetter))
	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
	healthServer := health.NewHealthServer()
	localHealthServer := health.NewHealthServer()

	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
	authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
	authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
	authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
	authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize)
	authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize)
	authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize)

	proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)

	// The following local proxies are only wired up to receive requests
	// from a trusted local socket, and these requests don't use TLS,
	// therefore the requests they handle locally should bypass
	// authorization. When requests are proxied from these servers, they
	// are sent as requests from this manager rather than forwarded
	// requests (it has no TLS information to put in the metadata map).
	forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
	handleRequestLocally := func(ctx context.Context) (context.Context, error) {
		remoteAddr := "127.0.0.1:0"

		m.addrMu.Lock()
		if m.config.RemoteAPI != nil {
			if m.config.RemoteAPI.AdvertiseAddr != "" {
				remoteAddr = m.config.RemoteAPI.AdvertiseAddr
			} else {
				remoteAddr = m.config.RemoteAPI.ListenAddr
			}
		}
		m.addrMu.Unlock()

		creds := m.config.SecurityConfig.ClientTLSCreds

		nodeInfo := ca.RemoteNodeInfo{
			Roles:        []string{creds.Role()},
			Organization: creds.Organization(),
			NodeID:       creds.NodeID(),
			RemoteAddr:   remoteAddr,
		}

		return context.WithValue(ctx, ca.LocalRequestKey, nodeInfo), nil
	}
	localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyDispatcherAPI := api.NewRaftProxyDispatcherServer(m.dispatcher, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyNodeCAAPI := api.NewRaftProxyNodeCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(baseResourceAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)

	// Everything registered on m.server should be an authenticated
	// wrapper, or a proxy wrapping an authenticated wrapper!
	api.RegisterCAServer(m.server, proxyCAAPI)
	api.RegisterNodeCAServer(m.server, proxyNodeCAAPI)
	api.RegisterRaftServer(m.server, authenticatedRaftAPI)
	api.RegisterHealthServer(m.server, authenticatedHealthAPI)
	api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
	api.RegisterControlServer(m.server, authenticatedControlAPI)
	api.RegisterWatchServer(m.server, authenticatedWatchAPI)
	api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
	api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
	api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
	api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
	grpc_prometheus.Register(m.server)

	api.RegisterControlServer(m.localserver, localProxyControlAPI)
	api.RegisterWatchServer(m.localserver, m.watchServer)
	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
	api.RegisterHealthServer(m.localserver, localHealthServer)
	api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
	api.RegisterCAServer(m.localserver, localProxyCAAPI)
	api.RegisterNodeCAServer(m.localserver, localProxyNodeCAAPI)
	api.RegisterResourceAllocatorServer(m.localserver, localProxyResourceAPI)
	api.RegisterLogBrokerServer(m.localserver, localProxyLogBrokerAPI)
	grpc_prometheus.Register(m.localserver)

	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

	if err := m.watchServer.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("watch server failed to start")
	}

	go m.serveListener(ctx, m.remoteListener)
	go m.serveListener(ctx, m.controlListener)

	defer func() {
		m.server.Stop()
		m.localserver.Stop()
	}()

	// Set the raft server as serving for the health server
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

	if err := m.raftNode.JoinAndStart(ctx); err != nil {
		// Don't block future calls to Stop.
		close(m.started)
		return errors.Wrap(err, "can't initialize raft node")
	}

	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

	// Start metrics collection.
	m.collector = metrics.NewCollector(m.raftNode.MemoryStore())
	go func(collector *metrics.Collector) {
		if err := collector.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("collector failed with an error")
		}
	}(m.collector)

	close(m.started)

	go func() {
		err := m.raftNode.Run(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("raft node stopped")
			m.Stop(ctx, false)
		}
	}()

	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
		return err
	}

	c, err := raft.WaitForCluster(ctx, m.raftNode)
	if err != nil {
		return err
	}
	raftConfig := c.Spec.Raft

	if err := m.watchForClusterChanges(ctx); err != nil {
		return err
	}

	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
		log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick)
	}
	if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick {
		log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick)
	}

	// wait for an error in serving.
	err = <-m.errServe
	m.mu.Lock()
	if m.stopped {
		m.mu.Unlock()
		return nil
	}
	m.mu.Unlock()
	m.Stop(ctx, false)

	return err
}
const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the manager's subsystems. If clearData is
// set, the raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
	log.G(ctx).Info("Stopping manager")
	// It's not safe to start shutting down while the manager is still
	// starting up.
	<-m.started

	// the mutex stops us from trying to stop while we're already stopping, or
	// from returning before we've finished stopping.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true

	srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
	go func() {
		m.server.GracefulStop()
		close(srvDone)
	}()
	go func() {
		m.localserver.GracefulStop()
		close(localSrvDone)
	}()

	m.raftNode.Cancel()

	if m.collector != nil {
		m.collector.Stop()
	}

	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.watchServer.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
	}
	if m.replicatedOrchestrator != nil {
		m.replicatedOrchestrator.Stop()
	}
	if m.globalOrchestrator != nil {
		m.globalOrchestrator.Stop()
	}
	if m.taskReaper != nil {
		m.taskReaper.Stop()
	}
	if m.constraintEnforcer != nil {
		m.constraintEnforcer.Stop()
	}
	if m.scheduler != nil {
		m.scheduler.Stop()
	}
	if m.roleManager != nil {
		m.roleManager.Stop()
	}
	if m.keyManager != nil {
		m.keyManager.Stop()
	}

	if clearData {
		m.raftNode.ClearData()
	}
	m.cancelFunc()
	<-m.raftNode.Done()

	timer := time.AfterFunc(stopTimeout, func() {
		m.server.Stop()
		m.localserver.Stop()
	})
	defer timer.Stop()
	// TODO: we're not waiting on ctx because it very well could be passed from Run,
	// which is already cancelled here. We need to refactor that.
	select {
	case <-srvDone:
		<-localSrvDone
	case <-localSrvDone:
		<-srvDone
	}

	log.G(ctx).Info("Manager shut down")
	// mutex is released and Run can return now
}
func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
	securityConfig := m.config.SecurityConfig
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"node.id":   nodeID,
		"node.role": ca.ManagerRole,
	})

	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
	for _, encryptionKey := range cluster.UnlockKeys {
		if encryptionKey.Subsystem == ca.ManagerRole {
			kekData.KEK = encryptionKey.Key
			break
		}
	}

	updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData)
	if err != nil {
		logger.WithError(err).Errorf("failed to re-encrypt TLS key with a new KEK")
		return err
	}
	if updated {
		logger.Debug("successfully rotated KEK")
	}
	if unlockedToLocked {
		// a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews;
		// don't wait because it might take a bit
		go func() {
			insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})

			conn, err := grpc.Dial(
				m.config.ControlAPI,
				grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
				grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
				grpc.WithTransportCredentials(insecureCreds),
				grpc.WithDialer(
					func(addr string, timeout time.Duration) (net.Conn, error) {
						return xnet.DialTimeoutLocal(addr, timeout)
					}),
			)
			if err != nil {
				logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
				return
			}

			defer conn.Close()

			connBroker := connectionbroker.New(remotes.NewRemotes())
			connBroker.SetLocalConn(conn)
			if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker, m.config.RootCAPaths); err != nil {
				logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
			}
		}()
	}
	return nil
}
func (m *Manager) watchForClusterChanges(ctx context.Context) error {
	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
	var cluster *api.Cluster
	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
		func(tx store.ReadTx) error {
			cluster = store.GetCluster(tx, clusterID)
			if cluster == nil {
				return fmt.Errorf("unable to get current cluster")
			}
			return nil
		},
		api.EventUpdateCluster{
			Cluster: &api.Cluster{ID: clusterID},
			Checks:  []api.ClusterCheckFunc{api.ClusterCheckID},
		},
	)
	if err != nil {
		return err
	}
	if err := m.updateKEK(ctx, cluster); err != nil {
		return err
	}

	go func() {
		for {
			select {
			case event := <-clusterWatch:
				clusterEvent := event.(api.EventUpdateCluster)
				m.updateKEK(ctx, clusterEvent.Cluster)
			case <-ctx.Done():
				clusterWatchCancel()
				return
			}
		}
	}()

	return nil
}
// rotateRootCAKEK will attempt to rotate the key-encryption-key for root CA key-material in raft.
// If there is no passphrase set in ENV, it returns.
// If there is plain-text root key-material, and a passphrase set, it encrypts it.
// If there is encrypted root key-material and it is using the current passphrase, it returns.
// If there is encrypted root key-material, and it is using the previous passphrase, it
// re-encrypts it with the current passphrase.
func (m *Manager) rotateRootCAKEK(ctx context.Context, clusterID string) error {
	// If we don't have a KEK, we won't ever be rotating anything
	strPassphrase := os.Getenv(ca.PassphraseENVVar)
	strPassphrasePrev := os.Getenv(ca.PassphraseENVVarPrev)
	if strPassphrase == "" && strPassphrasePrev == "" {
		return nil
	}
	if strPassphrase != "" {
		log.G(ctx).Warn("Encrypting the root CA key in swarm using environment variables is deprecated. " +
			"Support for decrypting or rotating the key will be removed in the future.")
	}

	passphrase := []byte(strPassphrase)
	passphrasePrev := []byte(strPassphrasePrev)

	s := m.raftNode.MemoryStore()
	var (
		cluster  *api.Cluster
		err      error
		finalKey []byte
	)
	// Retrieve the cluster identified by ClusterID
	return s.Update(func(tx store.Tx) error {
		cluster = store.GetCluster(tx, clusterID)
		if cluster == nil {
			return fmt.Errorf("cluster not found: %s", clusterID)
		}

		// Try to get the private key from the cluster
		privKeyPEM := cluster.RootCA.CAKey
		if len(privKeyPEM) == 0 {
			// We have no PEM root private key in this cluster.
			log.G(ctx).Warnf("cluster %s does not have private key material", clusterID)
			return nil
		}

		// Decode the PEM private key
		keyBlock, _ := pem.Decode(privKeyPEM)
		if keyBlock == nil {
			return fmt.Errorf("invalid PEM-encoded private key inside of cluster %s", clusterID)
		}

		if x509.IsEncryptedPEMBlock(keyBlock) {
			// PEM encryption does not have a digest, so sometimes decryption doesn't
			// error even with the wrong passphrase. So actually try to parse it into a valid key.
			_, err := helpers.ParsePrivateKeyPEMWithPassword(privKeyPEM, []byte(passphrase))
			if err == nil {
				// This key is already correctly encrypted with the correct KEK, nothing to do here
				return nil
			}

			// This key is already encrypted, but failed with current main passphrase.
			// Let's try to decrypt with the previous passphrase, and parse into a valid key, for the
			// same reason as above.
			_, err = helpers.ParsePrivateKeyPEMWithPassword(privKeyPEM, []byte(passphrasePrev))
			if err != nil {
				// We were not able to decrypt either with the main or backup passphrase, error
				return err
			}
			// ok the above passphrase is correct, so decrypt the PEM block so we can re-encrypt -
			// since the key was successfully decrypted above, there will be no error doing PEM
			// decryption
			unencryptedDER, _ := x509.DecryptPEMBlock(keyBlock, []byte(passphrasePrev))
			unencryptedKeyBlock := &pem.Block{
				Type:  keyBlock.Type,
				Bytes: unencryptedDER,
			}

			// we were able to decrypt the key with the previous passphrase - if the current passphrase is empty,
			// then we store the decrypted key in raft
			finalKey = pem.EncodeToMemory(unencryptedKeyBlock)

			// if the current passphrase is not empty, encrypt with the new one and store that in raft instead
			if strPassphrase != "" {
				finalKey, err = ca.EncryptECPrivateKey(finalKey, strPassphrase)
				if err != nil {
					log.G(ctx).WithError(err).Debugf("failed to rotate the key-encrypting-key for the root key material of cluster %s", clusterID)
					return err
				}
			}
		} else if strPassphrase != "" {
			// If this key is not encrypted, and the passphrase is not nil, then we have to encrypt it
			finalKey, err = ca.EncryptECPrivateKey(privKeyPEM, strPassphrase)
			if err != nil {
				log.G(ctx).WithError(err).Debugf("failed to rotate the key-encrypting-key for the root key material of cluster %s", clusterID)
				return err
			}
		} else {
			return nil // don't update if it's not encrypted and we don't want it encrypted
		}

		log.G(ctx).Infof("Updating the encryption on the root key material of cluster %s", clusterID)
		cluster.RootCA.CAKey = finalKey
		return store.UpdateCluster(tx, cluster)
	})
}
// handleLeadershipEvents handles the "is leader" and "is follower" leadership events.
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
	for {
		select {
		case leadershipEvent := <-leadershipCh:
			m.mu.Lock()
			if m.stopped {
				m.mu.Unlock()
				return
			}
			newState := leadershipEvent.(raft.LeadershipState)

			if newState == raft.IsLeader {
				m.becomeLeader(ctx)
				leaderMetric.Set(1)
			} else if newState == raft.IsFollower {
				m.becomeFollower()
				leaderMetric.Set(0)
			}
			m.mu.Unlock()
		case <-ctx.Done():
			return
		}
	}
}

// serveListener serves a listener for local and non-local connections.
func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) {
	var l net.Listener
	select {
	case l = <-lCh:
	case <-ctx.Done():
		return
	}
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
		logrus.Fields{
			"proto": l.Addr().Network(),
			"addr":  l.Addr().String(),
		}))
	if _, ok := l.(*net.TCPListener); !ok {
		log.G(ctx).Info("Listening for local connections")
		// we need to disallow double closes because UnixListener.Close
		// can delete the unix-socket file of a newer listener. grpc calls
		// Close twice indeed: in Serve and in Stop.
		m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
	} else {
		log.G(ctx).Info("Listening for connections")
		m.errServe <- m.server.Serve(l)
	}
}
// becomeLeader starts the subsystems that are run on the leader.
func (m *Manager) becomeLeader(ctx context.Context) {
	s := m.raftNode.MemoryStore()

	rootCA := m.config.SecurityConfig.RootCA()
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()

	raftCfg := raft.DefaultRaftConfig()
	raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick)
	raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick)

	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()

	initialCAConfig := ca.DefaultCAConfig()
	initialCAConfig.ExternalCAs = m.config.ExternalCAs

	var unlockKeys []*api.EncryptionKey
	if m.config.AutoLockManagers {
		unlockKeys = []*api.EncryptionKey{{
			Subsystem: ca.ManagerRole,
			Key:       m.config.UnlockKey,
		}}
	}

	s.Update(func(tx store.Tx) error {
		// Add a default cluster object to the
		// store. Don't check the error because
		// we expect this to fail unless this
		// is a brand new cluster.
		err := store.CreateCluster(tx, defaultClusterObject(
			clusterID,
			initialCAConfig,
			raftCfg,
			api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers},
			unlockKeys,
			rootCA))
		if err != nil && err != store.ErrExist {
			log.G(ctx).WithError(err).Errorf("error creating cluster object")
		}

		// Add Node entry for ourself, if one
		// doesn't exist already.
		freshCluster := nil == store.CreateNode(tx, managerNode(nodeID, m.config.Availability))

		if freshCluster {
			// This is a fresh swarm cluster. Add to store now any initial
			// cluster resource, like the default ingress network which
			// provides the routing mesh for this cluster.
			log.G(ctx).Info("Creating default ingress network")
			if err := store.CreateNetwork(tx, newIngressNetwork()); err != nil {
				log.G(ctx).WithError(err).Error("failed to create default ingress network")
			}
		}
		// Create the static predefined networks (the bridge/host node-local
		// networks), if the store does not already contain them. These are
		// known to be present on each cluster node; adding them here is
		// needed in order to allow running services on the predefined docker
		// networks like `bridge` and `host`.
		for _, p := range allocator.PredefinedNetworks() {
			if store.GetNetwork(tx, p.Name) == nil {
				if err := store.CreateNetwork(tx, newPredefinedNetwork(p.Name, p.Driver)); err != nil {
					log.G(ctx).WithError(err).Error("failed to create predefined network " + p.Name)
				}
			}
		}
		return nil
	})

	// Attempt to rotate the key-encrypting-key of the root CA key-material
	err := m.rotateRootCAKEK(ctx, clusterID)
	if err != nil {
		log.G(ctx).WithError(err).Error("root key-encrypting-key rotation failed")
	}

	m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s)
	m.constraintEnforcer = constraintenforcer.New(s)
	m.globalOrchestrator = global.NewGlobalOrchestrator(s)
	m.taskReaper = taskreaper.New(s)
	m.scheduler = scheduler.New(s)
	m.keyManager = keymanager.New(s, keymanager.DefaultConfig())
	m.roleManager = newRoleManager(s, m.raftNode)

	// TODO(stevvooe): Allocate a context that can be used to
	// shutdown underlying manager processes when leadership is
	// lost.

	m.allocator, err = allocator.New(s, m.config.PluginGetter)
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to create allocator")
		// TODO(stevvooe): It doesn't seem correct here to fail
		// creating the allocator but then use it anyway.
	}

	if m.keyManager != nil {
		go func(keyManager *keymanager.KeyManager) {
			if err := keyManager.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("keymanager failed with an error")
			}
		}(m.keyManager)
	}

	go func(d *dispatcher.Dispatcher) {
		if err := d.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Dispatcher exited with an error")
		}
	}(m.dispatcher)

	if err := m.logbroker.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("LogBroker failed to start")
	}

	go func(server *ca.Server) {
		if err := server.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("CA signer exited with an error")
		}
	}(m.caserver)

	// Start all sub-components in separate goroutines.
	// TODO(aluzzardi): This should have some kind of error handling so that
	// any component that goes down would bring the entire manager down.
	if m.allocator != nil {
		go func(allocator *allocator.Allocator) {
			if err := allocator.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("allocator exited with an error")
			}
		}(m.allocator)
	}

	go func(scheduler *scheduler.Scheduler) {
		if err := scheduler.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("scheduler exited with an error")
		}
	}(m.scheduler)

	go func(constraintEnforcer *constraintenforcer.ConstraintEnforcer) {
		constraintEnforcer.Run()
	}(m.constraintEnforcer)

	go func(taskReaper *taskreaper.TaskReaper) {
		taskReaper.Run(ctx)
	}(m.taskReaper)

	go func(orchestrator *replicated.Orchestrator) {
		if err := orchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error")
		}
	}(m.replicatedOrchestrator)

	go func(globalOrchestrator *global.Orchestrator) {
		if err := globalOrchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("global orchestrator exited with an error")
		}
	}(m.globalOrchestrator)

	go func(roleManager *roleManager) {
		roleManager.Run(ctx)
	}(m.roleManager)
}
// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
		m.allocator = nil
	}

	m.constraintEnforcer.Stop()
	m.constraintEnforcer = nil

	m.replicatedOrchestrator.Stop()
	m.replicatedOrchestrator = nil

	m.globalOrchestrator.Stop()
	m.globalOrchestrator = nil

	m.taskReaper.Stop()
	m.taskReaper = nil

	m.scheduler.Stop()
	m.scheduler = nil

	m.roleManager.Stop()
	m.roleManager = nil

	if m.keyManager != nil {
		m.keyManager.Stop()
		m.keyManager = nil
	}
}
// defaultClusterObject creates a default cluster.
func defaultClusterObject(
	clusterID string,
	initialCAConfig api.CAConfig,
	raftCfg api.RaftConfig,
	encryptionConfig api.EncryptionConfig,
	initialUnlockKeys []*api.EncryptionKey,
	rootCA *ca.RootCA) *api.Cluster {
	var caKey []byte
	if rcaSigner, err := rootCA.Signer(); err == nil {
		caKey = rcaSigner.Key
	}

	return &api.Cluster{
		ID: clusterID,
		Spec: api.ClusterSpec{
			Annotations: api.Annotations{
				Name: store.DefaultClusterName,
			},
			Orchestration: api.OrchestrationConfig{
				TaskHistoryRetentionLimit: defaultTaskHistoryRetentionLimit,
			},
			Dispatcher: api.DispatcherConfig{
				HeartbeatPeriod: gogotypes.DurationProto(dispatcher.DefaultHeartBeatPeriod),
			},
			Raft:             raftCfg,
			CAConfig:         initialCAConfig,
			EncryptionConfig: encryptionConfig,
		},
		RootCA: api.RootCA{
			CAKey:      caKey,
			CACert:     rootCA.Certs,
			CACertHash: rootCA.Digest.String(),
			JoinTokens: api.JoinTokens{
				Worker:  ca.GenerateJoinToken(rootCA),
				Manager: ca.GenerateJoinToken(rootCA),
			},
		},
		UnlockKeys: initialUnlockKeys,
	}
}
// managerNode creates a new node with NodeRoleManager role.
func managerNode(nodeID string, availability api.NodeSpec_Availability) *api.Node {
	return &api.Node{
		ID: nodeID,
		Certificate: api.Certificate{
			CN:   nodeID,
			Role: api.NodeRoleManager,
			Status: api.IssuanceStatus{
				State: api.IssuanceStateIssued,
			},
		},
		Spec: api.NodeSpec{
			DesiredRole:  api.NodeRoleManager,
			Membership:   api.NodeMembershipAccepted,
			Availability: availability,
		},
	}
}
// newIngressNetwork returns the network object for the default ingress
// network, the network which provides the routing mesh. The caller is
// expected to save this object to the store once, at fresh cluster creation,
// and to call this function inside a store update transaction.
func newIngressNetwork() *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Ingress: true,
			Annotations: api.Annotations{
				Name: "ingress",
			},
			DriverConfig: &api.Driver{},
			IPAM: &api.IPAMOptions{
				Driver: &api.Driver{},
				Configs: []*api.IPAMConfig{
					{
						Subnet: "10.255.0.0/16",
					},
				},
			},
		},
	}
}
// newPredefinedNetwork creates a network object representing one of the
// predefined networks known to be statically created on the cluster nodes.
// These objects are populated in the store at cluster creation solely in
// order to support running services on the nodes' predefined networks.
// External clients can filter these predefined networks by looking at the
// predefined label.
func newPredefinedNetwork(name, driver string) *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Annotations: api.Annotations{
				Name: name,
				Labels: map[string]string{
					networkallocator.PredefinedLabel: "true",
				},
			},
			DriverConfig: &api.Driver{Name: driver},
		},
	}
}