Ver Fonte

feat(server): pagination for asset queries in jobs (#2516)

* feat(server): pagination for asset queries in jobs

* default mock value for getAll

* remove live photo name correction

* order paginated results by createdAt

* change log level

* move usePagination to domain
Michel Heusschen há 2 anos atrás
pai
commit
f1384fea58

+ 12 - 6
server/apps/microservices/src/processors/metadata-extraction.processor.ts

@@ -6,7 +6,9 @@ import {
   IGeocodingRepository,
   IGeocodingRepository,
   IJobRepository,
   IJobRepository,
   JobName,
   JobName,
+  JOBS_ASSET_PAGINATION_SIZE,
   QueueName,
   QueueName,
+  usePagination,
   WithoutProperty,
   WithoutProperty,
 } from '@app/domain';
 } from '@app/domain';
 import { AssetEntity, AssetType, ExifEntity } from '@app/infra/entities';
 import { AssetEntity, AssetType, ExifEntity } from '@app/infra/entities';
@@ -74,13 +76,17 @@ export class MetadataExtractionProcessor {
   async handleQueueMetadataExtraction(job: Job<IBaseJob>) {
   async handleQueueMetadataExtraction(job: Job<IBaseJob>) {
     try {
     try {
       const { force } = job.data;
       const { force } = job.data;
-      const assets = force
-        ? await this.assetRepository.getAll()
-        : await this.assetRepository.getWithout(WithoutProperty.EXIF);
+      const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
+        return force
+          ? this.assetRepository.getAll(pagination)
+          : this.assetRepository.getWithout(pagination, WithoutProperty.EXIF);
+      });
 
 
-      for (const asset of assets) {
-        const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION;
-        await this.jobRepository.queue({ name, data: { asset } });
+      for await (const assets of assetPagination) {
+        for (const asset of assets) {
+          const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION;
+          await this.jobRepository.queue({ name, data: { asset } });
+        }
       }
       }
     } catch (error: any) {
     } catch (error: any) {
       this.logger.error(`Unable to queue metadata extraction`, error?.stack);
       this.logger.error(`Unable to queue metadata extraction`, error?.stack);

+ 1 - 5
server/libs/domain/src/asset/asset.core.ts

@@ -1,14 +1,10 @@
 import { AssetEntity } from '@app/infra/entities';
 import { AssetEntity } from '@app/infra/entities';
 import { IJobRepository, JobName } from '../job';
 import { IJobRepository, JobName } from '../job';
-import { AssetSearchOptions, IAssetRepository, LivePhotoSearchOptions } from './asset.repository';
+import { IAssetRepository, LivePhotoSearchOptions } from './asset.repository';
 
 
 export class AssetCore {
 export class AssetCore {
   constructor(private assetRepository: IAssetRepository, private jobRepository: IJobRepository) {}
   constructor(private assetRepository: IAssetRepository, private jobRepository: IJobRepository) {}
 
 
-  getAll(options: AssetSearchOptions) {
-    return this.assetRepository.getAll(options);
-  }
-
   async save(asset: Partial<AssetEntity>) {
   async save(asset: Partial<AssetEntity>) {
     const _asset = await this.assetRepository.save(asset);
     const _asset = await this.assetRepository.save(asset);
     await this.jobRepository.queue({
     await this.jobRepository.queue({

+ 3 - 2
server/libs/domain/src/asset/asset.repository.ts

@@ -1,4 +1,5 @@
 import { AssetEntity, AssetType } from '@app/infra/entities';
 import { AssetEntity, AssetType } from '@app/infra/entities';
+import { Paginated, PaginationOptions } from '../domain.util';
 
 
 export interface AssetSearchOptions {
 export interface AssetSearchOptions {
   isVisible?: boolean;
   isVisible?: boolean;
@@ -35,10 +36,10 @@ export const IAssetRepository = 'IAssetRepository';
 
 
 export interface IAssetRepository {
 export interface IAssetRepository {
   getByIds(ids: string[]): Promise<AssetEntity[]>;
   getByIds(ids: string[]): Promise<AssetEntity[]>;
-  getWithout(property: WithoutProperty): Promise<AssetEntity[]>;
+  getWithout(pagination: PaginationOptions, property: WithoutProperty): Paginated<AssetEntity>;
   getFirstAssetForAlbumId(albumId: string): Promise<AssetEntity | null>;
   getFirstAssetForAlbumId(albumId: string): Promise<AssetEntity | null>;
   deleteAll(ownerId: string): Promise<void>;
   deleteAll(ownerId: string): Promise<void>;
-  getAll(options?: AssetSearchOptions): Promise<AssetEntity[]>;
+  getAll(pagination: PaginationOptions, options?: AssetSearchOptions): Paginated<AssetEntity>;
   save(asset: Partial<AssetEntity>): Promise<AssetEntity>;
   save(asset: Partial<AssetEntity>): Promise<AssetEntity>;
   findLivePhotoMatch(options: LivePhotoSearchOptions): Promise<AssetEntity | null>;
   findLivePhotoMatch(options: LivePhotoSearchOptions): Promise<AssetEntity | null>;
   getMapMarkers(ownerId: string, options?: MapMarkerSearchOptions): Promise<MapMarker[]>;
   getMapMarkers(ownerId: string, options?: MapMarkerSearchOptions): Promise<MapMarker[]>;

+ 25 - 0
server/libs/domain/src/domain.util.ts

@@ -32,3 +32,28 @@ export function asHumanReadable(bytes: number, precision = 1): string {
 
 
   return `${remainder.toFixed(magnitude == 0 ? 0 : precision)} ${units[magnitude]}`;
   return `${remainder.toFixed(magnitude == 0 ? 0 : precision)} ${units[magnitude]}`;
 }
 }
+
+export interface PaginationOptions {
+  take: number;
+  skip?: number;
+}
+
+export interface PaginationResult<T> {
+  items: T[];
+  hasNextPage: boolean;
+}
+
+export type Paginated<T> = Promise<PaginationResult<T>>;
+
+export async function* usePagination<T>(
+  pageSize: number,
+  getNextPage: (pagination: PaginationOptions) => Paginated<T>,
+) {
+  let hasNextPage = true;
+
+  for (let skip = 0; hasNextPage; skip += pageSize) {
+    const result = await getNextPage({ take: pageSize, skip });
+    hasNextPage = result.hasNextPage;
+    yield result.items;
+  }
+}

+ 9 - 3
server/libs/domain/src/facial-recognition/facial-recognition.service.spec.ts

@@ -132,10 +132,13 @@ describe(FacialRecognitionService.name, () => {
 
 
   describe('handleQueueRecognizeFaces', () => {
   describe('handleQueueRecognizeFaces', () => {
     it('should queue missing assets', async () => {
     it('should queue missing assets', async () => {
-      assetMock.getWithout.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getWithout.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
       await sut.handleQueueRecognizeFaces({});
       await sut.handleQueueRecognizeFaces({});
 
 
-      expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.FACES);
+      expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.FACES);
       expect(jobMock.queue).toHaveBeenCalledWith({
       expect(jobMock.queue).toHaveBeenCalledWith({
         name: JobName.RECOGNIZE_FACES,
         name: JobName.RECOGNIZE_FACES,
         data: { asset: assetEntityStub.image },
         data: { asset: assetEntityStub.image },
@@ -143,7 +146,10 @@ describe(FacialRecognitionService.name, () => {
     });
     });
 
 
     it('should queue all assets', async () => {
     it('should queue all assets', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
       personMock.deleteAll.mockResolvedValue(5);
       personMock.deleteAll.mockResolvedValue(5);
       searchMock.deleteAllFaces.mockResolvedValue(100);
       searchMock.deleteAllFaces.mockResolvedValue(100);
 
 

+ 11 - 6
server/libs/domain/src/facial-recognition/facial-recognition.services.ts

@@ -2,7 +2,8 @@ import { Inject, Logger } from '@nestjs/common';
 import { join } from 'path';
 import { join } from 'path';
 import { IAssetRepository, WithoutProperty } from '../asset';
 import { IAssetRepository, WithoutProperty } from '../asset';
 import { MACHINE_LEARNING_ENABLED } from '../domain.constant';
 import { MACHINE_LEARNING_ENABLED } from '../domain.constant';
-import { IAssetJob, IBaseJob, IFaceThumbnailJob, IJobRepository, JobName } from '../job';
+import { usePagination } from '../domain.util';
+import { IAssetJob, IBaseJob, IFaceThumbnailJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job';
 import { CropOptions, FACE_THUMBNAIL_SIZE, IMediaRepository } from '../media';
 import { CropOptions, FACE_THUMBNAIL_SIZE, IMediaRepository } from '../media';
 import { IPersonRepository } from '../person/person.repository';
 import { IPersonRepository } from '../person/person.repository';
 import { ISearchRepository } from '../search/search.repository';
 import { ISearchRepository } from '../search/search.repository';
@@ -27,17 +28,21 @@ export class FacialRecognitionService {
 
 
   async handleQueueRecognizeFaces({ force }: IBaseJob) {
   async handleQueueRecognizeFaces({ force }: IBaseJob) {
     try {
     try {
-      const assets = force
-        ? await this.assetRepository.getAll()
-        : await this.assetRepository.getWithout(WithoutProperty.FACES);
+      const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
+        return force
+          ? this.assetRepository.getAll(pagination)
+          : this.assetRepository.getWithout(pagination, WithoutProperty.FACES);
+      });
 
 
       if (force) {
       if (force) {
         const people = await this.personRepository.deleteAll();
         const people = await this.personRepository.deleteAll();
         const faces = await this.searchRepository.deleteAllFaces();
         const faces = await this.searchRepository.deleteAllFaces();
         this.logger.debug(`Deleted ${people} people and ${faces} faces`);
         this.logger.debug(`Deleted ${people} people and ${faces} faces`);
       }
       }
-      for (const asset of assets) {
-        await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: { asset } });
+      for await (const assets of assetPagination) {
+        for (const asset of assets) {
+          await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: { asset } });
+        }
       }
       }
     } catch (error: any) {
     } catch (error: any) {
       this.logger.error(`Unable to queue recognize faces`, error?.stack);
       this.logger.error(`Unable to queue recognize faces`, error?.stack);

+ 2 - 0
server/libs/domain/src/job/job.constants.ts

@@ -73,3 +73,5 @@ export enum JobName {
   QUEUE_ENCODE_CLIP = 'queue-clip-encode',
   QUEUE_ENCODE_CLIP = 'queue-clip-encode',
   ENCODE_CLIP = 'clip-encode',
   ENCODE_CLIP = 'clip-encode',
 }
 }
+
+export const JOBS_ASSET_PAGINATION_SIZE = 1000;

+ 19 - 7
server/libs/domain/src/media/media.service.spec.ts

@@ -44,7 +44,10 @@ describe(MediaService.name, () => {
 
 
   describe('handleQueueGenerateThumbnails', () => {
   describe('handleQueueGenerateThumbnails', () => {
     it('should queue all assets', async () => {
     it('should queue all assets', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
 
 
       await sut.handleQueueGenerateThumbnails({ force: true });
       await sut.handleQueueGenerateThumbnails({ force: true });
 
 
@@ -57,12 +60,15 @@ describe(MediaService.name, () => {
     });
     });
 
 
     it('should queue all assets with missing thumbnails', async () => {
     it('should queue all assets with missing thumbnails', async () => {
-      assetMock.getWithout.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getWithout.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
 
 
       await sut.handleQueueGenerateThumbnails({ force: false });
       await sut.handleQueueGenerateThumbnails({ force: false });
 
 
       expect(assetMock.getAll).not.toHaveBeenCalled();
       expect(assetMock.getAll).not.toHaveBeenCalled();
-      expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.THUMBNAIL);
+      expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL);
       expect(jobMock.queue).toHaveBeenCalledWith({
       expect(jobMock.queue).toHaveBeenCalledWith({
         name: JobName.GENERATE_JPEG_THUMBNAIL,
         name: JobName.GENERATE_JPEG_THUMBNAIL,
         data: { asset: assetEntityStub.image },
         data: { asset: assetEntityStub.image },
@@ -183,11 +189,14 @@ describe(MediaService.name, () => {
 
 
   describe('handleQueueVideoConversion', () => {
   describe('handleQueueVideoConversion', () => {
     it('should queue all video assets', async () => {
     it('should queue all video assets', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.video]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.video],
+        hasNextPage: false,
+      });
 
 
       await sut.handleQueueVideoConversion({ force: true });
       await sut.handleQueueVideoConversion({ force: true });
 
 
-      expect(assetMock.getAll).toHaveBeenCalledWith({ type: AssetType.VIDEO });
+      expect(assetMock.getAll).toHaveBeenCalledWith({ skip: 0, take: 1000 }, { type: AssetType.VIDEO });
       expect(assetMock.getWithout).not.toHaveBeenCalled();
       expect(assetMock.getWithout).not.toHaveBeenCalled();
       expect(jobMock.queue).toHaveBeenCalledWith({
       expect(jobMock.queue).toHaveBeenCalledWith({
         name: JobName.VIDEO_CONVERSION,
         name: JobName.VIDEO_CONVERSION,
@@ -196,12 +205,15 @@ describe(MediaService.name, () => {
     });
     });
 
 
     it('should queue all video assets without encoded videos', async () => {
     it('should queue all video assets without encoded videos', async () => {
-      assetMock.getWithout.mockResolvedValue([assetEntityStub.video]);
+      assetMock.getWithout.mockResolvedValue({
+        items: [assetEntityStub.video],
+        hasNextPage: false,
+      });
 
 
       await sut.handleQueueVideoConversion({});
       await sut.handleQueueVideoConversion({});
 
 
       expect(assetMock.getAll).not.toHaveBeenCalled();
       expect(assetMock.getAll).not.toHaveBeenCalled();
-      expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.ENCODED_VIDEO);
+      expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.ENCODED_VIDEO);
       expect(jobMock.queue).toHaveBeenCalledWith({
       expect(jobMock.queue).toHaveBeenCalledWith({
         name: JobName.VIDEO_CONVERSION,
         name: JobName.VIDEO_CONVERSION,
         data: { asset: assetEntityStub.video },
         data: { asset: assetEntityStub.video },

+ 21 - 11
server/libs/domain/src/media/media.service.ts

@@ -3,7 +3,8 @@ import { Inject, Injectable, Logger } from '@nestjs/common';
 import { join } from 'path';
 import { join } from 'path';
 import { IAssetRepository, mapAsset, WithoutProperty } from '../asset';
 import { IAssetRepository, mapAsset, WithoutProperty } from '../asset';
 import { CommunicationEvent, ICommunicationRepository } from '../communication';
 import { CommunicationEvent, ICommunicationRepository } from '../communication';
-import { IAssetJob, IBaseJob, IJobRepository, JobName } from '../job';
+import { usePagination } from '../domain.util';
+import { IAssetJob, IBaseJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job';
 import { IStorageRepository, StorageCore, StorageFolder } from '../storage';
 import { IStorageRepository, StorageCore, StorageFolder } from '../storage';
 import { ISystemConfigRepository, SystemConfigFFmpegDto } from '../system-config';
 import { ISystemConfigRepository, SystemConfigFFmpegDto } from '../system-config';
 import { SystemConfigCore } from '../system-config/system-config.core';
 import { SystemConfigCore } from '../system-config/system-config.core';
@@ -31,12 +32,16 @@ export class MediaService {
     try {
     try {
       const { force } = job;
       const { force } = job;
 
 
-      const assets = force
-        ? await this.assetRepository.getAll()
-        : await this.assetRepository.getWithout(WithoutProperty.THUMBNAIL);
+      const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
+        return force
+          ? this.assetRepository.getAll(pagination)
+          : this.assetRepository.getWithout(pagination, WithoutProperty.THUMBNAIL);
+      });
 
 
-      for (const asset of assets) {
-        await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { asset } });
+      for await (const assets of assetPagination) {
+        for (const asset of assets) {
+          await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { asset } });
+        }
       }
       }
     } catch (error: any) {
     } catch (error: any) {
       this.logger.error('Failed to queue generate thumbnail jobs', error.stack);
       this.logger.error('Failed to queue generate thumbnail jobs', error.stack);
@@ -115,11 +120,16 @@ export class MediaService {
     const { force } = job;
     const { force } = job;
 
 
     try {
     try {
-      const assets = force
-        ? await this.assetRepository.getAll({ type: AssetType.VIDEO })
-        : await this.assetRepository.getWithout(WithoutProperty.ENCODED_VIDEO);
-      for (const asset of assets) {
-        await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } });
+      const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
+        return force
+          ? this.assetRepository.getAll(pagination, { type: AssetType.VIDEO })
+          : this.assetRepository.getWithout(pagination, WithoutProperty.ENCODED_VIDEO);
+      });
+
+      for await (const assets of assetPagination) {
+        for (const asset of assets) {
+          await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } });
+        }
       }
       }
     } catch (error: any) {
     } catch (error: any) {
       this.logger.error('Failed to queue video conversions', error.stack);
       this.logger.error('Failed to queue video conversions', error.stack);

+ 8 - 4
server/libs/domain/src/search/search.service.spec.ts

@@ -185,15 +185,16 @@ describe(SearchService.name, () => {
 
 
   describe('handleIndexAssets', () => {
   describe('handleIndexAssets', () => {
     it('should call done, even when there are no assets', async () => {
     it('should call done, even when there are no assets', async () => {
-      assetMock.getAll.mockResolvedValue([]);
-
       await sut.handleIndexAssets();
       await sut.handleIndexAssets();
 
 
       expect(searchMock.importAssets).toHaveBeenCalledWith([], true);
       expect(searchMock.importAssets).toHaveBeenCalledWith([], true);
     });
     });
 
 
     it('should index all the assets', async () => {
     it('should index all the assets', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
 
 
       await sut.handleIndexAssets();
       await sut.handleIndexAssets();
 
 
@@ -204,7 +205,10 @@ describe(SearchService.name, () => {
     });
     });
 
 
     it('should log an error', async () => {
     it('should log an error', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
       searchMock.importAssets.mockRejectedValue(new Error('import failed'));
       searchMock.importAssets.mockRejectedValue(new Error('import failed'));
 
 
       await sut.handleIndexAssets();
       await sut.handleIndexAssets();

+ 10 - 6
server/libs/domain/src/search/search.service.ts

@@ -7,8 +7,9 @@ import { mapAsset } from '../asset';
 import { IAssetRepository } from '../asset/asset.repository';
 import { IAssetRepository } from '../asset/asset.repository';
 import { AuthUserDto } from '../auth';
 import { AuthUserDto } from '../auth';
 import { MACHINE_LEARNING_ENABLED } from '../domain.constant';
 import { MACHINE_LEARNING_ENABLED } from '../domain.constant';
+import { usePagination } from '../domain.util';
 import { AssetFaceId, IFaceRepository } from '../facial-recognition';
 import { AssetFaceId, IFaceRepository } from '../facial-recognition';
-import { IAssetFaceJob, IBulkEntityJob, IJobRepository, JobName } from '../job';
+import { IAssetFaceJob, IBulkEntityJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job';
 import { IMachineLearningRepository } from '../smart-info';
 import { IMachineLearningRepository } from '../smart-info';
 import { SearchDto } from './dto';
 import { SearchDto } from './dto';
 import { SearchConfigResponseDto, SearchResponseDto } from './response-dto';
 import { SearchConfigResponseDto, SearchResponseDto } from './response-dto';
@@ -155,12 +156,15 @@ export class SearchService {
 
 
     try {
     try {
       // TODO: do this in batches based on searchIndexVersion
       // TODO: do this in batches based on searchIndexVersion
-      const assets = this.patchAssets(await this.assetRepository.getAll({ isVisible: true }));
-      this.logger.log(`Indexing ${assets.length} assets`);
+      const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) =>
+        this.assetRepository.getAll(pagination, { isVisible: true }),
+      );
 
 
-      const chunkSize = 1000;
-      for (let i = 0; i < assets.length; i += chunkSize) {
-        await this.searchRepository.importAssets(assets.slice(i, i + chunkSize), false);
+      for await (const assets of assetPagination) {
+        this.logger.debug(`Indexing ${assets.length} assets`);
+
+        const patchedAssets = this.patchAssets(assets);
+        await this.searchRepository.importAssets(patchedAssets, false);
       }
       }
 
 
       await this.searchRepository.importAssets([], true);
       await this.searchRepository.importAssets([], true);

+ 18 - 6
server/libs/domain/src/smart-info/smart-info.service.spec.ts

@@ -38,7 +38,10 @@ describe(SmartInfoService.name, () => {
 
 
   describe('handleQueueObjectTagging', () => {
   describe('handleQueueObjectTagging', () => {
     it('should queue the assets without tags', async () => {
     it('should queue the assets without tags', async () => {
-      assetMock.getWithout.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getWithout.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
 
 
       await sut.handleQueueObjectTagging({ force: false });
       await sut.handleQueueObjectTagging({ force: false });
 
 
@@ -46,11 +49,14 @@ describe(SmartInfoService.name, () => {
         [{ name: JobName.CLASSIFY_IMAGE, data: { asset: assetEntityStub.image } }],
         [{ name: JobName.CLASSIFY_IMAGE, data: { asset: assetEntityStub.image } }],
         [{ name: JobName.DETECT_OBJECTS, data: { asset: assetEntityStub.image } }],
         [{ name: JobName.DETECT_OBJECTS, data: { asset: assetEntityStub.image } }],
       ]);
       ]);
-      expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.OBJECT_TAGS);
+      expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.OBJECT_TAGS);
     });
     });
 
 
     it('should queue all the assets', async () => {
     it('should queue all the assets', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
 
 
       await sut.handleQueueObjectTagging({ force: true });
       await sut.handleQueueObjectTagging({ force: true });
 
 
@@ -140,16 +146,22 @@ describe(SmartInfoService.name, () => {
 
 
   describe('handleQueueEncodeClip', () => {
   describe('handleQueueEncodeClip', () => {
     it('should queue the assets without clip embeddings', async () => {
     it('should queue the assets without clip embeddings', async () => {
-      assetMock.getWithout.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getWithout.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
 
 
       await sut.handleQueueEncodeClip({ force: false });
       await sut.handleQueueEncodeClip({ force: false });
 
 
       expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { asset: assetEntityStub.image } });
       expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.ENCODE_CLIP, data: { asset: assetEntityStub.image } });
-      expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.CLIP_ENCODING);
+      expect(assetMock.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.CLIP_ENCODING);
     });
     });
 
 
     it('should queue all the assets', async () => {
     it('should queue all the assets', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
 
 
       await sut.handleQueueEncodeClip({ force: true });
       await sut.handleQueueEncodeClip({ force: true });
 
 

+ 21 - 12
server/libs/domain/src/smart-info/smart-info.service.ts

@@ -1,7 +1,8 @@
 import { Inject, Injectable, Logger } from '@nestjs/common';
 import { Inject, Injectable, Logger } from '@nestjs/common';
 import { IAssetRepository, WithoutProperty } from '../asset';
 import { IAssetRepository, WithoutProperty } from '../asset';
 import { MACHINE_LEARNING_ENABLED } from '../domain.constant';
 import { MACHINE_LEARNING_ENABLED } from '../domain.constant';
-import { IAssetJob, IBaseJob, IJobRepository, JobName } from '../job';
+import { usePagination } from '../domain.util';
+import { IAssetJob, IBaseJob, IJobRepository, JobName, JOBS_ASSET_PAGINATION_SIZE } from '../job';
 import { IMachineLearningRepository } from './machine-learning.interface';
 import { IMachineLearningRepository } from './machine-learning.interface';
 import { ISmartInfoRepository } from './smart-info.repository';
 import { ISmartInfoRepository } from './smart-info.repository';
 
 
@@ -18,13 +19,17 @@ export class SmartInfoService {
 
 
   async handleQueueObjectTagging({ force }: IBaseJob) {
   async handleQueueObjectTagging({ force }: IBaseJob) {
     try {
     try {
-      const assets = force
-        ? await this.assetRepository.getAll()
-        : await this.assetRepository.getWithout(WithoutProperty.OBJECT_TAGS);
+      const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
+        return force
+          ? this.assetRepository.getAll(pagination)
+          : this.assetRepository.getWithout(pagination, WithoutProperty.OBJECT_TAGS);
+      });
 
 
-      for (const asset of assets) {
-        await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: { asset } });
-        await this.jobRepository.queue({ name: JobName.DETECT_OBJECTS, data: { asset } });
+      for await (const assets of assetPagination) {
+        for (const asset of assets) {
+          await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: { asset } });
+          await this.jobRepository.queue({ name: JobName.DETECT_OBJECTS, data: { asset } });
+        }
       }
       }
     } catch (error: any) {
     } catch (error: any) {
       this.logger.error(`Unable to queue object tagging`, error?.stack);
       this.logger.error(`Unable to queue object tagging`, error?.stack);
@@ -69,12 +74,16 @@ export class SmartInfoService {
 
 
   async handleQueueEncodeClip({ force }: IBaseJob) {
   async handleQueueEncodeClip({ force }: IBaseJob) {
     try {
     try {
-      const assets = force
-        ? await this.assetRepository.getAll()
-        : await this.assetRepository.getWithout(WithoutProperty.CLIP_ENCODING);
+      const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
+        return force
+          ? this.assetRepository.getAll(pagination)
+          : this.assetRepository.getWithout(pagination, WithoutProperty.CLIP_ENCODING);
+      });
 
 
-      for (const asset of assets) {
-        await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: { asset } });
+      for await (const assets of assetPagination) {
+        for (const asset of assets) {
+          await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: { asset } });
+        }
       }
       }
     } catch (error: any) {
     } catch (error: any) {
       this.logger.error(`Unable to queue clip encoding`, error?.stack);
       this.logger.error(`Unable to queue clip encoding`, error?.stack);

+ 42 - 19
server/libs/domain/src/storage-template/storage-template.service.spec.ts

@@ -36,7 +36,10 @@ describe(StorageTemplateService.name, () => {
 
 
   describe('handle template migration', () => {
   describe('handle template migration', () => {
     it('should handle no assets', async () => {
     it('should handle no assets', async () => {
-      assetMock.getAll.mockResolvedValue([]);
+      assetMock.getAll.mockResolvedValue({
+        items: [],
+        hasNextPage: false,
+      });
       userMock.getList.mockResolvedValue([]);
       userMock.getList.mockResolvedValue([]);
 
 
       await sut.handleTemplateMigration();
       await sut.handleTemplateMigration();
@@ -45,7 +48,10 @@ describe(StorageTemplateService.name, () => {
     });
     });
 
 
     it('should handle an asset with a duplicate destination', async () => {
     it('should handle an asset with a duplicate destination', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
       assetMock.save.mockResolvedValue(assetEntityStub.image);
       assetMock.save.mockResolvedValue(assetEntityStub.image);
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
 
 
@@ -69,12 +75,15 @@ describe(StorageTemplateService.name, () => {
     });
     });
 
 
     it('should skip when an asset already matches the template', async () => {
     it('should skip when an asset already matches the template', async () => {
-      assetMock.getAll.mockResolvedValue([
-        {
-          ...assetEntityStub.image,
-          originalPath: 'upload/library/user-id/2023/2023-02-23/asset-id.ext',
-        },
-      ]);
+      assetMock.getAll.mockResolvedValue({
+        items: [
+          {
+            ...assetEntityStub.image,
+            originalPath: 'upload/library/user-id/2023/2023-02-23/asset-id.ext',
+          },
+        ],
+        hasNextPage: false,
+      });
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
 
 
       await sut.handleTemplateMigration();
       await sut.handleTemplateMigration();
@@ -86,12 +95,15 @@ describe(StorageTemplateService.name, () => {
     });
     });
 
 
     it('should skip when an asset is probably a duplicate', async () => {
     it('should skip when an asset is probably a duplicate', async () => {
-      assetMock.getAll.mockResolvedValue([
-        {
-          ...assetEntityStub.image,
-          originalPath: 'upload/library/user-id/2023/2023-02-23/asset-id+1.ext',
-        },
-      ]);
+      assetMock.getAll.mockResolvedValue({
+        items: [
+          {
+            ...assetEntityStub.image,
+            originalPath: 'upload/library/user-id/2023/2023-02-23/asset-id+1.ext',
+          },
+        ],
+        hasNextPage: false,
+      });
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
 
 
       await sut.handleTemplateMigration();
       await sut.handleTemplateMigration();
@@ -103,7 +115,10 @@ describe(StorageTemplateService.name, () => {
     });
     });
 
 
     it('should move an asset', async () => {
     it('should move an asset', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
       assetMock.save.mockResolvedValue(assetEntityStub.image);
       assetMock.save.mockResolvedValue(assetEntityStub.image);
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
 
 
@@ -121,7 +136,10 @@ describe(StorageTemplateService.name, () => {
     });
     });
 
 
     it('should use the user storage label', async () => {
     it('should use the user storage label', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
       assetMock.save.mockResolvedValue(assetEntityStub.image);
       assetMock.save.mockResolvedValue(assetEntityStub.image);
       userMock.getList.mockResolvedValue([userEntityStub.storageLabel]);
       userMock.getList.mockResolvedValue([userEntityStub.storageLabel]);
 
 
@@ -139,7 +157,10 @@ describe(StorageTemplateService.name, () => {
     });
     });
 
 
     it('should not update the database if the move fails', async () => {
     it('should not update the database if the move fails', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
       storageMock.moveFile.mockRejectedValue(new Error('Read only system'));
       storageMock.moveFile.mockRejectedValue(new Error('Read only system'));
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
 
 
@@ -154,7 +175,10 @@ describe(StorageTemplateService.name, () => {
     });
     });
 
 
     it('should move the asset back if the database fails', async () => {
     it('should move the asset back if the database fails', async () => {
-      assetMock.getAll.mockResolvedValue([assetEntityStub.image]);
+      assetMock.getAll.mockResolvedValue({
+        items: [assetEntityStub.image],
+        hasNextPage: false,
+      });
       assetMock.save.mockRejectedValue('Connection Error!');
       assetMock.save.mockRejectedValue('Connection Error!');
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
       userMock.getList.mockResolvedValue([userEntityStub.user1]);
 
 
@@ -173,7 +197,6 @@ describe(StorageTemplateService.name, () => {
   });
   });
 
 
   it('should handle an error', async () => {
   it('should handle an error', async () => {
-    assetMock.getAll.mockResolvedValue([]);
     storageMock.removeEmptyDirs.mockRejectedValue(new Error('Read only filesystem'));
     storageMock.removeEmptyDirs.mockRejectedValue(new Error('Read only filesystem'));
     userMock.getList.mockResolvedValue([]);
     userMock.getList.mockResolvedValue([]);
 
 

+ 12 - 17
server/libs/domain/src/storage-template/storage-template.service.ts

@@ -2,8 +2,8 @@ import { AssetEntity, SystemConfig } from '@app/infra/entities';
 import { Inject, Injectable, Logger } from '@nestjs/common';
 import { Inject, Injectable, Logger } from '@nestjs/common';
 import { IAssetRepository } from '../asset/asset.repository';
 import { IAssetRepository } from '../asset/asset.repository';
 import { APP_MEDIA_LOCATION } from '../domain.constant';
 import { APP_MEDIA_LOCATION } from '../domain.constant';
-import { getLivePhotoMotionFilename } from '../domain.util';
-import { IAssetJob } from '../job';
+import { getLivePhotoMotionFilename, usePagination } from '../domain.util';
+import { IAssetJob, JOBS_ASSET_PAGINATION_SIZE } from '../job';
 import { IStorageRepository } from '../storage/storage.repository';
 import { IStorageRepository } from '../storage/storage.repository';
 import { INITIAL_SYSTEM_CONFIG, ISystemConfigRepository } from '../system-config';
 import { INITIAL_SYSTEM_CONFIG, ISystemConfigRepository } from '../system-config';
 import { IUserRepository } from '../user/user.repository';
 import { IUserRepository } from '../user/user.repository';
@@ -52,26 +52,21 @@ export class StorageTemplateService {
   async handleTemplateMigration() {
   async handleTemplateMigration() {
     try {
     try {
       console.time('migrating-time');
       console.time('migrating-time');
-      const assets = await this.assetRepository.getAll();
-      const users = await this.userRepository.getList();
 
 
-      const livePhotoMap: Record<string, AssetEntity> = {};
+      const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) =>
+        this.assetRepository.getAll(pagination),
+      );
+      const users = await this.userRepository.getList();
 
 
-      for (const asset of assets) {
-        if (asset.livePhotoVideoId) {
-          livePhotoMap[asset.livePhotoVideoId] = asset;
+      for await (const assets of assetPagination) {
+        for (const asset of assets) {
+          const user = users.find((user) => user.id === asset.ownerId);
+          const storageLabel = user?.storageLabel || null;
+          const filename = asset.originalFileName || asset.id;
+          await this.moveAsset(asset, { storageLabel, filename });
         }
         }
       }
       }
 
 
-      for (const asset of assets) {
-        const livePhotoParentAsset = livePhotoMap[asset.id];
-        // TODO: remove livePhoto specific stuff once upload is fixed
-        const user = users.find((user) => user.id === asset.ownerId);
-        const storageLabel = user?.storageLabel || null;
-        const filename = asset.originalFileName || livePhotoParentAsset?.originalFileName || asset.id;
-        await this.moveAsset(asset, { storageLabel, filename });
-      }
-
       this.logger.debug('Cleaning up empty directories...');
       this.logger.debug('Cleaning up empty directories...');
       await this.storageRepository.removeEmptyDirs(APP_MEDIA_LOCATION);
       await this.storageRepository.removeEmptyDirs(APP_MEDIA_LOCATION);
     } catch (error: any) {
     } catch (error: any) {

+ 4 - 1
server/libs/domain/test/asset.repository.mock.ts

@@ -5,7 +5,10 @@ export const newAssetRepositoryMock = (): jest.Mocked<IAssetRepository> => {
     getByIds: jest.fn(),
     getByIds: jest.fn(),
     getWithout: jest.fn(),
     getWithout: jest.fn(),
     getFirstAssetForAlbumId: jest.fn(),
     getFirstAssetForAlbumId: jest.fn(),
-    getAll: jest.fn(),
+    getAll: jest.fn().mockResolvedValue({
+      items: [],
+      hasNextPage: false,
+    }),
     deleteAll: jest.fn(),
     deleteAll: jest.fn(),
     save: jest.fn(),
     save: jest.fn(),
     findLivePhotoMatch: jest.fn(),
     findLivePhotoMatch: jest.fn(),

+ 15 - 6
server/libs/infra/src/repositories/asset.repository.ts

@@ -4,12 +4,15 @@ import {
   LivePhotoSearchOptions,
   LivePhotoSearchOptions,
   MapMarker,
   MapMarker,
   MapMarkerSearchOptions,
   MapMarkerSearchOptions,
+  Paginated,
+  PaginationOptions,
   WithoutProperty,
   WithoutProperty,
 } from '@app/domain';
 } from '@app/domain';
 import { Injectable } from '@nestjs/common';
 import { Injectable } from '@nestjs/common';
 import { InjectRepository } from '@nestjs/typeorm';
 import { InjectRepository } from '@nestjs/typeorm';
 import { FindOptionsRelations, FindOptionsWhere, In, IsNull, Not, Repository } from 'typeorm';
 import { FindOptionsRelations, FindOptionsWhere, In, IsNull, Not, Repository } from 'typeorm';
 import { AssetEntity, AssetType } from '../entities';
 import { AssetEntity, AssetType } from '../entities';
+import { paginate } from '../utils/pagination.util';
 
 
 @Injectable()
 @Injectable()
 export class AssetRepository implements IAssetRepository {
 export class AssetRepository implements IAssetRepository {
@@ -32,10 +35,8 @@ export class AssetRepository implements IAssetRepository {
     await this.repository.delete({ ownerId });
     await this.repository.delete({ ownerId });
   }
   }
 
 
-  getAll(options?: AssetSearchOptions | undefined): Promise<AssetEntity[]> {
-    options = options || {};
-
-    return this.repository.find({
+  getAll(pagination: PaginationOptions, options: AssetSearchOptions = {}): Paginated<AssetEntity> {
+    return paginate(this.repository, pagination, {
       where: {
       where: {
         isVisible: options.isVisible,
         isVisible: options.isVisible,
         type: options.type,
         type: options.type,
@@ -48,6 +49,10 @@ export class AssetRepository implements IAssetRepository {
           person: true,
           person: true,
         },
         },
       },
       },
+      order: {
+        // Ensures correct order when paginating
+        createdAt: 'ASC',
+      },
     });
     });
   }
   }
 
 
@@ -83,7 +88,7 @@ export class AssetRepository implements IAssetRepository {
     });
     });
   }
   }
 
 
-  getWithout(property: WithoutProperty): Promise<AssetEntity[]> {
+  getWithout(pagination: PaginationOptions, property: WithoutProperty): Paginated<AssetEntity> {
     let relations: FindOptionsRelations<AssetEntity> = {};
     let relations: FindOptionsRelations<AssetEntity> = {};
     let where: FindOptionsWhere<AssetEntity> | FindOptionsWhere<AssetEntity>[] = {};
     let where: FindOptionsWhere<AssetEntity> | FindOptionsWhere<AssetEntity>[] = {};
 
 
@@ -160,9 +165,13 @@ export class AssetRepository implements IAssetRepository {
         throw new Error(`Invalid getWithout property: ${property}`);
         throw new Error(`Invalid getWithout property: ${property}`);
     }
     }
 
 
-    return this.repository.find({
+    return paginate(this.repository, pagination, {
       relations,
       relations,
       where,
       where,
+      order: {
+        // Ensures correct order when paginating
+        createdAt: 'ASC',
+      },
     });
     });
   }
   }
 
 

+ 20 - 0
server/libs/infra/src/utils/pagination.util.ts

@@ -0,0 +1,20 @@
+import { Paginated, PaginationOptions } from '@app/domain';
+import { FindOneOptions, ObjectLiteral, Repository } from 'typeorm';
+
+export async function paginate<Entity extends ObjectLiteral>(
+  repository: Repository<Entity>,
+  paginationOptions: PaginationOptions,
+  searchOptions?: FindOneOptions<Entity>,
+): Paginated<Entity> {
+  const items = await repository.find({
+    ...searchOptions,
+    // Take one more item to check if there's a next page
+    take: paginationOptions.take + 1,
+    skip: paginationOptions.skip,
+  });
+
+  const hasNextPage = items.length > paginationOptions.take;
+  items.splice(paginationOptions.take);
+
+  return { items, hasNextPage };
+}