
azure: implement multipart uploads using low level API

The high level wrapper seems to hang if there are network issues
Nicola Murino 4 years ago
parent
commit
9b49f63a97
3 changed files with 166 additions and 5 deletions
  1. common/transfer.go (+1 -1)
  2. vfs/azblobfs.go (+163 -2)
  3. vfs/vfs.go (+2 -2)
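
The change replaces the SDK's high level UploadStreamToBlockBlob helper with direct StageBlock/CommitBlockList calls. As a rough orientation before reading the diff, here is a minimal, purely sequential sketch of that low level flow. It is not the committed code: the real handleMultipartUpload below adds bounded concurrency and buffer reuse, scales the per-block timeout with the part size, and increments a binary block ID instead of the zero padded counter used here; the package and function names in the sketch are invented for illustration.

package azsketch // illustrative only, not part of the commit

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"time"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

// uploadSketch reads the stream in partSize chunks, stages each chunk as a
// block with its own deadline, then commits the accumulated block list.
func uploadSketch(ctx context.Context, r io.Reader, blockBlobURL azblob.BlockBlobURL,
	headers azblob.BlobHTTPHeaders, partSize int64) error {
	buf := make([]byte, partSize)
	var blocks []string

	for part := 0; ; part++ {
		n, err := io.ReadFull(r, buf)
		if n > 0 {
			// block IDs must be unique and, once base64 encoded, all of the same
			// length; a zero padded counter is enough for a sketch
			blockID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%016d", part)))
			blocks = append(blocks, blockID)

			// a deadline per block keeps a stalled network call from hanging forever
			blockCtx, cancelFn := context.WithDeadline(ctx, time.Now().Add(5*time.Minute))
			_, stageErr := blockBlobURL.StageBlock(blockCtx, blockID, bytes.NewReader(buf[:n]),
				azblob.LeaseAccessConditions{}, nil)
			cancelFn()
			if stageErr != nil {
				return stageErr
			}
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			break // stream fully consumed
		}
		if err != nil {
			return err
		}
	}

	// the blob becomes visible only once the block list is committed
	_, err := blockBlobURL.CommitBlockList(ctx, blocks, headers, azblob.Metadata{}, azblob.BlobAccessConditions{})
	return err
}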

common/transfer.go (+1 -1)

@@ -227,7 +227,7 @@ func (t *BaseTransfer) Close() error {
 		if err == nil {
 			fileSize = info.Size()
 		}
-		t.Connection.Log(logger.LevelDebug, "upload file size %v stat error %v", fileSize, err)
+		t.Connection.Log(logger.LevelDebug, "uploaded file size %v stat error: %v", fileSize, err)
 		t.updateQuota(numFiles, fileSize)
 		logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
 			t.Connection.ID, t.Connection.protocol)

vfs/azblobfs.go (+163 -2)

@@ -3,7 +3,9 @@
 package vfs
 
 import (
+	"bytes"
 	"context"
+	"encoding/base64"
 	"errors"
 	"fmt"
 	"io"
@@ -14,6 +16,7 @@ import (
 	"path"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/Azure/azure-storage-blob-go/azblob"
@@ -268,12 +271,16 @@ func (fs AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func
 	go func() {
 		defer cancelFn()
 
-		uploadOptions := azblob.UploadStreamToBlockBlobOptions{
+		/*uploadOptions := azblob.UploadStreamToBlockBlobOptions{
 			BufferSize:      int(fs.config.UploadPartSize),
 			BlobHTTPHeaders: headers,
 			MaxBuffers:      fs.config.UploadConcurrency,
 		}
-		_, err := azblob.UploadStreamToBlockBlob(ctx, r, blobBlockURL, uploadOptions)
+		// UploadStreamToBlockBlob seems to have issues if there is an error, for example
+		// if we shut down Azurite while uploading it hangs, so we use our own wrapper for
+		// the low level functions
+		_, err := azblob.UploadStreamToBlockBlob(ctx, r, blobBlockURL, uploadOptions)*/
+		err := fs.handleMultipartUpload(ctx, r, blobBlockURL, headers)
 		r.CloseWithError(err) //nolint:errcheck
 		p.Done(err)
 		fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, read bytes: %v, err: %v", name, r.GetReadedBytes(), err)
@@ -445,7 +452,11 @@ func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 			if blobPrefix.Name == "/" {
 				continue
 			}
+			// sometimes we have duplicate prefixes, maybe an Azurite bug
 			name := strings.TrimPrefix(blobPrefix.Name, prefix)
+			if _, ok := prefixes[strings.TrimSuffix(name, "/")]; ok {
+				continue
+			}
 			result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
 			prefixes[strings.TrimSuffix(name, "/")] = true
 		}
@@ -463,6 +474,7 @@ func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 					if _, ok := prefixes[name]; ok {
 						continue
 					}
+					prefixes[name] = true
 				}
 			}
 			result = append(result, NewFileInfo(name, isDir, size, blobInfo.Properties.LastModified, false))
@@ -722,3 +734,152 @@ func (fs *AzureBlobFs) getPrefixForStat(name string) string {
 	}
 	return prefix
 }
+
+func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader, blockBlobURL azblob.BlockBlobURL,
+	httpHeaders azblob.BlobHTTPHeaders) error {
+	partSize := fs.config.UploadPartSize
+	guard := make(chan struct{}, fs.config.UploadConcurrency)
+	blockCtxTimeout := time.Duration(fs.config.UploadPartSize/(1024*1024)) * time.Minute
+
+	// sync.Pool seems to use a lot of memory, so prefer our own, very simple allocator:
+	// we only need to recycle a few byte slices
+	pool := newBufferAllocator(int(partSize))
+	finished := false
+	binaryBlockID := make([]byte, 8)
+	var blocks []string
+	var wg sync.WaitGroup
+	var errOnce sync.Once
+	var poolError error
+
+	poolCtx, poolCancel := context.WithCancel(ctx)
+	defer poolCancel()
+
+	for part := 0; !finished; part++ {
+		buf := pool.getBuffer()
+
+		n, err := fs.readFill(reader, buf)
+		if err == io.EOF {
+			// read finished, if n > 0 we need to process the last data chunk
+			if n == 0 {
+				pool.releaseBuffer(buf)
+				break
+			}
+			finished = true
+		} else if err != nil {
+			pool.releaseBuffer(buf)
+			return err
+		}
+
+		fs.incrementBlockID(binaryBlockID)
+		blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
+		blocks = append(blocks, blockID)
+
+		guard <- struct{}{}
+		if poolError != nil {
+			fsLog(fs, logger.LevelDebug, "pool error, upload for part %v not started", part)
+			pool.releaseBuffer(buf)
+			break
+		}
+
+		wg.Add(1)
+		go func(blockID string, buf []byte, bufSize int) {
+			defer wg.Done()
+			bufferReader := bytes.NewReader(buf[:bufSize])
+			innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout))
+			defer cancelFn()
+
+			_, err := blockBlobURL.StageBlock(innerCtx, blockID, bufferReader, azblob.LeaseAccessConditions{}, nil)
+			pool.releaseBuffer(buf)
+			if err != nil {
+				errOnce.Do(func() {
+					poolError = err
+					fsLog(fs, logger.LevelDebug, "multipart upload error: %v", poolError)
+					poolCancel()
+				})
+			}
+			<-guard
+		}(blockID, buf, n)
+	}
+
+	wg.Wait()
+	close(guard)
+	pool.free()
+
+	if poolError != nil {
+		return poolError
+	}
+
+	_, err := blockBlobURL.CommitBlockList(ctx, blocks, httpHeaders, azblob.Metadata{}, azblob.BlobAccessConditions{})
+	return err
+}
+
+// copied from rclone
+func (fs *AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) {
+	var nn int
+	for n < len(buf) && err == nil {
+		nn, err = r.Read(buf[n:])
+		n += nn
+	}
+	return n, err
+}
+
+// copied from rclone
+func (fs *AzureBlobFs) incrementBlockID(blockID []byte) {
+	for i, digit := range blockID {
+		newDigit := digit + 1
+		blockID[i] = newDigit
+		if newDigit >= digit {
+			// exit if no carry
+			break
+		}
+	}
+}
+
+type bufferAllocator struct {
+	sync.Mutex
+	available  [][]byte
+	bufferSize int
+}
+
+func newBufferAllocator(size int) *bufferAllocator {
+	return &bufferAllocator{
+		bufferSize: size,
+	}
+}
+
+func (b *bufferAllocator) getBuffer() []byte {
+	b.Lock()
+	defer b.Unlock()
+
+	if len(b.available) > 0 {
+		var result []byte
+
+		truncLength := len(b.available) - 1
+		result = b.available[truncLength]
+
+		b.available[truncLength] = nil
+		b.available = b.available[:truncLength]
+
+		return result
+	}
+
+	return make([]byte, b.bufferSize)
+}
+
+func (b *bufferAllocator) releaseBuffer(buf []byte) {
+	b.Lock()
+	defer b.Unlock()
+
+	if len(buf) != b.bufferSize {
+		return
+	}
+
+	b.available = append(b.available, buf)
+}
+
+func (b *bufferAllocator) free() {
+	b.Lock()
+	defer b.Unlock()
+
+	b.available = nil
+}

vfs/vfs.go (+2 -2)

@@ -244,7 +244,7 @@ func ValidateS3FsConfig(config *S3FsConfig) error {
 	if config.UploadPartSize != 0 && (config.UploadPartSize < 5 || config.UploadPartSize > 5000) {
 		return errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
 	}
-	if config.UploadConcurrency < 0 {
+	if config.UploadConcurrency < 0 || config.UploadConcurrency > 64 {
 		return fmt.Errorf("invalid upload concurrency: %v", config.UploadConcurrency)
 	}
 	return nil
@@ -300,7 +300,7 @@ func ValidateAzBlobFsConfig(config *AzBlobFsConfig) error {
 	if config.UploadPartSize < 0 || config.UploadPartSize > 100 {
 		return fmt.Errorf("invalid upload part size: %v", config.UploadPartSize)
 	}
-	if config.UploadConcurrency < 0 {
+	if config.UploadConcurrency < 0 || config.UploadConcurrency > 64 {
 		return fmt.Errorf("invalid upload concurrency: %v", config.UploadConcurrency)
 	}
 	return nil
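
For reference, a hypothetical snippet that satisfies the tightened size and concurrency checks above. The package, struct and function names come from this diff; the module path is assumed to be github.com/drakkan/sftpgo, and a real config would also carry the container and credential fields that these validators check and which are omitted here.

package configsketch // illustrative only

import "github.com/drakkan/sftpgo/vfs" // module path assumed

func exampleAzBlobLimits() error {
	cfg := vfs.AzBlobFsConfig{
		// container and credential fields omitted for brevity
		UploadPartSize:    5, // MB per staged block, must stay within 0..100
		UploadConcurrency: 4, // parallel block uploads, now capped at 64
	}
	return vfs.ValidateAzBlobFsConfig(&cfg)
}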