diff --git a/web/apps/cast/src/services/wasm/ffmpeg.ts b/web/apps/cast/src/services/wasm/ffmpeg.ts index bd567f07e..50ab5a5a9 100644 --- a/web/apps/cast/src/services/wasm/ffmpeg.ts +++ b/web/apps/cast/src/services/wasm/ffmpeg.ts @@ -1,9 +1,9 @@ import { addLogLine } from "@ente/shared/logging"; import { promiseWithTimeout } from "@ente/shared/promise"; import { logError } from "@ente/shared/sentry"; +import QueueProcessor from "@ente/shared/utils/queueProcessor"; import { generateTempName } from "@ente/shared/utils/temp"; import { createFFmpeg, FFmpeg } from "ffmpeg-wasm"; -import QueueProcessor from "services/queueProcessor"; import { getUint8ArrayView } from "services/readerService"; const INPUT_PATH_PLACEHOLDER = "INPUT"; diff --git a/web/apps/cast/src/services/wasmHeicConverter/wasmHEICConverterService.ts b/web/apps/cast/src/services/wasmHeicConverter/wasmHEICConverterService.ts index c6d33b772..42790e515 100644 --- a/web/apps/cast/src/services/wasmHeicConverter/wasmHEICConverterService.ts +++ b/web/apps/cast/src/services/wasmHeicConverter/wasmHEICConverterService.ts @@ -2,8 +2,8 @@ import { CustomError } from "@ente/shared/error"; import { addLogLine } from "@ente/shared/logging"; import { retryAsyncFunction } from "@ente/shared/promise"; import { logError } from "@ente/shared/sentry"; +import QueueProcessor from "@ente/shared/utils/queueProcessor"; import { convertBytesToHumanReadable } from "@ente/shared/utils/size"; -import QueueProcessor from "services/queueProcessor"; import { getDedicatedConvertWorker } from "utils/comlink/ComlinkConvertWorker"; import { ComlinkWorker } from "utils/comlink/comlinkWorker"; import { DedicatedConvertWorker } from "worker/convert.worker"; diff --git a/web/apps/photos/src/services/export/index.ts b/web/apps/photos/src/services/export/index.ts index df17a6e79..dea10d82e 100644 --- a/web/apps/photos/src/services/export/index.ts +++ b/web/apps/photos/src/services/export/index.ts @@ -42,6 +42,10 @@ import { CustomError } from "@ente/shared/error"; import { Events, eventBus } from "@ente/shared/events"; import { addLogLine } from "@ente/shared/logging"; import { User } from "@ente/shared/user/types"; +import QueueProcessor, { + CancellationStatus, + RequestCanceller, +} from "@ente/shared/utils/queueProcessor"; import { ExportStage } from "constants/export"; import { FILE_TYPE } from "constants/file"; import { Collection } from "types/collection"; @@ -56,10 +60,6 @@ import { getCollectionUserFacingName, getNonEmptyPersonalCollections, } from "utils/collection"; -import QueueProcessor, { - CancellationStatus, - RequestCanceller, -} from "../queueProcessor"; import { migrateExport } from "./migration"; const EXPORT_RECORD_FILE_NAME = "export_status.json"; diff --git a/web/apps/photos/src/services/queueProcessor.ts b/web/apps/photos/src/services/queueProcessor.ts deleted file mode 100644 index 8e70c4a7f..000000000 --- a/web/apps/photos/src/services/queueProcessor.ts +++ /dev/null @@ -1,86 +0,0 @@ -import { CustomError } from "@ente/shared/error"; - -interface RequestQueueItem { - request: (canceller?: RequestCanceller) => Promise; - successCallback: (response: any) => void; - failureCallback: (error: Error) => void; - isCanceled: { status: boolean }; - canceller: { exec: () => void }; -} - -export enum PROCESSING_STRATEGY { - FIFO, - LIFO, -} - -export interface RequestCanceller { - exec: () => void; -} - -export interface CancellationStatus { - status: boolean; -} - -export default class QueueProcessor { - private requestQueue: RequestQueueItem[] = []; - - private requestInProcessing = 0; - - constructor( - private maxParallelProcesses: number, - private processingStrategy = PROCESSING_STRATEGY.FIFO, - ) {} - - public queueUpRequest( - request: (canceller?: RequestCanceller) => Promise, - ) { - const isCanceled: CancellationStatus = { status: false }; - const canceller: RequestCanceller = { - exec: () => { - isCanceled.status = true; - }, - }; - - const promise = new Promise((resolve, reject) => { - this.requestQueue.push({ - request, - successCallback: resolve, - failureCallback: reject, - isCanceled, - canceller, - }); - this.pollQueue(); - }); - - return { promise, canceller }; - } - - private async pollQueue() { - if (this.requestInProcessing < this.maxParallelProcesses) { - this.requestInProcessing++; - this.processQueue(); - } - } - - private async processQueue() { - while (this.requestQueue.length > 0) { - const queueItem = - this.processingStrategy === PROCESSING_STRATEGY.LIFO - ? this.requestQueue.pop() - : this.requestQueue.shift(); - let response = null; - - if (queueItem.isCanceled.status) { - queueItem.failureCallback(Error(CustomError.REQUEST_CANCELLED)); - } else { - try { - response = await queueItem.request(queueItem.canceller); - queueItem.successCallback(response); - } catch (e) { - queueItem.failureCallback(e); - } - } - } - this.requestInProcessing--; - } -} diff --git a/web/apps/photos/src/services/wasm/ffmpeg.ts b/web/apps/photos/src/services/wasm/ffmpeg.ts index ae6ad1498..5d3a082f3 100644 --- a/web/apps/photos/src/services/wasm/ffmpeg.ts +++ b/web/apps/photos/src/services/wasm/ffmpeg.ts @@ -2,7 +2,7 @@ import { addLogLine } from "@ente/shared/logging"; import { logError } from "@ente/shared/sentry"; import { generateTempName } from "@ente/shared/utils/temp"; import { createFFmpeg, FFmpeg } from "ffmpeg-wasm"; -import QueueProcessor from "services/queueProcessor"; +import QueueProcessor from "@ente/shared/utils/queueProcessor"; import { getUint8ArrayView } from "services/readerService"; import { promiseWithTimeout } from "utils/common"; diff --git a/web/apps/photos/src/services/wasmHeicConverter/wasmHEICConverterService.ts b/web/apps/photos/src/services/wasmHeicConverter/wasmHEICConverterService.ts index c782d4069..0213e70c4 100644 --- a/web/apps/photos/src/services/wasmHeicConverter/wasmHEICConverterService.ts +++ b/web/apps/photos/src/services/wasmHeicConverter/wasmHEICConverterService.ts @@ -2,9 +2,9 @@ import { CustomError } from "@ente/shared/error"; import { addLogLine } from "@ente/shared/logging"; import { retryAsyncFunction } from "@ente/shared/promise"; import { logError } from "@ente/shared/sentry"; +import QueueProcessor from "@ente/shared/utils/queueProcessor"; import { convertBytesToHumanReadable } from "@ente/shared/utils/size"; import { ComlinkWorker } from "@ente/shared/worker/comlinkWorker"; -import QueueProcessor from "services/queueProcessor"; import { getDedicatedConvertWorker } from "utils/comlink/ComlinkConvertWorker"; import { DedicatedConvertWorker } from "worker/convert.worker"; diff --git a/web/apps/cast/src/services/queueProcessor.ts b/web/packages/shared/utils/queueProcessor.ts similarity index 100% rename from web/apps/cast/src/services/queueProcessor.ts rename to web/packages/shared/utils/queueProcessor.ts