Enable parallel uploads

This commit is contained in:
Vishnu Mohandas 2020-11-09 17:58:43 +05:30
parent 4cec4aa540
commit e825798433
5 changed files with 124 additions and 90 deletions

View file

@ -147,7 +147,7 @@ class CollectionsService {
Future<List<Collection>> _fetchCollections(int sinceTime) {
return Dio()
.get(
Configuration.instance.getHttpEndpoint() + "/collections/",
Configuration.instance.getHttpEndpoint() + "/collections",
queryParameters: {
"sinceTime": sinceTime,
},
@ -265,7 +265,7 @@ class CollectionsService {
Future<Collection> createAndCacheCollection(Collection collection) async {
return Dio()
.post(
Configuration.instance.getHttpEndpoint() + "/collections/",
Configuration.instance.getHttpEndpoint() + "/collections",
data: collection.toMap(),
options:
Options(headers: {"X-Auth-Token": Configuration.instance.getToken()}),

View file

@ -35,7 +35,7 @@ class FavoritesService {
final collectionID = await _getOrCreateFavoriteCollectionID();
if (file.uploadedFileID == null) {
file.collectionID = collectionID;
final uploadedFile = (await _fileUploader.encryptAndUploadFile(file));
final uploadedFile = (await _fileUploader.forceUpload(file));
await _filesDB.update(uploadedFile);
} else {
await _collectionsService.addToCollection(collectionID, [file]);

View file

@ -131,8 +131,8 @@ class SyncService {
for (final collection in collections) {
await _fetchEncryptedFilesDiff(collection.id);
}
await _uploadDiff();
await deleteFilesOnServer();
await _uploadDiff();
}
Future<void> _fetchEncryptedFilesDiff(int collectionID) async {
@ -169,6 +169,7 @@ class SyncService {
final foldersToBackUp = Configuration.instance.getPathsToBackUp();
List<File> filesToBeUploaded =
await _db.getFilesToBeUploadedWithinFolders(foldersToBackUp);
final futures = List<Future>();
for (int i = 0; i < filesToBeUploaded.length; i++) {
if (_syncStopRequested) {
_syncStopRequested = false;
@ -185,36 +186,46 @@ class SyncService {
file.collectionID = (await CollectionsService.instance
.getOrCreateForPath(file.deviceFolder))
.id;
final existingFile = await _db.getFile(file.generatedID);
if (existingFile == null) {
final currentFile = await _db.getFile(file.generatedID);
if (currentFile == null) {
// File was deleted locally while being uploaded
await _deleteFileOnServer(file.uploadedFileID);
continue;
}
if (existingFile.uploadedFileID != null) {
Future<void> future;
if (currentFile.uploadedFileID != null) {
// The file was uploaded outside this loop
// Eg: Addition to an album or favorites
await CollectionsService.instance
.addToCollection(file.collectionID, [existingFile]);
} else if (_uploader.getCurrentUploadStatus(file.generatedID) != null) {
// The file is currently being uploaded outside this loop
// Eg: Addition to an album or favorites
await _uploader.getCurrentUploadStatus(file.generatedID);
await CollectionsService.instance
.addToCollection(file.collectionID, [existingFile]);
future = CollectionsService.instance
.addToCollection(file.collectionID, [currentFile]);
} else {
final uploadedFile = await _uploader.encryptAndUploadFile(file);
await _db.update(uploadedFile);
if (_uploader.getCurrentUploadStatus(file) != null) {
// The file is currently being uploaded outside this loop
// Eg: Addition to an album or favorites
future = _uploader
.getCurrentUploadStatus(file)
.then((uploadedFile) async {
await CollectionsService.instance
.addToCollection(file.collectionID, [uploadedFile]);
});
} else {
future = _uploader.addToQueue(file).then((uploadedFile) async {
await _db.update(uploadedFile);
});
}
}
Bus.instance
.fire(CollectionUpdatedEvent(collectionID: file.collectionID));
Bus.instance.fire(PhotoUploadEvent(
completed: i + 1, total: filesToBeUploaded.length));
futures.add(future.then((value) {
Bus.instance
.fire(CollectionUpdatedEvent(collectionID: file.collectionID));
Bus.instance.fire(PhotoUploadEvent(
completed: i + 1, total: filesToBeUploaded.length));
}));
} catch (e) {
Bus.instance.fire(PhotoUploadEvent(hasError: true));
throw e;
}
}
await Future.wait(futures);
}
Future _storeDiff(List<File> diff, int collectionID) async {

View file

@ -224,7 +224,7 @@ class _CreateCollectionPageState extends State<CreateCollectionPage> {
if (file.uploadedFileID == null) {
file.collectionID = collectionID;
final uploadedFile =
(await FileUploader.instance.encryptAndUploadFile(file));
(await FileUploader.instance.forceUpload(file));
await FilesDB.instance.update(uploadedFile);
files.add(uploadedFile);
} else {

View file

@ -1,3 +1,5 @@
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io' as io;
import 'package:dio/dio.dart';
@ -10,61 +12,49 @@ import 'package:photos/models/location.dart';
import 'package:photos/models/upload_url.dart';
import 'package:photos/services/collections_service.dart';
import 'package:photos/utils/crypto_util.dart';
import 'package:photos/utils/file_name_util.dart';
import 'package:photos/utils/file_util.dart';
class FileUploader {
final _logger = Logger("FileUploader");
final _dio = Dio();
final _currentlyUploading = Map<int, Future<File>>();
final _queue = LinkedHashMap<int, FileUploadItem>();
final _maximumConcurrentUploads = 4;
int _currentlyUploading = 0;
FileUploader._privateConstructor();
static FileUploader instance = FileUploader._privateConstructor();
Future<File> getCurrentUploadStatus(int generatedID) {
return _currentlyUploading[generatedID];
Future<File> addToQueue(File file) {
if (_queue[file.generatedID] == null) {
_queue[file.generatedID] = FileUploadItem(file, Completer<File>());
_pollQueue();
}
return _queue[file.generatedID].completer.future;
}
Future<UploadURL> getUploadURL() {
return Dio()
.get(
Configuration.instance.getHttpEndpoint() + "/files/upload-url",
options: Options(
headers: {"X-Auth-Token": Configuration.instance.getToken()}),
)
.then((response) => UploadURL.fromMap(response.data));
Future<File> getCurrentUploadStatus(File file) {
return _queue[file.generatedID]?.completer?.future;
}
Future<String> putFile(UploadURL uploadURL, io.File file) async {
final fileSize = file.lengthSync().toString();
final startTime = DateTime.now().millisecondsSinceEpoch;
_logger.info("Putting file of size " + fileSize + " to " + uploadURL.url);
return Dio()
.put(uploadURL.url,
data: file.openRead(),
options: Options(headers: {
Headers.contentLengthHeader: await file.length(),
}))
.catchError((e) {
_logger.severe(e);
throw e;
}).then((value) {
_logger.info("Upload speed : " +
(file.lengthSync() /
(DateTime.now().millisecondsSinceEpoch - startTime))
.toString() +
" kilo bytes per second");
return uploadURL.objectKey;
});
Future<File> forceUpload(File file) async {
return _encryptAndUploadFile(file, forcedUpload: true);
}
Future<File> encryptAndUploadFile(File file) async {
_currentlyUploading[file.generatedID] = _encryptAndUploadFile(file);
return _currentlyUploading[file.generatedID];
void _pollQueue() {
if (_queue.length > 0 && _currentlyUploading < _maximumConcurrentUploads) {
final firstPendingEntry = _queue.entries
.firstWhere((entry) => entry.value.status == UploadStatus.not_started)
.value;
firstPendingEntry.status = UploadStatus.in_progress;
_encryptAndUploadFile(firstPendingEntry.file);
}
}
Future<File> _encryptAndUploadFile(File file) async {
Future<File> _encryptAndUploadFile(File file,
{bool forcedUpload = false}) async {
_logger.info("Uploading " + file.toString());
if (!forcedUpload) {
_currentlyUploading++;
}
final encryptedFileName = file.generatedID.toString() + ".encrypted";
final tempDirectory = Configuration.instance.getTempDirectory();
final encryptedFilePath = tempDirectory + encryptedFileName;
@ -74,8 +64,8 @@ class FileUploader {
final fileAttributes =
await CryptoUtil.encryptFile(sourceFile.path, encryptedFilePath);
final fileUploadURL = await getUploadURL();
String fileObjectKey = await putFile(fileUploadURL, encryptedFile);
final fileUploadURL = await _getUploadURL();
String fileObjectKey = await _putFile(fileUploadURL, encryptedFile);
final thumbnailData = (await (await file.getAsset()).thumbDataWithSize(
THUMBNAIL_LARGE_SIZE,
@ -85,14 +75,14 @@ class FileUploader {
final encryptedThumbnailName =
file.generatedID.toString() + "_thumbnail.encrypted";
final encryptedThumbnailPath = tempDirectory + encryptedThumbnailName;
final encryptedThumbnail =
final encryptedThumbnailData =
CryptoUtil.encryptChaCha(thumbnailData, fileAttributes.key);
io.File(encryptedThumbnailPath)
.writeAsBytesSync(encryptedThumbnail.encryptedData);
final encryptedThumbnail = io.File(encryptedThumbnailPath);
encryptedThumbnail.writeAsBytesSync(encryptedThumbnailData.encryptedData);
final thumbnailUploadURL = await getUploadURL();
final thumbnailUploadURL = await _getUploadURL();
String thumbnailObjectKey =
await putFile(thumbnailUploadURL, io.File(encryptedThumbnailPath));
await _putFile(thumbnailUploadURL, encryptedThumbnail);
// h4ck to fetch location data if missing (thank you Android Q+) lazily only during uploads
if (file.location.latitude == 0 && file.location.longitude == 0) {
@ -112,7 +102,7 @@ class FileUploader {
final keyDecryptionNonce = Sodium.bin2base64(encryptedFileKeyData.nonce);
final fileDecryptionHeader = Sodium.bin2base64(fileAttributes.header);
final thumbnailDecryptionHeader =
Sodium.bin2base64(encryptedThumbnail.header);
Sodium.bin2base64(encryptedThumbnailData.header);
final encryptedMetadata =
Sodium.bin2base64(encryptedMetadataData.encryptedData);
final metadataDecryptionHeader =
@ -144,7 +134,7 @@ class FileUploader {
)
.then((response) {
encryptedFile.deleteSync();
io.File(encryptedThumbnailPath).deleteSync();
encryptedThumbnail.deleteSync();
final data = response.data;
file.uploadedFileID = data["id"];
file.updationTime = data["updationTime"];
@ -154,31 +144,64 @@ class FileUploader {
file.fileDecryptionHeader = fileDecryptionHeader;
file.thumbnailDecryptionHeader = thumbnailDecryptionHeader;
file.metadataDecryptionHeader = metadataDecryptionHeader;
_currentlyUploading.remove(file.generatedID);
if (!forcedUpload) {
_currentlyUploading--;
_queue[file.generatedID].completer.complete(file);
_queue.remove(file.generatedID);
_pollQueue();
}
return file;
});
}
Future<File> uploadFile(File localPhoto) async {
final title = getJPGFileNameForHEIC(localPhoto);
final formData = FormData.fromMap({
"file": MultipartFile.fromBytes(await getBytesFromDisk(localPhoto),
filename: title),
"deviceFileID": localPhoto.localID,
"deviceFolder": localPhoto.deviceFolder,
"title": title,
"creationTime": localPhoto.creationTime,
"modificationTime": localPhoto.modificationTime,
});
return _dio
.post(
Configuration.instance.getHttpEndpoint() + "/files",
options:
Options(headers: {"X-Auth-Token": Configuration.instance.getToken()}),
data: formData,
)
.then((response) {
return File.fromJson(response.data);
Future<UploadURL> _getUploadURL() {
return Dio()
.get(
Configuration.instance.getHttpEndpoint() + "/files/upload-url",
options: Options(
headers: {"X-Auth-Token": Configuration.instance.getToken()}),
)
.then((response) => UploadURL.fromMap(response.data));
}
Future<String> _putFile(UploadURL uploadURL, io.File file) async {
final fileSize = file.lengthSync().toString();
final startTime = DateTime.now().millisecondsSinceEpoch;
_logger.info("Putting file of size " + fileSize + " to " + uploadURL.url);
return Dio()
.put(uploadURL.url,
data: file.openRead(),
options: Options(headers: {
Headers.contentLengthHeader: await file.length(),
}))
.catchError((e) {
_logger.severe(e);
throw e;
}).then((value) {
_logger.info("Upload speed : " +
(file.lengthSync() /
(DateTime.now().millisecondsSinceEpoch - startTime))
.toString() +
" kilo bytes per second");
return uploadURL.objectKey;
});
}
}
class FileUploadItem {
final File file;
final Completer<File> completer;
UploadStatus status;
FileUploadItem(
this.file,
this.completer, {
this.status = UploadStatus.not_started,
});
}
enum UploadStatus {
not_started,
in_progress,
completed,
}