transfer.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. package common
  2. import (
  3. "errors"
  4. "path"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/drakkan/sftpgo/v2/dataprovider"
  9. "github.com/drakkan/sftpgo/v2/logger"
  10. "github.com/drakkan/sftpgo/v2/metric"
  11. "github.com/drakkan/sftpgo/v2/vfs"
  12. )
  // ErrTransferClosed is the sentinel error used to report that a transfer
  // was already closed.
  var ErrTransferClosed = errors.New("transfer already closed")
  17. // BaseTransfer contains protocols common transfer details for an upload or a download.
  18. type BaseTransfer struct { //nolint:maligned
  19. ID int64
  20. BytesSent int64
  21. BytesReceived int64
  22. Fs vfs.Fs
  23. File vfs.File
  24. Connection *BaseConnection
  25. cancelFn func()
  26. fsPath string
  27. effectiveFsPath string
  28. requestPath string
  29. ftpMode string
  30. start time.Time
  31. MaxWriteSize int64
  32. MinWriteOffset int64
  33. InitialSize int64
  34. truncatedSize int64
  35. isNewFile bool
  36. transferType int
  37. AbortTransfer int32
  38. aTime time.Time
  39. mTime time.Time
  40. transferQuota dataprovider.TransferQuota
  41. sync.Mutex
  42. errAbort error
  43. ErrTransfer error
  44. }
  45. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  46. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, effectiveFsPath, requestPath string,
  47. transferType int, minWriteOffset, initialSize, maxWriteSize, truncatedSize int64, isNewFile bool, fs vfs.Fs,
  48. transferQuota dataprovider.TransferQuota,
  49. ) *BaseTransfer {
  50. t := &BaseTransfer{
  51. ID: conn.GetTransferID(),
  52. File: file,
  53. Connection: conn,
  54. cancelFn: cancelFn,
  55. fsPath: fsPath,
  56. effectiveFsPath: effectiveFsPath,
  57. start: time.Now(),
  58. transferType: transferType,
  59. MinWriteOffset: minWriteOffset,
  60. InitialSize: initialSize,
  61. isNewFile: isNewFile,
  62. requestPath: requestPath,
  63. BytesSent: 0,
  64. BytesReceived: 0,
  65. MaxWriteSize: maxWriteSize,
  66. AbortTransfer: 0,
  67. truncatedSize: truncatedSize,
  68. transferQuota: transferQuota,
  69. Fs: fs,
  70. }
  71. conn.AddTransfer(t)
  72. return t
  73. }
  74. // GetTransferQuota returns data transfer quota limits
  75. func (t *BaseTransfer) GetTransferQuota() dataprovider.TransferQuota {
  76. return t.transferQuota
  77. }
  78. // SetFtpMode sets the FTP mode for the current transfer
  79. func (t *BaseTransfer) SetFtpMode(mode string) {
  80. t.ftpMode = mode
  81. }
  82. // GetID returns the transfer ID
  83. func (t *BaseTransfer) GetID() int64 {
  84. return t.ID
  85. }
  86. // GetType returns the transfer type
  87. func (t *BaseTransfer) GetType() int {
  88. return t.transferType
  89. }
  90. // GetSize returns the transferred size
  91. func (t *BaseTransfer) GetSize() int64 {
  92. if t.transferType == TransferDownload {
  93. return atomic.LoadInt64(&t.BytesSent)
  94. }
  95. return atomic.LoadInt64(&t.BytesReceived)
  96. }
  97. // GetDownloadedSize returns the transferred size
  98. func (t *BaseTransfer) GetDownloadedSize() int64 {
  99. return atomic.LoadInt64(&t.BytesSent)
  100. }
  101. // GetUploadedSize returns the transferred size
  102. func (t *BaseTransfer) GetUploadedSize() int64 {
  103. return atomic.LoadInt64(&t.BytesReceived)
  104. }
  105. // GetStartTime returns the start time
  106. func (t *BaseTransfer) GetStartTime() time.Time {
  107. return t.start
  108. }
  109. // GetAbortError returns the error to send to the client if the transfer was aborted
  110. func (t *BaseTransfer) GetAbortError() error {
  111. t.Lock()
  112. defer t.Unlock()
  113. if t.errAbort != nil {
  114. return t.errAbort
  115. }
  116. return getQuotaExceededError(t.Connection.protocol)
  117. }
  118. // SignalClose signals that the transfer should be closed after the next read/write.
  119. // The optional error argument allow to send a specific error, otherwise a generic
  120. // transfer aborted error is sent
  121. func (t *BaseTransfer) SignalClose(err error) {
  122. t.Lock()
  123. t.errAbort = err
  124. t.Unlock()
  125. atomic.StoreInt32(&(t.AbortTransfer), 1)
  126. }
  127. // GetTruncatedSize returns the truncated sized if this is an upload overwriting
  128. // an existing file
  129. func (t *BaseTransfer) GetTruncatedSize() int64 {
  130. return t.truncatedSize
  131. }
  132. // HasSizeLimit returns true if there is an upload or download size limit
  133. func (t *BaseTransfer) HasSizeLimit() bool {
  134. if t.MaxWriteSize > 0 {
  135. return true
  136. }
  137. if t.transferQuota.HasSizeLimits() {
  138. return true
  139. }
  140. return false
  141. }
  142. // GetVirtualPath returns the transfer virtual path
  143. func (t *BaseTransfer) GetVirtualPath() string {
  144. return t.requestPath
  145. }
  146. // GetFsPath returns the transfer filesystem path
  147. func (t *BaseTransfer) GetFsPath() string {
  148. return t.fsPath
  149. }
  150. // SetTimes stores access and modification times if fsPath matches the current file
  151. func (t *BaseTransfer) SetTimes(fsPath string, atime time.Time, mtime time.Time) bool {
  152. if fsPath == t.GetFsPath() {
  153. t.aTime = atime
  154. t.mTime = mtime
  155. return true
  156. }
  157. return false
  158. }
  159. // GetRealFsPath returns the real transfer filesystem path.
  160. // If atomic uploads are enabled this differ from fsPath
  161. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  162. if fsPath == t.GetFsPath() {
  163. if t.File != nil {
  164. return t.File.Name()
  165. }
  166. return t.fsPath
  167. }
  168. return ""
  169. }
  170. // SetCancelFn sets the cancel function for the transfer
  171. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  172. t.cancelFn = cancelFn
  173. }
  174. // CheckRead returns an error if read if not allowed
  175. func (t *BaseTransfer) CheckRead() error {
  176. if t.transferQuota.AllowedDLSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  177. return nil
  178. }
  179. if t.transferQuota.AllowedTotalSize > 0 {
  180. if atomic.LoadInt64(&t.BytesSent)+atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedTotalSize {
  181. return t.Connection.GetReadQuotaExceededError()
  182. }
  183. } else if t.transferQuota.AllowedDLSize > 0 {
  184. if atomic.LoadInt64(&t.BytesSent) > t.transferQuota.AllowedDLSize {
  185. return t.Connection.GetReadQuotaExceededError()
  186. }
  187. }
  188. return nil
  189. }
  190. // CheckWrite returns an error if write if not allowed
  191. func (t *BaseTransfer) CheckWrite() error {
  192. if t.MaxWriteSize > 0 && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
  193. return t.Connection.GetQuotaExceededError()
  194. }
  195. if t.transferQuota.AllowedULSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  196. return nil
  197. }
  198. if t.transferQuota.AllowedTotalSize > 0 {
  199. if atomic.LoadInt64(&t.BytesSent)+atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedTotalSize {
  200. return t.Connection.GetQuotaExceededError()
  201. }
  202. } else if t.transferQuota.AllowedULSize > 0 {
  203. if atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedULSize {
  204. return t.Connection.GetQuotaExceededError()
  205. }
  206. }
  207. return nil
  208. }
  209. // Truncate changes the size of the opened file.
  210. // Supported for local fs only
  211. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  212. if fsPath == t.GetFsPath() {
  213. if t.File != nil {
  214. initialSize := t.InitialSize
  215. err := t.File.Truncate(size)
  216. if err == nil {
  217. t.Lock()
  218. t.InitialSize = size
  219. if t.MaxWriteSize > 0 {
  220. sizeDiff := initialSize - size
  221. t.MaxWriteSize += sizeDiff
  222. metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived),
  223. t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
  224. if t.transferQuota.HasSizeLimits() {
  225. go func(ulSize, dlSize int64, user dataprovider.User) {
  226. dataprovider.UpdateUserTransferQuota(&user, ulSize, dlSize, false) //nolint:errcheck
  227. }(atomic.LoadInt64(&t.BytesReceived), atomic.LoadInt64(&t.BytesSent), t.Connection.User)
  228. }
  229. atomic.StoreInt64(&t.BytesReceived, 0)
  230. }
  231. t.Unlock()
  232. }
  233. t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
  234. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  235. return initialSize, err
  236. }
  237. if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
  238. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
  239. // for buffered SFTP we can have buffered bytes so we returns an error
  240. if !vfs.IsBufferedSFTPFs(t.Fs) {
  241. return 0, nil
  242. }
  243. }
  244. return 0, vfs.ErrVfsUnsupported
  245. }
  246. return 0, errTransferMismatch
  247. }
  248. // TransferError is called if there is an unexpected error.
  249. // For example network or client issues
  250. func (t *BaseTransfer) TransferError(err error) {
  251. t.Lock()
  252. defer t.Unlock()
  253. if t.ErrTransfer != nil {
  254. return
  255. }
  256. t.ErrTransfer = err
  257. if t.cancelFn != nil {
  258. t.cancelFn()
  259. }
  260. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  261. t.Connection.Log(logger.LevelError, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  262. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, atomic.LoadInt64(&t.BytesSent),
  263. atomic.LoadInt64(&t.BytesReceived), elapsed)
  264. }
  265. func (t *BaseTransfer) getUploadFileSize() (int64, error) {
  266. var fileSize int64
  267. info, err := t.Fs.Stat(t.fsPath)
  268. if err == nil {
  269. fileSize = info.Size()
  270. }
  271. if vfs.IsCryptOsFs(t.Fs) && t.ErrTransfer != nil {
  272. errDelete := t.Fs.Remove(t.fsPath, false)
  273. if errDelete != nil {
  274. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
  275. }
  276. }
  277. return fileSize, err
  278. }
  279. // return 1 if the file is outside the user home dir
  280. func (t *BaseTransfer) checkUploadOutsideHomeDir(err error) int {
  281. if err == nil {
  282. return 0
  283. }
  284. if Config.TempPath == "" {
  285. return 0
  286. }
  287. err = t.Fs.Remove(t.effectiveFsPath, false)
  288. t.Connection.Log(logger.LevelWarn, "upload in temp path cannot be renamed, delete temporary file: %#v, deletion error: %v",
  289. t.effectiveFsPath, err)
  290. // the file is outside the home dir so don't update the quota
  291. atomic.StoreInt64(&t.BytesReceived, 0)
  292. t.MinWriteOffset = 0
  293. return 1
  294. }
  295. // Close it is called when the transfer is completed.
  296. // It logs the transfer info, updates the user quota (for uploads)
  297. // and executes any defined action.
  298. // If there is an error no action will be executed and, in atomic mode,
  299. // we try to delete the temporary file
  300. func (t *BaseTransfer) Close() error {
  301. defer t.Connection.RemoveTransfer(t)
  302. var err error
  303. numFiles := 0
  304. if t.isNewFile {
  305. numFiles = 1
  306. }
  307. metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived),
  308. t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
  309. if t.transferQuota.HasSizeLimits() {
  310. dataprovider.UpdateUserTransferQuota(&t.Connection.User, atomic.LoadInt64(&t.BytesReceived), //nolint:errcheck
  311. atomic.LoadInt64(&t.BytesSent), false)
  312. }
  313. if t.File != nil && t.Connection.IsQuotaExceededError(t.ErrTransfer) {
  314. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  315. err = t.Fs.Remove(t.File.Name(), false)
  316. if err == nil {
  317. numFiles--
  318. atomic.StoreInt64(&t.BytesReceived, 0)
  319. t.MinWriteOffset = 0
  320. }
  321. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
  322. t.File.Name(), err)
  323. } else if t.transferType == TransferUpload && t.effectiveFsPath != t.fsPath {
  324. if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
  325. err = t.Fs.Rename(t.effectiveFsPath, t.fsPath)
  326. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
  327. t.effectiveFsPath, t.fsPath, err)
  328. // the file must be removed if it is uploaded to a path outside the home dir and cannot be renamed
  329. numFiles -= t.checkUploadOutsideHomeDir(err)
  330. } else {
  331. err = t.Fs.Remove(t.effectiveFsPath, false)
  332. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, deletion error: %v",
  333. t.ErrTransfer, t.effectiveFsPath, err)
  334. if err == nil {
  335. numFiles--
  336. atomic.StoreInt64(&t.BytesReceived, 0)
  337. t.MinWriteOffset = 0
  338. }
  339. }
  340. }
  341. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  342. if t.transferType == TransferDownload {
  343. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesSent), t.Connection.User.Username,
  344. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  345. ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "",
  346. atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
  347. } else {
  348. fileSize := atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
  349. if statSize, errStat := t.getUploadFileSize(); errStat == nil {
  350. fileSize = statSize
  351. }
  352. t.Connection.Log(logger.LevelDebug, "uploaded file size %v", fileSize)
  353. t.updateQuota(numFiles, fileSize)
  354. t.updateTimes()
  355. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
  356. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  357. ExecuteActionNotification(t.Connection, operationUpload, t.fsPath, t.requestPath, "", "", "", fileSize, t.ErrTransfer)
  358. }
  359. if t.ErrTransfer != nil {
  360. t.Connection.Log(logger.LevelError, "transfer error: %v, path: %#v", t.ErrTransfer, t.fsPath)
  361. if err == nil {
  362. err = t.ErrTransfer
  363. }
  364. }
  365. return err
  366. }
  367. func (t *BaseTransfer) updateTimes() {
  368. if !t.aTime.IsZero() && !t.mTime.IsZero() {
  369. err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, true)
  370. t.Connection.Log(logger.LevelDebug, "set times for file %#v, atime: %v, mtime: %v, err: %v",
  371. t.fsPath, t.aTime, t.mTime, err)
  372. }
  373. }
  374. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  375. // S3 uploads are atomic, if there is an error nothing is uploaded
  376. if t.File == nil && t.ErrTransfer != nil && !t.Connection.User.HasBufferedSFTP(t.GetVirtualPath()) {
  377. return false
  378. }
  379. sizeDiff := fileSize - t.InitialSize
  380. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
  381. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  382. if err == nil {
  383. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  384. sizeDiff, false)
  385. if vfolder.IsIncludedInUserQuota() {
  386. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  387. }
  388. } else {
  389. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  390. }
  391. return true
  392. }
  393. return false
  394. }
  395. // HandleThrottle manage bandwidth throttling
  396. func (t *BaseTransfer) HandleThrottle() {
  397. var wantedBandwidth int64
  398. var trasferredBytes int64
  399. if t.transferType == TransferDownload {
  400. wantedBandwidth = t.Connection.User.DownloadBandwidth
  401. trasferredBytes = atomic.LoadInt64(&t.BytesSent)
  402. } else {
  403. wantedBandwidth = t.Connection.User.UploadBandwidth
  404. trasferredBytes = atomic.LoadInt64(&t.BytesReceived)
  405. }
  406. if wantedBandwidth > 0 {
  407. // real and wanted elapsed as milliseconds, bytes as kilobytes
  408. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  409. // trasferredBytes / 1024 = KB/s, we multiply for 1000 to get milliseconds
  410. wantedElapsed := 1000 * (trasferredBytes / 1024) / wantedBandwidth
  411. if wantedElapsed > realElapsed {
  412. toSleep := time.Duration(wantedElapsed - realElapsed)
  413. time.Sleep(toSleep * time.Millisecond)
  414. }
  415. }
  416. }