diff --git a/server/configurations/local.yaml b/server/configurations/local.yaml index a3cb71c9d..87502c271 100644 --- a/server/configurations/local.yaml +++ b/server/configurations/local.yaml @@ -125,15 +125,15 @@ s3: endpoint: region: bucket: - wasabi-eu-central-2-embeddings: + wasabi-eu-central-2-derived: key: secret: endpoint: region: bucket: - # Embeddings bucket is used for storing embeddings and other derived data from a file. + # Derived storage bucket is used for storing derived data like embeddings, preview etc. # By default, it is the same as the hot storage bucket. - # embeddings-bucket: wasabi-eu-central-2-embeddings + # derived-storage: wasabi-eu-central-2-derived # If true, enable some workarounds to allow us to use a local minio instance # for object storage. diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 5deb32d9a..81cc059bf 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -36,35 +36,35 @@ const ( ) type Controller struct { - Repo *embedding.Repository - AccessCtrl access.Controller - ObjectCleanupController *controller.ObjectCleanupController - S3Config *s3config.S3Config - QueueRepo *repo.QueueRepository - TaskLockingRepo *repo.TaskLockRepository - FileRepo *repo.FileRepository - CollectionRepo *repo.CollectionRepository - HostName string - cleanupCronRunning bool - embeddingS3Client *s3.S3 - embeddingBucket *string - areEmbeddingAndHotBucketSame bool + Repo *embedding.Repository + AccessCtrl access.Controller + ObjectCleanupController *controller.ObjectCleanupController + S3Config *s3config.S3Config + QueueRepo *repo.QueueRepository + TaskLockingRepo *repo.TaskLockRepository + FileRepo *repo.FileRepository + CollectionRepo *repo.CollectionRepository + HostName string + cleanupCronRunning bool + derivedStorageS3Client *s3.S3 + derivedStorageBucket *string + areDerivedAndHotBucketSame bool } func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller { return &Controller{ - Repo: repo, - AccessCtrl: accessCtrl, - ObjectCleanupController: objectCleanupController, - S3Config: s3Config, - QueueRepo: queueRepo, - TaskLockingRepo: taskLockingRepo, - FileRepo: fileRepo, - CollectionRepo: collectionRepo, - HostName: hostName, - embeddingS3Client: s3Config.GetEmbeddingsS3Client(), - embeddingBucket: s3Config.GetEmbeddingsBucket(), - areEmbeddingAndHotBucketSame: s3Config.GetEmbeddingsBucket() == s3Config.GetHotBucket(), + Repo: repo, + AccessCtrl: accessCtrl, + ObjectCleanupController: objectCleanupController, + S3Config: s3Config, + QueueRepo: queueRepo, + TaskLockingRepo: taskLockingRepo, + FileRepo: fileRepo, + CollectionRepo: collectionRepo, + HostName: hostName, + derivedStorageS3Client: s3Config.GetDerivedStorageS3Client(), + derivedStorageBucket: s3Config.GetDerivedStorageBucket(), + areDerivedAndHotBucketSame: s3Config.GetDerivedStorageBucket() == s3Config.GetHotBucket(), } } @@ -265,13 +265,13 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { } prefix := c.getEmbeddingObjectPrefix(ownerID, fileID) - err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetEmbeddingsDataCenter()) + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter()) if err != nil { ctxLogger.WithError(err).Error("Failed to delete all objects") return } // if Embeddings DC is different from hot DC, delete from hot DC as well - if !c.areEmbeddingAndHotBucketSame { + if !c.areDerivedAndHotBucketSame { err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) if err != nil { ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC") @@ -305,9 +305,9 @@ func (c *Controller) getEmbeddingObjectPrefix(userID int64, fileID int64) string // uploadObject uploads the embedding object to the object store and returns the object size func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string) (int, error) { embeddingObj, _ := json.Marshal(obj) - uploader := s3manager.NewUploaderWithClient(c.embeddingS3Client) + uploader := s3manager.NewUploaderWithClient(c.derivedStorageS3Client) up := s3manager.UploadInput{ - Bucket: c.embeddingBucket, + Bucket: c.derivedStorageBucket, Key: &key, Body: bytes.NewReader(embeddingObj), } @@ -329,7 +329,7 @@ func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string) ([]ente.Em var wg sync.WaitGroup var errs []error embeddingObjects := make([]ente.EmbeddingObject, len(objectKeys)) - downloader := s3manager.NewDownloaderWithClient(c.embeddingS3Client) + downloader := s3manager.NewDownloaderWithClient(c.derivedStorageS3Client) for i, objectKey := range objectKeys { wg.Add(1) @@ -366,7 +366,7 @@ type embeddingObjectResult struct { func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows []ente.Embedding) ([]embeddingObjectResult, error) { var wg sync.WaitGroup embeddingObjects := make([]embeddingObjectResult, len(dbEmbeddingRows)) - downloader := s3manager.NewDownloaderWithClient(c.embeddingS3Client) + downloader := s3manager.NewDownloaderWithClient(c.derivedStorageS3Client) for i, dbEmbeddingRow := range dbEmbeddingRows { wg.Add(1) @@ -417,7 +417,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d cancel() return ente.EmbeddingObject{}, stacktrace.Propagate(ctx.Err(), "") default: - obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.embeddingBucket) + obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.derivedStorageBucket) cancel() // Ensure cancel is called to release resources if err == nil { if i > 0 { @@ -432,10 +432,10 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d // check if the error is due to object not found if s3Err, ok := err.(awserr.RequestFailure); ok { if s3Err.Code() == s3.ErrCodeNoSuchKey { - if c.areEmbeddingAndHotBucketSame { + if c.areDerivedAndHotBucketSame { ctxLogger.Error("Object not found: ", s3Err) } else { - // If embedding and hot bucket are different, try to copy from hot bucket + // If derived and hot bucket are different, try to copy from hot bucket copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey) if err == nil { ctxLogger.Info("Got the object from hot bucket object") @@ -473,7 +473,7 @@ func (c *Controller) downloadObject(ctx context.Context, objectKey string, downl // download the embedding object from hot bucket and upload to embeddings bucket func (c *Controller) copyEmbeddingObject(ctx context.Context, objectKey string) (*ente.EmbeddingObject, error) { - if c.embeddingBucket == c.S3Config.GetHotBucket() { + if c.derivedStorageBucket == c.S3Config.GetHotBucket() { return nil, stacktrace.Propagate(errors.New("embedding bucket and hot bucket are same"), "") } downloader := s3manager.NewDownloaderWithClient(c.S3Config.GetHotS3Client()) diff --git a/server/pkg/utils/s3config/s3config.go b/server/pkg/utils/s3config/s3config.go index 02a7fbd26..f3e22493a 100644 --- a/server/pkg/utils/s3config/s3config.go +++ b/server/pkg/utils/s3config/s3config.go @@ -28,8 +28,8 @@ type S3Config struct { hotDC string // Secondary (hot) data center secondaryHotDC string - // Bucket for storing ml embeddings & preview files - embeddingsDC string + //Derived data data center for derived files like ml embeddings & preview files + derivedStorageDC string // A map from data centers to S3 configurations s3Configs map[string]*aws.Config // A map from data centers to pre-created S3 clients @@ -102,10 +102,10 @@ func (config *S3Config) initialize() { config.secondaryHotDC = hs2 log.Infof("Hot storage: %s (secondary: %s)", hs1, hs2) } - config.embeddingsDC = config.hotDC - embeddingsDC := viper.GetString("s3.embeddings-bucket") + config.derivedStorageDC = config.hotDC + embeddingsDC := viper.GetString("s3.derived-storage") if embeddingsDC != "" && array.StringInList(embeddingsDC, dcs[:]) { - config.embeddingsDC = embeddingsDC + config.derivedStorageDC = embeddingsDC log.Infof("Embeddings bucket: %s", embeddingsDC) } @@ -180,15 +180,15 @@ func (config *S3Config) GetHotS3Client() *s3.S3 { return &s3Client } -func (config *S3Config) GetEmbeddingsDataCenter() string { - return config.embeddingsDC +func (config *S3Config) GetDerivedStorageDataCenter() string { + return config.derivedStorageDC } -func (config *S3Config) GetEmbeddingsBucket() *string { - return config.GetBucket(config.embeddingsDC) +func (config *S3Config) GetDerivedStorageBucket() *string { + return config.GetBucket(config.derivedStorageDC) } -func (config *S3Config) GetEmbeddingsS3Client() *s3.S3 { - s3Client := config.GetS3Client(config.embeddingsDC) +func (config *S3Config) GetDerivedStorageS3Client() *s3.S3 { + s3Client := config.GetS3Client(config.derivedStorageDC) return &s3Client }