Fix race conditions in the upload flow

This commit is contained in:
Vishnu Mohandas 2020-11-09 21:40:43 +05:30
parent 0ed7ca3977
commit 433e11882a
4 changed files with 80 additions and 43 deletions

View file

@ -36,9 +36,7 @@ class FavoritesService {
Future<void> addToFavorites(File file) async {
final collectionID = await _getOrCreateFavoriteCollectionID();
if (file.uploadedFileID == null) {
file.collectionID = collectionID;
final uploadedFile = (await _fileUploader.forceUpload(file));
await _filesDB.update(uploadedFile);
await _fileUploader.forceUpload(file, collectionID);
Bus.instance.fire(CollectionUpdatedEvent(collectionID: collectionID));
} else {
await _collectionsService.addToCollection(collectionID, [file]);

View file

@ -183,38 +183,16 @@ class SyncService {
}
}
try {
file.collectionID = (await CollectionsService.instance
final collectionID = (await CollectionsService.instance
.getOrCreateForPath(file.deviceFolder))
.id;
final currentFile = await _db.getFile(file.generatedID);
Future<void> future;
if (currentFile.uploadedFileID != null) {
// The file was uploaded outside this loop
// Eg: Addition to an album or favorites
future = CollectionsService.instance
.addToCollection(file.collectionID, [currentFile]);
} else {
if (_uploader.getCurrentUploadStatus(file) != null) {
// The file is currently being uploaded outside this loop
// Eg: Addition to an album or favorites
future = _uploader
.getCurrentUploadStatus(file)
.then((uploadedFile) async {
await CollectionsService.instance
.addToCollection(file.collectionID, [uploadedFile]);
});
} else {
future = _uploader.addToQueue(file).then((uploadedFile) async {
await _db.update(uploadedFile);
});
}
}
futures.add(future.then((value) {
final future = _uploader.upload(file, collectionID).then((value) {
Bus.instance
.fire(CollectionUpdatedEvent(collectionID: file.collectionID));
Bus.instance.fire(PhotoUploadEvent(
completed: i + 1, total: filesToBeUploaded.length));
}));
});
futures.add(future);
} catch (e) {
Bus.instance.fire(PhotoUploadEvent(hasError: true));
throw e;

View file

@ -222,10 +222,8 @@ class _CreateCollectionPageState extends State<CreateCollectionPage> {
final files = List<File>();
for (final file in widget.selectedFiles.files) {
if (file.uploadedFileID == null) {
file.collectionID = collectionID;
final uploadedFile =
(await FileUploader.instance.forceUpload(file));
await FilesDB.instance.update(uploadedFile);
(await FileUploader.instance.forceUpload(file, collectionID));
files.add(uploadedFile);
} else {
files.add(file);

View file

@ -7,6 +7,7 @@ import 'package:flutter_sodium/flutter_sodium.dart';
import 'package:logging/logging.dart';
import 'package:photos/core/configuration.dart';
import 'package:photos/core/constants.dart';
import 'package:photos/db/files_db.dart';
import 'package:photos/models/file.dart';
import 'package:photos/models/location.dart';
import 'package:photos/models/upload_url.dart';
@ -23,20 +24,77 @@ class FileUploader {
FileUploader._privateConstructor();
static FileUploader instance = FileUploader._privateConstructor();
Future<File> addToQueue(File file) {
if (_queue[file.generatedID] == null) {
_queue[file.generatedID] = FileUploadItem(file, Completer<File>());
Future<File> upload(File file, int collectionID) {
// If the file hasn't been queued yet, queue it
if (!_queue.containsKey(file.generatedID)) {
final completer = Completer<File>();
_queue[file.generatedID] = FileUploadItem(file, collectionID, completer);
_pollQueue();
return completer.future;
}
return _queue[file.generatedID].completer.future;
// If the file exists in the queue for a matching collectionID,
// return the existing future
final item = _queue[file.generatedID];
if (item.collectionID == collectionID) {
return item.completer.future;
}
// Else wait for the existing upload to complete,
// and add it to the relevant collection
return item.completer.future.then((uploadedFile) {
return CollectionsService.instance
.addToCollection(collectionID, [uploadedFile]).then((aVoid) {
return uploadedFile;
});
});
}
Future<File> getCurrentUploadStatus(File file) {
return _queue[file.generatedID]?.completer?.future;
}
Future<File> forceUpload(File file) async {
return _encryptAndUploadFile(file, forcedUpload: true);
Future<File> forceUpload(File file, int collectionID) async {
// If the file hasn't been queued yet, ez.
if (!_queue.containsKey(file.generatedID)) {
return _encryptAndUploadFileToCollection(file, collectionID,
forcedUpload: true);
}
var item = _queue[file.generatedID];
// If the file is being uploaded right now, wait and proceed
if (item.status == UploadStatus.in_progress) {
return item.completer.future.then((uploadedFile) async {
if (uploadedFile.collectionID == collectionID) {
// Do nothing
return uploadedFile;
} else {
return CollectionsService.instance
.addToCollection(collectionID, [uploadedFile]).then((aVoid) {
return uploadedFile;
});
}
});
} else {
// If the file is yet to be processed,
// 1. Remove it from the queue,
// 2. Force upload the current file
// 3. Trigger the callback for the original request
item = _queue.remove(file.generatedID);
return _encryptAndUploadFileToCollection(file, collectionID,
forcedUpload: true)
.then((uploadedFile) {
if (item.collectionID == collectionID) {
item.completer.complete(uploadedFile);
return uploadedFile;
} else {
CollectionsService.instance
.addToCollection(item.collectionID, [uploadedFile]).then((aVoid) {
item.completer.complete(uploadedFile);
});
return uploadedFile;
}
});
}
}
void _pollQueue() {
@ -45,11 +103,12 @@ class FileUploader {
.firstWhere((entry) => entry.value.status == UploadStatus.not_started)
.value;
firstPendingEntry.status = UploadStatus.in_progress;
_encryptAndUploadFile(firstPendingEntry.file);
_encryptAndUploadFileToCollection(
firstPendingEntry.file, firstPendingEntry.collectionID);
}
}
Future<File> _encryptAndUploadFile(File file,
Future<File> _encryptAndUploadFileToCollection(File file, int collectionID,
{bool forcedUpload = false}) async {
_logger.info("Uploading " + file.toString());
if (!forcedUpload) {
@ -95,7 +154,7 @@ class FileUploader {
final encryptedFileKeyData = CryptoUtil.encryptSync(
fileAttributes.key,
CollectionsService.instance.getCollectionKey(file.collectionID),
CollectionsService.instance.getCollectionKey(collectionID),
);
final encryptedKey = Sodium.bin2base64(encryptedFileKeyData.encryptedData);
@ -109,7 +168,7 @@ class FileUploader {
Sodium.bin2base64(encryptedMetadataData.header);
final data = {
"collectionID": file.collectionID,
"collectionID": collectionID,
"encryptedKey": encryptedKey,
"keyDecryptionNonce": keyDecryptionNonce,
"file": {
@ -132,11 +191,12 @@ class FileUploader {
Options(headers: {"X-Auth-Token": Configuration.instance.getToken()}),
data: data,
)
.then((response) {
.then((response) async {
encryptedFile.deleteSync();
encryptedThumbnail.deleteSync();
final data = response.data;
file.uploadedFileID = data["id"];
file.collectionID = collectionID;
file.updationTime = data["updationTime"];
file.ownerID = data["ownerID"];
file.encryptedKey = encryptedKey;
@ -150,6 +210,7 @@ class FileUploader {
_queue.remove(file.generatedID);
_pollQueue();
}
await FilesDB.instance.update(file);
return file;
});
}
@ -190,11 +251,13 @@ class FileUploader {
class FileUploadItem {
final File file;
final int collectionID;
final Completer<File> completer;
UploadStatus status;
FileUploadItem(
this.file,
this.collectionID,
this.completer, {
this.status = UploadStatus.not_started,
});