浏览代码

feat(server): generate checksum for previous uploaded assets (#558)

* feat(server): generate checksum for previous uploaded assets

* fix(server): typo
Thanh Pham 2 年之前
父节点
当前提交
c76f7804ab

+ 13 - 6
server/apps/microservices/src/microservices.module.ts

@@ -6,6 +6,7 @@ import { SmartInfoEntity } from '@app/database/entities/smart-info.entity';
 import { UserEntity } from '@app/database/entities/user.entity';
 import {
   assetUploadedQueueName,
+  generateChecksumQueueName,
   metadataExtractionQueueName,
   thumbnailGeneratorQueueName,
   videoConversionQueueName,
@@ -17,6 +18,7 @@ import { TypeOrmModule } from '@nestjs/typeorm';
 import { CommunicationModule } from '../../immich/src/api-v1/communication/communication.module';
 import { MicroservicesService } from './microservices.service';
 import { AssetUploadedProcessor } from './processors/asset-uploaded.processor';
+import { GenerateChecksumProcessor } from './processors/generate-checksum.processor';
 import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor';
 import { ThumbnailGeneratorProcessor } from './processors/thumbnail.processor';
 import { VideoTranscodeProcessor } from './processors/video-transcode.processor';
@@ -45,30 +47,34 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
         removeOnComplete: true,
         removeOnFail: false,
       },
-    }),
-    BullModule.registerQueue({
+    }, {
       name: assetUploadedQueueName,
       defaultJobOptions: {
         attempts: 3,
         removeOnComplete: true,
         removeOnFail: false,
       },
-    }),
-    BullModule.registerQueue({
+    }, {
       name: metadataExtractionQueueName,
       defaultJobOptions: {
         attempts: 3,
         removeOnComplete: true,
         removeOnFail: false,
       },
-    }),
-    BullModule.registerQueue({
+    }, {
       name: videoConversionQueueName,
       defaultJobOptions: {
         attempts: 3,
         removeOnComplete: true,
         removeOnFail: false,
       },
+    }, {
+      name: generateChecksumQueueName,
+      defaultJobOptions: {
+        attempts: 3,
+        removeOnComplete: true,
+        removeOnFail: false,
+      },
     }),
     CommunicationModule,
   ],
@@ -79,6 +85,7 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
     ThumbnailGeneratorProcessor,
     MetadataExtractionProcessor,
     VideoTranscodeProcessor,
+    GenerateChecksumProcessor,
   ],
   exports: [],
 })

+ 13 - 4
server/apps/microservices/src/microservices.service.ts

@@ -1,8 +1,17 @@
-import { Injectable } from '@nestjs/common';
+import { generateChecksumQueueName } from '@app/job';
+import { InjectQueue } from '@nestjs/bull';
+import { Injectable, OnModuleInit } from '@nestjs/common';
+import { Queue } from 'bull';
+import { randomUUID } from 'node:crypto';
 
 @Injectable()
-export class MicroservicesService {
-  getHello(): string {
-    return 'Hello World 123!';
+export class MicroservicesService implements OnModuleInit {
+  constructor (
+    @InjectQueue(generateChecksumQueueName)
+    private generateChecksumQueue: Queue,
+  ) {}
+
+  async onModuleInit() {
+    await this.generateChecksumQueue.add({}, { jobId: randomUUID() },);
   }
 }

+ 69 - 0
server/apps/microservices/src/processors/generate-checksum.processor.ts

@@ -0,0 +1,69 @@
+import { AssetEntity } from '@app/database/entities/asset.entity';
+import { generateChecksumQueueName } from '@app/job';
+import { Process, Processor } from '@nestjs/bull';
+import { Logger } from '@nestjs/common';
+import { InjectRepository } from '@nestjs/typeorm';
+import { createHash } from 'node:crypto';
+import fs from 'node:fs';
+import { IsNull, Repository } from 'typeorm';
+
+// TODO: just temporary task to generate previous uploaded assets.
+@Processor(generateChecksumQueueName)
+export class GenerateChecksumProcessor {
+  constructor(
+    @InjectRepository(AssetEntity)
+    private assetRepository: Repository<AssetEntity>,
+  ) {}
+
+  @Process()
+  async generateChecksum() {
+    let hasNext = true;
+    let pageSize = 200;
+    let offset = 0;
+
+    while (hasNext) {
+      const assets = await this.assetRepository.find({
+        where: {
+          checksum: IsNull()
+        },
+        skip: offset,
+        take: pageSize,
+      });
+
+      if (!assets?.length) {
+        hasNext = false; // avoid using break
+      } else {
+        for (const asset of assets) {
+          try {
+            await this.generateAssetChecksum(asset);
+          } catch (err: any) {
+            Logger.error(`Error generate checksum ${err}`);
+          }
+        }
+
+        if (assets.length < pageSize) {
+          hasNext = false;
+        } else {
+          offset += pageSize;
+        }
+      }
+    }
+  }
+
+  private async generateAssetChecksum(asset: AssetEntity) {
+    if (!asset.originalPath) return;
+    if (!fs.existsSync(asset.originalPath)) return;
+
+    const fileReadStream = fs.createReadStream(asset.originalPath);
+    const sha1Hash = createHash('sha1');
+    const deferred = new Promise<Buffer>((resolve, reject) => {
+      sha1Hash.once('error', (err) => reject(err));
+      sha1Hash.once('finish', () => resolve(sha1Hash.read()));
+    });
+
+    fileReadStream.pipe(sha1Hash);
+    const checksum = await deferred;
+
+    await this.assetRepository.update(asset.id, { checksum });
+  }
+}

+ 1 - 0
server/libs/job/src/constants/queue-name.constant.ts

@@ -2,3 +2,4 @@ export const thumbnailGeneratorQueueName = 'thumbnail-generator-queue';
 export const assetUploadedQueueName = 'asset-uploaded-queue';
 export const metadataExtractionQueueName = 'metadata-extraction-queue';
 export const videoConversionQueueName = 'video-conversion-queue';
+export const generateChecksumQueueName = 'generate-checksum-queue';