video-transcode.processor.ts 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import { APP_UPLOAD_LOCATION } from '@app/common/constants';
  2. import { AssetEntity } from '@app/infra';
  3. import { IVideoConversionProcessor, JobName, QueueName, SystemConfigService } from '@app/domain';
  4. import { Process, Processor } from '@nestjs/bull';
  5. import { Logger } from '@nestjs/common';
  6. import { InjectRepository } from '@nestjs/typeorm';
  7. import { Job } from 'bull';
  8. import ffmpeg, { FfprobeData } from 'fluent-ffmpeg';
  9. import { existsSync, mkdirSync } from 'fs';
  10. import { Repository } from 'typeorm';
  11. @Processor(QueueName.VIDEO_CONVERSION)
  12. export class VideoTranscodeProcessor {
  13. readonly logger = new Logger(VideoTranscodeProcessor.name);
  14. constructor(
  15. @InjectRepository(AssetEntity)
  16. private assetRepository: Repository<AssetEntity>,
  17. private systemConfigService: SystemConfigService,
  18. ) {}
  19. @Process({ name: JobName.VIDEO_CONVERSION, concurrency: 2 })
  20. async videoConversion(job: Job<IVideoConversionProcessor>) {
  21. const { asset } = job.data;
  22. const basePath = APP_UPLOAD_LOCATION;
  23. const encodedVideoPath = `${basePath}/${asset.userId}/encoded-video`;
  24. if (!existsSync(encodedVideoPath)) {
  25. mkdirSync(encodedVideoPath, { recursive: true });
  26. }
  27. const savedEncodedPath = `${encodedVideoPath}/${asset.id}.mp4`;
  28. await this.runVideoEncode(asset, savedEncodedPath);
  29. }
  30. async runFFProbePipeline(asset: AssetEntity): Promise<FfprobeData> {
  31. return new Promise((resolve, reject) => {
  32. ffmpeg.ffprobe(asset.originalPath, (err, data) => {
  33. if (err || !data) {
  34. this.logger.error(`Cannot probe video ${err}`, 'runFFProbePipeline');
  35. reject(err);
  36. }
  37. resolve(data);
  38. });
  39. });
  40. }
  41. async runVideoEncode(asset: AssetEntity, savedEncodedPath: string): Promise<void> {
  42. const config = await this.systemConfigService.getConfig();
  43. if (config.ffmpeg.transcodeAll) {
  44. return this.runFFMPEGPipeLine(asset, savedEncodedPath);
  45. }
  46. const videoInfo = await this.runFFProbePipeline(asset);
  47. const videoStreams = videoInfo.streams.filter((stream) => {
  48. return stream.codec_type === 'video';
  49. });
  50. const longestVideoStream = videoStreams.sort((stream1, stream2) => {
  51. const stream1Frames = Number.parseInt(stream1.nb_frames ?? '0');
  52. const stream2Frames = Number.parseInt(stream2.nb_frames ?? '0');
  53. return stream2Frames - stream1Frames;
  54. })[0];
  55. //TODO: If video or audio are already the correct format, don't re-encode, copy the stream
  56. if (longestVideoStream.codec_name !== config.ffmpeg.targetVideoCodec) {
  57. return this.runFFMPEGPipeLine(asset, savedEncodedPath);
  58. }
  59. }
  60. async runFFMPEGPipeLine(asset: AssetEntity, savedEncodedPath: string): Promise<void> {
  61. const config = await this.systemConfigService.getConfig();
  62. return new Promise((resolve, reject) => {
  63. ffmpeg(asset.originalPath)
  64. .outputOptions([
  65. `-crf ${config.ffmpeg.crf}`,
  66. `-preset ${config.ffmpeg.preset}`,
  67. `-vcodec ${config.ffmpeg.targetVideoCodec}`,
  68. `-acodec ${config.ffmpeg.targetAudioCodec}`,
  69. `-vf scale=${config.ffmpeg.targetScaling}`,
  70. ])
  71. .output(savedEncodedPath)
  72. .on('start', () => {
  73. this.logger.log('Start Converting Video');
  74. })
  75. .on('error', (error) => {
  76. this.logger.error(`Cannot Convert Video ${error}`);
  77. reject();
  78. })
  79. .on('end', async () => {
  80. this.logger.log(`Converting Success ${asset.id}`);
  81. await this.assetRepository.update({ id: asset.id }, { encodedVideoPath: savedEncodedPath });
  82. resolve();
  83. })
  84. .run();
  85. });
  86. }
  87. }