transfer.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package common
  2. import (
  3. "errors"
  4. "path"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/drakkan/sftpgo/v2/dataprovider"
  9. "github.com/drakkan/sftpgo/v2/logger"
  10. "github.com/drakkan/sftpgo/v2/metrics"
  11. "github.com/drakkan/sftpgo/v2/vfs"
  12. )
  13. var (
  14. // ErrTransferClosed defines the error returned for a closed transfer
  15. ErrTransferClosed = errors.New("transfer already closed")
  16. )
  17. // BaseTransfer contains protocols common transfer details for an upload or a download.
  18. type BaseTransfer struct { //nolint:maligned
  19. ID uint64
  20. BytesSent int64
  21. BytesReceived int64
  22. Fs vfs.Fs
  23. File vfs.File
  24. Connection *BaseConnection
  25. cancelFn func()
  26. fsPath string
  27. effectiveFsPath string
  28. requestPath string
  29. start time.Time
  30. MaxWriteSize int64
  31. MinWriteOffset int64
  32. InitialSize int64
  33. isNewFile bool
  34. transferType int
  35. AbortTransfer int32
  36. sync.Mutex
  37. ErrTransfer error
  38. }
  39. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  40. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, effectiveFsPath, requestPath string,
  41. transferType int, minWriteOffset, initialSize, maxWriteSize int64, isNewFile bool, fs vfs.Fs) *BaseTransfer {
  42. t := &BaseTransfer{
  43. ID: conn.GetTransferID(),
  44. File: file,
  45. Connection: conn,
  46. cancelFn: cancelFn,
  47. fsPath: fsPath,
  48. effectiveFsPath: effectiveFsPath,
  49. start: time.Now(),
  50. transferType: transferType,
  51. MinWriteOffset: minWriteOffset,
  52. InitialSize: initialSize,
  53. isNewFile: isNewFile,
  54. requestPath: requestPath,
  55. BytesSent: 0,
  56. BytesReceived: 0,
  57. MaxWriteSize: maxWriteSize,
  58. AbortTransfer: 0,
  59. Fs: fs,
  60. }
  61. conn.AddTransfer(t)
  62. return t
  63. }
  64. // GetID returns the transfer ID
  65. func (t *BaseTransfer) GetID() uint64 {
  66. return t.ID
  67. }
  68. // GetType returns the transfer type
  69. func (t *BaseTransfer) GetType() int {
  70. return t.transferType
  71. }
  72. // GetSize returns the transferred size
  73. func (t *BaseTransfer) GetSize() int64 {
  74. if t.transferType == TransferDownload {
  75. return atomic.LoadInt64(&t.BytesSent)
  76. }
  77. return atomic.LoadInt64(&t.BytesReceived)
  78. }
  79. // GetStartTime returns the start time
  80. func (t *BaseTransfer) GetStartTime() time.Time {
  81. return t.start
  82. }
  83. // SignalClose signals that the transfer should be closed.
  84. // For same protocols, for example WebDAV, we have no
  85. // access to the network connection, so we use this method
  86. // to make the next read or write to fail
  87. func (t *BaseTransfer) SignalClose() {
  88. atomic.StoreInt32(&(t.AbortTransfer), 1)
  89. }
  90. // GetVirtualPath returns the transfer virtual path
  91. func (t *BaseTransfer) GetVirtualPath() string {
  92. return t.requestPath
  93. }
  94. // GetFsPath returns the transfer filesystem path
  95. func (t *BaseTransfer) GetFsPath() string {
  96. return t.fsPath
  97. }
  98. // GetRealFsPath returns the real transfer filesystem path.
  99. // If atomic uploads are enabled this differ from fsPath
  100. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  101. if fsPath == t.GetFsPath() {
  102. if t.File != nil {
  103. return t.File.Name()
  104. }
  105. return t.fsPath
  106. }
  107. return ""
  108. }
  109. // SetCancelFn sets the cancel function for the transfer
  110. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  111. t.cancelFn = cancelFn
  112. }
  113. // Truncate changes the size of the opened file.
  114. // Supported for local fs only
  115. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  116. if fsPath == t.GetFsPath() {
  117. if t.File != nil {
  118. initialSize := t.InitialSize
  119. err := t.File.Truncate(size)
  120. if err == nil {
  121. t.Lock()
  122. t.InitialSize = size
  123. if t.MaxWriteSize > 0 {
  124. sizeDiff := initialSize - size
  125. t.MaxWriteSize += sizeDiff
  126. metrics.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  127. atomic.StoreInt64(&t.BytesReceived, 0)
  128. }
  129. t.Unlock()
  130. }
  131. t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
  132. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  133. return initialSize, err
  134. }
  135. if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
  136. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
  137. // for buffered SFTP we can have buffered bytes so we returns an error
  138. if !vfs.IsBufferedSFTPFs(t.Fs) {
  139. return 0, nil
  140. }
  141. }
  142. return 0, vfs.ErrVfsUnsupported
  143. }
  144. return 0, errTransferMismatch
  145. }
  146. // TransferError is called if there is an unexpected error.
  147. // For example network or client issues
  148. func (t *BaseTransfer) TransferError(err error) {
  149. t.Lock()
  150. defer t.Unlock()
  151. if t.ErrTransfer != nil {
  152. return
  153. }
  154. t.ErrTransfer = err
  155. if t.cancelFn != nil {
  156. t.cancelFn()
  157. }
  158. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  159. t.Connection.Log(logger.LevelWarn, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  160. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, atomic.LoadInt64(&t.BytesSent),
  161. atomic.LoadInt64(&t.BytesReceived), elapsed)
  162. }
  163. func (t *BaseTransfer) getUploadFileSize() (int64, error) {
  164. var fileSize int64
  165. info, err := t.Fs.Stat(t.fsPath)
  166. if err == nil {
  167. fileSize = info.Size()
  168. }
  169. if vfs.IsCryptOsFs(t.Fs) && t.ErrTransfer != nil {
  170. errDelete := t.Fs.Remove(t.fsPath, false)
  171. if errDelete != nil {
  172. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
  173. }
  174. }
  175. return fileSize, err
  176. }
  177. // Close it is called when the transfer is completed.
  178. // It logs the transfer info, updates the user quota (for uploads)
  179. // and executes any defined action.
  180. // If there is an error no action will be executed and, in atomic mode,
  181. // we try to delete the temporary file
  182. func (t *BaseTransfer) Close() error {
  183. defer t.Connection.RemoveTransfer(t)
  184. var err error
  185. numFiles := 0
  186. if t.isNewFile {
  187. numFiles = 1
  188. }
  189. metrics.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  190. if t.ErrTransfer == ErrQuotaExceeded && t.File != nil {
  191. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  192. err = t.Fs.Remove(t.File.Name(), false)
  193. if err == nil {
  194. numFiles--
  195. atomic.StoreInt64(&t.BytesReceived, 0)
  196. t.MinWriteOffset = 0
  197. }
  198. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
  199. t.File.Name(), err)
  200. } else if t.transferType == TransferUpload && t.effectiveFsPath != t.fsPath {
  201. if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
  202. err = t.Fs.Rename(t.effectiveFsPath, t.fsPath)
  203. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
  204. t.effectiveFsPath, t.fsPath, err)
  205. } else {
  206. err = t.Fs.Remove(t.effectiveFsPath, false)
  207. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, "+
  208. "deletion error: %v", t.ErrTransfer, t.effectiveFsPath, err)
  209. if err == nil {
  210. numFiles--
  211. atomic.StoreInt64(&t.BytesReceived, 0)
  212. t.MinWriteOffset = 0
  213. }
  214. }
  215. }
  216. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  217. if t.transferType == TransferDownload {
  218. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesSent), t.Connection.User.Username,
  219. t.Connection.ID, t.Connection.protocol, t.Connection.remoteAddr)
  220. ExecuteActionNotification(&t.Connection.User, operationDownload, t.fsPath, t.requestPath, "", "", t.Connection.protocol,
  221. atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
  222. } else {
  223. fileSize := atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
  224. if statSize, err := t.getUploadFileSize(); err == nil {
  225. fileSize = statSize
  226. }
  227. t.Connection.Log(logger.LevelDebug, "uploaded file size %v", fileSize)
  228. t.updateQuota(numFiles, fileSize)
  229. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
  230. t.Connection.ID, t.Connection.protocol, t.Connection.remoteAddr)
  231. ExecuteActionNotification(&t.Connection.User, operationUpload, t.fsPath, t.requestPath, "", "", t.Connection.protocol, fileSize,
  232. t.ErrTransfer)
  233. }
  234. if t.ErrTransfer != nil {
  235. t.Connection.Log(logger.LevelWarn, "transfer error: %v, path: %#v", t.ErrTransfer, t.fsPath)
  236. if err == nil {
  237. err = t.ErrTransfer
  238. }
  239. }
  240. return err
  241. }
  242. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  243. // S3 uploads are atomic, if there is an error nothing is uploaded
  244. if t.File == nil && t.ErrTransfer != nil {
  245. return false
  246. }
  247. sizeDiff := fileSize - t.InitialSize
  248. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
  249. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  250. if err == nil {
  251. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  252. sizeDiff, false)
  253. if vfolder.IsIncludedInUserQuota() {
  254. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  255. }
  256. } else {
  257. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  258. }
  259. return true
  260. }
  261. return false
  262. }
  263. // HandleThrottle manage bandwidth throttling
  264. func (t *BaseTransfer) HandleThrottle() {
  265. var wantedBandwidth int64
  266. var trasferredBytes int64
  267. if t.transferType == TransferDownload {
  268. wantedBandwidth = t.Connection.User.DownloadBandwidth
  269. trasferredBytes = atomic.LoadInt64(&t.BytesSent)
  270. } else {
  271. wantedBandwidth = t.Connection.User.UploadBandwidth
  272. trasferredBytes = atomic.LoadInt64(&t.BytesReceived)
  273. }
  274. if wantedBandwidth > 0 {
  275. // real and wanted elapsed as milliseconds, bytes as kilobytes
  276. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  277. // trasferredBytes / 1024 = KB/s, we multiply for 1000 to get milliseconds
  278. wantedElapsed := 1000 * (trasferredBytes / 1024) / wantedBandwidth
  279. if wantedElapsed > realElapsed {
  280. toSleep := time.Duration(wantedElapsed - realElapsed)
  281. time.Sleep(toSleep * time.Millisecond)
  282. }
  283. }
  284. }