file_uploader.dart 18 KB


  1. import 'dart:async';
  2. import 'dart:collection';
  3. import 'dart:convert';
  4. import 'dart:io' as io;
  5. import 'dart:math';
  6. import 'package:connectivity/connectivity.dart';
  7. import 'package:dio/dio.dart';
  8. import 'package:flutter_sodium/flutter_sodium.dart';
  9. import 'package:logging/logging.dart';
  10. import 'package:photos/core/configuration.dart';
  11. import 'package:photos/core/constants.dart';
  12. import 'package:photos/core/event_bus.dart';
  13. import 'package:photos/core/network.dart';
  14. import 'package:photos/db/files_db.dart';
  15. import 'package:photos/events/subscription_purchased_event.dart';
  16. import 'package:photos/models/encryption_result.dart';
  17. import 'package:photos/models/file.dart';
  18. import 'package:photos/models/location.dart';
  19. import 'package:photos/models/upload_url.dart';
  20. import 'package:photos/repositories/file_repository.dart';
  21. import 'package:photos/services/collections_service.dart';
  22. import 'package:photos/services/sync_service.dart';
  23. import 'package:photos/utils/crypto_util.dart';
  24. import 'package:photos/utils/file_util.dart';
  25. class FileUploader {
  26. final _logger = Logger("FileUploader");
  27. final _dio = Network.instance.getDio();
  28. final _queue = LinkedHashMap<int, FileUploadItem>();
  29. final kMaximumConcurrentUploads = 4;
  30. final kMaximumThumbnailCompressionAttempts = 2;
  31. final kMaximumUploadAttempts = 4;
  32. int _currentlyUploading = 0;
  33. final _uploadURLs = Queue<UploadURL>();
  34. FileUploader._privateConstructor() {
  35. Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
  36. _uploadURLFetchInProgress = null;
  37. });
  38. }
  39. static FileUploader instance = FileUploader._privateConstructor();
  40. Future<File> upload(File file, int collectionID) {
  41. // If the file hasn't been queued yet, queue it
  42. if (!_queue.containsKey(file.generatedID)) {
  43. final completer = Completer<File>();
  44. _queue[file.generatedID] = FileUploadItem(file, collectionID, completer);
  45. _pollQueue();
  46. return completer.future;
  47. }
  48. // If the file exists in the queue for a matching collectionID,
  49. // return the existing future
  50. final item = _queue[file.generatedID];
  51. if (item.collectionID == collectionID) {
  52. return item.completer.future;
  53. }
  54. // Else wait for the existing upload to complete,
  55. // and add it to the relevant collection
  56. return item.completer.future.then((uploadedFile) {
  57. return CollectionsService.instance
  58. .addToCollection(collectionID, [uploadedFile]).then((aVoid) {
  59. return uploadedFile;
  60. });
  61. });
  62. }
  63. Future<File> forceUpload(File file, int collectionID) async {
  64. // If the file hasn't been queued yet, ez.
  65. if (!_queue.containsKey(file.generatedID)) {
  66. final completer = Completer<File>();
  67. _queue[file.generatedID] = FileUploadItem(
  68. file,
  69. collectionID,
  70. completer,
  71. status: UploadStatus.in_progress,
  72. );
  73. _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);
  74. return completer.future;
  75. }
  76. var item = _queue[file.generatedID];
  77. // If the file is being uploaded right now, wait and proceed
  78. if (item.status == UploadStatus.in_progress) {
  79. return item.completer.future.then((uploadedFile) async {
  80. if (uploadedFile.collectionID == collectionID) {
  81. // Do nothing
  82. return uploadedFile;
  83. } else {
  84. return CollectionsService.instance
  85. .addToCollection(collectionID, [uploadedFile]).then((aVoid) {
  86. return uploadedFile;
  87. });
  88. }
  89. });
  90. } else {
  91. // If the file is yet to be processed,
  92. // 1. Remove it from the queue,
  93. // 2. Force upload the current file
  94. // 3. Trigger the callback for the original request
  95. item = _queue.remove(file.generatedID);
  96. return _encryptAndUploadFileToCollection(file, collectionID,
  97. forcedUpload: true)
  98. .then((uploadedFile) {
  99. if (item.collectionID == collectionID) {
  100. item.completer.complete(uploadedFile);
  101. return uploadedFile;
  102. } else {
  103. CollectionsService.instance
  104. .addToCollection(item.collectionID, [uploadedFile]).then((aVoid) {
  105. item.completer.complete(uploadedFile);
  106. });
  107. return uploadedFile;
  108. }
  109. });
  110. }
  111. }
  112. void clearQueue(final Error reason) {
  113. final uploadsToBeRemoved = List<int>();
  114. _queue.entries
  115. .where((entry) => entry.value.status == UploadStatus.not_started)
  116. .forEach((pendingUpload) {
  117. uploadsToBeRemoved.add(pendingUpload.key);
  118. });
  119. for (final id in uploadsToBeRemoved) {
  120. _queue.remove(id).completer.completeError(reason);
  121. }
  122. }
  123. void _pollQueue() {
  124. if (SyncService.instance.shouldStopSync()) {
  125. clearQueue(SyncStopRequestedError());
  126. }
  127. if (_queue.length > 0 && _currentlyUploading < kMaximumConcurrentUploads) {
  128. final firstPendingEntry = _queue.entries
  129. .firstWhere((entry) => entry.value.status == UploadStatus.not_started,
  130. orElse: () => null)
  131. ?.value;
  132. if (firstPendingEntry != null) {
  133. firstPendingEntry.status = UploadStatus.in_progress;
  134. _encryptAndUploadFileToCollection(
  135. firstPendingEntry.file, firstPendingEntry.collectionID);
  136. }
  137. }
  138. }
  139. Future<File> _encryptAndUploadFileToCollection(File file, int collectionID,
  140. {bool forcedUpload = false}) async {
  141. _currentlyUploading++;
  142. try {
  143. final uploadedFile = await _tryToUpload(file, collectionID, forcedUpload);
  144. _queue.remove(file.generatedID).completer.complete(uploadedFile);
  145. } catch (e) {
  146. _queue.remove(file.generatedID).completer.completeError(e);
  147. } finally {
  148. _currentlyUploading--;
  149. _pollQueue();
  150. }
  151. return null;
  152. }
  153. Future<File> _tryToUpload(
  154. File file, int collectionID, bool forcedUpload) async {
  155. final connectivityResult = await (Connectivity().checkConnectivity());
  156. var canUploadUnderCurrentNetworkConditions =
  157. (connectivityResult == ConnectivityResult.wifi ||
  158. Configuration.instance.shouldBackupOverMobileData());
  159. if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
  160. throw WiFiUnavailableError();
  161. }
  162. final tempDirectory = Configuration.instance.getTempDirectory();
  163. final encryptedFilePath =
  164. tempDirectory + file.generatedID.toString() + ".encrypted";
  165. final encryptedThumbnailPath =
  166. tempDirectory + file.generatedID.toString() + "_thumbnail.encrypted";
  167. var sourceFile;
  168. try {
  169. // Placing this in the try-catch block to safe guard against: https://github.com/CaiJingLong/flutter_photo_manager/issues/405
  170. _logger.info("Trying to upload " + file.toString());
  171. sourceFile = (await (await file.getAsset()).originFile);
  172. var key;
  173. var isAlreadyUploadedFile = file.uploadedFileID != null;
  174. if (isAlreadyUploadedFile) {
  175. key = decryptFileKey(file);
  176. } else {
  177. key = null;
  178. }
  179. if (io.File(encryptedFilePath).existsSync()) {
  180. io.File(encryptedFilePath).deleteSync();
  181. }
  182. final encryptedFile = io.File(encryptedFilePath);
  183. final fileAttributes = await CryptoUtil.encryptFile(
  184. sourceFile.path,
  185. encryptedFilePath,
  186. key: key,
  187. );
  188. var thumbnailData = (await (await file.getAsset()).thumbDataWithSize(
  189. THUMBNAIL_LARGE_SIZE,
  190. THUMBNAIL_LARGE_SIZE,
  191. quality: 50,
  192. ));
  193. if (thumbnailData == null) {
  194. _logger.severe("Could not generate thumbnail for " + file.toString());
  195. await FilesDB.instance.deleteLocalFile(file.localID);
  196. throw InvalidFileError();
  197. }
  198. int compressionAttempts = 0;
  199. while (thumbnailData.length > THUMBNAIL_DATA_LIMIT &&
  200. compressionAttempts < kMaximumThumbnailCompressionAttempts) {
  201. _logger.info("Thumbnail size " + thumbnailData.length.toString());
  202. thumbnailData = await compressThumbnail(thumbnailData);
  203. _logger.info(
  204. "Compressed thumbnail size " + thumbnailData.length.toString());
  205. compressionAttempts++;
  206. }
  207. final encryptedThumbnailData =
  208. CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
  209. if (io.File(encryptedThumbnailPath).existsSync()) {
  210. io.File(encryptedThumbnailPath).deleteSync();
  211. }
  212. final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
  213. encryptedThumbnailFile
  214. .writeAsBytesSync(encryptedThumbnailData.encryptedData);
  215. final fileUploadURL = await _getUploadURL();
  216. String fileObjectKey = await _putFile(fileUploadURL, encryptedFile);
  217. final thumbnailUploadURL = await _getUploadURL();
  218. String thumbnailObjectKey =
  219. await _putFile(thumbnailUploadURL, encryptedThumbnailFile);
  220. // h4ck to fetch location data if missing (thank you Android Q+) lazily only during uploads
  221. if (file.location == null ||
  222. (file.location.latitude == 0 && file.location.longitude == 0)) {
  223. final latLong = await (await file.getAsset()).latlngAsync();
  224. file.location = Location(latLong.latitude, latLong.longitude);
  225. }
  226. final encryptedMetadataData = CryptoUtil.encryptChaCha(
  227. utf8.encode(jsonEncode(file.getMetadata())), fileAttributes.key);
  228. final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
  229. final thumbnailDecryptionHeader =
  230. Sodium.bin2base64(encryptedThumbnailData.header);
  231. final encryptedMetadata =
  232. Sodium.bin2base64(encryptedMetadataData.encryptedData);
  233. final metadataDecryptionHeader =
  234. Sodium.bin2base64(encryptedMetadataData.header);
  235. if (isAlreadyUploadedFile) {
  236. final updatedFile = await _updateFile(
  237. file,
  238. fileObjectKey,
  239. fileDecryptionHeader,
  240. thumbnailObjectKey,
  241. thumbnailDecryptionHeader,
  242. encryptedMetadata,
  243. metadataDecryptionHeader,
  244. );
  245. // Update across all collections
  246. await FilesDB.instance.updateUploadedFileAcrossCollections(updatedFile);
  247. FileRepository.instance.reloadFiles();
  248. return updatedFile;
  249. } else {
  250. final uploadedFile = await _uploadFile(
  251. file,
  252. collectionID,
  253. fileAttributes,
  254. fileObjectKey,
  255. fileDecryptionHeader,
  256. thumbnailObjectKey,
  257. thumbnailDecryptionHeader,
  258. encryptedMetadata,
  259. metadataDecryptionHeader,
  260. );
  261. await FilesDB.instance.update(uploadedFile);
  262. FileRepository.instance.reloadFiles();
  263. return uploadedFile;
  264. }
  265. } catch (e, s) {
  266. if (!(e is NoActiveSubscriptionError)) {
  267. _logger.severe(
  268. "File upload failed for " + file.generatedID.toString(), e, s);
  269. }
  270. throw e;
  271. } finally {
  272. if (io.Platform.isIOS && sourceFile != null) {
  273. sourceFile.deleteSync();
  274. }
  275. if (io.File(encryptedFilePath).existsSync()) {
  276. io.File(encryptedFilePath).deleteSync();
  277. }
  278. if (io.File(encryptedThumbnailPath).existsSync()) {
  279. io.File(encryptedThumbnailPath).deleteSync();
  280. }
  281. }
  282. }
  283. Future<File> _uploadFile(
  284. File file,
  285. int collectionID,
  286. EncryptionResult fileAttributes,
  287. String fileObjectKey,
  288. String fileDecryptionHeader,
  289. String thumbnailObjectKey,
  290. String thumbnailDecryptionHeader,
  291. String encryptedMetadata,
  292. String metadataDecryptionHeader,
  293. ) async {
  294. final encryptedFileKeyData = CryptoUtil.encryptSync(
  295. fileAttributes.key,
  296. CollectionsService.instance.getCollectionKey(collectionID),
  297. );
  298. final encryptedKey = Sodium.bin2base64(encryptedFileKeyData.encryptedData);
  299. final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce);
  300. final request = {
  301. "collectionID": collectionID,
  302. "encryptedKey": encryptedKey,
  303. "keyDecryptionNonce": keyDecryptionNonce,
  304. "file": {
  305. "objectKey": fileObjectKey,
  306. "decryptionHeader": fileDecryptionHeader,
  307. },
  308. "thumbnail": {
  309. "objectKey": thumbnailObjectKey,
  310. "decryptionHeader": thumbnailDecryptionHeader,
  311. },
  312. "metadata": {
  313. "encryptedData": encryptedMetadata,
  314. "decryptionHeader": metadataDecryptionHeader,
  315. }
  316. };
  317. try {
  318. final response = await _dio.post(
  319. Configuration.instance.getHttpEndpoint() + "/files",
  320. options: Options(
  321. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  322. data: request,
  323. );
  324. final data = response.data;
  325. file.uploadedFileID = data["id"];
  326. file.collectionID = collectionID;
  327. file.updationTime = data["updationTime"];
  328. file.ownerID = data["ownerID"];
  329. file.encryptedKey = encryptedKey;
  330. file.keyDecryptionNonce = keyDecryptionNonce;
  331. file.fileDecryptionHeader = fileDecryptionHeader;
  332. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  333. file.metadataDecryptionHeader = metadataDecryptionHeader;
  334. return file;
  335. } on DioError catch (e) {
  336. if (e.response.statusCode == 426) {
  337. _onStorageLimitExceeded();
  338. }
  339. throw e;
  340. }
  341. }
  342. Future<File> _updateFile(
  343. File file,
  344. String fileObjectKey,
  345. String fileDecryptionHeader,
  346. String thumbnailObjectKey,
  347. String thumbnailDecryptionHeader,
  348. String encryptedMetadata,
  349. String metadataDecryptionHeader,
  350. ) async {
  351. final request = {
  352. "id": file.uploadedFileID,
  353. "file": {
  354. "objectKey": fileObjectKey,
  355. "decryptionHeader": fileDecryptionHeader,
  356. },
  357. "thumbnail": {
  358. "objectKey": thumbnailObjectKey,
  359. "decryptionHeader": thumbnailDecryptionHeader,
  360. },
  361. "metadata": {
  362. "encryptedData": encryptedMetadata,
  363. "decryptionHeader": metadataDecryptionHeader,
  364. }
  365. };
  366. try {
  367. final response = await _dio.post(
  368. Configuration.instance.getHttpEndpoint() + "/files",
  369. options: Options(
  370. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  371. data: request,
  372. );
  373. final data = response.data;
  374. file.uploadedFileID = data["id"];
  375. file.updationTime = data["updationTime"];
  376. file.fileDecryptionHeader = fileDecryptionHeader;
  377. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  378. file.metadataDecryptionHeader = metadataDecryptionHeader;
  379. return file;
  380. } on DioError catch (e) {
  381. if (e.response.statusCode == 426) {
  382. _onStorageLimitExceeded();
  383. }
  384. throw e;
  385. }
  386. }
  387. Future<UploadURL> _getUploadURL() async {
  388. if (_uploadURLs.isEmpty) {
  389. await _fetchUploadURLs();
  390. }
  391. return _uploadURLs.removeFirst();
  392. }
  393. Future<void> _uploadURLFetchInProgress;
  394. Future<void> _fetchUploadURLs() async {
  395. if (_uploadURLFetchInProgress == null) {
  396. final completer = Completer<void>();
  397. _uploadURLFetchInProgress = completer.future;
  398. try {
  399. final response = await _dio.get(
  400. Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
  401. queryParameters: {
  402. "count": min(42, 2 * _queue.length), // m4gic number
  403. },
  404. options: Options(
  405. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  406. );
  407. final urls = (response.data["urls"] as List)
  408. .map((e) => UploadURL.fromMap(e))
  409. .toList();
  410. _uploadURLs.addAll(urls);
  411. } on DioError catch (e) {
  412. if (e.response != null) {
  413. if (e.response.statusCode == 402) {
  414. _onExpiredSubscription();
  415. } else if (e.response.statusCode == 426) {
  416. _onStorageLimitExceeded();
  417. }
  418. }
  419. throw e;
  420. }
  421. _uploadURLFetchInProgress = null;
  422. completer.complete();
  423. }
  424. return _uploadURLFetchInProgress;
  425. }
  426. void _onStorageLimitExceeded() {
  427. clearQueue(StorageLimitExceededError());
  428. throw StorageLimitExceededError();
  429. }
  430. void _onExpiredSubscription() {
  431. clearQueue(NoActiveSubscriptionError());
  432. throw NoActiveSubscriptionError();
  433. }
  434. Future<String> _putFile(
  435. UploadURL uploadURL,
  436. io.File file, {
  437. int contentLength,
  438. int attempt = 1,
  439. }) async {
  440. final fileSize = contentLength ?? file.lengthSync();
  441. final startTime = DateTime.now().millisecondsSinceEpoch;
  442. _logger.info(
  443. "Putting file of size " + fileSize.toString() + " to " + uploadURL.url);
  444. try {
  445. await _dio.put(
  446. uploadURL.url,
  447. data: file.openRead(),
  448. options: Options(
  449. headers: {
  450. Headers.contentLengthHeader: fileSize,
  451. },
  452. ),
  453. );
  454. _logger.info("Upload speed : " +
  455. (file.lengthSync() /
  456. (DateTime.now().millisecondsSinceEpoch - startTime))
  457. .toString() +
  458. " kilo bytes per second");
  459. return uploadURL.objectKey;
  460. } on DioError catch (e) {
  461. if (e.message.startsWith(
  462. "HttpException: Content size exceeds specified contentLength.") &&
  463. attempt == 1) {
  464. return _putFile(uploadURL, file,
  465. contentLength: file.readAsBytesSync().length, attempt: 2);
  466. } else if (attempt < kMaximumUploadAttempts) {
  467. _logger.info("Retrying upload that failed ", e);
  468. final newUploadURL = await _getUploadURL();
  469. return _putFile(newUploadURL, file,
  470. contentLength: file.readAsBytesSync().length, attempt: attempt++);
  471. } else {
  472. throw e;
  473. }
  474. }
  475. }
  476. }
  477. class FileUploadItem {
  478. final File file;
  479. final int collectionID;
  480. final Completer<File> completer;
  481. UploadStatus status;
  482. FileUploadItem(
  483. this.file,
  484. this.collectionID,
  485. this.completer, {
  486. this.status = UploadStatus.not_started,
  487. });
  488. }
  489. enum UploadStatus {
  490. not_started,
  491. in_progress,
  492. completed,
  493. }
  494. class InvalidFileError extends Error {}
  495. class WiFiUnavailableError extends Error {}
  496. class SyncStopRequestedError extends Error {}
  497. class NoActiveSubscriptionError extends Error {}
  498. class StorageLimitExceededError extends Error {}