Query DB to get fallback DC
This commit is contained in:
parent
e33d85412c
commit
89b01f0a39
3 changed files with 61 additions and 16 deletions
|
@ -61,7 +61,7 @@ type Controller struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller {
|
func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller {
|
||||||
embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetDerivedStorageDataCenter()}
|
embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetWasabiDerivedDC(), s3Config.GetDerivedStorageDataCenter()}
|
||||||
cache := make(map[string]*s3manager.Downloader, len(embeddingDcs))
|
cache := make(map[string]*s3manager.Downloader, len(embeddingDcs))
|
||||||
for i := range embeddingDcs {
|
for i := range embeddingDcs {
|
||||||
s3Client := s3Config.GetS3Client(embeddingDcs[i])
|
s3Client := s3Config.GetS3Client(embeddingDcs[i])
|
||||||
|
@ -369,22 +369,37 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d
|
||||||
// check if the error is due to object not found
|
// check if the error is due to object not found
|
||||||
if s3Err, ok := err.(awserr.RequestFailure); ok {
|
if s3Err, ok := err.(awserr.RequestFailure); ok {
|
||||||
if s3Err.Code() == s3.ErrCodeNoSuchKey {
|
if s3Err.Code() == s3.ErrCodeNoSuchKey {
|
||||||
|
var srcDc, destDc string
|
||||||
|
destDc = c.S3Config.GetDerivedStorageDataCenter()
|
||||||
|
// todo:(neeraj) Refactor this later to get available the DC from the DB instead of
|
||||||
|
// querying the DB. This will help in case of multiple DCs and avoid querying the DB
|
||||||
|
// for each object.
|
||||||
|
// For initial migration, as we know that original DC was b2, and if the embedding is not found
|
||||||
|
// in the new derived DC, we can try to fetch it from the B2 DC.
|
||||||
if c.derivedStorageDataCenter != c.S3Config.GetHotBackblazeDC() {
|
if c.derivedStorageDataCenter != c.S3Config.GetHotBackblazeDC() {
|
||||||
// todo:(neeraj) Refactor this later to get available the DC from the DB and use that to
|
// embeddings ideally should ideally be in the default hot bucket b2
|
||||||
// copy the object to currently active DC for derived storage
|
srcDc = c.S3Config.GetHotBackblazeDC()
|
||||||
// If derived and hot bucket are different, try to copy from hot bucket
|
} else {
|
||||||
copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey, c.S3Config.GetHotBackblazeDC(), c.derivedStorageDataCenter)
|
_, modelName, fileID := c.getEmbeddingObjectDetails(objectKey)
|
||||||
|
activeDcs, err := c.Repo.GetOtherDCsForFileAndModel(context.Background(), fileID, modelName, c.derivedStorageDataCenter)
|
||||||
|
if err != nil {
|
||||||
|
return ente.EmbeddingObject{}, stacktrace.Propagate(err, "failed to get other dc")
|
||||||
|
}
|
||||||
|
if len(activeDcs) > 0 {
|
||||||
|
srcDc = activeDcs[0]
|
||||||
|
} else {
|
||||||
|
ctxLogger.Error("Object not found in any dc ", s3Err)
|
||||||
|
return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey, srcDc, destDc)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
ctxLogger.Info("Got the object from hot bucket object")
|
ctxLogger.Infof("Got object from dc %s", srcDc)
|
||||||
return *copyEmbeddingObject, nil
|
return *copyEmbeddingObject, nil
|
||||||
} else {
|
} else {
|
||||||
ctxLogger.WithError(err).Error("Failed to copy from hot bucket object")
|
ctxLogger.WithError(err).Errorf("Failed to get object from fallback dc %s", srcDc)
|
||||||
}
|
}
|
||||||
return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "")
|
return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "")
|
||||||
} else {
|
|
||||||
ctxLogger.Error("Object not found: ", s3Err)
|
|
||||||
return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ctxLogger.Error("Failed to fetch object: ", err)
|
ctxLogger.Error("Failed to fetch object: ", err)
|
||||||
|
|
|
@ -5,10 +5,9 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/lib/pq"
|
|
||||||
|
|
||||||
"github.com/ente-io/museum/ente"
|
"github.com/ente-io/museum/ente"
|
||||||
"github.com/ente-io/stacktrace"
|
"github.com/ente-io/stacktrace"
|
||||||
|
"github.com/lib/pq"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -118,6 +117,33 @@ func (r *Repository) GetDatacenters(ctx context.Context, fileID int64) ([]string
|
||||||
return datacenters, nil
|
return datacenters, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetOtherDCsForFileAndModel returns the list of datacenters where the embeddings are stored for a given file and model, excluding the ignoredDC
|
||||||
|
func (r *Repository) GetOtherDCsForFileAndModel(ctx context.Context, fileID int64, model string, ignoredDC string) ([]string, error) {
|
||||||
|
rows, err := r.DB.QueryContext(ctx, `SELECT datacenters FROM embeddings WHERE file_id = $1 AND model = $2`, fileID, model)
|
||||||
|
if err != nil {
|
||||||
|
return nil, stacktrace.Propagate(err, "")
|
||||||
|
}
|
||||||
|
uniqueDatacenters := make(map[string]bool)
|
||||||
|
for rows.Next() {
|
||||||
|
var datacenters []string
|
||||||
|
err = rows.Scan(pq.Array(&datacenters))
|
||||||
|
if err != nil {
|
||||||
|
return nil, stacktrace.Propagate(err, "")
|
||||||
|
}
|
||||||
|
for _, dc := range datacenters {
|
||||||
|
// add to uniqueDatacenters if it is not the ignoredDC
|
||||||
|
if dc != ignoredDC {
|
||||||
|
uniqueDatacenters[dc] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
datacenters := make([]string, 0, len(uniqueDatacenters))
|
||||||
|
for dc := range uniqueDatacenters {
|
||||||
|
datacenters = append(datacenters, dc)
|
||||||
|
}
|
||||||
|
return datacenters, nil
|
||||||
|
}
|
||||||
|
|
||||||
// RemoveDatacenter removes the given datacenter from the list of datacenters
|
// RemoveDatacenter removes the given datacenter from the list of datacenters
|
||||||
func (r *Repository) RemoveDatacenter(ctx context.Context, fileID int64, dc string) error {
|
func (r *Repository) RemoveDatacenter(ctx context.Context, fileID int64, dc string) error {
|
||||||
_, err := r.DB.ExecContext(ctx, `UPDATE embeddings SET datacenters = array_remove(datacenters, $1) WHERE file_id = $2`, dc, fileID)
|
_, err := r.DB.ExecContext(ctx, `UPDATE embeddings SET datacenters = array_remove(datacenters, $1) WHERE file_id = $2`, dc, fileID)
|
||||||
|
|
|
@ -202,6 +202,10 @@ func (config *S3Config) GetHotWasabiDC() string {
|
||||||
return dcWasabiEuropeCentral_v3
|
return dcWasabiEuropeCentral_v3
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (config *S3Config) GetWasabiDerivedDC() string {
|
||||||
|
return dcWasabiEuropeCentralDerived
|
||||||
|
}
|
||||||
|
|
||||||
// Return the name of the cold Scaleway data center
|
// Return the name of the cold Scaleway data center
|
||||||
func (config *S3Config) GetColdScalewayDC() string {
|
func (config *S3Config) GetColdScalewayDC() string {
|
||||||
return dcSCWEuropeFrance_v3
|
return dcSCWEuropeFrance_v3
|
||||||
|
|
Loading…
Reference in a new issue