diff --git a/vendor.conf b/vendor.conf index b2fb3f4c74..c0e9e91883 100644 --- a/vendor.conf +++ b/vendor.conf @@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 # cluster -github.com/docker/swarmkit a2080913b9cf2cac309845a28902896d65d3f527 +github.com/docker/swarmkit 1b2daeccc519c894044f6f72a40d8d5662dc6bf6 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 github.com/gogo/protobuf v0.3 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go index c3ff5a8432..923dddf3e1 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go @@ -112,6 +112,7 @@ type Node struct { confState raftpb.ConfState appliedIndex uint64 snapshotIndex uint64 + writtenIndex uint64 ticker clock.Ticker doneCh chan struct{} @@ -133,10 +134,11 @@ type Node struct { // to stop. stopped chan struct{} - lastSendToMember map[uint64]chan struct{} - raftLogger *storage.EncryptedRaftLogger - keyRotator EncryptionKeyRotator - rotationQueued bool + lastSendToMember map[uint64]chan struct{} + raftLogger *storage.EncryptedRaftLogger + keyRotator EncryptionKeyRotator + rotationQueued bool + waitForAppliedIndex uint64 } // NodeOptions provides node-level options. 
@@ -269,6 +271,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { n.confState = snapshot.Metadata.ConfState n.appliedIndex = snapshot.Metadata.Index n.snapshotIndex = snapshot.Metadata.Index + n.writtenIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error if loadAndStartErr == storage.ErrNoWAL { if n.opts.JoinAddr != "" { @@ -471,9 +474,12 @@ func (n *Node) Run(ctx context.Context) error { } } + // in case the previous attempt to update the key failed + n.maybeMarkRotationFinished(ctx) + // Trigger a snapshot every once in awhile if n.snapshotInProgress == nil && - (n.keyRotator.NeedsRotation() || raftConfig.SnapshotInterval > 0 && + (n.needsSnapshot() || raftConfig.SnapshotInterval > 0 && n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval) { n.doSnapshot(ctx, raftConfig) } @@ -510,13 +516,15 @@ func (n *Node) Run(ctx context.Context) error { n.snapshotIndex = snapshotIndex } n.snapshotInProgress = nil - if n.rotationQueued { + n.maybeMarkRotationFinished(ctx) + if n.rotationQueued && n.needsSnapshot() { // there was a key rotation that took place before while the snapshot // was in progress - we have to take another snapshot and encrypt with the new key + n.rotationQueued = false n.doSnapshot(ctx, n.getCurrentRaftConfig()) } case <-n.keyRotator.RotationNotify(): - // There are 2 separate checks: rotationQueued, and keyRotator.NeedsRotation(). + // There are 2 separate checks: rotationQueued, and n.needsSnapshot(). // We set rotationQueued so that when we are notified of a rotation, we try to // do a snapshot as soon as possible. 
However, if there is an error while doing // the snapshot, we don't want to hammer the node attempting to do snapshots over @@ -525,7 +533,7 @@ func (n *Node) Run(ctx context.Context) error { switch { case n.snapshotInProgress != nil: n.rotationQueued = true - case n.keyRotator.NeedsRotation(): + case n.needsSnapshot(): n.doSnapshot(ctx, n.getCurrentRaftConfig()) } case <-n.removeRaftCh: @@ -540,6 +548,35 @@ func (n *Node) Run(ctx context.Context) error { } } +func (n *Node) needsSnapshot() bool { + if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() { + keys := n.keyRotator.GetKeys() + if keys.PendingDEK != nil { + n.raftLogger.RotateEncryptionKey(keys.PendingDEK) + // we want to wait for the last index written with the old DEK to be committed, else a snapshot taken + // may have an index less than the index of a WAL written with an old DEK. We want the next snapshot + // written with the new key to supersede any WAL written with an old DEK. + n.waitForAppliedIndex = n.writtenIndex + // if there is already a snapshot at this index, bump the index up one, because we want the next snapshot + if n.waitForAppliedIndex == n.snapshotIndex { + n.waitForAppliedIndex++ + } + } + } + return n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex +} + +func (n *Node) maybeMarkRotationFinished(ctx context.Context) { + if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotIndex { + // this means we tried to rotate - so finish the rotation + if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil { + log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation") + } else { + n.waitForAppliedIndex = 0 + } + } +} + func (n *Node) getCurrentRaftConfig() api.RaftConfig { raftConfig := DefaultRaftConfig() n.memoryStore.View(func(readTx store.ReadTx) { @@ -1190,6 +1227,13 @@ func (n *Node) saveToStorage( return ErrApplySnapshot } + if len(entries) > 0 { + lastIndex := 
entries[len(entries)-1].Index + if lastIndex > n.writtenIndex { + n.writtenIndex = lastIndex + } + } + if err = n.raftStore.Append(entries); err != nil { return ErrAppendEntry } diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go b/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go index 8a7dd422c1..a763c03dce 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go @@ -165,17 +165,6 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) { } snapshot.Membership.Removed = n.cluster.Removed() - // maybe start rotation - n.rotationQueued = false - var newEncryptionKeys *EncryptionKeys - if n.keyRotator.NeedsRotation() { - keys := n.keyRotator.GetKeys() - if keys.PendingDEK != nil { - n.raftLogger.RotateEncryptionKey(keys.PendingDEK) - newEncryptionKeys = &EncryptionKeys{CurrentDEK: keys.PendingDEK} - } - } - viewStarted := make(chan struct{}) n.asyncTasks.Add(1) n.snapshotInProgress = make(chan uint64, 1) // buffered in case Shutdown is called during the snapshot @@ -209,13 +198,6 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) { return } snapshotIndex = appliedIndex - if newEncryptionKeys != nil { - // this means we tried to rotate - so finish the rotation - if err := n.keyRotator.UpdateKeys(*newEncryptionKeys); err != nil { - log.G(ctx).WithError(err).Error( - "failed to update encryption keys after a rotation - will wait for the next snapshot") - } - } if appliedIndex > raftConfig.LogEntriesForSlowFollowers { err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)