transfer.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530
  1. // Copyright (C) 2019-2023 Nicola Murino
  2. //
  3. // This program is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU Affero General Public License as published
  5. // by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU Affero General Public License for more details.
  11. //
  12. // You should have received a copy of the GNU Affero General Public License
  13. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. package common
  15. import (
  16. "errors"
  17. "path"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. "github.com/drakkan/sftpgo/v2/internal/dataprovider"
  22. "github.com/drakkan/sftpgo/v2/internal/logger"
  23. "github.com/drakkan/sftpgo/v2/internal/metric"
  24. "github.com/drakkan/sftpgo/v2/internal/vfs"
  25. )
  26. var (
  27. // ErrTransferClosed defines the error returned for a closed transfer
  28. ErrTransferClosed = errors.New("transfer already closed")
  29. )
  30. // BaseTransfer contains protocols common transfer details for an upload or a download.
  31. type BaseTransfer struct { //nolint:maligned
  32. ID int64
  33. BytesSent atomic.Int64
  34. BytesReceived atomic.Int64
  35. Fs vfs.Fs
  36. File vfs.File
  37. Connection *BaseConnection
  38. cancelFn func()
  39. fsPath string
  40. effectiveFsPath string
  41. requestPath string
  42. ftpMode string
  43. start time.Time
  44. MaxWriteSize int64
  45. MinWriteOffset int64
  46. InitialSize int64
  47. truncatedSize int64
  48. isNewFile bool
  49. transferType int
  50. AbortTransfer atomic.Bool
  51. aTime time.Time
  52. mTime time.Time
  53. transferQuota dataprovider.TransferQuota
  54. sync.Mutex
  55. errAbort error
  56. ErrTransfer error
  57. }
  58. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  59. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, effectiveFsPath, requestPath string,
  60. transferType int, minWriteOffset, initialSize, maxWriteSize, truncatedSize int64, isNewFile bool, fs vfs.Fs,
  61. transferQuota dataprovider.TransferQuota,
  62. ) *BaseTransfer {
  63. t := &BaseTransfer{
  64. ID: conn.GetTransferID(),
  65. File: file,
  66. Connection: conn,
  67. cancelFn: cancelFn,
  68. fsPath: fsPath,
  69. effectiveFsPath: effectiveFsPath,
  70. start: time.Now(),
  71. transferType: transferType,
  72. MinWriteOffset: minWriteOffset,
  73. InitialSize: initialSize,
  74. isNewFile: isNewFile,
  75. requestPath: requestPath,
  76. MaxWriteSize: maxWriteSize,
  77. truncatedSize: truncatedSize,
  78. transferQuota: transferQuota,
  79. Fs: fs,
  80. }
  81. t.AbortTransfer.Store(false)
  82. t.BytesSent.Store(0)
  83. t.BytesReceived.Store(0)
  84. conn.AddTransfer(t)
  85. return t
  86. }
  87. // GetTransferQuota returns data transfer quota limits
  88. func (t *BaseTransfer) GetTransferQuota() dataprovider.TransferQuota {
  89. return t.transferQuota
  90. }
  91. // SetFtpMode sets the FTP mode for the current transfer
  92. func (t *BaseTransfer) SetFtpMode(mode string) {
  93. t.ftpMode = mode
  94. }
  95. // GetID returns the transfer ID
  96. func (t *BaseTransfer) GetID() int64 {
  97. return t.ID
  98. }
  99. // GetType returns the transfer type
  100. func (t *BaseTransfer) GetType() int {
  101. return t.transferType
  102. }
  103. // GetSize returns the transferred size
  104. func (t *BaseTransfer) GetSize() int64 {
  105. if t.transferType == TransferDownload {
  106. return t.BytesSent.Load()
  107. }
  108. return t.BytesReceived.Load()
  109. }
  110. // GetDownloadedSize returns the transferred size
  111. func (t *BaseTransfer) GetDownloadedSize() int64 {
  112. return t.BytesSent.Load()
  113. }
  114. // GetUploadedSize returns the transferred size
  115. func (t *BaseTransfer) GetUploadedSize() int64 {
  116. return t.BytesReceived.Load()
  117. }
  118. // GetStartTime returns the start time
  119. func (t *BaseTransfer) GetStartTime() time.Time {
  120. return t.start
  121. }
  122. // GetAbortError returns the error to send to the client if the transfer was aborted
  123. func (t *BaseTransfer) GetAbortError() error {
  124. t.Lock()
  125. defer t.Unlock()
  126. if t.errAbort != nil {
  127. return t.errAbort
  128. }
  129. return getQuotaExceededError(t.Connection.protocol)
  130. }
  131. // SignalClose signals that the transfer should be closed after the next read/write.
  132. // The optional error argument allow to send a specific error, otherwise a generic
  133. // transfer aborted error is sent
  134. func (t *BaseTransfer) SignalClose(err error) {
  135. t.Lock()
  136. t.errAbort = err
  137. t.Unlock()
  138. t.AbortTransfer.Store(true)
  139. }
  140. // GetTruncatedSize returns the truncated sized if this is an upload overwriting
  141. // an existing file
  142. func (t *BaseTransfer) GetTruncatedSize() int64 {
  143. return t.truncatedSize
  144. }
  145. // HasSizeLimit returns true if there is an upload or download size limit
  146. func (t *BaseTransfer) HasSizeLimit() bool {
  147. if t.MaxWriteSize > 0 {
  148. return true
  149. }
  150. if t.transferQuota.HasSizeLimits() {
  151. return true
  152. }
  153. return false
  154. }
  155. // GetVirtualPath returns the transfer virtual path
  156. func (t *BaseTransfer) GetVirtualPath() string {
  157. return t.requestPath
  158. }
  159. // GetFsPath returns the transfer filesystem path
  160. func (t *BaseTransfer) GetFsPath() string {
  161. return t.fsPath
  162. }
  163. // SetTimes stores access and modification times if fsPath matches the current file
  164. func (t *BaseTransfer) SetTimes(fsPath string, atime time.Time, mtime time.Time) bool {
  165. if fsPath == t.GetFsPath() {
  166. t.aTime = atime
  167. t.mTime = mtime
  168. return true
  169. }
  170. return false
  171. }
  172. // GetRealFsPath returns the real transfer filesystem path.
  173. // If atomic uploads are enabled this differ from fsPath
  174. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  175. if fsPath == t.GetFsPath() {
  176. if t.File != nil {
  177. return t.File.Name()
  178. }
  179. return t.fsPath
  180. }
  181. return ""
  182. }
  183. // SetCancelFn sets the cancel function for the transfer
  184. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  185. t.cancelFn = cancelFn
  186. }
  187. // CheckRead returns an error if read if not allowed
  188. func (t *BaseTransfer) CheckRead() error {
  189. if t.transferQuota.AllowedDLSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  190. return nil
  191. }
  192. if t.transferQuota.AllowedTotalSize > 0 {
  193. if t.BytesSent.Load()+t.BytesReceived.Load() > t.transferQuota.AllowedTotalSize {
  194. return t.Connection.GetReadQuotaExceededError()
  195. }
  196. } else if t.transferQuota.AllowedDLSize > 0 {
  197. if t.BytesSent.Load() > t.transferQuota.AllowedDLSize {
  198. return t.Connection.GetReadQuotaExceededError()
  199. }
  200. }
  201. return nil
  202. }
  203. // CheckWrite returns an error if write if not allowed
  204. func (t *BaseTransfer) CheckWrite() error {
  205. if t.MaxWriteSize > 0 && t.BytesReceived.Load() > t.MaxWriteSize {
  206. return t.Connection.GetQuotaExceededError()
  207. }
  208. if t.transferQuota.AllowedULSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
  209. return nil
  210. }
  211. if t.transferQuota.AllowedTotalSize > 0 {
  212. if t.BytesSent.Load()+t.BytesReceived.Load() > t.transferQuota.AllowedTotalSize {
  213. return t.Connection.GetQuotaExceededError()
  214. }
  215. } else if t.transferQuota.AllowedULSize > 0 {
  216. if t.BytesReceived.Load() > t.transferQuota.AllowedULSize {
  217. return t.Connection.GetQuotaExceededError()
  218. }
  219. }
  220. return nil
  221. }
  222. // Truncate changes the size of the opened file.
  223. // Supported for local fs only
  224. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  225. if fsPath == t.GetFsPath() {
  226. if t.File != nil {
  227. initialSize := t.InitialSize
  228. err := t.File.Truncate(size)
  229. if err == nil {
  230. t.Lock()
  231. t.InitialSize = size
  232. if t.MaxWriteSize > 0 {
  233. sizeDiff := initialSize - size
  234. t.MaxWriteSize += sizeDiff
  235. metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
  236. t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
  237. if t.transferQuota.HasSizeLimits() {
  238. go func(ulSize, dlSize int64, user dataprovider.User) {
  239. dataprovider.UpdateUserTransferQuota(&user, ulSize, dlSize, false) //nolint:errcheck
  240. }(t.BytesReceived.Load(), t.BytesSent.Load(), t.Connection.User)
  241. }
  242. t.BytesReceived.Store(0)
  243. }
  244. t.Unlock()
  245. }
  246. t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
  247. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  248. return initialSize, err
  249. }
  250. if size == 0 && t.BytesSent.Load() == 0 {
  251. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
  252. // for buffered SFTP we can have buffered bytes so we returns an error
  253. if !vfs.IsBufferedSFTPFs(t.Fs) {
  254. return 0, nil
  255. }
  256. }
  257. return 0, vfs.ErrVfsUnsupported
  258. }
  259. return 0, errTransferMismatch
  260. }
  261. // TransferError is called if there is an unexpected error.
  262. // For example network or client issues
  263. func (t *BaseTransfer) TransferError(err error) {
  264. t.Lock()
  265. defer t.Unlock()
  266. if t.ErrTransfer != nil {
  267. return
  268. }
  269. t.ErrTransfer = err
  270. if t.cancelFn != nil {
  271. t.cancelFn()
  272. }
  273. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  274. t.Connection.Log(logger.LevelError, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  275. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, t.BytesSent.Load(),
  276. t.BytesReceived.Load(), elapsed)
  277. }
  278. func (t *BaseTransfer) getUploadFileSize() (int64, int, error) {
  279. var fileSize int64
  280. var deletedFiles int
  281. info, err := t.Fs.Stat(t.fsPath)
  282. if err == nil {
  283. fileSize = info.Size()
  284. }
  285. if t.ErrTransfer != nil && vfs.IsCryptOsFs(t.Fs) {
  286. errDelete := t.Fs.Remove(t.fsPath, false)
  287. if errDelete != nil {
  288. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
  289. } else {
  290. fileSize = 0
  291. deletedFiles = 1
  292. t.BytesReceived.Store(0)
  293. t.MinWriteOffset = 0
  294. }
  295. }
  296. return fileSize, deletedFiles, err
  297. }
  298. // return 1 if the file is outside the user home dir
  299. func (t *BaseTransfer) checkUploadOutsideHomeDir(err error) int {
  300. if err == nil {
  301. return 0
  302. }
  303. if Config.TempPath == "" {
  304. return 0
  305. }
  306. err = t.Fs.Remove(t.effectiveFsPath, false)
  307. t.Connection.Log(logger.LevelWarn, "upload in temp path cannot be renamed, delete temporary file: %#v, deletion error: %v",
  308. t.effectiveFsPath, err)
  309. // the file is outside the home dir so don't update the quota
  310. t.BytesReceived.Store(0)
  311. t.MinWriteOffset = 0
  312. return 1
  313. }
  314. // Close it is called when the transfer is completed.
  315. // It logs the transfer info, updates the user quota (for uploads)
  316. // and executes any defined action.
  317. // If there is an error no action will be executed and, in atomic mode,
  318. // we try to delete the temporary file
  319. func (t *BaseTransfer) Close() error {
  320. defer t.Connection.RemoveTransfer(t)
  321. var err error
  322. numFiles := t.getUploadedFiles()
  323. metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
  324. t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
  325. if t.transferQuota.HasSizeLimits() {
  326. dataprovider.UpdateUserTransferQuota(&t.Connection.User, t.BytesReceived.Load(), //nolint:errcheck
  327. t.BytesSent.Load(), false)
  328. }
  329. if t.File != nil && t.Connection.IsQuotaExceededError(t.ErrTransfer) {
  330. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  331. err = t.Fs.Remove(t.File.Name(), false)
  332. if err == nil {
  333. t.BytesReceived.Store(0)
  334. t.MinWriteOffset = 0
  335. }
  336. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
  337. t.File.Name(), err)
  338. } else if t.transferType == TransferUpload && t.effectiveFsPath != t.fsPath {
  339. if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
  340. _, _, err = t.Fs.Rename(t.effectiveFsPath, t.fsPath)
  341. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
  342. t.effectiveFsPath, t.fsPath, err)
  343. // the file must be removed if it is uploaded to a path outside the home dir and cannot be renamed
  344. t.checkUploadOutsideHomeDir(err)
  345. } else {
  346. err = t.Fs.Remove(t.effectiveFsPath, false)
  347. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, deletion error: %v",
  348. t.ErrTransfer, t.effectiveFsPath, err)
  349. if err == nil {
  350. t.BytesReceived.Store(0)
  351. t.MinWriteOffset = 0
  352. }
  353. }
  354. }
  355. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  356. var uploadFileSize int64
  357. if t.transferType == TransferDownload {
  358. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, t.BytesSent.Load(), t.Connection.User.Username,
  359. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  360. ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "", //nolint:errcheck
  361. t.BytesSent.Load(), t.ErrTransfer, elapsed)
  362. } else {
  363. statSize, deletedFiles, errStat := t.getUploadFileSize()
  364. if errStat == nil {
  365. uploadFileSize = statSize
  366. } else {
  367. uploadFileSize = t.BytesReceived.Load() + t.MinWriteOffset
  368. if t.Fs.IsNotExist(errStat) {
  369. uploadFileSize = 0
  370. numFiles--
  371. }
  372. }
  373. numFiles -= deletedFiles
  374. t.Connection.Log(logger.LevelDebug, "upload file size %d, num files %d, deleted files %d, fs path %q",
  375. uploadFileSize, numFiles, deletedFiles, t.fsPath)
  376. numFiles, uploadFileSize = t.executeUploadHook(numFiles, uploadFileSize, elapsed)
  377. t.updateQuota(numFiles, uploadFileSize)
  378. t.updateTimes()
  379. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, t.BytesReceived.Load(), t.Connection.User.Username,
  380. t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
  381. }
  382. if t.ErrTransfer != nil {
  383. t.Connection.Log(logger.LevelError, "transfer error: %v, path: %#v", t.ErrTransfer, t.fsPath)
  384. if err == nil {
  385. err = t.ErrTransfer
  386. }
  387. }
  388. t.updateTransferTimestamps(uploadFileSize, elapsed)
  389. return err
  390. }
  391. func (t *BaseTransfer) updateTransferTimestamps(uploadFileSize, elapsed int64) {
  392. if t.ErrTransfer != nil {
  393. return
  394. }
  395. if t.transferType == TransferUpload {
  396. if t.Connection.User.FirstUpload == 0 && !t.Connection.uploadDone.Load() {
  397. if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, true); err == nil {
  398. t.Connection.uploadDone.Store(true)
  399. ExecuteActionNotification(t.Connection, operationFirstUpload, t.fsPath, t.requestPath, "", //nolint:errcheck
  400. "", "", uploadFileSize, t.ErrTransfer, elapsed)
  401. }
  402. }
  403. return
  404. }
  405. if t.Connection.User.FirstDownload == 0 && !t.Connection.downloadDone.Load() && t.BytesSent.Load() > 0 {
  406. if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, false); err == nil {
  407. t.Connection.downloadDone.Store(true)
  408. ExecuteActionNotification(t.Connection, operationFirstDownload, t.fsPath, t.requestPath, "", //nolint:errcheck
  409. "", "", t.BytesSent.Load(), t.ErrTransfer, elapsed)
  410. }
  411. }
  412. }
  413. func (t *BaseTransfer) executeUploadHook(numFiles int, fileSize, elapsed int64) (int, int64) {
  414. err := ExecuteActionNotification(t.Connection, operationUpload, t.fsPath, t.requestPath, "", "", "",
  415. fileSize, t.ErrTransfer, elapsed)
  416. if err != nil {
  417. if t.ErrTransfer == nil {
  418. t.ErrTransfer = err
  419. }
  420. // try to remove the uploaded file
  421. err = t.Fs.Remove(t.fsPath, false)
  422. if err == nil {
  423. numFiles--
  424. fileSize = 0
  425. t.BytesReceived.Store(0)
  426. t.MinWriteOffset = 0
  427. } else {
  428. t.Connection.Log(logger.LevelWarn, "unable to remove path %q after upload hook failure: %v", t.fsPath, err)
  429. }
  430. }
  431. return numFiles, fileSize
  432. }
  433. func (t *BaseTransfer) getUploadedFiles() int {
  434. numFiles := 0
  435. if t.isNewFile {
  436. numFiles = 1
  437. }
  438. return numFiles
  439. }
  440. func (t *BaseTransfer) updateTimes() {
  441. if !t.aTime.IsZero() && !t.mTime.IsZero() {
  442. err := t.Fs.Chtimes(t.fsPath, t.aTime, t.mTime, true)
  443. t.Connection.Log(logger.LevelDebug, "set times for file %#v, atime: %v, mtime: %v, err: %v",
  444. t.fsPath, t.aTime, t.mTime, err)
  445. }
  446. }
  447. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  448. // Uploads on some filesystem (S3 and similar) are atomic, if there is an error nothing is uploaded
  449. if t.File == nil && t.ErrTransfer != nil && vfs.HasImplicitAtomicUploads(t.Fs) {
  450. return false
  451. }
  452. sizeDiff := fileSize - t.InitialSize
  453. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff != 0) {
  454. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  455. if err == nil {
  456. dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  457. sizeDiff, false)
  458. if vfolder.IsIncludedInUserQuota() {
  459. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  460. }
  461. } else {
  462. dataprovider.UpdateUserQuota(&t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  463. }
  464. return true
  465. }
  466. return false
  467. }
  468. // HandleThrottle manage bandwidth throttling
  469. func (t *BaseTransfer) HandleThrottle() {
  470. var wantedBandwidth int64
  471. var trasferredBytes int64
  472. if t.transferType == TransferDownload {
  473. wantedBandwidth = t.Connection.User.DownloadBandwidth
  474. trasferredBytes = t.BytesSent.Load()
  475. } else {
  476. wantedBandwidth = t.Connection.User.UploadBandwidth
  477. trasferredBytes = t.BytesReceived.Load()
  478. }
  479. if wantedBandwidth > 0 {
  480. // real and wanted elapsed as milliseconds, bytes as kilobytes
  481. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  482. // trasferredBytes / 1024 = KB/s, we multiply for 1000 to get milliseconds
  483. wantedElapsed := 1000 * (trasferredBytes / 1024) / wantedBandwidth
  484. if wantedElapsed > realElapsed {
  485. toSleep := time.Duration(wantedElapsed - realElapsed)
  486. time.Sleep(toSleep * time.Millisecond)
  487. }
  488. }
  489. }