|
@@ -12,6 +12,7 @@ import (
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/credentials"
|
|
|
|
+ "google.golang.org/grpc/peer"
|
|
|
|
|
|
"golang.org/x/net/context"
|
|
"golang.org/x/net/context"
|
|
|
|
|
|
@@ -35,9 +36,6 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
var (
|
|
var (
|
|
- // ErrHealthCheckFailure is returned when there is an issue with the initial handshake which means
|
|
|
|
- // that the address provided must be invalid or there is ongoing connectivity issues at join time.
|
|
|
|
- ErrHealthCheckFailure = errors.New("raft: could not connect to prospective new cluster member using its advertised address")
|
|
|
|
// ErrNoRaftMember is thrown when the node is not yet part of a raft cluster
|
|
// ErrNoRaftMember is thrown when the node is not yet part of a raft cluster
|
|
ErrNoRaftMember = errors.New("raft: node is not yet part of a raft cluster")
|
|
ErrNoRaftMember = errors.New("raft: node is not yet part of a raft cluster")
|
|
// ErrConfChangeRefused is returned when there is an issue with the configuration change
|
|
// ErrConfChangeRefused is returned when there is an issue with the configuration change
|
|
@@ -62,6 +60,9 @@ var (
|
|
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
|
|
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
|
|
// ErrNoClusterLeader is thrown when the cluster has no elected leader
|
|
// ErrNoClusterLeader is thrown when the cluster has no elected leader
|
|
ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
|
|
ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
|
|
|
|
+ // ErrMemberUnknown is sent in response to a message from an
|
|
|
|
+ // unrecognized peer.
|
|
|
|
+ ErrMemberUnknown = errors.New("raft: member unknown")
|
|
)
|
|
)
|
|
|
|
|
|
// LeadershipState indicates whether the node is a leader or follower.
|
|
// LeadershipState indicates whether the node is a leader or follower.
|
|
@@ -317,6 +318,7 @@ func DefaultNodeConfig() *raft.Config {
|
|
MaxSizePerMsg: math.MaxUint16,
|
|
MaxSizePerMsg: math.MaxUint16,
|
|
MaxInflightMsgs: 256,
|
|
MaxInflightMsgs: 256,
|
|
Logger: log.L,
|
|
Logger: log.L,
|
|
|
|
+ CheckQuorum: true,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -670,7 +672,7 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
|
|
// checkHealth tries to contact an aspiring member through its advertised address
|
|
// checkHealth tries to contact an aspiring member through its advertised address
|
|
// and checks if its raft server is running.
|
|
// and checks if its raft server is running.
|
|
func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error {
|
|
func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error {
|
|
- conn, err := dial(addr, "tcp", n.tlsCredentials, timeout)
|
|
|
|
|
|
+ conn, err := n.ConnectToMember(addr, timeout)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
@@ -681,15 +683,10 @@ func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Durati
|
|
ctx = tctx
|
|
ctx = tctx
|
|
}
|
|
}
|
|
|
|
|
|
- client := api.NewHealthClient(conn)
|
|
|
|
- defer conn.Close()
|
|
|
|
|
|
+ defer conn.Conn.Close()
|
|
|
|
|
|
- resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
|
|
|
|
- if err != nil {
|
|
|
|
- return ErrHealthCheckFailure
|
|
|
|
- }
|
|
|
|
- if resp != nil && resp.Status != api.HealthCheckResponse_SERVING {
|
|
|
|
- return ErrHealthCheckFailure
|
|
|
|
|
|
+ if err := conn.HealthCheck(ctx); err != nil {
|
|
|
|
+ return errors.Wrap(err, "could not connect to prospective new cluster member using its advertised address")
|
|
}
|
|
}
|
|
|
|
|
|
return nil
|
|
return nil
|
|
@@ -715,8 +712,30 @@ func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID
|
|
}
|
|
}
|
|
|
|
|
|
// Wait for a raft round to process the configuration change
|
|
// Wait for a raft round to process the configuration change
|
|
- err = n.configure(ctx, cc)
|
|
|
|
- return err
|
|
|
|
|
|
+ return n.configure(ctx, cc)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// updateMember submits a configuration change to change a member's address.
|
|
|
|
+func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
|
|
|
|
+ node := api.RaftMember{
|
|
|
|
+ RaftID: raftID,
|
|
|
|
+ NodeID: nodeID,
|
|
|
|
+ Addr: addr,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ meta, err := node.Marshal()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ cc := raftpb.ConfChange{
|
|
|
|
+ Type: raftpb.ConfChangeUpdateNode,
|
|
|
|
+ NodeID: raftID,
|
|
|
|
+ Context: meta,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Wait for a raft round to process the configuration change
|
|
|
|
+ return n.configure(ctx, cc)
|
|
}
|
|
}
|
|
|
|
|
|
// Leave asks to a member of the raft to remove
|
|
// Leave asks to a member of the raft to remove
|
|
@@ -799,7 +818,30 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
|
|
return nil, ErrMemberRemoved
|
|
return nil, ErrMemberRemoved
|
|
}
|
|
}
|
|
|
|
|
|
- n.cluster.ReportActive(msg.Message.From)
|
|
|
|
|
|
+ var sourceHost string
|
|
|
|
+ peer, ok := peer.FromContext(ctx)
|
|
|
|
+ if ok {
|
|
|
|
+ sourceHost, _, _ = net.SplitHostPort(peer.Addr.String())
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ n.cluster.ReportActive(msg.Message.From, sourceHost)
|
|
|
|
+
|
|
|
|
+ // Reject vote requests from unreachable peers
|
|
|
|
+ if msg.Message.Type == raftpb.MsgVote {
|
|
|
|
+ member := n.cluster.GetMember(msg.Message.From)
|
|
|
|
+ if member == nil || member.Conn == nil {
|
|
|
|
+ n.Config.Logger.Errorf("received vote request from unknown member %x", msg.Message.From)
|
|
|
|
+ return nil, ErrMemberUnknown
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ healthCtx, cancel := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
|
|
|
|
+ defer cancel()
|
|
|
|
+
|
|
|
|
+ if err := member.HealthCheck(healthCtx); err != nil {
|
|
|
|
+ n.Config.Logger.Warningf("member %x which sent vote request failed health check: %v", msg.Message.From, err)
|
|
|
|
+ return nil, errors.Wrap(err, "member unreachable")
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
if msg.Message.Type == raftpb.MsgProp {
|
|
if msg.Message.Type == raftpb.MsgProp {
|
|
// We don't accepted forwarded proposals. Our
|
|
// We don't accepted forwarded proposals. Our
|
|
@@ -1178,21 +1220,64 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
|
|
}
|
|
}
|
|
n.ReportUnreachable(m.To)
|
|
n.ReportUnreachable(m.To)
|
|
|
|
|
|
|
|
+ lastSeenHost := n.cluster.LastSeenHost(m.To)
|
|
|
|
+ if lastSeenHost != "" {
|
|
|
|
+ // Check if address has changed
|
|
|
|
+ officialHost, officialPort, _ := net.SplitHostPort(conn.Addr)
|
|
|
|
+ if officialHost != lastSeenHost {
|
|
|
|
+ reconnectAddr := net.JoinHostPort(lastSeenHost, officialPort)
|
|
|
|
+ n.Config.Logger.Warningf("detected address change for %x (%s -> %s)", m.To, conn.Addr, reconnectAddr)
|
|
|
|
+ if err := n.handleAddressChange(conn, reconnectAddr); err != nil {
|
|
|
|
+ n.Config.Logger.Error(err)
|
|
|
|
+ }
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
// Bounce the connection
|
|
// Bounce the connection
|
|
newConn, err := n.ConnectToMember(conn.Addr, 0)
|
|
newConn, err := n.ConnectToMember(conn.Addr, 0)
|
|
if err != nil {
|
|
if err != nil {
|
|
n.Config.Logger.Errorf("could connect to member ID %x at %s: %v", m.To, conn.Addr, err)
|
|
n.Config.Logger.Errorf("could connect to member ID %x at %s: %v", m.To, conn.Addr, err)
|
|
- } else {
|
|
|
|
- err = n.cluster.ReplaceMemberConnection(m.To, conn, newConn)
|
|
|
|
- if err != nil {
|
|
|
|
- newConn.Conn.Close()
|
|
|
|
- }
|
|
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ err = n.cluster.ReplaceMemberConnection(m.To, conn, newConn, conn.Addr, false)
|
|
|
|
+ if err != nil {
|
|
|
|
+ n.Config.Logger.Errorf("failed to replace connection to raft member: %v", err)
|
|
|
|
+ newConn.Conn.Close()
|
|
}
|
|
}
|
|
} else if m.Type == raftpb.MsgSnap {
|
|
} else if m.Type == raftpb.MsgSnap {
|
|
n.ReportSnapshot(m.To, raft.SnapshotFinish)
|
|
n.ReportSnapshot(m.To, raft.SnapshotFinish)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (n *Node) handleAddressChange(member *membership.Member, reconnectAddr string) error {
|
|
|
|
+ newConn, err := n.ConnectToMember(reconnectAddr, 0)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return errors.Wrapf(err, "could connect to member ID %x at observed address %s", member.RaftID, reconnectAddr)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ healthCtx, cancelHealth := context.WithTimeout(n.Ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
|
|
|
|
+ defer cancelHealth()
|
|
|
|
+
|
|
|
|
+ if err := newConn.HealthCheck(healthCtx); err != nil {
|
|
|
|
+ return errors.Wrapf(err, "%x failed health check at observed address %s", member.RaftID, reconnectAddr)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err := n.cluster.ReplaceMemberConnection(member.RaftID, member, newConn, reconnectAddr, false); err != nil {
|
|
|
|
+ newConn.Conn.Close()
|
|
|
|
+ return errors.Wrap(err, "failed to replace connection to raft member")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // If we're the leader, write the address change to raft
|
|
|
|
+ updateCtx, cancelUpdate := context.WithTimeout(n.Ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
|
|
|
|
+ defer cancelUpdate()
|
|
|
|
+ if err := n.updateMember(updateCtx, reconnectAddr, member.RaftID, member.NodeID); err != nil {
|
|
|
|
+ return errors.Wrap(err, "failed to update member address in raft")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
type applyResult struct {
|
|
type applyResult struct {
|
|
resp proto.Message
|
|
resp proto.Message
|
|
err error
|
|
err error
|
|
@@ -1354,6 +1439,8 @@ func (n *Node) processConfChange(entry raftpb.Entry) {
|
|
switch cc.Type {
|
|
switch cc.Type {
|
|
case raftpb.ConfChangeAddNode:
|
|
case raftpb.ConfChangeAddNode:
|
|
err = n.applyAddNode(cc)
|
|
err = n.applyAddNode(cc)
|
|
|
|
+ case raftpb.ConfChangeUpdateNode:
|
|
|
|
+ err = n.applyUpdateNode(cc)
|
|
case raftpb.ConfChangeRemoveNode:
|
|
case raftpb.ConfChangeRemoveNode:
|
|
err = n.applyRemoveNode(cc)
|
|
err = n.applyRemoveNode(cc)
|
|
}
|
|
}
|
|
@@ -1387,6 +1474,43 @@ func (n *Node) applyAddNode(cc raftpb.ConfChange) error {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// applyUpdateNode is called when we receive a ConfChange from a member in the
|
|
|
|
+// raft cluster which update the address of an existing node.
|
|
|
|
+func (n *Node) applyUpdateNode(cc raftpb.ConfChange) error {
|
|
|
|
+ newMember := &api.RaftMember{}
|
|
|
|
+ err := proto.Unmarshal(cc.Context, newMember)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ oldMember := n.cluster.GetMember(newMember.RaftID)
|
|
|
|
+
|
|
|
|
+ if oldMember == nil {
|
|
|
|
+ return ErrMemberUnknown
|
|
|
|
+ }
|
|
|
|
+ if oldMember.NodeID != newMember.NodeID {
|
|
|
|
+ // Should never happen; this is a sanity check
|
|
|
|
+ n.Config.Logger.Errorf("node ID mismatch on node update (old: %x, new: %x)", oldMember.NodeID, newMember.NodeID)
|
|
|
|
+ return errors.New("node ID mismatch match on node update")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if oldMember.Addr == newMember.Addr || oldMember.Conn == nil {
|
|
|
|
+ // nothing to do
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ newConn, err := n.ConnectToMember(newMember.Addr, 0)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return errors.Errorf("could connect to member ID %x at %s: %v", newMember.RaftID, newMember.Addr, err)
|
|
|
|
+ }
|
|
|
|
+ if err := n.cluster.ReplaceMemberConnection(newMember.RaftID, oldMember, newConn, newMember.Addr, true); err != nil {
|
|
|
|
+ newConn.Conn.Close()
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
// applyRemoveNode is called when we receive a ConfChange
|
|
// applyRemoveNode is called when we receive a ConfChange
|
|
// from a member in the raft cluster, this removes a node
|
|
// from a member in the raft cluster, this removes a node
|
|
// from the existing raft cluster
|
|
// from the existing raft cluster
|