Merge pull request #29232 from aaronlehmann/vendor-swarmkit-999addf

[1.13] Update vendored swarmkit to 999addf
This commit is contained in:
Victor Vieux 2016-12-09 13:51:32 -08:00 committed by GitHub
commit edd9c522b7
9 changed files with 106 additions and 88 deletions

View file

@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster # cluster
github.com/docker/swarmkit 522d951f733c821cdc33cccca6127c15a2b6de38 github.com/docker/swarmkit 999addf86dad33479756c83620ed727ef50bce57
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3 github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View file

@ -4,7 +4,6 @@ import (
"errors" "errors"
"path/filepath" "path/filepath"
"reflect" "reflect"
"regexp"
"strconv" "strconv"
"strings" "strings"
@ -26,9 +25,6 @@ var (
errModeChangeNotAllowed = errors.New("service mode change is not allowed") errModeChangeNotAllowed = errors.New("service mode change is not allowed")
) )
// Regexp pattern for hostname to conform RFC 1123
var hostnamePattern = regexp.MustCompile("^(([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])\\.)*([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])$")
func validateResources(r *api.Resources) error { func validateResources(r *api.Resources) error {
if r == nil { if r == nil {
return nil return nil
@ -115,10 +111,6 @@ func validateContainerSpec(container *api.ContainerSpec) error {
return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: missing in service spec") return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: missing in service spec")
} }
if err := validateHostname(container.Hostname); err != nil {
return err
}
if container.Image == "" { if container.Image == "" {
return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: image reference must be provided") return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: image reference must be provided")
} }
@ -138,15 +130,6 @@ func validateContainerSpec(container *api.ContainerSpec) error {
return nil return nil
} }
func validateHostname(hostname string) error {
if hostname != "" {
if len(hostname) > 63 || !hostnamePattern.MatchString(hostname) {
return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: %s is not valid hostname", hostname)
}
}
return nil
}
func validateTask(taskSpec api.TaskSpec) error { func validateTask(taskSpec api.TaskSpec) error {
if err := validateResourceRequirements(taskSpec.Resources); err != nil { if err := validateResourceRequirements(taskSpec.Resources); err != nil {
return err return err

View file

@ -152,7 +152,8 @@ func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
defer lb.mu.Unlock() defer lb.mu.Unlock()
delete(lb.registeredSubscriptions, subscription.message.ID) delete(lb.registeredSubscriptions, subscription.message.ID)
subscription.message.Close = true
subscription.Close()
lb.subscriptionQueue.Publish(subscription) lb.subscriptionQueue.Publish(subscription)
} }
@ -321,7 +322,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
case v := <-subscriptionCh: case v := <-subscriptionCh:
subscription := v.(*subscription) subscription := v.(*subscription)
if subscription.message.Close { if subscription.Closed() {
log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed") log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
delete(activeSubscriptions, subscription.message.ID) delete(activeSubscriptions, subscription.message.ID)
} else { } else {

View file

@ -137,6 +137,18 @@ func (s *subscription) Err() error {
return fmt.Errorf("warning: incomplete log stream. some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", ")) return fmt.Errorf("warning: incomplete log stream. some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", "))
} }
func (s *subscription) Close() {
s.mu.Lock()
s.message.Close = true
s.mu.Unlock()
}
func (s *subscription) Closed() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.message.Close
}
func (s *subscription) match() { func (s *subscription) match() {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()

View file

@ -385,26 +385,39 @@ func (m *Manager) Run(parent context.Context) error {
close(m.started) close(m.started)
errCh := make(chan error, 1)
go func() { go func() {
err := m.raftNode.Run(ctx) err := m.raftNode.Run(ctx)
if err != nil { if err != nil {
errCh <- err
log.G(ctx).WithError(err).Error("raft node stopped") log.G(ctx).WithError(err).Error("raft node stopped")
m.Stop(ctx) m.Stop(ctx)
} }
}() }()
if err := raft.WaitForLeader(ctx, m.raftNode); err != nil { returnErr := func(err error) error {
select {
case runErr := <-errCh:
if runErr == raft.ErrMemberRemoved {
return runErr
}
default:
}
return err return err
} }
if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
return returnErr(err)
}
c, err := raft.WaitForCluster(ctx, m.raftNode) c, err := raft.WaitForCluster(ctx, m.raftNode)
if err != nil { if err != nil {
return err return returnErr(err)
} }
raftConfig := c.Spec.Raft raftConfig := c.Spec.Raft
if err := m.watchForKEKChanges(ctx); err != nil { if err := m.watchForKEKChanges(ctx); err != nil {
return err return returnErr(err)
} }
if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick { if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
@ -423,7 +436,8 @@ func (m *Manager) Run(parent context.Context) error {
} }
m.mu.Unlock() m.mu.Unlock()
m.Stop(ctx) m.Stop(ctx)
return err
return returnErr(err)
} }
const stopTimeout = 8 * time.Second const stopTimeout = 8 * time.Second

View file

@ -109,10 +109,10 @@ type Node struct {
// shutting down the node. // shutting down the node.
waitProp sync.WaitGroup waitProp sync.WaitGroup
confState raftpb.ConfState confState raftpb.ConfState
appliedIndex uint64 appliedIndex uint64
snapshotIndex uint64 snapshotMeta raftpb.SnapshotMetadata
writtenIndex uint64 writtenWALIndex uint64
ticker clock.Ticker ticker clock.Ticker
doneCh chan struct{} doneCh chan struct{}
@ -127,7 +127,7 @@ type Node struct {
// used for membership management checks // used for membership management checks
membershipLock sync.Mutex membershipLock sync.Mutex
snapshotInProgress chan uint64 snapshotInProgress chan raftpb.SnapshotMetadata
asyncTasks sync.WaitGroup asyncTasks sync.WaitGroup
// stopped chan is used for notifying grpc handlers that raft node going // stopped chan is used for notifying grpc handlers that raft node going
@ -270,8 +270,8 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
n.confState = snapshot.Metadata.ConfState n.confState = snapshot.Metadata.ConfState
n.appliedIndex = snapshot.Metadata.Index n.appliedIndex = snapshot.Metadata.Index
n.snapshotIndex = snapshot.Metadata.Index n.snapshotMeta = snapshot.Metadata
n.writtenIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
if loadAndStartErr == storage.ErrNoWAL { if loadAndStartErr == storage.ErrNoWAL {
if n.opts.JoinAddr != "" { if n.opts.JoinAddr != "" {
@ -428,7 +428,7 @@ func (n *Node) Run(ctx context.Context) error {
log.G(ctx).WithError(err).Error("failed to restore from snapshot") log.G(ctx).WithError(err).Error("failed to restore from snapshot")
} }
n.appliedIndex = rd.Snapshot.Metadata.Index n.appliedIndex = rd.Snapshot.Metadata.Index
n.snapshotIndex = rd.Snapshot.Metadata.Index n.snapshotMeta = rd.Snapshot.Metadata
n.confState = rd.Snapshot.Metadata.ConfState n.confState = rd.Snapshot.Metadata.ConfState
} }
@ -479,8 +479,8 @@ func (n *Node) Run(ctx context.Context) error {
// Trigger a snapshot every once in awhile // Trigger a snapshot every once in awhile
if n.snapshotInProgress == nil && if n.snapshotInProgress == nil &&
(n.needsSnapshot() || raftConfig.SnapshotInterval > 0 && (n.needsSnapshot(ctx) || raftConfig.SnapshotInterval > 0 &&
n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval) { n.appliedIndex-n.snapshotMeta.Index >= raftConfig.SnapshotInterval) {
n.doSnapshot(ctx, raftConfig) n.doSnapshot(ctx, raftConfig)
} }
@ -511,17 +511,21 @@ func (n *Node) Run(ctx context.Context) error {
} }
} }
case snapshotIndex := <-n.snapshotInProgress: case snapshotMeta := <-n.snapshotInProgress:
if snapshotIndex > n.snapshotIndex { raftConfig := n.getCurrentRaftConfig()
n.snapshotIndex = snapshotIndex if snapshotMeta.Index > n.snapshotMeta.Index {
n.snapshotMeta = snapshotMeta
if err := n.raftLogger.GC(snapshotMeta.Index, snapshotMeta.Term, raftConfig.KeepOldSnapshots); err != nil {
log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
}
} }
n.snapshotInProgress = nil n.snapshotInProgress = nil
n.maybeMarkRotationFinished(ctx) n.maybeMarkRotationFinished(ctx)
if n.rotationQueued && n.needsSnapshot() { if n.rotationQueued && n.needsSnapshot(ctx) {
// there was a key rotation that took place before while the snapshot // there was a key rotation that took place before while the snapshot
// was in progress - we have to take another snapshot and encrypt with the new key // was in progress - we have to take another snapshot and encrypt with the new key
n.rotationQueued = false n.rotationQueued = false
n.doSnapshot(ctx, n.getCurrentRaftConfig()) n.doSnapshot(ctx, raftConfig)
} }
case <-n.keyRotator.RotationNotify(): case <-n.keyRotator.RotationNotify():
// There are 2 separate checks: rotationQueued, and n.needsSnapshot(). // There are 2 separate checks: rotationQueued, and n.needsSnapshot().
@ -533,7 +537,7 @@ func (n *Node) Run(ctx context.Context) error {
switch { switch {
case n.snapshotInProgress != nil: case n.snapshotInProgress != nil:
n.rotationQueued = true n.rotationQueued = true
case n.needsSnapshot(): case n.needsSnapshot(ctx):
n.doSnapshot(ctx, n.getCurrentRaftConfig()) n.doSnapshot(ctx, n.getCurrentRaftConfig())
} }
case <-n.removeRaftCh: case <-n.removeRaftCh:
@ -548,7 +552,7 @@ func (n *Node) Run(ctx context.Context) error {
} }
} }
func (n *Node) needsSnapshot() bool { func (n *Node) needsSnapshot(ctx context.Context) bool {
if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() { if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
keys := n.keyRotator.GetKeys() keys := n.keyRotator.GetKeys()
if keys.PendingDEK != nil { if keys.PendingDEK != nil {
@ -556,23 +560,40 @@ func (n *Node) needsSnapshot() bool {
// we want to wait for the last index written with the old DEK to be commited, else a snapshot taken // we want to wait for the last index written with the old DEK to be commited, else a snapshot taken
// may have an index less than the index of a WAL written with an old DEK. We want the next snapshot // may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
// written with the new key to supercede any WAL written with an old DEK. // written with the new key to supercede any WAL written with an old DEK.
n.waitForAppliedIndex = n.writtenIndex n.waitForAppliedIndex = n.writtenWALIndex
// if there is already a snapshot at this index, bump the index up one, because we want the next snapshot // if there is already a snapshot at this index or higher, bump the wait index up to 1 higher than the current
if n.waitForAppliedIndex == n.snapshotIndex { // snapshot index, because the rotation cannot be completed until the next snapshot
n.waitForAppliedIndex++ if n.waitForAppliedIndex <= n.snapshotMeta.Index {
n.waitForAppliedIndex = n.snapshotMeta.Index + 1
} }
log.G(ctx).Debugf(
"beginning raft DEK rotation - last indices written with the old key are (snapshot: %d, WAL: %d) - waiting for snapshot of index %d to be written before rotation can be completed", n.snapshotMeta.Index, n.writtenWALIndex, n.waitForAppliedIndex)
} }
} }
return n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
result := n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
if result {
log.G(ctx).Debugf(
"a snapshot at index %d is needed in order to complete raft DEK rotation - a snapshot with index >= %d can now be triggered",
n.waitForAppliedIndex, n.appliedIndex)
}
return result
} }
func (n *Node) maybeMarkRotationFinished(ctx context.Context) { func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotIndex { if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotMeta.Index {
// this means we tried to rotate - so finish the rotation // this means we tried to rotate - so finish the rotation
if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil { if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil {
log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation") log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
} else { } else {
log.G(ctx).Debugf(
"a snapshot with index %d is available, which completes the DEK rotation requiring a snapshot of at least index %d - throwing away DEK and older snapshots encrypted with the old key",
n.snapshotMeta.Index, n.waitForAppliedIndex)
n.waitForAppliedIndex = 0 n.waitForAppliedIndex = 0
if err := n.raftLogger.GC(n.snapshotMeta.Index, n.snapshotMeta.Term, 0); err != nil {
log.G(ctx).WithError(err).Error("failed to remove old snapshots and WALs that were written with the previous raft DEK")
}
} }
} }
} }
@ -954,13 +975,13 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
defer cancel() defer cancel()
if err := member.HealthCheck(healthCtx); err != nil { if err := member.HealthCheck(healthCtx); err != nil {
n.processRaftMessageLogger(ctx, msg).Debug("member which sent vote request failed health check") n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
return &api.ProcessRaftMessageResponse{}, nil return &api.ProcessRaftMessageResponse{}, nil
} }
} }
if msg.Message.Type == raftpb.MsgProp { if msg.Message.Type == raftpb.MsgProp {
// We don't accepted forwarded proposals. Our // We don't accept forwarded proposals. Our
// current architecture depends on only the leader // current architecture depends on only the leader
// making proposals, so in-flight proposals can be // making proposals, so in-flight proposals can be
// guaranteed not to conflict. // guaranteed not to conflict.
@ -1249,8 +1270,8 @@ func (n *Node) saveToStorage(
if len(entries) > 0 { if len(entries) > 0 {
lastIndex := entries[len(entries)-1].Index lastIndex := entries[len(entries)-1].Index
if lastIndex > n.writtenIndex { if lastIndex > n.writtenWALIndex {
n.writtenIndex = lastIndex n.writtenWALIndex = lastIndex
} }
} }
@ -1300,25 +1321,32 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.
defer n.asyncTasks.Done() defer n.asyncTasks.Done()
defer close(thisSend) defer close(thisSend)
ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
defer cancel()
if lastSend != nil { if lastSend != nil {
waitCtx, waitCancel := context.WithTimeout(ctx, n.opts.SendTimeout)
defer waitCancel()
select { select {
case <-lastSend: case <-lastSend:
case <-ctx.Done(): case <-waitCtx.Done():
return return
} }
select {
case <-waitCtx.Done():
return
default:
}
} }
ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
defer cancel()
if n.cluster.IsIDRemoved(m.To) { if n.cluster.IsIDRemoved(m.To) {
// Should not send to removed members // Should not send to removed members
return return
} }
var ( var conn *membership.Member
conn *membership.Member
)
if toMember, ok := members[m.To]; ok { if toMember, ok := members[m.To]; ok {
conn = toMember conn = toMember
} else { } else {

View file

@ -167,11 +167,11 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
viewStarted := make(chan struct{}) viewStarted := make(chan struct{})
n.asyncTasks.Add(1) n.asyncTasks.Add(1)
n.snapshotInProgress = make(chan uint64, 1) // buffered in case Shutdown is called during the snapshot n.snapshotInProgress = make(chan raftpb.SnapshotMetadata, 1) // buffered in case Shutdown is called during the snapshot
go func(appliedIndex, snapshotIndex uint64) { go func(appliedIndex uint64, snapshotMeta raftpb.SnapshotMetadata) {
defer func() { defer func() {
n.asyncTasks.Done() n.asyncTasks.Done()
n.snapshotInProgress <- snapshotIndex n.snapshotInProgress <- snapshotMeta
}() }()
var err error var err error
n.memoryStore.View(func(tx store.ReadTx) { n.memoryStore.View(func(tx store.ReadTx) {
@ -197,7 +197,7 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
log.G(ctx).WithError(err).Error("failed to save snapshot") log.G(ctx).WithError(err).Error("failed to save snapshot")
return return
} }
snapshotIndex = appliedIndex snapshotMeta = snap.Metadata
if appliedIndex > raftConfig.LogEntriesForSlowFollowers { if appliedIndex > raftConfig.LogEntriesForSlowFollowers {
err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers) err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)
@ -205,14 +205,10 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
log.G(ctx).WithError(err).Error("failed to compact snapshot") log.G(ctx).WithError(err).Error("failed to compact snapshot")
} }
} }
if err := n.raftLogger.GC(snap.Metadata.Index, snap.Metadata.Term, raftConfig.KeepOldSnapshots); err != nil {
log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
}
} else if err != raft.ErrSnapOutOfDate { } else if err != raft.ErrSnapOutOfDate {
log.G(ctx).WithError(err).Error("failed to create snapshot") log.G(ctx).WithError(err).Error("failed to create snapshot")
} }
}(n.appliedIndex, n.snapshotIndex) }(n.appliedIndex, n.snapshotMeta)
// Wait for the goroutine to establish a read transaction, to make // Wait for the goroutine to establish a read transaction, to make
// sure it sees the state as of this moment. // sure it sees the state as of this moment.

View file

@ -2,7 +2,6 @@ package storage
import ( import (
"fmt" "fmt"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
@ -358,7 +357,7 @@ func (e *EncryptedRaftLogger) Close(ctx context.Context) {
e.snapshotter = nil e.snapshotter = nil
} }
// Clear closes the existing WAL and moves away the WAL and snapshot. // Clear closes the existing WAL and removes the WAL and snapshot.
func (e *EncryptedRaftLogger) Clear(ctx context.Context) error { func (e *EncryptedRaftLogger) Clear(ctx context.Context) error {
e.encoderMu.Lock() e.encoderMu.Lock()
defer e.encoderMu.Unlock() defer e.encoderMu.Unlock()
@ -370,23 +369,7 @@ func (e *EncryptedRaftLogger) Clear(ctx context.Context) error {
} }
e.snapshotter = nil e.snapshotter = nil
newWALDir, err := ioutil.TempDir(e.StateDir, "wal.") os.RemoveAll(e.walDir())
if err != nil { os.RemoveAll(e.snapDir())
return err
}
os.RemoveAll(newWALDir)
if err = os.Rename(e.walDir(), newWALDir); err != nil {
return err
}
newSnapDir, err := ioutil.TempDir(e.StateDir, "snap.")
if err != nil {
return err
}
os.RemoveAll(newSnapDir)
if err := os.Rename(e.snapDir(), newSnapDir); err != nil {
return err
}
return nil return nil
} }

View file

@ -22,6 +22,7 @@ import (
"github.com/docker/swarmkit/log" "github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager" "github.com/docker/swarmkit/manager"
"github.com/docker/swarmkit/manager/encryption" "github.com/docker/swarmkit/manager/encryption"
"github.com/docker/swarmkit/manager/state/raft"
"github.com/docker/swarmkit/remotes" "github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/xnet" "github.com/docker/swarmkit/xnet"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -718,7 +719,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
case <-done: case <-done:
// Fail out if m.Run() returns error, otherwise wait for // Fail out if m.Run() returns error, otherwise wait for
// role change. // role change.
if runErr != nil { if runErr != nil && runErr != raft.ErrMemberRemoved {
err = runErr err = runErr
} else { } else {
err = <-roleChanged err = <-roleChanged