From 691e44c1dc3e4c6762e2550ee64628b0c5df73f0 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Wed, 25 Oct 2023 19:05:37 +0200 Subject: [PATCH] add more upload modes Signed-off-by: Nicola Murino --- docs/full-configuration.md | 2 +- docs/s3.md | 3 +-- go.mod | 4 ++-- go.sum | 8 ++++---- internal/common/common.go | 15 +++++++++++---- internal/common/connection.go | 2 +- internal/common/transfer.go | 2 +- internal/config/config.go | 11 ----------- internal/config/config_test.go | 22 ---------------------- internal/vfs/azblobfs.go | 3 +++ internal/vfs/gcsfs.go | 19 +++++++++++-------- internal/vfs/s3fs.go | 3 +++ internal/vfs/vfs.go | 26 +++++++++++++++++++++++--- 13 files changed, 61 insertions(+), 59 deletions(-) diff --git a/docs/full-configuration.md b/docs/full-configuration.md index 3287e569..b3c5bc5b 100644 --- a/docs/full-configuration.md +++ b/docs/full-configuration.md @@ -62,7 +62,7 @@ The configuration file contains the following sections: - **"common"**, configuration parameters shared among all the supported protocols - `idle_timeout`, integer. Time in minutes after which an idle client will be disconnected. 0 means disabled. Default: 15 - - `upload_mode` integer. 0 means standard: the files are uploaded directly to the requested path. 1 means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode, if there is an upload error, the temporary file is deleted and so the requested upload path will not contain a partial file. 2 means atomic with resume support: same as atomic but if there is an upload error, the temporary file is renamed to the requested path and not deleted. This way, a client can reconnect and resume the upload. Ignored for cloud-based storage backends (uploads are always atomic and upload resume is not supported, by default, for these backends) and for SFTP backend if buffering is enabled. Default: `0` + - `upload_mode` integer. `0` means standard: the files are uploaded directly to the requested path. `1` means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode, if there is an upload error, the temporary file is deleted and so the requested upload path will not contain a partial file. `2` means atomic with resume support: same as atomic but if there is an upload error, the temporary file is renamed to the requested path and not deleted. This way, a client can reconnect and resume the upload. `4` means files for S3 backend are stored even if a client-side upload error is detected. `8` means files for Google Cloud Storage backend are stored even if a client-side upload error is detected. `16` means files for Azure Blob backend are stored even if a client-side upload error is detected. Ignored for SFTP backend if buffering is enabled. The flags can be combined, if you provide both `1` and `2`, `2` will be used. Default: `0` - `actions`, struct. It contains the command to execute and/or the HTTP URL to notify and the trigger conditions. See [Custom Actions](./custom-actions.md) for more details - `execute_on`, list of strings. Valid values are `pre-download`, `download`, `first-download`, `pre-upload`, `upload`, `first-upload`, `pre-delete`, `delete`, `rename`, `mkdir`, `rmdir`, `ssh_cmd`, `copy`. Leave empty to disable actions. - `execute_sync`, list of strings. Actions, defined in the `execute_on` list above, to be performed synchronously. The `pre-*` actions are always executed synchronously while the other ones are asynchronous. Executing an action synchronously means that SFTPGo will not return a result code to the client (which is waiting for it) until your hook have completed its execution. Leave empty to execute only the defined `pre-*` hook synchronously diff --git a/docs/s3.md b/docs/s3.md index 0662210c..7cba3671 100644 --- a/docs/s3.md +++ b/docs/s3.md @@ -27,8 +27,7 @@ Some SFTP commands don't work over S3: - `chown` and `chmod` will fail. If you want to silently ignore these method set `setstat_mode` to `1` or `2` in your configuration file - `truncate`, `symlink`, `readlink` are not supported - opening a file for both reading and writing at the same time is not supported -- resuming uploads is not supported -- upload mode `atomic` is ignored since S3 uploads are already atomic +- resuming uploads is tricky and disabled by default Other notes: diff --git a/go.mod b/go.mod index 23059480..6d4dcf83 100644 --- a/go.mod +++ b/go.mod @@ -10,10 +10,10 @@ require ( github.com/alexedwards/argon2id v1.0.0 github.com/amoghe/go-crypt v0.0.0-20220222110647-20eada5f5964 github.com/aws/aws-sdk-go-v2 v1.21.2 - github.com/aws/aws-sdk-go-v2/config v1.19.0 + github.com/aws/aws-sdk-go-v2/config v1.19.1 github.com/aws/aws-sdk-go-v2/credentials v1.13.43 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.13 - github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.91 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.92 github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.16.2 github.com/aws/aws-sdk-go-v2/service/s3 v1.40.2 github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.6 diff --git a/go.sum b/go.sum index acad1d3b..ecb0ad05 100644 --- a/go.sum +++ b/go.sum @@ -75,14 +75,14 @@ github.com/aws/aws-sdk-go-v2 v1.21.2 h1:+LXZ0sgo8quN9UOKXXzAWRT3FWd4NxeXWOZom9pE github.com/aws/aws-sdk-go-v2 v1.21.2/go.mod h1:ErQhvNuEMhJjweavOYhxVkn2RUx7kQXVATHrjKtxIpM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.14 h1:Sc82v7tDQ/vdU1WtuSyzZ1I7y/68j//HJ6uozND1IDs= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.14/go.mod h1:9NCTOURS8OpxvoAVHq79LK81/zC78hfRWFn+aL0SPcY= -github.com/aws/aws-sdk-go-v2/config v1.19.0 h1:AdzDvwH6dWuVARCl3RTLGRc4Ogy+N7yLFxVxXe1ClQ0= -github.com/aws/aws-sdk-go-v2/config v1.19.0/go.mod h1:ZwDUgFnQgsazQTnWfeLWk5GjeqTQTL8lMkoE1UXzxdE= +github.com/aws/aws-sdk-go-v2/config v1.19.1 h1:oe3vqcGftyk40icfLymhhhNysAwk0NfiwkDi2GTPMXs= +github.com/aws/aws-sdk-go-v2/config v1.19.1/go.mod h1:ZwDUgFnQgsazQTnWfeLWk5GjeqTQTL8lMkoE1UXzxdE= github.com/aws/aws-sdk-go-v2/credentials v1.13.43 h1:LU8vo40zBlo3R7bAvBVy/ku4nxGEyZe9N8MqAeFTzF8= github.com/aws/aws-sdk-go-v2/credentials v1.13.43/go.mod h1:zWJBz1Yf1ZtX5NGax9ZdNjhhI4rgjfgsyk6vTY1yfVg= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.13 h1:PIktER+hwIG286DqXyvVENjgLTAwGgoeriLDD5C+YlQ= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.13/go.mod h1:f/Ib/qYjhV2/qdsf79H3QP/eRE4AkVyEf6sk7XfZ1tg= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.91 h1:haAyxKHwoE+y/TJt+qHcPQf1dCViyyGbWcKjjYUllTE= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.91/go.mod h1:ACQ6ta5YFlfSOz2c9A+EVYawLxFMZ0rI3Q0A0tGieKo= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.92 h1:nLA7dGFC6v4P6b+hzqt5GqIGmIuN+jTJzojfdOLXWFE= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.92/go.mod h1:h+ei9z19AhoN+Dac92DwkzfbJ4mFUea92xgl5pKSG0Q= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 h1:nFBQlGtkbPzp/NjZLuFxRqmT91rLJkgvsEQs68h962Y= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43/go.mod h1:auo+PiyLl0n1l8A0e8RIeR8tOzYPfZZH/JNlrJ8igTQ= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 h1:JRVhO25+r3ar2mKGP7E0LDl8K9/G36gjlqca5iQbaqc= diff --git a/internal/common/common.go b/internal/common/common.go index 98835f02..d8ec806b 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -111,9 +111,12 @@ const ( // Upload modes const ( - UploadModeStandard = iota - UploadModeAtomic - UploadModeAtomicWithResume + UploadModeStandard = 0 + UploadModeAtomic = 1 + UploadModeAtomicWithResume = 2 + UploadModeS3StoreOnError = 4 + UploadModeGCSStoreOnError = 8 + UploadModeAzureBlobStoreOnError = 16 ) func init() { @@ -231,6 +234,7 @@ func Initialize(c Configuration, isShared int) error { vfs.SetRenameMode(c.RenameMode) vfs.SetReadMetadataMode(c.Metadata.Read) vfs.SetResumeMaxSize(c.ResumeMaxSize) + vfs.SetUploadMode(c.UploadMode) dataprovider.SetAllowSelfConnections(c.AllowSelfConnections) transfersChecker = getTransfersChecker(isShared) return nil @@ -510,6 +514,9 @@ type Configuration struct { // 2 means atomic with resume support: as atomic but if there is an upload error the temporary // file is renamed to the requested path and not deleted, this way a client can reconnect and resume // the upload. + // 4 means files for S3 backend are stored even if a client-side upload error is detected. + // 8 means files for Google Cloud Storage backend are stored even if a client-side upload error is detected. + // 16 means files for Azure Blob backend are stored even if a client-side upload error is detected. UploadMode int `json:"upload_mode" mapstructure:"upload_mode"` // Actions to execute for SFTP file operations and SSH commands Actions ProtocolActions `json:"actions" mapstructure:"actions"` @@ -601,7 +608,7 @@ type Configuration struct { // IsAtomicUploadEnabled returns true if atomic upload is enabled func (c *Configuration) IsAtomicUploadEnabled() bool { - return c.UploadMode == UploadModeAtomic || c.UploadMode == UploadModeAtomicWithResume + return c.UploadMode&UploadModeAtomic != 0 || c.UploadMode&UploadModeAtomicWithResume != 0 } func (c *Configuration) initializeProxyProtocol() error { diff --git a/internal/common/connection.go b/internal/common/connection.go index 6b5aa2d5..76061738 100644 --- a/internal/common/connection.go +++ b/internal/common/connection.go @@ -1086,7 +1086,7 @@ func (c *BaseConnection) checkRecursiveRenameDirPermissions(fsSrc, fsDst vfs.Fs, if err != nil { return c.GetFsError(fsSrc, err) } - if walkedPath != sourcePath && vfs.HasImplicitAtomicUploads(fsSrc) && Config.RenameMode == 0 { + if walkedPath != sourcePath && !vfs.IsRenameAtomic(fsSrc) && Config.RenameMode == 0 { c.Log(logger.LevelInfo, "cannot rename non empty directory %q on this filesystem", virtualSourcePath) return c.GetOpUnsupportedError() } diff --git a/internal/common/transfer.go b/internal/common/transfer.go index 24ae1d11..f6a59b69 100644 --- a/internal/common/transfer.go +++ b/internal/common/transfer.go @@ -389,7 +389,7 @@ func (t *BaseTransfer) Close() error { t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %q, deletion error: %v", t.effectiveFsPath, err) } else if t.isAtomicUpload() { - if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume { + if t.ErrTransfer == nil || Config.UploadMode&UploadModeAtomicWithResume != 0 { _, _, err = t.Fs.Rename(t.effectiveFsPath, t.fsPath) t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %q -> %q, error: %v", t.effectiveFsPath, t.fsPath, err) diff --git a/internal/config/config.go b/internal/config/config.go index d256fda8..fe3efedf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -747,10 +747,6 @@ func LoadConfig(configDir, configFile string) error { return nil } -func isUploadModeValid() bool { - return globalConf.Common.UploadMode >= 0 && globalConf.Common.UploadMode <= 2 -} - func isProxyProtocolValid() bool { return globalConf.Common.ProxyProtocol >= 0 && globalConf.Common.ProxyProtocol <= 2 } @@ -775,13 +771,6 @@ func resetInvalidConfigs() { logger.Warn(logSender, "", "Non-fatal configuration error: %v", warn) logger.WarnToConsole("Non-fatal configuration error: %v", warn) } - if !isUploadModeValid() { - warn := fmt.Sprintf("invalid upload_mode 0, 1 and 2 are supported, configured: %v reset upload_mode to 0", - globalConf.Common.UploadMode) - globalConf.Common.UploadMode = 0 - logger.Warn(logSender, "", "Non-fatal configuration error: %v", warn) - logger.WarnToConsole("Non-fatal configuration error: %v", warn) - } if !isProxyProtocolValid() { warn := fmt.Sprintf("invalid proxy_protocol 0, 1 and 2 are supported, configured: %v reset proxy_protocol to 0", globalConf.Common.ProxyProtocol) diff --git a/internal/config/config_test.go b/internal/config/config_test.go index f7ee8a25..928de642 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -188,28 +188,6 @@ func TestEnabledSSHCommands(t *testing.T) { assert.NoError(t, err) } -func TestInvalidUploadMode(t *testing.T) { - reset() - - confName := tempConfigName + ".json" - configFilePath := filepath.Join(configDir, confName) - err := config.LoadConfig(configDir, "") - assert.NoError(t, err) - commonConf := config.GetCommonConfig() - commonConf.UploadMode = 10 - c := make(map[string]common.Configuration) - c["common"] = commonConf - jsonConf, err := json.Marshal(c) - assert.NoError(t, err) - err = os.WriteFile(configFilePath, jsonConf, os.ModePerm) - assert.NoError(t, err) - err = config.LoadConfig(configDir, confName) - assert.NoError(t, err) - assert.Equal(t, 0, config.GetCommonConfig().UploadMode) - err = os.Remove(configFilePath) - assert.NoError(t, err) -} - func TestInvalidExternalAuthScope(t *testing.T) { reset() diff --git a/internal/vfs/azblobfs.go b/internal/vfs/azblobfs.go index ffe348fc..67f33f63 100644 --- a/internal/vfs/azblobfs.go +++ b/internal/vfs/azblobfs.go @@ -289,6 +289,9 @@ func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter, } } + if uploadMode&16 != 0 { + return nil, p, nil, nil + } return nil, p, cancelFn, nil } diff --git a/internal/vfs/gcsfs.go b/internal/vfs/gcsfs.go index 281b47e2..02456b9c 100644 --- a/internal/vfs/gcsfs.go +++ b/internal/vfs/gcsfs.go @@ -225,13 +225,7 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func() if fs.config.UploadPartMaxTime > 0 { objectWriter.ChunkRetryDeadline = time.Duration(fs.config.UploadPartMaxTime) * time.Second } - var contentType string - if flag == -1 { - contentType = dirMimeType - } else { - contentType = mime.TypeByExtension(path.Ext(name)) - } - fs.setWriterAttrs(objectWriter, contentType) + fs.setWriterAttrs(objectWriter, flag, name) go func() { defer cancelFn() @@ -253,6 +247,9 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func() metric.GCSTransferCompleted(n, 0, err) }() + if uploadMode&8 != 0 { + return nil, p, nil, nil + } return nil, p, cancelFn, nil } @@ -779,7 +776,13 @@ func (fs *GCSFs) getObjectStat(name string) (os.FileInfo, error) { return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, attrs.Size, attrs.Updated, false)) } -func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, contentType string) { +func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, flag int, name string) { + var contentType string + if flag == -1 { + contentType = dirMimeType + } else { + contentType = mime.TypeByExtension(path.Ext(name)) + } if contentType != "" { objectWriter.ObjectAttrs.ContentType = contentType } diff --git a/internal/vfs/s3fs.go b/internal/vfs/s3fs.go index c59b6cbd..133e7a97 100644 --- a/internal/vfs/s3fs.go +++ b/internal/vfs/s3fs.go @@ -313,6 +313,9 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), } } + if uploadMode&4 != 0 { + return nil, p, nil, nil + } return nil, p, cancelFn, nil } diff --git a/internal/vfs/vfs.go b/internal/vfs/vfs.go index 313da055..217e08e4 100644 --- a/internal/vfs/vfs.go +++ b/internal/vfs/vfs.go @@ -65,6 +65,7 @@ var ( renameMode int readMetadata int resumeMaxSize int64 + uploadMode int ) // SetAllowSelfConnections sets the desired behaviour for self connections @@ -103,6 +104,11 @@ func SetResumeMaxSize(val int64) { resumeMaxSize = val } +// SetUploadMode sets the upload mode +func SetUploadMode(val int) { + uploadMode = val +} + // Fs defines the interface for filesystem backends type Fs interface { Name() string @@ -896,16 +902,30 @@ func HasTruncateSupport(fs Fs) bool { return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs) } +// IsRenameAtomic returns true if renaming a directory is supposed to be atomic +func IsRenameAtomic(fs Fs) bool { + if strings.HasPrefix(fs.Name(), s3fsName) { + return false + } + if strings.HasPrefix(fs.Name(), gcsfsName) { + return false + } + if strings.HasPrefix(fs.Name(), azBlobFsName) { + return false + } + return true +} + // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error func HasImplicitAtomicUploads(fs Fs) bool { if strings.HasPrefix(fs.Name(), s3fsName) { - return true + return uploadMode&4 == 0 } if strings.HasPrefix(fs.Name(), gcsfsName) { - return true + return uploadMode&8 == 0 } if strings.HasPrefix(fs.Name(), azBlobFsName) { - return true + return uploadMode&16 == 0 } return false }