diff --git a/cmd/portable.go b/cmd/portable.go index d61c7a56..228f664b 100644 --- a/cmd/portable.go +++ b/cmd/portable.go @@ -69,6 +69,8 @@ var ( portableAzKeyPrefix string portableAzULPartSize int portableAzULConcurrency int + portableAzDLPartSize int + portableAzDLConcurrency int portableAzUseEmulator bool portableCryptPassphrase string portableSFTPEndpoint string @@ -191,14 +193,16 @@ Please take a look at the usage below to customize the serving parameters`, }, AzBlobConfig: vfs.AzBlobFsConfig{ BaseAzBlobFsConfig: sdk.BaseAzBlobFsConfig{ - Container: portableAzContainer, - AccountName: portableAzAccountName, - Endpoint: portableAzEndpoint, - AccessTier: portableAzAccessTier, - KeyPrefix: portableAzKeyPrefix, - UseEmulator: portableAzUseEmulator, - UploadPartSize: int64(portableAzULPartSize), - UploadConcurrency: portableAzULConcurrency, + Container: portableAzContainer, + AccountName: portableAzAccountName, + Endpoint: portableAzEndpoint, + AccessTier: portableAzAccessTier, + KeyPrefix: portableAzKeyPrefix, + UseEmulator: portableAzUseEmulator, + UploadPartSize: int64(portableAzULPartSize), + UploadConcurrency: portableAzULConcurrency, + DownloadPartSize: int64(portableAzDLPartSize), + DownloadConcurrency: portableAzDLConcurrency, }, AccountKey: kms.NewPlainSecret(portableAzAccountKey), SASURL: kms.NewPlainSecret(portableAzSASURL), @@ -328,9 +332,13 @@ container setting`) portableCmd.Flags().StringVar(&portableAzKeyPrefix, "az-key-prefix", "", `Allows to restrict access to the virtual folder identified by this prefix and its contents`) - portableCmd.Flags().IntVar(&portableAzULPartSize, "az-upload-part-size", 4, `The buffer size for multipart uploads + portableCmd.Flags().IntVar(&portableAzULPartSize, "az-upload-part-size", 5, `The buffer size for multipart uploads (MB)`) - portableCmd.Flags().IntVar(&portableAzULConcurrency, "az-upload-concurrency", 2, `How many parts are uploaded in + portableCmd.Flags().IntVar(&portableAzULConcurrency, 
"az-upload-concurrency", 5, `How many parts are uploaded in +parallel`) + portableCmd.Flags().IntVar(&portableAzDLPartSize, "az-download-part-size", 5, `The buffer size for multipart downloads +(MB)`) + portableCmd.Flags().IntVar(&portableAzDLConcurrency, "az-download-concurrency", 5, `How many parts are downloaded in parallel`) portableCmd.Flags().BoolVar(&portableAzUseEmulator, "az-use-emulator", false, "") portableCmd.Flags().StringVar(&portableCryptPassphrase, "crypto-passphrase", "", `Passphrase for encryption/decryption`) diff --git a/docs/portable-mode.md b/docs/portable-mode.md index 007c4135..88a3bcb6 100644 --- a/docs/portable-mode.md +++ b/docs/portable-mode.md @@ -30,6 +30,10 @@ Flags: --az-account-key string --az-account-name string --az-container string + --az-download-concurrency int How many parts are downloaded in + parallel (default 5) + --az-download-part-size int The buffer size for multipart downloads + (MB) (default 5) --az-endpoint string Leave empty to use the default: "blob.core.windows.net" --az-key-prefix string Allows to restrict access to the @@ -37,9 +41,9 @@ Flags: prefix and its contents --az-sas-url string Shared access signature URL --az-upload-concurrency int How many parts are uploaded in - parallel (default 2) + parallel (default 5) --az-upload-part-size int The buffer size for multipart uploads - (MB) (default 4) + (MB) (default 5) --az-use-emulator --crypto-passphrase string Passphrase for encryption/decryption --denied-patterns stringArray Denied file patterns case insensitive. 
diff --git a/go.mod b/go.mod index 7a1e59a8..a61a1e08 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/rs/cors v1.8.2 github.com/rs/xid v1.3.0 github.com/rs/zerolog v1.26.2-0.20220203140311-fc26014bd4e1 - github.com/sftpgo/sdk v0.1.1-0.20220219112139-4616f3c10321 + github.com/sftpgo/sdk v0.1.1-0.20220221175917-da8bdf77ce76 github.com/shirou/gopsutil/v3 v3.22.1 github.com/spf13/afero v1.8.1 github.com/spf13/cobra v1.3.0 diff --git a/go.sum b/go.sum index 38c2dce1..dc3e81d3 100644 --- a/go.sum +++ b/go.sum @@ -696,8 +696,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4 h1:PT+ElG/UUFMfqy5HrxJxNzj3QBOf7dZwupeVC+mG1Lo= github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4/go.mod h1:MnkX001NG75g3p8bhFycnyIjeQoOjGL6CEIsdE/nKSY= -github.com/sftpgo/sdk v0.1.1-0.20220219112139-4616f3c10321 h1:woOiGu0/qrh2nzCQLlX7k3VK2s+kB+wIGaS/jh40/9o= -github.com/sftpgo/sdk v0.1.1-0.20220219112139-4616f3c10321/go.mod h1:zqCRMcwS28IViwekJHNkFu4GqSfyVmOQTlh8h3icAXE= +github.com/sftpgo/sdk v0.1.1-0.20220221175917-da8bdf77ce76 h1:6mLGNio6XJaweaKvVmUHLanDznABa2F2PEbS16fWnxg= +github.com/sftpgo/sdk v0.1.1-0.20220221175917-da8bdf77ce76/go.mod h1:zqCRMcwS28IViwekJHNkFu4GqSfyVmOQTlh8h3icAXE= github.com/shirou/gopsutil/v3 v3.22.1 h1:33y31Q8J32+KstqPfscvFwBlNJ6xLaBy4xqBXzlYV5w= github.com/shirou/gopsutil/v3 v3.22.1/go.mod h1:WapW1AOOPlHyXr+yOyw3uYx36enocrtSoSBy0L5vUHY= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= diff --git a/httpd/httpd_test.go b/httpd/httpd_test.go index ee51c071..d2898b78 100644 --- a/httpd/httpd_test.go +++ b/httpd/httpd_test.go @@ -2749,6 +2749,7 @@ func TestUserAzureBlobConfig(t *testing.T) { user.FsConfig.AzBlobConfig.AccountKey = kms.NewPlainSecret("Server-Account-Key") 
user.FsConfig.AzBlobConfig.Endpoint = "http://127.0.0.1:9000" user.FsConfig.AzBlobConfig.UploadPartSize = 8 + user.FsConfig.AzBlobConfig.DownloadPartSize = 6 user, _, err = httpdtest.UpdateUser(user, http.StatusOK, "") assert.NoError(t, err) initialPayload := user.FsConfig.AzBlobConfig.AccountKey.GetPayload() @@ -2788,6 +2789,7 @@ func TestUserAzureBlobConfig(t *testing.T) { user.FsConfig.AzBlobConfig.Endpoint = "http://localhost:9001" user.FsConfig.AzBlobConfig.KeyPrefix = "somedir/subdir" user.FsConfig.AzBlobConfig.UploadConcurrency = 5 + user.FsConfig.AzBlobConfig.DownloadConcurrency = 4 user, _, err = httpdtest.UpdateUser(user, http.StatusOK, "") assert.NoError(t, err) assert.Equal(t, sdkkms.SecretStatusSecretBox, user.FsConfig.AzBlobConfig.AccountKey.GetStatus()) @@ -2803,6 +2805,8 @@ func TestUserAzureBlobConfig(t *testing.T) { user.FsConfig.AzBlobConfig.AccountKey = kms.NewEmptySecret() user.FsConfig.AzBlobConfig.UploadPartSize = 6 user.FsConfig.AzBlobConfig.UploadConcurrency = 4 + user.FsConfig.AzBlobConfig.DownloadPartSize = 3 + user.FsConfig.AzBlobConfig.DownloadConcurrency = 5 user, _, err = httpdtest.UpdateUser(user, http.StatusOK, "") assert.NoError(t, err) assert.Nil(t, user.FsConfig.AzBlobConfig.AccountKey) @@ -15417,6 +15421,8 @@ func TestWebUserAzureBlobMock(t *testing.T) { user.FsConfig.AzBlobConfig.KeyPrefix = "somedir/subdir/" user.FsConfig.AzBlobConfig.UploadPartSize = 5 user.FsConfig.AzBlobConfig.UploadConcurrency = 4 + user.FsConfig.AzBlobConfig.DownloadPartSize = 3 + user.FsConfig.AzBlobConfig.DownloadConcurrency = 6 user.FsConfig.AzBlobConfig.UseEmulator = true form := make(url.Values) form.Set(csrfFormToken, csrfToken) @@ -15469,8 +15475,26 @@ func TestWebUserAzureBlobMock(t *testing.T) { req.Header.Set("Content-Type", contentType) rr = executeRequest(req) checkResponseCode(t, http.StatusOK, rr) - // now add the user + // test invalid az_download_part_size form.Set("az_upload_concurrency", 
strconv.Itoa(user.FsConfig.AzBlobConfig.UploadConcurrency)) + form.Set("az_download_part_size", "a") + b, contentType, _ = getMultipartFormData(form, "", "") + req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b) + setJWTCookieForReq(req, webToken) + req.Header.Set("Content-Type", contentType) + rr = executeRequest(req) + checkResponseCode(t, http.StatusOK, rr) + // test invalid az_download_concurrency + form.Set("az_download_part_size", strconv.FormatInt(user.FsConfig.AzBlobConfig.DownloadPartSize, 10)) + form.Set("az_download_concurrency", "a") + b, contentType, _ = getMultipartFormData(form, "", "") + req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b) + setJWTCookieForReq(req, webToken) + req.Header.Set("Content-Type", contentType) + rr = executeRequest(req) + checkResponseCode(t, http.StatusOK, rr) + // now add the user + form.Set("az_download_concurrency", strconv.Itoa(user.FsConfig.AzBlobConfig.DownloadConcurrency)) b, contentType, _ = getMultipartFormData(form, "", "") req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b) setJWTCookieForReq(req, webToken) @@ -15491,6 +15515,8 @@ func TestWebUserAzureBlobMock(t *testing.T) { assert.Equal(t, updateUser.FsConfig.AzBlobConfig.KeyPrefix, user.FsConfig.AzBlobConfig.KeyPrefix) assert.Equal(t, updateUser.FsConfig.AzBlobConfig.UploadPartSize, user.FsConfig.AzBlobConfig.UploadPartSize) assert.Equal(t, updateUser.FsConfig.AzBlobConfig.UploadConcurrency, user.FsConfig.AzBlobConfig.UploadConcurrency) + assert.Equal(t, updateUser.FsConfig.AzBlobConfig.DownloadPartSize, user.FsConfig.AzBlobConfig.DownloadPartSize) + assert.Equal(t, updateUser.FsConfig.AzBlobConfig.DownloadConcurrency, user.FsConfig.AzBlobConfig.DownloadConcurrency) assert.Equal(t, 2, len(updateUser.Filters.FilePatterns)) assert.Equal(t, sdkkms.SecretStatusSecretBox, updateUser.FsConfig.AzBlobConfig.AccountKey.GetStatus()) assert.NotEmpty(t, 
updateUser.FsConfig.AzBlobConfig.AccountKey.GetPayload()) diff --git a/httpd/webadmin.go b/httpd/webadmin.go index 7e885553..cd6d1150 100644 --- a/httpd/webadmin.go +++ b/httpd/webadmin.go @@ -1058,6 +1058,14 @@ func getAzureConfig(r *http.Request) (vfs.AzBlobFsConfig, error) { return config, err } config.UploadConcurrency, err = strconv.Atoi(r.Form.Get("az_upload_concurrency")) + if err != nil { + return config, err + } + config.DownloadPartSize, err = strconv.ParseInt(r.Form.Get("az_download_part_size"), 10, 64) + if err != nil { + return config, err + } + config.DownloadConcurrency, err = strconv.Atoi(r.Form.Get("az_download_concurrency")) return config, err } diff --git a/httpdtest/httpdtest.go b/httpdtest/httpdtest.go index d55dcd6d..949122fe 100644 --- a/httpdtest/httpdtest.go +++ b/httpdtest/httpdtest.go @@ -1392,6 +1392,12 @@ func compareAzBlobConfig(expected *vfs.Filesystem, actual *vfs.Filesystem) error if expected.AzBlobConfig.UploadConcurrency != actual.AzBlobConfig.UploadConcurrency { return errors.New("azure Blob upload concurrency mismatch") } + if expected.AzBlobConfig.DownloadPartSize != actual.AzBlobConfig.DownloadPartSize { + return errors.New("azure Blob download part size mismatch") + } + if expected.AzBlobConfig.DownloadConcurrency != actual.AzBlobConfig.DownloadConcurrency { + return errors.New("azure Blob download concurrency mismatch") + } if expected.AzBlobConfig.KeyPrefix != actual.AzBlobConfig.KeyPrefix && expected.AzBlobConfig.KeyPrefix+"/" != actual.AzBlobConfig.KeyPrefix { return errors.New("azure Blob key prefix mismatch") diff --git a/openapi/openapi.yaml b/openapi/openapi.yaml index 7eb89a8b..116a9f89 100644 --- a/openapi/openapi.yaml +++ b/openapi/openapi.yaml @@ -4776,10 +4776,16 @@ components: description: 'optional endpoint. Default is "blob.core.windows.net". 
If you use the emulator the endpoint must include the protocol, for example "http://127.0.0.1:10000"' upload_part_size: type: integer - description: 'the buffer size (in MB) to use for multipart uploads. If this value is set to zero, the default value (4MB) will be used.' + description: 'the buffer size (in MB) to use for multipart uploads. If this value is set to zero, the default value (5MB) will be used.' upload_concurrency: type: integer - description: 'the number of parts to upload in parallel. If this value is set to zero, the default value (2) will be used' + description: 'the number of parts to upload in parallel. If this value is set to zero, the default value (5) will be used' + download_part_size: + type: integer + description: 'the buffer size (in MB) to use for multipart downloads. If this value is set to zero, the default value (5MB) will be used.' + download_concurrency: + type: integer + description: 'the number of parts to download in parallel. If this value is set to zero, the default value (5) will be used' access_tier: type: string enum: @@ -5688,7 +5694,7 @@ components: type: array items: type: string - description: 'Features for the current build. Available features are "portable", "bolt", "mysql", "sqlite", "pgsql", "s3", "gcs", "metrics". If a feature is available it has a "+" prefix, otherwise a "-" prefix' + description: 'Features for the current build. Available features are `portable`, `bolt`, `mysql`, `sqlite`, `pgsql`, `s3`, `gcs`, `metrics`. If a feature is available it has a `+` prefix, otherwise a `-` prefix' Token: type: object properties: diff --git a/templates/webadmin/fsconfig.html b/templates/webadmin/fsconfig.html index 58729bb3..f1697148 100644 --- a/templates/webadmin/fsconfig.html +++ b/templates/webadmin/fsconfig.html @@ -279,12 +279,12 @@
- +
- - - The buffer size for multipart uploads. Zero means the default (4 MB) + + + The buffer size for multipart uploads. Zero means the default (5 MB)
@@ -292,9 +292,30 @@
- - How many parts are uploaded in parallel. Zero means the default (2) + aria-describedby="AzULConcurrencyHelpBlock"> + + How many parts are uploaded in parallel. Zero means the default (5) + +
+
+ +
+ +
+ + + The buffer size for multipart downloads. Zero means the default (5 MB) + +
+
+ +
+ + + How many parts are downloaded in parallel. Zero means the default (5)
diff --git a/vfs/azblobfs.go b/vfs/azblobfs.go index 6f8c17f5..0a59c075 100644 --- a/vfs/azblobfs.go +++ b/vfs/azblobfs.go @@ -33,7 +33,6 @@ import ( const ( azureDefaultEndpoint = "blob.core.windows.net" - maxResultsPerPage = 1000 ) // AzureBlobFs is a Fs implementation for Azure Blob storage. @@ -193,37 +192,16 @@ func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReader return nil, nil, nil, err } ctx, cancelFn := context.WithCancel(context.Background()) blockBlob := fs.containerClient.NewBlockBlobClient(name) - blobDownloadResponse, err := blockBlob.Download(ctx, &azblob.DownloadBlobOptions{ - Offset: &offset, - }) - if err != nil { - r.Close() - w.Close() - cancelFn() - return nil, nil, nil, err - } - body := blobDownloadResponse.Body(&azblob.RetryReaderOptions{ - MaxRetryRequests: 2, - }) go func() { defer cancelFn() - defer body.Close() - /*err := blockBlob.DownloadBlobToWriterAt(ctx, offset, 0, w, azblob.HighLevelDownloadFromBlobOptions{ - // add download part size and concurrency - BlockSize: fs.config.UploadPartSize, - Parallelism: uint16(fs.config.UploadConcurrency), - RetryReaderOptionsPerBlock: azblob.RetryReaderOptions{ - MaxRetryRequests: 2, - }, - })*/ - n, err := io.Copy(w, body) + err := fs.handleMultipartDownload(ctx, blockBlob, offset, w) w.CloseWithError(err) //nolint:errcheck - fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err) - metric.AZTransferCompleted(n, 1, err) + fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, w.GetWrittenBytes(), err) + metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err) }() return nil, r, cancelFn, nil @@ -253,15 +230,6 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(), go func() { defer cancelFn() - /*uploadOptions := azblob.UploadStreamToBlockBlobOptions{ - BufferSize: int(fs.config.UploadPartSize), - MaxBuffers: fs.config.UploadConcurrency, - HTTPHeaders: &headers, - } - if
fs.config.AccessTier != "" { - uploadOptions.AccessTier = (*azblob.AccessTier)(&fs.config.AccessTier) - } - _, err := blockBlob.UploadStreamToBlockBlob(ctx, r, uploadOptions)*/ err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers) r.CloseWithError(err) //nolint:errcheck p.Done(err) @@ -449,11 +417,11 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) { } prefixes := make(map[string]bool) - maxResults := int32(maxResultsPerPage) + timeout := int32(fs.ctxTimeout / time.Second) pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobHierarchySegmentOptions{ - Include: []azblob.ListBlobsIncludeItem{}, - Prefix: &prefix, - Maxresults: &maxResults, + Include: []azblob.ListBlobsIncludeItem{}, + Prefix: &prefix, + Timeout: &timeout, }) hasNext := true @@ -574,10 +542,10 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) { numFiles := 0 size := int64(0) - maxResults := int32(maxResultsPerPage) + timeout := int32(fs.ctxTimeout / time.Second) pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{ - Prefix: &fs.config.KeyPrefix, - Maxresults: &maxResults, + Prefix: &fs.config.KeyPrefix, + Timeout: &timeout, }) hasNext := true @@ -615,11 +583,11 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e prefix = strings.TrimPrefix(fsPrefix, "/") } - maxResults := int32(maxResultsPerPage) + timeout := int32(fs.ctxTimeout / time.Second) pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobHierarchySegmentOptions{ - Include: []azblob.ListBlobsIncludeItem{}, - Prefix: &prefix, - Maxresults: &maxResults, + Include: []azblob.ListBlobsIncludeItem{}, + Prefix: &prefix, + Timeout: &timeout, }) hasNext := true @@ -700,10 +668,10 @@ func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error { } } - maxResults := int32(maxResultsPerPage) + timeout := int32(fs.ctxTimeout / time.Second) pager := 
fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{ - Prefix: &prefix, - Maxresults: &maxResults, + Prefix: &prefix, + Timeout: &timeout, }) hasNext := true @@ -826,11 +794,22 @@ func (fs *AzureBlobFs) setConfigDefaults() { fs.config.Endpoint = azureDefaultEndpoint } if fs.config.UploadPartSize == 0 { - fs.config.UploadPartSize = 4 + fs.config.UploadPartSize = 5 + } + if fs.config.UploadPartSize < 1024*1024 { + fs.config.UploadPartSize *= 1024 * 1024 } - fs.config.UploadPartSize *= 1024 * 1024 if fs.config.UploadConcurrency == 0 { - fs.config.UploadConcurrency = 2 + fs.config.UploadConcurrency = 5 + } + if fs.config.DownloadPartSize == 0 { + fs.config.DownloadPartSize = 5 + } + if fs.config.DownloadPartSize < 1024*1024 { + fs.config.DownloadPartSize *= 1024 * 1024 + } + if fs.config.DownloadConcurrency == 0 { + fs.config.DownloadConcurrency = 5 } } @@ -854,20 +833,19 @@ func (fs *AzureBlobFs) hasContents(name string) (bool, error) { } maxResults := int32(1) + timeout := int32(fs.ctxTimeout / time.Second) pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{ Maxresults: &maxResults, Prefix: &prefix, + Timeout: &timeout, }) ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() - for pager.NextPage(ctx) { + if pager.NextPage(ctx) { resp := pager.PageResponse() result = len(resp.ContainerListBlobFlatSegmentResult.Segment.BlobItems) > 0 - if result { - break - } } err := pager.Err() @@ -875,6 +853,111 @@ func (fs *AzureBlobFs) hasContents(name string) (bool, error) { return result, err } +func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob azblob.BlockBlobClient, buf []byte, + w io.WriterAt, offset, count, writeOffset int64, +) error { + if count == 0 { + return nil + } + resp, err := blockBlob.Download(ctx, &azblob.DownloadBlobOptions{ + Offset: &offset, + Count: &count, + }) + if err != nil { + return err + } + body := 
resp.Body(&azblob.RetryReaderOptions{MaxRetryRequests: 2}) + defer body.Close() + + _, err = io.ReadAtLeast(body, buf, int(count)) + if err != nil { + return err + } + + _, err = fs.writeAtFull(w, buf, writeOffset, int(count)) + return err +} + +func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob azblob.BlockBlobClient, + offset int64, writer io.WriterAt, +) error { + props, err := blockBlob.GetProperties(ctx, &azblob.GetBlobPropertiesOptions{ + BlobAccessConditions: &azblob.BlobAccessConditions{}, + }) + if err != nil { + fsLog(fs, logger.LevelError, "unable to get blob properties, download aborted: %+v", err) + return err + } + contentLength := util.GetIntFromPointer(props.ContentLength) + sizeToDownload := contentLength - offset + if sizeToDownload < 0 { + fsLog(fs, logger.LevelError, "invalid multipart download size or offset, size: %v, offset: %v, size to download: %v", + contentLength, offset, sizeToDownload) + return errors.New("the requested offset exceeds the file size") + } + if sizeToDownload == 0 { + fsLog(fs, logger.LevelDebug, "nothing to download, offset %v, content length %v", offset, contentLength) + return nil + } + partSize := fs.config.DownloadPartSize + guard := make(chan struct{}, fs.config.DownloadConcurrency) + blockCtxTimeout := time.Duration(fs.config.DownloadPartSize/(1024*1024)) * time.Minute + pool := newBufferAllocator(int(partSize)) + finished := false + var wg sync.WaitGroup + var errOnce sync.Once + var poolError error + + poolCtx, poolCancel := context.WithCancel(ctx) + defer poolCancel() + + for part := 0; !finished; part++ { + start := offset + end := offset + partSize + if end >= contentLength { + end = contentLength + finished = true + } + writeOffset := int64(part) * partSize + offset = end + + guard <- struct{}{} + if poolError != nil { + fsLog(fs, logger.LevelDebug, "pool error, download for part %v not started", part) + break + } + + buf := pool.getBuffer() + wg.Add(1) + go func(start, end, 
writeOffset int64, buf []byte) { + defer func() { + pool.releaseBuffer(buf) + <-guard + wg.Done() + }() + + innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout)) + defer cancelFn() + + count := end - start + err := fs.downloadPart(innerCtx, blockBlob, buf, writer, start, count, writeOffset) + if err != nil { + errOnce.Do(func() { + poolError = err + fsLog(fs, logger.LevelError, "multipart download error: %v", poolError) + poolCancel() + }) + } + }(start, end, writeOffset, buf) + } + + wg.Wait() + close(guard) + pool.free() + + return poolError +} + func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader, blockBlob azblob.BlockBlobClient, httpHeaders *azblob.BlobHTTPHeaders, ) error { @@ -918,14 +1001,18 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read guard <- struct{}{} if poolError != nil { - fsLog(fs, logger.LevelDebug, "pool error, upload for part %v not started", part) + fsLog(fs, logger.LevelError, "pool error, upload for part %v not started", part) pool.releaseBuffer(buf) break } wg.Add(1) go func(blockID string, buf []byte, bufSize int) { - defer wg.Done() + defer func() { + pool.releaseBuffer(buf) + <-guard + wg.Done() + }() bufferReader := &bytesReaderWrapper{ Reader: bytes.NewReader(buf[:bufSize]), @@ -941,8 +1028,6 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read poolCancel() }) } - pool.releaseBuffer(buf) - <-guard }(blockID, buf, n) } @@ -965,8 +1050,20 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read return err } +func (*AzureBlobFs) writeAtFull(w io.WriterAt, buf []byte, offset int64, count int) (int, error) { + written := 0 + for written < count { + n, err := w.WriteAt(buf[written:count], offset+int64(written)) + written += n + if err != nil { + return written, err + } + } + return written, nil +} + // copied from rclone -func (fs *AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, 
err error) { +func (*AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) { var nn int for n < len(buf) && err == nil { nn, err = r.Read(buf[n:]) @@ -976,7 +1073,7 @@ func (fs *AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) { } // copied from rclone -func (fs *AzureBlobFs) incrementBlockID(blockID []byte) { +func (*AzureBlobFs) incrementBlockID(blockID []byte) { for i, digit := range blockID { newDigit := digit + 1 blockID[i] = newDigit diff --git a/vfs/filesystem.go b/vfs/filesystem.go index 58bb80df..19254a5a 100644 --- a/vfs/filesystem.go +++ b/vfs/filesystem.go @@ -271,14 +271,16 @@ func (f *Filesystem) GetACopy() Filesystem { }, AzBlobConfig: AzBlobFsConfig{ BaseAzBlobFsConfig: sdk.BaseAzBlobFsConfig{ - Container: f.AzBlobConfig.Container, - AccountName: f.AzBlobConfig.AccountName, - Endpoint: f.AzBlobConfig.Endpoint, - KeyPrefix: f.AzBlobConfig.KeyPrefix, - UploadPartSize: f.AzBlobConfig.UploadPartSize, - UploadConcurrency: f.AzBlobConfig.UploadConcurrency, - UseEmulator: f.AzBlobConfig.UseEmulator, - AccessTier: f.AzBlobConfig.AccessTier, + Container: f.AzBlobConfig.Container, + AccountName: f.AzBlobConfig.AccountName, + Endpoint: f.AzBlobConfig.Endpoint, + KeyPrefix: f.AzBlobConfig.KeyPrefix, + UploadPartSize: f.AzBlobConfig.UploadPartSize, + UploadConcurrency: f.AzBlobConfig.UploadConcurrency, + DownloadPartSize: f.AzBlobConfig.DownloadPartSize, + DownloadConcurrency: f.AzBlobConfig.DownloadConcurrency, + UseEmulator: f.AzBlobConfig.UseEmulator, + AccessTier: f.AzBlobConfig.AccessTier, }, AccountKey: f.AzBlobConfig.AccountKey.Clone(), SASURL: f.AzBlobConfig.SASURL.Clone(), diff --git a/vfs/vfs.go b/vfs/vfs.go index afce556f..cf8d1b06 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -409,12 +409,22 @@ func (c *AzBlobFsConfig) isEqual(other *AzBlobFsConfig) bool { if c.UploadConcurrency != other.UploadConcurrency { return false } + if c.DownloadPartSize != other.DownloadPartSize { + return false + } + if c.DownloadConcurrency != 
other.DownloadConcurrency { + return false + } if c.UseEmulator != other.UseEmulator { return false } if c.AccessTier != other.AccessTier { return false } + return c.isSecretEqual(other) +} + +func (c *AzBlobFsConfig) isSecretEqual(other *AzBlobFsConfig) bool { if c.AccountKey == nil { c.AccountKey = kms.NewEmptySecret() } @@ -461,6 +471,22 @@ func (c *AzBlobFsConfig) checkCredentials() error { return nil } +func (c *AzBlobFsConfig) checkPartSizeAndConcurrency() error { + if c.UploadPartSize < 0 || c.UploadPartSize > 100 { + return fmt.Errorf("invalid upload part size: %v", c.UploadPartSize) + } + if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 { + return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency) + } + if c.DownloadPartSize < 0 || c.DownloadPartSize > 100 { + return fmt.Errorf("invalid download part size: %v", c.DownloadPartSize) + } + if c.DownloadConcurrency < 0 || c.DownloadConcurrency > 64 { + return fmt.Errorf("invalid download concurrency: %v", c.DownloadConcurrency) + } + return nil +} + // Validate returns an error if the configuration is not valid func (c *AzBlobFsConfig) Validate() error { if c.AccountKey == nil { @@ -485,11 +511,8 @@ func (c *AzBlobFsConfig) Validate() error { c.KeyPrefix += "/" } } - if c.UploadPartSize < 0 || c.UploadPartSize > 100 { - return fmt.Errorf("invalid upload part size: %v", c.UploadPartSize) - } - if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 { - return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency) + if err := c.checkPartSizeAndConcurrency(); err != nil { + return err } if !util.IsStringInSlice(c.AccessTier, validAzAccessTier) { return fmt.Errorf("invalid access tier %#v, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))