2016-06-14 02:52:49 +00:00
package cluster
2016-11-16 22:17:18 +00:00
//
// ## Swarmkit integration
//
// Cluster - static configurable object for accessing everything swarm related.
// Contains methods for connecting and controlling the cluster. Exists always,
// even if swarm mode is not enabled.
//
// NodeRunner - Manager for starting the swarmkit node. Is present only and
// always if swarm mode is enabled. Implements backoff restart loop in case of
// errors.
//
// NodeState - Information about the current node status including access to
// gRPC clients if a manager is active.
//
// ### Locking
//
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure the cluster (init/join/leave etc). Ensures that one
// reconfiguration action has fully completed before another can start.
//
// `cluster.mu` - taken when the actual changes in cluster configuration
// happen. Different from `controlMutex` because in some cases we need to
// access the current cluster state even while a long-running reconfiguration
// is going on. For example, the network stack may ask for the current cluster
// state in the middle of a shutdown. Any time the current cluster state is
// requested, you should take the read lock of `cluster.mu`. If you are writing
// an API responder that returns synchronously, hold `cluster.mu.RLock()` for
// the duration of the whole handler function. That ensures that the node will
// not be shut down until the handler has finished.
//
// NodeRunner implements its own internal locks, which should not be used
// outside of the struct. Instead, you should just call the
// `nodeRunner.State()` method to get the current state of the cluster (you
// still need `cluster.mu.RLock()` to access the `cluster.nr` reference
// itself). Most of the changes in NodeRunner happen because of an external
// event (network problem, unexpected swarmkit error), and Docker shouldn't
// take any locks that delay these changes from happening.
//
2016-06-14 02:52:49 +00:00
import (
"fmt"
2016-07-01 01:07:35 +00:00
"net"
2016-06-14 02:52:49 +00:00
"os"
"path/filepath"
"sync"
"time"
2016-08-23 23:50:15 +00:00
"github.com/docker/docker/api/types/network"
2016-09-06 18:18:12 +00:00
types "github.com/docker/docker/api/types/swarm"
2017-06-07 17:07:01 +00:00
"github.com/docker/docker/daemon/cluster/controllers/plugin"
2016-06-14 02:52:49 +00:00
executorpkg "github.com/docker/docker/daemon/cluster/executor"
2016-08-30 21:17:32 +00:00
"github.com/docker/docker/pkg/signal"
2017-04-30 21:51:43 +00:00
lncluster "github.com/docker/libnetwork/cluster"
2016-06-14 02:52:49 +00:00
swarmapi "github.com/docker/swarmkit/api"
2016-10-20 18:26:04 +00:00
swarmnode "github.com/docker/swarmkit/node"
2016-10-22 01:07:55 +00:00
"github.com/pkg/errors"
2017-07-26 21:42:13 +00:00
"github.com/sirupsen/logrus"
2016-06-14 02:52:49 +00:00
"golang.org/x/net/context"
)
const (
	// swarmDirName is the directory name joined onto Config.Root to form the
	// swarm root directory.
	swarmDirName = "swarm"
	// controlSocket is the file name of the swarm control socket.
	controlSocket = "control.sock"
	// swarmConnectTimeout bounds how long Start waits for the node to report
	// readiness.
	swarmConnectTimeout = 20 * time.Second
	// swarmRequestTimeout is the deadline applied to contexts handed out by
	// getRequestContext.
	swarmRequestTimeout = 20 * time.Second
	// stateFile is the file name of the persisted node state.
	stateFile = "docker-state.json"
	// defaultAddr is the default address and port.
	defaultAddr = "0.0.0.0:2377"

	// initialReconnectDelay and maxReconnectDelay are the lower and upper
	// reconnect delay values.
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second

	contextPrefix = "com.docker.swarm"
)
2016-07-01 01:07:35 +00:00
// NetworkSubnetsProvider is implemented by components that can report the
// subnets of the networks Docker manages, so that those subnets can be
// filtered out by the caller.
type NetworkSubnetsProvider interface {
	// Subnets returns two lists of subnets.
	Subnets() ([]net.IPNet, []net.IPNet)
}
// Config provides values for Cluster.
type Config struct {
2016-07-01 01:07:35 +00:00
Root string
Name string
Backend executorpkg . Backend
2017-06-07 17:07:01 +00:00
PluginBackend plugin . Backend
2016-07-01 01:07:35 +00:00
NetworkSubnetsProvider NetworkSubnetsProvider
// DefaultAdvertiseAddr is the default host/IP or network interface to use
// if no AdvertiseAddr value is specified.
DefaultAdvertiseAddr string
2016-08-19 20:06:28 +00:00
// path to store runtime state, such as the swarm control socket
RuntimeRoot string
2017-04-02 22:21:56 +00:00
// WatchStream is a channel to pass watch API notifications to daemon
WatchStream chan * swarmapi . WatchMessage
2016-06-14 02:52:49 +00:00
}
2016-06-24 18:52:28 +00:00
// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
2016-06-14 02:52:49 +00:00
type Cluster struct {
2016-11-16 22:17:18 +00:00
mu sync . RWMutex
controlMutex sync . RWMutex // protect init/join/leave user operations
nr * nodeRunner
root string
runtimeRoot string
config Config
2017-04-30 21:51:43 +00:00
configEvent chan lncluster . ConfigEventType // todo: make this array and goroutine safe
2016-11-16 22:17:18 +00:00
attachers map [ string ] * attacher
2017-04-02 22:21:56 +00:00
watchStream chan * swarmapi . WatchMessage
2016-08-23 23:50:15 +00:00
}
// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
type attacher struct {
2016-09-09 16:55:57 +00:00
taskID string
config * network . NetworkingConfig
2017-03-24 13:43:23 +00:00
inProgress bool
2016-09-09 16:55:57 +00:00
attachWaitCh chan * network . NetworkingConfig
attachCompleteCh chan struct { }
detachWaitCh chan struct { }
2016-06-20 23:35:33 +00:00
}
2016-06-14 02:52:49 +00:00
// New creates a new Cluster instance using provided config.
func New ( config Config ) ( * Cluster , error ) {
root := filepath . Join ( config . Root , swarmDirName )
if err := os . MkdirAll ( root , 0700 ) ; err != nil {
return nil , err
}
2016-08-19 20:06:28 +00:00
if config . RuntimeRoot == "" {
config . RuntimeRoot = root
}
if err := os . MkdirAll ( config . RuntimeRoot , 0700 ) ; err != nil {
return nil , err
}
2016-06-14 02:52:49 +00:00
c := & Cluster {
2016-06-20 23:35:33 +00:00
root : root ,
config : config ,
2017-04-30 21:51:43 +00:00
configEvent : make ( chan lncluster . ConfigEventType , 10 ) ,
2016-08-19 20:06:28 +00:00
runtimeRoot : config . RuntimeRoot ,
2016-08-23 23:50:15 +00:00
attachers : make ( map [ string ] * attacher ) ,
2017-04-02 22:21:56 +00:00
watchStream : config . WatchStream ,
2016-06-14 02:52:49 +00:00
}
2017-04-30 21:51:43 +00:00
return c , nil
}
// Start the Cluster instance
// TODO The split between New and Start can be join again when the SendClusterEvent
// method is no longer required
func ( c * Cluster ) Start ( ) error {
root := filepath . Join ( c . config . Root , swarmDirName )
2016-06-14 02:52:49 +00:00
2016-11-16 22:17:18 +00:00
nodeConfig , err := loadPersistentState ( root )
2016-06-14 02:52:49 +00:00
if err != nil {
if os . IsNotExist ( err ) {
2017-04-30 21:51:43 +00:00
return nil
2016-06-14 02:52:49 +00:00
}
2017-04-30 21:51:43 +00:00
return err
2016-06-14 02:52:49 +00:00
}
2016-11-16 22:17:18 +00:00
nr , err := c . newNodeRunner ( * nodeConfig )
2016-06-14 02:52:49 +00:00
if err != nil {
2017-04-30 21:51:43 +00:00
return err
2016-06-14 02:52:49 +00:00
}
2016-11-16 22:17:18 +00:00
c . nr = nr
2016-06-14 02:52:49 +00:00
select {
case <- time . After ( swarmConnectTimeout ) :
2016-11-01 04:05:01 +00:00
logrus . Error ( "swarm component could not be started before timeout was reached" )
2016-11-16 22:17:18 +00:00
case err := <- nr . Ready ( ) :
2016-06-14 02:52:49 +00:00
if err != nil {
2017-03-08 00:50:39 +00:00
logrus . WithError ( err ) . Error ( "swarm component could not be started" )
2017-04-30 21:51:43 +00:00
return nil
2016-06-14 02:52:49 +00:00
}
}
2017-04-30 21:51:43 +00:00
return nil
2016-06-14 02:52:49 +00:00
}
2016-11-16 22:17:18 +00:00
func ( c * Cluster ) newNodeRunner ( conf nodeStartConfig ) ( * nodeRunner , error ) {
2016-06-14 16:13:53 +00:00
if err := c . config . Backend . IsSwarmCompatible ( ) ; err != nil {
2016-06-20 23:35:33 +00:00
return nil , err
2016-06-14 02:52:49 +00:00
}
2016-07-01 01:07:35 +00:00
2016-10-21 20:31:45 +00:00
actualLocalAddr := conf . LocalAddr
2016-07-01 01:07:35 +00:00
if actualLocalAddr == "" {
// If localAddr was not specified, resolve it automatically
// based on the route to joinAddr. localAddr can only be left
// empty on "join".
2016-10-21 20:31:45 +00:00
listenHost , _ , err := net . SplitHostPort ( conf . ListenAddr )
2016-07-01 01:07:35 +00:00
if err != nil {
return nil , fmt . Errorf ( "could not parse listen address: %v" , err )
}
listenAddrIP := net . ParseIP ( listenHost )
if listenAddrIP == nil || ! listenAddrIP . IsUnspecified ( ) {
actualLocalAddr = listenHost
} else {
2016-10-21 20:31:45 +00:00
if conf . RemoteAddr == "" {
2016-07-01 01:07:35 +00:00
// Should never happen except using swarms created by
// old versions that didn't save remoteAddr.
2016-10-21 20:31:45 +00:00
conf . RemoteAddr = "8.8.8.8:53"
2016-07-01 01:07:35 +00:00
}
2016-10-21 20:31:45 +00:00
conn , err := net . Dial ( "udp" , conf . RemoteAddr )
2016-07-01 01:07:35 +00:00
if err != nil {
return nil , fmt . Errorf ( "could not find local IP address: %v" , err )
}
localHostPort := conn . LocalAddr ( ) . String ( )
actualLocalAddr , _ , _ = net . SplitHostPort ( localHostPort )
conn . Close ( )
}
}
2016-11-16 22:17:18 +00:00
nr := & nodeRunner { cluster : c }
nr . actualLocalAddr = actualLocalAddr
2016-10-22 01:07:55 +00:00
2016-11-16 22:17:18 +00:00
if err := nr . Start ( conf ) ; err != nil {
2016-06-20 23:35:33 +00:00
return nil , err
2016-06-14 02:52:49 +00:00
}
2016-07-01 01:07:35 +00:00
2017-01-14 04:14:03 +00:00
c . config . Backend . DaemonJoinsCluster ( c )
2016-06-14 02:52:49 +00:00
2016-11-16 22:17:18 +00:00
return nr , nil
2016-06-14 02:52:49 +00:00
}
2016-07-15 17:58:21 +00:00
func ( c * Cluster ) getRequestContext ( ) ( context . Context , func ( ) ) { // TODO: not needed when requests don't block on qourum lost
return context . WithTimeout ( context . Background ( ) , swarmRequestTimeout )
2016-06-14 02:52:49 +00:00
}
2016-06-24 18:52:28 +00:00
// IsManager returns true if Cluster is participating as a manager.
2016-06-14 02:52:49 +00:00
func ( c * Cluster ) IsManager ( ) bool {
2016-11-16 22:17:18 +00:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
return c . currentNodeState ( ) . IsActiveManager ( )
2016-06-14 02:52:49 +00:00
}
2016-06-24 18:52:28 +00:00
// IsAgent returns true if Cluster is participating as a worker/agent.
2016-06-14 02:52:49 +00:00
func ( c * Cluster ) IsAgent ( ) bool {
2016-11-16 22:17:18 +00:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
return c . currentNodeState ( ) . status == types . LocalNodeStateActive
2016-06-14 02:52:49 +00:00
}
2016-07-01 01:07:35 +00:00
// GetLocalAddress returns the local address.
func ( c * Cluster ) GetLocalAddress ( ) string {
2016-11-16 22:17:18 +00:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
return c . currentNodeState ( ) . actualLocalAddr
2016-07-01 01:07:35 +00:00
}
2016-09-23 01:43:54 +00:00
// GetListenAddress returns the listen address.
func ( c * Cluster ) GetListenAddress ( ) string {
2016-11-16 22:17:18 +00:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
if c . nr != nil {
return c . nr . config . ListenAddr
2016-10-21 20:31:45 +00:00
}
return ""
2016-09-23 01:43:54 +00:00
}
2016-07-01 01:07:35 +00:00
// GetAdvertiseAddress returns the remotely reachable address of this node.
func ( c * Cluster ) GetAdvertiseAddress ( ) string {
2016-11-16 22:17:18 +00:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
if c . nr != nil && c . nr . config . AdvertiseAddr != "" {
advertiseHost , _ , _ := net . SplitHostPort ( c . nr . config . AdvertiseAddr )
2016-07-01 01:07:35 +00:00
return advertiseHost
2016-06-14 02:52:49 +00:00
}
2016-11-16 22:17:18 +00:00
return c . currentNodeState ( ) . actualLocalAddr
2016-06-14 02:52:49 +00:00
}
2017-04-14 23:54:17 +00:00
// GetDataPathAddress returns the address configured for data path traffic,
// or the empty string when there is no node runner.
func (c *Cluster) GetDataPathAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.nr == nil {
		return ""
	}
	return c.nr.config.DataPathAddr
}
2017-04-28 00:06:16 +00:00
// GetRemoteAddressList returns the advertise address for each of the remote managers if
2016-06-14 02:52:49 +00:00
// available.
2017-04-28 00:06:16 +00:00
func ( c * Cluster ) GetRemoteAddressList ( ) [ ] string {
2016-11-16 22:17:18 +00:00
c . mu . RLock ( )
defer c . mu . RUnlock ( )
2017-04-28 00:06:16 +00:00
return c . getRemoteAddressList ( )
2016-06-14 02:52:49 +00:00
}
2017-04-28 00:06:16 +00:00
func ( c * Cluster ) getRemoteAddressList ( ) [ ] string {
2016-11-16 22:17:18 +00:00
state := c . currentNodeState ( )
if state . swarmNode == nil {
2017-04-28 00:06:16 +00:00
return [ ] string { }
2016-06-14 02:52:49 +00:00
}
2017-04-28 00:06:16 +00:00
2016-11-16 22:17:18 +00:00
nodeID := state . swarmNode . NodeID ( )
2017-04-28 00:06:16 +00:00
remotes := state . swarmNode . Remotes ( )
addressList := make ( [ ] string , 0 , len ( remotes ) )
for _ , r := range remotes {
2016-06-14 02:52:49 +00:00
if r . NodeID != nodeID {
2017-04-28 00:06:16 +00:00
addressList = append ( addressList , r . Addr )
2016-06-14 02:52:49 +00:00
}
}
2017-04-28 00:06:16 +00:00
return addressList
2016-06-14 02:52:49 +00:00
}
// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
2017-04-30 21:51:43 +00:00
func ( c * Cluster ) ListenClusterEvents ( ) <- chan lncluster . ConfigEventType {
2016-06-14 02:52:49 +00:00
return c . configEvent
}
2016-11-16 22:17:18 +00:00
// currentNodeState should not be called without a read lock
func ( c * Cluster ) currentNodeState ( ) nodeState {
return c . nr . State ( )
2016-11-09 02:03:47 +00:00
}
2016-06-23 20:52:41 +00:00
// errNoManager returns error describing why manager commands can't be used.
// Call with read lock.
2016-11-16 22:17:18 +00:00
func ( c * Cluster ) errNoManager ( st nodeState ) error {
if st . swarmNode == nil {
2016-12-02 09:14:32 +00:00
if errors . Cause ( st . err ) == errSwarmLocked {
return errSwarmLocked
2016-10-22 01:07:55 +00:00
}
2016-12-02 09:14:32 +00:00
if st . err == errSwarmCertificatesExpired {
return errSwarmCertificatesExpired
2016-11-09 02:03:47 +00:00
}
2017-07-19 14:20:13 +00:00
return errors . WithStack ( notAvailableError ( "This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again." ) )
2016-06-23 20:52:41 +00:00
}
2016-11-16 22:17:18 +00:00
if st . swarmNode . Manager ( ) != nil {
2017-07-19 14:20:13 +00:00
return errors . WithStack ( notAvailableError ( "This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster." ) )
2016-06-23 20:52:41 +00:00
}
2017-07-19 14:20:13 +00:00
return errors . WithStack ( notAvailableError ( "This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager." ) )
2016-06-23 20:52:41 +00:00
}
2016-06-14 02:52:49 +00:00
// Cleanup stops active swarm node. This is run before daemon shutdown.
func ( c * Cluster ) Cleanup ( ) {
2016-11-16 22:17:18 +00:00
c . controlMutex . Lock ( )
defer c . controlMutex . Unlock ( )
c . mu . Lock ( )
node := c . nr
2016-06-14 02:52:49 +00:00
if node == nil {
2016-11-16 22:17:18 +00:00
c . mu . Unlock ( )
2016-06-14 02:52:49 +00:00
return
}
2016-11-16 22:17:18 +00:00
state := c . currentNodeState ( )
2017-04-08 01:27:35 +00:00
c . mu . Unlock ( )
2016-11-16 22:17:18 +00:00
if state . IsActiveManager ( ) {
active , reachable , unreachable , err := managerStats ( state . controlClient , state . NodeID ( ) )
2016-06-14 02:52:49 +00:00
if err == nil {
2016-08-19 20:49:58 +00:00
singlenode := active && isLastManager ( reachable , unreachable )
if active && ! singlenode && removingManagerCausesLossOfQuorum ( reachable , unreachable ) {
2016-06-14 02:52:49 +00:00
logrus . Errorf ( "Leaving cluster with %v managers left out of %v. Raft quorum will be lost." , reachable - 1 , reachable + unreachable )
}
}
}
2017-04-08 01:27:35 +00:00
2016-11-16 22:17:18 +00:00
if err := node . Stop ( ) ; err != nil {
logrus . Errorf ( "failed to shut down cluster node: %v" , err )
signal . DumpStacks ( "" )
}
2017-04-08 01:27:35 +00:00
c . mu . Lock ( )
2016-11-16 22:17:18 +00:00
c . nr = nil
2017-04-08 01:27:35 +00:00
c . mu . Unlock ( )
2016-06-14 02:52:49 +00:00
}
2016-11-16 22:17:18 +00:00
func managerStats ( client swarmapi . ControlClient , currentNodeID string ) ( current bool , reachable int , unreachable int , err error ) {
2016-07-15 17:58:21 +00:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
2016-11-16 22:17:18 +00:00
nodes , err := client . ListNodes ( ctx , & swarmapi . ListNodesRequest { } )
2016-06-14 02:52:49 +00:00
if err != nil {
return false , 0 , 0 , err
}
for _ , n := range nodes . Nodes {
if n . ManagerStatus != nil {
2016-06-15 00:23:01 +00:00
if n . ManagerStatus . Reachability == swarmapi . RaftMemberStatus_REACHABLE {
2016-06-14 02:52:49 +00:00
reachable ++
2016-11-16 22:17:18 +00:00
if n . ID == currentNodeID {
2016-06-14 02:52:49 +00:00
current = true
}
}
2016-06-15 00:23:01 +00:00
if n . ManagerStatus . Reachability == swarmapi . RaftMemberStatus_UNREACHABLE {
2016-06-14 02:52:49 +00:00
unreachable ++
}
}
}
return
}
2016-10-22 01:07:55 +00:00
func detectLockedError ( err error ) error {
2016-10-28 01:50:49 +00:00
if err == swarmnode . ErrInvalidUnlockKey {
2016-12-02 09:14:32 +00:00
return errors . WithStack ( errSwarmLocked )
2016-10-22 01:07:55 +00:00
}
return err
}
2017-02-28 10:12:11 +00:00
// lockedManagerAction runs fn while holding the read lock of c.mu. fn is
// given a request context from getRequestContext and the current node state.
// When the node is not an active manager, fn is not called and the error
// from errNoManager is returned instead.
func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	st := c.currentNodeState()
	if st.IsActiveManager() {
		ctx, cancel := c.getRequestContext()
		defer cancel()
		return fn(ctx, st)
	}
	return c.errNoManager(st)
}
2017-04-30 21:51:43 +00:00
// SendClusterEvent puts event on the configEvent channel while holding the
// read lock of c.mu.
// TODO This method should not be exposed.
// It is currently used to tell the network controller that the keys are
// available.
func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	events := c.configEvent
	events <- event
}