Merge pull request #2040 from fcrisciani/memberlist_revendor

Memberlist revendor and optimizations
This commit is contained in:
Flavio Crisciani 2018-01-24 09:20:20 -08:00 committed by GitHub
commit 4bf50246d1
13 changed files with 262 additions and 175 deletions

View file

@ -165,16 +165,19 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
}
}
nDB.RUnlock()
if !ok || network.leaving || !nodePresent {
// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
return false
}
nDB.Lock()
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
if err == nil {
// We have the latest state. Ignore the event
// since it is stale.
if e.ltime >= tEvent.LTime {
nDB.Unlock()
return false
}
}
@ -195,8 +198,6 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
nDB.config.Hostname, nDB.config.NodeID, tEvent)
e.reapTime = nDB.config.reapEntryInterval
}
nDB.Lock()
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
nDB.Unlock()

View file

@ -26,13 +26,10 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
e.broadcastNodeEvent(mn.Addr, opCreate)
e.nDB.Lock()
defer e.nDB.Unlock()
// In case the node is rejoining after a failure or leave,
// wait until an explicit join message arrives before adding
// it to the nodes just to make sure this is not a stale
// join. If you don't know about this node add it immediately.
_, fOk := e.nDB.failedNodes[mn.Name]
_, lOk := e.nDB.leftNodes[mn.Name]
if fOk || lOk {
// just add the node back to active
if moved, _ := e.nDB.changeNodeState(mn.Name, nodeActiveState); moved {
return
}

View file

@ -322,6 +322,8 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
// GetEntry retrieves the value of a table entry in a given (network,
// table, key) tuple
func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
nDB.RLock()
defer nDB.RUnlock()
entry, err := nDB.getEntry(tname, nid, key)
if err != nil {
return nil, err
@ -331,9 +333,6 @@ func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
}
func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
nDB.RLock()
defer nDB.RUnlock()
e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
if !ok {
return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
@ -348,13 +347,10 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
// entry for the same tuple for which there is already an existing
// entry unless the current entry is deleting state.
func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
nDB.Lock()
oldEntry, err := nDB.getEntry(tname, nid, key)
if err != nil {
if _, ok := err.(types.NotFoundError); !ok {
return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
}
}
if oldEntry != nil && !oldEntry.deleting {
if err == nil || (oldEntry != nil && !oldEntry.deleting) {
nDB.Unlock()
return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
}
@ -364,14 +360,13 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
value: value,
}
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
}
nDB.Lock()
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
return nil
}
@ -380,7 +375,9 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
// propagates this event to the cluster. It is an error to update a
// non-existent entry.
func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
if _, err := nDB.GetEntry(tname, nid, key); err != nil {
nDB.Lock()
if _, err := nDB.getEntry(tname, nid, key); err != nil {
nDB.Unlock()
return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
}
@ -390,14 +387,13 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
value: value,
}
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table update event: %v", err)
}
nDB.Lock()
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
return nil
}
@ -427,27 +423,29 @@ func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem
// table, key) tuple and if the NetworkDB is part of the cluster
// propagates this event to the cluster.
func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
value, err := nDB.GetEntry(tname, nid, key)
if err != nil {
return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
nDB.Lock()
oldEntry, err := nDB.getEntry(tname, nid, key)
if err != nil || oldEntry == nil || oldEntry.deleting {
nDB.Unlock()
return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+
"does not exist or is already being deleted", tname, nid, key)
}
entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeID,
value: value,
value: oldEntry.value,
deleting: true,
reapTime: nDB.config.reapEntryInterval,
}
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table delete event: %v", err)
}
nDB.Lock()
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
return nil
}

View file

@ -735,3 +735,64 @@ func TestNodeReincarnation(t *testing.T) {
closeNetworkDBInstances(dbs)
}
func TestParallelCreate(t *testing.T) {
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
startCh := make(chan int)
doneCh := make(chan error)
var success int32
for i := 0; i < 20; i++ {
go func() {
<-startCh
err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
if err == nil {
atomic.AddInt32(&success, 1)
}
doneCh <- err
}()
}
close(startCh)
for i := 0; i < 20; i++ {
<-doneCh
}
close(doneCh)
// Only 1 write should have succeeded
assert.Equal(t, int32(1), success)
closeNetworkDBInstances(dbs)
}
func TestParallelDelete(t *testing.T) {
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
assert.NoError(t, err)
startCh := make(chan int)
doneCh := make(chan error)
var success int32
for i := 0; i < 20; i++ {
go func() {
<-startCh
err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
if err == nil {
atomic.AddInt32(&success, 1)
}
doneCh <- err
}()
}
close(startCh)
for i := 0; i < 20; i++ {
<-doneCh
}
close(doneCh)
// Only 1 write should have succeeded
assert.Equal(t, int32(1), success)
closeNetworkDBInstances(dbs)
}

View file

@ -27,7 +27,7 @@ github.com/gorilla/mux v1.1
github.com/hashicorp/consul v0.5.2
github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b
github.com/hashicorp/go-multierror fcdddc395df1ddf4247c69bd436e84cfa0733f7e
github.com/hashicorp/memberlist v0.1.0
github.com/hashicorp/memberlist 3d8438da9589e7b608a83ffac1ef8211486bcb7c
github.com/sean-/seed e2103e2c35297fb7e17febb81e49b312087a2372
github.com/hashicorp/go-sockaddr acd314c5781ea706c710d9ea70069fd2e110d61d
github.com/hashicorp/serf 598c54895cc5a7b1a24a398d635e8c0ea0959870

View file

@ -23,6 +23,8 @@ Please check your installation with:
go version
```
Run `make deps` to fetch dependencies before building
## Usage
Memberlist is surprisingly simple to use. An example is shown below:
@ -63,82 +65,11 @@ For complete documentation, see the associated [Godoc](http://godoc.org/github.c
## Protocol
memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf),
with a few minor adaptations, mostly to increase propagation speed and
memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf). However, we extend the protocol in a number of ways:
* Several extensions are made to increase propagation speed and
convergence rate.
* Another set of extensions, that we call Lifeguard, are made to make memberlist more robust in the presence of slow message processing (due to factors such as CPU starvation, and network delay or loss).
A high level overview of the memberlist protocol (based on SWIM) is
described below, but for details please read the full
[SWIM paper](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf)
followed by the memberlist source. We welcome any questions related
For details on all of these extensions, please read our paper "[Lifeguard : SWIM-ing with Situational Awareness](https://arxiv.org/abs/1707.00788)", along with the memberlist source. We welcome any questions related
to the protocol on our issue tracker.
### Protocol Description
memberlist begins by joining an existing cluster or starting a new
cluster. If starting a new cluster, additional nodes are expected to join
it. New nodes in an existing cluster must be given the address of at
least one existing member in order to join the cluster. The new member
does a full state sync with the existing member over TCP and begins gossiping its
existence to the cluster.
Gossip is done over UDP with a configurable but fixed fanout and interval.
This ensures that network usage is constant with regards to number of nodes, as opposed to
exponential growth that can occur with traditional heartbeat mechanisms.
Complete state exchanges with a random node are done periodically over
TCP, but much less often than gossip messages. This increases the likelihood
that the membership list converges properly since the full state is exchanged
and merged. The interval between full state exchanges is configurable or can
be disabled entirely.
Failure detection is done by periodic random probing using a configurable interval.
If the node fails to ack within a reasonable time (typically some multiple
of RTT), then an indirect probe as well as a direct TCP probe are attempted. An
indirect probe asks a configurable number of random nodes to probe the same node,
in case there are network issues causing our own node to fail the probe. The direct
TCP probe is used to help identify the common situation where networking is
misconfigured to allow TCP but not UDP. Without the TCP probe, a UDP-isolated node
would think all other nodes were suspect and could cause churn in the cluster when
it attempts a TCP-based state exchange with another node. It is not desirable to
operate with only TCP connectivity because convergence will be much slower, but it
is enabled so that memberlist can detect this situation and alert operators.
If both our probe, the indirect probes, and the direct TCP probe fail within a
configurable time, then the node is marked "suspicious" and this knowledge is
gossiped to the cluster. A suspicious node is still considered a member of
cluster. If the suspect member of the cluster does not dispute the suspicion
within a configurable period of time, the node is finally considered dead,
and this state is then gossiped to the cluster.
This is a brief and incomplete description of the protocol. For a better idea,
please read the
[SWIM paper](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf)
in its entirety, along with the memberlist source code.
### Changes from SWIM
As mentioned earlier, the memberlist protocol is based on SWIM but includes
minor changes, mostly to increase propagation speed and convergence rates.
The changes from SWIM are noted here:
* memberlist does a full state sync over TCP periodically. SWIM only propagates
changes over gossip. While both eventually reach convergence, the full state
sync increases the likelihood that nodes are fully converged more quickly,
at the expense of more bandwidth usage. This feature can be totally disabled
if you wish.
* memberlist has a dedicated gossip layer separate from the failure detection
protocol. SWIM only piggybacks gossip messages on top of probe/ack messages.
memberlist also piggybacks gossip messages on top of probe/ack messages, but
also will periodically send out dedicated gossip messages on their own. This
feature lets you have a higher gossip rate (for example once per 200ms)
and a slower failure detection rate (such as once per second), resulting
in overall faster convergence rates and data propagation speeds. This feature
can be totally disabed as well, if you wish.
* memberlist stores around the state of dead nodes for a set amount of time,
so that when full syncs are requested, the requester also receives information
about dead nodes. Because SWIM doesn't do full syncs, SWIM deletes dead node
state immediately upon learning that the node is dead. This change again helps
the cluster converge more quickly.

View file

@ -141,6 +141,16 @@ type Config struct {
GossipNodes int
GossipToTheDeadTime time.Duration
// GossipVerifyIncoming controls whether to enforce encryption for incoming
// gossip. It is used for upshifting from unencrypted to encrypted gossip on
// a running cluster.
GossipVerifyIncoming bool
// GossipVerifyOutgoing controls whether to enforce encryption for outgoing
// gossip. It is used for upshifting from unencrypted to encrypted gossip on
// a running cluster.
GossipVerifyOutgoing bool
// EnableCompression is used to control message compression. This can
// be used to reduce bandwidth usage at the cost of slightly more CPU
// utilization. This is only available starting at protocol version 1.
@ -225,7 +235,7 @@ func DefaultLANConfig() *Config {
TCPTimeout: 10 * time.Second, // Timeout after 10 seconds
IndirectChecks: 3, // Use 3 nodes for the indirect ping
RetransmitMult: 4, // Retransmit a message 4 * log(N+1) nodes
SuspicionMult: 5, // Suspect a node for 5 * log(N+1) * Interval
SuspicionMult: 4, // Suspect a node for 4 * log(N+1) * Interval
SuspicionMaxTimeoutMult: 6, // For 10k nodes this will give a max timeout of 120 seconds
PushPullInterval: 30 * time.Second, // Low frequency
ProbeTimeout: 500 * time.Millisecond, // Reasonable RTT time for LAN
@ -233,9 +243,11 @@ func DefaultLANConfig() *Config {
DisableTcpPings: false, // TCP pings are safe, even with mixed versions
AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds
GossipNodes: 3, // Gossip to 3 nodes
GossipInterval: 200 * time.Millisecond, // Gossip more rapidly
GossipToTheDeadTime: 30 * time.Second, // Same as push/pull
GossipNodes: 3, // Gossip to 3 nodes
GossipInterval: 200 * time.Millisecond, // Gossip more rapidly
GossipToTheDeadTime: 30 * time.Second, // Same as push/pull
GossipVerifyIncoming: true,
GossipVerifyOutgoing: true,
EnableCompression: true, // Enable compression by default

View file

@ -22,9 +22,10 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-multierror"
multierror "github.com/hashicorp/go-multierror"
sockaddr "github.com/hashicorp/go-sockaddr"
"github.com/miekg/dns"
)
@ -35,11 +36,14 @@ type Memberlist struct {
numNodes uint32 // Number of known nodes (estimate)
config *Config
shutdown bool
shutdown int32 // Used as an atomic boolean value
shutdownCh chan struct{}
leave bool
leave int32 // Used as an atomic boolean value
leaveBroadcast chan struct{}
shutdownLock sync.Mutex // Serializes calls to Shutdown
leaveLock sync.Mutex // Serializes calls to Leave
transport Transport
handoff chan msgHandoff
@ -113,15 +117,44 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
BindPort: conf.BindPort,
Logger: logger,
}
nt, err := NewNetTransport(nc)
// See comment below for details about the retry in here.
makeNetRetry := func(limit int) (*NetTransport, error) {
var err error
for try := 0; try < limit; try++ {
var nt *NetTransport
if nt, err = NewNetTransport(nc); err == nil {
return nt, nil
}
if strings.Contains(err.Error(), "address already in use") {
logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
continue
}
}
return nil, fmt.Errorf("failed to obtain an address: %v", err)
}
// The dynamic bind port operation is inherently racy because
// even though we are using the kernel to find a port for us, we
// are attempting to bind multiple protocols (and potentially
// multiple addresses) with the same port number. We build in a
// few retries here since this often gets transient errors in
// busy unit tests.
limit := 1
if conf.BindPort == 0 {
limit = 10
}
nt, err := makeNetRetry(limit)
if err != nil {
return nil, fmt.Errorf("Could not set up network transport: %v", err)
}
if conf.BindPort == 0 {
port := nt.GetAutoBindPort()
conf.BindPort = port
logger.Printf("[DEBUG] Using dynamic bind port %d", port)
conf.AdvertisePort = port
logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port)
}
transport = nt
}
@ -275,23 +308,17 @@ func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16) ([]ipPort, err
// resolveAddr is used to resolve the address into an address,
// port, and error. If no port is given, use the default
func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
// Normalize the incoming string to host:port so we can apply Go's
// parser to it.
port := uint16(0)
if !hasPort(hostStr) {
hostStr += ":" + strconv.Itoa(m.config.BindPort)
}
// This captures the supplied port, or the default one.
hostStr = ensurePort(hostStr, m.config.BindPort)
host, sport, err := net.SplitHostPort(hostStr)
if err != nil {
return nil, err
}
// This will capture the supplied port, or the default one added above.
lport, err := strconv.ParseUint(sport, 10, 16)
if err != nil {
return nil, err
}
port = uint16(lport)
port := uint16(lport)
// If it looks like an IP address we are done. The SplitHostPort() above
// will make sure the host part is in good shape for parsing, even for
@ -525,18 +552,17 @@ func (m *Memberlist) NumMembers() (alive int) {
// This method is safe to call multiple times, but must not be called
// after the cluster is already shut down.
func (m *Memberlist) Leave(timeout time.Duration) error {
m.nodeLock.Lock()
// We can't defer m.nodeLock.Unlock() because m.deadNode will also try to
// acquire a lock so we need to Unlock before that.
m.leaveLock.Lock()
defer m.leaveLock.Unlock()
if m.shutdown {
m.nodeLock.Unlock()
if m.hasShutdown() {
panic("leave after shutdown")
}
if !m.leave {
m.leave = true
if !m.hasLeft() {
atomic.StoreInt32(&m.leave, 1)
m.nodeLock.Lock()
state, ok := m.nodeMap[m.config.Name]
m.nodeLock.Unlock()
if !ok {
@ -562,8 +588,6 @@ func (m *Memberlist) Leave(timeout time.Duration) error {
return fmt.Errorf("timeout waiting for leave broadcast")
}
}
} else {
m.nodeLock.Unlock()
}
return nil
@ -605,21 +629,31 @@ func (m *Memberlist) ProtocolVersion() uint8 {
//
// This method is safe to call multiple times.
func (m *Memberlist) Shutdown() error {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
m.shutdownLock.Lock()
defer m.shutdownLock.Unlock()
if m.shutdown {
if m.hasShutdown() {
return nil
}
// Shut down the transport first, which should block until it's
// completely torn down. If we kill the memberlist-side handlers
// those I/O handlers might get stuck.
m.transport.Shutdown()
if err := m.transport.Shutdown(); err != nil {
m.logger.Printf("[ERR] Failed to shutdown transport: %v", err)
}
// Now tear down everything else.
m.shutdown = true
atomic.StoreInt32(&m.shutdown, 1)
close(m.shutdownCh)
m.deschedule()
return nil
}
func (m *Memberlist) hasShutdown() bool {
return atomic.LoadInt32(&m.shutdown) == 1
}
func (m *Memberlist) hasLeft() bool {
return atomic.LoadInt32(&m.leave) == 1
}

View file

@ -55,6 +55,7 @@ const (
encryptMsg
nackRespMsg
hasCrcMsg
errMsg
)
// compressionType is used to specify the compression algorithm
@ -105,6 +106,11 @@ type nackResp struct {
SeqNo uint32
}
// err response is sent to relay the error from the remote end
type errResp struct {
Error string
}
// suspect is broadcast when we suspect a node is dead
type suspect struct {
Incarnation uint32
@ -209,6 +215,19 @@ func (m *Memberlist) handleConn(conn net.Conn) {
if err != nil {
if err != io.EOF {
m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))
resp := errResp{err.Error()}
out, err := encode(errMsg, &resp)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode error response: %s", err)
return
}
err = m.rawSendMsgStream(conn, out.Bytes())
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send error: %s %s", err, LogConn(conn))
return
}
}
return
}
@ -283,8 +302,13 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time
// Decrypt the payload
plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil)
if err != nil {
m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
return
if !m.config.GossipVerifyIncoming {
// Treat the message as plaintext
plain = buf
} else {
m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
return
}
}
// Continue processing the plaintext buffer
@ -557,7 +581,7 @@ func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg inte
func (m *Memberlist) sendMsg(addr string, msg []byte) error {
// Check if we can piggy back any messages
bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead
if m.config.EncryptionEnabled() {
if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
bytesAvail -= encryptOverhead(m.encryptionVersion())
}
extra := m.getBroadcasts(compoundOverhead, bytesAvail)
@ -621,7 +645,7 @@ func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error
}
// Check if we have encryption enabled
if m.config.EncryptionEnabled() {
if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
// Encrypt the payload
var buf bytes.Buffer
primaryKey := m.config.Keyring.GetPrimaryKey()
@ -652,7 +676,7 @@ func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error {
}
// Check if encryption is enabled
if m.config.EncryptionEnabled() {
if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
crypt, err := m.encryptLocalState(sendBuf)
if err != nil {
m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err)
@ -721,6 +745,14 @@ func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeStat
return nil, nil, err
}
if msgType == errMsg {
var resp errResp
if err := dec.Decode(&resp); err != nil {
return nil, nil, err
}
return nil, nil, fmt.Errorf("remote error: %v", resp.Error)
}
// Quit if not push/pull
if msgType != pushPullMsg {
err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn))
@ -876,7 +908,7 @@ func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.D
// Reset message type and bufConn
msgType = messageType(plain[0])
bufConn = bytes.NewReader(plain[1:])
} else if m.config.EncryptionEnabled() {
} else if m.config.EncryptionEnabled() && m.config.GossipVerifyIncoming {
return 0, nil, nil,
fmt.Errorf("Encryption is configured but remote state is not encrypted")
}
@ -1027,7 +1059,7 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
// operations, given the deadline. The bool return parameter is true if we
// we able to round trip a ping to the other node.
func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) {
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
conn, err := m.transport.DialTimeout(addr, deadline.Sub(time.Now()))
if err != nil {
// If the node is actually dead we expect this to fail, so we
// shouldn't spam the logs with it. After this point, errors

View file

@ -40,6 +40,11 @@ func (n *Node) Address() string {
return joinHostPort(n.Addr.String(), n.Port)
}
// String returns the node name
func (n *Node) String() string {
return n.Name
}
// NodeState is used to manage our state view of another node
type nodeState struct {
Node
@ -246,10 +251,17 @@ func (m *Memberlist) probeNode(node *nodeState) {
nackCh := make(chan struct{}, m.config.IndirectChecks+1)
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)
// Mark the sent time here, which should be after any pre-processing but
// before system calls to do the actual send. This probably over-reports
// a bit, but it's the best we can do. We had originally put this right
// after the I/O, but that would sometimes give negative RTT measurements
// which was not desirable.
sent := time.Now()
// Send a ping to the node. If this node looks like it's suspect or dead,
// also tack on a suspect message so that it has a chance to refute as
// soon as possible.
deadline := time.Now().Add(probeInterval)
deadline := sent.Add(probeInterval)
addr := node.Address()
if node.State == stateAlive {
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
@ -279,11 +291,6 @@ func (m *Memberlist) probeNode(node *nodeState) {
}
}
// Mark the sent time here, which should be after any pre-processing and
// system calls to do the actual send. This probably under-reports a bit,
// but it's the best we can do.
sent := time.Now()
// Arrange for our self-awareness to get updated. At this point we've
// sent the ping, so any return statement means the probe succeeded
// which will improve our health until we get to the failure scenarios
@ -830,7 +837,7 @@ func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
// in-queue to be processed but blocked by the locks above. If we let
// that aliveMsg process, it'll cause us to re-join the cluster. This
// ensures that we don't.
if m.leave && a.Node == m.config.Name {
if m.hasLeft() && a.Node == m.config.Name {
return
}
@ -1106,7 +1113,7 @@ func (m *Memberlist) deadNode(d *dead) {
// Check if this is us
if state.Name == m.config.Name {
// If we are not leaving we need to refute
if !m.leave {
if !m.hasLeft() {
m.refute(state, d.Incarnation)
m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
return // Do not mark ourself dead

View file

@ -117,7 +117,7 @@ func (s *suspicion) Confirm(from string) bool {
// stop the timer then we will call the timeout function directly from
// here.
n := atomic.AddInt32(&s.n, 1)
elapsed := time.Now().Sub(s.start)
elapsed := time.Since(s.start)
remaining := remainingSuspicionTime(n, s.k, elapsed, s.min, s.max)
if s.timer.Stop() {
if remaining > 0 {

View file

@ -17,7 +17,7 @@ type Packet struct {
// Timestamp is the time when the packet was received. This should be
// taken as close as possible to the actual receipt time to help make an
// accurate RTT measurements during probes.
// accurate RTT measurement during probes.
Timestamp time.Time
}

View file

@ -217,20 +217,6 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
return
}
// Given a string of the form "host", "host:port",
// "ipv6::addr" or "[ipv6::address]:port",
// return true if the string includes a port.
func hasPort(s string) bool {
last := strings.LastIndex(s, ":")
if last == -1 {
return false
}
if s[0] == '[' {
return s[last-1] == ']'
}
return strings.Index(s, ":") == last
}
// compressPayload takes an opaque input buffer, compresses it
// and wraps it in a compress{} message that is encoded.
func compressPayload(inp []byte) (*bytes.Buffer, error) {
@ -294,3 +280,31 @@ func decompressBuffer(c *compress) ([]byte, error) {
func joinHostPort(host string, port uint16) string {
return net.JoinHostPort(host, strconv.Itoa(int(port)))
}
// hasPort is given a string of the form "host", "host:port", "ipv6::address",
// or "[ipv6::address]:port", and returns true if the string includes a port.
func hasPort(s string) bool {
// IPv6 address in brackets.
if strings.LastIndex(s, "[") == 0 {
return strings.LastIndex(s, ":") > strings.LastIndex(s, "]")
}
// Otherwise the presence of a single colon determines if there's a port
// since IPv6 addresses outside of brackets (count > 1) can't have a
// port.
return strings.Count(s, ":") == 1
}
// ensurePort makes sure the given string has a port number on it, otherwise it
// appends the given port as a default.
func ensurePort(s string, port int) string {
if hasPort(s) {
return s
}
// If this is an IPv6 address, the join call will add another set of
// brackets, so we have to trim before we add the default port.
s = strings.Trim(s, "[]")
s = net.JoinHostPort(s, strconv.Itoa(port))
return s
}