queue.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package repo
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "github.com/sirupsen/logrus"
  7. "strconv"
  8. "strings"
  9. "github.com/ente-io/museum/pkg/utils/time"
  10. "github.com/ente-io/stacktrace"
  11. )
  12. // QueueRepository defines methods to insert, delete items from queue
  13. type QueueRepository struct {
  14. DB *sql.DB
  15. }
  16. // itemDeletionDelayInMinMap tracks the delay (in min) after which an item is ready to be processed.
  17. // -ve entry indicates that the item should be processed immediately, without any delay.
  18. var itemDeletionDelayInMinMap = map[string]int64{
  19. DropFileEncMedataQueue: -1 * 24 * 60, // -ve value to ensure attributes are immediately removed
  20. DeleteObjectQueue: 45 * 24 * 60, // 45 days in minutes
  21. DeleteEmbeddingsQueue: -1 * 24 * 60, // -ve value to ensure embeddings are immediately removed
  22. TrashCollectionQueueV3: -1 * 24 * 60, // -ve value to ensure collections are immediately marked as trashed
  23. TrashEmptyQueue: -1 * 24 * 60, // -ve value to ensure empty trash request are processed in next cron run
  24. RemoveComplianceHoldQueue: -1 * 24 * 60, // -ve value to ensure compliance hold is removed in next cron run
  25. }
  26. const (
  27. DropFileEncMedataQueue string = "dropFileEncMetata"
  28. DeleteObjectQueue string = "deleteObject"
  29. DeleteEmbeddingsQueue string = "deleteEmbedding"
  30. OutdatedObjectsQueue string = "outdatedObject"
  31. // Deprecated: Keeping it till we clean up items from the queue DB.
  32. TrashCollectionQueue string = "trashCollection"
  33. TrashCollectionQueueV3 string = "trashCollectionV3"
  34. TrashEmptyQueue string = "trashEmpty"
  35. RemoveComplianceHoldQueue string = "removeComplianceHold"
  36. BatchSize int = 30000
  37. )
  38. type QueueItem struct {
  39. Id int64
  40. Item string
  41. }
  42. // InsertItem adds entry in the queue with given queueName and item. If entry already exists, it's no-op
  43. func (repo *QueueRepository) InsertItem(ctx context.Context, queueName string, item string) error {
  44. _, err := repo.DB.ExecContext(ctx, `INSERT INTO queue(queue_name, item) VALUES($1, $2)
  45. ON CONFLICT (queue_name, item) DO NOTHING`, queueName, item)
  46. if err != nil {
  47. return stacktrace.Propagate(err, "")
  48. }
  49. return nil
  50. }
  51. func (repo *QueueRepository) UpdateItem(ctx context.Context, queueName string, queueID int64, item string) error {
  52. rows, err := repo.DB.ExecContext(ctx, `UPDATE queue SET item = $1 WHERE queue_name = $2 AND queue_id = $3 AND is_deleted=false`, item, queueName, queueID)
  53. if err != nil {
  54. return stacktrace.Propagate(err, "")
  55. }
  56. count, err := rows.RowsAffected()
  57. if err != nil {
  58. return stacktrace.Propagate(err, "")
  59. }
  60. if count == 0 {
  61. return fmt.Errorf("no item found with queueID: %d for queue %s", queueID, queueName)
  62. }
  63. return nil
  64. }
  65. func (repo *QueueRepository) RequeueItem(ctx context.Context, queueName string, queueID int64) error {
  66. rows, err := repo.DB.ExecContext(ctx, `UPDATE queue SET is_deleted = false WHERE queue_name = $1 AND queue_id = $2`, queueName, queueID)
  67. if err != nil {
  68. return stacktrace.Propagate(err, "")
  69. }
  70. count, err := rows.RowsAffected()
  71. if err != nil {
  72. return stacktrace.Propagate(err, "")
  73. }
  74. if count == 0 {
  75. return fmt.Errorf("no item found with queueID: %d for queue %s", queueID, queueName)
  76. }
  77. logrus.Infof("Re-queued %d item with queueID: %d for queue %s", count, queueID, queueName)
  78. return nil
  79. }
  80. // AddItems adds a list of item against a specified queue
  81. func (repo *QueueRepository) AddItems(ctx context.Context, tx *sql.Tx, queueName string, items []string) error {
  82. if len(items) == 0 {
  83. return nil
  84. }
  85. lb := 0
  86. size := len(items)
  87. for lb < size {
  88. ub := lb + BatchSize
  89. if ub > size {
  90. ub = size
  91. }
  92. slicedList := items[lb:ub]
  93. query := "INSERT INTO queue(queue_name, item) VALUES "
  94. var inserts []string
  95. var params []interface{}
  96. for i, v := range slicedList {
  97. inserts = append(inserts, `($`+strconv.Itoa(2*i+1)+`,$`+strconv.Itoa(2*i+2)+`)`)
  98. params = append(params, queueName, v)
  99. }
  100. queryVals := strings.Join(inserts, ",")
  101. query = query + queryVals
  102. query = query + " ON CONFLICT (queue_name, item) DO NOTHING"
  103. _, err := tx.ExecContext(ctx, query, params...)
  104. if err != nil {
  105. return stacktrace.Propagate(err, "")
  106. }
  107. lb += BatchSize
  108. }
  109. return nil
  110. }
  111. func (repo *QueueRepository) DeleteItem(queueName string, item string) error {
  112. _, err := repo.DB.Exec(`UPDATE queue SET is_deleted = $1 WHERE queue_name = $2 AND item=$3`, true, queueName, item)
  113. return stacktrace.Propagate(err, "")
  114. }
  115. // GetItemsReadyForDeletion method, for a given queue name, returns a list of QueueItem which are ready for deletion
  116. func (repo *QueueRepository) GetItemsReadyForDeletion(queueName string, count int) ([]QueueItem, error) {
  117. delayInMin, ok := itemDeletionDelayInMinMap[queueName]
  118. if !ok {
  119. return nil, stacktrace.Propagate(fmt.Errorf("missing delay for %s", queueName), "")
  120. }
  121. rows, err := repo.DB.Query(`SELECT queue_id, item FROM queue WHERE
  122. queue_name=$1 and created_at <= $2 and is_deleted = false order by created_at ASC LIMIT $3`,
  123. queueName, time.MicrosecondsBeforeMinutes(delayInMin), count)
  124. if err != nil {
  125. return nil, stacktrace.Propagate(err, "")
  126. }
  127. defer rows.Close()
  128. items := make([]QueueItem, 0)
  129. for rows.Next() {
  130. var item QueueItem
  131. err = rows.Scan(&item.Id, &item.Item)
  132. if err != nil {
  133. return items, stacktrace.Propagate(err, "")
  134. }
  135. items = append(items, item)
  136. }
  137. return items, stacktrace.Propagate(err, "")
  138. }