Cloud backends: add support for FTP REST command
So partial downloads are now supported as for local fs
This commit is contained in:
parent
8839c34d53
commit
fa41bfd06a
12 changed files with 74 additions and 45 deletions
|
@ -241,7 +241,7 @@ func (c *Connection) ReadDir(name string) ([]os.FileInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetHandle implements ClientDriverExtentionFileTransfer
|
// GetHandle implements ClientDriverExtentionFileTransfer
|
||||||
func (c *Connection) GetHandle(name string, flags int) (ftpserver.FileTransfer, error) {
|
func (c *Connection) GetHandle(name string, flags int, offset int64) (ftpserver.FileTransfer, error) {
|
||||||
c.UpdateLastActivity()
|
c.UpdateLastActivity()
|
||||||
|
|
||||||
p, err := c.Fs.ResolvePath(name)
|
p, err := c.Fs.ResolvePath(name)
|
||||||
|
@ -251,10 +251,10 @@ func (c *Connection) GetHandle(name string, flags int) (ftpserver.FileTransfer,
|
||||||
if flags&os.O_WRONLY != 0 {
|
if flags&os.O_WRONLY != 0 {
|
||||||
return c.uploadFile(p, name, flags)
|
return c.uploadFile(p, name, flags)
|
||||||
}
|
}
|
||||||
return c.downloadFile(p, name)
|
return c.downloadFile(p, name, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Connection) downloadFile(fsPath, ftpPath string) (ftpserver.FileTransfer, error) {
|
func (c *Connection) downloadFile(fsPath, ftpPath string, offset int64) (ftpserver.FileTransfer, error) {
|
||||||
if !c.User.HasPerm(dataprovider.PermDownload, path.Dir(ftpPath)) {
|
if !c.User.HasPerm(dataprovider.PermDownload, path.Dir(ftpPath)) {
|
||||||
return nil, c.GetPermissionDeniedError()
|
return nil, c.GetPermissionDeniedError()
|
||||||
}
|
}
|
||||||
|
@ -264,7 +264,7 @@ func (c *Connection) downloadFile(fsPath, ftpPath string) (ftpserver.FileTransfe
|
||||||
return nil, c.GetPermissionDeniedError()
|
return nil, c.GetPermissionDeniedError()
|
||||||
}
|
}
|
||||||
|
|
||||||
file, r, cancelFn, err := c.Fs.Open(fsPath)
|
file, r, cancelFn, err := c.Fs.Open(fsPath, offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.Log(logger.LevelWarn, "could not open file %#v for reading: %+v", fsPath, err)
|
c.Log(logger.LevelWarn, "could not open file %#v for reading: %+v", fsPath, err)
|
||||||
return nil, c.GetFsError(err)
|
return nil, c.GetFsError(err)
|
||||||
|
@ -272,7 +272,7 @@ func (c *Connection) downloadFile(fsPath, ftpPath string) (ftpserver.FileTransfe
|
||||||
|
|
||||||
baseTransfer := common.NewBaseTransfer(file, c.BaseConnection, cancelFn, fsPath, ftpPath, common.TransferDownload,
|
baseTransfer := common.NewBaseTransfer(file, c.BaseConnection, cancelFn, fsPath, ftpPath, common.TransferDownload,
|
||||||
0, 0, false)
|
0, 0, false)
|
||||||
t := newTransfer(baseTransfer, nil, r, 0)
|
t := newTransfer(baseTransfer, nil, r, 0, offset)
|
||||||
|
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
@ -330,7 +330,7 @@ func (c *Connection) handleFTPUploadToNewFile(resolvedPath, filePath, requestPat
|
||||||
|
|
||||||
baseTransfer := common.NewBaseTransfer(file, c.BaseConnection, cancelFn, resolvedPath, requestPath,
|
baseTransfer := common.NewBaseTransfer(file, c.BaseConnection, cancelFn, resolvedPath, requestPath,
|
||||||
common.TransferUpload, 0, 0, true)
|
common.TransferUpload, 0, 0, true)
|
||||||
t := newTransfer(baseTransfer, w, nil, quotaResult.GetRemainingSize())
|
t := newTransfer(baseTransfer, w, nil, quotaResult.GetRemainingSize(), 0)
|
||||||
|
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
@ -395,7 +395,7 @@ func (c *Connection) handleFTPUploadToExistingFile(flags int, resolvedPath, file
|
||||||
|
|
||||||
baseTransfer := common.NewBaseTransfer(file, c.BaseConnection, cancelFn, resolvedPath, requestPath,
|
baseTransfer := common.NewBaseTransfer(file, c.BaseConnection, cancelFn, resolvedPath, requestPath,
|
||||||
common.TransferUpload, minWriteOffset, initialSize, false)
|
common.TransferUpload, minWriteOffset, initialSize, false)
|
||||||
t := newTransfer(baseTransfer, w, nil, maxWriteSize)
|
t := newTransfer(baseTransfer, w, nil, maxWriteSize, 0)
|
||||||
|
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -292,7 +292,7 @@ func TestResolvePathErrors(t *testing.T) {
|
||||||
if assert.Error(t, err) {
|
if assert.Error(t, err) {
|
||||||
assert.EqualError(t, err, common.ErrGenericFailure.Error())
|
assert.EqualError(t, err, common.ErrGenericFailure.Error())
|
||||||
}
|
}
|
||||||
_, err = connection.GetHandle("", 0)
|
_, err = connection.GetHandle("", 0, 0)
|
||||||
if assert.Error(t, err) {
|
if assert.Error(t, err) {
|
||||||
assert.EqualError(t, err, common.ErrGenericFailure.Error())
|
assert.EqualError(t, err, common.ErrGenericFailure.Error())
|
||||||
}
|
}
|
||||||
|
@ -400,9 +400,11 @@ func TestTransferErrors(t *testing.T) {
|
||||||
}
|
}
|
||||||
baseTransfer := common.NewBaseTransfer(file, connection.BaseConnection, nil, file.Name(), testfile, common.TransferDownload,
|
baseTransfer := common.NewBaseTransfer(file, connection.BaseConnection, nil, file.Name(), testfile, common.TransferDownload,
|
||||||
0, 0, false)
|
0, 0, false)
|
||||||
tr := newTransfer(baseTransfer, nil, nil, 0)
|
tr := newTransfer(baseTransfer, nil, nil, 0, 0)
|
||||||
err = tr.Close()
|
err = tr.Close()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
_, err = tr.Seek(10, 0)
|
||||||
|
assert.Error(t, err)
|
||||||
buf := make([]byte, 64)
|
buf := make([]byte, 64)
|
||||||
_, err = tr.Read(buf)
|
_, err = tr.Read(buf)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
|
@ -416,7 +418,10 @@ func TestTransferErrors(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
baseTransfer = common.NewBaseTransfer(nil, connection.BaseConnection, nil, testfile, testfile,
|
baseTransfer = common.NewBaseTransfer(nil, connection.BaseConnection, nil, testfile, testfile,
|
||||||
common.TransferUpload, 0, 0, false)
|
common.TransferUpload, 0, 0, false)
|
||||||
tr = newTransfer(baseTransfer, nil, r, 0)
|
tr = newTransfer(baseTransfer, nil, r, 0, 10)
|
||||||
|
pos, err := tr.Seek(10, 0)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, pos, tr.expectedOffset)
|
||||||
err = tr.closeIO()
|
err = tr.closeIO()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
@ -425,11 +430,7 @@ func TestTransferErrors(t *testing.T) {
|
||||||
pipeWriter := vfs.NewPipeWriter(w)
|
pipeWriter := vfs.NewPipeWriter(w)
|
||||||
baseTransfer = common.NewBaseTransfer(nil, connection.BaseConnection, nil, testfile, testfile,
|
baseTransfer = common.NewBaseTransfer(nil, connection.BaseConnection, nil, testfile, testfile,
|
||||||
common.TransferUpload, 0, 0, false)
|
common.TransferUpload, 0, 0, false)
|
||||||
tr = newTransfer(baseTransfer, pipeWriter, nil, 0)
|
tr = newTransfer(baseTransfer, pipeWriter, nil, 0, 0)
|
||||||
_, err = tr.Seek(1, 0)
|
|
||||||
if assert.Error(t, err) {
|
|
||||||
assert.EqualError(t, err, common.ErrOpUnsupported.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
err = r.Close()
|
err = r.Close()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
@ -440,6 +441,10 @@ func TestTransferErrors(t *testing.T) {
|
||||||
}()
|
}()
|
||||||
err = tr.closeIO()
|
err = tr.closeIO()
|
||||||
assert.EqualError(t, err, errFake.Error())
|
assert.EqualError(t, err, errFake.Error())
|
||||||
|
_, err = tr.Seek(1, 0)
|
||||||
|
if assert.Error(t, err) {
|
||||||
|
assert.EqualError(t, err, common.ErrOpUnsupported.Error())
|
||||||
|
}
|
||||||
err = os.Remove(testfile)
|
err = os.Remove(testfile)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,22 +15,25 @@ import (
|
||||||
"github.com/drakkan/sftpgo/logger"
|
"github.com/drakkan/sftpgo/logger"
|
||||||
"github.com/drakkan/sftpgo/metrics"
|
"github.com/drakkan/sftpgo/metrics"
|
||||||
"github.com/drakkan/sftpgo/utils"
|
"github.com/drakkan/sftpgo/utils"
|
||||||
|
"github.com/drakkan/sftpgo/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Server implements the ftpserverlib MainDriver interface
|
// Server implements the ftpserverlib MainDriver interface
|
||||||
type Server struct {
|
type Server struct {
|
||||||
config *Configuration
|
config *Configuration
|
||||||
certMgr *common.CertManager
|
certMgr *common.CertManager
|
||||||
initialMsg string
|
initialMsg string
|
||||||
|
statusBanner string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer returns a new FTP server driver
|
// NewServer returns a new FTP server driver
|
||||||
func NewServer(config *Configuration, configDir string) (*Server, error) {
|
func NewServer(config *Configuration, configDir string) (*Server, error) {
|
||||||
var err error
|
var err error
|
||||||
server := &Server{
|
server := &Server{
|
||||||
config: config,
|
config: config,
|
||||||
certMgr: nil,
|
certMgr: nil,
|
||||||
initialMsg: config.Banner,
|
initialMsg: config.Banner,
|
||||||
|
statusBanner: fmt.Sprintf("SFTPGo %v FTP Server", version.Get().Version),
|
||||||
}
|
}
|
||||||
certificateFile := getConfigPath(config.CertificateFile, configDir)
|
certificateFile := getConfigPath(config.CertificateFile, configDir)
|
||||||
certificateKeyFile := getConfigPath(config.CertificateKeyFile, configDir)
|
certificateKeyFile := getConfigPath(config.CertificateKeyFile, configDir)
|
||||||
|
@ -86,7 +89,8 @@ func (s *Server) GetSettings() (*ftpserver.Settings, error) {
|
||||||
PassiveTransferPortRange: portRange,
|
PassiveTransferPortRange: portRange,
|
||||||
ActiveTransferPortNon20: s.config.ActiveTransfersPortNon20,
|
ActiveTransferPortNon20: s.config.ActiveTransfersPortNon20,
|
||||||
IdleTimeout: -1,
|
IdleTimeout: -1,
|
||||||
ConnectionTimeout: 30,
|
ConnectionTimeout: 20,
|
||||||
|
Banner: s.statusBanner,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package ftpd
|
package ftpd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
@ -14,14 +15,15 @@ import (
|
||||||
// It implements the ftpserver.FileTransfer interface to handle files downloads and uploads
|
// It implements the ftpserver.FileTransfer interface to handle files downloads and uploads
|
||||||
type transfer struct {
|
type transfer struct {
|
||||||
*common.BaseTransfer
|
*common.BaseTransfer
|
||||||
writer io.WriteCloser
|
writer io.WriteCloser
|
||||||
reader io.ReadCloser
|
reader io.ReadCloser
|
||||||
isFinished bool
|
isFinished bool
|
||||||
maxWriteSize int64
|
maxWriteSize int64
|
||||||
|
expectedOffset int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
|
func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
|
||||||
maxWriteSize int64) *transfer {
|
maxWriteSize, expectedOffset int64) *transfer {
|
||||||
var writer io.WriteCloser
|
var writer io.WriteCloser
|
||||||
var reader io.ReadCloser
|
var reader io.ReadCloser
|
||||||
if baseTransfer.File != nil {
|
if baseTransfer.File != nil {
|
||||||
|
@ -33,11 +35,12 @@ func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter,
|
||||||
reader = pipeReader
|
reader = pipeReader
|
||||||
}
|
}
|
||||||
return &transfer{
|
return &transfer{
|
||||||
BaseTransfer: baseTransfer,
|
BaseTransfer: baseTransfer,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
isFinished: false,
|
isFinished: false,
|
||||||
maxWriteSize: maxWriteSize,
|
maxWriteSize: maxWriteSize,
|
||||||
|
expectedOffset: expectedOffset,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,8 +84,16 @@ func (t *transfer) Write(p []byte) (n int, err error) {
|
||||||
// Seek sets the offset to resume an upload or a download
|
// Seek sets the offset to resume an upload or a download
|
||||||
func (t *transfer) Seek(offset int64, whence int) (int64, error) {
|
func (t *transfer) Seek(offset int64, whence int) (int64, error) {
|
||||||
if t.File != nil {
|
if t.File != nil {
|
||||||
return t.File.Seek(offset, whence)
|
ret, err := t.File.Seek(offset, whence)
|
||||||
|
if err != nil {
|
||||||
|
t.TransferError(err)
|
||||||
|
}
|
||||||
|
return ret, err
|
||||||
}
|
}
|
||||||
|
if t.reader != nil && t.expectedOffset == offset && whence == io.SeekStart {
|
||||||
|
return offset, nil
|
||||||
|
}
|
||||||
|
t.TransferError(errors.New("seek is unsupported for this transfer"))
|
||||||
return 0, common.ErrOpUnsupported
|
return 0, common.ErrOpUnsupported
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -48,6 +48,6 @@ require (
|
||||||
|
|
||||||
replace (
|
replace (
|
||||||
github.com/jlaffaye/ftp => github.com/drakkan/ftp v0.0.0-20200730125632-b21eac28818c
|
github.com/jlaffaye/ftp => github.com/drakkan/ftp v0.0.0-20200730125632-b21eac28818c
|
||||||
github.com/fclairamb/ftpserverlib => github.com/drakkan/ftpserverlib v0.0.0-20200731174440-5032a4cc72f5
|
github.com/fclairamb/ftpserverlib => github.com/drakkan/ftpserverlib v0.0.0-20200731183125-82c4b2b9bb35
|
||||||
golang.org/x/crypto => github.com/drakkan/crypto v0.0.0-20200731130417-7674a892f9b1
|
golang.org/x/crypto => github.com/drakkan/crypto v0.0.0-20200731130417-7674a892f9b1
|
||||||
)
|
)
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -105,8 +105,8 @@ github.com/drakkan/crypto v0.0.0-20200731130417-7674a892f9b1 h1:EjZwDKOSCvUUXilY
|
||||||
github.com/drakkan/crypto v0.0.0-20200731130417-7674a892f9b1/go.mod h1:v3bhWOXGYda7H5d2s5t9XA6th3fxW3s0MQxU1R96G/w=
|
github.com/drakkan/crypto v0.0.0-20200731130417-7674a892f9b1/go.mod h1:v3bhWOXGYda7H5d2s5t9XA6th3fxW3s0MQxU1R96G/w=
|
||||||
github.com/drakkan/ftp v0.0.0-20200730125632-b21eac28818c h1:QSXIWohSNn0negBVSKEjKTpdpGEsW7weVW8QNzviLHY=
|
github.com/drakkan/ftp v0.0.0-20200730125632-b21eac28818c h1:QSXIWohSNn0negBVSKEjKTpdpGEsW7weVW8QNzviLHY=
|
||||||
github.com/drakkan/ftp v0.0.0-20200730125632-b21eac28818c/go.mod h1:2lmrmq866uF2tnje75wQHzmPXhmSWUt7Gyx2vgK1RCU=
|
github.com/drakkan/ftp v0.0.0-20200730125632-b21eac28818c/go.mod h1:2lmrmq866uF2tnje75wQHzmPXhmSWUt7Gyx2vgK1RCU=
|
||||||
github.com/drakkan/ftpserverlib v0.0.0-20200731174440-5032a4cc72f5 h1:BdTp34B+hh3MXvaf50ALb3Hx6QXHiprx01IC+u/VUnU=
|
github.com/drakkan/ftpserverlib v0.0.0-20200731183125-82c4b2b9bb35 h1:fS1f/T5ruUySzUD5m4VFhnEXjgPAUm/KwLZG+s/W83E=
|
||||||
github.com/drakkan/ftpserverlib v0.0.0-20200731174440-5032a4cc72f5/go.mod h1:Jwd+zOP3T0kwiCQcgjpu3VWtc7AI6Nu4UPN2HYqaniM=
|
github.com/drakkan/ftpserverlib v0.0.0-20200731183125-82c4b2b9bb35/go.mod h1:Jwd+zOP3T0kwiCQcgjpu3VWtc7AI6Nu4UPN2HYqaniM=
|
||||||
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||||
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
|
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
|
||||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||||
|
|
|
@ -66,7 +66,7 @@ func (c *Connection) Fileread(request *sftp.Request) (io.ReaderAt, error) {
|
||||||
return nil, c.GetFsError(err)
|
return nil, c.GetFsError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
file, r, cancelFn, err := c.Fs.Open(p)
|
file, r, cancelFn, err := c.Fs.Open(p, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.Log(logger.LevelWarn, "could not open file %#v for reading: %+v", p, err)
|
c.Log(logger.LevelWarn, "could not open file %#v for reading: %+v", p, err)
|
||||||
return nil, c.GetFsError(err)
|
return nil, c.GetFsError(err)
|
||||||
|
|
|
@ -475,7 +475,7 @@ func (c *scpCommand) handleDownload(filePath string) error {
|
||||||
c.sendErrorMessage(common.ErrPermissionDenied)
|
c.sendErrorMessage(common.ErrPermissionDenied)
|
||||||
}
|
}
|
||||||
|
|
||||||
file, r, cancelFn, err := c.connection.Fs.Open(p)
|
file, r, cancelFn, err := c.connection.Fs.Open(p, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.connection.Log(logger.LevelError, "could not open file %#v for reading: %v", p, err)
|
c.connection.Log(logger.LevelError, "could not open file %#v for reading: %v", p, err)
|
||||||
c.sendErrorMessage(c.connection.GetFsError(err))
|
c.sendErrorMessage(c.connection.GetFsError(err))
|
||||||
|
|
|
@ -133,7 +133,7 @@ func (fs GCSFs) Lstat(name string) (os.FileInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the named file for reading
|
// Open opens the named file for reading
|
||||||
func (fs GCSFs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error) {
|
func (fs GCSFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
|
||||||
r, w, err := pipeat.AsyncWriterPipeInDir(fs.localTempDir)
|
r, w, err := pipeat.AsyncWriterPipeInDir(fs.localTempDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
|
@ -141,7 +141,11 @@ func (fs GCSFs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error
|
||||||
bkt := fs.svc.Bucket(fs.config.Bucket)
|
bkt := fs.svc.Bucket(fs.config.Bucket)
|
||||||
obj := bkt.Object(name)
|
obj := bkt.Object(name)
|
||||||
ctx, cancelFn := context.WithCancel(context.Background())
|
ctx, cancelFn := context.WithCancel(context.Background())
|
||||||
objectReader, err := obj.NewReader(ctx)
|
objectReader, err := obj.NewRangeReader(ctx, offset, -1)
|
||||||
|
if err == nil && offset > 0 && objectReader.Attrs.ContentEncoding == "gzip" {
|
||||||
|
err = fmt.Errorf("Range request is not possible for gzip content encoding, requested offset %v", offset)
|
||||||
|
objectReader.Close()
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Close()
|
r.Close()
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
|
@ -59,7 +59,7 @@ func (OsFs) Lstat(name string) (os.FileInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the named file for reading
|
// Open opens the named file for reading
|
||||||
func (OsFs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error) {
|
func (OsFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
|
||||||
f, err := os.Open(name)
|
f, err := os.Open(name)
|
||||||
return f, nil, nil, err
|
return f, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
11
vfs/s3fs.go
11
vfs/s3fs.go
|
@ -163,19 +163,24 @@ func (fs S3Fs) Lstat(name string) (os.FileInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the named file for reading
|
// Open opens the named file for reading
|
||||||
func (fs S3Fs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error) {
|
func (fs S3Fs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) {
|
||||||
r, w, err := pipeat.AsyncWriterPipeInDir(fs.localTempDir)
|
r, w, err := pipeat.AsyncWriterPipeInDir(fs.localTempDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
ctx, cancelFn := context.WithCancel(context.Background())
|
ctx, cancelFn := context.WithCancel(context.Background())
|
||||||
downloader := s3manager.NewDownloaderWithClient(fs.svc)
|
downloader := s3manager.NewDownloaderWithClient(fs.svc)
|
||||||
|
var streamRange *string
|
||||||
|
if offset > 0 {
|
||||||
|
streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer cancelFn()
|
defer cancelFn()
|
||||||
key := name
|
|
||||||
n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
|
n, err := downloader.DownloadWithContext(ctx, w, &s3.GetObjectInput{
|
||||||
Bucket: aws.String(fs.config.Bucket),
|
Bucket: aws.String(fs.config.Bucket),
|
||||||
Key: aws.String(key),
|
Key: aws.String(name),
|
||||||
|
Range: streamRange,
|
||||||
})
|
})
|
||||||
w.CloseWithError(err) //nolint:errcheck // the returned error is always null
|
w.CloseWithError(err) //nolint:errcheck // the returned error is always null
|
||||||
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
|
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
|
||||||
|
|
|
@ -22,7 +22,7 @@ type Fs interface {
|
||||||
ConnectionID() string
|
ConnectionID() string
|
||||||
Stat(name string) (os.FileInfo, error)
|
Stat(name string) (os.FileInfo, error)
|
||||||
Lstat(name string) (os.FileInfo, error)
|
Lstat(name string) (os.FileInfo, error)
|
||||||
Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error)
|
Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error)
|
||||||
Create(name string, flag int) (*os.File, *PipeWriter, func(), error)
|
Create(name string, flag int) (*os.File, *PipeWriter, func(), error)
|
||||||
Rename(source, target string) error
|
Rename(source, target string) error
|
||||||
Remove(name string, isDir bool) error
|
Remove(name string, isDir bool) error
|
||||||
|
|
Loading…
Reference in a new issue