Jelajahi Sumber

test(server): job service (#2634)

Jason Rasmussen 2 tahun lalu
induk
melakukan
76a1629e75

+ 1 - 1
server/libs/domain/src/job/job.interface.ts

@@ -19,7 +19,7 @@ export interface IFaceThumbnailJob extends IAssetFaceJob {
 
 export interface IEntityJob extends IBaseJob {
   id: string;
-  source?: string;
+  source?: 'upload';
 }
 
 export interface IBulkEntityJob extends IBaseJob {

+ 105 - 9
server/libs/domain/src/job/job.service.spec.ts

@@ -1,5 +1,7 @@
+import { SystemConfig } from '@app/infra/entities';
 import { BadRequestException } from '@nestjs/common';
 import {
+  asyncTick,
   newAssetRepositoryMock,
   newCommunicationRepositoryMock,
   newJobRepositoryMock,
@@ -7,8 +9,17 @@ import {
 } from '../../test';
 import { IAssetRepository } from '../asset';
 import { ICommunicationRepository } from '../communication';
-import { IJobRepository, JobCommand, JobHandler, JobName, JobService, QueueName } from '../job';
+import { IJobRepository, JobCommand, JobHandler, JobItem, JobName, JobService, QueueName } from '../job';
 import { ISystemConfigRepository } from '../system-config';
+import { SystemConfigCore } from '../system-config/system-config.core';
+
+const makeMockHandlers = (success: boolean) => {
+  const mock = jest.fn().mockResolvedValue(success);
+  return Object.values(JobName).reduce((map, jobName) => ({ ...map, [jobName]: mock }), {}) as Record<
+    JobName,
+    JobHandler
+  >;
+};
 
 describe(JobService.name, () => {
   let sut: JobService;
@@ -192,16 +203,101 @@ describe(JobService.name, () => {
 
   describe('registerHandlers', () => {
     it('should register a handler for each queue', async () => {
-      const mock = jest.fn();
-      const handlers = Object.values(JobName).reduce((map, jobName) => ({ ...map, [jobName]: mock }), {}) as Record<
-        JobName,
-        JobHandler
-      >;
-
-      await sut.registerHandlers(handlers);
-
+      await sut.registerHandlers(makeMockHandlers(true));
       expect(configMock.load).toHaveBeenCalled();
       expect(jobMock.addHandler).toHaveBeenCalledTimes(Object.keys(QueueName).length);
     });
+
+    it('should subscribe to config changes', async () => {
+      await sut.registerHandlers(makeMockHandlers(false));
+
+      const configCore = new SystemConfigCore(newSystemConfigRepositoryMock());
+      configCore.config$.next({
+        job: {
+          [QueueName.BACKGROUND_TASK]: { concurrency: 10 },
+          [QueueName.CLIP_ENCODING]: { concurrency: 10 },
+          [QueueName.METADATA_EXTRACTION]: { concurrency: 10 },
+          [QueueName.OBJECT_TAGGING]: { concurrency: 10 },
+          [QueueName.RECOGNIZE_FACES]: { concurrency: 10 },
+          [QueueName.SEARCH]: { concurrency: 10 },
+          [QueueName.SIDECAR]: { concurrency: 10 },
+          [QueueName.STORAGE_TEMPLATE_MIGRATION]: { concurrency: 10 },
+          [QueueName.THUMBNAIL_GENERATION]: { concurrency: 10 },
+          [QueueName.VIDEO_CONVERSION]: { concurrency: 10 },
+        },
+      } as SystemConfig);
+
+      expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.BACKGROUND_TASK, 10);
+      expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.CLIP_ENCODING, 10);
+      expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION, 10);
+      expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.OBJECT_TAGGING, 10);
+      expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.RECOGNIZE_FACES, 10);
+      expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.SIDECAR, 10);
+      expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.STORAGE_TEMPLATE_MIGRATION, 10);
+      expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.THUMBNAIL_GENERATION, 10);
+      expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.VIDEO_CONVERSION, 10);
+    });
+
+    const tests: Array<{ item: JobItem; jobs: JobName[] }> = [
+      {
+        item: { name: JobName.SIDECAR_SYNC, data: { id: 'asset-1' } },
+        jobs: [JobName.METADATA_EXTRACTION],
+      },
+      {
+        item: { name: JobName.SIDECAR_DISCOVERY, data: { id: 'asset-1' } },
+        jobs: [JobName.METADATA_EXTRACTION],
+      },
+      {
+        item: { name: JobName.METADATA_EXTRACTION, data: { id: 'asset-1' } },
+        jobs: [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, JobName.SEARCH_INDEX_ASSET],
+      },
+      {
+        item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1', source: 'upload' } },
+        jobs: [JobName.GENERATE_JPEG_THUMBNAIL],
+      },
+      {
+        item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1' } },
+        jobs: [],
+      },
+      {
+        item: { name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: 'asset-1' } },
+        jobs: [JobName.GENERATE_WEBP_THUMBNAIL, JobName.CLASSIFY_IMAGE, JobName.ENCODE_CLIP, JobName.RECOGNIZE_FACES],
+      },
+      {
+        item: { name: JobName.CLASSIFY_IMAGE, data: { id: 'asset-1' } },
+        jobs: [JobName.SEARCH_INDEX_ASSET],
+      },
+      {
+        item: { name: JobName.ENCODE_CLIP, data: { id: 'asset-1' } },
+        jobs: [JobName.SEARCH_INDEX_ASSET],
+      },
+      {
+        item: { name: JobName.RECOGNIZE_FACES, data: { id: 'asset-1' } },
+        jobs: [JobName.SEARCH_INDEX_ASSET],
+      },
+    ];
+
+    for (const { item, jobs } of tests) {
+      it(`should queue ${jobs.length} jobs when a ${item.name} job finishes successfully`, async () => {
+        assetMock.getByIds.mockResolvedValue([]);
+
+        await sut.registerHandlers(makeMockHandlers(true));
+        await jobMock.addHandler.mock.calls[0][2](item);
+        await asyncTick(3);
+
+        expect(jobMock.queue).toHaveBeenCalledTimes(jobs.length);
+        for (const jobName of jobs) {
+          expect(jobMock.queue).toHaveBeenCalledWith({ name: jobName, data: expect.anything() });
+        }
+      });
+
+      it(`should not queue any jobs when ${item.name} finishes with 'false'`, async () => {
+        await sut.registerHandlers(makeMockHandlers(false));
+        await jobMock.addHandler.mock.calls[0][2](item);
+        await asyncTick(3);
+
+        expect(jobMock.queue).not.toHaveBeenCalled();
+      });
+    }
   });
 });

+ 2 - 2
server/libs/domain/src/job/job.service.ts

@@ -135,11 +135,11 @@ export class JobService {
   /**
    * Queue follow up jobs
    */
-  async onDone(item: JobItem) {
+  private async onDone(item: JobItem) {
     switch (item.name) {
       case JobName.SIDECAR_SYNC:
       case JobName.SIDECAR_DISCOVERY:
-        await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: item.data.id } });
+        await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: item.data });
         break;
 
       case JobName.METADATA_EXTRACTION: