Compare commits

...

4 commits

Author SHA1 Message Date
Jonathan Jogenfors
7bea3eca37 fix: lint 2023-10-09 15:19:54 +02:00
Jonathan Jogenfors
46d4626ac4 Merge branch 'main' of https://github.com/immich-app/immich into fix/library-scanning 2023-10-09 15:17:50 +02:00
Jonathan Jogenfors
c111f11192 Merge branch 'main' of https://github.com/immich-app/immich into fix/library-scanning 2023-10-09 15:17:21 +02:00
Jason Rasmussen
89cf1dcdce
feat(server): improve library scanning 2023-09-26 21:38:41 -04:00
9 changed files with 159 additions and 214 deletions

View file

@ -69,7 +69,6 @@ export enum JobName {
LIBRARY_SCAN = 'library-refresh',
LIBRARY_SCAN_ASSET = 'library-refresh-asset',
LIBRARY_REMOVE_OFFLINE = 'library-remove-offline',
LIBRARY_MARK_ASSET_OFFLINE = 'library-mark-asset-offline',
LIBRARY_DELETE = 'library-delete',
LIBRARY_QUEUE_SCAN_ALL = 'library-queue-all-refresh',
LIBRARY_QUEUE_CLEANUP = 'library-queue-cleanup',
@ -172,7 +171,6 @@ export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
// Library managment
[JobName.LIBRARY_SCAN_ASSET]: QueueName.LIBRARY,
[JobName.LIBRARY_MARK_ASSET_OFFLINE]: QueueName.LIBRARY,
[JobName.LIBRARY_SCAN]: QueueName.LIBRARY,
[JobName.LIBRARY_DELETE]: QueueName.LIBRARY,
[JobName.LIBRARY_REMOVE_OFFLINE]: QueueName.LIBRARY,

View file

@ -1,5 +1,4 @@
import { JobName, QueueName } from './job.constants';
import {
IAssetDeletionJob,
IAssetFaceJob,
@ -9,7 +8,6 @@ import {
IEntityJob,
ILibraryFileJob,
ILibraryRefreshJob,
IOfflineLibraryFileJob,
} from './job.interface';
export interface JobCounts {
@ -88,7 +86,6 @@ export type JobItem =
// Library Managment
| { name: JobName.LIBRARY_SCAN_ASSET; data: ILibraryFileJob }
| { name: JobName.LIBRARY_MARK_ASSET_OFFLINE; data: IOfflineLibraryFileJob }
| { name: JobName.LIBRARY_SCAN; data: ILibraryRefreshJob }
| { name: JobName.LIBRARY_REMOVE_OFFLINE; data: IEntityJob }
| { name: JobName.LIBRARY_DELETE; data: IEntityJob }

View file

@ -16,7 +16,6 @@ export interface ILibraryRepository {
getUploadLibraryCount(ownerId: string): Promise<number>;
update(library: Partial<LibraryEntity>): Promise<LibraryEntity>;
getStatistics(id: string): Promise<LibraryStatsResponseDto>;
getOnlineAssetPaths(id: string): Promise<string[]>;
getAssetIds(id: string, withDeleted?: boolean): Promise<string[]>;
existsByName(name: string, withDeleted?: boolean): Promise<boolean>;
}

View file

@ -1,10 +1,9 @@
import { AssetType, LibraryType, UserEntity } from '@app/infra/entities';
import { BadRequestException } from '@nestjs/common';
import {
IAccessRepositoryMock,
assetStub,
authStub,
IAccessRepositoryMock,
libraryStub,
newAccessRepositoryMock,
newAssetRepositoryMock,
@ -16,9 +15,11 @@ import {
userStub,
} from '@test';
import { Stats } from 'fs';
import { IJobRepository, ILibraryFileJob, ILibraryRefreshJob, IOfflineLibraryFileJob, JobName } from '../job';
import { IAssetRepository, ICryptoRepository, IStorageRepository, IUserRepository } from '..';
import { IAssetRepository } from '../asset';
import { ICryptoRepository } from '../crypto';
import { IJobRepository, ILibraryFileJob, ILibraryRefreshJob, JobName } from '../job';
import { IStorageRepository } from '../storage';
import { IUserRepository } from '../user';
import { ILibraryRepository } from './library.repository';
import { LibraryService } from './library.service';
@ -68,7 +69,6 @@ describe(LibraryService.name, () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
storageMock.crawl.mockResolvedValue(['/data/user2/photo.jpg']);
assetMock.getByLibraryId.mockResolvedValue([]);
libraryMock.getOnlineAssetPaths.mockResolvedValue([]);
userMock.get.mockResolvedValue(userStub.externalPath1);
await sut.handleQueueAssetRefresh(mockLibraryJob);
@ -86,7 +86,6 @@ describe(LibraryService.name, () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
storageMock.crawl.mockResolvedValue(['/data/user1/photo.jpg']);
assetMock.getByLibraryId.mockResolvedValue([]);
libraryMock.getOnlineAssetPaths.mockResolvedValue([]);
userMock.get.mockResolvedValue(userStub.externalPath1);
await sut.handleQueueAssetRefresh(mockLibraryJob);
@ -116,22 +115,9 @@ describe(LibraryService.name, () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
storageMock.crawl.mockResolvedValue(['/data/user1/photo.jpg']);
assetMock.getByLibraryId.mockResolvedValue([assetStub.external]);
libraryMock.getOnlineAssetPaths.mockResolvedValue([]);
userMock.get.mockResolvedValue(userStub.externalPath2);
await sut.handleQueueAssetRefresh(mockLibraryJob);
expect(jobMock.queue.mock.calls).toEqual([
[
{
name: JobName.LIBRARY_MARK_ASSET_OFFLINE,
data: {
id: libraryStub.externalLibrary1.id,
assetPath: '/data/user1/photo.jpg',
},
},
],
]);
});
it('should not scan libraries owned by user without external path', async () => {
@ -595,24 +581,6 @@ describe(LibraryService.name, () => {
});
});
describe('handleOfflineAsset', () => {
it('should mark an asset as offline', async () => {
const offlineJob: IOfflineLibraryFileJob = {
id: libraryStub.externalLibrary1.id,
assetPath: '/data/user1/photo.jpg',
};
assetMock.getByLibraryIdAndOriginalPath.mockResolvedValue(assetStub.image);
await expect(sut.handleOfflineAsset(offlineJob)).resolves.toBe(true);
expect(assetMock.save).toHaveBeenCalledWith({
id: assetStub.image.id,
isOffline: true,
});
});
});
describe('delete', () => {
it('should delete a library', async () => {
assetMock.getByLibraryIdAndOriginalPath.mockResolvedValue(assetStub.image);

View file

@ -1,8 +1,7 @@
import { AssetType, LibraryType } from '@app/infra/entities';
import { AssetEntity, AssetType, LibraryType, UserEntity } from '@app/infra/entities';
import { BadRequestException, Inject, Injectable, Logger } from '@nestjs/common';
import { DateTime } from 'luxon';
import { R_OK } from 'node:constants';
import { Stats } from 'node:fs';
import path from 'node:path';
import { basename, parse } from 'path';
import { AccessCore, IAccessRepository, Permission } from '../access';
import { IAssetRepository, WithProperty } from '../asset';
@ -16,7 +15,6 @@ import {
IJobRepository,
ILibraryFileJob,
ILibraryRefreshJob,
IOfflineLibraryFileJob,
JOBS_ASSET_PAGINATION_SIZE,
JobName,
} from '../job';
@ -150,70 +148,29 @@ export class LibraryService {
}
async handleAssetRefresh(job: ILibraryFileJob) {
const assetPath = path.normalize(job.assetPath);
const user = await this.userRepository.get(job.ownerId);
if (!user?.externalPath) {
this.logger.warn('User has no external path set, cannot import asset');
const { id, ownerId, assetPath, force } = job;
const user = await this.userRepository.get(ownerId);
if (!this.hasAccess(user, assetPath)) {
return false;
}
if (!path.normalize(assetPath).match(new RegExp(`^${path.normalize(user.externalPath)}`))) {
this.logger.error("Asset must be within the user's external path");
const library = await this.repository.get(id, true);
if (library?.deletedAt) {
this.logger.error(`${assetPath} - skipped (deleted library)`);
return false;
}
const existingAssetEntity = await this.assetRepository.getByLibraryIdAndOriginalPath(job.id, assetPath);
let stats: Stats;
try {
stats = await this.storageRepository.stat(assetPath);
} catch (error: Error | any) {
// Can't access file, probably offline
if (existingAssetEntity) {
// Mark asset as offline
this.logger.debug(`Marking asset as offline: ${assetPath}`);
await this.assetRepository.save({ id: existingAssetEntity.id, isOffline: true });
let asset = await this.assetRepository.getByLibraryIdAndOriginalPath(id, assetPath);
const stats = await this.storageRepository.stat(assetPath).catch(() => null);
if (!stats) {
if (asset) {
this.logger.debug(`${assetPath} - updating (offline)`);
await this.assetRepository.save({ id: asset.id, isOffline: true });
return true;
} else {
// File can't be accessed and does not already exist in db
throw new BadRequestException("Can't access file", { cause: error });
}
}
let doImport = false;
let doRefresh = false;
if (job.force) {
doRefresh = true;
}
if (!existingAssetEntity) {
// This asset is new to us, read it from disk
this.logger.debug(`Importing new asset: ${assetPath}`);
doImport = true;
} else if (stats.mtime.toISOString() !== existingAssetEntity.fileModifiedAt.toISOString()) {
// File modification time has changed since last time we checked, re-read from disk
this.logger.debug(
`File modification time has changed, re-importing asset: ${assetPath}. Old mtime: ${existingAssetEntity.fileModifiedAt}. New mtime: ${stats.mtime}`,
);
doRefresh = true;
} else if (!job.force && stats && !existingAssetEntity.isOffline) {
// Asset exists on disk and in db and mtime has not changed. Also, we are not forcing refresn. Therefore, do nothing
this.logger.debug(`Asset already exists in database and on disk, will not import: ${assetPath}`);
}
if (stats && existingAssetEntity?.isOffline) {
// File was previously offline but is now online
this.logger.debug(`Marking previously-offline asset as online: ${assetPath}`);
await this.assetRepository.save({ id: existingAssetEntity.id, isOffline: false });
doRefresh = true;
}
if (!doImport && !doRefresh) {
// If we don't import, exit here
return true;
this.logger.debug(`${assetPath} - skipping (not found)`);
return false;
}
let assetType: AssetType;
@ -223,7 +180,8 @@ export class LibraryService {
} else if (mimeTypes.isVideo(assetPath)) {
assetType = AssetType.VIDEO;
} else {
throw new BadRequestException(`Unsupported file type ${assetPath}`);
this.logger.warn(`${assetPath} - skipped (unsupported file type)`);
return false;
}
// TODO: doesn't xmp replace the file extension? Will need investigation
@ -232,25 +190,15 @@ export class LibraryService {
sidecarPath = `${assetPath}.xmp`;
}
const deviceAssetId = `${basename(assetPath)}`.replace(/\s+/g, '');
const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`);
let assetId;
if (doImport) {
const library = await this.repository.get(job.id, true);
if (library?.deletedAt) {
this.logger.error('Cannot import asset into deleted library');
return false;
}
// TODO: In wait of refactoring the domain asset service, this function is just manually written like this
const addedAsset = await this.assetRepository.create({
ownerId: job.ownerId,
libraryId: job.id,
checksum: pathHash,
const isNew = !asset;
if (!asset) {
this.logger.debug(`${assetPath} - importing (new)`);
asset = await this.assetRepository.create({
ownerId: ownerId,
libraryId: id,
checksum: this.cryptoRepository.hashSha1(`path:${assetPath}`),
originalPath: assetPath,
deviceAssetId: deviceAssetId,
deviceAssetId: `${basename(assetPath)}`.replace(/\s+/g, ''),
deviceId: 'Library Import',
fileCreatedAt: stats.mtime,
fileModifiedAt: stats.mtime,
@ -261,24 +209,26 @@ export class LibraryService {
isReadOnly: true,
isExternal: true,
});
assetId = addedAsset.id;
} else if (doRefresh && existingAssetEntity) {
assetId = existingAssetEntity.id;
await this.assetRepository.updateAll([existingAssetEntity.id], {
}
const isUpdated = asset && stats.mtime.toISOString() !== asset.fileModifiedAt.toISOString();
if (isUpdated) {
this.logger.debug(`${assetPath} - updating (changed)`);
await this.assetRepository.updateAll([asset.id], {
fileCreatedAt: stats.mtime,
fileModifiedAt: stats.mtime,
});
} else {
// Not importing and not refreshing, do nothing
return true;
}
this.logger.debug(`Queuing metadata extraction for: ${assetPath}`);
const isBackOnline = asset.isOffline;
if (isBackOnline) {
this.logger.debug(`${assetPath} - updating (online)`);
await this.assetRepository.save({ id: asset.id, isOffline: false });
}
await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: assetId, source: 'upload' } });
if (assetType === AssetType.VIDEO) {
await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: assetId } });
if (force || isNew || isUpdated) {
await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: asset.id } });
await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: asset.id } });
}
return true;
@ -351,83 +301,85 @@ export class LibraryService {
}
async handleQueueAssetRefresh(job: ILibraryRefreshJob): Promise<boolean> {
const start = DateTime.now();
const library = await this.repository.get(job.id);
if (!library || library.type !== LibraryType.EXTERNAL) {
this.logger.warn('Can only refresh external libraries');
if (!library || !library.owner || library.type !== LibraryType.EXTERNAL) {
return false;
}
const user = await this.userRepository.get(library.ownerId);
if (!user?.externalPath) {
const { id, name, owner, importPaths, exclusionPatterns } = library;
this.logger.log(`Starting library scan: ${name}`);
if (!owner.externalPath) {
this.logger.warn('User has no external path set, cannot refresh library');
return false;
}
const normalizedExternalPath = path.normalize(user.externalPath);
this.logger.verbose(`Refreshing library: ${job.id}`);
const crawledAssetPaths = (
await this.storageRepository.crawl({
pathsToCrawl: library.importPaths,
exclusionPatterns: library.exclusionPatterns,
})
)
.map(path.normalize)
.filter((assetPath) =>
// Filter out paths that are not within the user's external path
assetPath.match(new RegExp(`^${normalizedExternalPath}`)),
);
this.logger.debug(`Found ${crawledAssetPaths.length} assets when crawling import paths ${library.importPaths}`);
const assetsInLibrary = await this.assetRepository.getByLibraryId([job.id]);
const offlineAssets = assetsInLibrary.filter((asset) => !crawledAssetPaths.includes(asset.originalPath));
this.logger.debug(`${offlineAssets.length} assets in library are not present on disk and will be marked offline`);
for (const offlineAsset of offlineAssets) {
const offlineJobData: IOfflineLibraryFileJob = {
id: job.id,
assetPath: offlineAsset.originalPath,
};
await this.jobRepository.queue({ name: JobName.LIBRARY_MARK_ASSET_OFFLINE, data: offlineJobData });
// scan files
const allFiles = new Set<string>();
const newFiles = new Set<string>();
const files = await this.storageRepository.crawl({
pathsToCrawl: importPaths,
exclusionPatterns: exclusionPatterns,
});
for (const file of files) {
if (!file.match(new RegExp(`^${owner.externalPath}`))) {
continue;
}
if (crawledAssetPaths.length > 0) {
let filteredPaths: string[] = [];
if (job.refreshAllFiles || job.refreshModifiedFiles) {
filteredPaths = crawledAssetPaths;
allFiles.add(file);
newFiles.add(file);
}
// compare with library assets
const onlineAssets = new Map<string, AssetEntity>();
const offlineAssets = new Map<string, AssetEntity>();
const assets = await this.assetRepository.getByLibraryId([job.id]);
for (const asset of assets) {
if (allFiles.has(asset.originalPath)) {
onlineAssets.set(asset.id, asset);
newFiles.delete(asset.originalPath);
} else {
const existingPaths = await this.repository.getOnlineAssetPaths(job.id);
this.logger.debug(`Found ${existingPaths.length} existing asset(s) in library ${job.id}`);
filteredPaths = crawledAssetPaths.filter((assetPath) => !existingPaths.includes(assetPath));
this.logger.debug(`After db comparison, ${filteredPaths.length} asset(s) remain to be imported`);
offlineAssets.set(asset.id, asset);
}
}
for (const assetPath of filteredPaths) {
const libraryJobData: ILibraryFileJob = {
id: job.id,
assetPath: path.normalize(assetPath),
ownerId: library.ownerId,
// mark assets as offline
if (offlineAssets.size > 0) {
for (const id of offlineAssets.keys()) {
await this.assetRepository.save({ id, isOffline: true });
}
}
// queue assets for potential refresh
const targetFiles = job.refreshAllFiles || job.refreshModifiedFiles ? allFiles : newFiles;
for (const assetPath of targetFiles) {
await this.jobRepository.queue({
name: JobName.LIBRARY_SCAN_ASSET,
data: {
id,
assetPath,
ownerId: owner.id,
force: job.refreshAllFiles ?? false,
};
await this.jobRepository.queue({ name: JobName.LIBRARY_SCAN_ASSET, data: libraryJobData });
}
},
});
}
await this.repository.update({ id: job.id, refreshedAt: new Date() });
return true;
}
async handleOfflineAsset(job: IOfflineLibraryFileJob): Promise<boolean> {
const existingAssetEntity = await this.assetRepository.getByLibraryIdAndOriginalPath(job.id, job.assetPath);
if (existingAssetEntity) {
this.logger.verbose(`Marking asset as offline: ${job.assetPath}`);
await this.assetRepository.save({ id: existingAssetEntity.id, isOffline: true });
}
this.logger.log(`Finished library scan: ${name}`);
this.logger.debug({
id,
name,
importPaths,
total: allFiles.size,
new: newFiles.size,
online: onlineAssets.size,
offline: offlineAssets.size,
elapsedTime: DateTime.now().diff(start).toHuman(),
});
return true;
}
@ -439,4 +391,56 @@ export class LibraryService {
}
return library;
}
private async deleteAssets(assetIds: string[]) {
// TODO: this should be refactored to a centralized asset deletion service
for (const assetId of assetIds) {
const asset = await this.assetRepository.getById(assetId);
if (!asset) {
this.logger.warn(`Asset not found: ${assetId}`);
continue;
}
this.logger.debug(`Removing asset from library: ${asset.originalPath}`);
if (asset.faces) {
await Promise.all(
asset.faces.map(({ assetId, personId }) =>
this.jobRepository.queue({ name: JobName.SEARCH_REMOVE_FACE, data: { assetId, personId } }),
),
);
}
await this.assetRepository.remove(asset);
await this.jobRepository.queue({ name: JobName.SEARCH_REMOVE_ASSET, data: { ids: [asset.id] } });
await this.jobRepository.queue({
name: JobName.DELETE_FILES,
data: { files: [asset.webpPath, asset.resizePath, asset.encodedVideoPath, asset.sidecarPath] },
});
// TODO refactor this to use cascades
if (asset.livePhotoVideoId && !assetIds.includes(asset.livePhotoVideoId)) {
assetIds.push(asset.livePhotoVideoId);
}
}
}
private hasAccess(user: UserEntity | null, file: string): boolean {
if (!user) {
return false;
}
if (!user.externalPath) {
this.logger.warn('User has no external path set, skipping');
return false;
}
const matches = file.match(new RegExp(`^${user.externalPath}`));
if (!matches) {
this.logger.error("Asset must be within the user's external path");
}
return !!matches;
}
}

View file

@ -134,25 +134,6 @@ export class LibraryRepository implements ILibraryRepository {
};
}
async getOnlineAssetPaths(libraryId: string): Promise<string[]> {
// Return all non-offline asset paths for a given library
const rawResults = await this.repository
.createQueryBuilder('library')
.innerJoinAndSelect('library.assets', 'assets')
.where('library.id = :id', { id: libraryId })
.andWhere('assets.isOffline = false')
.select('assets.originalPath')
.getRawMany();
const results: string[] = [];
for (const rawPath of rawResults) {
results.push(rawPath.assets_originalPath);
}
return results;
}
async getAssetIds(libraryId: string, withDeleted = false): Promise<string[]> {
let query = await this.repository
.createQueryBuilder('library')

View file

@ -83,7 +83,6 @@ export class AppService {
[JobName.SIDECAR_DISCOVERY]: (data) => this.metadataService.handleSidecarDiscovery(data),
[JobName.SIDECAR_SYNC]: () => this.metadataService.handleSidecarSync(),
[JobName.LIBRARY_SCAN_ASSET]: (data) => this.libraryService.handleAssetRefresh(data),
[JobName.LIBRARY_MARK_ASSET_OFFLINE]: (data) => this.libraryService.handleOfflineAsset(data),
[JobName.LIBRARY_SCAN]: (data) => this.libraryService.handleQueueAssetRefresh(data),
[JobName.LIBRARY_DELETE]: (data) => this.libraryService.handleDeleteLibrary(data),
[JobName.LIBRARY_REMOVE_OFFLINE]: (data) => this.libraryService.handleOfflineRemoval(data),

View file

@ -12,7 +12,6 @@ export const newLibraryRepositoryMock = (): jest.Mocked<ILibraryRepository> => {
getStatistics: jest.fn(),
getDefaultUploadLibrary: jest.fn(),
getUploadLibraryCount: jest.fn(),
getOnlineAssetPaths: jest.fn(),
getAssetIds: jest.fn(),
existsByName: jest.fn(),
getAllDeleted: jest.fn(),