file_uploader.dart 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. import 'dart:async';
  2. import 'dart:collection';
  3. import 'dart:convert';
  4. import 'dart:io' as io;
  5. import 'dart:math';
  6. import 'dart:typed_data';
  7. import 'package:connectivity/connectivity.dart';
  8. import 'package:dio/dio.dart';
  9. import 'package:flutter_sodium/flutter_sodium.dart';
  10. import 'package:logging/logging.dart';
  11. import 'package:photos/core/configuration.dart';
  12. import 'package:photos/core/errors.dart';
  13. import 'package:photos/core/event_bus.dart';
  14. import 'package:photos/core/network.dart';
  15. import 'package:photos/db/files_db.dart';
  16. import 'package:photos/db/upload_locks_db.dart';
  17. import 'package:photos/events/local_photos_updated_event.dart';
  18. import 'package:photos/events/subscription_purchased_event.dart';
  19. import 'package:photos/main.dart';
  20. import 'package:photos/models/encryption_result.dart';
  21. import 'package:photos/models/file.dart';
  22. import 'package:photos/models/upload_url.dart';
  23. import 'package:photos/services/collections_service.dart';
  24. import 'package:photos/services/local_sync_service.dart';
  25. import 'package:photos/services/sync_service.dart';
  26. import 'package:photos/utils/crypto_util.dart';
  27. import 'package:photos/utils/file_download_util.dart';
  28. import 'package:photos/utils/file_uploader_util.dart';
  29. import 'package:shared_preferences/shared_preferences.dart';
  30. class FileUploader {
  31. static const kMaximumConcurrentUploads = 4;
  32. static const kMaximumThumbnailCompressionAttempts = 2;
  33. static const kMaximumUploadAttempts = 4;
  34. static const kBlockedUploadsPollFrequency = Duration(seconds: 2);
  35. final _logger = Logger("FileUploader");
  36. final _dio = Network.instance.getDio();
  37. final _queue = LinkedHashMap<String, FileUploadItem>();
  38. final _uploadLocks = UploadLocksDB.instance;
  39. final kSafeBufferForLockExpiry = Duration(days: 1).inMicroseconds;
  40. final kBGTaskDeathTimeout = Duration(seconds: 5).inMicroseconds;
  41. final _uploadURLs = Queue<UploadURL>();
  42. // Maintains the count of files in the current upload session.
  43. // Upload session is the period between the first entry into the _queue and last entry out of the _queue
  44. int _totalCountInUploadSession = 0;
  45. int _currentlyUploading = 0;
  46. ProcessType _processType;
  47. bool _isBackground;
  48. SharedPreferences _prefs;
  49. FileUploader._privateConstructor() {
  50. Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
  51. _uploadURLFetchInProgress = null;
  52. });
  53. }
  54. static FileUploader instance = FileUploader._privateConstructor();
  55. Future<void> init(bool isBackground) async {
  56. _prefs = await SharedPreferences.getInstance();
  57. _isBackground = isBackground;
  58. _processType =
  59. isBackground ? ProcessType.background : ProcessType.foreground;
  60. final currentTime = DateTime.now().microsecondsSinceEpoch;
  61. await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
  62. _processType.toString(), currentTime);
  63. await _uploadLocks
  64. .releaseAllLocksAcquiredBefore(currentTime - kSafeBufferForLockExpiry);
  65. if (!isBackground) {
  66. await _prefs.reload();
  67. final isBGTaskDead = (_prefs.getInt(kLastBGTaskHeartBeatTime) ?? 0) <
  68. (currentTime - kBGTaskDeathTimeout);
  69. if (isBGTaskDead) {
  70. await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
  71. ProcessType.background.toString(), currentTime);
  72. _logger.info("BG task was found dead, cleared all locks");
  73. }
  74. _pollBackgroundUploadStatus();
  75. }
  76. }
  77. Future<File> upload(File file, int collectionID) {
  78. // If the file hasn't been queued yet, queue it
  79. _totalCountInUploadSession++;
  80. if (!_queue.containsKey(file.localID)) {
  81. final completer = Completer<File>();
  82. _queue[file.localID] = FileUploadItem(file, collectionID, completer);
  83. _pollQueue();
  84. return completer.future;
  85. }
  86. // If the file exists in the queue for a matching collectionID,
  87. // return the existing future
  88. final item = _queue[file.localID];
  89. if (item.collectionID == collectionID) {
  90. _totalCountInUploadSession--;
  91. return item.completer.future;
  92. }
  93. // Else wait for the existing upload to complete,
  94. // and add it to the relevant collection
  95. return item.completer.future.then((uploadedFile) {
  96. return CollectionsService.instance
  97. .addToCollection(collectionID, [uploadedFile]).then((aVoid) {
  98. return uploadedFile;
  99. });
  100. });
  101. }
  102. Future<File> forceUpload(File file, int collectionID) async {
  103. _logger.info("Force uploading " +
  104. file.toString() +
  105. " into collection " +
  106. collectionID.toString());
  107. _totalCountInUploadSession++;
  108. // If the file hasn't been queued yet, ez.
  109. if (!_queue.containsKey(file.localID)) {
  110. final completer = Completer<File>();
  111. _queue[file.localID] = FileUploadItem(
  112. file,
  113. collectionID,
  114. completer,
  115. status: UploadStatus.in_progress,
  116. );
  117. _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);
  118. return completer.future;
  119. }
  120. var item = _queue[file.localID];
  121. // If the file is being uploaded right now, wait and proceed
  122. if (item.status == UploadStatus.in_progress ||
  123. item.status == UploadStatus.in_background) {
  124. _totalCountInUploadSession--;
  125. final uploadedFile = await item.completer.future;
  126. if (uploadedFile.collectionID == collectionID) {
  127. // Do nothing
  128. } else {
  129. await CollectionsService.instance
  130. .addToCollection(collectionID, [uploadedFile]);
  131. }
  132. return uploadedFile;
  133. } else {
  134. // If the file is yet to be processed,
  135. // 1. Set the status to in_progress
  136. // 2. Force upload the file
  137. // 3. Add to the relevant collection
  138. item = _queue[file.localID];
  139. item.status = UploadStatus.in_progress;
  140. final uploadedFile = await _encryptAndUploadFileToCollection(
  141. file, collectionID,
  142. forcedUpload: true);
  143. if (item.collectionID == collectionID) {
  144. return uploadedFile;
  145. } else {
  146. await CollectionsService.instance
  147. .addToCollection(item.collectionID, [uploadedFile]);
  148. return uploadedFile;
  149. }
  150. }
  151. }
  152. int getCurrentSessionUploadCount() {
  153. return _totalCountInUploadSession;
  154. }
  155. void clearQueue(final Error reason) {
  156. final List<String> uploadsToBeRemoved = [];
  157. _queue.entries
  158. .where((entry) => entry.value.status == UploadStatus.not_started)
  159. .forEach((pendingUpload) {
  160. uploadsToBeRemoved.add(pendingUpload.key);
  161. });
  162. for (final id in uploadsToBeRemoved) {
  163. _queue.remove(id).completer.completeError(reason);
  164. }
  165. _totalCountInUploadSession = 0;
  166. }
  167. void removeFromQueueWhere(final bool Function(File) fn, final Error reason) {
  168. List<String> uploadsToBeRemoved = [];
  169. _queue.entries
  170. .where((entry) => entry.value.status == UploadStatus.not_started)
  171. .forEach((pendingUpload) {
  172. if (fn(pendingUpload.value.file)) {
  173. uploadsToBeRemoved.add(pendingUpload.key);
  174. }
  175. });
  176. for (final id in uploadsToBeRemoved) {
  177. _queue.remove(id).completer.completeError(reason);
  178. }
  179. _totalCountInUploadSession -= uploadsToBeRemoved.length;
  180. }
  181. void _pollQueue() {
  182. if (SyncService.instance.shouldStopSync()) {
  183. clearQueue(SyncStopRequestedError());
  184. }
  185. if (_queue.isEmpty) {
  186. // Upload session completed
  187. _totalCountInUploadSession = 0;
  188. return;
  189. }
  190. if (_currentlyUploading < kMaximumConcurrentUploads) {
  191. final firstPendingEntry = _queue.entries
  192. .firstWhere((entry) => entry.value.status == UploadStatus.not_started,
  193. orElse: () => null)
  194. ?.value;
  195. if (firstPendingEntry != null) {
  196. firstPendingEntry.status = UploadStatus.in_progress;
  197. _encryptAndUploadFileToCollection(
  198. firstPendingEntry.file, firstPendingEntry.collectionID);
  199. }
  200. }
  201. }
  202. Future<File> _encryptAndUploadFileToCollection(File file, int collectionID,
  203. {bool forcedUpload = false}) async {
  204. _currentlyUploading++;
  205. final localID = file.localID;
  206. try {
  207. final uploadedFile = await _tryToUpload(file, collectionID, forcedUpload);
  208. _queue.remove(localID).completer.complete(uploadedFile);
  209. return uploadedFile;
  210. } catch (e) {
  211. if (e is LockAlreadyAcquiredError) {
  212. _queue[localID].status = UploadStatus.in_background;
  213. return _queue[localID].completer.future;
  214. } else {
  215. _queue.remove(localID).completer.completeError(e);
  216. return null;
  217. }
  218. } finally {
  219. _currentlyUploading--;
  220. _pollQueue();
  221. }
  222. }
  223. Future<File> _tryToUpload(
  224. File file, int collectionID, bool forcedUpload) async {
  225. final connectivityResult = await (Connectivity().checkConnectivity());
  226. var canUploadUnderCurrentNetworkConditions =
  227. (connectivityResult == ConnectivityResult.wifi ||
  228. Configuration.instance.shouldBackupOverMobileData());
  229. if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
  230. throw WiFiUnavailableError();
  231. }
  232. try {
  233. await _uploadLocks.acquireLock(
  234. file.localID,
  235. _processType.toString(),
  236. DateTime.now().microsecondsSinceEpoch,
  237. );
  238. } catch (e) {
  239. _logger.warning("Lock was already taken for " + file.toString());
  240. throw LockAlreadyAcquiredError();
  241. }
  242. final tempDirectory = Configuration.instance.getTempDirectory();
  243. final encryptedFilePath = tempDirectory +
  244. file.generatedID.toString() +
  245. (_isBackground ? "_bg" : "") +
  246. ".encrypted";
  247. final encryptedThumbnailPath = tempDirectory +
  248. file.generatedID.toString() +
  249. "_thumbnail" +
  250. (_isBackground ? "_bg" : "") +
  251. ".encrypted";
  252. MediaUploadData mediaUploadData;
  253. try {
  254. _logger.info("Trying to upload " +
  255. file.toString() +
  256. ", isForced: " +
  257. forcedUpload.toString());
  258. try {
  259. mediaUploadData = await getUploadDataFromEnteFile(file);
  260. } catch (e) {
  261. if (e is InvalidFileError) {
  262. _onInvalidFileError(file);
  263. } else {
  264. rethrow;
  265. }
  266. }
  267. Uint8List key;
  268. var isAlreadyUploadedFile = file.uploadedFileID != null;
  269. if (isAlreadyUploadedFile) {
  270. key = decryptFileKey(file);
  271. } else {
  272. key = null;
  273. }
  274. if (io.File(encryptedFilePath).existsSync()) {
  275. await io.File(encryptedFilePath).delete();
  276. }
  277. final encryptedFile = io.File(encryptedFilePath);
  278. final fileAttributes = await CryptoUtil.encryptFile(
  279. mediaUploadData.sourceFile.path,
  280. encryptedFilePath,
  281. key: key,
  282. );
  283. var thumbnailData = mediaUploadData.thumbnail;
  284. final encryptedThumbnailData =
  285. await CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
  286. if (io.File(encryptedThumbnailPath).existsSync()) {
  287. await io.File(encryptedThumbnailPath).delete();
  288. }
  289. final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
  290. await encryptedThumbnailFile
  291. .writeAsBytes(encryptedThumbnailData.encryptedData);
  292. final thumbnailUploadURL = await _getUploadURL();
  293. String thumbnailObjectKey =
  294. await _putFile(thumbnailUploadURL, encryptedThumbnailFile);
  295. final fileUploadURL = await _getUploadURL();
  296. String fileObjectKey = await _putFile(fileUploadURL, encryptedFile);
  297. final encryptedMetadataData = await CryptoUtil.encryptChaCha(
  298. utf8.encode(jsonEncode(file.getMetadata())), fileAttributes.key);
  299. final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
  300. final thumbnailDecryptionHeader =
  301. Sodium.bin2base64(encryptedThumbnailData.header);
  302. final encryptedMetadata =
  303. Sodium.bin2base64(encryptedMetadataData.encryptedData);
  304. final metadataDecryptionHeader =
  305. Sodium.bin2base64(encryptedMetadataData.header);
  306. if (SyncService.instance.shouldStopSync()) {
  307. throw SyncStopRequestedError();
  308. }
  309. File remoteFile;
  310. if (isAlreadyUploadedFile) {
  311. remoteFile = await _updateFile(
  312. file,
  313. fileObjectKey,
  314. fileDecryptionHeader,
  315. encryptedFile.lengthSync(),
  316. thumbnailObjectKey,
  317. thumbnailDecryptionHeader,
  318. encryptedThumbnailFile.lengthSync(),
  319. encryptedMetadata,
  320. metadataDecryptionHeader,
  321. );
  322. // Update across all collections
  323. await FilesDB.instance.updateUploadedFileAcrossCollections(remoteFile);
  324. } else {
  325. remoteFile = await _uploadFile(
  326. file,
  327. collectionID,
  328. fileAttributes,
  329. fileObjectKey,
  330. fileDecryptionHeader,
  331. encryptedFile.lengthSync(),
  332. thumbnailObjectKey,
  333. thumbnailDecryptionHeader,
  334. encryptedThumbnailFile.lengthSync(),
  335. encryptedMetadata,
  336. metadataDecryptionHeader,
  337. );
  338. if (mediaUploadData.isDeleted) {
  339. _logger.info("File found to be deleted");
  340. remoteFile.localID = null;
  341. }
  342. await FilesDB.instance.update(remoteFile);
  343. }
  344. if (!_isBackground) {
  345. Bus.instance.fire(LocalPhotosUpdatedEvent([remoteFile]));
  346. }
  347. _logger.info("File upload complete for " + remoteFile.toString());
  348. return remoteFile;
  349. } catch (e, s) {
  350. if (!(e is NoActiveSubscriptionError || e is StorageLimitExceededError)) {
  351. _logger.severe("File upload failed for " + file.toString(), e, s);
  352. }
  353. rethrow;
  354. } finally {
  355. if (io.Platform.isIOS &&
  356. mediaUploadData != null &&
  357. mediaUploadData.sourceFile != null) {
  358. await mediaUploadData.sourceFile.delete();
  359. }
  360. if (io.File(encryptedFilePath).existsSync()) {
  361. await io.File(encryptedFilePath).delete();
  362. }
  363. if (io.File(encryptedThumbnailPath).existsSync()) {
  364. await io.File(encryptedThumbnailPath).delete();
  365. }
  366. await _uploadLocks.releaseLock(file.localID, _processType.toString());
  367. }
  368. }
  369. Future _onInvalidFileError(File file) async {
  370. _logger.warning("Invalid file encountered: " + file.toString());
  371. await FilesDB.instance.deleteLocalFile(file);
  372. await LocalSyncService.instance.trackInvalidFile(file);
  373. throw InvalidFileError();
  374. }
  375. Future<File> _uploadFile(
  376. File file,
  377. int collectionID,
  378. EncryptionResult fileAttributes,
  379. String fileObjectKey,
  380. String fileDecryptionHeader,
  381. int fileSize,
  382. String thumbnailObjectKey,
  383. String thumbnailDecryptionHeader,
  384. int thumbnailSize,
  385. String encryptedMetadata,
  386. String metadataDecryptionHeader, {
  387. int attempt = 1,
  388. }) async {
  389. final encryptedFileKeyData = CryptoUtil.encryptSync(
  390. fileAttributes.key,
  391. CollectionsService.instance.getCollectionKey(collectionID),
  392. );
  393. final encryptedKey = Sodium.bin2base64(encryptedFileKeyData.encryptedData);
  394. final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce);
  395. final request = {
  396. "collectionID": collectionID,
  397. "encryptedKey": encryptedKey,
  398. "keyDecryptionNonce": keyDecryptionNonce,
  399. "file": {
  400. "objectKey": fileObjectKey,
  401. "decryptionHeader": fileDecryptionHeader,
  402. "size": fileSize,
  403. },
  404. "thumbnail": {
  405. "objectKey": thumbnailObjectKey,
  406. "decryptionHeader": thumbnailDecryptionHeader,
  407. "size": thumbnailSize,
  408. },
  409. "metadata": {
  410. "encryptedData": encryptedMetadata,
  411. "decryptionHeader": metadataDecryptionHeader,
  412. }
  413. };
  414. try {
  415. final response = await _dio.post(
  416. Configuration.instance.getHttpEndpoint() + "/files",
  417. options: Options(
  418. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  419. data: request,
  420. );
  421. final data = response.data;
  422. file.uploadedFileID = data["id"];
  423. file.collectionID = collectionID;
  424. file.updationTime = data["updationTime"];
  425. file.ownerID = data["ownerID"];
  426. file.encryptedKey = encryptedKey;
  427. file.keyDecryptionNonce = keyDecryptionNonce;
  428. file.fileDecryptionHeader = fileDecryptionHeader;
  429. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  430. file.metadataDecryptionHeader = metadataDecryptionHeader;
  431. return file;
  432. } on DioError catch (e) {
  433. if (e.response?.statusCode == 426) {
  434. _onStorageLimitExceeded();
  435. } else if (attempt < kMaximumUploadAttempts) {
  436. _logger.info("Upload file failed, will retry in 3 seconds");
  437. await Future.delayed(Duration(seconds: 3));
  438. return _uploadFile(
  439. file,
  440. collectionID,
  441. fileAttributes,
  442. fileObjectKey,
  443. fileDecryptionHeader,
  444. fileSize,
  445. thumbnailObjectKey,
  446. thumbnailDecryptionHeader,
  447. thumbnailSize,
  448. encryptedMetadata,
  449. metadataDecryptionHeader,
  450. attempt: attempt + 1,
  451. );
  452. }
  453. rethrow;
  454. }
  455. }
  456. Future<File> _updateFile(
  457. File file,
  458. String fileObjectKey,
  459. String fileDecryptionHeader,
  460. int fileSize,
  461. String thumbnailObjectKey,
  462. String thumbnailDecryptionHeader,
  463. int thumbnailSize,
  464. String encryptedMetadata,
  465. String metadataDecryptionHeader, {
  466. int attempt = 1,
  467. }) async {
  468. final request = {
  469. "id": file.uploadedFileID,
  470. "file": {
  471. "objectKey": fileObjectKey,
  472. "decryptionHeader": fileDecryptionHeader,
  473. "size": fileSize,
  474. },
  475. "thumbnail": {
  476. "objectKey": thumbnailObjectKey,
  477. "decryptionHeader": thumbnailDecryptionHeader,
  478. "size": thumbnailSize,
  479. },
  480. "metadata": {
  481. "encryptedData": encryptedMetadata,
  482. "decryptionHeader": metadataDecryptionHeader,
  483. }
  484. };
  485. try {
  486. final response = await _dio.post(
  487. Configuration.instance.getHttpEndpoint() + "/files",
  488. options: Options(
  489. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  490. data: request,
  491. );
  492. final data = response.data;
  493. file.uploadedFileID = data["id"];
  494. file.updationTime = data["updationTime"];
  495. file.fileDecryptionHeader = fileDecryptionHeader;
  496. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  497. file.metadataDecryptionHeader = metadataDecryptionHeader;
  498. return file;
  499. } on DioError catch (e) {
  500. if (e.response?.statusCode == 426) {
  501. _onStorageLimitExceeded();
  502. } else if (attempt < kMaximumUploadAttempts) {
  503. _logger.info("Update file failed, will retry in 3 seconds");
  504. await Future.delayed(Duration(seconds: 3));
  505. return _updateFile(
  506. file,
  507. fileObjectKey,
  508. fileDecryptionHeader,
  509. fileSize,
  510. thumbnailObjectKey,
  511. thumbnailDecryptionHeader,
  512. thumbnailSize,
  513. encryptedMetadata,
  514. metadataDecryptionHeader,
  515. attempt: attempt + 1,
  516. );
  517. }
  518. rethrow;
  519. }
  520. }
  521. Future<UploadURL> _getUploadURL() async {
  522. if (_uploadURLs.isEmpty) {
  523. await _fetchUploadURLs();
  524. }
  525. return _uploadURLs.removeFirst();
  526. }
  527. Future<void> _uploadURLFetchInProgress;
  528. Future<void> _fetchUploadURLs() async {
  529. _uploadURLFetchInProgress ??= Future<void>(() async {
  530. try {
  531. final response = await _dio.get(
  532. Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
  533. queryParameters: {
  534. "count": min(42, 2 * _queue.length), // m4gic number
  535. },
  536. options: Options(
  537. headers: {"X-Auth-Token": Configuration.instance.getToken()}),
  538. );
  539. final urls = (response.data["urls"] as List)
  540. .map((e) => UploadURL.fromMap(e))
  541. .toList();
  542. _uploadURLs.addAll(urls);
  543. _uploadURLFetchInProgress = null;
  544. } on DioError catch (e) {
  545. _uploadURLFetchInProgress = null;
  546. if (e.response != null) {
  547. if (e.response.statusCode == 402) {
  548. final error = NoActiveSubscriptionError();
  549. clearQueue(error);
  550. throw error;
  551. } else if (e.response.statusCode == 426) {
  552. final error = StorageLimitExceededError();
  553. clearQueue(error);
  554. throw error;
  555. }
  556. }
  557. rethrow;
  558. }
  559. });
  560. return _uploadURLFetchInProgress;
  561. }
  562. void _onStorageLimitExceeded() {
  563. clearQueue(StorageLimitExceededError());
  564. throw StorageLimitExceededError();
  565. }
  566. Future<String> _putFile(
  567. UploadURL uploadURL,
  568. io.File file, {
  569. int contentLength,
  570. int attempt = 1,
  571. }) async {
  572. final fileSize = contentLength ?? file.lengthSync();
  573. final startTime = DateTime.now().millisecondsSinceEpoch;
  574. try {
  575. await _dio.put(
  576. uploadURL.url,
  577. data: file.openRead(),
  578. options: Options(
  579. headers: {
  580. Headers.contentLengthHeader: fileSize,
  581. },
  582. ),
  583. );
  584. _logger.info("Upload speed : " +
  585. (file.lengthSync() /
  586. (DateTime.now().millisecondsSinceEpoch - startTime))
  587. .toString() +
  588. " kilo bytes per second");
  589. return uploadURL.objectKey;
  590. } on DioError catch (e) {
  591. if (e.message.startsWith(
  592. "HttpException: Content size exceeds specified contentLength.") &&
  593. attempt == 1) {
  594. return _putFile(uploadURL, file,
  595. contentLength: file.readAsBytesSync().length, attempt: 2);
  596. } else if (attempt < kMaximumUploadAttempts) {
  597. final newUploadURL = await _getUploadURL();
  598. return _putFile(newUploadURL, file,
  599. contentLength: file.readAsBytesSync().length, attempt: attempt + 1);
  600. } else {
  601. _logger.info(
  602. "Upload failed for file with size " + fileSize.toString(), e);
  603. rethrow;
  604. }
  605. }
  606. }
  607. Future<void> _pollBackgroundUploadStatus() async {
  608. final blockedUploads = _queue.entries
  609. .where((e) => e.value.status == UploadStatus.in_background)
  610. .toList();
  611. for (final upload in blockedUploads) {
  612. final file = upload.value.file;
  613. final isStillLocked = await _uploadLocks.isLocked(
  614. file.localID, ProcessType.background.toString());
  615. if (!isStillLocked) {
  616. final completer = _queue.remove(upload.key).completer;
  617. final dbFile =
  618. await FilesDB.instance.getFile(upload.value.file.generatedID);
  619. if (dbFile.uploadedFileID != null) {
  620. _logger.info("Background upload success detected");
  621. completer.complete(dbFile);
  622. } else {
  623. _logger.info("Background upload failure detected");
  624. completer.completeError(SilentlyCancelUploadsError());
  625. }
  626. }
  627. }
  628. Future.delayed(kBlockedUploadsPollFrequency, () async {
  629. await _pollBackgroundUploadStatus();
  630. });
  631. }
  632. }
  633. class FileUploadItem {
  634. final File file;
  635. final int collectionID;
  636. final Completer<File> completer;
  637. UploadStatus status;
  638. FileUploadItem(
  639. this.file,
  640. this.collectionID,
  641. this.completer, {
  642. this.status = UploadStatus.not_started,
  643. });
  644. }
  645. enum UploadStatus {
  646. not_started,
  647. in_progress,
  648. in_background,
  649. completed,
  650. }
  651. enum ProcessType {
  652. background,
  653. foreground,
  654. }