remote_sync_service.dart 18 KB


  1. import 'dart:async';
  2. import 'dart:io';
  3. import 'dart:math';
  4. import 'package:logging/logging.dart';
  5. import 'package:photos/core/configuration.dart';
  6. import 'package:photos/core/errors.dart';
  7. import 'package:photos/core/event_bus.dart';
  8. import 'package:photos/db/files_db.dart';
  9. import 'package:photos/events/collection_updated_event.dart';
  10. import 'package:photos/events/files_updated_event.dart';
  11. import 'package:photos/events/force_reload_home_gallery_event.dart';
  12. import 'package:photos/events/local_photos_updated_event.dart';
  13. import 'package:photos/events/sync_status_update_event.dart';
  14. import 'package:photos/models/file.dart';
  15. import 'package:photos/models/file_type.dart';
  16. import 'package:photos/services/app_lifecycle_service.dart';
  17. import 'package:photos/services/collections_service.dart';
  18. import 'package:photos/services/ignored_files_service.dart';
  19. import 'package:photos/services/local_sync_service.dart';
  20. import 'package:photos/services/trash_sync_service.dart';
  21. import 'package:photos/utils/diff_fetcher.dart';
  22. import 'package:photos/utils/file_uploader.dart';
  23. import 'package:photos/utils/file_util.dart';
  24. import 'package:shared_preferences/shared_preferences.dart';
  25. class RemoteSyncService {
  26. final _logger = Logger("RemoteSyncService");
  27. final _db = FilesDB.instance;
  28. final _uploader = FileUploader.instance;
  29. final _collectionsService = CollectionsService.instance;
  30. final _diffFetcher = DiffFetcher();
  31. int _completedUploads = 0;
  32. SharedPreferences _prefs;
  33. Completer<void> _existingSync;
  34. static const kHasSyncedArchiveKey = "has_synced_archive";
  35. // 28 Sept, 2021 9:03:20 AM IST
  36. static const kArchiveFeatureReleaseTime = 1632800000000000;
  37. static const kHasSyncedEditTime = "has_synced_edit_time";
  38. // 29 October, 2021 3:56:40 AM IST
  39. static const kEditTimeFeatureReleaseTime = 1635460000000000;
  40. static const kMaximumPermissibleUploadsInThrottledMode = 4;
  41. static final RemoteSyncService instance =
  42. RemoteSyncService._privateConstructor();
  43. RemoteSyncService._privateConstructor();
  44. Future<void> init() async {
  45. _prefs = await SharedPreferences.getInstance();
  46. Bus.instance.on<LocalPhotosUpdatedEvent>().listen((event) async {
  47. if (event.type == EventType.addedOrUpdated) {
  48. if (_existingSync == null) {
  49. sync();
  50. }
  51. }
  52. });
  53. }
  54. Future<void> sync({bool silently = false}) async {
  55. if (!Configuration.instance.hasConfiguredAccount()) {
  56. _logger.info("Skipping remote sync since account is not configured");
  57. return;
  58. }
  59. if (_existingSync != null) {
  60. _logger.info("Remote sync already in progress, skipping");
  61. return _existingSync.future;
  62. }
  63. _existingSync = Completer<void>();
  64. try {
  65. await _pullDiff(silently);
  66. // sync trash but consume error during initial launch.
  67. // this is to ensure that we don't pause upload due to any error during
  68. // the trash sync. Impact: We may end up re-uploading a file which was
  69. // recently trashed.
  70. await TrashSyncService.instance
  71. .syncTrash()
  72. .onError((e, s) => _logger.severe('trash sync failed', e, s));
  73. final filesToBeUploaded = await _getFilesToBeUploaded();
  74. final hasUploadedFiles = await _uploadFiles(filesToBeUploaded);
  75. if (hasUploadedFiles) {
  76. await _pullDiff(true);
  77. _existingSync.complete();
  78. _existingSync = null;
  79. final hasMoreFilesToBackup = (await _getFilesToBeUploaded()).isNotEmpty;
  80. if (hasMoreFilesToBackup && !_shouldThrottleSync()) {
  81. // Skipping a resync to ensure that files that were ignored in this
  82. // session are not processed now
  83. sync();
  84. } else {
  85. Bus.instance.fire(SyncStatusUpdate(SyncStatus.completed_backup));
  86. }
  87. } else {
  88. _existingSync.complete();
  89. _existingSync = null;
  90. }
  91. } catch (e, s) {
  92. _logger.severe("Error executing remote sync ", e, s);
  93. _existingSync.complete();
  94. _existingSync = null;
  95. // rethrow whitelisted error so that UI status can be updated correctly.
  96. if (e is UnauthorizedError ||
  97. e is NoActiveSubscriptionError ||
  98. e is WiFiUnavailableError ||
  99. e is StorageLimitExceededError ||
  100. e is SyncStopRequestedError) {
  101. rethrow;
  102. }
  103. }
  104. }
  105. Future<void> _pullDiff(bool silently) async {
  106. final isFirstSync = !_collectionsService.hasSyncedCollections();
  107. await _collectionsService.sync();
  108. if (isFirstSync || _hasReSynced()) {
  109. await _syncUpdatedCollections(silently);
  110. } else {
  111. final syncSinceTime = _getSinceTimeForReSync();
  112. await _resyncAllCollectionsSinceTime(syncSinceTime);
  113. }
  114. if (!_hasReSynced()) {
  115. await _markReSyncAsDone();
  116. }
  117. }
  118. Future<void> _syncUpdatedCollections(bool silently) async {
  119. final updatedCollections =
  120. await _collectionsService.getCollectionsToBeSynced();
  121. if (updatedCollections.isNotEmpty && !silently) {
  122. Bus.instance.fire(SyncStatusUpdate(SyncStatus.applying_remote_diff));
  123. }
  124. for (final c in updatedCollections) {
  125. await _syncCollectionDiff(
  126. c.id, _collectionsService.getCollectionSyncTime(c.id));
  127. await _collectionsService.setCollectionSyncTime(c.id, c.updationTime);
  128. }
  129. }
  130. Future<void> _resyncAllCollectionsSinceTime(int sinceTime) async {
  131. _logger.info('re-sync collections sinceTime: $sinceTime');
  132. final collections = _collectionsService.getActiveCollections();
  133. for (final c in collections) {
  134. await _syncCollectionDiff(c.id,
  135. min(_collectionsService.getCollectionSyncTime(c.id), sinceTime));
  136. await _collectionsService.setCollectionSyncTime(c.id, c.updationTime);
  137. }
  138. }
  139. Future<void> _syncCollectionDiff(int collectionID, int sinceTime) async {
  140. final diff =
  141. await _diffFetcher.getEncryptedFilesDiff(collectionID, sinceTime);
  142. if (diff.deletedFiles.isNotEmpty) {
  143. final fileIDs = diff.deletedFiles.map((f) => f.uploadedFileID).toList();
  144. final deletedFiles =
  145. (await FilesDB.instance.getFilesFromIDs(fileIDs)).values.toList();
  146. await FilesDB.instance.deleteFilesFromCollection(collectionID, fileIDs);
  147. Bus.instance.fire(CollectionUpdatedEvent(collectionID, deletedFiles,
  148. type: EventType.deletedFromRemote));
  149. Bus.instance.fire(LocalPhotosUpdatedEvent(deletedFiles,
  150. type: EventType.deletedFromRemote));
  151. }
  152. if (diff.updatedFiles.isNotEmpty) {
  153. await _storeDiff(diff.updatedFiles, collectionID);
  154. _logger.info("Updated " +
  155. diff.updatedFiles.length.toString() +
  156. " files in collection " +
  157. collectionID.toString());
  158. Bus.instance.fire(LocalPhotosUpdatedEvent(diff.updatedFiles));
  159. Bus.instance
  160. .fire(CollectionUpdatedEvent(collectionID, diff.updatedFiles));
  161. }
  162. if (diff.latestUpdatedAtTime > 0) {
  163. await _collectionsService.setCollectionSyncTime(
  164. collectionID, diff.latestUpdatedAtTime);
  165. }
  166. if (diff.hasMore) {
  167. return await _syncCollectionDiff(collectionID,
  168. _collectionsService.getCollectionSyncTime(collectionID));
  169. }
  170. }
  171. Future<List<File>> _getFilesToBeUploaded() async {
  172. final foldersToBackUp = Configuration.instance.getPathsToBackUp();
  173. List<File> filesToBeUploaded;
  174. if (LocalSyncService.instance.hasGrantedLimitedPermissions() &&
  175. foldersToBackUp.isEmpty) {
  176. filesToBeUploaded = await _db.getAllLocalFiles();
  177. } else {
  178. filesToBeUploaded =
  179. await _db.getFilesToBeUploadedWithinFolders(foldersToBackUp);
  180. }
  181. if (!Configuration.instance.shouldBackupVideos() || _shouldThrottleSync()) {
  182. filesToBeUploaded
  183. .removeWhere((element) => element.fileType == FileType.video);
  184. }
  185. if (filesToBeUploaded.isNotEmpty) {
  186. final int prevCount = filesToBeUploaded.length;
  187. final ignoredIDs = await IgnoredFilesService.instance.ignoredIDs;
  188. filesToBeUploaded.removeWhere((file) =>
  189. IgnoredFilesService.instance.shouldSkipUpload(ignoredIDs, file));
  190. if (prevCount != filesToBeUploaded.length) {
  191. _logger.info((prevCount - filesToBeUploaded.length).toString() +
  192. " files were ignored for upload");
  193. }
  194. }
  195. if (filesToBeUploaded.isEmpty) {
  196. // look for files which user manually tried to back up but they are not
  197. // uploaded yet. These files should ignore video backup & ignored files filter
  198. filesToBeUploaded = await _db.getPendingManualUploads();
  199. }
  200. _sortByTimeAndType(filesToBeUploaded);
  201. _logger.info(
  202. filesToBeUploaded.length.toString() + " new files to be uploaded.");
  203. return filesToBeUploaded;
  204. }
  205. Future<bool> _uploadFiles(List<File> filesToBeUploaded) async {
  206. final updatedFileIDs = await _db.getUploadedFileIDsToBeUpdated();
  207. _logger.info(updatedFileIDs.length.toString() + " files updated.");
  208. final editedFiles = await _db.getEditedRemoteFiles();
  209. _logger.info(editedFiles.length.toString() + " files edited.");
  210. _completedUploads = 0;
  211. int toBeUploaded =
  212. filesToBeUploaded.length + updatedFileIDs.length + editedFiles.length;
  213. if (toBeUploaded > 0) {
  214. Bus.instance.fire(SyncStatusUpdate(SyncStatus.preparing_for_upload));
  215. }
  216. final List<Future> futures = [];
  217. for (final uploadedFileID in updatedFileIDs) {
  218. if (_shouldThrottleSync() &&
  219. futures.length >= kMaximumPermissibleUploadsInThrottledMode) {
  220. _logger
  221. .info("Skipping some updated files as we are throttling uploads");
  222. break;
  223. }
  224. final file = await _db.getUploadedFileInAnyCollection(uploadedFileID);
  225. _uploadFile(file, file.collectionID, futures);
  226. }
  227. for (final file in filesToBeUploaded) {
  228. if (_shouldThrottleSync() &&
  229. futures.length >= kMaximumPermissibleUploadsInThrottledMode) {
  230. _logger.info("Skipping some new files as we are throttling uploads");
  231. break;
  232. }
  233. // prefer existing collection ID for manually uploaded files.
  234. // See https://github.com/ente-io/frame/pull/187
  235. final collectionID = file.collectionID ??
  236. (await CollectionsService.instance
  237. .getOrCreateForPath(file.deviceFolder))
  238. .id;
  239. _uploadFile(file, collectionID, futures);
  240. }
  241. for (final file in editedFiles) {
  242. if (_shouldThrottleSync() &&
  243. futures.length >= kMaximumPermissibleUploadsInThrottledMode) {
  244. _logger.info("Skipping some edited files as we are throttling uploads");
  245. break;
  246. }
  247. _uploadFile(file, file.collectionID, futures);
  248. }
  249. try {
  250. await Future.wait(futures);
  251. } on InvalidFileError {
  252. // Do nothing
  253. } on FileSystemException {
  254. // Do nothing since it's caused mostly due to concurrency issues
  255. // when the foreground app deletes temporary files, interrupting a background
  256. // upload
  257. } on LockAlreadyAcquiredError {
  258. // Do nothing
  259. } on SilentlyCancelUploadsError {
  260. // Do nothing
  261. } on UserCancelledUploadError {
  262. // Do nothing
  263. } catch (e) {
  264. rethrow;
  265. }
  266. return _completedUploads > 0;
  267. }
  268. void _uploadFile(File file, int collectionID, List<Future> futures) {
  269. final future = _uploader
  270. .upload(file, collectionID)
  271. .then((uploadedFile) => _onFileUploaded(uploadedFile));
  272. futures.add(future);
  273. }
  274. Future<void> _onFileUploaded(File file) async {
  275. Bus.instance.fire(CollectionUpdatedEvent(file.collectionID, [file]));
  276. _completedUploads++;
  277. final toBeUploadedInThisSession =
  278. FileUploader.instance.getCurrentSessionUploadCount();
  279. if (toBeUploadedInThisSession == 0) {
  280. return;
  281. }
  282. if (_completedUploads > toBeUploadedInThisSession ||
  283. _completedUploads < 0 ||
  284. toBeUploadedInThisSession < 0) {
  285. _logger.info(
  286. "Incorrect sync status",
  287. InvalidSyncStatusError("Tried to report $_completedUploads as "
  288. "uploaded out of $toBeUploadedInThisSession"));
  289. return;
  290. }
  291. Bus.instance.fire(SyncStatusUpdate(SyncStatus.in_progress,
  292. completed: _completedUploads, total: toBeUploadedInThisSession));
  293. }
  294. Future _storeDiff(List<File> diff, int collectionID) async {
  295. int existing = 0,
  296. updated = 0,
  297. remote = 0,
  298. localButUpdatedOnRemote = 0,
  299. localButAddedToNewCollectionOnRemote = 0;
  300. bool hasAnyCreationTimeChanged = false;
  301. List<File> toBeInserted = [];
  302. for (File file in diff) {
  303. final existingFiles = file.deviceFolder == null
  304. ? null
  305. : await _db.getMatchingFiles(file.title, file.deviceFolder);
  306. if (existingFiles == null || existingFiles.isEmpty) {
  307. // File uploaded from a different device.
  308. // Other rare possibilities : The local file is present on
  309. // device but it's not imported in local db due to missing permission
  310. // after reinstall (iOS selected file permissions or user revoking
  311. // permissions, or issue/delay in importing devices files.
  312. file.localID = null;
  313. toBeInserted.add(file);
  314. remote++;
  315. } else {
  316. // File exists in ente db with same title & device folder
  317. // Note: The file.generatedID might be already set inside
  318. // [DiffFetcher.getEncryptedFilesDiff]
  319. // Try to find existing file with same localID as remote file with a fallback
  320. // to finding any existing file with localID. This is needed to handle
  321. // case when localID for a file changes and the file is uploaded again in
  322. // the same collection
  323. final fileWithLocalID = existingFiles.firstWhere(
  324. (e) =>
  325. file.localID != null &&
  326. e.localID != null &&
  327. e.localID == file.localID,
  328. orElse: () => existingFiles.firstWhere((e) => e.localID != null,
  329. orElse: () => null));
  330. if (fileWithLocalID != null) {
  331. // File should ideally have the same localID
  332. if (file.localID != null && file.localID != fileWithLocalID.localID) {
  333. _logger.severe(
  334. "unexpected mismatch in localIDs remote: ${file.toString()} and existing: ${fileWithLocalID.toString()}");
  335. }
  336. file.localID = fileWithLocalID.localID;
  337. } else {
  338. file.localID = null;
  339. }
  340. bool wasUploadedOnAPreviousInstallation =
  341. existingFiles.length == 1 && existingFiles[0].collectionID == null;
  342. if (wasUploadedOnAPreviousInstallation) {
  343. file.generatedID = existingFiles[0].generatedID;
  344. if (file.modificationTime != existingFiles[0].modificationTime) {
  345. // File was updated since the app was uninstalled
  346. _logger.info("Updated since last installation: " +
  347. file.uploadedFileID.toString());
  348. file.modificationTime = existingFiles[0].modificationTime;
  349. file.updationTime = null;
  350. updated++;
  351. } else {
  352. existing++;
  353. }
  354. toBeInserted.add(file);
  355. } else {
  356. bool foundMatchingCollection = false;
  357. for (final existingFile in existingFiles) {
  358. if (file.collectionID == existingFile.collectionID &&
  359. file.uploadedFileID == existingFile.uploadedFileID) {
  360. // File was updated on remote
  361. if (file.creationTime != existingFile.creationTime) {
  362. hasAnyCreationTimeChanged = true;
  363. }
  364. foundMatchingCollection = true;
  365. file.generatedID = existingFile.generatedID;
  366. toBeInserted.add(file);
  367. await clearCache(file);
  368. localButUpdatedOnRemote++;
  369. break;
  370. }
  371. }
  372. if (!foundMatchingCollection) {
  373. // Added to a new collection
  374. toBeInserted.add(file);
  375. localButAddedToNewCollectionOnRemote++;
  376. }
  377. }
  378. }
  379. }
  380. await _db.insertMultiple(toBeInserted);
  381. _logger.info(
  382. "Diff to be deduplicated was: " +
  383. diff.length.toString() +
  384. " out of which \n" +
  385. existing.toString() +
  386. " was uploaded from device, \n" +
  387. updated.toString() +
  388. " was uploaded from device, but has been updated since and should be reuploaded, \n" +
  389. remote.toString() +
  390. " was uploaded from remote, \n" +
  391. localButUpdatedOnRemote.toString() +
  392. " was uploaded from device but updated on remote, and \n" +
  393. localButAddedToNewCollectionOnRemote.toString() +
  394. " was uploaded from device but added to a new collection on remote.",
  395. );
  396. if (hasAnyCreationTimeChanged) {
  397. Bus.instance.fire(ForceReloadHomeGalleryEvent());
  398. }
  399. }
  400. // return true if the client needs to re-sync the collections from previous
  401. // version
  402. bool _hasReSynced() {
  403. return _prefs.containsKey(kHasSyncedEditTime) &&
  404. _prefs.containsKey(kHasSyncedArchiveKey);
  405. }
  406. Future<void> _markReSyncAsDone() async {
  407. await _prefs.setBool(kHasSyncedArchiveKey, true);
  408. await _prefs.setBool(kHasSyncedEditTime, true);
  409. }
  410. int _getSinceTimeForReSync() {
  411. // re-sync from archive feature time if the client still hasn't synced
  412. // since the feature release.
  413. if (!_prefs.containsKey(kHasSyncedArchiveKey)) {
  414. return kArchiveFeatureReleaseTime;
  415. }
  416. return kEditTimeFeatureReleaseTime;
  417. }
  418. bool _shouldThrottleSync() {
  419. return Platform.isIOS && !AppLifecycleService.instance.isForeground;
  420. }
  421. // _sortByTimeAndType moves videos to end and sort by creation time (desc).
  422. // This is done to upload most recent photo first.
  423. void _sortByTimeAndType(List<File> file) {
  424. file.sort((first, second) {
  425. if (first.fileType == second.fileType) {
  426. return second.creationTime.compareTo(first.creationTime);
  427. } else if (first.fileType == FileType.video) {
  428. return 1;
  429. } else {
  430. return -1;
  431. }
  432. });
  433. }
  434. }