|
@@ -35,8 +35,8 @@ var (
|
|
|
// BaseTransfer contains protocols common transfer details for an upload or a download.
|
|
|
type BaseTransfer struct { //nolint:maligned
|
|
|
ID int64
|
|
|
- BytesSent int64
|
|
|
- BytesReceived int64
|
|
|
+ BytesSent atomic.Int64
|
|
|
+ BytesReceived atomic.Int64
|
|
|
Fs vfs.Fs
|
|
|
File vfs.File
|
|
|
Connection *BaseConnection
|
|
@@ -52,7 +52,7 @@ type BaseTransfer struct { //nolint:maligned
|
|
|
truncatedSize int64
|
|
|
isNewFile bool
|
|
|
transferType int
|
|
|
- AbortTransfer int32
|
|
|
+ AbortTransfer atomic.Bool
|
|
|
aTime time.Time
|
|
|
mTime time.Time
|
|
|
transferQuota dataprovider.TransferQuota
|
|
@@ -79,14 +79,14 @@ func NewBaseTransfer(file vfs.File, conn *BaseConnection, cancelFn func(), fsPat
|
|
|
InitialSize: initialSize,
|
|
|
isNewFile: isNewFile,
|
|
|
requestPath: requestPath,
|
|
|
- BytesSent: 0,
|
|
|
- BytesReceived: 0,
|
|
|
MaxWriteSize: maxWriteSize,
|
|
|
- AbortTransfer: 0,
|
|
|
truncatedSize: truncatedSize,
|
|
|
transferQuota: transferQuota,
|
|
|
Fs: fs,
|
|
|
}
|
|
|
+ t.AbortTransfer.Store(false)
|
|
|
+ t.BytesSent.Store(0)
|
|
|
+ t.BytesReceived.Store(0)
|
|
|
|
|
|
conn.AddTransfer(t)
|
|
|
return t
|
|
@@ -115,19 +115,19 @@ func (t *BaseTransfer) GetType() int {
|
|
|
// GetSize returns the transferred size
|
|
|
func (t *BaseTransfer) GetSize() int64 {
|
|
|
if t.transferType == TransferDownload {
|
|
|
- return atomic.LoadInt64(&t.BytesSent)
|
|
|
+ return t.BytesSent.Load()
|
|
|
}
|
|
|
- return atomic.LoadInt64(&t.BytesReceived)
|
|
|
+ return t.BytesReceived.Load()
|
|
|
}
|
|
|
|
|
|
// GetDownloadedSize returns the transferred size
|
|
|
func (t *BaseTransfer) GetDownloadedSize() int64 {
|
|
|
- return atomic.LoadInt64(&t.BytesSent)
|
|
|
+ return t.BytesSent.Load()
|
|
|
}
|
|
|
|
|
|
// GetUploadedSize returns the transferred size
|
|
|
func (t *BaseTransfer) GetUploadedSize() int64 {
|
|
|
- return atomic.LoadInt64(&t.BytesReceived)
|
|
|
+ return t.BytesReceived.Load()
|
|
|
}
|
|
|
|
|
|
// GetStartTime returns the start time
|
|
@@ -153,7 +153,7 @@ func (t *BaseTransfer) SignalClose(err error) {
|
|
|
t.Lock()
|
|
|
t.errAbort = err
|
|
|
t.Unlock()
|
|
|
- atomic.StoreInt32(&(t.AbortTransfer), 1)
|
|
|
+ t.AbortTransfer.Store(true)
|
|
|
}
|
|
|
|
|
|
// GetTruncatedSize returns the truncated sized if this is an upload overwriting
|
|
@@ -217,11 +217,11 @@ func (t *BaseTransfer) CheckRead() error {
|
|
|
return nil
|
|
|
}
|
|
|
if t.transferQuota.AllowedTotalSize > 0 {
|
|
|
- if atomic.LoadInt64(&t.BytesSent)+atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedTotalSize {
|
|
|
+ if t.BytesSent.Load()+t.BytesReceived.Load() > t.transferQuota.AllowedTotalSize {
|
|
|
return t.Connection.GetReadQuotaExceededError()
|
|
|
}
|
|
|
} else if t.transferQuota.AllowedDLSize > 0 {
|
|
|
- if atomic.LoadInt64(&t.BytesSent) > t.transferQuota.AllowedDLSize {
|
|
|
+ if t.BytesSent.Load() > t.transferQuota.AllowedDLSize {
|
|
|
return t.Connection.GetReadQuotaExceededError()
|
|
|
}
|
|
|
}
|
|
@@ -230,18 +230,18 @@ func (t *BaseTransfer) CheckRead() error {
|
|
|
|
|
|
// CheckWrite returns an error if write if not allowed
|
|
|
func (t *BaseTransfer) CheckWrite() error {
|
|
|
- if t.MaxWriteSize > 0 && atomic.LoadInt64(&t.BytesReceived) > t.MaxWriteSize {
|
|
|
+ if t.MaxWriteSize > 0 && t.BytesReceived.Load() > t.MaxWriteSize {
|
|
|
return t.Connection.GetQuotaExceededError()
|
|
|
}
|
|
|
if t.transferQuota.AllowedULSize == 0 && t.transferQuota.AllowedTotalSize == 0 {
|
|
|
return nil
|
|
|
}
|
|
|
if t.transferQuota.AllowedTotalSize > 0 {
|
|
|
- if atomic.LoadInt64(&t.BytesSent)+atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedTotalSize {
|
|
|
+ if t.BytesSent.Load()+t.BytesReceived.Load() > t.transferQuota.AllowedTotalSize {
|
|
|
return t.Connection.GetQuotaExceededError()
|
|
|
}
|
|
|
} else if t.transferQuota.AllowedULSize > 0 {
|
|
|
- if atomic.LoadInt64(&t.BytesReceived) > t.transferQuota.AllowedULSize {
|
|
|
+ if t.BytesReceived.Load() > t.transferQuota.AllowedULSize {
|
|
|
return t.Connection.GetQuotaExceededError()
|
|
|
}
|
|
|
}
|
|
@@ -261,14 +261,14 @@ func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
|
|
|
if t.MaxWriteSize > 0 {
|
|
|
sizeDiff := initialSize - size
|
|
|
t.MaxWriteSize += sizeDiff
|
|
|
- metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived),
|
|
|
+ metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
|
|
|
t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
|
|
|
if t.transferQuota.HasSizeLimits() {
|
|
|
go func(ulSize, dlSize int64, user dataprovider.User) {
|
|
|
dataprovider.UpdateUserTransferQuota(&user, ulSize, dlSize, false) //nolint:errcheck
|
|
|
- }(atomic.LoadInt64(&t.BytesReceived), atomic.LoadInt64(&t.BytesSent), t.Connection.User)
|
|
|
+ }(t.BytesReceived.Load(), t.BytesSent.Load(), t.Connection.User)
|
|
|
}
|
|
|
- atomic.StoreInt64(&t.BytesReceived, 0)
|
|
|
+ t.BytesReceived.Store(0)
|
|
|
}
|
|
|
t.Unlock()
|
|
|
}
|
|
@@ -276,7 +276,7 @@ func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
|
|
|
fsPath, size, t.MaxWriteSize, t.InitialSize, err)
|
|
|
return initialSize, err
|
|
|
}
|
|
|
- if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
|
|
|
+ if size == 0 && t.BytesSent.Load() == 0 {
|
|
|
// for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
|
|
|
// for buffered SFTP we can have buffered bytes so we returns an error
|
|
|
if !vfs.IsBufferedSFTPFs(t.Fs) {
|
|
@@ -302,8 +302,8 @@ func (t *BaseTransfer) TransferError(err error) {
|
|
|
}
|
|
|
elapsed := time.Since(t.start).Nanoseconds() / 1000000
|
|
|
t.Connection.Log(logger.LevelError, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
|
|
|
- "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, atomic.LoadInt64(&t.BytesSent),
|
|
|
- atomic.LoadInt64(&t.BytesReceived), elapsed)
|
|
|
+ "bytes received: %v transfer running since %v ms", t.fsPath, t.ErrTransfer, t.BytesSent.Load(),
|
|
|
+ t.BytesReceived.Load(), elapsed)
|
|
|
}
|
|
|
|
|
|
func (t *BaseTransfer) getUploadFileSize() (int64, error) {
|
|
@@ -333,7 +333,7 @@ func (t *BaseTransfer) checkUploadOutsideHomeDir(err error) int {
|
|
|
t.Connection.Log(logger.LevelWarn, "upload in temp path cannot be renamed, delete temporary file: %#v, deletion error: %v",
|
|
|
t.effectiveFsPath, err)
|
|
|
// the file is outside the home dir so don't update the quota
|
|
|
- atomic.StoreInt64(&t.BytesReceived, 0)
|
|
|
+ t.BytesReceived.Store(0)
|
|
|
t.MinWriteOffset = 0
|
|
|
return 1
|
|
|
}
|
|
@@ -351,18 +351,18 @@ func (t *BaseTransfer) Close() error {
|
|
|
if t.isNewFile {
|
|
|
numFiles = 1
|
|
|
}
|
|
|
- metric.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived),
|
|
|
+ metric.TransferCompleted(t.BytesSent.Load(), t.BytesReceived.Load(),
|
|
|
t.transferType, t.ErrTransfer, vfs.IsSFTPFs(t.Fs))
|
|
|
if t.transferQuota.HasSizeLimits() {
|
|
|
- dataprovider.UpdateUserTransferQuota(&t.Connection.User, atomic.LoadInt64(&t.BytesReceived), //nolint:errcheck
|
|
|
- atomic.LoadInt64(&t.BytesSent), false)
|
|
|
+ dataprovider.UpdateUserTransferQuota(&t.Connection.User, t.BytesReceived.Load(), //nolint:errcheck
|
|
|
+ t.BytesSent.Load(), false)
|
|
|
}
|
|
|
if t.File != nil && t.Connection.IsQuotaExceededError(t.ErrTransfer) {
|
|
|
// if quota is exceeded we try to remove the partial file for uploads to local filesystem
|
|
|
err = t.Fs.Remove(t.File.Name(), false)
|
|
|
if err == nil {
|
|
|
numFiles--
|
|
|
- atomic.StoreInt64(&t.BytesReceived, 0)
|
|
|
+ t.BytesReceived.Store(0)
|
|
|
t.MinWriteOffset = 0
|
|
|
}
|
|
|
t.Connection.Log(logger.LevelWarn, "upload denied due to space limit, delete temporary file: %#v, deletion error: %v",
|
|
@@ -380,7 +380,7 @@ func (t *BaseTransfer) Close() error {
|
|
|
t.ErrTransfer, t.effectiveFsPath, err)
|
|
|
if err == nil {
|
|
|
numFiles--
|
|
|
- atomic.StoreInt64(&t.BytesReceived, 0)
|
|
|
+ t.BytesReceived.Store(0)
|
|
|
t.MinWriteOffset = 0
|
|
|
}
|
|
|
}
|
|
@@ -388,12 +388,12 @@ func (t *BaseTransfer) Close() error {
|
|
|
elapsed := time.Since(t.start).Nanoseconds() / 1000000
|
|
|
var uploadFileSize int64
|
|
|
if t.transferType == TransferDownload {
|
|
|
- logger.TransferLog(downloadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesSent), t.Connection.User.Username,
|
|
|
+ logger.TransferLog(downloadLogSender, t.fsPath, elapsed, t.BytesSent.Load(), t.Connection.User.Username,
|
|
|
t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
|
|
|
ExecuteActionNotification(t.Connection, operationDownload, t.fsPath, t.requestPath, "", "", "", //nolint:errcheck
|
|
|
- atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
|
|
|
+ t.BytesSent.Load(), t.ErrTransfer)
|
|
|
} else {
|
|
|
- uploadFileSize = atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
|
|
|
+ uploadFileSize = t.BytesReceived.Load() + t.MinWriteOffset
|
|
|
if statSize, errStat := t.getUploadFileSize(); errStat == nil {
|
|
|
uploadFileSize = statSize
|
|
|
}
|
|
@@ -401,7 +401,7 @@ func (t *BaseTransfer) Close() error {
|
|
|
numFiles, uploadFileSize = t.executeUploadHook(numFiles, uploadFileSize)
|
|
|
t.updateQuota(numFiles, uploadFileSize)
|
|
|
t.updateTimes()
|
|
|
- logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
|
|
|
+ logger.TransferLog(uploadLogSender, t.fsPath, elapsed, t.BytesReceived.Load(), t.Connection.User.Username,
|
|
|
t.Connection.ID, t.Connection.protocol, t.Connection.localAddr, t.Connection.remoteAddr, t.ftpMode)
|
|
|
}
|
|
|
if t.ErrTransfer != nil {
|
|
@@ -428,11 +428,11 @@ func (t *BaseTransfer) updateTransferTimestamps(uploadFileSize int64) {
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
- if t.Connection.User.FirstDownload == 0 && !t.Connection.downloadDone.Load() && atomic.LoadInt64(&t.BytesSent) > 0 {
|
|
|
+ if t.Connection.User.FirstDownload == 0 && !t.Connection.downloadDone.Load() && t.BytesSent.Load() > 0 {
|
|
|
if err := dataprovider.UpdateUserTransferTimestamps(t.Connection.User.Username, false); err == nil {
|
|
|
t.Connection.downloadDone.Store(true)
|
|
|
ExecuteActionNotification(t.Connection, operationFirstDownload, t.fsPath, t.requestPath, "", //nolint:errcheck
|
|
|
- "", "", atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
|
|
|
+ "", "", t.BytesSent.Load(), t.ErrTransfer)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -449,7 +449,7 @@ func (t *BaseTransfer) executeUploadHook(numFiles int, fileSize int64) (int, int
|
|
|
if err == nil {
|
|
|
numFiles--
|
|
|
fileSize = 0
|
|
|
- atomic.StoreInt64(&t.BytesReceived, 0)
|
|
|
+ t.BytesReceived.Store(0)
|
|
|
t.MinWriteOffset = 0
|
|
|
} else {
|
|
|
t.Connection.Log(logger.LevelWarn, "unable to remove path %q after upload hook failure: %v", t.fsPath, err)
|
|
@@ -494,10 +494,10 @@ func (t *BaseTransfer) HandleThrottle() {
|
|
|
var trasferredBytes int64
|
|
|
if t.transferType == TransferDownload {
|
|
|
wantedBandwidth = t.Connection.User.DownloadBandwidth
|
|
|
- trasferredBytes = atomic.LoadInt64(&t.BytesSent)
|
|
|
+ trasferredBytes = t.BytesSent.Load()
|
|
|
} else {
|
|
|
wantedBandwidth = t.Connection.User.UploadBandwidth
|
|
|
- trasferredBytes = atomic.LoadInt64(&t.BytesReceived)
|
|
|
+ trasferredBytes = t.BytesReceived.Load()
|
|
|
}
|
|
|
if wantedBandwidth > 0 {
|
|
|
// real and wanted elapsed as milliseconds, bytes as kilobytes
|