improve conditional resuming of uploads
Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
parent
320e404e4d
commit
90bce505c4
9 changed files with 132 additions and 56 deletions
4
go.mod
4
go.mod
|
@ -38,7 +38,7 @@ require (
|
|||
github.com/hashicorp/go-retryablehttp v0.7.4
|
||||
github.com/jackc/pgx/v5 v5.4.3
|
||||
github.com/jlaffaye/ftp v0.0.0-20201112195030-9aae4d151126
|
||||
github.com/klauspost/compress v1.17.1
|
||||
github.com/klauspost/compress v1.17.2
|
||||
github.com/lestrrat-go/jwx/v2 v2.0.15
|
||||
github.com/lithammer/shortuuid/v3 v3.0.7
|
||||
github.com/mattn/go-sqlite3 v1.14.17
|
||||
|
@ -113,7 +113,7 @@ require (
|
|||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/s2a-go v0.1.7 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.1 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
|
||||
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
|
||||
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
|
|
8
go.sum
8
go.sum
|
@ -286,8 +286,8 @@ github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
|
|||
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8=
|
||||
github.com/google/wire v0.5.0/go.mod h1:ngWDr9Qvq3yZA10YrxfyGELY/AFWGVpy9c1LTRi1EoU=
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.1 h1:SBWmZhjUDRorQxrN0nwzf+AHBxnbFjViHQS4P0yVpmQ=
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.1/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
|
||||
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
|
||||
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
|
||||
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
|
||||
|
@ -327,8 +327,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
|
|||
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
|
||||
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
|
||||
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
|
||||
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
|
||||
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
|
||||
|
|
|
@ -501,7 +501,7 @@ func (c *Connection) handleFTPUploadToExistingFile(fs vfs.Fs, flags int, resolve
|
|||
baseTransfer := common.NewBaseTransfer(file, c.BaseConnection, cancelFn, resolvedPath, filePath, requestPath,
|
||||
common.TransferUpload, minWriteOffset, initialSize, maxWriteSize, truncatedSize, false, fs, transferQuota)
|
||||
baseTransfer.SetFtpMode(c.getFTPMode())
|
||||
t := newTransfer(baseTransfer, w, nil, 0)
|
||||
t := newTransfer(baseTransfer, w, nil, minWriteOffset)
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ func (t *transfer) Seek(offset int64, whence int) (int64, error) {
|
|||
}
|
||||
return ret, err
|
||||
}
|
||||
if t.reader != nil && t.expectedOffset == offset && whence == io.SeekStart {
|
||||
if (t.reader != nil || t.writer != nil) && t.expectedOffset == offset && whence == io.SeekStart {
|
||||
return offset, nil
|
||||
}
|
||||
t.TransferError(errors.New("seek is unsupported for this transfer"))
|
||||
|
|
|
@ -136,7 +136,7 @@ func (p *authPlugin) initialize() error {
|
|||
})
|
||||
rpcClient, err := client.Client()
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get rpc client for kms plugin %q: %v", p.config.Cmd, err)
|
||||
logger.Debug(logSender, "", "unable to get rpc client for auth plugin %q: %v", p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
raw, err := rpcClient.Dispense(auth.PluginName)
|
||||
|
|
|
@ -237,7 +237,12 @@ func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter,
|
|||
}
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
|
||||
p := NewPipeWriter(w)
|
||||
var p PipeWriter
|
||||
if checks&CheckResume != 0 {
|
||||
p = newPipeWriterAtOffset(w, 0)
|
||||
} else {
|
||||
p = NewPipeWriter(w)
|
||||
}
|
||||
headers := blob.HTTPHeaders{}
|
||||
var contentType string
|
||||
var metadata map[string]*string
|
||||
|
@ -268,7 +273,10 @@ func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter,
|
|||
readCh := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
err = fs.downloadToWriter(name, p)
|
||||
n, err := fs.downloadToWriter(name, p)
|
||||
pw := p.(*pipeWriterAtOffset)
|
||||
pw.offset = 0
|
||||
pw.writeOffset = n
|
||||
readCh <- err
|
||||
}()
|
||||
|
||||
|
@ -1195,17 +1203,18 @@ func (fs *AzureBlobFs) getCopyOptions() *blob.StartCopyFromURLOptions {
|
|||
return copyOptions
|
||||
}
|
||||
|
||||
func (fs *AzureBlobFs) downloadToWriter(name string, w PipeWriter) error {
|
||||
func (fs *AzureBlobFs) downloadToWriter(name string, w PipeWriter) (int64, error) {
|
||||
fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
|
||||
defer cancelFn()
|
||||
|
||||
blockBlob := fs.containerClient.NewBlockBlobClient(name)
|
||||
err := fs.handleMultipartDownload(ctx, blockBlob, 0, w, nil)
|
||||
n := w.GetWrittenBytes()
|
||||
fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
|
||||
name, w.GetWrittenBytes(), err)
|
||||
metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err)
|
||||
return err
|
||||
name, n, err)
|
||||
metric.AZTransferCompleted(n, 1, err)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (fs *AzureBlobFs) getStorageID() string {
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"cloud.google.com/go/storage"
|
||||
"github.com/eikenb/pipeat"
|
||||
"github.com/pkg/sftp"
|
||||
"github.com/rs/xid"
|
||||
"google.golang.org/api/googleapi"
|
||||
"google.golang.org/api/iterator"
|
||||
"google.golang.org/api/option"
|
||||
|
@ -178,12 +179,17 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
|
|||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
var partialFileName string
|
||||
var attrs *storage.ObjectAttrs
|
||||
var statErr error
|
||||
|
||||
bkt := fs.svc.Bucket(fs.config.Bucket)
|
||||
obj := bkt.Object(name)
|
||||
|
||||
if flag == -1 {
|
||||
obj = obj.If(storage.Conditions{DoesNotExist: true})
|
||||
} else {
|
||||
attrs, statErr := fs.headObject(name)
|
||||
attrs, statErr = fs.headObject(name)
|
||||
if statErr == nil {
|
||||
obj = obj.If(storage.Conditions{GenerationMatch: attrs.Generation})
|
||||
} else if fs.IsNotExist(statErr) {
|
||||
|
@ -192,10 +198,27 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
|
|||
fsLog(fs, logger.LevelWarn, "unable to set precondition for %q, stat err: %v", name, statErr)
|
||||
}
|
||||
}
|
||||
p := NewPipeWriter(w)
|
||||
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
objectWriter := obj.NewWriter(ctx)
|
||||
|
||||
var p PipeWriter
|
||||
var objectWriter *storage.Writer
|
||||
if checks&CheckResume != 0 {
|
||||
if statErr != nil {
|
||||
cancelFn()
|
||||
r.Close()
|
||||
w.Close()
|
||||
return nil, nil, nil, fmt.Errorf("unable to resume %q stat error: %w", name, statErr)
|
||||
}
|
||||
p = newPipeWriterAtOffset(w, attrs.Size)
|
||||
partialFileName = fs.getTempObject(name)
|
||||
partialObj := bkt.Object(partialFileName)
|
||||
partialObj = partialObj.If(storage.Conditions{DoesNotExist: true})
|
||||
objectWriter = partialObj.NewWriter(ctx)
|
||||
} else {
|
||||
p = NewPipeWriter(w)
|
||||
objectWriter = obj.NewWriter(ctx)
|
||||
}
|
||||
|
||||
if fs.config.UploadPartSize > 0 {
|
||||
objectWriter.ChunkSize = int(fs.config.UploadPartSize) * 1024 * 1024
|
||||
}
|
||||
|
@ -218,6 +241,11 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
|
|||
if err == nil {
|
||||
err = closeErr
|
||||
}
|
||||
if err == nil && partialFileName != "" {
|
||||
partialObject := bkt.Object(partialFileName)
|
||||
partialObject = partialObject.If(storage.Conditions{GenerationMatch: objectWriter.Attrs().Generation})
|
||||
err = fs.composeObjects(ctx, obj, partialObject)
|
||||
}
|
||||
r.CloseWithError(err) //nolint:errcheck
|
||||
p.Done(err)
|
||||
fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %v, err: %+v",
|
||||
|
@ -225,23 +253,6 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
|
|||
metric.GCSTransferCompleted(n, 0, err)
|
||||
}()
|
||||
|
||||
if checks&CheckResume != 0 {
|
||||
readCh := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
err = fs.downloadToWriter(name, p)
|
||||
readCh <- err
|
||||
}()
|
||||
|
||||
err = <-readCh
|
||||
if err != nil {
|
||||
cancelFn()
|
||||
p.Close()
|
||||
fsLog(fs, logger.LevelDebug, "download before resume failed, writer closed and read cancelled")
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return nil, p, cancelFn, nil
|
||||
}
|
||||
|
||||
|
@ -290,6 +301,9 @@ func (fs *GCSFs) Remove(name string, isDir bool) error {
|
|||
err := obj.Delete(ctx)
|
||||
if isDir && fs.IsNotExist(err) {
|
||||
// we can have directories without a trailing "/" (created using v2.1.0 and before)
|
||||
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
err = fs.svc.Bucket(fs.config.Bucket).Object(strings.TrimSuffix(name, "/")).Delete(ctx)
|
||||
}
|
||||
metric.GCSDeleteObjectCompleted(err)
|
||||
|
@ -442,8 +456,8 @@ func (*GCSFs) IsUploadResumeSupported() bool {
|
|||
|
||||
// IsConditionalUploadResumeSupported returns if resuming uploads is supported
|
||||
// for the specified size
|
||||
func (*GCSFs) IsConditionalUploadResumeSupported(size int64) bool {
|
||||
return size <= resumeMaxSize
|
||||
func (*GCSFs) IsConditionalUploadResumeSupported(_ int64) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// IsAtomicUploadSupported returns true if atomic upload is supported.
|
||||
|
@ -777,22 +791,30 @@ func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, contentType string
|
|||
}
|
||||
}
|
||||
|
||||
func (fs *GCSFs) downloadToWriter(name string, w PipeWriter) error {
|
||||
fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
|
||||
func (fs *GCSFs) composeObjects(ctx context.Context, dst, partialObject *storage.ObjectHandle) error {
|
||||
fsLog(fs, logger.LevelDebug, "start object compose for partial file %q, destination %q",
|
||||
partialObject.ObjectName(), dst.ObjectName())
|
||||
composer := dst.ComposerFrom(dst, partialObject)
|
||||
if fs.config.StorageClass != "" {
|
||||
composer.StorageClass = fs.config.StorageClass
|
||||
}
|
||||
if fs.config.ACL != "" {
|
||||
composer.PredefinedACL = fs.config.ACL
|
||||
}
|
||||
contentType := mime.TypeByExtension(path.Ext(dst.ObjectName()))
|
||||
if contentType != "" {
|
||||
composer.ContentType = contentType
|
||||
}
|
||||
_, err := composer.Run(ctx)
|
||||
fsLog(fs, logger.LevelDebug, "object compose for %q finished, err: %v", dst.ObjectName(), err)
|
||||
|
||||
delCtx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
|
||||
defer cancelFn()
|
||||
|
||||
bkt := fs.svc.Bucket(fs.config.Bucket)
|
||||
obj := bkt.Object(name)
|
||||
objectReader, err := obj.NewRangeReader(ctx, 0, -1)
|
||||
if err != nil {
|
||||
fsLog(fs, logger.LevelDebug, "unable to start download before resuming upload, path %q, err: %v", name, err)
|
||||
return err
|
||||
}
|
||||
n, err := io.Copy(w, objectReader)
|
||||
fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
|
||||
name, n, err)
|
||||
metric.GCSTransferCompleted(n, 1, err)
|
||||
errDelete := partialObject.Delete(delCtx)
|
||||
metric.GCSDeleteObjectCompleted(errDelete)
|
||||
fsLog(fs, logger.LevelDebug, "deleted partial file %q after composing with %q, err: %v",
|
||||
partialObject.ObjectName(), dst.ObjectName(), errDelete)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -976,6 +998,12 @@ func (*GCSFs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
|
|||
return nil, ErrStorageSizeUnavailable
|
||||
}
|
||||
|
||||
func (*GCSFs) getTempObject(name string) string {
|
||||
dir := filepath.Dir(name)
|
||||
guid := xid.New().String()
|
||||
return filepath.Join(dir, ".sftpgo-partial."+guid+"."+filepath.Base(name))
|
||||
}
|
||||
|
||||
func (fs *GCSFs) getStorageID() string {
|
||||
return fmt.Sprintf("gs://%v", fs.config.Bucket)
|
||||
}
|
||||
|
|
|
@ -252,7 +252,12 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(),
|
|||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
p := NewPipeWriter(w)
|
||||
var p PipeWriter
|
||||
if checks&CheckResume != 0 {
|
||||
p = newPipeWriterAtOffset(w, 0)
|
||||
} else {
|
||||
p = NewPipeWriter(w)
|
||||
}
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
uploader := manager.NewUploader(fs.svc, func(u *manager.Uploader) {
|
||||
u.Concurrency = fs.config.UploadConcurrency
|
||||
|
@ -292,7 +297,10 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(),
|
|||
readCh := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
err = fs.downloadToWriter(name, p)
|
||||
n, err := fs.downloadToWriter(name, p)
|
||||
pw := p.(*pipeWriterAtOffset)
|
||||
pw.offset = 0
|
||||
pw.writeOffset = n
|
||||
readCh <- err
|
||||
}()
|
||||
|
||||
|
@ -1050,7 +1058,7 @@ func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
|
|||
return nil, ErrStorageSizeUnavailable
|
||||
}
|
||||
|
||||
func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) error {
|
||||
func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) (int64, error) {
|
||||
fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
|
||||
defer cancelFn()
|
||||
|
@ -1072,7 +1080,7 @@ func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) error {
|
|||
fsLog(fs, logger.LevelDebug, "download before resuming upload completed, path %q size: %d, err: %+v",
|
||||
name, n, err)
|
||||
metric.S3TransferCompleted(n, 1, err)
|
||||
return err
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (fs *S3Fs) getStorageID() string {
|
||||
|
|
|
@ -746,6 +746,37 @@ func (p *pipeWriter) Done(err error) {
|
|||
p.done <- true
|
||||
}
|
||||
|
||||
func newPipeWriterAtOffset(w *pipeat.PipeWriterAt, offset int64) PipeWriter {
|
||||
return &pipeWriterAtOffset{
|
||||
pipeWriter: &pipeWriter{
|
||||
PipeWriterAt: w,
|
||||
err: nil,
|
||||
done: make(chan bool),
|
||||
},
|
||||
offset: offset,
|
||||
writeOffset: offset,
|
||||
}
|
||||
}
|
||||
|
||||
type pipeWriterAtOffset struct {
|
||||
*pipeWriter
|
||||
offset int64
|
||||
writeOffset int64
|
||||
}
|
||||
|
||||
func (p *pipeWriterAtOffset) WriteAt(buf []byte, off int64) (int, error) {
|
||||
if off < p.offset {
|
||||
return 0, fmt.Errorf("invalid offset %d, minimum accepted %d", off, p.offset)
|
||||
}
|
||||
return p.pipeWriter.WriteAt(buf, off-p.offset)
|
||||
}
|
||||
|
||||
func (p *pipeWriterAtOffset) Write(buf []byte) (int, error) {
|
||||
n, err := p.WriteAt(buf, p.writeOffset)
|
||||
p.writeOffset += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// NewPipeReader initializes a new PipeReader
|
||||
func NewPipeReader(r *pipeat.PipeReaderAt) *PipeReader {
|
||||
return &PipeReader{
|
||||
|
|
Loading…
Reference in a new issue