From 57c2545cd51c5fe7495c44e46013bbf320645c34 Mon Sep 17 00:00:00 2001 From: Bjorn Neergaard Date: Wed, 2 Nov 2022 15:04:49 -0600 Subject: [PATCH] vendor: github.com/moby/swarmkit/v2 v2.0.0-20221102165002-6341884e5fc9 full diff: https://github.com/moby/swarmkit/compare/48dd89375d0a...6341884e5fc9 Pulls in a set of fixes to SwarmKit's nascent Cluster Volumes support discovered during subsequent development and testing. Signed-off-by: Bjorn Neergaard --- vendor.mod | 2 +- vendor.sum | 4 +-- .../moby/swarmkit/v2/agent/csi/volumes.go | 32 ++++++++++++++++--- .../github.com/moby/swarmkit/v2/ca/server.go | 4 +-- .../swarmkit/v2/manager/controlapi/cluster.go | 2 +- .../swarmkit/v2/manager/controlapi/volume.go | 20 +++++++++--- .../moby/swarmkit/v2/manager/csi/manager.go | 13 +++++++- vendor/modules.txt | 2 +- 8 files changed, 63 insertions(+), 16 deletions(-) diff --git a/vendor.mod b/vendor.mod index 8ab662fef9..2de0d38c16 100644 --- a/vendor.mod +++ b/vendor.mod @@ -55,7 +55,7 @@ require ( github.com/moby/locker v1.0.1 github.com/moby/patternmatcher v0.5.0 github.com/moby/pubsub v1.0.0 - github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a + github.com/moby/swarmkit/v2 v2.0.0-20221102165002-6341884e5fc9 github.com/moby/sys/mount v0.3.3 github.com/moby/sys/mountinfo v0.6.2 github.com/moby/sys/sequential v0.5.0 diff --git a/vendor.sum b/vendor.sum index 7b48aa3b2f..c7a11c3be4 100644 --- a/vendor.sum +++ b/vendor.sum @@ -777,8 +777,8 @@ github.com/moby/patternmatcher v0.5.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YO github.com/moby/pubsub v1.0.0 h1:jkp/imWsmJz2f6LyFsk7EkVeN2HxR/HTTOY8kHrsxfA= github.com/moby/pubsub v1.0.0/go.mod h1:bXSO+3h5MNXXCaEG+6/NlAIk7MMZbySZlnB+cUQhKKc= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= -github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a h1:gLcTxHH4egYVhMVFWRxvWsb79Ok4kfTt1/irZNyovUY= -github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a/go.mod h1:/so6Lct4y1x14UprW/loFsOe6xoXVTlvh25V36ULXNQ= +github.com/moby/swarmkit/v2 v2.0.0-20221102165002-6341884e5fc9 h1:d/XCmjx1zKZdzlBX90kSGDex7V2GE2jdGDr9nXYZg/Q= +github.com/moby/swarmkit/v2 v2.0.0-20221102165002-6341884e5fc9/go.mod h1:/so6Lct4y1x14UprW/loFsOe6xoXVTlvh25V36ULXNQ= github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= github.com/moby/sys/mount v0.3.3/go.mod h1:PBaEorSNTLG5t/+4EgukEQVlAvVEc6ZjTySwKdqp5K0= github.com/moby/sys/mountinfo v0.4.0/go.mod h1:rEr8tzG/lsIZHBtN/JjGG+LMYx9eXgW2JI+6q0qou+A= diff --git a/vendor/github.com/moby/swarmkit/v2/agent/csi/volumes.go b/vendor/github.com/moby/swarmkit/v2/agent/csi/volumes.go index c9c97a2ff0..a2127fc963 100644 --- a/vendor/github.com/moby/swarmkit/v2/agent/csi/volumes.go +++ b/vendor/github.com/moby/swarmkit/v2/agent/csi/volumes.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/sirupsen/logrus" @@ -16,6 +17,8 @@ import ( "github.com/moby/swarmkit/v2/volumequeue" ) +const CSI_CALL_TIMEOUT = 15 * time.Second + // volumeState keeps track of the state of a volume on this node. type volumeState struct { // volume is the actual VolumeAssignment for this volume @@ -87,14 +90,35 @@ func (r *volumes) tryVolume(ctx context.Context, id string, attempt uint) { return } + // create a sub-context with a timeout. because we can only process one + // volume at a time, if we rely on the server-side or default timeout, we + // may be waiting a very long time for a particular volume to fail. + // + // TODO(dperny): there is almost certainly a more intelligent way to do + // this. For example, we could: + // + // * Change code such that we can service volumes managed by different + // plugins at the same time. + // * Take longer timeouts when we don't have any other volumes in the + // queue + // * Have interruptible attempts, so that if we're taking longer + // timeouts, we can abort them to service new volumes. + // + // These are too complicated to be worth the engineering effort at this + // time. + + timeoutCtx, cancel := context.WithTimeout(ctx, CSI_CALL_TIMEOUT) + // always gotta call the WithTimeout cancel + defer cancel() + if !vs.remove { - if err := r.publishVolume(ctx, vs.volume); err != nil { - log.G(ctx).WithError(err).Info("publishing volume failed") + if err := r.publishVolume(timeoutCtx, vs.volume); err != nil { + log.G(timeoutCtx).WithError(err).Info("publishing volume failed") r.pendingVolumes.Enqueue(id, attempt+1) } } else { - if err := r.unpublishVolume(ctx, vs.volume); err != nil { - log.G(ctx).WithError(err).Info("upublishing volume failed") + if err := r.unpublishVolume(timeoutCtx, vs.volume); err != nil { + log.G(timeoutCtx).WithError(err).Info("upublishing volume failed") r.pendingVolumes.Enqueue(id, attempt+1) } else { // if unpublishing was successful, then call the callback diff --git a/vendor/github.com/moby/swarmkit/v2/ca/server.go b/vendor/github.com/moby/swarmkit/v2/ca/server.go index f5a5277553..44a51b5e24 100644 --- a/vendor/github.com/moby/swarmkit/v2/ca/server.go +++ b/vendor/github.com/moby/swarmkit/v2/ca/server.go @@ -695,7 +695,7 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster, reconci log.G(ctx).Warn("no certificate expiration specified, using default") } // Attempt to update our local RootCA with the new parameters - updatedRootCA, err := RootCAFromAPI(ctx, rCA, expiry) + updatedRootCA, err := RootCAFromAPI(rCA, expiry) if err != nil { return errors.Wrap(err, "invalid Root CA object in cluster") } @@ -901,7 +901,7 @@ func isFinalState(status api.IssuanceStatus) bool { } // RootCAFromAPI creates a RootCA object from an api.RootCA object -func RootCAFromAPI(ctx context.Context, apiRootCA *api.RootCA, expiry time.Duration) (RootCA, error) { +func RootCAFromAPI(apiRootCA *api.RootCA, expiry time.Duration) (RootCA, error) { var intermediates []byte signingCert := apiRootCA.CACert signingKey := apiRootCA.CAKey diff --git a/vendor/github.com/moby/swarmkit/v2/manager/controlapi/cluster.go b/vendor/github.com/moby/swarmkit/v2/manager/controlapi/cluster.go index fbee6f5f96..3a264079ec 100644 --- a/vendor/github.com/moby/swarmkit/v2/manager/controlapi/cluster.go +++ b/vendor/github.com/moby/swarmkit/v2/manager/controlapi/cluster.go @@ -119,7 +119,7 @@ func (s *Server) UpdateCluster(ctx context.Context, request *api.UpdateClusterRe } // This ensures that we have the current rootCA with which to generate tokens (expiration doesn't matter // for generating the tokens) - rootCA, err := ca.RootCAFromAPI(ctx, &cluster.RootCA, ca.DefaultNodeCertExpiration) + rootCA, err := ca.RootCAFromAPI(&cluster.RootCA, ca.DefaultNodeCertExpiration) if err != nil { log.G(ctx).WithField( "method", "(*controlapi.Server).UpdateCluster").WithError(err).Error("invalid cluster root CA") diff --git a/vendor/github.com/moby/swarmkit/v2/manager/controlapi/volume.go b/vendor/github.com/moby/swarmkit/v2/manager/controlapi/volume.go index 8b01eb5c5d..1d30e8965c 100644 --- a/vendor/github.com/moby/swarmkit/v2/manager/controlapi/volume.go +++ b/vendor/github.com/moby/swarmkit/v2/manager/controlapi/volume.go @@ -2,6 +2,7 @@ package controlapi import ( "context" + "reflect" "strings" "github.com/moby/swarmkit/v2/api" @@ -94,17 +95,28 @@ func (s *Server) UpdateVolume(ctx context.Context, request *api.UpdateVolumeRequ if request.Spec.Group != volume.Spec.Group { return status.Errorf(codes.InvalidArgument, "Group cannot be updated") } - if request.Spec.AccessibilityRequirements != volume.Spec.AccessibilityRequirements { + if !reflect.DeepEqual(request.Spec.AccessibilityRequirements, volume.Spec.AccessibilityRequirements) { return status.Errorf(codes.InvalidArgument, "AccessibilityRequirements cannot be updated") } - if request.Spec.Driver == nil || request.Spec.Driver.Name != volume.Spec.Driver.Name { + if !reflect.DeepEqual(request.Spec.Driver, volume.Spec.Driver) { return status.Errorf(codes.InvalidArgument, "Driver cannot be updated") } - if request.Spec.AccessMode.Scope != volume.Spec.AccessMode.Scope || request.Spec.AccessMode.Sharing != volume.Spec.AccessMode.Sharing { + if !reflect.DeepEqual(request.Spec.AccessMode, volume.Spec.AccessMode) { return status.Errorf(codes.InvalidArgument, "AccessMode cannot be updated") } + if !reflect.DeepEqual(request.Spec.Secrets, volume.Spec.Secrets) { + return status.Errorf(codes.InvalidArgument, "Secrets cannot be updated") + } + if !reflect.DeepEqual(request.Spec.CapacityRange, volume.Spec.CapacityRange) { + return status.Errorf(codes.InvalidArgument, "CapacityRange cannot be updated") + } + + // to further guard against changing fields we're not allowed to, don't + // replace the entire spec. just replace the fields we are allowed to + // change + volume.Spec.Annotations.Labels = request.Spec.Annotations.Labels + volume.Spec.Availability = request.Spec.Availability - volume.Spec = *request.Spec volume.Meta.Version = *request.VolumeVersion if err := store.UpdateVolume(tx, volume); err != nil { return err diff --git a/vendor/github.com/moby/swarmkit/v2/manager/csi/manager.go b/vendor/github.com/moby/swarmkit/v2/manager/csi/manager.go index dc9862fc4f..f1136ac253 100644 --- a/vendor/github.com/moby/swarmkit/v2/manager/csi/manager.go +++ b/vendor/github.com/moby/swarmkit/v2/manager/csi/manager.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/docker/go-events" "github.com/sirupsen/logrus" @@ -23,6 +24,10 @@ const ( // plugin interface is "docker.csicontroller/1.0". This gets only the CSI // plugins with Controller capability. DockerCSIPluginCap = "csicontroller" + + // CSIRPCTimeout is the client-side timeout duration for RPCs to the CSI + // plugin. + CSIRPCTimeout = 15 * time.Second ) type Manager struct { @@ -149,11 +154,17 @@ func (vm *Manager) run(pctx context.Context) { // processVolumes encapuslates the logic for processing pending Volumes. func (vm *Manager) processVolume(ctx context.Context, id string, attempt uint) { // set up log fields for a derrived context to pass to handleVolume. - dctx := log.WithFields(ctx, logrus.Fields{ + logCtx := log.WithFields(ctx, logrus.Fields{ "volume.id": id, "attempt": attempt, }) + // Set a client-side timeout. Without this, one really long server-side + // timeout can block processing all volumes until it completes or fails. + dctx, cancel := context.WithTimeout(logCtx, CSIRPCTimeout) + // always gotta call the WithTimeout cancel + defer cancel() + err := vm.handleVolume(dctx, id) // TODO(dperny): differentiate between retryable and non-retryable // errors. diff --git a/vendor/modules.txt b/vendor/modules.txt index 2c7958c043..11f9be1e03 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -600,7 +600,7 @@ github.com/moby/patternmatcher # github.com/moby/pubsub v1.0.0 ## explicit; go 1.19 github.com/moby/pubsub -# github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a +# github.com/moby/swarmkit/v2 v2.0.0-20221102165002-6341884e5fc9 ## explicit; go 1.17 github.com/moby/swarmkit/v2/agent github.com/moby/swarmkit/v2/agent/configs