From e1a2451c22f3adb51b752cc6ac29e181651516b5 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Sun, 11 Jul 2021 18:39:45 +0200 Subject: [PATCH] s3: allow to configure the chunk download timeout --- httpd/httpd_test.go | 20 +++++++++++++++++++- httpd/schema/openapi.yaml | 3 +++ httpd/webadmin.go | 4 ++++ httpdtest/httpdtest.go | 3 +++ sdk/filesystem.go | 3 +++ templates/webadmin/fsconfig.html | 12 +++++++++++- vfs/filesystem.go | 21 +++++++++++---------- vfs/s3fs.go | 16 ++++++++++++---- 8 files changed, 66 insertions(+), 16 deletions(-) diff --git a/httpd/httpd_test.go b/httpd/httpd_test.go index 4248aa49..66423886 100644 --- a/httpd/httpd_test.go +++ b/httpd/httpd_test.go @@ -1590,6 +1590,7 @@ func TestUserS3Config(t *testing.T) { user.FsConfig.S3Config.AccessSecret = kms.NewPlainSecret("Server-Access-Secret") user.FsConfig.S3Config.Endpoint = "http://127.0.0.1:9000" user.FsConfig.S3Config.UploadPartSize = 8 + user.FsConfig.S3Config.DownloadPartMaxTime = 60 folderName := "vfolderName" user.VirtualFolders = append(user.VirtualFolders, vfs.VirtualFolder{ BaseVirtualFolder: vfs.BaseVirtualFolder{ @@ -1612,6 +1613,7 @@ func TestUserS3Config(t *testing.T) { assert.NotEmpty(t, user.FsConfig.S3Config.AccessSecret.GetPayload()) assert.Empty(t, user.FsConfig.S3Config.AccessSecret.GetAdditionalData()) assert.Empty(t, user.FsConfig.S3Config.AccessSecret.GetKey()) + assert.Equal(t, 60, user.FsConfig.S3Config.DownloadPartMaxTime) if assert.Len(t, user.VirtualFolders, 1) { folder := user.VirtualFolders[0] assert.Equal(t, kms.SecretStatusSecretBox, folder.FsConfig.CryptConfig.Passphrase.GetStatus()) @@ -7268,6 +7270,7 @@ func TestUserTemplateMock(t *testing.T) { form.Add("hooks", "external_auth_disabled") form.Add("hooks", "check_password_disabled") form.Set("disable_fs_checks", "checked") + form.Set("s3_download_part_max_time", "0") // test invalid s3_upload_part_size form.Set("s3_upload_part_size", "a") b, contentType, _ := getMultipartFormData(form, "", "") @@ -7437,6 +7440,7 @@ func TestFolderTemplateMock(t *testing.T) { form.Set("s3_upload_part_size", "5") form.Set("s3_upload_concurrency", "4") + form.Set("s3_download_part_max_time", "0") b, contentType, _ = getMultipartFormData(form, "", "") req, _ = http.NewRequest(http.MethodPost, webTemplateFolder, &b) setJWTCookieForReq(req, token) @@ -7514,6 +7518,7 @@ func TestWebUserS3Mock(t *testing.T) { user.FsConfig.S3Config.KeyPrefix = "somedir/subdir/" user.FsConfig.S3Config.UploadPartSize = 5 user.FsConfig.S3Config.UploadConcurrency = 4 + user.FsConfig.S3Config.DownloadPartMaxTime = 60 user.Description = "s3 tèst user" form := make(url.Values) form.Set(csrfFormToken, csrfToken) @@ -7557,8 +7562,17 @@ func TestWebUserS3Mock(t *testing.T) { req.Header.Set("Content-Type", contentType) rr = executeRequest(req) checkResponseCode(t, http.StatusOK, rr) - // test invalid s3_concurrency + // test invalid download max part time form.Set("s3_upload_part_size", strconv.FormatInt(user.FsConfig.S3Config.UploadPartSize, 10)) + form.Set("s3_download_part_max_time", "a") + b, contentType, _ = getMultipartFormData(form, "", "") + req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b) + setJWTCookieForReq(req, webToken) + req.Header.Set("Content-Type", contentType) + rr = executeRequest(req) + checkResponseCode(t, http.StatusOK, rr) + // test invalid s3_concurrency + form.Set("s3_download_part_max_time", strconv.Itoa(user.FsConfig.S3Config.DownloadPartMaxTime)) form.Set("s3_upload_concurrency", "a") b, contentType, _ = getMultipartFormData(form, "", "") req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b) @@ -7590,6 +7604,7 @@ func TestWebUserS3Mock(t *testing.T) { assert.Equal(t, updateUser.FsConfig.S3Config.KeyPrefix, user.FsConfig.S3Config.KeyPrefix) assert.Equal(t, updateUser.FsConfig.S3Config.UploadPartSize, user.FsConfig.S3Config.UploadPartSize) assert.Equal(t, updateUser.FsConfig.S3Config.UploadConcurrency, user.FsConfig.S3Config.UploadConcurrency) + assert.Equal(t, updateUser.FsConfig.S3Config.DownloadPartMaxTime, user.FsConfig.S3Config.DownloadPartMaxTime) assert.Equal(t, 2, len(updateUser.Filters.FilePatterns)) assert.Equal(t, kms.SecretStatusSecretBox, updateUser.FsConfig.S3Config.AccessSecret.GetStatus()) assert.NotEmpty(t, updateUser.FsConfig.S3Config.AccessSecret.GetPayload()) @@ -8233,6 +8248,7 @@ func TestS3WebFolderMock(t *testing.T) { S3KeyPrefix := "somedir/subdir/" S3UploadPartSize := 5 S3UploadConcurrency := 4 + S3MaxPartDownloadTime := 120 form := make(url.Values) form.Set("mapped_path", mappedPath) form.Set("name", folderName) @@ -8246,6 +8262,7 @@ func TestS3WebFolderMock(t *testing.T) { form.Set("s3_endpoint", S3Endpoint) form.Set("s3_key_prefix", S3KeyPrefix) form.Set("s3_upload_part_size", strconv.Itoa(S3UploadPartSize)) + form.Set("s3_download_part_max_time", strconv.Itoa(S3MaxPartDownloadTime)) form.Set("s3_upload_concurrency", "a") form.Set(csrfFormToken, csrfToken) b, contentType, err := getMultipartFormData(form, "", "") @@ -8287,6 +8304,7 @@ func TestS3WebFolderMock(t *testing.T) { assert.Equal(t, S3KeyPrefix, folder.FsConfig.S3Config.KeyPrefix) assert.Equal(t, S3UploadConcurrency, folder.FsConfig.S3Config.UploadConcurrency) assert.Equal(t, int64(S3UploadPartSize), folder.FsConfig.S3Config.UploadPartSize) + assert.Equal(t, S3MaxPartDownloadTime, folder.FsConfig.S3Config.DownloadPartMaxTime) // update S3UploadConcurrency = 10 form.Set("s3_upload_concurrency", "b") diff --git a/httpd/schema/openapi.yaml b/httpd/schema/openapi.yaml index cca7cac9..3a8216a6 100644 --- a/httpd/schema/openapi.yaml +++ b/httpd/schema/openapi.yaml @@ -2154,6 +2154,9 @@ components: upload_concurrency: type: integer description: 'the number of parts to upload in parallel. If this value is set to zero, the default value (2) will be used' + download_part_max_time: + type: integer + description: 'the maximum time allowed, in seconds, to download a single chunk (5MB). 0 means no timeout. Ignored for non-multipart downloads.' key_prefix: type: string description: 'key_prefix is similar to a chroot directory for a local filesystem. If specified the user will only see contents that starts with this prefix and so you can restrict access to a specific virtual folder. The prefix, if not empty, must not start with "/" and must end with "/". If empty the whole bucket contents will be available' diff --git a/httpd/webadmin.go b/httpd/webadmin.go index 610ff956..457e1b81 100644 --- a/httpd/webadmin.go +++ b/httpd/webadmin.go @@ -686,6 +686,10 @@ func getS3Config(r *http.Request) (vfs.S3FsConfig, error) { if err != nil { return config, err } + config.DownloadPartMaxTime, err = strconv.Atoi(r.Form.Get("s3_download_part_max_time")) + if err != nil { + return config, err + } config.UploadConcurrency, err = strconv.Atoi(r.Form.Get("s3_upload_concurrency")) return config, err } diff --git a/httpdtest/httpdtest.go b/httpdtest/httpdtest.go index 4d96b05f..bc237cf6 100644 --- a/httpdtest/httpdtest.go +++ b/httpdtest/httpdtest.go @@ -1057,6 +1057,9 @@ func compareS3Config(expected *vfs.Filesystem, actual *vfs.Filesystem) error { if expected.S3Config.UploadPartSize != actual.S3Config.UploadPartSize { return errors.New("fs S3 upload part size mismatch") } + if expected.S3Config.DownloadPartMaxTime != actual.S3Config.DownloadPartMaxTime { + return errors.New("fs S3 download part max time mismatch") + } if expected.S3Config.UploadConcurrency != actual.S3Config.UploadConcurrency { return errors.New("fs S3 upload concurrency mismatch") } diff --git a/sdk/filesystem.go b/sdk/filesystem.go index 554bab18..ae90e738 100644 --- a/sdk/filesystem.go +++ b/sdk/filesystem.go @@ -109,6 +109,9 @@ type S3FsConfig struct { UploadPartSize int64 `json:"upload_part_size,omitempty"` // How many parts are uploaded in parallel UploadConcurrency int `json:"upload_concurrency,omitempty"` + // DownloadPartMaxTime defines the maximum time allowed, in seconds, to download a single chunk (5MB). + // 0 means no timeout. Ignored for non-multipart downloads. + DownloadPartMaxTime int `json:"download_part_max_time,omitempty"` } // GCSFsConfig defines the configuration for Google Cloud Storage based filesystem diff --git a/templates/webadmin/fsconfig.html b/templates/webadmin/fsconfig.html index 38bfb815..ca82280f 100644 --- a/templates/webadmin/fsconfig.html +++ b/templates/webadmin/fsconfig.html @@ -78,8 +78,18 @@
+ +
+ + + Max time limit, in seconds, to download a single part (5MB). 0 means no limit + +
+
-
+
diff --git a/vfs/filesystem.go b/vfs/filesystem.go index a12cdeb4..1636bc60 100644 --- a/vfs/filesystem.go +++ b/vfs/filesystem.go @@ -221,22 +221,23 @@ func (f *Filesystem) HideConfidentialData() { } } -// GetACopy returns a copy +// GetACopy returns a filesystem copy func (f *Filesystem) GetACopy() Filesystem { f.SetEmptySecretsIfNil() fs := Filesystem{ Provider: f.Provider, S3Config: S3FsConfig{ S3FsConfig: sdk.S3FsConfig{ - Bucket: f.S3Config.Bucket, - Region: f.S3Config.Region, - AccessKey: f.S3Config.AccessKey, - AccessSecret: f.S3Config.AccessSecret.Clone(), - Endpoint: f.S3Config.Endpoint, - StorageClass: f.S3Config.StorageClass, - KeyPrefix: f.S3Config.KeyPrefix, - UploadPartSize: f.S3Config.UploadPartSize, - UploadConcurrency: f.S3Config.UploadConcurrency, + Bucket: f.S3Config.Bucket, + Region: f.S3Config.Region, + AccessKey: f.S3Config.AccessKey, + AccessSecret: f.S3Config.AccessSecret.Clone(), + Endpoint: f.S3Config.Endpoint, + StorageClass: f.S3Config.StorageClass, + KeyPrefix: f.S3Config.KeyPrefix, + UploadPartSize: f.S3Config.UploadPartSize, + UploadConcurrency: f.S3Config.UploadConcurrency, + DownloadPartMaxTime: f.S3Config.DownloadPartMaxTime, }, }, GCSConfig: GCSFsConfig{ diff --git a/vfs/s3fs.go b/vfs/s3fs.go index fcaad899..e5e0b35d 100644 --- a/vfs/s3fs.go +++ b/vfs/s3fs.go @@ -16,6 +16,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" @@ -178,10 +179,17 @@ func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun } ctx, cancelFn := context.WithCancel(context.Background()) downloader := s3manager.NewDownloaderWithClient(fs.svc) - /*downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) { - newCtx, _ := context.WithTimeout(r.Context(), time.Minute) - r.SetContext(newCtx) - })*/ + if offset == 0 && fs.config.DownloadPartMaxTime > 0 { + downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) { + chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.DownloadPartMaxTime)*time.Second) + r.SetContext(chunkCtx) + + go func() { + <-ctx.Done() + cancel() + }() + }) + } var streamRange *string if offset > 0 { streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))