[server] store embedding objSize & version
This commit is contained in:
parent
7634b2c153
commit
05b0f1649f
3 changed files with 29 additions and 17 deletions
|
@ -14,6 +14,7 @@ type InsertOrUpdateEmbeddingRequest struct {
|
|||
Model string `json:"model" binding:"required"`
|
||||
EncryptedEmbedding string `json:"encryptedEmbedding" binding:"required"`
|
||||
DecryptionHeader string `json:"decryptionHeader" binding:"required"`
|
||||
Version *int `json:"version,omitempty"`
|
||||
}
|
||||
|
||||
type GetEmbeddingDiffRequest struct {
|
||||
|
@ -37,9 +38,10 @@ type GetFilesEmbeddingResponse struct {
|
|||
type Model string
|
||||
|
||||
const (
|
||||
OnnxClip Model = "onnx-clip"
|
||||
GgmlClip Model = "ggml-clip"
|
||||
OnnxYolo5MobileNet Model = "onnx-yolo5-mobile"
|
||||
OnnxClip Model = "onnx-clip"
|
||||
GgmlClip Model = "ggml-clip"
|
||||
OnnxYoloV5MobileNet Model = "onnx-yolov5-mobile"
|
||||
FileMlClipFace Model = "file-ml-clip-face"
|
||||
)
|
||||
|
||||
type EmbeddingObject struct {
|
||||
|
|
|
@ -58,19 +58,23 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req ente.InsertOrUpdateEmb
|
|||
if count < 1 {
|
||||
return nil, stacktrace.Propagate(ente.ErrNotFound, "")
|
||||
}
|
||||
version := 1
|
||||
if req.Version != nil {
|
||||
version = *req.Version
|
||||
}
|
||||
|
||||
obj := ente.EmbeddingObject{
|
||||
Version: 1,
|
||||
Version: version,
|
||||
EncryptedEmbedding: req.EncryptedEmbedding,
|
||||
DecryptionHeader: req.DecryptionHeader,
|
||||
Client: network.GetPrettyUA(ctx.GetHeader("User-Agent")) + "/" + ctx.GetHeader("X-Client-Version"),
|
||||
}
|
||||
err = c.uploadObject(obj, c.getObjectKey(userID, req.FileID, req.Model))
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
size, uploadErr := c.uploadObject(obj, c.getObjectKey(userID, req.FileID, req.Model))
|
||||
if uploadErr != nil {
|
||||
log.Error(uploadErr)
|
||||
return nil, stacktrace.Propagate(uploadErr, "")
|
||||
}
|
||||
embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req)
|
||||
embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req, size)
|
||||
if err != nil {
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
}
|
||||
|
@ -258,7 +262,8 @@ func (c *Controller) getEmbeddingObjectPrefix(userID int64, fileID int64) string
|
|||
return strconv.FormatInt(userID, 10) + "/ml-data/" + strconv.FormatInt(fileID, 10) + "/"
|
||||
}
|
||||
|
||||
func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string) error {
|
||||
// uploadObject uploads the embedding object to the object store and returns the object size
|
||||
func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string) (int, error) {
|
||||
embeddingObj, _ := json.Marshal(obj)
|
||||
uploader := s3manager.NewUploaderWithClient(c.S3Config.GetHotS3Client())
|
||||
up := s3manager.UploadInput{
|
||||
|
@ -269,10 +274,11 @@ func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string) error {
|
|||
result, err := uploader.Upload(&up)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return stacktrace.Propagate(err, "")
|
||||
return -1, stacktrace.Propagate(err, "")
|
||||
}
|
||||
|
||||
log.Infof("Uploaded to bucket %s", result.Location)
|
||||
return nil
|
||||
return len(embeddingObj), nil
|
||||
}
|
||||
|
||||
var globalFetchSemaphore = make(chan struct{}, 300)
|
||||
|
|
|
@ -19,14 +19,18 @@ type Repository struct {
|
|||
|
||||
// Create inserts a new embedding
|
||||
|
||||
func (r *Repository) InsertOrUpdate(ctx context.Context, ownerID int64, entry ente.InsertOrUpdateEmbeddingRequest) (ente.Embedding, error) {
|
||||
func (r *Repository) InsertOrUpdate(ctx context.Context, ownerID int64, entry ente.InsertOrUpdateEmbeddingRequest, size int) (ente.Embedding, error) {
|
||||
var updatedAt int64
|
||||
version := 1
|
||||
if entry.Version != nil {
|
||||
version = *entry.Version
|
||||
}
|
||||
err := r.DB.QueryRowContext(ctx, `INSERT INTO embeddings
|
||||
(file_id, owner_id, model)
|
||||
VALUES ($1, $2, $3)
|
||||
(file_id, owner_id, model, size, version)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT ON CONSTRAINT unique_embeddings_file_id_model
|
||||
DO UPDATE SET updated_at = now_utc_micro_seconds()
|
||||
RETURNING updated_at`, entry.FileID, ownerID, entry.Model).Scan(&updatedAt)
|
||||
DO UPDATE SET updated_at = now_utc_micro_seconds(), size = $4, version = $5
|
||||
RETURNING updated_at`, entry.FileID, ownerID, entry.Model, size, version).Scan(&updatedAt)
|
||||
if err != nil {
|
||||
// check if error is due to model enum invalid value
|
||||
if err.Error() == fmt.Sprintf("pq: invalid input value for enum model: \"%s\"", entry.Model) {
|
||||
|
|
Loading…
Reference in a new issue