[desktop] Fix large video conversion (#1713)

This was the last pending TODO for getting the new build at par with how
the old one behaved.
This commit is contained in:
Manav Rathi 2024-05-13 20:19:39 +05:30 committed by GitHub
commit 8a8d240bfd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 373 additions and 232 deletions

View file

@ -69,6 +69,7 @@ import {
watchUpdateIgnoredFiles,
watchUpdateSyncedFiles,
} from "./services/watch";
import { clearConvertToMP4Results } from "./stream";
/**
* Listen for IPC events sent/invoked by the renderer process, and route them to
@ -108,6 +109,8 @@ export const attachIPCHandlers = () => {
ipcMain.on("clearStores", () => clearStores());
ipcMain.on("clearConvertToMP4Results", () => clearConvertToMP4Results());
ipcMain.handle("saveEncryptionKey", (_, encryptionKey: string) =>
saveEncryptionKey(encryptionKey),
);
@ -171,14 +174,7 @@ export const attachIPCHandlers = () => {
command: string[],
dataOrPathOrZipItem: Uint8Array | string | ZipItem,
outputFileExtension: string,
timeoutMS: number,
) =>
ffmpegExec(
command,
dataOrPathOrZipItem,
outputFileExtension,
timeoutMS,
),
) => ffmpegExec(command, dataOrPathOrZipItem, outputFileExtension),
);
// - ML

View file

@ -140,17 +140,17 @@ const checkForUpdatesAndNotify = async (mainWindow: BrowserWindow) => {
log.debug(() => "Attempting auto update");
await autoUpdater.downloadUpdate();
let timeoutId: ReturnType<typeof setTimeout>;
let timeout: ReturnType<typeof setTimeout>;
const fiveMinutes = 5 * 60 * 1000;
autoUpdater.on("update-downloaded", () => {
timeoutId = setTimeout(
timeout = setTimeout(
() => showUpdateDialog({ autoUpdatable: true, version }),
fiveMinutes,
);
});
autoUpdater.on("error", (error) => {
clearTimeout(timeoutId);
clearTimeout(timeout);
log.error("Auto update failed", error);
showUpdateDialog({ autoUpdatable: false, version });
});

View file

@ -1,11 +1,10 @@
import pathToFfmpeg from "ffmpeg-static";
import fs from "node:fs/promises";
import type { ZipItem } from "../../types/ipc";
import log from "../log";
import { ensure, withTimeout } from "../utils/common";
import { ensure } from "../utils/common";
import { execAsync } from "../utils/electron";
import {
deleteTempFile,
deleteTempFileIgnoringErrors,
makeFileForDataOrPathOrZipItem,
makeTempFilePath,
} from "../utils/temp";
@ -46,13 +45,7 @@ export const ffmpegExec = async (
command: string[],
dataOrPathOrZipItem: Uint8Array | string | ZipItem,
outputFileExtension: string,
timeoutMS: number,
): Promise<Uint8Array> => {
// TODO (MR): This currently copies files for both input (when
// dataOrPathOrZipItem is data) and output. This needs to be tested
// extremely large video files when invoked downstream of `convertToMP4` in
// the web code.
const {
path: inputFilePath,
isFileTemporary: isInputFileTemporary,
@ -69,17 +62,13 @@ export const ffmpegExec = async (
outputFilePath,
);
if (timeoutMS) await withTimeout(execAsync(cmd), timeoutMS);
else await execAsync(cmd);
await execAsync(cmd);
return fs.readFile(outputFilePath);
} finally {
try {
if (isInputFileTemporary) await deleteTempFile(inputFilePath);
await deleteTempFile(outputFilePath);
} catch (e) {
log.error("Could not clean up temp files", e);
}
if (isInputFileTemporary)
await deleteTempFileIgnoringErrors(inputFilePath);
await deleteTempFileIgnoringErrors(outputFilePath);
}
};
@ -112,3 +101,32 @@ const ffmpegBinaryPath = () => {
// https://github.com/eugeneware/ffmpeg-static/issues/16
return ensure(pathToFfmpeg).replace("app.asar", "app.asar.unpacked");
};
/**
* A variant of {@link ffmpegExec} adapted to work with streams so that it can
* handle the MP4 conversion of large video files.
*
* See: [Note: Convert to MP4]
* @param inputFilePath The path to a file on the user's local file system. This
* is the video we want to convert.
* @param outputFilePath The path to a file on the user's local file system where
* we should write the converted MP4 video.
*/
export const ffmpegConvertToMP4 = async (
inputFilePath: string,
outputFilePath: string,
): Promise<void> => {
const command = [
ffmpegPathPlaceholder,
"-i",
inputPathPlaceholder,
"-preset",
"ultrafast",
outputPathPlaceholder,
];
const cmd = substitutePlaceholders(command, inputFilePath, outputFilePath);
await execAsync(cmd);
};

View file

@ -6,7 +6,7 @@ import { CustomErrorMessage, type ZipItem } from "../../types/ipc";
import log from "../log";
import { execAsync, isDev } from "../utils/electron";
import {
deleteTempFile,
deleteTempFileIgnoringErrors,
makeFileForDataOrPathOrZipItem,
makeTempFilePath,
} from "../utils/temp";
@ -23,12 +23,8 @@ export const convertToJPEG = async (imageData: Uint8Array) => {
await execAsync(command);
return new Uint8Array(await fs.readFile(outputFilePath));
} finally {
try {
await deleteTempFile(inputFilePath);
await deleteTempFile(outputFilePath);
} catch (e) {
log.error("Could not clean up temp files", e);
}
await deleteTempFileIgnoringErrors(inputFilePath);
await deleteTempFileIgnoringErrors(outputFilePath);
}
};
@ -107,12 +103,9 @@ export const generateImageThumbnail = async (
} while (thumbnail.length > maxSize && quality > 50);
return thumbnail;
} finally {
try {
if (isInputFileTemporary) await deleteTempFile(inputFilePath);
await deleteTempFile(outputFilePath);
} catch (e) {
log.error("Could not clean up temp files", e);
}
if (isInputFileTemporary)
await deleteTempFileIgnoringErrors(inputFilePath);
await deleteTempFileIgnoringErrors(outputFilePath);
}
};

View file

@ -3,13 +3,20 @@
*/
import { net, protocol } from "electron/main";
import StreamZip from "node-stream-zip";
import { randomUUID } from "node:crypto";
import { createWriteStream, existsSync } from "node:fs";
import fs from "node:fs/promises";
import { Readable } from "node:stream";
import { ReadableStream } from "node:stream/web";
import { pathToFileURL } from "node:url";
import log from "./log";
import { ffmpegConvertToMP4 } from "./services/ffmpeg";
import { ensure } from "./utils/common";
import {
deleteTempFile,
deleteTempFileIgnoringErrors,
makeTempFilePath,
} from "./utils/temp";
/**
* Register a protocol handler that we use for streaming large files between the
@ -34,119 +41,117 @@ import { ensure } from "./utils/common";
* Depends on {@link registerPrivilegedSchemes}.
*/
export const registerStreamProtocol = () => {
protocol.handle("stream", async (request: Request) => {
const url = request.url;
// The request URL contains the command to run as the host, and the
// pathname of the file(s) as the search params.
const { host, searchParams } = new URL(url);
switch (host) {
case "read":
return handleRead(ensure(searchParams.get("path")));
case "read-zip":
return handleReadZip(
ensure(searchParams.get("zipPath")),
ensure(searchParams.get("entryName")),
);
case "write":
return handleWrite(ensure(searchParams.get("path")), request);
default:
return new Response("", { status: 404 });
protocol.handle("stream", (request: Request) => {
try {
return handleStreamRequest(request);
} catch (e) {
log.error(`Failed to handle stream request for ${request.url}`, e);
return new Response(String(e), { status: 500 });
}
});
};
const handleRead = async (path: string) => {
try {
const res = await net.fetch(pathToFileURL(path).toString());
if (res.ok) {
// net.fetch already seems to add "Content-Type" and "Last-Modified"
// headers, but I couldn't find documentation for this. In any case,
// since we already are stat-ting the file for the "Content-Length",
// we explicitly add the "X-Last-Modified-Ms" too,
//
// 1. Guaranteeing its presence,
//
// 2. Having it be in the exact format we want (no string <-> date
// conversions),
//
// 3. Retaining milliseconds.
const handleStreamRequest = async (request: Request): Promise<Response> => {
const url = request.url;
// The request URL contains the command to run as the host, and the
// pathname of the file(s) as the search params.
const { host, searchParams } = new URL(url);
switch (host) {
case "read":
return handleRead(ensure(searchParams.get("path")));
const stat = await fs.stat(path);
case "read-zip":
return handleReadZip(
ensure(searchParams.get("zipPath")),
ensure(searchParams.get("entryName")),
);
// Add the file's size as the Content-Length header.
const fileSize = stat.size;
res.headers.set("Content-Length", `${fileSize}`);
case "write":
return handleWrite(ensure(searchParams.get("path")), request);
// Add the file's last modified time (as epoch milliseconds).
const mtimeMs = stat.mtimeMs;
res.headers.set("X-Last-Modified-Ms", `${mtimeMs}`);
case "convert-to-mp4": {
const token = searchParams.get("token");
const done = searchParams.get("done") !== null;
return token
? done
? handleConvertToMP4ReadDone(token)
: handleConvertToMP4Read(token)
: handleConvertToMP4Write(request);
}
return res;
} catch (e) {
log.error(`Failed to read stream at ${path}`, e);
return new Response(`Failed to read stream: ${String(e)}`, {
status: 500,
});
default:
return new Response("", { status: 404 });
}
};
const handleRead = async (path: string) => {
const res = await net.fetch(pathToFileURL(path).toString());
if (res.ok) {
// net.fetch already seems to add "Content-Type" and "Last-Modified"
// headers, but I couldn't find documentation for this. In any case,
// since we already are stat-ting the file for the "Content-Length", we
// explicitly add the "X-Last-Modified-Ms" too,
//
// 1. Guaranteeing its presence,
//
// 2. Having it be in the exact format we want (no string <-> date
// conversions),
//
// 3. Retaining milliseconds.
const stat = await fs.stat(path);
// Add the file's size as the Content-Length header.
const fileSize = stat.size;
res.headers.set("Content-Length", `${fileSize}`);
// Add the file's last modified time (as epoch milliseconds).
const mtimeMs = stat.mtimeMs;
res.headers.set("X-Last-Modified-Ms", `${mtimeMs}`);
}
return res;
};
const handleReadZip = async (zipPath: string, entryName: string) => {
try {
const zip = new StreamZip.async({ file: zipPath });
const entry = await zip.entry(entryName);
if (!entry) return new Response("", { status: 404 });
const zip = new StreamZip.async({ file: zipPath });
const entry = await zip.entry(entryName);
if (!entry) return new Response("", { status: 404 });
// This returns an "old style" NodeJS.ReadableStream.
const stream = await zip.stream(entry);
// Convert it into a new style NodeJS.Readable.
const nodeReadable = new Readable().wrap(stream);
// Then convert it into a Web stream.
const webReadableStreamAny = Readable.toWeb(nodeReadable);
// However, we get a ReadableStream<any> now. This doesn't go into the
// `BodyInit` expected by the Response constructor, which wants a
// ReadableStream<Uint8Array>. Force a cast.
const webReadableStream =
webReadableStreamAny as ReadableStream<Uint8Array>;
// This returns an "old style" NodeJS.ReadableStream.
const stream = await zip.stream(entry);
// Convert it into a new style NodeJS.Readable.
const nodeReadable = new Readable().wrap(stream);
// Then convert it into a Web stream.
const webReadableStreamAny = Readable.toWeb(nodeReadable);
// However, we get a ReadableStream<any> now. This doesn't go into the
// `BodyInit` expected by the Response constructor, which wants a
// ReadableStream<Uint8Array>. Force a cast.
const webReadableStream =
webReadableStreamAny as ReadableStream<Uint8Array>;
// Close the zip handle when the underlying stream closes.
stream.on("end", () => void zip.close());
// Close the zip handle when the underlying stream closes.
stream.on("end", () => void zip.close());
return new Response(webReadableStream, {
headers: {
// We don't know the exact type, but it doesn't really matter,
// just set it to a generic binary content-type so that the
// browser doesn't tinker with it thinking of it as text.
"Content-Type": "application/octet-stream",
"Content-Length": `${entry.size}`,
// While it is documented that entry.time is the modification
// time, the units are not mentioned. By seeing the source code,
// we can verify that it is indeed epoch milliseconds. See
// `parseZipTime` in the node-stream-zip source,
// https://github.com/antelle/node-stream-zip/blob/master/node_stream_zip.js
"X-Last-Modified-Ms": `${entry.time}`,
},
});
} catch (e) {
log.error(
`Failed to read entry ${entryName} from zip file at ${zipPath}`,
e,
);
return new Response(`Failed to read stream: ${String(e)}`, {
status: 500,
});
}
return new Response(webReadableStream, {
headers: {
// We don't know the exact type, but it doesn't really matter, just
// set it to a generic binary content-type so that the browser
// doesn't tinker with it thinking of it as text.
"Content-Type": "application/octet-stream",
"Content-Length": `${entry.size}`,
// While it is documented that entry.time is the modification time,
// the units are not mentioned. By seeing the source code, we can
// verify that it is indeed epoch milliseconds. See `parseZipTime`
// in the node-stream-zip source,
// https://github.com/antelle/node-stream-zip/blob/master/node_stream_zip.js
"X-Last-Modified-Ms": `${entry.time}`,
},
});
};
const handleWrite = async (path: string, request: Request) => {
try {
await writeStream(path, ensure(request.body));
return new Response("", { status: 200 });
} catch (e) {
log.error(`Failed to write stream to ${path}`, e);
return new Response(`Failed to write stream: ${String(e)}`, {
status: 500,
});
}
await writeStream(path, ensure(request.body));
return new Response("", { status: 200 });
};
/**
@ -154,7 +159,7 @@ const handleWrite = async (path: string, request: Request) => {
*
* The returned promise resolves when the write completes.
*
* @param filePath The local filesystem path where the file should be written.
* @param filePath The local file system path where the file should be written.
*
* @param readableStream A web
* [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
@ -181,3 +186,84 @@ const writeNodeStream = async (filePath: string, fileStream: Readable) => {
});
});
};
/**
* A map from token to file paths for convert-to-mp4 requests that we have
* received.
*/
const convertToMP4Results = new Map<string, string>();
/**
* Clear any in-memory state for in-flight convert-to-mp4 requests. Meant to be
* called during logout.
*/
export const clearConvertToMP4Results = () => convertToMP4Results.clear();
/**
* [Note: Convert to MP4]
*
* When we want to convert a video to MP4, if we were to send the entire
* contents of the video from the renderer to the main process over IPC, it just
* causes the renderer to run out of memory and restart when the videos are very
* large. So we need to stream the original video from the renderer to the main
* process, and then stream the converted video back from main to the renderer.
*
* Currently Chromium does not support bi-directional streaming ("full" duplex
* mode for the Web fetch API). So we need to simulate that using two different
* streaming requests.
*
* renderer → main    stream://convert-to-mp4
*                    request.body is the original video
*                    response is a token
*
* renderer → main    stream://convert-to-mp4?token=<token>
*                    response.body is the converted video
*
* renderer → main    stream://convert-to-mp4?token=<token>&done
*                    200 OK
*
* Note that the conversion itself is not streaming. The conversion still
* happens in a single shot, we are just streaming the data across the IPC
* boundary to allow us to pass large amounts of data without running out of
* memory.
*
* See also: [Note: IPC streams]
*/
const handleConvertToMP4Write = async (request: Request) => {
const inputTempFilePath = await makeTempFilePath();
await writeStream(inputTempFilePath, ensure(request.body));
const outputTempFilePath = await makeTempFilePath("mp4");
try {
await ffmpegConvertToMP4(inputTempFilePath, outputTempFilePath);
} catch (e) {
log.error("Conversion to MP4 failed", e);
await deleteTempFileIgnoringErrors(outputTempFilePath);
throw e;
} finally {
await deleteTempFileIgnoringErrors(inputTempFilePath);
}
const token = randomUUID();
convertToMP4Results.set(token, outputTempFilePath);
return new Response(token, { status: 200 });
};
const handleConvertToMP4Read = async (token: string) => {
const filePath = convertToMP4Results.get(token);
if (!filePath)
return new Response(`Unknown token ${token}`, { status: 404 });
return net.fetch(pathToFileURL(filePath).toString());
};
const handleConvertToMP4ReadDone = async (token: string) => {
const filePath = convertToMP4Results.get(token);
if (!filePath)
return new Response(`Unknown token ${token}`, { status: 404 });
await deleteTempFile(filePath);
convertToMP4Results.delete(token);
return new Response("", { status: 200 });
};

View file

@ -13,32 +13,3 @@ export const ensure = <T>(v: T | null | undefined): T => {
if (v === undefined) throw new Error("Required value was not found");
return v;
};
/**
* Wait for {@link ms} milliseconds
*
* This function is a promisified `setTimeout`. It returns a promise that
* resolves after {@link ms} milliseconds.
*/
export const wait = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms));
/**
* Await the given {@link promise} for {@link timeoutMS} milliseconds. If it
* does not resolve within {@link timeoutMS}, then reject with a timeout error.
*/
export const withTimeout = async <T>(promise: Promise<T>, ms: number) => {
let timeoutId: ReturnType<typeof setTimeout>;
const rejectOnTimeout = new Promise<T>((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error("Operation timed out")),
ms,
);
});
const promiseAndCancelTimeout = async () => {
const result = await promise;
clearTimeout(timeoutId);
return result;
};
return Promise.race([promiseAndCancelTimeout(), rejectOnTimeout]);
};

View file

@ -49,12 +49,12 @@ export const posixPath = (platformPath: string) =>
* > output, this might not be the best option and it might be better to use the
* > underlying functions.
*/
export const execAsync = (command: string | string[]) => {
export const execAsync = async (command: string | string[]) => {
const escapedCommand = Array.isArray(command)
? shellescape(command)
: command;
const startTime = Date.now();
const result = execAsync_(escapedCommand);
const result = await execAsync_(escapedCommand);
log.debug(
() => `${escapedCommand} (${Math.round(Date.now() - startTime)} ms)`,
);

View file

@ -4,6 +4,7 @@ import { existsSync } from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
import type { ZipItem } from "../../types/ipc";
import log from "../log";
import { ensure } from "./common";
/**
@ -62,6 +63,19 @@ export const deleteTempFile = async (tempFilePath: string) => {
await fs.rm(tempFilePath, { force: true });
};
/**
* A variant of {@link deleteTempFile} that suppresses any errors, making it
* safe to call them in a sequence without needing to handle the scenario where
* one of them failing causes the rest to be skipped.
*/
export const deleteTempFileIgnoringErrors = async (tempFilePath: string) => {
try {
await deleteTempFile(tempFilePath);
} catch (e) {
log.error(`Could not delete temporary file at path ${tempFilePath}`, e);
}
};
/** The result of {@link makeFileForDataOrPathOrZipItem}. */
interface FileForDataOrPathOrZipItem {
/**

View file

@ -65,6 +65,9 @@ const selectDirectory = () => ipcRenderer.invoke("selectDirectory");
const clearStores = () => ipcRenderer.send("clearStores");
const clearConvertToMP4Results = () =>
ipcRenderer.send("clearConvertToMP4Results");
const encryptionKey = () => ipcRenderer.invoke("encryptionKey");
const saveEncryptionKey = (encryptionKey: string) =>
@ -140,14 +143,12 @@ const ffmpegExec = (
command: string[],
dataOrPathOrZipItem: Uint8Array | string | ZipItem,
outputFileExtension: string,
timeoutMS: number,
) =>
ipcRenderer.invoke(
"ffmpegExec",
command,
dataOrPathOrZipItem,
outputFileExtension,
timeoutMS,
);
// - ML
@ -308,6 +309,7 @@ contextBridge.exposeInMainWorld("electron", {
openLogDirectory,
selectDirectory,
clearStores,
clearConvertToMP4Results,
encryptionKey,
saveEncryptionKey,
onMainWindowFocus,

View file

@ -576,7 +576,6 @@ async function getPlayableVideo(
if (!forceConvert && !runOnWeb && !isElectron()) {
return null;
}
// TODO(MR): This might not work for very large (~ GB) videos. Test.
log.info(`Converting video ${videoNameTitle} to mp4`);
const convertedVideoData = await ffmpeg.convertToMP4(videoBlob);
return new Blob([convertedVideoData], { type: "video/mp4" });

View file

@ -9,6 +9,11 @@ import {
} from "constants/ffmpeg";
import { NULL_LOCATION } from "constants/upload";
import type { ParsedExtractedMetadata } from "types/metadata";
import {
readConvertToMP4Done,
readConvertToMP4Stream,
writeConvertToMP4Stream,
} from "utils/native-stream";
import type { DedicatedFFmpegWorker } from "worker/ffmpeg.worker";
import {
toDataOrPathOrZipEntry,
@ -31,7 +36,7 @@ import {
*/
export const generateVideoThumbnailWeb = async (blob: Blob) =>
_generateVideoThumbnail((seekTime: number) =>
ffmpegExecWeb(makeGenThumbnailCommand(seekTime), blob, "jpeg", 0),
ffmpegExecWeb(makeGenThumbnailCommand(seekTime), blob, "jpeg"),
);
const _generateVideoThumbnail = async (
@ -70,7 +75,6 @@ export const generateVideoThumbnailNative = async (
makeGenThumbnailCommand(seekTime),
toDataOrPathOrZipEntry(desktopUploadItem),
"jpeg",
0,
),
);
@ -98,8 +102,8 @@ const makeGenThumbnailCommand = (seekTime: number) => [
* of videos that the user is uploading.
*
* @param uploadItem A {@link File}, or the absolute path to a file on the
* user's local filesytem. A path can only be provided when we're running in the
* context of our desktop app.
user's local file system. A path can only be provided when we're running in
* the context of our desktop app.
*/
export const extractVideoMetadata = async (
uploadItem: UploadItem,
@ -107,12 +111,11 @@ export const extractVideoMetadata = async (
const command = extractVideoMetadataCommand;
const outputData =
uploadItem instanceof File
? await ffmpegExecWeb(command, uploadItem, "txt", 0)
? await ffmpegExecWeb(command, uploadItem, "txt")
: await electron.ffmpegExec(
command,
toDataOrPathOrZipEntry(uploadItem),
"txt",
0,
);
return parseFFmpegExtractedMetadata(outputData);
@ -219,10 +222,9 @@ const ffmpegExecWeb = async (
command: string[],
blob: Blob,
outputFileExtension: string,
timeoutMs: number,
) => {
const worker = await workerFactory.lazy();
return await worker.exec(command, blob, outputFileExtension, timeoutMs);
return await worker.exec(command, blob, outputFileExtension);
};
/**
@ -234,61 +236,46 @@ const ffmpegExecWeb = async (
*
* @param blob The video blob.
*
* @returns The mp4 video data.
* @returns The mp4 video blob.
*/
export const convertToMP4 = async (blob: Blob) =>
ffmpegExecNativeOrWeb(
[
export const convertToMP4 = async (blob: Blob): Promise<Blob | Uint8Array> => {
const electron = globalThis.electron;
if (electron) {
return convertToMP4Native(electron, blob);
} else {
const command = [
ffmpegPathPlaceholder,
"-i",
inputPathPlaceholder,
"-preset",
"ultrafast",
outputPathPlaceholder,
],
blob,
"mp4",
30 * 1000,
);
];
return ffmpegExecWeb(command, blob, "mp4");
}
};
/**
* Run the given FFmpeg command using a native FFmpeg binary when we're running
* in the context of our desktop app, otherwise using the browser based wasm
* FFmpeg implementation.
*
* See also: {@link ffmpegExecWeb}.
*/
const ffmpegExecNativeOrWeb = async (
command: string[],
blob: Blob,
outputFileExtension: string,
timeoutMs: number,
) => {
const electron = globalThis.electron;
if (electron)
return electron.ffmpegExec(
command,
new Uint8Array(await blob.arrayBuffer()),
outputFileExtension,
timeoutMs,
);
else return ffmpegExecWeb(command, blob, outputFileExtension, timeoutMs);
const convertToMP4Native = async (electron: Electron, blob: Blob) => {
const token = await writeConvertToMP4Stream(electron, blob);
const mp4Blob = await readConvertToMP4Stream(electron, token);
readConvertToMP4Done(electron, token);
return mp4Blob;
};
/** Lazily create a singleton instance of our worker */
class WorkerFactory {
private instance: Promise<Remote<DedicatedFFmpegWorker>>;
private createComlinkWorker = () =>
new ComlinkWorker<typeof DedicatedFFmpegWorker>(
"ffmpeg-worker",
new Worker(new URL("worker/ffmpeg.worker.ts", import.meta.url)),
);
async lazy() {
if (!this.instance) this.instance = createComlinkWorker().remote;
if (!this.instance) this.instance = this.createComlinkWorker().remote;
return this.instance;
}
}
const workerFactory = new WorkerFactory();
const createComlinkWorker = () =>
new ComlinkWorker<typeof DedicatedFFmpegWorker>(
"ffmpeg-worker",
new Worker(new URL("worker/ffmpeg.worker.ts", import.meta.url)),
);

View file

@ -111,7 +111,79 @@ export const writeStream = async (
const res = await fetch(req);
if (!res.ok)
throw new Error(
`Failed to write stream to ${path}: HTTP ${res.status}`,
);
throw new Error(`Failed to write stream to ${url}: HTTP ${res.status}`);
};
/**
* Variant of {@link writeStream} tailored for video conversion.
*
* @param blob The video to convert.
*
* @returns a token that can then be passed to {@link readConvertToMP4Stream} to
* read back the converted video. See: [Note: Convert to MP4].
*/
export const writeConvertToMP4Stream = async (_: Electron, blob: Blob) => {
const url = "stream://convert-to-mp4";
const req = new Request(url, {
method: "POST",
body: blob,
// @ts-expect-error TypeScript's libdom.d.ts does not include the
// "duplex" parameter, e.g. see
// https://github.com/node-fetch/node-fetch/issues/1769.
duplex: "half",
});
const res = await fetch(req);
if (!res.ok)
throw new Error(`Failed to write stream to ${url}: HTTP ${res.status}`);
const token = res.text();
return token;
};
/**
* Variant of {@link readStream} tailored for video conversion.
*
* @param token A token obtained from {@link writeConvertToMP4Stream}.
*
* @returns the contents of the converted video. See: [Note: Convert to MP4].
*/
export const readConvertToMP4Stream = async (
_: Electron,
token: string,
): Promise<Blob> => {
const params = new URLSearchParams({ token });
const url = new URL(`stream://convert-to-mp4?${params.toString()}`);
const req = new Request(url, { method: "GET" });
const res = await fetch(req);
if (!res.ok)
throw new Error(
`Failed to read stream from ${url}: HTTP ${res.status}`,
);
return res.blob();
};
/**
* Sibling of {@link readConvertToMP4Stream} to let the native side know when we
* are done reading the response, and they can dispose any temporary resources
* it was using.
*
* @param token A token obtained from {@link writeConvertToMP4Stream}.
*/
export const readConvertToMP4Done = async (
_: Electron,
token: string,
): Promise<void> => {
// The value for `done` is arbitrary, only its presence matters.
const params = new URLSearchParams({ token, done: "1" });
const url = new URL(`stream://convert-to-mp4?${params.toString()}`);
const req = new Request(url, { method: "GET" });
const res = await fetch(req);
if (!res.ok)
throw new Error(`Failed to close stream at ${url}: HTTP ${res.status}`);
};

View file

@ -1,5 +1,4 @@
import log from "@/next/log";
import { withTimeout } from "@/utils/promise";
import QueueProcessor from "@ente/shared/utils/queueProcessor";
import { expose } from "comlink";
import {
@ -48,15 +47,11 @@ export class DedicatedFFmpegWorker {
command: string[],
blob: Blob,
outputFileExtension: string,
timeoutMs,
): Promise<Uint8Array> {
if (!this.ffmpeg.isLoaded()) await this.ffmpeg.load();
const go = () =>
ffmpegExec(this.ffmpeg, command, outputFileExtension, blob);
const request = this.ffmpegTaskQueue.queueUpRequest(() =>
timeoutMs ? withTimeout(go(), timeoutMs) : go(),
ffmpegExec(this.ffmpeg, command, outputFileExtension, blob),
);
return await request.promise;

View file

@ -47,6 +47,11 @@ export const logoutUser = async () => {
} catch (e) {
log.error("Ignoring error when resetting native folder watches", e);
}
try {
await electron.clearConvertToMP4Results();
} catch (e) {
log.error("Ignoring error when clearing convert-to-mp4 results", e);
}
try {
await electron.clearStores();
} catch (e) {

View file

@ -67,10 +67,17 @@ export interface Electron {
* Clear any stored data.
*
* This is a coarse single shot cleanup, meant for use in clearing any
* Electron side state during logout.
* persisted Electron side state during logout.
*/
clearStores: () => void;
/**
* Clear any state corresponding to in-flight convert-to-mp4 requests.
*
* This is meant for use during logout.
*/
clearConvertToMP4Results: () => void;
/**
* Return the previously saved encryption key from persistent safe storage.
*
@ -260,7 +267,7 @@ export interface Electron {
* This executes the command using a FFmpeg executable we bundle with our
* desktop app. We also have a wasm FFmpeg wasm implementation that we use
* when running on the web, which has a sibling function with the same
* parameters. See [Note: ffmpeg in Electron].
* parameters. See [Note: FFmpeg in Electron].
*
* @param command An array of strings, each representing one positional
* parameter in the command to execute. Placeholders for the input, output
@ -280,9 +287,6 @@ export interface Electron {
* just return its contents, for some FFmpeg command the extension matters
* (e.g. conversion to a JPEG fails if the extension is arbitrary).
*
* @param timeoutMS If non-zero, then abort and throw a timeout error if the
* ffmpeg command takes more than the given number of milliseconds.
*
* @returns The contents of the output file produced by the ffmpeg command
* (specified as {@link outputPathPlaceholder} in {@link command}).
*/
@ -290,7 +294,6 @@ export interface Electron {
command: string[],
dataOrPathOrZipItem: Uint8Array | string | ZipItem,
outputFileExtension: string,
timeoutMS: number,
) => Promise<Uint8Array>;
// - ML