From cd3563693971736ee5cd3c12350fb936725fe4d5 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Tue, 1 Feb 2022 12:15:56 +0100 Subject: [PATCH] S3: add a timeout for single part uploads Signed-off-by: Nicola Murino --- go.mod | 6 +++--- go.sum | 12 ++++++------ httpd/httpd_test.go | 20 +++++++++++++++++++- httpd/webadmin.go | 4 ++++ httpdtest/httpdtest.go | 3 +++ openapi/openapi.yaml | 5 ++++- templates/webadmin/fsconfig.html | 16 ++++++++++++++-- vfs/filesystem.go | 1 + vfs/s3fs.go | 11 +++++++++++ vfs/vfs.go | 3 +++ 10 files changed, 68 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 7ab7e568..3e5a5921 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/rs/cors v1.8.2 github.com/rs/xid v1.3.0 github.com/rs/zerolog v1.26.2-0.20211219225053-665519c4da50 - github.com/sftpgo/sdk v0.0.0-20220130093602-2e82a333cdec + github.com/sftpgo/sdk v0.0.0-20220201111021-563c373f8012 github.com/shirou/gopsutil/v3 v3.21.13-0.20220106132423-a3ae4bc40d26 github.com/spf13/afero v1.8.0 github.com/spf13/cobra v1.3.0 @@ -86,7 +86,7 @@ require ( github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/klauspost/cpuid/v2 v2.0.10 // indirect github.com/kr/fs v0.1.0 // indirect github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect github.com/lestrrat-go/blackmagic v1.0.0 // indirect @@ -96,7 +96,7 @@ require ( github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mattn/go-colorable v0.1.12 // indirect - github.com/mattn/go-ieproxy v0.0.3-0.20220115171849-ffa2c199638b // indirect + github.com/mattn/go-ieproxy v0.0.3 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/miekg/dns v1.1.45 // indirect diff --git a/go.sum b/go.sum index 3525383c..b978f70c 100644 --- a/go.sum +++ b/go.sum @@ -510,8 +510,8 @@ github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw= github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.0.10 h1:fv5GKR+e2UgD+gcxQECVT5rBwAmlFLl2mkKm7WK3ODY= +github.com/klauspost/cpuid/v2 v2.0.10/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -566,8 +566,8 @@ github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= -github.com/mattn/go-ieproxy v0.0.3-0.20220115171849-ffa2c199638b h1:hOk7BgJT/9Vt2aIrfXp0qA6hwY2JZSwX4Rmsgp8DJ6E= -github.com/mattn/go-ieproxy v0.0.3-0.20220115171849-ffa2c199638b/go.mod h1:6ZpRmhBaYuBX1U2za+9rC9iCGLsSp2tftelZne7CPko= +github.com/mattn/go-ieproxy v0.0.3 h1:YkaHmK1CzE5C4O7A3hv3TCbfNDPSCf0RKZFX+VhBeYk= +github.com/mattn/go-ieproxy v0.0.3/go.mod h1:6ZpRmhBaYuBX1U2za+9rC9iCGLsSp2tftelZne7CPko= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= @@ -691,8 +691,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4 h1:PT+ElG/UUFMfqy5HrxJxNzj3QBOf7dZwupeVC+mG1Lo= github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4/go.mod h1:MnkX001NG75g3p8bhFycnyIjeQoOjGL6CEIsdE/nKSY= -github.com/sftpgo/sdk v0.0.0-20220130093602-2e82a333cdec h1:zdL+7nNYny5e87IDZMFReFHviKRenxmCGDwgLwHIrwU= -github.com/sftpgo/sdk v0.0.0-20220130093602-2e82a333cdec/go.mod h1:gcYbk4z578GfwbC9kJOz2rltYoPYUIcGZgV13r74MJw= +github.com/sftpgo/sdk v0.0.0-20220201111021-563c373f8012 h1:tkzS0kxhatqIVrWZePzsFlp1xQgR9q6Wt0UYKsBiCUU= +github.com/sftpgo/sdk v0.0.0-20220201111021-563c373f8012/go.mod h1:gcYbk4z578GfwbC9kJOz2rltYoPYUIcGZgV13r74MJw= github.com/shirou/gopsutil/v3 v3.21.13-0.20220106132423-a3ae4bc40d26 h1:nkvraEu1xs6D3AimiR9SkIOCG6lVvVZRfwbbQ7fX1DY= github.com/shirou/gopsutil/v3 v3.21.13-0.20220106132423-a3ae4bc40d26/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= diff --git a/httpd/httpd_test.go b/httpd/httpd_test.go index a1791905..e90bcca1 100644 --- a/httpd/httpd_test.go +++ b/httpd/httpd_test.go @@ -2539,6 +2539,7 @@ func TestUserS3Config(t *testing.T) { user.FsConfig.S3Config.Endpoint = "http://127.0.0.1:9000" user.FsConfig.S3Config.UploadPartSize = 8 user.FsConfig.S3Config.DownloadPartMaxTime = 60 + user.FsConfig.S3Config.UploadPartMaxTime = 40 user.FsConfig.S3Config.ForcePathStyle = true user.FsConfig.S3Config.DownloadPartSize = 6 folderName := "vfolderName" @@ -2562,6 +2563,7 @@ func TestUserS3Config(t *testing.T) { assert.Empty(t, user.FsConfig.S3Config.AccessSecret.GetAdditionalData()) assert.Empty(t, user.FsConfig.S3Config.AccessSecret.GetKey()) assert.Equal(t, 60, user.FsConfig.S3Config.DownloadPartMaxTime) + assert.Equal(t, 40, user.FsConfig.S3Config.UploadPartMaxTime) if assert.Len(t, user.VirtualFolders, 1) { folder := user.VirtualFolders[0] assert.Equal(t, sdkkms.SecretStatusSecretBox, folder.FsConfig.CryptConfig.Passphrase.GetStatus()) @@ -14258,6 +14260,7 @@ func TestUserTemplateMock(t *testing.T) { form.Add("hooks", "check_password_disabled") form.Set("disable_fs_checks", "checked") form.Set("s3_download_part_max_time", "0") + form.Set("s3_upload_part_max_time", "0") // test invalid s3_upload_part_size form.Set("s3_upload_part_size", "a") form.Set("form_action", "export_from_template") @@ -14485,6 +14488,7 @@ func TestFolderTemplateMock(t *testing.T) { form.Set("s3_upload_part_size", "5") form.Set("s3_upload_concurrency", "4") form.Set("s3_download_part_max_time", "0") + form.Set("s3_upload_part_max_time", "0") form.Set("s3_download_part_size", "6") form.Set("s3_download_concurrency", "2") b, contentType, _ = getMultipartFormData(form, "", "") @@ -14565,6 +14569,7 @@ func TestWebUserS3Mock(t *testing.T) { user.FsConfig.S3Config.UploadPartSize = 5 user.FsConfig.S3Config.UploadConcurrency = 4 user.FsConfig.S3Config.DownloadPartMaxTime = 60 + user.FsConfig.S3Config.UploadPartMaxTime = 120 user.FsConfig.S3Config.DownloadPartSize = 6 user.FsConfig.S3Config.DownloadConcurrency = 3 user.FsConfig.S3Config.ForcePathStyle = true @@ -14656,8 +14661,17 @@ func TestWebUserS3Mock(t *testing.T) { req.Header.Set("Content-Type", contentType) rr = executeRequest(req) checkResponseCode(t, http.StatusOK, rr) - // now add the user + // test invalid s3_upload_part_max_time form.Set("s3_download_part_max_time", strconv.Itoa(user.FsConfig.S3Config.DownloadPartMaxTime)) + form.Set("s3_upload_part_max_time", "a") + b, contentType, _ = getMultipartFormData(form, "", "") + req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b) + setJWTCookieForReq(req, webToken) + req.Header.Set("Content-Type", contentType) + rr = executeRequest(req) + checkResponseCode(t, http.StatusOK, rr) + // now add the user + form.Set("s3_upload_part_max_time", strconv.Itoa(user.FsConfig.S3Config.UploadPartMaxTime)) b, contentType, _ = getMultipartFormData(form, "", "") req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b) setJWTCookieForReq(req, webToken) @@ -14682,6 +14696,7 @@ func TestWebUserS3Mock(t *testing.T) { assert.Equal(t, updateUser.FsConfig.S3Config.UploadPartSize, user.FsConfig.S3Config.UploadPartSize) assert.Equal(t, updateUser.FsConfig.S3Config.UploadConcurrency, user.FsConfig.S3Config.UploadConcurrency) assert.Equal(t, updateUser.FsConfig.S3Config.DownloadPartMaxTime, user.FsConfig.S3Config.DownloadPartMaxTime) + assert.Equal(t, updateUser.FsConfig.S3Config.UploadPartMaxTime, user.FsConfig.S3Config.UploadPartMaxTime) assert.Equal(t, updateUser.FsConfig.S3Config.DownloadPartSize, user.FsConfig.S3Config.DownloadPartSize) assert.Equal(t, updateUser.FsConfig.S3Config.DownloadConcurrency, user.FsConfig.S3Config.DownloadConcurrency) assert.True(t, updateUser.FsConfig.S3Config.ForcePathStyle) @@ -15355,6 +15370,7 @@ func TestS3WebFolderMock(t *testing.T) { S3UploadPartSize := 5 S3UploadConcurrency := 4 S3MaxPartDownloadTime := 120 + S3MaxPartUploadTime := 60 S3DownloadPartSize := 6 S3DownloadConcurrency := 3 form := make(url.Values) @@ -15374,6 +15390,7 @@ func TestS3WebFolderMock(t *testing.T) { form.Set("s3_download_part_max_time", strconv.Itoa(S3MaxPartDownloadTime)) form.Set("s3_download_part_size", strconv.Itoa(S3DownloadPartSize)) form.Set("s3_download_concurrency", strconv.Itoa(S3DownloadConcurrency)) + form.Set("s3_upload_part_max_time", strconv.Itoa(S3MaxPartUploadTime)) form.Set("s3_upload_concurrency", "a") form.Set(csrfFormToken, csrfToken) b, contentType, err := getMultipartFormData(form, "", "") @@ -15417,6 +15434,7 @@ func TestS3WebFolderMock(t *testing.T) { assert.Equal(t, S3UploadConcurrency, folder.FsConfig.S3Config.UploadConcurrency) assert.Equal(t, int64(S3UploadPartSize), folder.FsConfig.S3Config.UploadPartSize) assert.Equal(t, S3MaxPartDownloadTime, folder.FsConfig.S3Config.DownloadPartMaxTime) + assert.Equal(t, S3MaxPartUploadTime, folder.FsConfig.S3Config.UploadPartMaxTime) assert.Equal(t, S3DownloadConcurrency, folder.FsConfig.S3Config.DownloadConcurrency) assert.Equal(t, int64(S3DownloadPartSize), folder.FsConfig.S3Config.DownloadPartSize) assert.False(t, folder.FsConfig.S3Config.ForcePathStyle) diff --git a/httpd/webadmin.go b/httpd/webadmin.go index 2e669170..fef566ab 100644 --- a/httpd/webadmin.go +++ b/httpd/webadmin.go @@ -984,6 +984,10 @@ func getS3Config(r *http.Request) (vfs.S3FsConfig, error) { } config.ForcePathStyle = r.Form.Get("s3_force_path_style") != "" config.DownloadPartMaxTime, err = strconv.Atoi(r.Form.Get("s3_download_part_max_time")) + if err != nil { + return config, err + } + config.UploadPartMaxTime, err = strconv.Atoi(r.Form.Get("s3_upload_part_max_time")) return config, err } diff --git a/httpdtest/httpdtest.go b/httpdtest/httpdtest.go index f75145e7..d55dcd6d 100644 --- a/httpdtest/httpdtest.go +++ b/httpdtest/httpdtest.go @@ -1305,6 +1305,9 @@ func compareS3Config(expected *vfs.Filesystem, actual *vfs.Filesystem) error { / if expected.S3Config.DownloadPartMaxTime != actual.S3Config.DownloadPartMaxTime { return errors.New("fs S3 download part max time mismatch") } + if expected.S3Config.UploadPartMaxTime != actual.S3Config.UploadPartMaxTime { + return errors.New("fs S3 upload part max time mismatch") + } if expected.S3Config.KeyPrefix != actual.S3Config.KeyPrefix && expected.S3Config.KeyPrefix+"/" != actual.S3Config.KeyPrefix { return errors.New("fs S3 key prefix mismatch") diff --git a/openapi/openapi.yaml b/openapi/openapi.yaml index 86b4fe96..a1a47a4d 100644 --- a/openapi/openapi.yaml +++ b/openapi/openapi.yaml @@ -4611,6 +4611,9 @@ components: upload_concurrency: type: integer description: 'the number of parts to upload in parallel. If this value is set to zero, the default value (5) will be used' + upload_part_max_time: + type: integer + description: 'the maximum time allowed, in seconds, to upload a single chunk (the chunk size is defined via "upload_part_size"). 0 means no timeout' download_part_size: type: integer description: 'the buffer size (in MB) to use for multipart downloads. The minimum allowed part size is 5MB, and if this value is set to zero, the default value (5MB) for the AWS SDK will be used. The minimum allowed value is 5. Ignored for partial downloads' @@ -4619,7 +4622,7 @@ components: description: 'the number of parts to download in parallel. If this value is set to zero, the default value (5) will be used. Ignored for partial downloads' download_part_max_time: type: integer - description: 'the maximum time allowed, in seconds, to download a single chunk (the chunk is defined via "download_part_size"). 0 means no timeout. Ignored for partial downloads.' + description: 'the maximum time allowed, in seconds, to download a single chunk (the chunk size is defined via "download_part_size"). 0 means no timeout. Ignored for partial downloads.' force_path_style: type: boolean description: 'Set this to "true" to force the request to use path-style addressing, i.e., "http://s3.amazonaws.com/BUCKET/KEY". By default, the S3 client will use virtual hosted bucket addressing when possible ("http://BUCKET.s3.amazonaws.com/KEY")' diff --git a/templates/webadmin/fsconfig.html b/templates/webadmin/fsconfig.html index 66bad834..58729bb3 100644 --- a/templates/webadmin/fsconfig.html +++ b/templates/webadmin/fsconfig.html @@ -123,6 +123,16 @@
+ +
+ + + Max time limit, in seconds, to upload a single part. 0 means no limit + +
+
-
+
+ +
-
+
diff --git a/vfs/filesystem.go b/vfs/filesystem.go index a3fe2434..58bb80df 100644 --- a/vfs/filesystem.go +++ b/vfs/filesystem.go @@ -253,6 +253,7 @@ func (f *Filesystem) GetACopy() Filesystem { DownloadPartSize: f.S3Config.DownloadPartSize, DownloadConcurrency: f.S3Config.DownloadConcurrency, DownloadPartMaxTime: f.S3Config.DownloadPartMaxTime, + UploadPartMaxTime: f.S3Config.UploadPartMaxTime, ForcePathStyle: f.S3Config.ForcePathStyle, }, AccessSecret: f.S3Config.AccessSecret.Clone(), diff --git a/vfs/s3fs.go b/vfs/s3fs.go index 6a85cda3..12ef9413 100644 --- a/vfs/s3fs.go +++ b/vfs/s3fs.go @@ -232,6 +232,17 @@ func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error) p := NewPipeWriter(w) ctx, cancelFn := context.WithCancel(context.Background()) uploader := s3manager.NewUploaderWithClient(fs.svc) + if fs.config.UploadPartMaxTime > 0 { + uploader.RequestOptions = append(uploader.RequestOptions, func(r *request.Request) { + chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.UploadPartMaxTime)*time.Second) + r.SetContext(chunkCtx) + + go func() { + <-ctx.Done() + cancel() + }() + }) + } go func() { defer cancelFn() key := name diff --git a/vfs/vfs.go b/vfs/vfs.go index d6999960..afce556f 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -198,6 +198,9 @@ func (c *S3FsConfig) isEqual(other *S3FsConfig) bool { if c.DownloadPartMaxTime != other.DownloadPartMaxTime { return false } + if c.UploadPartMaxTime != other.UploadPartMaxTime { + return false + } if c.ForcePathStyle != other.ForcePathStyle { return false }