atomic upload mode: remove temporary file on error

If a transfer error is detected, then the partial temporary file will
be removed and not renamed to requested path
This commit is contained in:
Nicola Murino 2019-09-10 18:47:21 +02:00
parent 7010f513e3
commit c1effdf701
5 changed files with 107 additions and 18 deletions

View file

@ -118,9 +118,9 @@ The `sftpgo` configuration file contains the following sections:
- `max_auth_tries` integer. Maximum number of authentication attempts permitted per connection. If set to a negative number, the number of attempts are unlimited. If set to zero, the number of attempts are limited to 6. - `max_auth_tries` integer. Maximum number of authentication attempts permitted per connection. If set to a negative number, the number of attempts are unlimited. If set to zero, the number of attempts are limited to 6.
- `umask`, string. Umask for the new files and directories. This setting has no effect on Windows. Default: "0022" - `umask`, string. Umask for the new files and directories. This setting has no effect on Windows. Default: "0022"
- `banner`, string. Identification string used by the server. Default "SFTPGo" - `banner`, string. Identification string used by the server. Default "SFTPGo"
- `upload_mode` integer. 0 means standard, the files are uploaded directly to the requested path. 1 means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded - `upload_mode` integer. 0 means standard, the files are uploaded directly to the requested path. 1 means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode if there is an upload error the temporary file is deleted and so the requested upload path will not contain a partial file.
- `actions`, struct. It contains the command to execute and/or the HTTP URL to notify and the trigger conditions - `actions`, struct. It contains the command to execute and/or the HTTP URL to notify and the trigger conditions
- `execute_on`, list of strings. Valid values are `download`, `upload`, `delete`, `rename`. On folder deletion a `delete` notification will be sent for each deleted file. Leave empty to disable actions. - `execute_on`, list of strings. Valid values are `download`, `upload`, `delete`, `rename`. On folder deletion a `delete` notification will be sent for each deleted file. Actions will be not executed if an error is detected and so a partial file is uploaded or downloaded. Leave empty to disable actions.
- `command`, string. Absolute path to the command to execute. Leave empty to disable. The command is invoked with the following arguments: - `command`, string. Absolute path to the command to execute. Leave empty to disable. The command is invoked with the following arguments:
- `action`, any valid `execute_on` string - `action`, any valid `execute_on` string
- `username`, user who did the action - `username`, user who did the action

View file

@ -674,3 +674,44 @@ func TestSCPUploadFiledata(t *testing.T) {
} }
os.Remove(testfile) os.Remove(testfile)
} }
func TestUploadError(t *testing.T) {
connection := Connection{
User: dataprovider.User{
Username: "testuser",
},
protocol: protocolSCP,
}
testfile := "testfile"
fileTempName := "temptestfile"
file, _ := os.Create(fileTempName)
transfer := Transfer{
file: file,
path: testfile,
start: time.Now(),
bytesSent: 0,
bytesReceived: 100,
user: connection.User,
connectionID: "",
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: true,
protocol: connection.protocol,
transferError: nil,
isFinished: false,
}
addTransfer(&transfer)
transfer.TransferError(fmt.Errorf("fake error"))
transfer.Close()
if transfer.bytesReceived > 0 {
t.Errorf("byte sent should be 0 for a failed transfer: %v", transfer.bytesSent)
}
_, err := os.Stat(testfile)
if !os.IsNotExist(err) {
t.Errorf("file uploaded must be deleted after an error: %v", err)
}
_, err = os.Stat(fileTempName)
if !os.IsNotExist(err) {
t.Errorf("file uploaded must be deleted after an error: %v", err)
}
}

View file

@ -232,7 +232,7 @@ func (c Configuration) AcceptInboundConnection(conn net.Conn, config *ssh.Server
lock: new(sync.Mutex), lock: new(sync.Mutex),
sshConn: sconn, sshConn: sconn,
} }
connection.Log(logger.LevelInfo, logSender, "User id: %d, logged in with: %#v, name: %#v, home_dir: %#v", connection.Log(logger.LevelInfo, logSender, "User id: %d, logged in with: %#v, username: %#v, home_dir: %#v",
user.ID, loginType, user.Username, user.HomeDir) user.ID, loginType, user.Username, user.HomeDir)
go ssh.DiscardRequests(reqs) go ssh.DiscardRequests(reqs)
@ -270,7 +270,7 @@ func (c Configuration) AcceptInboundConnection(conn net.Conn, config *ssh.Server
var msg execMsg var msg execMsg
if err := ssh.Unmarshal(req.Payload, &msg); err == nil { if err := ssh.Unmarshal(req.Payload, &msg); err == nil {
name, scpArgs, err := parseCommandPayload(msg.Command) name, scpArgs, err := parseCommandPayload(msg.Command)
connection.Log(logger.LevelDebug, logSender, "new exec command: %v args: %v user: %v, error: %v", connection.Log(logger.LevelDebug, logSender, "new exec command: %#v args: %v user: %v, error: %v",
name, scpArgs, name, scpArgs,
connection.User.Username, err) connection.User.Username, err)
if err == nil && name == "scp" && len(scpArgs) >= 2 { if err == nil && name == "scp" && len(scpArgs) >= 2 {

View file

@ -252,6 +252,7 @@ func TestBasicSFTPHandling(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestDirCommands(t *testing.T) { func TestDirCommands(t *testing.T) {
@ -321,6 +322,7 @@ func TestDirCommands(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestSymlink(t *testing.T) { func TestSymlink(t *testing.T) {
@ -370,6 +372,7 @@ func TestSymlink(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestStat(t *testing.T) { func TestStat(t *testing.T) {
@ -426,6 +429,7 @@ func TestStat(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
// basic tests to verify virtual chroot, should be improved to cover more cases ... // basic tests to verify virtual chroot, should be improved to cover more cases ...
@ -498,6 +502,7 @@ func TestEscapeHomeDir(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestHomeSpecialChars(t *testing.T) { func TestHomeSpecialChars(t *testing.T) {
@ -544,6 +549,7 @@ func TestHomeSpecialChars(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestLogin(t *testing.T) { func TestLogin(t *testing.T) {
@ -612,6 +618,7 @@ func TestLogin(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestLoginAfterUserUpdateEmptyPwd(t *testing.T) { func TestLoginAfterUserUpdateEmptyPwd(t *testing.T) {
@ -645,6 +652,7 @@ func TestLoginAfterUserUpdateEmptyPwd(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestLoginAfterUserUpdateEmptyPubKey(t *testing.T) { func TestLoginAfterUserUpdateEmptyPubKey(t *testing.T) {
@ -678,6 +686,7 @@ func TestLoginAfterUserUpdateEmptyPubKey(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestMaxSessions(t *testing.T) { func TestMaxSessions(t *testing.T) {
@ -710,6 +719,7 @@ func TestMaxSessions(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestQuotaFileReplace(t *testing.T) { func TestQuotaFileReplace(t *testing.T) {
@ -781,6 +791,7 @@ func TestQuotaFileReplace(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestQuotaScan(t *testing.T) { func TestQuotaScan(t *testing.T) {
@ -846,6 +857,7 @@ func TestQuotaScan(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestMultipleQuotaScans(t *testing.T) { func TestMultipleQuotaScans(t *testing.T) {
@ -896,6 +908,7 @@ func TestQuotaSize(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestBandwidthAndConnections(t *testing.T) { func TestBandwidthAndConnections(t *testing.T) {
@ -967,6 +980,7 @@ func TestBandwidthAndConnections(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestMissingFile(t *testing.T) { func TestMissingFile(t *testing.T) {
@ -991,6 +1005,7 @@ func TestMissingFile(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestOverwriteDirWithFile(t *testing.T) { func TestOverwriteDirWithFile(t *testing.T) {
@ -1042,6 +1057,7 @@ func TestOverwriteDirWithFile(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPasswordsHashPbkdf2Sha1(t *testing.T) { func TestPasswordsHashPbkdf2Sha1(t *testing.T) {
@ -1074,6 +1090,7 @@ func TestPasswordsHashPbkdf2Sha1(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPasswordsHashPbkdf2Sha256(t *testing.T) { func TestPasswordsHashPbkdf2Sha256(t *testing.T) {
@ -1106,6 +1123,7 @@ func TestPasswordsHashPbkdf2Sha256(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPasswordsHashPbkdf2Sha512(t *testing.T) { func TestPasswordsHashPbkdf2Sha512(t *testing.T) {
@ -1138,6 +1156,7 @@ func TestPasswordsHashPbkdf2Sha512(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPasswordsHashBcrypt(t *testing.T) { func TestPasswordsHashBcrypt(t *testing.T) {
@ -1170,6 +1189,7 @@ func TestPasswordsHashBcrypt(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPermList(t *testing.T) { func TestPermList(t *testing.T) {
@ -1199,6 +1219,7 @@ func TestPermList(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPermDownload(t *testing.T) { func TestPermDownload(t *testing.T) {
@ -1240,6 +1261,7 @@ func TestPermDownload(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPermUpload(t *testing.T) { func TestPermUpload(t *testing.T) {
@ -1272,6 +1294,7 @@ func TestPermUpload(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPermDelete(t *testing.T) { func TestPermDelete(t *testing.T) {
@ -1308,6 +1331,7 @@ func TestPermDelete(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPermRename(t *testing.T) { func TestPermRename(t *testing.T) {
@ -1348,6 +1372,7 @@ func TestPermRename(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPermCreateDirs(t *testing.T) { func TestPermCreateDirs(t *testing.T) {
@ -1384,6 +1409,7 @@ func TestPermCreateDirs(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestPermSymlink(t *testing.T) { func TestPermSymlink(t *testing.T) {
@ -1424,6 +1450,7 @@ func TestPermSymlink(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("unable to remove user: %v", err) t.Errorf("unable to remove user: %v", err)
} }
os.RemoveAll(user.GetHomeDir())
} }
func TestSSHConnection(t *testing.T) { func TestSSHConnection(t *testing.T) {
@ -2038,7 +2065,7 @@ func TestSCPErrors(t *testing.T) {
// it is need to reach all the code in CheckIdleConnections // it is need to reach all the code in CheckIdleConnections
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
cmd.Process.Kill() cmd.Process.Kill()
waitForNoActiveTransfer()
cmd = getScpUploadCommand(testFilePath, remoteUpPath, false, false) cmd = getScpUploadCommand(testFilePath, remoteUpPath, false, false)
go func() { go func() {
if cmd.Run() == nil { if cmd.Run() == nil {
@ -2050,6 +2077,7 @@ func TestSCPErrors(t *testing.T) {
// it is need to reach all the code in CheckIdleConnections // it is need to reach all the code in CheckIdleConnections
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
cmd.Process.Kill() cmd.Process.Kill()
waitForNoActiveTransfer()
err = os.Remove(testFilePath) err = os.Remove(testFilePath)
if err != nil { if err != nil {
t.Errorf("error removing test file") t.Errorf("error removing test file")
@ -2297,6 +2325,12 @@ func getScpUploadCommand(localPath, remotePath string, preserveTime, remoteToRem
return exec.Command(scpPath, args...) return exec.Command(scpPath, args...)
} }
func waitForNoActiveTransfer() {
for len(sftpd.GetConnectionsStats()) > 0 {
time.Sleep(100 * time.Millisecond)
}
}
func waitForActiveTransfer() { func waitForActiveTransfer() {
stats := sftpd.GetConnectionsStats() stats := sftpd.GetConnectionsStats()
for len(stats) < 1 { for len(stats) < 1 {

View file

@ -40,8 +40,9 @@ type Transfer struct {
// For example network or client issues // For example network or client issues
func (t *Transfer) TransferError(err error) { func (t *Transfer) TransferError(err error) {
t.transferError = err t.transferError = err
elapsed := time.Since(t.start).Nanoseconds() / 1000000
logger.Warn(logSender, t.connectionID, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+ logger.Warn(logSender, t.connectionID, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
"bytes received: %v", t.path, t.transferError, t.bytesSent, t.bytesReceived) "bytes received: %v transfer running since %v ms", t.path, t.transferError, t.bytesSent, t.bytesReceived, elapsed)
} }
// ReadAt reads len(p) bytes from the File to download starting at byte offset off and updates the bytes sent. // ReadAt reads len(p) bytes from the File to download starting at byte offset off and updates the bytes sent.
@ -65,19 +66,37 @@ func (t *Transfer) WriteAt(p []byte, off int64) (n int, err error) {
} }
// Close it is called when the transfer is completed. // Close it is called when the transfer is completed.
// It closes the underlying file, log the transfer info, update the user quota, for uploads, and execute any defined actions. // It closes the underlying file, log the transfer info, update the user quota (for uploads)
// and execute any defined actions.
// If there is an error no action will be executed and, in atomic mode, we try to delete
// the temporary file
func (t *Transfer) Close() error { func (t *Transfer) Close() error {
err := t.file.Close() err := t.file.Close()
if t.isFinished { if t.isFinished {
return err return err
} }
if t.transferType == transferUpload && t.file.Name() != t.path { t.isFinished = true
err = os.Rename(t.file.Name(), t.path) numFiles := 0
logger.Debug(logSender, t.connectionID, "atomic upload completed, rename: %#v -> %#v, error: %v", if t.isNewFile {
t.file.Name(), t.path, err) numFiles = 1
}
if t.transferType == transferUpload && t.file.Name() != t.path {
if t.transferError == nil {
err = os.Rename(t.file.Name(), t.path)
logger.Debug(logSender, t.connectionID, "atomic upload completed, rename: %#v -> %#v, error: %v",
t.file.Name(), t.path, err)
} else {
err = os.Remove(t.file.Name())
logger.Warn(logSender, t.connectionID, "atomic upload completed with error: \"%v\", delete temporary file: %#v, "+
"deletion error: %v", t.transferError, t.file.Name(), err)
if err == nil {
numFiles--
t.bytesReceived = 0
}
}
} }
elapsed := time.Since(t.start).Nanoseconds() / 1000000
if t.transferError == nil { if t.transferError == nil {
elapsed := time.Since(t.start).Nanoseconds() / 1000000
if t.transferType == transferDownload { if t.transferType == transferDownload {
logger.TransferLog(downloadLogSender, t.path, elapsed, t.bytesSent, t.user.Username, t.connectionID, t.protocol) logger.TransferLog(downloadLogSender, t.path, elapsed, t.bytesSent, t.user.Username, t.connectionID, t.protocol)
executeAction(operationDownload, t.user.Username, t.path, "") executeAction(operationDownload, t.user.Username, t.path, "")
@ -87,14 +106,9 @@ func (t *Transfer) Close() error {
} }
} }
removeTransfer(t) removeTransfer(t)
if t.transferType == transferUpload { if t.transferType == transferUpload && (numFiles != 0 || t.bytesReceived > 0) {
numFiles := 0
if t.isNewFile {
numFiles = 1
}
dataprovider.UpdateUserQuota(dataProvider, t.user, numFiles, t.bytesReceived, false) dataprovider.UpdateUserQuota(dataProvider, t.user, numFiles, t.bytesReceived, false)
} }
t.isFinished = true
return err return err
} }