sftpd: add support for upload resume

we support resume only if the client sets the correct offset while resuming
the upload.
Based on the specs the offset is optional for resume, but all the tested
clients sets a right offset.
If an invalid offset is given we interrupt the transfer with the error
"Invalid write offset ..."

See https://github.com/pkg/sftp/issues/295

This commit add a new upload mode: "atomic with resume support", this acts
as atomic but if there is an upload error the temporary file is renamed
to the requested path and not deleted, this way a client can reconnect
and resume the upload
This commit is contained in:
Nicola Murino 2019-10-09 17:33:30 +02:00
parent 4f36c1de06
commit 1d917561fe
9 changed files with 331 additions and 137 deletions

View file

@ -125,7 +125,7 @@ The `sftpgo` configuration file contains the following sections:
- `max_auth_tries` integer. Maximum number of authentication attempts permitted per connection. If set to a negative number, the number of attempts are unlimited. If set to zero, the number of attempts are limited to 6.
- `umask`, string. Umask for the new files and directories. This setting has no effect on Windows. Default: "0022"
- `banner`, string. Identification string used by the server. Leave empty to use the default banner. Default "SFTPGo_version"
- `upload_mode` integer. 0 means standard, the files are uploaded directly to the requested path. 1 means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode if there is an upload error the temporary file is deleted and so the requested upload path will not contain a partial file.
- `upload_mode` integer. 0 means standard, the files are uploaded directly to the requested path. 1 means atomic: files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoids problems such as a web server that serves partial files when the files are being uploaded. In atomic mode if there is an upload error the temporary file is deleted and so the requested upload path will not contain a partial file. 2 means atomic with resume support: as atomic but if there is an upload error the temporary file is renamed to the requested path and not deleted, this way a client can reconnect and resume the upload.
- `actions`, struct. It contains the command to execute and/or the HTTP URL to notify and the trigger conditions
- `execute_on`, list of strings. Valid values are `download`, `upload`, `delete`, `rename`. On folder deletion a `delete` notification will be sent for each deleted file. Actions will be not executed if an error is detected and so a partial file is uploaded or downloaded. Leave empty to disable actions. The `upload` condition includes both uploads to new files and overwrite existing files
- `command`, string. Absolute path to the command to execute. Leave empty to disable. The command is invoked with the following arguments:

View file

@ -140,7 +140,7 @@ func LoadConfig(configDir, configName string) error {
if strings.TrimSpace(globalConf.SFTPD.Banner) == "" {
globalConf.SFTPD.Banner = defaultBanner
}
if globalConf.SFTPD.UploadMode < 0 || globalConf.SFTPD.UploadMode > 1 {
if globalConf.SFTPD.UploadMode < 0 || globalConf.SFTPD.UploadMode > 2 {
err = fmt.Errorf("Invalid upload_mode 0 and 1 are supported, configured: %v reset upload_mode to 0",
globalConf.SFTPD.UploadMode)
globalConf.SFTPD.UploadMode = 0

View file

@ -75,19 +75,20 @@ func (c Connection) Fileread(request *sftp.Request) (io.ReaderAt, error) {
c.Log(logger.LevelDebug, logSender, "fileread requested for path: %#v", p)
transfer := Transfer{
file: file,
path: p,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferDownload,
lastActivity: time.Now(),
isNewFile: false,
protocol: c.protocol,
transferError: nil,
isFinished: false,
file: file,
path: p,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferDownload,
lastActivity: time.Now(),
isNewFile: false,
protocol: c.protocol,
transferError: nil,
isFinished: false,
minWriteOffset: 0,
}
addTransfer(&transfer)
return &transfer, nil
@ -106,7 +107,7 @@ func (c Connection) Filewrite(request *sftp.Request) (io.WriterAt, error) {
}
filePath := p
if uploadMode == uploadModeAtomic {
if isAtomicUploadEnabled() {
filePath = getUploadTempFilePath(p)
}
@ -376,19 +377,20 @@ func (c Connection) handleSFTPUploadToNewFile(requestPath, filePath string) (io.
utils.SetPathPermissions(filePath, c.User.GetUID(), c.User.GetGID())
transfer := Transfer{
file: file,
path: requestPath,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: true,
protocol: c.protocol,
transferError: nil,
isFinished: false,
file: file,
path: requestPath,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: true,
protocol: c.protocol,
transferError: nil,
isFinished: false,
minWriteOffset: 0,
}
addTransfer(&transfer)
return &transfer, nil
@ -402,15 +404,10 @@ func (c Connection) handleSFTPUploadToExistingFile(pflags sftp.FileOpenFlags, re
return nil, sftp.ErrSshFxFailure
}
minWriteOffset := int64(0)
osFlags := getOSOpenFlags(pflags)
if osFlags&os.O_TRUNC == 0 {
// see https://github.com/pkg/sftp/issues/295
c.Log(logger.LevelInfo, logSender, "upload resume is not supported, returning error for file: %#v", requestPath)
return nil, sftp.ErrSshFxOpUnsupported
}
if uploadMode == uploadModeAtomic {
if isAtomicUploadEnabled() {
err = os.Rename(requestPath, filePath)
if err != nil {
c.Log(logger.LevelError, logSender, "error renaming existing file for atomic upload, source: %#v, dest: %#v, err: %v",
@ -425,26 +422,30 @@ func (c Connection) handleSFTPUploadToExistingFile(pflags sftp.FileOpenFlags, re
return nil, sftp.ErrSshFxFailure
}
// FIXME: this need to be changed when we add upload resume support
// the file is truncated so we need to decrease quota size but not quota files
dataprovider.UpdateUserQuota(dataProvider, c.User, 0, -fileSize, false)
if pflags.Append && osFlags&os.O_TRUNC == 0 {
c.Log(logger.LevelDebug, logSender, "upload resume requested, file path: %#v initial size: %v", filePath, fileSize)
minWriteOffset = fileSize
} else {
dataprovider.UpdateUserQuota(dataProvider, c.User, 0, -fileSize, false)
}
utils.SetPathPermissions(filePath, c.User.GetUID(), c.User.GetGID())
transfer := Transfer{
file: file,
path: requestPath,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: false,
protocol: c.protocol,
transferError: nil,
isFinished: false,
file: file,
path: requestPath,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: false,
protocol: c.protocol,
transferError: nil,
isFinished: false,
minWriteOffset: minWriteOffset,
}
addTransfer(&transfer)
return &transfer, nil
@ -600,9 +601,10 @@ func getOSOpenFlags(requestFlags sftp.FileOpenFlags) (flags int) {
} else if requestFlags.Write {
osFlags |= os.O_WRONLY
}
if requestFlags.Append {
// we ignore Append flag since pkg/sftp use WriteAt that cannot work with os.O_APPEND
/*if requestFlags.Append {
osFlags |= os.O_APPEND
}
}*/
if requestFlags.Creat {
osFlags |= os.O_CREATE
}

View file

@ -98,21 +98,44 @@ func TestRemoveNonexistentQuotaScan(t *testing.T) {
func TestGetOSOpenFlags(t *testing.T) {
var flags sftp.FileOpenFlags
flags.Write = true
flags.Append = true
flags.Excl = true
osFlags := getOSOpenFlags(flags)
if osFlags&os.O_WRONLY == 0 || osFlags&os.O_APPEND == 0 || osFlags&os.O_EXCL == 0 {
if osFlags&os.O_WRONLY == 0 || osFlags&os.O_EXCL == 0 {
t.Errorf("error getting os flags from sftp file open flags")
}
flags.Append = true
// append flag should be ignored to allow resume
if osFlags&os.O_WRONLY == 0 || osFlags&os.O_EXCL == 0 {
t.Errorf("error getting os flags from sftp file open flags")
}
}
func TestUploadResume(t *testing.T) {
c := Connection{}
var flags sftp.FileOpenFlags
_, err := c.handleSFTPUploadToExistingFile(flags, "", "", 0)
if err != sftp.ErrSshFxOpUnsupported {
t.Errorf("file resume is not supported")
func TestUploadResumeInvalidOffset(t *testing.T) {
testfile := "testfile"
file, _ := os.Create(testfile)
transfer := Transfer{
file: file,
path: file.Name(),
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: dataprovider.User{
Username: "testuser",
},
connectionID: "",
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: false,
protocol: protocolSFTP,
transferError: nil,
isFinished: false,
minWriteOffset: 10,
}
_, err := transfer.WriteAt([]byte("test"), 0)
if err == nil {
t.Errorf("upload with invalid offset must fail")
}
os.Remove(testfile)
}
func TestUploadFiles(t *testing.T) {
@ -617,19 +640,20 @@ func TestSCPUploadFiledata(t *testing.T) {
}
file, _ := os.Create(testfile)
transfer := Transfer{
file: file,
path: file.Name(),
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: scpCommand.connection.User,
connectionID: "",
transferType: transferDownload,
lastActivity: time.Now(),
isNewFile: true,
protocol: connection.protocol,
transferError: nil,
isFinished: false,
file: file,
path: file.Name(),
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: scpCommand.connection.User,
connectionID: "",
transferType: transferDownload,
lastActivity: time.Now(),
isNewFile: true,
protocol: connection.protocol,
transferError: nil,
isFinished: false,
minWriteOffset: 0,
}
addTransfer(&transfer)
err := scpCommand.getUploadFileData(2, &transfer)
@ -684,6 +708,8 @@ func TestSCPUploadFiledata(t *testing.T) {
}
func TestUploadError(t *testing.T) {
oldUploadMode := uploadMode
uploadMode = uploadModeAtomic
connection := Connection{
User: dataprovider.User{
Username: "testuser",
@ -694,19 +720,20 @@ func TestUploadError(t *testing.T) {
fileTempName := "temptestfile"
file, _ := os.Create(fileTempName)
transfer := Transfer{
file: file,
path: testfile,
start: time.Now(),
bytesSent: 0,
bytesReceived: 100,
user: connection.User,
connectionID: "",
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: true,
protocol: connection.protocol,
transferError: nil,
isFinished: false,
file: file,
path: testfile,
start: time.Now(),
bytesSent: 0,
bytesReceived: 100,
user: connection.User,
connectionID: "",
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: true,
protocol: connection.protocol,
transferError: nil,
isFinished: false,
minWriteOffset: 0,
}
addTransfer(&transfer)
transfer.TransferError(fmt.Errorf("fake error"))
@ -722,6 +749,7 @@ func TestUploadError(t *testing.T) {
if !os.IsNotExist(err) {
t.Errorf("file uploaded must be deleted after an error: %v", err)
}
uploadMode = oldUploadMode
}
func TestConnectionStatusStruct(t *testing.T) {

View file

@ -217,19 +217,20 @@ func (c *scpCommand) handleUploadFile(requestPath, filePath string, sizeToRead i
utils.SetPathPermissions(filePath, c.connection.User.GetUID(), c.connection.User.GetGID())
transfer := Transfer{
file: file,
path: requestPath,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.connection.User,
connectionID: c.connection.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: isNewFile,
protocol: c.connection.protocol,
transferError: nil,
isFinished: false,
file: file,
path: requestPath,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.connection.User,
connectionID: c.connection.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: isNewFile,
protocol: c.connection.protocol,
transferError: nil,
isFinished: false,
minWriteOffset: 0,
}
addTransfer(&transfer)
@ -254,7 +255,7 @@ func (c *scpCommand) handleUpload(uploadFilePath string, sizeToRead int64) error
return err
}
filePath := p
if uploadMode == uploadModeAtomic {
if isAtomicUploadEnabled() {
filePath = getUploadTempFilePath(p)
}
stat, statErr := os.Stat(p)
@ -282,7 +283,7 @@ func (c *scpCommand) handleUpload(uploadFilePath string, sizeToRead int64) error
return err
}
if uploadMode == uploadModeAtomic {
if isAtomicUploadEnabled() {
err = os.Rename(p, filePath)
if err != nil {
c.connection.Log(logger.LevelError, logSenderSCP, "error renaming existing file for atomic upload, source: %#v, dest: %#v, err: %v",
@ -468,19 +469,20 @@ func (c *scpCommand) handleDownload(filePath string) error {
}
transfer := Transfer{
file: file,
path: p,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.connection.User,
connectionID: c.connection.ID,
transferType: transferDownload,
lastActivity: time.Now(),
isNewFile: false,
protocol: c.connection.protocol,
transferError: nil,
isFinished: false,
file: file,
path: p,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.connection.User,
connectionID: c.connection.ID,
transferType: transferDownload,
lastActivity: time.Now(),
isNewFile: false,
protocol: c.connection.protocol,
transferError: nil,
isFinished: false,
minWriteOffset: 0,
}
addTransfer(&transfer)

View file

@ -49,6 +49,11 @@ type Configuration struct {
// 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
// when the client ends the upload. Atomic mode avoid problems such as a web server that
// serves partial files when the files are being uploaded.
// In atomic mode if there is an upload error the temporary file is deleted and so the requested
// upload path will not contain a partial file.
// 2 means atomic with resume support: as atomic but if there is an upload error the temporary
// file is renamed to the requested path and not deleted, this way a client can reconnect and resume
// the upload.
UploadMode int `json:"upload_mode" mapstructure:"upload_mode"`
// Actions to execute on SFTP create, download, delete and rename
Actions Actions `json:"actions" mapstructure:"actions"`

View file

@ -37,6 +37,12 @@ const (
protocolSCP = "SCP"
)
const (
uploadModeStandard = iota
uploadModeAtomic
uploadModeAtomicWithResume
)
var (
mutex sync.RWMutex
openConnections map[string]Connection
@ -356,6 +362,10 @@ func updateConnectionActivity(id string) {
}
}
func isAtomicUploadEnabled() bool {
return uploadMode == uploadModeAtomic || uploadMode == uploadModeAtomicWithResume
}
func executeAction(operation string, username string, path string, target string) error {
if !utils.IsStringInSlice(operation, actions.ExecuteOn) {
return nil

View file

@ -1,7 +1,9 @@
package sftpd_test
import (
"bytes"
"crypto/rand"
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
@ -112,10 +114,10 @@ func TestMain(m *testing.M) {
sftpdConf.LoginBannerFile = loginBannerFileName
// we need to test SCP support
sftpdConf.IsSCPEnabled = true
// we run the test cases with UploadMode atomic. The non atomic code path
// we run the test cases with UploadMode atomic and resume support. The non atomic code path
// simply does not execute some code so if it works in atomic mode will
// work in non atomic mode too
sftpdConf.UploadMode = 1
sftpdConf.UploadMode = 2
if runtime.GOOS == "windows" {
homeBasePath = "C:\\"
} else {
@ -187,6 +189,7 @@ func TestBasicSFTPHandling(t *testing.T) {
if err != nil {
t.Errorf("unable to add user: %v", err)
}
os.RemoveAll(user.GetHomeDir())
client, err := getSftpClient(user, usePubKey)
if err != nil {
t.Errorf("unable to create sftp client: %v", err)
@ -246,6 +249,67 @@ func TestBasicSFTPHandling(t *testing.T) {
os.RemoveAll(user.GetHomeDir())
}
func TestUploadResume(t *testing.T) {
usePubKey := false
u := getTestUser(usePubKey)
user, _, err := httpd.AddUser(u, http.StatusOK)
if err != nil {
t.Errorf("unable to add user: %v", err)
}
os.RemoveAll(user.GetHomeDir())
client, err := getSftpClient(user, usePubKey)
if err != nil {
t.Errorf("unable to create sftp client: %v", err)
} else {
defer client.Close()
testFileName := "test_file.dat"
testFilePath := filepath.Join(homeBasePath, testFileName)
testFileSize := int64(65535)
appendDataSize := int64(65535)
err = createTestFile(testFilePath, testFileSize)
if err != nil {
t.Errorf("unable to create test file: %v", err)
}
err = sftpUploadFile(testFilePath, testFileName, testFileSize, client)
if err != nil {
t.Errorf("file upload error: %v", err)
}
err = appendToTestFile(testFilePath, appendDataSize)
if err != nil {
t.Errorf("unable to append to test file: %v", err)
}
err = sftpUploadResumeFile(testFilePath, testFileName, testFileSize+appendDataSize, false, client)
if err != nil {
t.Errorf("file upload resume error: %v", err)
}
localDownloadPath := filepath.Join(homeBasePath, "test_download.dat")
err = sftpDownloadFile(testFileName, localDownloadPath, testFileSize+appendDataSize, client)
if err != nil {
t.Errorf("file download error: %v", err)
}
initialHash, err := computeFileHash(localDownloadPath)
if err != nil {
t.Errorf("error computing file hash: %v", err)
}
donwloadedFileHash, err := computeFileHash(localDownloadPath)
if err != nil {
t.Errorf("error computing downloaded file hash: %v", err)
}
if donwloadedFileHash != initialHash {
t.Errorf("resume failed: file hash does not match")
}
err = sftpUploadResumeFile(testFilePath, testFileName, testFileSize+appendDataSize, true, client)
if err == nil {
t.Errorf("file upload resume with invalid offset must fail")
}
}
_, err = httpd.RemoveUser(user, http.StatusOK)
if err != nil {
t.Errorf("unable to remove user: %v", err)
}
os.RemoveAll(user.GetHomeDir())
}
func TestDirCommands(t *testing.T) {
usePubKey := false
user, _, err := httpd.AddUser(getTestUser(usePubKey), http.StatusOK)
@ -2301,6 +2365,26 @@ func createTestFile(path string, size int64) error {
return ioutil.WriteFile(path, content, 0666)
}
func appendToTestFile(path string, size int64) error {
content := make([]byte, size)
_, err := rand.Read(content)
if err != nil {
return err
}
f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
return err
}
written, err := io.Copy(f, bytes.NewReader(content))
if err != nil {
return err
}
if int64(written) != size {
return fmt.Errorf("write error, written: %v/%v", written, size)
}
return nil
}
func sftpUploadFile(localSourcePath string, remoteDestPath string, expectedSize int64, client *sftp.Client) error {
srcFile, err := os.Open(localSourcePath)
if err != nil {
@ -2331,6 +2415,53 @@ func sftpUploadFile(localSourcePath string, remoteDestPath string, expectedSize
return err
}
func sftpUploadResumeFile(localSourcePath string, remoteDestPath string, expectedSize int64, invalidOffset bool,
client *sftp.Client) error {
srcFile, err := os.Open(localSourcePath)
if err != nil {
return err
}
defer srcFile.Close()
fi, err := client.Lstat(remoteDestPath)
if err != nil {
return err
}
if !invalidOffset {
_, err = srcFile.Seek(fi.Size(), 0)
if err != nil {
return err
}
}
destFile, err := client.OpenFile(remoteDestPath, os.O_WRONLY|os.O_APPEND)
if err != nil {
return err
}
if !invalidOffset {
_, err = destFile.Seek(fi.Size(), 0)
if err != nil {
return err
}
}
_, err = io.Copy(destFile, srcFile)
if err != nil {
destFile.Close()
return err
}
// we need to close the file to trigger the close method on server
// we cannot defer closing or Lstat will fail for upload atomic mode
destFile.Close()
if expectedSize > 0 {
fi, err := client.Lstat(remoteDestPath)
if err != nil {
return err
}
if fi.Size() != expectedSize {
return fmt.Errorf("uploaded file size does not match, actual: %v, expected: %v", fi.Size(), expectedSize)
}
}
return err
}
func sftpDownloadFile(remoteSourcePath string, localDestPath string, expectedSize int64, client *sftp.Client) error {
downloadDest, err := os.Create(localDestPath)
if err != nil {
@ -2432,6 +2563,21 @@ func getScpUploadCommand(localPath, remotePath string, preserveTime, remoteToRem
return exec.Command(scpPath, args...)
}
func computeFileHash(path string) (string, error) {
hash := ""
f, err := os.Open(path)
if err != nil {
return hash, err
}
defer f.Close()
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
return hash, err
}
hash = fmt.Sprintf("%x", h.Sum(nil))
return hash, err
}
func waitForNoActiveTransfer() {
for len(sftpd.GetConnectionsStats()) > 0 {
time.Sleep(100 * time.Millisecond)

View file

@ -1,6 +1,7 @@
package sftpd
import (
"fmt"
"os"
"time"
@ -14,27 +15,23 @@ const (
transferDownload
)
const (
uploadModeStandard = iota
uploadModeAtomic
)
// Transfer contains the transfer details for an upload or a download.
// It implements the io Reader and Writer interface to handle files downloads and uploads
type Transfer struct {
file *os.File
path string
start time.Time
bytesSent int64
bytesReceived int64
user dataprovider.User
connectionID string
transferType int
lastActivity time.Time
isNewFile bool
protocol string
transferError error
isFinished bool
file *os.File
path string
start time.Time
bytesSent int64
bytesReceived int64
user dataprovider.User
connectionID string
transferType int
lastActivity time.Time
isNewFile bool
protocol string
transferError error
isFinished bool
minWriteOffset int64
}
// TransferError is called if there is an unexpected error.
@ -60,6 +57,10 @@ func (t *Transfer) ReadAt(p []byte, off int64) (n int, err error) {
// It handles upload bandwidth throttling too
func (t *Transfer) WriteAt(p []byte, off int64) (n int, err error) {
t.lastActivity = time.Now()
if off < t.minWriteOffset {
logger.Warn(logSender, t.connectionID, "Invalid write offset %v minimum valid value %v", off, t.minWriteOffset)
return 0, fmt.Errorf("Invalid write offset %v", off)
}
written, e := t.file.WriteAt(p, off)
t.bytesReceived += int64(written)
t.handleThrottle()
@ -82,7 +83,7 @@ func (t *Transfer) Close() error {
numFiles = 1
}
if t.transferType == transferUpload && t.file.Name() != t.path {
if t.transferError == nil {
if t.transferError == nil || uploadMode == uploadModeAtomicWithResume {
err = os.Rename(t.file.Name(), t.path)
logger.Debug(logSender, t.connectionID, "atomic upload completed, rename: %#v -> %#v, error: %v",
t.file.Name(), t.path, err)