controller.go 6.9 KB


  1. package data_cleanup
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/ente-io/museum/ente"
  7. entity "github.com/ente-io/museum/ente/data_cleanup"
  8. "github.com/ente-io/museum/pkg/repo"
  9. "github.com/ente-io/museum/pkg/repo/datacleanup"
  10. "github.com/ente-io/museum/pkg/utils/time"
  11. "github.com/ente-io/stacktrace"
  12. log "github.com/sirupsen/logrus"
  13. )
  14. type DeleteUserCleanupController struct {
  15. Repo *datacleanup.Repository
  16. UserRepo *repo.UserRepository
  17. CollectionRepo *repo.CollectionRepository
  18. TaskLockRepo *repo.TaskLockRepository
  19. TrashRepo *repo.TrashRepository
  20. UsageRepo *repo.UsageRepository
  21. running bool
  22. HostName string
  23. }
  // nextStageDelayInHoursOnError is the number of hours to wait before the current
  // stage is attempted again after it fails.
  const nextStageDelayInHoursOnError = 2

  // maxStorageCheckAttempt is the attempt count at which the storage check stops waiting:
  // once StageAttemptCount reaches it and the trash is empty, the cleanup is completed
  // even if usage is still non-zero.
  const maxStorageCheckAttempt = 10
  31. // DeleteDataCron delete trashed files which are in trash since repo.TrashDurationInDays
  32. func (c *DeleteUserCleanupController) DeleteDataCron() {
  33. if c.running {
  34. log.Info("Already running DeleteDataCron, skipping cron")
  35. return
  36. }
  37. c.running = true
  38. defer func() {
  39. c.running = false
  40. }()
  41. ctx := context.Background()
  42. items, err := c.Repo.GetItemsPendingCompletion(ctx, 100)
  43. if err != nil {
  44. log.WithError(err).Info("Failed to get items for cleanup")
  45. return
  46. }
  47. if len(items) > 0 {
  48. log.WithField("count", len(items)).Info("Found pending items")
  49. for _, item := range items {
  50. c.deleteUserData(ctx, item)
  51. }
  52. }
  53. }
  54. func (c *DeleteUserCleanupController) deleteUserData(ctx context.Context, item *entity.DataCleanup) {
  55. logger := log.WithFields(log.Fields{
  56. "user_id": item.UserID,
  57. "stage": item.Stage,
  58. "attempt_count": item.StageAttemptCount,
  59. "flow": "delete_user_data",
  60. })
  61. lockName := fmt.Sprintf("delete_user_data-%d", item.UserID)
  62. lockStatus, err := c.TaskLockRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), c.HostName)
  63. if err != nil || !lockStatus {
  64. if err != nil {
  65. logger.Error("error while acquiring lock")
  66. } else {
  67. logger.Warn("lock is already head by another instance")
  68. }
  69. return
  70. }
  71. defer func() {
  72. releaseErr := c.TaskLockRepo.ReleaseLock(lockName)
  73. if releaseErr != nil {
  74. logger.WithError(releaseErr).Error("Error while releasing lock")
  75. }
  76. }()
  77. logger.Info(fmt.Sprintf("Delete data for stage %s", item.Stage))
  78. switch item.Stage {
  79. case entity.Scheduled:
  80. err = c.startCleanup(ctx, item)
  81. case entity.Collection:
  82. err = c.deleteCollections(ctx, item)
  83. case entity.Trash:
  84. err = c.emptyTrash(ctx, item)
  85. case entity.Storage:
  86. err = c.storageCheck(ctx, item)
  87. default:
  88. err = fmt.Errorf("unexpected stage %s", item.Stage)
  89. }
  90. if err != nil {
  91. logger.WithError(err).Error("error while processing data deletion")
  92. err2 := c.Repo.ScheduleNextAttemptAfterNHours(ctx, item.UserID, nextStageDelayInHoursOnError)
  93. if err2 != nil {
  94. logger.Error(err)
  95. return
  96. }
  97. }
  98. }
  99. // startClean up will just verify that user
  100. func (c *DeleteUserCleanupController) startCleanup(ctx context.Context, item *entity.DataCleanup) error {
  101. if err := c.isDeleted(item); err != nil {
  102. return stacktrace.Propagate(err, "")
  103. }
  104. // move to next stage for deleting collection
  105. return c.Repo.MoveToNextStage(ctx, item.UserID, entity.Collection, time.Microseconds())
  106. }
  107. // deleteCollection will schedule all the collections for deletion and queue up Trash stage to run after 30 min
  108. func (c *DeleteUserCleanupController) deleteCollections(ctx context.Context, item *entity.DataCleanup) error {
  109. collectionsMap, err := c.CollectionRepo.GetCollectionIDsOwnedByUser(item.UserID)
  110. if err != nil {
  111. return stacktrace.Propagate(err, "")
  112. }
  113. for collectionID, isAlreadyDeleted := range collectionsMap {
  114. if !isAlreadyDeleted {
  115. // Delete all files in the collection
  116. err = c.CollectionRepo.ScheduleDelete(collectionID)
  117. if err != nil {
  118. return stacktrace.Propagate(err, fmt.Sprintf("error while deleting collection %d", collectionID))
  119. }
  120. }
  121. }
  122. /* todo: neeraj : verify that all collection delete request are processed before moving to empty trash stage.
  123. */
  124. return c.Repo.MoveToNextStage(ctx, item.UserID, entity.Trash, time.MicrosecondsAfterMinutes(60))
  125. }
  126. func (c *DeleteUserCleanupController) emptyTrash(ctx context.Context, item *entity.DataCleanup) error {
  127. err := c.TrashRepo.EmptyTrash(ctx, item.UserID, time.Microseconds())
  128. if err != nil {
  129. return stacktrace.Propagate(err, "")
  130. }
  131. // schedule storage consumed check for the user after 60min. Trash should ideally get emptied after 60 min
  132. return c.Repo.MoveToNextStage(ctx, item.UserID, entity.Storage, time.MicrosecondsAfterMinutes(60))
  133. }
  134. func (c *DeleteUserCleanupController) completeCleanup(ctx context.Context, item *entity.DataCleanup) error {
  135. err := c.Repo.DeleteTableData(ctx, item.UserID)
  136. if err != nil {
  137. return stacktrace.Propagate(err, "failed to delete table data for user")
  138. }
  139. return c.Repo.MoveToNextStage(ctx, item.UserID, entity.Completed, time.Microseconds())
  140. }
  141. // storageCheck validates that user's usage is zero after all collections are deleted and trashed files are processed.
  142. // This check act as another data-integrity check for our db. If even after multiple attempts, storage is still not zero
  143. // we mark the clean-up as done.
  144. func (c *DeleteUserCleanupController) storageCheck(ctx context.Context, item *entity.DataCleanup) error {
  145. usage, err := c.UsageRepo.GetUsage(item.UserID)
  146. if err != nil {
  147. return stacktrace.Propagate(err, "")
  148. }
  149. if usage != 0 {
  150. // check if trash still has entry
  151. timeStamp, err2 := c.TrashRepo.GetTimeStampForLatestNonDeletedEntry(item.UserID)
  152. if err2 != nil {
  153. return stacktrace.Propagate(err2, "failed to fetch timestamp")
  154. }
  155. // no entry in trash
  156. if timeStamp != nil {
  157. log.WithFields(log.Fields{
  158. "user_id": item.UserID,
  159. "flow": "delete_user_data",
  160. "timeStamp": timeStamp,
  161. }).Info("trash is not empty")
  162. err = c.TrashRepo.EmptyTrash(ctx, item.UserID, *timeStamp)
  163. if err != nil {
  164. return stacktrace.Propagate(err, "")
  165. }
  166. } else if item.StageAttemptCount >= maxStorageCheckAttempt {
  167. // Note: if storage is still not zero after maxStorageCheckAttempt attempts and trash is empty, mark the clean-up as done
  168. return c.completeCleanup(ctx, item)
  169. }
  170. return fmt.Errorf("storage consumed is not zero: %d", usage)
  171. }
  172. return c.completeCleanup(ctx, item)
  173. }
  174. func (c *DeleteUserCleanupController) isDeleted(item *entity.DataCleanup) error {
  175. u, err := c.UserRepo.Get(item.UserID)
  176. if err == nil {
  177. // user is not deleted, double check by verifying email is not empty
  178. if u.Email != "" {
  179. remErr := c.Repo.RemoveScheduledDelete(context.Background(), item.UserID)
  180. if remErr != nil {
  181. return stacktrace.Propagate(remErr, "failed to remove scheduled delete entry")
  182. }
  183. }
  184. return stacktrace.Propagate(ente.NewBadRequestWithMessage("User ID is linked to undeleted account"), "")
  185. }
  186. if !errors.Is(err, ente.ErrUserDeleted) {
  187. return stacktrace.Propagate(err, "error while getting the user")
  188. }
  189. return nil
  190. }