delete.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package embedding
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/ente-io/museum/pkg/repo"
  6. "github.com/ente-io/museum/pkg/utils/auth"
  7. "github.com/ente-io/museum/pkg/utils/time"
  8. "github.com/ente-io/stacktrace"
  9. "github.com/gin-gonic/gin"
  10. log "github.com/sirupsen/logrus"
  11. "strconv"
  12. )
  13. func (c *Controller) DeleteAll(ctx *gin.Context) error {
  14. userID := auth.GetUserID(ctx.Request.Header)
  15. err := c.Repo.DeleteAll(ctx, userID)
  16. if err != nil {
  17. return stacktrace.Propagate(err, "")
  18. }
  19. return nil
  20. }
  21. // CleanupDeletedEmbeddings clears all embeddings for deleted files from the object store
  22. func (c *Controller) CleanupDeletedEmbeddings() {
  23. log.Info("Cleaning up deleted embeddings")
  24. if c.cleanupCronRunning {
  25. log.Info("Skipping CleanupDeletedEmbeddings cron run as another instance is still running")
  26. return
  27. }
  28. c.cleanupCronRunning = true
  29. defer func() {
  30. c.cleanupCronRunning = false
  31. }()
  32. items, err := c.QueueRepo.GetItemsReadyForDeletion(repo.DeleteEmbeddingsQueue, 200)
  33. if err != nil {
  34. log.WithError(err).Error("Failed to fetch items from queue")
  35. return
  36. }
  37. for _, i := range items {
  38. c.deleteEmbedding(i)
  39. }
  40. }
  41. func (c *Controller) deleteEmbedding(qItem repo.QueueItem) {
  42. lockName := fmt.Sprintf("Embedding:%s", qItem.Item)
  43. lockStatus, err := c.TaskLockingRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), c.HostName)
  44. ctxLogger := log.WithField("item", qItem.Item).WithField("queue_id", qItem.Id)
  45. if err != nil || !lockStatus {
  46. ctxLogger.Warn("unable to acquire lock")
  47. return
  48. }
  49. defer func() {
  50. err = c.TaskLockingRepo.ReleaseLock(lockName)
  51. if err != nil {
  52. ctxLogger.Errorf("Error while releasing lock %s", err)
  53. }
  54. }()
  55. ctxLogger.Info("Deleting all embeddings")
  56. fileID, _ := strconv.ParseInt(qItem.Item, 10, 64)
  57. ownerID, err := c.FileRepo.GetOwnerID(fileID)
  58. if err != nil {
  59. ctxLogger.WithError(err).Error("Failed to fetch ownerID")
  60. return
  61. }
  62. prefix := c.getEmbeddingObjectPrefix(ownerID, fileID)
  63. datacenters, err := c.Repo.GetDatacenters(context.Background(), fileID)
  64. if err != nil {
  65. ctxLogger.WithError(err).Error("Failed to fetch datacenters")
  66. return
  67. }
  68. // Ensure that the object are deleted from active derived storage dc. Ideally, this section should never be executed
  69. // unless there's a bug in storing the DC or the service restarts before removing the rows from the table
  70. // todo:(neeraj): remove this section after a few weeks of deployment
  71. if len(datacenters) == 0 {
  72. ctxLogger.Warn("No datacenters found for file, ensuring deletion from derived storage and hot DC")
  73. err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter())
  74. if err != nil {
  75. ctxLogger.WithError(err).Error("Failed to delete all objects")
  76. return
  77. }
  78. // if Derived DC is different from hot DC, delete from hot DC as well
  79. if c.derivedStorageDataCenter != c.S3Config.GetHotDataCenter() {
  80. err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter())
  81. if err != nil {
  82. ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC")
  83. return
  84. }
  85. }
  86. } else {
  87. ctxLogger.Infof("Deleting from all datacenters %v", datacenters)
  88. }
  89. for i := range datacenters {
  90. err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, datacenters[i])
  91. if err != nil {
  92. ctxLogger.WithError(err).Errorf("Failed to delete all objects from %s", datacenters[i])
  93. return
  94. } else {
  95. removeErr := c.Repo.RemoveDatacenter(context.Background(), fileID, datacenters[i])
  96. if removeErr != nil {
  97. ctxLogger.WithError(removeErr).Error("Failed to remove datacenter from db")
  98. return
  99. }
  100. }
  101. }
  102. noDcs, noDcErr := c.Repo.GetDatacenters(context.Background(), fileID)
  103. if len(noDcs) > 0 || noDcErr != nil {
  104. ctxLogger.Errorf("Failed to delete from all datacenters %s", noDcs)
  105. return
  106. }
  107. err = c.Repo.Delete(fileID)
  108. if err != nil {
  109. ctxLogger.WithError(err).Error("Failed to remove from db")
  110. return
  111. }
  112. err = c.QueueRepo.DeleteItem(repo.DeleteEmbeddingsQueue, qItem.Item)
  113. if err != nil {
  114. ctxLogger.WithError(err).Error("Failed to remove item from the queue")
  115. return
  116. }
  117. ctxLogger.Info("Successfully deleted all embeddings")
  118. }