|
@@ -7,6 +7,7 @@ import (
|
|
"net"
|
|
"net"
|
|
"os"
|
|
"os"
|
|
"path/filepath"
|
|
"path/filepath"
|
|
|
|
+ "runtime"
|
|
"sync"
|
|
"sync"
|
|
"syscall"
|
|
"syscall"
|
|
"time"
|
|
"time"
|
|
@@ -30,6 +31,7 @@ import (
|
|
"github.com/docker/swarmkit/manager/state/raft"
|
|
"github.com/docker/swarmkit/manager/state/raft"
|
|
"github.com/docker/swarmkit/manager/state/store"
|
|
"github.com/docker/swarmkit/manager/state/store"
|
|
"github.com/docker/swarmkit/protobuf/ptypes"
|
|
"github.com/docker/swarmkit/protobuf/ptypes"
|
|
|
|
+ "github.com/docker/swarmkit/xnet"
|
|
"github.com/pkg/errors"
|
|
"github.com/pkg/errors"
|
|
"golang.org/x/net/context"
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc"
|
|
@@ -40,6 +42,16 @@ const (
|
|
defaultTaskHistoryRetentionLimit = 5
|
|
defaultTaskHistoryRetentionLimit = 5
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+// RemoteAddrs provides an listening address and an optional advertise address
|
|
|
|
+// for serving the remote API.
|
|
|
|
+type RemoteAddrs struct {
|
|
|
|
+ // Address to bind
|
|
|
|
+ ListenAddr string
|
|
|
|
+
|
|
|
|
+ // Address to advertise to remote nodes (optional).
|
|
|
|
+ AdvertiseAddr string
|
|
|
|
+}
|
|
|
|
+
|
|
// Config is used to tune the Manager.
|
|
// Config is used to tune the Manager.
|
|
type Config struct {
|
|
type Config struct {
|
|
SecurityConfig *ca.SecurityConfig
|
|
SecurityConfig *ca.SecurityConfig
|
|
@@ -48,13 +60,12 @@ type Config struct {
|
|
// will make certificate signing requests for node certificates.
|
|
// will make certificate signing requests for node certificates.
|
|
ExternalCAs []*api.ExternalCA
|
|
ExternalCAs []*api.ExternalCA
|
|
|
|
|
|
- ProtoAddr map[string]string
|
|
|
|
- // ProtoListener will be used for grpc serving if it's not nil,
|
|
|
|
- // ProtoAddr fields will be used to create listeners otherwise.
|
|
|
|
- ProtoListener map[string]net.Listener
|
|
|
|
|
|
+ // ControlAPI is an address for serving the control API.
|
|
|
|
+ ControlAPI string
|
|
|
|
|
|
- // AdvertiseAddr is a map of addresses to advertise, by protocol.
|
|
|
|
- AdvertiseAddr string
|
|
|
|
|
|
+ // RemoteAPI is a listening address for serving the remote API, and
|
|
|
|
+ // an optional advertise address.
|
|
|
|
+ RemoteAPI RemoteAddrs
|
|
|
|
|
|
// JoinRaft is an optional address of a node in an existing raft
|
|
// JoinRaft is an optional address of a node in an existing raft
|
|
// cluster to join.
|
|
// cluster to join.
|
|
@@ -81,7 +92,7 @@ type Config struct {
|
|
// subsystems.
|
|
// subsystems.
|
|
type Manager struct {
|
|
type Manager struct {
|
|
config *Config
|
|
config *Config
|
|
- listeners map[string]net.Listener
|
|
|
|
|
|
+ listeners []net.Listener
|
|
|
|
|
|
caserver *ca.Server
|
|
caserver *ca.Server
|
|
dispatcher *dispatcher.Dispatcher
|
|
dispatcher *dispatcher.Dispatcher
|
|
@@ -96,10 +107,11 @@ type Manager struct {
|
|
localserver *grpc.Server
|
|
localserver *grpc.Server
|
|
raftNode *raft.Node
|
|
raftNode *raft.Node
|
|
|
|
|
|
- mu sync.Mutex
|
|
|
|
|
|
+ cancelFunc context.CancelFunc
|
|
|
|
|
|
|
|
+ mu sync.Mutex
|
|
started chan struct{}
|
|
started chan struct{}
|
|
- stopped chan struct{}
|
|
|
|
|
|
+ stopped bool
|
|
}
|
|
}
|
|
|
|
|
|
type closeOnceListener struct {
|
|
type closeOnceListener struct {
|
|
@@ -119,41 +131,28 @@ func (l *closeOnceListener) Close() error {
|
|
func New(config *Config) (*Manager, error) {
|
|
func New(config *Config) (*Manager, error) {
|
|
dispatcherConfig := dispatcher.DefaultConfig()
|
|
dispatcherConfig := dispatcher.DefaultConfig()
|
|
|
|
|
|
- if config.ProtoAddr == nil {
|
|
|
|
- config.ProtoAddr = make(map[string]string)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if config.ProtoListener != nil && config.ProtoListener["tcp"] != nil {
|
|
|
|
- config.ProtoAddr["tcp"] = config.ProtoListener["tcp"].Addr().String()
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
// If an AdvertiseAddr was specified, we use that as our
|
|
// If an AdvertiseAddr was specified, we use that as our
|
|
// externally-reachable address.
|
|
// externally-reachable address.
|
|
- tcpAddr := config.AdvertiseAddr
|
|
|
|
|
|
+ advertiseAddr := config.RemoteAPI.AdvertiseAddr
|
|
|
|
|
|
- var tcpAddrPort string
|
|
|
|
- if tcpAddr == "" {
|
|
|
|
|
|
+ var advertiseAddrPort string
|
|
|
|
+ if advertiseAddr == "" {
|
|
// Otherwise, we know we are joining an existing swarm. Use a
|
|
// Otherwise, we know we are joining an existing swarm. Use a
|
|
// wildcard address to trigger remote autodetection of our
|
|
// wildcard address to trigger remote autodetection of our
|
|
// address.
|
|
// address.
|
|
var err error
|
|
var err error
|
|
- _, tcpAddrPort, err = net.SplitHostPort(config.ProtoAddr["tcp"])
|
|
|
|
|
|
+ _, advertiseAddrPort, err = net.SplitHostPort(config.RemoteAPI.ListenAddr)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return nil, fmt.Errorf("missing or invalid listen address %s", config.ProtoAddr["tcp"])
|
|
|
|
|
|
+ return nil, fmt.Errorf("missing or invalid listen address %s", config.RemoteAPI.ListenAddr)
|
|
}
|
|
}
|
|
|
|
|
|
// Even with an IPv6 listening address, it's okay to use
|
|
// Even with an IPv6 listening address, it's okay to use
|
|
// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
|
|
// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
|
|
// be substituted with the actual source address.
|
|
// be substituted with the actual source address.
|
|
- tcpAddr = net.JoinHostPort("0.0.0.0", tcpAddrPort)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- err := os.MkdirAll(filepath.Dir(config.ProtoAddr["unix"]), 0700)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, errors.Wrap(err, "failed to create socket directory")
|
|
|
|
|
|
+ advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
|
|
}
|
|
}
|
|
|
|
|
|
- err = os.MkdirAll(config.StateDir, 0700)
|
|
|
|
|
|
+ err := os.MkdirAll(config.StateDir, 0700)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create state directory")
|
|
return nil, errors.Wrap(err, "failed to create state directory")
|
|
}
|
|
}
|
|
@@ -164,41 +163,49 @@ func New(config *Config) (*Manager, error) {
|
|
return nil, errors.Wrap(err, "failed to create raft state directory")
|
|
return nil, errors.Wrap(err, "failed to create raft state directory")
|
|
}
|
|
}
|
|
|
|
|
|
- var listeners map[string]net.Listener
|
|
|
|
- if len(config.ProtoListener) > 0 {
|
|
|
|
- listeners = config.ProtoListener
|
|
|
|
- } else {
|
|
|
|
- listeners = make(map[string]net.Listener)
|
|
|
|
|
|
+ var listeners []net.Listener
|
|
|
|
|
|
- for proto, addr := range config.ProtoAddr {
|
|
|
|
- l, err := net.Listen(proto, addr)
|
|
|
|
|
|
+ // don't create a socket directory if we're on windows. we used named pipe
|
|
|
|
+ if runtime.GOOS != "windows" {
|
|
|
|
+ err := os.MkdirAll(filepath.Dir(config.ControlAPI), 0700)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, errors.Wrap(err, "failed to create socket directory")
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- // A unix socket may fail to bind if the file already
|
|
|
|
- // exists. Try replacing the file.
|
|
|
|
- unwrappedErr := err
|
|
|
|
- if op, ok := unwrappedErr.(*net.OpError); ok {
|
|
|
|
- unwrappedErr = op.Err
|
|
|
|
- }
|
|
|
|
- if sys, ok := unwrappedErr.(*os.SyscallError); ok {
|
|
|
|
- unwrappedErr = sys.Err
|
|
|
|
- }
|
|
|
|
- if proto == "unix" && unwrappedErr == syscall.EADDRINUSE {
|
|
|
|
- os.Remove(addr)
|
|
|
|
- l, err = net.Listen(proto, addr)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- } else if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
- if proto == "tcp" && tcpAddrPort == "0" {
|
|
|
|
- // in case of 0 port
|
|
|
|
- tcpAddr = l.Addr().String()
|
|
|
|
- }
|
|
|
|
- listeners[proto] = l
|
|
|
|
|
|
+ l, err := xnet.ListenLocal(config.ControlAPI)
|
|
|
|
+
|
|
|
|
+ // A unix socket may fail to bind if the file already
|
|
|
|
+ // exists. Try replacing the file.
|
|
|
|
+ if runtime.GOOS != "windows" {
|
|
|
|
+ unwrappedErr := err
|
|
|
|
+ if op, ok := unwrappedErr.(*net.OpError); ok {
|
|
|
|
+ unwrappedErr = op.Err
|
|
|
|
+ }
|
|
|
|
+ if sys, ok := unwrappedErr.(*os.SyscallError); ok {
|
|
|
|
+ unwrappedErr = sys.Err
|
|
}
|
|
}
|
|
|
|
+ if unwrappedErr == syscall.EADDRINUSE {
|
|
|
|
+ os.Remove(config.ControlAPI)
|
|
|
|
+ l, err = xnet.ListenLocal(config.ControlAPI)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, errors.Wrap(err, "failed to listen on control API address")
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ listeners = append(listeners, l)
|
|
|
|
+
|
|
|
|
+ l, err = net.Listen("tcp", config.RemoteAPI.ListenAddr)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, errors.Wrap(err, "failed to listen on remote API address")
|
|
|
|
+ }
|
|
|
|
+ if advertiseAddrPort == "0" {
|
|
|
|
+ advertiseAddr = l.Addr().String()
|
|
|
|
+ config.RemoteAPI.ListenAddr = advertiseAddr
|
|
|
|
+ }
|
|
|
|
+ listeners = append(listeners, l)
|
|
|
|
+
|
|
raftCfg := raft.DefaultNodeConfig()
|
|
raftCfg := raft.DefaultNodeConfig()
|
|
|
|
|
|
if config.ElectionTick > 0 {
|
|
if config.ElectionTick > 0 {
|
|
@@ -210,7 +217,7 @@ func New(config *Config) (*Manager, error) {
|
|
|
|
|
|
newNodeOpts := raft.NodeOptions{
|
|
newNodeOpts := raft.NodeOptions{
|
|
ID: config.SecurityConfig.ClientTLSCreds.NodeID(),
|
|
ID: config.SecurityConfig.ClientTLSCreds.NodeID(),
|
|
- Addr: tcpAddr,
|
|
|
|
|
|
+ Addr: advertiseAddr,
|
|
JoinAddr: config.JoinRaft,
|
|
JoinAddr: config.JoinRaft,
|
|
Config: raftCfg,
|
|
Config: raftCfg,
|
|
StateDir: raftStateDir,
|
|
StateDir: raftStateDir,
|
|
@@ -231,18 +238,14 @@ func New(config *Config) (*Manager, error) {
|
|
localserver: grpc.NewServer(opts...),
|
|
localserver: grpc.NewServer(opts...),
|
|
raftNode: raftNode,
|
|
raftNode: raftNode,
|
|
started: make(chan struct{}),
|
|
started: make(chan struct{}),
|
|
- stopped: make(chan struct{}),
|
|
|
|
}
|
|
}
|
|
|
|
|
|
return m, nil
|
|
return m, nil
|
|
}
|
|
}
|
|
|
|
|
|
// Addr returns tcp address on which remote api listens.
|
|
// Addr returns tcp address on which remote api listens.
|
|
-func (m *Manager) Addr() net.Addr {
|
|
|
|
- if l, ok := m.listeners["tcp"]; ok {
|
|
|
|
- return l.Addr()
|
|
|
|
- }
|
|
|
|
- return nil
|
|
|
|
|
|
+func (m *Manager) Addr() string {
|
|
|
|
+ return m.config.RemoteAPI.ListenAddr
|
|
}
|
|
}
|
|
|
|
|
|
// Run starts all manager sub-systems and the gRPC server at the configured
|
|
// Run starts all manager sub-systems and the gRPC server at the configured
|
|
@@ -252,14 +255,7 @@ func (m *Manager) Run(parent context.Context) error {
|
|
ctx, ctxCancel := context.WithCancel(parent)
|
|
ctx, ctxCancel := context.WithCancel(parent)
|
|
defer ctxCancel()
|
|
defer ctxCancel()
|
|
|
|
|
|
- // Harakiri.
|
|
|
|
- go func() {
|
|
|
|
- select {
|
|
|
|
- case <-ctx.Done():
|
|
|
|
- case <-m.stopped:
|
|
|
|
- ctxCancel()
|
|
|
|
- }
|
|
|
|
- }()
|
|
|
|
|
|
+ m.cancelFunc = ctxCancel
|
|
|
|
|
|
leadershipCh, cancel := m.raftNode.SubscribeLeadership()
|
|
leadershipCh, cancel := m.raftNode.SubscribeLeadership()
|
|
defer cancel()
|
|
defer cancel()
|
|
@@ -336,8 +332,8 @@ func (m *Manager) Run(parent context.Context) error {
|
|
localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)
|
|
localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)
|
|
|
|
|
|
errServe := make(chan error, len(m.listeners))
|
|
errServe := make(chan error, len(m.listeners))
|
|
- for proto, l := range m.listeners {
|
|
|
|
- go m.serveListener(ctx, errServe, proto, l)
|
|
|
|
|
|
+ for _, lis := range m.listeners {
|
|
|
|
+ go m.serveListener(ctx, errServe, lis)
|
|
}
|
|
}
|
|
|
|
|
|
defer func() {
|
|
defer func() {
|
|
@@ -383,24 +379,14 @@ func (m *Manager) Run(parent context.Context) error {
|
|
|
|
|
|
// wait for an error in serving.
|
|
// wait for an error in serving.
|
|
err = <-errServe
|
|
err = <-errServe
|
|
- select {
|
|
|
|
- // check to see if stopped was posted to. if so, we're in the process of
|
|
|
|
- // stopping, or done and that's why we got the error. if stopping is
|
|
|
|
- // deliberate, stopped will ALWAYS be closed before the error is trigger,
|
|
|
|
- // so this path will ALWAYS be taken if the stop was deliberate
|
|
|
|
- case <-m.stopped:
|
|
|
|
- // shutdown was requested, do not return an error
|
|
|
|
- // but first, we wait to acquire a mutex to guarantee that stopping is
|
|
|
|
- // finished. as long as we acquire the mutex BEFORE we return, we know
|
|
|
|
- // that stopping is stopped.
|
|
|
|
- m.mu.Lock()
|
|
|
|
|
|
+ m.mu.Lock()
|
|
|
|
+ if m.stopped {
|
|
m.mu.Unlock()
|
|
m.mu.Unlock()
|
|
return nil
|
|
return nil
|
|
- // otherwise, we'll get something from errServe, which indicates that an
|
|
|
|
- // error in serving has actually occurred and this isn't a planned shutdown
|
|
|
|
- default:
|
|
|
|
- return err
|
|
|
|
}
|
|
}
|
|
|
|
+ m.mu.Unlock()
|
|
|
|
+ m.Stop(ctx)
|
|
|
|
+ return err
|
|
}
|
|
}
|
|
|
|
|
|
const stopTimeout = 8 * time.Second
|
|
const stopTimeout = 8 * time.Second
|
|
@@ -417,13 +403,10 @@ func (m *Manager) Stop(ctx context.Context) {
|
|
// from returning before we've finished stopping.
|
|
// from returning before we've finished stopping.
|
|
m.mu.Lock()
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
defer m.mu.Unlock()
|
|
- select {
|
|
|
|
- // check to see that we've already stopped
|
|
|
|
- case <-m.stopped:
|
|
|
|
|
|
+ if m.stopped {
|
|
return
|
|
return
|
|
- default:
|
|
|
|
- // do nothing, we're stopping for the first time
|
|
|
|
}
|
|
}
|
|
|
|
+ m.stopped = true
|
|
|
|
|
|
srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
|
|
srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
|
|
go func() {
|
|
go func() {
|
|
@@ -460,11 +443,7 @@ func (m *Manager) Stop(ctx context.Context) {
|
|
m.keyManager.Stop()
|
|
m.keyManager.Stop()
|
|
}
|
|
}
|
|
|
|
|
|
- // once we start stopping, send a signal that we're doing so. this tells
|
|
|
|
- // Run that we've started stopping, when it gets the error from errServe
|
|
|
|
- // it also prevents the loop from processing any more stuff.
|
|
|
|
- close(m.stopped)
|
|
|
|
-
|
|
|
|
|
|
+ m.cancelFunc()
|
|
<-m.raftNode.Done()
|
|
<-m.raftNode.Done()
|
|
|
|
|
|
timer := time.AfterFunc(stopTimeout, func() {
|
|
timer := time.AfterFunc(stopTimeout, func() {
|
|
@@ -582,11 +561,9 @@ func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan
|
|
select {
|
|
select {
|
|
case leadershipEvent := <-leadershipCh:
|
|
case leadershipEvent := <-leadershipCh:
|
|
m.mu.Lock()
|
|
m.mu.Lock()
|
|
- select {
|
|
|
|
- case <-m.stopped:
|
|
|
|
|
|
+ if m.stopped {
|
|
m.mu.Unlock()
|
|
m.mu.Unlock()
|
|
return
|
|
return
|
|
- default:
|
|
|
|
}
|
|
}
|
|
newState := leadershipEvent.(raft.LeadershipState)
|
|
newState := leadershipEvent.(raft.LeadershipState)
|
|
|
|
|
|
@@ -596,8 +573,6 @@ func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan
|
|
m.becomeFollower()
|
|
m.becomeFollower()
|
|
}
|
|
}
|
|
m.mu.Unlock()
|
|
m.mu.Unlock()
|
|
- case <-m.stopped:
|
|
|
|
- return
|
|
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -605,20 +580,21 @@ func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan
|
|
}
|
|
}
|
|
|
|
|
|
// serveListener serves a listener for local and non local connections.
|
|
// serveListener serves a listener for local and non local connections.
|
|
-func (m *Manager) serveListener(ctx context.Context, errServe chan error, proto string, lis net.Listener) {
|
|
|
|
|
|
+func (m *Manager) serveListener(ctx context.Context, errServe chan error, l net.Listener) {
|
|
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
|
|
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
|
|
logrus.Fields{
|
|
logrus.Fields{
|
|
- "proto": lis.Addr().Network(),
|
|
|
|
- "addr": lis.Addr().String()}))
|
|
|
|
- if proto == "unix" {
|
|
|
|
|
|
+ "proto": l.Addr().Network(),
|
|
|
|
+ "addr": l.Addr().String(),
|
|
|
|
+ }))
|
|
|
|
+ if _, ok := l.(*net.TCPListener); !ok {
|
|
log.G(ctx).Info("Listening for local connections")
|
|
log.G(ctx).Info("Listening for local connections")
|
|
// we need to disallow double closes because UnixListener.Close
|
|
// we need to disallow double closes because UnixListener.Close
|
|
// can delete unix-socket file of newer listener. grpc calls
|
|
// can delete unix-socket file of newer listener. grpc calls
|
|
// Close twice indeed: in Serve and in Stop.
|
|
// Close twice indeed: in Serve and in Stop.
|
|
- errServe <- m.localserver.Serve(&closeOnceListener{Listener: lis})
|
|
|
|
|
|
+ errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
|
|
} else {
|
|
} else {
|
|
log.G(ctx).Info("Listening for connections")
|
|
log.G(ctx).Info("Listening for connections")
|
|
- errServe <- m.server.Serve(lis)
|
|
|
|
|
|
+ errServe <- m.server.Serve(l)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|