Merge pull request #28680 from cyli/1.13.0-rc2-swarmkit-reendor

Include swarmkik dek rotation fixes in re-vendor
This commit is contained in:
Sebastiaan van Stijn 2016-11-22 10:50:33 +01:00 committed by GitHub
commit 309662d03f
3 changed files with 53 additions and 27 deletions

View file

@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit a2080913b9cf2cac309845a28902896d65d3f527
github.com/docker/swarmkit 1b2daeccc519c894044f6f72a40d8d5662dc6bf6
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View file

@ -112,6 +112,7 @@ type Node struct {
confState raftpb.ConfState
appliedIndex uint64
snapshotIndex uint64
writtenIndex uint64
ticker clock.Ticker
doneCh chan struct{}
@ -137,6 +138,7 @@ type Node struct {
raftLogger *storage.EncryptedRaftLogger
keyRotator EncryptionKeyRotator
rotationQueued bool
waitForAppliedIndex uint64
}
// NodeOptions provides node-level options.
@ -269,6 +271,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
n.confState = snapshot.Metadata.ConfState
n.appliedIndex = snapshot.Metadata.Index
n.snapshotIndex = snapshot.Metadata.Index
n.writtenIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
if loadAndStartErr == storage.ErrNoWAL {
if n.opts.JoinAddr != "" {
@ -471,9 +474,12 @@ func (n *Node) Run(ctx context.Context) error {
}
}
// in case the previous attempt to update the key failed
n.maybeMarkRotationFinished(ctx)
// Trigger a snapshot every once in awhile
if n.snapshotInProgress == nil &&
(n.keyRotator.NeedsRotation() || raftConfig.SnapshotInterval > 0 &&
(n.needsSnapshot() || raftConfig.SnapshotInterval > 0 &&
n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval) {
n.doSnapshot(ctx, raftConfig)
}
@ -510,13 +516,15 @@ func (n *Node) Run(ctx context.Context) error {
n.snapshotIndex = snapshotIndex
}
n.snapshotInProgress = nil
if n.rotationQueued {
n.maybeMarkRotationFinished(ctx)
if n.rotationQueued && n.needsSnapshot() {
// there was a key rotation that took place before while the snapshot
// was in progress - we have to take another snapshot and encrypt with the new key
n.rotationQueued = false
n.doSnapshot(ctx, n.getCurrentRaftConfig())
}
case <-n.keyRotator.RotationNotify():
// There are 2 separate checks: rotationQueued, and keyRotator.NeedsRotation().
// There are 2 separate checks: rotationQueued, and n.needsSnapshot().
// We set rotationQueued so that when we are notified of a rotation, we try to
// do a snapshot as soon as possible. However, if there is an error while doing
// the snapshot, we don't want to hammer the node attempting to do snapshots over
@ -525,7 +533,7 @@ func (n *Node) Run(ctx context.Context) error {
switch {
case n.snapshotInProgress != nil:
n.rotationQueued = true
case n.keyRotator.NeedsRotation():
case n.needsSnapshot():
n.doSnapshot(ctx, n.getCurrentRaftConfig())
}
case <-n.removeRaftCh:
@ -540,6 +548,35 @@ func (n *Node) Run(ctx context.Context) error {
}
}
func (n *Node) needsSnapshot() bool {
if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
keys := n.keyRotator.GetKeys()
if keys.PendingDEK != nil {
n.raftLogger.RotateEncryptionKey(keys.PendingDEK)
// we want to wait for the last index written with the old DEK to be commited, else a snapshot taken
// may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
// written with the new key to supercede any WAL written with an old DEK.
n.waitForAppliedIndex = n.writtenIndex
// if there is already a snapshot at this index, bump the index up one, because we want the next snapshot
if n.waitForAppliedIndex == n.snapshotIndex {
n.waitForAppliedIndex++
}
}
}
return n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
}
func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotIndex {
// this means we tried to rotate - so finish the rotation
if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil {
log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
} else {
n.waitForAppliedIndex = 0
}
}
}
func (n *Node) getCurrentRaftConfig() api.RaftConfig {
raftConfig := DefaultRaftConfig()
n.memoryStore.View(func(readTx store.ReadTx) {
@ -1190,6 +1227,13 @@ func (n *Node) saveToStorage(
return ErrApplySnapshot
}
if len(entries) > 0 {
lastIndex := entries[len(entries)-1].Index
if lastIndex > n.writtenIndex {
n.writtenIndex = lastIndex
}
}
if err = n.raftStore.Append(entries); err != nil {
return ErrAppendEntry
}

View file

@ -165,17 +165,6 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
}
snapshot.Membership.Removed = n.cluster.Removed()
// maybe start rotation
n.rotationQueued = false
var newEncryptionKeys *EncryptionKeys
if n.keyRotator.NeedsRotation() {
keys := n.keyRotator.GetKeys()
if keys.PendingDEK != nil {
n.raftLogger.RotateEncryptionKey(keys.PendingDEK)
newEncryptionKeys = &EncryptionKeys{CurrentDEK: keys.PendingDEK}
}
}
viewStarted := make(chan struct{})
n.asyncTasks.Add(1)
n.snapshotInProgress = make(chan uint64, 1) // buffered in case Shutdown is called during the snapshot
@ -209,13 +198,6 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
return
}
snapshotIndex = appliedIndex
if newEncryptionKeys != nil {
// this means we tried to rotate - so finish the rotation
if err := n.keyRotator.UpdateKeys(*newEncryptionKeys); err != nil {
log.G(ctx).WithError(err).Error(
"failed to update encryption keys after a rotation - will wait for the next snapshot")
}
}
if appliedIndex > raftConfig.LogEntriesForSlowFollowers {
err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)