@@ -223,8 +223,8 @@ type queries struct {
 }

 const (
-	UserEventSizeLimit = 512 // Maximum byte size for event name and payload
 	snapshotSizeLimit  = 128 * 1024 // Maximum 128 KB snapshot
+	UserEventSizeLimit = 9 * 1024 // Maximum 9 KB for event name and payload
 )

 // Create creates a new Serf instance, starting all the background tasks
@@ -242,6 +242,10 @@ func Create(conf *Config) (*Serf, error) {
 			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
 	}

+	if conf.UserEventSizeLimit > UserEventSizeLimit {
+		return nil, fmt.Errorf("user event size limit exceeds maximum of %d bytes", UserEventSizeLimit)
+	}
+
 	logger := conf.Logger
 	if logger == nil {
 		logOutput := conf.LogOutput
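A minimal usage sketch of the new validation (assuming `DefaultConfig` keeps 512 bytes as the per-config default; the 4 KB value is illustrative):

```go
package main

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

func main() {
	conf := serf.DefaultConfig()
	// Raise the per-event limit above the default. Any value over the
	// hard 9 KB UserEventSizeLimit cap makes Create fail.
	conf.UserEventSizeLimit = 4 * 1024

	s, err := serf.Create(conf)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Shutdown()
}
```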
@@ -437,14 +441,25 @@ func (s *Serf) KeyManager() *KeyManager {
 }

 // UserEvent is used to broadcast a custom user event with a given
-// name and payload. The events must be fairly small, and if the
-// size limit is exceeded and error will be returned. If coalesce is enabled,
-// nodes are allowed to coalesce this event. Coalescing is only available
-// starting in v0.2
+// name and payload. If the configured size limit is exceeded, an error
+// will be returned. If coalesce is enabled, nodes are allowed to
+// coalesce this event. Coalescing is only available starting in v0.2.
 func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
-	// Check the size limit
-	if len(name)+len(payload) > UserEventSizeLimit {
-		return fmt.Errorf("user event exceeds limit of %d bytes", UserEventSizeLimit)
+	payloadSizeBeforeEncoding := len(name) + len(payload)
+
+	// Check the size before encoding to avoid needless work and to
+	// return early if the event is over the configured limit.
+	if payloadSizeBeforeEncoding > s.config.UserEventSizeLimit {
+		return fmt.Errorf(
+			"user event exceeds configured limit of %d bytes before encoding",
+			s.config.UserEventSizeLimit,
+		)
+	}
+
+	if payloadSizeBeforeEncoding > UserEventSizeLimit {
+		return fmt.Errorf(
+			"user event exceeds sane limit of %d bytes before encoding",
+			UserEventSizeLimit,
+		)
 	}

 	// Create a message
@@ -454,16 +469,34 @@ func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
 		Payload: payload,
 		CC:      coalesce,
 	}
-	s.eventClock.Increment()
-
-	// Process update locally
-	s.handleUserEvent(&msg)

 	// Start broadcasting the event
 	raw, err := encodeMessage(messageUserEventType, &msg)
 	if err != nil {
 		return err
 	}
+
+	// Check the size after encoding to be sure we are not attempting
+	// to send over the configured size limit.
+	if len(raw) > s.config.UserEventSizeLimit {
+		return fmt.Errorf(
+			"encoded user event exceeds configured limit of %d bytes after encoding",
+			s.config.UserEventSizeLimit,
+		)
+	}
+
+	if len(raw) > UserEventSizeLimit {
+		return fmt.Errorf(
+			"encoded user event exceeds sane limit of %d bytes after encoding",
+			UserEventSizeLimit,
+		)
+	}
+
+	s.eventClock.Increment()
+
+	// Process update locally
+	s.handleUserEvent(&msg)
+
 	s.eventBroadcasts.QueueBroadcast(&broadcast{
 		msg: raw,
 	})
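Caller-side sketch of the new checks (the `fireDeploy` helper and event name are illustrative): name plus payload is validated against both the configured and the hard limit, before and after encoding.

```go
// fireDeploy broadcasts a deploy marker; oversized names/payloads come
// back as errors from the pre- and post-encoding checks above.
func fireDeploy(s *serf.Serf, version string) error {
	return s.UserEvent("deploy", []byte(version), false)
}
```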
@@ -748,15 +781,26 @@ func (s *Serf) Members() []Member {
 	return members
 }

-// RemoveFailedNode forcibly removes a failed node from the cluster
+// RemoveFailedNode is a backwards-compatible form of forceLeave
+// that does not prune the node from the member list.
+func (s *Serf) RemoveFailedNode(node string) error {
+	return s.forceLeave(node, false)
+}
+
+// RemoveFailedNodePrune is a form of forceLeave that also prunes the
+// node from the member list.
+func (s *Serf) RemoveFailedNodePrune(node string) error {
+	return s.forceLeave(node, true)
+}
+
+// forceLeave forcibly removes a failed node from the cluster
 // immediately, instead of waiting for the reaper to eventually reclaim it.
 // This also has the effect that Serf will no longer attempt to reconnect
 // to this node.
-func (s *Serf) RemoveFailedNode(node string) error {
+func (s *Serf) forceLeave(node string, prune bool) error {
 	// Construct the message to broadcast
 	msg := messageLeave{
 		LTime: s.clock.Time(),
 		Node:  node,
+		Prune: prune,
 	}
 	s.clock.Increment()
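Caller-side sketch (node name illustrative): the Prune variant reclaims the member immediately instead of leaving it for the reaper.

```go
if err := s.RemoveFailedNodePrune("node-42"); err != nil {
	log.Printf("force leave with prune failed: %v", err)
}

// Old behavior, unchanged: mark the node as left but let the
// reaper retire it from the member list later.
if err := s.RemoveFailedNode("node-42"); err != nil {
	log.Printf("force leave failed: %v", err)
}
```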
@@ -1027,6 +1071,7 @@ func (s *Serf) handleNodeUpdate(n *memberlist.Node) {

 // handleNodeLeaveIntent is called when an intent to leave is received.
 func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
+
 	// Witness a potentially newer time
 	s.clock.Witness(leaveMsg.LTime)
@@ -1057,6 +1102,10 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
 	case StatusAlive:
 		member.Status = StatusLeaving
 		member.statusLTime = leaveMsg.LTime
+
+		if leaveMsg.Prune {
+			s.handlePrune(member)
+		}
 		return true
 	case StatusFailed:
 		member.Status = StatusLeft
@@ -1065,6 +1114,7 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
 		// Remove from the failed list and add to the left list. We add
 		// to the left list so that when we do a sync, other nodes will
 		// remove it from their failed list.
+
 		s.failedMembers = removeOldMember(s.failedMembers, member.Name)
 		s.leftMembers = append(s.leftMembers, member)
@@ -1079,12 +1129,40 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
 				Members: []Member{member.Member},
 			}
 		}
+
+		if leaveMsg.Prune {
+			s.handlePrune(member)
+		}
+
+		return true
+
+	case StatusLeaving, StatusLeft:
+		if leaveMsg.Prune {
+			s.handlePrune(member)
+		}
 		return true
 	default:
 		return false
 	}
 }

+// handlePrune waits for a leaving node's intent to finish propagating
+// and then forcibly erases the member from the member list.
+func (s *Serf) handlePrune(member *memberState) {
+	if member.Status == StatusLeaving {
+		time.Sleep(s.config.BroadcastTimeout + s.config.LeavePropagateDelay)
+	}
+
+	s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)
+
+	// If we are leaving or left, we may be in the left-members list.
+	if member.Status == StatusLeaving || member.Status == StatusLeft {
+		s.leftMembers = removeOldMember(s.leftMembers, member.Name)
+	}
+	s.eraseNode(member)
+}
+
 // handleNodeJoinIntent is called when a node broadcasts a
 // join message to set the lamport time of its join
 func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
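Since handlePrune may block while a leave intent propagates, the wait is bounded by two existing config knobs; a tuning sketch (values illustrative):

```go
conf := serf.DefaultConfig()
// handlePrune sleeps for BroadcastTimeout + LeavePropagateDelay when
// the target is still StatusLeaving, so these bound how long a
// RemoveFailedNodePrune call can stall.
conf.BroadcastTimeout = 5 * time.Second
conf.LeavePropagateDelay = 1 * time.Second
```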
@@ -1405,6 +1483,30 @@ func (s *Serf) resolveNodeConflict() {
 	}
 }

+// eraseNode takes a node completely out of the member list.
+func (s *Serf) eraseNode(m *memberState) {
+	// Delete from members
+	delete(s.members, m.Name)
+
+	// Tell the coordinate client the node has gone away and delete
+	// its cached coordinates.
+	if !s.config.DisableCoordinates {
+		s.coordClient.ForgetNode(m.Name)
+
+		s.coordCacheLock.Lock()
+		delete(s.coordCache, m.Name)
+		s.coordCacheLock.Unlock()
+	}
+
+	// Send an event along
+	if s.config.EventCh != nil {
+		s.config.EventCh <- MemberEvent{
+			Type:    EventMemberReap,
+			Members: []Member{m.Member},
+		}
+	}
+}
+
 // handleReap periodically reaps the list of failed and left members, as well
 // as old buffered intents.
 func (s *Serf) handleReap() {
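Timed-out reaps and forced prunes now surface as the same event; a consumer sketch (channel wiring illustrative):

```go
evCh := make(chan serf.Event, 16)
conf.EventCh = evCh

go func() {
	for e := range evCh {
		if me, ok := e.(serf.MemberEvent); ok && me.Type == serf.EventMemberReap {
			for _, m := range me.Members {
				log.Printf("reaped member: %s", m.Name)
			}
		}
	}
}()
```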
@@ -1455,27 +1557,10 @@ func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []
 		n--
 		i--

-		// Delete from members
-		delete(s.members, m.Name)
-
-		// Tell the coordinate client the node has gone away and delete
-		// its cached coordinates.
-		if !s.config.DisableCoordinates {
-			s.coordClient.ForgetNode(m.Name)
-
-			s.coordCacheLock.Lock()
-			delete(s.coordCache, m.Name)
-			s.coordCacheLock.Unlock()
-		}
-
-		// Send an event along
+		// Delete from members and send out an event
 		s.logger.Printf("[INFO] serf: EventMemberReap: %s", m.Name)
-		if s.config.EventCh != nil {
-			s.config.EventCh <- MemberEvent{
-				Type:    EventMemberReap,
-				Members: []Member{m.Member},
-			}
-		}
+		s.eraseNode(m)
 	}

 	return old