add more upload modes

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
Nicola Murino 2023-10-25 19:05:37 +02:00
parent 90bce505c4
commit 691e44c1dc
No known key found for this signature in database
GPG key ID: 935D2952DEC4EECF
13 changed files with 61 additions and 59 deletions


@@ -62,7 +62,7 @@ The configuration file contains the following sections:
 - **"common"**, configuration parameters shared among all the supported protocols
 - `idle_timeout`, integer. Time in minutes after which an idle client will be disconnected. 0 means disabled. Default: 15
-- `upload_mode` integer. 0 means standard: the files are uploaded directly to the requested path. 1 means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode, if there is an upload error, the temporary file is deleted and so the requested upload path will not contain a partial file. 2 means atomic with resume support: same as atomic but if there is an upload error, the temporary file is renamed to the requested path and not deleted. This way, a client can reconnect and resume the upload. Ignored for cloud-based storage backends (uploads are always atomic and upload resume is not supported, by default, for these backends) and for SFTP backend if buffering is enabled. Default: `0`
+- `upload_mode` integer. `0` means standard: the files are uploaded directly to the requested path. `1` means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode, if there is an upload error, the temporary file is deleted and so the requested upload path will not contain a partial file. `2` means atomic with resume support: same as atomic but if there is an upload error, the temporary file is renamed to the requested path and not deleted. This way, a client can reconnect and resume the upload. `4` means files for S3 backend are stored even if a client-side upload error is detected. `8` means files for Google Cloud Storage backend are stored even if a client-side upload error is detected. `16` means files for Azure Blob backend are stored even if a client-side upload error is detected. Ignored for SFTP backend if buffering is enabled. The flags can be combined, if you provide both `1` and `2`, `2` will be used. Default: `0`
 - `actions`, struct. It contains the command to execute and/or the HTTP URL to notify and the trigger conditions. See [Custom Actions](./custom-actions.md) for more details
 - `execute_on`, list of strings. Valid values are `pre-download`, `download`, `first-download`, `pre-upload`, `upload`, `first-upload`, `pre-delete`, `delete`, `rename`, `mkdir`, `rmdir`, `ssh_cmd`, `copy`. Leave empty to disable actions.
 - `execute_sync`, list of strings. Actions, defined in the `execute_on` list above, to be performed synchronously. The `pre-*` actions are always executed synchronously while the other ones are asynchronous. Executing an action synchronously means that SFTPGo will not return a result code to the client (which is waiting for it) until your hook have completed its execution. Leave empty to execute only the defined `pre-*` hook synchronously
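The new `upload_mode` values are additive bit flags, so a single number can enable several behaviours at once. A minimal sketch of how a combined value behaves, using the constant names introduced by this commit (the program itself is only illustrative, not SFTPGo code):

```go
package main

import "fmt"

// Upload mode flags as documented above.
const (
	UploadModeStandard              = 0
	UploadModeAtomic                = 1
	UploadModeAtomicWithResume      = 2
	UploadModeS3StoreOnError        = 4
	UploadModeGCSStoreOnError       = 8
	UploadModeAzureBlobStoreOnError = 16
)

func main() {
	// upload_mode = 6 combines atomic-with-resume (2) and
	// "keep S3 files on client-side upload errors" (4).
	uploadMode := UploadModeAtomicWithResume | UploadModeS3StoreOnError

	fmt.Println("atomic with resume:", uploadMode&UploadModeAtomicWithResume != 0) // true
	fmt.Println("keep S3 uploads on error:", uploadMode&UploadModeS3StoreOnError != 0)  // true
	fmt.Println("keep GCS uploads on error:", uploadMode&UploadModeGCSStoreOnError != 0) // false
}
```

With that combination you would set `"upload_mode": 6` in the `common` section of the configuration file.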


@@ -27,8 +27,7 @@ Some SFTP commands don't work over S3:
 - `chown` and `chmod` will fail. If you want to silently ignore these method set `setstat_mode` to `1` or `2` in your configuration file
 - `truncate`, `symlink`, `readlink` are not supported
 - opening a file for both reading and writing at the same time is not supported
-- resuming uploads is not supported
-- upload mode `atomic` is ignored since S3 uploads are already atomic
+- resuming uploads is tricky and disabled by default

 Other notes:

go.mod

@@ -10,10 +10,10 @@ require (
 	github.com/alexedwards/argon2id v1.0.0
 	github.com/amoghe/go-crypt v0.0.0-20220222110647-20eada5f5964
 	github.com/aws/aws-sdk-go-v2 v1.21.2
-	github.com/aws/aws-sdk-go-v2/config v1.19.0
+	github.com/aws/aws-sdk-go-v2/config v1.19.1
 	github.com/aws/aws-sdk-go-v2/credentials v1.13.43
 	github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.13
-	github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.91
+	github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.92
 	github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.16.2
 	github.com/aws/aws-sdk-go-v2/service/s3 v1.40.2
 	github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.21.6

go.sum

@@ -75,14 +75,14 @@ github.com/aws/aws-sdk-go-v2 v1.21.2 h1:+LXZ0sgo8quN9UOKXXzAWRT3FWd4NxeXWOZom9pE
 github.com/aws/aws-sdk-go-v2 v1.21.2/go.mod h1:ErQhvNuEMhJjweavOYhxVkn2RUx7kQXVATHrjKtxIpM=
 github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.14 h1:Sc82v7tDQ/vdU1WtuSyzZ1I7y/68j//HJ6uozND1IDs=
 github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.14/go.mod h1:9NCTOURS8OpxvoAVHq79LK81/zC78hfRWFn+aL0SPcY=
-github.com/aws/aws-sdk-go-v2/config v1.19.0 h1:AdzDvwH6dWuVARCl3RTLGRc4Ogy+N7yLFxVxXe1ClQ0=
-github.com/aws/aws-sdk-go-v2/config v1.19.0/go.mod h1:ZwDUgFnQgsazQTnWfeLWk5GjeqTQTL8lMkoE1UXzxdE=
+github.com/aws/aws-sdk-go-v2/config v1.19.1 h1:oe3vqcGftyk40icfLymhhhNysAwk0NfiwkDi2GTPMXs=
+github.com/aws/aws-sdk-go-v2/config v1.19.1/go.mod h1:ZwDUgFnQgsazQTnWfeLWk5GjeqTQTL8lMkoE1UXzxdE=
 github.com/aws/aws-sdk-go-v2/credentials v1.13.43 h1:LU8vo40zBlo3R7bAvBVy/ku4nxGEyZe9N8MqAeFTzF8=
 github.com/aws/aws-sdk-go-v2/credentials v1.13.43/go.mod h1:zWJBz1Yf1ZtX5NGax9ZdNjhhI4rgjfgsyk6vTY1yfVg=
 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.13 h1:PIktER+hwIG286DqXyvVENjgLTAwGgoeriLDD5C+YlQ=
 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.13/go.mod h1:f/Ib/qYjhV2/qdsf79H3QP/eRE4AkVyEf6sk7XfZ1tg=
-github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.91 h1:haAyxKHwoE+y/TJt+qHcPQf1dCViyyGbWcKjjYUllTE=
-github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.91/go.mod h1:ACQ6ta5YFlfSOz2c9A+EVYawLxFMZ0rI3Q0A0tGieKo=
+github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.92 h1:nLA7dGFC6v4P6b+hzqt5GqIGmIuN+jTJzojfdOLXWFE=
+github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.92/go.mod h1:h+ei9z19AhoN+Dac92DwkzfbJ4mFUea92xgl5pKSG0Q=
 github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 h1:nFBQlGtkbPzp/NjZLuFxRqmT91rLJkgvsEQs68h962Y=
 github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43/go.mod h1:auo+PiyLl0n1l8A0e8RIeR8tOzYPfZZH/JNlrJ8igTQ=
 github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 h1:JRVhO25+r3ar2mKGP7E0LDl8K9/G36gjlqca5iQbaqc=


@@ -111,9 +111,12 @@ const (
 // Upload modes
 const (
-	UploadModeStandard = iota
-	UploadModeAtomic
-	UploadModeAtomicWithResume
+	UploadModeStandard              = 0
+	UploadModeAtomic                = 1
+	UploadModeAtomicWithResume      = 2
+	UploadModeS3StoreOnError        = 4
+	UploadModeGCSStoreOnError       = 8
+	UploadModeAzureBlobStoreOnError = 16
 )

 func init() {
@@ -231,6 +234,7 @@ func Initialize(c Configuration, isShared int) error {
 	vfs.SetRenameMode(c.RenameMode)
 	vfs.SetReadMetadataMode(c.Metadata.Read)
 	vfs.SetResumeMaxSize(c.ResumeMaxSize)
+	vfs.SetUploadMode(c.UploadMode)
 	dataprovider.SetAllowSelfConnections(c.AllowSelfConnections)
 	transfersChecker = getTransfersChecker(isShared)
 	return nil
@@ -510,6 +514,9 @@ type Configuration struct {
 	// 2 means atomic with resume support: as atomic but if there is an upload error the temporary
 	// file is renamed to the requested path and not deleted, this way a client can reconnect and resume
 	// the upload.
+	// 4 means files for S3 backend are stored even if a client-side upload error is detected.
+	// 8 means files for Google Cloud Storage backend are stored even if a client-side upload error is detected.
+	// 16 means files for Azure Blob backend are stored even if a client-side upload error is detected.
 	UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
 	// Actions to execute for SFTP file operations and SSH commands
 	Actions ProtocolActions `json:"actions" mapstructure:"actions"`
@@ -601,7 +608,7 @@ type Configuration struct {
 // IsAtomicUploadEnabled returns true if atomic upload is enabled
 func (c *Configuration) IsAtomicUploadEnabled() bool {
-	return c.UploadMode == UploadModeAtomic || c.UploadMode == UploadModeAtomicWithResume
+	return c.UploadMode&UploadModeAtomic != 0 || c.UploadMode&UploadModeAtomicWithResume != 0
 }

 func (c *Configuration) initializeProxyProtocol() error {
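Because the upload modes are now independent bits rather than an enumeration, checks such as `IsAtomicUploadEnabled` switch from equality tests to bit tests. A standalone sketch of the difference (the constants are copied from the hunk above; the two helper functions are illustrative only):

```go
package main

import "fmt"

const (
	UploadModeAtomic           = 1
	UploadModeAtomicWithResume = 2
)

// oldCheck mirrors the previous equality-based test.
func oldCheck(mode int) bool {
	return mode == UploadModeAtomic || mode == UploadModeAtomicWithResume
}

// newCheck mirrors the bitmask-based test introduced by this commit.
func newCheck(mode int) bool {
	return mode&UploadModeAtomic != 0 || mode&UploadModeAtomicWithResume != 0
}

func main() {
	// A combined value such as 2|4 (atomic with resume + keep S3 files on error)
	// is only recognized as atomic by the bitmask variant.
	mode := 2 | 4
	fmt.Println(oldCheck(mode), newCheck(mode)) // false true
}
```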


@@ -1086,7 +1086,7 @@ func (c *BaseConnection) checkRecursiveRenameDirPermissions(fsSrc, fsDst vfs.Fs,
 		if err != nil {
 			return c.GetFsError(fsSrc, err)
 		}
-		if walkedPath != sourcePath && vfs.HasImplicitAtomicUploads(fsSrc) && Config.RenameMode == 0 {
+		if walkedPath != sourcePath && !vfs.IsRenameAtomic(fsSrc) && Config.RenameMode == 0 {
 			c.Log(logger.LevelInfo, "cannot rename non empty directory %q on this filesystem", virtualSourcePath)
 			return c.GetOpUnsupportedError()
 		}
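This guard previously reused `HasImplicitAtomicUploads` as a proxy for "this is an object storage backend". Since that helper now depends on the configured upload mode (see the vfs.go hunk at the end of this commit), the rename restriction presumably gets its own predicate, `IsRenameAtomic`, which stays false for S3, GCS and Azure Blob regardless of upload mode. A rough sketch of the distinction, with hypothetical stand-ins for the vfs package state (only the S3 branch is shown):

```go
package main

import (
	"fmt"
	"strings"
)

const s3fsName = "s3fs" // hypothetical backend-name prefix

var uploadMode = 4 // keep S3 files on client-side upload errors

// isRenameAtomic mirrors the new helper: object storage never renames
// directories atomically, whatever the upload mode is.
func isRenameAtomic(fsName string) bool {
	return !strings.HasPrefix(fsName, s3fsName)
}

// hasImplicitAtomicUploads mirrors the updated helper: it now depends on the
// configured upload mode, so it is no longer a reliable object-storage test.
func hasImplicitAtomicUploads(fsName string) bool {
	if strings.HasPrefix(fsName, s3fsName) {
		return uploadMode&4 == 0
	}
	return false
}

func main() {
	fmt.Println(hasImplicitAtomicUploads("s3fs_id")) // false: would skip the rename guard
	fmt.Println(isRenameAtomic("s3fs_id"))           // false: the guard still triggers
}
```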


@@ -389,7 +389,7 @@ func (t *BaseTransfer) Close() error {
 			t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %q, deletion error: %v",
 				t.effectiveFsPath, err)
 		} else if t.isAtomicUpload() {
-			if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
+			if t.ErrTransfer == nil || Config.UploadMode&UploadModeAtomicWithResume != 0 {
 				_, _, err = t.Fs.Rename(t.effectiveFsPath, t.fsPath)
 				t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %q -> %q, error: %v",
 					t.effectiveFsPath, t.fsPath, err)


@@ -747,10 +747,6 @@ func LoadConfig(configDir, configFile string) error {
 	return nil
 }

-func isUploadModeValid() bool {
-	return globalConf.Common.UploadMode >= 0 && globalConf.Common.UploadMode <= 2
-}
-
 func isProxyProtocolValid() bool {
 	return globalConf.Common.ProxyProtocol >= 0 && globalConf.Common.ProxyProtocol <= 2
 }
@@ -775,13 +771,6 @@ func resetInvalidConfigs() {
 		logger.Warn(logSender, "", "Non-fatal configuration error: %v", warn)
 		logger.WarnToConsole("Non-fatal configuration error: %v", warn)
 	}
-	if !isUploadModeValid() {
-		warn := fmt.Sprintf("invalid upload_mode 0, 1 and 2 are supported, configured: %v reset upload_mode to 0",
-			globalConf.Common.UploadMode)
-		globalConf.Common.UploadMode = 0
-		logger.Warn(logSender, "", "Non-fatal configuration error: %v", warn)
-		logger.WarnToConsole("Non-fatal configuration error: %v", warn)
-	}
 	if !isProxyProtocolValid() {
 		warn := fmt.Sprintf("invalid proxy_protocol 0, 1 and 2 are supported, configured: %v reset proxy_protocol to 0",
 			globalConf.Common.ProxyProtocol)
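The old range check only accepted 0, 1 and 2, so with the new bit flags it would silently reset perfectly valid combined values back to 0; dropping it (together with its test below) is what allows combinations to pass through. A quick illustration, reproducing the removed check as a standalone function:

```go
package main

import "fmt"

// isUploadModeValid reproduces the removed range check.
func isUploadModeValid(mode int) bool {
	return mode >= 0 && mode <= 2
}

func main() {
	// 5 = atomic (1) + keep S3 files on error (4): a meaningful combination
	// with the new flags, but the old check would have reset it to 0.
	fmt.Println(isUploadModeValid(5)) // false
}
```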


@@ -188,28 +188,6 @@ func TestEnabledSSHCommands(t *testing.T) {
 	assert.NoError(t, err)
 }

-func TestInvalidUploadMode(t *testing.T) {
-	reset()
-
-	confName := tempConfigName + ".json"
-	configFilePath := filepath.Join(configDir, confName)
-	err := config.LoadConfig(configDir, "")
-	assert.NoError(t, err)
-	commonConf := config.GetCommonConfig()
-	commonConf.UploadMode = 10
-	c := make(map[string]common.Configuration)
-	c["common"] = commonConf
-	jsonConf, err := json.Marshal(c)
-	assert.NoError(t, err)
-	err = os.WriteFile(configFilePath, jsonConf, os.ModePerm)
-	assert.NoError(t, err)
-	err = config.LoadConfig(configDir, confName)
-	assert.NoError(t, err)
-	assert.Equal(t, 0, config.GetCommonConfig().UploadMode)
-	err = os.Remove(configFilePath)
-	assert.NoError(t, err)
-}
-
 func TestInvalidExternalAuthScope(t *testing.T) {
 	reset()


@@ -289,6 +289,9 @@ func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter,
 		}
 	}
+	if uploadMode&16 != 0 {
+		return nil, p, nil, nil
+	}
 	return nil, p, cancelFn, nil
 }
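The same pattern is repeated for Google Cloud Storage (`uploadMode&8`) and S3 (`uploadMode&4`) in the hunks below: when the store-on-error flag is set, `Create` returns a `nil` cancel function, so there is nothing for the transfer layer to call when a client-side error occurs and the data written so far is kept. A minimal sketch of the idea, with hypothetical names (the real signatures live in the vfs package):

```go
package main

import "fmt"

// createUpload returns an optional cancel function for an upload.
// Returning a nil cancelFn means "never roll this upload back".
func createUpload(storeOnError bool) (cancelFn func()) {
	if storeOnError {
		return nil
	}
	return func() { fmt.Println("upload aborted, partial object discarded") }
}

func main() {
	// Simulate a client-side error at the end of a transfer.
	for _, storeOnError := range []bool{false, true} {
		cancelFn := createUpload(storeOnError)
		if cancelFn != nil {
			cancelFn() // previous behaviour for these backends
		} else {
			fmt.Println("upload kept despite the error")
		}
	}
}
```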


@@ -225,13 +225,7 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
 	if fs.config.UploadPartMaxTime > 0 {
 		objectWriter.ChunkRetryDeadline = time.Duration(fs.config.UploadPartMaxTime) * time.Second
 	}
-	var contentType string
-	if flag == -1 {
-		contentType = dirMimeType
-	} else {
-		contentType = mime.TypeByExtension(path.Ext(name))
-	}
-	fs.setWriterAttrs(objectWriter, contentType)
+	fs.setWriterAttrs(objectWriter, flag, name)

 	go func() {
 		defer cancelFn()
@@ -253,6 +247,9 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
 		metric.GCSTransferCompleted(n, 0, err)
 	}()
+	if uploadMode&8 != 0 {
+		return nil, p, nil, nil
+	}
 	return nil, p, cancelFn, nil
 }
@@ -779,7 +776,13 @@ func (fs *GCSFs) getObjectStat(name string) (os.FileInfo, error) {
 	return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, attrs.Size, attrs.Updated, false))
 }

-func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, contentType string) {
+func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, flag int, name string) {
+	var contentType string
+	if flag == -1 {
+		contentType = dirMimeType
+	} else {
+		contentType = mime.TypeByExtension(path.Ext(name))
+	}
 	if contentType != "" {
 		objectWriter.ObjectAttrs.ContentType = contentType
 	}
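Content-type detection now lives entirely inside `setWriterAttrs`. The empty-string guard matters because `mime.TypeByExtension` returns `""` for extensions it does not recognize; a quick check of that standard-library behaviour:

```go
package main

import (
	"fmt"
	"mime"
	"path"
)

func main() {
	// Known extension: a non-empty MIME type is returned.
	fmt.Println(mime.TypeByExtension(path.Ext("report.pdf"))) // application/pdf
	// Unknown extension: an empty string, so no ContentType attribute is set.
	fmt.Println(mime.TypeByExtension(path.Ext("data.xyz123")) == "") // true
}
```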


@@ -313,6 +313,9 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(),
 		}
 	}
+	if uploadMode&4 != 0 {
+		return nil, p, nil, nil
+	}
 	return nil, p, cancelFn, nil
 }


@@ -65,6 +65,7 @@ var (
 	renameMode    int
 	readMetadata  int
 	resumeMaxSize int64
+	uploadMode    int
 )

 // SetAllowSelfConnections sets the desired behaviour for self connections
@@ -103,6 +104,11 @@ func SetResumeMaxSize(val int64) {
 	resumeMaxSize = val
 }

+// SetUploadMode sets the upload mode
+func SetUploadMode(val int) {
+	uploadMode = val
+}
+
 // Fs defines the interface for filesystem backends
 type Fs interface {
 	Name() string
@@ -896,16 +902,30 @@ func HasTruncateSupport(fs Fs) bool {
 	return IsLocalOsFs(fs) || IsSFTPFs(fs) || IsHTTPFs(fs)
 }

+// IsRenameAtomic returns true if renaming a directory is supposed to be atomic
+func IsRenameAtomic(fs Fs) bool {
+	if strings.HasPrefix(fs.Name(), s3fsName) {
+		return false
+	}
+	if strings.HasPrefix(fs.Name(), gcsfsName) {
+		return false
+	}
+	if strings.HasPrefix(fs.Name(), azBlobFsName) {
+		return false
+	}
+	return true
+}
+
 // HasImplicitAtomicUploads returns true if the fs don't persists partial files on error
 func HasImplicitAtomicUploads(fs Fs) bool {
 	if strings.HasPrefix(fs.Name(), s3fsName) {
-		return true
+		return uploadMode&4 == 0
 	}
 	if strings.HasPrefix(fs.Name(), gcsfsName) {
-		return true
+		return uploadMode&8 == 0
 	}
 	if strings.HasPrefix(fs.Name(), azBlobFsName) {
-		return true
+		return uploadMode&16 == 0
 	}
 	return false
 }