213 lines
7.5 KiB
Go
213 lines
7.5 KiB
Go
package repo
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"math/rand"
|
|
"strconv"
|
|
|
|
"github.com/ente-io/museum/ente"
|
|
"github.com/ente-io/stacktrace"
|
|
"github.com/lib/pq"
|
|
)
|
|
|
|
type ObjectRepository struct {
|
|
DB *sql.DB
|
|
QueueRepo *QueueRepository
|
|
}
|
|
|
|
func (repo *ObjectRepository) GetObjectsMissingInDC(dc string, limit int, random bool) ([]ente.S3ObjectKey, error) {
|
|
rows, err := repo.DB.Query(`SELECT file_id, o_type, object_key, size FROM object_keys
|
|
WHERE is_deleted = false AND NOT($1 = ANY(datacenters)) limit $2`, dc, limit)
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
files, err := convertRowsToObjectKeys(rows)
|
|
if err != nil {
|
|
return files, stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
if random && files != nil && len(files) > 0 {
|
|
rand.Shuffle(len(files), func(i, j int) { files[i], files[j] = files[j], files[i] })
|
|
}
|
|
|
|
return files, nil
|
|
}
|
|
|
|
func (repo *ObjectRepository) MarkObjectReplicated(objectKey string, datacenter string) (int64, error) {
|
|
result, err := repo.DB.Exec(`UPDATE object_keys SET datacenters = datacenters || $1::s3region WHERE object_key = $2`,
|
|
datacenter, objectKey)
|
|
if err != nil {
|
|
return 0, stacktrace.Propagate(err, "")
|
|
}
|
|
return result.RowsAffected()
|
|
}
|
|
|
|
func (repo *ObjectRepository) GetObjectsForFileIDs(fileIDs []int64) ([]ente.S3ObjectKey, error) {
|
|
rows, err := repo.DB.Query(`SELECT file_id, o_type, object_key, size FROM object_keys
|
|
WHERE file_id = ANY($1) AND is_deleted=false`, pq.Array(fileIDs))
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
return convertRowsToObjectKeys(rows)
|
|
}
|
|
|
|
// GetObject returns the ente.S3ObjectKey key for a file id and type
|
|
func (repo *ObjectRepository) GetObject(fileID int64, objType ente.ObjectType) (ente.S3ObjectKey, error) {
|
|
// todo: handling of deleted objects
|
|
row := repo.DB.QueryRow(`SELECT object_key, size, o_type FROM object_keys WHERE file_id = $1 AND o_type = $2 AND is_deleted=false`,
|
|
fileID, objType)
|
|
var s3ObjectKey ente.S3ObjectKey
|
|
s3ObjectKey.FileID = fileID
|
|
err := row.Scan(&s3ObjectKey.ObjectKey, &s3ObjectKey.FileSize, &s3ObjectKey.Type)
|
|
return s3ObjectKey, stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
func (repo *ObjectRepository) GetAllFileObjectsByObjectKey(objectKey string) ([]ente.S3ObjectKey, error) {
|
|
rows, err := repo.DB.Query(`SELECT file_id, o_type, object_key, size from object_keys where file_id in
|
|
(select file_id from object_keys where object_key= $1)
|
|
and is_deleted=false`, objectKey)
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
return convertRowsToObjectKeys(rows)
|
|
}
|
|
|
|
func (repo *ObjectRepository) GetDataCentersForObject(objectKey string) ([]string, error) {
|
|
rows, err := repo.DB.Query(`select jsonb_array_elements_text(to_jsonb(datacenters)) from object_keys where object_key = $1`, objectKey)
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
defer rows.Close()
|
|
datacenters := make([]string, 0)
|
|
for rows.Next() {
|
|
var dc string
|
|
err := rows.Scan(&dc)
|
|
if err != nil {
|
|
return datacenters, stacktrace.Propagate(err, "")
|
|
}
|
|
datacenters = append(datacenters, dc)
|
|
}
|
|
return datacenters, nil
|
|
}
|
|
|
|
func (repo *ObjectRepository) RemoveDataCenterFromObject(objectKey string, datacenter string) error {
|
|
_, err := repo.DB.Exec(`UPDATE object_keys SET datacenters = array_remove(datacenters, $1) WHERE object_key = $2`,
|
|
datacenter, objectKey)
|
|
return stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
// RemoveObjectsForKey removes the keys of a deleted object from our tables
|
|
func (repo *ObjectRepository) RemoveObjectsForKey(objectKey string) error {
|
|
_, err := repo.DB.Exec(`DELETE FROM object_keys WHERE object_key = $1 AND is_deleted = TRUE`,
|
|
objectKey)
|
|
return stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
// MarkObjectsAsDeletedForFileIDs marks the object keys corresponding to the given filesIDs as deleted
|
|
// The actual deletion happens later when the queue is processed
|
|
func (repo *ObjectRepository) MarkObjectsAsDeletedForFileIDs(ctx context.Context, tx *sql.Tx, fileIDs []int64) ([]ente.S3ObjectKey, error) {
|
|
rows, err := tx.QueryContext(ctx, `SELECT file_id, o_type, object_key, size FROM object_keys
|
|
WHERE file_id = ANY($1) AND is_deleted=false FOR UPDATE`, pq.Array(fileIDs))
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
s3ObjectKeys, err := convertRowsToObjectKeys(rows)
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
var keysToBeDeleted []string
|
|
for _, s3ObjectKey := range s3ObjectKeys {
|
|
keysToBeDeleted = append(keysToBeDeleted, s3ObjectKey.ObjectKey)
|
|
}
|
|
|
|
err = repo.QueueRepo.AddItems(ctx, tx, RemoveComplianceHoldQueue, keysToBeDeleted)
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
err = repo.QueueRepo.AddItems(ctx, tx, DeleteObjectQueue, keysToBeDeleted)
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
var embeddingsToBeDeleted []string
|
|
for _, fileID := range fileIDs {
|
|
embeddingsToBeDeleted = append(embeddingsToBeDeleted, strconv.FormatInt(fileID, 10))
|
|
}
|
|
|
|
err = repo.QueueRepo.AddItems(ctx, tx, DeleteEmbeddingsQueue, embeddingsToBeDeleted)
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
_, err = tx.ExecContext(ctx, `UPDATE object_keys SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs))
|
|
if err != nil {
|
|
return nil, stacktrace.Propagate(err, "")
|
|
}
|
|
return s3ObjectKeys, nil
|
|
}
|
|
|
|
func convertRowsToObjectKeys(rows *sql.Rows) ([]ente.S3ObjectKey, error) {
|
|
defer rows.Close()
|
|
fileObjectKeys := make([]ente.S3ObjectKey, 0)
|
|
for rows.Next() {
|
|
var fileObjectKey ente.S3ObjectKey
|
|
err := rows.Scan(&fileObjectKey.FileID, &fileObjectKey.Type, &fileObjectKey.ObjectKey, &fileObjectKey.FileSize)
|
|
if err != nil {
|
|
return fileObjectKeys, stacktrace.Propagate(err, "")
|
|
}
|
|
fileObjectKeys = append(fileObjectKeys, fileObjectKey)
|
|
}
|
|
return fileObjectKeys, nil
|
|
}
|
|
|
|
// DoesObjectExist returns the true if there is an entry for the object key.
|
|
func (repo *ObjectRepository) DoesObjectExist(tx *sql.Tx, objectKey string) (bool, error) {
|
|
var exists bool
|
|
err := tx.QueryRow(
|
|
`SELECT EXISTS (SELECT 1 FROM object_keys WHERE object_key = $1)`,
|
|
objectKey).Scan(&exists)
|
|
return exists, stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
// DoesObjectOrTempObjectExist returns the true if there is an entry for the object key in
|
|
// either the object_keys or in temp_objects table.
|
|
func (repo *ObjectRepository) DoesObjectOrTempObjectExist(objectKey string) (bool, error) {
|
|
var exists bool
|
|
err := repo.DB.QueryRow(
|
|
`SELECT (EXISTS (SELECT 1 FROM object_keys WHERE object_key = $1) OR
|
|
EXISTS (SELECT 1 FROM temp_objects WHERE object_key = $1))`,
|
|
objectKey).Scan(&exists)
|
|
return exists, stacktrace.Propagate(err, "")
|
|
}
|
|
|
|
// GetObjectState returns various bits of information about an object that are
|
|
// useful in pre-flight checks during replication.
|
|
//
|
|
// Unknown objects (i.e. objectKeys for which there are no entries) are
|
|
// considered as deleted.
|
|
func (repo *ObjectRepository) GetObjectState(tx *sql.Tx, objectKey string) (ObjectState ente.ObjectState, err error) {
|
|
row := tx.QueryRow(`
|
|
SELECT ok.is_deleted, u.encrypted_email IS NULL AS is_user_deleted, ok.size
|
|
FROM object_keys ok
|
|
JOIN files f ON ok.file_id = f.file_id
|
|
JOIN users u ON f.owner_id = u.user_id
|
|
where object_key = $1
|
|
`, objectKey)
|
|
var os ente.ObjectState
|
|
err = row.Scan(&os.IsFileDeleted, &os.IsUserDeleted, &os.Size)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
os.IsFileDeleted = true
|
|
os.IsUserDeleted = true
|
|
return os, nil
|
|
}
|
|
return os, stacktrace.Propagate(err, "Failed to fetch object state")
|
|
}
|
|
|
|
return os, nil
|
|
}
|