EXtract common queueProcessor

This commit is contained in:
Manav Rathi 2024-03-23 18:49:22 +05:30
parent 7704b902c4
commit b1d0909675
No known key found for this signature in database
7 changed files with 8 additions and 94 deletions

View file

@ -1,9 +1,9 @@
import { addLogLine } from "@ente/shared/logging";
import { promiseWithTimeout } from "@ente/shared/promise";
import { logError } from "@ente/shared/sentry";
import QueueProcessor from "@ente/shared/utils/queueProcessor";
import { generateTempName } from "@ente/shared/utils/temp";
import { createFFmpeg, FFmpeg } from "ffmpeg-wasm";
import QueueProcessor from "services/queueProcessor";
import { getUint8ArrayView } from "services/readerService";
const INPUT_PATH_PLACEHOLDER = "INPUT";

View file

@ -2,8 +2,8 @@ import { CustomError } from "@ente/shared/error";
import { addLogLine } from "@ente/shared/logging";
import { retryAsyncFunction } from "@ente/shared/promise";
import { logError } from "@ente/shared/sentry";
import QueueProcessor from "@ente/shared/utils/queueProcessor";
import { convertBytesToHumanReadable } from "@ente/shared/utils/size";
import QueueProcessor from "services/queueProcessor";
import { getDedicatedConvertWorker } from "utils/comlink/ComlinkConvertWorker";
import { ComlinkWorker } from "utils/comlink/comlinkWorker";
import { DedicatedConvertWorker } from "worker/convert.worker";

View file

@ -42,6 +42,10 @@ import { CustomError } from "@ente/shared/error";
import { Events, eventBus } from "@ente/shared/events";
import { addLogLine } from "@ente/shared/logging";
import { User } from "@ente/shared/user/types";
import QueueProcessor, {
CancellationStatus,
RequestCanceller,
} from "@ente/shared/utils/queueProcessor";
import { ExportStage } from "constants/export";
import { FILE_TYPE } from "constants/file";
import { Collection } from "types/collection";
@ -56,10 +60,6 @@ import {
getCollectionUserFacingName,
getNonEmptyPersonalCollections,
} from "utils/collection";
import QueueProcessor, {
CancellationStatus,
RequestCanceller,
} from "../queueProcessor";
import { migrateExport } from "./migration";
const EXPORT_RECORD_FILE_NAME = "export_status.json";

View file

@ -1,86 +0,0 @@
import { CustomError } from "@ente/shared/error";
interface RequestQueueItem {
request: (canceller?: RequestCanceller) => Promise<any>;
successCallback: (response: any) => void;
failureCallback: (error: Error) => void;
isCanceled: { status: boolean };
canceller: { exec: () => void };
}
export enum PROCESSING_STRATEGY {
FIFO,
LIFO,
}
export interface RequestCanceller {
exec: () => void;
}
export interface CancellationStatus {
status: boolean;
}
export default class QueueProcessor<T> {
private requestQueue: RequestQueueItem[] = [];
private requestInProcessing = 0;
constructor(
private maxParallelProcesses: number,
private processingStrategy = PROCESSING_STRATEGY.FIFO,
) {}
public queueUpRequest(
request: (canceller?: RequestCanceller) => Promise<T>,
) {
const isCanceled: CancellationStatus = { status: false };
const canceller: RequestCanceller = {
exec: () => {
isCanceled.status = true;
},
};
const promise = new Promise<T>((resolve, reject) => {
this.requestQueue.push({
request,
successCallback: resolve,
failureCallback: reject,
isCanceled,
canceller,
});
this.pollQueue();
});
return { promise, canceller };
}
private async pollQueue() {
if (this.requestInProcessing < this.maxParallelProcesses) {
this.requestInProcessing++;
this.processQueue();
}
}
private async processQueue() {
while (this.requestQueue.length > 0) {
const queueItem =
this.processingStrategy === PROCESSING_STRATEGY.LIFO
? this.requestQueue.pop()
: this.requestQueue.shift();
let response = null;
if (queueItem.isCanceled.status) {
queueItem.failureCallback(Error(CustomError.REQUEST_CANCELLED));
} else {
try {
response = await queueItem.request(queueItem.canceller);
queueItem.successCallback(response);
} catch (e) {
queueItem.failureCallback(e);
}
}
}
this.requestInProcessing--;
}
}

View file

@ -2,7 +2,7 @@ import { addLogLine } from "@ente/shared/logging";
import { logError } from "@ente/shared/sentry";
import { generateTempName } from "@ente/shared/utils/temp";
import { createFFmpeg, FFmpeg } from "ffmpeg-wasm";
import QueueProcessor from "services/queueProcessor";
import QueueProcessor from "@ente/shared/utils/queueProcessor";
import { getUint8ArrayView } from "services/readerService";
import { promiseWithTimeout } from "utils/common";

View file

@ -2,9 +2,9 @@ import { CustomError } from "@ente/shared/error";
import { addLogLine } from "@ente/shared/logging";
import { retryAsyncFunction } from "@ente/shared/promise";
import { logError } from "@ente/shared/sentry";
import QueueProcessor from "@ente/shared/utils/queueProcessor";
import { convertBytesToHumanReadable } from "@ente/shared/utils/size";
import { ComlinkWorker } from "@ente/shared/worker/comlinkWorker";
import QueueProcessor from "services/queueProcessor";
import { getDedicatedConvertWorker } from "utils/comlink/ComlinkConvertWorker";
import { DedicatedConvertWorker } from "worker/convert.worker";