generate-checksum.processor.ts 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import { AssetEntity } from '@app/database/entities/asset.entity';
  2. import { QueueNameEnum } from '@app/job';
  3. import { Process, Processor } from '@nestjs/bull';
  4. import { Logger } from '@nestjs/common';
  5. import { InjectRepository } from '@nestjs/typeorm';
  6. import { createHash } from 'node:crypto';
  7. import fs from 'node:fs';
  8. import { FindOptionsWhere, IsNull, MoreThan, QueryFailedError, Repository } from 'typeorm';
  /**
   * Bull processor for the CHECKSUM_GENERATION queue: computes and stores a
   * SHA-1 checksum for every asset whose `checksum` column is still null.
   *
   * TODO: temporary task, only needed to generate checksums for previously uploaded assets.
   */
  @Processor(QueueNameEnum.CHECKSUM_GENERATION)
  export class GenerateChecksumProcessor {
    constructor(
      @InjectRepository(AssetEntity)
      private assetRepository: Repository<AssetEntity>,
    ) {}

    /**
     * Pages through all assets without a checksum (ascending id, 200 per page)
     * and generates a checksum for each one.
     *
     * Failures on a single asset are logged and do not stop the run. The paging
     * cursor always advances past the last asset that was looked at, so assets
     * that keep a null checksum (failed, or skipped because the file is missing)
     * are never fetched twice and the loop is guaranteed to terminate.
     */
    @Process()
    async generateChecksum(): Promise<void> {
      const pageSize = 200;
      let hasNext = true;
      let lastSeenAssetId: string | undefined = undefined;

      while (hasNext) {
        const whereStat: FindOptionsWhere<AssetEntity> = {
          checksum: IsNull(),
        };
        if (lastSeenAssetId) {
          whereStat.id = MoreThan(lastSeenAssetId);
        }

        const assets = await this.assetRepository.find({
          where: whereStat,
          take: pageSize,
          order: { id: 'ASC' },
        });

        if (!assets?.length) {
          hasNext = false; // avoid using break
        } else {
          for (const asset of assets) {
            // Move the cursor for every asset, not only for failing ones:
            // skipped assets still have a null checksum and would otherwise
            // be returned again by the next query.
            lastSeenAssetId = asset.id;
            try {
              await this.generateAssetChecksum(asset);
            } catch (err: unknown) {
              const isDuplicate =
                err instanceof QueryFailedError &&
                (err as QueryFailedError & { constraint?: string }).constraint === 'UQ_userid_checksum';
              if (isDuplicate) {
                Logger.error(`${asset.originalPath} duplicated`);
              } else {
                Logger.error(`checksum generation ${err}`);
              }
            }
          }

          // break when reach to the last page
          if (assets.length < pageSize) {
            hasNext = false;
          }
        }
      }

      Logger.log(`checksum generation done!`);
    }

    /**
     * Hashes the asset's original file with SHA-1 and saves the digest in the
     * asset's `checksum` column.
     *
     * Does nothing when the asset has no `originalPath` or the file does not
     * exist on disk.
     *
     * @param asset the asset whose original file should be hashed
     * @throws when reading the file fails or the repository update is rejected
     */
    private async generateAssetChecksum(asset: AssetEntity): Promise<void> {
      if (!asset.originalPath) return;
      if (!fs.existsSync(asset.originalPath)) return;

      const sha1Hash = createHash('sha1');
      // Async iteration rejects when the read stream emits 'error' and destroys
      // the stream when the loop exits, so read failures surface to the caller
      // instead of becoming an unhandled 'error' event.
      for await (const chunk of fs.createReadStream(asset.originalPath)) {
        sha1Hash.update(chunk);
      }
      const checksum = sha1Hash.digest();

      await this.assetRepository.update(asset.id, { checksum });
    }
  }