Merge pull request #39531 from tonistiigi/swarm-ci-check

integration-cli: fix swarm tests flakiness
commit fd6f0b1cab
Michael Crosby, 2019-07-18 10:09:34 -04:00 (committed by GitHub)
13 changed files with 79 additions and 29 deletions


@@ -27,6 +27,7 @@ import (
 	"github.com/docker/docker/internal/test/request"
 	"github.com/docker/swarmkit/ca"
 	"github.com/go-check/check"
+	"github.com/pkg/errors"
 	"gotest.tools/assert"
 	is "gotest.tools/assert/cmp"
 )
@@ -313,13 +314,24 @@ func (s *DockerSwarmSuite) TestAPISwarmLeaderElection(c *check.C) {
 		leader    *daemon.Daemon   // keep track of leader
 		followers []*daemon.Daemon // keep track of followers
 	)
+	var lastErr error
 	checkLeader := func(nodes ...*daemon.Daemon) checkF {
 		return func(c *check.C) (interface{}, check.CommentInterface) {
 			// clear these out before each run
 			leader = nil
 			followers = nil
 			for _, d := range nodes {
-				if d.GetNode(c, d.NodeID()).ManagerStatus.Leader {
+				n := d.GetNode(c, d.NodeID(), func(err error) bool {
+					if strings.Contains(errors.Cause(err).Error(), context.DeadlineExceeded.Error()) || strings.Contains(err.Error(), "swarm does not have a leader") {
+						lastErr = err
+						return true
+					}
+					return false
+				})
+				if n == nil {
+					return false, check.Commentf("failed to get node: %v", lastErr)
+				}
+				if n.ManagerStatus.Leader {
 					leader = d
 				} else {
 					followers = append(followers, d)
@@ -391,7 +403,7 @@ func (s *DockerSwarmSuite) TestAPISwarmRaftQuorum(c *check.C) {
 	defer cli.Close()
 
 	// d1 will eventually step down from leader because there is no longer an active quorum, wait for that to happen
-	waitAndAssert(c, defaultReconciliationTimeout, func(c *check.C) (interface{}, check.CommentInterface) {
+	waitAndAssert(c, defaultReconciliationTimeout*2, func(c *check.C) (interface{}, check.CommentInterface) {
 		_, err := cli.ServiceCreate(context.Background(), service.Spec, types.ServiceCreateOptions{})
 		return err.Error(), nil
 	}, checker.Contains, "Make sure more than half of the managers are online.")
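Note: the leader-election fix above hinges on classifying two transient failure modes, a deadline-exceeded error and "swarm does not have a leader", as retryable while a new leader is elected. A minimal standalone sketch of that predicate (the function name is illustrative, not part of the patch):

```go
package swarmtest

import (
	"context"
	"strings"

	"github.com/pkg/errors"
)

// isTransientLeaderErr mirrors the callback the test passes to GetNode above:
// it reports whether an inspect failure is expected while leader election is
// still settling and may therefore be retried instead of failing the test.
func isTransientLeaderErr(err error) bool {
	return strings.Contains(errors.Cause(err).Error(), context.DeadlineExceeded.Error()) ||
		strings.Contains(err.Error(), "swarm does not have a leader")
}
```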


@@ -1303,9 +1303,21 @@ func (s *DockerSwarmSuite) TestSwarmRotateUnlockKey(c *check.C) {
 		c.Assert(getNodeStatus(c, d), checker.Equals, swarm.LocalNodeStateActive)
 
-		outs, err = d.Cmd("node", "ls")
-		assert.NilError(c, err)
-		c.Assert(outs, checker.Not(checker.Contains), "Swarm is encrypted and needs to be unlocked")
+		retry := 0
+		for {
+			// an issue sometimes prevents the leader from being available right away
+			outs, err = d.Cmd("node", "ls")
+			if err != nil && retry < 5 {
+				if strings.Contains(err.Error(), "swarm does not have a leader") {
+					retry++
+					time.Sleep(3 * time.Second)
+					continue
+				}
+			}
+			assert.NilError(c, err)
+			c.Assert(outs, checker.Not(checker.Contains), "Swarm is encrypted and needs to be unlocked")
+			break
+		}
 
 		unlockKey = newUnlockKey
 	}
@@ -1383,9 +1395,21 @@ func (s *DockerSwarmSuite) TestSwarmClusterRotateUnlockKey(c *check.C) {
 			c.Assert(getNodeStatus(c, d), checker.Equals, swarm.LocalNodeStateActive)
 
-			outs, err = d.Cmd("node", "ls")
-			c.Assert(err, checker.IsNil, check.Commentf("%s", outs))
-			c.Assert(outs, checker.Not(checker.Contains), "Swarm is encrypted and needs to be unlocked")
+			retry := 0
+			for {
+				// an issue sometimes prevents the leader from being available right away
+				outs, err = d.Cmd("node", "ls")
+				if err != nil && retry < 5 {
+					if strings.Contains(err.Error(), "swarm does not have a leader") {
+						retry++
+						time.Sleep(3 * time.Second)
+						continue
+					}
+				}
+				c.Assert(err, checker.IsNil, check.Commentf("%s", outs))
+				c.Assert(outs, checker.Not(checker.Contains), "Swarm is encrypted and needs to be unlocked")
+				break
+			}
 		}
 
 		unlockKey = newUnlockKey
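Both rotate-key tests now share the same bounded retry: at most 5 attempts, 3 seconds apart, so a daemon that briefly reports "swarm does not have a leader" gets roughly 15 extra seconds to elect one before the assertions fail hard.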


@@ -15,7 +15,7 @@ import (
 type NodeConstructor func(*swarm.Node)
 
 // GetNode returns a swarm node identified by the specified id
-func (d *Daemon) GetNode(t assert.TestingT, id string) *swarm.Node {
+func (d *Daemon) GetNode(t assert.TestingT, id string, errCheck ...func(error) bool) *swarm.Node {
 	if ht, ok := t.(test.HelperT); ok {
 		ht.Helper()
 	}
@@ -23,6 +23,13 @@ func (d *Daemon) GetNode(t assert.TestingT, id string) *swarm.Node {
 	defer cli.Close()
 
 	node, _, err := cli.NodeInspectWithRaw(context.Background(), id)
+	if err != nil {
+		for _, f := range errCheck {
+			if f(err) {
+				return nil
+			}
+		}
+	}
 	assert.NilError(t, err, "[%s] (*Daemon).GetNode: NodeInspectWithRaw(%q) failed", d.id, id)
 	assert.Check(t, node.ID == id)
 	return &node
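The helper change above is backward compatible: existing callers pass no predicates and keep the fail-fast behavior, while tests that expect transient errors can opt in and handle the nil result themselves. A self-contained sketch of the same pattern, with a stubbed inspect call standing in for NodeInspectWithRaw (the types here are stand-ins, not the daemon API):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// getNode mimics the updated helper's shape: optional predicates let a caller
// mark certain errors as tolerable, in which case nil is returned instead of
// failing the test.
func getNode(inspect func() (string, error), errCheck ...func(error) bool) (*string, error) {
	id, err := inspect()
	if err != nil {
		for _, f := range errCheck {
			if f(err) {
				return nil, nil // tolerated: the caller decides how to retry
			}
		}
		return nil, err // untolerated: fail as before
	}
	return &id, nil
}

func main() {
	node, err := getNode(
		func() (string, error) { return "", errors.New("rpc error: swarm does not have a leader") },
		func(err error) bool { return strings.Contains(err.Error(), "swarm does not have a leader") },
	)
	fmt.Println(node, err) // <nil> <nil>: transient error swallowed, caller retries
}
```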


@@ -130,7 +130,7 @@ github.com/containerd/ttrpc 699c4e40d1e7416e08bf7019c7ce
 github.com/gogo/googleapis d31c731455cb061f42baff3bda55bad0118b126b # v1.2.0
 
 # cluster
-github.com/docker/swarmkit fb584e7b501ec4683b5c3e62476d76b8a7e7d9f6
+github.com/docker/swarmkit 7dded76ec532741c1ad9736cd2bb6d6661f0a386
 github.com/gogo/protobuf ba06b47c162d49f2af050fb4c75bcbc86a159d5c # v1.2.1
 github.com/cloudflare/cfssl 5d63dbd981b5c408effbb58c442d54761ff94fbd # 1.3.2
 github.com/fernet/fernet-go 9eac43b88a5efb8651d24de9b68e87567e029736
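The hunks below appear to be the upstream docker/swarmkit changes that this revendoring pulls into the tree.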


@@ -575,7 +575,7 @@ func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.No
 
 	// Override hostname and TLS info
 	if desc != nil {
-		if a.config.Hostname != "" && desc != nil {
+		if a.config.Hostname != "" {
 			desc.Hostname = a.config.Hostname
 		}
 		desc.TLSInfo = tlsInfo


@@ -3,6 +3,7 @@ package agent
 
 import (
 	"context"
 	"errors"
+	"math"
 	"sync"
 	"time"
@@ -64,6 +65,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
 	cc, err := agent.config.ConnBroker.Select(
 		grpc.WithTransportCredentials(agent.config.Credentials),
 		grpc.WithTimeout(dispatcherRPCTimeout),
+		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
 	)
 	if err != nil {
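grpc-go caps received message size at 4 MiB by default, and on large clusters a single response can exceed that, so the patch lifts the per-call cap to math.MaxInt32 on each client connection it touches (here in the agent session, and again in the two manager hunks below). A hedged sketch of the dial pattern, with an illustrative address and insecure credentials in place of the real broker/TLS setup:

```go
package main

import (
	"log"
	"math"

	"google.golang.org/grpc"
)

func main() {
	// Illustrative target; the real code dials through the agent's ConnBroker.
	conn, err := grpc.Dial("127.0.0.1:4242",
		grpc.WithInsecure(), // sketch only; the patch keeps the existing TLS credentials
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```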
@@ -136,7 +138,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
 	// `ctx` is done and hence fail to propagate the timeout error to the agent.
 	// If the error is not propagated to the agent, the agent will not close
 	// the session or rebuild a new session.
-	sessionCtx, cancelSession := context.WithCancel(ctx) // nolint: vet
+	sessionCtx, cancelSession := context.WithCancel(ctx) //nolint:govet
 
 	// Need to run Session in a goroutine since there's no way to set a
 	// timeout for an individual Recv call in a stream.
@@ -159,7 +161,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
 	select {
 	case err := <-errChan:
 		if err != nil {
-			return err // nolint: vet
+			return err //nolint:govet
 		}
 	case <-time.After(dispatcherRPCTimeout):
 		cancelSession()


@@ -238,7 +238,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	if err == nil && len(clusters) == 1 {
+	if len(clusters) == 1 {
 		heartbeatPeriod, err := gogotypes.DurationFromProto(clusters[0].Spec.Dispatcher.HeartbeatPeriod)
 		if err == nil && heartbeatPeriod > 0 {
 			d.config.HeartbeatPeriod = heartbeatPeriod
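The dropped `err == nil` was provably true: the `if err != nil { return err }` two lines earlier guarantees it, so the extra condition only obscured the real check on `len(clusters)`.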


@@ -22,7 +22,7 @@ func (m *DriverProvider) NewSecretDriver(driver *api.Driver) (*SecretDriver, err
 	if m.pluginGetter == nil {
 		return nil, fmt.Errorf("plugin getter is nil")
 	}
-	if driver == nil && driver.Name == "" {
+	if driver == nil || driver.Name == "" {
 		return nil, fmt.Errorf("driver specification is nil")
 	}
 	// Search for the specified plugin
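This one-character change fixes a guard that could never fire safely: with `&&`, a nil driver forced evaluation of `driver.Name` and panicked before the error could be returned, while a non-nil driver short-circuited the whole condition to false. A self-contained sketch with an illustrative Driver type:

```go
package main

import "fmt"

type Driver struct{ Name string }

func validate(driver *Driver) error {
	// With &&, driver == nil being true would force evaluation of driver.Name
	// and panic; || short-circuits, so a nil driver returns the error instead.
	if driver == nil || driver.Name == "" {
		return fmt.Errorf("driver specification is nil")
	}
	return nil
}

func main() {
	fmt.Println(validate(nil))                // driver specification is nil
	fmt.Println(validate(&Driver{Name: "x"})) // <nil>
}
```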


@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/tls"
 	"fmt"
+	"math"
 	"net"
 	"os"
 	"path/filepath"
@@ -758,6 +759,7 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
 			func(addr string, timeout time.Duration) (net.Conn, error) {
 				return xnet.DialTimeoutLocal(addr, timeout)
 			}),
+		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
 	)
 	if err != nil {
 		logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")


@@ -516,20 +516,13 @@ func (r *Supervisor) Cancel(taskID string) {
 	<-delay.doneCh
 }
 
-// CancelAll aborts all pending restarts and waits for any instances of
-// StartNow that have already triggered to complete.
+// CancelAll aborts all pending restarts
 func (r *Supervisor) CancelAll() {
-	var cancelled []delayedStart
-
 	r.mu.Lock()
 	for _, delay := range r.delays {
 		delay.cancel()
 	}
 	r.mu.Unlock()
-
-	for _, delay := range cancelled {
-		<-delay.doneCh
-	}
 }
 
 // ClearServiceHistory forgets restart history related to a given service ID.
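The removed wait loop ranged over `cancelled`, a slice that was declared but never appended to, so it could never block; deleting it (and trimming the doc comment that promised the wait) makes the function match what it actually did.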


@@ -47,22 +47,27 @@ func SetServiceTasksRemove(ctx context.Context, s *store.MemoryStore, service *a
 	err = s.Batch(func(batch *store.Batch) error {
 		for _, t := range tasks {
 			err := batch.Update(func(tx store.Tx) error {
+				// the task may have changed for some reason in the meantime
+				// since we read it out, so we need to get it from the store again
+				// within the boundaries of a transaction
+				latestTask := store.GetTask(tx, t.ID)
+
 				// time travel is not allowed. if the current desired state is
 				// above the one we're trying to go to we can't go backwards.
 				// we have nothing to do and we should skip to the next task
-				if t.DesiredState > api.TaskStateRemove {
+				if latestTask.DesiredState > api.TaskStateRemove {
 					// log a warning, though. we shouldn't be trying to rewrite
 					// a state to an earlier state
 					log.G(ctx).Warnf(
 						"cannot update task %v in desired state %v to an earlier desired state %v",
-						t.ID, t.DesiredState, api.TaskStateRemove,
+						latestTask.ID, latestTask.DesiredState, api.TaskStateRemove,
 					)
 					return nil
 				}
 				// update desired state to REMOVE
-				t.DesiredState = api.TaskStateRemove
-				if err := store.UpdateTask(tx, t); err != nil {
+				latestTask.DesiredState = api.TaskStateRemove
+				if err := store.UpdateTask(tx, latestTask); err != nil {
 					log.G(ctx).WithError(err).Errorf("failed transaction: update task desired state to REMOVE")
 				}
 				return nil
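The pattern here, re-reading the record inside the transaction instead of trusting the snapshot captured before `Batch` ran, is what prevents a stale write from going backwards. A minimal sketch of the same read-modify-write discipline against a stand-in store (illustrative types, not the swarmkit store API):

```go
package main

import "fmt"

type Task struct {
	ID           string
	DesiredState int
}

const TaskStateRemove = 7

// tx stands in for a store transaction's consistent view of the tasks.
var tx = map[string]*Task{"t1": {ID: "t1", DesiredState: 9}}

// setRemove ignores the possibly stale snapshot and operates on a fresh read,
// as the patched code does with store.GetTask(tx, t.ID).
func setRemove(snapshot *Task) {
	latest := tx[snapshot.ID]
	if latest.DesiredState > TaskStateRemove {
		// time travel is not allowed: never rewrite to an earlier state
		fmt.Printf("cannot update task %v in desired state %v to %v\n",
			latest.ID, latest.DesiredState, TaskStateRemove)
		return
	}
	latest.DesiredState = TaskStateRemove
}

func main() {
	stale := &Task{ID: "t1", DesiredState: 3} // read before another writer advanced it to 9
	setRemove(stale)                          // refuses the backwards write
}
```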


@@ -501,7 +501,10 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove
 			return fmt.Errorf("task %s not found while trying to shut it down", original.ID)
 		}
 		if t.DesiredState > api.TaskStateRunning {
-			return fmt.Errorf("task %s was already shut down when reached by updater", original.ID)
+			return fmt.Errorf(
+				"task %s was already shut down when reached by updater (state: %v)",
+				original.ID, t.DesiredState,
+			)
 		}
 		t.DesiredState = api.TaskStateShutdown
 		return store.UpdateTask(tx, t)


@@ -6,6 +6,7 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"io/ioutil"
+	"math"
 	"net"
 	"os"
 	"path/filepath"
@@ -33,7 +34,7 @@ import (
 	"github.com/docker/swarmkit/manager/encryption"
 	"github.com/docker/swarmkit/remotes"
 	"github.com/docker/swarmkit/xnet"
-	"github.com/grpc-ecosystem/go-grpc-prometheus"
+	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	bolt "go.etcd.io/bbolt"
@@ -911,6 +912,7 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{})
 	opts := []grpc.DialOption{
 		grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
 		grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
+		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
 	}
 	insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
 	opts = append(opts, grpc.WithTransportCredentials(insecureCreds))