file_uploader.dart 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
  1. import 'dart:async';
  2. import 'dart:collection';
  3. import 'dart:convert';
  4. import 'dart:io' as io;
  5. import 'dart:math';
  6. import 'dart:typed_data';
  7. import 'package:connectivity/connectivity.dart';
  8. import 'package:dio/dio.dart';
  9. import 'package:flutter_sodium/flutter_sodium.dart';
  10. import 'package:logging/logging.dart';
  11. import 'package:path/path.dart';
  12. import 'package:photos/core/configuration.dart';
  13. import 'package:photos/core/errors.dart';
  14. import 'package:photos/core/event_bus.dart';
  15. import 'package:photos/core/network.dart';
  16. import 'package:photos/db/files_db.dart';
  17. import 'package:photos/db/upload_locks_db.dart';
  18. import 'package:photos/events/files_updated_event.dart';
  19. import 'package:photos/events/local_photos_updated_event.dart';
  20. import 'package:photos/events/subscription_purchased_event.dart';
  21. import 'package:photos/main.dart';
  22. import 'package:photos/models/encryption_result.dart';
  23. import 'package:photos/models/file.dart';
  24. import 'package:photos/models/file_type.dart';
  25. import 'package:photos/models/upload_url.dart';
  26. import 'package:photos/services/collections_service.dart';
  27. import 'package:photos/services/local_sync_service.dart';
  28. import 'package:photos/services/sync_service.dart';
  29. import 'package:photos/utils/crypto_util.dart';
  30. import 'package:photos/utils/file_download_util.dart';
  31. import 'package:photos/utils/file_uploader_util.dart';
  32. import 'package:shared_preferences/shared_preferences.dart';
  33. class FileUploader {
  34. static const kMaximumConcurrentUploads = 4;
  35. static const kMaximumConcurrentVideoUploads = 2;
  36. static const kMaximumThumbnailCompressionAttempts = 2;
  37. static const kMaximumUploadAttempts = 4;
  38. static const kBlockedUploadsPollFrequency = Duration(seconds: 2);
  39. static const kFileUploadTimeout = Duration(minutes: 50);
  40. final _logger = Logger("FileUploader");
  41. final _dio = Network.instance.getDio();
  42. final _queue = LinkedHashMap<String, FileUploadItem>();
  43. final _uploadLocks = UploadLocksDB.instance;
  44. final kSafeBufferForLockExpiry = Duration(days: 1).inMicroseconds;
  45. final kBGTaskDeathTimeout = Duration(seconds: 5).inMicroseconds;
  46. final _uploadURLs = Queue<UploadURL>();
  47. // Maintains the count of files in the current upload session.
  48. // Upload session is the period between the first entry into the _queue and last entry out of the _queue
  49. int _totalCountInUploadSession = 0;
  50. // _uploadCounter indicates number of uploads which are currently in progress
  51. int _uploadCounter = 0;
  52. int _videoUploadCounter = 0;
  53. ProcessType _processType;
  54. bool _isBackground;
  55. SharedPreferences _prefs;
  56. FileUploader._privateConstructor() {
  57. Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
  58. _uploadURLFetchInProgress = null;
  59. });
  60. }
  61. static FileUploader instance = FileUploader._privateConstructor();
  62. Future<void> init(bool isBackground) async {
  63. _prefs = await SharedPreferences.getInstance();
  64. _isBackground = isBackground;
  65. _processType =
  66. isBackground ? ProcessType.background : ProcessType.foreground;
  67. final currentTime = DateTime.now().microsecondsSinceEpoch;
  68. await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
  69. _processType.toString(),
  70. currentTime,
  71. );
  72. await _uploadLocks
  73. .releaseAllLocksAcquiredBefore(currentTime - kSafeBufferForLockExpiry);
  74. if (!isBackground) {
  75. await _prefs.reload();
  76. final isBGTaskDead = (_prefs.getInt(kLastBGTaskHeartBeatTime) ?? 0) <
  77. (currentTime - kBGTaskDeathTimeout);
  78. if (isBGTaskDead) {
  79. await _uploadLocks.releaseLocksAcquiredByOwnerBefore(
  80. ProcessType.background.toString(),
  81. currentTime,
  82. );
  83. _logger.info("BG task was found dead, cleared all locks");
  84. }
  85. _pollBackgroundUploadStatus();
  86. }
  87. Bus.instance.on<LocalPhotosUpdatedEvent>().listen((event) {
  88. if (event.type == EventType.deletedFromDevice ||
  89. event.type == EventType.deletedFromEverywhere) {
  90. removeFromQueueWhere(
  91. (file) {
  92. for (final updatedFile in event.updatedFiles) {
  93. if (file.generatedID == updatedFile.generatedID) {
  94. return true;
  95. }
  96. }
  97. return false;
  98. },
  99. InvalidFileError("File already deleted"),
  100. );
  101. }
  102. });
  103. }
  104. Future<File> upload(File file, int collectionID) {
  105. // If the file hasn't been queued yet, queue it
  106. _totalCountInUploadSession++;
  107. if (!_queue.containsKey(file.localID)) {
  108. final completer = Completer<File>();
  109. _queue[file.localID] = FileUploadItem(file, collectionID, completer);
  110. _pollQueue();
  111. return completer.future;
  112. }
  113. // If the file exists in the queue for a matching collectionID,
  114. // return the existing future
  115. final item = _queue[file.localID];
  116. if (item.collectionID == collectionID) {
  117. _totalCountInUploadSession--;
  118. return item.completer.future;
  119. }
  120. // Else wait for the existing upload to complete,
  121. // and add it to the relevant collection
  122. return item.completer.future.then((uploadedFile) {
  123. return CollectionsService.instance
  124. .addToCollection(collectionID, [uploadedFile]).then((aVoid) {
  125. return uploadedFile;
  126. });
  127. });
  128. }
  129. Future<File> forceUpload(File file, int collectionID) async {
  130. _logger.info(
  131. "Force uploading " +
  132. file.toString() +
  133. " into collection " +
  134. collectionID.toString(),
  135. );
  136. _totalCountInUploadSession++;
  137. // If the file hasn't been queued yet, ez.
  138. if (!_queue.containsKey(file.localID)) {
  139. final completer = Completer<File>();
  140. _queue[file.localID] = FileUploadItem(
  141. file,
  142. collectionID,
  143. completer,
  144. status: UploadStatus.in_progress,
  145. );
  146. _encryptAndUploadFileToCollection(file, collectionID, forcedUpload: true);
  147. return completer.future;
  148. }
  149. var item = _queue[file.localID];
  150. // If the file is being uploaded right now, wait and proceed
  151. if (item.status == UploadStatus.in_progress ||
  152. item.status == UploadStatus.in_background) {
  153. _totalCountInUploadSession--;
  154. final uploadedFile = await item.completer.future;
  155. if (uploadedFile.collectionID == collectionID) {
  156. // Do nothing
  157. } else {
  158. await CollectionsService.instance
  159. .addToCollection(collectionID, [uploadedFile]);
  160. }
  161. return uploadedFile;
  162. } else {
  163. // If the file is yet to be processed,
  164. // 1. Set the status to in_progress
  165. // 2. Force upload the file
  166. // 3. Add to the relevant collection
  167. item = _queue[file.localID];
  168. item.status = UploadStatus.in_progress;
  169. final uploadedFile = await _encryptAndUploadFileToCollection(
  170. file,
  171. collectionID,
  172. forcedUpload: true,
  173. );
  174. if (item.collectionID == collectionID) {
  175. return uploadedFile;
  176. } else {
  177. await CollectionsService.instance
  178. .addToCollection(item.collectionID, [uploadedFile]);
  179. return uploadedFile;
  180. }
  181. }
  182. }
  183. int getCurrentSessionUploadCount() {
  184. return _totalCountInUploadSession;
  185. }
  186. void clearQueue(final Error reason) {
  187. final List<String> uploadsToBeRemoved = [];
  188. _queue.entries
  189. .where((entry) => entry.value.status == UploadStatus.not_started)
  190. .forEach((pendingUpload) {
  191. uploadsToBeRemoved.add(pendingUpload.key);
  192. });
  193. for (final id in uploadsToBeRemoved) {
  194. _queue.remove(id).completer.completeError(reason);
  195. }
  196. _totalCountInUploadSession = 0;
  197. }
  198. void removeFromQueueWhere(final bool Function(File) fn, final Error reason) {
  199. List<String> uploadsToBeRemoved = [];
  200. _queue.entries
  201. .where((entry) => entry.value.status == UploadStatus.not_started)
  202. .forEach((pendingUpload) {
  203. if (fn(pendingUpload.value.file)) {
  204. uploadsToBeRemoved.add(pendingUpload.key);
  205. }
  206. });
  207. for (final id in uploadsToBeRemoved) {
  208. _queue.remove(id).completer.completeError(reason);
  209. }
  210. _totalCountInUploadSession -= uploadsToBeRemoved.length;
  211. }
  212. void _pollQueue() {
  213. if (SyncService.instance.shouldStopSync()) {
  214. clearQueue(SyncStopRequestedError());
  215. }
  216. if (_queue.isEmpty) {
  217. // Upload session completed
  218. _totalCountInUploadSession = 0;
  219. return;
  220. }
  221. if (_uploadCounter < kMaximumConcurrentUploads) {
  222. var pendingEntry = _queue.entries
  223. .firstWhere(
  224. (entry) => entry.value.status == UploadStatus.not_started,
  225. orElse: () => null,
  226. )
  227. ?.value;
  228. if (pendingEntry != null &&
  229. pendingEntry.file.fileType == FileType.video &&
  230. _videoUploadCounter >= kMaximumConcurrentVideoUploads) {
  231. // check if there's any non-video entry which can be queued for upload
  232. pendingEntry = _queue.entries
  233. .firstWhere(
  234. (entry) =>
  235. entry.value.status == UploadStatus.not_started &&
  236. entry.value.file.fileType != FileType.video,
  237. orElse: () => null,
  238. )
  239. ?.value;
  240. }
  241. if (pendingEntry != null) {
  242. pendingEntry.status = UploadStatus.in_progress;
  243. _encryptAndUploadFileToCollection(
  244. pendingEntry.file,
  245. pendingEntry.collectionID,
  246. );
  247. }
  248. }
  249. }
  250. Future<File> _encryptAndUploadFileToCollection(
  251. File file,
  252. int collectionID, {
  253. bool forcedUpload = false,
  254. }) async {
  255. _uploadCounter++;
  256. if (file.fileType == FileType.video) {
  257. _videoUploadCounter++;
  258. }
  259. final localID = file.localID;
  260. try {
  261. final uploadedFile =
  262. await _tryToUpload(file, collectionID, forcedUpload).timeout(
  263. kFileUploadTimeout,
  264. onTimeout: () {
  265. final message = "Upload timed out for file " + file.toString();
  266. _logger.severe(message);
  267. throw TimeoutException(message);
  268. },
  269. );
  270. _queue.remove(localID).completer.complete(uploadedFile);
  271. return uploadedFile;
  272. } catch (e) {
  273. if (e is LockAlreadyAcquiredError) {
  274. _queue[localID].status = UploadStatus.in_background;
  275. return _queue[localID].completer.future;
  276. } else {
  277. _queue.remove(localID).completer.completeError(e);
  278. return null;
  279. }
  280. } finally {
  281. _uploadCounter--;
  282. if (file.fileType == FileType.video) {
  283. _videoUploadCounter--;
  284. }
  285. _pollQueue();
  286. }
  287. }
  288. Future<File> _tryToUpload(
  289. File file,
  290. int collectionID,
  291. bool forcedUpload,
  292. ) async {
  293. final connectivityResult = await (Connectivity().checkConnectivity());
  294. var canUploadUnderCurrentNetworkConditions =
  295. (connectivityResult == ConnectivityResult.wifi ||
  296. Configuration.instance.shouldBackupOverMobileData());
  297. if (!canUploadUnderCurrentNetworkConditions && !forcedUpload) {
  298. throw WiFiUnavailableError();
  299. }
  300. final fileOnDisk = await FilesDB.instance.getFile(file.generatedID);
  301. final wasAlreadyUploaded = fileOnDisk.uploadedFileID != null &&
  302. fileOnDisk.updationTime != -1 &&
  303. fileOnDisk.collectionID == collectionID;
  304. if (wasAlreadyUploaded) {
  305. return fileOnDisk;
  306. }
  307. try {
  308. await _uploadLocks.acquireLock(
  309. file.localID,
  310. _processType.toString(),
  311. DateTime.now().microsecondsSinceEpoch,
  312. );
  313. } catch (e) {
  314. _logger.warning("Lock was already taken for " + file.toString());
  315. throw LockAlreadyAcquiredError();
  316. }
  317. final tempDirectory = Configuration.instance.getTempDirectory();
  318. final encryptedFilePath = tempDirectory +
  319. file.generatedID.toString() +
  320. (_isBackground ? "_bg" : "") +
  321. ".encrypted";
  322. final encryptedThumbnailPath = tempDirectory +
  323. file.generatedID.toString() +
  324. "_thumbnail" +
  325. (_isBackground ? "_bg" : "") +
  326. ".encrypted";
  327. MediaUploadData mediaUploadData;
  328. try {
  329. _logger.info(
  330. "Trying to upload " +
  331. file.toString() +
  332. ", isForced: " +
  333. forcedUpload.toString(),
  334. );
  335. try {
  336. mediaUploadData = await getUploadDataFromEnteFile(file);
  337. } catch (e) {
  338. if (e is InvalidFileError) {
  339. await _onInvalidFileError(file, e);
  340. } else {
  341. rethrow;
  342. }
  343. }
  344. Uint8List key;
  345. bool isUpdatedFile =
  346. file.uploadedFileID != null && file.updationTime == -1;
  347. if (isUpdatedFile) {
  348. _logger.info("File was updated " + file.toString());
  349. key = decryptFileKey(file);
  350. } else {
  351. key = null;
  352. }
  353. if (io.File(encryptedFilePath).existsSync()) {
  354. await io.File(encryptedFilePath).delete();
  355. }
  356. final encryptedFile = io.File(encryptedFilePath);
  357. final fileAttributes = await CryptoUtil.encryptFile(
  358. mediaUploadData.sourceFile.path,
  359. encryptedFilePath,
  360. key: key,
  361. );
  362. var thumbnailData = mediaUploadData.thumbnail;
  363. final encryptedThumbnailData =
  364. await CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
  365. if (io.File(encryptedThumbnailPath).existsSync()) {
  366. await io.File(encryptedThumbnailPath).delete();
  367. }
  368. final encryptedThumbnailFile = io.File(encryptedThumbnailPath);
  369. await encryptedThumbnailFile
  370. .writeAsBytes(encryptedThumbnailData.encryptedData);
  371. final thumbnailUploadURL = await _getUploadURL();
  372. String thumbnailObjectKey =
  373. await _putFile(thumbnailUploadURL, encryptedThumbnailFile);
  374. final fileUploadURL = await _getUploadURL();
  375. String fileObjectKey = await _putFile(fileUploadURL, encryptedFile);
  376. final metadata =
  377. await file.getMetadataForUpload(mediaUploadData.sourceFile);
  378. final encryptedMetadataData = await CryptoUtil.encryptChaCha(
  379. utf8.encode(jsonEncode(metadata)),
  380. fileAttributes.key,
  381. );
  382. final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
  383. final thumbnailDecryptionHeader =
  384. Sodium.bin2base64(encryptedThumbnailData.header);
  385. final encryptedMetadata =
  386. Sodium.bin2base64(encryptedMetadataData.encryptedData);
  387. final metadataDecryptionHeader =
  388. Sodium.bin2base64(encryptedMetadataData.header);
  389. if (SyncService.instance.shouldStopSync()) {
  390. throw SyncStopRequestedError();
  391. }
  392. File remoteFile;
  393. if (isUpdatedFile) {
  394. remoteFile = await _updateFile(
  395. file,
  396. fileObjectKey,
  397. fileDecryptionHeader,
  398. await encryptedFile.length(),
  399. thumbnailObjectKey,
  400. thumbnailDecryptionHeader,
  401. await encryptedThumbnailFile.length(),
  402. encryptedMetadata,
  403. metadataDecryptionHeader,
  404. );
  405. // Update across all collections
  406. await FilesDB.instance.updateUploadedFileAcrossCollections(remoteFile);
  407. } else {
  408. final encryptedFileKeyData = CryptoUtil.encryptSync(
  409. fileAttributes.key,
  410. CollectionsService.instance.getCollectionKey(collectionID),
  411. );
  412. final encryptedKey =
  413. Sodium.bin2base64(encryptedFileKeyData.encryptedData);
  414. final keyDecryptionNonce =
  415. Sodium.bin2base64(encryptedFileKeyData.nonce);
  416. remoteFile = await _uploadFile(
  417. file,
  418. collectionID,
  419. encryptedKey,
  420. keyDecryptionNonce,
  421. fileAttributes,
  422. fileObjectKey,
  423. fileDecryptionHeader,
  424. await encryptedFile.length(),
  425. thumbnailObjectKey,
  426. thumbnailDecryptionHeader,
  427. await encryptedThumbnailFile.length(),
  428. encryptedMetadata,
  429. metadataDecryptionHeader,
  430. );
  431. if (mediaUploadData.isDeleted) {
  432. _logger.info("File found to be deleted");
  433. remoteFile.localID = null;
  434. }
  435. await FilesDB.instance.update(remoteFile);
  436. }
  437. if (!_isBackground) {
  438. Bus.instance.fire(LocalPhotosUpdatedEvent([remoteFile]));
  439. }
  440. _logger.info("File upload complete for " + remoteFile.toString());
  441. return remoteFile;
  442. } catch (e, s) {
  443. if (!(e is NoActiveSubscriptionError ||
  444. e is StorageLimitExceededError ||
  445. e is WiFiUnavailableError ||
  446. e is SilentlyCancelUploadsError ||
  447. e is FileTooLargeForPlanError)) {
  448. _logger.severe("File upload failed for " + file.toString(), e, s);
  449. }
  450. rethrow;
  451. } finally {
  452. if (io.Platform.isIOS &&
  453. mediaUploadData != null &&
  454. mediaUploadData.sourceFile != null) {
  455. await mediaUploadData.sourceFile.delete();
  456. }
  457. if (io.File(encryptedFilePath).existsSync()) {
  458. await io.File(encryptedFilePath).delete();
  459. }
  460. if (io.File(encryptedThumbnailPath).existsSync()) {
  461. await io.File(encryptedThumbnailPath).delete();
  462. }
  463. await _uploadLocks.releaseLock(file.localID, _processType.toString());
  464. }
  465. }
  466. Future _onInvalidFileError(File file, InvalidFileError e) async {
  467. String ext = file.title == null ? "no title" : extension(file.title);
  468. _logger.severe(
  469. "Invalid file: (ext: $ext) encountered: " + file.toString(),
  470. e,
  471. );
  472. await FilesDB.instance.deleteLocalFile(file);
  473. await LocalSyncService.instance.trackInvalidFile(file);
  474. throw e;
  475. }
  476. Future<File> _uploadFile(
  477. File file,
  478. int collectionID,
  479. String encryptedKey,
  480. String keyDecryptionNonce,
  481. EncryptionResult fileAttributes,
  482. String fileObjectKey,
  483. String fileDecryptionHeader,
  484. int fileSize,
  485. String thumbnailObjectKey,
  486. String thumbnailDecryptionHeader,
  487. int thumbnailSize,
  488. String encryptedMetadata,
  489. String metadataDecryptionHeader, {
  490. int attempt = 1,
  491. }) async {
  492. final request = {
  493. "collectionID": collectionID,
  494. "encryptedKey": encryptedKey,
  495. "keyDecryptionNonce": keyDecryptionNonce,
  496. "file": {
  497. "objectKey": fileObjectKey,
  498. "decryptionHeader": fileDecryptionHeader,
  499. "size": fileSize,
  500. },
  501. "thumbnail": {
  502. "objectKey": thumbnailObjectKey,
  503. "decryptionHeader": thumbnailDecryptionHeader,
  504. "size": thumbnailSize,
  505. },
  506. "metadata": {
  507. "encryptedData": encryptedMetadata,
  508. "decryptionHeader": metadataDecryptionHeader,
  509. }
  510. };
  511. try {
  512. final response = await _dio.post(
  513. Configuration.instance.getHttpEndpoint() + "/files",
  514. options: Options(
  515. headers: {"X-Auth-Token": Configuration.instance.getToken()},
  516. ),
  517. data: request,
  518. );
  519. final data = response.data;
  520. file.uploadedFileID = data["id"];
  521. file.collectionID = collectionID;
  522. file.updationTime = data["updationTime"];
  523. file.ownerID = data["ownerID"];
  524. file.encryptedKey = encryptedKey;
  525. file.keyDecryptionNonce = keyDecryptionNonce;
  526. file.fileDecryptionHeader = fileDecryptionHeader;
  527. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  528. file.metadataDecryptionHeader = metadataDecryptionHeader;
  529. return file;
  530. } on DioError catch (e) {
  531. if (e.response?.statusCode == 413) {
  532. throw FileTooLargeForPlanError();
  533. } else if (e.response?.statusCode == 426) {
  534. _onStorageLimitExceeded();
  535. } else if (attempt < kMaximumUploadAttempts) {
  536. _logger.info("Upload file failed, will retry in 3 seconds");
  537. await Future.delayed(Duration(seconds: 3));
  538. return _uploadFile(
  539. file,
  540. collectionID,
  541. encryptedKey,
  542. keyDecryptionNonce,
  543. fileAttributes,
  544. fileObjectKey,
  545. fileDecryptionHeader,
  546. fileSize,
  547. thumbnailObjectKey,
  548. thumbnailDecryptionHeader,
  549. thumbnailSize,
  550. encryptedMetadata,
  551. metadataDecryptionHeader,
  552. attempt: attempt + 1,
  553. );
  554. }
  555. rethrow;
  556. }
  557. }
  558. Future<File> _updateFile(
  559. File file,
  560. String fileObjectKey,
  561. String fileDecryptionHeader,
  562. int fileSize,
  563. String thumbnailObjectKey,
  564. String thumbnailDecryptionHeader,
  565. int thumbnailSize,
  566. String encryptedMetadata,
  567. String metadataDecryptionHeader, {
  568. int attempt = 1,
  569. }) async {
  570. final request = {
  571. "id": file.uploadedFileID,
  572. "file": {
  573. "objectKey": fileObjectKey,
  574. "decryptionHeader": fileDecryptionHeader,
  575. "size": fileSize,
  576. },
  577. "thumbnail": {
  578. "objectKey": thumbnailObjectKey,
  579. "decryptionHeader": thumbnailDecryptionHeader,
  580. "size": thumbnailSize,
  581. },
  582. "metadata": {
  583. "encryptedData": encryptedMetadata,
  584. "decryptionHeader": metadataDecryptionHeader,
  585. }
  586. };
  587. try {
  588. final response = await _dio.put(
  589. Configuration.instance.getHttpEndpoint() + "/files/update",
  590. options: Options(
  591. headers: {"X-Auth-Token": Configuration.instance.getToken()},
  592. ),
  593. data: request,
  594. );
  595. final data = response.data;
  596. file.uploadedFileID = data["id"];
  597. file.updationTime = data["updationTime"];
  598. file.fileDecryptionHeader = fileDecryptionHeader;
  599. file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
  600. file.metadataDecryptionHeader = metadataDecryptionHeader;
  601. return file;
  602. } on DioError catch (e) {
  603. if (e.response?.statusCode == 426) {
  604. _onStorageLimitExceeded();
  605. } else if (attempt < kMaximumUploadAttempts) {
  606. _logger.info("Update file failed, will retry in 3 seconds");
  607. await Future.delayed(Duration(seconds: 3));
  608. return _updateFile(
  609. file,
  610. fileObjectKey,
  611. fileDecryptionHeader,
  612. fileSize,
  613. thumbnailObjectKey,
  614. thumbnailDecryptionHeader,
  615. thumbnailSize,
  616. encryptedMetadata,
  617. metadataDecryptionHeader,
  618. attempt: attempt + 1,
  619. );
  620. }
  621. rethrow;
  622. }
  623. }
  624. Future<UploadURL> _getUploadURL() async {
  625. if (_uploadURLs.isEmpty) {
  626. await fetchUploadURLs(_queue.length);
  627. }
  628. try {
  629. return _uploadURLs.removeFirst();
  630. } catch (e, s) {
  631. if (e is StateError && e.message == 'No element' && _queue.isNotEmpty) {
  632. _logger.warning("Oops, uploadUrls has no element now, fetching again");
  633. return _getUploadURL();
  634. } else {
  635. rethrow;
  636. }
  637. }
  638. }
  639. Future<void> _uploadURLFetchInProgress;
  640. Future<void> fetchUploadURLs(int fileCount) async {
  641. _uploadURLFetchInProgress ??= Future<void>(() async {
  642. try {
  643. final response = await _dio.get(
  644. Configuration.instance.getHttpEndpoint() + "/files/upload-urls",
  645. queryParameters: {
  646. "count": min(42, fileCount * 2), // m4gic number
  647. },
  648. options: Options(
  649. headers: {"X-Auth-Token": Configuration.instance.getToken()},
  650. ),
  651. );
  652. final urls = (response.data["urls"] as List)
  653. .map((e) => UploadURL.fromMap(e))
  654. .toList();
  655. _uploadURLs.addAll(urls);
  656. } on DioError catch (e, s) {
  657. if (e.response != null) {
  658. if (e.response.statusCode == 402) {
  659. final error = NoActiveSubscriptionError();
  660. clearQueue(error);
  661. throw error;
  662. } else if (e.response.statusCode == 426) {
  663. final error = StorageLimitExceededError();
  664. clearQueue(error);
  665. throw error;
  666. } else {
  667. _logger.severe("Could not fetch upload URLs", e, s);
  668. }
  669. }
  670. rethrow;
  671. } finally {
  672. _uploadURLFetchInProgress = null;
  673. }
  674. });
  675. return _uploadURLFetchInProgress;
  676. }
  677. void _onStorageLimitExceeded() {
  678. clearQueue(StorageLimitExceededError());
  679. throw StorageLimitExceededError();
  680. }
  681. Future<String> _putFile(
  682. UploadURL uploadURL,
  683. io.File file, {
  684. int contentLength,
  685. int attempt = 1,
  686. }) async {
  687. final fileSize = contentLength ?? await file.length();
  688. _logger.info(
  689. "Putting object for " +
  690. file.toString() +
  691. " of size: " +
  692. fileSize.toString(),
  693. );
  694. final startTime = DateTime.now().millisecondsSinceEpoch;
  695. try {
  696. await _dio.put(
  697. uploadURL.url,
  698. data: file.openRead(),
  699. options: Options(
  700. headers: {
  701. Headers.contentLengthHeader: fileSize,
  702. },
  703. ),
  704. );
  705. _logger.info(
  706. "Upload speed : " +
  707. (fileSize / (DateTime.now().millisecondsSinceEpoch - startTime))
  708. .toString() +
  709. " kilo bytes per second",
  710. );
  711. return uploadURL.objectKey;
  712. } on DioError catch (e) {
  713. if (e.message.startsWith(
  714. "HttpException: Content size exceeds specified contentLength.",
  715. ) &&
  716. attempt == 1) {
  717. return _putFile(
  718. uploadURL,
  719. file,
  720. contentLength: (await file.readAsBytes()).length,
  721. attempt: 2,
  722. );
  723. } else if (attempt < kMaximumUploadAttempts) {
  724. final newUploadURL = await _getUploadURL();
  725. return _putFile(
  726. newUploadURL,
  727. file,
  728. contentLength: (await file.readAsBytes()).length,
  729. attempt: attempt + 1,
  730. );
  731. } else {
  732. _logger.info(
  733. "Upload failed for file with size " + fileSize.toString(),
  734. e,
  735. );
  736. rethrow;
  737. }
  738. }
  739. }
  740. Future<void> _pollBackgroundUploadStatus() async {
  741. final blockedUploads = _queue.entries
  742. .where((e) => e.value.status == UploadStatus.in_background)
  743. .toList();
  744. for (final upload in blockedUploads) {
  745. final file = upload.value.file;
  746. final isStillLocked = await _uploadLocks.isLocked(
  747. file.localID,
  748. ProcessType.background.toString(),
  749. );
  750. if (!isStillLocked) {
  751. final completer = _queue.remove(upload.key).completer;
  752. final dbFile =
  753. await FilesDB.instance.getFile(upload.value.file.generatedID);
  754. if (dbFile.uploadedFileID != null) {
  755. _logger.info("Background upload success detected");
  756. completer.complete(dbFile);
  757. } else {
  758. _logger.info("Background upload failure detected");
  759. completer.completeError(SilentlyCancelUploadsError());
  760. }
  761. }
  762. }
  763. Future.delayed(kBlockedUploadsPollFrequency, () async {
  764. await _pollBackgroundUploadStatus();
  765. });
  766. }
  767. }
  768. class FileUploadItem {
  769. final File file;
  770. final int collectionID;
  771. final Completer<File> completer;
  772. UploadStatus status;
  773. FileUploadItem(
  774. this.file,
  775. this.collectionID,
  776. this.completer, {
  777. this.status = UploadStatus.not_started,
  778. });
  779. }
  780. enum UploadStatus {
  781. not_started,
  782. in_progress,
  783. in_background,
  784. completed,
  785. }
  786. enum ProcessType {
  787. background,
  788. foreground,
  789. }