file_uploader.dart 24 KB


  1. import 'dart:async';
  2. import 'dart:collection';
  3. import 'dart:convert';
  4. import 'dart:io' as io;
  5. import 'dart:math';
  6. import 'dart:typed_data';
  7. import 'package:connectivity/connectivity.dart';
  8. import 'package:dio/dio.dart';
  9. import 'package:flutter_sodium/flutter_sodium.dart';
  10. import 'package:logging/logging.dart';
  11. import 'package:path/path.dart';
  12. import 'package:photos/core/configuration.dart';
  13. import 'package:photos/core/errors.dart';
  14. import 'package:photos/core/event_bus.dart';
  15. import 'package:photos/core/network.dart';
  16. import 'package:photos/db/files_db.dart';
  17. import 'package:photos/db/upload_locks_db.dart';
  18. import 'package:photos/events/local_photos_updated_event.dart';
  19. import 'package:photos/events/subscription_purchased_event.dart';
  20. import 'package:photos/main.dart';
  21. import 'package:photos/models/encryption_result.dart';
  22. import 'package:photos/models/file.dart';
  23. import 'package:photos/models/upload_url.dart';
  24. import 'package:photos/services/collections_service.dart';
  25. import 'package:photos/services/local_sync_service.dart';
  26. import 'package:photos/services/sync_service.dart';
  27. import 'package:photos/utils/crypto_util.dart';
  28. import 'package:photos/utils/file_download_util.dart';
  29. import 'package:photos/utils/file_uploader_util.dart';
  30. import 'package:shared_preferences/shared_preferences.dart';
  31. class FileUploader {
  32. static const kMaximumConcurrentUploads = 4;
  33. static const kMaximumThumbnailCompressionAttempts = 2;
  34. static const kMaximumUploadAttempts = 4;
  35. static const kBlockedUploadsPollFrequency = Duration(seconds: 2);
  36. static const kFileUploadTimeout = Duration(minutes: 50);
  37. final _logger = Logger("FileUploader");
  38. final _dio = Network.instance.getDio();
  39. final _queue = LinkedHashMap<String, FileUploadItem>();
  40. final _uploadLocks = UploadLocksDB.instance;
  41. final kSafeBufferForLockExpiry = Duration(days: 1).inMicroseconds;
  42. final kBGTaskDeathTimeout = Duration(seconds: 5).inMicroseconds;
  43. final _uploadURLs = Queue<UploadURL>();
  44. // Maintains the count of files in the current upload session.
  45. // Upload session is the period between the first entry into the _queue and last entry out of the _queue
  46. int _totalCountInUploadSession = 0;
  47. int _currentlyUploading = 0;
  48. ProcessType _processType;
  49. bool _isBackground;
  50. SharedPreferences _prefs;
  51. FileUploader._privateConstructor() {
  52. Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
  53. _uploadURLFetchInProgress = null;
  54. });
  55. }
  56. static FileUploader instance = FileUploader._privateConstructor();
  57. Future<void> init(bool isBackground) async {
  58. _prefs = await SharedPreferences.getInstance();
  59. _isBackground = isBackground;
  60. _processType =
  61. isBackground ? ProcessType.background : ProcessType.foreground;
  62. final currentTime = DateTime.now().microsecondsSinceEpoch;
  63. await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
  64. _processType.toString(), currentTime);
  65. await _uploadLocks
  66. .releaseAllLocksAcquiredBefore(currentTime - kSafeBufferForLockExpiry);
  67. if (!isBackground) {
  68. await _prefs.reload();
  69. final isBGTaskDead = (_prefs.getInt(kLastBGTaskHeartBeatTime) ?? 0) <
  70. (currentTime - kBGTaskDeathTimeout);
  71. if (isBGTaskDead) {
  72. await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
  73. ProcessType.background.toString(), currentTime);
  74. _logger.info("BG task was found dead, cleared all locks");
  75. }
  76. _pollBackgroundUploadStatus();
  77. }
  78. }
  79. Future<File> upload(File file, int collectionID) {
  80. // If the file hasn't been queued yet, queue it
  81. _totalCountInUploadSession++;
  82. if (!_queue.containsKey(file.localID)) {
  83. final completer = Completer<File>();
  84. _queue[file.localID] = FileUploadItem(file, collectionID, completer);
  85. _pollQueue();
  86. return completer.future;
  87. }
  88. // If the file exists in the queue for a matching collectionID,
  89. // return the existing future
  90. final item = _queue[file.localID];
  91. if (item.collectionID == collectionID) {
  92. _totalCountInUploadSession--;
  93. return item.completer.future;
  94. }
  95. // Else wait for the existing upload to complete,
  96. // and add it to the relevant collection
  97. return item.completer.future.then((uploadedFile) {
  98. return CollectionsService.instance
  99. .addToCollection(collectionID, [uploadedFile]).then((aVoid) {
  100. return uploadedFile;
  101. });
  102. });
  103. }
  104. Future<File> forceUpload(File file, int collectionID) async {
  105. _logger.info("Force uploading " +
  106. file.toString() +
  107. " into collection " +
  108. collectionID.toString());
  109. _totalCountInUploadSession++;
  110. // If the file hasn't been queued yet, ez.
  111. if (!_queue.containsKey(file.localID)) {
  112. final completer = Completer<File>();
  113. _queue[file.localID] = FileUploadItem(
  114. file,
  115. collectionID,
  116. completer,
  117. status: UploadStatus.in_progress,
  118. );
  119. _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);
  120. return completer.future;
  121. }
  122. var item = _queue[file.localID];
  123. // If the file is being uploaded right now, wait and proceed
  124. if (item.status == UploadStatus.in_progress ||
  125. item.status == UploadStatus.in_background) {
  126. _totalCountInUploadSession--;
  127. final uploadedFile = await item.completer.future;
  128. if (uploadedFile.collectionID == collectionID) {
  129. // Do nothing
  130. } else {
  131. await CollectionsService.instance
  132. .addToCollection(collectionID, [uploadedFile]);
  133. }
  134. return uploadedFile;
  135. } else {
  136. // If the file is yet to be processed,
  137. // 1. Set the status to in_progress
  138. // 2. Force upload the file
  139. // 3. Add to the relevant collection
  140. item = _queue[file.localID];
  141. item.status = UploadStatus.in_progress;
  142. final uploadedFile = await _encryptAndUploadFileToCollection(
  143. file, collectionID,
  144. forcedUpload: true);
  145. if (item.collectionID == collectionID) {
  146. return uploadedFile;
  147. } else {
  148. await CollectionsService.instance
  149. .addToCollection(item.collectionID, [uploadedFile]);
  150. return uploadedFile;
  151. }
  152. }
  153. }
  154. int getCurrentSessionUploadCount() {
  155. return _totalCountInUploadSession;
  156. }
  157. void clearQueue(final Error reason) {
  158. final List<String> uploadsToBeRemoved = [];
  159. _queue.entries
  160. .where((entry) => entry.value.status == UploadStatus.not_started)
  161. .forEach((pendingUpload) {
  162. uploadsToBeRemoved.add(pendingUpload.key);
  163. });
  164. for (final id in uploadsToBeRemoved) {
  165. _queue.remove(id).completer.completeError(reason);
  166. }
  167. _totalCountInUploadSession = 0;
  168. }
  169. void removeFromQueueWhere(final bool Function(File) fn, final Error reason) {
  170. List<String> uploadsToBeRemoved = [];
  171. _queue.entries
  172. .where((entry) => entry.value.status == UploadStatus.not_started)
  173. .forEach((pendingUpload) {
  174. if (fn(pendingUpload.value.file)) {
  175. uploadsToBeRemoved.add(pendingUpload.key);
  176. }
  177. });
  178. for (final id in uploadsToBeRemoved) {
  179. _queue.remove(id).completer.completeError(reason);
  180. }
  181. _totalCountInUploadSession -= uploadsToBeRemoved.length;
  182. }
  183. void _pollQueue() {
  184. if (SyncService.instance.shouldStopSync()) {
  185. clearQueue(SyncStopRequestedError());
  186. }
  187. if (_queue.isEmpty) {
  188. // Upload session completed
  189. _totalCountInUploadSession = 0;
  190. return;
  191. }
  192. if (_currentlyUploading < kMaximumConcurrentUploads) {
  193. final firstPendingEntry = _queue.entries
  194. .firstWhere((entry) => entry.value.status == UploadStatus.not_started,
  195. orElse: () => null)
  196. ?.value;
  197. if (firstPendingEntry != null) {
  198. firstPendingEntry.status = UploadStatus.in_progress;
  199. _encryptAndUploadFileToCollection(
  200. firstPendingEntry.file, firstPendingEntry.collectionID);
  201. }
  202. }
  203. }
  204. Future<File> _encryptAndUploadFileToCollection(File file, int collectionID,
  205. {bool forcedUpload = false}) async {
  206. _currentlyUploading++;
  207. final localID = file.localID;
  208. try {
  209. final uploadedFile = await _tryToUpload(file, collectionID, forcedUpload)
  210. .timeout(kFileUploadTimeout, onTimeout: () {
  211. final message = "Upload timed out for file " + file.toString();
  212. _logger.severe(message);
  213. throw TimeoutException(message);
  214. });
  215. _queue.remove(localID).completer.complete(uploadedFile);
  216. return uploadedFile;
  217. } catch (e) {
  218. if (e is LockAlreadyAcquiredError) {
  219. _queue[localID].status = UploadStatus.in_background;
  220. return _queue[localID].completer.future;
  221. } else {
  222. _queue.remove(localID).completer.completeError(e);
  223. return null;
  224. }
  225. } finally {
  226. _currentlyUploading--;
  227. _pollQueue();
  228. }
  229. }
  230. Future<File> _tryToUpload(
  231. File file, int collectionID, bool forcedUpload) async {
  232. final connectivityResult = await (Connectivity().checkConnectivity());
  233. var canUploadUnderCurrentNetworkConditions =
  234. (connectivityResult == ConnectivityResult.wifi ||
  235. Configuration.instance.shouldBackupOverMobileData());
  236. if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
  237. throw WiFiUnavailableError();
  238. }
  239. final fileOnDisk = await FilesDB.instance.getFile(file.generatedID);
  240. final wasAlreadyUploaded = fileOnDisk.uploadedFileID != null &&
  241. fileOnDisk.updationTime != -1 &&
  242. fileOnDisk.collectionID == collectionID;
  243. if (wasAlreadyUploaded) {
  244. return fileOnDisk;
  245. }
  246. try {
  247. await _uploadLocks.acquireLock(
  248. file.localID,
  249. _processType.toString(),
  250. DateTime.now().microsecondsSinceEpoch,
  251. );
  252. } catch (e) {
  253. _logger.warning("Lock was already taken for " + file.toString());
  254. throw LockAlreadyAcquiredError();
  255. }
  256. final tempDirectory = Configuration.instance.getTempDirectory();
  257. final encryptedFilePath = tempDirectory +
  258. file.generatedID.toString() +
  259. (_isBackground ? "_bg" : "") +
  260. ".encrypted";
  261. final encryptedThumbnailPath = tempDirectory +
  262. file.generatedID.toString() +
  263. "_thumbnail" +
  264. (_isBackground ? "_bg" : "") +
  265. ".encrypted";
  266. MediaUploadData mediaUploadData;
  267. try {
  268. _logger.info("Trying to upload " +
  269. file.toString() +
  270. ", isForced: " +
  271. forcedUpload.toString());
  272. try {
  273. mediaUploadData = await getUploadDataFromEnteFile(file);
  274. } catch (e) {
  275. if (e is InvalidFileError) {
  276. await _onInvalidFileError(file, e);
  277. } else {
  278. rethrow;
  279. }
  280. }
  281. Uint8List key;
  282. bool isUpdatedFile =
  283. file.uploadedFileID != null && file.updationTime == -1;
  284. if (isUpdatedFile) {
  285. _logger.info("File was updated " + file.toString());
  286. key = decryptFileKey(file);
  287. } else {
  288. key = null;
  289. }
  290. if (io.File(encryptedFilePath).existsSync()) {
  291. await io.File(encryptedFilePath).delete();
  292. }
  293. final encryptedFile = io.File(encryptedFilePath);
  294. final fileAttributes = await CryptoUtil.encryptFile(
  295. mediaUploadData.sourceFile.path,
  296. encryptedFilePath,
  297. key: key,
  298. );
  299. var thumbnailData = mediaUploadData.thumbnail;
  300. final encryptedThumbnailData =
  301. await CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
  302. if (io.File(encryptedThumbnailPath).existsSync()) {
  303. await io.File(encryptedThumbnailPath).delete();
  304. }
  305. final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
  306. await encryptedThumbnailFile
  307. .writeAsBytes(encryptedThumbnailData.encryptedData);
  308. final thumbnailUploadURL = await _getUploadURL();
  309. String thumbnailObjectKey =
  310. await _putFile(thumbnailUploadURL, encryptedThumbnailFile);
  311. final fileUploadURL = await _getUploadURL();
  312. String fileObjectKey = await _putFile(fileUploadURL, encryptedFile);
  313. final metadata =
  314. await file.getMetadataForUpload(mediaUploadData.sourceFile);
  315. final encryptedMetadataData = await CryptoUtil.encryptChaCha(
  316. utf8.encode(jsonEncode(metadata)), fileAttributes.key);
  317. final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
  318. final thumbnailDecryptionHeader =
  319. Sodium.bin2base64(encryptedThumbnailData.header);
  320. final encryptedMetadata =
  321. Sodium.bin2base64(encryptedMetadataData.encryptedData);
  322. final metadataDecryptionHeader =
  323. Sodium.bin2base64(encryptedMetadataData.header);
  324. if (SyncService.instance.shouldStopSync()) {
  325. throw SyncStopRequestedError();
  326. }
  327. File remoteFile;
  328. if (isUpdatedFile) {
  329. remoteFile = await _updateFile(
  330. file,
  331. fileObjectKey,
  332. fileDecryptionHeader,
  333. await encryptedFile.length(),
  334. thumbnailObjectKey,
  335. thumbnailDecryptionHeader,
  336. await encryptedThumbnailFile.length(),
  337. encryptedMetadata,
  338. metadataDecryptionHeader,
  339. );
  340. // Update across all collections
  341. await FilesDB.instance.updateUploadedFileAcrossCollections(remoteFile);
  342. } else {
  343. final encryptedFileKeyData = CryptoUtil.encryptSync(
  344. fileAttributes.key,
  345. CollectionsService.instance.getCollectionKey(collectionID),
  346. );
  347. final encryptedKey =
  348. Sodium.bin2base64(encryptedFileKeyData.encryptedData);
  349. final keyDecryptionNonce =
  350. Sodium.bin2base64(encryptedFileKeyData.nonce);
  351. remoteFile = await _uploadFile(
  352. file,
  353. collectionID,
  354. encryptedKey,
  355. keyDecryptionNonce,
  356. fileAttributes,
  357. fileObjectKey,
  358. fileDecryptionHeader,
  359. await encryptedFile.length(),
  360. thumbnailObjectKey,
  361. thumbnailDecryptionHeader,
  362. await encryptedThumbnailFile.length(),
  363. encryptedMetadata,
  364. metadataDecryptionHeader,
  365. );
  366. if (mediaUploadData.isDeleted) {
  367. _logger.info("File found to be deleted");
  368. remoteFile.localID = null;
  369. }
  370. await FilesDB.instance.update(remoteFile);
  371. }
  372. if (!_isBackground) {
  373. Bus.instance.fire(LocalPhotosUpdatedEvent([remoteFile]));
  374. }
  375. _logger.info("File upload complete for " + remoteFile.toString());
  376. return remoteFile;
  377. } catch (e, s) {
  378. if (!(e is NoActiveSubscriptionError ||
  379. e is StorageLimitExceededError ||
  380. e is FileTooLargeForPlanError)) {
  381. _logger.severe("File upload failed for " + file.toString(), e, s);
  382. }
  383. rethrow;
  384. } finally {
  385. if (io.Platform.isIOS &&
  386. mediaUploadData != null &&
  387. mediaUploadData.sourceFile != null) {
  388. await mediaUploadData.sourceFile.delete();
  389. }
  390. if (io.File(encryptedFilePath).existsSync()) {
  391. await io.File(encryptedFilePath).delete();
  392. }
  393. if (io.File(encryptedThumbnailPath).existsSync()) {
  394. await io.File(encryptedThumbnailPath).delete();
  395. }
  396. await _uploadLocks.releaseLock(file.localID, _processType.toString());
  397. }
  398. }
  399. Future _onInvalidFileError(File file, InvalidFileError e) async {
  400. String ext = file.title == null ? "no title" : extension(file.title);
  401. _logger.severe(
  402. "Invalid file: (ext: $ext) encountered: " + file.toString(), e);
  403. await FilesDB.instance.deleteLocalFile(file);
  404. await LocalSyncService.instance.trackInvalidFile(file);
  405. throw e;
  406. }
  407. Future<File> _uploadFile(
  408. File file,
  409. int collectionID,
  410. String encryptedKey,
  411. String keyDecryptionNonce,
  412. EncryptionResult fileAttributes,
  413. String fileObjectKey,
  414. String fileDecryptionHeader,
  415. int fileSize,
  416. String thumbnailObjectKey,
  417. String thumbnailDecryptionHeader,
  418. int thumbnailSize,
  419. String encryptedMetadata,
  420. String metadataDecryptionHeader, {
  421. int attempt = 1,
  422. }) async {
  423. final request = {
  424. "collectionID": collectionID,
  425. "encryptedKey": encryptedKey,
  426. "keyDecryptionNonce": keyDecryptionNonce,
  427. "file": {
  428. "objectKey": fileObjectKey,
  429. "decryptionHeader": fileDecryptionHeader,
  430. "size": fileSize,
  431. },
  432. "thumbnail": {
  433. "objectKey": thumbnailObjectKey,
  434. "decryptionHeader": thumbnailDecryptionHeader,
  435. "size": thumbnailSize,
  436. },
  437. "metadata": {
  438. "encryptedData": encryptedMetadata,
  439. "decryptionHeader": metadataDecryptionHeader,
  440. }
  441. };
  442. try {
  443. final response = await _dio.post(
  444. Configuration.instance.getHttpEndpoint() + "/files",
  445. options: Options(
  446. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  447. data: request,
  448. );
  449. final data = response.data;
  450. file.uploadedFileID = data["id"];
  451. file.collectionID = collectionID;
  452. file.updationTime = data["updationTime"];
  453. file.ownerID = data["ownerID"];
  454. file.encryptedKey = encryptedKey;
  455. file.keyDecryptionNonce = keyDecryptionNonce;
  456. file.fileDecryptionHeader = fileDecryptionHeader;
  457. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  458. file.metadataDecryptionHeader = metadataDecryptionHeader;
  459. return file;
  460. } on DioError catch (e) {
  461. if (e.response?.statusCode == 413) {
  462. throw FileTooLargeForPlanError();
  463. } else if (e.response?.statusCode == 426) {
  464. _onStorageLimitExceeded();
  465. } else if (attempt < kMaximumUploadAttempts) {
  466. _logger.info("Upload file failed, will retry in 3 seconds");
  467. await Future.delayed(Duration(seconds: 3));
  468. return _uploadFile(
  469. file,
  470. collectionID,
  471. encryptedKey,
  472. keyDecryptionNonce,
  473. fileAttributes,
  474. fileObjectKey,
  475. fileDecryptionHeader,
  476. fileSize,
  477. thumbnailObjectKey,
  478. thumbnailDecryptionHeader,
  479. thumbnailSize,
  480. encryptedMetadata,
  481. metadataDecryptionHeader,
  482. attempt: attempt + 1,
  483. );
  484. }
  485. rethrow;
  486. }
  487. }
  488. Future<File> _updateFile(
  489. File file,
  490. String fileObjectKey,
  491. String fileDecryptionHeader,
  492. int fileSize,
  493. String thumbnailObjectKey,
  494. String thumbnailDecryptionHeader,
  495. int thumbnailSize,
  496. String encryptedMetadata,
  497. String metadataDecryptionHeader, {
  498. int attempt = 1,
  499. }) async {
  500. final request = {
  501. "id": file.uploadedFileID,
  502. "file": {
  503. "objectKey": fileObjectKey,
  504. "decryptionHeader": fileDecryptionHeader,
  505. "size": fileSize,
  506. },
  507. "thumbnail": {
  508. "objectKey": thumbnailObjectKey,
  509. "decryptionHeader": thumbnailDecryptionHeader,
  510. "size": thumbnailSize,
  511. },
  512. "metadata": {
  513. "encryptedData": encryptedMetadata,
  514. "decryptionHeader": metadataDecryptionHeader,
  515. }
  516. };
  517. try {
  518. final response = await _dio.post(
  519. Configuration.instance.getHttpEndpoint() + "/files",
  520. options: Options(
  521. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  522. data: request,
  523. );
  524. final data = response.data;
  525. file.uploadedFileID = data["id"];
  526. file.updationTime = data["updationTime"];
  527. file.fileDecryptionHeader = fileDecryptionHeader;
  528. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  529. file.metadataDecryptionHeader = metadataDecryptionHeader;
  530. return file;
  531. } on DioError catch (e) {
  532. if (e.response?.statusCode == 426) {
  533. _onStorageLimitExceeded();
  534. } else if (attempt < kMaximumUploadAttempts) {
  535. _logger.info("Update file failed, will retry in 3 seconds");
  536. await Future.delayed(Duration(seconds: 3));
  537. return _updateFile(
  538. file,
  539. fileObjectKey,
  540. fileDecryptionHeader,
  541. fileSize,
  542. thumbnailObjectKey,
  543. thumbnailDecryptionHeader,
  544. thumbnailSize,
  545. encryptedMetadata,
  546. metadataDecryptionHeader,
  547. attempt: attempt + 1,
  548. );
  549. }
  550. rethrow;
  551. }
  552. }
  553. Future<UploadURL> _getUploadURL() async {
  554. if (_uploadURLs.isEmpty) {
  555. await _fetchUploadURLs();
  556. }
  557. return _uploadURLs.removeFirst();
  558. }
  559. Future<void> _uploadURLFetchInProgress;
  560. Future<void> _fetchUploadURLs() async {
  561. _uploadURLFetchInProgress ??= Future<void>(() async {
  562. try {
  563. final response = await _dio.get(
  564. Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
  565. queryParameters: {
  566. "count": min(42, 2 * _queue.length), // m4gic number
  567. },
  568. options: Options(
  569. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  570. );
  571. final urls = (response.data["urls"] as List)
  572. .map((e) => UploadURL.fromMap(e))
  573. .toList();
  574. _uploadURLs.addAll(urls);
  575. _uploadURLFetchInProgress = null;
  576. } on DioError catch (e) {
  577. _uploadURLFetchInProgress = null;
  578. if (e.response != null) {
  579. if (e.response.statusCode == 402) {
  580. final error = NoActiveSubscriptionError();
  581. clearQueue(error);
  582. throw error;
  583. } else if (e.response.statusCode == 426) {
  584. final error = StorageLimitExceededError();
  585. clearQueue(error);
  586. throw error;
  587. }
  588. }
  589. rethrow;
  590. }
  591. });
  592. return _uploadURLFetchInProgress;
  593. }
  594. void _onStorageLimitExceeded() {
  595. clearQueue(StorageLimitExceededError());
  596. throw StorageLimitExceededError();
  597. }
  598. Future<String> _putFile(
  599. UploadURL uploadURL,
  600. io.File file, {
  601. int contentLength,
  602. int attempt = 1,
  603. }) async {
  604. final fileSize = contentLength ?? await file.length();
  605. _logger.info("Putting object for " +
  606. file.toString() +
  607. " of size: " +
  608. fileSize.toString());
  609. final startTime = DateTime.now().millisecondsSinceEpoch;
  610. try {
  611. await _dio.put(
  612. uploadURL.url,
  613. data: file.openRead(),
  614. options: Options(
  615. headers: {
  616. Headers.contentLengthHeader: fileSize,
  617. },
  618. ),
  619. );
  620. _logger.info("Upload speed : " +
  621. (fileSize / (DateTime.now().millisecondsSinceEpoch - startTime))
  622. .toString() +
  623. " kilo bytes per second");
  624. return uploadURL.objectKey;
  625. } on DioError catch (e) {
  626. if (e.message.startsWith(
  627. "HttpException: Content size exceeds specified contentLength.") &&
  628. attempt == 1) {
  629. return _putFile(uploadURL, file,
  630. contentLength: (await file.readAsBytes()).length, attempt: 2);
  631. } else if (attempt < kMaximumUploadAttempts) {
  632. final newUploadURL = await _getUploadURL();
  633. return _putFile(newUploadURL, file,
  634. contentLength: (await file.readAsBytes()).length,
  635. attempt: attempt + 1);
  636. } else {
  637. _logger.info(
  638. "Upload failed for file with size " + fileSize.toString(), e);
  639. rethrow;
  640. }
  641. }
  642. }
  643. Future<void> _pollBackgroundUploadStatus() async {
  644. final blockedUploads = _queue.entries
  645. .where((e) => e.value.status == UploadStatus.in_background)
  646. .toList();
  647. for (final upload in blockedUploads) {
  648. final file = upload.value.file;
  649. final isStillLocked = await _uploadLocks.isLocked(
  650. file.localID, ProcessType.background.toString());
  651. if (!isStillLocked) {
  652. final completer = _queue.remove(upload.key).completer;
  653. final dbFile =
  654. await FilesDB.instance.getFile(upload.value.file.generatedID);
  655. if (dbFile.uploadedFileID != null) {
  656. _logger.info("Background upload success detected");
  657. completer.complete(dbFile);
  658. } else {
  659. _logger.info("Background upload failure detected");
  660. completer.completeError(SilentlyCancelUploadsError());
  661. }
  662. }
  663. }
  664. Future.delayed(kBlockedUploadsPollFrequency, () async {
  665. await _pollBackgroundUploadStatus();
  666. });
  667. }
  668. }
  669. class FileUploadItem {
  670. final File file;
  671. final int collectionID;
  672. final Completer<File> completer;
  673. UploadStatus status;
  674. FileUploadItem(
  675. this.file,
  676. this.collectionID,
  677. this.completer, {
  678. this.status = UploadStatus.not_started,
  679. });
  680. }
  681. enum UploadStatus {
  682. not_started,
  683. in_progress,
  684. in_background,
  685. completed,
  686. }
  687. enum ProcessType {
  688. background,
  689. foreground,
  690. }