This commit is contained in:
Manav Rathi 2024-04-19 11:22:02 +05:30
parent 722cc74e64
commit f24fd98bc3
No known key found for this signature in database

View file

@ -18,7 +18,7 @@ import { Collection } from "types/collection";
import { EncryptedEnteFile } from "types/file";
import { ElectronFile, FileWithCollection } from "types/upload";
import { groupFilesBasedOnCollectionID } from "utils/file";
import { isHiddenFile, isSystemFile } from "utils/upload";
import { isHiddenFile } from "utils/upload";
import { removeFromCollection } from "./collectionService";
import { getLocalFiles } from "./fileService";
@ -30,30 +30,40 @@ import { getLocalFiles } from "./fileService";
* works when we're running inside our desktop app.
*/
class FolderWatcher {
/** `true` if we are currently uploading */
private uploadRunning = false;
/** `true` if we are temporarily paused to let a user upload go through */
private isPaused = false;
/** Pending file system events that we need to process */
/** Pending file system events that we need to process. */
private eventQueue: WatchEvent[] = [];
private currentEvent: WatchEvent;
// TODO(MR): dedup if possible
private isEventRunning: boolean = false;
/** The folder watch whose event we're currently processing */
private activeWatch: FolderWatch | undefined;
/**
* If the file system directory corresponding to the (root) folder path of a
* folder watch is deleted on disk, we note down that in this queue so that
* we can ignore any file system events that come for it next.
*
* TODO (MR): is this really even coming into play? the mappings are
* pre-checked first.
* TODO: is this really needed? the mappings are pre-checked first.
*/
private deletedFolderPaths: string[] = [];
private currentlySyncedMapping: FolderWatch;
/** `true` if we are using the uploader. */
private uploadRunning = false;
/** `true` if we are temporarily paused to let a user upload go through. */
private isPaused = false;
private filePathToUploadedFileIDMap = new Map<string, EncryptedEnteFile>();
private unUploadableFilePaths = new Set<string>();
private setElectronFiles: (files: ElectronFile[]) => void;
private setCollectionName: (collectionName: string) => void;
/**
* A function to call when we want to enqueue a new upload of the given list
* of file paths to the given Ente collection.
*
* This is passed as a param to {@link init}.
*/
private upload: (collectionName: string, filePaths: string[]) => void;
/**
* A function to call when we want to sync with the backend.
*
* This is passed as a param to {@link init}.
*/
private syncWithRemote: () => void;
/** A helper function that debounces invocations of {@link runNextEvent}. */
private debouncedRunNextEvent: () => void;
constructor() {
@ -61,20 +71,21 @@ class FolderWatcher {
}
/**
* Initialize the watcher.
* Initialize the watcher and start processing file system events.
*
* This is only called when we're running in the context of our desktop app.
*
* The caller provides us with the hooks we can use to actually upload the
* files, and to sync with remote (say after deletion).
*/
async init(
setElectronFiles: (files: ElectronFile[]) => void,
setCollectionName: (collectionName: string) => void,
init(
upload: (collectionName: string, filePaths: string[]) => void,
syncWithRemote: () => void,
) {
this.setElectronFiles = setElectronFiles;
this.setCollectionName = setCollectionName;
this.upload = upload;
this.syncWithRemote = syncWithRemote;
this.registerListeners();
await this.syncWithDisk();
this.syncWithDisk();
}
/** `true` if we are currently using the uploader */
@ -117,9 +128,7 @@ class FolderWatcher {
* {@link folderPath}.
*/
isSyncingFolder(folderPath: string) {
return (
this.isEventRunning && this.currentEvent?.folderPath == folderPath
);
return this.activeWatch?.folderPath == folderPath;
}
/**
@ -204,63 +213,101 @@ class FolderWatcher {
}
private async runNextEvent() {
try {
if (
this.eventQueue.length === 0 ||
this.isEventRunning ||
this.isPaused
) {
if (this.eventQueue.length == 0 || this.activeWatch || this.isPaused)
return;
const skip = (reason: string) => {
log.info(`Ignoring event since ${reason}`);
this.debouncedRunNextEvent();
};
const event = this.dequeueClubbedEvent();
log.info(
`Processing ${event.action} event for folder watch ${event.folderPath} (collectionName ${event.collectionName}, ${event.filePaths.length} files)`,
);
const watch = (await this.getWatches()).find(
(watch) => watch.folderPath == event.folderPath,
);
if (!watch) {
// Possibly stale
skip(`no folder watch for found for ${event.folderPath}`);
return;
}
if (event.action === "upload") {
const pathsToUpload = pathsThatNeedUpload(event.filePaths, watch);
if (pathsToUpload.length == 0) {
skip("none of the files need uploading");
return;
}
const event = this.clubSameCollectionEvents();
log.info(
`running event type:${event.type} collectionName:${event.collectionName} folderPath:${event.folderPath} , fileCount:${event.files?.length} pathsCount: ${event.paths?.length}`,
);
const mappings = await this.getWatchMappings();
const mapping = mappings.find(
(mapping) => mapping.folderPath === event.folderPath,
);
if (!mapping) {
throw Error("no Mapping found for event");
}
log.info(
`mapping for event rootFolder: ${mapping.rootFolderName} folderPath: ${mapping.folderPath} colelctionMapping: ${mapping.collectionMapping} syncedFilesCount: ${mapping.syncedFiles.length} ignoredFilesCount ${mapping.ignoredFiles.length}`,
);
if (event.type === "upload") {
event.files = getValidFilesToUpload(event.files, mapping);
log.info(`valid files count: ${event.files?.length}`);
if (event.files.length === 0) {
return;
}
}
this.currentEvent = event;
this.currentlySyncedMapping = mapping;
// Here we pass control to the uploader. When the upload is done,
// the uploader will notify us by calling allFileUploadsDone.
this.isEventRunning = true;
if (event.type === "upload") {
this.processUploadEvent();
} else {
await this.processTrashEvent();
this.isEventRunning = false;
setTimeout(() => this.runNextEvent(), 0);
}
} catch (e) {
log.error("runNextEvent failed", e);
}
}
private async processUploadEvent() {
try {
this.activeWatch = watch;
this.uploadRunning = true;
this.setCollectionName(this.currentEvent.collectionName);
this.setElectronFiles(this.currentEvent.files);
} catch (e) {
log.error("error while running next upload", e);
const collectionName = event.collectionName;
log.info(
`Folder watch requested upload of ${pathsToUpload.length} files to collection ${collectionName}`,
);
this.upload(collectionName, pathsToUpload);
} else {
if (this.pruneFileEventsFromDeletedFolderPaths()) {
skip("event was from a deleted folder path");
return;
}
const { paths } = this.currentEvent;
const filePathsToRemove = new Set(paths);
const files = this.currentlySyncedMapping.syncedFiles.filter(
(file) => filePathsToRemove.has(file.path),
);
await this.trashByIDs(files);
this.currentlySyncedMapping.syncedFiles =
this.currentlySyncedMapping.syncedFiles.filter(
(file) => !filePathsToRemove.has(file.path),
);
await ensureElectron().updateWatchMappingSyncedFiles(
this.currentlySyncedMapping.folderPath,
this.currentlySyncedMapping.syncedFiles,
);
this.isEventRunning = false;
setTimeout(() => this.runNextEvent(), 0);
}
}
/**
* Batch the next run of events with the same action, collection and folder
* path into a single clubbed event that contains the list of all effected
* file paths from the individual events.
*/
private dequeueClubbedEvent(): ClubbedWatchEvent | undefined {
const event = this.eventQueue.shift();
if (!event) return undefined;
const filePaths = [event.filePath];
while (
this.eventQueue.length > 0 &&
event.action === this.eventQueue[0].action &&
event.folderPath === this.eventQueue[0].folderPath &&
event.collectionName === this.eventQueue[0].collectionName
) {
filePaths.push(this.eventQueue[0].filePath);
this.eventQueue.shift();
}
return { ...event, filePaths };
}
/**
* Callback invoked by the uploader whenever a file is uploaded.
*/
async onFileUpload(
fileUploadResult: UPLOAD_RESULT,
fileWithCollection: FileWithCollection,
@ -317,6 +364,9 @@ class FolderWatcher {
}
}
/**
* Callback invoked by the uploader whenever a set of file uploads finishes.
*/
async allFileUploadsDone(
filesWithCollection: FileWithCollection[],
collections: Collection[],
@ -469,34 +519,6 @@ class FolderWatcher {
}
}
private async processTrashEvent() {
try {
if (this.pruneFileEventsFromDeletedFolderPaths()) {
return;
}
const { paths } = this.currentEvent;
const filePathsToRemove = new Set(paths);
const files = this.currentlySyncedMapping.syncedFiles.filter(
(file) => filePathsToRemove.has(file.path),
);
await this.trashByIDs(files);
this.currentlySyncedMapping.syncedFiles =
this.currentlySyncedMapping.syncedFiles.filter(
(file) => !filePathsToRemove.has(file.path),
);
await ensureElectron().updateWatchMappingSyncedFiles(
this.currentlySyncedMapping.folderPath,
this.currentlySyncedMapping.syncedFiles,
);
} catch (e) {
log.error("error while running next trash", e);
}
}
private async trashByIDs(toTrashFiles: FolderWatch["syncedFiles"]) {
try {
const files = await getLocalFiles();
@ -560,28 +582,6 @@ class FolderWatcher {
log.error("error while getting collection name", e);
}
}
/**
* Batch the next run of events with the same action, collection and folder
* path into a single clubbed event that contains the list of all effected
* file paths from the individual events.
*/
private dequeueClubbedEvent(): ClubbedWatchEvent | undefined {
const event = this.eventQueue.shift();
if (!event) return undefined;
const filePaths = [event.filePath];
while (
this.eventQueue.length > 0 &&
event.action === this.eventQueue[0].action &&
event.folderPath === this.eventQueue[0].folderPath &&
event.collectionName === this.eventQueue[0].collectionName
) {
filePaths.push(this.eventQueue[0].filePath);
this.eventQueue.shift();
}
return { ...event, filePaths };
}
}
/** The singleton instance of the {@link FolderWatcher}. */
@ -612,7 +612,7 @@ interface WatchEvent {
/** The path of the root folder corresponding to the {@link FolderWatch}. */
folderPath: string;
/** The name of the Ente collection the file belongs to. */
collectionName?: string;
collectionName: string;
/** The absolute path to the file under consideration. */
filePath: string;
}
@ -629,29 +629,6 @@ type ClubbedWatchEvent = Omit<WatchEvent, "filePath"> & {
filePaths: string[];
};
export function getValidFilesToUpload(
files: ElectronFile[],
mapping: FolderWatch,
) {
const uniqueFilePaths = new Set<string>();
return files.filter((file) => {
if (!isSystemFile(file) && !isSyncedOrIgnoredFile(file, mapping)) {
if (!uniqueFilePaths.has(file.path)) {
uniqueFilePaths.add(file.path);
return true;
}
}
return false;
});
}
function isSyncedOrIgnoredFile(file: ElectronFile, mapping: FolderWatch) {
return (
mapping.ignoredFiles.includes(file.path) ||
mapping.syncedFiles.find((f) => f.path === file.path)
);
}
/**
* Determine which events we need to process to synchronize the watched on-disk
* folders to their corresponding collections.
@ -664,14 +641,10 @@ const deduceEvents = async (watches: FolderWatch[]): Promise<WatchEvent[]> => {
for (const watch of watches) {
const folderPath = watch.folderPath;
const paths = (await electron.watch.findFiles(folderPath))
// Filter out hidden files (files whose names begins with a dot)
.filter((path) => !isHiddenFile(path));
const paths = await electron.watch.findFiles(folderPath);
// Files that are on disk but not yet synced.
const pathsToUpload = paths.filter(
(path) => !isSyncedOrIgnoredPath(path, watch),
);
const pathsToUpload = pathsThatNeedUpload(paths, watch);
for (const path of pathsToUpload)
events.push({
@ -698,6 +671,17 @@ const deduceEvents = async (watches: FolderWatch[]): Promise<WatchEvent[]> => {
return events;
};
/**
* Remove hidden files and previously synced or ignored from {@link filePaths},
* returning the list of filtered paths that need to be uploaded.
*/
const pathsThatNeedUpload = (filePaths: string[], watch: FolderWatch) =>
filePaths
// Filter out hidden files (files whose names begins with a dot)
.filter((path) => !isHiddenFile(path))
// Files that are on disk but not yet synced or ignored.
.filter((path) => !isSyncedOrIgnoredPath(path, watch));
const isSyncedOrIgnoredPath = (path: string, watch: FolderWatch) =>
watch.ignoredFiles.includes(path) ||
watch.syncedFiles.find((f) => f.path === path);