@@ -10,8 +10,10 @@ import (
     "log"
     "math/rand"
     "net"
+    "os"
     "strconv"
     "sync"
+    "sync/atomic"
     "time"

     "github.com/armon/go-metrics"
@@ -25,7 +27,7 @@ import (
 // version to memberlist below.
 const (
     ProtocolVersionMin uint8 = 2
-    ProtocolVersionMax       = 4
+    ProtocolVersionMax       = 5
 )

 const (
@@ -65,16 +67,15 @@ type Serf struct {
     memberLock sync.RWMutex
     members    map[string]*memberState

-    // Circular buffers for recent intents, used
-    // in case we get the intent before the relevant event
-    recentLeave      []nodeIntent
-    recentLeaveIndex int
-    recentJoin       []nodeIntent
-    recentJoinIndex  int
+    // recentIntents tracks the Lamport time and type of intent for a given
+    // node in case we get an intent before the relevant memberlist event.
+    // It is indexed by node and always stores the latest Lamport time /
+    // intent we've seen. The memberLock protects this structure.
+    recentIntents map[string]nodeIntent

     eventBroadcasts *memberlist.TransmitLimitedQueue
     eventBuffer     []*userEvents
-    eventJoinIgnore bool
+    eventJoinIgnore atomic.Value
     eventMinTime    LamportTime
     eventLock       sync.RWMutex
@@ -179,10 +180,18 @@ type memberState struct {
     leaveTime time.Time // wall clock time of leave
 }

-// nodeIntent is used to buffer intents for out-of-order deliveries
+// nodeIntent is used to buffer intents for out-of-order deliveries.
 type nodeIntent struct {
+    // Type is the intent being tracked. Only messageJoinType and
+    // messageLeaveType are tracked.
+    Type messageType
+
+    // WallTime is the wall clock time we saw this intent, used to
+    // expire it from the buffer.
+    WallTime time.Time
+
+    // LTime is the Lamport time, used for cluster-wide ordering of events.
     LTime LamportTime
-    Node  string
 }

 // userEvent is used to buffer events to prevent re-delivery
@@ -214,8 +223,8 @@ type queries struct {
 }

 const (
-    UserEventSizeLimit = 512        // Maximum byte size for event name and payload
     snapshotSizeLimit  = 128 * 1024 // Maximum 128 KB snapshot
+    UserEventSizeLimit = 9 * 1024   // Maximum 9 KB for event name and payload
 )

 // Create creates a new Serf instance, starting all the background tasks
@@ -233,14 +242,28 @@ func Create(conf *Config) (*Serf, error) {
             conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
     }

+    if conf.UserEventSizeLimit > UserEventSizeLimit {
+        return nil, fmt.Errorf("user event size limit exceeds limit of %d bytes", UserEventSizeLimit)
+    }
+
+    logger := conf.Logger
+    if logger == nil {
+        logOutput := conf.LogOutput
+        if logOutput == nil {
+            logOutput = os.Stderr
+        }
+        logger = log.New(logOutput, "", log.LstdFlags)
+    }
+
     serf := &Serf{
         config:        conf,
-        logger:        log.New(conf.LogOutput, "", log.LstdFlags),
+        logger:        logger,
         members:       make(map[string]*memberState),
         queryResponse: make(map[LamportTime]*QueryResponse),
         shutdownCh:    make(chan struct{}),
         state:         SerfAlive,
     }
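+    // Store a default up front so atomic loads always return a bool, never nil.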
+    serf.eventJoinIgnore.Store(false)

     // Check that the meta data length is okay
     if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize {
@@ -295,7 +318,6 @@ func Create(conf *Config) (*Serf, error) {
         conf.RejoinAfterLeave,
         serf.logger,
         &serf.clock,
-        serf.coordClient,
         conf.EventCh,
         serf.shutdownCh)
     if err != nil {
@@ -321,27 +343,20 @@ func Create(conf *Config) (*Serf, error) {
     // Setup the various broadcast queues, which we use to send our own
     // custom broadcasts along the gossip channel.
     serf.broadcasts = &memberlist.TransmitLimitedQueue{
-        NumNodes: func() int {
-            return len(serf.members)
-        },
+        NumNodes:       serf.NumNodes,
         RetransmitMult: conf.MemberlistConfig.RetransmitMult,
     }
     serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
-        NumNodes: func() int {
-            return len(serf.members)
-        },
+        NumNodes:       serf.NumNodes,
         RetransmitMult: conf.MemberlistConfig.RetransmitMult,
     }
     serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
-        NumNodes: func() int {
-            return len(serf.members)
-        },
+        NumNodes:       serf.NumNodes,
         RetransmitMult: conf.MemberlistConfig.RetransmitMult,
     }

     // Create the buffer for recent intents
-    serf.recentJoin = make([]nodeIntent, conf.RecentIntentBuffer)
-    serf.recentLeave = make([]nodeIntent, conf.RecentIntentBuffer)
+    serf.recentIntents = make(map[string]nodeIntent)

     // Create a buffer for events and queries
     serf.eventBuffer = make([]*userEvents, conf.EventBuffer)
@@ -426,14 +441,25 @@ func (s *Serf) KeyManager() *KeyManager {
 }

 // UserEvent is used to broadcast a custom user event with a given
-// name and payload. The events must be fairly small, and if the
-// size limit is exceeded and error will be returned. If coalesce is enabled,
-// nodes are allowed to coalesce this event. Coalescing is only available
-// starting in v0.2
+// name and payload. If the configured size limit is exceeded, an error
+// will be returned. If coalesce is enabled, nodes are allowed to coalesce
+// this event. Coalescing is only available starting in v0.2.
 func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
-    // Check the size limit
-    if len(name)+len(payload) > UserEventSizeLimit {
-        return fmt.Errorf("user event exceeds limit of %d bytes", UserEventSizeLimit)
+    payloadSizeBeforeEncoding := len(name) + len(payload)
+
+    // Check the size before encoding to avoid needless work, and return
+    // early if the event is over the configured limit.
+    if payloadSizeBeforeEncoding > s.config.UserEventSizeLimit {
+        return fmt.Errorf(
+            "user event exceeds configured limit of %d bytes before encoding",
+            s.config.UserEventSizeLimit,
+        )
+    }
+
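+    // Also enforce the absolute cap, independent of the user-configured limit.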
+    if payloadSizeBeforeEncoding > UserEventSizeLimit {
+        return fmt.Errorf(
+            "user event exceeds sane limit of %d bytes before encoding",
+            UserEventSizeLimit,
+        )
     }

     // Create a message
@@ -443,16 +469,34 @@ func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
         Payload: payload,
         CC:      coalesce,
     }
-    s.eventClock.Increment()
-
-    // Process update locally
-    s.handleUserEvent(&msg)

     // Start broadcasting the event
     raw, err := encodeMessage(messageUserEventType, &msg)
     if err != nil {
         return err
     }
+
+    // Check the size again after encoding to be sure we're not attempting
+    // to send over the specified size limit.
+    if len(raw) > s.config.UserEventSizeLimit {
+        return fmt.Errorf(
+            "encoded user event exceeds configured limit of %d bytes after encoding",
+            s.config.UserEventSizeLimit,
+        )
+    }
+
+    if len(raw) > UserEventSizeLimit {
+        return fmt.Errorf(
+            "encoded user event exceeds sane limit of %d bytes after encoding",
+            UserEventSizeLimit,
+        )
+    }
+
+    s.eventClock.Increment()
+
+    // Process update locally
+    s.handleUserEvent(&msg)
+
     s.eventBroadcasts.QueueBroadcast(&broadcast{
         msg: raw,
     })
@@ -493,15 +537,16 @@ func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryResponse, error) {

     // Create a message
     q := messageQuery{
-        LTime:   s.queryClock.Time(),
-        ID:      uint32(rand.Int31()),
-        Addr:    local.Addr,
-        Port:    local.Port,
-        Filters: filters,
-        Flags:   flags,
-        Timeout: params.Timeout,
-        Name:    name,
-        Payload: payload,
+        LTime:       s.queryClock.Time(),
+        ID:          uint32(rand.Int31()),
+        Addr:        local.Addr,
+        Port:        local.Port,
+        Filters:     filters,
+        Flags:       flags,
+        RelayFactor: params.RelayFactor,
+        Timeout:     params.Timeout,
+        Name:        name,
+        Payload:     payload,
     }

     // Encode the query
@@ -582,9 +627,9 @@ func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
     // Ignore any events from a potential join. This is safe since we hold
     // the joinLock and nobody else can be doing a Join
     if ignoreOld {
-        s.eventJoinIgnore = true
+        s.eventJoinIgnore.Store(true)
         defer func() {
-            s.eventJoinIgnore = false
+            s.eventJoinIgnore.Store(false)
         }()
     }
@@ -679,6 +724,13 @@ func (s *Serf) Leave() error {
         return err
     }

+    // Wait for the leave to propagate through the cluster. The broadcast
+    // timeout is how long we wait for the message to go out from our own
+    // queue, but this wait is for that message to propagate through the
+    // cluster. In particular, we want to stay up long enough to service
+    // any probes from other nodes before they learn about us leaving.
+    time.Sleep(s.config.LeavePropagateDelay)
+
     // Transition to Left only if we not already shutdown
     s.stateLock.Lock()
     if s.state != SerfShutdown {
@@ -729,15 +781,26 @@ func (s *Serf) Members() []Member {
     return members
 }

-// RemoveFailedNode forcibly removes a failed node from the cluster
+// RemoveFailedNode is a backwards-compatible form
+// of forceLeave.
+func (s *Serf) RemoveFailedNode(node string) error {
+    return s.forceLeave(node, false)
+}
+
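+// RemoveFailedNodePrune forcibly removes a failed node and also prunes it
+// from the member list immediately, rather than leaving a tombstone.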
+func (s *Serf) RemoveFailedNodePrune(node string) error {
+    return s.forceLeave(node, true)
+}
+
+// forceLeave forcibly removes a failed node from the cluster
 // immediately, instead of waiting for the reaper to eventually reclaim it.
 // This also has the effect that Serf will no longer attempt to reconnect
 // to this node.
-func (s *Serf) RemoveFailedNode(node string) error {
+func (s *Serf) forceLeave(node string, prune bool) error {
     // Construct the message to broadcast
     msg := messageLeave{
         LTime: s.clock.Time(),
         Node:  node,
+        Prune: prune,
     }
     s.clock.Increment()
@@ -785,13 +848,15 @@ func (s *Serf) Shutdown() error {
         s.logger.Printf("[WARN] serf: Shutdown without a Leave")
     }

+    // Wait to close the shutdown channel until after we've shut down the
+    // memberlist and its associated network resources, since the shutdown
+    // channel signals that we are cleaned up outside of Serf.
     s.state = SerfShutdown
-    close(s.shutdownCh)
-
     err := s.memberlist.Shutdown()
     if err != nil {
         return err
     }
+    close(s.shutdownCh)

     // Wait for the snapshoter to finish if we have one
     if s.snapshotter != nil {
@@ -855,22 +920,25 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) {
             },
         }

-        // Check if we have a join intent and use the LTime
-        if join := recentIntent(s.recentJoin, n.Name); join != nil {
-            member.statusLTime = join.LTime
+        // Check if we have a join or leave intent. The intent buffer
+        // will only hold one event for this node, so the more recent
+        // one will take effect.
+        if join, ok := recentIntent(s.recentIntents, n.Name, messageJoinType); ok {
+            member.statusLTime = join
         }
-
-        // Check if we have a leave intent
-        if leave := recentIntent(s.recentLeave, n.Name); leave != nil {
-            if leave.LTime > member.statusLTime {
-                member.Status = StatusLeaving
-                member.statusLTime = leave.LTime
-            }
+        if leave, ok := recentIntent(s.recentIntents, n.Name, messageLeaveType); ok {
+            member.Status = StatusLeaving
+            member.statusLTime = leave
         }

         s.members[n.Name] = member
     } else {
         oldStatus = member.Status
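+        // A failed node that comes back within FlapTimeout is counted
+        // as a flap for metrics purposes.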
+        deadTime := time.Now().Sub(member.leaveTime)
+        if oldStatus == StatusFailed && deadTime < s.config.FlapTimeout {
+            metrics.IncrCounter([]string{"serf", "member", "flap"}, 1)
+        }
+
         member.Status = StatusAlive
         member.leaveTime = time.Time{}
         member.Addr = net.IP(n.Addr)
@@ -1003,6 +1071,7 @@ func (s *Serf) handleNodeUpdate(n *memberlist.Node) {

 // handleNodeLeaveIntent is called when an intent to leave is received.
 func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
+
     // Witness a potentially newer time
     s.clock.Witness(leaveMsg.LTime)

@@ -1011,18 +1080,8 @@

     member, ok := s.members[leaveMsg.Node]
     if !ok {
-        // If we've already seen this message don't rebroadcast
-        if recentIntent(s.recentLeave, leaveMsg.Node) != nil {
-            return false
-        }
-
-        // We don't know this member so store it in a buffer for now
-        s.recentLeave[s.recentLeaveIndex] = nodeIntent{
-            LTime: leaveMsg.LTime,
-            Node:  leaveMsg.Node,
-        }
-        s.recentLeaveIndex = (s.recentLeaveIndex + 1) % len(s.recentLeave)
-        return true
+        // Rebroadcast only if this was an update we hadn't seen before.
+        return upsertIntent(s.recentIntents, leaveMsg.Node, messageLeaveType, leaveMsg.LTime, time.Now)
     }

     // If the message is old, then it is irrelevant and we can skip it
@@ -1043,6 +1102,10 @@
     case StatusAlive:
         member.Status = StatusLeaving
         member.statusLTime = leaveMsg.LTime
+
+        if leaveMsg.Prune {
+            s.handlePrune(member)
+        }
         return true
     case StatusFailed:
         member.Status = StatusLeft
@@ -1051,6 +1114,7 @@
         // Remove from the failed list and add to the left list. We add
         // to the left list so that when we do a sync, other nodes will
         // remove it from their failed list.
+
         s.failedMembers = removeOldMember(s.failedMembers, member.Name)
         s.leftMembers = append(s.leftMembers, member)

@@ -1065,12 +1129,40 @@
                 Members: []Member{member.Member},
             }
         }
+
+        if leaveMsg.Prune {
+            s.handlePrune(member)
+        }
+        return true
+
+    case StatusLeaving, StatusLeft:
+        if leaveMsg.Prune {
+            s.handlePrune(member)
+        }
         return true
     default:
         return false
     }
 }

+// handlePrune waits for nodes that are leaving and then forcibly
+// erases a member from the list of members
+func (s *Serf) handlePrune(member *memberState) {
+    if member.Status == StatusLeaving {
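+        // Give the leave broadcast time to go out and propagate before
+        // erasing the member, so peers converge on Left first.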
+        time.Sleep(s.config.BroadcastTimeout + s.config.LeavePropagateDelay)
+    }
+
+    s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)
+
+    // If we are leaving or left, we may be in the left members list
+    if member.Status == StatusLeaving || member.Status == StatusLeft {
+        s.leftMembers = removeOldMember(s.leftMembers, member.Name)
+    }
+    s.eraseNode(member)
+}
+
 // handleNodeJoinIntent is called when a node broadcasts a
 // join message to set the lamport time of its join
 func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
@@ -1082,15 +1174,8 @@

     member, ok := s.members[joinMsg.Node]
     if !ok {
-        // If we've already seen this message don't rebroadcast
-        if recentIntent(s.recentJoin, joinMsg.Node) != nil {
-            return false
-        }
-
-        // We don't know this member so store it in a buffer for now
-        s.recentJoin[s.recentJoinIndex] = nodeIntent{LTime: joinMsg.LTime, Node: joinMsg.Node}
-        s.recentJoinIndex = (s.recentJoinIndex + 1) % len(s.recentJoin)
-        return true
+        // Rebroadcast only if this was an update we hadn't seen before.
+        return upsertIntent(s.recentIntents, joinMsg.Node, messageJoinType, joinMsg.LTime, time.Now)
     }

     // Check if this time is newer than what we have
@@ -1245,19 +1330,23 @@ func (s *Serf) handleQuery(query *messageQuery) bool {
             if err := s.memberlist.SendTo(&addr, raw); err != nil {
                 s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
            }
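+            // Relay the ack through other members as well, per the query's
+            // relay factor, in case the direct response path is unreliable.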
+            if err := s.relayResponse(query.RelayFactor, addr, &ack); err != nil {
+                s.logger.Printf("[ERR] serf: failed to relay ack: %v", err)
+            }
         }
     }

     if s.config.EventCh != nil {
         s.config.EventCh <- &Query{
-            LTime:    query.LTime,
-            Name:     query.Name,
-            Payload:  query.Payload,
-            serf:     s,
-            id:       query.ID,
-            addr:     query.Addr,
-            port:     query.Port,
-            deadline: time.Now().Add(query.Timeout),
+            LTime:       query.LTime,
+            Name:        query.Name,
+            Payload:     query.Payload,
+            serf:        s,
+            id:          query.ID,
+            addr:        query.Addr,
+            port:        query.Port,
+            deadline:    time.Now().Add(query.Timeout),
+            relayFactor: query.RelayFactor,
         }
     }
     return rebroadcast
@@ -1290,25 +1379,37 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {

     // Process each type of response
     if resp.Ack() {
+        // Exit early if this is a duplicate ack
+        if _, ok := query.acks[resp.From]; ok {
+            metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1)
+            return
+        }
+
         metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
         select {
         case query.ackCh <- resp.From:
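+            // Record the ack only after successful delivery so a dropped
+            // ack can still be accepted on a retransmit.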
+            query.acks[resp.From] = struct{}{}
         default:
-            s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping")
+            s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping")
         }
     } else {
+        // Exit early if this is a duplicate response
+        if _, ok := query.responses[resp.From]; ok {
+            metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1)
+            return
+        }
+
         metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
-        select {
-        case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
-        default:
-            s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping")
+        err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload})
+        if err != nil {
+            s.logger.Printf("[WARN] %v", err)
         }
     }
 }

 // handleNodeConflict is invoked when a join detects a conflict over a name.
 // This means two different nodes (IP/Port) are claiming the same name. Memberlist
-// will reject the "new" node mapping, but we can still be notified
+// will reject the "new" node mapping, but we can still be notified.
 func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) {
     // Log a basic warning if the node is not us...
     if existing.Name != s.config.NodeName {
@@ -1361,7 +1462,7 @@ func (s *Serf) resolveNodeConflict() {

         // Update the counters
         responses++
-        if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port {
+        if member.Addr.Equal(local.Addr) && member.Port == local.Port {
             matching++
         }
     }
@@ -1382,14 +1483,41 @@
     }
 }

-// handleReap periodically reaps the list of failed and left members.
+// eraseNode takes a node completely out of the member list
+func (s *Serf) eraseNode(m *memberState) {
+    // Delete from members
+    delete(s.members, m.Name)
+
+    // Tell the coordinate client the node has gone away and delete
+    // its cached coordinates.
+    if !s.config.DisableCoordinates {
+        s.coordClient.ForgetNode(m.Name)
+
+        s.coordCacheLock.Lock()
+        delete(s.coordCache, m.Name)
+        s.coordCacheLock.Unlock()
+    }
+
+    // Send an event along
+    if s.config.EventCh != nil {
+        s.config.EventCh <- MemberEvent{
+            Type:    EventMemberReap,
+            Members: []Member{m.Member},
+        }
+    }
+}
+
+// handleReap periodically reaps the list of failed and left members, as well
+// as old buffered intents.
 func (s *Serf) handleReap() {
     for {
         select {
         case <-time.After(s.config.ReapInterval):
             s.memberLock.Lock()
-            s.failedMembers = s.reap(s.failedMembers, s.config.ReconnectTimeout)
-            s.leftMembers = s.reap(s.leftMembers, s.config.TombstoneTimeout)
+            now := time.Now()
+            s.failedMembers = s.reap(s.failedMembers, now, s.config.ReconnectTimeout)
+            s.leftMembers = s.reap(s.leftMembers, now, s.config.TombstoneTimeout)
+            reapIntents(s.recentIntents, now, s.config.RecentIntentTimeout)
             s.memberLock.Unlock()
         case <-s.shutdownCh:
             return
@@ -1413,8 +1541,7 @@ func (s *Serf) handleReconnect() {
 // reap is called with a list of old members and a timeout, and removes
 // members that have exceeded the timeout. The members are removed from
 // both the old list and the members itself. Locking is left to the caller.
-func (s *Serf) reap(old []*memberState, timeout time.Duration) []*memberState {
-    now := time.Now()
+func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []*memberState {
     n := len(old)
     for i := 0; i < n; i++ {
         m := old[i]
@@ -1430,27 +1557,10 @@
         n--
         i--

-        // Delete from members
-        delete(s.members, m.Name)
-
-        // Tell the coordinate client the node has gone away and delete
-        // its cached coordinates.
-        if !s.config.DisableCoordinates {
-            s.coordClient.ForgetNode(m.Name)
-
-            s.coordCacheLock.Lock()
-            delete(s.coordCache, m.Name)
-            s.coordCacheLock.Unlock()
-        }
-
-        // Send an event along
+        // Delete from members and send out event
         s.logger.Printf("[INFO] serf: EventMemberReap: %s", m.Name)
-        if s.config.EventCh != nil {
-            s.config.EventCh <- MemberEvent{
-                Type:    EventMemberReap,
-                Members: []Member{m.Member},
-            }
-        }
+        s.eraseNode(m)
     }

     return old
@@ -1485,7 +1595,7 @@ func (s *Serf) reconnect() {
     }

     // Select a random member to try and join
-    idx := int(rand.Uint32() % uint32(n))
+    idx := rand.Int31n(int32(n))
     mem := s.failedMembers[idx]
     s.memberLock.RUnlock()
@@ -1497,21 +1607,37 @@
     s.memberlist.Join([]string{addr.String()})
 }

+// getQueueMax will get the maximum queue depth, which might be dynamic depending
+// on how Serf is configured.
+func (s *Serf) getQueueMax() int {
+    max := s.config.MaxQueueDepth
+    if s.config.MinQueueDepth > 0 {
+        s.memberLock.RLock()
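+        // Size the limit to the cluster: two queue slots per member,
+        // raised to MinQueueDepth below if the cluster is small.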
+        max = 2 * len(s.members)
+        s.memberLock.RUnlock()
+
+        if max < s.config.MinQueueDepth {
+            max = s.config.MinQueueDepth
+        }
+    }
+    return max
+}
+
 // checkQueueDepth periodically checks the size of a queue to see if
 // it is too large
 func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) {
     for {
         select {
-        case <-time.After(time.Second):
+        case <-time.After(s.config.QueueCheckInterval):
             numq := queue.NumQueued()
             metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
             if numq >= s.config.QueueDepthWarning {
                 s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
             }
-            if numq > s.config.MaxQueueDepth {
+            if max := s.getQueueMax(); numq > max {
                 s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!",
-                    name, numq, s.config.MaxQueueDepth)
-                queue.Prune(s.config.MaxQueueDepth)
+                    name, numq, max)
+                queue.Prune(max)
             }
         case <-s.shutdownCh:
             return
@@ -1533,24 +1659,46 @@ func removeOldMember(old []*memberState, name string) []*memberState {
     return old
 }

-// recentIntent checks the recent intent buffer for a matching
-// entry for a given node, and either returns the message or nil
-func recentIntent(recent []nodeIntent, node string) (intent *nodeIntent) {
-    for i := 0; i < len(recent); i++ {
-        // Break fast if we hit a zero entry
-        if recent[i].LTime == 0 {
-            break
+// reapIntents clears out any intents that are older than the timeout. Make sure
+// the memberLock is held when passing in the Serf instance's recentIntents
+// member.
+func reapIntents(intents map[string]nodeIntent, now time.Time, timeout time.Duration) {
+    for node, intent := range intents {
+        if now.Sub(intent.WallTime) > timeout {
+            delete(intents, node)
         }
+    }
+}

-        // Check for a node match
-        if recent[i].Node == node {
-            // Take the most recent entry
-            if intent == nil || recent[i].LTime > intent.LTime {
-                intent = &recent[i]
-            }
+// upsertIntent will update an existing intent with the supplied Lamport time,
+// or create a new entry. This will return true if a new entry was added. The
+// stamper is used to capture the wall clock time for expiring these buffered
+// intents. Make sure the memberLock is held when passing in the Serf instance's
+// recentIntents member.
+func upsertIntent(intents map[string]nodeIntent, node string, itype messageType,
+    ltime LamportTime, stamper func() time.Time) bool {
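+    // Only a strictly newer Lamport time replaces an existing intent,
+    // which also deduplicates rebroadcasts of the same message.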
+    if intent, ok := intents[node]; !ok || ltime > intent.LTime {
+        intents[node] = nodeIntent{
+            Type:     itype,
+            WallTime: stamper(),
+            LTime:    ltime,
         }
+        return true
+    }
+
+    return false
+}
+
+// recentIntent checks the recent intent buffer for a matching entry for a given
+// node, and returns the Lamport time if an intent is present, indicated by the
+// returned boolean. Make sure the memberLock is held for read when passing in
+// the Serf instance's recentIntents member.
+func recentIntent(intents map[string]nodeIntent, node string, itype messageType) (LamportTime, bool) {
+    if intent, ok := intents[node]; ok && intent.Type == itype {
+        return intent.LTime, true
     }
-    return
+
+    return LamportTime(0), false
 }

 // handleRejoin attempts to reconnect to previously known alive nodes
@@ -1613,10 +1761,18 @@ func (s *Serf) Stats() map[string]string {
     toString := func(v uint64) string {
         return strconv.FormatUint(v, 10)
     }
+    s.memberLock.RLock()
+    members := toString(uint64(len(s.members)))
+    failed := toString(uint64(len(s.failedMembers)))
+    left := toString(uint64(len(s.leftMembers)))
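+    // The health score comes from memberlist's local health awareness,
+    // where lower is healthier and 0 means fully healthy.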
+    health_score := toString(uint64(s.memberlist.GetHealthScore()))
+    s.memberLock.RUnlock()
+
     stats := map[string]string{
-        "members":      toString(uint64(len(s.members))),
-        "failed":       toString(uint64(len(s.failedMembers))),
-        "left":         toString(uint64(len(s.leftMembers))),
+        "members":      members,
+        "failed":       failed,
+        "left":         left,
+        "health_score": health_score,
         "member_time":  toString(uint64(s.clock.Time())),
         "event_time":   toString(uint64(s.eventClock.Time())),
         "query_time":   toString(uint64(s.queryClock.Time())),
@@ -1625,6 +1781,9 @@
         "query_queue":  toString(uint64(s.queryBroadcasts.NumQueued())),
         "encrypted":    fmt.Sprintf("%v", s.EncryptionEnabled()),
     }
+    if !s.config.DisableCoordinates {
+        stats["coordinate_resets"] = toString(uint64(s.coordClient.Stats().Resets))
+    }
     return stats
 }