|
@@ -1,14 +1,12 @@
|
|
|
import { APP_UPLOAD_LOCATION } from '@app/common/constants';
|
|
|
import { AssetEntity } from '@app/infra';
|
|
|
-import { QueueName, JobName } from '@app/domain';
|
|
|
-import { IMp4ConversionProcessor } from '@app/domain';
|
|
|
+import { IVideoConversionProcessor, JobName, QueueName, SystemConfigService } from '@app/domain';
|
|
|
import { Process, Processor } from '@nestjs/bull';
|
|
|
import { Logger } from '@nestjs/common';
|
|
|
import { InjectRepository } from '@nestjs/typeorm';
|
|
|
import { Job } from 'bull';
|
|
|
-import ffmpeg from 'fluent-ffmpeg';
|
|
|
+import ffmpeg, { FfprobeData } from 'fluent-ffmpeg';
|
|
|
import { existsSync, mkdirSync } from 'fs';
|
|
|
-import { SystemConfigService } from '@app/domain';
|
|
|
import { Repository } from 'typeorm';
|
|
|
|
|
|
@Processor(QueueName.VIDEO_CONVERSION)
|
|
@@ -19,24 +17,60 @@ export class VideoTranscodeProcessor {
|
|
|
private systemConfigService: SystemConfigService,
|
|
|
) {}
|
|
|
|
|
|
- @Process({ name: JobName.MP4_CONVERSION, concurrency: 2 })
|
|
|
- async mp4Conversion(job: Job<IMp4ConversionProcessor>) {
|
|
|
+ @Process({ name: JobName.VIDEO_CONVERSION, concurrency: 2 })
|
|
|
+ async videoConversion(job: Job<IVideoConversionProcessor>) {
|
|
|
const { asset } = job.data;
|
|
|
|
|
|
- if (asset.mimeType != 'video/mp4') {
|
|
|
- const basePath = APP_UPLOAD_LOCATION;
|
|
|
- const encodedVideoPath = `${basePath}/${asset.userId}/encoded-video`;
|
|
|
+ const basePath = APP_UPLOAD_LOCATION;
|
|
|
+ const encodedVideoPath = `${basePath}/${asset.userId}/encoded-video`;
|
|
|
|
|
|
- if (!existsSync(encodedVideoPath)) {
|
|
|
- mkdirSync(encodedVideoPath, { recursive: true });
|
|
|
- }
|
|
|
+ if (!existsSync(encodedVideoPath)) {
|
|
|
+ mkdirSync(encodedVideoPath, { recursive: true });
|
|
|
+ }
|
|
|
+
|
|
|
+ const savedEncodedPath = `${encodedVideoPath}/${asset.id}.mp4`;
|
|
|
+
|
|
|
+ if (!asset.encodedVideoPath) {
|
|
|
+ // Put the processing into its own async function to prevent the job exist right away
|
|
|
+ await this.runVideoEncode(asset, savedEncodedPath);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async runFFProbePipeline(asset: AssetEntity): Promise<FfprobeData> {
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ ffmpeg.ffprobe(asset.originalPath, (err, data) => {
|
|
|
+ if (err || !data) {
|
|
|
+ Logger.error(`Cannot probe video ${err}`, 'mp4Conversion');
|
|
|
+ reject(err);
|
|
|
+ }
|
|
|
+
|
|
|
+ resolve(data);
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ async runVideoEncode(asset: AssetEntity, savedEncodedPath: string): Promise<void> {
|
|
|
+ const config = await this.systemConfigService.getConfig();
|
|
|
+
|
|
|
+ if (config.ffmpeg.transcodeAll) {
|
|
|
+ return this.runFFMPEGPipeLine(asset, savedEncodedPath);
|
|
|
+ }
|
|
|
+
|
|
|
+ const videoInfo = await this.runFFProbePipeline(asset);
|
|
|
+
|
|
|
+ const videoStreams = videoInfo.streams.filter((stream) => {
|
|
|
+ return stream.codec_type === 'video';
|
|
|
+ });
|
|
|
|
|
|
- const savedEncodedPath = encodedVideoPath + '/' + asset.id + '.mp4';
|
|
|
+ const longestVideoStream = videoStreams.sort((stream1, stream2) => {
|
|
|
+ const stream1Frames = Number.parseInt(stream1.nb_frames ?? '0');
|
|
|
+ const stream2Frames = Number.parseInt(stream2.nb_frames ?? '0');
|
|
|
+ return stream2Frames - stream1Frames;
|
|
|
+ })[0];
|
|
|
|
|
|
- if (asset.encodedVideoPath == '' || !asset.encodedVideoPath) {
|
|
|
- // Put the processing into its own async function to prevent the job exist right away
|
|
|
- await this.runFFMPEGPipeLine(asset, savedEncodedPath);
|
|
|
- }
|
|
|
+ //TODO: If video or audio are already the correct format, don't re-encode, copy the stream
|
|
|
+ if (longestVideoStream.codec_name !== config.ffmpeg.targetVideoCodec) {
|
|
|
+ return this.runFFMPEGPipeLine(asset, savedEncodedPath);
|
|
|
}
|
|
|
}
|
|
|
|