Continue refactoring
This commit is contained in:
parent
cdc45e9fcc
commit
7bf5c0ad5c
6 changed files with 152 additions and 147 deletions
|
@ -1,5 +1,5 @@
|
|||
import log from "@/next/log";
|
||||
import type { Electron } from "@/next/types/ipc";
|
||||
import type { CollectionMapping, Electron } from "@/next/types/ipc";
|
||||
import { CustomError } from "@ente/shared/error";
|
||||
import { isPromise } from "@ente/shared/utils";
|
||||
import DiscFullIcon from "@mui/icons-material/DiscFull";
|
||||
|
@ -8,7 +8,6 @@ import {
|
|||
DEFAULT_IMPORT_SUGGESTION,
|
||||
PICKED_UPLOAD_TYPE,
|
||||
UPLOAD_STAGES,
|
||||
type CollectionMapping,
|
||||
} from "constants/upload";
|
||||
import { t } from "i18next";
|
||||
import isElectron from "is-electron";
|
||||
|
@ -24,7 +23,7 @@ import {
|
|||
savePublicCollectionUploaderName,
|
||||
} from "services/publicCollectionService";
|
||||
import uploadManager from "services/upload/uploadManager";
|
||||
import watchFolderService from "services/watch";
|
||||
import watcher from "services/watch";
|
||||
import { NotificationAttributes } from "types/Notification";
|
||||
import { Collection } from "types/collection";
|
||||
import {
|
||||
|
@ -183,7 +182,7 @@ export default function Uploader(props: Props) {
|
|||
resumeDesktopUpload(type, electronFiles, collectionName);
|
||||
},
|
||||
);
|
||||
watchFolderService.init(
|
||||
watcher.init(
|
||||
setElectronFiles,
|
||||
setCollectionName,
|
||||
props.syncWithRemote,
|
||||
|
@ -291,18 +290,16 @@ export default function Uploader(props: Props) {
|
|||
}`,
|
||||
);
|
||||
if (uploadManager.isUploadRunning()) {
|
||||
if (watchFolderService.isUploadRunning()) {
|
||||
if (watcher.isUploadRunning()) {
|
||||
// Pause watch folder sync on user upload
|
||||
log.info(
|
||||
"watchFolder upload was running, pausing it to run user upload",
|
||||
"Folder watcher was uploading, pausing it to first run user upload",
|
||||
);
|
||||
// pause watch folder service on user upload
|
||||
watchFolderService.pauseRunningSync();
|
||||
watcher.pauseRunningSync();
|
||||
} else {
|
||||
log.info(
|
||||
"an upload is already running, rejecting new upload request",
|
||||
"Ignoring new upload request because an upload is already running",
|
||||
);
|
||||
// no-op
|
||||
// a user upload is already in progress
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -330,7 +327,7 @@ export default function Uploader(props: Props) {
|
|||
|
||||
const importSuggestion = getImportSuggestion(
|
||||
pickedUploadType.current,
|
||||
toUploadFiles.current,
|
||||
toUploadFiles.current.map((file) => file["path"]),
|
||||
);
|
||||
setImportSuggestion(importSuggestion);
|
||||
|
||||
|
@ -505,7 +502,7 @@ export default function Uploader(props: Props) {
|
|||
if (
|
||||
electron &&
|
||||
!isPendingDesktopUpload.current &&
|
||||
!watchFolderService.isUploadRunning()
|
||||
!watcher.isUploadRunning()
|
||||
) {
|
||||
await ImportService.setToUploadCollection(collections);
|
||||
if (zipPaths.current) {
|
||||
|
@ -532,14 +529,14 @@ export default function Uploader(props: Props) {
|
|||
closeUploadProgress();
|
||||
}
|
||||
if (isElectron()) {
|
||||
if (watchFolderService.isUploadRunning()) {
|
||||
await watchFolderService.allFileUploadsDone(
|
||||
if (watcher.isUploadRunning()) {
|
||||
await watcher.allFileUploadsDone(
|
||||
filesWithCollectionToUploadIn,
|
||||
collections,
|
||||
);
|
||||
} else if (watchFolderService.isSyncPaused()) {
|
||||
} else if (watcher.isSyncPaused()) {
|
||||
// resume the service after user upload is done
|
||||
watchFolderService.resumePausedSync();
|
||||
watcher.resumePausedSync();
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
|
@ -652,13 +649,13 @@ export default function Uploader(props: Props) {
|
|||
log.info(
|
||||
`pending upload - strategy - "multiple collections" `,
|
||||
);
|
||||
uploadFilesToNewCollections("leaf");
|
||||
uploadFilesToNewCollections("parent");
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (isElectron() && pickedUploadType === PICKED_UPLOAD_TYPE.ZIPS) {
|
||||
log.info("uploading zip files");
|
||||
uploadFilesToNewCollections("leaf");
|
||||
uploadFilesToNewCollections("parent");
|
||||
return;
|
||||
}
|
||||
if (isFirstUpload && !importSuggestion.rootFolderName) {
|
||||
|
@ -777,7 +774,7 @@ export default function Uploader(props: Props) {
|
|||
);
|
||||
return;
|
||||
}
|
||||
uploadFilesToNewCollections("leaf");
|
||||
uploadFilesToNewCollections("parent");
|
||||
};
|
||||
|
||||
const didSelectCollectionMapping = (mapping: CollectionMapping) => {
|
||||
|
|
|
@ -76,19 +76,19 @@ export const WatchFolder: React.FC<WatchFolderProps> = ({ open, onClose }) => {
|
|||
for (let i = 0; i < folders.length; i++) {
|
||||
const folder: any = folders[i];
|
||||
const path = (folder.path as string).replace(/\\/g, "/");
|
||||
if (await watcher.isFolder(path)) {
|
||||
if (await ensureElectron().fs.isDir(path)) {
|
||||
await selectCollectionMappingAndAddWatch(path);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const selectCollectionMappingAndAddWatch = async (path: string) => {
|
||||
const files = await ensureElectron().getDirFiles(path);
|
||||
const analysisResult = getImportSuggestion(
|
||||
const filePaths = await ensureElectron().watch.findFiles(path);
|
||||
const { hasNestedFolders } = getImportSuggestion(
|
||||
PICKED_UPLOAD_TYPE.FOLDERS,
|
||||
files,
|
||||
filePaths,
|
||||
);
|
||||
if (analysisResult.hasNestedFolders) {
|
||||
if (hasNestedFolders) {
|
||||
setSavedFolderPath(path);
|
||||
setChoiceModalOpen(true);
|
||||
} else {
|
||||
|
@ -102,9 +102,9 @@ export const WatchFolder: React.FC<WatchFolderProps> = ({ open, onClose }) => {
|
|||
};
|
||||
|
||||
const addNewWatch = async () => {
|
||||
const folderPath = await watcher.selectFolder();
|
||||
if (folderPath) {
|
||||
await selectCollectionMappingAndAddWatch(folderPath);
|
||||
const dirPath = await ensureElectron().selectDirectory();
|
||||
if (dirPath) {
|
||||
await selectCollectionMappingAndAddWatch(dirPath);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ import {
|
|||
getPublicCollectionUID,
|
||||
} from "services/publicCollectionService";
|
||||
import { getDisableCFUploadProxyFlag } from "services/userService";
|
||||
import watchFolderService from "services/watch";
|
||||
import watcher from "services/watch";
|
||||
import { Collection } from "types/collection";
|
||||
import { EncryptedEnteFile, EnteFile } from "types/file";
|
||||
import { SetFiles } from "types/gallery";
|
||||
|
@ -387,7 +387,7 @@ class UploadManager {
|
|||
uploadedFile: EncryptedEnteFile,
|
||||
) {
|
||||
if (isElectron()) {
|
||||
await watchFolderService.onFileUpload(
|
||||
await watcher.onFileUpload(
|
||||
fileUploadResult,
|
||||
fileWithCollection,
|
||||
uploadedFile,
|
||||
|
@ -436,7 +436,7 @@ class UploadManager {
|
|||
}
|
||||
|
||||
public shouldAllowNewUpload = () => {
|
||||
return !this.uploadInProgress || watchFolderService.isUploadRunning();
|
||||
return !this.uploadInProgress || watcher.isUploadRunning();
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,11 @@
|
|||
import { ensureElectron } from "@/next/electron";
|
||||
import { nameAndExtension } from "@/next/file";
|
||||
import log from "@/next/log";
|
||||
import type { CollectionMapping, FolderWatch, FolderWatchSyncedFile } from "@/next/types/ipc";
|
||||
import type {
|
||||
CollectionMapping,
|
||||
FolderWatch,
|
||||
FolderWatchSyncedFile,
|
||||
} from "@/next/types/ipc";
|
||||
import { UPLOAD_RESULT } from "constants/upload";
|
||||
import debounce from "debounce";
|
||||
import uploadManager from "services/upload/uploadManager";
|
||||
|
@ -19,43 +23,24 @@ import { removeFromCollection } from "./collectionService";
|
|||
import { getLocalFiles } from "./fileService";
|
||||
|
||||
/**
|
||||
* A file system watch event encapsulates a change that has occurred on disk
|
||||
* that needs us to take some action within Ente to synchronize with the user's
|
||||
* Ente collections.
|
||||
* Watch for file system folders and automatically update the corresponding Ente
|
||||
* collections.
|
||||
*
|
||||
* Events get added in two ways:
|
||||
*
|
||||
* - When the app starts, it reads the current state of files on disk and
|
||||
* compares that with its last known state to determine what all events it
|
||||
* missed. This is easier than it sounds as we have only two events: add and
|
||||
* remove.
|
||||
*
|
||||
* - When the app is running, it gets live notifications from our file system
|
||||
* watcher (from the Node.js layer) about changes that have happened on disk,
|
||||
* which the app then enqueues onto the event queue if they pertain to the
|
||||
* files we're interested in.
|
||||
* This class relies on APIs exposed over the Electron IPC layer, and thus only
|
||||
* works when we're running inside our desktop app.
|
||||
*/
|
||||
interface WatchEvent {
|
||||
/** The action to take */
|
||||
action: "upload" | "trash";
|
||||
/** The path of the root folder corresponding to the {@link FolderWatch}. */
|
||||
folderPath: string;
|
||||
/** The name of the Ente collection the file belongs to. */
|
||||
collectionName?: string;
|
||||
/** The absolute path to the file under consideration. */
|
||||
filePath: string;
|
||||
}
|
||||
|
||||
class WatchFolderService {
|
||||
class FolderWatcher {
|
||||
private eventQueue: WatchEvent[] = [];
|
||||
private currentEvent: WatchEvent;
|
||||
private currentlySyncedMapping: FolderWatch;
|
||||
private trashingDirQueue: string[] = [];
|
||||
private isEventRunning: boolean = false;
|
||||
private uploadRunning: boolean = false;
|
||||
/** `true` if we are currently uploading */
|
||||
private uploadRunning = false;
|
||||
/** `true` if we are temporarily paused to let a user upload go through */
|
||||
private isPaused = false;
|
||||
private filePathToUploadedFileIDMap = new Map<string, EncryptedEnteFile>();
|
||||
private unUploadableFilePaths = new Set<string>();
|
||||
private isPaused = false;
|
||||
private setElectronFiles: (files: ElectronFile[]) => void;
|
||||
private setCollectionName: (collectionName: string) => void;
|
||||
private syncWithRemote: () => void;
|
||||
|
@ -66,31 +51,53 @@ class WatchFolderService {
|
|||
this.debouncedRunNextEvent = debounce(() => this.runNextEvent(), 1000);
|
||||
}
|
||||
|
||||
/** `true` if we are currently using the uploader */
|
||||
isUploadRunning() {
|
||||
return this.uploadRunning;
|
||||
}
|
||||
|
||||
/** `true` if syncing has been temporarily paused */
|
||||
isSyncPaused() {
|
||||
return this.isPaused;
|
||||
}
|
||||
|
||||
/**
|
||||
* Temporarily pause syncing and cancel any running uploads.
|
||||
*
|
||||
* This frees up the uploader for handling user initated uploads.
|
||||
*/
|
||||
pauseRunningSync() {
|
||||
this.isPaused = true;
|
||||
uploadManager.cancelRunningUpload();
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume from a temporary pause, resyncing from disk.
|
||||
*
|
||||
* Sibling of {@link pauseRunningSync}.
|
||||
*/
|
||||
resumePausedSync() {
|
||||
this.isPaused = false;
|
||||
this.syncWithDisk();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the watcher.
|
||||
*
|
||||
* This is only called when we're running in the context of our desktop app.
|
||||
*/
|
||||
async init(
|
||||
setElectronFiles: (files: ElectronFile[]) => void,
|
||||
setCollectionName: (collectionName: string) => void,
|
||||
syncWithRemote: () => void,
|
||||
setWatchFolderServiceIsRunning: (isRunning: boolean) => void,
|
||||
) {
|
||||
try {
|
||||
this.setElectronFiles = setElectronFiles;
|
||||
this.setCollectionName = setCollectionName;
|
||||
this.syncWithRemote = syncWithRemote;
|
||||
this.setWatchFolderServiceIsRunning =
|
||||
setWatchFolderServiceIsRunning;
|
||||
this.setupWatcherFunctions();
|
||||
await this.syncWithDisk();
|
||||
} catch (e) {
|
||||
log.error("error while initializing watch service", e);
|
||||
}
|
||||
this.setElectronFiles = setElectronFiles;
|
||||
this.setCollectionName = setCollectionName;
|
||||
this.syncWithRemote = syncWithRemote;
|
||||
this.setWatchFolderServiceIsRunning = setWatchFolderServiceIsRunning;
|
||||
this.setupWatcherFunctions();
|
||||
await this.syncWithDisk();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -101,44 +108,6 @@ class WatchFolderService {
|
|||
return this.currentEvent?.folderPath === watch.folderPath;
|
||||
}
|
||||
|
||||
private async syncWithDisk() {
|
||||
try {
|
||||
const electron = ensureElectron();
|
||||
const mappings = await electron.getWatchMappings();
|
||||
if (!mappings) return;
|
||||
|
||||
this.eventQueue = [];
|
||||
const { events, deletedFolderPaths } = await deduceEvents(mappings);
|
||||
log.info(`Folder watch deduced ${events.length} events`);
|
||||
this.eventQueue = this.eventQueue.concat(events);
|
||||
|
||||
for (const path of deletedFolderPaths)
|
||||
electron.removeWatchMapping(path);
|
||||
|
||||
this.debouncedRunNextEvent();
|
||||
} catch (e) {
|
||||
log.error("Ignoring error while syncing watched folders", e);
|
||||
}
|
||||
}
|
||||
|
||||
private pushEvent(event: WatchEvent) {
|
||||
this.eventQueue.push(event);
|
||||
log.info("Folder watch event", event);
|
||||
this.debouncedRunNextEvent();
|
||||
}
|
||||
|
||||
async pushTrashedDir(path: string) {
|
||||
this.trashingDirQueue.push(path);
|
||||
}
|
||||
|
||||
private setupWatcherFunctions() {
|
||||
ensureElectron().registerWatcherFunctions(
|
||||
diskFileAddedCallback,
|
||||
diskFileRemovedCallback,
|
||||
diskFolderRemovedCallback,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new folder watch for the given root {@link folderPath}
|
||||
*
|
||||
|
@ -174,6 +143,44 @@ class WatchFolderService {
|
|||
}
|
||||
}
|
||||
|
||||
private async syncWithDisk() {
|
||||
try {
|
||||
const electron = ensureElectron();
|
||||
const mappings = await electron.getWatchMappings();
|
||||
if (!mappings) return;
|
||||
|
||||
this.eventQueue = [];
|
||||
const { events, deletedFolderPaths } = await deduceEvents(mappings);
|
||||
log.info(`Folder watch deduced ${events.length} events`);
|
||||
this.eventQueue = this.eventQueue.concat(events);
|
||||
|
||||
for (const path of deletedFolderPaths)
|
||||
electron.removeWatchMapping(path);
|
||||
|
||||
this.debouncedRunNextEvent();
|
||||
} catch (e) {
|
||||
log.error("Ignoring error while syncing watched folders", e);
|
||||
}
|
||||
}
|
||||
|
||||
pushEvent(event: WatchEvent) {
|
||||
this.eventQueue.push(event);
|
||||
log.info("Folder watch event", event);
|
||||
this.debouncedRunNextEvent();
|
||||
}
|
||||
|
||||
async pushTrashedDir(path: string) {
|
||||
this.trashingDirQueue.push(path);
|
||||
}
|
||||
|
||||
private setupWatcherFunctions() {
|
||||
ensureElectron().registerWatcherFunctions(
|
||||
diskFileAddedCallback,
|
||||
diskFileRemovedCallback,
|
||||
diskFolderRemovedCallback,
|
||||
);
|
||||
}
|
||||
|
||||
private setIsEventRunning(isEventRunning: boolean) {
|
||||
this.isEventRunning = isEventRunning;
|
||||
this.setWatchFolderServiceIsRunning(isEventRunning);
|
||||
|
@ -542,15 +549,6 @@ class WatchFolderService {
|
|||
}
|
||||
}
|
||||
|
||||
async selectFolder(): Promise<string> {
|
||||
try {
|
||||
const folderPath = await ensureElectron().selectDirectory();
|
||||
return folderPath;
|
||||
} catch (e) {
|
||||
log.error("error while selecting folder", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Batches all the files to be uploaded (or trashed) from the
|
||||
// event queue of same collection as the next event
|
||||
private clubSameCollectionEvents(): EventQueueItem {
|
||||
|
@ -569,33 +567,44 @@ class WatchFolderService {
|
|||
}
|
||||
return event;
|
||||
}
|
||||
|
||||
async isFolder(folderPath: string) {
|
||||
try {
|
||||
return await ensureElectron().fs.isDir(folderPath);
|
||||
} catch (e) {
|
||||
log.error("error while checking if folder exists", e);
|
||||
}
|
||||
}
|
||||
|
||||
pauseRunningSync() {
|
||||
this.isPaused = true;
|
||||
uploadManager.cancelRunningUpload();
|
||||
}
|
||||
|
||||
resumePausedSync() {
|
||||
this.isPaused = false;
|
||||
this.syncWithDisk();
|
||||
}
|
||||
}
|
||||
|
||||
const watchFolderService = new WatchFolderService();
|
||||
/** The singleton instance of the {@link FolderWatcher}. */
|
||||
const watcher = new FolderWatcher();
|
||||
|
||||
export default watchFolderService;
|
||||
export default watcher;
|
||||
|
||||
/**
|
||||
* A file system watch event encapsulates a change that has occurred on disk
|
||||
* that needs us to take some action within Ente to synchronize with the user's
|
||||
* Ente collections.
|
||||
*
|
||||
* Events get added in two ways:
|
||||
*
|
||||
* - When the app starts, it reads the current state of files on disk and
|
||||
* compares that with its last known state to determine what all events it
|
||||
* missed. This is easier than it sounds as we have only two events: add and
|
||||
* remove.
|
||||
*
|
||||
* - When the app is running, it gets live notifications from our file system
|
||||
* watcher (from the Node.js layer) about changes that have happened on disk,
|
||||
* which the app then enqueues onto the event queue if they pertain to the
|
||||
* files we're interested in.
|
||||
*/
|
||||
interface WatchEvent {
|
||||
/** The action to take */
|
||||
action: "upload" | "trash";
|
||||
/** The path of the root folder corresponding to the {@link FolderWatch}. */
|
||||
folderPath: string;
|
||||
/** The name of the Ente collection the file belongs to. */
|
||||
collectionName?: string;
|
||||
/** The absolute path to the file under consideration. */
|
||||
filePath: string;
|
||||
}
|
||||
|
||||
async function diskFileAddedCallback(file: ElectronFile) {
|
||||
const collectionNameAndFolderPath =
|
||||
await watchFolderService.getCollectionNameAndFolderPath(file.path);
|
||||
await watcher.getCollectionNameAndFolderPath(file.path);
|
||||
|
||||
if (!collectionNameAndFolderPath) {
|
||||
return;
|
||||
|
@ -609,12 +618,12 @@ async function diskFileAddedCallback(file: ElectronFile) {
|
|||
folderPath,
|
||||
path: file.path,
|
||||
};
|
||||
watchFolderService.pushEvent(event);
|
||||
watcher.pushEvent(event);
|
||||
}
|
||||
|
||||
async function diskFileRemovedCallback(filePath: string) {
|
||||
const collectionNameAndFolderPath =
|
||||
await watchFolderService.getCollectionNameAndFolderPath(filePath);
|
||||
await watcher.getCollectionNameAndFolderPath(filePath);
|
||||
|
||||
if (!collectionNameAndFolderPath) {
|
||||
return;
|
||||
|
@ -628,12 +637,12 @@ async function diskFileRemovedCallback(filePath: string) {
|
|||
folderPath,
|
||||
path: filePath,
|
||||
};
|
||||
watchFolderService.pushEvent(event);
|
||||
watcher.pushEvent(event);
|
||||
}
|
||||
|
||||
async function diskFolderRemovedCallback(folderPath: string) {
|
||||
try {
|
||||
const mappings = await watchFolderService.getWatchMappings();
|
||||
const mappings = await watcher.getWatchMappings();
|
||||
const mapping = mappings.find(
|
||||
(mapping) => mapping.folderPath === folderPath,
|
||||
);
|
||||
|
@ -641,7 +650,7 @@ async function diskFolderRemovedCallback(folderPath: string) {
|
|||
log.info(`folder not found in mappings, ${folderPath}`);
|
||||
throw Error(`Watch mapping not found`);
|
||||
}
|
||||
watchFolderService.pushTrashedDir(folderPath);
|
||||
watcher.pushTrashedDir(folderPath);
|
||||
log.info(`added trashedDir, ${folderPath}`);
|
||||
} catch (e) {
|
||||
log.error("error while calling diskFolderRemovedCallback", e);
|
||||
|
|
|
@ -112,13 +112,12 @@ export function areFileWithCollectionsSame(
|
|||
|
||||
export function getImportSuggestion(
|
||||
uploadType: PICKED_UPLOAD_TYPE,
|
||||
toUploadFiles: File[] | ElectronFile[],
|
||||
paths: string[],
|
||||
): ImportSuggestion {
|
||||
if (isElectron() && uploadType === PICKED_UPLOAD_TYPE.FILES) {
|
||||
return DEFAULT_IMPORT_SUGGESTION;
|
||||
}
|
||||
|
||||
const paths: string[] = toUploadFiles.map((file) => file["path"]);
|
||||
const getCharCount = (str: string) => (str.match(/\//g) ?? []).length;
|
||||
paths.sort((path1, path2) => getCharCount(path1) - getCharCount(path2));
|
||||
const firstPath = paths[0];
|
||||
|
|
|
@ -96,7 +96,7 @@ export const testZipWithRootFileReadingTest = async () => {
|
|||
|
||||
const importSuggestion = getImportSuggestion(
|
||||
PICKED_UPLOAD_TYPE.ZIPS,
|
||||
files,
|
||||
files.map((file) => file["path"]),
|
||||
);
|
||||
if (!importSuggestion.rootFolderName) {
|
||||
throw Error(
|
||||
|
|
Loading…
Add table
Reference in a new issue