Update vendored swarmkit to 999addf

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2016-12-07 17:59:31 -08:00
parent 4d92237de1
commit aca0bdab13
9 changed files with 106 additions and 88 deletions

View file

@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit 522d951f733c821cdc33cccca6127c15a2b6de38
github.com/docker/swarmkit 999addf86dad33479756c83620ed727ef50bce57
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View file

@ -4,7 +4,6 @@ import (
"errors"
"path/filepath"
"reflect"
"regexp"
"strconv"
"strings"
@ -26,9 +25,6 @@ var (
errModeChangeNotAllowed = errors.New("service mode change is not allowed")
)
// Regexp pattern for hostname to conform RFC 1123
var hostnamePattern = regexp.MustCompile("^(([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])\\.)*([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])$")
func validateResources(r *api.Resources) error {
if r == nil {
return nil
@ -115,10 +111,6 @@ func validateContainerSpec(container *api.ContainerSpec) error {
return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: missing in service spec")
}
if err := validateHostname(container.Hostname); err != nil {
return err
}
if container.Image == "" {
return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: image reference must be provided")
}
@ -138,15 +130,6 @@ func validateContainerSpec(container *api.ContainerSpec) error {
return nil
}
// validateHostname checks the optional hostname of a container spec.
//
// An empty hostname is accepted as-is. A non-empty hostname must be at most
// 63 bytes long and must match hostnamePattern; otherwise a gRPC
// InvalidArgument error naming the offending hostname is returned.
func validateHostname(hostname string) error {
	// Hostname is optional: nothing to validate when it is unset.
	if hostname == "" {
		return nil
	}

	if len(hostname) <= 63 && hostnamePattern.MatchString(hostname) {
		return nil
	}

	return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: %s is not valid hostname", hostname)
}
func validateTask(taskSpec api.TaskSpec) error {
if err := validateResourceRequirements(taskSpec.Resources); err != nil {
return err

View file

@ -152,7 +152,8 @@ func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
defer lb.mu.Unlock()
delete(lb.registeredSubscriptions, subscription.message.ID)
subscription.message.Close = true
subscription.Close()
lb.subscriptionQueue.Publish(subscription)
}
@ -321,7 +322,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
case v := <-subscriptionCh:
subscription := v.(*subscription)
if subscription.message.Close {
if subscription.Closed() {
log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
delete(activeSubscriptions, subscription.message.ID)
} else {

View file

@ -137,6 +137,18 @@ func (s *subscription) Err() error {
return fmt.Errorf("warning: incomplete log stream. some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", "))
}
// Close flags the subscription's message as closed. The flag is written
// while holding the subscription's write lock.
func (s *subscription) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.message.Close = true
}
// Closed reports whether the subscription's message has been flagged as
// closed. The flag is read while holding the subscription's read lock.
func (s *subscription) Closed() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	closed := s.message.Close
	return closed
}
func (s *subscription) match() {
s.mu.Lock()
defer s.mu.Unlock()

View file

@ -385,26 +385,39 @@ func (m *Manager) Run(parent context.Context) error {
close(m.started)
errCh := make(chan error, 1)
go func() {
err := m.raftNode.Run(ctx)
if err != nil {
errCh <- err
log.G(ctx).WithError(err).Error("raft node stopped")
m.Stop(ctx)
}
}()
if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
// returnErr chooses the error to hand back when bailing out with err.
// It performs a non-blocking receive on errCh: if an error is already
// waiting there and it is raft.ErrMemberRemoved, that error is returned
// in place of err. Any other error received from errCh is dropped and
// err is returned unchanged.
returnErr := func(err error) error {
select {
case runErr := <-errCh:
if runErr == raft.ErrMemberRemoved {
return runErr
}
default:
// Nothing is pending on errCh; do not block, just return err below.
}
return err
}
if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
return returnErr(err)
}
c, err := raft.WaitForCluster(ctx, m.raftNode)
if err != nil {
return err
return returnErr(err)
}
raftConfig := c.Spec.Raft
if err := m.watchForKEKChanges(ctx); err != nil {
return err
return returnErr(err)
}
if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
@ -423,7 +436,8 @@ func (m *Manager) Run(parent context.Context) error {
}
m.mu.Unlock()
m.Stop(ctx)
return err
return returnErr(err)
}
const stopTimeout = 8 * time.Second

View file

@ -109,10 +109,10 @@ type Node struct {
// shutting down the node.
waitProp sync.WaitGroup
confState raftpb.ConfState
appliedIndex uint64
snapshotIndex uint64
writtenIndex uint64
confState raftpb.ConfState
appliedIndex uint64
snapshotMeta raftpb.SnapshotMetadata
writtenWALIndex uint64
ticker clock.Ticker
doneCh chan struct{}
@ -127,7 +127,7 @@ type Node struct {
// used for membership management checks
membershipLock sync.Mutex
snapshotInProgress chan uint64
snapshotInProgress chan raftpb.SnapshotMetadata
asyncTasks sync.WaitGroup
// stopped chan is used for notifying grpc handlers that raft node going
@ -270,8 +270,8 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
n.confState = snapshot.Metadata.ConfState
n.appliedIndex = snapshot.Metadata.Index
n.snapshotIndex = snapshot.Metadata.Index
n.writtenIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
n.snapshotMeta = snapshot.Metadata
n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
if loadAndStartErr == storage.ErrNoWAL {
if n.opts.JoinAddr != "" {
@ -428,7 +428,7 @@ func (n *Node) Run(ctx context.Context) error {
log.G(ctx).WithError(err).Error("failed to restore from snapshot")
}
n.appliedIndex = rd.Snapshot.Metadata.Index
n.snapshotIndex = rd.Snapshot.Metadata.Index
n.snapshotMeta = rd.Snapshot.Metadata
n.confState = rd.Snapshot.Metadata.ConfState
}
@ -479,8 +479,8 @@ func (n *Node) Run(ctx context.Context) error {
// Trigger a snapshot every once in a while
if n.snapshotInProgress == nil &&
(n.needsSnapshot() || raftConfig.SnapshotInterval > 0 &&
n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval) {
(n.needsSnapshot(ctx) || raftConfig.SnapshotInterval > 0 &&
n.appliedIndex-n.snapshotMeta.Index >= raftConfig.SnapshotInterval) {
n.doSnapshot(ctx, raftConfig)
}
@ -511,17 +511,21 @@ func (n *Node) Run(ctx context.Context) error {
}
}
case snapshotIndex := <-n.snapshotInProgress:
if snapshotIndex > n.snapshotIndex {
n.snapshotIndex = snapshotIndex
case snapshotMeta := <-n.snapshotInProgress:
raftConfig := n.getCurrentRaftConfig()
if snapshotMeta.Index > n.snapshotMeta.Index {
n.snapshotMeta = snapshotMeta
if err := n.raftLogger.GC(snapshotMeta.Index, snapshotMeta.Term, raftConfig.KeepOldSnapshots); err != nil {
log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
}
}
n.snapshotInProgress = nil
n.maybeMarkRotationFinished(ctx)
if n.rotationQueued && n.needsSnapshot() {
if n.rotationQueued && n.needsSnapshot(ctx) {
// there was a key rotation that took place before while the snapshot
// was in progress - we have to take another snapshot and encrypt with the new key
n.rotationQueued = false
n.doSnapshot(ctx, n.getCurrentRaftConfig())
n.doSnapshot(ctx, raftConfig)
}
case <-n.keyRotator.RotationNotify():
// There are 2 separate checks: rotationQueued, and n.needsSnapshot().
@ -533,7 +537,7 @@ func (n *Node) Run(ctx context.Context) error {
switch {
case n.snapshotInProgress != nil:
n.rotationQueued = true
case n.needsSnapshot():
case n.needsSnapshot(ctx):
n.doSnapshot(ctx, n.getCurrentRaftConfig())
}
case <-n.removeRaftCh:
@ -548,7 +552,7 @@ func (n *Node) Run(ctx context.Context) error {
}
}
func (n *Node) needsSnapshot() bool {
func (n *Node) needsSnapshot(ctx context.Context) bool {
if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
keys := n.keyRotator.GetKeys()
if keys.PendingDEK != nil {
@ -556,23 +560,40 @@ func (n *Node) needsSnapshot() bool {
// we want to wait for the last index written with the old DEK to be committed, else a snapshot taken
// may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
// written with the new key to supersede any WAL written with an old DEK.
n.waitForAppliedIndex = n.writtenIndex
// if there is already a snapshot at this index, bump the index up one, because we want the next snapshot
if n.waitForAppliedIndex == n.snapshotIndex {
n.waitForAppliedIndex++
n.waitForAppliedIndex = n.writtenWALIndex
// if there is already a snapshot at this index or higher, bump the wait index up to 1 higher than the current
// snapshot index, because the rotation cannot be completed until the next snapshot
if n.waitForAppliedIndex <= n.snapshotMeta.Index {
n.waitForAppliedIndex = n.snapshotMeta.Index + 1
}
log.G(ctx).Debugf(
"beginning raft DEK rotation - last indices written with the old key are (snapshot: %d, WAL: %d) - waiting for snapshot of index %d to be written before rotation can be completed", n.snapshotMeta.Index, n.writtenWALIndex, n.waitForAppliedIndex)
}
}
return n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
result := n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
if result {
log.G(ctx).Debugf(
"a snapshot at index %d is needed in order to complete raft DEK rotation - a snapshot with index >= %d can now be triggered",
n.waitForAppliedIndex, n.appliedIndex)
}
return result
}
func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotIndex {
if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotMeta.Index {
// this means we tried to rotate - so finish the rotation
if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil {
log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
} else {
log.G(ctx).Debugf(
"a snapshot with index %d is available, which completes the DEK rotation requiring a snapshot of at least index %d - throwing away DEK and older snapshots encrypted with the old key",
n.snapshotMeta.Index, n.waitForAppliedIndex)
n.waitForAppliedIndex = 0
if err := n.raftLogger.GC(n.snapshotMeta.Index, n.snapshotMeta.Term, 0); err != nil {
log.G(ctx).WithError(err).Error("failed to remove old snapshots and WALs that were written with the previous raft DEK")
}
}
}
}
@ -954,13 +975,13 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
defer cancel()
if err := member.HealthCheck(healthCtx); err != nil {
n.processRaftMessageLogger(ctx, msg).Debug("member which sent vote request failed health check")
n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
return &api.ProcessRaftMessageResponse{}, nil
}
}
if msg.Message.Type == raftpb.MsgProp {
// We don't accepted forwarded proposals. Our
// We don't accept forwarded proposals. Our
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
@ -1249,8 +1270,8 @@ func (n *Node) saveToStorage(
if len(entries) > 0 {
lastIndex := entries[len(entries)-1].Index
if lastIndex > n.writtenIndex {
n.writtenIndex = lastIndex
if lastIndex > n.writtenWALIndex {
n.writtenWALIndex = lastIndex
}
}
@ -1300,25 +1321,32 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.
defer n.asyncTasks.Done()
defer close(thisSend)
ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
defer cancel()
if lastSend != nil {
waitCtx, waitCancel := context.WithTimeout(ctx, n.opts.SendTimeout)
defer waitCancel()
select {
case <-lastSend:
case <-ctx.Done():
case <-waitCtx.Done():
return
}
select {
case <-waitCtx.Done():
return
default:
}
}
ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
defer cancel()
if n.cluster.IsIDRemoved(m.To) {
// Should not send to removed members
return
}
var (
conn *membership.Member
)
var conn *membership.Member
if toMember, ok := members[m.To]; ok {
conn = toMember
} else {

View file

@ -167,11 +167,11 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
viewStarted := make(chan struct{})
n.asyncTasks.Add(1)
n.snapshotInProgress = make(chan uint64, 1) // buffered in case Shutdown is called during the snapshot
go func(appliedIndex, snapshotIndex uint64) {
n.snapshotInProgress = make(chan raftpb.SnapshotMetadata, 1) // buffered in case Shutdown is called during the snapshot
go func(appliedIndex uint64, snapshotMeta raftpb.SnapshotMetadata) {
defer func() {
n.asyncTasks.Done()
n.snapshotInProgress <- snapshotIndex
n.snapshotInProgress <- snapshotMeta
}()
var err error
n.memoryStore.View(func(tx store.ReadTx) {
@ -197,7 +197,7 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
log.G(ctx).WithError(err).Error("failed to save snapshot")
return
}
snapshotIndex = appliedIndex
snapshotMeta = snap.Metadata
if appliedIndex > raftConfig.LogEntriesForSlowFollowers {
err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)
@ -205,14 +205,10 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
log.G(ctx).WithError(err).Error("failed to compact snapshot")
}
}
if err := n.raftLogger.GC(snap.Metadata.Index, snap.Metadata.Term, raftConfig.KeepOldSnapshots); err != nil {
log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
}
} else if err != raft.ErrSnapOutOfDate {
log.G(ctx).WithError(err).Error("failed to create snapshot")
}
}(n.appliedIndex, n.snapshotIndex)
}(n.appliedIndex, n.snapshotMeta)
// Wait for the goroutine to establish a read transaction, to make
// sure it sees the state as of this moment.

View file

@ -2,7 +2,6 @@ package storage
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
@ -358,7 +357,7 @@ func (e *EncryptedRaftLogger) Close(ctx context.Context) {
e.snapshotter = nil
}
// Clear closes the existing WAL and moves away the WAL and snapshot.
// Clear closes the existing WAL and removes the WAL and snapshot.
func (e *EncryptedRaftLogger) Clear(ctx context.Context) error {
e.encoderMu.Lock()
defer e.encoderMu.Unlock()
@ -370,23 +369,7 @@ func (e *EncryptedRaftLogger) Clear(ctx context.Context) error {
}
e.snapshotter = nil
newWALDir, err := ioutil.TempDir(e.StateDir, "wal.")
if err != nil {
return err
}
os.RemoveAll(newWALDir)
if err = os.Rename(e.walDir(), newWALDir); err != nil {
return err
}
newSnapDir, err := ioutil.TempDir(e.StateDir, "snap.")
if err != nil {
return err
}
os.RemoveAll(newSnapDir)
if err := os.Rename(e.snapDir(), newSnapDir); err != nil {
return err
}
os.RemoveAll(e.walDir())
os.RemoveAll(e.snapDir())
return nil
}

View file

@ -22,6 +22,7 @@ import (
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager"
"github.com/docker/swarmkit/manager/encryption"
"github.com/docker/swarmkit/manager/state/raft"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/xnet"
"github.com/pkg/errors"
@ -718,7 +719,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
case <-done:
// Fail out if m.Run() returns error, otherwise wait for
// role change.
if runErr != nil {
if runErr != nil && runErr != raft.ErrMemberRemoved {
err = runErr
} else {
err = <-roleChanged