S3: add a timeout for single part uploads

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2022-02-01 12:15:56 +01:00
parent d51adb041e
commit cd35636939
No known key found for this signature in database
GPG key ID: 2F1FB59433D5A8CB
10 changed files with 68 additions and 13 deletions

6
go.mod
View file

@ -39,7 +39,7 @@ require (
github.com/rs/cors v1.8.2
github.com/rs/xid v1.3.0
github.com/rs/zerolog v1.26.2-0.20211219225053-665519c4da50
github.com/sftpgo/sdk v0.0.0-20220130093602-2e82a333cdec
github.com/sftpgo/sdk v0.0.0-20220201111021-563c373f8012
github.com/shirou/gopsutil/v3 v3.21.13-0.20220106132423-a3ae4bc40d26
github.com/spf13/afero v1.8.0
github.com/spf13/cobra v1.3.0
@ -86,7 +86,7 @@ require (
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.10 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
github.com/lestrrat-go/blackmagic v1.0.0 // indirect
@ -96,7 +96,7 @@ require (
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-ieproxy v0.0.3-0.20220115171849-ffa2c199638b // indirect
github.com/mattn/go-ieproxy v0.0.3 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/miekg/dns v1.1.45 // indirect

12
go.sum
View file

@ -510,8 +510,8 @@ github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.10 h1:fv5GKR+e2UgD+gcxQECVT5rBwAmlFLl2mkKm7WK3ODY=
github.com/klauspost/cpuid/v2 v2.0.10/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@ -566,8 +566,8 @@ github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
github.com/mattn/go-ieproxy v0.0.3-0.20220115171849-ffa2c199638b h1:hOk7BgJT/9Vt2aIrfXp0qA6hwY2JZSwX4Rmsgp8DJ6E=
github.com/mattn/go-ieproxy v0.0.3-0.20220115171849-ffa2c199638b/go.mod h1:6ZpRmhBaYuBX1U2za+9rC9iCGLsSp2tftelZne7CPko=
github.com/mattn/go-ieproxy v0.0.3 h1:YkaHmK1CzE5C4O7A3hv3TCbfNDPSCf0RKZFX+VhBeYk=
github.com/mattn/go-ieproxy v0.0.3/go.mod h1:6ZpRmhBaYuBX1U2za+9rC9iCGLsSp2tftelZne7CPko=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
@ -691,8 +691,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4 h1:PT+ElG/UUFMfqy5HrxJxNzj3QBOf7dZwupeVC+mG1Lo=
github.com/secsy/goftp v0.0.0-20200609142545-aa2de14babf4/go.mod h1:MnkX001NG75g3p8bhFycnyIjeQoOjGL6CEIsdE/nKSY=
github.com/sftpgo/sdk v0.0.0-20220130093602-2e82a333cdec h1:zdL+7nNYny5e87IDZMFReFHviKRenxmCGDwgLwHIrwU=
github.com/sftpgo/sdk v0.0.0-20220130093602-2e82a333cdec/go.mod h1:gcYbk4z578GfwbC9kJOz2rltYoPYUIcGZgV13r74MJw=
github.com/sftpgo/sdk v0.0.0-20220201111021-563c373f8012 h1:tkzS0kxhatqIVrWZePzsFlp1xQgR9q6Wt0UYKsBiCUU=
github.com/sftpgo/sdk v0.0.0-20220201111021-563c373f8012/go.mod h1:gcYbk4z578GfwbC9kJOz2rltYoPYUIcGZgV13r74MJw=
github.com/shirou/gopsutil/v3 v3.21.13-0.20220106132423-a3ae4bc40d26 h1:nkvraEu1xs6D3AimiR9SkIOCG6lVvVZRfwbbQ7fX1DY=
github.com/shirou/gopsutil/v3 v3.21.13-0.20220106132423-a3ae4bc40d26/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=

View file

@ -2539,6 +2539,7 @@ func TestUserS3Config(t *testing.T) {
user.FsConfig.S3Config.Endpoint = "http://127.0.0.1:9000"
user.FsConfig.S3Config.UploadPartSize = 8
user.FsConfig.S3Config.DownloadPartMaxTime = 60
user.FsConfig.S3Config.UploadPartMaxTime = 40
user.FsConfig.S3Config.ForcePathStyle = true
user.FsConfig.S3Config.DownloadPartSize = 6
folderName := "vfolderName"
@ -2562,6 +2563,7 @@ func TestUserS3Config(t *testing.T) {
assert.Empty(t, user.FsConfig.S3Config.AccessSecret.GetAdditionalData())
assert.Empty(t, user.FsConfig.S3Config.AccessSecret.GetKey())
assert.Equal(t, 60, user.FsConfig.S3Config.DownloadPartMaxTime)
assert.Equal(t, 40, user.FsConfig.S3Config.UploadPartMaxTime)
if assert.Len(t, user.VirtualFolders, 1) {
folder := user.VirtualFolders[0]
assert.Equal(t, sdkkms.SecretStatusSecretBox, folder.FsConfig.CryptConfig.Passphrase.GetStatus())
@ -14258,6 +14260,7 @@ func TestUserTemplateMock(t *testing.T) {
form.Add("hooks", "check_password_disabled")
form.Set("disable_fs_checks", "checked")
form.Set("s3_download_part_max_time", "0")
form.Set("s3_upload_part_max_time", "0")
// test invalid s3_upload_part_size
form.Set("s3_upload_part_size", "a")
form.Set("form_action", "export_from_template")
@ -14485,6 +14488,7 @@ func TestFolderTemplateMock(t *testing.T) {
form.Set("s3_upload_part_size", "5")
form.Set("s3_upload_concurrency", "4")
form.Set("s3_download_part_max_time", "0")
form.Set("s3_upload_part_max_time", "0")
form.Set("s3_download_part_size", "6")
form.Set("s3_download_concurrency", "2")
b, contentType, _ = getMultipartFormData(form, "", "")
@ -14565,6 +14569,7 @@ func TestWebUserS3Mock(t *testing.T) {
user.FsConfig.S3Config.UploadPartSize = 5
user.FsConfig.S3Config.UploadConcurrency = 4
user.FsConfig.S3Config.DownloadPartMaxTime = 60
user.FsConfig.S3Config.UploadPartMaxTime = 120
user.FsConfig.S3Config.DownloadPartSize = 6
user.FsConfig.S3Config.DownloadConcurrency = 3
user.FsConfig.S3Config.ForcePathStyle = true
@ -14656,8 +14661,17 @@ func TestWebUserS3Mock(t *testing.T) {
req.Header.Set("Content-Type", contentType)
rr = executeRequest(req)
checkResponseCode(t, http.StatusOK, rr)
// now add the user
// test invalid s3_upload_part_max_time
form.Set("s3_download_part_max_time", strconv.Itoa(user.FsConfig.S3Config.DownloadPartMaxTime))
form.Set("s3_upload_part_max_time", "a")
b, contentType, _ = getMultipartFormData(form, "", "")
req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b)
setJWTCookieForReq(req, webToken)
req.Header.Set("Content-Type", contentType)
rr = executeRequest(req)
checkResponseCode(t, http.StatusOK, rr)
// now add the user
form.Set("s3_upload_part_max_time", strconv.Itoa(user.FsConfig.S3Config.UploadPartMaxTime))
b, contentType, _ = getMultipartFormData(form, "", "")
req, _ = http.NewRequest(http.MethodPost, path.Join(webUserPath, user.Username), &b)
setJWTCookieForReq(req, webToken)
@ -14682,6 +14696,7 @@ func TestWebUserS3Mock(t *testing.T) {
assert.Equal(t, updateUser.FsConfig.S3Config.UploadPartSize, user.FsConfig.S3Config.UploadPartSize)
assert.Equal(t, updateUser.FsConfig.S3Config.UploadConcurrency, user.FsConfig.S3Config.UploadConcurrency)
assert.Equal(t, updateUser.FsConfig.S3Config.DownloadPartMaxTime, user.FsConfig.S3Config.DownloadPartMaxTime)
assert.Equal(t, updateUser.FsConfig.S3Config.UploadPartMaxTime, user.FsConfig.S3Config.UploadPartMaxTime)
assert.Equal(t, updateUser.FsConfig.S3Config.DownloadPartSize, user.FsConfig.S3Config.DownloadPartSize)
assert.Equal(t, updateUser.FsConfig.S3Config.DownloadConcurrency, user.FsConfig.S3Config.DownloadConcurrency)
assert.True(t, updateUser.FsConfig.S3Config.ForcePathStyle)
@ -15355,6 +15370,7 @@ func TestS3WebFolderMock(t *testing.T) {
S3UploadPartSize := 5
S3UploadConcurrency := 4
S3MaxPartDownloadTime := 120
S3MaxPartUploadTime := 60
S3DownloadPartSize := 6
S3DownloadConcurrency := 3
form := make(url.Values)
@ -15374,6 +15390,7 @@ func TestS3WebFolderMock(t *testing.T) {
form.Set("s3_download_part_max_time", strconv.Itoa(S3MaxPartDownloadTime))
form.Set("s3_download_part_size", strconv.Itoa(S3DownloadPartSize))
form.Set("s3_download_concurrency", strconv.Itoa(S3DownloadConcurrency))
form.Set("s3_upload_part_max_time", strconv.Itoa(S3MaxPartUploadTime))
form.Set("s3_upload_concurrency", "a")
form.Set(csrfFormToken, csrfToken)
b, contentType, err := getMultipartFormData(form, "", "")
@ -15417,6 +15434,7 @@ func TestS3WebFolderMock(t *testing.T) {
assert.Equal(t, S3UploadConcurrency, folder.FsConfig.S3Config.UploadConcurrency)
assert.Equal(t, int64(S3UploadPartSize), folder.FsConfig.S3Config.UploadPartSize)
assert.Equal(t, S3MaxPartDownloadTime, folder.FsConfig.S3Config.DownloadPartMaxTime)
assert.Equal(t, S3MaxPartUploadTime, folder.FsConfig.S3Config.UploadPartMaxTime)
assert.Equal(t, S3DownloadConcurrency, folder.FsConfig.S3Config.DownloadConcurrency)
assert.Equal(t, int64(S3DownloadPartSize), folder.FsConfig.S3Config.DownloadPartSize)
assert.False(t, folder.FsConfig.S3Config.ForcePathStyle)

View file

@ -984,6 +984,10 @@ func getS3Config(r *http.Request) (vfs.S3FsConfig, error) {
}
config.ForcePathStyle = r.Form.Get("s3_force_path_style") != ""
config.DownloadPartMaxTime, err = strconv.Atoi(r.Form.Get("s3_download_part_max_time"))
if err != nil {
return config, err
}
config.UploadPartMaxTime, err = strconv.Atoi(r.Form.Get("s3_upload_part_max_time"))
return config, err
}

View file

@ -1305,6 +1305,9 @@ func compareS3Config(expected *vfs.Filesystem, actual *vfs.Filesystem) error { /
if expected.S3Config.DownloadPartMaxTime != actual.S3Config.DownloadPartMaxTime {
return errors.New("fs S3 download part max time mismatch")
}
if expected.S3Config.UploadPartMaxTime != actual.S3Config.UploadPartMaxTime {
return errors.New("fs S3 upload part max time mismatch")
}
if expected.S3Config.KeyPrefix != actual.S3Config.KeyPrefix &&
expected.S3Config.KeyPrefix+"/" != actual.S3Config.KeyPrefix {
return errors.New("fs S3 key prefix mismatch")

View file

@ -4611,6 +4611,9 @@ components:
upload_concurrency:
type: integer
description: 'the number of parts to upload in parallel. If this value is set to zero, the default value (5) will be used'
upload_part_max_time:
type: integer
description: 'the maximum time allowed, in seconds, to upload a single chunk (the chunk size is defined via "upload_part_size"). 0 means no timeout'
download_part_size:
type: integer
description: 'the buffer size (in MB) to use for multipart downloads. The minimum allowed part size is 5MB, and if this value is set to zero, the default value (5MB) for the AWS SDK will be used. The minimum allowed value is 5. Ignored for partial downloads'
@ -4619,7 +4622,7 @@ components:
description: 'the number of parts to download in parallel. If this value is set to zero, the default value (5) will be used. Ignored for partial downloads'
download_part_max_time:
type: integer
description: 'the maximum time allowed, in seconds, to download a single chunk (the chunk is defined via "download_part_size"). 0 means no timeout. Ignored for partial downloads.'
description: 'the maximum time allowed, in seconds, to download a single chunk (the chunk size is defined via "download_part_size"). 0 means no timeout. Ignored for partial downloads.'
force_path_style:
type: boolean
description: 'Set this to "true" to force the request to use path-style addressing, i.e., "http://s3.amazonaws.com/BUCKET/KEY". By default, the S3 client will use virtual hosted bucket addressing when possible ("http://BUCKET.s3.amazonaws.com/KEY")'

View file

@ -123,6 +123,16 @@
</div>
<div class="form-group row fsconfig fsconfig-s3fs">
<label for="idS3UploadTimeout" class="col-sm-2 col-form-label">UL Part Timeout (secs)</label>
<div class="col-sm-3">
<input type="number" class="form-control" id="idS3UploadTimeout" name="s3_upload_part_max_time"
placeholder="" value="{{.S3Config.UploadPartMaxTime}}" min="0"
aria-describedby="S3UploadTimeoutHelpBlock">
<small id="S3UploadTimeoutHelpBlock" class="form-text text-muted">
Max time limit, in seconds, to upload a single part. 0 means no limit
</small>
</div>
<div class="col-sm-2"></div>
<label for="idS3DownloadTimeout" class="col-sm-2 col-form-label">DL Part Timeout (secs)</label>
<div class="col-sm-3">
<input type="number" class="form-control" id="idS3DownloadTimeout" name="s3_download_part_max_time"
@ -132,9 +142,11 @@
Max time limit, in seconds, to download a single part. 0 means no limit
</small>
</div>
<div class="col-sm-2"></div>
</div>
<div class="form-group row fsconfig fsconfig-s3fs">
<label for="idS3ACL" class="col-sm-2 col-form-label">ACL</label>
<div class="col-sm-3">
<div class="col-sm-10">
<input type="text" class="form-control" id="idS3ACL" name="s3_acl" placeholder=""
value="{{.S3Config.ACL}}" maxlength="255" aria-describedby="S3ACLHelpBlock">
<small id="S3ACLHelpBlock" class="form-text text-muted">

View file

@ -253,6 +253,7 @@ func (f *Filesystem) GetACopy() Filesystem {
DownloadPartSize: f.S3Config.DownloadPartSize,
DownloadConcurrency: f.S3Config.DownloadConcurrency,
DownloadPartMaxTime: f.S3Config.DownloadPartMaxTime,
UploadPartMaxTime: f.S3Config.UploadPartMaxTime,
ForcePathStyle: f.S3Config.ForcePathStyle,
},
AccessSecret: f.S3Config.AccessSecret.Clone(),

View file

@ -232,6 +232,17 @@ func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error)
p := NewPipeWriter(w)
ctx, cancelFn := context.WithCancel(context.Background())
uploader := s3manager.NewUploaderWithClient(fs.svc)
if fs.config.UploadPartMaxTime > 0 {
uploader.RequestOptions = append(uploader.RequestOptions, func(r *request.Request) {
chunkCtx, cancel := context.WithTimeout(r.Context(), time.Duration(fs.config.UploadPartMaxTime)*time.Second)
r.SetContext(chunkCtx)
go func() {
<-ctx.Done()
cancel()
}()
})
}
go func() {
defer cancelFn()
key := name

View file

@ -198,6 +198,9 @@ func (c *S3FsConfig) isEqual(other *S3FsConfig) bool {
if c.DownloadPartMaxTime != other.DownloadPartMaxTime {
return false
}
if c.UploadPartMaxTime != other.UploadPartMaxTime {
return false
}
if c.ForcePathStyle != other.ForcePathStyle {
return false
}