@@ -363,39 +363,55 @@ func (n *Node) Run(ctx context.Context) error {
 				n.confState = rd.Snapshot.Metadata.ConfState
 			}
 
-			// Process committed entries
-			for _, entry := range rd.CommittedEntries {
-				if err := n.processCommitted(entry); err != nil {
-					n.Config.Logger.Error(err)
-				}
-			}
+			// If we cease to be the leader, we must cancel any
+			// proposals that are currently waiting for a quorum to
+			// acknowledge them. It is still possible for these to
+			// become committed, but if that happens we will apply
+			// them as any follower would.
 
-			// Trigger a snapshot every once in awhile
-			if n.snapshotInProgress == nil &&
-				raftConfig.SnapshotInterval > 0 &&
-				n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval {
-				n.doSnapshot(&raftConfig)
-			}
+			// It is important that we cancel these proposals before
+			// calling processCommitted, so processCommitted does
+			// not deadlock.
 
-			// If we cease to be the leader, we must cancel
-			// any proposals that are currently waiting for
-			// a quorum to acknowledge them. It is still
-			// possible for these to become committed, but
-			// if that happens we will apply them as any
-			// follower would.
 			if rd.SoftState != nil {
 				if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
 					wasLeader = false
-					n.wait.cancelAll()
 					if atomic.LoadUint32(&n.signalledLeadership) == 1 {
 						atomic.StoreUint32(&n.signalledLeadership, 0)
 						n.leadershipBroadcast.Write(IsFollower)
 					}
+
+					// It is important that we set n.signalledLeadership to 0
+					// before calling n.wait.cancelAll. When a new raft
+					// request is registered, it checks n.signalledLeadership
+					// afterwards, and cancels the registration if it is 0.
+					// If cancelAll was called first, this call might run
+					// before the new request registers, but
+					// signalledLeadership would be set after the check.
+					// Setting signalledLeadership before calling cancelAll
+					// ensures that if a new request is registered during
+					// this transition, it will either be cancelled by
+					// cancelAll, or by its own check of signalledLeadership.
+					n.wait.cancelAll()
 				} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
 					wasLeader = true
 				}
 			}
 
+			// Process committed entries
+			for _, entry := range rd.CommittedEntries {
+				if err := n.processCommitted(entry); err != nil {
+					n.Config.Logger.Error(err)
+				}
+			}
+
+			// Trigger a snapshot every once in awhile
+			if n.snapshotInProgress == nil &&
+				raftConfig.SnapshotInterval > 0 &&
+				n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval {
+				n.doSnapshot(&raftConfig)
+			}
+
 			if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
 				// If all the entries in the log have become
 				// committed, broadcast our leadership status.
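
The hunk above reorders the Run loop: proposals waiting for a quorum are cancelled before committed entries are processed, and `signalledLeadership` is cleared before `cancelAll` so that a concurrently registering request is caught either by `cancelAll` or by its own post-register check. The diff only shows the registry's call sites — `register(id, cb, cancel)`, `trigger`, `cancel`, `cancelAll` — so the sketch below is a hypothetical reconstruction of that shape, not the project's actual `wait` type. The property the new code relies on is that `cancel` and `cancelAll` never deliver a result: they drop the registration and fire the stored cancel function, which (per the later hunks) cancels a per-proposal context.

```go
// Hypothetical sketch of the wait registry implied by this diff's call
// sites; the real implementation may differ in detail.
package main

import (
	"context"
	"fmt"
	"sync"
)

type waitItem struct {
	ch     chan interface{} // delivers the apply result on trigger
	cb     func()           // optional callback run on trigger
	cancel func()           // cancels the proposal's context
}

type wait struct {
	l sync.Mutex
	m map[uint64]waitItem
}

func newWait() *wait { return &wait{m: map[uint64]waitItem{}} }

func (w *wait) register(id uint64, cb func(), cancel func()) <-chan interface{} {
	w.l.Lock()
	defer w.l.Unlock()
	ch := make(chan interface{}, 1) // buffered so trigger never blocks
	w.m[id] = waitItem{ch: ch, cb: cb, cancel: cancel}
	return ch
}

func (w *wait) trigger(id uint64, x interface{}) bool {
	w.l.Lock()
	item, ok := w.m[id]
	delete(w.m, id)
	w.l.Unlock()
	if !ok {
		return false
	}
	if item.cb != nil {
		item.cb()
	}
	item.ch <- x
	return true
}

func (w *wait) cancel(id uint64) {
	w.l.Lock()
	item, ok := w.m[id]
	delete(w.m, id)
	w.l.Unlock()
	if ok && item.cancel != nil {
		item.cancel() // wake the waiter via its context, not the channel
	}
}

func (w *wait) cancelAll() {
	w.l.Lock()
	defer w.l.Unlock()
	for id, item := range w.m {
		delete(w.m, id)
		if item.cancel != nil {
			item.cancel()
		}
	}
}

func main() {
	w := newWait()
	_, cancel := context.WithCancel(context.Background())
	ch := w.register(1, nil, cancel)
	w.trigger(1, "applied")
	fmt.Println(<-ch) // prints: applied
}
```
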
@@ -1129,7 +1145,11 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
 	r.ID = n.reqIDGen.Next()
 
-	ch := n.wait.register(r.ID, cb)
+	// This must be derived from the context which is cancelled by stop()
+	// to avoid a deadlock on shutdown.
+	waitCtx, cancel := context.WithCancel(n.Ctx)
+
+	ch := n.wait.register(r.ID, cb, cancel)
 
 	// Do this check after calling register to avoid a race.
 	if atomic.LoadUint32(&n.signalledLeadership) != 1 {
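
This hunk replaces direct use of `n.Ctx` with a per-proposal `waitCtx` derived from it, and threads the `cancel` function into the registry. A single `Done` channel then observes both node shutdown (stop() cancelling `n.Ctx` propagates to every derived context) and lost leadership (`cancelAll` invoking the stored cancel). A compressed, runnable illustration of that derivation, with stand-in names:

```go
// Sketch (assumed names): cancellation from either source — node
// shutdown or the registry firing the per-proposal cancel — unblocks
// the same waitCtx.
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	nodeCtx, stop := context.WithCancel(context.Background()) // stands in for n.Ctx
	waitCtx, cancel := context.WithCancel(nodeCtx)            // per-proposal context
	_ = cancel // would be stored in the wait registry as the cancel callback

	go func() {
		time.Sleep(10 * time.Millisecond)
		stop() // shutdown also cancels every derived waitCtx
	}()

	<-waitCtx.Done()
	fmt.Println("proposal unblocked:", waitCtx.Err())
}
```
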
@@ -1148,24 +1168,19 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
 		return nil, ErrRequestTooLarge
 	}
 
-	// This must use the context which is cancelled by stop() to avoid a
-	// deadlock on shutdown.
-	err = n.Propose(n.Ctx, data)
+	err = n.Propose(waitCtx, data)
 	if err != nil {
 		n.wait.cancel(r.ID)
 		return nil, err
 	}
 
 	select {
-	case x, ok := <-ch:
-		if ok {
-			res := x.(*applyResult)
-			return res.resp, res.err
-		}
-		return nil, ErrLostLeadership
-	case <-n.Ctx.Done():
+	case x := <-ch:
+		res := x.(*applyResult)
+		return res.resp, res.err
+	case <-waitCtx.Done():
 		n.wait.cancel(r.ID)
-		return nil, ErrStopped
+		return nil, ErrLostLeadership
 	case <-ctx.Done():
 		n.wait.cancel(r.ID)
 		return nil, ctx.Err()
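
With cancellation now surfacing through `waitCtx.Done()`, the old `x, ok := <-ch` closed-channel convention goes away: a value on `ch` is always a genuine `*applyResult`, `waitCtx.Done()` means lost leadership (or shutdown, which the old code reported separately as `ErrStopped`), and the caller's `ctx` still carries per-request deadlines. A minimal model of the three-way select (names are stand-ins, not the real API):

```go
// Compressed, hypothetical model of the new select: the result channel
// wins if the entry was applied; waitCtx covers lost leadership and
// shutdown; ctx covers the caller's own deadline or cancellation.
package main

import (
	"context"
	"errors"
	"fmt"
)

var errLostLeadership = errors.New("lost leadership")

func awaitProposal(ctx, waitCtx context.Context, ch <-chan interface{}) (interface{}, error) {
	select {
	case x := <-ch:
		return x, nil // always a real result under the new convention
	case <-waitCtx.Done():
		return nil, errLostLeadership
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	ch := make(chan interface{}, 1)
	ch <- "applied"
	res, err := awaitProposal(context.Background(), context.Background(), ch)
	fmt.Println(res, err)
}
```
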
@@ -1177,10 +1192,12 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
 // until the change is performed or there is an error.
 func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
 	cc.ID = n.reqIDGen.Next()
-	ch := n.wait.register(cc.ID, nil)
+
+	ctx, cancel := context.WithCancel(ctx)
+	ch := n.wait.register(cc.ID, nil, cancel)
 
 	if err := n.ProposeConfChange(ctx, cc); err != nil {
-		n.wait.trigger(cc.ID, nil)
+		n.wait.cancel(cc.ID)
 		return err
 	}
@@ -1194,7 +1211,7 @@ func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
 		}
 		return nil
 	case <-ctx.Done():
-		n.wait.trigger(cc.ID, nil)
+		n.wait.cancel(cc.ID)
 		return ctx.Err()
 	case <-n.Ctx.Done():
 		return ErrStopped
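
In `configure`, both error paths switch from `wait.trigger(cc.ID, nil)` to `wait.cancel(cc.ID)`. Under the registry shape sketched earlier, that is the difference between delivering a nil result the receiver would have to special-case and simply tearing the registration down, and it preserves the invariant the previous hunk relies on: anything read from the channel is a real result. Wrapping the conf-change context with `context.WithCancel` gives the registry a cancel function to fire here too.
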
@@ -1237,6 +1254,11 @@ func (n *Node) processEntry(entry raftpb.Entry) error {
 		// position and cancelling the transaction. Create a new
 		// transaction to commit the data.
 
+		// It should not be possible for processInternalRaftRequest
+		// to be running in this situation, but out of caution we
+		// cancel any current invocations to avoid a deadlock.
+		n.wait.cancelAll()
+
 		err := n.memoryStore.ApplyStoreActions(r.Action)
 		if err != nil {
 			log.G(context.Background()).Errorf("error applying actions from raft: %v", err)
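
The final hunk is defensive: this branch of `processEntry` appears to handle committed entries that have no registered local waiter, so no `processInternalRaftRequest` call should be blocked on them, but running `cancelAll` first guarantees the store-apply path cannot deadlock behind one if that assumption is ever violated.
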