Compare commits

...

1 commit

Author SHA1 Message Date
Fynn Petersen-Frey
a79f2fa618 feature(mobile): memory efficient sync of large local albums 2023-03-28 16:05:24 +02:00
4 changed files with 200 additions and 65 deletions

View file

@ -126,9 +126,13 @@ class AlbumService {
final Set<String> result = HashSet<String>();
for (AssetPathEntity a in albums) {
if (excludedAlbumIds.contains(a.id)) {
final List<AssetEntity> assets =
await a.getAssetListRange(start: 0, end: 0x7fffffffffffffff);
result.addAll(assets.map((e) => e.id));
const int batchSize = 1000;
for (int i = 0, stop = 0; stop == 0; i++) {
final List<AssetEntity> assets =
await a.getAssetListPaged(page: i, size: batchSize);
result.addAll(assets.map((e) => e.id));
stop = batchSize - assets.length;
}
}
}
return result;

View file

@ -174,6 +174,21 @@ extension AssetPathEntityHelper on AssetPathEntity {
}
return assetEntities.map(Asset.local).toList();
}
Future<List<Asset>> getAssetsPage({
required int page,
required int size,
Set<String>? excludedAssets,
}) async {
final assetEntities = await getAssetListPaged(page: page, size: size);
if (excludedAssets != null) {
return assetEntities
.where((e) => !excludedAssets.contains(e.id))
.map(Asset.local)
.toList();
}
return assetEntities.map(Asset.local).toList();
}
}
extension AlbumResponseDtoHelper on AlbumResponseDto {

View file

@ -117,7 +117,11 @@ class ImmichLogger {
void flush() {
if (_timer != null) {
_timer!.cancel();
_db.writeTxnSync(() => _db.loggerMessages.putAllSync(_msgBuffer));
try {
_db.writeTxnSync(() => _db.loggerMessages.putAllSync(_msgBuffer));
} on IsarError {
debugPrint("Failed to flush log messages to persistent storage!");
}
}
}
}

View file

@ -23,6 +23,7 @@ class SyncService {
final Isar _db;
final AsyncMutex _lock = AsyncMutex();
final Logger _log = Logger('SyncService');
static const int batchSize = 100;
SyncService(this._db);
@ -425,50 +426,67 @@ class SyncService {
}
// general case, e.g. some assets have been deleted or there are excluded albums on iOS
final inDb = await album.assets
.filter()
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
.deviceIdEqualTo(Store.get(StoreKey.deviceIdHash))
.sortByLocalId()
.findAll();
final List<Asset> onDevice =
await ape.getAssets(excludedAssets: excludedAssets);
onDevice.sort(Asset.compareByLocalId);
final d = _diffAssets(onDevice, inDb, compare: Asset.compareByLocalId);
final List<Asset> toAdd = d.first, toUpdate = d.second, toDelete = d.third;
if (toAdd.isEmpty &&
toUpdate.isEmpty &&
toDelete.isEmpty &&
album.name == ape.name &&
album.modifiedAt == ape.lastModified) {
// changes only affeted excluded albums
return false;
}
final result = await _linkWithExistingFromDb(toAdd);
deleteCandidates.addAll(toDelete);
existing.addAll(result.first);
album.name = ape.name;
album.modifiedAt = ape.lastModified!;
if (album.thumbnail.value != null &&
toDelete.contains(album.thumbnail.value)) {
album.thumbnail.value = null;
}
final List<AssetPathEntity> splits = await _batchedTimeSpans(ape);
bool changes = false;
try {
await _db.writeTxn(() async {
await _db.assets.putAll(result.second);
await _db.assets.putAll(toUpdate);
await album.assets
.update(link: result.first + result.second, unlink: toDelete);
await _db.albums.put(album);
for (AssetPathEntity a in splits) {
final inDb = await album.assets
.filter()
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
.deviceIdEqualTo(Store.get(StoreKey.deviceIdHash))
.fileModifiedAtBetween(
a == splits.last
? DateTime.fromMillisecondsSinceEpoch(0)
: a.filterOption.updateTimeCond.min,
a == splits.first
? DateTime.now()
: a.filterOption.updateTimeCond.max,
)
.sortByLocalId()
.findAll();
final List<Asset> onDevice =
await a.getAssets(excludedAssets: excludedAssets);
onDevice.sort(Asset.compareByLocalId);
final d = _diffAssets(onDevice, inDb, compare: Asset.compareByLocalId);
final List<Asset> toAdd = d.first,
toUpdate = d.second,
toDelete = d.third;
if (toAdd.isEmpty && toUpdate.isEmpty && toDelete.isEmpty) {
// no changes in this batch OR changes only affeted excluded assets
continue;
}
final result = await _linkWithExistingFromDb(toAdd);
deleteCandidates.addAll(toDelete);
existing.addAll(result.first);
if (album.thumbnail.value != null &&
toDelete.contains(album.thumbnail.value)) {
album.thumbnail.value = null;
}
await _db.writeTxn(() async {
await _db.assets.putAll(result.second);
await _db.assets.putAll(toUpdate);
await album.assets.update(link: result.first, unlink: toDelete);
await album.assets.update(link: result.second);
});
changes = true;
}
if (changes ||
album.name != ape.name ||
album.modifiedAt != ape.lastModified) {
album.name = ape.name;
album.modifiedAt = ape.lastModified!;
album.thumbnail.value ??= await album.assets.filter().findFirst();
await album.thumbnail.save();
});
_log.info("Synced changes of local album $ape to DB");
await _db.writeTxn(() async {
await _db.albums.put(album);
await album.thumbnail.save();
});
_log.info("Synced changes of local album $ape to DB");
}
} on IsarError catch (e) {
_log.severe("Failed to update synced album $ape in DB: $e");
return false;
}
return true;
return changes;
}
/// fast path for common case: only new assets were added to device album
@ -488,27 +506,114 @@ class SyncService {
if (modified == null) {
return false;
}
final List<Asset> newAssets = await modified.getAssets();
if (totalOnDevice != album.assets.length + newAssets.length) {
final int newCount = await modified.assetCountAsync;
if (totalOnDevice != album.assets.length + newCount) {
return false;
}
album.modifiedAt = ape.lastModified!.toUtc();
final result = await _linkWithExistingFromDb(newAssets);
try {
await _db.writeTxn(() async {
await _db.assets.putAll(result.second);
await album.assets.update(link: result.first + result.second);
await _db.albums.put(album);
});
_log.info("Fast synced local album $ape to DB");
for (int i = 0, stop = 0; stop == 0; i++) {
final List<Asset> newAssets =
await modified.getAssetsPage(page: i, size: batchSize);
stop = batchSize - newAssets.length;
final result = await _linkWithExistingFromDb(newAssets);
await _db.writeTxn(() async {
await _db.assets.putAll(result.second);
await album.assets.update(link: result.first);
await album.assets.update(link: result.second);
await _db.albums.put(album);
});
}
} on IsarError catch (e) {
_log.severe("Failed to fast sync local album $ape to DB: $e");
return false;
}
_log.info("Fast synced local album $ape to DB");
return true;
}
Future<List<AssetPathEntity>> _batchedTimeSpans(
final AssetPathEntity ape, [
final int chunkLength = 31557600000, // ms in a norm year of 365.25 days
]) async {
int count = await ape.assetCountAsync;
if (count < 2 * batchSize || chunkLength / 1000 != chunkLength ~/ 1000) {
// use full if size ok OR timespan too short (less than two weeks)
return [ape];
}
final maxExact = ape.filterOption.updateTimeCond.max.millisecondsSinceEpoch;
// round up to full seconds because photomanager internally deals in seconds
final int upperBound = ((maxExact + 999) ~/ 1000) * 1000;
final lowerBound = ape.filterOption.updateTimeCond.min.toUtc();
final List<AssetPathEntity?> splits = [];
final int total = await ape.assetCountAsync;
int accounted = 0;
// idea: slice into chunks from most recent to old until oldest chunk has ok size
for (int ms = upperBound; ms > chunkLength; ms -= chunkLength) {
final upper = DateTime.fromMillisecondsSinceEpoch(ms, isUtc: true);
final lower = DateTime.fromMillisecondsSinceEpoch(
1000 + ms - chunkLength, // +1 second to not overlap
isUtc: true,
);
final olderUpper =
DateTime.fromMillisecondsSinceEpoch(ms - chunkLength, isUtc: true);
final AssetPathEntity? recent = await ape.fetchPathProperties(
filterOptionGroup: ape.filterOption
.copyWith(updateTimeCond: DateTimeCond(min: lower, max: upper)),
);
final AssetPathEntity? old = await ape.fetchPathProperties(
filterOptionGroup: ape.filterOption.copyWith(
updateTimeCond: DateTimeCond(min: lowerBound, max: olderUpper),
),
);
if (recent == null && old == null) {
// better safe than sorry, just use the full album
return [ape];
}
if (recent != null) {
accounted += await recent.assetCountAsync;
// recursivly slice each chunk in half until sizes are ok
splits.addAll(await _batchedTimeSpans(recent, chunkLength ~/ 2));
if (old == null) {
break;
}
}
if (old != null && batchSize * 2 > await old.assetCountAsync) {
splits.add(old);
accounted += await old.assetCountAsync;
break;
}
}
assert(splits.length > 1, "logic error in _batchedTimeSpans");
assert(accounted == total, "assets changed while _batchedTimeSpans runs");
// merge too small chunks together if possible
for (int i = splits.length - 1; i > 0; i--) {
final older = splits[i]!;
final newer = splits[i - 1]!;
final currentCount = await older.assetCountAsync;
final nextCount = await newer.assetCountAsync;
AssetPathEntity? a;
if ((currentCount < batchSize / 2 || nextCount < batchSize / 2) &&
currentCount + nextCount < 2 * batchSize) {
a = await older.fetchPathProperties(
filterOptionGroup: older.filterOption.copyWith(
updateTimeCond: DateTimeCond(
min: older.filterOption.updateTimeCond.min,
max: newer.filterOption.updateTimeCond.max,
),
),
);
}
if (a != null && currentCount + nextCount == await a.assetCountAsync) {
splits[i - 1] = a;
splits[i] = null;
}
}
final List<AssetPathEntity> result = splits.whereNotNull().toList();
return result;
}
/// Adds a new album from the device to the database and Accumulates all
/// assets already existing in the database to the list of `existing` assets
Future<void> _addAlbumFromDevice(
@ -518,24 +623,31 @@ class SyncService {
]) async {
_log.info("Syncing a new local album to DB: $ape");
final Album a = Album.local(ape);
final result = await _linkWithExistingFromDb(
await ape.getAssets(excludedAssets: excludedAssets),
);
_log.info(
"${result.first.length} assets already existed in DB, to upsert ${result.second.length}",
);
await _upsertAssetsWithExif(result.second);
existing.addAll(result.first);
a.assets.addAll(result.first);
a.assets.addAll(result.second);
final thumb = result.first.firstOrNull ?? result.second.firstOrNull;
a.thumbnail.value = thumb;
try {
await _db.writeTxn(() => _db.albums.store(a));
_log.info("Added a new local album to DB: $ape");
for (int i = 0, stop = 0; stop == 0; i++) {
final assets = await ape.getAssetsPage(
page: i, size: batchSize, excludedAssets: excludedAssets);
stop = batchSize - assets.length;
final existingUpdated = await _linkWithExistingFromDb(assets);
_log.info(
"${existingUpdated.first.length} assets already existed in DB, to upsert ${existingUpdated.second.length}",
);
await _upsertAssetsWithExif(existingUpdated.second);
existing.addAll(existingUpdated.first);
a.thumbnail.value ??= existingUpdated.first.firstOrNull ??
existingUpdated.second.firstOrNull;
await _db.writeTxn(() async {
await a.assets.update(link: existingUpdated.first);
await a.assets.update(link: existingUpdated.second);
await a.thumbnail.save();
});
}
} on IsarError catch (e) {
_log.severe("Failed to add new local album $ape to DB: $e");
return;
}
_log.info("Added a new local album to DB: $ape");
}
/// Returns a tuple (existing, updated)