diff --git a/common/transfer.go b/common/transfer.go index 1b172cc4..80bd6ac3 100644 --- a/common/transfer.go +++ b/common/transfer.go @@ -227,7 +227,7 @@ func (t *BaseTransfer) Close() error { if err == nil { fileSize = info.Size() } - t.Connection.Log(logger.LevelDebug, "upload file size %v stat error %v", fileSize, err) + t.Connection.Log(logger.LevelDebug, "uploaded file size %v stat error: %v", fileSize, err) t.updateQuota(numFiles, fileSize) logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username, t.Connection.ID, t.Connection.protocol) diff --git a/vfs/azblobfs.go b/vfs/azblobfs.go index 4828602a..14de0c93 100644 --- a/vfs/azblobfs.go +++ b/vfs/azblobfs.go @@ -3,7 +3,9 @@ package vfs import ( + "bytes" "context" + "encoding/base64" "errors" "fmt" "io" @@ -14,6 +16,7 @@ import ( "path" "path/filepath" "strings" + "sync" "time" "github.com/Azure/azure-storage-blob-go/azblob" @@ -268,12 +271,16 @@ func (fs AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func go func() { defer cancelFn() - uploadOptions := azblob.UploadStreamToBlockBlobOptions{ + /*uploadOptions := azblob.UploadStreamToBlockBlobOptions{ BufferSize: int(fs.config.UploadPartSize), BlobHTTPHeaders: headers, MaxBuffers: fs.config.UploadConcurrency, } - _, err := azblob.UploadStreamToBlockBlob(ctx, r, blobBlockURL, uploadOptions) + // UploadStreamToBlockBlob seems to have issues if there is an error, for example + // if we shutdown Azurite while uploading it hangs, so we use our own wrapper for + // the low level functions + _, err := azblob.UploadStreamToBlockBlob(ctx, r, blobBlockURL, uploadOptions)*/ + err := fs.handleMultipartUpload(ctx, r, blobBlockURL, headers) r.CloseWithError(err) //nolint:errcheck p.Done(err) fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %v", name, r.GetReadedBytes(), err) @@ -445,7 +452,11 @@ func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) { if blobPrefix.Name == "/" { continue } + // sometime we have duplicate prefixes, maybe an Azurite bug name := strings.TrimPrefix(blobPrefix.Name, prefix) + if _, ok := prefixes[strings.TrimSuffix(name, "/")]; ok { + continue + } result = append(result, NewFileInfo(name, true, 0, time.Now(), false)) prefixes[strings.TrimSuffix(name, "/")] = true } @@ -463,6 +474,7 @@ func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) { if _, ok := prefixes[name]; ok { continue } + prefixes[name] = true } } result = append(result, NewFileInfo(name, isDir, size, blobInfo.Properties.LastModified, false)) @@ -722,3 +734,152 @@ func (fs *AzureBlobFs) getPrefixForStat(name string) string { } return prefix } + +func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader, blockBlobURL azblob.BlockBlobURL, + httpHeaders azblob.BlobHTTPHeaders) error { + partSize := fs.config.UploadPartSize + guard := make(chan struct{}, fs.config.UploadConcurrency) + blockCtxTimeout := time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute + + // sync.Pool seems to use a lot of memory so prefer our own, very simple, allocator + // we only need to recycle few byte slices + pool := newBufferAllocator(int(partSize)) + finished := false + binaryBlockID := make([]byte, 8) + var blocks []string + var wg sync.WaitGroup + var errOnce sync.Once + var poolError error + + poolCtx, poolCancel := context.WithCancel(ctx) + defer poolCancel() + + for part := 0; !finished; part++ { + buf := pool.getBuffer() + + n, err := fs.readFill(reader, buf) + if err == io.EOF { + // read finished, if n > 0 we need to process the last data chunck + if n == 0 { + pool.releaseBuffer(buf) + break + } + finished = true + } else if err != nil { + pool.releaseBuffer(buf) + return err + } + + fs.incrementBlockID(binaryBlockID) + blockID := base64.StdEncoding.EncodeToString(binaryBlockID) + blocks = append(blocks, blockID) + + guard <- struct{}{} + if poolError != nil { + fsLog(fs, logger.LevelDebug, "pool error, upload for part %v not started", part) + pool.releaseBuffer(buf) + break + } + + wg.Add(1) + go func(blockID string, buf []byte, bufSize int) { + defer wg.Done() + bufferReader := bytes.NewReader(buf[:bufSize]) + innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout)) + defer cancelFn() + + _, err := blockBlobURL.StageBlock(innerCtx, blockID, bufferReader, azblob.LeaseAccessConditions{}, nil) + pool.releaseBuffer(buf) + if err != nil { + errOnce.Do(func() { + poolError = err + fsLog(fs, logger.LevelDebug, "multipart upload error: %v", poolError) + poolCancel() + }) + } + <-guard + }(blockID, buf, n) + } + + wg.Wait() + close(guard) + pool.free() + + if poolError != nil { + return poolError + } + + _, err := blockBlobURL.CommitBlockList(ctx, blocks, httpHeaders, azblob.Metadata{}, azblob.BlobAccessConditions{}) + return err +} + +// copied from rclone +func (fs *AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) { + var nn int + for n < len(buf) && err == nil { + nn, err = r.Read(buf[n:]) + n += nn + } + return n, err +} + +// copied from rclone +func (fs *AzureBlobFs) incrementBlockID(blockID []byte) { + for i, digit := range blockID { + newDigit := digit + 1 + blockID[i] = newDigit + if newDigit >= digit { + // exit if no carry + break + } + } +} + +type bufferAllocator struct { + sync.Mutex + available [][]byte + bufferSize int +} + +func newBufferAllocator(size int) *bufferAllocator { + return &bufferAllocator{ + bufferSize: size, + } +} + +func (b *bufferAllocator) getBuffer() []byte { + b.Lock() + defer b.Unlock() + + if len(b.available) > 0 { + var result []byte + + truncLength := len(b.available) - 1 + result = b.available[truncLength] + + b.available[truncLength] = nil + b.available = b.available[:truncLength] + + return result + } + + return make([]byte, b.bufferSize) +} + +func (b *bufferAllocator) releaseBuffer(buf []byte) { + b.Lock() + defer b.Unlock() + + if len(buf) != b.bufferSize { + return + } + + b.available = append(b.available, buf) +} + +func (b *bufferAllocator) free() { + b.Lock() + defer b.Unlock() + + b.available = nil +} diff --git a/vfs/vfs.go b/vfs/vfs.go index 9355cd75..3dc8192c 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -244,7 +244,7 @@ func ValidateS3FsConfig(config *S3FsConfig) error { if config.UploadPartSize != 0 && (config.UploadPartSize < 5 || config.UploadPartSize > 5000) { return errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)") } - if config.UploadConcurrency < 0 { + if config.UploadConcurrency < 0 || config.UploadConcurrency > 64 { return fmt.Errorf("invalid upload concurrency: %v", config.UploadConcurrency) } return nil @@ -300,7 +300,7 @@ func ValidateAzBlobFsConfig(config *AzBlobFsConfig) error { if config.UploadPartSize < 0 || config.UploadPartSize > 100 { return fmt.Errorf("invalid upload part size: %v", config.UploadPartSize) } - if config.UploadConcurrency < 0 { + if config.UploadConcurrency < 0 || config.UploadConcurrency > 64 { return fmt.Errorf("invalid upload concurrency: %v", config.UploadConcurrency) } return nil