delete.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package embedding
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/ente-io/museum/pkg/repo"
  6. "github.com/ente-io/museum/pkg/utils/auth"
  7. "github.com/ente-io/museum/pkg/utils/time"
  8. "github.com/ente-io/stacktrace"
  9. "github.com/gin-gonic/gin"
  10. log "github.com/sirupsen/logrus"
  11. "strconv"
  12. )
  13. func (c *Controller) DeleteAll(ctx *gin.Context) error {
  14. userID := auth.GetUserID(ctx.Request.Header)
  15. err := c.Repo.DeleteAll(ctx, userID)
  16. if err != nil {
  17. return stacktrace.Propagate(err, "")
  18. }
  19. return nil
  20. }
  21. // CleanupDeletedEmbeddings clears all embeddings for deleted files from the object store
  22. func (c *Controller) CleanupDeletedEmbeddings() {
  23. log.Info("Cleaning up deleted embeddings")
  24. if c.cleanupCronRunning {
  25. log.Info("Skipping CleanupDeletedEmbeddings cron run as another instance is still running")
  26. return
  27. }
  28. c.cleanupCronRunning = true
  29. defer func() {
  30. c.cleanupCronRunning = false
  31. }()
  32. items, err := c.QueueRepo.GetItemsReadyForDeletion(repo.DeleteEmbeddingsQueue, 200)
  33. if err != nil {
  34. log.WithError(err).Error("Failed to fetch items from queue")
  35. return
  36. }
  37. for _, i := range items {
  38. c.deleteEmbedding(i)
  39. }
  40. }
  41. func (c *Controller) deleteEmbedding(qItem repo.QueueItem) {
  42. lockName := fmt.Sprintf("Embedding:%s", qItem.Item)
  43. lockStatus, err := c.TaskLockingRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), c.HostName)
  44. ctxLogger := log.WithField("item", qItem.Item).WithField("queue_id", qItem.Id)
  45. if err != nil || !lockStatus {
  46. ctxLogger.Warn("unable to acquire lock")
  47. return
  48. }
  49. defer func() {
  50. err = c.TaskLockingRepo.ReleaseLock(lockName)
  51. if err != nil {
  52. ctxLogger.Errorf("Error while releasing lock %s", err)
  53. }
  54. }()
  55. ctxLogger.Info("Deleting all embeddings")
  56. fileID, _ := strconv.ParseInt(qItem.Item, 10, 64)
  57. ownerID, err := c.FileRepo.GetOwnerID(fileID)
  58. if err != nil {
  59. ctxLogger.WithError(err).Error("Failed to fetch ownerID")
  60. return
  61. }
  62. prefix := c.getEmbeddingObjectPrefix(ownerID, fileID)
  63. datacenters, err := c.Repo.GetDatacenters(context.Background(), fileID)
  64. if err != nil {
  65. ctxLogger.WithError(err).Error("Failed to fetch datacenters")
  66. return
  67. }
  68. ctxLogger.Infof("Deleting from all datacenters %v", datacenters)
  69. for i := range datacenters {
  70. dc := datacenters[i]
  71. err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, dc)
  72. if err != nil {
  73. ctxLogger.WithError(err).
  74. WithField("dc", dc).
  75. Errorf("Failed to delete all objects from %s", datacenters[i])
  76. return
  77. } else {
  78. removeErr := c.Repo.RemoveDatacenter(context.Background(), fileID, datacenters[i])
  79. if removeErr != nil {
  80. ctxLogger.WithError(removeErr).
  81. WithField("dc", dc).
  82. Error("Failed to remove datacenter from db")
  83. return
  84. }
  85. }
  86. }
  87. noDcs, noDcErr := c.Repo.GetDatacenters(context.Background(), fileID)
  88. if len(noDcs) > 0 || noDcErr != nil {
  89. ctxLogger.Errorf("Failed to delete from all datacenters %s", noDcs)
  90. return
  91. }
  92. err = c.Repo.Delete(fileID)
  93. if err != nil {
  94. ctxLogger.WithError(err).Error("Failed to remove from db")
  95. return
  96. }
  97. err = c.QueueRepo.DeleteItem(repo.DeleteEmbeddingsQueue, qItem.Item)
  98. if err != nil {
  99. ctxLogger.WithError(err).Error("Failed to remove item from the queue")
  100. return
  101. }
  102. ctxLogger.Info("Successfully deleted all embeddings")
  103. }