add pipeReaderAt and pipeWriterAt interfaces

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino 2024-08-25 15:17:17 +02:00
parent 56ef9355da
commit dc42680e1c
No known key found for this signature in database
GPG key ID: 935D2952DEC4EECF
10 changed files with 59 additions and 40 deletions

View file

@ -42,7 +42,6 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/eikenb/pipeat"
"github.com/google/uuid"
"github.com/pkg/sftp"
@ -213,7 +212,7 @@ func (fs *AzureBlobFs) Lstat(name string) (os.FileInfo, error) {
// Open opens the named file for reading
func (fs *AzureBlobFs) Open(name string, offset int64) (File, PipeReader, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, fs.config.DownloadPartSize*int64(fs.config.DownloadConcurrency)+1)
if err != nil {
return nil, nil, nil, err
}
@ -241,7 +240,7 @@ func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter,
return nil, nil, nil, err
}
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, fs.config.UploadPartSize+1024*1024)
if err != nil {
return nil, nil, nil, err
}

View file

@ -24,7 +24,6 @@ import (
"net/http"
"os"
"github.com/eikenb/pipeat"
"github.com/minio/sio"
"golang.org/x/crypto/hkdf"
@ -89,7 +88,7 @@ func (fs *CryptFs) Open(name string, offset int64) (File, PipeReader, func(), er
f.Close()
return nil, nil, nil, err
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, 0)
if err != nil {
f.Close()
return nil, nil, nil, err
@ -175,7 +174,7 @@ func (fs *CryptFs) Create(name string, _, _ int) (File, PipeWriter, func(), erro
f.Close()
return nil, nil, nil, err
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, 0)
if err != nil {
f.Close()
return nil, nil, nil, err

View file

@ -32,7 +32,6 @@ import (
"time"
"cloud.google.com/go/storage"
"github.com/eikenb/pipeat"
"github.com/pkg/sftp"
"github.com/rs/xid"
"google.golang.org/api/googleapi"
@ -89,13 +88,16 @@ func NewGCSFs(connectionID, localTempDir, mountPath string, config GCSFsConfig)
}
ctx := context.Background()
if fs.config.AutomaticCredentials > 0 {
fs.svc, err = storage.NewClient(ctx)
fs.svc, err = storage.NewClient(ctx, storage.WithJSONReads())
} else {
err = fs.config.Credentials.TryDecrypt()
if err != nil {
return fs, err
}
fs.svc, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(fs.config.Credentials.GetPayload())))
fs.svc, err = storage.NewClient(ctx,
storage.WithJSONReads(),
option.WithCredentialsJSON([]byte(fs.config.Credentials.GetPayload())),
)
}
return fs, err
}
@ -128,7 +130,7 @@ func (fs *GCSFs) Lstat(name string) (os.FileInfo, error) {
// Open opens the named file for reading
func (fs *GCSFs) Open(name string, offset int64) (File, PipeReader, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, 0)
if err != nil {
return nil, nil, nil, err
}
@ -176,7 +178,11 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
return nil, nil, nil, err
}
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
chunkSize := googleapi.DefaultUploadChunkSize
if fs.config.UploadPartSize > 0 {
chunkSize = int(fs.config.UploadPartSize) * 1024 * 1024
}
r, w, err := createPipeFn(fs.localTempDir, int64(chunkSize+1024*1024))
if err != nil {
return nil, nil, nil, err
}
@ -220,9 +226,7 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func()
objectWriter = obj.NewWriter(ctx)
}
if fs.config.UploadPartSize > 0 {
objectWriter.ChunkSize = int(fs.config.UploadPartSize) * 1024 * 1024
}
objectWriter.ChunkSize = chunkSize
if fs.config.UploadPartMaxTime > 0 {
objectWriter.ChunkRetryDeadline = time.Duration(fs.config.UploadPartMaxTime) * time.Second
}

View file

@ -32,7 +32,6 @@ import (
"strings"
"time"
"github.com/eikenb/pipeat"
"github.com/pkg/sftp"
"github.com/sftpgo/sdk"
@ -317,7 +316,7 @@ func (fs *HTTPFs) Lstat(name string) (os.FileInfo, error) {
// Open opens the named file for reading
func (fs *HTTPFs) Open(name string, offset int64) (File, PipeReader, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, 0)
if err != nil {
return nil, nil, nil, err
}
@ -351,7 +350,7 @@ func (fs *HTTPFs) Open(name string, offset int64) (File, PipeReader, func(), err
// Create creates or opens the named file for writing
func (fs *HTTPFs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, 0)
if err != nil {
return nil, nil, nil, err
}

View file

@ -28,7 +28,6 @@ import (
"strings"
"time"
"github.com/eikenb/pipeat"
fscopy "github.com/otiai10/copy"
"github.com/pkg/sftp"
"github.com/rs/xid"
@ -116,7 +115,7 @@ func (fs *OsFs) Open(name string, offset int64) (File, PipeReader, func(), error
if fs.readBufferSize <= 0 {
return f, nil, nil, err
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, 0)
if err != nil {
f.Close()
return nil, nil, nil, err
@ -149,7 +148,7 @@ func (fs *OsFs) Create(name string, flag, _ int) (File, PipeWriter, func(), erro
if err != nil {
return nil, nil, nil, err
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, 0)
if err != nil {
f.Close()
return nil, nil, nil, err

View file

@ -49,7 +49,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/eikenb/pipeat"
"github.com/pkg/sftp"
"github.com/drakkan/sftpgo/v2/internal/logger"
@ -229,7 +228,7 @@ func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) {
// Open opens the named file for reading
func (fs *S3Fs) Open(name string, offset int64) (File, PipeReader, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, fs.config.DownloadPartSize*int64(fs.config.DownloadConcurrency)+1)
if err != nil {
return nil, nil, nil, err
}
@ -287,7 +286,7 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(),
return nil, nil, nil, err
}
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, fs.config.UploadPartSize+1024*1024)
if err != nil {
return nil, nil, nil, err
}

View file

@ -35,7 +35,6 @@ import (
"sync/atomic"
"time"
"github.com/eikenb/pipeat"
"github.com/pkg/sftp"
"github.com/robfig/cron/v3"
"github.com/rs/xid"
@ -405,7 +404,7 @@ func (fs *SFTPFs) Open(name string, offset int64) (File, PipeReader, func(), err
if fs.config.BufferSize == 0 {
return f, nil, nil, nil
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, 0)
if err != nil {
f.Close()
return nil, nil, nil, err
@ -445,7 +444,7 @@ func (fs *SFTPFs) Create(name string, flag, _ int) (File, PipeWriter, func(), er
if err != nil {
return nil, nil, nil, err
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
r, w, err := createPipeFn(fs.localTempDir, 0)
if err != nil {
f.Close()
return nil, nil, nil, err

View file

@ -73,6 +73,12 @@ var (
uploadMode int
)
var (
createPipeFn = func(dirPath string, _ int64) (pipeReaderAt, pipeWriterAt, error) {
return pipeat.PipeInDir(dirPath)
}
)
// SetAllowSelfConnections sets the desired behaviour for self connections
func SetAllowSelfConnections(value int) {
allowSelfConnections = value
@ -196,6 +202,22 @@ type PipeReader interface {
Metadata() map[string]string
}
type pipeReaderAt interface {
Read(p []byte) (int, error)
ReadAt(p []byte, offset int64) (int, error)
GetReadedBytes() int64
Close() error
CloseWithError(err error) error
}
type pipeWriterAt interface {
Write(p []byte) (int, error)
WriteAt(p []byte, offset int64) (int, error)
GetWrittenBytes() int64
Close() error
CloseWithError(err error) error
}
// DirLister defines an interface for a directory lister
type DirLister interface {
Next(limit int) ([]os.FileInfo, error)
@ -867,25 +889,25 @@ func (c *CryptFsConfig) validate() error {
return nil
}
// pipeWriter defines a wrapper for pipeat.PipeWriterAt.
// pipeWriter defines a wrapper for a pipeWriterAt.
type pipeWriter struct {
*pipeat.PipeWriterAt
pipeWriterAt
err error
done chan bool
}
// NewPipeWriter initializes a new PipeWriter
func NewPipeWriter(w *pipeat.PipeWriterAt) PipeWriter {
func NewPipeWriter(w pipeWriterAt) PipeWriter {
return &pipeWriter{
PipeWriterAt: w,
pipeWriterAt: w,
err: nil,
done: make(chan bool),
}
}
// Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
// Close waits for the upload to end, closes the pipeWriterAt and returns an error if any.
func (p *pipeWriter) Close() error {
p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null
p.pipeWriterAt.Close() //nolint:errcheck // the returned error is always null
<-p.done
return p.err
}
@ -897,10 +919,10 @@ func (p *pipeWriter) Done(err error) {
p.done <- true
}
func newPipeWriterAtOffset(w *pipeat.PipeWriterAt, offset int64) PipeWriter {
func newPipeWriterAtOffset(w pipeWriterAt, offset int64) PipeWriter {
return &pipeWriterAtOffset{
pipeWriter: &pipeWriter{
PipeWriterAt: w,
pipeWriterAt: w,
err: nil,
done: make(chan bool),
},
@ -929,15 +951,15 @@ func (p *pipeWriterAtOffset) Write(buf []byte) (int, error) {
}
// NewPipeReader initializes a new PipeReader
func NewPipeReader(r *pipeat.PipeReaderAt) PipeReader {
func NewPipeReader(r pipeReaderAt) PipeReader {
return &pipeReader{
PipeReaderAt: r,
pipeReaderAt: r,
}
}
// pipeReader defines a wrapper for pipeat.PipeReaderAt.
type pipeReader struct {
*pipeat.PipeReaderAt
pipeReaderAt
mu sync.RWMutex
metadata map[string]string
}

View file

@ -28,7 +28,6 @@ import (
"time"
"github.com/drakkan/webdav"
"github.com/eikenb/pipeat"
"github.com/drakkan/sftpgo/v2/internal/common"
"github.com/drakkan/sftpgo/v2/internal/dataprovider"
@ -52,7 +51,7 @@ type webDavFile struct {
readTried atomic.Bool
}
func newWebDavFile(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt) *webDavFile {
func newWebDavFile(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader vfs.PipeReader) *webDavFile {
var writer io.WriteCloser
var reader io.ReadCloser
if baseTransfer.File != nil {

View file

@ -806,7 +806,7 @@ func TestTransferReadWriteErrors(t *testing.T) {
r, w, err := pipeat.Pipe()
assert.NoError(t, err)
davFile = newWebDavFile(baseTransfer, nil, r)
davFile = newWebDavFile(baseTransfer, nil, vfs.NewPipeReader(r))
davFile.Connection.RemoveTransfer(davFile.BaseTransfer)
davFile = newWebDavFile(baseTransfer, vfs.NewPipeWriter(w), nil)
davFile.Connection.RemoveTransfer(davFile.BaseTransfer)