|
@@ -18,7 +18,15 @@ import (
|
|
// range. This range is inclusive.
|
|
// range. This range is inclusive.
|
|
const (
|
|
const (
|
|
ProtocolVersionMin uint8 = 1
|
|
ProtocolVersionMin uint8 = 1
|
|
- ProtocolVersionMax = 2
|
|
|
|
|
|
+
|
|
|
|
+ // Version 3 added support for TCP pings but we kept the default
|
|
|
|
+ // protocol version at 2 to ease transition to this new feature.
|
|
|
|
+ // A memberlist speaking version 2 of the protocol will attempt
|
|
|
|
+ // to TCP ping another memberlist who understands version 3 or
|
|
|
|
+ // greater.
|
|
|
|
+ ProtocolVersion2Compatible = 2
|
|
|
|
+
|
|
|
|
+ ProtocolVersionMax = 3
|
|
)
|
|
)
|
|
|
|
|
|
// messageType is an integer ID of a type of message that can be received
|
|
// messageType is an integer ID of a type of message that can be received
|
|
@@ -79,7 +87,8 @@ type indirectPingReq struct {
|
|
|
|
|
|
// ack response is sent for a ping
|
|
// ack response is sent for a ping
|
|
type ackResp struct {
|
|
type ackResp struct {
|
|
- SeqNo uint32
|
|
|
|
|
|
+ SeqNo uint32
|
|
|
|
+ Payload []byte
|
|
}
|
|
}
|
|
|
|
|
|
// suspect is broadcast when we suspect a node is dead
|
|
// suspect is broadcast when we suspect a node is dead
|
|
@@ -119,6 +128,11 @@ type pushPullHeader struct {
|
|
Join bool // Is this a join request or a anti-entropy run
|
|
Join bool // Is this a join request or a anti-entropy run
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// userMsgHeader is used to encapsulate a userMsg
|
|
|
|
+type userMsgHeader struct {
|
|
|
|
+ UserMsgLen int // Encodes the byte lengh of user state
|
|
|
|
+}
|
|
|
|
+
|
|
// pushNodeState is used for pushPullReq when we are
|
|
// pushNodeState is used for pushPullReq when we are
|
|
// transfering out node states
|
|
// transfering out node states
|
|
type pushNodeState struct {
|
|
type pushNodeState struct {
|
|
@@ -185,54 +199,65 @@ func (m *Memberlist) tcpListen() {
|
|
|
|
|
|
// handleConn handles a single incoming TCP connection
|
|
// handleConn handles a single incoming TCP connection
|
|
func (m *Memberlist) handleConn(conn *net.TCPConn) {
|
|
func (m *Memberlist) handleConn(conn *net.TCPConn) {
|
|
- m.logger.Printf("[DEBUG] memberlist: Responding to push/pull sync with: %s", conn.RemoteAddr())
|
|
|
|
|
|
+ m.logger.Printf("[DEBUG] memberlist: TCP connection %s", LogConn(conn))
|
|
|
|
+
|
|
defer conn.Close()
|
|
defer conn.Close()
|
|
metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
|
|
metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
|
|
|
|
|
|
- join, remoteNodes, userState, err := m.readRemoteState(conn)
|
|
|
|
|
|
+ conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
|
|
|
|
+ msgType, bufConn, dec, err := m.readTCP(conn)
|
|
if err != nil {
|
|
if err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to receive remote state: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
- if err := m.sendLocalState(conn, join); err != nil {
|
|
|
|
- m.logger.Printf("[ERR] memberlist: Failed to push local state: %s", err)
|
|
|
|
- }
|
|
|
|
|
|
+ switch msgType {
|
|
|
|
+ case userMsg:
|
|
|
|
+ if err := m.readUserMsg(bufConn, dec); err != nil {
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn))
|
|
|
|
+ }
|
|
|
|
+ case pushPullMsg:
|
|
|
|
+ join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
|
|
|
|
+ if err != nil {
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
|
|
- if err := m.verifyProtocol(remoteNodes); err != nil {
|
|
|
|
- m.logger.Printf("[ERR] memberlist: Push/pull verification failed: %s", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ if err := m.sendLocalState(conn, join); err != nil {
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to push local state: %s %s", err, LogConn(conn))
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
|
|
- // Invoke the merge delegate if any
|
|
|
|
- if join && m.config.Merge != nil {
|
|
|
|
- nodes := make([]*Node, len(remoteNodes))
|
|
|
|
- for idx, n := range remoteNodes {
|
|
|
|
- nodes[idx] = &Node{
|
|
|
|
- Name: n.Name,
|
|
|
|
- Addr: n.Addr,
|
|
|
|
- Port: n.Port,
|
|
|
|
- Meta: n.Meta,
|
|
|
|
- PMin: n.Vsn[0],
|
|
|
|
- PMax: n.Vsn[1],
|
|
|
|
- PCur: n.Vsn[2],
|
|
|
|
- DMin: n.Vsn[3],
|
|
|
|
- DMax: n.Vsn[4],
|
|
|
|
- DCur: n.Vsn[5],
|
|
|
|
- }
|
|
|
|
|
|
+ if err := m.mergeRemoteState(join, remoteNodes, userState); err != nil {
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed push/pull merge: %s %s", err, LogConn(conn))
|
|
|
|
+ return
|
|
}
|
|
}
|
|
- if m.config.Merge.NotifyMerge(nodes) {
|
|
|
|
- m.logger.Printf("[WARN] memberlist: Cluster merge canceled")
|
|
|
|
|
|
+ case pingMsg:
|
|
|
|
+ var p ping
|
|
|
|
+ if err := dec.Decode(&p); err != nil {
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to decode TCP ping: %s %s", err, LogConn(conn))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- // Merge the membership state
|
|
|
|
- m.mergeState(remoteNodes)
|
|
|
|
|
|
+ if p.Node != "" && p.Node != m.config.Name {
|
|
|
|
+ m.logger.Printf("[WARN] memberlist: Got ping for unexpected node %s %s", p.Node, LogConn(conn))
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
|
|
- // Invoke the delegate for user state
|
|
|
|
- if m.config.Delegate != nil {
|
|
|
|
- m.config.Delegate.MergeRemoteState(userState, join)
|
|
|
|
|
|
+ ack := ackResp{p.SeqNo, nil}
|
|
|
|
+ out, err := encode(ackRespMsg, &ack)
|
|
|
|
+ if err != nil {
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to encode TCP ack: %s", err)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ err = m.rawSendMsgTCP(conn, out.Bytes())
|
|
|
|
+ if err != nil {
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to send TCP ack: %s %s", err, LogConn(conn))
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ default:
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Received invalid msgType (%d) %s", msgType, LogConn(conn))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -265,29 +290,30 @@ func (m *Memberlist) udpListen() {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Capture the reception time of the packet as close to the
|
|
|
|
+ // system calls as possible.
|
|
|
|
+ lastPacket = time.Now()
|
|
|
|
+
|
|
// Check the length
|
|
// Check the length
|
|
if n < 1 {
|
|
if n < 1 {
|
|
- m.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes). From: %s",
|
|
|
|
- len(buf), addr)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s",
|
|
|
|
+ len(buf), LogAddress(addr))
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
|
|
- // Capture the current time
|
|
|
|
- lastPacket = time.Now()
|
|
|
|
-
|
|
|
|
// Ingest this packet
|
|
// Ingest this packet
|
|
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
|
|
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
|
|
- m.ingestPacket(buf[:n], addr)
|
|
|
|
|
|
+ m.ingestPacket(buf[:n], addr, lastPacket)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (m *Memberlist) ingestPacket(buf []byte, from net.Addr) {
|
|
|
|
|
|
+func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time) {
|
|
// Check if encryption is enabled
|
|
// Check if encryption is enabled
|
|
if m.config.EncryptionEnabled() {
|
|
if m.config.EncryptionEnabled() {
|
|
// Decrypt the payload
|
|
// Decrypt the payload
|
|
plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil)
|
|
plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil)
|
|
if err != nil {
|
|
if err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
@@ -296,10 +322,10 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr) {
|
|
}
|
|
}
|
|
|
|
|
|
// Handle the command
|
|
// Handle the command
|
|
- m.handleCommand(buf, from)
|
|
|
|
|
|
+ m.handleCommand(buf, from, timestamp)
|
|
}
|
|
}
|
|
|
|
|
|
-func (m *Memberlist) handleCommand(buf []byte, from net.Addr) {
|
|
|
|
|
|
+func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
|
|
// Decode the message type
|
|
// Decode the message type
|
|
msgType := messageType(buf[0])
|
|
msgType := messageType(buf[0])
|
|
buf = buf[1:]
|
|
buf = buf[1:]
|
|
@@ -307,16 +333,16 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr) {
|
|
// Switch on the msgType
|
|
// Switch on the msgType
|
|
switch msgType {
|
|
switch msgType {
|
|
case compoundMsg:
|
|
case compoundMsg:
|
|
- m.handleCompound(buf, from)
|
|
|
|
|
|
+ m.handleCompound(buf, from, timestamp)
|
|
case compressMsg:
|
|
case compressMsg:
|
|
- m.handleCompressed(buf, from)
|
|
|
|
|
|
+ m.handleCompressed(buf, from, timestamp)
|
|
|
|
|
|
case pingMsg:
|
|
case pingMsg:
|
|
m.handlePing(buf, from)
|
|
m.handlePing(buf, from)
|
|
case indirectPingMsg:
|
|
case indirectPingMsg:
|
|
m.handleIndirectPing(buf, from)
|
|
m.handleIndirectPing(buf, from)
|
|
case ackRespMsg:
|
|
case ackRespMsg:
|
|
- m.handleAck(buf, from)
|
|
|
|
|
|
+ m.handleAck(buf, from, timestamp)
|
|
|
|
|
|
case suspectMsg:
|
|
case suspectMsg:
|
|
fallthrough
|
|
fallthrough
|
|
@@ -328,11 +354,11 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr) {
|
|
select {
|
|
select {
|
|
case m.handoff <- msgHandoff{msgType, buf, from}:
|
|
case m.handoff <- msgHandoff{msgType, buf, from}:
|
|
default:
|
|
default:
|
|
- m.logger.Printf("[WARN] memberlist: UDP handler queue full, dropping message (%d)", msgType)
|
|
|
|
|
|
+ m.logger.Printf("[WARN] memberlist: UDP handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
|
|
}
|
|
}
|
|
|
|
|
|
default:
|
|
default:
|
|
- m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported. From: %s", msgType, from)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s", msgType, LogAddress(from))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -357,7 +383,7 @@ func (m *Memberlist) udpHandler() {
|
|
case userMsg:
|
|
case userMsg:
|
|
m.handleUser(buf, from)
|
|
m.handleUser(buf, from)
|
|
default:
|
|
default:
|
|
- m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported. From: %s (handler)", msgType, from)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: UDP msg type (%d) not supported %s (handler)", msgType, LogAddress(from))
|
|
}
|
|
}
|
|
|
|
|
|
case <-m.shutdownCh:
|
|
case <-m.shutdownCh:
|
|
@@ -366,46 +392,50 @@ func (m *Memberlist) udpHandler() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (m *Memberlist) handleCompound(buf []byte, from net.Addr) {
|
|
|
|
|
|
+func (m *Memberlist) handleCompound(buf []byte, from net.Addr, timestamp time.Time) {
|
|
// Decode the parts
|
|
// Decode the parts
|
|
trunc, parts, err := decodeCompoundMessage(buf)
|
|
trunc, parts, err := decodeCompoundMessage(buf)
|
|
if err != nil {
|
|
if err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to decode compound request: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to decode compound request: %s %s", err, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
// Log any truncation
|
|
// Log any truncation
|
|
if trunc > 0 {
|
|
if trunc > 0 {
|
|
- m.logger.Printf("[WARN] memberlist: Compound request had %d truncated messages", trunc)
|
|
|
|
|
|
+ m.logger.Printf("[WARN] memberlist: Compound request had %d truncated messages %s", trunc, LogAddress(from))
|
|
}
|
|
}
|
|
|
|
|
|
// Handle each message
|
|
// Handle each message
|
|
for _, part := range parts {
|
|
for _, part := range parts {
|
|
- m.handleCommand(part, from)
|
|
|
|
|
|
+ m.handleCommand(part, from, timestamp)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func (m *Memberlist) handlePing(buf []byte, from net.Addr) {
|
|
func (m *Memberlist) handlePing(buf []byte, from net.Addr) {
|
|
var p ping
|
|
var p ping
|
|
if err := decode(buf, &p); err != nil {
|
|
if err := decode(buf, &p); err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to decode ping request: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to decode ping request: %s %s", err, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
// If node is provided, verify that it is for us
|
|
// If node is provided, verify that it is for us
|
|
if p.Node != "" && p.Node != m.config.Name {
|
|
if p.Node != "" && p.Node != m.config.Name {
|
|
- m.logger.Printf("[WARN] memberlist: Got ping for unexpected node '%s'", p.Node)
|
|
|
|
|
|
+ m.logger.Printf("[WARN] memberlist: Got ping for unexpected node '%s' %s", p.Node, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- ack := ackResp{p.SeqNo}
|
|
|
|
|
|
+ var ack ackResp
|
|
|
|
+ ack.SeqNo = p.SeqNo
|
|
|
|
+ if m.config.Ping != nil {
|
|
|
|
+ ack.Payload = m.config.Ping.AckPayload()
|
|
|
|
+ }
|
|
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
|
|
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to send ack: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
|
|
func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
|
|
var ind indirectPingReq
|
|
var ind indirectPingReq
|
|
if err := decode(buf, &ind); err != nil {
|
|
if err := decode(buf, &ind); err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to decode indirect ping request: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to decode indirect ping request: %s %s", err, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
@@ -421,33 +451,33 @@ func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
|
|
destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)}
|
|
destAddr := &net.UDPAddr{IP: ind.Target, Port: int(ind.Port)}
|
|
|
|
|
|
// Setup a response handler to relay the ack
|
|
// Setup a response handler to relay the ack
|
|
- respHandler := func() {
|
|
|
|
- ack := ackResp{ind.SeqNo}
|
|
|
|
|
|
+ respHandler := func(payload []byte, timestamp time.Time) {
|
|
|
|
+ ack := ackResp{ind.SeqNo, nil}
|
|
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
|
|
if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
|
|
m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
|
|
|
|
|
|
// Send the ping
|
|
// Send the ping
|
|
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
|
|
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-func (m *Memberlist) handleAck(buf []byte, from net.Addr) {
|
|
|
|
|
|
+func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
|
|
var ack ackResp
|
|
var ack ackResp
|
|
if err := decode(buf, &ack); err != nil {
|
|
if err := decode(buf, &ack); err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to decode ack response: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to decode ack response: %s %s", err, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- m.invokeAckHandler(ack.SeqNo)
|
|
|
|
|
|
+ m.invokeAckHandler(ack, timestamp)
|
|
}
|
|
}
|
|
|
|
|
|
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
|
|
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
|
|
var sus suspect
|
|
var sus suspect
|
|
if err := decode(buf, &sus); err != nil {
|
|
if err := decode(buf, &sus); err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to decode suspect message: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to decode suspect message: %s %s", err, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
m.suspectNode(&sus)
|
|
m.suspectNode(&sus)
|
|
@@ -456,7 +486,7 @@ func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
|
|
func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
|
|
func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
|
|
var live alive
|
|
var live alive
|
|
if err := decode(buf, &live); err != nil {
|
|
if err := decode(buf, &live); err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to decode alive message: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to decode alive message: %s %s", err, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
@@ -472,7 +502,7 @@ func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
|
|
func (m *Memberlist) handleDead(buf []byte, from net.Addr) {
|
|
func (m *Memberlist) handleDead(buf []byte, from net.Addr) {
|
|
var d dead
|
|
var d dead
|
|
if err := decode(buf, &d); err != nil {
|
|
if err := decode(buf, &d); err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to decode dead message: %s", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to decode dead message: %s %s", err, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
m.deadNode(&d)
|
|
m.deadNode(&d)
|
|
@@ -487,16 +517,16 @@ func (m *Memberlist) handleUser(buf []byte, from net.Addr) {
|
|
}
|
|
}
|
|
|
|
|
|
// handleCompressed is used to unpack a compressed message
|
|
// handleCompressed is used to unpack a compressed message
|
|
-func (m *Memberlist) handleCompressed(buf []byte, from net.Addr) {
|
|
|
|
|
|
+func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time.Time) {
|
|
// Try to decode the payload
|
|
// Try to decode the payload
|
|
payload, err := decompressPayload(buf)
|
|
payload, err := decompressPayload(buf)
|
|
if err != nil {
|
|
if err != nil {
|
|
- m.logger.Printf("[ERR] memberlist: Failed to decompress payload: %v", err)
|
|
|
|
|
|
+ m.logger.Printf("[ERR] memberlist: Failed to decompress payload: %v %s", err, LogAddress(from))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
// Recursively handle the payload
|
|
// Recursively handle the payload
|
|
- m.handleCommand(payload, from)
|
|
|
|
|
|
+ m.handleCommand(payload, from, timestamp)
|
|
}
|
|
}
|
|
|
|
|
|
// encodeAndSendMsg is used to combine the encoding and sending steps
|
|
// encodeAndSendMsg is used to combine the encoding and sending steps
|
|
@@ -523,7 +553,7 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
|
|
|
|
|
|
// Fast path if nothing to piggypack
|
|
// Fast path if nothing to piggypack
|
|
if len(extra) == 0 {
|
|
if len(extra) == 0 {
|
|
- return m.rawSendMsg(to, msg)
|
|
|
|
|
|
+ return m.rawSendMsgUDP(to, msg)
|
|
}
|
|
}
|
|
|
|
|
|
// Join all the messages
|
|
// Join all the messages
|
|
@@ -535,11 +565,11 @@ func (m *Memberlist) sendMsg(to net.Addr, msg []byte) error {
|
|
compound := makeCompoundMessage(msgs)
|
|
compound := makeCompoundMessage(msgs)
|
|
|
|
|
|
// Send the message
|
|
// Send the message
|
|
- return m.rawSendMsg(to, compound.Bytes())
|
|
|
|
|
|
+ return m.rawSendMsgUDP(to, compound.Bytes())
|
|
}
|
|
}
|
|
|
|
|
|
-// rawSendMsg is used to send a UDP message to another host without modification
|
|
|
|
-func (m *Memberlist) rawSendMsg(to net.Addr, msg []byte) error {
|
|
|
|
|
|
+// rawSendMsgUDP is used to send a UDP message to another host without modification
|
|
|
|
+func (m *Memberlist) rawSendMsgUDP(to net.Addr, msg []byte) error {
|
|
// Check if we have compression enabled
|
|
// Check if we have compression enabled
|
|
if m.config.EnableCompression {
|
|
if m.config.EnableCompression {
|
|
buf, err := compressPayload(msg)
|
|
buf, err := compressPayload(msg)
|
|
@@ -571,7 +601,72 @@ func (m *Memberlist) rawSendMsg(to net.Addr, msg []byte) error {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
|
|
-// sendState is used to initiate a push/pull over TCP with a remote node
|
|
|
|
|
|
+// rawSendMsgTCP is used to send a TCP message to another host without modification
|
|
|
|
+func (m *Memberlist) rawSendMsgTCP(conn net.Conn, sendBuf []byte) error {
|
|
|
|
+ // Check if compresion is enabled
|
|
|
|
+ if m.config.EnableCompression {
|
|
|
|
+ compBuf, err := compressPayload(sendBuf)
|
|
|
|
+ if err != nil {
|
|
|
|
+ m.logger.Printf("[ERROR] memberlist: Failed to compress payload: %v", err)
|
|
|
|
+ } else {
|
|
|
|
+ sendBuf = compBuf.Bytes()
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Check if encryption is enabled
|
|
|
|
+ if m.config.EncryptionEnabled() {
|
|
|
|
+ crypt, err := m.encryptLocalState(sendBuf)
|
|
|
|
+ if err != nil {
|
|
|
|
+ m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err)
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ sendBuf = crypt
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Write out the entire send buffer
|
|
|
|
+ metrics.IncrCounter([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)))
|
|
|
|
+
|
|
|
|
+ if n, err := conn.Write(sendBuf); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ } else if n != len(sendBuf) {
|
|
|
|
+ return fmt.Errorf("only %d of %d bytes written", n, len(sendBuf))
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// sendTCPUserMsg is used to send a TCP userMsg to another host
|
|
|
|
+func (m *Memberlist) sendTCPUserMsg(to net.Addr, sendBuf []byte) error {
|
|
|
|
+ dialer := net.Dialer{Timeout: m.config.TCPTimeout}
|
|
|
|
+ conn, err := dialer.Dial("tcp", to.String())
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ defer conn.Close()
|
|
|
|
+
|
|
|
|
+ bufConn := bytes.NewBuffer(nil)
|
|
|
|
+
|
|
|
|
+ if err := bufConn.WriteByte(byte(userMsg)); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Send our node state
|
|
|
|
+ header := userMsgHeader{UserMsgLen: len(sendBuf)}
|
|
|
|
+ hd := codec.MsgpackHandle{}
|
|
|
|
+ enc := codec.NewEncoder(bufConn, &hd)
|
|
|
|
+
|
|
|
|
+ if err := enc.Encode(&header); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if _, err := bufConn.Write(sendBuf); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return m.rawSendMsgTCP(conn, bufConn.Bytes())
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// sendAndReceiveState is used to initiate a push/pull over TCP with a remote node
|
|
func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([]pushNodeState, []byte, error) {
|
|
func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([]pushNodeState, []byte, error) {
|
|
// Attempt to connect
|
|
// Attempt to connect
|
|
dialer := net.Dialer{Timeout: m.config.TCPTimeout}
|
|
dialer := net.Dialer{Timeout: m.config.TCPTimeout}
|
|
@@ -589,15 +684,21 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([
|
|
return nil, nil, err
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
|
|
- // Read remote state
|
|
|
|
- _, remote, userState, err := m.readRemoteState(conn)
|
|
|
|
|
|
+ conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
|
|
|
|
+ msgType, bufConn, dec, err := m.readTCP(conn)
|
|
if err != nil {
|
|
if err != nil {
|
|
- err := fmt.Errorf("Reading remote state failed: %v", err)
|
|
|
|
return nil, nil, err
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
|
|
- // Return the remote state
|
|
|
|
- return remote, userState, nil
|
|
|
|
|
|
+ // Quit if not push/pull
|
|
|
|
+ if msgType != pushPullMsg {
|
|
|
|
+ err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn))
|
|
|
|
+ return nil, nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Read remote state
|
|
|
|
+ _, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
|
|
|
|
+ return remoteNodes, userState, err
|
|
}
|
|
}
|
|
|
|
|
|
// sendLocalState is invoked to send our local state over a tcp connection
|
|
// sendLocalState is invoked to send our local state over a tcp connection
|
|
@@ -658,34 +759,7 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
|
|
}
|
|
}
|
|
|
|
|
|
// Get the send buffer
|
|
// Get the send buffer
|
|
- sendBuf := bufConn.Bytes()
|
|
|
|
-
|
|
|
|
- // Check if compresion is enabled
|
|
|
|
- if m.config.EnableCompression {
|
|
|
|
- compBuf, err := compressPayload(bufConn.Bytes())
|
|
|
|
- if err != nil {
|
|
|
|
- m.logger.Printf("[ERROR] memberlist: Failed to compress local state: %v", err)
|
|
|
|
- } else {
|
|
|
|
- sendBuf = compBuf.Bytes()
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Check if encryption is enabled
|
|
|
|
- if m.config.EncryptionEnabled() {
|
|
|
|
- crypt, err := m.encryptLocalState(sendBuf)
|
|
|
|
- if err != nil {
|
|
|
|
- m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err)
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
- sendBuf = crypt
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Write out the entire send buffer
|
|
|
|
- metrics.IncrCounter([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)))
|
|
|
|
- if _, err := conn.Write(sendBuf); err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
- return nil
|
|
|
|
|
|
+ return m.rawSendMsgTCP(conn, bufConn.Bytes())
|
|
}
|
|
}
|
|
|
|
|
|
// encryptLocalState is used to help encrypt local state before sending
|
|
// encryptLocalState is used to help encrypt local state before sending
|
|
@@ -743,38 +817,36 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) {
|
|
return decryptPayload(keys, cipherBytes, dataBytes)
|
|
return decryptPayload(keys, cipherBytes, dataBytes)
|
|
}
|
|
}
|
|
|
|
|
|
-// recvRemoteState is used to read the remote state from a connection
|
|
|
|
-func (m *Memberlist) readRemoteState(conn net.Conn) (bool, []pushNodeState, []byte, error) {
|
|
|
|
- // Setup a deadline
|
|
|
|
- conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
|
|
|
|
-
|
|
|
|
|
|
+// readTCP is used to read the start of a TCP stream.
|
|
|
|
+// it decrypts and decompresses the stream if necessary
|
|
|
|
+func (m *Memberlist) readTCP(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) {
|
|
// Created a buffered reader
|
|
// Created a buffered reader
|
|
var bufConn io.Reader = bufio.NewReader(conn)
|
|
var bufConn io.Reader = bufio.NewReader(conn)
|
|
|
|
|
|
// Read the message type
|
|
// Read the message type
|
|
buf := [1]byte{0}
|
|
buf := [1]byte{0}
|
|
if _, err := bufConn.Read(buf[:]); err != nil {
|
|
if _, err := bufConn.Read(buf[:]); err != nil {
|
|
- return false, nil, nil, err
|
|
|
|
|
|
+ return 0, nil, nil, err
|
|
}
|
|
}
|
|
msgType := messageType(buf[0])
|
|
msgType := messageType(buf[0])
|
|
|
|
|
|
// Check if the message is encrypted
|
|
// Check if the message is encrypted
|
|
if msgType == encryptMsg {
|
|
if msgType == encryptMsg {
|
|
if !m.config.EncryptionEnabled() {
|
|
if !m.config.EncryptionEnabled() {
|
|
- return false, nil, nil,
|
|
|
|
|
|
+ return 0, nil, nil,
|
|
fmt.Errorf("Remote state is encrypted and encryption is not configured")
|
|
fmt.Errorf("Remote state is encrypted and encryption is not configured")
|
|
}
|
|
}
|
|
|
|
|
|
plain, err := m.decryptRemoteState(bufConn)
|
|
plain, err := m.decryptRemoteState(bufConn)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return false, nil, nil, err
|
|
|
|
|
|
+ return 0, nil, nil, err
|
|
}
|
|
}
|
|
|
|
|
|
// Reset message type and bufConn
|
|
// Reset message type and bufConn
|
|
msgType = messageType(plain[0])
|
|
msgType = messageType(plain[0])
|
|
bufConn = bytes.NewReader(plain[1:])
|
|
bufConn = bytes.NewReader(plain[1:])
|
|
} else if m.config.EncryptionEnabled() {
|
|
} else if m.config.EncryptionEnabled() {
|
|
- return false, nil, nil,
|
|
|
|
|
|
+ return 0, nil, nil,
|
|
fmt.Errorf("Encryption is configured but remote state is not encrypted")
|
|
fmt.Errorf("Encryption is configured but remote state is not encrypted")
|
|
}
|
|
}
|
|
|
|
|
|
@@ -786,11 +858,11 @@ func (m *Memberlist) readRemoteState(conn net.Conn) (bool, []pushNodeState, []by
|
|
if msgType == compressMsg {
|
|
if msgType == compressMsg {
|
|
var c compress
|
|
var c compress
|
|
if err := dec.Decode(&c); err != nil {
|
|
if err := dec.Decode(&c); err != nil {
|
|
- return false, nil, nil, err
|
|
|
|
|
|
+ return 0, nil, nil, err
|
|
}
|
|
}
|
|
decomp, err := decompressBuffer(&c)
|
|
decomp, err := decompressBuffer(&c)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return false, nil, nil, err
|
|
|
|
|
|
+ return 0, nil, nil, err
|
|
}
|
|
}
|
|
|
|
|
|
// Reset the message type
|
|
// Reset the message type
|
|
@@ -803,12 +875,11 @@ func (m *Memberlist) readRemoteState(conn net.Conn) (bool, []pushNodeState, []by
|
|
dec = codec.NewDecoder(bufConn, &hd)
|
|
dec = codec.NewDecoder(bufConn, &hd)
|
|
}
|
|
}
|
|
|
|
|
|
- // Quit if not push/pull
|
|
|
|
- if msgType != pushPullMsg {
|
|
|
|
- err := fmt.Errorf("received invalid msgType (%d)", msgType)
|
|
|
|
- return false, nil, nil, err
|
|
|
|
- }
|
|
|
|
|
|
+ return msgType, bufConn, dec, nil
|
|
|
|
+}
|
|
|
|
|
|
|
|
+// readRemoteState is used to read the remote state from a connection
|
|
|
|
+func (m *Memberlist) readRemoteState(bufConn io.Reader, dec *codec.Decoder) (bool, []pushNodeState, []byte, error) {
|
|
// Read the push/pull header
|
|
// Read the push/pull header
|
|
var header pushPullHeader
|
|
var header pushPullHeader
|
|
if err := dec.Decode(&header); err != nil {
|
|
if err := dec.Decode(&header); err != nil {
|
|
@@ -821,7 +892,7 @@ func (m *Memberlist) readRemoteState(conn net.Conn) (bool, []pushNodeState, []by
|
|
// Try to decode all the states
|
|
// Try to decode all the states
|
|
for i := 0; i < header.Nodes; i++ {
|
|
for i := 0; i < header.Nodes; i++ {
|
|
if err := dec.Decode(&remoteNodes[i]); err != nil {
|
|
if err := dec.Decode(&remoteNodes[i]); err != nil {
|
|
- return false, remoteNodes, nil, err
|
|
|
|
|
|
+ return false, nil, nil, err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -836,7 +907,7 @@ func (m *Memberlist) readRemoteState(conn net.Conn) (bool, []pushNodeState, []by
|
|
bytes, header.UserStateLen)
|
|
bytes, header.UserStateLen)
|
|
}
|
|
}
|
|
if err != nil {
|
|
if err != nil {
|
|
- return false, remoteNodes, nil, err
|
|
|
|
|
|
+ return false, nil, nil, err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -850,3 +921,119 @@ func (m *Memberlist) readRemoteState(conn net.Conn) (bool, []pushNodeState, []by
|
|
|
|
|
|
return header.Join, remoteNodes, userBuf, nil
|
|
return header.Join, remoteNodes, userBuf, nil
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+// mergeRemoteState is used to merge the remote state with our local state
|
|
|
|
+func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, userBuf []byte) error {
|
|
|
|
+ if err := m.verifyProtocol(remoteNodes); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Invoke the merge delegate if any
|
|
|
|
+ if join && m.config.Merge != nil {
|
|
|
|
+ nodes := make([]*Node, len(remoteNodes))
|
|
|
|
+ for idx, n := range remoteNodes {
|
|
|
|
+ nodes[idx] = &Node{
|
|
|
|
+ Name: n.Name,
|
|
|
|
+ Addr: n.Addr,
|
|
|
|
+ Port: n.Port,
|
|
|
|
+ Meta: n.Meta,
|
|
|
|
+ PMin: n.Vsn[0],
|
|
|
|
+ PMax: n.Vsn[1],
|
|
|
|
+ PCur: n.Vsn[2],
|
|
|
|
+ DMin: n.Vsn[3],
|
|
|
|
+ DMax: n.Vsn[4],
|
|
|
|
+ DCur: n.Vsn[5],
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if err := m.config.Merge.NotifyMerge(nodes); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Merge the membership state
|
|
|
|
+ m.mergeState(remoteNodes)
|
|
|
|
+
|
|
|
|
+ // Invoke the delegate for user state
|
|
|
|
+ if userBuf != nil && m.config.Delegate != nil {
|
|
|
|
+ m.config.Delegate.MergeRemoteState(userBuf, join)
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// readUserMsg is used to decode a userMsg from a TCP stream
|
|
|
|
+func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
|
|
|
|
+ // Read the user message header
|
|
|
|
+ var header userMsgHeader
|
|
|
|
+ if err := dec.Decode(&header); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Read the user message into a buffer
|
|
|
|
+ var userBuf []byte
|
|
|
|
+ if header.UserMsgLen > 0 {
|
|
|
|
+ userBuf = make([]byte, header.UserMsgLen)
|
|
|
|
+ bytes, err := io.ReadAtLeast(bufConn, userBuf, header.UserMsgLen)
|
|
|
|
+ if err == nil && bytes != header.UserMsgLen {
|
|
|
|
+ err = fmt.Errorf(
|
|
|
|
+ "Failed to read full user message (%d / %d)",
|
|
|
|
+ bytes, header.UserMsgLen)
|
|
|
|
+ }
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ d := m.config.Delegate
|
|
|
|
+ if d != nil {
|
|
|
|
+ d.NotifyMsg(userBuf)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// sendPingAndWaitForAck makes a TCP connection to the given address, sends
|
|
|
|
+// a ping, and waits for an ack. All of this is done as a series of blocking
|
|
|
|
+// operations, given the deadline. The bool return parameter is true if we
|
|
|
|
+// we able to round trip a ping to the other node.
|
|
|
|
+func (m *Memberlist) sendPingAndWaitForAck(destAddr net.Addr, ping ping, deadline time.Time) (bool, error) {
|
|
|
|
+ dialer := net.Dialer{Deadline: deadline}
|
|
|
|
+ conn, err := dialer.Dial("tcp", destAddr.String())
|
|
|
|
+ if err != nil {
|
|
|
|
+ // If the node is actually dead we expect this to fail, so we
|
|
|
|
+ // shouldn't spam the logs with it. After this point, errors
|
|
|
|
+ // with the connection are real, unexpected errors and should
|
|
|
|
+ // get propagated up.
|
|
|
|
+ return false, nil
|
|
|
|
+ }
|
|
|
|
+ defer conn.Close()
|
|
|
|
+ conn.SetDeadline(deadline)
|
|
|
|
+
|
|
|
|
+ out, err := encode(pingMsg, &ping)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return false, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err = m.rawSendMsgTCP(conn, out.Bytes()); err != nil {
|
|
|
|
+ return false, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ msgType, _, dec, err := m.readTCP(conn)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return false, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if msgType != ackRespMsg {
|
|
|
|
+ return false, fmt.Errorf("Unexpected msgType (%d) from TCP ping %s", msgType, LogConn(conn))
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ var ack ackResp
|
|
|
|
+ if err = dec.Decode(&ack); err != nil {
|
|
|
|
+ return false, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if ack.SeqNo != ping.SeqNo {
|
|
|
|
+ return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d) from TCP ping %s", ack.SeqNo, ping.SeqNo, LogConn(conn))
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return true, nil
|
|
|
|
+}
|