|
@@ -1,8 +1,4 @@
|
|
-import { haveWindow } from "@/next/env";
|
|
|
|
import log from "@/next/log";
|
|
import log from "@/next/log";
|
|
-import { ComlinkWorker } from "@/next/worker/comlink-worker";
|
|
|
|
-import { getDedicatedCryptoWorker } from "@ente/shared/crypto";
|
|
|
|
-import { DedicatedCryptoWorker } from "@ente/shared/crypto/internal/crypto.worker";
|
|
|
|
import { CustomError, parseUploadErrorCodes } from "@ente/shared/error";
|
|
import { CustomError, parseUploadErrorCodes } from "@ente/shared/error";
|
|
import PQueue from "p-queue";
|
|
import PQueue from "p-queue";
|
|
import mlIDbStorage, { ML_SEARCH_CONFIG_NAME } from "services/face/db";
|
|
import mlIDbStorage, { ML_SEARCH_CONFIG_NAME } from "services/face/db";
|
|
@@ -54,63 +50,30 @@ class MLSyncContext {
|
|
public localFilesMap: Map<number, EnteFile>;
|
|
public localFilesMap: Map<number, EnteFile>;
|
|
public outOfSyncFiles: EnteFile[];
|
|
public outOfSyncFiles: EnteFile[];
|
|
public nSyncedFiles: number;
|
|
public nSyncedFiles: number;
|
|
-
|
|
|
|
public error?: Error;
|
|
public error?: Error;
|
|
|
|
|
|
public syncQueue: PQueue;
|
|
public syncQueue: PQueue;
|
|
- // TODO: wheather to limit concurrent downloads
|
|
|
|
- // private downloadQueue: PQueue;
|
|
|
|
-
|
|
|
|
- private concurrency: number;
|
|
|
|
- private comlinkCryptoWorker: Array<
|
|
|
|
- ComlinkWorker<typeof DedicatedCryptoWorker>
|
|
|
|
- >;
|
|
|
|
- private enteWorkers: Array<any>;
|
|
|
|
|
|
|
|
- constructor(token: string, userID: number, concurrency?: number) {
|
|
|
|
|
|
+ constructor(token: string, userID: number) {
|
|
this.token = token;
|
|
this.token = token;
|
|
this.userID = userID;
|
|
this.userID = userID;
|
|
|
|
|
|
this.outOfSyncFiles = [];
|
|
this.outOfSyncFiles = [];
|
|
this.nSyncedFiles = 0;
|
|
this.nSyncedFiles = 0;
|
|
|
|
|
|
- this.concurrency = concurrency ?? getConcurrency();
|
|
|
|
-
|
|
|
|
- log.info("Using concurrency: ", this.concurrency);
|
|
|
|
- // timeout is added on downloads
|
|
|
|
- // timeout on queue will keep the operation open till worker is terminated
|
|
|
|
- this.syncQueue = new PQueue({ concurrency: this.concurrency });
|
|
|
|
- logQueueStats(this.syncQueue, "sync");
|
|
|
|
- // this.downloadQueue = new PQueue({ concurrency: 1 });
|
|
|
|
- // logQueueStats(this.downloadQueue, 'download');
|
|
|
|
-
|
|
|
|
- this.comlinkCryptoWorker = new Array(this.concurrency);
|
|
|
|
- this.enteWorkers = new Array(this.concurrency);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public async getEnteWorker(id: number): Promise<any> {
|
|
|
|
- const wid = id % this.enteWorkers.length;
|
|
|
|
- console.log("getEnteWorker: ", id, wid);
|
|
|
|
- if (!this.enteWorkers[wid]) {
|
|
|
|
- this.comlinkCryptoWorker[wid] = getDedicatedCryptoWorker();
|
|
|
|
- this.enteWorkers[wid] = await this.comlinkCryptoWorker[wid].remote;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return this.enteWorkers[wid];
|
|
|
|
|
|
+ const concurrency = getConcurrency();
|
|
|
|
+ this.syncQueue = new PQueue({ concurrency });
|
|
}
|
|
}
|
|
|
|
|
|
public async dispose() {
|
|
public async dispose() {
|
|
this.localFilesMap = undefined;
|
|
this.localFilesMap = undefined;
|
|
await this.syncQueue.onIdle();
|
|
await this.syncQueue.onIdle();
|
|
this.syncQueue.removeAllListeners();
|
|
this.syncQueue.removeAllListeners();
|
|
- for (const enteComlinkWorker of this.comlinkCryptoWorker) {
|
|
|
|
- enteComlinkWorker?.terminate();
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-export const getConcurrency = () =>
|
|
|
|
- haveWindow() && Math.max(2, Math.ceil(navigator.hardwareConcurrency / 2));
|
|
|
|
|
|
+const getConcurrency = () =>
|
|
|
|
+ Math.max(2, Math.ceil(navigator.hardwareConcurrency / 2));
|
|
|
|
|
|
class MachineLearningService {
|
|
class MachineLearningService {
|
|
private localSyncContext: Promise<MLSyncContext>;
|
|
private localSyncContext: Promise<MLSyncContext>;
|
|
@@ -320,9 +283,6 @@ class MachineLearningService {
|
|
localFile?: globalThis.File,
|
|
localFile?: globalThis.File,
|
|
) {
|
|
) {
|
|
try {
|
|
try {
|
|
- console.log(
|
|
|
|
- `Indexing ${enteFile.title ?? "<untitled>"} ${enteFile.id}`,
|
|
|
|
- );
|
|
|
|
const mlFileData = await this.syncFile(enteFile, localFile);
|
|
const mlFileData = await this.syncFile(enteFile, localFile);
|
|
syncContext.nSyncedFiles += 1;
|
|
syncContext.nSyncedFiles += 1;
|
|
return mlFileData;
|
|
return mlFileData;
|
|
@@ -390,15 +350,3 @@ class MachineLearningService {
|
|
}
|
|
}
|
|
|
|
|
|
export default new MachineLearningService();
|
|
export default new MachineLearningService();
|
|
-
|
|
|
|
-export function logQueueStats(queue: PQueue, name: string) {
|
|
|
|
- queue.on("active", () =>
|
|
|
|
- log.info(
|
|
|
|
- `queuestats: ${name}: Active, Size: ${queue.size} Pending: ${queue.pending}`,
|
|
|
|
- ),
|
|
|
|
- );
|
|
|
|
- queue.on("idle", () => log.info(`queuestats: ${name}: Idle`));
|
|
|
|
- queue.on("error", (error) =>
|
|
|
|
- console.error(`queuestats: ${name}: Error, `, error),
|
|
|
|
- );
|
|
|
|
-}
|
|
|