|
@@ -409,7 +409,7 @@ func (n *Node) Run(ctx context.Context) error {
|
|
|
|
|
|
// Save entries to storage
|
|
|
if err := n.saveToStorage(ctx, &raftConfig, rd.HardState, rd.Entries, rd.Snapshot); err != nil {
|
|
|
- log.G(ctx).WithError(err).Error("failed to save entries to storage")
|
|
|
+ return errors.Wrap(err, "failed to save entries to storage")
|
|
|
}
|
|
|
|
|
|
if len(rd.Messages) != 0 {
|
|
@@ -714,11 +714,20 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
|
|
|
defer n.membershipLock.Unlock()
|
|
|
|
|
|
if !n.IsMember() {
|
|
|
- return nil, ErrNoRaftMember
|
|
|
+ return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrNoRaftMember.Error())
|
|
|
}
|
|
|
|
|
|
if !n.isLeader() {
|
|
|
- return nil, ErrLostLeadership
|
|
|
+ return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ // A single manager must not be able to join the raft cluster twice. If
|
|
|
+ // it did, that would cause the quorum to be computed incorrectly. This
|
|
|
+ // could happen if the WAL was deleted from an active manager.
|
|
|
+ for _, m := range n.cluster.Members() {
|
|
|
+ if m.NodeID == nodeInfo.NodeID {
|
|
|
+ return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists")
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Find a unique ID for the joining member.
|
|
@@ -738,7 +747,7 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
|
|
|
|
|
|
requestHost, requestPort, err := net.SplitHostPort(remoteAddr)
|
|
|
if err != nil {
|
|
|
- return nil, fmt.Errorf("invalid address %s in raft join request", remoteAddr)
|
|
|
+ return nil, grpc.Errorf(codes.InvalidArgument, "invalid address %s in raft join request", remoteAddr)
|
|
|
}
|
|
|
|
|
|
requestIP := net.ParseIP(requestHost)
|
|
@@ -994,6 +1003,11 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
|
|
|
defer n.stopMu.RUnlock()
|
|
|
|
|
|
if n.IsMember() {
|
|
|
+ if msg.Message.To != n.Config.ID {
|
|
|
+ n.processRaftMessageLogger(ctx, msg).Errorf("received message intended for raft_id %x", msg.Message.To)
|
|
|
+ return &api.ProcessRaftMessageResponse{}, nil
|
|
|
+ }
|
|
|
+
|
|
|
if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
|
|
|
n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed")
|
|
|
}
|