Ver código fonte

[server] Add API to re-queue item for processing (#679)

## Description

## Tests
Neeraj Gupta 1 ano atrás
pai
commit
34743171f9

+ 2 - 0
server/cmd/museum/main.go

@@ -587,6 +587,7 @@ func main() {
 	privateAPI.POST("/storage-bonus/referral-claim", storageBonusHandler.ClaimReferral)
 
 	adminHandler := &api.AdminHandler{
+		QueueRepo:               queueRepo,
 		UserRepo:                userRepo,
 		CollectionRepo:          collectionRepo,
 		UserAuthRepo:            userAuthRepo,
@@ -615,6 +616,7 @@ func main() {
 	adminAPI.GET("/email-hash", adminHandler.GetEmailHash)
 	adminAPI.POST("/emails-from-hashes", adminHandler.GetEmailsFromHashes)
 	adminAPI.PUT("/user/subscription", adminHandler.UpdateSubscription)
+	adminAPI.POST("/queue/re-queue", adminHandler.ReQueueItem)
 	adminAPI.POST("/user/bf-2013", adminHandler.UpdateBFDeal)
 	adminAPI.POST("/job/clear-orphan-objects", adminHandler.ClearOrphanObjects)
 

+ 6 - 0
server/ente/admin.go

@@ -22,6 +22,12 @@ type AdminOpsForUserRequest struct {
 	UserID int64 `json:"userID" binding:"required"`
 }
 
+// ReQueueItemRequest puts an item back into the queue for processing.
+type ReQueueItemRequest struct {
+	ID        int64  `json:"id" binding:"required"`
+	QueueName string `json:"queueName" binding:"required"`
+}
+
 // RecoverAccount is used to recover accounts which are in soft-delete state.
 type RecoverAccountRequest struct {
 	UserID  int64  `json:"userID" binding:"required"`

+ 19 - 0
server/pkg/api/admin.go

@@ -33,6 +33,7 @@ import (
 
 // AdminHandler exposes request handlers for all admin related requests
 type AdminHandler struct {
+	QueueRepo               *repo.QueueRepository
 	UserRepo                *repo.UserRepository
 	CollectionRepo          *repo.CollectionRepository
 	UserAuthRepo            *repo.UserAuthRepository
@@ -305,6 +306,24 @@ func (h *AdminHandler) UpdateSubscription(c *gin.Context) {
 	c.JSON(http.StatusOK, gin.H{})
 }
 
+func (h *AdminHandler) ReQueueItem(c *gin.Context) {
+	var r ente.ReQueueItemRequest
+	if err := c.ShouldBindJSON(&r); err != nil {
+		handler.Error(c, stacktrace.Propagate(ente.ErrBadRequest, "Bad request"))
+		return
+	}
+	adminID := auth.GetUserID(c.Request.Header)
+	go h.DiscordController.NotifyAdminAction(
+		fmt.Sprintf("Admin (%d) requeueing item %d for queue: %s", adminID, r.ID, r.QueueName))
+	err := h.QueueRepo.RequeueItem(c, r.QueueName, r.ID)
+	if err != nil {
+		logrus.WithError(err).Error("Failed to re-queue item")
+		handler.Error(c, stacktrace.Propagate(err, ""))
+		return
+	}
+	c.JSON(http.StatusOK, gin.H{})
+}
+
 func (h *AdminHandler) UpdateBFDeal(c *gin.Context) {
 	var r ente.UpdateBlackFridayDeal
 	if err := c.ShouldBindJSON(&r); err != nil {

+ 17 - 0
server/pkg/repo/queue.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
+	"github.com/sirupsen/logrus"
 	"strconv"
 	"strings"
 
@@ -70,6 +71,22 @@ func (repo *QueueRepository) UpdateItem(ctx context.Context, queueName string, q
 	return nil
 }
 
+func (repo *QueueRepository) RequeueItem(ctx context.Context, queueName string, queueID int64) error {
+	rows, err := repo.DB.ExecContext(ctx, `UPDATE queue SET is_deleted = false WHERE queue_name = $1 AND queue_id = $2`, queueName, queueID)
+	if err != nil {
+		return stacktrace.Propagate(err, "")
+	}
+	count, err := rows.RowsAffected()
+	if err != nil {
+		return stacktrace.Propagate(err, "")
+	}
+	if count == 0 {
+		return fmt.Errorf("no item found with queueID: %d for queue %s", queueID, queueName)
+	}
+	logrus.Info("Re-queued %d item with queueID: %d for queue %s", count, queueID, queueName)
+	return nil
+}
+
 // AddItems adds a list of item against a specified queue
 func (repo *QueueRepository) AddItems(ctx context.Context, tx *sql.Tx, queueName string, items []string) error {
 	if len(items) == 0 {