From 90bce505c4069ef68242df3dc386168cba0aabb8 Mon Sep 17 00:00:00 2001
From: Nicola Murino
Date: Tue, 24 Oct 2023 19:14:33 +0200
Subject: [PATCH] improve conditional resuming of uploads

Signed-off-by: Nicola Murino
---
 go.mod                    |   4 +-
 go.sum                    |   8 +--
 internal/ftpd/handler.go  |   2 +-
 internal/ftpd/transfer.go |   2 +-
 internal/plugin/auth.go   |   2 +-
 internal/vfs/azblobfs.go  |  21 +++++---
 internal/vfs/gcsfs.go     | 102 ++++++++++++++++++++++++--------------
 internal/vfs/s3fs.go      |  16 ++++--
 internal/vfs/vfs.go       |  31 ++++++++++++
 9 files changed, 132 insertions(+), 56 deletions(-)

diff --git a/go.mod b/go.mod
index 9e6de3bb..23059480 100644
--- a/go.mod
+++ b/go.mod
@@ -38,7 +38,7 @@ require (
 	github.com/hashicorp/go-retryablehttp v0.7.4
 	github.com/jackc/pgx/v5 v5.4.3
 	github.com/jlaffaye/ftp v0.0.0-20201112195030-9aae4d151126
-	github.com/klauspost/compress v1.17.1
+	github.com/klauspost/compress v1.17.2
 	github.com/lestrrat-go/jwx/v2 v2.0.15
 	github.com/lithammer/shortuuid/v3 v3.0.7
 	github.com/mattn/go-sqlite3 v1.14.17
@@ -113,7 +113,7 @@ require (
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/google/s2a-go v0.1.7 // indirect
-	github.com/googleapis/enterprise-certificate-proxy v0.3.1 // indirect
+	github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
 	github.com/googleapis/gax-go/v2 v2.12.0 // indirect
 	github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
diff --git a/go.sum b/go.sum
index 401b5f97..acad1d3b 100644
--- a/go.sum
+++ b/go.sum
@@ -286,8 +286,8 @@ github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
 github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8=
 github.com/google/wire v0.5.0/go.mod h1:ngWDr9Qvq3yZA10YrxfyGELY/AFWGVpy9c1LTRi1EoU=
-github.com/googleapis/enterprise-certificate-proxy v0.3.1 h1:SBWmZhjUDRorQxrN0nwzf+AHBxnbFjViHQS4P0yVpmQ=
-github.com/googleapis/enterprise-certificate-proxy v0.3.1/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
+github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
+github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
 github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
@@ -327,8 +327,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
-github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
+github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
 github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
 github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
diff --git a/internal/ftpd/handler.go b/internal/ftpd/handler.go
index 207eb173..f18e14c6 100644
--- a/internal/ftpd/handler.go
+++ b/internal/ftpd/handler.go
@@ -501,7 +501,7 @@ func (c *Connection) handleFTPUploadToExistingFile(fs vfs.Fs, flags int, resolve
 	baseTransfer := common.NewBaseTransfer(file, c.BaseConnection, cancelFn, resolvedPath, filePath, requestPath,
 		common.TransferUpload, minWriteOffset, initialSize, maxWriteSize, truncatedSize, false, fs, transferQuota)
 	baseTransfer.SetFtpMode(c.getFTPMode())
-	t := newTransfer(baseTransfer, w, nil, 0)
+	t := newTransfer(baseTransfer, w, nil, minWriteOffset)
 
 	return t, nil
 }
diff --git a/internal/ftpd/transfer.go b/internal/ftpd/transfer.go
index 212d5957..a90bf426 100644
--- a/internal/ftpd/transfer.go
+++ b/internal/ftpd/transfer.go
@@ -101,7 +101,7 @@ func (t *transfer) Seek(offset int64, whence int) (int64, error) {
 		}
 		return ret, err
 	}
-	if t.reader != nil && t.expectedOffset == offset && whence == io.SeekStart {
+	if (t.reader != nil || t.writer != nil) && t.expectedOffset == offset && whence == io.SeekStart {
 		return offset, nil
 	}
 	t.TransferError(errors.New("seek is unsupported for this transfer"))
diff --git a/internal/plugin/auth.go b/internal/plugin/auth.go
index 3e6001c4..a9527440 100644
--- a/internal/plugin/auth.go
+++ b/internal/plugin/auth.go
@@ -136,7 +136,7 @@ func (p *authPlugin) initialize() error {
 	})
 	rpcClient, err := client.Client()
 	if err != nil {
-		logger.Debug(logSender, "", "unable to get rpc client for kms plugin %q: %v", p.config.Cmd, err)
+		logger.Debug(logSender, "", "unable to get rpc client for auth plugin %q: %v", p.config.Cmd, err)
 		return err
 	}
 	raw, err := rpcClient.Dispense(auth.PluginName)
diff --git a/internal/vfs/azblobfs.go b/internal/vfs/azblobfs.go
index cd46239d..ffe348fc 100644
--- a/internal/vfs/azblobfs.go
+++ b/internal/vfs/azblobfs.go
@@ -237,7 +237,12 @@ func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter,
 	}
 	ctx, cancelFn := context.WithCancel(context.Background())
 
-	p := NewPipeWriter(w)
+	var p PipeWriter
+	if checks&CheckResume != 0 {
+		p = newPipeWriterAtOffset(w, 0)
+	} else {
+		p = NewPipeWriter(w)
+	}
 	headers := blob.HTTPHeaders{}
 	var contentType string
 	var metadata map[string]*string
@@ -268,7 +273,10 @@ func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter,
 		readCh := make(chan error, 1)
 
 		go func() {
-			err = fs.downloadToWriter(name, p)
+			n, err := fs.downloadToWriter(name, p)
+			pw := p.(*pipeWriterAtOffset)
+			pw.offset = 0
+			pw.writeOffset = n
 			readCh <- err
 		}()
 
@@ -1195,17 +1203,18 @@ func (fs *AzureBlobFs) getCopyOptions() *blob.StartCopyFromURLOptions {
 	return copyOptions
 }
 
-func (fs *AzureBlobFs) downloadToWriter(name string, w PipeWriter) error {
+func (fs *AzureBlobFs) downloadToWriter(name string, w PipeWriter) (int64, error) {
 	fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
 	ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
 	defer cancelFn()
 	blockBlob := fs.containerClient.NewBlockBlobClient(name)
 	err := fs.handleMultipartDownload(ctx, blockBlob, 0, w, nil)
+	n := w.GetWrittenBytes()
 	fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
-		name, w.GetWrittenBytes(), err)
-	metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err)
-	return err
+		name, n, err)
+	metric.AZTransferCompleted(n, 1, err)
+	return n, err
 }
 
 func (fs *AzureBlobFs) getStorageID() string {
diff --git a/internal/vfs/gcsfs.go b/internal/vfs/gcsfs.go
index 392b02b8..281b47e2 100644
--- a/internal/vfs/gcsfs.go
+++ b/internal/vfs/gcsfs.go
@@ -32,6 +32,7 @@ import (
 	"cloud.google.com/go/storage"
 	"github.com/eikenb/pipeat"
 	"github.com/pkg/sftp"
+	"github.com/rs/xid"
 	"google.golang.org/api/googleapi"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
@@ -178,12 +179,17 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
 	if err != nil {
 		return nil, nil, nil, err
 	}
+	var partialFileName string
+	var attrs *storage.ObjectAttrs
+	var statErr error
+
 	bkt := fs.svc.Bucket(fs.config.Bucket)
 	obj := bkt.Object(name)
+
 	if flag == -1 {
 		obj = obj.If(storage.Conditions{DoesNotExist: true})
 	} else {
-		attrs, statErr := fs.headObject(name)
+		attrs, statErr = fs.headObject(name)
 		if statErr == nil {
 			obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation})
 		} else if fs.IsNotExist(statErr) {
@@ -192,10 +198,27 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
 			fsLog(fs, logger.LevelWarn, "unable to set precondition for %q, stat err: %v", name, statErr)
 		}
 	}
-	p := NewPipeWriter(w)
+
 	ctx, cancelFn := context.WithCancel(context.Background())
-	objectWriter := obj.NewWriter(ctx)
+
+	var p PipeWriter
+	var objectWriter *storage.Writer
+	if checks&CheckResume != 0 {
+		if statErr != nil {
+			cancelFn()
+			r.Close()
+			w.Close()
+			return nil, nil, nil, fmt.Errorf("unable to resume %q stat error: %w", name, statErr)
+		}
+		p = newPipeWriterAtOffset(w, attrs.Size)
+		partialFileName = fs.getTempObject(name)
+		partialObj := bkt.Object(partialFileName)
+		partialObj = partialObj.If(storage.Conditions{DoesNotExist: true})
+		objectWriter = partialObj.NewWriter(ctx)
+	} else {
+		p = NewPipeWriter(w)
+		objectWriter = obj.NewWriter(ctx)
+	}
+
 	if fs.config.UploadPartSize > 0 {
 		objectWriter.ChunkSize = int(fs.config.UploadPartSize) * 1024 * 1024
 	}
@@ -218,6 +241,11 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
 		if err == nil {
 			err = closeErr
 		}
+		if err == nil && partialFileName != "" {
+			partialObject := bkt.Object(partialFileName)
+			partialObject = partialObject.If(storage.Conditions{GenerationMatch: objectWriter.Attrs().Generation})
+			err = fs.composeObjects(ctx, obj, partialObject)
+		}
 		r.CloseWithError(err) //nolint:errcheck
 		p.Done(err)
 		fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %v, err: %+v",
@@ -225,23 +253,6 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
 		metric.GCSTransferCompleted(n, 0, err)
 	}()
 
-	if checks&CheckResume != 0 {
-		readCh := make(chan error, 1)
-
-		go func() {
-			err = fs.downloadToWriter(name, p)
-			readCh <- err
-		}()
-
-		err = <-readCh
-		if err != nil {
-			cancelFn()
-			p.Close()
-			fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
-			return nil, nil, nil, err
-		}
-	}
-
 	return nil, p, cancelFn, nil
 }
 
@@ -290,6 +301,9 @@ func (fs *GCSFs) Remove(name string, isDir bool) error {
 	err := obj.Delete(ctx)
 	if isDir && fs.IsNotExist(err) {
 		// we can have directories without a trailing "/" (created using v2.1.0 and before)
+		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
+		defer cancelFn()
+
 		err = fs.svc.Bucket(fs.config.Bucket).Object(strings.TrimSuffix(name, "/")).Delete(ctx)
 	}
 	metric.GCSDeleteObjectCompleted(err)
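
Note for reviewers: the Create hunks above replace the old GCS resume strategy (download the existing object, then re-upload everything) with an append strategy: only the new bytes are uploaded, into a temporary partial object created under a DoesNotExist precondition, and the final object is produced by a server-side compose guarded by generation-match preconditions (composeObjects, added in the next hunks). A minimal standalone sketch of that compose step, assuming a *storage.Client and generations fetched earlier; the package, function, bucket, and object names are hypothetical, not part of the patch:

// compose_sketch.go - illustrative only, not part of the patch.
package gcsresume

import (
	"context"
	"fmt"

	"cloud.google.com/go/storage"
)

// composeResume appends a temporary "partial" object to the base object
// server-side. baseGen and partialGen are the generations observed when the
// resume started; GenerationMatch makes the compose fail instead of
// clobbering a concurrent writer.
func composeResume(ctx context.Context, client *storage.Client, bucket, name, partialName string, baseGen, partialGen int64) error {
	bkt := client.Bucket(bucket)
	dst := bkt.Object(name).If(storage.Conditions{GenerationMatch: baseGen})
	partial := bkt.Object(partialName).If(storage.Conditions{GenerationMatch: partialGen})
	// dst = dst + partial, concatenated by GCS without re-downloading anything
	if _, err := dst.ComposerFrom(dst, partial).Run(ctx); err != nil {
		return fmt.Errorf("compose failed: %w", err)
	}
	// best-effort cleanup of the temporary object
	return partial.Delete(ctx)
}

Because compose is a server-side operation, the resumed upload no longer has to fit under resumeMaxSize, which is why IsConditionalUploadResumeSupported can return true unconditionally in the next hunk.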
@@ -442,8 +456,8 @@ func (*GCSFs) IsUploadResumeSupported() bool {
 
 // IsConditionalUploadResumeSupported returns if resuming uploads is supported
 // for the specified size
-func (*GCSFs) IsConditionalUploadResumeSupported(size int64) bool {
-	return size <= resumeMaxSize
+func (*GCSFs) IsConditionalUploadResumeSupported(_ int64) bool {
+	return true
 }
 
 // IsAtomicUploadSupported returns true if atomic upload is supported.
@@ -777,22 +791,30 @@ func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, contentType string
 	}
 }
 
-func (fs *GCSFs) downloadToWriter(name string, w PipeWriter) error {
-	fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
-	ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
+func (fs *GCSFs) composeObjects(ctx context.Context, dst, partialObject *storage.ObjectHandle) error {
+	fsLog(fs, logger.LevelDebug, "start object compose for partial file %q, destination %q",
+		partialObject.ObjectName(), dst.ObjectName())
+	composer := dst.ComposerFrom(dst, partialObject)
+	if fs.config.StorageClass != "" {
+		composer.StorageClass = fs.config.StorageClass
+	}
+	if fs.config.ACL != "" {
+		composer.PredefinedACL = fs.config.ACL
+	}
+	contentType := mime.TypeByExtension(path.Ext(dst.ObjectName()))
+	if contentType != "" {
+		composer.ContentType = contentType
+	}
+	_, err := composer.Run(ctx)
+	fsLog(fs, logger.LevelDebug, "object compose for %q finished, err: %v", dst.ObjectName(), err)
+
+	delCtx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
-	bkt := fs.svc.Bucket(fs.config.Bucket)
-	obj := bkt.Object(name)
-	objectReader, err := obj.NewRangeReader(ctx, 0, -1)
-	if err != nil {
-		fsLog(fs, logger.LevelDebug, "unable to start download before resuming upload, path %q, err: %v", name, err)
-		return err
-	}
-	n, err := io.Copy(w, objectReader)
-	fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
-		name, n, err)
-	metric.GCSTransferCompleted(n, 1, err)
+	errDelete := partialObject.Delete(delCtx)
+	metric.GCSDeleteObjectCompleted(errDelete)
+	fsLog(fs, logger.LevelDebug, "deleted partial file %q after composing with %q, err: %v",
+		partialObject.ObjectName(), dst.ObjectName(), errDelete)
 	return err
 }
 
@@ -976,6 +998,12 @@ func (*GCSFs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
 	return nil, ErrStorageSizeUnavailable
 }
 
+func (*GCSFs) getTempObject(name string) string {
+	dir := filepath.Dir(name)
+	guid := xid.New().String()
+	return filepath.Join(dir, ".sftpgo-partial."+guid+"."+filepath.Base(name))
+}
+
 func (fs *GCSFs) getStorageID() string {
 	return fmt.Sprintf("gs://%v", fs.config.Bucket)
 }
diff --git a/internal/vfs/s3fs.go b/internal/vfs/s3fs.go
index b9d0066e..c59b6cbd 100644
--- a/internal/vfs/s3fs.go
+++ b/internal/vfs/s3fs.go
@@ -252,7 +252,12 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(),
 	if err != nil {
 		return nil, nil, nil, err
 	}
-	p := NewPipeWriter(w)
+	var p PipeWriter
+	if checks&CheckResume != 0 {
+		p = newPipeWriterAtOffset(w, 0)
+	} else {
+		p = NewPipeWriter(w)
+	}
 	ctx, cancelFn := context.WithCancel(context.Background())
 	uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
 		u.Concurrency = fs.config.UploadConcurrency
@@ -292,7 +297,10 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(),
 		readCh := make(chan error, 1)
 
 		go func() {
-			err = fs.downloadToWriter(name, p)
+			n, err := fs.downloadToWriter(name, p)
+			pw := p.(*pipeWriterAtOffset)
+			pw.offset = 0
+			pw.writeOffset = n
 			readCh <- err
 		}()
 
@@ -1050,7 +1058,7 @@ func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
 	return nil, ErrStorageSizeUnavailable
 }
 
-func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) error {
+func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) (int64, error) {
 	fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
 	ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
 	defer cancelFn()
@@ -1072,7 +1080,7 @@ func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) error {
 	fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
 		name, n, err)
 	metric.S3TransferCompleted(n, 1, err)
-	return err
+	return n, err
 }
 
 func (fs *S3Fs) getStorageID() string {
diff --git a/internal/vfs/vfs.go b/internal/vfs/vfs.go
index 7ad3a9d8..313da055 100644
--- a/internal/vfs/vfs.go
+++ b/internal/vfs/vfs.go
@@ -746,6 +746,37 @@ func (p *pipeWriter) Done(err error) {
 	p.done <- true
 }
 
+func newPipeWriterAtOffset(w *pipeat.PipeWriterAt, offset int64) PipeWriter {
+	return &pipeWriterAtOffset{
+		pipeWriter: &pipeWriter{
+			PipeWriterAt: w,
+			err:          nil,
+			done:         make(chan bool),
+		},
+		offset:      offset,
+		writeOffset: offset,
+	}
+}
+
+type pipeWriterAtOffset struct {
+	*pipeWriter
+	offset      int64
+	writeOffset int64
+}
+
+func (p *pipeWriterAtOffset) WriteAt(buf []byte, off int64) (int, error) {
+	if off < p.offset {
+		return 0, fmt.Errorf("invalid offset %d, minimum accepted %d", off, p.offset)
+	}
+	return p.pipeWriter.WriteAt(buf, off-p.offset)
+}
+
+func (p *pipeWriterAtOffset) Write(buf []byte) (int, error) {
+	n, err := p.WriteAt(buf, p.writeOffset)
+	p.writeOffset += int64(n)
+	return n, err
+}
+
 // NewPipeReader initializes a new PipeReader
 func NewPipeReader(r *pipeat.PipeReaderAt) *PipeReader {
 	return &PipeReader{
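
A closing note on the vfs.go hunk above: pipeWriterAtOffset is plain offset rebasing. The protocol layer keeps writing at absolute file offsets, while the wrapped pipe (and the cloud uploader draining it) only ever sees the new bytes, rebased to start at zero; writeOffset additionally lets plain Write calls continue sequentially from the resume point. A self-contained sketch of the same technique, with illustrative names and a toy in-memory io.WriterAt (none of this is in the patch):

package main

import (
	"fmt"
	"io"
)

// bufWriterAt is a trivial in-memory io.WriterAt used only for the demo.
type bufWriterAt struct{ buf []byte }

func (b *bufWriterAt) WriteAt(p []byte, off int64) (int, error) {
	if need := int(off) + len(p); need > len(b.buf) {
		b.buf = append(b.buf, make([]byte, need-len(b.buf))...)
	}
	copy(b.buf[off:], p)
	return len(p), nil
}

// offsetWriter mirrors the pipeWriterAtOffset idea: callers write at absolute
// file offsets, the wrapped writer sees offsets rebased to the resume point.
type offsetWriter struct {
	w           io.WriterAt
	offset      int64 // resume point: smallest acceptable absolute offset
	writeOffset int64 // next absolute offset for sequential Write calls
}

func (o *offsetWriter) WriteAt(p []byte, off int64) (int, error) {
	if off < o.offset {
		return 0, fmt.Errorf("invalid offset %d, minimum accepted %d", off, o.offset)
	}
	return o.w.WriteAt(p, off-o.offset) // rebase for the wrapped writer
}

func (o *offsetWriter) Write(p []byte) (int, error) {
	n, err := o.WriteAt(p, o.writeOffset)
	o.writeOffset += int64(n)
	return n, err
}

func main() {
	dst := &bufWriterAt{}
	w := &offsetWriter{w: dst, offset: 1000, writeOffset: 1000} // resume at byte 1000
	w.Write([]byte("abc"))         //nolint:errcheck // lands at dst.buf[0:3]
	w.WriteAt([]byte("xyz"), 1003) //nolint:errcheck // lands at dst.buf[3:6]
	fmt.Printf("%q\n", dst.buf)    // prints "abcxyz"
}

This also explains why the Azure and S3 paths construct the writer with offset 0 and only adjust it after the pre-download: there the existing prefix is itself written into the pipe, so nothing is rebased (offset stays 0) and only writeOffset advances to the prefix length, whereas on GCS the prefix never enters the pipe and offsets are rebased by attrs.Size.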