feature(mobile): memory efficient sync of large local albums
This commit is contained in:
parent
e0b80f49b6
commit
a79f2fa618
4 changed files with 200 additions and 65 deletions
|
@ -126,9 +126,13 @@ class AlbumService {
|
|||
final Set<String> result = HashSet<String>();
|
||||
for (AssetPathEntity a in albums) {
|
||||
if (excludedAlbumIds.contains(a.id)) {
|
||||
final List<AssetEntity> assets =
|
||||
await a.getAssetListRange(start: 0, end: 0x7fffffffffffffff);
|
||||
result.addAll(assets.map((e) => e.id));
|
||||
const int batchSize = 1000;
|
||||
for (int i = 0, stop = 0; stop == 0; i++) {
|
||||
final List<AssetEntity> assets =
|
||||
await a.getAssetListPaged(page: i, size: batchSize);
|
||||
result.addAll(assets.map((e) => e.id));
|
||||
stop = batchSize - assets.length;
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
|
|
@ -174,6 +174,21 @@ extension AssetPathEntityHelper on AssetPathEntity {
|
|||
}
|
||||
return assetEntities.map(Asset.local).toList();
|
||||
}
|
||||
|
||||
Future<List<Asset>> getAssetsPage({
|
||||
required int page,
|
||||
required int size,
|
||||
Set<String>? excludedAssets,
|
||||
}) async {
|
||||
final assetEntities = await getAssetListPaged(page: page, size: size);
|
||||
if (excludedAssets != null) {
|
||||
return assetEntities
|
||||
.where((e) => !excludedAssets.contains(e.id))
|
||||
.map(Asset.local)
|
||||
.toList();
|
||||
}
|
||||
return assetEntities.map(Asset.local).toList();
|
||||
}
|
||||
}
|
||||
|
||||
extension AlbumResponseDtoHelper on AlbumResponseDto {
|
||||
|
|
|
@ -117,7 +117,11 @@ class ImmichLogger {
|
|||
void flush() {
|
||||
if (_timer != null) {
|
||||
_timer!.cancel();
|
||||
_db.writeTxnSync(() => _db.loggerMessages.putAllSync(_msgBuffer));
|
||||
try {
|
||||
_db.writeTxnSync(() => _db.loggerMessages.putAllSync(_msgBuffer));
|
||||
} on IsarError {
|
||||
debugPrint("Failed to flush log messages to persistent storage!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ class SyncService {
|
|||
final Isar _db;
|
||||
final AsyncMutex _lock = AsyncMutex();
|
||||
final Logger _log = Logger('SyncService');
|
||||
static const int batchSize = 100;
|
||||
|
||||
SyncService(this._db);
|
||||
|
||||
|
@ -425,50 +426,67 @@ class SyncService {
|
|||
}
|
||||
|
||||
// general case, e.g. some assets have been deleted or there are excluded albums on iOS
|
||||
final inDb = await album.assets
|
||||
.filter()
|
||||
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
|
||||
.deviceIdEqualTo(Store.get(StoreKey.deviceIdHash))
|
||||
.sortByLocalId()
|
||||
.findAll();
|
||||
final List<Asset> onDevice =
|
||||
await ape.getAssets(excludedAssets: excludedAssets);
|
||||
onDevice.sort(Asset.compareByLocalId);
|
||||
final d = _diffAssets(onDevice, inDb, compare: Asset.compareByLocalId);
|
||||
final List<Asset> toAdd = d.first, toUpdate = d.second, toDelete = d.third;
|
||||
if (toAdd.isEmpty &&
|
||||
toUpdate.isEmpty &&
|
||||
toDelete.isEmpty &&
|
||||
album.name == ape.name &&
|
||||
album.modifiedAt == ape.lastModified) {
|
||||
// changes only affeted excluded albums
|
||||
return false;
|
||||
}
|
||||
final result = await _linkWithExistingFromDb(toAdd);
|
||||
deleteCandidates.addAll(toDelete);
|
||||
existing.addAll(result.first);
|
||||
album.name = ape.name;
|
||||
album.modifiedAt = ape.lastModified!;
|
||||
if (album.thumbnail.value != null &&
|
||||
toDelete.contains(album.thumbnail.value)) {
|
||||
album.thumbnail.value = null;
|
||||
}
|
||||
final List<AssetPathEntity> splits = await _batchedTimeSpans(ape);
|
||||
bool changes = false;
|
||||
try {
|
||||
await _db.writeTxn(() async {
|
||||
await _db.assets.putAll(result.second);
|
||||
await _db.assets.putAll(toUpdate);
|
||||
await album.assets
|
||||
.update(link: result.first + result.second, unlink: toDelete);
|
||||
await _db.albums.put(album);
|
||||
for (AssetPathEntity a in splits) {
|
||||
final inDb = await album.assets
|
||||
.filter()
|
||||
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
|
||||
.deviceIdEqualTo(Store.get(StoreKey.deviceIdHash))
|
||||
.fileModifiedAtBetween(
|
||||
a == splits.last
|
||||
? DateTime.fromMillisecondsSinceEpoch(0)
|
||||
: a.filterOption.updateTimeCond.min,
|
||||
a == splits.first
|
||||
? DateTime.now()
|
||||
: a.filterOption.updateTimeCond.max,
|
||||
)
|
||||
.sortByLocalId()
|
||||
.findAll();
|
||||
final List<Asset> onDevice =
|
||||
await a.getAssets(excludedAssets: excludedAssets);
|
||||
onDevice.sort(Asset.compareByLocalId);
|
||||
final d = _diffAssets(onDevice, inDb, compare: Asset.compareByLocalId);
|
||||
final List<Asset> toAdd = d.first,
|
||||
toUpdate = d.second,
|
||||
toDelete = d.third;
|
||||
if (toAdd.isEmpty && toUpdate.isEmpty && toDelete.isEmpty) {
|
||||
// no changes in this batch OR changes only affeted excluded assets
|
||||
continue;
|
||||
}
|
||||
final result = await _linkWithExistingFromDb(toAdd);
|
||||
deleteCandidates.addAll(toDelete);
|
||||
existing.addAll(result.first);
|
||||
if (album.thumbnail.value != null &&
|
||||
toDelete.contains(album.thumbnail.value)) {
|
||||
album.thumbnail.value = null;
|
||||
}
|
||||
await _db.writeTxn(() async {
|
||||
await _db.assets.putAll(result.second);
|
||||
await _db.assets.putAll(toUpdate);
|
||||
await album.assets.update(link: result.first, unlink: toDelete);
|
||||
await album.assets.update(link: result.second);
|
||||
});
|
||||
changes = true;
|
||||
}
|
||||
if (changes ||
|
||||
album.name != ape.name ||
|
||||
album.modifiedAt != ape.lastModified) {
|
||||
album.name = ape.name;
|
||||
album.modifiedAt = ape.lastModified!;
|
||||
album.thumbnail.value ??= await album.assets.filter().findFirst();
|
||||
await album.thumbnail.save();
|
||||
});
|
||||
_log.info("Synced changes of local album $ape to DB");
|
||||
await _db.writeTxn(() async {
|
||||
await _db.albums.put(album);
|
||||
await album.thumbnail.save();
|
||||
});
|
||||
_log.info("Synced changes of local album $ape to DB");
|
||||
}
|
||||
} on IsarError catch (e) {
|
||||
_log.severe("Failed to update synced album $ape in DB: $e");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
return changes;
|
||||
}
|
||||
|
||||
/// fast path for common case: only new assets were added to device album
|
||||
|
@ -488,27 +506,114 @@ class SyncService {
|
|||
if (modified == null) {
|
||||
return false;
|
||||
}
|
||||
final List<Asset> newAssets = await modified.getAssets();
|
||||
if (totalOnDevice != album.assets.length + newAssets.length) {
|
||||
final int newCount = await modified.assetCountAsync;
|
||||
if (totalOnDevice != album.assets.length + newCount) {
|
||||
return false;
|
||||
}
|
||||
album.modifiedAt = ape.lastModified!.toUtc();
|
||||
final result = await _linkWithExistingFromDb(newAssets);
|
||||
try {
|
||||
await _db.writeTxn(() async {
|
||||
await _db.assets.putAll(result.second);
|
||||
await album.assets.update(link: result.first + result.second);
|
||||
await _db.albums.put(album);
|
||||
});
|
||||
_log.info("Fast synced local album $ape to DB");
|
||||
for (int i = 0, stop = 0; stop == 0; i++) {
|
||||
final List<Asset> newAssets =
|
||||
await modified.getAssetsPage(page: i, size: batchSize);
|
||||
stop = batchSize - newAssets.length;
|
||||
final result = await _linkWithExistingFromDb(newAssets);
|
||||
await _db.writeTxn(() async {
|
||||
await _db.assets.putAll(result.second);
|
||||
await album.assets.update(link: result.first);
|
||||
await album.assets.update(link: result.second);
|
||||
await _db.albums.put(album);
|
||||
});
|
||||
}
|
||||
} on IsarError catch (e) {
|
||||
_log.severe("Failed to fast sync local album $ape to DB: $e");
|
||||
return false;
|
||||
}
|
||||
_log.info("Fast synced local album $ape to DB");
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
Future<List<AssetPathEntity>> _batchedTimeSpans(
|
||||
final AssetPathEntity ape, [
|
||||
final int chunkLength = 31557600000, // ms in a norm year of 365.25 days
|
||||
]) async {
|
||||
int count = await ape.assetCountAsync;
|
||||
if (count < 2 * batchSize || chunkLength / 1000 != chunkLength ~/ 1000) {
|
||||
// use full if size ok OR timespan too short (less than two weeks)
|
||||
return [ape];
|
||||
}
|
||||
final maxExact = ape.filterOption.updateTimeCond.max.millisecondsSinceEpoch;
|
||||
// round up to full seconds because photomanager internally deals in seconds
|
||||
final int upperBound = ((maxExact + 999) ~/ 1000) * 1000;
|
||||
final lowerBound = ape.filterOption.updateTimeCond.min.toUtc();
|
||||
final List<AssetPathEntity?> splits = [];
|
||||
final int total = await ape.assetCountAsync;
|
||||
int accounted = 0;
|
||||
// idea: slice into chunks from most recent to old until oldest chunk has ok size
|
||||
for (int ms = upperBound; ms > chunkLength; ms -= chunkLength) {
|
||||
final upper = DateTime.fromMillisecondsSinceEpoch(ms, isUtc: true);
|
||||
final lower = DateTime.fromMillisecondsSinceEpoch(
|
||||
1000 + ms - chunkLength, // +1 second to not overlap
|
||||
isUtc: true,
|
||||
);
|
||||
final olderUpper =
|
||||
DateTime.fromMillisecondsSinceEpoch(ms - chunkLength, isUtc: true);
|
||||
final AssetPathEntity? recent = await ape.fetchPathProperties(
|
||||
filterOptionGroup: ape.filterOption
|
||||
.copyWith(updateTimeCond: DateTimeCond(min: lower, max: upper)),
|
||||
);
|
||||
final AssetPathEntity? old = await ape.fetchPathProperties(
|
||||
filterOptionGroup: ape.filterOption.copyWith(
|
||||
updateTimeCond: DateTimeCond(min: lowerBound, max: olderUpper),
|
||||
),
|
||||
);
|
||||
if (recent == null && old == null) {
|
||||
// better safe than sorry, just use the full album
|
||||
return [ape];
|
||||
}
|
||||
if (recent != null) {
|
||||
accounted += await recent.assetCountAsync;
|
||||
// recursivly slice each chunk in half until sizes are ok
|
||||
splits.addAll(await _batchedTimeSpans(recent, chunkLength ~/ 2));
|
||||
if (old == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (old != null && batchSize * 2 > await old.assetCountAsync) {
|
||||
splits.add(old);
|
||||
accounted += await old.assetCountAsync;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert(splits.length > 1, "logic error in _batchedTimeSpans");
|
||||
assert(accounted == total, "assets changed while _batchedTimeSpans runs");
|
||||
// merge too small chunks together if possible
|
||||
for (int i = splits.length - 1; i > 0; i--) {
|
||||
final older = splits[i]!;
|
||||
final newer = splits[i - 1]!;
|
||||
final currentCount = await older.assetCountAsync;
|
||||
final nextCount = await newer.assetCountAsync;
|
||||
AssetPathEntity? a;
|
||||
if ((currentCount < batchSize / 2 || nextCount < batchSize / 2) &&
|
||||
currentCount + nextCount < 2 * batchSize) {
|
||||
a = await older.fetchPathProperties(
|
||||
filterOptionGroup: older.filterOption.copyWith(
|
||||
updateTimeCond: DateTimeCond(
|
||||
min: older.filterOption.updateTimeCond.min,
|
||||
max: newer.filterOption.updateTimeCond.max,
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
if (a != null && currentCount + nextCount == await a.assetCountAsync) {
|
||||
splits[i - 1] = a;
|
||||
splits[i] = null;
|
||||
}
|
||||
}
|
||||
final List<AssetPathEntity> result = splits.whereNotNull().toList();
|
||||
return result;
|
||||
}
|
||||
|
||||
/// Adds a new album from the device to the database and Accumulates all
|
||||
/// assets already existing in the database to the list of `existing` assets
|
||||
Future<void> _addAlbumFromDevice(
|
||||
|
@ -518,24 +623,31 @@ class SyncService {
|
|||
]) async {
|
||||
_log.info("Syncing a new local album to DB: $ape");
|
||||
final Album a = Album.local(ape);
|
||||
final result = await _linkWithExistingFromDb(
|
||||
await ape.getAssets(excludedAssets: excludedAssets),
|
||||
);
|
||||
_log.info(
|
||||
"${result.first.length} assets already existed in DB, to upsert ${result.second.length}",
|
||||
);
|
||||
await _upsertAssetsWithExif(result.second);
|
||||
existing.addAll(result.first);
|
||||
a.assets.addAll(result.first);
|
||||
a.assets.addAll(result.second);
|
||||
final thumb = result.first.firstOrNull ?? result.second.firstOrNull;
|
||||
a.thumbnail.value = thumb;
|
||||
try {
|
||||
await _db.writeTxn(() => _db.albums.store(a));
|
||||
_log.info("Added a new local album to DB: $ape");
|
||||
for (int i = 0, stop = 0; stop == 0; i++) {
|
||||
final assets = await ape.getAssetsPage(
|
||||
page: i, size: batchSize, excludedAssets: excludedAssets);
|
||||
stop = batchSize - assets.length;
|
||||
final existingUpdated = await _linkWithExistingFromDb(assets);
|
||||
_log.info(
|
||||
"${existingUpdated.first.length} assets already existed in DB, to upsert ${existingUpdated.second.length}",
|
||||
);
|
||||
await _upsertAssetsWithExif(existingUpdated.second);
|
||||
existing.addAll(existingUpdated.first);
|
||||
a.thumbnail.value ??= existingUpdated.first.firstOrNull ??
|
||||
existingUpdated.second.firstOrNull;
|
||||
await _db.writeTxn(() async {
|
||||
await a.assets.update(link: existingUpdated.first);
|
||||
await a.assets.update(link: existingUpdated.second);
|
||||
await a.thumbnail.save();
|
||||
});
|
||||
}
|
||||
} on IsarError catch (e) {
|
||||
_log.severe("Failed to add new local album $ape to DB: $e");
|
||||
return;
|
||||
}
|
||||
_log.info("Added a new local album to DB: $ape");
|
||||
}
|
||||
|
||||
/// Returns a tuple (existing, updated)
|
||||
|
|
Loading…
Reference in a new issue