|
@@ -22,9 +22,10 @@ import (
|
|
"strconv"
|
|
"strconv"
|
|
"strings"
|
|
"strings"
|
|
"sync"
|
|
"sync"
|
|
|
|
+ "sync/atomic"
|
|
"time"
|
|
"time"
|
|
|
|
|
|
- "github.com/hashicorp/go-multierror"
|
|
|
|
|
|
+ multierror "github.com/hashicorp/go-multierror"
|
|
sockaddr "github.com/hashicorp/go-sockaddr"
|
|
sockaddr "github.com/hashicorp/go-sockaddr"
|
|
"github.com/miekg/dns"
|
|
"github.com/miekg/dns"
|
|
)
|
|
)
|
|
@@ -35,11 +36,14 @@ type Memberlist struct {
|
|
numNodes uint32 // Number of known nodes (estimate)
|
|
numNodes uint32 // Number of known nodes (estimate)
|
|
|
|
|
|
config *Config
|
|
config *Config
|
|
- shutdown bool
|
|
|
|
|
|
+ shutdown int32 // Used as an atomic boolean value
|
|
shutdownCh chan struct{}
|
|
shutdownCh chan struct{}
|
|
- leave bool
|
|
|
|
|
|
+ leave int32 // Used as an atomic boolean value
|
|
leaveBroadcast chan struct{}
|
|
leaveBroadcast chan struct{}
|
|
|
|
|
|
|
|
+ shutdownLock sync.Mutex // Serializes calls to Shutdown
|
|
|
|
+ leaveLock sync.Mutex // Serializes calls to Leave
|
|
|
|
+
|
|
transport Transport
|
|
transport Transport
|
|
handoff chan msgHandoff
|
|
handoff chan msgHandoff
|
|
|
|
|
|
@@ -113,15 +117,44 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
|
|
BindPort: conf.BindPort,
|
|
BindPort: conf.BindPort,
|
|
Logger: logger,
|
|
Logger: logger,
|
|
}
|
|
}
|
|
- nt, err := NewNetTransport(nc)
|
|
|
|
|
|
+
|
|
|
|
+ // See comment below for details about the retry in here.
|
|
|
|
+ makeNetRetry := func(limit int) (*NetTransport, error) {
|
|
|
|
+ var err error
|
|
|
|
+ for try := 0; try < limit; try++ {
|
|
|
|
+ var nt *NetTransport
|
|
|
|
+ if nt, err = NewNetTransport(nc); err == nil {
|
|
|
|
+ return nt, nil
|
|
|
|
+ }
|
|
|
|
+ if strings.Contains(err.Error(), "address already in use") {
|
|
|
|
+ logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil, fmt.Errorf("failed to obtain an address: %v", err)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // The dynamic bind port operation is inherently racy because
|
|
|
|
+ // even though we are using the kernel to find a port for us, we
|
|
|
|
+ // are attempting to bind multiple protocols (and potentially
|
|
|
|
+ // multiple addresses) with the same port number. We build in a
|
|
|
|
+ // few retries here since this often gets transient errors in
|
|
|
|
+ // busy unit tests.
|
|
|
|
+ limit := 1
|
|
|
|
+ if conf.BindPort == 0 {
|
|
|
|
+ limit = 10
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ nt, err := makeNetRetry(limit)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Could not set up network transport: %v", err)
|
|
return nil, fmt.Errorf("Could not set up network transport: %v", err)
|
|
}
|
|
}
|
|
-
|
|
|
|
if conf.BindPort == 0 {
|
|
if conf.BindPort == 0 {
|
|
port := nt.GetAutoBindPort()
|
|
port := nt.GetAutoBindPort()
|
|
conf.BindPort = port
|
|
conf.BindPort = port
|
|
- logger.Printf("[DEBUG] Using dynamic bind port %d", port)
|
|
|
|
|
|
+ conf.AdvertisePort = port
|
|
|
|
+ logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port)
|
|
}
|
|
}
|
|
transport = nt
|
|
transport = nt
|
|
}
|
|
}
|
|
@@ -275,23 +308,17 @@ func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, err
|
|
// resolveAddr is used to resolve the address into an address,
|
|
// resolveAddr is used to resolve the address into an address,
|
|
// port, and error. If no port is given, use the default
|
|
// port, and error. If no port is given, use the default
|
|
func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
|
|
func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
|
|
- // Normalize the incoming string to host:port so we can apply Go's
|
|
|
|
- // parser to it.
|
|
|
|
- port := uint16(0)
|
|
|
|
- if !hasPort(hostStr) {
|
|
|
|
- hostStr += ":" + strconv.Itoa(m.config.BindPort)
|
|
|
|
- }
|
|
|
|
|
|
+ // This captures the supplied port, or the default one.
|
|
|
|
+ hostStr = ensurePort(hostStr, m.config.BindPort)
|
|
host, sport, err := net.SplitHostPort(hostStr)
|
|
host, sport, err := net.SplitHostPort(hostStr)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
-
|
|
|
|
- // This will capture the supplied port, or the default one added above.
|
|
|
|
lport, err := strconv.ParseUint(sport, 10, 16)
|
|
lport, err := strconv.ParseUint(sport, 10, 16)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
- port = uint16(lport)
|
|
|
|
|
|
+ port := uint16(lport)
|
|
|
|
|
|
// If it looks like an IP address we are done. The SplitHostPort() above
|
|
// If it looks like an IP address we are done. The SplitHostPort() above
|
|
// will make sure the host part is in good shape for parsing, even for
|
|
// will make sure the host part is in good shape for parsing, even for
|
|
@@ -525,18 +552,17 @@ func (m *Memberlist) NumMembers() (alive int) {
|
|
// This method is safe to call multiple times, but must not be called
|
|
// This method is safe to call multiple times, but must not be called
|
|
// after the cluster is already shut down.
|
|
// after the cluster is already shut down.
|
|
func (m *Memberlist) Leave(timeout time.Duration) error {
|
|
func (m *Memberlist) Leave(timeout time.Duration) error {
|
|
- m.nodeLock.Lock()
|
|
|
|
- // We can't defer m.nodeLock.Unlock() because m.deadNode will also try to
|
|
|
|
- // acquire a lock so we need to Unlock before that.
|
|
|
|
|
|
+ m.leaveLock.Lock()
|
|
|
|
+ defer m.leaveLock.Unlock()
|
|
|
|
|
|
- if m.shutdown {
|
|
|
|
- m.nodeLock.Unlock()
|
|
|
|
|
|
+ if m.hasShutdown() {
|
|
panic("leave after shutdown")
|
|
panic("leave after shutdown")
|
|
}
|
|
}
|
|
|
|
|
|
- if !m.leave {
|
|
|
|
- m.leave = true
|
|
|
|
|
|
+ if !m.hasLeft() {
|
|
|
|
+ atomic.StoreInt32(&m.leave, 1)
|
|
|
|
|
|
|
|
+ m.nodeLock.Lock()
|
|
state, ok := m.nodeMap[m.config.Name]
|
|
state, ok := m.nodeMap[m.config.Name]
|
|
m.nodeLock.Unlock()
|
|
m.nodeLock.Unlock()
|
|
if !ok {
|
|
if !ok {
|
|
@@ -562,8 +588,6 @@ func (m *Memberlist) Leave(timeout time.Duration) error {
|
|
return fmt.Errorf("timeout waiting for leave broadcast")
|
|
return fmt.Errorf("timeout waiting for leave broadcast")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- m.nodeLock.Unlock()
|
|
|
|
}
|
|
}
|
|
|
|
|
|
return nil
|
|
return nil
|
|
@@ -605,21 +629,31 @@ func (m *Memberlist) ProtocolVersion() uint8 {
|
|
//
|
|
//
|
|
// This method is safe to call multiple times.
|
|
// This method is safe to call multiple times.
|
|
func (m *Memberlist) Shutdown() error {
|
|
func (m *Memberlist) Shutdown() error {
|
|
- m.nodeLock.Lock()
|
|
|
|
- defer m.nodeLock.Unlock()
|
|
|
|
|
|
+ m.shutdownLock.Lock()
|
|
|
|
+ defer m.shutdownLock.Unlock()
|
|
|
|
|
|
- if m.shutdown {
|
|
|
|
|
|
+ if m.hasShutdown() {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
// Shut down the transport first, which should block until it's
|
|
// Shut down the transport first, which should block until it's
|
|
// completely torn down. If we kill the memberlist-side handlers
|
|
// completely torn down. If we kill the memberlist-side handlers
|
|
// those I/O handlers might get stuck.
|
|
// those I/O handlers might get stuck.
|
|
- m.transport.Shutdown()
|
|
|
|
|
|
+ if err := m.transport.Shutdown(); err != nil {
|
|
|
|
+ m.logger.Printf("[ERR] Failed to shutdown transport: %v", err)
|
|
|
|
+ }
|
|
|
|
|
|
// Now tear down everything else.
|
|
// Now tear down everything else.
|
|
- m.shutdown = true
|
|
|
|
|
|
+ atomic.StoreInt32(&m.shutdown, 1)
|
|
close(m.shutdownCh)
|
|
close(m.shutdownCh)
|
|
m.deschedule()
|
|
m.deschedule()
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+func (m *Memberlist) hasShutdown() bool {
|
|
|
|
+ return atomic.LoadInt32(&m.shutdown) == 1
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (m *Memberlist) hasLeft() bool {
|
|
|
|
+ return atomic.LoadInt32(&m.leave) == 1
|
|
|
|
+}
|