transfer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package ftpd
  2. import (
  3. "errors"
  4. "io"
  5. "sync/atomic"
  6. "github.com/eikenb/pipeat"
  7. "github.com/drakkan/sftpgo/common"
  8. "github.com/drakkan/sftpgo/vfs"
  9. )
  10. // transfer contains the transfer details for an upload or a download.
  11. // It implements the ftpserver.FileTransfer interface to handle files downloads and uploads
  12. type transfer struct {
  13. *common.BaseTransfer
  14. writer io.WriteCloser
  15. reader io.ReadCloser
  16. isFinished bool
  17. expectedOffset int64
  18. }
  19. func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt,
  20. expectedOffset int64) *transfer {
  21. var writer io.WriteCloser
  22. var reader io.ReadCloser
  23. if baseTransfer.File != nil {
  24. writer = baseTransfer.File
  25. reader = baseTransfer.File
  26. } else if pipeWriter != nil {
  27. writer = pipeWriter
  28. } else if pipeReader != nil {
  29. reader = pipeReader
  30. }
  31. return &transfer{
  32. BaseTransfer: baseTransfer,
  33. writer: writer,
  34. reader: reader,
  35. isFinished: false,
  36. expectedOffset: expectedOffset,
  37. }
  38. }
  39. // Read reads the contents to downloads.
  40. func (t *transfer) Read(p []byte) (n int, err error) {
  41. t.Connection.UpdateLastActivity()
  42. n, err = t.reader.Read(p)
  43. atomic.AddInt64(&t.BytesSent, int64(n))
  44. if err != nil && err != io.EOF {
  45. t.TransferError(err)
  46. return
  47. }
  48. t.HandleThrottle()
  49. return
  50. }
  51. // Write writes the uploaded contents.
  52. func (t *transfer) Write(p []byte) (n int, err error) {
  53. t.Connection.UpdateLastActivity()
  54. n, err = t.writer.Write(p)
  55. atomic.AddInt64(&t.BytesReceived, int64(n))
  56. if t.MaxWriteSize > 0 && err == nil && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
  57. err = common.ErrQuotaExceeded
  58. }
  59. if err != nil {
  60. t.TransferError(err)
  61. return
  62. }
  63. t.HandleThrottle()
  64. return
  65. }
  66. // Seek sets the offset to resume an upload or a download
  67. func (t *transfer) Seek(offset int64, whence int) (int64, error) {
  68. t.Connection.UpdateLastActivity()
  69. if t.File != nil {
  70. ret, err := t.File.Seek(offset, whence)
  71. if err != nil {
  72. t.TransferError(err)
  73. }
  74. return ret, err
  75. }
  76. if t.reader != nil && t.expectedOffset == offset && whence == io.SeekStart {
  77. return offset, nil
  78. }
  79. t.TransferError(errors.New("seek is unsupported for this transfer"))
  80. return 0, common.ErrOpUnsupported
  81. }
  82. // Close it is called when the transfer is completed.
  83. func (t *transfer) Close() error {
  84. if err := t.setFinished(); err != nil {
  85. return err
  86. }
  87. err := t.closeIO()
  88. errBaseClose := t.BaseTransfer.Close()
  89. if errBaseClose != nil {
  90. err = errBaseClose
  91. }
  92. return t.Connection.GetFsError(err)
  93. }
  94. func (t *transfer) closeIO() error {
  95. var err error
  96. if t.File != nil {
  97. err = t.File.Close()
  98. } else if t.writer != nil {
  99. err = t.writer.Close()
  100. t.Lock()
  101. // we set ErrTransfer here so quota is not updated, in this case the uploads are atomic
  102. if err != nil && t.ErrTransfer == nil {
  103. t.ErrTransfer = err
  104. }
  105. t.Unlock()
  106. } else if t.reader != nil {
  107. err = t.reader.Close()
  108. }
  109. return err
  110. }
  111. func (t *transfer) setFinished() error {
  112. t.Lock()
  113. defer t.Unlock()
  114. if t.isFinished {
  115. return common.ErrTransferClosed
  116. }
  117. t.isFinished = true
  118. return nil
  119. }