s3: upload concurrency is now configurable

Please note that if the upload bandwidth between the SFTP client and
SFTPGo is greater than the upload bandwidth between SFTPGo and S3, then
the SFTP client has to wait for the upload of the last parts to S3
after it finishes uploading the file to SFTPGo, and it may time out.
Keep this in mind if you customize the part size and upload concurrency.
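To put rough, purely illustrative numbers on it: with the defaults (5 MB parts, concurrency 2) up to ~10 MB can still be queued for S3 when the client finishes writing; at 10 Mbit/s toward S3 that drains in about 8 seconds. With, say, 64 MB parts and concurrency 8, up to ~512 MB may still be buffered, which takes roughly 7 minutes at the same rate and can easily exceed a client's timeout.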
Nicola Murino 2020-03-13 19:13:58 +01:00
parent de3e69f846
commit 1770da545d
13 changed files with 87 additions and 28 deletions


@@ -530,13 +530,15 @@ func (u *User) getACopy() User {
fsConfig := Filesystem{
Provider: u.FsConfig.Provider,
S3Config: vfs.S3FsConfig{
-Bucket:       u.FsConfig.S3Config.Bucket,
-Region:       u.FsConfig.S3Config.Region,
-AccessKey:    u.FsConfig.S3Config.AccessKey,
-AccessSecret: u.FsConfig.S3Config.AccessSecret,
-Endpoint:     u.FsConfig.S3Config.Endpoint,
-StorageClass: u.FsConfig.S3Config.StorageClass,
-KeyPrefix:    u.FsConfig.S3Config.KeyPrefix,
+Bucket:            u.FsConfig.S3Config.Bucket,
+Region:            u.FsConfig.S3Config.Region,
+AccessKey:         u.FsConfig.S3Config.AccessKey,
+AccessSecret:      u.FsConfig.S3Config.AccessSecret,
+Endpoint:          u.FsConfig.S3Config.Endpoint,
+StorageClass:      u.FsConfig.S3Config.StorageClass,
+KeyPrefix:         u.FsConfig.S3Config.KeyPrefix,
+UploadPartSize:    u.FsConfig.S3Config.UploadPartSize,
+UploadConcurrency: u.FsConfig.S3Config.UploadConcurrency,
},
GCSConfig: vfs.GCSFsConfig{
Bucket: u.FsConfig.GCSConfig.Bucket,


@@ -47,6 +47,7 @@ For each account, the following properties can be configured:
- `s3_storage_class`, leave blank to use the default or specify a valid AWS [storage class](https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html)
- `s3_key_prefix`, allows to restrict access to the virtual folder identified by this prefix and its contents
- `s3_upload_part_size`, the buffer size for multipart uploads (MB). Zero means the default (5 MB). Minimum is 5
+- `s3_upload_concurrency`, how many parts are uploaded in parallel
- `gcs_bucket`, required for GCS filesystem
- `gcs_credentials`, Google Cloud Storage JSON credentials base64 encoded
- `gcs_automatic_credentials`, integer. Set to 1 to use Application Default Credentials strategy or set to 0 to use explicit credentials via `gcs_credentials`


@@ -13,6 +13,8 @@ Specifying a different `key_prefix`, you can assign different virtual folders of
+SFTPGo uses multipart uploads and parallel downloads for storing and retrieving files from S3.
+For multipart uploads you can customize the part size and the upload concurrency. Please note that if the upload bandwidth between the SFTP client and SFTPGo is greater than the upload bandwidth between SFTPGo and S3, then the SFTP client has to wait for the upload of the last parts to S3 after it finishes uploading the file to SFTPGo, and it may time out. Keep this in mind if you customize these parameters.
The configured bucket must exist.
Some SFTP commands don't work over S3:


@@ -482,7 +482,10 @@ func compareS3Config(expected *dataprovider.User, actual *dataprovider.User) err
return errors.New("S3 storage class mismatch")
}
if expected.FsConfig.S3Config.UploadPartSize != actual.FsConfig.S3Config.UploadPartSize {
return errors.New("S3 upload part size class mismatch")
return errors.New("S3 upload part size mismatch")
}
+if expected.FsConfig.S3Config.UploadConcurrency != actual.FsConfig.S3Config.UploadConcurrency {
+return errors.New("S3 upload concurrency mismatch")
+}
if expected.FsConfig.S3Config.KeyPrefix != actual.FsConfig.S3Config.KeyPrefix &&
expected.FsConfig.S3Config.KeyPrefix+"/" != actual.FsConfig.S3Config.KeyPrefix {


@@ -400,6 +400,12 @@ func TestAddUserInvalidFsConfig(t *testing.T) {
if err != nil {
t.Errorf("unexpected error adding user with invalid fs config: %v", err)
}
+u.FsConfig.S3Config.UploadPartSize = 0
+u.FsConfig.S3Config.UploadConcurrency = -1
+_, _, err = httpd.AddUser(u, http.StatusBadRequest)
+if err != nil {
+t.Errorf("unexpected error adding user with invalid fs config: %v", err)
+}
u = getTestUser()
u.FsConfig.Provider = 2
u.FsConfig.GCSConfig.Bucket = ""
@@ -646,6 +652,7 @@ func TestUserS3Config(t *testing.T) {
user.FsConfig.S3Config.AccessKey = "Server-Access-Key"
user.FsConfig.S3Config.AccessSecret = "Server-Access-Secret"
user.FsConfig.S3Config.Endpoint = "http://127.0.0.1:9000"
+user.FsConfig.S3Config.UploadPartSize = 8
user, _, err = httpd.UpdateUser(user, http.StatusOK)
if err != nil {
t.Errorf("unable to update user: %v", err)
@@ -668,6 +675,7 @@ func TestUserS3Config(t *testing.T) {
user.FsConfig.S3Config.AccessKey = "Server-Access-Key1"
user.FsConfig.S3Config.Endpoint = "http://localhost:9000"
user.FsConfig.S3Config.KeyPrefix = "somedir/subdir"
+user.FsConfig.S3Config.UploadConcurrency = 5
user, _, err = httpd.UpdateUser(user, http.StatusOK)
if err != nil {
t.Errorf("unable to update user: %v", err)
@@ -679,6 +687,8 @@ func TestUserS3Config(t *testing.T) {
user.FsConfig.S3Config.AccessSecret = ""
user.FsConfig.S3Config.Endpoint = ""
user.FsConfig.S3Config.KeyPrefix = ""
+user.FsConfig.S3Config.UploadPartSize = 0
+user.FsConfig.S3Config.UploadConcurrency = 0
user, _, err = httpd.UpdateUser(user, http.StatusOK)
if err != nil {
t.Errorf("unable to update user: %v", err)
@@ -691,6 +701,8 @@ func TestUserS3Config(t *testing.T) {
user.FsConfig.S3Config.AccessSecret = ""
user.FsConfig.S3Config.Endpoint = ""
user.FsConfig.S3Config.KeyPrefix = "somedir/subdir"
+user.FsConfig.S3Config.UploadPartSize = 6
+user.FsConfig.S3Config.UploadConcurrency = 4
user, _, err = httpd.UpdateUser(user, http.StatusOK)
if err != nil {
t.Errorf("unable to update user: %v", err)
@@ -2017,6 +2029,7 @@ func TestWebUserS3Mock(t *testing.T) {
user.FsConfig.S3Config.StorageClass = "Standard"
user.FsConfig.S3Config.KeyPrefix = "somedir/subdir/"
user.FsConfig.S3Config.UploadPartSize = 5
+user.FsConfig.S3Config.UploadConcurrency = 4
form := make(url.Values)
form.Set("username", user.Username)
form.Set("home_dir", user.HomeDir)
@@ -2050,8 +2063,16 @@ func TestWebUserS3Mock(t *testing.T) {
req.Header.Set("Content-Type", contentType)
rr = executeRequest(req)
checkResponseCode(t, http.StatusOK, rr.Code)
-// now add the user
-form.Set("s3_upload_part_size", strconv.FormatInt(user.FsConfig.S3Config.UploadPartSize, 10))
+// test invalid s3_concurrency
+form.Set("s3_upload_part_size", strconv.FormatInt(user.FsConfig.S3Config.UploadPartSize, 10))
+form.Set("s3_upload_concurrency", "a")
+b, contentType, _ = getMultipartFormData(form, "", "")
+req, _ = http.NewRequest(http.MethodPost, webUserPath+"/"+strconv.FormatInt(user.ID, 10), &b)
+req.Header.Set("Content-Type", contentType)
+rr = executeRequest(req)
+checkResponseCode(t, http.StatusOK, rr.Code)
+// now add the user
+form.Set("s3_upload_concurrency", strconv.Itoa(user.FsConfig.S3Config.UploadConcurrency))
b, contentType, _ = getMultipartFormData(form, "", "")
req, _ = http.NewRequest(http.MethodPost, webUserPath+"/"+strconv.FormatInt(user.ID, 10), &b)
req.Header.Set("Content-Type", contentType)
@@ -2072,9 +2093,6 @@ func TestWebUserS3Mock(t *testing.T) {
if updateUser.ExpirationDate != 1577836800000 {
t.Errorf("invalid expiration date: %v", updateUser.ExpirationDate)
}
-if updateUser.FsConfig.Provider != user.FsConfig.Provider {
-t.Error("fs provider mismatch")
-}
if updateUser.FsConfig.S3Config.Bucket != user.FsConfig.S3Config.Bucket {
t.Error("s3 bucket mismatch")
}
@@ -2099,6 +2117,9 @@ func TestWebUserS3Mock(t *testing.T) {
if updateUser.FsConfig.S3Config.UploadPartSize != user.FsConfig.S3Config.UploadPartSize {
t.Error("s3 upload part size mismatch")
}
+if updateUser.FsConfig.S3Config.UploadConcurrency != user.FsConfig.S3Config.UploadConcurrency {
+t.Error("s3 upload concurrency mismatch")
+}
if len(updateUser.Filters.FileExtensions) != 2 {
t.Errorf("unexpected extensions filter: %+v", updateUser.Filters.FileExtensions)
}


@@ -369,6 +369,11 @@ func TestCompareUserFsConfig(t *testing.T) {
t.Errorf("S3 upload part size does not match")
}
expected.FsConfig.S3Config.UploadPartSize = 0
+expected.FsConfig.S3Config.UploadConcurrency = 3
+err = compareUserFsConfig(expected, actual)
+if err == nil {
+t.Errorf("S3 upload concurrency does not match")
+}
}
func TestCompareUserGCSConfig(t *testing.T) {


@@ -1009,7 +1009,7 @@ components:
description: the buffer size (in MB) to use for multipart uploads. The minimum allowed part size is 5MB, and if this value is set to zero, the default value (5MB) for the AWS SDK will be used. The minimum allowed value is 5.
upload_concurrency:
type: integer
-description: the number of parts to upload in parallel. If this value is set to zero, 2 will be used
+description: the number of parts to upload in parallel. If this value is set to zero, the default value (2) will be used
key_prefix:
type: string
description: key_prefix is similar to a chroot directory for a local filesystem. If specified the SFTP user will only see contents that starts with this prefix and so you can restrict access to a specific virtual folder. The prefix, if not empty, must not start with "/" and must end with "/". If empty the whole bucket contents will be available
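As a rough illustration of how the two settings interact: with `upload_part_size: 10` and `upload_concurrency: 4` (illustrative values), up to 4 × 10 MB = 40 MB of a single upload can be buffered and in flight to S3 at any moment.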


@@ -333,6 +333,10 @@ func getFsConfigFromUserPostFields(r *http.Request) (dataprovider.Filesystem, er
if err != nil {
return fs, err
}
+fs.S3Config.UploadConcurrency, err = strconv.Atoi(r.Form.Get("s3_upload_concurrency"))
+if err != nil {
+return fs, err
+}
} else if fs.Provider == 2 {
fs.GCSConfig.Bucket = r.Form.Get("gcs_bucket")
fs.GCSConfig.StorageClass = r.Form.Get("gcs_storage_class")
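The parsing above leans on `strconv.Atoi` rejecting anything that is not a plain integer, including the empty string, so an omitted or malformed `s3_upload_concurrency` form field surfaces as an error rather than silently defaulting; this is the behavior the "test invalid s3_concurrency" case above exercises with the value `a`. A minimal standalone sketch of that behavior:

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Atoi returns a *strconv.NumError for empty or non-numeric input,
	// which the form handler above propagates to its caller.
	for _, raw := range []string{"4", "", "a"} {
		v, err := strconv.Atoi(raw)
		fmt.Printf("%q -> %d, err=%v\n", raw, v, err)
	}
}
```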


@@ -44,7 +44,7 @@ Let's see a sample usage for each REST API.
Command:
```
-python sftpgo_api_cli.py add-user test_username --password "test_pwd" --home-dir="/tmp/test_home_dir" --uid 33 --gid 1000 --max-sessions 2 --quota-size 0 --quota-files 3 --permissions "list" "download" "upload" "delete" "rename" "create_dirs" "overwrite" --subdirs-permissions "/dir1::list,download" "/dir2::*" --upload-bandwidth 100 --download-bandwidth 60 --status 0 --expiration-date 2019-01-01 --allowed-ip "192.168.1.1/32" --fs S3 --s3-bucket test --s3-region eu-west-1 --s3-access-key accesskey --s3-access-secret secret --s3-endpoint "http://127.0.0.1:9000" --s3-storage-class Standard --s3-key-prefix "vfolder/" --s3-upload-part-size 10 --denied-login-methods "password" "keyboard-interactive" --allowed-extensions "/dir1::.jpg,.png" "/dir2::.rar,.png" --denied-extensions "/dir3::.zip,.rar"
+python sftpgo_api_cli.py add-user test_username --password "test_pwd" --home-dir="/tmp/test_home_dir" --uid 33 --gid 1000 --max-sessions 2 --quota-size 0 --quota-files 3 --permissions "list" "download" "upload" "delete" "rename" "create_dirs" "overwrite" --subdirs-permissions "/dir1::list,download" "/dir2::*" --upload-bandwidth 100 --download-bandwidth 60 --status 0 --expiration-date 2019-01-01 --allowed-ip "192.168.1.1/32" --fs S3 --s3-bucket test --s3-region eu-west-1 --s3-access-key accesskey --s3-access-secret secret --s3-endpoint "http://127.0.0.1:9000" --s3-storage-class Standard --s3-key-prefix "vfolder/" --s3-upload-part-size 10 --s3-upload-concurrency 4 --denied-login-methods "password" "keyboard-interactive" --allowed-extensions "/dir1::.jpg,.png" "/dir2::.rar,.png" --denied-extensions "/dir3::.zip,.rar"
```
Output:
@@ -64,6 +64,7 @@ Output:
"key_prefix": "vfolder/",
"region": "eu-west-1",
"storage_class": "Standard",
"upload_concurrency": 4,
"upload_part_size": 10
}
},


@@ -77,7 +77,7 @@ class SFTPGoApiRequests:
s3_region='', s3_access_key='', s3_access_secret='', s3_endpoint='', s3_storage_class='',
s3_key_prefix='', gcs_bucket='', gcs_key_prefix='', gcs_storage_class='', gcs_credentials_file='',
gcs_automatic_credentials='automatic', denied_login_methods=[], virtual_folders=[],
-denied_extensions=[], allowed_extensions=[], s3_upload_part_size=0):
+denied_extensions=[], allowed_extensions=[], s3_upload_part_size=0, s3_upload_concurrency=0):
user = {'id':user_id, 'username':username, 'uid':uid, 'gid':gid,
'max_sessions':max_sessions, 'quota_size':quota_size, 'quota_files':quota_files,
'upload_bandwidth':upload_bandwidth, 'download_bandwidth':download_bandwidth,
@@ -101,7 +101,7 @@ class SFTPGoApiRequests:
user.update({'filesystem':self.buildFsConfig(fs_provider, s3_bucket, s3_region, s3_access_key, s3_access_secret,
s3_endpoint, s3_storage_class, s3_key_prefix, gcs_bucket,
gcs_key_prefix, gcs_storage_class, gcs_credentials_file,
-gcs_automatic_credentials, s3_upload_part_size)})
+gcs_automatic_credentials, s3_upload_part_size, s3_upload_concurrency)})
return user
def buildVirtualFolders(self, vfolders):
@@ -204,12 +204,12 @@ class SFTPGoApiRequests:
def buildFsConfig(self, fs_provider, s3_bucket, s3_region, s3_access_key, s3_access_secret, s3_endpoint,
s3_storage_class, s3_key_prefix, gcs_bucket, gcs_key_prefix, gcs_storage_class,
-gcs_credentials_file, gcs_automatic_credentials, s3_upload_part_size):
+gcs_credentials_file, gcs_automatic_credentials, s3_upload_part_size, s3_upload_concurrency):
fs_config = {'provider':0}
if fs_provider == 'S3':
s3config = {'bucket':s3_bucket, 'region':s3_region, 'access_key':s3_access_key, 'access_secret':
s3_access_secret, 'endpoint':s3_endpoint, 'storage_class':s3_storage_class, 'key_prefix':
-s3_key_prefix, 'upload_part_size':s3_upload_part_size}
+s3_key_prefix, 'upload_part_size':s3_upload_part_size, 'upload_concurrency':s3_upload_concurrency}
fs_config.update({'provider':1, 's3config':s3config})
elif fs_provider == 'GCS':
gcsconfig = {'bucket':gcs_bucket, 'key_prefix':gcs_key_prefix, 'storage_class':gcs_storage_class}
@@ -239,13 +239,13 @@ class SFTPGoApiRequests:
s3_access_key='', s3_access_secret='', s3_endpoint='', s3_storage_class='', s3_key_prefix='', gcs_bucket='',
gcs_key_prefix='', gcs_storage_class='', gcs_credentials_file='', gcs_automatic_credentials='automatic',
denied_login_methods=[], virtual_folders=[], denied_extensions=[], allowed_extensions=[],
-s3_upload_part_size=0):
+s3_upload_part_size=0, s3_upload_concurrency=0):
u = self.buildUserObject(0, username, password, public_keys, home_dir, uid, gid, max_sessions,
quota_size, quota_files, self.buildPermissions(perms, subdirs_permissions), upload_bandwidth, download_bandwidth,
status, expiration_date, allowed_ip, denied_ip, fs_provider, s3_bucket, s3_region, s3_access_key,
s3_access_secret, s3_endpoint, s3_storage_class, s3_key_prefix, gcs_bucket, gcs_key_prefix, gcs_storage_class,
gcs_credentials_file, gcs_automatic_credentials, denied_login_methods, virtual_folders, denied_extensions,
-allowed_extensions, s3_upload_part_size)
+allowed_extensions, s3_upload_part_size, s3_upload_concurrency)
r = requests.post(self.userPath, json=u, auth=self.auth, verify=self.verify)
self.printResponse(r)
@@ -255,13 +255,13 @@ class SFTPGoApiRequests:
s3_bucket='', s3_region='', s3_access_key='', s3_access_secret='', s3_endpoint='', s3_storage_class='',
s3_key_prefix='', gcs_bucket='', gcs_key_prefix='', gcs_storage_class='', gcs_credentials_file='',
gcs_automatic_credentials='automatic', denied_login_methods=[], virtual_folders=[], denied_extensions=[],
-allowed_extensions=[], s3_upload_part_size=0):
+allowed_extensions=[], s3_upload_part_size=0, s3_upload_concurrency=0):
u = self.buildUserObject(user_id, username, password, public_keys, home_dir, uid, gid, max_sessions,
quota_size, quota_files, self.buildPermissions(perms, subdirs_permissions), upload_bandwidth, download_bandwidth,
status, expiration_date, allowed_ip, denied_ip, fs_provider, s3_bucket, s3_region, s3_access_key,
s3_access_secret, s3_endpoint, s3_storage_class, s3_key_prefix, gcs_bucket, gcs_key_prefix, gcs_storage_class,
gcs_credentials_file, gcs_automatic_credentials, denied_login_methods, virtual_folders, denied_extensions,
-allowed_extensions, s3_upload_part_size)
+allowed_extensions, s3_upload_part_size, s3_upload_concurrency)
r = requests.put(urlparse.urljoin(self.userPath, 'user/' + str(user_id)), json=u, auth=self.auth, verify=self.verify)
self.printResponse(r)
@@ -540,6 +540,8 @@ def addCommonUserArguments(parser):
parser.add_argument('--s3-storage-class', type=str, default='', help='Default: %(default)s')
parser.add_argument('--s3-upload-part-size', type=int, default=0, help='The buffer size for multipart uploads (MB). ' +
'Zero means the default (5 MB). Minimum is 5. Default: %(default)s')
+parser.add_argument('--s3-upload-concurrency', type=int, default=0, help='How many parts are uploaded in parallel. ' +
+'Zero means the default (2). Default: %(default)s')
parser.add_argument('--gcs-bucket', type=str, default='', help='Default: %(default)s')
parser.add_argument('--gcs-key-prefix', type=str, default='', help='Virtual root directory. If non empty only this ' +
'directory and its contents will be available. Cannot start with "/". For example "folder/subfolder/".' +
@@ -660,7 +662,7 @@ if __name__ == '__main__':
args.s3_endpoint, args.s3_storage_class, args.s3_key_prefix, args.gcs_bucket, args.gcs_key_prefix,
args.gcs_storage_class, args.gcs_credentials_file, args.gcs_automatic_credentials,
args.denied_login_methods, args.virtual_folders, args.denied_extensions, args.allowed_extensions,
-args.s3_upload_part_size)
+args.s3_upload_part_size, args.s3_upload_concurrency)
elif args.command == 'update-user':
api.updateUser(args.id, args.username, args.password, args.public_keys, args.home_dir, args.uid, args.gid,
args.max_sessions, args.quota_size, args.quota_files, args.permissions, args.upload_bandwidth,
@@ -669,7 +671,8 @@ if __name__ == '__main__':
args.s3_access_key, args.s3_access_secret, args.s3_endpoint, args.s3_storage_class,
args.s3_key_prefix, args.gcs_bucket, args.gcs_key_prefix, args.gcs_storage_class,
args.gcs_credentials_file, args.gcs_automatic_credentials, args.denied_login_methods,
-args.virtual_folders, args.denied_extensions, args.allowed_extensions, args.s3_upload_part_size)
+args.virtual_folders, args.denied_extensions, args.allowed_extensions, args.s3_upload_part_size,
+args.s3_upload_concurrency)
elif args.command == 'delete-user':
api.deleteUser(args.id)
elif args.command == 'get-users':


@@ -310,6 +310,14 @@
</small>
</div>
<div class="col-sm-2"></div>
<label for="idS3UploadConcurrency" class="col-sm-2 col-form-label">UL Concurrency</label>
<div class="col-sm-3">
<input type="number" class="form-control" id="idS3UploadConcurrency" name="s3_upload_concurrency" placeholder=""
value="{{.User.FsConfig.S3Config.UploadConcurrency}}" min="0" aria-describedby="S3ConcurrencyHelpBlock">
<small id="S3ConcurrencyHelpBlock" class="form-text text-muted">
How many parts are uploaded in parallel. Zero means the default (2)
</small>
</div>
</div>
<div class="form-group row s3">


@@ -39,10 +39,13 @@ type S3FsConfig struct {
// The buffer size (in MB) to use for multipart uploads. The minimum allowed part size is 5MB,
// and if this value is set to zero, the default value (5MB) for the AWS SDK will be used.
// The minimum allowed value is 5.
-// Please note that if the upload bandwidth between the SFTP client and SFTPGo is greater than the
-// upload bandwidth between SFTPGo and S3 then the SFTP client will idle wait for the uploads of
-// the last parts, and it could timeout. Keep this in mind if you customize these parameters.
+// Please note that if the upload bandwidth between the SFTP client and SFTPGo is greater than
+// the upload bandwidth between SFTPGo and S3, then the SFTP client has to wait for the upload
+// of the last parts to S3 after it finishes uploading the file to SFTPGo, and it may time out.
+// Keep this in mind if you customize these parameters.
UploadPartSize int64 `json:"upload_part_size,omitempty"`
+// How many parts are uploaded in parallel
+UploadConcurrency int `json:"upload_concurrency,omitempty"`
}
// S3Fs is a Fs implementation for Amazon S3 compatible object storage.
@@ -93,6 +96,9 @@ func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
} else {
fs.config.UploadPartSize *= 1024 * 1024
}
+if fs.config.UploadConcurrency == 0 {
+fs.config.UploadConcurrency = 2
+}
sessOpts := session.Options{
Config: *awsConfig,
@@ -213,7 +219,7 @@ func (fs S3Fs) Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, fu
Body: r,
StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
}, func(u *s3manager.Uploader) {
-u.Concurrency = 2
+u.Concurrency = fs.config.UploadConcurrency
u.PartSize = fs.config.UploadPartSize
})
r.CloseWithError(err)
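For reference, a minimal standalone sketch of the two `s3manager` knobs this change wires the user settings into, assuming aws-sdk-go v1; the bucket, key, and file path are placeholders, and unlike SFTPGo (which streams the SFTP upload through a pipe) it reads from a local file:

```go
package main

import (
	"log"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	sess := session.Must(session.NewSession())
	uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
		u.Concurrency = 4             // parts in flight, like s3_upload_concurrency
		u.PartSize = 10 * 1024 * 1024 // per-part buffer; SFTPGo converts its MB value to bytes
	})
	f, err := os.Open("/tmp/bigfile") // placeholder path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if _, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("mybucket"), // placeholder bucket
		Key:    aws.String("bigfile"),
		Body:   f,
	}); err != nil {
		log.Fatal(err)
	}
}
```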


@@ -106,6 +106,9 @@ func ValidateS3FsConfig(config *S3FsConfig) error {
if config.UploadPartSize != 0 && config.UploadPartSize < 5 {
return errors.New("upload_part_size cannot be != 0 and lower than 5 (MB)")
}
if config.UploadConcurrency < 0 {
return fmt.Errorf("invalid upload concurrency: %v", config.UploadConcurrency)
}
return nil
}
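Taken together, the semantics are: a negative `upload_concurrency` is rejected here (the `TestAddUserInvalidFsConfig` case above exercises this with `-1`), zero falls back to the default of 2 applied in `NewS3Fs`, and any positive value is passed straight through to the `s3manager` uploader.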