Rework the video chunk decryptor stream logic
When running on Ubuntu 24 arm64 in the desktop app (didn't test on web), trying to open certain videos fails with: > [rndr] [error] Failed to process file stream: TypeError: Failed to execute 'enqueue' on 'ReadableStreamDefaultController': Cannot enqueue a chunk into a closed readable stream While not specifically fixing that issue, I'm first rewriting this to use the more conventional (recommended?) approach of implementing a `pull` callback instead of doing everything in `start`. Maybe that fixes the issue; if not, it's at least one less ghost for me to worry about.
This commit is contained in:
parent
048aaee40d
commit
132ddd3648
1 changed files with 55 additions and 67 deletions
|
@ -318,76 +318,64 @@ class DownloadManagerImpl {
|
|||
const contentLength = +res.headers.get("Content-Length") ?? 0;
|
||||
let downloadedBytes = 0;
|
||||
|
||||
const decryptionHeader = await this.cryptoWorker.fromB64(
|
||||
file.file.decryptionHeader,
|
||||
);
|
||||
const fileKey = await this.cryptoWorker.fromB64(file.key);
|
||||
const { pullState, decryptionChunkSize } =
|
||||
await this.cryptoWorker.initChunkDecryption(
|
||||
decryptionHeader,
|
||||
fileKey,
|
||||
);
|
||||
|
||||
let leftoverBytes = new Uint8Array();
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start: async (controller) => {
|
||||
try {
|
||||
const decryptionHeader = await this.cryptoWorker.fromB64(
|
||||
file.file.decryptionHeader,
|
||||
);
|
||||
const fileKey = await this.cryptoWorker.fromB64(file.key);
|
||||
const { pullState, decryptionChunkSize } =
|
||||
await this.cryptoWorker.initChunkDecryption(
|
||||
decryptionHeader,
|
||||
fileKey,
|
||||
pull: async (controller) => {
|
||||
// done is a boolean and value is an Uint8Array. When done is
|
||||
// true value will be empty, otherwise present.
|
||||
const { done, value } = await reader.read();
|
||||
|
||||
let data: Uint8Array;
|
||||
if (done) {
|
||||
data = leftoverBytes;
|
||||
} else {
|
||||
downloadedBytes += value.length;
|
||||
onDownloadProgress({
|
||||
loaded: downloadedBytes,
|
||||
total: contentLength,
|
||||
});
|
||||
|
||||
data = new Uint8Array(leftoverBytes.length + value.length);
|
||||
data.set(new Uint8Array(leftoverBytes), 0);
|
||||
data.set(new Uint8Array(value), leftoverBytes.length);
|
||||
}
|
||||
|
||||
// data.length might be a multiple of decryptionChunkSize, and
|
||||
// we might need multiple iterations to drain it all.
|
||||
|
||||
while (data && data.length >= decryptionChunkSize) {
|
||||
const { decryptedData } =
|
||||
await this.cryptoWorker.decryptFileChunk(
|
||||
data.slice(0, decryptionChunkSize),
|
||||
pullState,
|
||||
);
|
||||
controller.enqueue(decryptedData);
|
||||
data = data.slice(decryptionChunkSize);
|
||||
}
|
||||
|
||||
let data = new Uint8Array();
|
||||
let more = true;
|
||||
while (more) {
|
||||
more = false;
|
||||
|
||||
// "done" is a Boolean and value a "Uint8Array"
|
||||
const { done, value } = await reader.read();
|
||||
|
||||
// Is there more data to read?
|
||||
if (!done) {
|
||||
downloadedBytes += value.length;
|
||||
onDownloadProgress({
|
||||
loaded: downloadedBytes,
|
||||
total: contentLength,
|
||||
});
|
||||
|
||||
const buffer = new Uint8Array(
|
||||
data.length + value.length,
|
||||
);
|
||||
buffer.set(new Uint8Array(data), 0);
|
||||
buffer.set(new Uint8Array(value), data.length);
|
||||
|
||||
// Note that buffer.length might be a multiple of
|
||||
// decryptionChunkSize. We let these accumulate, and
|
||||
// drain it all with a nested while loop when done.
|
||||
|
||||
if (buffer.length > decryptionChunkSize) {
|
||||
const { decryptedData } =
|
||||
await this.cryptoWorker.decryptFileChunk(
|
||||
buffer.slice(0, decryptionChunkSize),
|
||||
pullState,
|
||||
);
|
||||
controller.enqueue(decryptedData);
|
||||
data = buffer.slice(decryptionChunkSize);
|
||||
} else {
|
||||
data = buffer;
|
||||
}
|
||||
more = true;
|
||||
} else {
|
||||
while (data && data.length) {
|
||||
const { decryptedData } =
|
||||
await this.cryptoWorker.decryptFileChunk(
|
||||
data.slice(0, decryptionChunkSize),
|
||||
pullState,
|
||||
);
|
||||
controller.enqueue(decryptedData);
|
||||
data =
|
||||
data.length > decryptionChunkSize
|
||||
? data.slice(decryptionChunkSize)
|
||||
: undefined;
|
||||
}
|
||||
controller.close();
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
log.error("Failed to process file stream", e);
|
||||
controller.error(e);
|
||||
if (done) {
|
||||
// Send off the last one, no more bytes are going to come.
|
||||
const { decryptedData } =
|
||||
await this.cryptoWorker.decryptFileChunk(
|
||||
data,
|
||||
pullState,
|
||||
);
|
||||
controller.enqueue(decryptedData);
|
||||
controller.close();
|
||||
} else {
|
||||
// Save it for the next pull.
|
||||
leftoverBytes = data;
|
||||
}
|
||||
},
|
||||
});
|
||||
|
|
Loading…
Add table
Reference in a new issue