Jelajahi Sumber

[FEAT] Resumable Multipart Upload (#1406)

## Description


## Test Cases
Resumed Uploads

- [x] Verify that uploads are resuming
- [x] Verify that on resumption, parts that are already uploaded, we are
not re-uploading them.
- [x] Verify that we are able to download file from another client after
resuming
- [x] Verify that if `multiUpartUploadSize` changes, existing inflight
upload is either discarded or it's continue using old size.
- [x] Verify that if file is modified during upload, then on resumption
the existing upload is discarded
Neeraj Gupta 1 tahun lalu
induk
melakukan
2b58568230

+ 37 - 16
mobile/lib/core/configuration.dart

@@ -72,8 +72,6 @@ class Configuration {
   static const anonymousUserIDKey = "anonymous_user_id";
   static const endPointKey = "endpoint";
 
-  final kTempFolderDeletionTimeBuffer = const Duration(hours: 6).inMicroseconds;
-
   static final _logger = Logger("Configuration");
 
   String? _cachedToken;
@@ -103,20 +101,7 @@ class Configuration {
     _documentsDirectory = (await getApplicationDocumentsDirectory()).path;
     _tempDocumentsDirPath = _documentsDirectory + "/temp/";
     final tempDocumentsDir = Directory(_tempDocumentsDirPath);
-    try {
-      final currentTime = DateTime.now().microsecondsSinceEpoch;
-      if (tempDocumentsDir.existsSync() &&
-          (_preferences.getInt(lastTempFolderClearTimeKey) ?? 0) <
-              (currentTime - kTempFolderDeletionTimeBuffer)) {
-        await tempDocumentsDir.delete(recursive: true);
-        await _preferences.setInt(lastTempFolderClearTimeKey, currentTime);
-        _logger.info("Cleared temp folder");
-      } else {
-        _logger.info("Skipping temp folder clear");
-      }
-    } catch (e) {
-      _logger.warning(e);
-    }
+    await _cleanUpStaleFiles(tempDocumentsDir);
     tempDocumentsDir.createSync(recursive: true);
     final tempDirectoryPath = (await getTemporaryDirectory()).path;
     _thumbnailCacheDirectory = tempDirectoryPath + "/thumbnail-cache";
@@ -144,6 +129,42 @@ class Configuration {
     SuperLogging.setUserID(await _getOrCreateAnonymousUserID()).ignore();
   }
 
+  // _cleanUpStaleFiles deletes all files in the temp directory that are older
+  // than kTempFolderDeletionTimeBuffer except the the temp encrypted files for upload.
+  // Those file are deleted by file uploader after the upload is complete or those
+  // files are not being used / tracked.
+  Future<void> _cleanUpStaleFiles(Directory tempDocumentsDir) async {
+    try {
+      final currentTime = DateTime.now().microsecondsSinceEpoch;
+      if (tempDocumentsDir.existsSync() &&
+          (_preferences.getInt(lastTempFolderClearTimeKey) ?? 0) <
+              (currentTime - tempDirCleanUpInterval)) {
+        int skippedTempUploadFiles = 0;
+        final files = tempDocumentsDir.listSync();
+        for (final file in files) {
+          if (file is File) {
+            if (file.path.contains(uploadTempFilePrefix)) {
+              skippedTempUploadFiles++;
+              continue;
+            }
+            _logger.info("Deleting file: ${file.path}");
+            await file.delete();
+          } else if (file is Directory) {
+            await file.delete(recursive: true);
+          }
+        }
+        await _preferences.setInt(lastTempFolderClearTimeKey, currentTime);
+        _logger.info(
+          "Cleared temp folder except $skippedTempUploadFiles upload files",
+        );
+      } else {
+        _logger.info("Skipping temp folder clear");
+      }
+    } catch (e) {
+      _logger.warning(e);
+    }
+  }
+
   Future<void> logout({bool autoLogout = false}) async {
     if (SyncService.instance.isSyncInProgress()) {
       SyncService.instance.stopSync();

+ 8 - 0
mobile/lib/core/constants.dart

@@ -1,3 +1,5 @@
+import "package:flutter/foundation.dart";
+
 const int thumbnailSmallSize = 256;
 const int thumbnailQuality = 50;
 const int thumbnailLargeSize = 512;
@@ -41,6 +43,7 @@ const supportEmail = 'support@ente.io';
 
 // this is the chunk size of the un-encrypted file which is read and encrypted before uploading it as a single part.
 const multipartPartSize = 20 * 1024 * 1024;
+const multipartPartSizeInternal = 8 * 1024 * 1024;
 
 const kDefaultProductionEndpoint = 'https://api.ente.io';
 
@@ -95,3 +98,8 @@ const blackThumbnailBase64 = '/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAEBAQEBAQEB'
     'KACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAo' +
     'AKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAo' +
     'AKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgAoAKACgD/9k=';
+
+const uploadTempFilePrefix = "upload_file_";
+final tempDirCleanUpInterval = kDebugMode
+    ? const Duration(seconds: 30).inMicroseconds
+    : const Duration(hours: 6).inMicroseconds;

+ 356 - 30
mobile/lib/db/upload_locks_db.dart

@@ -3,16 +3,60 @@ import 'dart:io';
 
 import 'package:path/path.dart';
 import 'package:path_provider/path_provider.dart';
+import "package:photos/module/upload/model/multipart.dart";
 import 'package:sqflite/sqflite.dart';
+import "package:sqflite_migration/sqflite_migration.dart";
 
 class UploadLocksDB {
   static const _databaseName = "ente.upload_locks.db";
-  static const _databaseVersion = 1;
 
-  static const _table = "upload_locks";
-  static const _columnID = "id";
-  static const _columnOwner = "owner";
-  static const _columnTime = "time";
+  static const _uploadLocksTable = (
+    table: "upload_locks",
+    columnID: "id",
+    columnOwner: "owner",
+    columnTime: "time",
+  );
+
+  static const _trackUploadTable = (
+    table: "track_uploads",
+    columnID: "id",
+    columnLocalID: "local_id",
+    columnFileHash: "file_hash",
+    columnCollectionID: "collection_id",
+    columnEncryptedFileName: "encrypted_file_name",
+    columnEncryptedFileSize: "encrypted_file_size",
+    columnEncryptedFileKey: "encrypted_file_key",
+    columnFileEncryptionNonce: "file_encryption_nonce",
+    columnKeyEncryptionNonce: "key_encryption_nonce",
+    columnObjectKey: "object_key",
+    columnCompleteUrl: "complete_url",
+    columnStatus: "status",
+    columnPartSize: "part_size",
+    columnLastAttemptedAt: "last_attempted_at",
+    columnCreatedAt: "created_at",
+  );
+
+  static const _partsTable = (
+    table: "upload_parts",
+    columnObjectKey: "object_key",
+    columnPartNumber: "part_number",
+    columnPartUrl: "part_url",
+    columnPartETag: "part_etag",
+    columnPartStatus: "part_status",
+  );
+
+  static final initializationScript = [
+    ..._createUploadLocksTable(),
+  ];
+
+  static final migrationScripts = [
+    ..._createTrackUploadsTable(),
+  ];
+
+  final dbConfig = MigrationConfig(
+    initializationScript: initializationScript,
+    migrationScripts: migrationScripts,
+  );
 
   UploadLocksDB._privateConstructor();
   static final UploadLocksDB instance = UploadLocksDB._privateConstructor();
@@ -27,44 +71,82 @@ class UploadLocksDB {
     final Directory documentsDirectory =
         await getApplicationDocumentsDirectory();
     final String path = join(documentsDirectory.path, _databaseName);
-    return await openDatabase(
-      path,
-      version: _databaseVersion,
-      onCreate: _onCreate,
-    );
+
+    return await openDatabaseWithMigration(path, dbConfig);
   }
 
-  Future _onCreate(Database db, int version) async {
-    await db.execute(
+  static List<String> _createUploadLocksTable() {
+    return [
       '''
-                CREATE TABLE $_table (
-                  $_columnID TEXT PRIMARY KEY NOT NULL,
-                  $_columnOwner TEXT NOT NULL,
-                  $_columnTime TEXT NOT NULL
+                CREATE TABLE ${_uploadLocksTable.table} (
+                  ${_uploadLocksTable.columnID} TEXT PRIMARY KEY NOT NULL,
+                  ${_uploadLocksTable.columnOwner} TEXT NOT NULL,
+                 ${_uploadLocksTable.columnTime} TEXT NOT NULL
                 )
                 ''',
-    );
+    ];
+  }
+
+  static List<String> _createTrackUploadsTable() {
+    return [
+      '''
+                CREATE TABLE ${_trackUploadTable.table} (
+                  ${_trackUploadTable.columnID} INTEGER PRIMARY KEY,
+                  ${_trackUploadTable.columnLocalID} TEXT NOT NULL,
+                  ${_trackUploadTable.columnFileHash} TEXT NOT NULL,
+                  ${_trackUploadTable.columnCollectionID} INTEGER NOT NULL,
+                  ${_trackUploadTable.columnEncryptedFileName} TEXT NOT NULL,
+                  ${_trackUploadTable.columnEncryptedFileSize} INTEGER NOT NULL,
+                  ${_trackUploadTable.columnEncryptedFileKey} TEXT NOT NULL,
+                  ${_trackUploadTable.columnFileEncryptionNonce} TEXT NOT NULL,
+                  ${_trackUploadTable.columnKeyEncryptionNonce} TEXT NOT NULL,
+                  ${_trackUploadTable.columnObjectKey} TEXT NOT NULL,
+                  ${_trackUploadTable.columnCompleteUrl} TEXT NOT NULL,
+                  ${_trackUploadTable.columnStatus} TEXT DEFAULT '${MultipartStatus.pending.name}' NOT NULL,
+                  ${_trackUploadTable.columnPartSize} INTEGER NOT NULL,
+                  ${_trackUploadTable.columnLastAttemptedAt} INTEGER NOT NULL,
+                  ${_trackUploadTable.columnCreatedAt} INTEGER DEFAULT CURRENT_TIMESTAMP NOT NULL
+                )
+                ''',
+      '''
+                CREATE TABLE ${_partsTable.table} (
+                  ${_partsTable.columnObjectKey} TEXT NOT NULL REFERENCES ${_trackUploadTable.table}(${_trackUploadTable.columnObjectKey}) ON DELETE CASCADE,
+                  ${_partsTable.columnPartNumber} INTEGER NOT NULL,
+                  ${_partsTable.columnPartUrl} TEXT NOT NULL,
+                  ${_partsTable.columnPartETag} TEXT,
+                  ${_partsTable.columnPartStatus} TEXT NOT NULL,
+                  PRIMARY KEY (${_partsTable.columnObjectKey}, ${_partsTable.columnPartNumber})
+                )
+                ''',
+    ];
   }
 
   Future<void> clearTable() async {
     final db = await instance.database;
-    await db.delete(_table);
+    await db.delete(_uploadLocksTable.table);
+    await db.delete(_trackUploadTable.table);
+    await db.delete(_partsTable.table);
   }
 
   Future<void> acquireLock(String id, String owner, int time) async {
     final db = await instance.database;
     final row = <String, dynamic>{};
-    row[_columnID] = id;
-    row[_columnOwner] = owner;
-    row[_columnTime] = time;
-    await db.insert(_table, row, conflictAlgorithm: ConflictAlgorithm.fail);
+    row[_uploadLocksTable.columnID] = id;
+    row[_uploadLocksTable.columnOwner] = owner;
+    row[_uploadLocksTable.columnTime] = time;
+    await db.insert(
+      _uploadLocksTable.table,
+      row,
+      conflictAlgorithm: ConflictAlgorithm.fail,
+    );
   }
 
   Future<bool> isLocked(String id, String owner) async {
     final db = await instance.database;
     final rows = await db.query(
-      _table,
-      where: '$_columnID = ? AND $_columnOwner = ?',
+      _uploadLocksTable.table,
+      where:
+          '${_uploadLocksTable.columnID} = ? AND ${_uploadLocksTable.columnOwner} = ?',
       whereArgs: [id, owner],
     );
     return rows.length == 1;
@@ -73,8 +155,9 @@ class UploadLocksDB {
   Future<int> releaseLock(String id, String owner) async {
     final db = await instance.database;
     return db.delete(
-      _table,
-      where: '$_columnID = ? AND $_columnOwner = ?',
+      _uploadLocksTable.table,
+      where:
+          '${_uploadLocksTable.columnID} = ? AND ${_uploadLocksTable.columnOwner} = ?',
       whereArgs: [id, owner],
     );
   }
@@ -82,8 +165,9 @@ class UploadLocksDB {
   Future<int> releaseLocksAcquiredByOwnerBefore(String owner, int time) async {
     final db = await instance.database;
     return db.delete(
-      _table,
-      where: '$_columnOwner = ? AND $_columnTime < ?',
+      _uploadLocksTable.table,
+      where:
+          '${_uploadLocksTable.columnOwner} = ? AND ${_uploadLocksTable.columnTime} < ?',
       whereArgs: [owner, time],
     );
   }
@@ -91,9 +175,251 @@ class UploadLocksDB {
   Future<int> releaseAllLocksAcquiredBefore(int time) async {
     final db = await instance.database;
     return db.delete(
-      _table,
-      where: '$_columnTime < ?',
+      _uploadLocksTable.table,
+      where: '${_uploadLocksTable.columnTime} < ?',
       whereArgs: [time],
     );
   }
+
+  Future<({String encryptedFileKey, String fileNonce, String keyNonce})>
+      getFileEncryptionData(
+    String localId,
+    String fileHash,
+    int collectionID,
+  ) async {
+    final db = await instance.database;
+
+    final rows = await db.query(
+      _trackUploadTable.table,
+      where: '${_trackUploadTable.columnLocalID} = ?'
+          ' AND ${_trackUploadTable.columnFileHash} = ?'
+          ' AND ${_trackUploadTable.columnCollectionID} = ?',
+      whereArgs: [localId, fileHash, collectionID],
+    );
+
+    if (rows.isEmpty) {
+      throw Exception("No cached links found for $localId and $fileHash");
+    }
+    final row = rows.first;
+
+    return (
+      encryptedFileKey: row[_trackUploadTable.columnEncryptedFileKey] as String,
+      fileNonce: row[_trackUploadTable.columnFileEncryptionNonce] as String,
+      keyNonce: row[_trackUploadTable.columnKeyEncryptionNonce] as String,
+    );
+  }
+
+  Future<void> updateLastAttempted(
+    String localId,
+    String fileHash,
+    int collectionID,
+  ) async {
+    final db = await instance.database;
+    await db.update(
+      _trackUploadTable.table,
+      {
+        _trackUploadTable.columnLastAttemptedAt:
+            DateTime.now().millisecondsSinceEpoch,
+      },
+      where: '${_trackUploadTable.columnLocalID} = ?'
+          ' AND ${_trackUploadTable.columnFileHash} = ?'
+          ' AND ${_trackUploadTable.columnCollectionID} = ?',
+      whereArgs: [
+        localId,
+        fileHash,
+        collectionID,
+      ],
+    );
+  }
+
+  Future<MultipartInfo> getCachedLinks(
+    String localId,
+    String fileHash,
+    int collectionID,
+  ) async {
+    final db = await instance.database;
+    final rows = await db.query(
+      _trackUploadTable.table,
+      where: '${_trackUploadTable.columnLocalID} = ?'
+          ' AND ${_trackUploadTable.columnFileHash} = ?'
+          ' AND ${_trackUploadTable.columnCollectionID} = ?',
+      whereArgs: [localId, fileHash, collectionID],
+    );
+    if (rows.isEmpty) {
+      throw Exception("No cached links found for $localId and $fileHash");
+    }
+    final row = rows.first;
+    final objectKey = row[_trackUploadTable.columnObjectKey] as String;
+    final partsStatus = await db.query(
+      _partsTable.table,
+      where: '${_partsTable.columnObjectKey} = ?',
+      whereArgs: [objectKey],
+    );
+
+    final List<bool> partUploadStatus = [];
+    final List<String> partsURLs = List.generate(
+      partsStatus.length,
+      (index) => "",
+    );
+    final Map<int, String> partETags = {};
+
+    for (final part in partsStatus) {
+      final partNumber = part[_partsTable.columnPartNumber] as int;
+      final partUrl = part[_partsTable.columnPartUrl] as String;
+      final partStatus = part[_partsTable.columnPartStatus] as String;
+      partsURLs[partNumber] = partUrl;
+      if (part[_partsTable.columnPartETag] != null) {
+        partETags[partNumber] = part[_partsTable.columnPartETag] as String;
+      }
+      partUploadStatus.add(partStatus == "uploaded");
+    }
+    final urls = MultipartUploadURLs(
+      objectKey: objectKey,
+      completeURL: row[_trackUploadTable.columnCompleteUrl] as String,
+      partsURLs: partsURLs,
+    );
+
+    return MultipartInfo(
+      urls: urls,
+      status: MultipartStatus.values
+          .byName(row[_trackUploadTable.columnStatus] as String),
+      partUploadStatus: partUploadStatus,
+      partETags: partETags,
+      partSize: row[_trackUploadTable.columnPartSize] as int,
+    );
+  }
+
+  Future<void> createTrackUploadsEntry(
+    String localId,
+    String fileHash,
+    int collectionID,
+    MultipartUploadURLs urls,
+    String encryptedFileName,
+    int fileSize,
+    String fileKey,
+    String fileNonce,
+    String keyNonce, {
+    required int partSize,
+  }) async {
+    final db = await UploadLocksDB.instance.database;
+    final objectKey = urls.objectKey;
+
+    await db.insert(
+      _trackUploadTable.table,
+      {
+        _trackUploadTable.columnLocalID: localId,
+        _trackUploadTable.columnFileHash: fileHash,
+        _trackUploadTable.columnCollectionID: collectionID,
+        _trackUploadTable.columnObjectKey: objectKey,
+        _trackUploadTable.columnCompleteUrl: urls.completeURL,
+        _trackUploadTable.columnEncryptedFileName: encryptedFileName,
+        _trackUploadTable.columnEncryptedFileSize: fileSize,
+        _trackUploadTable.columnEncryptedFileKey: fileKey,
+        _trackUploadTable.columnFileEncryptionNonce: fileNonce,
+        _trackUploadTable.columnKeyEncryptionNonce: keyNonce,
+        _trackUploadTable.columnPartSize: partSize,
+        _trackUploadTable.columnLastAttemptedAt:
+            DateTime.now().millisecondsSinceEpoch,
+      },
+    );
+
+    final partsURLs = urls.partsURLs;
+    final partsLength = partsURLs.length;
+
+    for (int i = 0; i < partsLength; i++) {
+      await db.insert(
+        _partsTable.table,
+        {
+          _partsTable.columnObjectKey: objectKey,
+          _partsTable.columnPartNumber: i,
+          _partsTable.columnPartUrl: partsURLs[i],
+          _partsTable.columnPartStatus: PartStatus.pending.name,
+        },
+      );
+    }
+  }
+
+  Future<void> updatePartStatus(
+    String objectKey,
+    int partNumber,
+    String etag,
+  ) async {
+    final db = await instance.database;
+    await db.update(
+      _partsTable.table,
+      {
+        _partsTable.columnPartStatus: PartStatus.uploaded.name,
+        _partsTable.columnPartETag: etag,
+      },
+      where:
+          '${_partsTable.columnObjectKey} = ? AND ${_partsTable.columnPartNumber} = ?',
+      whereArgs: [objectKey, partNumber],
+    );
+  }
+
+  Future<void> updateTrackUploadStatus(
+    String objectKey,
+    MultipartStatus status,
+  ) async {
+    final db = await instance.database;
+    await db.update(
+      _trackUploadTable.table,
+      {
+        _trackUploadTable.columnStatus: status.name,
+      },
+      where: '${_trackUploadTable.columnObjectKey} = ?',
+      whereArgs: [objectKey],
+    );
+  }
+
+  Future<int> deleteMultipartTrack(
+    String localId,
+  ) async {
+    final db = await instance.database;
+    return await db.delete(
+      _trackUploadTable.table,
+      where: '${_trackUploadTable.columnLocalID} = ?',
+      whereArgs: [localId],
+    );
+  }
+
+  // getFileNameToLastAttemptedAtMap returns a map of encrypted file name to last attempted at time
+  Future<Map<String, int>> getFileNameToLastAttemptedAtMap() {
+    return instance.database.then((db) async {
+      final rows = await db.query(
+        _trackUploadTable.table,
+        columns: [
+          _trackUploadTable.columnEncryptedFileName,
+          _trackUploadTable.columnLastAttemptedAt,
+        ],
+      );
+      final map = <String, int>{};
+      for (final row in rows) {
+        map[row[_trackUploadTable.columnEncryptedFileName] as String] =
+            row[_trackUploadTable.columnLastAttemptedAt] as int;
+      }
+      return map;
+    });
+  }
+
+  Future<String?> getEncryptedFileName(
+    String localId,
+    String fileHash,
+    int collectionID,
+  ) {
+    return instance.database.then((db) async {
+      final rows = await db.query(
+        _trackUploadTable.table,
+        where: '${_trackUploadTable.columnLocalID} = ?'
+            ' AND ${_trackUploadTable.columnFileHash} = ?'
+            ' AND ${_trackUploadTable.columnCollectionID} = ?',
+        whereArgs: [localId, fileHash, collectionID],
+      );
+      if (rows.isEmpty) {
+        return null;
+      }
+      final row = rows.first;
+      return row[_trackUploadTable.columnEncryptedFileName] as String;
+    });
+  }
 }

+ 66 - 0
mobile/lib/module/upload/model/multipart.dart

@@ -0,0 +1,66 @@
+import "package:photos/module/upload/model/xml.dart";
+
+class PartETag extends XmlParsableObject {
+  final int partNumber;
+  final String eTag;
+
+  PartETag(this.partNumber, this.eTag);
+
+  @override
+  String get elementName => "Part";
+
+  @override
+  Map<String, dynamic> toMap() {
+    return {
+      "PartNumber": partNumber,
+      "ETag": eTag,
+    };
+  }
+}
+
+enum MultipartStatus {
+  pending,
+  uploaded,
+  completed,
+}
+
+enum PartStatus {
+  pending,
+  uploaded,
+}
+
+class MultipartInfo {
+  final List<bool>? partUploadStatus;
+  final Map<int, String>? partETags;
+  final int? partSize;
+  final MultipartUploadURLs urls;
+  final MultipartStatus status;
+
+  MultipartInfo({
+    this.partUploadStatus,
+    this.partETags,
+    this.partSize,
+    this.status = MultipartStatus.pending,
+    required this.urls,
+  });
+}
+
+class MultipartUploadURLs {
+  final String objectKey;
+  final List<String> partsURLs;
+  final String completeURL;
+
+  MultipartUploadURLs({
+    required this.objectKey,
+    required this.partsURLs,
+    required this.completeURL,
+  });
+
+  factory MultipartUploadURLs.fromMap(Map<String, dynamic> map) {
+    return MultipartUploadURLs(
+      objectKey: map["urls"]["objectKey"],
+      partsURLs: (map["urls"]["partURLs"] as List).cast<String>(),
+      completeURL: map["urls"]["completeURL"],
+    );
+  }
+}

+ 41 - 0
mobile/lib/module/upload/model/xml.dart

@@ -0,0 +1,41 @@
+// ignore_for_file: implementation_imports
+
+import "package:xml/xml.dart";
+
+// used for classes that can be converted to xml
+abstract class XmlParsableObject {
+  Map<String, dynamic> toMap();
+  String get elementName;
+}
+
+// for converting the response to xml
+String convertJs2Xml(Map<String, dynamic> json) {
+  final builder = XmlBuilder();
+  buildXml(builder, json);
+  return builder.buildDocument().toXmlString(
+        pretty: true,
+        indent: '    ',
+      );
+}
+
+// for building the xml node tree recursively
+void buildXml(XmlBuilder builder, dynamic node) {
+  if (node is Map<String, dynamic>) {
+    node.forEach((key, value) {
+      builder.element(key, nest: () => buildXml(builder, value));
+    });
+  } else if (node is List<dynamic>) {
+    for (var item in node) {
+      buildXml(builder, item);
+    }
+  } else if (node is XmlParsableObject) {
+    builder.element(
+      node.elementName,
+      nest: () {
+        buildXml(builder, node.toMap());
+      },
+    );
+  } else {
+    builder.text(node.toString());
+  }
+}

+ 266 - 0
mobile/lib/module/upload/service/multipart.dart

@@ -0,0 +1,266 @@
+import "dart:io";
+
+import "package:dio/dio.dart";
+import "package:ente_feature_flag/ente_feature_flag.dart";
+import "package:flutter/foundation.dart";
+import "package:logging/logging.dart";
+import "package:photos/core/constants.dart";
+import "package:photos/db/upload_locks_db.dart";
+import "package:photos/models/encryption_result.dart";
+import "package:photos/module/upload/model/multipart.dart";
+import "package:photos/module/upload/model/xml.dart";
+import "package:photos/services/collections_service.dart";
+import "package:photos/utils/crypto_util.dart";
+
+class MultiPartUploader {
+  final Dio _enteDio;
+  final Dio _s3Dio;
+  final UploadLocksDB _db;
+  final FlagService _featureFlagService;
+  late final Logger _logger = Logger("MultiPartUploader");
+
+  MultiPartUploader(
+    this._enteDio,
+    this._s3Dio,
+    this._db,
+    this._featureFlagService,
+  );
+
+  Future<EncryptionResult> getEncryptionResult(
+    String localId,
+    String fileHash,
+    int collectionID,
+  ) async {
+    final collectionKey =
+        CollectionsService.instance.getCollectionKey(collectionID);
+    final result =
+        await _db.getFileEncryptionData(localId, fileHash, collectionID);
+    final encryptedFileKey = CryptoUtil.base642bin(result.encryptedFileKey);
+    final fileNonce = CryptoUtil.base642bin(result.fileNonce);
+
+    final encryptKeyNonce = CryptoUtil.base642bin(result.keyNonce);
+
+    return EncryptionResult(
+      key: CryptoUtil.decryptSync(
+        encryptedFileKey,
+        collectionKey,
+        encryptKeyNonce,
+      ),
+      header: fileNonce,
+    );
+  }
+
+  int get multipartPartSizeForUpload {
+    if (_featureFlagService.internalUser) {
+      return multipartPartSizeInternal;
+    }
+    return multipartPartSize;
+  }
+
+  Future<int> calculatePartCount(int fileSize) async {
+    // Multipart upload is only enabled for internal users
+    // and debug builds till it's battle tested.
+    if (!_featureFlagService.internalUser) return 1;
+
+    final partCount = (fileSize / multipartPartSizeForUpload).ceil();
+    return partCount;
+  }
+
+  Future<MultipartUploadURLs> getMultipartUploadURLs(int count) async {
+    try {
+      assert(
+        _featureFlagService.internalUser,
+        "Multipart upload should not be enabled for external users.",
+      );
+      final response = await _enteDio.get(
+        "/files/multipart-upload-urls",
+        queryParameters: {
+          "count": count,
+        },
+      );
+
+      return MultipartUploadURLs.fromMap(response.data);
+    } on Exception catch (e) {
+      _logger.severe('failed to get multipart url', e);
+      rethrow;
+    }
+  }
+
+  Future<void> createTableEntry(
+    String localId,
+    String fileHash,
+    int collectionID,
+    MultipartUploadURLs urls,
+    String encryptedFileName,
+    int fileSize,
+    Uint8List fileKey,
+    Uint8List fileNonce,
+  ) async {
+    final collectionKey =
+        CollectionsService.instance.getCollectionKey(collectionID);
+
+    final encryptedResult = CryptoUtil.encryptSync(
+      fileKey,
+      collectionKey,
+    );
+
+    await _db.createTrackUploadsEntry(
+      localId,
+      fileHash,
+      collectionID,
+      urls,
+      encryptedFileName,
+      fileSize,
+      CryptoUtil.bin2base64(encryptedResult.encryptedData!),
+      CryptoUtil.bin2base64(fileNonce),
+      CryptoUtil.bin2base64(encryptedResult.nonce!),
+      partSize: multipartPartSizeForUpload,
+    );
+  }
+
+  Future<String> putExistingMultipartFile(
+    File encryptedFile,
+    String localId,
+    String fileHash,
+    int collectionID,
+  ) async {
+    final multipartInfo =
+        await _db.getCachedLinks(localId, fileHash, collectionID);
+    await _db.updateLastAttempted(localId, fileHash, collectionID);
+
+    Map<int, String> etags = multipartInfo.partETags ?? {};
+
+    if (multipartInfo.status == MultipartStatus.pending) {
+      // upload individual parts and get their etags
+      etags = await _uploadParts(multipartInfo, encryptedFile);
+    }
+
+    if (multipartInfo.status != MultipartStatus.completed) {
+      // complete the multipart upload
+      await _completeMultipartUpload(
+        multipartInfo.urls.objectKey,
+        etags,
+        multipartInfo.urls.completeURL,
+      );
+    }
+
+    return multipartInfo.urls.objectKey;
+  }
+
+  Future<String> putMultipartFile(
+    MultipartUploadURLs urls,
+    File encryptedFile,
+  ) async {
+    // upload individual parts and get their etags
+    final etags = await _uploadParts(
+      MultipartInfo(urls: urls),
+      encryptedFile,
+    );
+
+    // complete the multipart upload
+    await _completeMultipartUpload(urls.objectKey, etags, urls.completeURL);
+
+    return urls.objectKey;
+  }
+
+  Future<Map<int, String>> _uploadParts(
+    MultipartInfo partInfo,
+    File encryptedFile,
+  ) async {
+    final partsURLs = partInfo.urls.partsURLs;
+    final partUploadStatus = partInfo.partUploadStatus;
+    final partsLength = partsURLs.length;
+    final etags = partInfo.partETags ?? <int, String>{};
+
+    int i = 0;
+    final partSize = partInfo.partSize ?? multipartPartSizeForUpload;
+
+    // Go to the first part that is not uploaded
+    while (i < (partUploadStatus?.length ?? 0) &&
+        (partUploadStatus?[i] ?? false)) {
+      i++;
+    }
+
+    final int encFileLength = encryptedFile.lengthSync();
+    // Start parts upload
+    int count = 0;
+    while (i < partsLength) {
+      count++;
+      final partURL = partsURLs[i];
+      final isLastPart = i == partsLength - 1;
+      final fileSize = isLastPart ? encFileLength % partSize : partSize;
+      _logger.info(
+        "Uploading part ${i + 1} / $partsLength of size $fileSize bytes (total size $encFileLength).",
+      );
+      if (kDebugMode && count > 3) {
+        throw Exception(
+          'Forced exception to test multipart upload retry mechanism.',
+        );
+      }
+      final response = await _s3Dio.put(
+        partURL,
+        data: encryptedFile.openRead(
+          i * partSize,
+          isLastPart ? null : (i + 1) * partSize,
+        ),
+        options: Options(
+          headers: {
+            Headers.contentLengthHeader: fileSize,
+          },
+        ),
+      );
+
+      final eTag = response.headers.value("etag");
+
+      if (eTag?.isEmpty ?? true) {
+        throw Exception('ETAG_MISSING');
+      }
+
+      etags[i] = eTag!;
+
+      await _db.updatePartStatus(partInfo.urls.objectKey, i, eTag);
+      i++;
+    }
+
+    await _db.updateTrackUploadStatus(
+      partInfo.urls.objectKey,
+      MultipartStatus.uploaded,
+    );
+
+    return etags;
+  }
+
+  Future<void> _completeMultipartUpload(
+    String objectKey,
+    Map<int, String> partEtags,
+    String completeURL,
+  ) async {
+    final body = convertJs2Xml({
+      'CompleteMultipartUpload': partEtags.entries
+          .map(
+            (e) => PartETag(
+              e.key + 1,
+              e.value,
+            ),
+          )
+          .toList(),
+    }).replaceAll('"', '').replaceAll('&quot;', '');
+
+    try {
+      await _s3Dio.post(
+        completeURL,
+        data: body,
+        options: Options(
+          contentType: "text/xml",
+        ),
+      );
+      await _db.updateTrackUploadStatus(
+        objectKey,
+        MultipartStatus.completed,
+      );
+    } catch (e) {
+      Logger("MultipartUpload").severe(e);
+      rethrow;
+    }
+  }
+}

+ 153 - 37
mobile/lib/utils/file_uploader.dart

@@ -2,7 +2,7 @@ import 'dart:async';
 import 'dart:collection';
 import 'dart:convert';
 import 'dart:io';
-import 'dart:math';
+import 'dart:math' as math;
 
 import 'package:collection/collection.dart';
 import 'package:connectivity_plus/connectivity_plus.dart';
@@ -28,6 +28,8 @@ import 'package:photos/models/file/file_type.dart';
 import "package:photos/models/metadata/file_magic.dart";
 import 'package:photos/models/upload_url.dart';
 import "package:photos/models/user_details.dart";
+import "package:photos/module/upload/service/multipart.dart";
+import "package:photos/service_locator.dart";
 import 'package:photos/services/collections_service.dart';
 import "package:photos/services/file_magic_service.dart";
 import 'package:photos/services/local_sync_service.dart';
@@ -37,7 +39,6 @@ import 'package:photos/utils/crypto_util.dart';
 import 'package:photos/utils/file_download_util.dart';
 import 'package:photos/utils/file_uploader_util.dart';
 import "package:photos/utils/file_util.dart";
-import "package:photos/utils/multipart_upload_util.dart";
 import 'package:shared_preferences/shared_preferences.dart';
 import 'package:tuple/tuple.dart';
 import "package:uuid/uuid.dart";
@@ -51,7 +52,7 @@ class FileUploader {
   static const kBlockedUploadsPollFrequency = Duration(seconds: 2);
   static const kFileUploadTimeout = Duration(minutes: 50);
   static const k20MBStorageBuffer = 20 * 1024 * 1024;
-  static const kUploadTempPrefix = "upload_file_";
+  static const _lastStaleFileCleanupTime = "lastStaleFileCleanupTime";
 
   final _logger = Logger("FileUploader");
   final _dio = NetworkClient.instance.getDio();
@@ -79,6 +80,7 @@ class FileUploader {
   // cases, we don't want to clear the stale upload files. See #removeStaleFiles
   // as it can result in clearing files which are still being force uploaded.
   bool _hasInitiatedForceUpload = false;
+  late MultiPartUploader _multiPartUploader;
 
   FileUploader._privateConstructor() {
     Bus.instance.on<SubscriptionPurchasedEvent>().listen((event) {
@@ -114,6 +116,17 @@ class FileUploader {
       // ignore: unawaited_futures
       _pollBackgroundUploadStatus();
     }
+    _multiPartUploader = MultiPartUploader(
+      _enteDio,
+      _dio,
+      UploadLocksDB.instance,
+      flagService,
+    );
+    if (currentTime - (_prefs.getInt(_lastStaleFileCleanupTime) ?? 0) >
+        tempDirCleanUpInterval) {
+      await removeStaleFiles();
+      await _prefs.setInt(_lastStaleFileCleanupTime, currentTime);
+    }
     Bus.instance.on<LocalPhotosUpdatedEvent>().listen((event) {
       if (event.type == EventType.deletedFromDevice ||
           event.type == EventType.deletedFromEverywhere) {
@@ -309,13 +322,28 @@ class FileUploader {
       // ends with .encrypted. Fetch files in async manner
       final files = await Directory(dir).list().toList();
       final filesToDelete = files.where((file) {
-        return file.path.contains(kUploadTempPrefix) &&
+        return file.path.contains(uploadTempFilePrefix) &&
             file.path.contains(".encrypted");
       });
       if (filesToDelete.isNotEmpty) {
-        _logger.info('cleaning up state files ${filesToDelete.length}');
+        _logger.info('Deleting ${filesToDelete.length} stale upload files ');
+        final fileNameToLastAttempt =
+            await _uploadLocks.getFileNameToLastAttemptedAtMap();
         for (final file in filesToDelete) {
-          await file.delete();
+          final fileName = file.path.split('/').last;
+          final lastAttemptTime = fileNameToLastAttempt[fileName] != null
+              ? DateTime.fromMillisecondsSinceEpoch(
+                  fileNameToLastAttempt[fileName]!,
+                )
+              : null;
+          if (lastAttemptTime == null ||
+              DateTime.now().difference(lastAttemptTime).inDays > 1) {
+            await file.delete();
+          } else {
+            _logger.info(
+              'Skipping file $fileName as it was attempted recently on $lastAttemptTime',
+            );
+          }
         }
       }
 
@@ -405,7 +433,7 @@ class FileUploader {
           (fileOnDisk.updationTime ?? -1) != -1 &&
           (fileOnDisk.collectionID ?? -1) == collectionID;
       if (wasAlreadyUploaded) {
-        debugPrint("File is already uploaded ${fileOnDisk.tag}");
+        _logger.info("File is already uploaded ${fileOnDisk.tag}");
         return fileOnDisk;
       }
     }
@@ -425,6 +453,7 @@ class FileUploader {
     }
 
     final String lockKey = file.localID!;
+    bool _isMultipartUpload = false;
 
     try {
       await _uploadLocks.acquireLock(
@@ -438,12 +467,27 @@ class FileUploader {
     }
 
     final tempDirectory = Configuration.instance.getTempDirectory();
+    MediaUploadData? mediaUploadData;
+    mediaUploadData = await getUploadDataFromEnteFile(file);
+
+    final String? existingMultipartEncFileName =
+        mediaUploadData.hashData?.fileHash != null
+            ? await _uploadLocks.getEncryptedFileName(
+                lockKey,
+                mediaUploadData.hashData!.fileHash!,
+                collectionID,
+              )
+            : null;
+    bool multipartEntryExists = existingMultipartEncFileName != null;
+
     final String uniqueID = const Uuid().v4().toString();
-    final encryptedFilePath =
-        '$tempDirectory$kUploadTempPrefix${uniqueID}_file.encrypted';
+
+    final encryptedFilePath = multipartEntryExists
+        ? '$tempDirectory$existingMultipartEncFileName'
+        : '$tempDirectory$uploadTempFilePrefix${uniqueID}_file.encrypted';
     final encryptedThumbnailPath =
-        '$tempDirectory$kUploadTempPrefix${uniqueID}_thumb.encrypted';
-    MediaUploadData? mediaUploadData;
+        '$tempDirectory$uploadTempFilePrefix${uniqueID}_thumb.encrypted';
+
     var uploadCompleted = false;
     // This flag is used to decide whether to clear the iOS origin file cache
     // or not.
@@ -457,13 +501,18 @@ class FileUploader {
         '${isUpdatedFile ? 're-upload' : 'upload'} of ${file.toString()}',
       );
 
-      mediaUploadData = await getUploadDataFromEnteFile(file);
-
       Uint8List? key;
+      EncryptionResult? multiPartFileEncResult = multipartEntryExists
+          ? await _multiPartUploader.getEncryptionResult(
+              lockKey,
+              mediaUploadData.hashData!.fileHash!,
+              collectionID,
+            )
+          : null;
       if (isUpdatedFile) {
         key = getFileKey(file);
       } else {
-        key = null;
+        key = multiPartFileEncResult?.key;
         // check if the file is already uploaded and can be mapped to existing
         // uploaded file. If map is found, it also returns the corresponding
         // mapped or update file entry.
@@ -482,16 +531,40 @@ class FileUploader {
         }
       }
 
-      if (File(encryptedFilePath).existsSync()) {
+      final encryptedFileExists = File(encryptedFilePath).existsSync();
+
+      // If the multipart entry exists but the encrypted file doesn't, it means
+      // that we'll have to reupload as the nonce is lost
+      if (multipartEntryExists) {
+        final bool updateWithDiffKey = isUpdatedFile &&
+            multiPartFileEncResult != null &&
+            !listEquals(key, multiPartFileEncResult.key);
+        if (!encryptedFileExists || updateWithDiffKey) {
+          if (updateWithDiffKey) {
+            _logger.severe('multiPart update resumed with differentKey');
+          } else {
+            _logger.warning(
+              'multiPart EncryptedFile missing, discard multipart entry',
+            );
+          }
+          await _uploadLocks.deleteMultipartTrack(lockKey);
+          multipartEntryExists = false;
+          multiPartFileEncResult = null;
+        }
+      } else if (encryptedFileExists) {
+        // otherwise just delete the file for singlepart upload
         await File(encryptedFilePath).delete();
       }
       await _checkIfWithinStorageLimit(mediaUploadData.sourceFile!);
       final encryptedFile = File(encryptedFilePath);
-      final EncryptionResult fileAttributes = await CryptoUtil.encryptFile(
-        mediaUploadData.sourceFile!.path,
-        encryptedFilePath,
-        key: key,
-      );
+
+      final EncryptionResult fileAttributes = multiPartFileEncResult ??
+          await CryptoUtil.encryptFile(
+            mediaUploadData.sourceFile!.path,
+            encryptedFilePath,
+            key: key,
+          );
+
       late final Uint8List? thumbnailData;
       if (mediaUploadData.thumbnail == null &&
           file.fileType == FileType.video) {
@@ -512,31 +585,63 @@ class FileUploader {
       await encryptedThumbnailFile
           .writeAsBytes(encryptedThumbnailData.encryptedData!);
 
-      final thumbnailUploadURL = await _getUploadURL();
-      final String thumbnailObjectKey =
-          await _putFile(thumbnailUploadURL, encryptedThumbnailFile);
-
-      // Calculate the number of parts for the file. Multiple part upload
-      // is only enabled for internal users and debug builds till it's battle tested.
-      final count = kDebugMode
-          ? await calculatePartCount(
-              await encryptedFile.length(),
-            )
-          : 1;
+      // Calculate the number of parts for the file.
+      final count = await _multiPartUploader.calculatePartCount(
+        await encryptedFile.length(),
+      );
 
       late String fileObjectKey;
+      late String thumbnailObjectKey;
 
       if (count <= 1) {
+        final thumbnailUploadURL = await _getUploadURL();
+        thumbnailObjectKey =
+            await _putFile(thumbnailUploadURL, encryptedThumbnailFile);
         final fileUploadURL = await _getUploadURL();
         fileObjectKey = await _putFile(fileUploadURL, encryptedFile);
       } else {
-        final fileUploadURLs = await getMultipartUploadURLs(count);
-        fileObjectKey = await putMultipartFile(fileUploadURLs, encryptedFile);
+        _isMultipartUpload = true;
+        _logger.finest(
+          "Init multipartUpload $multipartEntryExists, isUpdate $isUpdatedFile",
+        );
+        if (multipartEntryExists) {
+          fileObjectKey = await _multiPartUploader.putExistingMultipartFile(
+            encryptedFile,
+            lockKey,
+            mediaUploadData.hashData!.fileHash!,
+            collectionID,
+          );
+        } else {
+          final fileUploadURLs =
+              await _multiPartUploader.getMultipartUploadURLs(count);
+          final encFileName = encryptedFile.path.split('/').last;
+          await _multiPartUploader.createTableEntry(
+            lockKey,
+            mediaUploadData.hashData!.fileHash!,
+            collectionID,
+            fileUploadURLs,
+            encFileName,
+            await encryptedFile.length(),
+            fileAttributes.key!,
+            fileAttributes.header!,
+          );
+          fileObjectKey = await _multiPartUploader.putMultipartFile(
+            fileUploadURLs,
+            encryptedFile,
+          );
+        }
+        // in case of multipart, upload the thumbnail towards the end to avoid
+        // re-uploading the thumbnail in case of failure.
+        // In regular upload, always upload the thumbnail first to keep existing behaviour
+        //
+        final thumbnailUploadURL = await _getUploadURL();
+        thumbnailObjectKey =
+            await _putFile(thumbnailUploadURL, encryptedThumbnailFile);
       }
 
       final metadata = await file.getMetadataForUpload(mediaUploadData);
       final encryptedMetadataResult = await CryptoUtil.encryptChaCha(
-        utf8.encode(jsonEncode(metadata)) as Uint8List,
+        utf8.encode(jsonEncode(metadata)),
         fileAttributes.key!,
       );
       final fileDecryptionHeader =
@@ -618,6 +723,8 @@ class FileUploader {
         }
         await FilesDB.instance.update(remoteFile);
       }
+      await UploadLocksDB.instance.deleteMultipartTrack(lockKey);
+
       if (!_isBackground) {
         Bus.instance.fire(
           LocalPhotosUpdatedEvent(
@@ -659,6 +766,7 @@ class FileUploader {
         encryptedFilePath,
         encryptedThumbnailPath,
         lockKey: lockKey,
+        isMultiPartUpload: _isMultipartUpload,
       );
     }
   }
@@ -803,6 +911,7 @@ class FileUploader {
     String encryptedFilePath,
     String encryptedThumbnailPath, {
     required String lockKey,
+    bool isMultiPartUpload = false,
   }) async {
     if (mediaUploadData != null && mediaUploadData.sourceFile != null) {
       // delete the file from app's internal cache if it was copied to app
@@ -816,7 +925,14 @@ class FileUploader {
       }
     }
     if (File(encryptedFilePath).existsSync()) {
-      await File(encryptedFilePath).delete();
+      if (isMultiPartUpload && !uploadCompleted) {
+        _logger.fine(
+          "skip delete for multipart encrypted file $encryptedFilePath",
+        );
+      } else {
+        _logger.fine("deleting encrypted file $encryptedFilePath");
+        await File(encryptedFilePath).delete();
+      }
     }
     if (File(encryptedThumbnailPath).existsSync()) {
       await File(encryptedThumbnailPath).delete();
@@ -1039,7 +1155,7 @@ class FileUploader {
     if (_uploadURLs.isEmpty) {
       // the queue is empty, fetch at least for one file to handle force uploads
       // that are not in the queue. This is to also avoid
-      await fetchUploadURLs(max(_queue.length, 1));
+      await fetchUploadURLs(math.max(_queue.length, 1));
     }
     try {
       return _uploadURLs.removeFirst();
@@ -1061,7 +1177,7 @@ class FileUploader {
         final response = await _enteDio.get(
           "/files/upload-urls",
           queryParameters: {
-            "count": min(42, fileCount * 2), // m4gic number
+            "count": math.min(42, fileCount * 2), // m4gic number
           },
         );
         final urls = (response.data["urls"] as List)

+ 1 - 1
mobile/lib/utils/multipart_upload_util.dart

@@ -6,8 +6,8 @@ import "package:dio/dio.dart";
 import "package:logging/logging.dart";
 import "package:photos/core/constants.dart";
 import "package:photos/core/network/network.dart";
+import 'package:photos/module/upload/model/xml.dart';
 import "package:photos/service_locator.dart";
-import "package:photos/utils/xml_parser_util.dart";
 
 final _enteDio = NetworkClient.instance.enteDio;
 final _dio = NetworkClient.instance.getDio();

+ 0 - 40
mobile/lib/utils/xml_parser_util.dart

@@ -1,41 +1 @@
-// ignore_for_file: implementation_imports
 
-import "package:xml/xml.dart";
-
-// used for classes that can be converted to xml
-abstract class XmlParsableObject {
-  Map<String, dynamic> toMap();
-  String get elementName;
-}
-
-// for converting the response to xml
-String convertJs2Xml(Map<String, dynamic> json) {
-  final builder = XmlBuilder();
-  buildXml(builder, json);
-  return builder.buildDocument().toXmlString(
-        pretty: true,
-        indent: '    ',
-      );
-}
-
-// for building the xml node tree recursively
-void buildXml(XmlBuilder builder, dynamic node) {
-  if (node is Map<String, dynamic>) {
-    node.forEach((key, value) {
-      builder.element(key, nest: () => buildXml(builder, value));
-    });
-  } else if (node is List<dynamic>) {
-    for (var item in node) {
-      buildXml(builder, item);
-    }
-  } else if (node is XmlParsableObject) {
-    builder.element(
-      node.elementName,
-      nest: () {
-        buildXml(builder, node.toMap());
-      },
-    );
-  } else {
-    builder.text(node.toString());
-  }
-}