add support for atomic upload

Atomic uploads are now configurable. The default upload mode remains
non atomic
This commit is contained in:
Nicola Murino 2019-08-04 09:37:58 +02:00
parent d2361da570
commit 80b9c40489
12 changed files with 319 additions and 94 deletions

View file

@ -18,6 +18,7 @@ Full featured and highly configurable SFTP server software
- REST API for users and quota management and real time reports for the active connections with possibility of forcibly closing a connection
- Log files are accurate and they are saved in the easily parsable JSON format
- Automatically terminating idle connections
- Atomic uploads are supported
## Platforms
@ -73,6 +74,7 @@ The `sftpgo.conf` configuration file contains the following sections:
- `max_auth_tries` integer. Maximum number of authentication attempts permitted per connection. If set to a negative number, the number of attempts are unlimited. If set to zero, the number of attempts are limited to 6.
- `umask`, string. Umask for the new files and directories. This setting has no effect on Windows. Default: "0022"
- `banner`, string. Identification string used by the server. Default "SFTPGo"
- `upload_mode` int. 0 means standard, the files are uploaded directly to the requested path. 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path when the client ends the upload. Atomic mode avoid problems such as a web server that serves partial files when the files are being uploaded
- `actions`, struct. It contains the command to execute and/or the HTTP URL to notify and the trigger conditions
- `execute_on`, list of strings. Valid values are `download`, `upload`, `delete`, `rename`. On folder deletion a `delete` notification will be sent for each deleted file. Leave empty to disable actions.
- `command`, string. Absolute path to the command to execute. Leave empty to disable. The command is invoked with the following arguments:
@ -193,7 +195,7 @@ REST API is designed to run on localhost or on a trusted network, if you need ht
The OpenAPI 3 schema for the exposed API can be found inside the source tree: [openapi.yaml](https://github.com/drakkan/sftpgo/tree/master/api/schema/openapi.yaml "OpenAPI 3 specs").
A sample CLI client for the REST API can be found inside the source tree: [scripts](https://github.com/drakkan/sftpgo/tree/master/scripts "scripts") directory.
A sample CLI client for the REST API can be found inside the source tree [scripts](https://github.com/drakkan/sftpgo/tree/master/scripts "scripts") directory.
## Logs

View file

@ -7,6 +7,7 @@ package config
import (
"encoding/json"
"fmt"
"os"
"strings"
@ -41,6 +42,7 @@ func init() {
IdleTimeout: 15,
MaxAuthTries: 0,
Umask: "0022",
UploadMode: 0,
Actions: sftpd.Actions{
ExecuteOn: []string{},
Command: "",
@ -102,6 +104,13 @@ func LoadConfig(configPath string) error {
if strings.TrimSpace(globalConf.SFTPD.Banner) == "" {
globalConf.SFTPD.Banner = defaultBanner
}
if globalConf.SFTPD.UploadMode < 0 || globalConf.SFTPD.UploadMode > 1 {
err = fmt.Errorf("Invalid upload_mode 0 and 1 are supported, configured: %v reset upload_mode to 0",
globalConf.SFTPD.UploadMode)
globalConf.SFTPD.UploadMode = 0
logger.Warn(logSender, "Configuration error: %v", err)
logger.WarnToConsole("Configuration error: %v", err)
}
logger.Debug(logSender, "config loaded: %+v", globalConf)
return err
}

View file

@ -69,3 +69,24 @@ func TestEmptyBanner(t *testing.T) {
}
os.Remove(configFilePath)
}
func TestInvalidUploadMode(t *testing.T) {
configDir := ".."
confName := "temp.conf"
configFilePath := filepath.Join(configDir, confName)
config.LoadConfig(configFilePath)
sftpdConf := config.GetSFTPDConfig()
sftpdConf.UploadMode = 10
c := make(map[string]sftpd.Configuration)
c["sftpd"] = sftpdConf
jsonConf, _ := json.Marshal(c)
err := ioutil.WriteFile(configFilePath, jsonConf, 0666)
if err != nil {
t.Errorf("error saving temporary configuration")
}
err = config.LoadConfig(configFilePath)
if err == nil {
t.Errorf("Loading configuration with invalid upload_mode must fail")
}
os.Remove(configFilePath)
}

1
go.mod
View file

@ -12,6 +12,7 @@ require (
github.com/lib/pq v1.1.1
github.com/mattn/go-sqlite3 v1.10.0
github.com/pkg/sftp v1.10.0
github.com/rs/xid v1.2.1
github.com/rs/zerolog v1.14.3
github.com/stretchr/testify v1.3.0 // indirect
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4

1
go.sum
View file

@ -23,6 +23,7 @@ github.com/pkg/sftp v1.10.0 h1:DGA1KlA9esU6WcicH+P8PxFZOl15O6GYtab1cIJdOlE=
github.com/pkg/sftp v1.10.0/go.mod h1:NxmoDg/QLVWluQDUYG7XBZTLUpKeFa8e3aMf1BfjyHk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.14.3 h1:4EGfSkR2hJDB0s3oFfrlPqjU1e4WLncergLil3nEKW0=
github.com/rs/zerolog v1.14.3/go.mod h1:3WXPzbXEEliJ+a6UFE4vhIxV8qR1EML6ngzP9ug4eYg=

View file

@ -12,6 +12,7 @@ import (
"time"
"github.com/drakkan/sftpgo/utils"
"github.com/rs/xid"
"github.com/drakkan/sftpgo/dataprovider"
"github.com/drakkan/sftpgo/logger"
@ -94,6 +95,11 @@ func (c Connection) Filewrite(request *sftp.Request) (io.WriterAt, error) {
return nil, sftp.ErrSshFxNoSuchFile
}
filePath := p
if uploadMode == uploadModeAtomic {
filePath = getUploadTempFilePath(p)
}
c.lock.Lock()
defer c.lock.Unlock()
@ -101,45 +107,7 @@ func (c Connection) Filewrite(request *sftp.Request) (io.WriterAt, error) {
// If the file doesn't exist we need to create it, as well as the directory pathway
// leading up to where that file will be created.
if os.IsNotExist(statErr) {
if !c.hasSpace(true) {
logger.Info(logSender, "denying file write due to space limit")
return nil, sftp.ErrSshFxFailure
}
if _, err := os.Stat(filepath.Dir(p)); os.IsNotExist(err) {
if !c.User.HasPerm(dataprovider.PermCreateDirs) {
return nil, sftp.ErrSshFxPermissionDenied
}
}
err = c.createMissingDirs(p)
if err != nil {
logger.Error(logSender, "error making missing dir for path %v: %v", p, err)
return nil, sftp.ErrSshFxFailure
}
file, err := os.Create(p)
if err != nil {
logger.Error(logSender, "error creating file %v: %v", p, err)
return nil, sftp.ErrSshFxFailure
}
utils.SetPathPermissions(p, c.User.GetUID(), c.User.GetGID())
transfer := Transfer{
file: file,
path: p,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: true,
}
addTransfer(&transfer)
return &transfer, nil
return c.handleSFTPUploadToNewFile(p, filePath)
}
if statErr != nil {
@ -147,53 +115,13 @@ func (c Connection) Filewrite(request *sftp.Request) (io.WriterAt, error) {
return nil, sftp.ErrSshFxFailure
}
if !c.hasSpace(false) {
logger.Info(logSender, "denying file write due to space limit")
return nil, sftp.ErrSshFxFailure
}
// Not sure this would ever happen, but lets not find out.
// This happen if we upload a file that has the same name of an existing directory
if stat.IsDir() {
logger.Warn(logSender, "attempted to open a directory for writing to: %v", p)
return nil, sftp.ErrSshFxOpUnsupported
}
osFlags, trunc := getOSOpenFlags(request.Pflags())
if !trunc {
// see https://github.com/pkg/sftp/issues/295
logger.Info(logSender, "upload resume is not supported, returning error")
return nil, sftp.ErrSshFxOpUnsupported
}
// we use 0666 so the umask is applied
file, err := os.OpenFile(p, osFlags, 0666)
if err != nil {
logger.Error(logSender, "error opening existing file, flags: %v, source: %v, err: %v", request.Flags, p, err)
return nil, sftp.ErrSshFxFailure
}
if trunc {
// the file is truncated so we need to decrease quota size but not quota files
dataprovider.UpdateUserQuota(dataProvider, c.User, 0, -stat.Size(), false)
}
utils.SetPathPermissions(p, c.User.GetUID(), c.User.GetGID())
transfer := Transfer{
file: file,
path: p,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: false,
}
addTransfer(&transfer)
return &transfer, nil
return c.handleSFTPUploadToExistingFile(request.Pflags(), p, filePath, stat.Size())
}
// Filecmd hander for basic SFTP system calls related to files, but not anything to do with reading
@ -407,6 +335,101 @@ func (c Connection) handleSFTPRemove(path string) error {
return sftp.ErrSshFxOk
}
func (c Connection) handleSFTPUploadToNewFile(requestPath, filePath string) (io.WriterAt, error) {
if !c.hasSpace(true) {
logger.Info(logSender, "denying file write due to space limit")
return nil, sftp.ErrSshFxFailure
}
if _, err := os.Stat(filepath.Dir(requestPath)); os.IsNotExist(err) {
if !c.User.HasPerm(dataprovider.PermCreateDirs) {
return nil, sftp.ErrSshFxPermissionDenied
}
}
err := c.createMissingDirs(requestPath)
if err != nil {
logger.Error(logSender, "error making missing dir for path %v: %v", requestPath, err)
return nil, sftp.ErrSshFxFailure
}
file, err := os.Create(filePath)
if err != nil {
logger.Error(logSender, "error creating file %v: %v", requestPath, err)
return nil, sftp.ErrSshFxFailure
}
utils.SetPathPermissions(filePath, c.User.GetUID(), c.User.GetGID())
transfer := Transfer{
file: file,
path: requestPath,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: true,
}
addTransfer(&transfer)
return &transfer, nil
}
func (c Connection) handleSFTPUploadToExistingFile(pflags sftp.FileOpenFlags, requestPath, filePath string,
fileSize int64) (io.WriterAt, error) {
var err error
if !c.hasSpace(false) {
logger.Info(logSender, "denying file write due to space limit")
return nil, sftp.ErrSshFxFailure
}
osFlags, trunc := getOSOpenFlags(pflags)
if !trunc {
// see https://github.com/pkg/sftp/issues/295
logger.Info(logSender, "upload resume is not supported, returning error")
return nil, sftp.ErrSshFxOpUnsupported
}
if uploadMode == uploadModeAtomic {
err = os.Rename(requestPath, filePath)
if err != nil {
logger.Error(logSender, "error renaming existing file for atomic upload, source: %v, dest: %v, err: %v",
requestPath, filePath, err)
return nil, sftp.ErrSshFxFailure
}
}
// we use 0666 so the umask is applied
file, err := os.OpenFile(filePath, osFlags, 0666)
if err != nil {
logger.Error(logSender, "error opening existing file, flags: %v, source: %v, err: %v", pflags, filePath, err)
return nil, sftp.ErrSshFxFailure
}
// FIXME: this need to be changed when we add upload resume support
// the file is truncated so we need to decrease quota size but not quota files
dataprovider.UpdateUserQuota(dataProvider, c.User, 0, -fileSize, false)
utils.SetPathPermissions(filePath, c.User.GetUID(), c.User.GetGID())
transfer := Transfer{
file: file,
path: requestPath,
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: c.User,
connectionID: c.ID,
transferType: transferUpload,
lastActivity: time.Now(),
isNewFile: false,
}
addTransfer(&transfer)
return &transfer, nil
}
func (c Connection) hasSpace(checkFiles bool) bool {
if (checkFiles && c.User.QuotaFiles > 0) || c.User.QuotaSize > 0 {
numFile, size, err := dataprovider.GetUsedQuota(dataProvider, c.User.Username)
@ -565,3 +588,9 @@ func getOSOpenFlags(requestFlags sftp.FileOpenFlags) (flags int, trunc bool) {
}
return osFlags, truncateFile
}
func getUploadTempFilePath(path string) string {
dir := filepath.Dir(path)
guid := xid.New().String()
return filepath.Join(dir, ".sftpgo-upload."+guid+"."+filepath.Base(path))
}

View file

@ -1,8 +1,11 @@
package sftpd
import (
"os"
"runtime"
"testing"
"github.com/pkg/sftp"
)
func TestWrongActions(t *testing.T) {
@ -20,6 +23,10 @@ func TestWrongActions(t *testing.T) {
if err == nil {
t.Errorf("action with bad command must fail")
}
err = executeAction(operationDelete, "username", "path", "")
if err != nil {
t.Errorf("action not configured must silently fail")
}
actions.Command = ""
actions.HTTPNotificationURL = "http://foo\x7f.com/"
err = executeAction(operationDownload, "username", "path", "")
@ -43,3 +50,23 @@ func TestRemoveNonexistentQuotaScan(t *testing.T) {
t.Errorf("remove nonexistent transfer must fail")
}
}
func TestGetOSOpenFlags(t *testing.T) {
var flags sftp.FileOpenFlags
flags.Write = true
flags.Append = true
flags.Excl = true
osFlags, _ := getOSOpenFlags(flags)
if osFlags&os.O_WRONLY == 0 || osFlags&os.O_APPEND == 0 || osFlags&os.O_EXCL == 0 {
t.Errorf("error getting os flags from sftp file open flags")
}
}
func TestUploadResume(t *testing.T) {
c := Connection{}
var flags sftp.FileOpenFlags
_, err := c.handleSFTPUploadToExistingFile(flags, "", "", 0)
if err != sftp.ErrSshFxOpUnsupported {
t.Errorf("file resume is not supported")
}
}

View file

@ -43,6 +43,11 @@ type Configuration struct {
MaxAuthTries int `json:"max_auth_tries"`
// Umask for new files
Umask string `json:"umask"`
// UploadMode 0 means standard, the files are uploaded directly to the requested path.
// 1 means atomic: the files are uploaded to a temporary path and renamed to the requested path
// when the client ends the upload. Atomic mode avoid problems such as a web server that
// serves partial files when the files are being uploaded.
UploadMode int `json:"upload_mode"`
// Actions to execute on SFTP create, download, delete and rename
Actions Actions `json:"actions"`
// Keys are a list of host keys
@ -119,6 +124,7 @@ func (c Configuration) Initialize(configDir string) error {
}
actions = c.Actions
uploadMode = c.UploadMode
logger.Info(logSender, "server listener registered address: %v", listener.Addr().String())
if c.IdleTimeout > 0 {
startIdleTimer(time.Duration(c.IdleTimeout) * time.Minute)

View file

@ -42,6 +42,7 @@ var (
activeQuotaScans []ActiveQuotaScan
dataProvider dataprovider.Provider
actions Actions
uploadMode int
)
type connectionTransfer struct {

View file

@ -98,11 +98,15 @@ func TestMain(m *testing.M) {
sftpdConf := config.GetSFTPDConfig()
httpdConf := config.GetHTTPDConfig()
router := api.GetHTTPRouter()
// we run the test cases with UploadMode atomic. The non atomic code path
// simply does not execute some code so if it works in atomic mode will
// work in non atomic mode too
sftpdConf.UploadMode = 1
if runtime.GOOS == "windows" {
homeBasePath = "C:\\"
} else {
homeBasePath = "/tmp"
sftpdConf.Actions.ExecuteOn = []string{"download", "upload", "rename"}
sftpdConf.Actions.ExecuteOn = []string{"download", "upload", "rename", "delete"}
sftpdConf.Actions.Command = "/bin/true"
sftpdConf.Actions.HTTPNotificationURL = "http://127.0.0.1:8080/"
}
@ -231,14 +235,6 @@ func TestDirCommands(t *testing.T) {
t.Errorf("unable to create sftp client: %v", err)
} else {
defer client.Close()
_, err := client.Getwd()
if err != nil {
t.Errorf("unable to get working dir: %v", err)
}
_, err = client.ReadDir(".")
if err != nil {
t.Errorf("unable to read remote dir: %v", err)
}
err = client.Mkdir("test")
if err != nil {
t.Errorf("error mkdir: %v", err)
@ -251,10 +247,24 @@ func TestDirCommands(t *testing.T) {
if err != nil {
t.Errorf("error rmdir: %v", err)
}
err = client.MkdirAll("/test/test")
err = client.Mkdir("/test/test1")
if err != nil {
t.Errorf("error mkdir all: %v", err)
}
testFileName := "/test_file.dat"
testFilePath := filepath.Join(homeBasePath, testFileName)
testFileSize := int64(65535)
err = createTestFile(testFilePath, testFileSize)
if err != nil {
t.Errorf("unable to create test file: %v", err)
}
err = sftpUploadFile(testFilePath, filepath.Join("/test", testFileName), testFileSize, client)
if err != nil {
t.Errorf("file upload error: %v", err)
}
// internally client.Remove will call RemoveDirectory on failure
// the first remove will fail since test directory is not empty
// the RemoveDirectory called internally by client.Remove will succeed
err = client.Remove("/test")
if err != nil {
t.Errorf("error rmdir all: %v", err)
@ -263,6 +273,10 @@ func TestDirCommands(t *testing.T) {
if err == nil {
t.Errorf("stat for deleted dir must not succeed")
}
err = client.Remove("/test")
if err == nil {
t.Errorf("remove missing path must fail")
}
}
err = api.RemoveUser(user, http.StatusOK)
if err != nil {
@ -311,7 +325,7 @@ func TestSymlink(t *testing.T) {
}
}
func TestSetStat(t *testing.T) {
func TestStat(t *testing.T) {
usePubKey := false
user, err := api.AddUser(getTestUser(usePubKey), http.StatusOK)
if err != nil {
@ -352,6 +366,10 @@ func TestSetStat(t *testing.T) {
if fi.Mode().Perm() != newFi.Mode().Perm() {
t.Errorf("stat must remain unchanged")
}
_, err = client.ReadLink(testFileName)
if err == nil {
t.Errorf("readlink is not supported and must fail")
}
err = client.Remove(testFileName)
if err != nil {
t.Errorf("error removing uploaded file: %v", err)
@ -410,6 +428,24 @@ func TestEscapeHomeDir(t *testing.T) {
if err != nil {
t.Errorf("error removing uploaded file: %v", err)
}
linkPath = filepath.Join(homeBasePath, defaultUsername, testFileName)
err = os.Symlink(homeBasePath, linkPath)
if err != nil {
t.Errorf("error making local symlink: %v", err)
}
err = sftpDownloadFile(testFileName, testFilePath, 0, client)
if err == nil {
t.Errorf("download file outside home dir must fail")
}
err = sftpUploadFile(testFilePath, remoteDestPath, testFileSize, client)
if err == nil {
t.Errorf("overwrite a file outside home dir must fail")
}
err = client.Chmod(remoteDestPath, 0644)
if err == nil {
t.Errorf("setstat on a file outside home dir must fail")
}
os.Remove(linkPath)
}
err = api.RemoveUser(user, http.StatusOK)
if err != nil {
@ -886,6 +922,81 @@ func TestBandwidthAndConnections(t *testing.T) {
}
}
func TestMissingFile(t *testing.T) {
usePubKey := false
u := getTestUser(usePubKey)
user, err := api.AddUser(u, http.StatusOK)
if err != nil {
t.Errorf("unable to add user: %v", err)
}
client, err := getSftpClient(user, usePubKey)
if err != nil {
t.Errorf("unable to create sftp client: %v", err)
} else {
defer client.Close()
localDownloadPath := filepath.Join(homeBasePath, "test_download.dat")
err = sftpDownloadFile("missing_file", localDownloadPath, 0, client)
if err == nil {
t.Errorf("download missing file must fail")
}
}
err = api.RemoveUser(user, http.StatusOK)
if err != nil {
t.Errorf("unable to remove user: %v", err)
}
}
func TestOverwriteDirWithFile(t *testing.T) {
usePubKey := false
u := getTestUser(usePubKey)
user, err := api.AddUser(u, http.StatusOK)
if err != nil {
t.Errorf("unable to add user: %v", err)
}
client, err := getSftpClient(user, usePubKey)
if err != nil {
t.Errorf("unable to create sftp client: %v", err)
} else {
defer client.Close()
testFileSize := int64(65535)
testFileName := "test_file.dat"
testDirName := "test_dir"
testFilePath := filepath.Join(homeBasePath, testFileName)
err = createTestFile(testFilePath, testFileSize)
if err != nil {
t.Errorf("unable to create test file: %v", err)
}
err = client.Mkdir(testDirName)
if err != nil {
t.Errorf("mkdir error: %v", err)
}
err = sftpUploadFile(testFilePath, testDirName, testFileSize, client)
if err == nil {
t.Errorf("copying a file over an existing dir must fail")
}
err = sftpUploadFile(testFilePath, testFileName, testFileSize, client)
if err != nil {
t.Errorf("file upload error: %v", err)
}
err = client.Rename(testFileName, testDirName)
if err == nil {
t.Errorf("rename a file over an existing dir must fail")
}
err = client.RemoveDirectory(testDirName)
if err != nil {
t.Errorf("dir remove error: %v", err)
}
err = client.Remove(testFileName)
if err != nil {
t.Errorf("error removing uploaded file: %v", err)
}
}
err = api.RemoveUser(user, http.StatusOK)
if err != nil {
t.Errorf("unable to remove user: %v", err)
}
}
func TestPermList(t *testing.T) {
usePubKey := true
u := getTestUser(usePubKey)
@ -1262,8 +1373,14 @@ func sftpUploadFile(localSourcePath string, remoteDestPath string, expectedSize
if err != nil {
return err
}
defer destFile.Close()
_, err = io.Copy(destFile, srcFile)
if err != nil {
destFile.Close()
return err
}
// we need to close the file to trigger the close method on server
// we cannot defer closing or Lstat will fail for upload atomic mode
destFile.Close()
if expectedSize > 0 {
fi, err := client.Lstat(remoteDestPath)
if err != nil {

View file

@ -13,6 +13,11 @@ const (
transferDownload
)
const (
uploadModeStandard = iota
uploadModeAtomic
)
// Transfer contains the transfer details for an upload or a download.
// It implements the io Reader and Writer interface to handle files downloads and uploads
type Transfer struct {
@ -52,6 +57,11 @@ func (t *Transfer) WriteAt(p []byte, off int64) (n int, err error) {
// It closes the underlying file, log the transfer info, update the user quota, for uploads, and execute any defined actions.
func (t *Transfer) Close() error {
err := t.file.Close()
if t.transferType == transferUpload && t.file.Name() != t.path {
err = os.Rename(t.file.Name(), t.path)
logger.Debug(logSender, "atomic upload completed, rename: \"%v\" -> \"%v\", error: %v",
t.file.Name(), t.path, err)
}
elapsed := time.Since(t.start).Nanoseconds() / 1000000
if t.transferType == transferDownload {
logger.TransferLog(sftpdDownloadLogSender, t.path, elapsed, t.bytesSent, t.user.Username, t.connectionID)

View file

@ -6,6 +6,7 @@
"max_auth_tries": 0,
"umask": "0022",
"banner": "SFTPGo",
"upload_mode": 0,
"actions": {
"execute_on": [],
"command": "",