s3: allow to configure the chunk download timeout
This commit is contained in:
parent
7344366ce8
commit
e1a2451c22
8 changed files with 66 additions and 16 deletions
|
@ -1590,6 +1590,7 @@ func TestUserS3Config(t *testing.T) {
|
|||
user.FsConfig.S3Config.AccessSecret = kms.NewPlainSecret("Server-Access-Secret")
|
||||
user.FsConfig.S3Config.Endpoint = "http://127.0.0.1:9000"
|
||||
user.FsConfig.S3Config.UploadPartSize = 8
|
||||
user.FsConfig.S3Config.DownloadPartMaxTime = 60
|
||||
folderName := "vfolderName"
|
||||
user.VirtualFolders = append(user.VirtualFolders, vfs.VirtualFolder{
|
||||
BaseVirtualFolder: vfs.BaseVirtualFolder{
|
||||
|
@ -1612,6 +1613,7 @@ func TestUserS3Config(t *testing.T) {
|
|||
assert.NotEmpty(t, user.FsConfig.S3Config.AccessSecret.GetPayload())
|
||||
assert.Empty(t, user.FsConfig.S3Config.AccessSecret.GetAdditionalData())
|
||||
assert.Empty(t, user.FsConfig.S3Config.AccessSecret.GetKey())
|
||||
assert.Equal(t, 60, user.FsConfig.S3Config.DownloadPartMaxTime)
|
||||
if assert.Len(t, user.VirtualFolders, 1) {
|
||||
folder := user.VirtualFolders[0]
|
||||
assert.Equal(t, kms.SecretStatusSecretBox, folder.FsConfig.CryptConfig.Passphrase.GetStatus())
|
||||
|
@ -7268,6 +7270,7 @@ func TestUserTemplateMock(t *testing.T) {
|
|||
form.Add("hooks", "external_auth_disabled")
|
||||
form.Add("hooks", "check_password_disabled")
|
||||
form.Set("disable_fs_checks", "checked")
|
||||
form.Set("s3_download_part_max_time", "0")
|
||||
// test invalid s3_upload_part_size
|
||||
form.Set("s3_upload_part_size", "a")
|
||||
b, contentType, _ := getMultipartFormData(form, "", "")
|
||||
|
@ -7437,6 +7440,7 @@ func TestFolderTemplateMock(t *testing.T) {
|
|||
|
||||
form.Set("s3_upload_part_size", "5")
|
||||
form.Set("s3_upload_concurrency", "4")
|
||||
form.Set("s3_download_part_max_time", "0")
|
||||
b, contentType, _ = getMultipartFormData(form, "", "")
|
||||
req, _ = http.NewRequest(http.MethodPost, webTemplateFolder, &b)
|
||||
setJWTCookieForReq(req, token)
|
||||
|
@ -7514,6 +7518,7 @@ func TestWebUserS3Mock(t *testing.T) {
|
|||
user.FsConfig.S3Config.KeyPrefix = "somedir/subdir/"
|
||||
user.FsConfig.S3Config.UploadPartSize = 5
|
||||
user.FsConfig.S3Config.UploadConcurrency = 4
|
||||
user.FsConfig.S3Config.DownloadPartMaxTime = 60
|
||||
user.Description = "s3 tèst user"
|
||||
form := make(url.Values)
|
||||
form.Set(csrfFormToken, csrfToken)
|
||||
|
@ -7557,8 +7562,17 @@ func TestWebUserS3Mock(t *testing.T) {
|
|||
req.Header.Set("Content-Type", contentType)
|
||||
rr = executeRequest(req)
|
||||
checkResponseCode(t, http.StatusOK, rr)
|
||||
// test invalid s3_concurrency
|
||||
// test invalid download max part time
|
||||
form.Set("s3_upload_part_size", strconv.FormatInt(user.FsConfig.S3Config.UploadPartSize, 10))
|
||||
form.Set("s3_download_part_max_time", "a")
|
||||
b, contentType, _ = getMultipartFormData(form, "", "")
|
||||
req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b)
|
||||
setJWTCookieForReq(req, webToken)
|
||||
req.Header.Set("Content-Type", contentType)
|
||||
rr = executeRequest(req)
|
||||
checkResponseCode(t, http.StatusOK, rr)
|
||||
// test invalid s3_concurrency
|
||||
form.Set("s3_download_part_max_time", strconv.Itoa(user.FsConfig.S3Config.DownloadPartMaxTime))
|
||||
form.Set("s3_upload_concurrency", "a")
|
||||
b, contentType, _ = getMultipartFormData(form, "", "")
|
||||
req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b)
|
||||
|
@ -7590,6 +7604,7 @@ func TestWebUserS3Mock(t *testing.T) {
|
|||
assert.Equal(t, updateUser.FsConfig.S3Config.KeyPrefix, user.FsConfig.S3Config.KeyPrefix)
|
||||
assert.Equal(t, updateUser.FsConfig.S3Config.UploadPartSize, user.FsConfig.S3Config.UploadPartSize)
|
||||
assert.Equal(t, updateUser.FsConfig.S3Config.UploadConcurrency, user.FsConfig.S3Config.UploadConcurrency)
|
||||
assert.Equal(t, updateUser.FsConfig.S3Config.DownloadPartMaxTime, user.FsConfig.S3Config.DownloadPartMaxTime)
|
||||
assert.Equal(t, 2, len(updateUser.Filters.FilePatterns))
|
||||
assert.Equal(t, kms.SecretStatusSecretBox, updateUser.FsConfig.S3Config.AccessSecret.GetStatus())
|
||||
assert.NotEmpty(t, updateUser.FsConfig.S3Config.AccessSecret.GetPayload())
|
||||
|
@ -8233,6 +8248,7 @@ func TestS3WebFolderMock(t *testing.T) {
|
|||
S3KeyPrefix := "somedir/subdir/"
|
||||
S3UploadPartSize := 5
|
||||
S3UploadConcurrency := 4
|
||||
S3MaxPartDownloadTime := 120
|
||||
form := make(url.Values)
|
||||
form.Set("mapped_path", mappedPath)
|
||||
form.Set("name", folderName)
|
||||
|
@ -8246,6 +8262,7 @@ func TestS3WebFolderMock(t *testing.T) {
|
|||
form.Set("s3_endpoint", S3Endpoint)
|
||||
form.Set("s3_key_prefix", S3KeyPrefix)
|
||||
form.Set("s3_upload_part_size", strconv.Itoa(S3UploadPartSize))
|
||||
form.Set("s3_download_part_max_time", strconv.Itoa(S3MaxPartDownloadTime))
|
||||
form.Set("s3_upload_concurrency", "a")
|
||||
form.Set(csrfFormToken, csrfToken)
|
||||
b, contentType, err := getMultipartFormData(form, "", "")
|
||||
|
@ -8287,6 +8304,7 @@ func TestS3WebFolderMock(t *testing.T) {
|
|||
assert.Equal(t, S3KeyPrefix, folder.FsConfig.S3Config.KeyPrefix)
|
||||
assert.Equal(t, S3UploadConcurrency, folder.FsConfig.S3Config.UploadConcurrency)
|
||||
assert.Equal(t, int64(S3UploadPartSize), folder.FsConfig.S3Config.UploadPartSize)
|
||||
assert.Equal(t, S3MaxPartDownloadTime, folder.FsConfig.S3Config.DownloadPartMaxTime)
|
||||
// update
|
||||
S3UploadConcurrency = 10
|
||||
form.Set("s3_upload_concurrency", "b")
|
||||
|
|
|
@ -2154,6 +2154,9 @@ components:
|
|||
upload_concurrency:
|
||||
type: integer
|
||||
description: 'the number of parts to upload in parallel. If this value is set to zero, the default value (2) will be used'
|
||||
download_part_max_time:
|
||||
type: integer
|
||||
description: 'the maximum time allowed, in seconds, to download a single chunk (5MB). 0 means no timeout. Ignored for non-multipart downloads.'
|
||||
key_prefix:
|
||||
type: string
|
||||
description: 'key_prefix is similar to a chroot directory for a local filesystem. If specified the user will only see contents that starts with this prefix and so you can restrict access to a specific virtual folder. The prefix, if not empty, must not start with "/" and must end with "/". If empty the whole bucket contents will be available'
|
||||
|
|
|
@ -686,6 +686,10 @@ func getS3Config(r *http.Request) (vfs.S3FsConfig, error) {
|
|||
if err != nil {
|
||||
return config, err
|
||||
}
|
||||
config.DownloadPartMaxTime, err = strconv.Atoi(r.Form.Get("s3_download_part_max_time"))
|
||||
if err != nil {
|
||||
return config, err
|
||||
}
|
||||
config.UploadConcurrency, err = strconv.Atoi(r.Form.Get("s3_upload_concurrency"))
|
||||
return config, err
|
||||
}
|
||||
|
|
|
@ -1057,6 +1057,9 @@ func compareS3Config(expected *vfs.Filesystem, actual *vfs.Filesystem) error {
|
|||
if expected.S3Config.UploadPartSize != actual.S3Config.UploadPartSize {
|
||||
return errors.New("fs S3 upload part size mismatch")
|
||||
}
|
||||
if expected.S3Config.DownloadPartMaxTime != actual.S3Config.DownloadPartMaxTime {
|
||||
return errors.New("fs S3 download part max time mismatch")
|
||||
}
|
||||
if expected.S3Config.UploadConcurrency != actual.S3Config.UploadConcurrency {
|
||||
return errors.New("fs S3 upload concurrency mismatch")
|
||||
}
|
||||
|
|
|
@ -109,6 +109,9 @@ type S3FsConfig struct {
|
|||
UploadPartSize int64 `json:"upload_part_size,omitempty"`
|
||||
// How many parts are uploaded in parallel
|
||||
UploadConcurrency int `json:"upload_concurrency,omitempty"`
|
||||
// DownloadPartMaxTime defines the maximum time allowed, in seconds, to download a single chunk (5MB).
|
||||
// 0 means no timeout. Ignored for non-multipart downloads.
|
||||
DownloadPartMaxTime int `json:"download_part_max_time,omitempty"`
|
||||
}
|
||||
|
||||
// GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
|
||||
|
|
|
@ -78,8 +78,18 @@
|
|||
</div>
|
||||
|
||||
<div class="form-group row fsconfig fsconfig-s3fs">
|
||||
<label for="idS3DownloadTimeout" class="col-sm-2 col-form-label">DL Part Timeout (secs)</label>
|
||||
<div class="col-sm-3">
|
||||
<input type="number" class="form-control" id="idS3DownloadTimeout" name="s3_download_part_max_time"
|
||||
placeholder="" value="{{.S3Config.DownloadPartMaxTime}}" min="0"
|
||||
aria-describedby="S3DownloadTimeoutHelpBlock">
|
||||
<small id="S3DownloadTimeoutHelpBlock" class="form-text text-muted">
|
||||
Max time limit, in seconds, to download a single part (5MB). 0 means no limit
|
||||
</small>
|
||||
</div>
|
||||
<div class="col-sm-2"></div>
|
||||
<label for="idS3KeyPrefix" class="col-sm-2 col-form-label">Key Prefix</label>
|
||||
<div class="col-sm-10">
|
||||
<div class="col-sm-3">
|
||||
<input type="text" class="form-control" id="idS3KeyPrefix" name="s3_key_prefix" placeholder=""
|
||||
value="{{.S3Config.KeyPrefix}}" maxlength="255" aria-describedby="S3KeyPrefixHelpBlock">
|
||||
<small id="S3KeyPrefixHelpBlock" class="form-text text-muted">
|
||||
|
|
|
@ -221,22 +221,23 @@ func (f *Filesystem) HideConfidentialData() {
|
|||
}
|
||||
}
|
||||
|
||||
// GetACopy returns a copy
|
||||
// GetACopy returns a filesystem copy
|
||||
func (f *Filesystem) GetACopy() Filesystem {
|
||||
f.SetEmptySecretsIfNil()
|
||||
fs := Filesystem{
|
||||
Provider: f.Provider,
|
||||
S3Config: S3FsConfig{
|
||||
S3FsConfig: sdk.S3FsConfig{
|
||||
Bucket: f.S3Config.Bucket,
|
||||
Region: f.S3Config.Region,
|
||||
AccessKey: f.S3Config.AccessKey,
|
||||
AccessSecret: f.S3Config.AccessSecret.Clone(),
|
||||
Endpoint: f.S3Config.Endpoint,
|
||||
StorageClass: f.S3Config.StorageClass,
|
||||
KeyPrefix: f.S3Config.KeyPrefix,
|
||||
UploadPartSize: f.S3Config.UploadPartSize,
|
||||
UploadConcurrency: f.S3Config.UploadConcurrency,
|
||||
Bucket: f.S3Config.Bucket,
|
||||
Region: f.S3Config.Region,
|
||||
AccessKey: f.S3Config.AccessKey,
|
||||
AccessSecret: f.S3Config.AccessSecret.Clone(),
|
||||
Endpoint: f.S3Config.Endpoint,
|
||||
StorageClass: f.S3Config.StorageClass,
|
||||
KeyPrefix: f.S3Config.KeyPrefix,
|
||||
UploadPartSize: f.S3Config.UploadPartSize,
|
||||
UploadConcurrency: f.S3Config.UploadConcurrency,
|
||||
DownloadPartMaxTime: f.S3Config.DownloadPartMaxTime,
|
||||
},
|
||||
},
|
||||
GCSConfig: GCSFsConfig{
|
||||
|
|
16
vfs/s3fs.go
16
vfs/s3fs.go
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
|
@ -178,10 +179,17 @@ func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun
|
|||
}
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
downloader := s3manager.NewDownloaderWithClient(fs.svc)
|
||||
/*downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) {
|
||||
newCtx, _ := context.WithTimeout(r.Context(), time.Minute)
|
||||
r.SetContext(newCtx)
|
||||
})*/
|
||||
if offset == 0 && fs.config.DownloadPartMaxTime > 0 {
|
||||
downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) {
|
||||
chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.DownloadPartMaxTime)*time.Second)
|
||||
r.SetContext(chunkCtx)
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
cancel()
|
||||
}()
|
||||
})
|
||||
}
|
||||
var streamRange *string
|
||||
if offset > 0 {
|
||||
streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
|
||||
|
|
Loading…
Reference in a new issue