file_uploader.dart 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718
  1. import 'dart:async';
  2. import 'dart:collection';
  3. import 'dart:convert';
  4. import 'dart:io' as io;
  5. import 'dart:math';
  6. import 'dart:typed_data';
  7. import 'package:connectivity/connectivity.dart';
  8. import 'package:dio/dio.dart';
  9. import 'package:flutter_sodium/flutter_sodium.dart';
  10. import 'package:logging/logging.dart';
  11. import 'package:path/path.dart';
  12. import 'package:photos/core/configuration.dart';
  13. import 'package:photos/core/errors.dart';
  14. import 'package:photos/core/event_bus.dart';
  15. import 'package:photos/core/network.dart';
  16. import 'package:photos/db/files_db.dart';
  17. import 'package:photos/db/upload_locks_db.dart';
  18. import 'package:photos/events/local_photos_updated_event.dart';
  19. import 'package:photos/events/subscription_purchased_event.dart';
  20. import 'package:photos/main.dart';
  21. import 'package:photos/models/encryption_result.dart';
  22. import 'package:photos/models/file.dart';
  23. import 'package:photos/models/upload_url.dart';
  24. import 'package:photos/services/collections_service.dart';
  25. import 'package:photos/services/local_sync_service.dart';
  26. import 'package:photos/services/sync_service.dart';
  27. import 'package:photos/utils/crypto_util.dart';
  28. import 'package:photos/utils/file_download_util.dart';
  29. import 'package:photos/utils/file_uploader_util.dart';
  30. import 'package:shared_preferences/shared_preferences.dart';
  31. class FileUploader {
  /// Upper bound on simultaneously running uploads that [_pollQueue] honours
  /// when starting queued items. [forceUpload] starts uploads directly and is
  /// not limited by it.
  static const kMaximumConcurrentUploads = 4;

  // NOTE(review): not referenced anywhere in this file; presumably consumed
  // by thumbnail generation code elsewhere - confirm.
  static const kMaximumThumbnailCompressionAttempts = 2;

  /// Maximum number of attempts made by [_uploadFile], [_updateFile] and
  /// [_putFile] before the underlying error is rethrown.
  static const kMaximumUploadAttempts = 4;

  /// Delay between two consecutive runs of [_pollBackgroundUploadStatus].
  static const kBlockedUploadsPollFrequency = Duration(seconds: 2);

  final _logger = Logger("FileUploader");
  final _dio = Network.instance.getDio();

  /// Queued uploads keyed by the file's `localID`, in insertion order.
  final _queue = LinkedHashMap<String, FileUploadItem>();
  final _uploadLocks = UploadLocksDB.instance;

  /// In microseconds. Locks acquired earlier than "now minus this value" are
  /// released unconditionally in [init].
  final kSafeBufferForLockExpiry = Duration(days: 1).inMicroseconds;

  /// In microseconds. If the last recorded background heartbeat is older than
  /// this, [init] treats the background task as dead.
  final kBGTaskDeathTimeout = Duration(seconds: 5).inMicroseconds;

  /// Upload URLs fetched ahead of time; consumed by [_getUploadURL].
  final _uploadURLs = Queue<UploadURL>();

  // Maintains the count of files in the current upload session.
  // Upload session is the period between the first entry into the _queue and
  // last entry out of the _queue.
  int _totalCountInUploadSession = 0;

  /// Number of [_encryptAndUploadFileToCollection] calls currently in flight.
  int _currentlyUploading = 0;

  // The three fields below are assigned in [init].
  ProcessType _processType;
  bool _isBackground;
  SharedPreferences _prefs;

  FileUploader._privateConstructor() {
    // Forget any cached upload URL fetch once a subscription is purchased, so
    // that the next [_fetchUploadURLs] call issues a fresh request.
    Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
      _uploadURLFetchInProgress = null;
    });
  }

  /// The shared uploader instance.
  static FileUploader instance = FileUploader._privateConstructor();
  /// Prepares the uploader for the current process.
  ///
  /// Records whether this is the background or the foreground process,
  /// releases upload locks left behind by an earlier run of the same process
  /// type as well as every lock older than [kSafeBufferForLockExpiry], and -
  /// in the foreground only - releases the background process' locks when its
  /// heartbeat is stale, then starts polling for background uploads.
  Future<void> init(bool isBackground) async {
    _prefs = await SharedPreferences.getInstance();
    _isBackground = isBackground;
    if (isBackground) {
      _processType = ProcessType.background;
    } else {
      _processType = ProcessType.foreground;
    }

    final now = DateTime.now().microsecondsSinceEpoch;
    final ownLockOwner = _processType.toString();
    await _uploadLocks.releaseLocksAcquiredByOwnerBefore(ownLockOwner, now);

    final staleLockCutoff = now - kSafeBufferForLockExpiry;
    await _uploadLocks.releaseAllLocksAcquiredBefore(staleLockCutoff);

    if (isBackground) {
      return;
    }

    // Reload so that the heartbeat written by the other process is visible.
    await _prefs.reload();
    final lastHeartBeat = _prefs.getInt(kLastBGTaskHeartBeatTime) ?? 0;
    final heartBeatDeadline = now - kBGTaskDeathTimeout;
    if (lastHeartBeat < heartBeatDeadline) {
      await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
          ProcessType.background.toString(), now);
      _logger.info("BG task was found dead, cleared all locks");
    }
    _pollBackgroundUploadStatus();
  }
  /// Queues [file] for upload into the collection [collectionID].
  ///
  /// * Not queued yet: the file is queued and the returned future completes
  ///   when that upload finishes.
  /// * Already queued for the same collection: the existing future is
  ///   returned and the session count is left unchanged.
  /// * Already queued for another collection: once the existing upload
  ///   completes, the uploaded file is additionally added to [collectionID].
  Future<File> upload(File file, int collectionID) {
    _totalCountInUploadSession++;

    final alreadyQueued = _queue.containsKey(file.localID);
    if (!alreadyQueued) {
      final pending = Completer<File>();
      _queue[file.localID] = FileUploadItem(file, collectionID, pending);
      _pollQueue();
      return pending.future;
    }

    final queuedItem = _queue[file.localID];
    final ongoingUpload = queuedItem.completer.future;
    if (queuedItem.collectionID == collectionID) {
      // Same request as the one already queued; undo the increment above.
      _totalCountInUploadSession--;
      return ongoingUpload;
    }

    return ongoingUpload.then((uploadedFile) => CollectionsService.instance
        .addToCollection(collectionID, [uploadedFile])
        .then((_) => uploadedFile));
  }
  /// Uploads [file] into [collectionID] right away, bypassing both the
  /// concurrency limit applied by [_pollQueue] and the network-type check in
  /// [_tryToUpload].
  ///
  /// * Not queued yet: the file is queued as in-progress and uploaded.
  /// * Currently uploading (here or in the background process): waits for
  ///   that upload and then adds the result to [collectionID] if it landed in
  ///   a different collection.
  /// * Queued but not started: uploads into [collectionID] now and then adds
  ///   the result to the collection it had originally been queued for.
  ///
  /// The returned future completes with the uploaded file, or with the
  /// upload's error when the upload fails.
  Future<File> forceUpload(File file, int collectionID) async {
    _logger.info("Force uploading $file into collection $collectionID");
    _totalCountInUploadSession++;

    // If the file hasn't been queued yet, ez.
    if (!_queue.containsKey(file.localID)) {
      final completer = Completer<File>();
      _queue[file.localID] = FileUploadItem(
        file,
        collectionID,
        completer,
        status: UploadStatus.in_progress,
      );
      _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);
      return completer.future;
    }

    // The file was already counted by the call that queued it. Previously
    // this was only undone for in-flight uploads, so a queued-but-not-started
    // file ended up being counted twice in the session total.
    _totalCountInUploadSession--;

    final item = _queue[file.localID];

    // If the file is being uploaded right now, wait and proceed.
    if (item.status == UploadStatus.in_progress ||
        item.status == UploadStatus.in_background) {
      final uploadedFile = await item.completer.future;
      if (uploadedFile.collectionID != collectionID) {
        await CollectionsService.instance
            .addToCollection(collectionID, [uploadedFile]);
      }
      return uploadedFile;
    }

    // The file is yet to be processed:
    // 1. Set the status to in_progress
    // 2. Force upload the file
    // 3. Add to the collection it had originally been queued for
    item.status = UploadStatus.in_progress;
    final uploadedFile = await _encryptAndUploadFileToCollection(
        file, collectionID,
        forcedUpload: true);
    if (uploadedFile == null) {
      // _encryptAndUploadFileToCollection returns null after completing the
      // item's completer with the failure. Surface that error to the caller
      // instead of returning null or handing null to addToCollection.
      return item.completer.future;
    }
    if (item.collectionID != collectionID) {
      await CollectionsService.instance
          .addToCollection(item.collectionID, [uploadedFile]);
    }
    return uploadedFile;
  }
  /// Returns the number of files counted in the current upload session.
  int getCurrentSessionUploadCount() => _totalCountInUploadSession;
  /// Removes every upload that has not started yet from the queue, completes
  /// each of their futures with [reason], and resets the session count to 0.
  ///
  /// Uploads that are in progress or running in the background are left in
  /// the queue.
  void clearQueue(final Error reason) {
    // Collect the keys first; the queue must not be modified while iterating.
    final notStartedIDs = _queue.entries
        .where((entry) => entry.value.status == UploadStatus.not_started)
        .map((entry) => entry.key)
        .toList();

    for (final localID in notStartedIDs) {
      final cancelled = _queue.remove(localID);
      cancelled.completer.completeError(reason);
    }
    _totalCountInUploadSession = 0;
  }
  /// Removes the not-yet-started uploads whose file satisfies [fn] from the
  /// queue, completes each of their futures with [reason], and lowers the
  /// session count by the number of uploads removed.
  void removeFromQueueWhere(final bool Function(File) fn, final Error reason) {
    // [fn] is only consulted for uploads that have not started.
    final matchingIDs = _queue.entries
        .where((entry) =>
            entry.value.status == UploadStatus.not_started &&
            fn(entry.value.file))
        .map((entry) => entry.key)
        .toList();

    for (final localID in matchingIDs) {
      final cancelled = _queue.remove(localID);
      cancelled.completer.completeError(reason);
    }
    _totalCountInUploadSession -= matchingIDs.length;
  }
  /// Starts the next not-yet-started upload if the concurrency limit allows.
  ///
  /// Drops all pending uploads first when the sync service asks to stop, and
  /// resets the session count once the queue is empty.
  void _pollQueue() {
    if (SyncService.instance.shouldStopSync()) {
      clearQueue(SyncStopRequestedError());
    }
    if (_queue.isEmpty) {
      // Upload session completed
      _totalCountInUploadSession = 0;
      return;
    }
    if (_currentlyUploading >= kMaximumConcurrentUploads) {
      return;
    }

    // Pick the oldest queued item that has not been started.
    FileUploadItem next;
    for (final candidate in _queue.values) {
      if (candidate.status == UploadStatus.not_started) {
        next = candidate;
        break;
      }
    }
    if (next == null) {
      return;
    }

    next.status = UploadStatus.in_progress;
    _encryptAndUploadFileToCollection(next.file, next.collectionID);
  }
  /// Runs [_tryToUpload] for [file] and settles the queue entry accordingly.
  ///
  /// * Success: the entry is removed and its completer completes with the
  ///   uploaded file, which is also returned.
  /// * [LockAlreadyAcquiredError]: the entry stays queued, is marked
  ///   `in_background`, and its (still pending) future is returned.
  /// * Any other error: the entry is removed, its completer completes with
  ///   the error, and `null` is returned.
  ///
  /// In every case the in-flight counter is decremented and the queue is
  /// polled again afterwards.
  Future<File> _encryptAndUploadFileToCollection(File file, int collectionID,
      {bool forcedUpload = false}) async {
    _currentlyUploading++;
    final queueKey = file.localID;
    try {
      final result = await _tryToUpload(file, collectionID, forcedUpload);
      final finishedItem = _queue.remove(queueKey);
      finishedItem.completer.complete(result);
      return result;
    } on LockAlreadyAcquiredError {
      _queue[queueKey].status = UploadStatus.in_background;
      return _queue[queueKey].completer.future;
    } catch (e) {
      final failedItem = _queue.remove(queueKey);
      failedItem.completer.completeError(e);
      return null;
    } finally {
      _currentlyUploading--;
      _pollQueue();
    }
  }
  /// Encrypts [file] and its thumbnail, uploads both, and registers (or
  /// updates) the file on the server for [collectionID].
  ///
  /// Returns the DB copy unchanged when it is already uploaded to
  /// [collectionID]; otherwise returns the file populated with the server's
  /// response.
  ///
  /// Throws [WiFiUnavailableError] when the network conditions do not allow
  /// an upload and [forcedUpload] is false, [LockAlreadyAcquiredError] when
  /// the upload lock for the file cannot be acquired, [SyncStopRequestedError]
  /// when a stop is requested before the file is registered, and rethrows any
  /// other failure.
  ///
  /// The upload lock is released even if removing the temporary files fails.
  Future<File> _tryToUpload(
      File file, int collectionID, bool forcedUpload) async {
    final connectivityResult = await (Connectivity().checkConnectivity());
    final canUploadUnderCurrentNetworkConditions =
        (connectivityResult == ConnectivityResult.wifi ||
            Configuration.instance.shouldBackupOverMobileData());
    if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
      throw WiFiUnavailableError();
    }

    final fileOnDisk = await FilesDB.instance.getFile(file.generatedID);
    final wasAlreadyUploaded = fileOnDisk.uploadedFileID != null &&
        fileOnDisk.updationTime != -1 &&
        fileOnDisk.collectionID == collectionID;
    if (wasAlreadyUploaded) {
      return fileOnDisk;
    }

    try {
      await _uploadLocks.acquireLock(
        file.localID,
        _processType.toString(),
        DateTime.now().microsecondsSinceEpoch,
      );
    } catch (e) {
      _logger.warning("Lock was already taken for $file");
      throw LockAlreadyAcquiredError();
    }

    // Background and foreground processes use distinct temp file names so
    // that they never clobber each other's files.
    final tempDirectory = Configuration.instance.getTempDirectory();
    final encryptedFilePath = tempDirectory +
        file.generatedID.toString() +
        (_isBackground ? "_bg" : "") +
        ".encrypted";
    final encryptedThumbnailPath = tempDirectory +
        file.generatedID.toString() +
        "_thumbnail" +
        (_isBackground ? "_bg" : "") +
        ".encrypted";
    final encryptedFile = io.File(encryptedFilePath);
    final encryptedThumbnailFile = io.File(encryptedThumbnailPath);

    MediaUploadData mediaUploadData;
    try {
      _logger.info("Trying to upload $file, isForced: $forcedUpload");
      try {
        mediaUploadData = await getUploadDataFromEnteFile(file);
      } on InvalidFileError catch (e) {
        // Always throws [e] after recording the invalid file.
        await _onInvalidFileError(file, e);
      }

      // A file that has a remote ID but an updation time of -1 was modified
      // locally after upload; it is re-encrypted with its existing key.
      final isUpdatedFile =
          file.uploadedFileID != null && file.updationTime == -1;
      Uint8List key;
      if (isUpdatedFile) {
        _logger.info("File was updated $file");
        key = decryptFileKey(file);
      }

      // Remove leftovers from an earlier, interrupted attempt.
      if (encryptedFile.existsSync()) {
        await encryptedFile.delete();
      }
      final fileAttributes = await CryptoUtil.encryptFile(
        mediaUploadData.sourceFile.path,
        encryptedFilePath,
        key: key,
      );

      final thumbnailData = mediaUploadData.thumbnail;
      final encryptedThumbnailData =
          await CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
      if (encryptedThumbnailFile.existsSync()) {
        await encryptedThumbnailFile.delete();
      }
      await encryptedThumbnailFile
          .writeAsBytes(encryptedThumbnailData.encryptedData);

      final thumbnailUploadURL = await _getUploadURL();
      final thumbnailObjectKey =
          await _putFile(thumbnailUploadURL, encryptedThumbnailFile);

      final fileUploadURL = await _getUploadURL();
      final fileObjectKey = await _putFile(fileUploadURL, encryptedFile);

      final metadata =
          await file.getMetadataForUpload(mediaUploadData.sourceFile);
      final encryptedMetadataData = await CryptoUtil.encryptChaCha(
          utf8.encode(jsonEncode(metadata)), fileAttributes.key);
      final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
      final thumbnailDecryptionHeader =
          Sodium.bin2base64(encryptedThumbnailData.header);
      final encryptedMetadata =
          Sodium.bin2base64(encryptedMetadataData.encryptedData);
      final metadataDecryptionHeader =
          Sodium.bin2base64(encryptedMetadataData.header);

      if (SyncService.instance.shouldStopSync()) {
        throw SyncStopRequestedError();
      }

      File remoteFile;
      if (isUpdatedFile) {
        remoteFile = await _updateFile(
          file,
          fileObjectKey,
          fileDecryptionHeader,
          await encryptedFile.length(),
          thumbnailObjectKey,
          thumbnailDecryptionHeader,
          await encryptedThumbnailFile.length(),
          encryptedMetadata,
          metadataDecryptionHeader,
        );
        // Update across all collections
        await FilesDB.instance.updateUploadedFileAcrossCollections(remoteFile);
      } else {
        final encryptedFileKeyData = CryptoUtil.encryptSync(
          fileAttributes.key,
          CollectionsService.instance.getCollectionKey(collectionID),
        );
        final encryptedKey =
            Sodium.bin2base64(encryptedFileKeyData.encryptedData);
        final keyDecryptionNonce =
            Sodium.bin2base64(encryptedFileKeyData.nonce);
        remoteFile = await _uploadFile(
          file,
          collectionID,
          encryptedKey,
          keyDecryptionNonce,
          fileAttributes,
          fileObjectKey,
          fileDecryptionHeader,
          await encryptedFile.length(),
          thumbnailObjectKey,
          thumbnailDecryptionHeader,
          await encryptedThumbnailFile.length(),
          encryptedMetadata,
          metadataDecryptionHeader,
        );
        if (mediaUploadData.isDeleted) {
          _logger.info("File found to be deleted");
          remoteFile.localID = null;
        }
        await FilesDB.instance.update(remoteFile);
      }

      if (!_isBackground) {
        Bus.instance.fire(LocalPhotosUpdatedEvent([remoteFile]));
      }
      _logger.info("File upload complete for $remoteFile");
      return remoteFile;
    } catch (e, s) {
      // Subscription and quota problems are expected; don't log as severe.
      if (!(e is NoActiveSubscriptionError ||
          e is StorageLimitExceededError ||
          e is FileTooLargeForPlanError)) {
        _logger.severe("File upload failed for $file", e, s);
      }
      rethrow;
    } finally {
      try {
        if (io.Platform.isIOS &&
            mediaUploadData != null &&
            mediaUploadData.sourceFile != null) {
          await mediaUploadData.sourceFile.delete();
        }
        if (encryptedFile.existsSync()) {
          await encryptedFile.delete();
        }
        if (encryptedThumbnailFile.existsSync()) {
          await encryptedThumbnailFile.delete();
        }
      } finally {
        // Must run even when a delete above throws; otherwise the lock would
        // linger until the expiry sweep in [init].
        await _uploadLocks.releaseLock(file.localID, _processType.toString());
      }
    }
  }
  /// Logs the invalid [file], deletes its local DB entry, records it with
  /// [LocalSyncService] as invalid, and then throws [e].
  ///
  /// Never completes normally.
  Future _onInvalidFileError(File file, InvalidFileError e) async {
    String ext;
    if (file.title == null) {
      ext = "no title";
    } else {
      ext = extension(file.title);
    }
    _logger.severe("Invalid file: (ext: $ext) encountered: $file", e);
    await FilesDB.instance.deleteLocalFile(file);
    await LocalSyncService.instance.trackInvalidFile(file);
    throw e;
  }
  /// Registers a newly uploaded file with the server via `POST /files` and
  /// copies the server-assigned fields and the encryption details onto
  /// [file], which is then returned.
  ///
  /// On a [DioError]: a 413 response throws [FileTooLargeForPlanError], a 426
  /// response goes through [_onStorageLimitExceeded], and anything else is
  /// retried after 3 seconds until [kMaximumUploadAttempts] attempts have
  /// been made, after which the error is rethrown.
  Future<File> _uploadFile(
    File file,
    int collectionID,
    String encryptedKey,
    String keyDecryptionNonce,
    EncryptionResult fileAttributes,
    String fileObjectKey,
    String fileDecryptionHeader,
    int fileSize,
    String thumbnailObjectKey,
    String thumbnailDecryptionHeader,
    int thumbnailSize,
    String encryptedMetadata,
    String metadataDecryptionHeader, {
    int attempt = 1,
  }) async {
    final fileInfo = {
      "objectKey": fileObjectKey,
      "decryptionHeader": fileDecryptionHeader,
      "size": fileSize,
    };
    final thumbnailInfo = {
      "objectKey": thumbnailObjectKey,
      "decryptionHeader": thumbnailDecryptionHeader,
      "size": thumbnailSize,
    };
    final metadataInfo = {
      "encryptedData": encryptedMetadata,
      "decryptionHeader": metadataDecryptionHeader,
    };
    final request = {
      "collectionID": collectionID,
      "encryptedKey": encryptedKey,
      "keyDecryptionNonce": keyDecryptionNonce,
      "file": fileInfo,
      "thumbnail": thumbnailInfo,
      "metadata": metadataInfo,
    };

    try {
      final response = await _dio.post(
        Configuration.instance.getHttpEndpoint() + "/files",
        options: Options(
            headers: {"X-Auth-Token": Configuration.instance.getToken()}),
        data: request,
      );
      final data = response.data;
      return file
        ..uploadedFileID = data["id"]
        ..collectionID = collectionID
        ..updationTime = data["updationTime"]
        ..ownerID = data["ownerID"]
        ..encryptedKey = encryptedKey
        ..keyDecryptionNonce = keyDecryptionNonce
        ..fileDecryptionHeader = fileDecryptionHeader
        ..thumbnailDecryptionHeader = thumbnailDecryptionHeader
        ..metadataDecryptionHeader = metadataDecryptionHeader;
    } on DioError catch (e) {
      final statusCode = e.response?.statusCode;
      if (statusCode == 413) {
        throw FileTooLargeForPlanError();
      }
      if (statusCode == 426) {
        // Clears the pending queue and throws StorageLimitExceededError.
        _onStorageLimitExceeded();
        rethrow;
      }
      if (attempt >= kMaximumUploadAttempts) {
        rethrow;
      }
      _logger.info("Upload file failed, will retry in 3 seconds");
      await Future.delayed(Duration(seconds: 3));
      return _uploadFile(
        file,
        collectionID,
        encryptedKey,
        keyDecryptionNonce,
        fileAttributes,
        fileObjectKey,
        fileDecryptionHeader,
        fileSize,
        thumbnailObjectKey,
        thumbnailDecryptionHeader,
        thumbnailSize,
        encryptedMetadata,
        metadataDecryptionHeader,
        attempt: attempt + 1,
      );
    }
  }
  /// Sends the re-encrypted contents of an already uploaded [file] to the
  /// server via `POST /files` (the request carries the existing remote ID)
  /// and copies the returned ID, updation time and the new decryption headers
  /// onto [file], which is then returned.
  ///
  /// On a [DioError]: a 426 response goes through [_onStorageLimitExceeded],
  /// and anything else is retried after 3 seconds until
  /// [kMaximumUploadAttempts] attempts have been made, after which the error
  /// is rethrown.
  Future<File> _updateFile(
    File file,
    String fileObjectKey,
    String fileDecryptionHeader,
    int fileSize,
    String thumbnailObjectKey,
    String thumbnailDecryptionHeader,
    int thumbnailSize,
    String encryptedMetadata,
    String metadataDecryptionHeader, {
    int attempt = 1,
  }) async {
    final fileInfo = {
      "objectKey": fileObjectKey,
      "decryptionHeader": fileDecryptionHeader,
      "size": fileSize,
    };
    final thumbnailInfo = {
      "objectKey": thumbnailObjectKey,
      "decryptionHeader": thumbnailDecryptionHeader,
      "size": thumbnailSize,
    };
    final metadataInfo = {
      "encryptedData": encryptedMetadata,
      "decryptionHeader": metadataDecryptionHeader,
    };
    final request = {
      "id": file.uploadedFileID,
      "file": fileInfo,
      "thumbnail": thumbnailInfo,
      "metadata": metadataInfo,
    };

    try {
      final response = await _dio.post(
        Configuration.instance.getHttpEndpoint() + "/files",
        options: Options(
            headers: {"X-Auth-Token": Configuration.instance.getToken()}),
        data: request,
      );
      final data = response.data;
      return file
        ..uploadedFileID = data["id"]
        ..updationTime = data["updationTime"]
        ..fileDecryptionHeader = fileDecryptionHeader
        ..thumbnailDecryptionHeader = thumbnailDecryptionHeader
        ..metadataDecryptionHeader = metadataDecryptionHeader;
    } on DioError catch (e) {
      if (e.response?.statusCode == 426) {
        // Clears the pending queue and throws StorageLimitExceededError.
        _onStorageLimitExceeded();
        rethrow;
      }
      if (attempt >= kMaximumUploadAttempts) {
        rethrow;
      }
      _logger.info("Update file failed, will retry in 3 seconds");
      await Future.delayed(Duration(seconds: 3));
      return _updateFile(
        file,
        fileObjectKey,
        fileDecryptionHeader,
        fileSize,
        thumbnailObjectKey,
        thumbnailDecryptionHeader,
        thumbnailSize,
        encryptedMetadata,
        metadataDecryptionHeader,
        attempt: attempt + 1,
      );
    }
  }
  /// Returns the next pre-fetched upload URL, fetching a new batch first when
  /// none are available.
  ///
  /// Several uploads may be waiting on the same in-flight fetch, and the
  /// batch it delivers can be drained by the other waiters before this caller
  /// resumes. The queue is therefore re-checked after every fetch instead of
  /// assuming a URL is available. The number of fetches is bounded by
  /// [kMaximumUploadAttempts]; if the queue is still empty afterwards,
  /// `removeFirst` throws a [StateError], as it did before.
  Future<UploadURL> _getUploadURL() async {
    for (var attempt = 0;
        _uploadURLs.isEmpty && attempt < kMaximumUploadAttempts;
        attempt++) {
      await _fetchUploadURLs();
    }
    return _uploadURLs.removeFirst();
  }
  /// The upload URL fetch currently in flight, or null when there is none.
  /// Shared so that concurrent callers of [_fetchUploadURLs] wait on a single
  /// request.
  Future<void> _uploadURLFetchInProgress;

  /// Fetches a batch of upload URLs from the server and appends them to
  /// [_uploadURLs].
  ///
  /// If a fetch is already in flight, its future is returned instead of
  /// starting another request.
  ///
  /// Throws [NoActiveSubscriptionError] on a 402 response and
  /// [StorageLimitExceededError] on a 426 response (clearing the pending
  /// queue in both cases); any other failure is rethrown.
  Future<void> _fetchUploadURLs() async {
    _uploadURLFetchInProgress ??= Future<void>(() async {
      try {
        final response = await _dio.get(
          Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
          queryParameters: {
            "count": min(42, 2 * _queue.length), // m4gic number
          },
          options: Options(
              headers: {"X-Auth-Token": Configuration.instance.getToken()}),
        );
        final urls = (response.data["urls"] as List)
            .map((e) => UploadURL.fromMap(e))
            .toList();
        _uploadURLs.addAll(urls);
      } on DioError catch (e) {
        if (e.response != null) {
          if (e.response.statusCode == 402) {
            final error = NoActiveSubscriptionError();
            clearQueue(error);
            throw error;
          } else if (e.response.statusCode == 426) {
            final error = StorageLimitExceededError();
            clearQueue(error);
            throw error;
          }
        }
        rethrow;
      } finally {
        // Reset on every outcome. Previously a failure that was not a
        // DioError (e.g. an unexpected response shape) left the failed future
        // cached, so every later call kept returning the same error.
        _uploadURLFetchInProgress = null;
      }
    });
    return _uploadURLFetchInProgress;
  }
  /// Fails every not-yet-started upload with a [StorageLimitExceededError]
  /// and then throws a [StorageLimitExceededError] itself.
  ///
  /// Never returns normally.
  void _onStorageLimitExceeded() {
    final reasonForQueuedUploads = StorageLimitExceededError();
    clearQueue(reasonForQueuedUploads);
    throw StorageLimitExceededError();
  }
  /// Streams [file] to [uploadURL] with an HTTP PUT and returns the URL's
  /// object key.
  ///
  /// [contentLength] overrides the size reported by `file.length()`. When the
  /// first attempt fails because the content exceeded the declared length,
  /// the same URL is retried once with the length measured by reading the
  /// file. Any other failure is retried with a fresh upload URL until
  /// [kMaximumUploadAttempts] attempts have been made, after which the error
  /// is rethrown.
  Future<String> _putFile(
    UploadURL uploadURL,
    io.File file, {
    int contentLength,
    int attempt = 1,
  }) async {
    final fileSize = contentLength ?? await file.length();
    final startTime = DateTime.now().millisecondsSinceEpoch;
    try {
      final requestOptions = Options(
        headers: {
          Headers.contentLengthHeader: fileSize,
        },
      );
      await _dio.put(
        uploadURL.url,
        data: file.openRead(),
        options: requestOptions,
      );
      final elapsedMillis = DateTime.now().millisecondsSinceEpoch - startTime;
      // bytes per millisecond ~ kilobytes per second
      final speed = fileSize / elapsedMillis;
      _logger.info("Upload speed : $speed kilo bytes per second");
      return uploadURL.objectKey;
    } on DioError catch (e) {
      final declaredLengthTooSmall = e.message.startsWith(
          "HttpException: Content size exceeds specified contentLength.");
      if (declaredLengthTooSmall && attempt == 1) {
        final measuredLength = (await file.readAsBytes()).length;
        return _putFile(uploadURL, file,
            contentLength: measuredLength, attempt: 2);
      }
      if (attempt < kMaximumUploadAttempts) {
        final newUploadURL = await _getUploadURL();
        final measuredLength = (await file.readAsBytes()).length;
        return _putFile(newUploadURL, file,
            contentLength: measuredLength, attempt: attempt + 1);
      }
      _logger.info("Upload failed for file with size $fileSize", e);
      rethrow;
    }
  }
  /// Checks every queued upload that is waiting on the background process
  /// and settles it once the background process no longer holds its lock:
  /// with the DB copy of the file when that has a remote ID, otherwise with a
  /// [SilentlyCancelUploadsError].
  ///
  /// Reschedules itself every [kBlockedUploadsPollFrequency]. The next run is
  /// scheduled even when this run fails, so a single failed lookup can no
  /// longer stop the polling and leave callers waiting forever.
  Future<void> _pollBackgroundUploadStatus() async {
    try {
      final blockedUploads = _queue.entries
          .where((e) => e.value.status == UploadStatus.in_background)
          .toList();
      for (final upload in blockedUploads) {
        final file = upload.value.file;
        final isStillLocked = await _uploadLocks.isLocked(
            file.localID, ProcessType.background.toString());
        if (isStillLocked) {
          continue;
        }
        final completer = _queue.remove(upload.key).completer;
        try {
          final dbFile = await FilesDB.instance.getFile(file.generatedID);
          if (dbFile.uploadedFileID != null) {
            _logger.info("Background upload success detected");
            completer.complete(dbFile);
          } else {
            _logger.info("Background upload failure detected");
            completer.completeError(SilentlyCancelUploadsError());
          }
        } catch (e, s) {
          // The item has already left the queue, so nothing else would ever
          // complete its future; hand the failure to the waiting caller.
          _logger.severe("Could not resolve background upload status", e, s);
          if (!completer.isCompleted) {
            completer.completeError(e, s);
          }
        }
      }
    } catch (e, s) {
      _logger.severe("Polling background upload status failed", e, s);
    } finally {
      Future.delayed(kBlockedUploadsPollFrequency, _pollBackgroundUploadStatus);
    }
  }
  658. }
  /// An entry in the [FileUploader] queue.
  class FileUploadItem {
    /// The file to upload.
    final File file;

    /// ID of the collection the file was queued for.
    final int collectionID;

    /// Completed by [FileUploader] with the uploaded file, or with an error
    /// when the upload fails or is cancelled.
    final Completer<File> completer;

    /// Current state of this upload; mutated by [FileUploader] as it
    /// progresses.
    UploadStatus status;

    /// Creates a queue entry; [status] defaults to
    /// [UploadStatus.not_started].
    FileUploadItem(
      this.file,
      this.collectionID,
      this.completer, {
      this.status = UploadStatus.not_started,
    });
  }
  /// State of a [FileUploadItem] inside the [FileUploader] queue.
  enum UploadStatus {
    /// Queued; no upload has been started for it yet.
    not_started,

    /// An upload for it has been started in this process.
    in_progress,

    /// The upload lock could not be acquired here; the item waits until the
    /// background process' lock on the file is gone.
    in_background,

    // NOTE(review): never assigned anywhere in this file - confirm whether it
    // is used elsewhere.
    completed,
  }
  /// Kind of process a [FileUploader] runs in.
  ///
  /// The `toString()` of these values is passed to the upload locks DB as the
  /// lock owner, so renaming a value changes the stored owner string.
  enum ProcessType {
    background,
    foreground,
  }