package manager

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	gmetrics "github.com/docker/go-metrics"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/identity"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/allocator"
	"github.com/docker/swarmkit/manager/allocator/cnmallocator"
	"github.com/docker/swarmkit/manager/allocator/networkallocator"
	"github.com/docker/swarmkit/manager/controlapi"
	"github.com/docker/swarmkit/manager/dispatcher"
	"github.com/docker/swarmkit/manager/drivers"
	"github.com/docker/swarmkit/manager/health"
	"github.com/docker/swarmkit/manager/keymanager"
	"github.com/docker/swarmkit/manager/logbroker"
	"github.com/docker/swarmkit/manager/metrics"
	"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
	"github.com/docker/swarmkit/manager/orchestrator/global"
	"github.com/docker/swarmkit/manager/orchestrator/replicated"
	"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
	"github.com/docker/swarmkit/manager/resourceapi"
	"github.com/docker/swarmkit/manager/scheduler"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/raft/transport"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/manager/watchapi"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	gogotypes "github.com/gogo/protobuf/types"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	// defaultTaskHistoryRetentionLimit is the number of tasks to keep.
	defaultTaskHistoryRetentionLimit = 5
)

// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// Address to bind
	ListenAddr string

	// Address to advertise to remote nodes (optional).
	AdvertiseAddr string
}

// Config is used to tune the Manager.
type Config struct {
	SecurityConfig *ca.SecurityConfig

	// RootCAPaths is the path to which new root certs should be saved.
	RootCAPaths ca.CertPaths

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address.
	RemoteAPI *RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// ForceJoin causes us to invoke raft's Join RPC even if already part
	// of a cluster.
	ForceJoin bool

	// StateDir is the top-level state directory.
	StateDir string

	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the number of ticks needed without a
	// leader to trigger a new election.
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between each
	// heartbeat sent to other members for health-check purposes.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state. This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK). It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node.
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter

	// FIPS is a boolean stating whether the node is FIPS enabled - if this is the
	// first node in the cluster, this setting is used to set the cluster-wide mandatory
	// FIPS setting.
	FIPS bool

	// NetworkConfig stores network related config for the cluster.
	NetworkConfig *cnmallocator.NetworkConfig
}
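
// Illustrative sketch (not part of the original code) of how a caller might
// populate Config before constructing a Manager. The concrete values (paths,
// addresses) and the securityConfig variable are placeholders, not defaults
// defined by this package.
//
//	cfg := &Config{
//		SecurityConfig: securityConfig, // *ca.SecurityConfig obtained elsewhere
//		StateDir:       "/var/lib/swarm-manager",
//		ControlAPI:     "/var/run/swarm-manager.sock",
//		RemoteAPI:      &RemoteAddrs{ListenAddr: "0.0.0.0:2377"},
//	}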

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
	config Config

	collector              *metrics.Collector
	caserver               *ca.Server
	dispatcher             *dispatcher.Dispatcher
	logbroker              *logbroker.LogBroker
	watchServer            *watchapi.Server
	replicatedOrchestrator *replicated.Orchestrator
	globalOrchestrator     *global.Orchestrator
	taskReaper             *taskreaper.TaskReaper
	constraintEnforcer     *constraintenforcer.ConstraintEnforcer
	scheduler              *scheduler.Scheduler
	allocator              *allocator.Allocator
	keyManager             *keymanager.KeyManager
	server                 *grpc.Server
	localserver            *grpc.Server
	raftNode               *raft.Node
	dekRotator             *RaftDEKManager
	roleManager            *roleManager

	cancelFunc context.CancelFunc

	// mu is a general mutex used to coordinate starting/stopping and
	// leadership events.
	mu sync.Mutex
	// addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI
	addrMu sync.Mutex

	started chan struct{}
	stopped bool

	remoteListener  chan net.Listener
	controlListener chan net.Listener
	errServe        chan error
}

var (
	leaderMetric gmetrics.Gauge
)

func init() {
	ns := gmetrics.NewNamespace("swarm", "manager", nil)
	leaderMetric = ns.NewGauge("leader", "Indicates if this manager node is a leader", "")
	gmetrics.Register(ns)
}
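
// closeOnceListener wraps a net.Listener so that Close is idempotent: only
// the first Close call reaches the underlying listener. See serveListener for
// why double closes must be avoided for unix sockets.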
type closeOnceListener struct {
	once sync.Once
	net.Listener
}

func (l *closeOnceListener) Close() error {
	var err error
	l.once.Do(func() {
		err = l.Listener.Close()
	})
	return err
}

// New creates a Manager which has not started to accept requests yet.
func New(config *Config) (*Manager, error) {
	err := os.MkdirAll(config.StateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create state directory")
	}

	raftStateDir := filepath.Join(config.StateDir, "raft")
	err = os.MkdirAll(raftStateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create raft state directory")
	}

	raftCfg := raft.DefaultNodeConfig()

	if config.ElectionTick > 0 {
		raftCfg.ElectionTick = int(config.ElectionTick)
	}
	if config.HeartbeatTick > 0 {
		raftCfg.HeartbeatTick = int(config.HeartbeatTick)
	}

	dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter(), config.FIPS)
	if err != nil {
		return nil, err
	}

	newNodeOpts := raft.NodeOptions{
		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
		JoinAddr:        config.JoinRaft,
		ForceJoin:       config.ForceJoin,
		Config:          raftCfg,
		StateDir:        raftStateDir,
		ForceNewCluster: config.ForceNewCluster,
		TLSCredentials:  config.SecurityConfig.ClientTLSCreds,
		KeyRotator:      dekRotator,
		FIPS:            config.FIPS,
	}
	raftNode := raft.NewNode(newNodeOpts)

	// The interceptorWrappers are functions that wrap the prometheus grpc
	// interceptor and add some code to log errors locally, one for stream
	// and one for unary. This is needed because the grpc unary interceptor
	// doesn't natively support chaining; you have to implement it in the
	// caller. Note that even though these are logging errors, we're still
	// using debug level. Returning errors from gRPC methods is common and
	// expected, and logging an ERROR every time a user mistypes a service
	// name would pollute the logs really fast.
	//
	// NOTE(dperny): Because these functions are very simple in their
	// operation and have no side effects other than the log output, they
	// are not automatically tested. If you modify them later, make _sure_
	// that they are correct. If you add substantial side effects, abstract
	// these out and test them!
	unaryInterceptorWrapper := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// pass the call down into the grpc_prometheus interceptor
		resp, err := grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
		if err != nil {
			log.G(ctx).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling rpc")
		}
		return resp, err
	}

	streamInterceptorWrapper := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// we can't re-write a stream context, so don't bother creating a
		// sub-context like in unary methods.
		// pass the call down into the grpc_prometheus interceptor
		err := grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler)
		if err != nil {
			log.G(ss.Context()).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling streaming rpc")
		}
		return err
	}

	opts := []grpc.ServerOption{
		grpc.Creds(config.SecurityConfig.ServerTLSCreds),
		grpc.StreamInterceptor(streamInterceptorWrapper),
		grpc.UnaryInterceptor(unaryInterceptorWrapper),
		grpc.MaxRecvMsgSize(transport.GRPCMaxMsgSize),
	}

	m := &Manager{
		config:          *config,
		caserver:        ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
		dispatcher:      dispatcher.New(),
		logbroker:       logbroker.New(raftNode.MemoryStore()),
		watchServer:     watchapi.NewServer(raftNode.MemoryStore()),
		server:          grpc.NewServer(opts...),
		localserver:     grpc.NewServer(opts...),
		raftNode:        raftNode,
		started:         make(chan struct{}),
		dekRotator:      dekRotator,
		remoteListener:  make(chan net.Listener, 1),
		controlListener: make(chan net.Listener, 1),
		errServe:        make(chan error, 2),
	}

	if config.ControlAPI != "" {
		m.config.ControlAPI = ""
		if err := m.BindControl(config.ControlAPI); err != nil {
			return nil, err
		}
	}

	if config.RemoteAPI != nil {
		m.config.RemoteAPI = nil
		// The context isn't used in this case (before (*Manager).Run).
		if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil {
			if config.ControlAPI != "" {
				l := <-m.controlListener
				l.Close()
			}
			return nil, err
		}
	}

	return m, nil
}
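
// Illustrative sketch (not part of the original code) of how a caller
// typically drives a Manager: construct it with New, run it on a goroutine,
// and later call Stop. The cfg variable is assumed to be a fully populated
// *Config.
//
//	m, err := New(cfg)
//	if err != nil {
//		// handle error
//	}
//	go func() {
//		if err := m.Run(context.Background()); err != nil {
//			// Run returns only on error or after Stop is called.
//		}
//	}()
//	// ...
//	m.Stop(context.Background(), false) // false: keep raft data on disk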

// BindControl binds a local socket for the control API.
func (m *Manager) BindControl(addr string) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.ControlAPI != "" {
		return errors.New("manager already has a control API address")
	}

	// don't create a socket directory if we're on windows. we use a named pipe.
	if runtime.GOOS != "windows" {
		err := os.MkdirAll(filepath.Dir(addr), 0700)
		if err != nil {
			return errors.Wrap(err, "failed to create socket directory")
		}
	}

	l, err := xnet.ListenLocal(addr)

	// A unix socket may fail to bind if the file already
	// exists. Try replacing the file.
	if runtime.GOOS != "windows" {
		unwrappedErr := err
		if op, ok := unwrappedErr.(*net.OpError); ok {
			unwrappedErr = op.Err
		}
		if sys, ok := unwrappedErr.(*os.SyscallError); ok {
			unwrappedErr = sys.Err
		}
		if unwrappedErr == syscall.EADDRINUSE {
			os.Remove(addr)
			l, err = xnet.ListenLocal(addr)
		}
	}
	if err != nil {
		return errors.Wrap(err, "failed to listen on control API address")
	}

	m.config.ControlAPI = addr
	m.controlListener <- l
	return nil
}

// BindRemote binds a port for the remote API.
func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI != nil {
		return errors.New("manager already has remote API address")
	}

	// If an AdvertiseAddr was specified, we use that as our
	// externally-reachable address.
	advertiseAddr := addrs.AdvertiseAddr

	var advertiseAddrPort string
	if advertiseAddr == "" {
		// Otherwise, we know we are joining an existing swarm. Use a
		// wildcard address to trigger remote autodetection of our
		// address.
		var err error
		_, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr)
		if err != nil {
			return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr)
		}

		// Even with an IPv6 listening address, it's okay to use
		// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
		// be substituted with the actual source address.
		advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
	}

	l, err := net.Listen("tcp", addrs.ListenAddr)
	if err != nil {
		return errors.Wrap(err, "failed to listen on remote API address")
	}
	if advertiseAddrPort == "0" {
		advertiseAddr = l.Addr().String()
		addrs.ListenAddr = advertiseAddr
	}

	m.config.RemoteAPI = &addrs

	m.raftNode.SetAddr(ctx, advertiseAddr)
	m.remoteListener <- l

	return nil
}
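
// Illustrative sketch of how BindRemote resolves the advertise address for a
// few inputs; the addresses shown are hypothetical examples, not defaults,
// and m and ctx are assumed to be in scope.
//
//	// AdvertiseAddr set: used as-is.
//	m.BindRemote(ctx, RemoteAddrs{ListenAddr: "0.0.0.0:2377", AdvertiseAddr: "10.0.0.5:2377"})
//	// AdvertiseAddr empty: advertise the wildcard "0.0.0.0:2377" so remote
//	// peers substitute the source address they observe.
//	m.BindRemote(ctx, RemoteAddrs{ListenAddr: "0.0.0.0:2377"})
//	// Port 0: the port actually chosen by the listener is advertised.
//	m.BindRemote(ctx, RemoteAddrs{ListenAddr: "0.0.0.0:0"})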

// RemovedFromRaft returns a channel that's closed if the manager is removed
// from the raft cluster. This should be used to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
	return m.raftNode.RemovedFromRaft
}

// Addr returns the TCP address on which the remote API listens.
func (m *Manager) Addr() string {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI == nil {
		return ""
	}
	return m.config.RemoteAPI.ListenAddr
}

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
	ctx, ctxCancel := context.WithCancel(parent)
	defer ctxCancel()

	m.cancelFunc = ctxCancel

	leadershipCh, cancel := m.raftNode.SubscribeLeadership()
	defer cancel()

	go m.handleLeadershipEvents(ctx, leadershipCh)

	authorize := func(ctx context.Context, roles []string) error {
		var (
			blacklistedCerts map[string]*api.BlacklistedCertificate
			clusters         []*api.Cluster
			err              error
		)

		m.raftNode.MemoryStore().View(func(readTx store.ReadTx) {
			clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
		})

		// Not having a cluster object yet means we can't check
		// the blacklist.
		if err == nil && len(clusters) == 1 {
			blacklistedCerts = clusters[0].BlacklistedCertificates
		}

		// Authorize the remote roles, ensure they can only be forwarded by managers
		_, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts)
		return err
	}

	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.config.PluginGetter, drivers.New(m.config.PluginGetter))
	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
	healthServer := health.NewHealthServer()
	localHealthServer := health.NewHealthServer()
	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
	authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
	authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
	authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
	authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize)
	authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize)
	authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize)

	proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)

	// The following local proxies are only wired up to receive requests
	// from a trusted local socket, and these requests don't use TLS,
	// therefore the requests they handle locally should bypass
	// authorization. When requests are proxied from these servers, they
	// are sent as requests from this manager rather than forwarded
	// requests (it has no TLS information to put in the metadata map).
	forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
	handleRequestLocally := func(ctx context.Context) (context.Context, error) {
		remoteAddr := "127.0.0.1:0"

		m.addrMu.Lock()
		if m.config.RemoteAPI != nil {
			if m.config.RemoteAPI.AdvertiseAddr != "" {
				remoteAddr = m.config.RemoteAPI.AdvertiseAddr
			} else {
				remoteAddr = m.config.RemoteAPI.ListenAddr
			}
		}
		m.addrMu.Unlock()

		creds := m.config.SecurityConfig.ClientTLSCreds

		nodeInfo := ca.RemoteNodeInfo{
			Roles:        []string{creds.Role()},
			Organization: creds.Organization(),
			NodeID:       creds.NodeID(),
			RemoteAddr:   remoteAddr,
		}

		return context.WithValue(ctx, ca.LocalRequestKey, nodeInfo), nil
	}
	localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyDispatcherAPI := api.NewRaftProxyDispatcherServer(m.dispatcher, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyNodeCAAPI := api.NewRaftProxyNodeCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(baseResourceAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)

	// Everything registered on m.server should be an authenticated
	// wrapper, or a proxy wrapping an authenticated wrapper!
	api.RegisterCAServer(m.server, proxyCAAPI)
	api.RegisterNodeCAServer(m.server, proxyNodeCAAPI)
	api.RegisterRaftServer(m.server, authenticatedRaftAPI)
	api.RegisterHealthServer(m.server, authenticatedHealthAPI)
	api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
	api.RegisterControlServer(m.server, authenticatedControlAPI)
	api.RegisterWatchServer(m.server, authenticatedWatchAPI)
	api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
	api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
	api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
	api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
	grpc_prometheus.Register(m.server)

	api.RegisterControlServer(m.localserver, localProxyControlAPI)
	api.RegisterWatchServer(m.localserver, m.watchServer)
	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
	api.RegisterHealthServer(m.localserver, localHealthServer)
	api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
	api.RegisterCAServer(m.localserver, localProxyCAAPI)
	api.RegisterNodeCAServer(m.localserver, localProxyNodeCAAPI)
	api.RegisterResourceAllocatorServer(m.localserver, localProxyResourceAPI)
	api.RegisterLogBrokerServer(m.localserver, localProxyLogBrokerAPI)
	grpc_prometheus.Register(m.localserver)

	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

	if err := m.watchServer.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("watch server failed to start")
	}

	go m.serveListener(ctx, m.remoteListener)
	go m.serveListener(ctx, m.controlListener)

	defer func() {
		m.server.Stop()
		m.localserver.Stop()
	}()

	// Set the raft server as serving for the health server
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

	if err := m.raftNode.JoinAndStart(ctx); err != nil {
		// Don't block future calls to Stop.
		close(m.started)
		return errors.Wrap(err, "can't initialize raft node")
	}

	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

	// Start metrics collection.
	m.collector = metrics.NewCollector(m.raftNode.MemoryStore())
	go func(collector *metrics.Collector) {
		if err := collector.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("collector failed with an error")
		}
	}(m.collector)

	close(m.started)

	go func() {
		err := m.raftNode.Run(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("raft node stopped")
			m.Stop(ctx, false)
		}
	}()

	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
		return err
	}

	c, err := raft.WaitForCluster(ctx, m.raftNode)
	if err != nil {
		return err
	}
	raftConfig := c.Spec.Raft

	if err := m.watchForClusterChanges(ctx); err != nil {
		return err
	}

	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
		log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick)
	}
	if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick {
		log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick)
	}

	// wait for an error in serving.
	err = <-m.errServe
	m.mu.Lock()
	if m.stopped {
		m.mu.Unlock()
		return nil
	}
	m.mu.Unlock()
	m.Stop(ctx, false)

	return err
}

const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the manager's subsystems. If clearData is
// set, the raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
	log.G(ctx).Info("Stopping manager")
	// It's not safe to start shutting down while the manager is still
	// starting up.
	<-m.started

	// the mutex stops us from trying to stop while we're already stopping, or
	// from returning before we've finished stopping.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true

	srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
	go func() {
		m.server.GracefulStop()
		close(srvDone)
	}()
	go func() {
		m.localserver.GracefulStop()
		close(localSrvDone)
	}()

	m.raftNode.Cancel()

	if m.collector != nil {
		m.collector.Stop()
	}

	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.watchServer.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
	}
	if m.replicatedOrchestrator != nil {
		m.replicatedOrchestrator.Stop()
	}
	if m.globalOrchestrator != nil {
		m.globalOrchestrator.Stop()
	}
	if m.taskReaper != nil {
		m.taskReaper.Stop()
	}
	if m.constraintEnforcer != nil {
		m.constraintEnforcer.Stop()
	}
	if m.scheduler != nil {
		m.scheduler.Stop()
	}
	if m.roleManager != nil {
		m.roleManager.Stop()
	}
	if m.keyManager != nil {
		m.keyManager.Stop()
	}

	if clearData {
		m.raftNode.ClearData()
	}
	m.cancelFunc()
	<-m.raftNode.Done()

	timer := time.AfterFunc(stopTimeout, func() {
		m.server.Stop()
		m.localserver.Stop()
	})
	defer timer.Stop()
	// TODO: we're not waiting on ctx because it very well could be passed from Run,
	// which is already cancelled here. We need to refactor that.
	select {
	case <-srvDone:
		<-localSrvDone
	case <-localSrvDone:
		<-srvDone
	}

	log.G(ctx).Info("Manager shut down")
	// mutex is released and Run can return now
}

func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
	securityConfig := m.config.SecurityConfig
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"node.id":   nodeID,
		"node.role": ca.ManagerRole,
	})

	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
	for _, encryptionKey := range cluster.UnlockKeys {
		if encryptionKey.Subsystem == ca.ManagerRole {
			kekData.KEK = encryptionKey.Key
			break
		}
	}

	updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData)
	if err != nil {
		logger.WithError(err).Errorf("failed to re-encrypt TLS key with a new KEK")
		return err
	}
	if updated {
		logger.Debug("successfully rotated KEK")
	}
	if unlockedToLocked {
		// A best-effort attempt to update the TLS certificate - if it fails,
		// it'll be updated the next time the certificate renews. Don't wait,
		// because it might take a while.
		go func() {
			insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})

			conn, err := grpc.Dial(
				m.config.ControlAPI,
				grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
				grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
				grpc.WithTransportCredentials(insecureCreds),
				grpc.WithDialer(
					func(addr string, timeout time.Duration) (net.Conn, error) {
						return xnet.DialTimeoutLocal(addr, timeout)
					}),
			)
			if err != nil {
				logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
				return
			}

			defer conn.Close()

			connBroker := connectionbroker.New(remotes.NewRemotes())
			connBroker.SetLocalConn(conn)
			if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker, m.config.RootCAPaths); err != nil {
				logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
			}
		}()
	}
	return nil
}

func (m *Manager) watchForClusterChanges(ctx context.Context) error {
	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
	var cluster *api.Cluster
	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
		func(tx store.ReadTx) error {
			cluster = store.GetCluster(tx, clusterID)
			if cluster == nil {
				return fmt.Errorf("unable to get current cluster")
			}
			return nil
		},
		api.EventUpdateCluster{
			Cluster: &api.Cluster{ID: clusterID},
			Checks:  []api.ClusterCheckFunc{api.ClusterCheckID},
		},
	)
	if err != nil {
		return err
	}
	if err := m.updateKEK(ctx, cluster); err != nil {
		return err
	}

	go func() {
		for {
			select {
			case event := <-clusterWatch:
				clusterEvent := event.(api.EventUpdateCluster)
				m.updateKEK(ctx, clusterEvent.Cluster)
			case <-ctx.Done():
				clusterWatchCancel()
				return
			}
		}
	}()

	return nil
}

// getLeaderNodeID is a small helper function returning a string with the
// leader's node ID. it is only used for logging, and should not be relied on
// to give a node ID for actual operational purposes (because it returns errors
// as nicely decorated strings)
func (m *Manager) getLeaderNodeID() string {
	// get the current leader ID. this variable tracks the leader *only* for
	// the purposes of logging leadership changes, and should not be relied on
	// for other purposes
	leader, leaderErr := m.raftNode.Leader()
	switch leaderErr {
	case raft.ErrNoRaftMember:
		// this is an unlikely case, but we have to handle it. this means this
		// node is not a member of the raft quorum. this won't look very pretty
		// in logs ("leadership changed from aslkdjfa to ErrNoRaftMember") but
		// it also won't be very common
		return "not yet part of a raft cluster"
	case raft.ErrNoClusterLeader:
		return "no cluster leader"
	default:
		id, err := m.raftNode.GetNodeIDByRaftID(leader)
		// the only possible error here is "ErrMemberUnknown"
		if err != nil {
			return "an unknown node"
		}
		return id
	}
}

// handleLeadershipEvents handles the IsLeader and IsFollower leadership events.
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
	// get the current leader and save it for logging leadership changes in
	// this loop
	oldLeader := m.getLeaderNodeID()
	for {
		select {
		case leadershipEvent := <-leadershipCh:
			m.mu.Lock()
			if m.stopped {
				m.mu.Unlock()
				return
			}
			newState := leadershipEvent.(raft.LeadershipState)

			if newState == raft.IsLeader {
				m.becomeLeader(ctx)
				leaderMetric.Set(1)
			} else if newState == raft.IsFollower {
				m.becomeFollower()
				leaderMetric.Set(0)
			}
			m.mu.Unlock()

			newLeader := m.getLeaderNodeID()
			// maybe we should use logrus fields for old and new leader, so
			// that users are better able to ingest leadership changes into log
			// aggregators?
			log.G(ctx).Infof("leadership changed from %v to %v", oldLeader, newLeader)
		case <-ctx.Done():
			return
		}
	}
}

// serveListener serves a listener for local and non-local connections.
func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) {
	var l net.Listener
	select {
	case l = <-lCh:
	case <-ctx.Done():
		return
	}
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
		logrus.Fields{
			"proto": l.Addr().Network(),
			"addr":  l.Addr().String(),
		}))
	if _, ok := l.(*net.TCPListener); !ok {
		log.G(ctx).Info("Listening for local connections")
		// we need to disallow double closes because UnixListener.Close
		// can delete the unix-socket file of a newer listener. grpc does
		// call Close twice: in Serve and in Stop.
		m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
	} else {
		log.G(ctx).Info("Listening for connections")
		m.errServe <- m.server.Serve(l)
	}
}

// becomeLeader starts the subsystems that are run on the leader.
func (m *Manager) becomeLeader(ctx context.Context) {
	s := m.raftNode.MemoryStore()

	rootCA := m.config.SecurityConfig.RootCA()
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()

	raftCfg := raft.DefaultRaftConfig()
	raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick)
	raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick)

	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()

	initialCAConfig := ca.DefaultCAConfig()
	initialCAConfig.ExternalCAs = m.config.ExternalCAs

	var (
		unlockKeys []*api.EncryptionKey
		err        error
	)
	if m.config.AutoLockManagers {
		unlockKeys = []*api.EncryptionKey{{
			Subsystem: ca.ManagerRole,
			Key:       m.config.UnlockKey,
		}}
	}

	s.Update(func(tx store.Tx) error {
		// Add a default cluster object to the
		// store. Don't check the error because
		// we expect this to fail unless this
		// is a brand new cluster.
		clusterObj := defaultClusterObject(
			clusterID,
			initialCAConfig,
			raftCfg,
			api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers},
			unlockKeys,
			rootCA,
			m.config.FIPS,
			nil,
			0,
			0)

		// If DefaultAddrPool is set, update the cluster object with the new value.
		// Likewise, if VXLANUDPPort is non-zero, update the cluster object with it.
		if m.config.NetworkConfig != nil {
			if m.config.NetworkConfig.DefaultAddrPool != nil {
				clusterObj.DefaultAddressPool = m.config.NetworkConfig.DefaultAddrPool
				clusterObj.SubnetSize = m.config.NetworkConfig.SubnetSize
			}
			if m.config.NetworkConfig.VXLANUDPPort != 0 {
				clusterObj.VXLANUDPPort = m.config.NetworkConfig.VXLANUDPPort
			}
		}

		err := store.CreateCluster(tx, clusterObj)
		if err != nil && err != store.ErrExist {
			log.G(ctx).WithError(err).Errorf("error creating cluster object")
		}

		// Add a Node entry for ourselves, if one
		// doesn't exist already.
		freshCluster := nil == store.CreateNode(tx, managerNode(nodeID, m.config.Availability, clusterObj.VXLANUDPPort))

		if freshCluster {
			// This is a fresh swarm cluster. Add to store now any initial
			// cluster resource, like the default ingress network which
			// provides the routing mesh for this cluster.
			log.G(ctx).Info("Creating default ingress network")
			if err := store.CreateNetwork(tx, newIngressNetwork()); err != nil {
				log.G(ctx).WithError(err).Error("failed to create default ingress network")
			}
		}
		// Create the static predefined networks (the bridge/host node-local
		// networks known to be present on each cluster node) if the store
		// does not already contain them. This is needed in order to allow
		// running services on the predefined docker networks like `bridge`
		// and `host`.
		for _, p := range allocator.PredefinedNetworks() {
			if err := store.CreateNetwork(tx, newPredefinedNetwork(p.Name, p.Driver)); err != nil && err != store.ErrNameConflict {
				log.G(ctx).WithError(err).Error("failed to create predefined network " + p.Name)
			}
		}
		return nil
	})

	m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s)
	m.constraintEnforcer = constraintenforcer.New(s)
	m.globalOrchestrator = global.NewGlobalOrchestrator(s)
	m.taskReaper = taskreaper.New(s)
	m.scheduler = scheduler.New(s)
	m.keyManager = keymanager.New(s, keymanager.DefaultConfig())
	m.roleManager = newRoleManager(s, m.raftNode)

	// TODO(stevvooe): Allocate a context that can be used to
	// shutdown underlying manager processes when leadership is
	// lost.

	// If DefaultAddrPool is nil or VXLANUDPPort is 0, read the value stored
	// in the cluster object, if any.
	if m.config.NetworkConfig == nil || m.config.NetworkConfig.DefaultAddrPool == nil || m.config.NetworkConfig.VXLANUDPPort == 0 {
		var cluster *api.Cluster
		s.View(func(tx store.ReadTx) {
			cluster = store.GetCluster(tx, clusterID)
		})
		if cluster.DefaultAddressPool != nil {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.DefaultAddrPool = append(m.config.NetworkConfig.DefaultAddrPool, cluster.DefaultAddressPool...)
			m.config.NetworkConfig.SubnetSize = cluster.SubnetSize
		}
		if cluster.VXLANUDPPort != 0 {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.VXLANUDPPort = cluster.VXLANUDPPort
		}
	}

	m.allocator, err = allocator.New(s, m.config.PluginGetter, m.config.NetworkConfig)
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to create allocator")
		// TODO(stevvooe): It doesn't seem correct here to fail
		// creating the allocator but then use it anyway.
	}

	if m.keyManager != nil {
		go func(keyManager *keymanager.KeyManager) {
			if err := keyManager.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("keymanager failed with an error")
			}
		}(m.keyManager)
	}

	go func(d *dispatcher.Dispatcher) {
		// Initialize the dispatcher.
		d.Init(m.raftNode, dispatcher.DefaultConfig(), drivers.New(m.config.PluginGetter), m.config.SecurityConfig)
		if err := d.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Dispatcher exited with an error")
		}
	}(m.dispatcher)

	if err := m.logbroker.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("LogBroker failed to start")
	}

	go func(server *ca.Server) {
		if err := server.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("CA signer exited with an error")
		}
	}(m.caserver)

	// Start all sub-components in separate goroutines.
	// TODO(aluzzardi): This should have some kind of error handling so that
	// any component that goes down would bring the entire manager down.
	if m.allocator != nil {
		go func(allocator *allocator.Allocator) {
			if err := allocator.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("allocator exited with an error")
			}
		}(m.allocator)
	}

	go func(scheduler *scheduler.Scheduler) {
		if err := scheduler.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("scheduler exited with an error")
		}
	}(m.scheduler)

	go func(constraintEnforcer *constraintenforcer.ConstraintEnforcer) {
		constraintEnforcer.Run()
	}(m.constraintEnforcer)

	go func(taskReaper *taskreaper.TaskReaper) {
		taskReaper.Run(ctx)
	}(m.taskReaper)

	go func(orchestrator *replicated.Orchestrator) {
		if err := orchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error")
		}
	}(m.replicatedOrchestrator)

	go func(globalOrchestrator *global.Orchestrator) {
		if err := globalOrchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("global orchestrator exited with an error")
		}
	}(m.globalOrchestrator)

	go func(roleManager *roleManager) {
		roleManager.Run(ctx)
	}(m.roleManager)
}

// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
		m.allocator = nil
	}

	m.constraintEnforcer.Stop()
	m.constraintEnforcer = nil

	m.replicatedOrchestrator.Stop()
	m.replicatedOrchestrator = nil

	m.globalOrchestrator.Stop()
	m.globalOrchestrator = nil

	m.taskReaper.Stop()
	m.taskReaper = nil

	m.scheduler.Stop()
	m.scheduler = nil

	m.roleManager.Stop()
	m.roleManager = nil

	if m.keyManager != nil {
		m.keyManager.Stop()
		m.keyManager = nil
	}
}

// defaultClusterObject creates a default cluster.
func defaultClusterObject(
	clusterID string,
	initialCAConfig api.CAConfig,
	raftCfg api.RaftConfig,
	encryptionConfig api.EncryptionConfig,
	initialUnlockKeys []*api.EncryptionKey,
	rootCA *ca.RootCA,
	fips bool,
	defaultAddressPool []string,
	subnetSize uint32,
	vxlanUDPPort uint32) *api.Cluster {
	var caKey []byte
	if rcaSigner, err := rootCA.Signer(); err == nil {
		caKey = rcaSigner.Key
	}

	return &api.Cluster{
		ID: clusterID,
		Spec: api.ClusterSpec{
			Annotations: api.Annotations{
				Name: store.DefaultClusterName,
			},
			Orchestration: api.OrchestrationConfig{
				TaskHistoryRetentionLimit: defaultTaskHistoryRetentionLimit,
			},
			Dispatcher: api.DispatcherConfig{
				HeartbeatPeriod: gogotypes.DurationProto(dispatcher.DefaultHeartBeatPeriod),
			},
			Raft:             raftCfg,
			CAConfig:         initialCAConfig,
			EncryptionConfig: encryptionConfig,
		},
		RootCA: api.RootCA{
			CAKey:      caKey,
			CACert:     rootCA.Certs,
			CACertHash: rootCA.Digest.String(),
			JoinTokens: api.JoinTokens{
				Worker:  ca.GenerateJoinToken(rootCA, fips),
				Manager: ca.GenerateJoinToken(rootCA, fips),
			},
		},
		UnlockKeys:         initialUnlockKeys,
		FIPS:               fips,
		DefaultAddressPool: defaultAddressPool,
		SubnetSize:         subnetSize,
		VXLANUDPPort:       vxlanUDPPort,
	}
}

// managerNode creates a new node with NodeRoleManager role.
func managerNode(nodeID string, availability api.NodeSpec_Availability, vxlanPort uint32) *api.Node {
	return &api.Node{
		ID: nodeID,
		Certificate: api.Certificate{
			CN:   nodeID,
			Role: api.NodeRoleManager,
			Status: api.IssuanceStatus{
				State: api.IssuanceStateIssued,
			},
		},
		Spec: api.NodeSpec{
			DesiredRole:  api.NodeRoleManager,
			Membership:   api.NodeMembershipAccepted,
			Availability: availability,
		},
		VXLANUDPPort: vxlanPort,
	}
}

// newIngressNetwork returns the network object for the default ingress
// network, the network which provides the routing mesh. The caller saves this
// object to the store only once, at fresh cluster creation. This function is
// expected to be called inside a store update transaction.
func newIngressNetwork() *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Ingress: true,
			Annotations: api.Annotations{
				Name: "ingress",
			},
			DriverConfig: &api.Driver{},
			IPAM: &api.IPAMOptions{
				Driver: &api.Driver{},
				Configs: []*api.IPAMConfig{
					{
						Subnet: "10.255.0.0/16",
					},
				},
			},
		},
	}
}

// newPredefinedNetwork creates a network object representing one of the
// predefined networks known to be statically created on the cluster nodes.
// These objects are populated in the store at cluster creation solely in
// order to support running services on the nodes' predefined networks.
// External clients can filter these predefined networks by looking
// at the predefined label.
func newPredefinedNetwork(name, driver string) *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Annotations: api.Annotations{
				Name: name,
				Labels: map[string]string{
					networkallocator.PredefinedLabel: "true",
				},
			},
			DriverConfig: &api.Driver{Name: driver},
		},
	}
}