@@ -105,10 +105,10 @@ type Node struct {
 	// shutting down the node.
 	waitProp sync.WaitGroup

-	confState     raftpb.ConfState
-	appliedIndex  uint64
-	snapshotIndex uint64
-	writtenIndex  uint64
+	confState       raftpb.ConfState
+	appliedIndex    uint64
+	snapshotMeta    raftpb.SnapshotMetadata
+	writtenWALIndex uint64

 	ticker clock.Ticker
 	doneCh chan struct{}
@@ -123,7 +123,7 @@ type Node struct {
 	// used for membership management checks
 	membershipLock sync.Mutex

-	snapshotInProgress chan uint64
+	snapshotInProgress chan raftpb.SnapshotMetadata
 	asyncTasks         sync.WaitGroup

 	// stopped chan is used for notifying grpc handlers that raft node going
@@ -266,8 +266,8 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
 	n.confState = snapshot.Metadata.ConfState
 	n.appliedIndex = snapshot.Metadata.Index
-	n.snapshotIndex = snapshot.Metadata.Index
-	n.writtenIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
+	n.snapshotMeta = snapshot.Metadata
+	n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error

 	if loadAndStartErr == storage.ErrNoWAL {
 		if n.opts.JoinAddr != "" {
@@ -424,7 +424,7 @@ func (n *Node) Run(ctx context.Context) error {
 					log.G(ctx).WithError(err).Error("failed to restore from snapshot")
 				}
 				n.appliedIndex = rd.Snapshot.Metadata.Index
-				n.snapshotIndex = rd.Snapshot.Metadata.Index
+				n.snapshotMeta = rd.Snapshot.Metadata
 				n.confState = rd.Snapshot.Metadata.ConfState
 			}
@@ -475,8 +475,8 @@ func (n *Node) Run(ctx context.Context) error {
 			// Trigger a snapshot every once in awhile
 			if n.snapshotInProgress == nil &&
-				(n.needsSnapshot() || raftConfig.SnapshotInterval > 0 &&
-					n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval) {
+				(n.needsSnapshot(ctx) || raftConfig.SnapshotInterval > 0 &&
+					n.appliedIndex-n.snapshotMeta.Index >= raftConfig.SnapshotInterval) {
 				n.doSnapshot(ctx, raftConfig)
 			}
@@ -507,17 +507,21 @@ func (n *Node) Run(ctx context.Context) error {
 				}
 			}

-		case snapshotIndex := <-n.snapshotInProgress:
-			if snapshotIndex > n.snapshotIndex {
-				n.snapshotIndex = snapshotIndex
+		case snapshotMeta := <-n.snapshotInProgress:
+			raftConfig := n.getCurrentRaftConfig()
+			if snapshotMeta.Index > n.snapshotMeta.Index {
+				n.snapshotMeta = snapshotMeta
+				if err := n.raftLogger.GC(snapshotMeta.Index, snapshotMeta.Term, raftConfig.KeepOldSnapshots); err != nil {
+					log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
+				}
 			}
 			n.snapshotInProgress = nil
 			n.maybeMarkRotationFinished(ctx)
-			if n.rotationQueued && n.needsSnapshot() {
+			if n.rotationQueued && n.needsSnapshot(ctx) {
 				// there was a key rotation that took place before while the snapshot
 				// was in progress - we have to take another snapshot and encrypt with the new key
 				n.rotationQueued = false
-				n.doSnapshot(ctx, n.getCurrentRaftConfig())
+				n.doSnapshot(ctx, raftConfig)
 			}
 		case <-n.keyRotator.RotationNotify():
 			// There are 2 separate checks: rotationQueued, and n.needsSnapshot().
@@ -529,7 +533,7 @@ func (n *Node) Run(ctx context.Context) error {
 			switch {
 			case n.snapshotInProgress != nil:
 				n.rotationQueued = true
-			case n.needsSnapshot():
+			case n.needsSnapshot(ctx):
 				n.doSnapshot(ctx, n.getCurrentRaftConfig())
 			}
 		case <-n.removeRaftCh:
@@ -544,7 +548,7 @@ func (n *Node) Run(ctx context.Context) error {
 	}
 }

-func (n *Node) needsSnapshot() bool {
+func (n *Node) needsSnapshot(ctx context.Context) bool {
 	if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
 		keys := n.keyRotator.GetKeys()
 		if keys.PendingDEK != nil {
@@ -552,23 +556,40 @@ func (n *Node) needsSnapshot() bool {
 			// we want to wait for the last index written with the old DEK to be committed, else a snapshot taken
 			// may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
 			// written with the new key to supercede any WAL written with an old DEK.
-			n.waitForAppliedIndex = n.writtenIndex
-			// if there is already a snapshot at this index, bump the index up one, because we want the next snapshot
-			if n.waitForAppliedIndex == n.snapshotIndex {
-				n.waitForAppliedIndex++
+			n.waitForAppliedIndex = n.writtenWALIndex
+			// if there is already a snapshot at this index or higher, bump the wait index up to 1 higher than the current
+			// snapshot index, because the rotation cannot be completed until the next snapshot
+			if n.waitForAppliedIndex <= n.snapshotMeta.Index {
+				n.waitForAppliedIndex = n.snapshotMeta.Index + 1
 			}
+			log.G(ctx).Debugf(
+				"beginning raft DEK rotation - last indices written with the old key are (snapshot: %d, WAL: %d) - waiting for snapshot of index %d to be written before rotation can be completed", n.snapshotMeta.Index, n.writtenWALIndex, n.waitForAppliedIndex)
 		}
 	}
-	return n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
+
+	result := n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
+	if result {
+		log.G(ctx).Debugf(
+			"a snapshot at index %d is needed in order to complete raft DEK rotation - a snapshot with index >= %d can now be triggered",
+			n.waitForAppliedIndex, n.appliedIndex)
+	}
+	return result
 }

 func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
-	if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotIndex {
+	if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotMeta.Index {
 		// this means we tried to rotate - so finish the rotation
 		if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil {
 			log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
 		} else {
+			log.G(ctx).Debugf(
+				"a snapshot with index %d is available, which completes the DEK rotation requiring a snapshot of at least index %d - throwing away DEK and older snapshots encrypted with the old key",
+				n.snapshotMeta.Index, n.waitForAppliedIndex)
 			n.waitForAppliedIndex = 0
+
+			if err := n.raftLogger.GC(n.snapshotMeta.Index, n.snapshotMeta.Term, 0); err != nil {
+				log.G(ctx).WithError(err).Error("failed to remove old snapshots and WALs that were written with the previous raft DEK")
+			}
 		}
 	}
 }
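
As an aside, the gating logic that needsSnapshot and maybeMarkRotationFinished implement above can be summarized with a small standalone sketch. The names below (rotationTracker, startRotation, maybeFinishRotation) are hypothetical and not the swarmkit API; this is only a model of the index bookkeeping, under the assumption that a DEK rotation completes once a snapshot covers every WAL entry written with the old key.

// Standalone sketch (hypothetical names, not the swarmkit API): models how a
// pending DEK rotation is gated on a snapshot whose index covers every WAL
// entry written with the old key.
package main

import "fmt"

type rotationTracker struct {
	snapshotIndex       uint64 // index of the latest snapshot (snapshotMeta.Index in the diff)
	writtenWALIndex     uint64 // highest index written to the WAL so far
	appliedIndex        uint64 // highest index applied to the state machine
	waitForAppliedIndex uint64 // 0 means no rotation is pending
}

// startRotation records the index the next snapshot must reach before the old
// DEK can be discarded, mirroring the first half of needsSnapshot in the diff.
func (r *rotationTracker) startRotation() {
	if r.waitForAppliedIndex == 0 {
		r.waitForAppliedIndex = r.writtenWALIndex
		// If a snapshot already exists at or past this index, the rotation
		// still needs one more snapshot, taken with the new key.
		if r.waitForAppliedIndex <= r.snapshotIndex {
			r.waitForAppliedIndex = r.snapshotIndex + 1
		}
	}
}

// needsSnapshot reports whether a snapshot should be triggered now to finish
// the rotation: the awaited index must already have been applied.
func (r *rotationTracker) needsSnapshot() bool {
	return r.waitForAppliedIndex > 0 && r.waitForAppliedIndex <= r.appliedIndex
}

// maybeFinishRotation clears the pending rotation once a snapshot at or past
// the awaited index has been written.
func (r *rotationTracker) maybeFinishRotation() bool {
	if r.waitForAppliedIndex > 0 && r.waitForAppliedIndex <= r.snapshotIndex {
		r.waitForAppliedIndex = 0
		return true
	}
	return false
}

func main() {
	r := rotationTracker{snapshotIndex: 10, writtenWALIndex: 25, appliedIndex: 20}
	r.startRotation()
	fmt.Println(r.waitForAppliedIndex, r.needsSnapshot()) // 25 false: index 25 not applied yet
	r.appliedIndex = 30
	fmt.Println(r.needsSnapshot()) // true: a snapshot can now be taken with the new key
	r.snapshotIndex = 30
	fmt.Println(r.maybeFinishRotation()) // true: old DEK and older snapshots can be discarded
}

The bump past an existing snapshot index matters because a snapshot taken before the rotation was requested is still encrypted with the old key, so the rotation can only finish on the next snapshot.
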
@@ -950,13 +971,13 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
 		defer cancel()

 		if err := member.HealthCheck(healthCtx); err != nil {
-			n.processRaftMessageLogger(ctx, msg).Debug("member which sent vote request failed health check")
+			n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
 			return &api.ProcessRaftMessageResponse{}, nil
 		}
 	}

 	if msg.Message.Type == raftpb.MsgProp {
-		// We don't accepted forwarded proposals. Our
+		// We don't accept forwarded proposals. Our
 		// current architecture depends on only the leader
 		// making proposals, so in-flight proposals can be
 		// guaranteed not to conflict.
@@ -1245,8 +1266,8 @@ func (n *Node) saveToStorage(
 	if len(entries) > 0 {
 		lastIndex := entries[len(entries)-1].Index
-		if lastIndex > n.writtenIndex {
-			n.writtenIndex = lastIndex
+		if lastIndex > n.writtenWALIndex {
+			n.writtenWALIndex = lastIndex
 		}
 	}
@@ -1296,25 +1317,32 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.
 	defer n.asyncTasks.Done()
 	defer close(thisSend)

-	ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
-	defer cancel()
-
 	if lastSend != nil {
+		waitCtx, waitCancel := context.WithTimeout(ctx, n.opts.SendTimeout)
+		defer waitCancel()
+
 		select {
 		case <-lastSend:
-		case <-ctx.Done():
+		case <-waitCtx.Done():
+			return
+		}
+
+		select {
+		case <-waitCtx.Done():
 			return
+		default:
 		}
 	}

+	ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
+	defer cancel()
+
 	if n.cluster.IsIDRemoved(m.To) {
 		// Should not send to removed members
 		return
 	}

-	var (
-		conn *membership.Member
-	)
+	var conn *membership.Member
 	if toMember, ok := members[m.To]; ok {
 		conn = toMember
 	} else {
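
As an aside, the intent of the reworked timeouts in the sendToMember hunk above — wait for the previous send on its own timeout, then give the actual send a fresh SendTimeout — can be illustrated with a minimal standalone sketch. The names below (sendAfter, sendTimeout) are hypothetical and not the swarmkit API; the RPC is replaced by a timer.

// Standalone sketch (hypothetical, not the swarmkit API) of serializing sends:
// waiting for the previous send uses its own timeout, so the current send
// still gets a full sendTimeout once it starts.
package main

import (
	"context"
	"fmt"
	"time"
)

const sendTimeout = 100 * time.Millisecond

func sendAfter(ctx context.Context, lastSend <-chan struct{}, msg string) {
	if lastSend != nil {
		waitCtx, waitCancel := context.WithTimeout(ctx, sendTimeout)
		defer waitCancel()

		select {
		case <-lastSend: // previous send finished
		case <-waitCtx.Done():
			return // gave up waiting for the previous send
		}

		// The previous send may have finished exactly as the wait expired;
		// don't start a new send on an already-expired wait budget.
		select {
		case <-waitCtx.Done():
			return
		default:
		}
	}

	// A fresh timeout for the send itself.
	ctx, cancel := context.WithTimeout(ctx, sendTimeout)
	defer cancel()

	select {
	case <-time.After(10 * time.Millisecond): // stand-in for the RPC
		fmt.Println("sent:", msg)
	case <-ctx.Done():
		fmt.Println("send timed out:", msg)
	}
}

func main() {
	prev := make(chan struct{})
	go func() {
		time.Sleep(20 * time.Millisecond)
		close(prev) // previous send completes
	}()
	sendAfter(context.Background(), prev, "hello")
}

The key difference from the removed lines is that the old code derived a single timeout before waiting on lastSend, so time spent waiting for the previous send ate into the budget of the current one.
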