From 1770da545d94e4014b05500bf26e2828c04d674c Mon Sep 17 00:00:00 2001
From: Nicola Murino
Date: Fri, 13 Mar 2020 19:13:58 +0100
Subject: [PATCH] s3: upload concurrency is now configurable

Please note that if the upload bandwidth between the SFTP client and SFTPGo
is greater than the upload bandwidth between SFTPGo and S3, then the SFTP
client has to wait for the upload of the last parts to S3 after it ends the
file upload to SFTPGo, and it may time out. Keep this in mind if you
customize the part size and the upload concurrency.
---
 dataprovider/user.go      | 16 +++++++++-------
 docs/account.md           |  1 +
 docs/s3.md                |  2 ++
 httpd/api_utils.go        |  5 ++++-
 httpd/httpd_test.go       | 29 +++++++++++++++++++++++++----
 httpd/internal_test.go    |  5 +++++
 httpd/schema/openapi.yaml |  2 +-
 httpd/web.go              |  4 ++++
 scripts/README.md         |  3 ++-
 scripts/sftpgo_api_cli.py | 23 +++++++++++++----------
 templates/user.html       |  8 ++++++++
 vfs/s3fs.go               | 14 ++++++++++----
 vfs/vfs.go                |  3 +++
 13 files changed, 87 insertions(+), 28 deletions(-)

diff --git a/dataprovider/user.go b/dataprovider/user.go
index 175b0e27..40f4e99e 100644
--- a/dataprovider/user.go
+++ b/dataprovider/user.go
@@ -530,13 +530,15 @@ func (u *User) getACopy() User {
 	fsConfig := Filesystem{
 		Provider: u.FsConfig.Provider,
 		S3Config: vfs.S3FsConfig{
-			Bucket:       u.FsConfig.S3Config.Bucket,
-			Region:       u.FsConfig.S3Config.Region,
-			AccessKey:    u.FsConfig.S3Config.AccessKey,
-			AccessSecret: u.FsConfig.S3Config.AccessSecret,
-			Endpoint:     u.FsConfig.S3Config.Endpoint,
-			StorageClass: u.FsConfig.S3Config.StorageClass,
-			KeyPrefix:    u.FsConfig.S3Config.KeyPrefix,
+			Bucket:            u.FsConfig.S3Config.Bucket,
+			Region:            u.FsConfig.S3Config.Region,
+			AccessKey:         u.FsConfig.S3Config.AccessKey,
+			AccessSecret:      u.FsConfig.S3Config.AccessSecret,
+			Endpoint:          u.FsConfig.S3Config.Endpoint,
+			StorageClass:      u.FsConfig.S3Config.StorageClass,
+			KeyPrefix:         u.FsConfig.S3Config.KeyPrefix,
+			UploadPartSize:    u.FsConfig.S3Config.UploadPartSize,
+			UploadConcurrency: u.FsConfig.S3Config.UploadConcurrency,
 		},
 		GCSConfig: vfs.GCSFsConfig{
 			Bucket: u.FsConfig.GCSConfig.Bucket,
diff --git a/docs/account.md b/docs/account.md
index 0824d5a6..64b31d6f 100644
--- a/docs/account.md
+++ b/docs/account.md
@@ -47,6 +47,7 @@ For each account, the following properties can be configured:
 - `s3_storage_class`, leave blank to use the default or specify a valid AWS [storage class](https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html)
 - `s3_key_prefix`, allows to restrict access to the virtual folder identified by this prefix and its contents
 - `s3_upload_part_size`, the buffer size for multipart uploads (MB). Zero means the default (5 MB). Minimum is 5
+- `s3_upload_concurrency`, how many parts are uploaded in parallel
 - `gcs_bucket`, required for GCS filesystem
 - `gcs_credentials`, Google Cloud Storage JSON credentials base64 encoded
 - `gcs_automatic_credentials`, integer. Set to 1 to use Application Default Credentials strategy or set to 0 to use explicit credentials via `gcs_credentials`
diff --git a/docs/s3.md b/docs/s3.md
index 66bba726..a9e479da 100644
--- a/docs/s3.md
+++ b/docs/s3.md
@@ -13,6 +13,8 @@ Specifying a different `key_prefix`, you can assign different virtual folders of
 
 SFTPGo uses multipart uploads and parallel downloads for storing and retrieving files from S3.
 
+For multipart uploads you can customize the part size and the upload concurrency. Please note that if the upload bandwidth between the SFTP client and SFTPGo is greater than the upload bandwidth between SFTPGo and S3, then the SFTP client has to wait for the upload of the last parts to S3 after it ends the file upload to SFTPGo, and it may time out. Keep this in mind if you customize these parameters.
+
 The configured bucket must exist.
 
 Some SFTP commands don't work over S3:
diff --git a/httpd/api_utils.go b/httpd/api_utils.go
index d194a437..eb534529 100644
--- a/httpd/api_utils.go
+++ b/httpd/api_utils.go
@@ -482,7 +482,10 @@ func compareS3Config(expected *dataprovider.User, actual *dataprovider.User) error {
 		return errors.New("S3 storage class mismatch")
 	}
 	if expected.FsConfig.S3Config.UploadPartSize != actual.FsConfig.S3Config.UploadPartSize {
-		return errors.New("S3 upload part size class mismatch")
+		return errors.New("S3 upload part size mismatch")
+	}
+	if expected.FsConfig.S3Config.UploadConcurrency != actual.FsConfig.S3Config.UploadConcurrency {
+		return errors.New("S3 upload concurrency mismatch")
 	}
 	if expected.FsConfig.S3Config.KeyPrefix != actual.FsConfig.S3Config.KeyPrefix &&
 		expected.FsConfig.S3Config.KeyPrefix+"/" != actual.FsConfig.S3Config.KeyPrefix {
diff --git a/httpd/httpd_test.go b/httpd/httpd_test.go
index b154e70a..c70be603 100644
--- a/httpd/httpd_test.go
+++ b/httpd/httpd_test.go
@@ -400,6 +400,12 @@ func TestAddUserInvalidFsConfig(t *testing.T) {
 	if err != nil {
 		t.Errorf("unexpected error adding user with invalid fs config: %v", err)
 	}
+	u.FsConfig.S3Config.UploadPartSize = 0
+	u.FsConfig.S3Config.UploadConcurrency = -1
+	_, _, err = httpd.AddUser(u, http.StatusBadRequest)
+	if err != nil {
+		t.Errorf("unexpected error adding user with invalid fs config: %v", err)
+	}
 	u = getTestUser()
 	u.FsConfig.Provider = 2
 	u.FsConfig.GCSConfig.Bucket = ""
@@ -646,6 +652,7 @@ func TestUserS3Config(t *testing.T) {
 	user.FsConfig.S3Config.AccessKey = "Server-Access-Key"
 	user.FsConfig.S3Config.AccessSecret = "Server-Access-Secret"
 	user.FsConfig.S3Config.Endpoint = "http://127.0.0.1:9000"
+	user.FsConfig.S3Config.UploadPartSize = 8
 	user, _, err = httpd.UpdateUser(user, http.StatusOK)
 	if err != nil {
 		t.Errorf("unable to update user: %v", err)
@@ -668,6 +675,7 @@ func TestUserS3Config(t *testing.T) {
 	user.FsConfig.S3Config.AccessKey = "Server-Access-Key1"
 	user.FsConfig.S3Config.Endpoint = "http://localhost:9000"
 	user.FsConfig.S3Config.KeyPrefix = "somedir/subdir"
+	user.FsConfig.S3Config.UploadConcurrency = 5
 	user, _, err = httpd.UpdateUser(user, http.StatusOK)
 	if err != nil {
 		t.Errorf("unable to update user: %v", err)
@@ -679,6 +687,8 @@ func TestUserS3Config(t *testing.T) {
 	user.FsConfig.S3Config.AccessSecret = ""
 	user.FsConfig.S3Config.Endpoint = ""
 	user.FsConfig.S3Config.KeyPrefix = ""
+	user.FsConfig.S3Config.UploadPartSize = 0
+	user.FsConfig.S3Config.UploadConcurrency = 0
 	user, _, err = httpd.UpdateUser(user, http.StatusOK)
 	if err != nil {
 		t.Errorf("unable to update user: %v", err)
@@ -691,6 +701,8 @@ func TestUserS3Config(t *testing.T) {
 	user.FsConfig.S3Config.AccessSecret = ""
 	user.FsConfig.S3Config.Endpoint = ""
 	user.FsConfig.S3Config.KeyPrefix = "somedir/subdir"
+	user.FsConfig.S3Config.UploadPartSize = 6
+	user.FsConfig.S3Config.UploadConcurrency = 4
 	user, _, err = httpd.UpdateUser(user, http.StatusOK)
 	if err != nil {
 		t.Errorf("unable to update user: %v", err)
@@ -2017,6 +2029,7 @@ func TestWebUserS3Mock(t *testing.T) {
 	user.FsConfig.S3Config.StorageClass = "Standard"
 	user.FsConfig.S3Config.KeyPrefix = "somedir/subdir/"
 	user.FsConfig.S3Config.UploadPartSize = 5
+	user.FsConfig.S3Config.UploadConcurrency = 4
 	form := make(url.Values)
 	form.Set("username", user.Username)
 	form.Set("home_dir", user.HomeDir)
@@ -2050,8 +2063,16 @@ func TestWebUserS3Mock(t *testing.T) {
 	req.Header.Set("Content-Type", contentType)
 	rr = executeRequest(req)
 	checkResponseCode(t, http.StatusOK, rr.Code)
-	// now add the user
+	// test invalid s3_concurrency
 	form.Set("s3_upload_part_size", strconv.FormatInt(user.FsConfig.S3Config.UploadPartSize, 10))
+	form.Set("s3_upload_concurrency", "a")
+	b, contentType, _ = getMultipartFormData(form, "", "")
+	req, _ = http.NewRequest(http.MethodPost, webUserPath+"/"+strconv.FormatInt(user.ID, 10), &b)
+	req.Header.Set("Content-Type", contentType)
+	rr = executeRequest(req)
+	checkResponseCode(t, http.StatusOK, rr.Code)
+	// now add the user
+	form.Set("s3_upload_concurrency", strconv.Itoa(user.FsConfig.S3Config.UploadConcurrency))
 	b, contentType, _ = getMultipartFormData(form, "", "")
 	req, _ = http.NewRequest(http.MethodPost, webUserPath+"/"+strconv.FormatInt(user.ID, 10), &b)
 	req.Header.Set("Content-Type", contentType)
@@ -2072,9 +2093,6 @@ func TestWebUserS3Mock(t *testing.T) {
 	if updateUser.ExpirationDate != 1577836800000 {
 		t.Errorf("invalid expiration date: %v", updateUser.ExpirationDate)
 	}
-	if updateUser.FsConfig.Provider != user.FsConfig.Provider {
-		t.Error("fs provider mismatch")
-	}
 	if updateUser.FsConfig.S3Config.Bucket != user.FsConfig.S3Config.Bucket {
 		t.Error("s3 bucket mismatch")
 	}
@@ -2099,6 +2117,9 @@ func TestWebUserS3Mock(t *testing.T) {
 	if updateUser.FsConfig.S3Config.UploadPartSize != user.FsConfig.S3Config.UploadPartSize {
 		t.Error("s3 upload part size mismatch")
 	}
+	if updateUser.FsConfig.S3Config.UploadConcurrency != user.FsConfig.S3Config.UploadConcurrency {
+		t.Error("s3 upload concurrency mismatch")
+	}
 	if len(updateUser.Filters.FileExtensions) != 2 {
 		t.Errorf("unexpected extensions filter: %+v", updateUser.Filters.FileExtensions)
 	}
diff --git a/httpd/internal_test.go b/httpd/internal_test.go
index c7e7b5c2..86fda0b6 100644
--- a/httpd/internal_test.go
+++ b/httpd/internal_test.go
@@ -369,6 +369,11 @@ func TestCompareUserFsConfig(t *testing.T) {
 		t.Errorf("S3 upload part size does not match")
 	}
 	expected.FsConfig.S3Config.UploadPartSize = 0
+	expected.FsConfig.S3Config.UploadConcurrency = 3
+	err = compareUserFsConfig(expected, actual)
+	if err == nil {
+		t.Errorf("S3 upload concurrency does not match")
+	}
 }
 
 func TestCompareUserGCSConfig(t *testing.T) {
diff --git a/httpd/schema/openapi.yaml b/httpd/schema/openapi.yaml
index eae65b42..893cdb3a 100644
--- a/httpd/schema/openapi.yaml
+++ b/httpd/schema/openapi.yaml
@@ -1009,7 +1009,7 @@ components:
         description: the buffer size (in MB) to use for multipart uploads. The minimum allowed part size is 5MB, and if this value is set to zero, the default value (5MB) for the AWS SDK will be used. The minimum allowed value is 5.
       upload_concurrency:
         type: integer
-        description: the number of parts to upload in parallel. If this value is set to zero, 2 will be used
+        description: the number of parts to upload in parallel. If this value is set to zero, the default value (2) will be used
       key_prefix:
         type: string
         description: key_prefix is similar to a chroot directory for a local filesystem. If specified the SFTP user will only see contents that starts with this prefix and so you can restrict access to a specific virtual folder. The prefix, if not empty, must not start with "/" and must end with "/". If empty the whole bucket contents will be available
diff --git a/httpd/web.go b/httpd/web.go
index beb1e36d..3e4e2a06 100644
--- a/httpd/web.go
+++ b/httpd/web.go
@@ -333,6 +333,10 @@ func getFsConfigFromUserPostFields(r *http.Request) (dataprovider.Filesystem, error) {
 		if err != nil {
 			return fs, err
 		}
+		fs.S3Config.UploadConcurrency, err = strconv.Atoi(r.Form.Get("s3_upload_concurrency"))
+		if err != nil {
+			return fs, err
+		}
 	} else if fs.Provider == 2 {
 		fs.GCSConfig.Bucket = r.Form.Get("gcs_bucket")
 		fs.GCSConfig.StorageClass = r.Form.Get("gcs_storage_class")
diff --git a/scripts/README.md b/scripts/README.md
index af420750..e4b86dee 100644
--- a/scripts/README.md
+++ b/scripts/README.md
@@ -44,7 +44,7 @@ Let's see a sample usage for each REST API.
 Command:
 
 ```
-python sftpgo_api_cli.py add-user test_username --password "test_pwd" --home-dir="/tmp/test_home_dir" --uid 33 --gid 1000 --max-sessions 2 --quota-size 0 --quota-files 3 --permissions "list" "download" "upload" "delete" "rename" "create_dirs" "overwrite" --subdirs-permissions "/dir1::list,download" "/dir2::*" --upload-bandwidth 100 --download-bandwidth 60 --status 0 --expiration-date 2019-01-01 --allowed-ip "192.168.1.1/32" --fs S3 --s3-bucket test --s3-region eu-west-1 --s3-access-key accesskey --s3-access-secret secret --s3-endpoint "http://127.0.0.1:9000" --s3-storage-class Standard --s3-key-prefix "vfolder/" --s3-upload-part-size 10 --denied-login-methods "password" "keyboard-interactive" --allowed-extensions "/dir1::.jpg,.png" "/dir2::.rar,.png" --denied-extensions "/dir3::.zip,.rar"
+python sftpgo_api_cli.py add-user test_username --password "test_pwd" --home-dir="/tmp/test_home_dir" --uid 33 --gid 1000 --max-sessions 2 --quota-size 0 --quota-files 3 --permissions "list" "download" "upload" "delete" "rename" "create_dirs" "overwrite" --subdirs-permissions "/dir1::list,download" "/dir2::*" --upload-bandwidth 100 --download-bandwidth 60 --status 0 --expiration-date 2019-01-01 --allowed-ip "192.168.1.1/32" --fs S3 --s3-bucket test --s3-region eu-west-1 --s3-access-key accesskey --s3-access-secret secret --s3-endpoint "http://127.0.0.1:9000" --s3-storage-class Standard --s3-key-prefix "vfolder/" --s3-upload-part-size 10 --s3-upload-concurrency 4 --denied-login-methods "password" "keyboard-interactive" --allowed-extensions "/dir1::.jpg,.png" "/dir2::.rar,.png" --denied-extensions "/dir3::.zip,.rar"
 ```
 
 Output:
@@ -64,6 +64,7 @@ Output:
       "key_prefix": "vfolder/",
       "region": "eu-west-1",
       "storage_class": "Standard",
+      "upload_concurrency": 4,
       "upload_part_size": 10
     }
   },
diff --git a/scripts/sftpgo_api_cli.py b/scripts/sftpgo_api_cli.py
index dbdba6f5..e1bfe919 100755
--- a/scripts/sftpgo_api_cli.py
+++ b/scripts/sftpgo_api_cli.py
@@ -77,7 +77,7 @@ class SFTPGoApiRequests:
                         s3_region='', s3_access_key='', s3_access_secret='', s3_endpoint='', s3_storage_class='',
                         s3_key_prefix='', gcs_bucket='', gcs_key_prefix='', gcs_storage_class='',
                         gcs_credentials_file='', gcs_automatic_credentials='automatic', denied_login_methods=[], virtual_folders=[],
-                        denied_extensions=[], allowed_extensions=[], s3_upload_part_size=0):
+                        denied_extensions=[], allowed_extensions=[], s3_upload_part_size=0, s3_upload_concurrency=0):
         user = {'id':user_id, 'username':username, 'uid':uid, 'gid':gid, 'max_sessions':max_sessions,
                 'quota_size':quota_size, 'quota_files':quota_files, 'upload_bandwidth':upload_bandwidth,
                 'download_bandwidth':download_bandwidth,
@@ -101,7 +101,7 @@ class SFTPGoApiRequests:
         user.update({'filesystem':self.buildFsConfig(fs_provider, s3_bucket, s3_region, s3_access_key,
                                                      s3_access_secret, s3_endpoint, s3_storage_class, s3_key_prefix,
                                                      gcs_bucket, gcs_key_prefix, gcs_storage_class, gcs_credentials_file,
-                                                     gcs_automatic_credentials, s3_upload_part_size)})
+                                                     gcs_automatic_credentials, s3_upload_part_size, s3_upload_concurrency)})
         return user
 
     def buildVirtualFolders(self, vfolders):
@@ -204,12 +204,12 @@ class SFTPGoApiRequests:
 
     def buildFsConfig(self, fs_provider, s3_bucket, s3_region, s3_access_key, s3_access_secret, s3_endpoint,
                       s3_storage_class, s3_key_prefix, gcs_bucket, gcs_key_prefix, gcs_storage_class,
-                      gcs_credentials_file, gcs_automatic_credentials, s3_upload_part_size):
+                      gcs_credentials_file, gcs_automatic_credentials, s3_upload_part_size, s3_upload_concurrency):
         fs_config = {'provider':0}
         if fs_provider == 'S3':
             s3config = {'bucket':s3_bucket, 'region':s3_region, 'access_key':s3_access_key, 'access_secret':
                         s3_access_secret, 'endpoint':s3_endpoint, 'storage_class':s3_storage_class, 'key_prefix':
-                        s3_key_prefix, 'upload_part_size':s3_upload_part_size}
+                        s3_key_prefix, 'upload_part_size':s3_upload_part_size, 'upload_concurrency':s3_upload_concurrency}
             fs_config.update({'provider':1, 's3config':s3config})
         elif fs_provider == 'GCS':
             gcsconfig = {'bucket':gcs_bucket, 'key_prefix':gcs_key_prefix, 'storage_class':gcs_storage_class}
@@ -239,13 +239,13 @@ class SFTPGoApiRequests:
                 s3_access_key='', s3_access_secret='', s3_endpoint='', s3_storage_class='', s3_key_prefix='',
                 gcs_bucket='', gcs_key_prefix='', gcs_storage_class='', gcs_credentials_file='',
                 gcs_automatic_credentials='automatic', denied_login_methods=[], virtual_folders=[], denied_extensions=[], allowed_extensions=[],
-                s3_upload_part_size=0):
+                s3_upload_part_size=0, s3_upload_concurrency=0):
         u = self.buildUserObject(0, username, password, public_keys, home_dir, uid, gid, max_sessions, quota_size,
                                  quota_files, self.buildPermissions(perms, subdirs_permissions), upload_bandwidth,
                                  download_bandwidth, status, expiration_date, allowed_ip, denied_ip, fs_provider, s3_bucket,
                                  s3_region, s3_access_key, s3_access_secret, s3_endpoint, s3_storage_class, s3_key_prefix,
                                  gcs_bucket, gcs_key_prefix, gcs_storage_class, gcs_credentials_file, gcs_automatic_credentials, denied_login_methods, virtual_folders, denied_extensions,
-                                 allowed_extensions, s3_upload_part_size)
+                                 allowed_extensions, s3_upload_part_size, s3_upload_concurrency)
         r = requests.post(self.userPath, json=u, auth=self.auth, verify=self.verify)
         self.printResponse(r)
 
@@ -255,13 +255,13 @@ class SFTPGoApiRequests:
                    s3_bucket='', s3_region='', s3_access_key='', s3_access_secret='', s3_endpoint='', s3_storage_class='',
                    s3_key_prefix='', gcs_bucket='', gcs_key_prefix='', gcs_storage_class='', gcs_credentials_file='',
                    gcs_automatic_credentials='automatic', denied_login_methods=[], virtual_folders=[], denied_extensions=[],
-                   allowed_extensions=[], s3_upload_part_size=0):
+                   allowed_extensions=[], s3_upload_part_size=0, s3_upload_concurrency=0):
         u = self.buildUserObject(user_id, username, password, public_keys, home_dir, uid, gid, max_sessions, quota_size,
                                  quota_files, self.buildPermissions(perms, subdirs_permissions), upload_bandwidth,
                                  download_bandwidth, status, expiration_date, allowed_ip, denied_ip, fs_provider, s3_bucket,
                                  s3_region, s3_access_key, s3_access_secret, s3_endpoint, s3_storage_class, s3_key_prefix,
                                  gcs_bucket, gcs_key_prefix, gcs_storage_class, gcs_credentials_file, gcs_automatic_credentials, denied_login_methods, virtual_folders, denied_extensions,
-                                 allowed_extensions, s3_upload_part_size)
+                                 allowed_extensions, s3_upload_part_size, s3_upload_concurrency)
         r = requests.put(urlparse.urljoin(self.userPath, 'user/' + str(user_id)), json=u, auth=self.auth,
                          verify=self.verify)
         self.printResponse(r)
@@ -540,6 +540,8 @@ def addCommonUserArguments(parser):
     parser.add_argument('--s3-storage-class', type=str, default='', help='Default: %(default)s')
     parser.add_argument('--s3-upload-part-size', type=int, default=0, help='The buffer size for multipart uploads (MB). ' +
                         'Zero means the default (5 MB). Minimum is 5. Default: %(default)s')
+    parser.add_argument('--s3-upload-concurrency', type=int, default=0, help='How many parts are uploaded in parallel. ' +
+                        'Zero means the default (2). Default: %(default)s')
     parser.add_argument('--gcs-bucket', type=str, default='', help='Default: %(default)s')
     parser.add_argument('--gcs-key-prefix', type=str, default='', help='Virtual root directory. If non empty only this ' +
                         'directory and its contents will be available. Cannot start with "/". For example "folder/subfolder/".' +
@@ -660,7 +662,7 @@ if __name__ == '__main__':
                     args.s3_endpoint, args.s3_storage_class, args.s3_key_prefix, args.gcs_bucket, args.gcs_key_prefix,
                     args.gcs_storage_class, args.gcs_credentials_file, args.gcs_automatic_credentials,
                     args.denied_login_methods, args.virtual_folders, args.denied_extensions, args.allowed_extensions,
-                    args.s3_upload_part_size)
+                    args.s3_upload_part_size, args.s3_upload_concurrency)
     elif args.command == 'update-user':
         api.updateUser(args.id, args.username, args.password, args.public_keys, args.home_dir, args.uid, args.gid,
                        args.max_sessions, args.quota_size, args.quota_files, args.permissions, args.upload_bandwidth,
@@ -669,7 +671,8 @@ if __name__ == '__main__':
                        args.s3_access_key, args.s3_access_secret, args.s3_endpoint, args.s3_storage_class,
                        args.s3_key_prefix, args.gcs_bucket, args.gcs_key_prefix, args.gcs_storage_class,
                        args.gcs_credentials_file, args.gcs_automatic_credentials, args.denied_login_methods,
-                       args.virtual_folders, args.denied_extensions, args.allowed_extensions, args.s3_upload_part_size)
+                       args.virtual_folders, args.denied_extensions, args.allowed_extensions, args.s3_upload_part_size,
+                       args.s3_upload_concurrency)
     elif args.command == 'delete-user':
         api.deleteUser(args.id)
     elif args.command == 'get-users':
diff --git a/templates/user.html b/templates/user.html
index 6ff7fc46..519e0e89 100644
--- a/templates/user.html
+++ b/templates/user.html
@@ -310,6 +310,14 @@
[The 8 added lines of markup in this hunk were lost in extraction. Per the diffstat and the web handler above, the hunk adds an "S3 Upload Concurrency" input field (posted as `s3_upload_concurrency`) to the S3 section of the user form, with the help text "How many parts are uploaded in parallel. Zero means the default (2)".]
diff --git a/vfs/s3fs.go b/vfs/s3fs.go
index 4fe8859c..e511ba03 100644
--- a/vfs/s3fs.go
+++ b/vfs/s3fs.go
@@ -39,10 +39,13 @@ type S3FsConfig struct {
 	// The buffer size (in MB) to use for multipart uploads. The minimum allowed part size is 5MB,
 	// and if this value is set to zero, the default value (5MB) for the AWS SDK will be used.
 	// The minimum allowed value is 5.
-	// Please note that if the upload bandwidth between the SFTP client and SFTPGo is greater than the
-	// upload bandwidth between SFTPGo and S3 then the SFTP client will idle wait for the uploads of
-	// the last parts, and it could timeout. Keep this in mind if you customize these parameters.
+	// Please note that if the upload bandwidth between the SFTP client and SFTPGo is greater than
+	// the upload bandwidth between SFTPGo and S3, then the SFTP client has to wait for the upload
+	// of the last parts to S3 after it ends the file upload to SFTPGo, and it may time out.
+	// Keep this in mind if you customize these parameters.
 	UploadPartSize int64 `json:"upload_part_size,omitempty"`
+	// How many parts are uploaded in parallel
+	UploadConcurrency int `json:"upload_concurrency,omitempty"`
 }
 
 // S3Fs is a Fs implementation for Amazon S3 compatible object storage.
@@ -93,6 +96,9 @@ func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
 	} else {
 		fs.config.UploadPartSize *= 1024 * 1024
 	}
+	if fs.config.UploadConcurrency == 0 {
+		fs.config.UploadConcurrency = 2
+	}
 
 	sessOpts := session.Options{
 		Config: *awsConfig,
@@ -213,7 +219,7 @@ func (fs S3Fs) Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, func(), error) {
 		Body:         r,
 		StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
 	}, func(u *s3manager.Uploader) {
-		u.Concurrency = 2
+		u.Concurrency = fs.config.UploadConcurrency
 		u.PartSize = fs.config.UploadPartSize
 	})
 	r.CloseWithError(err)
diff --git a/vfs/vfs.go b/vfs/vfs.go
index 759216a4..a0f70c58 100644
--- a/vfs/vfs.go
+++ b/vfs/vfs.go
@@ -106,6 +106,9 @@ func ValidateS3FsConfig(config *S3FsConfig) error {
 	if config.UploadPartSize != 0 && config.UploadPartSize < 5 {
 		return errors.New("upload_part_size cannot be != 0 and lower than 5 (MB)")
 	}
+	if config.UploadConcurrency < 0 {
+		return fmt.Errorf("invalid upload concurrency: %v", config.UploadConcurrency)
+	}
 	return nil
 }
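
For reviewers who want to see the two new knobs in isolation, here is a minimal, self-contained Go sketch (not SFTPGo's own code path) of how `upload_part_size` and `upload_concurrency` end up on the aws-sdk-go `s3manager.Uploader`, mirroring the `NewS3Fs` defaults and the `Create` change above. The bucket, key and file path are placeholders, and credentials/region are assumed to come from the environment or shared config.

```go
// Standalone sketch, not SFTPGo code: how the two settings map onto the
// aws-sdk-go s3manager uploader. Placeholder bucket/key/path; credentials
// and region are taken from the environment/shared AWS config.
package main

import (
	"log"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	partSizeMB := int64(8) // upload_part_size: MB in the config, bytes for the SDK; 0 -> SDK default (5 MB)
	concurrency := 4       // upload_concurrency: parts uploaded in parallel; 0 -> 2 in NewS3Fs

	sess, err := session.NewSession()
	if err != nil {
		log.Fatal(err)
	}
	uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
		u.PartSize = partSizeMB * 1024 * 1024
		u.Concurrency = concurrency
	})

	f, err := os.Open("/tmp/testfile.bin") // placeholder local file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Up to `concurrency` parts of `partSizeMB` MB each are in flight at once.
	if _, err = uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("my-bucket"), // placeholder bucket
		Key:    aws.String("testfile.bin"),
		Body:   f,
	}); err != nil {
		log.Fatal(err)
	}
}
```

Two practical consequences of these settings: the SDK buffers roughly PartSize x Concurrency bytes per transfer, so raising either value raises memory usage; and the amount of data that can still be in flight towards S3 when the SFTP client finishes sending is on the same order (with the values above, up to about 32 MB), which is exactly the tail the commit message warns may make a slow SFTPGo-to-S3 link look like a client timeout.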