Delete derived data from all datacenters

This commit is contained in:
Neeraj Gupta 2024-05-16 11:33:01 +05:30
parent 64ecdfa153
commit 6e204d828c
2 changed files with 71 additions and 9 deletions

View file

@ -264,33 +264,62 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) {
return
}
prefix := c.getEmbeddingObjectPrefix(ownerID, fileID)
err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter())
datacenters, err := c.Repo.GetDatacenters(context.Background(), fileID)
if err != nil {
ctxLogger.WithError(err).Error("Failed to delete all objects")
ctxLogger.WithError(err).Error("Failed to fetch datacenters")
return
}
// if Embeddings DC is different from hot DC, delete from hot DC as well
if !c.areDerivedAndHotBucketSame {
err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter())
// Ensure that the objects are deleted from the active derived storage DC. Ideally, this section should never be executed
// unless there's a bug in storing the DC or the service restarts before removing the rows from the table
// todo:(neeraj): remove this section after a few weeks of deployment
if len(datacenters) == 0 {
ctxLogger.Warn("No datacenters found for file, ensuring deletion from derived storage and hot DC")
err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter())
if err != nil {
ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC")
ctxLogger.WithError(err).Error("Failed to delete all objects")
return
}
// if Embeddings DC is different from hot DC, delete from hot DC as well
if !c.areDerivedAndHotBucketSame {
err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter())
if err != nil {
ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC")
return
}
}
} else {
ctxLogger.Info("Deleting from all datacenters %v", datacenters)
}
for i := range datacenters {
err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, datacenters[i])
if err != nil {
ctxLogger.WithError(err).Errorf("Failed to delete all objects from %s", datacenters[i])
return
} else {
removeErr := c.Repo.RemoveDatacenter(context.Background(), fileID, datacenters[i])
if removeErr != nil {
ctxLogger.WithError(removeErr).Error("Failed to remove datacenter from db")
return
}
}
}
noDcs, noDcErr := c.Repo.GetDatacenters(context.Background(), fileID)
if len(noDcs) > 0 || noDcErr != nil {
ctxLogger.Errorf("Failed to delete from all datacenters %s", noDcs)
return
}
err = c.Repo.Delete(fileID)
if err != nil {
ctxLogger.WithError(err).Error("Failed to remove from db")
return
}
err = c.QueueRepo.DeleteItem(repo.DeleteEmbeddingsQueue, qItem.Item)
if err != nil {
ctxLogger.WithError(err).Error("Failed to remove item from the queue")
return
}
ctxLogger.Info("Successfully deleted all embeddings")
}

View file

@ -93,6 +93,39 @@ func (r *Repository) Delete(fileID int64) error {
return nil
}
// GetDatacenters returns the unique list of datacenters where derived embeddings
// for the given file are stored.
//
// It reads the datacenters array column of every embeddings row whose file_id
// matches fileID and de-duplicates the values across rows. The order of the
// returned slice is unspecified because it is built by iterating a map. When no
// row matches, an empty (non-nil) slice and a nil error are returned.
//
// An error is returned if the query fails, if a row cannot be scanned, or if
// iterating over the result set ends with an error.
func (r *Repository) GetDatacenters(ctx context.Context, fileID int64) ([]string, error) {
	rows, err := r.DB.QueryContext(ctx, `SELECT datacenters FROM embeddings WHERE file_id = $1`, fileID)
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	// Release the result set on every return path, including the early return
	// on a scan failure below. The Close error is deliberately ignored: it is
	// not actionable here, and iteration failures are reported via rows.Err().
	defer func() { _ = rows.Close() }()

	uniqueDatacenters := make(map[string]struct{})
	for rows.Next() {
		var datacenters []string
		if err = rows.Scan(pq.Array(&datacenters)); err != nil {
			return nil, stacktrace.Propagate(err, "")
		}
		for _, dc := range datacenters {
			uniqueDatacenters[dc] = struct{}{}
		}
	}
	// rows.Next() returning false can mean either "no more rows" or "iteration
	// failed". Surface the latter so callers never mistake a partial or empty
	// result for the complete list of datacenters.
	if err = rows.Err(); err != nil {
		return nil, stacktrace.Propagate(err, "")
	}

	datacenters := make([]string, 0, len(uniqueDatacenters))
	for dc := range uniqueDatacenters {
		datacenters = append(datacenters, dc)
	}
	return datacenters, nil
}
// RemoveDatacenter drops dc from the datacenters array of the embeddings rows
// whose file_id equals fileID, using PostgreSQL's array_remove.
//
// It returns the wrapped database error if the UPDATE statement fails, and nil
// otherwise. The number of affected rows is not inspected.
func (r *Repository) RemoveDatacenter(ctx context.Context, fileID int64, dc string) error {
	const removeQuery = `UPDATE embeddings SET datacenters = array_remove(datacenters, $1) WHERE file_id = $2`

	if _, execErr := r.DB.ExecContext(ctx, removeQuery, dc, fileID); execErr != nil {
		return stacktrace.Propagate(execErr, "")
	}
	return nil
}
func convertRowsToEmbeddings(rows *sql.Rows) ([]ente.Embedding, error) {
defer func() {
if err := rows.Close(); err != nil {