diff --git a/internal/ftpd/transfer.go b/internal/ftpd/transfer.go index 230df835..212d5957 100644 --- a/internal/ftpd/transfer.go +++ b/internal/ftpd/transfer.go @@ -32,7 +32,7 @@ type transfer struct { expectedOffset int64 } -func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader, +func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *vfs.PipeReader, expectedOffset int64) *transfer { var writer io.WriteCloser var reader io.ReadCloser diff --git a/internal/httpd/file.go b/internal/httpd/file.go index 20b801f8..a3478130 100644 --- a/internal/httpd/file.go +++ b/internal/httpd/file.go @@ -28,7 +28,7 @@ type httpdFile struct { isFinished bool } -func newHTTPDFile(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader) *httpdFile { +func newHTTPDFile(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *vfs.PipeReader) *httpdFile { var writer io.WriteCloser var reader io.ReadCloser if baseTransfer.File != nil { diff --git a/internal/sftpd/transfer.go b/internal/sftpd/transfer.go index b53977a4..91317446 100644 --- a/internal/sftpd/transfer.go +++ b/internal/sftpd/transfer.go @@ -58,7 +58,7 @@ type transfer struct { isFinished bool } -func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader, +func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *vfs.PipeReader, errForRead error) *transfer { var writer writerAtCloser var reader readerAtCloser diff --git a/internal/vfs/azblobfs.go b/internal/vfs/azblobfs.go index 6412849c..cd46239d 100644 --- a/internal/vfs/azblobfs.go +++ b/internal/vfs/azblobfs.go @@ -224,7 +224,7 @@ func (fs *AzureBlobFs) Open(name string, offset int64) (File, *PipeReader, func( } // Create creates or opens the named file for writing -func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) { +func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) { if checks&CheckParentDir != 0 { _, err := fs.Stat(path.Dir(name)) if err != nil { @@ -1195,7 +1195,7 @@ func (fs *AzureBlobFs) getCopyOptions() *blob.StartCopyFromURLOptions { return copyOptions } -func (fs *AzureBlobFs) downloadToWriter(name string, w *PipeWriter) error { +func (fs *AzureBlobFs) downloadToWriter(name string, w PipeWriter) error { fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name) ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout) defer cancelFn() diff --git a/internal/vfs/cryptfs.go b/internal/vfs/cryptfs.go index 265cb937..ab7bf4d2 100644 --- a/internal/vfs/cryptfs.go +++ b/internal/vfs/cryptfs.go @@ -154,7 +154,7 @@ func (fs *CryptFs) Open(name string, offset int64) (File, *PipeReader, func(), e } // Create creates or opens the named file for writing -func (fs *CryptFs) Create(name string, _, _ int) (File, *PipeWriter, func(), error) { +func (fs *CryptFs) Create(name string, _, _ int) (File, PipeWriter, func(), error) { f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { return nil, nil, nil, err diff --git a/internal/vfs/gcsfs.go b/internal/vfs/gcsfs.go index 3f4dee52..392b02b8 100644 --- a/internal/vfs/gcsfs.go +++ b/internal/vfs/gcsfs.go @@ -167,7 +167,7 @@ func (fs *GCSFs) Open(name string, offset int64) (File, *PipeReader, func(), err } // Create creates or opens the named file for writing -func (fs *GCSFs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) { +func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) { if checks&CheckParentDir != 0 { _, err := fs.Stat(path.Dir(name)) if err != nil { @@ -777,7 +777,7 @@ func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, contentType string } } -func (fs *GCSFs) downloadToWriter(name string, w *PipeWriter) error { +func (fs *GCSFs) downloadToWriter(name string, w PipeWriter) error { fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name) ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout) defer cancelFn() diff --git a/internal/vfs/httpfs.go b/internal/vfs/httpfs.go index 2ceaa207..655a0273 100644 --- a/internal/vfs/httpfs.go +++ b/internal/vfs/httpfs.go @@ -332,7 +332,7 @@ func (fs *HTTPFs) Open(name string, offset int64) (File, *PipeReader, func(), er } // Create creates or opens the named file for writing -func (fs *HTTPFs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) { +func (fs *HTTPFs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err diff --git a/internal/vfs/osfs.go b/internal/vfs/osfs.go index d7aad2cd..2cbff48f 100644 --- a/internal/vfs/osfs.go +++ b/internal/vfs/osfs.go @@ -134,7 +134,7 @@ func (fs *OsFs) Open(name string, offset int64) (File, *PipeReader, func(), erro } // Create creates or opens the named file for writing -func (fs *OsFs) Create(name string, flag, _ int) (File, *PipeWriter, func(), error) { +func (fs *OsFs) Create(name string, flag, _ int) (File, PipeWriter, func(), error) { if !fs.useWriteBuffering(flag) { var err error var f *os.File diff --git a/internal/vfs/s3fs.go b/internal/vfs/s3fs.go index e9f12a7a..b9d0066e 100644 --- a/internal/vfs/s3fs.go +++ b/internal/vfs/s3fs.go @@ -241,7 +241,7 @@ func (fs *S3Fs) Open(name string, offset int64) (File, *PipeReader, func(), erro } // Create creates or opens the named file for writing -func (fs *S3Fs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) { +func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) { if checks&CheckParentDir != 0 { _, err := fs.Stat(path.Dir(name)) if err != nil { @@ -1050,7 +1050,7 @@ func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) { return nil, ErrStorageSizeUnavailable } -func (fs *S3Fs) downloadToWriter(name string, w *PipeWriter) error { +func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) error { fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name) ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout) defer cancelFn() diff --git a/internal/vfs/sftpfs.go b/internal/vfs/sftpfs.go index 6fe0926a..b0526e12 100644 --- a/internal/vfs/sftpfs.go +++ b/internal/vfs/sftpfs.go @@ -372,7 +372,7 @@ func (fs *SFTPFs) Open(name string, offset int64) (File, *PipeReader, func(), er } // Create creates or opens the named file for writing -func (fs *SFTPFs) Create(name string, flag, _ int) (File, *PipeWriter, func(), error) { +func (fs *SFTPFs) Create(name string, flag, _ int) (File, PipeWriter, func(), error) { client, err := fs.conn.getClient() if err != nil { return nil, nil, nil, err diff --git a/internal/vfs/vfs.go b/internal/vfs/vfs.go index f8a9fd30..7ad3a9d8 100644 --- a/internal/vfs/vfs.go +++ b/internal/vfs/vfs.go @@ -110,7 +110,7 @@ type Fs interface { Stat(name string) (os.FileInfo, error) Lstat(name string) (os.FileInfo, error) Open(name string, offset int64) (File, *PipeReader, func(), error) - Create(name string, flag, checks int) (File, *PipeWriter, func(), error) + Create(name string, flag, checks int) (File, PipeWriter, func(), error) Rename(source, target string) (int, int64, error) Remove(name string, isDir bool) error Mkdir(name string) error @@ -174,6 +174,15 @@ type File interface { Truncate(size int64) error } +// PipeWriter defines an interface representing a SFTPGo pipe writer +type PipeWriter interface { + io.Writer + io.WriterAt + io.Closer + Done(err error) + GetWrittenBytes() int64 +} + // Metadater defines an interface to implement to return metadata for a file type Metadater interface { Metadata() map[string]string @@ -707,16 +716,16 @@ func (c *CryptFsConfig) validate() error { return nil } -// PipeWriter defines a wrapper for pipeat.PipeWriterAt. -type PipeWriter struct { +// pipeWriter defines a wrapper for pipeat.PipeWriterAt. +type pipeWriter struct { *pipeat.PipeWriterAt err error done chan bool } // NewPipeWriter initializes a new PipeWriter -func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter { - return &PipeWriter{ +func NewPipeWriter(w *pipeat.PipeWriterAt) PipeWriter { + return &pipeWriter{ PipeWriterAt: w, err: nil, done: make(chan bool), @@ -724,7 +733,7 @@ func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter { } // Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any. -func (p *PipeWriter) Close() error { +func (p *pipeWriter) Close() error { p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null <-p.done return p.err @@ -732,7 +741,7 @@ func (p *PipeWriter) Close() error { // Done unlocks other goroutines waiting on Close(). // It must be called when the upload ends -func (p *PipeWriter) Done(err error) { +func (p *pipeWriter) Done(err error) { p.err = err p.done <- true } diff --git a/internal/webdavd/file.go b/internal/webdavd/file.go index 1501efce..84faafaf 100644 --- a/internal/webdavd/file.go +++ b/internal/webdavd/file.go @@ -51,7 +51,7 @@ type webDavFile struct { readTried atomic.Bool } -func newWebDavFile(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt) *webDavFile { +func newWebDavFile(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt) *webDavFile { var writer io.WriteCloser var reader io.ReadCloser if baseTransfer.File != nil {