mirror of
https://github.com/drakkan/sftpgo.git
synced 2024-11-25 17:10:28 +00:00
784b7585c1
so we don't have to update all the files every year Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
329 lines
9.7 KiB
Go
329 lines
9.7 KiB
Go
// Copyright (C) 2019 Nicola Murino
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published
|
|
// by the Free Software Foundation, version 3.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package common
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/drakkan/sftpgo/v2/internal/dataprovider"
|
|
"github.com/drakkan/sftpgo/v2/internal/logger"
|
|
"github.com/drakkan/sftpgo/v2/internal/util"
|
|
)
|
|
|
|
type overquotaTransfer struct {
|
|
ConnID string
|
|
TransferID int64
|
|
TransferType int
|
|
}
|
|
|
|
type uploadAggregationKey struct {
|
|
Username string
|
|
FolderName string
|
|
}
|
|
|
|
// TransfersChecker defines the interface that transfer checkers must implement.
|
|
// A transfer checker ensure that multiple concurrent transfers does not exceeded
|
|
// the remaining user quota
|
|
type TransfersChecker interface {
|
|
AddTransfer(transfer dataprovider.ActiveTransfer)
|
|
RemoveTransfer(ID int64, connectionID string)
|
|
UpdateTransferCurrentSizes(ulSize, dlSize, ID int64, connectionID string)
|
|
GetOverquotaTransfers() []overquotaTransfer
|
|
}
|
|
|
|
func getTransfersChecker(isShared int) TransfersChecker {
|
|
if isShared == 1 {
|
|
logger.Info(logSender, "", "using provider transfer checker")
|
|
return &transfersCheckerDB{}
|
|
}
|
|
logger.Info(logSender, "", "using memory transfer checker")
|
|
return &transfersCheckerMem{}
|
|
}
|
|
|
|
type baseTransferChecker struct {
|
|
transfers []dataprovider.ActiveTransfer
|
|
}
|
|
|
|
func (t *baseTransferChecker) isDataTransferExceeded(user dataprovider.User, transfer dataprovider.ActiveTransfer, ulSize,
|
|
dlSize int64,
|
|
) bool {
|
|
ulQuota, dlQuota, totalQuota := user.GetDataTransferLimits()
|
|
if totalQuota > 0 {
|
|
allowedSize := totalQuota - (user.UsedUploadDataTransfer + user.UsedDownloadDataTransfer)
|
|
if ulSize+dlSize > allowedSize {
|
|
return transfer.CurrentDLSize > 0 || transfer.CurrentULSize > 0
|
|
}
|
|
}
|
|
if dlQuota > 0 {
|
|
allowedSize := dlQuota - user.UsedDownloadDataTransfer
|
|
if dlSize > allowedSize {
|
|
return transfer.CurrentDLSize > 0
|
|
}
|
|
}
|
|
if ulQuota > 0 {
|
|
allowedSize := ulQuota - user.UsedUploadDataTransfer
|
|
if ulSize > allowedSize {
|
|
return transfer.CurrentULSize > 0
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (t *baseTransferChecker) getRemainingDiskQuota(user dataprovider.User, folderName string) (int64, error) {
|
|
var result int64
|
|
|
|
if folderName != "" {
|
|
for _, folder := range user.VirtualFolders {
|
|
if folder.Name == folderName {
|
|
if folder.QuotaSize > 0 {
|
|
return folder.QuotaSize - folder.UsedQuotaSize, nil
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
if user.QuotaSize > 0 {
|
|
return user.QuotaSize - user.UsedQuotaSize, nil
|
|
}
|
|
}
|
|
|
|
return result, errors.New("no quota limit defined")
|
|
}
|
|
|
|
func (t *baseTransferChecker) aggregateTransfersByUser(usersToFetch map[string]bool,
|
|
) (map[string]bool, map[string][]dataprovider.ActiveTransfer) {
|
|
aggregations := make(map[string][]dataprovider.ActiveTransfer)
|
|
for _, transfer := range t.transfers {
|
|
aggregations[transfer.Username] = append(aggregations[transfer.Username], transfer)
|
|
if len(aggregations[transfer.Username]) > 1 {
|
|
if _, ok := usersToFetch[transfer.Username]; !ok {
|
|
usersToFetch[transfer.Username] = false
|
|
}
|
|
}
|
|
}
|
|
|
|
return usersToFetch, aggregations
|
|
}
|
|
|
|
func (t *baseTransferChecker) aggregateUploadTransfers() (map[string]bool, map[int][]dataprovider.ActiveTransfer) {
|
|
usersToFetch := make(map[string]bool)
|
|
aggregations := make(map[int][]dataprovider.ActiveTransfer)
|
|
var keys []uploadAggregationKey
|
|
|
|
for _, transfer := range t.transfers {
|
|
if transfer.Type != TransferUpload {
|
|
continue
|
|
}
|
|
key := -1
|
|
for idx, k := range keys {
|
|
if k.Username == transfer.Username && k.FolderName == transfer.FolderName {
|
|
key = idx
|
|
break
|
|
}
|
|
}
|
|
if key == -1 {
|
|
key = len(keys)
|
|
}
|
|
keys = append(keys, uploadAggregationKey{
|
|
Username: transfer.Username,
|
|
FolderName: transfer.FolderName,
|
|
})
|
|
|
|
aggregations[key] = append(aggregations[key], transfer)
|
|
if len(aggregations[key]) > 1 {
|
|
if transfer.FolderName != "" {
|
|
usersToFetch[transfer.Username] = true
|
|
} else {
|
|
if _, ok := usersToFetch[transfer.Username]; !ok {
|
|
usersToFetch[transfer.Username] = false
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return usersToFetch, aggregations
|
|
}
|
|
|
|
func (t *baseTransferChecker) getUsersToCheck(usersToFetch map[string]bool) (map[string]dataprovider.User, error) {
|
|
users, err := dataprovider.GetUsersForQuotaCheck(usersToFetch)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
usersMap := make(map[string]dataprovider.User)
|
|
|
|
for _, user := range users {
|
|
usersMap[user.Username] = user
|
|
}
|
|
|
|
return usersMap, nil
|
|
}
|
|
|
|
func (t *baseTransferChecker) getOverquotaTransfers(usersToFetch map[string]bool,
|
|
uploadAggregations map[int][]dataprovider.ActiveTransfer,
|
|
userAggregations map[string][]dataprovider.ActiveTransfer,
|
|
) []overquotaTransfer {
|
|
if len(usersToFetch) == 0 {
|
|
return nil
|
|
}
|
|
usersMap, err := t.getUsersToCheck(usersToFetch)
|
|
if err != nil {
|
|
logger.Warn(logSender, "", "unable to check transfers, error getting users quota: %v", err)
|
|
return nil
|
|
}
|
|
|
|
var overquotaTransfers []overquotaTransfer
|
|
|
|
for _, transfers := range uploadAggregations {
|
|
username := transfers[0].Username
|
|
folderName := transfers[0].FolderName
|
|
remaningDiskQuota, err := t.getRemainingDiskQuota(usersMap[username], folderName)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
var usedDiskQuota int64
|
|
for _, tr := range transfers {
|
|
// We optimistically assume that a cloud transfer that replaces an existing
|
|
// file will be successful
|
|
usedDiskQuota += tr.CurrentULSize - tr.TruncatedSize
|
|
}
|
|
logger.Debug(logSender, "", "username %q, folder %q, concurrent transfers: %v, remaining disk quota (bytes): %v, disk quota used in ongoing transfers (bytes): %v",
|
|
username, folderName, len(transfers), remaningDiskQuota, usedDiskQuota)
|
|
if usedDiskQuota > remaningDiskQuota {
|
|
for _, tr := range transfers {
|
|
if tr.CurrentULSize > tr.TruncatedSize {
|
|
overquotaTransfers = append(overquotaTransfers, overquotaTransfer{
|
|
ConnID: tr.ConnID,
|
|
TransferID: tr.ID,
|
|
TransferType: tr.Type,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for username, transfers := range userAggregations {
|
|
var ulSize, dlSize int64
|
|
for _, tr := range transfers {
|
|
ulSize += tr.CurrentULSize
|
|
dlSize += tr.CurrentDLSize
|
|
}
|
|
logger.Debug(logSender, "", "username %q, concurrent transfers: %v, quota (bytes) used in ongoing transfers, ul: %v, dl: %v",
|
|
username, len(transfers), ulSize, dlSize)
|
|
for _, tr := range transfers {
|
|
if t.isDataTransferExceeded(usersMap[username], tr, ulSize, dlSize) {
|
|
overquotaTransfers = append(overquotaTransfers, overquotaTransfer{
|
|
ConnID: tr.ConnID,
|
|
TransferID: tr.ID,
|
|
TransferType: tr.Type,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
return overquotaTransfers
|
|
}
|
|
|
|
type transfersCheckerMem struct {
|
|
sync.RWMutex
|
|
baseTransferChecker
|
|
}
|
|
|
|
func (t *transfersCheckerMem) AddTransfer(transfer dataprovider.ActiveTransfer) {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
|
|
t.transfers = append(t.transfers, transfer)
|
|
}
|
|
|
|
func (t *transfersCheckerMem) RemoveTransfer(ID int64, connectionID string) {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
|
|
for idx, transfer := range t.transfers {
|
|
if transfer.ID == ID && transfer.ConnID == connectionID {
|
|
lastIdx := len(t.transfers) - 1
|
|
t.transfers[idx] = t.transfers[lastIdx]
|
|
t.transfers = t.transfers[:lastIdx]
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *transfersCheckerMem) UpdateTransferCurrentSizes(ulSize, dlSize, ID int64, connectionID string) {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
|
|
for idx := range t.transfers {
|
|
if t.transfers[idx].ID == ID && t.transfers[idx].ConnID == connectionID {
|
|
t.transfers[idx].CurrentDLSize = dlSize
|
|
t.transfers[idx].CurrentULSize = ulSize
|
|
t.transfers[idx].UpdatedAt = util.GetTimeAsMsSinceEpoch(time.Now())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *transfersCheckerMem) GetOverquotaTransfers() []overquotaTransfer {
|
|
t.RLock()
|
|
|
|
usersToFetch, uploadAggregations := t.aggregateUploadTransfers()
|
|
usersToFetch, userAggregations := t.aggregateTransfersByUser(usersToFetch)
|
|
|
|
t.RUnlock()
|
|
|
|
return t.getOverquotaTransfers(usersToFetch, uploadAggregations, userAggregations)
|
|
}
|
|
|
|
type transfersCheckerDB struct {
|
|
baseTransferChecker
|
|
lastCleanup time.Time
|
|
}
|
|
|
|
func (t *transfersCheckerDB) AddTransfer(transfer dataprovider.ActiveTransfer) {
|
|
dataprovider.AddActiveTransfer(transfer)
|
|
}
|
|
|
|
func (t *transfersCheckerDB) RemoveTransfer(ID int64, connectionID string) {
|
|
dataprovider.RemoveActiveTransfer(ID, connectionID)
|
|
}
|
|
|
|
func (t *transfersCheckerDB) UpdateTransferCurrentSizes(ulSize, dlSize, ID int64, connectionID string) {
|
|
dataprovider.UpdateActiveTransferSizes(ulSize, dlSize, ID, connectionID)
|
|
}
|
|
|
|
func (t *transfersCheckerDB) GetOverquotaTransfers() []overquotaTransfer {
|
|
if t.lastCleanup.IsZero() || t.lastCleanup.Add(periodicTimeoutCheckInterval*15).Before(time.Now()) {
|
|
before := time.Now().Add(-periodicTimeoutCheckInterval * 5)
|
|
err := dataprovider.CleanupActiveTransfers(before)
|
|
logger.Debug(logSender, "", "cleanup active transfers completed, err: %v", err)
|
|
if err == nil {
|
|
t.lastCleanup = time.Now()
|
|
}
|
|
}
|
|
var err error
|
|
from := time.Now().Add(-periodicTimeoutCheckInterval * 2)
|
|
t.transfers, err = dataprovider.GetActiveTransfers(from)
|
|
if err != nil {
|
|
logger.Error(logSender, "", "unable to check overquota transfers, error getting active transfers: %v", err)
|
|
return nil
|
|
}
|
|
|
|
usersToFetch, uploadAggregations := t.aggregateUploadTransfers()
|
|
usersToFetch, userAggregations := t.aggregateTransfersByUser(usersToFetch)
|
|
|
|
return t.getOverquotaTransfers(usersToFetch, uploadAggregations, userAggregations)
|
|
}
|