Store dc during insert or update

This commit is contained in:
Neeraj Gupta 2024-05-16 10:58:00 +05:30
parent da188aa753
commit 64ecdfa153
2 changed files with 20 additions and 9 deletions

View file

@ -103,7 +103,7 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req ente.InsertOrUpdateEmb
log.Error(uploadErr)
return nil, stacktrace.Propagate(uploadErr, "")
}
embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req, size, version)
embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req, size, version, c.S3Config.GetDerivedStorageDataCenter())
embedding.Version = &version
if err != nil {
return nil, stacktrace.Propagate(err, "")

View file

@ -18,15 +18,26 @@ type Repository struct {
}
// Create inserts a new embedding
func (r *Repository) InsertOrUpdate(ctx context.Context, ownerID int64, entry ente.InsertOrUpdateEmbeddingRequest, size int, version int) (ente.Embedding, error) {
func (r *Repository) InsertOrUpdate(ctx context.Context, ownerID int64, entry ente.InsertOrUpdateEmbeddingRequest, size int, version int, dc string) (ente.Embedding, error) {
var updatedAt int64
err := r.DB.QueryRowContext(ctx, `INSERT INTO embeddings
(file_id, owner_id, model, size, version)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT ON CONSTRAINT unique_embeddings_file_id_model
DO UPDATE SET updated_at = now_utc_micro_seconds(), size = $4, version = $5
RETURNING updated_at`, entry.FileID, ownerID, entry.Model, size, version).Scan(&updatedAt)
err := r.DB.QueryRowContext(ctx, `
INSERT INTO embeddings
(file_id, owner_id, model, size, version, datacenters)
VALUES
($1, $2, $3, $4, $5, ARRAY[$6]::s3region[])
ON CONFLICT ON CONSTRAINT unique_embeddings_file_id_model
DO UPDATE
SET
updated_at = now_utc_micro_seconds(),
size = $4,
version = $5,
datacenters = CASE
WHEN $6 = ANY(COALESCE(embeddings.datacenters, ARRAY['b2-eu-cen']::s3region[])) THEN embeddings.datacenters
ELSE array_append(COALESCE(embeddings.datacenters, ARRAY['b2-eu-cen']::s3region[]), $6::s3region)
END
RETURNING updated_at`,
entry.FileID, ownerID, entry.Model, size, version, dc).Scan(&updatedAt)
if err != nil {
// check if error is due to model enum invalid value
if err.Error() == fmt.Sprintf("pq: invalid input value for enum model: \"%s\"", entry.Model) {