|
|
|
@ -23,6 +23,7 @@ class SyncService {
|
|
|
|
|
final Isar _db;
|
|
|
|
|
final AsyncMutex _lock = AsyncMutex();
|
|
|
|
|
final Logger _log = Logger('SyncService');
|
|
|
|
|
static const int batchSize = 100;
|
|
|
|
|
|
|
|
|
|
SyncService(this._db);
|
|
|
|
|
|
|
|
|
@ -425,50 +426,67 @@ class SyncService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// general case, e.g. some assets have been deleted or there are excluded albums on iOS
|
|
|
|
|
final inDb = await album.assets
|
|
|
|
|
.filter()
|
|
|
|
|
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
|
|
|
|
|
.deviceIdEqualTo(Store.get(StoreKey.deviceIdHash))
|
|
|
|
|
.sortByLocalId()
|
|
|
|
|
.findAll();
|
|
|
|
|
final List<Asset> onDevice =
|
|
|
|
|
await ape.getAssets(excludedAssets: excludedAssets);
|
|
|
|
|
onDevice.sort(Asset.compareByLocalId);
|
|
|
|
|
final d = _diffAssets(onDevice, inDb, compare: Asset.compareByLocalId);
|
|
|
|
|
final List<Asset> toAdd = d.first, toUpdate = d.second, toDelete = d.third;
|
|
|
|
|
if (toAdd.isEmpty &&
|
|
|
|
|
toUpdate.isEmpty &&
|
|
|
|
|
toDelete.isEmpty &&
|
|
|
|
|
album.name == ape.name &&
|
|
|
|
|
album.modifiedAt == ape.lastModified) {
|
|
|
|
|
// changes only affeted excluded albums
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
final result = await _linkWithExistingFromDb(toAdd);
|
|
|
|
|
deleteCandidates.addAll(toDelete);
|
|
|
|
|
existing.addAll(result.first);
|
|
|
|
|
album.name = ape.name;
|
|
|
|
|
album.modifiedAt = ape.lastModified!;
|
|
|
|
|
if (album.thumbnail.value != null &&
|
|
|
|
|
toDelete.contains(album.thumbnail.value)) {
|
|
|
|
|
album.thumbnail.value = null;
|
|
|
|
|
}
|
|
|
|
|
final List<AssetPathEntity> splits = await _batchedTimeSpans(ape);
|
|
|
|
|
bool changes = false;
|
|
|
|
|
try {
|
|
|
|
|
await _db.writeTxn(() async {
|
|
|
|
|
await _db.assets.putAll(result.second);
|
|
|
|
|
await _db.assets.putAll(toUpdate);
|
|
|
|
|
await album.assets
|
|
|
|
|
.update(link: result.first + result.second, unlink: toDelete);
|
|
|
|
|
await _db.albums.put(album);
|
|
|
|
|
for (AssetPathEntity a in splits) {
|
|
|
|
|
final inDb = await album.assets
|
|
|
|
|
.filter()
|
|
|
|
|
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
|
|
|
|
|
.deviceIdEqualTo(Store.get(StoreKey.deviceIdHash))
|
|
|
|
|
.fileModifiedAtBetween(
|
|
|
|
|
a == splits.last
|
|
|
|
|
? DateTime.fromMillisecondsSinceEpoch(0)
|
|
|
|
|
: a.filterOption.updateTimeCond.min,
|
|
|
|
|
a == splits.first
|
|
|
|
|
? DateTime.now()
|
|
|
|
|
: a.filterOption.updateTimeCond.max,
|
|
|
|
|
)
|
|
|
|
|
.sortByLocalId()
|
|
|
|
|
.findAll();
|
|
|
|
|
final List<Asset> onDevice =
|
|
|
|
|
await a.getAssets(excludedAssets: excludedAssets);
|
|
|
|
|
onDevice.sort(Asset.compareByLocalId);
|
|
|
|
|
final d = _diffAssets(onDevice, inDb, compare: Asset.compareByLocalId);
|
|
|
|
|
final List<Asset> toAdd = d.first,
|
|
|
|
|
toUpdate = d.second,
|
|
|
|
|
toDelete = d.third;
|
|
|
|
|
if (toAdd.isEmpty && toUpdate.isEmpty && toDelete.isEmpty) {
|
|
|
|
|
// no changes in this batch OR changes only affeted excluded assets
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
final result = await _linkWithExistingFromDb(toAdd);
|
|
|
|
|
deleteCandidates.addAll(toDelete);
|
|
|
|
|
existing.addAll(result.first);
|
|
|
|
|
if (album.thumbnail.value != null &&
|
|
|
|
|
toDelete.contains(album.thumbnail.value)) {
|
|
|
|
|
album.thumbnail.value = null;
|
|
|
|
|
}
|
|
|
|
|
await _db.writeTxn(() async {
|
|
|
|
|
await _db.assets.putAll(result.second);
|
|
|
|
|
await _db.assets.putAll(toUpdate);
|
|
|
|
|
await album.assets.update(link: result.first, unlink: toDelete);
|
|
|
|
|
await album.assets.update(link: result.second);
|
|
|
|
|
});
|
|
|
|
|
changes = true;
|
|
|
|
|
}
|
|
|
|
|
if (changes ||
|
|
|
|
|
album.name != ape.name ||
|
|
|
|
|
album.modifiedAt != ape.lastModified) {
|
|
|
|
|
album.name = ape.name;
|
|
|
|
|
album.modifiedAt = ape.lastModified!;
|
|
|
|
|
album.thumbnail.value ??= await album.assets.filter().findFirst();
|
|
|
|
|
await album.thumbnail.save();
|
|
|
|
|
});
|
|
|
|
|
_log.info("Synced changes of local album $ape to DB");
|
|
|
|
|
await _db.writeTxn(() async {
|
|
|
|
|
await _db.albums.put(album);
|
|
|
|
|
await album.thumbnail.save();
|
|
|
|
|
});
|
|
|
|
|
_log.info("Synced changes of local album $ape to DB");
|
|
|
|
|
}
|
|
|
|
|
} on IsarError catch (e) {
|
|
|
|
|
_log.severe("Failed to update synced album $ape in DB: $e");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
return changes;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// fast path for common case: only new assets were added to device album
|
|
|
|
@ -488,27 +506,114 @@ class SyncService {
|
|
|
|
|
if (modified == null) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
final List<Asset> newAssets = await modified.getAssets();
|
|
|
|
|
if (totalOnDevice != album.assets.length + newAssets.length) {
|
|
|
|
|
final int newCount = await modified.assetCountAsync;
|
|
|
|
|
if (totalOnDevice != album.assets.length + newCount) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
album.modifiedAt = ape.lastModified!.toUtc();
|
|
|
|
|
final result = await _linkWithExistingFromDb(newAssets);
|
|
|
|
|
try {
|
|
|
|
|
await _db.writeTxn(() async {
|
|
|
|
|
await _db.assets.putAll(result.second);
|
|
|
|
|
await album.assets.update(link: result.first + result.second);
|
|
|
|
|
await _db.albums.put(album);
|
|
|
|
|
});
|
|
|
|
|
_log.info("Fast synced local album $ape to DB");
|
|
|
|
|
for (int i = 0, stop = 0; stop == 0; i++) {
|
|
|
|
|
final List<Asset> newAssets =
|
|
|
|
|
await modified.getAssetsPage(page: i, size: batchSize);
|
|
|
|
|
stop = batchSize - newAssets.length;
|
|
|
|
|
final result = await _linkWithExistingFromDb(newAssets);
|
|
|
|
|
await _db.writeTxn(() async {
|
|
|
|
|
await _db.assets.putAll(result.second);
|
|
|
|
|
await album.assets.update(link: result.first);
|
|
|
|
|
await album.assets.update(link: result.second);
|
|
|
|
|
await _db.albums.put(album);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
} on IsarError catch (e) {
|
|
|
|
|
_log.severe("Failed to fast sync local album $ape to DB: $e");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
_log.info("Fast synced local album $ape to DB");
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Future<List<AssetPathEntity>> _batchedTimeSpans(
|
|
|
|
|
final AssetPathEntity ape, [
|
|
|
|
|
final int chunkLength = 31557600000, // ms in a norm year of 365.25 days
|
|
|
|
|
]) async {
|
|
|
|
|
int count = await ape.assetCountAsync;
|
|
|
|
|
if (count < 2 * batchSize || chunkLength / 1000 != chunkLength ~/ 1000) {
|
|
|
|
|
// use full if size ok OR timespan too short (less than two weeks)
|
|
|
|
|
return [ape];
|
|
|
|
|
}
|
|
|
|
|
final maxExact = ape.filterOption.updateTimeCond.max.millisecondsSinceEpoch;
|
|
|
|
|
// round up to full seconds because photomanager internally deals in seconds
|
|
|
|
|
final int upperBound = ((maxExact + 999) ~/ 1000) * 1000;
|
|
|
|
|
final lowerBound = ape.filterOption.updateTimeCond.min.toUtc();
|
|
|
|
|
final List<AssetPathEntity?> splits = [];
|
|
|
|
|
final int total = await ape.assetCountAsync;
|
|
|
|
|
int accounted = 0;
|
|
|
|
|
// idea: slice into chunks from most recent to old until oldest chunk has ok size
|
|
|
|
|
for (int ms = upperBound; ms > chunkLength; ms -= chunkLength) {
|
|
|
|
|
final upper = DateTime.fromMillisecondsSinceEpoch(ms, isUtc: true);
|
|
|
|
|
final lower = DateTime.fromMillisecondsSinceEpoch(
|
|
|
|
|
1000 + ms - chunkLength, // +1 second to not overlap
|
|
|
|
|
isUtc: true,
|
|
|
|
|
);
|
|
|
|
|
final olderUpper =
|
|
|
|
|
DateTime.fromMillisecondsSinceEpoch(ms - chunkLength, isUtc: true);
|
|
|
|
|
final AssetPathEntity? recent = await ape.fetchPathProperties(
|
|
|
|
|
filterOptionGroup: ape.filterOption
|
|
|
|
|
.copyWith(updateTimeCond: DateTimeCond(min: lower, max: upper)),
|
|
|
|
|
);
|
|
|
|
|
final AssetPathEntity? old = await ape.fetchPathProperties(
|
|
|
|
|
filterOptionGroup: ape.filterOption.copyWith(
|
|
|
|
|
updateTimeCond: DateTimeCond(min: lowerBound, max: olderUpper),
|
|
|
|
|
),
|
|
|
|
|
);
|
|
|
|
|
if (recent == null && old == null) {
|
|
|
|
|
// better safe than sorry, just use the full album
|
|
|
|
|
return [ape];
|
|
|
|
|
}
|
|
|
|
|
if (recent != null) {
|
|
|
|
|
accounted += await recent.assetCountAsync;
|
|
|
|
|
// recursivly slice each chunk in half until sizes are ok
|
|
|
|
|
splits.addAll(await _batchedTimeSpans(recent, chunkLength ~/ 2));
|
|
|
|
|
if (old == null) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (old != null && batchSize * 2 > await old.assetCountAsync) {
|
|
|
|
|
splits.add(old);
|
|
|
|
|
accounted += await old.assetCountAsync;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
assert(splits.length > 1, "logic error in _batchedTimeSpans");
|
|
|
|
|
assert(accounted == total, "assets changed while _batchedTimeSpans runs");
|
|
|
|
|
// merge too small chunks together if possible
|
|
|
|
|
for (int i = splits.length - 1; i > 0; i--) {
|
|
|
|
|
final older = splits[i]!;
|
|
|
|
|
final newer = splits[i - 1]!;
|
|
|
|
|
final currentCount = await older.assetCountAsync;
|
|
|
|
|
final nextCount = await newer.assetCountAsync;
|
|
|
|
|
AssetPathEntity? a;
|
|
|
|
|
if ((currentCount < batchSize / 2 || nextCount < batchSize / 2) &&
|
|
|
|
|
currentCount + nextCount < 2 * batchSize) {
|
|
|
|
|
a = await older.fetchPathProperties(
|
|
|
|
|
filterOptionGroup: older.filterOption.copyWith(
|
|
|
|
|
updateTimeCond: DateTimeCond(
|
|
|
|
|
min: older.filterOption.updateTimeCond.min,
|
|
|
|
|
max: newer.filterOption.updateTimeCond.max,
|
|
|
|
|
),
|
|
|
|
|
),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
if (a != null && currentCount + nextCount == await a.assetCountAsync) {
|
|
|
|
|
splits[i - 1] = a;
|
|
|
|
|
splits[i] = null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
final List<AssetPathEntity> result = splits.whereNotNull().toList();
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Adds a new album from the device to the database and Accumulates all
|
|
|
|
|
/// assets already existing in the database to the list of `existing` assets
|
|
|
|
|
Future<void> _addAlbumFromDevice(
|
|
|
|
@ -518,24 +623,31 @@ class SyncService {
|
|
|
|
|
]) async {
|
|
|
|
|
_log.info("Syncing a new local album to DB: $ape");
|
|
|
|
|
final Album a = Album.local(ape);
|
|
|
|
|
final result = await _linkWithExistingFromDb(
|
|
|
|
|
await ape.getAssets(excludedAssets: excludedAssets),
|
|
|
|
|
);
|
|
|
|
|
_log.info(
|
|
|
|
|
"${result.first.length} assets already existed in DB, to upsert ${result.second.length}",
|
|
|
|
|
);
|
|
|
|
|
await _upsertAssetsWithExif(result.second);
|
|
|
|
|
existing.addAll(result.first);
|
|
|
|
|
a.assets.addAll(result.first);
|
|
|
|
|
a.assets.addAll(result.second);
|
|
|
|
|
final thumb = result.first.firstOrNull ?? result.second.firstOrNull;
|
|
|
|
|
a.thumbnail.value = thumb;
|
|
|
|
|
try {
|
|
|
|
|
await _db.writeTxn(() => _db.albums.store(a));
|
|
|
|
|
_log.info("Added a new local album to DB: $ape");
|
|
|
|
|
for (int i = 0, stop = 0; stop == 0; i++) {
|
|
|
|
|
final assets = await ape.getAssetsPage(
|
|
|
|
|
page: i, size: batchSize, excludedAssets: excludedAssets);
|
|
|
|
|
stop = batchSize - assets.length;
|
|
|
|
|
final existingUpdated = await _linkWithExistingFromDb(assets);
|
|
|
|
|
_log.info(
|
|
|
|
|
"${existingUpdated.first.length} assets already existed in DB, to upsert ${existingUpdated.second.length}",
|
|
|
|
|
);
|
|
|
|
|
await _upsertAssetsWithExif(existingUpdated.second);
|
|
|
|
|
existing.addAll(existingUpdated.first);
|
|
|
|
|
a.thumbnail.value ??= existingUpdated.first.firstOrNull ??
|
|
|
|
|
existingUpdated.second.firstOrNull;
|
|
|
|
|
await _db.writeTxn(() async {
|
|
|
|
|
await a.assets.update(link: existingUpdated.first);
|
|
|
|
|
await a.assets.update(link: existingUpdated.second);
|
|
|
|
|
await a.thumbnail.save();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
} on IsarError catch (e) {
|
|
|
|
|
_log.severe("Failed to add new local album $ape to DB: $e");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
_log.info("Added a new local album to DB: $ape");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns a tuple (existing, updated)
|
|
|
|
|