file_uploader.dart 17 KB


  1. import 'dart:async';
  2. import 'dart:collection';
  3. import 'dart:convert';
  4. import 'dart:io' as io;
  5. import 'dart:math';
  6. import 'package:connectivity/connectivity.dart';
  7. import 'package:dio/dio.dart';
  8. import 'package:flutter_sodium/flutter_sodium.dart';
  9. import 'package:logging/logging.dart';
  10. import 'package:photos/core/configuration.dart';
  11. import 'package:photos/core/constants.dart';
  12. import 'package:photos/core/event_bus.dart';
  13. import 'package:photos/core/network.dart';
  14. import 'package:photos/db/files_db.dart';
  15. import 'package:photos/events/subscription_purchased_event.dart';
  16. import 'package:photos/models/encryption_result.dart';
  17. import 'package:photos/models/file.dart';
  18. import 'package:photos/models/location.dart';
  19. import 'package:photos/models/upload_url.dart';
  20. import 'package:photos/services/collections_service.dart';
  21. import 'package:photos/services/sync_service.dart';
  22. import 'package:photos/utils/crypto_util.dart';
  23. import 'package:photos/utils/file_util.dart';
  /// Queues files for upload, encrypts them and pushes them to remote storage.
  ///
  /// Uploads requested through [upload] wait in an insertion-ordered queue and
  /// are started while fewer than [kMaximumConcurrentUploads] are running.
  /// Uploads requested through [forceUpload] start immediately and skip the
  /// network-condition check.
  class FileUploader {
    final _logger = Logger("FileUploader");
    final _dio = Network.instance.getDio();

    /// Pending and running uploads, keyed by the file's `generatedID`.
    final _queue = LinkedHashMap<int, FileUploadItem>();

    /// Upper bound on uploads started by the queue at any one time.
    final kMaximumConcurrentUploads = 4;

    /// How many times an oversized thumbnail is re-compressed before it is
    /// uploaded regardless of its size.
    final kMaximumThumbnailCompressionAttempts = 2;

    /// Number of uploads currently running (queued and forced).
    int _currentlyUploading = 0;

    /// Pre-fetched upload URLs that have not been used yet.
    final _uploadURLs = Queue<UploadURL>();

    /// The upload-URL request that is currently in flight, or `null`.
    Future<void> _uploadURLFetchInProgress;

    FileUploader._privateConstructor() {
      // Forget any remembered in-flight URL request once a subscription has
      // been purchased, so the next upload issues a fresh request.
      Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
        _uploadURLFetchInProgress = null;
      });
    }

    static FileUploader instance = FileUploader._privateConstructor();

    /// Queues [file] for upload into the collection [collectionID].
    ///
    /// If the file is already queued for the same collection, the existing
    /// future is returned. If it is queued for a different collection, the
    /// returned future waits for that upload and then adds the uploaded file
    /// to [collectionID] as well.
    Future<File> upload(File file, int collectionID) {
      // If the file hasn't been queued yet, queue it
      if (!_queue.containsKey(file.generatedID)) {
        final completer = Completer<File>();
        _queue[file.generatedID] =
            FileUploadItem(file, collectionID, completer);
        _pollQueue();
        return completer.future;
      }
      final item = _queue[file.generatedID];
      // Already queued for this very collection: share the pending result.
      if (item.collectionID == collectionID) {
        return item.completer.future;
      }
      // Queued for another collection: wait for that upload, then add the
      // uploaded file to the requested collection.
      return item.completer.future.then((uploadedFile) async {
        await CollectionsService.instance
            .addToCollection(collectionID, [uploadedFile]);
        return uploadedFile;
      });
    }

    /// Uploads [file] into [collectionID] right away, bypassing the queue
    /// order, the concurrency limit and the network-condition check.
    ///
    /// If the file is already being uploaded, this waits for that upload and
    /// adds the result to [collectionID] when it ended up in another
    /// collection. If the file was queued but not yet started, the queued
    /// request is superseded by the forced one and is completed (or failed)
    /// with the forced upload's outcome.
    Future<File> forceUpload(File file, int collectionID) async {
      final queuedItem = _queue[file.generatedID];

      // If the file hasn't been queued yet, ez.
      if (queuedItem == null) {
        final completer = Completer<File>();
        _queue[file.generatedID] = FileUploadItem(
          file,
          collectionID,
          completer,
          status: UploadStatus.in_progress,
        );
        _encryptAndUploadFileToCollection(file, collectionID,
            forcedUpload: true);
        return completer.future;
      }

      // If the file is being uploaded right now, wait and proceed
      if (queuedItem.status == UploadStatus.in_progress) {
        final uploadedFile = await queuedItem.completer.future;
        if (uploadedFile.collectionID != collectionID) {
          await CollectionsService.instance
              .addToCollection(collectionID, [uploadedFile]);
        }
        return uploadedFile;
      }

      // The file is yet to be processed:
      // 1. Replace the queued entry with an in-progress forced entry, so the
      //    uploader finds a live queue item to complete,
      // 2. Force upload the current file,
      // 3. Settle the original request with the outcome.
      final forcedCompleter = Completer<File>();
      _queue[file.generatedID] = FileUploadItem(
        file,
        collectionID,
        forcedCompleter,
        status: UploadStatus.in_progress,
      );
      _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);

      File uploadedFile;
      try {
        uploadedFile = await forcedCompleter.future;
      } catch (e, s) {
        // The original requester must not be left waiting forever.
        queuedItem.completer.completeError(e, s);
        rethrow;
      }

      if (queuedItem.collectionID == collectionID) {
        queuedItem.completer.complete(uploadedFile);
      } else {
        // Not awaited on purpose: the forced caller only needs the upload,
        // while the original requester also needs its own collection updated.
        CollectionsService.instance
            .addToCollection(queuedItem.collectionID, [uploadedFile]).then(
          (_) => queuedItem.completer.complete(uploadedFile),
          onError: (Object e, StackTrace s) =>
              queuedItem.completer.completeError(e, s),
        );
      }
      return uploadedFile;
    }

    /// Fails every upload that has not started yet with a
    /// [SyncStopRequestedError] and removes it from the queue.
    ///
    /// Uploads that are already in progress are left untouched.
    void clearQueue() {
      // Collect the keys first; the map must not be mutated while iterating.
      final pendingIDs = _queue.entries
          .where((entry) => entry.value.status == UploadStatus.not_started)
          .map((entry) => entry.key)
          .toList();
      for (final id in pendingIDs) {
        _queue.remove(id).completer.completeError(SyncStopRequestedError());
      }
    }

    /// Starts the oldest not-yet-started upload if a slot is free.
    ///
    /// Clears the pending part of the queue first when the sync service
    /// reports that syncing should stop.
    void _pollQueue() {
      if (SyncService.instance.shouldStopSync()) {
        clearQueue();
      }
      if (_queue.isEmpty || _currentlyUploading >= kMaximumConcurrentUploads) {
        return;
      }
      final nextItem = _queue.values.firstWhere(
        (item) => item.status == UploadStatus.not_started,
        orElse: () => null,
      );
      if (nextItem != null) {
        nextItem.status = UploadStatus.in_progress;
        _encryptAndUploadFileToCollection(nextItem.file, nextItem.collectionID);
      }
    }

    /// Runs one upload and settles the matching queue item.
    ///
    /// The outcome is delivered through the queue item's completer. The
    /// returned future never fails, so callers may safely ignore it: it
    /// resolves to the uploaded file on success and to `null` on failure.
    Future<File> _encryptAndUploadFileToCollection(File file, int collectionID,
        {bool forcedUpload = false}) async {
      _currentlyUploading++;
      try {
        final uploadedFile =
            await _tryToUpload(file, collectionID, forcedUpload);
        final item = _queue.remove(file.generatedID);
        if (item != null) {
          item.completer.complete(uploadedFile);
        }
        return uploadedFile;
      } catch (e, s) {
        final item = _queue.remove(file.generatedID);
        if (item != null) {
          item.completer.completeError(e, s);
        }
        return null;
      } finally {
        _currentlyUploading--;
        _pollQueue();
      }
    }

    /// Encrypts [file] and its thumbnail, uploads both and registers the file
    /// with the server.
    ///
    /// Throws [WiFiUnavailableError] when the device is not on Wi-Fi, backup
    /// over mobile data is disabled and [forcedUpload] is false. Throws
    /// [InvalidFileError] when no thumbnail can be generated; the local file
    /// entry is deleted from the database in that case. Any other failure is
    /// logged and rethrown. Temporary encrypted files are always removed.
    Future<File> _tryToUpload(
        File file, int collectionID, bool forcedUpload) async {
      final connectivityResult = await Connectivity().checkConnectivity();
      final canUploadUnderCurrentNetworkConditions =
          connectivityResult == ConnectivityResult.wifi ||
              Configuration.instance.shouldBackupOverMobileData();
      if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
        throw WiFiUnavailableError();
      }

      final tempDirectory = Configuration.instance.getTempDirectory();
      final encryptedFilePath =
          tempDirectory + "${file.generatedID}.encrypted";
      final encryptedThumbnailPath =
          tempDirectory + "${file.generatedID}_thumbnail.encrypted";
      var sourceFile;

      try {
        // Placing this in the try-catch block to safe guard against: https://github.com/CaiJingLong/flutter_photo_manager/issues/405
        _logger.info("Trying to upload $file");
        sourceFile = await (await file.getAsset()).originFile;

        // A file that was uploaded before is re-encrypted with its existing
        // key; a new file gets a fresh key from the encryption step.
        final isAlreadyUploadedFile = file.uploadedFileID != null;
        final key = isAlreadyUploadedFile ? decryptFileKey(file) : null;

        _deleteIfExists(encryptedFilePath);
        final encryptedFile = io.File(encryptedFilePath);
        final fileAttributes = await CryptoUtil.encryptFile(
          sourceFile.path,
          encryptedFilePath,
          key: key,
        );

        var thumbnailData = await (await file.getAsset()).thumbDataWithSize(
          THUMBNAIL_LARGE_SIZE,
          THUMBNAIL_LARGE_SIZE,
          quality: 50,
        );
        if (thumbnailData == null) {
          _logger.severe("Could not generate thumbnail for $file");
          await FilesDB.instance.deleteLocalFile(file.localID);
          throw InvalidFileError();
        }
        var compressionAttempts = 0;
        while (thumbnailData.length > THUMBNAIL_DATA_LIMIT &&
            compressionAttempts < kMaximumThumbnailCompressionAttempts) {
          _logger.info("Thumbnail size ${thumbnailData.length}");
          thumbnailData = await compressThumbnail(thumbnailData);
          _logger.info("Compressed thumbnail size ${thumbnailData.length}");
          compressionAttempts++;
        }

        final encryptedThumbnailData =
            CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
        _deleteIfExists(encryptedThumbnailPath);
        final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
        encryptedThumbnailFile
            .writeAsBytesSync(encryptedThumbnailData.encryptedData);

        final fileUploadURL = await _getUploadURL();
        final fileObjectKey = await _putFile(fileUploadURL, encryptedFile);

        final thumbnailUploadURL = await _getUploadURL();
        final thumbnailObjectKey =
            await _putFile(thumbnailUploadURL, encryptedThumbnailFile);

        // h4ck to fetch location data if missing (thank you Android Q+) lazily only during uploads
        if (file.location == null ||
            (file.location.latitude == 0 && file.location.longitude == 0)) {
          final latLong = await (await file.getAsset()).latlngAsync();
          file.location = Location(latLong.latitude, latLong.longitude);
        }

        final encryptedMetadataData = CryptoUtil.encryptChaCha(
            utf8.encode(jsonEncode(file.getMetadata())), fileAttributes.key);
        final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
        final thumbnailDecryptionHeader =
            Sodium.bin2base64(encryptedThumbnailData.header);
        final encryptedMetadata =
            Sodium.bin2base64(encryptedMetadataData.encryptedData);
        final metadataDecryptionHeader =
            Sodium.bin2base64(encryptedMetadataData.header);

        if (isAlreadyUploadedFile) {
          final updatedFile = await _updateFile(
            file,
            fileObjectKey,
            fileDecryptionHeader,
            thumbnailObjectKey,
            thumbnailDecryptionHeader,
            encryptedMetadata,
            metadataDecryptionHeader,
          );
          // Update across all collections
          await FilesDB.instance
              .updateUploadedFileAcrossCollections(updatedFile);
          return updatedFile;
        }

        final uploadedFile = await _uploadFile(
          file,
          collectionID,
          fileAttributes,
          fileObjectKey,
          fileDecryptionHeader,
          thumbnailObjectKey,
          thumbnailDecryptionHeader,
          encryptedMetadata,
          metadataDecryptionHeader,
        );
        await FilesDB.instance.update(uploadedFile);
        return uploadedFile;
      } catch (e, s) {
        if (e is! NoActiveSubscriptionError) {
          _logger.severe("File upload failed for ${file.generatedID}", e, s);
        }
        // rethrow keeps the original stack trace for the caller.
        rethrow;
      } finally {
        if (io.Platform.isIOS && sourceFile != null) {
          sourceFile.deleteSync();
        }
        _deleteIfExists(encryptedFilePath);
        _deleteIfExists(encryptedThumbnailPath);
      }
    }

    /// Deletes the file at [path] if it exists.
    void _deleteIfExists(String path) {
      final target = io.File(path);
      if (target.existsSync()) {
        target.deleteSync();
      }
    }

    /// Registers a newly uploaded file with the server and copies the
    /// server-assigned and encryption attributes onto [file].
    ///
    /// The file key is encrypted with the key of [collectionID] before it is
    /// sent. Throws [StorageLimitExceededError] when the server answers with
    /// HTTP 426.
    Future<File> _uploadFile(
      File file,
      int collectionID,
      EncryptionResult fileAttributes,
      String fileObjectKey,
      String fileDecryptionHeader,
      String thumbnailObjectKey,
      String thumbnailDecryptionHeader,
      String encryptedMetadata,
      String metadataDecryptionHeader,
    ) async {
      final encryptedFileKeyData = CryptoUtil.encryptSync(
        fileAttributes.key,
        CollectionsService.instance.getCollectionKey(collectionID),
      );
      final encryptedKey =
          Sodium.bin2base64(encryptedFileKeyData.encryptedData);
      final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce);
      final data = await _postFile({
        "collectionID": collectionID,
        "encryptedKey": encryptedKey,
        "keyDecryptionNonce": keyDecryptionNonce,
        "file": {
          "objectKey": fileObjectKey,
          "decryptionHeader": fileDecryptionHeader,
        },
        "thumbnail": {
          "objectKey": thumbnailObjectKey,
          "decryptionHeader": thumbnailDecryptionHeader,
        },
        "metadata": {
          "encryptedData": encryptedMetadata,
          "decryptionHeader": metadataDecryptionHeader,
        }
      });
      file.uploadedFileID = data["id"];
      file.collectionID = collectionID;
      file.updationTime = data["updationTime"];
      file.ownerID = data["ownerID"];
      file.encryptedKey = encryptedKey;
      file.keyDecryptionNonce = keyDecryptionNonce;
      file.fileDecryptionHeader = fileDecryptionHeader;
      file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
      file.metadataDecryptionHeader = metadataDecryptionHeader;
      return file;
    }

    /// Sends new content for an already uploaded [file] to the server and
    /// copies the returned id, update time and the new decryption headers
    /// onto [file].
    ///
    /// Throws [StorageLimitExceededError] when the server answers with
    /// HTTP 426.
    Future<File> _updateFile(
      File file,
      String fileObjectKey,
      String fileDecryptionHeader,
      String thumbnailObjectKey,
      String thumbnailDecryptionHeader,
      String encryptedMetadata,
      String metadataDecryptionHeader,
    ) async {
      final data = await _postFile({
        "id": file.uploadedFileID,
        "file": {
          "objectKey": fileObjectKey,
          "decryptionHeader": fileDecryptionHeader,
        },
        "thumbnail": {
          "objectKey": thumbnailObjectKey,
          "decryptionHeader": thumbnailDecryptionHeader,
        },
        "metadata": {
          "encryptedData": encryptedMetadata,
          "decryptionHeader": metadataDecryptionHeader,
        }
      });
      file.uploadedFileID = data["id"];
      file.updationTime = data["updationTime"];
      file.fileDecryptionHeader = fileDecryptionHeader;
      file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
      file.metadataDecryptionHeader = metadataDecryptionHeader;
      return file;
    }

    /// POSTs [request] to the `/files` endpoint and returns the response data.
    ///
    /// Throws [StorageLimitExceededError] on HTTP 426; every other
    /// [DioError] is rethrown unchanged.
    Future<dynamic> _postFile(Map<String, dynamic> request) async {
      try {
        final response = await _dio.post(
          Configuration.instance.getHttpEndpoint() + "/files",
          options: Options(
              headers: {"X-Auth-Token": Configuration.instance.getToken()}),
          data: request,
        );
        return response.data;
      } on DioError catch (e) {
        // `response` is null when the request failed before a reply arrived.
        if (e.response?.statusCode == 426) {
          throw StorageLimitExceededError();
        }
        rethrow;
      }
    }

    /// Returns an unused upload URL, fetching a new batch when none is left.
    Future<UploadURL> _getUploadURL() async {
      // Loop rather than check once: after waiting on a fetch shared with
      // other uploads, they may already have taken every URL of that batch.
      while (_uploadURLs.isEmpty) {
        await _fetchUploadURLs();
      }
      return _uploadURLs.removeFirst();
    }

    /// Fetches a batch of upload URLs, sharing one request between all
    /// concurrent callers.
    ///
    /// Every caller receives the same future, so a failure reaches all of
    /// them, and the in-flight marker is cleared whether the request succeeds
    /// or fails.
    Future<void> _fetchUploadURLs() {
      if (_uploadURLFetchInProgress == null) {
        Future<void> inFlight;
        inFlight = _requestUploadURLs().whenComplete(() {
          // Only clear the marker if it still refers to this request.
          if (identical(_uploadURLFetchInProgress, inFlight)) {
            _uploadURLFetchInProgress = null;
          }
        });
        _uploadURLFetchInProgress = inFlight;
      }
      return _uploadURLFetchInProgress;
    }

    /// Requests upload URLs from the server and appends them to [_uploadURLs].
    ///
    /// Throws [NoActiveSubscriptionError] on HTTP 402 and [StateError] when
    /// the server returns no URLs at all.
    Future<void> _requestUploadURLs() async {
      try {
        final response = await _dio.get(
          Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
          queryParameters: {
            // Two URLs per queued file (file + thumbnail), at least one pair.
            "count": min(42, max(2, 2 * _queue.length)), // m4gic number
          },
          options: Options(
              headers: {"X-Auth-Token": Configuration.instance.getToken()}),
        );
        final urls = (response.data["urls"] as List)
            .map((e) => UploadURL.fromMap(e))
            .toList();
        if (urls.isEmpty) {
          // Prevents callers from re-requesting in an endless loop.
          throw StateError("No upload URLs were returned");
        }
        _uploadURLs.addAll(urls);
      } on DioError catch (e) {
        // `response` is null when the request failed before a reply arrived.
        if (e.response?.statusCode == 402) {
          throw NoActiveSubscriptionError();
        }
        rethrow;
      }
    }

    /// PUTs [file] to [uploadURL] and returns the URL's object key.
    ///
    /// [contentLength] overrides the size reported by the file system. When
    /// the first [attempt] fails because the content is larger than the
    /// announced length, the upload is retried once with the length obtained
    /// by reading the whole file.
    Future<String> _putFile(
      UploadURL uploadURL,
      io.File file, {
      int contentLength,
      int attempt = 1,
    }) async {
      final fileSize = contentLength ?? file.lengthSync();
      final startTime = DateTime.now().millisecondsSinceEpoch;
      _logger.info("Putting file of size $fileSize to ${uploadURL.url}");
      try {
        await _dio.put(
          uploadURL.url,
          data: file.openRead(),
          options: Options(
            headers: {
              Headers.contentLengthHeader: fileSize,
            },
          ),
        );
        final elapsedMillis = DateTime.now().millisecondsSinceEpoch - startTime;
        _logger.info("Upload speed : ${file.lengthSync() / elapsedMillis}"
            " kilo bytes per second");
        return uploadURL.objectKey;
      } on DioError catch (e) {
        final isLengthMismatch = e.message.startsWith(
            "HttpException: Content size exceeds specified contentLength.");
        if (isLengthMismatch && attempt == 1) {
          return _putFile(uploadURL, file,
              contentLength: file.readAsBytesSync().length, attempt: 2);
        }
        rethrow;
      }
    }
  }
  /// A single entry of the upload queue.
  ///
  /// Bundles the file to upload, the collection it is destined for, the
  /// completer through which the result is reported, and the entry's current
  /// [status].
  class FileUploadItem {
    /// The file to be uploaded.
    final File file;

    /// The collection the file is uploaded into.
    final int collectionID;

    /// Completed with the uploaded file, or with the error that stopped it.
    final Completer<File> completer;

    /// Progress of this entry; [UploadStatus.not_started] unless specified.
    UploadStatus status;

    FileUploadItem(
      this.file,
      this.collectionID,
      this.completer, {
      this.status = UploadStatus.not_started,
    });
  }
  /// Lifecycle states of a [FileUploadItem].
  enum UploadStatus {
    /// Queued, upload has not begun.
    not_started,

    /// Upload is currently running.
    in_progress,

    /// Upload has finished.
    completed,
  }
  /// Raised when a file cannot be uploaded because no thumbnail could be
  /// generated for it.
  class InvalidFileError extends Error {}
  /// Raised when a non-forced upload is attempted while the device is not on
  /// Wi-Fi and backup over mobile data is disabled.
  class WiFiUnavailableError extends Error {}
  /// Delivered to uploads that were removed from the queue before they
  /// started.
  class SyncStopRequestedError extends Error {}
  /// Raised when the request for upload URLs is answered with HTTP 402.
  class NoActiveSubscriptionError extends Error {}
  /// Raised when registering an uploaded file is answered with HTTP 426.
  class StorageLimitExceededError extends Error {}