|
@@ -4,6 +4,7 @@ import (
|
|
"context"
|
|
"context"
|
|
"fmt"
|
|
"fmt"
|
|
"sync"
|
|
"sync"
|
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/sirupsen/logrus"
|
|
|
|
|
|
@@ -16,6 +17,8 @@ import (
|
|
"github.com/moby/swarmkit/v2/volumequeue"
|
|
"github.com/moby/swarmkit/v2/volumequeue"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+const CSI_CALL_TIMEOUT = 15 * time.Second
|
|
|
|
+
|
|
// volumeState keeps track of the state of a volume on this node.
|
|
// volumeState keeps track of the state of a volume on this node.
|
|
type volumeState struct {
|
|
type volumeState struct {
|
|
// volume is the actual VolumeAssignment for this volume
|
|
// volume is the actual VolumeAssignment for this volume
|
|
@@ -87,14 +90,35 @@ func (r *volumes) tryVolume(ctx context.Context, id string, attempt uint) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // create a sub-context with a timeout. because we can only process one
|
|
|
|
+ // volume at a time, if we rely on the server-side or default timeout, we
|
|
|
|
+ // may be waiting a very long time for a particular volume to fail.
|
|
|
|
+ //
|
|
|
|
+ // TODO(dperny): there is almost certainly a more intelligent way to do
|
|
|
|
+ // this. For example, we could:
|
|
|
|
+ //
|
|
|
|
+ // * Change code such that we can service volumes managed by different
|
|
|
|
+ // plugins at the same time.
|
|
|
|
+ // * Take longer timeouts when we don't have any other volumes in the
|
|
|
|
+ // queue
|
|
|
|
+ // * Have interruptible attempts, so that if we're taking longer
|
|
|
|
+ // timeouts, we can abort them to service new volumes.
|
|
|
|
+ //
|
|
|
|
+ // These are too complicated to be worth the engineering effort at this
|
|
|
|
+ // time.
|
|
|
|
+
|
|
|
|
+ timeoutCtx, cancel := context.WithTimeout(ctx, CSI_CALL_TIMEOUT)
|
|
|
|
+ // always gotta call the WithTimeout cancel
|
|
|
|
+ defer cancel()
|
|
|
|
+
|
|
if !vs.remove {
|
|
if !vs.remove {
|
|
- if err := r.publishVolume(ctx, vs.volume); err != nil {
|
|
|
|
- log.G(ctx).WithError(err).Info("publishing volume failed")
|
|
|
|
|
|
+ if err := r.publishVolume(timeoutCtx, vs.volume); err != nil {
|
|
|
|
+ log.G(timeoutCtx).WithError(err).Info("publishing volume failed")
|
|
r.pendingVolumes.Enqueue(id, attempt+1)
|
|
r.pendingVolumes.Enqueue(id, attempt+1)
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- if err := r.unpublishVolume(ctx, vs.volume); err != nil {
|
|
|
|
- log.G(ctx).WithError(err).Info("upublishing volume failed")
|
|
|
|
|
|
+ if err := r.unpublishVolume(timeoutCtx, vs.volume); err != nil {
|
|
|
|
+ log.G(timeoutCtx).WithError(err).Info("upublishing volume failed")
|
|
r.pendingVolumes.Enqueue(id, attempt+1)
|
|
r.pendingVolumes.Enqueue(id, attempt+1)
|
|
} else {
|
|
} else {
|
|
// if unpublishing was successful, then call the callback
|
|
// if unpublishing was successful, then call the callback
|