|
@@ -112,6 +112,7 @@ type Node struct {
|
|
|
confState raftpb.ConfState
|
|
|
appliedIndex uint64
|
|
|
snapshotIndex uint64
|
|
|
+ writtenIndex uint64
|
|
|
|
|
|
ticker clock.Ticker
|
|
|
doneCh chan struct{}
|
|
@@ -133,10 +134,11 @@ type Node struct {
|
|
|
// to stop.
|
|
|
stopped chan struct{}
|
|
|
|
|
|
- lastSendToMember map[uint64]chan struct{}
|
|
|
- raftLogger *storage.EncryptedRaftLogger
|
|
|
- keyRotator EncryptionKeyRotator
|
|
|
- rotationQueued bool
|
|
|
+ lastSendToMember map[uint64]chan struct{}
|
|
|
+ raftLogger *storage.EncryptedRaftLogger
|
|
|
+ keyRotator EncryptionKeyRotator
|
|
|
+ rotationQueued bool
|
|
|
+ waitForAppliedIndex uint64
|
|
|
}
|
|
|
|
|
|
// NodeOptions provides node-level options.
|
|
@@ -269,6 +271,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
|
|
|
n.confState = snapshot.Metadata.ConfState
|
|
|
n.appliedIndex = snapshot.Metadata.Index
|
|
|
n.snapshotIndex = snapshot.Metadata.Index
|
|
|
+ n.writtenIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
|
|
|
|
|
|
if loadAndStartErr == storage.ErrNoWAL {
|
|
|
if n.opts.JoinAddr != "" {
|
|
@@ -471,9 +474,12 @@ func (n *Node) Run(ctx context.Context) error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // in case the previous attempt to update the key failed
|
|
|
+ n.maybeMarkRotationFinished(ctx)
|
|
|
+
|
|
|
// Trigger a snapshot every once in awhile
|
|
|
if n.snapshotInProgress == nil &&
|
|
|
- (n.keyRotator.NeedsRotation() || raftConfig.SnapshotInterval > 0 &&
|
|
|
+ (n.needsSnapshot() || raftConfig.SnapshotInterval > 0 &&
|
|
|
n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval) {
|
|
|
n.doSnapshot(ctx, raftConfig)
|
|
|
}
|
|
@@ -510,13 +516,15 @@ func (n *Node) Run(ctx context.Context) error {
|
|
|
n.snapshotIndex = snapshotIndex
|
|
|
}
|
|
|
n.snapshotInProgress = nil
|
|
|
- if n.rotationQueued {
|
|
|
+ n.maybeMarkRotationFinished(ctx)
|
|
|
+ if n.rotationQueued && n.needsSnapshot() {
|
|
|
// there was a key rotation that took place before while the snapshot
|
|
|
// was in progress - we have to take another snapshot and encrypt with the new key
|
|
|
+ n.rotationQueued = false
|
|
|
n.doSnapshot(ctx, n.getCurrentRaftConfig())
|
|
|
}
|
|
|
case <-n.keyRotator.RotationNotify():
|
|
|
- // There are 2 separate checks: rotationQueued, and keyRotator.NeedsRotation().
|
|
|
+ // There are 2 separate checks: rotationQueued, and n.needsSnapshot().
|
|
|
// We set rotationQueued so that when we are notified of a rotation, we try to
|
|
|
// do a snapshot as soon as possible. However, if there is an error while doing
|
|
|
// the snapshot, we don't want to hammer the node attempting to do snapshots over
|
|
@@ -525,7 +533,7 @@ func (n *Node) Run(ctx context.Context) error {
|
|
|
switch {
|
|
|
case n.snapshotInProgress != nil:
|
|
|
n.rotationQueued = true
|
|
|
- case n.keyRotator.NeedsRotation():
|
|
|
+ case n.needsSnapshot():
|
|
|
n.doSnapshot(ctx, n.getCurrentRaftConfig())
|
|
|
}
|
|
|
case <-n.removeRaftCh:
|
|
@@ -540,6 +548,35 @@ func (n *Node) Run(ctx context.Context) error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (n *Node) needsSnapshot() bool {
|
|
|
+ if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
|
|
|
+ keys := n.keyRotator.GetKeys()
|
|
|
+ if keys.PendingDEK != nil {
|
|
|
+ n.raftLogger.RotateEncryptionKey(keys.PendingDEK)
|
|
|
+ // we want to wait for the last index written with the old DEK to be commited, else a snapshot taken
|
|
|
+ // may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
|
|
|
+ // written with the new key to supercede any WAL written with an old DEK.
|
|
|
+ n.waitForAppliedIndex = n.writtenIndex
|
|
|
+ // if there is already a snapshot at this index, bump the index up one, because we want the next snapshot
|
|
|
+ if n.waitForAppliedIndex == n.snapshotIndex {
|
|
|
+ n.waitForAppliedIndex++
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
|
|
|
+}
|
|
|
+
|
|
|
+func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
|
|
|
+ if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotIndex {
|
|
|
+ // this means we tried to rotate - so finish the rotation
|
|
|
+ if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil {
|
|
|
+ log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
|
|
|
+ } else {
|
|
|
+ n.waitForAppliedIndex = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (n *Node) getCurrentRaftConfig() api.RaftConfig {
|
|
|
raftConfig := DefaultRaftConfig()
|
|
|
n.memoryStore.View(func(readTx store.ReadTx) {
|
|
@@ -1190,6 +1227,13 @@ func (n *Node) saveToStorage(
|
|
|
return ErrApplySnapshot
|
|
|
}
|
|
|
|
|
|
+ if len(entries) > 0 {
|
|
|
+ lastIndex := entries[len(entries)-1].Index
|
|
|
+ if lastIndex > n.writtenIndex {
|
|
|
+ n.writtenIndex = lastIndex
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if err = n.raftStore.Append(entries); err != nil {
|
|
|
return ErrAppendEntry
|
|
|
}
|