trash_file_metadata.go 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package controller
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "github.com/ente-io/museum/pkg/repo"
  7. "github.com/ente-io/museum/pkg/utils/time"
  8. log "github.com/sirupsen/logrus"
  9. )
  10. // DropFileMetadataCron removes the metadata for deleted files
  11. func (t *TrashController) DropFileMetadataCron() {
  12. ctx := context.Background()
  13. lockName := "dropTrashedFileMetadata"
  14. logger := log.WithField("cron", lockName)
  15. if t.dropFileMetadataRunning {
  16. logger.Info("already running")
  17. return
  18. }
  19. t.dropFileMetadataRunning = true
  20. defer func() {
  21. t.dropFileMetadataRunning = false
  22. }()
  23. lockStatus, err := t.TaskLockRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), t.HostName)
  24. if err != nil || !lockStatus {
  25. logger.Error("Unable to acquire lock")
  26. return
  27. }
  28. defer func() {
  29. releaseErr := t.TaskLockRepo.ReleaseLock(lockName)
  30. if releaseErr != nil {
  31. logger.WithError(releaseErr).Error("Error while releasing lock")
  32. }
  33. }()
  34. items, err := t.QueueRepo.GetItemsReadyForDeletion(repo.DropFileEncMedataQueue, 10)
  35. if err != nil {
  36. logger.WithError(err).Error("getItemsReadyForDeletion failed")
  37. return
  38. }
  39. if len(items) == 0 {
  40. logger.Info("add entry for dropping fileMetadata")
  41. // insert entry with 0 as the last epochTime till when metadata is dropped.
  42. err = t.QueueRepo.InsertItem(context.Background(), repo.DropFileEncMedataQueue, "0")
  43. if err != nil {
  44. logger.WithError(err).Error("failed to insert entry")
  45. }
  46. return
  47. }
  48. if len(items) > 1 {
  49. logger.Error(fmt.Sprintf("queue %s should not have more than one entry", repo.DropFileEncMedataQueue))
  50. }
  51. qItem := items[0]
  52. droppedMetadataTill, parseErr := strconv.ParseInt(qItem.Item, 10, 64)
  53. if parseErr != nil {
  54. logger.WithError(parseErr).Error("failed to parse time")
  55. return
  56. }
  57. fileIDsWithUpdatedAt, err := t.TrashRepo.GetFileIdsForDroppingMetadata(droppedMetadataTill)
  58. if err != nil {
  59. logger.Error("error during next items fetch", err)
  60. return
  61. }
  62. if len(fileIDsWithUpdatedAt) == 0 {
  63. logger.Info("no pending entry")
  64. return
  65. }
  66. var maxUpdatedAt = int64(0)
  67. fileIDs := make([]int64, 0)
  68. for _, item := range fileIDsWithUpdatedAt {
  69. fileIDs = append(fileIDs, item.FileID)
  70. if item.UpdatedAt > maxUpdatedAt {
  71. maxUpdatedAt = item.UpdatedAt
  72. }
  73. }
  74. ctxLogger := logger.WithFields(log.Fields{
  75. "maxUpdatedAt": maxUpdatedAt,
  76. "fileIds": fileIDs,
  77. })
  78. ctxLogger.Info("start dropping metadata")
  79. err = t.FileRepo.DropFilesMetadata(ctx, fileIDs)
  80. if err != nil {
  81. ctxLogger.WithError(err).Error("failed to scrub data")
  82. return
  83. }
  84. updateErr := t.QueueRepo.UpdateItem(ctx, repo.DropFileEncMedataQueue, qItem.Id, strconv.FormatInt(maxUpdatedAt, 10))
  85. if updateErr != nil {
  86. ctxLogger.WithError(updateErr).Error("failed to update queueItem")
  87. return
  88. }
  89. ctxLogger.Info("successfully dropped metadata")
  90. }