Browse Source

close workers

Jonathan Jogenfors 1 year ago
parent
commit
8cd61d3291

+ 1 - 0
server/src/domain/job/job.repository.ts

@@ -112,4 +112,5 @@ export interface IJobRepository {
   getQueueStatus(name: QueueName): Promise<QueueStatus>;
   getJobCounts(name: QueueName): Promise<JobCounts>;
   obliterate(name: QueueName, force: boolean): Promise<void>;
+  closeWorkers(): Promise<void>;
 }

+ 4 - 0
server/src/domain/job/job.service.ts

@@ -69,6 +69,10 @@ export class JobService {
     }
   }
 
+  async closeAll(): Promise<void> {
+    await this.jobRepository.closeWorkers();
+  }
+
   private async start(name: QueueName, { force }: JobCommandDto): Promise<void> {
     const { isActive } = await this.jobRepository.getQueueStatus(name);
     if (isActive) {

+ 1 - 1
server/src/domain/library/library.service.ts

@@ -350,7 +350,7 @@ export class LibraryService {
   }
 
   async handleQueueAssetRefresh(job: ILibraryRefreshJob): Promise<boolean> {
-    console.log('Handle queue asset refresh: ' + job);
+    console.log('Handle queue asset refresh: ' + JSON.stringify(job));
     console.log(await this.repository.getAll(true, LibraryType.EXTERNAL));
     const library = await this.repository.get(job.id);
     if (!library || library.type !== LibraryType.EXTERNAL) {

+ 11 - 1
server/src/infra/repositories/job.repository.ts

@@ -28,6 +28,16 @@ export class JobRepository implements IJobRepository {
     worker.concurrency = concurrency;
   }
 
+  async closeWorkers() {
+    for (const queue in Object.keys(this.workers)) {
+      const queueName = queue as QueueName;
+      const worker = this.workers[queueName];
+      if (worker) {
+        await worker.close();
+      }
+    }
+  }
+
   async getQueueStatus(name: QueueName): Promise<QueueStatus> {
     const queue = this.getQueue(name);
 
@@ -50,7 +60,7 @@ export class JobRepository implements IJobRepository {
   }
 
   obliterate(name: QueueName, force = false) {
-    return this.getQueue(name).obliterate({force});
+    return this.getQueue(name).obliterate({ force });
   }
 
   getJobCounts(name: QueueName): Promise<JobCounts> {

+ 3 - 1
server/test/e2e/library2.e2e-spec.ts

@@ -1,6 +1,7 @@
 import { JobService, LoginResponseDto, QueueName } from '@app/domain';
 import { AppModule } from '@app/immich/app.module';
 import { LibraryType } from '@app/infra/entities';
+import { JobRepository } from '@app/infra/repositories';
 import { INestApplication, Logger } from '@nestjs/common';
 import { Test, TestingModule } from '@nestjs/testing';
 import { api } from '@test/api';
@@ -99,8 +100,9 @@ describe('libe2e', () => {
   });
 
   afterAll(async () => {
-    await db.disconnect();
+    await jobService.closeAll();
     await app.close();
     await moduleFixture.close();
+    await db.disconnect();
   });
 });

+ 1 - 0
server/test/repositories/job.repository.mock.ts

@@ -11,5 +11,6 @@ export const newJobRepositoryMock = (): jest.Mocked<IJobRepository> => {
     getQueueStatus: jest.fn(),
     getJobCounts: jest.fn(),
     obliterate: jest.fn(),
+    closeWorkers: jest.fn(),
   };
 };