file_uploader.dart 16 KB


  1. import 'dart:async';
  2. import 'dart:collection';
  3. import 'dart:convert';
  4. import 'dart:io' as io;
  5. import 'package:connectivity/connectivity.dart';
  6. import 'package:dio/dio.dart';
  7. import 'package:flutter_sodium/flutter_sodium.dart';
  8. import 'package:logging/logging.dart';
  9. import 'package:photos/core/configuration.dart';
  10. import 'package:photos/core/constants.dart';
  11. import 'package:photos/core/event_bus.dart';
  12. import 'package:photos/core/network.dart';
  13. import 'package:photos/db/files_db.dart';
  14. import 'package:photos/events/subscription_purchased_event.dart';
  15. import 'package:photos/models/encryption_result.dart';
  16. import 'package:photos/models/file.dart';
  17. import 'package:photos/models/location.dart';
  18. import 'package:photos/models/upload_url.dart';
  19. import 'package:photos/services/collections_service.dart';
  20. import 'package:photos/services/sync_service.dart';
  21. import 'package:photos/utils/crypto_util.dart';
  22. import 'package:photos/utils/file_util.dart';
  /// Queues files for upload, encrypts them and pushes them to remote storage.
  ///
  /// Uploads are tracked in an insertion-ordered queue keyed by the file's
  /// `generatedID`. At most [_maximumConcurrentUploads] queued uploads are
  /// started by [_pollQueue]; uploads requested through [forceUpload] start
  /// immediately and bypass that limit and the network-condition check.
  class FileUploader {
    final _logger = Logger("FileUploader");
    final _dio = Network.instance.getDio();

    /// Pending and in-flight uploads, keyed by the file's `generatedID`.
    final _queue = LinkedHashMap<int, FileUploadItem>();
    final _maximumConcurrentUploads = 4;
    int _currentlyUploading = 0;

    /// Pre-fetched upload URLs; each one is consumed by a single PUT.
    final _uploadURLs = Queue<UploadURL>();

    /// Number of upload URLs requested from the server per fetch.
    static const _uploadURLBatchSize = 42; // m4gic number

    /// The upload-URL fetch that is currently running, or null when idle.
    Future<void> _uploadURLFetchInProgress;

    FileUploader._privateConstructor() {
      Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
        // Drop any in-flight fetch so the next upload requests fresh URLs.
        _uploadURLFetchInProgress = null;
      });
    }

    static FileUploader instance = FileUploader._privateConstructor();

    /// Queues [file] for upload into the collection [collectionID].
    ///
    /// Returns a future that completes with the uploaded file. If the file is
    /// already queued for the same collection the existing future is returned;
    /// if it is queued for a different collection, the returned future waits
    /// for that upload and then adds the uploaded file to [collectionID].
    Future<File> upload(File file, int collectionID) {
      final existingItem = _queue[file.generatedID];

      // If the file hasn't been queued yet, queue it
      if (existingItem == null) {
        final completer = Completer<File>();
        _queue[file.generatedID] =
            FileUploadItem(file, collectionID, completer);
        _pollQueue();
        return completer.future;
      }

      // If the file exists in the queue for a matching collectionID,
      // return the existing future
      if (existingItem.collectionID == collectionID) {
        return existingItem.completer.future;
      }

      // Else wait for the existing upload to complete,
      // and add it to the relevant collection
      return existingItem.completer.future
          .then((uploadedFile) => _addToCollection(uploadedFile, collectionID));
    }

    /// Uploads [file] into [collectionID] right away, skipping the queue order,
    /// the concurrency limit and the network-condition check.
    ///
    /// If the file was already queued by [upload], the future handed out for
    /// that earlier request is completed as well (after adding the file to the
    /// collection that request asked for, when it differs from [collectionID]).
    /// Errors from the upload are propagated both to the returned future and
    /// to the earlier request.
    Future<File> forceUpload(File file, int collectionID) async {
      final queuedItem = _queue[file.generatedID];

      // If the file hasn't been queued yet, ez.
      if (queuedItem == null) {
        final completer = Completer<File>();
        _queue[file.generatedID] = FileUploadItem(
          file,
          collectionID,
          completer,
          status: UploadStatus.in_progress,
        );
        _encryptAndUploadFileToCollection(file, collectionID,
            forcedUpload: true);
        return completer.future;
      }

      // If the file is being uploaded right now, wait and proceed
      if (queuedItem.status == UploadStatus.in_progress) {
        final uploadedFile = await queuedItem.completer.future;
        if (uploadedFile.collectionID != collectionID) {
          await CollectionsService.instance
              .addToCollection(collectionID, [uploadedFile]);
        }
        return uploadedFile;
      }

      // The file is yet to be processed:
      // 1. Replace its queue entry with an in-progress entry for the forced
      //    upload. The entry must stay in the queue because
      //    _encryptAndUploadFileToCollection reports its result through it,
      //    and keeping it also lets concurrent requests for the same file
      //    attach to this upload instead of starting a second one.
      // 2. Force upload the current file
      // 3. Complete the original request
      final forcedCompleter = Completer<File>();
      _queue[file.generatedID] = FileUploadItem(
        file,
        collectionID,
        forcedCompleter,
        status: UploadStatus.in_progress,
      );
      _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);

      File uploadedFile;
      try {
        uploadedFile = await forcedCompleter.future;
      } catch (e, s) {
        // The original request must not be left pending when the upload fails.
        queuedItem.completer.completeError(e, s);
        rethrow;
      }

      if (queuedItem.collectionID == collectionID) {
        queuedItem.completer.complete(uploadedFile);
      } else {
        // Intentionally not awaited: the forced caller gets its file as soon
        // as the upload is done, the original request completes once the file
        // has also been added to the collection it asked for.
        _completeAfterAddingToCollection(queuedItem, uploadedFile);
      }
      return uploadedFile;
    }

    /// Removes every upload that has not started yet and fails its future with
    /// a [SyncStopRequestedError]. Uploads already in progress are untouched.
    void clearQueue() {
      // Collect the keys first; the queue must not be mutated while iterating.
      final pendingIDs = _queue.entries
          .where((entry) => entry.value.status == UploadStatus.not_started)
          .map((entry) => entry.key)
          .toList();
      for (final id in pendingIDs) {
        _queue.remove(id).completer.completeError(SyncStopRequestedError());
      }
    }

    /// Starts the oldest not-yet-started upload if a slot is free.
    ///
    /// When the sync service asks to stop, all pending uploads are cancelled
    /// first via [clearQueue].
    void _pollQueue() {
      if (SyncService.instance.shouldStopSync()) {
        clearQueue();
      }
      if (_queue.isEmpty || _currentlyUploading >= _maximumConcurrentUploads) {
        return;
      }
      final nextItem = _queue.values.firstWhere(
          (item) => item.status == UploadStatus.not_started,
          orElse: () => null);
      if (nextItem == null) {
        return;
      }
      nextItem.status = UploadStatus.in_progress;
      _encryptAndUploadFileToCollection(nextItem.file, nextItem.collectionID);
    }

    /// Adds [uploadedFile] to [collectionID] and returns it.
    Future<File> _addToCollection(File uploadedFile, int collectionID) async {
      await CollectionsService.instance
          .addToCollection(collectionID, [uploadedFile]);
      return uploadedFile;
    }

    /// Adds [uploadedFile] to the collection requested by [item] and then
    /// completes [item]'s completer; a failure is reported through the same
    /// completer instead of leaving it pending.
    Future<void> _completeAfterAddingToCollection(
        FileUploadItem item, File uploadedFile) async {
      try {
        await CollectionsService.instance
            .addToCollection(item.collectionID, [uploadedFile]);
        item.completer.complete(uploadedFile);
      } catch (e, s) {
        item.completer.completeError(e, s);
      }
    }

    /// Runs one upload and reports the outcome through the queue entry of
    /// [file].
    ///
    /// The returned future itself never fails: success and failure are both
    /// delivered via the completer stored in the queue, so callers may safely
    /// fire-and-forget. The queue entry is removed in either case and the
    /// queue is polled again afterwards.
    Future<void> _encryptAndUploadFileToCollection(File file, int collectionID,
        {bool forcedUpload = false}) async {
      _currentlyUploading++;
      try {
        final uploadedFile =
            await _tryToUpload(file, collectionID, forcedUpload);
        // Null-aware: never crash here if the entry is unexpectedly missing.
        _queue.remove(file.generatedID)?.completer?.complete(uploadedFile);
      } catch (e, s) {
        _queue.remove(file.generatedID)?.completer?.completeError(e, s);
      } finally {
        _currentlyUploading--;
        _pollQueue();
      }
    }

    /// Encrypts [file] and its thumbnail, uploads both, and registers (or
    /// updates) the file on the server.
    ///
    /// Throws [WiFiUnavailableError] when the device is not on WiFi, backup
    /// over mobile data is disabled and [forcedUpload] is false. Throws
    /// [InvalidFileError] when no thumbnail can be generated. Temporary
    /// encrypted files are always removed before returning.
    Future<File> _tryToUpload(
        File file, int collectionID, bool forcedUpload) async {
      final connectivityResult = await Connectivity().checkConnectivity();
      final canUploadUnderCurrentNetworkConditions =
          connectivityResult == ConnectivityResult.wifi ||
              Configuration.instance.shouldBackupOverMobileData();
      if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
        throw WiFiUnavailableError();
      }

      final tempDirectory = Configuration.instance.getTempDirectory();
      final encryptedFilePath =
          tempDirectory + file.generatedID.toString() + ".encrypted";
      final encryptedThumbnailPath =
          tempDirectory + file.generatedID.toString() + "_thumbnail.encrypted";
      var sourceFile;

      try {
        // Placing this in the try-catch block to safe guard against: https://github.com/CaiJingLong/flutter_photo_manager/issues/405
        final asset = await file.getAsset();
        sourceFile = await asset.originFile;

        // A file that already exists remotely is re-encrypted with its
        // existing key; a new file gets a key from the encryption call.
        final isAlreadyUploadedFile = file.uploadedFileID != null;
        final key = isAlreadyUploadedFile ? decryptFileKey(file) : null;

        // Remove leftovers from a previous, interrupted attempt.
        _deleteIfExists(encryptedFilePath);
        final encryptedFile = io.File(encryptedFilePath);
        final fileAttributes = await CryptoUtil.encryptFile(
          sourceFile.path,
          encryptedFilePath,
          key: key,
        );

        var thumbnailData = await asset.thumbDataWithSize(
          THUMBNAIL_LARGE_SIZE,
          THUMBNAIL_LARGE_SIZE,
          quality: 50,
        );
        if (thumbnailData == null) {
          _logger.severe("Could not generate thumbnail for " + file.toString());
          await FilesDB.instance.deleteLocalFile(file.localID);
          throw InvalidFileError();
        }
        final thumbnailSize = thumbnailData.length;
        if (thumbnailSize > THUMBNAIL_DATA_LIMIT) {
          thumbnailData = await compressThumbnail(thumbnailData);
          _logger.info("Thumbnail size $thumbnailSize");
          _logger.info("Compressed thumbnail size ${thumbnailData.length}");
        }

        final encryptedThumbnailData =
            CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
        _deleteIfExists(encryptedThumbnailPath);
        final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
        encryptedThumbnailFile
            .writeAsBytesSync(encryptedThumbnailData.encryptedData);

        final fileUploadURL = await _getUploadURL();
        final fileObjectKey = await _putFile(fileUploadURL, encryptedFile);

        final thumbnailUploadURL = await _getUploadURL();
        final thumbnailObjectKey =
            await _putFile(thumbnailUploadURL, encryptedThumbnailFile);

        // h4ck to fetch location data if missing (thank you Android Q+) lazily only during uploads
        if (file.location == null ||
            (file.location.latitude == 0 && file.location.longitude == 0)) {
          final latLong = await asset.latlngAsync();
          file.location = Location(latLong.latitude, latLong.longitude);
        }

        final encryptedMetadataData = CryptoUtil.encryptChaCha(
            utf8.encode(jsonEncode(file.getMetadata())), fileAttributes.key);
        final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
        final thumbnailDecryptionHeader =
            Sodium.bin2base64(encryptedThumbnailData.header);
        final encryptedMetadata =
            Sodium.bin2base64(encryptedMetadataData.encryptedData);
        final metadataDecryptionHeader =
            Sodium.bin2base64(encryptedMetadataData.header);

        if (isAlreadyUploadedFile) {
          final updatedFile = await _updateFile(
            file,
            fileObjectKey,
            fileDecryptionHeader,
            thumbnailObjectKey,
            thumbnailDecryptionHeader,
            encryptedMetadata,
            metadataDecryptionHeader,
          );
          // Update across all collections
          await FilesDB.instance
              .updateUploadedFileAcrossCollections(updatedFile);
          return updatedFile;
        }

        final uploadedFile = await _uploadFile(
          file,
          collectionID,
          fileAttributes,
          fileObjectKey,
          fileDecryptionHeader,
          thumbnailObjectKey,
          thumbnailDecryptionHeader,
          encryptedMetadata,
          metadataDecryptionHeader,
        );
        await FilesDB.instance.update(uploadedFile);
        return uploadedFile;
      } catch (e, s) {
        if (e is! NoActiveSubscriptionError) {
          _logger.severe(
              "File upload failed for " + file.generatedID.toString(), e, s);
        }
        // rethrow keeps the original stack trace for the caller.
        rethrow;
      } finally {
        if (io.Platform.isIOS && sourceFile != null) {
          sourceFile.deleteSync();
        }
        _deleteIfExists(encryptedFilePath);
        _deleteIfExists(encryptedThumbnailPath);
      }
    }

    /// Synchronously deletes the file at [path] if it exists.
    void _deleteIfExists(String path) {
      final target = io.File(path);
      if (target.existsSync()) {
        target.deleteSync();
      }
    }

    /// Registers a newly uploaded file with the server and copies the
    /// server-assigned and encryption attributes onto [file].
    ///
    /// Throws [StorageLimitExceededError] when the server answers with HTTP
    /// 426.
    Future<File> _uploadFile(
      File file,
      int collectionID,
      EncryptionResult fileAttributes,
      String fileObjectKey,
      String fileDecryptionHeader,
      String thumbnailObjectKey,
      String thumbnailDecryptionHeader,
      String encryptedMetadata,
      String metadataDecryptionHeader,
    ) async {
      // The file key is stored encrypted with the key of its collection.
      final encryptedFileKeyData = CryptoUtil.encryptSync(
        fileAttributes.key,
        CollectionsService.instance.getCollectionKey(collectionID),
      );
      final encryptedKey =
          Sodium.bin2base64(encryptedFileKeyData.encryptedData);
      final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce);
      final request = {
        "collectionID": collectionID,
        "encryptedKey": encryptedKey,
        "keyDecryptionNonce": keyDecryptionNonce,
        "file": {
          "objectKey": fileObjectKey,
          "decryptionHeader": fileDecryptionHeader,
        },
        "thumbnail": {
          "objectKey": thumbnailObjectKey,
          "decryptionHeader": thumbnailDecryptionHeader,
        },
        "metadata": {
          "encryptedData": encryptedMetadata,
          "decryptionHeader": metadataDecryptionHeader,
        }
      };

      final data = await _postToFilesEndpoint(request);
      file.uploadedFileID = data["id"];
      file.collectionID = collectionID;
      file.updationTime = data["updationTime"];
      file.ownerID = data["ownerID"];
      file.encryptedKey = encryptedKey;
      file.keyDecryptionNonce = keyDecryptionNonce;
      file.fileDecryptionHeader = fileDecryptionHeader;
      file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
      file.metadataDecryptionHeader = metadataDecryptionHeader;
      return file;
    }

    /// Sends the re-uploaded contents of an already uploaded [file] to the
    /// server and copies the returned attributes onto [file].
    ///
    /// Throws [StorageLimitExceededError] when the server answers with HTTP
    /// 426.
    Future<File> _updateFile(
      File file,
      String fileObjectKey,
      String fileDecryptionHeader,
      String thumbnailObjectKey,
      String thumbnailDecryptionHeader,
      String encryptedMetadata,
      String metadataDecryptionHeader,
    ) async {
      final request = {
        "id": file.uploadedFileID,
        "file": {
          "objectKey": fileObjectKey,
          "decryptionHeader": fileDecryptionHeader,
        },
        "thumbnail": {
          "objectKey": thumbnailObjectKey,
          "decryptionHeader": thumbnailDecryptionHeader,
        },
        "metadata": {
          "encryptedData": encryptedMetadata,
          "decryptionHeader": metadataDecryptionHeader,
        }
      };

      final data = await _postToFilesEndpoint(request);
      file.uploadedFileID = data["id"];
      file.updationTime = data["updationTime"];
      file.fileDecryptionHeader = fileDecryptionHeader;
      file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
      file.metadataDecryptionHeader = metadataDecryptionHeader;
      return file;
    }

    /// POSTs [request] to the `/files` endpoint and returns the response body.
    ///
    /// Throws [StorageLimitExceededError] on HTTP 426; any other [DioError] is
    /// rethrown unchanged.
    Future<dynamic> _postToFilesEndpoint(Map<String, dynamic> request) async {
      try {
        final response = await _dio.post(
          Configuration.instance.getHttpEndpoint() + "/files",
          options: Options(
              headers: {"X-Auth-Token": Configuration.instance.getToken()}),
          data: request,
        );
        return response.data;
      } on DioError catch (e) {
        // Null-aware access so a DioError without a response is rethrown
        // as-is instead of failing here on a null dereference.
        if (e.response?.statusCode == 426) {
          throw StorageLimitExceededError();
        }
        rethrow;
      }
    }

    /// Returns the next unused upload URL, fetching a new batch when none are
    /// left.
    Future<UploadURL> _getUploadURL() async {
      if (_uploadURLs.isEmpty) {
        await _fetchUploadURLs();
      }
      return _uploadURLs.removeFirst();
    }

    /// Fetches a batch of upload URLs, sharing a single request between
    /// concurrent callers.
    ///
    /// The shared future completes (or fails) for every waiting caller and the
    /// in-progress marker is cleared afterwards, so a failed fetch can be
    /// retried by the next upload instead of blocking all of them forever.
    Future<void> _fetchUploadURLs() {
      if (_uploadURLFetchInProgress == null) {
        Future<void> fetch;
        fetch = _requestUploadURLs().whenComplete(() {
          // Only clear the marker if it still refers to this fetch; it may
          // have been reset and replaced in the meantime.
          if (identical(_uploadURLFetchInProgress, fetch)) {
            _uploadURLFetchInProgress = null;
          }
        });
        _uploadURLFetchInProgress = fetch;
      }
      return _uploadURLFetchInProgress;
    }

    /// Requests [_uploadURLBatchSize] upload URLs and appends them to
    /// [_uploadURLs].
    ///
    /// Throws [NoActiveSubscriptionError] when the server answers with HTTP
    /// 402.
    Future<void> _requestUploadURLs() async {
      try {
        final response = await _dio.get(
          Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
          queryParameters: {
            "count": _uploadURLBatchSize,
          },
          options: Options(
              headers: {"X-Auth-Token": Configuration.instance.getToken()}),
        );
        final urls = (response.data["urls"] as List)
            .map((e) => UploadURL.fromMap(e))
            .toList();
        _uploadURLs.addAll(urls);
      } on DioError catch (e) {
        if (e.response?.statusCode == 402) {
          throw NoActiveSubscriptionError();
        }
        rethrow;
      }
    }

    /// PUTs [file] to [uploadURL] and returns the object key of the upload.
    Future<String> _putFile(UploadURL uploadURL, io.File file) async {
      final fileSize = file.lengthSync();
      final startTime = DateTime.now().millisecondsSinceEpoch;
      _logger.info("Putting file of size $fileSize to ${uploadURL.url}");
      await _dio.put(uploadURL.url,
          data: file.openRead(),
          options: Options(headers: {
            Headers.contentLengthHeader: fileSize,
          }));
      // bytes per millisecond, reported as kilobytes per second
      final elapsedMillis = DateTime.now().millisecondsSinceEpoch - startTime;
      _logger.info(
          "Upload speed : ${fileSize / elapsedMillis} kilo bytes per second");
      return uploadURL.objectKey;
    }
  }
  /// A single entry of the uploader's queue.
  ///
  /// Bundles the file to upload, the collection it was requested for, the
  /// completer through which the outcome is reported, and the current
  /// [status] of the upload.
  class FileUploadItem {
    /// The file to be uploaded.
    final File file;

    /// ID of the collection this upload was requested for.
    final int collectionID;

    /// Completed with the uploaded file, or with the error that stopped it.
    final Completer<File> completer;

    /// Progress of this entry; [UploadStatus.not_started] unless specified.
    UploadStatus status;

    FileUploadItem(
      this.file,
      this.collectionID,
      this.completer, {
      this.status = UploadStatus.not_started,
    });
  }
  /// Lifecycle state of a queued upload.
  enum UploadStatus {
    /// Queued, waiting for a free upload slot.
    not_started,

    /// Currently being encrypted and uploaded.
    in_progress,

    /// Declared for completeness; not assigned by the code in this file.
    completed,
  }
  /// Signals that a file cannot be uploaded; the uploader throws it when no
  /// thumbnail can be generated for the file.
  class InvalidFileError extends Error {}
  /// Signals that a non-forced upload was skipped because the device is not
  /// on WiFi and backup over mobile data is not enabled.
  class WiFiUnavailableError extends Error {}
  /// Delivered to uploads that were still pending when the queue was cleared.
  class SyncStopRequestedError extends Error {}
  /// Signals that the server refused to hand out upload URLs with HTTP 402.
  class NoActiveSubscriptionError extends Error {}
  /// Signals that the server rejected a file registration with HTTP 426.
  class StorageLimitExceededError extends Error {}