file_uploader.dart 18 KB


  1. import 'dart:async';
  2. import 'dart:collection';
  3. import 'dart:convert';
  4. import 'dart:io' as io;
  5. import 'dart:math';
  6. import 'package:connectivity/connectivity.dart';
  7. import 'package:dio/dio.dart';
  8. import 'package:flutter_sodium/flutter_sodium.dart';
  9. import 'package:logging/logging.dart';
  10. import 'package:photos/core/configuration.dart';
  11. import 'package:photos/core/constants.dart';
  12. import 'package:photos/core/event_bus.dart';
  13. import 'package:photos/core/network.dart';
  14. import 'package:photos/db/files_db.dart';
  15. import 'package:photos/events/subscription_purchased_event.dart';
  16. import 'package:photos/models/encryption_result.dart';
  17. import 'package:photos/models/file.dart';
  18. import 'package:photos/models/location.dart';
  19. import 'package:photos/models/upload_url.dart';
  20. import 'package:photos/repositories/file_repository.dart';
  21. import 'package:photos/services/collections_service.dart';
  22. import 'package:photos/services/sync_service.dart';
  23. import 'package:photos/utils/crypto_util.dart';
  24. import 'package:photos/utils/file_util.dart';
  25. class FileUploader {
  26. final _logger = Logger("FileUploader");
  27. final _dio = Network.instance.getDio();
  28. final _queue = LinkedHashMap<int, FileUploadItem>();
  29. final kMaximumConcurrentUploads = 4;
  30. final kMaximumThumbnailCompressionAttempts = 2;
  31. final kMaximumUploadAttempts = 4;
  32. int _currentlyUploading = 0;
  33. final _uploadURLs = Queue<UploadURL>();
  34. FileUploader._privateConstructor() {
  35. Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
  36. _uploadURLFetchInProgress = null;
  37. });
  38. }
  39. static FileUploader instance = FileUploader._privateConstructor();
  40. Future<File> upload(File file, int collectionID) {
  41. // If the file hasn't been queued yet, queue it
  42. if (!_queue.containsKey(file.generatedID)) {
  43. final completer = Completer<File>();
  44. _queue[file.generatedID] = FileUploadItem(file, collectionID, completer);
  45. _pollQueue();
  46. return completer.future;
  47. }
  48. // If the file exists in the queue for a matching collectionID,
  49. // return the existing future
  50. final item = _queue[file.generatedID];
  51. if (item.collectionID == collectionID) {
  52. return item.completer.future;
  53. }
  54. // Else wait for the existing upload to complete,
  55. // and add it to the relevant collection
  56. return item.completer.future.then((uploadedFile) {
  57. return CollectionsService.instance
  58. .addToCollection(collectionID, [uploadedFile]).then((aVoid) {
  59. return uploadedFile;
  60. });
  61. });
  62. }
  63. Future<File> forceUpload(File file, int collectionID) async {
  64. _logger.info("Force uploading " +
  65. file.toString() +
  66. " into collection " +
  67. collectionID.toString());
  68. // If the file hasn't been queued yet, ez.
  69. if (!_queue.containsKey(file.generatedID)) {
  70. final completer = Completer<File>();
  71. _queue[file.generatedID] = FileUploadItem(
  72. file,
  73. collectionID,
  74. completer,
  75. status: UploadStatus.in_progress,
  76. );
  77. _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);
  78. return completer.future;
  79. }
  80. var item = _queue[file.generatedID];
  81. // If the file is being uploaded right now, wait and proceed
  82. if (item.status == UploadStatus.in_progress) {
  83. final uploadedFile = await item.completer.future;
  84. if (uploadedFile.collectionID == collectionID) {
  85. // Do nothing
  86. } else {
  87. await CollectionsService.instance
  88. .addToCollection(collectionID, [uploadedFile]);
  89. }
  90. return uploadedFile;
  91. } else {
  92. // If the file is yet to be processed,
  93. // 1. Set the status to in_progress
  94. // 2. Force upload the file
  95. // 3. Add to the relevant collection
  96. item = _queue[file.generatedID];
  97. item.status = UploadStatus.in_progress;
  98. final uploadedFile = await _encryptAndUploadFileToCollection(
  99. file, collectionID,
  100. forcedUpload: true);
  101. if (item.collectionID == collectionID) {
  102. return uploadedFile;
  103. } else {
  104. await CollectionsService.instance
  105. .addToCollection(item.collectionID, [uploadedFile]);
  106. return uploadedFile;
  107. }
  108. }
  109. }
  110. void clearQueue(final Error reason) {
  111. final uploadsToBeRemoved = List<int>();
  112. _queue.entries
  113. .where((entry) => entry.value.status == UploadStatus.not_started)
  114. .forEach((pendingUpload) {
  115. uploadsToBeRemoved.add(pendingUpload.key);
  116. });
  117. for (final id in uploadsToBeRemoved) {
  118. _queue.remove(id).completer.completeError(reason);
  119. }
  120. }
  121. void _pollQueue() {
  122. if (SyncService.instance.shouldStopSync()) {
  123. clearQueue(SyncStopRequestedError());
  124. }
  125. if (_queue.length > 0 && _currentlyUploading < kMaximumConcurrentUploads) {
  126. final firstPendingEntry = _queue.entries
  127. .firstWhere((entry) => entry.value.status == UploadStatus.not_started,
  128. orElse: () => null)
  129. ?.value;
  130. if (firstPendingEntry != null) {
  131. firstPendingEntry.status = UploadStatus.in_progress;
  132. _encryptAndUploadFileToCollection(
  133. firstPendingEntry.file, firstPendingEntry.collectionID);
  134. }
  135. }
  136. }
  137. Future<File> _encryptAndUploadFileToCollection(File file, int collectionID,
  138. {bool forcedUpload = false}) async {
  139. _currentlyUploading++;
  140. try {
  141. final uploadedFile = await _tryToUpload(file, collectionID, forcedUpload);
  142. _queue.remove(file.generatedID).completer.complete(uploadedFile);
  143. return uploadedFile;
  144. } catch (e) {
  145. _queue.remove(file.generatedID).completer.completeError(e);
  146. return null;
  147. } finally {
  148. _currentlyUploading--;
  149. _pollQueue();
  150. }
  151. }
  152. Future<File> _tryToUpload(
  153. File file, int collectionID, bool forcedUpload) async {
  154. final connectivityResult = await (Connectivity().checkConnectivity());
  155. var canUploadUnderCurrentNetworkConditions =
  156. (connectivityResult == ConnectivityResult.wifi ||
  157. Configuration.instance.shouldBackupOverMobileData());
  158. if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
  159. throw WiFiUnavailableError();
  160. }
  161. final tempDirectory = Configuration.instance.getTempDirectory();
  162. final encryptedFilePath =
  163. tempDirectory + file.generatedID.toString() + ".encrypted";
  164. final encryptedThumbnailPath =
  165. tempDirectory + file.generatedID.toString() + "_thumbnail.encrypted";
  166. var sourceFile;
  167. try {
  168. _logger.info("Trying to upload " +
  169. file.toString() +
  170. ", isForced: " +
  171. forcedUpload.toString());
  172. sourceFile = (await (await file.getAsset()).originFile);
  173. var key;
  174. var isAlreadyUploadedFile = file.uploadedFileID != null;
  175. if (isAlreadyUploadedFile) {
  176. key = decryptFileKey(file);
  177. } else {
  178. key = null;
  179. }
  180. if (io.File(encryptedFilePath).existsSync()) {
  181. io.File(encryptedFilePath).deleteSync();
  182. }
  183. final encryptedFile = io.File(encryptedFilePath);
  184. final fileAttributes = await CryptoUtil.encryptFile(
  185. sourceFile.path,
  186. encryptedFilePath,
  187. key: key,
  188. );
  189. var thumbnailData = (await (await file.getAsset()).thumbDataWithSize(
  190. THUMBNAIL_LARGE_SIZE,
  191. THUMBNAIL_LARGE_SIZE,
  192. quality: 50,
  193. ));
  194. if (thumbnailData == null) {
  195. _logger.severe("Could not generate thumbnail for " + file.toString());
  196. await FilesDB.instance.deleteLocalFile(file.localID);
  197. throw InvalidFileError();
  198. }
  199. int compressionAttempts = 0;
  200. while (thumbnailData.length > THUMBNAIL_DATA_LIMIT &&
  201. compressionAttempts < kMaximumThumbnailCompressionAttempts) {
  202. _logger.info("Thumbnail size " + thumbnailData.length.toString());
  203. thumbnailData = await compressThumbnail(thumbnailData);
  204. _logger.info(
  205. "Compressed thumbnail size " + thumbnailData.length.toString());
  206. compressionAttempts++;
  207. }
  208. final encryptedThumbnailData =
  209. CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
  210. if (io.File(encryptedThumbnailPath).existsSync()) {
  211. io.File(encryptedThumbnailPath).deleteSync();
  212. }
  213. final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
  214. encryptedThumbnailFile
  215. .writeAsBytesSync(encryptedThumbnailData.encryptedData);
  216. final thumbnailUploadURL = await _getUploadURL();
  217. String thumbnailObjectKey =
  218. await _putFile(thumbnailUploadURL, encryptedThumbnailFile);
  219. final fileUploadURL = await _getUploadURL();
  220. String fileObjectKey = await _putFile(fileUploadURL, encryptedFile);
  221. // h4ck to fetch location data if missing (thank you Android Q+) lazily only during uploads
  222. if (file.location == null ||
  223. (file.location.latitude == 0 && file.location.longitude == 0)) {
  224. final latLong = await (await file.getAsset()).latlngAsync();
  225. file.location = Location(latLong.latitude, latLong.longitude);
  226. }
  227. final encryptedMetadataData = CryptoUtil.encryptChaCha(
  228. utf8.encode(jsonEncode(file.getMetadata())), fileAttributes.key);
  229. final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
  230. final thumbnailDecryptionHeader =
  231. Sodium.bin2base64(encryptedThumbnailData.header);
  232. final encryptedMetadata =
  233. Sodium.bin2base64(encryptedMetadataData.encryptedData);
  234. final metadataDecryptionHeader =
  235. Sodium.bin2base64(encryptedMetadataData.header);
  236. if (isAlreadyUploadedFile) {
  237. final updatedFile = await _updateFile(
  238. file,
  239. fileObjectKey,
  240. fileDecryptionHeader,
  241. thumbnailObjectKey,
  242. thumbnailDecryptionHeader,
  243. encryptedMetadata,
  244. metadataDecryptionHeader,
  245. );
  246. // Update across all collections
  247. await FilesDB.instance.updateUploadedFileAcrossCollections(updatedFile);
  248. FileRepository.instance.reloadFiles();
  249. return updatedFile;
  250. } else {
  251. final uploadedFile = await _uploadFile(
  252. file,
  253. collectionID,
  254. fileAttributes,
  255. fileObjectKey,
  256. fileDecryptionHeader,
  257. thumbnailObjectKey,
  258. thumbnailDecryptionHeader,
  259. encryptedMetadata,
  260. metadataDecryptionHeader,
  261. );
  262. await FilesDB.instance.update(uploadedFile);
  263. FileRepository.instance.reloadFiles();
  264. return uploadedFile;
  265. }
  266. } catch (e, s) {
  267. if (!(e is NoActiveSubscriptionError || e is StorageLimitExceededError)) {
  268. _logger.severe("File upload failed for " + file.toString(), e, s);
  269. }
  270. throw e;
  271. } finally {
  272. if (io.Platform.isIOS && sourceFile != null) {
  273. sourceFile.deleteSync();
  274. }
  275. if (io.File(encryptedFilePath).existsSync()) {
  276. io.File(encryptedFilePath).deleteSync();
  277. }
  278. if (io.File(encryptedThumbnailPath).existsSync()) {
  279. io.File(encryptedThumbnailPath).deleteSync();
  280. }
  281. _logger.severe("File upload attempt complete for " + file.toString());
  282. }
  283. }
  284. Future<File> _uploadFile(
  285. File file,
  286. int collectionID,
  287. EncryptionResult fileAttributes,
  288. String fileObjectKey,
  289. String fileDecryptionHeader,
  290. String thumbnailObjectKey,
  291. String thumbnailDecryptionHeader,
  292. String encryptedMetadata,
  293. String metadataDecryptionHeader,
  294. ) async {
  295. final encryptedFileKeyData = CryptoUtil.encryptSync(
  296. fileAttributes.key,
  297. CollectionsService.instance.getCollectionKey(collectionID),
  298. );
  299. final encryptedKey = Sodium.bin2base64(encryptedFileKeyData.encryptedData);
  300. final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce);
  301. final request = {
  302. "collectionID": collectionID,
  303. "encryptedKey": encryptedKey,
  304. "keyDecryptionNonce": keyDecryptionNonce,
  305. "file": {
  306. "objectKey": fileObjectKey,
  307. "decryptionHeader": fileDecryptionHeader,
  308. },
  309. "thumbnail": {
  310. "objectKey": thumbnailObjectKey,
  311. "decryptionHeader": thumbnailDecryptionHeader,
  312. },
  313. "metadata": {
  314. "encryptedData": encryptedMetadata,
  315. "decryptionHeader": metadataDecryptionHeader,
  316. }
  317. };
  318. try {
  319. final response = await _dio.post(
  320. Configuration.instance.getHttpEndpoint() + "/files",
  321. options: Options(
  322. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  323. data: request,
  324. );
  325. final data = response.data;
  326. file.uploadedFileID = data["id"];
  327. file.collectionID = collectionID;
  328. file.updationTime = data["updationTime"];
  329. file.ownerID = data["ownerID"];
  330. file.encryptedKey = encryptedKey;
  331. file.keyDecryptionNonce = keyDecryptionNonce;
  332. file.fileDecryptionHeader = fileDecryptionHeader;
  333. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  334. file.metadataDecryptionHeader = metadataDecryptionHeader;
  335. return file;
  336. } on DioError catch (e) {
  337. if (e.response.statusCode == 426) {
  338. _onStorageLimitExceeded();
  339. }
  340. throw e;
  341. }
  342. }
  343. Future<File> _updateFile(
  344. File file,
  345. String fileObjectKey,
  346. String fileDecryptionHeader,
  347. String thumbnailObjectKey,
  348. String thumbnailDecryptionHeader,
  349. String encryptedMetadata,
  350. String metadataDecryptionHeader,
  351. ) async {
  352. final request = {
  353. "id": file.uploadedFileID,
  354. "file": {
  355. "objectKey": fileObjectKey,
  356. "decryptionHeader": fileDecryptionHeader,
  357. },
  358. "thumbnail": {
  359. "objectKey": thumbnailObjectKey,
  360. "decryptionHeader": thumbnailDecryptionHeader,
  361. },
  362. "metadata": {
  363. "encryptedData": encryptedMetadata,
  364. "decryptionHeader": metadataDecryptionHeader,
  365. }
  366. };
  367. try {
  368. final response = await _dio.post(
  369. Configuration.instance.getHttpEndpoint() + "/files",
  370. options: Options(
  371. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  372. data: request,
  373. );
  374. final data = response.data;
  375. file.uploadedFileID = data["id"];
  376. file.updationTime = data["updationTime"];
  377. file.fileDecryptionHeader = fileDecryptionHeader;
  378. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  379. file.metadataDecryptionHeader = metadataDecryptionHeader;
  380. return file;
  381. } on DioError catch (e) {
  382. if (e.response.statusCode == 426) {
  383. _onStorageLimitExceeded();
  384. }
  385. throw e;
  386. }
  387. }
  388. Future<UploadURL> _getUploadURL() async {
  389. if (_uploadURLs.isEmpty) {
  390. await _fetchUploadURLs();
  391. }
  392. return _uploadURLs.removeFirst();
  393. }
  394. Future<void> _uploadURLFetchInProgress;
  395. Future<void> _fetchUploadURLs() async {
  396. if (_uploadURLFetchInProgress == null) {
  397. final completer = Completer<void>();
  398. _uploadURLFetchInProgress = completer.future;
  399. try {
  400. final response = await _dio.get(
  401. Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
  402. queryParameters: {
  403. "count": min(42, 2 * _queue.length), // m4gic number
  404. },
  405. options: Options(
  406. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  407. );
  408. final urls = (response.data["urls"] as List)
  409. .map((e) => UploadURL.fromMap(e))
  410. .toList();
  411. _uploadURLs.addAll(urls);
  412. } on DioError catch (e) {
  413. if (e.response != null) {
  414. if (e.response.statusCode == 402) {
  415. _onExpiredSubscription();
  416. } else if (e.response.statusCode == 426) {
  417. _onStorageLimitExceeded();
  418. }
  419. }
  420. throw e;
  421. }
  422. _uploadURLFetchInProgress = null;
  423. completer.complete();
  424. }
  425. return _uploadURLFetchInProgress;
  426. }
  427. void _onStorageLimitExceeded() {
  428. clearQueue(StorageLimitExceededError());
  429. throw StorageLimitExceededError();
  430. }
  431. void _onExpiredSubscription() {
  432. clearQueue(NoActiveSubscriptionError());
  433. throw NoActiveSubscriptionError();
  434. }
  435. Future<String> _putFile(
  436. UploadURL uploadURL,
  437. io.File file, {
  438. int contentLength,
  439. int attempt = 1,
  440. }) async {
  441. final fileSize = contentLength ?? file.lengthSync();
  442. final startTime = DateTime.now().millisecondsSinceEpoch;
  443. try {
  444. await _dio.put(
  445. uploadURL.url,
  446. data: file.openRead(),
  447. options: Options(
  448. headers: {
  449. Headers.contentLengthHeader: fileSize,
  450. },
  451. ),
  452. );
  453. _logger.info("Upload speed : " +
  454. (file.lengthSync() /
  455. (DateTime.now().millisecondsSinceEpoch - startTime))
  456. .toString() +
  457. " kilo bytes per second");
  458. return uploadURL.objectKey;
  459. } on DioError catch (e) {
  460. if (e.message.startsWith(
  461. "HttpException: Content size exceeds specified contentLength.") &&
  462. attempt == 1) {
  463. return _putFile(uploadURL, file,
  464. contentLength: file.readAsBytesSync().length, attempt: 2);
  465. } else if (attempt < kMaximumUploadAttempts) {
  466. _logger.info("Retrying upload that failed ", e);
  467. final newUploadURL = await _getUploadURL();
  468. return _putFile(newUploadURL, file,
  469. contentLength: file.readAsBytesSync().length, attempt: attempt++);
  470. } else {
  471. throw e;
  472. }
  473. }
  474. }
  475. }
  476. class FileUploadItem {
  477. final File file;
  478. final int collectionID;
  479. final Completer<File> completer;
  480. UploadStatus status;
  481. FileUploadItem(
  482. this.file,
  483. this.collectionID,
  484. this.completer, {
  485. this.status = UploadStatus.not_started,
  486. });
  487. }
  488. enum UploadStatus {
  489. not_started,
  490. in_progress,
  491. completed,
  492. }
  493. class InvalidFileError extends Error {}
  494. class WiFiUnavailableError extends Error {}
  495. class SyncStopRequestedError extends Error {}
  496. class NoActiveSubscriptionError extends Error {}
  497. class StorageLimitExceededError extends Error {}