trash.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package controller
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "github.com/ente-io/museum/ente"
  8. "github.com/ente-io/museum/pkg/repo"
  9. "github.com/ente-io/museum/pkg/utils/time"
  10. "github.com/ente-io/stacktrace"
  11. "github.com/google/uuid"
  12. log "github.com/sirupsen/logrus"
  13. )
  14. // TrashController has the business logic related to trash feature
  15. type TrashController struct {
  16. TrashRepo *repo.TrashRepository
  17. FileRepo *repo.FileRepository
  18. CollectionRepo *repo.CollectionRepository
  19. QueueRepo *repo.QueueRepository
  20. TaskLockRepo *repo.TaskLockRepository
  21. HostName string
  22. dropFileMetadataRunning bool
  23. collectionTrashRunning bool
  24. emptyTrashRunning bool
  25. // deleteAgedTrashRunning indicates whether the cron to delete trashed files which are in trash
  26. // since repo.TrashDurationInDays is running
  27. deleteAgedTrashRunning bool
  28. }
  29. // GetDiff returns the changes in user's trash since a timestamp, along with hasMore bool flag.
  30. func (t *TrashController) GetDiff(userID int64, sinceTime int64, stripMetadata bool, app ente.App) ([]ente.Trash, bool, error) {
  31. trashFilesDiff, hasMore, err := t.getDiff(userID, sinceTime, repo.TrashDiffLimit, app)
  32. if err != nil {
  33. return nil, false, err
  34. }
  35. // hide private metadata before returning files info in diff
  36. if stripMetadata {
  37. for _, trashFile := range trashFilesDiff {
  38. if trashFile.IsDeleted {
  39. trashFile.File.MagicMetadata = nil
  40. trashFile.File.PubicMagicMetadata = nil
  41. trashFile.File.Metadata = ente.FileAttributes{}
  42. trashFile.File.Info = nil
  43. }
  44. }
  45. }
  46. return trashFilesDiff, hasMore, err
  47. }
  48. // GetDiff returns the diff in user's trash since a timestamp, along with hasMore bool flag.
  49. // The function will never return partial result for a version. To maintain this promise, it will not be able to honor
  50. // the limit parameter. Based on the db state, compared to the limit, the diff length can be
  51. // less (case 1), more (case 2), or same (case 3, 4)
  52. // Example: Assume we have 11 files with following versions: v0, v1, v1, v1, v1, v1, v1, v1, v2, v2, v2 (count = 7 v1, 3 v2)
  53. // client has synced up till version v0.
  54. // case 1: ( sinceTime: v0, limit = 8):
  55. // The method will discard the entries with version v2 and return only 7 entries with version v1.
  56. // case 2: (sinceTime: v0, limit 5):
  57. // Instead of returning 5 entries with version V1, method will return all 7 entries with version v1.
  58. // case 3: (sinceTime: v0, limit 7):
  59. // The method will return all 7 entries with version V1.
  60. // case 4: (sinceTime: v0, limit >=10):
  61. // The method will all 10 entries in the diff
  62. func (t *TrashController) getDiff(userID int64, sinceTime int64, limit int, app ente.App) ([]ente.Trash, bool, error) {
  63. // request for limit +1 files
  64. diffLimitPlusOne, err := t.TrashRepo.GetDiff(userID, sinceTime, limit+1, app)
  65. if err != nil {
  66. return nil, false, stacktrace.Propagate(err, "")
  67. }
  68. if len(diffLimitPlusOne) <= limit {
  69. // case 4: all files changed after sinceTime are included.
  70. return diffLimitPlusOne, false, nil
  71. }
  72. lastFileVersion := diffLimitPlusOne[limit].UpdatedAt
  73. filteredDiffs := t.removeFilesWithVersion(diffLimitPlusOne, lastFileVersion)
  74. if len(filteredDiffs) > 0 { // case 1 or case 3
  75. return filteredDiffs, true, nil
  76. }
  77. // case 2
  78. diff, err := t.TrashRepo.GetFilesWithVersion(userID, lastFileVersion)
  79. if err != nil {
  80. return nil, false, stacktrace.Propagate(err, "")
  81. }
  82. return diff, true, nil
  83. }
  84. // Delete files permanently, queues up the file for deletion & free up the space based on file's object size
  85. func (t *TrashController) Delete(ctx context.Context, request ente.DeleteTrashFilesRequest) error {
  86. err := t.TrashRepo.Delete(ctx, request.OwnerID, request.FileIDs)
  87. if err != nil {
  88. return stacktrace.Propagate(err, "")
  89. }
  90. return nil
  91. }
  92. func (t *TrashController) EmptyTrash(ctx context.Context, userID int64, req ente.EmptyTrashRequest) error {
  93. err := t.TrashRepo.EmptyTrash(ctx, userID, req.LastUpdatedAt)
  94. if err != nil {
  95. return stacktrace.Propagate(err, "")
  96. }
  97. defer t.ProcessEmptyTrashRequests()
  98. return nil
  99. }
  100. func (t *TrashController) CleanupTrashedCollections() {
  101. ctxLogger := log.WithFields(log.Fields{
  102. "flow": "trash_collection",
  103. "id": uuid.New().String(),
  104. })
  105. item_processed_count := 0
  106. if t.collectionTrashRunning {
  107. ctxLogger.Info("Already moving collection to trash, skipping cron")
  108. return
  109. }
  110. t.collectionTrashRunning = true
  111. defer func() {
  112. ctxLogger.WithField("items_processed", item_processed_count).Info("cron run finished")
  113. t.collectionTrashRunning = false
  114. }()
  115. // process delete collection request for DELETE V3
  116. itemsV3, err2 := t.QueueRepo.GetItemsReadyForDeletion(repo.TrashCollectionQueueV3, 100)
  117. if err2 != nil {
  118. log.Error("Could not fetch from collection trash queue", err2)
  119. return
  120. }
  121. item_processed_count += len(itemsV3)
  122. for _, item := range itemsV3 {
  123. t.trashCollection(item, repo.TrashCollectionQueueV3, ctxLogger)
  124. }
  125. }
  126. func (t *TrashController) ProcessEmptyTrashRequests() {
  127. if t.emptyTrashRunning {
  128. log.Info("Already processing empty trash requests, skipping cron")
  129. return
  130. }
  131. t.emptyTrashRunning = true
  132. defer func() {
  133. t.emptyTrashRunning = false
  134. }()
  135. items, err := t.QueueRepo.GetItemsReadyForDeletion(repo.TrashEmptyQueue, 100)
  136. if err != nil {
  137. log.Error("Could not fetch from emptyTrashQueue queue", err)
  138. return
  139. }
  140. for _, item := range items {
  141. t.emptyTrash(item)
  142. }
  143. }
  144. // DeleteAgedTrashedFiles delete trashed files which are in trash since repo.TrashDurationInDays
  145. func (t *TrashController) DeleteAgedTrashedFiles() {
  146. if t.deleteAgedTrashRunning {
  147. log.Info("Already deleting older trashed files, skipping cron")
  148. return
  149. }
  150. t.deleteAgedTrashRunning = true
  151. defer func() {
  152. t.deleteAgedTrashRunning = false
  153. }()
  154. lockName := "DeleteAgedTrashedFiles"
  155. lockStatus, err := t.TaskLockRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), t.HostName)
  156. if err != nil || !lockStatus {
  157. log.Error("Unable to acquire lock to DeleteAgedTrashedFiles")
  158. return
  159. }
  160. defer func() {
  161. releaseErr := t.TaskLockRepo.ReleaseLock(lockName)
  162. if releaseErr != nil {
  163. log.WithError(releaseErr).Error("Error while releasing aged trash lock")
  164. }
  165. }()
  166. userIDToFileMap, err := t.TrashRepo.GetUserIDToFileIDsMapForDeletion()
  167. if err != nil {
  168. log.Error("Could not fetch trashed files for deletion", err)
  169. return
  170. }
  171. for userID, fileIDs := range userIDToFileMap {
  172. ctxLogger := log.WithFields(log.Fields{
  173. "user_id": userID,
  174. "fileIds": fileIDs,
  175. })
  176. ctxLogger.Info("start deleting old files from trash")
  177. err = t.TrashRepo.Delete(context.Background(), userID, fileIDs)
  178. if err != nil {
  179. ctxLogger.WithError(err).Error("failed to delete file from trash")
  180. continue
  181. }
  182. ctxLogger.Info("successfully deleted old files from trash")
  183. }
  184. }
  185. // removeFilesWithVersion returns filtered list of trashedFiles are removing all files with given version.
  186. // Important: The method assumes that trashedFiles are sorted by increasing order of Trash.UpdatedAt
  187. func (t *TrashController) removeFilesWithVersion(trashedFiles []ente.Trash, version int64) []ente.Trash {
  188. var i = len(trashedFiles) - 1
  189. for ; i >= 0; i-- {
  190. if trashedFiles[i].UpdatedAt != version {
  191. // found index (from end) where file's version is different from given version
  192. break
  193. }
  194. }
  195. return trashedFiles[0 : i+1]
  196. }
  197. func (t *TrashController) trashCollection(item repo.QueueItem, queueName string, logger *log.Entry) {
  198. cID, _ := strconv.ParseInt(item.Item, 10, 64)
  199. collection, err := t.CollectionRepo.Get(cID)
  200. if err != nil {
  201. log.Error("Could not fetch collection "+item.Item, err)
  202. return
  203. }
  204. ctxLogger := logger.WithFields(log.Fields{
  205. "collection_id": cID,
  206. "user_id": collection.Owner.ID,
  207. "queue": queueName,
  208. "flow": "trash_collection",
  209. })
  210. // to avoid race conditions while finding exclusive files, lock at user level, instead of individual collection
  211. lockName := fmt.Sprintf("CollectionTrash:%d", collection.Owner.ID)
  212. lockStatus, err := t.TaskLockRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), t.HostName)
  213. if err != nil || !lockStatus {
  214. if err == nil {
  215. ctxLogger.Error("lock is already taken for deleting collection")
  216. } else {
  217. ctxLogger.WithError(err).Error("critical: error while acquiring lock")
  218. }
  219. return
  220. }
  221. defer func() {
  222. releaseErr := t.TaskLockRepo.ReleaseLock(lockName)
  223. if releaseErr != nil {
  224. ctxLogger.WithError(releaseErr).Error("Error while releasing lock")
  225. }
  226. }()
  227. ctxLogger.Info("start trashing collection")
  228. err = t.CollectionRepo.TrashV3(context.Background(), cID)
  229. if err != nil {
  230. ctxLogger.WithError(err).Error("failed to trash collection")
  231. return
  232. }
  233. err = t.QueueRepo.DeleteItem(queueName, item.Item)
  234. if err != nil {
  235. ctxLogger.WithError(err).Error("failed to delete item from queue")
  236. return
  237. }
  238. }
  239. func (t *TrashController) emptyTrash(item repo.QueueItem) {
  240. lockName := fmt.Sprintf("EmptyTrash:%s", item.Item)
  241. lockStatus, err := t.TaskLockRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), t.HostName)
  242. split := strings.Split(item.Item, repo.EmptyTrashQueueItemSeparator)
  243. userID, _ := strconv.ParseInt(split[0], 10, 64)
  244. lastUpdateAt, _ := strconv.ParseInt(split[1], 10, 64)
  245. ctxLogger := log.WithFields(log.Fields{
  246. "user_id": userID,
  247. "lastUpdatedAt": lastUpdateAt,
  248. "flow": "empty_trash",
  249. })
  250. if err != nil || !lockStatus {
  251. if err == nil {
  252. // todo: error only when lock is help for more than X durat
  253. ctxLogger.Error("lock is already taken for emptying trash")
  254. } else {
  255. ctxLogger.WithError(err).Error("critical: error while acquiring lock")
  256. }
  257. return
  258. }
  259. defer func() {
  260. releaseErr := t.TaskLockRepo.ReleaseLock(lockName)
  261. if releaseErr != nil {
  262. log.WithError(releaseErr).Error("Error while releasing lock")
  263. }
  264. }()
  265. ctxLogger.Info("Start emptying trash")
  266. fileIDs, err := t.TrashRepo.GetFilesIDsForDeletion(userID, lastUpdateAt)
  267. if err != nil {
  268. ctxLogger.WithError(err).Error("Failed to fetch fileIDs")
  269. return
  270. }
  271. ctx := context.Background()
  272. size := len(fileIDs)
  273. limit := repo.TrashBatchSize
  274. for lb := 0; lb < size; lb += limit {
  275. ub := lb + limit
  276. if ub > size {
  277. ub = size
  278. }
  279. batch := fileIDs[lb:ub]
  280. err = t.TrashRepo.Delete(ctx, userID, batch)
  281. if err != nil {
  282. ctxLogger.WithField("batchIDs", batch).WithError(err).Error("Failed while deleting batch")
  283. return
  284. }
  285. }
  286. err = t.QueueRepo.DeleteItem(repo.TrashEmptyQueue, item.Item)
  287. if err != nil {
  288. log.Error("Error while removing item from queue "+item.Item, err)
  289. return
  290. }
  291. ctxLogger.Info("Finished emptying trash")
  292. }