transfers: improve errors detection

We can now properly report write errors if for example no space left on
device.

For downloads we check the downloaded size with the expected one
This commit is contained in:
Nicola Murino 2020-01-10 19:20:22 +01:00
parent 5cf4a47b48
commit 1d9bb54073
6 changed files with 133 additions and 17 deletions

View file

@ -37,7 +37,6 @@ type Connection struct {
// last activity for this connection // last activity for this connection
lastActivity time.Time lastActivity time.Time
protocol string protocol string
lock *sync.Mutex
netConn net.Conn netConn net.Conn
channel ssh.Channel channel ssh.Channel
command string command string
@ -61,10 +60,8 @@ func (c Connection) Fileread(request *sftp.Request) (io.ReaderAt, error) {
return nil, getSFTPErrorFromOSError(err) return nil, getSFTPErrorFromOSError(err)
} }
c.lock.Lock() fi, err := os.Stat(p)
defer c.lock.Unlock() if err != nil {
if _, err := os.Stat(p); err != nil {
return nil, getSFTPErrorFromOSError(err) return nil, getSFTPErrorFromOSError(err)
} }
@ -91,6 +88,8 @@ func (c Connection) Fileread(request *sftp.Request) (io.ReaderAt, error) {
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 0, minWriteOffset: 0,
expectedSize: fi.Size(),
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)
return &transfer, nil return &transfer, nil
@ -109,9 +108,6 @@ func (c Connection) Filewrite(request *sftp.Request) (io.WriterAt, error) {
filePath = getUploadTempFilePath(p) filePath = getUploadTempFilePath(p)
} }
c.lock.Lock()
defer c.lock.Unlock()
stat, statErr := os.Stat(p) stat, statErr := os.Stat(p)
if os.IsNotExist(statErr) { if os.IsNotExist(statErr) {
if !c.User.HasPerm(dataprovider.PermUpload, path.Dir(request.Filepath)) { if !c.User.HasPerm(dataprovider.PermUpload, path.Dir(request.Filepath)) {
@ -448,6 +444,7 @@ func (c Connection) handleSFTPUploadToNewFile(requestPath, filePath string) (io.
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 0, minWriteOffset: 0,
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)
return &transfer, nil return &transfer, nil
@ -503,6 +500,7 @@ func (c Connection) handleSFTPUploadToExistingFile(pflags sftp.FileOpenFlags, re
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: minWriteOffset, minWriteOffset: minWriteOffset,
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)
return &transfer, nil return &transfer, nil

View file

@ -9,6 +9,7 @@ import (
"os" "os"
"runtime" "runtime"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -136,6 +137,7 @@ func TestUploadResumeInvalidOffset(t *testing.T) {
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 10, minWriteOffset: 10,
lock: new(sync.Mutex),
} }
_, err := transfer.WriteAt([]byte("test"), 0) _, err := transfer.WriteAt([]byte("test"), 0)
if err == nil { if err == nil {
@ -144,6 +146,76 @@ func TestUploadResumeInvalidOffset(t *testing.T) {
os.Remove(testfile) os.Remove(testfile)
} }
func TestIncompleteDownload(t *testing.T) {
testfile := "testfile"
file, _ := os.Create(testfile)
transfer := Transfer{
file: file,
path: file.Name(),
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: dataprovider.User{
Username: "testuser",
},
connectionID: "",
transferType: transferDownload,
lastActivity: time.Now(),
isNewFile: false,
protocol: protocolSFTP,
transferError: nil,
isFinished: false,
minWriteOffset: 0,
expectedSize: 10,
lock: new(sync.Mutex),
}
err := transfer.Close()
if err == nil {
t.Error("upoload must fail the expected size does not match")
}
os.Remove(testfile)
}
func TestReadWriteErrors(t *testing.T) {
testfile := "testfile"
file, _ := os.Create(testfile)
transfer := Transfer{
file: file,
path: file.Name(),
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
user: dataprovider.User{
Username: "testuser",
},
connectionID: "",
transferType: transferDownload,
lastActivity: time.Now(),
isNewFile: false,
protocol: protocolSFTP,
transferError: nil,
isFinished: false,
minWriteOffset: 0,
expectedSize: 10,
lock: new(sync.Mutex),
}
file.Close()
_, err := transfer.WriteAt([]byte("test"), 0)
if err == nil {
t.Error("writing to closed file must fail")
}
buf := make([]byte, 32768)
_, err = transfer.ReadAt(buf, 0)
if err == nil {
t.Error("reading from a closed file must fail")
}
err = transfer.Close()
if err == nil {
t.Error("upoload must fail the expected size does not match")
}
os.Remove(testfile)
}
func TestUploadFiles(t *testing.T) { func TestUploadFiles(t *testing.T) {
oldUploadMode := uploadMode oldUploadMode := uploadMode
uploadMode = uploadModeAtomic uploadMode = uploadModeAtomic
@ -550,7 +622,9 @@ func TestSystemCommandErrors(t *testing.T) {
WriteError: nil, WriteError: nil,
} }
sshCmd.connection.channel = &mockSSHChannel sshCmd.connection.channel = &mockSSHChannel
transfer := Transfer{transferType: transferDownload} transfer := Transfer{
transferType: transferDownload,
lock: new(sync.Mutex)}
destBuff := make([]byte, 65535) destBuff := make([]byte, 65535)
dst := bytes.NewBuffer(destBuff) dst := bytes.NewBuffer(destBuff)
_, err = transfer.copyFromReaderToWriter(dst, sshCmd.connection.channel, 0) _, err = transfer.copyFromReaderToWriter(dst, sshCmd.connection.channel, 0)
@ -1071,6 +1145,7 @@ func TestSCPUploadFiledata(t *testing.T) {
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 0, minWriteOffset: 0,
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)
err := scpCommand.getUploadFileData(2, &transfer) err := scpCommand.getUploadFileData(2, &transfer)
@ -1151,12 +1226,13 @@ func TestUploadError(t *testing.T) {
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 0, minWriteOffset: 0,
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)
transfer.TransferError(fmt.Errorf("fake error")) transfer.TransferError(fmt.Errorf("fake error"))
transfer.Close() transfer.Close()
if transfer.bytesReceived > 0 { if transfer.bytesReceived > 0 {
t.Errorf("byte sent should be 0 for a failed transfer: %v", transfer.bytesSent) t.Errorf("bytes received should be 0 for a failed transfer: %v", transfer.bytesReceived)
} }
_, err := os.Stat(testfile) _, err := os.Stat(testfile)
if !os.IsNotExist(err) { if !os.IsNotExist(err) {

View file

@ -10,6 +10,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/drakkan/sftpgo/dataprovider" "github.com/drakkan/sftpgo/dataprovider"
@ -212,6 +213,7 @@ func (c *scpCommand) handleUploadFile(requestPath, filePath string, sizeToRead i
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 0, minWriteOffset: 0,
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)
@ -387,8 +389,9 @@ func (c *scpCommand) sendDownloadFileData(filePath string, stat os.FileInfo, tra
} }
buf := make([]byte, 32768) buf := make([]byte, 32768)
var n int
for { for {
n, err := transfer.ReadAt(buf, readed) n, err = transfer.ReadAt(buf, readed)
if err == nil || err == io.EOF { if err == nil || err == io.EOF {
if n > 0 { if n > 0 {
_, err = c.connection.channel.Write(buf[:n]) _, err = c.connection.channel.Write(buf[:n])
@ -471,6 +474,8 @@ func (c *scpCommand) handleDownload(filePath string) error {
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 0, minWriteOffset: 0,
expectedSize: stat.Size(),
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)

View file

@ -14,7 +14,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/drakkan/sftpgo/dataprovider" "github.com/drakkan/sftpgo/dataprovider"
@ -274,7 +273,6 @@ func (c Configuration) AcceptInboundConnection(conn net.Conn, config *ssh.Server
RemoteAddr: remoteAddr, RemoteAddr: remoteAddr,
StartTime: time.Now(), StartTime: time.Now(),
lastActivity: time.Now(), lastActivity: time.Now(),
lock: new(sync.Mutex),
netConn: conn, netConn: conn,
channel: nil, channel: nil,
} }

View file

@ -200,6 +200,7 @@ func (c *sshCommand) executeSystemCommand(command systemCommand) error {
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 0, minWriteOffset: 0,
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)
defer removeTransfer(&transfer) defer removeTransfer(&transfer)
@ -227,6 +228,7 @@ func (c *sshCommand) executeSystemCommand(command systemCommand) error {
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 0, minWriteOffset: 0,
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)
defer removeTransfer(&transfer) defer removeTransfer(&transfer)
@ -255,6 +257,7 @@ func (c *sshCommand) executeSystemCommand(command systemCommand) error {
transferError: nil, transferError: nil,
isFinished: false, isFinished: false,
minWriteOffset: 0, minWriteOffset: 0,
lock: new(sync.Mutex),
} }
addTransfer(&transfer) addTransfer(&transfer)
defer removeTransfer(&transfer) defer removeTransfer(&transfer)

View file

@ -1,9 +1,11 @@
package sftpd package sftpd
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"sync"
"time" "time"
"github.com/drakkan/sftpgo/dataprovider" "github.com/drakkan/sftpgo/dataprovider"
@ -33,11 +35,18 @@ type Transfer struct {
transferError error transferError error
isFinished bool isFinished bool
minWriteOffset int64 minWriteOffset int64
expectedSize int64
lock *sync.Mutex
} }
// TransferError is called if there is an unexpected error. // TransferError is called if there is an unexpected error.
// For example network or client issues // For example network or client issues
func (t *Transfer) TransferError(err error) { func (t *Transfer) TransferError(err error) {
t.lock.Lock()
defer t.lock.Unlock()
if t.transferError != nil {
return
}
t.transferError = err t.transferError = err
elapsed := time.Since(t.start).Nanoseconds() / 1000000 elapsed := time.Since(t.start).Nanoseconds() / 1000000
logger.Warn(logSender, t.connectionID, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+ logger.Warn(logSender, t.connectionID, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
@ -49,7 +58,13 @@ func (t *Transfer) TransferError(err error) {
func (t *Transfer) ReadAt(p []byte, off int64) (n int, err error) { func (t *Transfer) ReadAt(p []byte, off int64) (n int, err error) {
t.lastActivity = time.Now() t.lastActivity = time.Now()
readed, e := t.file.ReadAt(p, off) readed, e := t.file.ReadAt(p, off)
t.lock.Lock()
t.bytesSent += int64(readed) t.bytesSent += int64(readed)
t.lock.Unlock()
if e != nil && e != io.EOF {
t.TransferError(e)
return readed, e
}
t.handleThrottle() t.handleThrottle()
return readed, e return readed, e
} }
@ -59,11 +74,18 @@ func (t *Transfer) ReadAt(p []byte, off int64) (n int, err error) {
func (t *Transfer) WriteAt(p []byte, off int64) (n int, err error) { func (t *Transfer) WriteAt(p []byte, off int64) (n int, err error) {
t.lastActivity = time.Now() t.lastActivity = time.Now()
if off < t.minWriteOffset { if off < t.minWriteOffset {
logger.Warn(logSender, t.connectionID, "Invalid write offset %v minimum valid value %v", off, t.minWriteOffset) err := fmt.Errorf("Invalid write offset: %v minimum valid value: %v", off, t.minWriteOffset)
return 0, fmt.Errorf("invalid write offset %v", off) t.TransferError(err)
return 0, err
} }
written, e := t.file.WriteAt(p, off) written, e := t.file.WriteAt(p, off)
t.lock.Lock()
t.bytesReceived += int64(written) t.bytesReceived += int64(written)
t.lock.Unlock()
if e != nil {
t.TransferError(e)
return written, e
}
t.handleThrottle() t.handleThrottle()
return written, e return written, e
} }
@ -74,15 +96,18 @@ func (t *Transfer) WriteAt(p []byte, off int64) (n int, err error) {
// If there is an error no action will be executed and, in atomic mode, we try to delete // If there is an error no action will be executed and, in atomic mode, we try to delete
// the temporary file // the temporary file
func (t *Transfer) Close() error { func (t *Transfer) Close() error {
err := t.file.Close() t.lock.Lock()
defer t.lock.Unlock()
if t.isFinished { if t.isFinished {
return err return errors.New("transfer already closed")
} }
err := t.file.Close()
t.isFinished = true t.isFinished = true
numFiles := 0 numFiles := 0
if t.isNewFile { if t.isNewFile {
numFiles = 1 numFiles = 1
} }
t.checkDownloadSize()
if t.transferType == transferUpload && t.file.Name() != t.path { if t.transferType == transferUpload && t.file.Name() != t.path {
if t.transferError == nil || uploadMode == uploadModeAtomicWithResume { if t.transferError == nil || uploadMode == uploadModeAtomicWithResume {
err = os.Rename(t.file.Name(), t.path) err = os.Rename(t.file.Name(), t.path)
@ -107,6 +132,11 @@ func (t *Transfer) Close() error {
logger.TransferLog(uploadLogSender, t.path, elapsed, t.bytesReceived, t.user.Username, t.connectionID, t.protocol) logger.TransferLog(uploadLogSender, t.path, elapsed, t.bytesReceived, t.user.Username, t.connectionID, t.protocol)
go executeAction(operationUpload, t.user.Username, t.path, "", "", t.bytesReceived+t.minWriteOffset) go executeAction(operationUpload, t.user.Username, t.path, "", "", t.bytesReceived+t.minWriteOffset)
} }
} else {
logger.Warn(logSender, t.connectionID, "transfer error: %v, path: %#v", t.transferError, t.path)
if err == nil {
err = t.transferError
}
} }
metrics.TransferCompleted(t.bytesSent, t.bytesReceived, t.transferType, t.transferError) metrics.TransferCompleted(t.bytesSent, t.bytesReceived, t.transferType, t.transferError)
removeTransfer(t) removeTransfer(t)
@ -116,6 +146,12 @@ func (t *Transfer) Close() error {
return err return err
} }
func (t *Transfer) checkDownloadSize() {
if t.transferType == transferDownload && t.transferError == nil && t.bytesSent < t.expectedSize {
t.transferError = fmt.Errorf("incomplete download: %v/%v bytes transferred", t.bytesSent, t.expectedSize)
}
}
func (t *Transfer) handleThrottle() { func (t *Transfer) handleThrottle() {
var wantedBandwidth int64 var wantedBandwidth int64
var trasferredBytes int64 var trasferredBytes int64