manager.go

package manager

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/cloudflare/cfssl/helpers"
	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/identity"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/allocator"
	"github.com/docker/swarmkit/manager/allocator/networkallocator"
	"github.com/docker/swarmkit/manager/controlapi"
	"github.com/docker/swarmkit/manager/dispatcher"
	"github.com/docker/swarmkit/manager/health"
	"github.com/docker/swarmkit/manager/keymanager"
	"github.com/docker/swarmkit/manager/logbroker"
	"github.com/docker/swarmkit/manager/metrics"
	"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
	"github.com/docker/swarmkit/manager/orchestrator/global"
	"github.com/docker/swarmkit/manager/orchestrator/replicated"
	"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
	"github.com/docker/swarmkit/manager/resourceapi"
	"github.com/docker/swarmkit/manager/scheduler"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/manager/watchapi"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	gogotypes "github.com/gogo/protobuf/types"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	// defaultTaskHistoryRetentionLimit is the number of tasks to keep.
	defaultTaskHistoryRetentionLimit = 5
)
// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// Address to bind
	ListenAddr string

	// Address to advertise to remote nodes (optional).
	AdvertiseAddr string
}

// Config is used to tune the Manager.
type Config struct {
	SecurityConfig *ca.SecurityConfig

	// RootCAPaths is the path to which new root certs should be saved.
	RootCAPaths ca.CertPaths

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address.
	RemoteAPI *RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// StateDir is the top-level state directory.
	StateDir string

	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the number of ticks without a leader
	// needed to trigger a new election.
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between each
	// heartbeat sent to other members for health-check purposes.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state. This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK). It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node.
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter
}
// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
	config Config

	collector              *metrics.Collector
	caserver               *ca.Server
	dispatcher             *dispatcher.Dispatcher
	logbroker              *logbroker.LogBroker
	replicatedOrchestrator *replicated.Orchestrator
	globalOrchestrator     *global.Orchestrator
	taskReaper             *taskreaper.TaskReaper
	constraintEnforcer     *constraintenforcer.ConstraintEnforcer
	scheduler              *scheduler.Scheduler
	allocator              *allocator.Allocator
	keyManager             *keymanager.KeyManager
	server                 *grpc.Server
	localserver            *grpc.Server
	raftNode               *raft.Node
	dekRotator             *RaftDEKManager
	roleManager            *roleManager

	cancelFunc context.CancelFunc

	// mu is a general mutex used to coordinate starting/stopping and
	// leadership events.
	mu sync.Mutex
	// addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI
	addrMu sync.Mutex

	started chan struct{}
	stopped bool

	remoteListener  chan net.Listener
	controlListener chan net.Listener
	errServe        chan error
}
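// closeOnceListener wraps a net.Listener so that Close is safe to call more
// than once; only the first call closes the underlying listener.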
type closeOnceListener struct {
	once sync.Once
	net.Listener
}

func (l *closeOnceListener) Close() error {
	var err error
	l.once.Do(func() {
		err = l.Listener.Close()
	})
	return err
}
// New creates a Manager which has not started to accept requests yet.
func New(config *Config) (*Manager, error) {
	err := os.MkdirAll(config.StateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create state directory")
	}

	raftStateDir := filepath.Join(config.StateDir, "raft")
	err = os.MkdirAll(raftStateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create raft state directory")
	}

	raftCfg := raft.DefaultNodeConfig()

	if config.ElectionTick > 0 {
		raftCfg.ElectionTick = int(config.ElectionTick)
	}
	if config.HeartbeatTick > 0 {
		raftCfg.HeartbeatTick = int(config.HeartbeatTick)
	}

	dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter())
	if err != nil {
		return nil, err
	}

	newNodeOpts := raft.NodeOptions{
		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
		JoinAddr:        config.JoinRaft,
		Config:          raftCfg,
		StateDir:        raftStateDir,
		ForceNewCluster: config.ForceNewCluster,
		TLSCredentials:  config.SecurityConfig.ClientTLSCreds,
		KeyRotator:      dekRotator,
	}
	raftNode := raft.NewNode(newNodeOpts)

	opts := []grpc.ServerOption{
		grpc.Creds(config.SecurityConfig.ServerTLSCreds),
		grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
		grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
	}

	m := &Manager{
		config:          *config,
		collector:       metrics.NewCollector(raftNode.MemoryStore()),
		caserver:        ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths),
		dispatcher:      dispatcher.New(raftNode, dispatcher.DefaultConfig()),
		logbroker:       logbroker.New(raftNode.MemoryStore()),
		server:          grpc.NewServer(opts...),
		localserver:     grpc.NewServer(opts...),
		raftNode:        raftNode,
		started:         make(chan struct{}),
		dekRotator:      dekRotator,
		remoteListener:  make(chan net.Listener, 1),
		controlListener: make(chan net.Listener, 1),
		errServe:        make(chan error, 2),
	}

	if config.ControlAPI != "" {
		m.config.ControlAPI = ""
		if err := m.BindControl(config.ControlAPI); err != nil {
			return nil, err
		}
	}

	if config.RemoteAPI != nil {
		m.config.RemoteAPI = nil
		// The context isn't used in this case (before (*Manager).Run).
		if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil {
			if config.ControlAPI != "" {
				l := <-m.controlListener
				l.Close()
			}
			return nil, err
		}
	}

	return m, nil
}
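// A minimal usage sketch (not part of the original file): a caller typically
// builds a Config around an existing *ca.SecurityConfig, constructs the
// Manager with New, runs it in a goroutine, and stops it on shutdown. The
// securityConfig and stateDir values are assumed to come from the caller's own
// node bootstrap code; the socket path and listen address are illustrative.
//
//	m, err := manager.New(&manager.Config{
//		SecurityConfig: securityConfig,
//		StateDir:       stateDir,
//		ControlAPI:     "/var/run/swarm-manager.sock",
//		RemoteAPI:      &manager.RemoteAddrs{ListenAddr: "0.0.0.0:2377"},
//	})
//	if err != nil {
//		return err
//	}
//	go func() {
//		if err := m.Run(context.Background()); err != nil {
//			// handle the terminal error from the manager
//		}
//	}()
//	defer m.Stop(context.Background(), false)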
// BindControl binds a local socket for the control API.
func (m *Manager) BindControl(addr string) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.ControlAPI != "" {
		return errors.New("manager already has a control API address")
	}

	// Don't create a socket directory if we're on Windows; we use a named pipe there.
	if runtime.GOOS != "windows" {
		err := os.MkdirAll(filepath.Dir(addr), 0700)
		if err != nil {
			return errors.Wrap(err, "failed to create socket directory")
		}
	}

	l, err := xnet.ListenLocal(addr)

	// A unix socket may fail to bind if the file already
	// exists. Try replacing the file.
	if runtime.GOOS != "windows" {
		unwrappedErr := err
		if op, ok := unwrappedErr.(*net.OpError); ok {
			unwrappedErr = op.Err
		}
		if sys, ok := unwrappedErr.(*os.SyscallError); ok {
			unwrappedErr = sys.Err
		}
		if unwrappedErr == syscall.EADDRINUSE {
			os.Remove(addr)
			l, err = xnet.ListenLocal(addr)
		}
	}
	if err != nil {
		return errors.Wrap(err, "failed to listen on control API address")
	}

	m.config.ControlAPI = addr
	m.controlListener <- l
	return nil
}
// BindRemote binds a port for the remote API.
func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI != nil {
		return errors.New("manager already has remote API address")
	}

	// If an AdvertiseAddr was specified, we use that as our
	// externally-reachable address.
	advertiseAddr := addrs.AdvertiseAddr

	var advertiseAddrPort string
	if advertiseAddr == "" {
		// Otherwise, we know we are joining an existing swarm. Use a
		// wildcard address to trigger remote autodetection of our
		// address.
		var err error
		_, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr)
		if err != nil {
			return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr)
		}

		// Even with an IPv6 listening address, it's okay to use
		// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
		// be substituted with the actual source address.
		advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
	}

	l, err := net.Listen("tcp", addrs.ListenAddr)
	if err != nil {
		return errors.Wrap(err, "failed to listen on remote API address")
	}
	if advertiseAddrPort == "0" {
		advertiseAddr = l.Addr().String()
		addrs.ListenAddr = advertiseAddr
	}

	m.config.RemoteAPI = &addrs

	m.raftNode.SetAddr(ctx, advertiseAddr)
	m.remoteListener <- l

	return nil
}
// RemovedFromRaft returns a channel that's closed if the manager is removed
// from the raft cluster. This should be used to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
	return m.raftNode.RemovedFromRaft
}

// Addr returns the TCP address on which the remote API listens.
func (m *Manager) Addr() string {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI == nil {
		return ""
	}
	return m.config.RemoteAPI.ListenAddr
}
// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
	ctx, ctxCancel := context.WithCancel(parent)
	defer ctxCancel()

	m.cancelFunc = ctxCancel

	leadershipCh, cancel := m.raftNode.SubscribeLeadership()
	defer cancel()

	go m.handleLeadershipEvents(ctx, leadershipCh)

	authorize := func(ctx context.Context, roles []string) error {
		var (
			blacklistedCerts map[string]*api.BlacklistedCertificate
			clusters         []*api.Cluster
			err              error
		)

		m.raftNode.MemoryStore().View(func(readTx store.ReadTx) {
			clusters, err = store.FindClusters(readTx, store.ByName("default"))
		})

		// Not having a cluster object yet means we can't check
		// the blacklist.
		if err == nil && len(clusters) == 1 {
			blacklistedCerts = clusters[0].BlacklistedCertificates
		}

		// Authorize the remote roles, ensure they can only be forwarded by managers
		_, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts)
		return err
	}

	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.caserver, m.config.PluginGetter)
	baseWatchAPI := watchapi.NewServer(m.raftNode.MemoryStore())
	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
	healthServer := health.NewHealthServer()
	localHealthServer := health.NewHealthServer()

	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(baseWatchAPI, authorize)
	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
	authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
	authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
	authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
	authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize)
	authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize)
	authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize)

	proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)

	// The following local proxies are only wired up to receive requests
	// from a trusted local socket, and these requests don't use TLS,
	// therefore the requests they handle locally should bypass
	// authorization. When requests are proxied from these servers, they
	// are sent as requests from this manager rather than forwarded
	// requests (it has no TLS information to put in the metadata map).
	forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
	handleRequestLocally := func(ctx context.Context) (context.Context, error) {
		remoteAddr := "127.0.0.1:0"

		m.addrMu.Lock()
		if m.config.RemoteAPI != nil {
			if m.config.RemoteAPI.AdvertiseAddr != "" {
				remoteAddr = m.config.RemoteAPI.AdvertiseAddr
			} else {
				remoteAddr = m.config.RemoteAPI.ListenAddr
			}
		}
		m.addrMu.Unlock()

		creds := m.config.SecurityConfig.ClientTLSCreds

		nodeInfo := ca.RemoteNodeInfo{
			Roles:        []string{creds.Role()},
			Organization: creds.Organization(),
			NodeID:       creds.NodeID(),
			RemoteAddr:   remoteAddr,
		}

		return context.WithValue(ctx, ca.LocalRequestKey, nodeInfo), nil
	}
	localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyDispatcherAPI := api.NewRaftProxyDispatcherServer(m.dispatcher, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyNodeCAAPI := api.NewRaftProxyNodeCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(baseResourceAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)

	// Everything registered on m.server should be an authenticated
	// wrapper, or a proxy wrapping an authenticated wrapper!
	api.RegisterCAServer(m.server, proxyCAAPI)
	api.RegisterNodeCAServer(m.server, proxyNodeCAAPI)
	api.RegisterRaftServer(m.server, authenticatedRaftAPI)
	api.RegisterHealthServer(m.server, authenticatedHealthAPI)
	api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
	api.RegisterControlServer(m.server, authenticatedControlAPI)
	api.RegisterWatchServer(m.server, authenticatedWatchAPI)
	api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
	api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
	api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
	api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
	grpc_prometheus.Register(m.server)

	api.RegisterControlServer(m.localserver, localProxyControlAPI)
	api.RegisterWatchServer(m.localserver, baseWatchAPI)
	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
	api.RegisterHealthServer(m.localserver, localHealthServer)
	api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
	api.RegisterCAServer(m.localserver, localProxyCAAPI)
	api.RegisterNodeCAServer(m.localserver, localProxyNodeCAAPI)
	api.RegisterResourceAllocatorServer(m.localserver, localProxyResourceAPI)
	api.RegisterLogBrokerServer(m.localserver, localProxyLogBrokerAPI)
	grpc_prometheus.Register(m.localserver)

	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

	go m.serveListener(ctx, m.remoteListener)
	go m.serveListener(ctx, m.controlListener)

	defer func() {
		m.server.Stop()
		m.localserver.Stop()
	}()

	// Set the raft server as serving for the health server
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

	if err := m.raftNode.JoinAndStart(ctx); err != nil {
		return errors.Wrap(err, "can't initialize raft node")
	}

	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

	// Start metrics collection.
	go func(collector *metrics.Collector) {
		if err := collector.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("collector failed with an error")
		}
	}(m.collector)

	close(m.started)

	go func() {
		err := m.raftNode.Run(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("raft node stopped")
			m.Stop(ctx, false)
		}
	}()

	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
		return err
	}

	c, err := raft.WaitForCluster(ctx, m.raftNode)
	if err != nil {
		return err
	}
	raftConfig := c.Spec.Raft

	if err := m.watchForClusterChanges(ctx); err != nil {
		return err
	}

	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
		log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick)
	}
	if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick {
		log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick)
	}

	// Wait for an error in serving.
	err = <-m.errServe
	m.mu.Lock()
	if m.stopped {
		m.mu.Unlock()
		return nil
	}
	m.mu.Unlock()
	m.Stop(ctx, false)

	return err
}
const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the scheduler. If clearData is set, the
// raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
	log.G(ctx).Info("Stopping manager")
	// It's not safe to start shutting down while the manager is still
	// starting up.
	<-m.started

	// the mutex stops us from trying to stop while we're already stopping, or
	// from returning before we've finished stopping.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true

	srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
	go func() {
		m.server.GracefulStop()
		close(srvDone)
	}()
	go func() {
		m.localserver.GracefulStop()
		close(localSrvDone)
	}()

	m.raftNode.Cancel()

	m.collector.Stop()

	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
	}
	if m.replicatedOrchestrator != nil {
		m.replicatedOrchestrator.Stop()
	}
	if m.globalOrchestrator != nil {
		m.globalOrchestrator.Stop()
	}
	if m.taskReaper != nil {
		m.taskReaper.Stop()
	}
	if m.constraintEnforcer != nil {
		m.constraintEnforcer.Stop()
	}
	if m.scheduler != nil {
		m.scheduler.Stop()
	}
	if m.roleManager != nil {
		m.roleManager.Stop()
	}
	if m.keyManager != nil {
		m.keyManager.Stop()
	}

	if clearData {
		m.raftNode.ClearData()
	}
	m.cancelFunc()
	<-m.raftNode.Done()

	timer := time.AfterFunc(stopTimeout, func() {
		m.server.Stop()
		m.localserver.Stop()
	})
	defer timer.Stop()
	// TODO: we're not waiting on ctx because it very well could be passed from Run,
	// which is already cancelled here. We need to refactor that.
	select {
	case <-srvDone:
		<-localSrvDone
	case <-localSrvDone:
		<-srvDone
	}

	log.G(ctx).Info("Manager shut down")
	// mutex is released and Run can return now
}
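// updateKEK re-encrypts the manager's TLS key and raft DEK when the cluster's
// manager unlock key changes, and, when the cluster transitions from unlocked
// to locked, kicks off a best-effort TLS certificate renewal over the local
// control socket.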
func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
	securityConfig := m.config.SecurityConfig
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"node.id":   nodeID,
		"node.role": ca.ManagerRole,
	})

	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
	for _, encryptionKey := range cluster.UnlockKeys {
		if encryptionKey.Subsystem == ca.ManagerRole {
			kekData.KEK = encryptionKey.Key
			break
		}
	}

	updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData)
	if err != nil {
		logger.WithError(err).Errorf("failed to re-encrypt TLS key with a new KEK")
		return err
	}
	if updated {
		logger.Debug("successfully rotated KEK")
	}
	if unlockedToLocked {
		// A best-effort attempt to update the TLS certificate - if it fails, it'll be
		// updated the next time it renews; don't wait because it might take a bit.
		go func() {
			insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})

			conn, err := grpc.Dial(
				m.config.ControlAPI,
				grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
				grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
				grpc.WithTransportCredentials(insecureCreds),
				grpc.WithDialer(
					func(addr string, timeout time.Duration) (net.Conn, error) {
						return xnet.DialTimeoutLocal(addr, timeout)
					}),
			)
			if err != nil {
				logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
				return
			}

			defer conn.Close()

			connBroker := connectionbroker.New(remotes.NewRemotes())
			connBroker.SetLocalConn(conn)
			if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker); err != nil {
				logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
			}
		}()
	}
	return nil
}
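// watchForClusterChanges subscribes to updates of this manager's cluster
// object and, on every change, refreshes the CA server's root CA material and
// re-runs updateKEK so the unlock key stays in sync.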
func (m *Manager) watchForClusterChanges(ctx context.Context) error {
	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
		func(tx store.ReadTx) error {
			cluster := store.GetCluster(tx, clusterID)
			if cluster == nil {
				return fmt.Errorf("unable to get current cluster")
			}
			if err := m.caserver.UpdateRootCA(ctx, cluster); err != nil {
				log.G(ctx).WithError(err).Error("could not update security config")
			}
			return m.updateKEK(ctx, cluster)
		},
		api.EventUpdateCluster{
			Cluster: &api.Cluster{ID: clusterID},
			Checks:  []api.ClusterCheckFunc{api.ClusterCheckID},
		},
	)
	if err != nil {
		return err
	}
	go func() {
		for {
			select {
			case event := <-clusterWatch:
				clusterEvent := event.(api.EventUpdateCluster)
				if err := m.caserver.UpdateRootCA(ctx, clusterEvent.Cluster); err != nil {
					log.G(ctx).WithError(err).Error("could not update security config")
				}
				m.updateKEK(ctx, clusterEvent.Cluster)
			case <-ctx.Done():
				clusterWatchCancel()
				return
			}
		}
	}()
	return nil
}
// rotateRootCAKEK will attempt to rotate the key-encryption-key for root CA key-material in raft.
// If there is no passphrase set in ENV, it returns.
// If there is plain-text root key-material, and a passphrase set, it encrypts it.
// If there is encrypted root key-material and it is using the current passphrase, it returns.
// If there is encrypted root key-material, and it is using the previous passphrase, it
// re-encrypts it with the current passphrase.
func (m *Manager) rotateRootCAKEK(ctx context.Context, clusterID string) error {
	// If we don't have a KEK, we won't ever be rotating anything
	strPassphrase := os.Getenv(ca.PassphraseENVVar)
	strPassphrasePrev := os.Getenv(ca.PassphraseENVVarPrev)
	if strPassphrase == "" && strPassphrasePrev == "" {
		return nil
	}
	if strPassphrase != "" {
		log.G(ctx).Warn("Encrypting the root CA key in swarm using environment variables is deprecated. " +
			"Support for decrypting or rotating the key will be removed in the future.")
	}

	passphrase := []byte(strPassphrase)
	passphrasePrev := []byte(strPassphrasePrev)

	s := m.raftNode.MemoryStore()
	var (
		cluster  *api.Cluster
		err      error
		finalKey []byte
	)
	// Retrieve the cluster identified by ClusterID
	return s.Update(func(tx store.Tx) error {
		cluster = store.GetCluster(tx, clusterID)
		if cluster == nil {
			return fmt.Errorf("cluster not found: %s", clusterID)
		}

		// Try to get the private key from the cluster
		privKeyPEM := cluster.RootCA.CAKey
		if len(privKeyPEM) == 0 {
			// We have no PEM root private key in this cluster.
			log.G(ctx).Warnf("cluster %s does not have private key material", clusterID)
			return nil
		}

		// Decode the PEM private key
		keyBlock, _ := pem.Decode(privKeyPEM)
		if keyBlock == nil {
			return fmt.Errorf("invalid PEM-encoded private key inside of cluster %s", clusterID)
		}

		if x509.IsEncryptedPEMBlock(keyBlock) {
			// PEM encryption does not have a digest, so sometimes decryption doesn't
			// error even with the wrong passphrase. So actually try to parse it into a valid key.
			_, err := helpers.ParsePrivateKeyPEMWithPassword(privKeyPEM, []byte(passphrase))
			if err == nil {
				// This key is already correctly encrypted with the correct KEK, nothing to do here
				return nil
			}

			// This key is already encrypted, but failed with the current main passphrase.
			// Let's try to decrypt with the previous passphrase, and parse into a valid key, for the
			// same reason as above.
			_, err = helpers.ParsePrivateKeyPEMWithPassword(privKeyPEM, []byte(passphrasePrev))
			if err != nil {
				// We were not able to decrypt either with the main or backup passphrase, error
				return err
			}
			// OK, the above passphrase is correct, so decrypt the PEM block so we can re-encrypt -
			// since the key was successfully decrypted above, there will be no error doing PEM
			// decryption
			unencryptedDER, _ := x509.DecryptPEMBlock(keyBlock, []byte(passphrasePrev))
			unencryptedKeyBlock := &pem.Block{
				Type:  keyBlock.Type,
				Bytes: unencryptedDER,
			}

			// We were able to decrypt the key with the previous passphrase - if the current
			// passphrase is empty, then we store the decrypted key in raft.
			finalKey = pem.EncodeToMemory(unencryptedKeyBlock)

			// If the current passphrase is not empty, encrypt with the new one and store that in raft instead.
			if strPassphrase != "" {
				finalKey, err = ca.EncryptECPrivateKey(finalKey, strPassphrase)
				if err != nil {
					log.G(ctx).WithError(err).Debugf("failed to rotate the key-encrypting-key for the root key material of cluster %s", clusterID)
					return err
				}
			}
		} else if strPassphrase != "" {
			// If this key is not encrypted, and the passphrase is not nil, then we have to encrypt it
			finalKey, err = ca.EncryptECPrivateKey(privKeyPEM, strPassphrase)
			if err != nil {
				log.G(ctx).WithError(err).Debugf("failed to rotate the key-encrypting-key for the root key material of cluster %s", clusterID)
				return err
			}
		} else {
			return nil // don't update if it's not encrypted and we don't want it encrypted
		}

		log.G(ctx).Infof("Updating the encryption on the root key material of cluster %s", clusterID)
		cluster.RootCA.CAKey = finalKey
		return store.UpdateCluster(tx, cluster)
	})
}
// handleLeadershipEvents handles leadership events from raft, starting the
// leader-only subsystems when this node becomes the leader and stopping them
// when it becomes a follower.
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
	for {
		select {
		case leadershipEvent := <-leadershipCh:
			m.mu.Lock()
			if m.stopped {
				m.mu.Unlock()
				return
			}
			newState := leadershipEvent.(raft.LeadershipState)

			if newState == raft.IsLeader {
				m.becomeLeader(ctx)
			} else if newState == raft.IsFollower {
				m.becomeFollower()
			}
			m.mu.Unlock()
		case <-ctx.Done():
			return
		}
	}
}
// serveListener serves a listener for local and non-local connections.
func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) {
	var l net.Listener
	select {
	case l = <-lCh:
	case <-ctx.Done():
		return
	}
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
		logrus.Fields{
			"proto": l.Addr().Network(),
			"addr":  l.Addr().String(),
		}))
	if _, ok := l.(*net.TCPListener); !ok {
		log.G(ctx).Info("Listening for local connections")
		// We need to disallow double closes because UnixListener.Close
		// can delete the unix-socket file of a newer listener. grpc does
		// call Close twice: in Serve and in Stop.
		m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
	} else {
		log.G(ctx).Info("Listening for connections")
		m.errServe <- m.server.Serve(l)
	}
}
// becomeLeader starts the subsystems that are run on the leader.
func (m *Manager) becomeLeader(ctx context.Context) {
	s := m.raftNode.MemoryStore()

	rootCA := m.config.SecurityConfig.RootCA()
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()

	raftCfg := raft.DefaultRaftConfig()
	raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick)
	raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick)

	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()

	initialCAConfig := ca.DefaultCAConfig()
	initialCAConfig.ExternalCAs = m.config.ExternalCAs

	var unlockKeys []*api.EncryptionKey
	if m.config.AutoLockManagers {
		unlockKeys = []*api.EncryptionKey{{
			Subsystem: ca.ManagerRole,
			Key:       m.config.UnlockKey,
		}}
	}

	s.Update(func(tx store.Tx) error {
		// Add a default cluster object to the
		// store. Don't check the error because
		// we expect this to fail unless this
		// is a brand new cluster.
		store.CreateCluster(tx, defaultClusterObject(
			clusterID,
			initialCAConfig,
			raftCfg,
			api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers},
			unlockKeys,
			rootCA))
		// Add Node entry for ourself, if one
		// doesn't exist already.
		freshCluster := nil == store.CreateNode(tx, managerNode(nodeID, m.config.Availability))

		if freshCluster {
			// This is a fresh swarm cluster. Add to store now any initial
			// cluster resource, like the default ingress network which
			// provides the routing mesh for this cluster.
			log.G(ctx).Info("Creating default ingress network")
			if err := store.CreateNetwork(tx, newIngressNetwork()); err != nil {
				log.G(ctx).WithError(err).Error("failed to create default ingress network")
			}
			// Create now the static predefined node-local networks which
			// are known to be present in each cluster node. This is needed
			// in order to allow running services on the predefined docker
			// networks like `bridge` and `host`.
			log.G(ctx).Info("Creating node-local predefined networks")
			for _, p := range networkallocator.PredefinedNetworks() {
				if err := store.CreateNetwork(tx, newPredefinedNetwork(p.Name, p.Driver)); err != nil {
					log.G(ctx).WithError(err).Error("failed to create predefined network " + p.Name)
				}
			}
		}
		return nil
	})

	// Attempt to rotate the key-encrypting-key of the root CA key-material
	err := m.rotateRootCAKEK(ctx, clusterID)
	if err != nil {
		log.G(ctx).WithError(err).Error("root key-encrypting-key rotation failed")
	}

	m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s)
	m.constraintEnforcer = constraintenforcer.New(s)
	m.globalOrchestrator = global.NewGlobalOrchestrator(s)
	m.taskReaper = taskreaper.New(s)
	m.scheduler = scheduler.New(s)
	m.keyManager = keymanager.New(s, keymanager.DefaultConfig())
	m.roleManager = newRoleManager(s, m.raftNode)

	// TODO(stevvooe): Allocate a context that can be used to
	// shutdown underlying manager processes when leadership is
	// lost.

	m.allocator, err = allocator.New(s, m.config.PluginGetter)
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to create allocator")
		// TODO(stevvooe): It doesn't seem correct here to fail
		// creating the allocator but then use it anyway.
	}

	if m.keyManager != nil {
		go func(keyManager *keymanager.KeyManager) {
			if err := keyManager.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("keymanager failed with an error")
			}
		}(m.keyManager)
	}

	go func(d *dispatcher.Dispatcher) {
		if err := d.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Dispatcher exited with an error")
		}
	}(m.dispatcher)

	go func(lb *logbroker.LogBroker) {
		if err := lb.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("LogBroker exited with an error")
		}
	}(m.logbroker)

	go func(server *ca.Server) {
		if err := server.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("CA signer exited with an error")
		}
	}(m.caserver)

	// Start all sub-components in separate goroutines.
	// TODO(aluzzardi): This should have some kind of error handling so that
	// any component that goes down would bring the entire manager down.
	if m.allocator != nil {
		go func(allocator *allocator.Allocator) {
			if err := allocator.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("allocator exited with an error")
			}
		}(m.allocator)
	}

	go func(scheduler *scheduler.Scheduler) {
		if err := scheduler.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("scheduler exited with an error")
		}
	}(m.scheduler)

	go func(constraintEnforcer *constraintenforcer.ConstraintEnforcer) {
		constraintEnforcer.Run()
	}(m.constraintEnforcer)

	go func(taskReaper *taskreaper.TaskReaper) {
		taskReaper.Run()
	}(m.taskReaper)

	go func(orchestrator *replicated.Orchestrator) {
		if err := orchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error")
		}
	}(m.replicatedOrchestrator)

	go func(globalOrchestrator *global.Orchestrator) {
		if err := globalOrchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("global orchestrator exited with an error")
		}
	}(m.globalOrchestrator)

	go func(roleManager *roleManager) {
		roleManager.Run(ctx)
	}(m.roleManager)
}
// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
		m.allocator = nil
	}

	m.constraintEnforcer.Stop()
	m.constraintEnforcer = nil

	m.replicatedOrchestrator.Stop()
	m.replicatedOrchestrator = nil

	m.globalOrchestrator.Stop()
	m.globalOrchestrator = nil

	m.taskReaper.Stop()
	m.taskReaper = nil

	m.scheduler.Stop()
	m.scheduler = nil

	m.roleManager.Stop()
	m.roleManager = nil

	if m.keyManager != nil {
		m.keyManager.Stop()
		m.keyManager = nil
	}
}
// defaultClusterObject creates a default cluster.
func defaultClusterObject(
	clusterID string,
	initialCAConfig api.CAConfig,
	raftCfg api.RaftConfig,
	encryptionConfig api.EncryptionConfig,
	initialUnlockKeys []*api.EncryptionKey,
	rootCA *ca.RootCA) *api.Cluster {
	var caKey []byte
	if rcaSigner, err := rootCA.Signer(); err == nil {
		caKey = rcaSigner.Key
	}

	return &api.Cluster{
		ID: clusterID,
		Spec: api.ClusterSpec{
			Annotations: api.Annotations{
				Name: store.DefaultClusterName,
			},
			Orchestration: api.OrchestrationConfig{
				TaskHistoryRetentionLimit: defaultTaskHistoryRetentionLimit,
			},
			Dispatcher: api.DispatcherConfig{
				HeartbeatPeriod: gogotypes.DurationProto(dispatcher.DefaultHeartBeatPeriod),
			},
			Raft:             raftCfg,
			CAConfig:         initialCAConfig,
			EncryptionConfig: encryptionConfig,
		},
		RootCA: api.RootCA{
			CAKey:      caKey,
			CACert:     rootCA.Certs,
			CACertHash: rootCA.Digest.String(),
			JoinTokens: api.JoinTokens{
				Worker:  ca.GenerateJoinToken(rootCA),
				Manager: ca.GenerateJoinToken(rootCA),
			},
		},
		UnlockKeys: initialUnlockKeys,
	}
}
// managerNode creates a new node with NodeRoleManager role.
func managerNode(nodeID string, availability api.NodeSpec_Availability) *api.Node {
	return &api.Node{
		ID: nodeID,
		Certificate: api.Certificate{
			CN:   nodeID,
			Role: api.NodeRoleManager,
			Status: api.IssuanceStatus{
				State: api.IssuanceStateIssued,
			},
		},
		Spec: api.NodeSpec{
			DesiredRole:  api.NodeRoleManager,
			Membership:   api.NodeMembershipAccepted,
			Availability: availability,
		},
	}
}
// newIngressNetwork returns the network object for the default ingress
// network, the network which provides the routing mesh. The caller saves this
// object to the store once, at fresh cluster creation. It is expected to
// call this function inside a store update transaction.
func newIngressNetwork() *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Ingress: true,
			Annotations: api.Annotations{
				Name: "ingress",
			},
			DriverConfig: &api.Driver{},
			IPAM: &api.IPAMOptions{
				Driver: &api.Driver{},
				Configs: []*api.IPAMConfig{
					{
						Subnet: "10.255.0.0/16",
					},
				},
			},
		},
	}
}
// newPredefinedNetwork creates a network object representing one of the
// predefined networks known to be statically created on the cluster nodes.
// These objects are populated in the store at cluster creation solely in
// order to support running services on the nodes' predefined networks.
// External clients can filter these predefined networks by looking
// at the predefined label.
func newPredefinedNetwork(name, driver string) *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Annotations: api.Annotations{
				Name: name,
				Labels: map[string]string{
					networkallocator.PredefinedLabel: "true",
				},
			},
			DriverConfig: &api.Driver{Name: driver},
		},
	}
}