瀏覽代碼

Merge pull request #158 from ente-io/split_syncs

Decouple local and remote syncs
Vishnu Mohandas 3 年之前
父節點
當前提交
1936ed3b8b
共有 3 個文件被更改,包括 68 次插入33 次删除
  1. 23 8
      lib/services/local_sync_service.dart
  2. 45 21
      lib/services/remote_sync_service.dart
  3. 0 4
      lib/services/sync_service.dart

+ 23 - 8
lib/services/local_sync_service.dart

@@ -19,6 +19,7 @@ class LocalSyncService {
   final _db = FilesDB.instance;
   final Computer _computer = Computer();
   SharedPreferences _prefs;
+  Completer<void> _existingSync;
 
   static const kDbUpdationTimeKey = "db_updation_time";
   static const kHasCompletedFirstImportKey = "has_completed_firstImport";
@@ -41,14 +42,9 @@ class LocalSyncService {
       await PhotoManager.setIgnorePermissionCheck(true);
     }
     await _computer.turnOn(workersCount: 1);
-  }
-
-  void addChangeCallback(Function() callback) {
-    PhotoManager.addChangeCallback((value) {
-      _logger.info("Something changed on disk");
-      callback();
-    });
-    PhotoManager.startChangeNotify();
+    if (hasGrantedPermissions()) {
+      _registerChangeCallback();
+    }
   }
 
   Future<void> sync() async {
@@ -64,6 +60,11 @@ class LocalSyncService {
         return;
       }
     }
+    if (_existingSync != null) {
+      _logger.warning("Sync already in progress, skipping.");
+      return _existingSync.future;
+    }
+    _existingSync = Completer<void>();
     final existingLocalFileIDs = await _db.getExistingLocalFileIDs();
     _logger.info(
         existingLocalFileIDs.length.toString() + " localIDs were discovered");
@@ -117,6 +118,8 @@ class LocalSyncService {
     final endTime = DateTime.now().microsecondsSinceEpoch;
     final duration = Duration(microseconds: endTime - startTime);
     _logger.info("Load took " + duration.inMilliseconds.toString() + "ms");
+    _existingSync.complete();
+    _existingSync = null;
   }
 
   Future<bool> syncAll() async {
@@ -201,6 +204,7 @@ class LocalSyncService {
   Future<void> onPermissionGranted(PermissionState state) async {
     await _prefs.setBool(kHasGrantedPermissionsKey, true);
     await _prefs.setString(kPermissionStateKey, state.toString());
+    _registerChangeCallback();
   }
 
   bool hasCompletedFirstImport() {
@@ -257,4 +261,15 @@ class LocalSyncService {
       Configuration.instance.setPathsToBackUp(pathsToBackup);
     }
   }
+
+  void _registerChangeCallback() {
+    PhotoManager.addChangeCallback((value) async {
+      _logger.info("Something changed on disk");
+      if (_existingSync != null) {
+        await _existingSync.future;
+      }
+      sync();
+    });
+    PhotoManager.startChangeNotify();
+  }
 }

+ 45 - 21
lib/services/remote_sync_service.dart

@@ -52,6 +52,14 @@ class RemoteSyncService {
 
   Future<void> init() async {
     _prefs = await SharedPreferences.getInstance();
+
+    Bus.instance.on<LocalPhotosUpdatedEvent>().listen((event) async {
+      if (event.type == EventType.addedOrUpdated) {
+        if (_existingSync == null) {
+          sync();
+        }
+      }
+    });
   }
 
   Future<void> sync({bool silently = false}) async {
@@ -65,20 +73,8 @@ class RemoteSyncService {
     }
     _existingSync = Completer<void>();
 
-    bool isFirstSync = !_collectionsService.hasSyncedCollections();
-
     try {
-      await _collectionsService.sync();
-
-      if (isFirstSync || _hasReSynced()) {
-        await _syncUpdatedCollections(silently);
-      } else {
-        final syncSinceTime = _getSinceTimeForReSync();
-        await _resyncAllCollectionsSinceTime(syncSinceTime);
-      }
-      if (!_hasReSynced()) {
-        await _markReSyncAsDone();
-      }
+      await _pullDiff(silently);
       // sync trash but consume error during initial launch.
       // this is to ensure that we don't pause upload due to any error during
       // the trash sync. Impact: We may end up re-uploading a file which was
@@ -86,13 +82,23 @@ class RemoteSyncService {
       await TrashSyncService.instance
           .syncTrash()
           .onError((e, s) => _logger.severe('trash sync failed', e, s));
-      bool hasUploadedFiles = await _uploadDiff();
-      _existingSync.complete();
-      _existingSync = null;
-      if (hasUploadedFiles && !_shouldThrottleSync()) {
-        // Skipping a resync to ensure that files that were ignored in this
-        // session are not processed now
-        sync(silently: true);
+      final filesToBeUploaded = await _getFilesToBeUploaded();
+      final hasUploadedFiles = await _uploadFiles(filesToBeUploaded);
+      if (hasUploadedFiles) {
+        await _pullDiff(true);
+        _existingSync.complete();
+        _existingSync = null;
+        final hasMoreFilesToBackup = (await _getFilesToBeUploaded()).isNotEmpty;
+        if (hasMoreFilesToBackup && !_shouldThrottleSync()) {
+          // Skipping a resync to ensure that files that were ignored in this
+          // session are not processed now
+          sync();
+        } else {
+          Bus.instance.fire(SyncStatusUpdate(SyncStatus.completed_backup));
+        }
+      } else {
+        _existingSync.complete();
+        _existingSync = null;
       }
     } catch (e, s) {
       _logger.severe("Error executing remote sync ", e, s);
@@ -101,6 +107,21 @@ class RemoteSyncService {
     }
   }
 
+  Future<void> _pullDiff(bool silently) async {
+    final isFirstSync = !_collectionsService.hasSyncedCollections();
+    await _collectionsService.sync();
+
+    if (isFirstSync || _hasReSynced()) {
+      await _syncUpdatedCollections(silently);
+    } else {
+      final syncSinceTime = _getSinceTimeForReSync();
+      await _resyncAllCollectionsSinceTime(syncSinceTime);
+    }
+    if (!_hasReSynced()) {
+      await _markReSyncAsDone();
+    }
+  }
+
   Future<void> _syncUpdatedCollections(bool silently) async {
     final updatedCollections =
         await _collectionsService.getCollectionsToBeSynced();
@@ -159,7 +180,7 @@ class RemoteSyncService {
     }
   }
 
-  Future<bool> _uploadDiff() async {
+  Future<List<File>> _getFilesToBeUploaded() async {
     final foldersToBackUp = Configuration.instance.getPathsToBackUp();
     List<File> filesToBeUploaded;
     if (LocalSyncService.instance.hasGrantedLimitedPermissions() &&
@@ -186,7 +207,10 @@ class RemoteSyncService {
     _moveVideosToEnd(filesToBeUploaded);
     _logger.info(
         filesToBeUploaded.length.toString() + " new files to be uploaded.");
+    return filesToBeUploaded;
+  }
 
+  Future<bool> _uploadFiles(List<File> filesToBeUploaded) async {
     final updatedFileIDs = await _db.getUploadedFileIDsToBeUpdated();
     _logger.info(updatedFileIDs.length.toString() + " files updated.");
 

+ 0 - 4
lib/services/sync_service.dart

@@ -65,9 +65,6 @@ class SyncService {
       await PhotoManager.clearFileCache();
       _logger.info("Cleared file cache");
     }
-    if (LocalSyncService.instance.hasGrantedPermissions()) {
-      LocalSyncService.instance.addChangeCallback(() => sync());
-    }
   }
 
   Future<bool> existingSync() async {
@@ -155,7 +152,6 @@ class SyncService {
     await _localSyncService.onPermissionGranted(state);
     Bus.instance.fire(PermissionGrantedEvent());
     _doSync();
-    LocalSyncService.instance.addChangeCallback(() => sync());
   }
 
   void onFoldersSet(Set<String> paths) {