sftpgo-mirror/webdavd/file.go

290 lines
6.5 KiB
Go
Raw Normal View History

2020-08-11 21:56:10 +00:00
package webdavd
import (
"context"
"errors"
"io"
"mime"
"os"
"path"
"sync/atomic"
"time"
"github.com/eikenb/pipeat"
"golang.org/x/net/webdav"
"github.com/drakkan/sftpgo/common"
"github.com/drakkan/sftpgo/vfs"
)
var errTransferAborted = errors.New("transfer aborted")
type webDavFile struct {
*common.BaseTransfer
writer io.WriteCloser
reader io.ReadCloser
isFinished bool
startOffset int64
info os.FileInfo
2020-08-11 21:56:10 +00:00
}
func newWebDavFile(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
info os.FileInfo) *webDavFile {
2020-08-11 21:56:10 +00:00
var writer io.WriteCloser
var reader io.ReadCloser
if baseTransfer.File != nil {
writer = baseTransfer.File
reader = baseTransfer.File
} else if pipeWriter != nil {
writer = pipeWriter
} else if pipeReader != nil {
reader = pipeReader
}
return &webDavFile{
BaseTransfer: baseTransfer,
writer: writer,
reader: reader,
isFinished: false,
startOffset: 0,
info: info,
}
}
type webDavFileInfo struct {
os.FileInfo
file *webDavFile
}
// ContentType implements webdav.ContentTyper interface
func (fi webDavFileInfo) ContentType(ctx context.Context) (string, error) {
2020-09-28 20:12:46 +00:00
contentType := mime.TypeByExtension(path.Ext(fi.file.GetVirtualPath()))
if contentType != "" {
2020-08-11 21:56:10 +00:00
return contentType, nil
}
if c, ok := fi.file.Fs.(vfs.MimeTyper); ok {
2020-08-11 21:56:10 +00:00
contentType, err := c.GetMimeType(fi.file.GetFsPath())
return contentType, err
}
return contentType, webdav.ErrNotImplemented
}
// Readdir reads directory entries from the handle
func (f *webDavFile) Readdir(count int) ([]os.FileInfo, error) {
if f.isDir() {
return f.Connection.ListDir(f.GetFsPath(), f.GetVirtualPath())
}
return nil, errors.New("we can only list directories contents, this is not a directory")
}
// Stat the handle
func (f *webDavFile) Stat() (os.FileInfo, error) {
if f.info != nil {
fi := webDavFileInfo{
FileInfo: f.info,
file: f,
}
return fi, nil
}
f.Lock()
closed := f.isFinished
errUpload := f.ErrTransfer
f.Unlock()
if f.GetType() == common.TransferUpload && closed && errUpload == nil {
info := webDavFileInfo{
2020-08-22 12:52:17 +00:00
FileInfo: vfs.NewFileInfo(f.GetFsPath(), false, atomic.LoadInt64(&f.BytesReceived), time.Now(), false),
2020-08-11 21:56:10 +00:00
file: f,
}
return info, nil
}
info, err := f.Fs.Stat(f.GetFsPath())
2020-08-11 21:56:10 +00:00
if err != nil {
return info, err
}
fi := webDavFileInfo{
FileInfo: info,
file: f,
}
return fi, err
}
// Read reads the contents to downloads.
func (f *webDavFile) Read(p []byte) (n int, err error) {
if atomic.LoadInt32(&f.AbortTransfer) == 1 {
return 0, errTransferAborted
}
f.Connection.UpdateLastActivity()
// the file is read sequentially we don't need to check for concurrent reads and so
// lock the transfer while opening the remote file
if f.reader == nil {
if f.GetType() != common.TransferDownload {
f.TransferError(common.ErrOpUnsupported)
return 0, common.ErrOpUnsupported
}
2020-10-25 07:18:48 +00:00
_, r, cancelFn, e := f.Fs.Open(f.GetFsPath(), 0)
2020-08-11 21:56:10 +00:00
f.Lock()
f.reader = r
2020-10-25 07:18:48 +00:00
f.ErrTransfer = e
2020-08-11 21:56:10 +00:00
f.BaseTransfer.SetCancelFn(cancelFn)
f.startOffset = 0
f.Unlock()
2020-10-25 07:18:48 +00:00
if e != nil {
return 0, e
2020-08-11 21:56:10 +00:00
}
}
2020-10-25 07:18:48 +00:00
n, err = f.reader.Read(p)
atomic.AddInt64(&f.BytesSent, int64(n))
2020-08-11 21:56:10 +00:00
2020-10-25 07:18:48 +00:00
if err != nil && err != io.EOF {
f.TransferError(err)
return
2020-08-11 21:56:10 +00:00
}
f.HandleThrottle()
2020-10-25 07:18:48 +00:00
return
2020-08-11 21:56:10 +00:00
}
// Write writes the uploaded contents.
func (f *webDavFile) Write(p []byte) (n int, err error) {
if atomic.LoadInt32(&f.AbortTransfer) == 1 {
return 0, errTransferAborted
}
f.Connection.UpdateLastActivity()
2020-10-25 07:18:48 +00:00
n, err = f.writer.Write(p)
atomic.AddInt64(&f.BytesReceived, int64(n))
2020-08-11 21:56:10 +00:00
2020-10-25 07:18:48 +00:00
if f.MaxWriteSize > 0 && err == nil && atomic.LoadInt64(&f.BytesReceived) > f.MaxWriteSize {
err = common.ErrQuotaExceeded
2020-08-11 21:56:10 +00:00
}
2020-10-25 07:18:48 +00:00
if err != nil {
f.TransferError(err)
return
2020-08-11 21:56:10 +00:00
}
f.HandleThrottle()
2020-10-25 07:18:48 +00:00
return
2020-08-11 21:56:10 +00:00
}
// Seek sets the offset for the next Read or Write on the writer to offset,
// interpreted according to whence: 0 means relative to the origin of the file,
// 1 means relative to the current offset, and 2 means relative to the end.
// It returns the new offset and an error, if any.
func (f *webDavFile) Seek(offset int64, whence int) (int64, error) {
f.Connection.UpdateLastActivity()
if f.File != nil {
ret, err := f.File.Seek(offset, whence)
if err != nil {
f.TransferError(err)
}
return ret, err
}
if f.GetType() == common.TransferDownload {
readOffset := f.startOffset + atomic.LoadInt64(&f.BytesSent)
if offset == 0 && readOffset == 0 {
if whence == io.SeekStart {
return 0, nil
} else if whence == io.SeekEnd && f.info != nil {
return f.info.Size(), nil
}
}
// close the reader and create a new one at startByte
if f.reader != nil {
f.reader.Close() //nolint:errcheck
}
startByte := int64(0)
atomic.StoreInt64(&f.BytesReceived, 0)
atomic.StoreInt64(&f.BytesSent, 0)
switch whence {
case io.SeekStart:
startByte = offset
case io.SeekCurrent:
startByte = readOffset + offset
case io.SeekEnd:
if f.info != nil {
startByte = f.info.Size() - offset
} else {
err := errors.New("unable to get file size, seek from end not possible")
f.TransferError(err)
return 0, err
}
}
_, r, cancelFn, err := f.Fs.Open(f.GetFsPath(), startByte)
2020-08-11 21:56:10 +00:00
f.Lock()
if err == nil {
f.startOffset = startByte
f.reader = r
}
f.ErrTransfer = err
f.BaseTransfer.SetCancelFn(cancelFn)
f.Unlock()
return startByte, err
}
return 0, common.ErrOpUnsupported
}
// Close closes the open directory or the current transfer
func (f *webDavFile) Close() error {
if err := f.setFinished(); err != nil {
return err
}
err := f.closeIO()
if f.isTransfer() {
errBaseClose := f.BaseTransfer.Close()
if errBaseClose != nil {
err = errBaseClose
}
} else {
f.Connection.RemoveTransfer(f.BaseTransfer)
}
return f.Connection.GetFsError(err)
}
func (f *webDavFile) closeIO() error {
var err error
if f.File != nil {
err = f.File.Close()
} else if f.writer != nil {
err = f.writer.Close()
f.Lock()
// we set ErrTransfer here so quota is not updated, in this case the uploads are atomic
if err != nil && f.ErrTransfer == nil {
f.ErrTransfer = err
}
f.Unlock()
} else if f.reader != nil {
err = f.reader.Close()
}
return err
}
func (f *webDavFile) setFinished() error {
f.Lock()
defer f.Unlock()
if f.isFinished {
return common.ErrTransferClosed
}
f.isFinished = true
return nil
}
func (f *webDavFile) isDir() bool {
if f.info == nil {
return false
}
return f.info.IsDir()
}
func (f *webDavFile) isTransfer() bool {
if f.GetType() == common.TransferDownload {
return (f.reader != nil)
}
return true
}