123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- package controller
- import (
- "context"
- "fmt"
- "strconv"
- "strings"
- "github.com/ente-io/museum/ente"
- "github.com/ente-io/museum/pkg/repo"
- "github.com/ente-io/museum/pkg/utils/time"
- "github.com/ente-io/stacktrace"
- "github.com/google/uuid"
- log "github.com/sirupsen/logrus"
- )
- // TrashController has the business logic related to trash feature
- type TrashController struct {
- TrashRepo *repo.TrashRepository
- FileRepo *repo.FileRepository
- CollectionRepo *repo.CollectionRepository
- QueueRepo *repo.QueueRepository
- TaskLockRepo *repo.TaskLockRepository
- HostName string
- dropFileMetadataRunning bool
- collectionTrashRunning bool
- emptyTrashRunning bool
- // deleteAgedTrashRunning indicates whether the cron to delete trashed files which are in trash
- // since repo.TrashDurationInDays is running
- deleteAgedTrashRunning bool
- }
- // GetDiff returns the changes in user's trash since a timestamp, along with hasMore bool flag.
- func (t *TrashController) GetDiff(userID int64, sinceTime int64, stripMetadata bool, app ente.App) ([]ente.Trash, bool, error) {
- trashFilesDiff, hasMore, err := t.getDiff(userID, sinceTime, repo.TrashDiffLimit, app)
- if err != nil {
- return nil, false, err
- }
- // hide private metadata before returning files info in diff
- if stripMetadata {
- for _, trashFile := range trashFilesDiff {
- if trashFile.IsDeleted {
- trashFile.File.MagicMetadata = nil
- trashFile.File.PubicMagicMetadata = nil
- trashFile.File.Metadata = ente.FileAttributes{}
- trashFile.File.Info = nil
- }
- }
- }
- return trashFilesDiff, hasMore, err
- }
- // GetDiff returns the diff in user's trash since a timestamp, along with hasMore bool flag.
- // The function will never return partial result for a version. To maintain this promise, it will not be able to honor
- // the limit parameter. Based on the db state, compared to the limit, the diff length can be
- // less (case 1), more (case 2), or same (case 3, 4)
- // Example: Assume we have 11 files with following versions: v0, v1, v1, v1, v1, v1, v1, v1, v2, v2, v2 (count = 7 v1, 3 v2)
- // client has synced up till version v0.
- // case 1: ( sinceTime: v0, limit = 8):
- // The method will discard the entries with version v2 and return only 7 entries with version v1.
- // case 2: (sinceTime: v0, limit 5):
- // Instead of returning 5 entries with version V1, method will return all 7 entries with version v1.
- // case 3: (sinceTime: v0, limit 7):
- // The method will return all 7 entries with version V1.
- // case 4: (sinceTime: v0, limit >=10):
- // The method will all 10 entries in the diff
- func (t *TrashController) getDiff(userID int64, sinceTime int64, limit int, app ente.App) ([]ente.Trash, bool, error) {
- // request for limit +1 files
- diffLimitPlusOne, err := t.TrashRepo.GetDiff(userID, sinceTime, limit+1, app)
- if err != nil {
- return nil, false, stacktrace.Propagate(err, "")
- }
- if len(diffLimitPlusOne) <= limit {
- // case 4: all files changed after sinceTime are included.
- return diffLimitPlusOne, false, nil
- }
- lastFileVersion := diffLimitPlusOne[limit].UpdatedAt
- filteredDiffs := t.removeFilesWithVersion(diffLimitPlusOne, lastFileVersion)
- if len(filteredDiffs) > 0 { // case 1 or case 3
- return filteredDiffs, true, nil
- }
- // case 2
- diff, err := t.TrashRepo.GetFilesWithVersion(userID, lastFileVersion)
- if err != nil {
- return nil, false, stacktrace.Propagate(err, "")
- }
- return diff, true, nil
- }
- // Delete files permanently, queues up the file for deletion & free up the space based on file's object size
- func (t *TrashController) Delete(ctx context.Context, request ente.DeleteTrashFilesRequest) error {
- err := t.TrashRepo.Delete(ctx, request.OwnerID, request.FileIDs)
- if err != nil {
- return stacktrace.Propagate(err, "")
- }
- return nil
- }
- func (t *TrashController) EmptyTrash(ctx context.Context, userID int64, req ente.EmptyTrashRequest) error {
- err := t.TrashRepo.EmptyTrash(ctx, userID, req.LastUpdatedAt)
- if err != nil {
- return stacktrace.Propagate(err, "")
- }
- defer t.ProcessEmptyTrashRequests()
- return nil
- }
- func (t *TrashController) CleanupTrashedCollections() {
- ctxLogger := log.WithFields(log.Fields{
- "flow": "trash_collection",
- "id": uuid.New().String(),
- })
- item_processed_count := 0
- if t.collectionTrashRunning {
- ctxLogger.Info("Already moving collection to trash, skipping cron")
- return
- }
- t.collectionTrashRunning = true
- defer func() {
- ctxLogger.WithField("items_processed", item_processed_count).Info("cron run finished")
- t.collectionTrashRunning = false
- }()
- // process delete collection request for DELETE V3
- itemsV3, err2 := t.QueueRepo.GetItemsReadyForDeletion(repo.TrashCollectionQueueV3, 100)
- if err2 != nil {
- log.Error("Could not fetch from collection trash queue", err2)
- return
- }
- item_processed_count += len(itemsV3)
- for _, item := range itemsV3 {
- t.trashCollection(item, repo.TrashCollectionQueueV3, ctxLogger)
- }
- }
- func (t *TrashController) ProcessEmptyTrashRequests() {
- if t.emptyTrashRunning {
- log.Info("Already processing empty trash requests, skipping cron")
- return
- }
- t.emptyTrashRunning = true
- defer func() {
- t.emptyTrashRunning = false
- }()
- items, err := t.QueueRepo.GetItemsReadyForDeletion(repo.TrashEmptyQueue, 100)
- if err != nil {
- log.Error("Could not fetch from emptyTrashQueue queue", err)
- return
- }
- for _, item := range items {
- t.emptyTrash(item)
- }
- }
- // DeleteAgedTrashedFiles delete trashed files which are in trash since repo.TrashDurationInDays
- func (t *TrashController) DeleteAgedTrashedFiles() {
- if t.deleteAgedTrashRunning {
- log.Info("Already deleting older trashed files, skipping cron")
- return
- }
- t.deleteAgedTrashRunning = true
- defer func() {
- t.deleteAgedTrashRunning = false
- }()
- lockName := "DeleteAgedTrashedFiles"
- lockStatus, err := t.TaskLockRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), t.HostName)
- if err != nil || !lockStatus {
- log.Error("Unable to acquire lock to DeleteAgedTrashedFiles")
- return
- }
- defer func() {
- releaseErr := t.TaskLockRepo.ReleaseLock(lockName)
- if releaseErr != nil {
- log.WithError(releaseErr).Error("Error while releasing aged trash lock")
- }
- }()
- userIDToFileMap, err := t.TrashRepo.GetUserIDToFileIDsMapForDeletion()
- if err != nil {
- log.Error("Could not fetch trashed files for deletion", err)
- return
- }
- for userID, fileIDs := range userIDToFileMap {
- ctxLogger := log.WithFields(log.Fields{
- "user_id": userID,
- "fileIds": fileIDs,
- })
- ctxLogger.Info("start deleting old files from trash")
- err = t.TrashRepo.Delete(context.Background(), userID, fileIDs)
- if err != nil {
- ctxLogger.WithError(err).Error("failed to delete file from trash")
- continue
- }
- ctxLogger.Info("successfully deleted old files from trash")
- }
- }
- // removeFilesWithVersion returns filtered list of trashedFiles are removing all files with given version.
- // Important: The method assumes that trashedFiles are sorted by increasing order of Trash.UpdatedAt
- func (t *TrashController) removeFilesWithVersion(trashedFiles []ente.Trash, version int64) []ente.Trash {
- var i = len(trashedFiles) - 1
- for ; i >= 0; i-- {
- if trashedFiles[i].UpdatedAt != version {
- // found index (from end) where file's version is different from given version
- break
- }
- }
- return trashedFiles[0 : i+1]
- }
- func (t *TrashController) trashCollection(item repo.QueueItem, queueName string, logger *log.Entry) {
- cID, _ := strconv.ParseInt(item.Item, 10, 64)
- collection, err := t.CollectionRepo.Get(cID)
- if err != nil {
- log.Error("Could not fetch collection "+item.Item, err)
- return
- }
- ctxLogger := logger.WithFields(log.Fields{
- "collection_id": cID,
- "user_id": collection.Owner.ID,
- "queue": queueName,
- "flow": "trash_collection",
- })
- // to avoid race conditions while finding exclusive files, lock at user level, instead of individual collection
- lockName := fmt.Sprintf("CollectionTrash:%d", collection.Owner.ID)
- lockStatus, err := t.TaskLockRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), t.HostName)
- if err != nil || !lockStatus {
- if err == nil {
- ctxLogger.Error("lock is already taken for deleting collection")
- } else {
- ctxLogger.WithError(err).Error("critical: error while acquiring lock")
- }
- return
- }
- defer func() {
- releaseErr := t.TaskLockRepo.ReleaseLock(lockName)
- if releaseErr != nil {
- ctxLogger.WithError(releaseErr).Error("Error while releasing lock")
- }
- }()
- ctxLogger.Info("start trashing collection")
- err = t.CollectionRepo.TrashV3(context.Background(), cID)
- if err != nil {
- ctxLogger.WithError(err).Error("failed to trash collection")
- return
- }
- err = t.QueueRepo.DeleteItem(queueName, item.Item)
- if err != nil {
- ctxLogger.WithError(err).Error("failed to delete item from queue")
- return
- }
- }
- func (t *TrashController) emptyTrash(item repo.QueueItem) {
- lockName := fmt.Sprintf("EmptyTrash:%s", item.Item)
- lockStatus, err := t.TaskLockRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), t.HostName)
- split := strings.Split(item.Item, repo.EmptyTrashQueueItemSeparator)
- userID, _ := strconv.ParseInt(split[0], 10, 64)
- lastUpdateAt, _ := strconv.ParseInt(split[1], 10, 64)
- ctxLogger := log.WithFields(log.Fields{
- "user_id": userID,
- "lastUpdatedAt": lastUpdateAt,
- "flow": "empty_trash",
- })
- if err != nil || !lockStatus {
- if err == nil {
- // todo: error only when lock is help for more than X durat
- ctxLogger.Error("lock is already taken for emptying trash")
- } else {
- ctxLogger.WithError(err).Error("critical: error while acquiring lock")
- }
- return
- }
- defer func() {
- releaseErr := t.TaskLockRepo.ReleaseLock(lockName)
- if releaseErr != nil {
- log.WithError(releaseErr).Error("Error while releasing lock")
- }
- }()
- ctxLogger.Info("Start emptying trash")
- fileIDs, err := t.TrashRepo.GetFilesIDsForDeletion(userID, lastUpdateAt)
- if err != nil {
- ctxLogger.WithError(err).Error("Failed to fetch fileIDs")
- return
- }
- ctx := context.Background()
- size := len(fileIDs)
- limit := repo.TrashBatchSize
- for lb := 0; lb < size; lb += limit {
- ub := lb + limit
- if ub > size {
- ub = size
- }
- batch := fileIDs[lb:ub]
- err = t.TrashRepo.Delete(ctx, userID, batch)
- if err != nil {
- ctxLogger.WithField("batchIDs", batch).WithError(err).Error("Failed while deleting batch")
- return
- }
- }
- err = t.QueueRepo.DeleteItem(repo.TrashEmptyQueue, item.Item)
- if err != nil {
- log.Error("Error while removing item from queue "+item.Item, err)
- return
- }
- ctxLogger.Info("Finished emptying trash")
- }
|