transfer.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package common
  2. import (
  3. "errors"
  4. "os"
  5. "path"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/drakkan/sftpgo/dataprovider"
  10. "github.com/drakkan/sftpgo/logger"
  11. "github.com/drakkan/sftpgo/metrics"
  12. "github.com/drakkan/sftpgo/vfs"
  13. )
  14. var (
  15. // ErrTransferClosed defines the error returned for a closed transfer
  16. ErrTransferClosed = errors.New("transfer already closed")
  17. )
  18. // BaseTransfer contains protocols common transfer details for an upload or a download.
  19. type BaseTransfer struct { //nolint:maligned
  20. ID uint64
  21. Fs vfs.Fs
  22. File vfs.File
  23. Connection *BaseConnection
  24. cancelFn func()
  25. fsPath string
  26. start time.Time
  27. transferType int
  28. MinWriteOffset int64
  29. InitialSize int64
  30. isNewFile bool
  31. requestPath string
  32. BytesSent int64
  33. BytesReceived int64
  34. MaxWriteSize int64
  35. AbortTransfer int32
  36. sync.Mutex
  37. ErrTransfer error
  38. }
  39. // NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
  40. func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPath, requestPath string, transferType int,
  41. minWriteOffset, initialSize, maxWriteSize int64, isNewFile bool, fs vfs.Fs) *BaseTransfer {
  42. t := &BaseTransfer{
  43. ID: conn.GetTransferID(),
  44. File: file,
  45. Connection: conn,
  46. cancelFn: cancelFn,
  47. fsPath: fsPath,
  48. start: time.Now(),
  49. transferType: transferType,
  50. MinWriteOffset: minWriteOffset,
  51. InitialSize: initialSize,
  52. isNewFile: isNewFile,
  53. requestPath: requestPath,
  54. BytesSent: 0,
  55. BytesReceived: 0,
  56. MaxWriteSize: maxWriteSize,
  57. AbortTransfer: 0,
  58. Fs: fs,
  59. }
  60. conn.AddTransfer(t)
  61. return t
  62. }
  63. // GetID returns the transfer ID
  64. func (t *BaseTransfer) GetID() uint64 {
  65. return t.ID
  66. }
  67. // GetType returns the transfer type
  68. func (t *BaseTransfer) GetType() int {
  69. return t.transferType
  70. }
  71. // GetSize returns the transferred size
  72. func (t *BaseTransfer) GetSize() int64 {
  73. if t.transferType == TransferDownload {
  74. return atomic.LoadInt64(&t.BytesSent)
  75. }
  76. return atomic.LoadInt64(&t.BytesReceived)
  77. }
  78. // GetStartTime returns the start time
  79. func (t *BaseTransfer) GetStartTime() time.Time {
  80. return t.start
  81. }
  82. // SignalClose signals that the transfer should be closed.
  83. // For same protocols, for example WebDAV, we have no
  84. // access to the network connection, so we use this method
  85. // to make the next read or write to fail
  86. func (t *BaseTransfer) SignalClose() {
  87. atomic.StoreInt32(&(t.AbortTransfer), 1)
  88. }
  89. // GetVirtualPath returns the transfer virtual path
  90. func (t *BaseTransfer) GetVirtualPath() string {
  91. return t.requestPath
  92. }
  93. // GetFsPath returns the transfer filesystem path
  94. func (t *BaseTransfer) GetFsPath() string {
  95. return t.fsPath
  96. }
  97. // GetRealFsPath returns the real transfer filesystem path.
  98. // If atomic uploads are enabled this differ from fsPath
  99. func (t *BaseTransfer) GetRealFsPath(fsPath string) string {
  100. if fsPath == t.GetFsPath() {
  101. if t.File != nil {
  102. return t.File.Name()
  103. }
  104. return t.fsPath
  105. }
  106. return ""
  107. }
  108. // SetCancelFn sets the cancel function for the transfer
  109. func (t *BaseTransfer) SetCancelFn(cancelFn func()) {
  110. t.cancelFn = cancelFn
  111. }
  112. // Truncate changes the size of the opened file.
  113. // Supported for local fs only
  114. func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
  115. if fsPath == t.GetFsPath() {
  116. if t.File != nil {
  117. initialSize := t.InitialSize
  118. err := t.File.Truncate(size)
  119. if err == nil {
  120. t.Lock()
  121. t.InitialSize = size
  122. if t.MaxWriteSize > 0 {
  123. sizeDiff := initialSize - size
  124. t.MaxWriteSize += sizeDiff
  125. metrics.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  126. atomic.StoreInt64(&t.BytesReceived, 0)
  127. }
  128. t.Unlock()
  129. }
  130. t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
  131. fsPath, size, t.MaxWriteSize, t.InitialSize, err)
  132. return initialSize, err
  133. }
  134. if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
  135. // for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
  136. return 0, nil
  137. }
  138. return 0, ErrOpUnsupported
  139. }
  140. return 0, errTransferMismatch
  141. }
  142. // TransferError is called if there is an unexpected error.
  143. // For example network or client issues
  144. func (t *BaseTransfer) TransferError(err error) {
  145. t.Lock()
  146. defer t.Unlock()
  147. if t.ErrTransfer != nil {
  148. return
  149. }
  150. t.ErrTransfer = err
  151. if t.cancelFn != nil {
  152. t.cancelFn()
  153. }
  154. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  155. t.Connection.Log(logger.LevelWarn, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
  156. "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, atomic.LoadInt64(&t.BytesSent),
  157. atomic.LoadInt64(&t.BytesReceived), elapsed)
  158. }
  159. func (t *BaseTransfer) getUploadFileSize() (int64, error) {
  160. var fileSize int64
  161. info, err := t.Fs.Stat(t.fsPath)
  162. if err == nil {
  163. fileSize = info.Size()
  164. }
  165. if vfs.IsCryptOsFs(t.Fs) && t.ErrTransfer != nil {
  166. errDelete := os.Remove(t.fsPath)
  167. if errDelete != nil {
  168. t.Connection.Log(logger.LevelWarn, "error removing partial crypto file %#v: %v", t.fsPath, errDelete)
  169. }
  170. }
  171. return fileSize, err
  172. }
  173. // Close it is called when the transfer is completed.
  174. // It logs the transfer info, updates the user quota (for uploads)
  175. // and executes any defined action.
  176. // If there is an error no action will be executed and, in atomic mode,
  177. // we try to delete the temporary file
  178. func (t *BaseTransfer) Close() error {
  179. defer t.Connection.RemoveTransfer(t)
  180. var err error
  181. numFiles := 0
  182. if t.isNewFile {
  183. numFiles = 1
  184. }
  185. metrics.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
  186. if t.ErrTransfer == ErrQuotaExceeded && t.File != nil {
  187. // if quota is exceeded we try to remove the partial file for uploads to local filesystem
  188. err = os.Remove(t.File.Name())
  189. if err == nil {
  190. numFiles--
  191. atomic.StoreInt64(&t.BytesReceived, 0)
  192. t.MinWriteOffset = 0
  193. }
  194. t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
  195. t.File.Name(), err)
  196. } else if t.transferType == TransferUpload && t.File != nil && t.File.Name() != t.fsPath {
  197. if t.ErrTransfer == nil || Config.UploadMode == UploadModeAtomicWithResume {
  198. err = os.Rename(t.File.Name(), t.fsPath)
  199. t.Connection.Log(logger.LevelDebug, "atomic upload completed, rename: %#v -> %#v, error: %v",
  200. t.File.Name(), t.fsPath, err)
  201. } else {
  202. err = os.Remove(t.File.Name())
  203. t.Connection.Log(logger.LevelWarn, "atomic upload completed with error: \"%v\", delete temporary file: %#v, "+
  204. "deletion error: %v", t.ErrTransfer, t.File.Name(), err)
  205. if err == nil {
  206. numFiles--
  207. atomic.StoreInt64(&t.BytesReceived, 0)
  208. t.MinWriteOffset = 0
  209. }
  210. }
  211. }
  212. elapsed := time.Since(t.start).Nanoseconds() / 1000000
  213. if t.transferType == TransferDownload {
  214. logger.TransferLog(downloadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesSent), t.Connection.User.Username,
  215. t.Connection.ID, t.Connection.protocol)
  216. action := newActionNotification(&t.Connection.User, operationDownload, t.fsPath, "", "", t.Connection.protocol,
  217. atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
  218. go actionHandler.Handle(action) //nolint:errcheck
  219. } else {
  220. fileSize := atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
  221. if statSize, err := t.getUploadFileSize(); err == nil {
  222. fileSize = statSize
  223. }
  224. t.Connection.Log(logger.LevelDebug, "uploaded file size %v", fileSize)
  225. t.updateQuota(numFiles, fileSize)
  226. logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
  227. t.Connection.ID, t.Connection.protocol)
  228. action := newActionNotification(&t.Connection.User, operationUpload, t.fsPath, "", "", t.Connection.protocol,
  229. fileSize, t.ErrTransfer)
  230. go actionHandler.Handle(action) //nolint:errcheck
  231. }
  232. if t.ErrTransfer != nil {
  233. t.Connection.Log(logger.LevelWarn, "transfer error: %v, path: %#v", t.ErrTransfer, t.fsPath)
  234. if err == nil {
  235. err = t.ErrTransfer
  236. }
  237. }
  238. return err
  239. }
  240. func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
  241. // S3 uploads are atomic, if there is an error nothing is uploaded
  242. if t.File == nil && t.ErrTransfer != nil {
  243. return false
  244. }
  245. sizeDiff := fileSize - t.InitialSize
  246. if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
  247. vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
  248. if err == nil {
  249. dataprovider.UpdateVirtualFolderQuota(vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
  250. sizeDiff, false)
  251. if vfolder.IsIncludedInUserQuota() {
  252. dataprovider.UpdateUserQuota(t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  253. }
  254. } else {
  255. dataprovider.UpdateUserQuota(t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
  256. }
  257. return true
  258. }
  259. return false
  260. }
  261. // HandleThrottle manage bandwidth throttling
  262. func (t *BaseTransfer) HandleThrottle() {
  263. var wantedBandwidth int64
  264. var trasferredBytes int64
  265. if t.transferType == TransferDownload {
  266. wantedBandwidth = t.Connection.User.DownloadBandwidth
  267. trasferredBytes = atomic.LoadInt64(&t.BytesSent)
  268. } else {
  269. wantedBandwidth = t.Connection.User.UploadBandwidth
  270. trasferredBytes = atomic.LoadInt64(&t.BytesReceived)
  271. }
  272. if wantedBandwidth > 0 {
  273. // real and wanted elapsed as milliseconds, bytes as kilobytes
  274. realElapsed := time.Since(t.start).Nanoseconds() / 1000000
  275. // trasferredBytes / 1000 = KB/s, we multiply for 1000 to get milliseconds
  276. wantedElapsed := 1000 * (trasferredBytes / 1000) / wantedBandwidth
  277. if wantedElapsed > realElapsed {
  278. toSleep := time.Duration(wantedElapsed - realElapsed)
  279. time.Sleep(toSleep * time.Millisecond)
  280. }
  281. }
  282. }