refactor(EventDispatcher): use bullmq as a message queue

This commit is contained in:
Nicolas Meienberger 2023-08-15 22:55:02 +02:00 committed by Nicolas Meienberger
parent 5dd3f6963b
commit 710eaa8aaf
11 changed files with 98 additions and 703 deletions

View file

@ -1,20 +1,21 @@
# Only edit this file if you know what you are doing!
# It will be overwritten on update.
APPS_REPO_ID=7a92c8307e0a8074763c80be1fcfa4f87da6641daea9211aea6743b0116aba3b
APPS_REPO_URL=https://github.com/meienberger/runtipi-appstore
TZ=UTC
TZ=Etc/UTC
INTERNAL_IP=localhost
DNS_IP=9.9.9.9
ARCHITECTURE=arm64
TIPI_VERSION=0.8.0
ARCHITECTURE=arm64 # arm64 or amd64
TIPI_VERSION=1.5.2
JWT_SECRET=secret
ROOT_FOLDER_HOST=/Users/nicolas/Projects/runtipi
NGINX_PORT=3000
ROOT_FOLDER_HOST=/path/to/runtipi # absolute path to the root folder of the runtipi installation
STORAGE_PATH=/path/to/runtipi # absolute path to the root folder of the runtipi installation
NGINX_PORT=7000
NGINX_PORT_SSL=443
DOMAIN=tipi.localhost
POSTGRES_HOST=tipi-db
POSTGRES_DBNAME=tipi
POSTGRES_USERNAME=tipi
POSTGRES_PASSWORD=postgres
DOMAIN=example.com
STORAGE_PATH=/Users/nicolas/Projects/runtipi
POSTGRES_PORT=5432
REDIS_HOST=tipi-redis
DATABASE_URL=postgres://tipi:postgres@localhost:5432/tipi
DEMO_MODE=false
LOCAL_DOMAIN=tipi.lan

View file

@ -22,18 +22,4 @@ export const readdirSync = (path: string): string[] => fs.readdirSync(path);
export const fileExists = (path: string): boolean => fs.existsSync(path);
export const writeFile = (path: string, data: string) => fs.writeFileSync(path, data);
export const createFolder = (path: string) => {
if (!fileExists(path)) {
fs.mkdirSync(path, { recursive: true });
}
};
export const deleteFolder = (path: string) => fs.rmSync(path, { recursive: true });
export const getSeed = () => {
const seed = readFile('/runtipi/state/seed');
return seed.toString();
};
export const unlinkFile = (path: string) => fs.promises.unlink(path);

View file

@ -5,9 +5,6 @@ const WATCH_FILE = '/runtipi/state/events';
jest.mock('fs-extra');
// eslint-disable-next-line no-promise-executor-return
const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
beforeEach(async () => {
await fs.promises.mkdir('/runtipi/state', { recursive: true });
await fs.promises.mkdir('/app/logs', { recursive: true });
@ -18,183 +15,11 @@ beforeEach(async () => {
});
describe('EventDispatcher - dispatchEvent', () => {
it('should dispatch an event', () => {
const event = EventDispatcher.dispatchEvent('app');
expect(event.id).toBeDefined();
});
it('should dispatch an event with args', () => {
const event = EventDispatcher.dispatchEvent('app', ['--help']);
expect(event.id).toBeDefined();
});
it('Should put events into queue', async () => {
EventDispatcher.dispatchEvent('app', ['--help']);
EventDispatcher.dispatchEvent('app', ['--help']);
// @ts-expect-error - private method
const { queue } = EventDispatcher;
expect(queue.length).toBe(2);
});
it('Should put first event into lock after 1 sec', async () => {
EventDispatcher.dispatchEvent('app', ['--help']);
EventDispatcher.dispatchEvent('update', ['--help']);
// @ts-expect-error - private method
const { queue } = EventDispatcher;
await wait(1050);
// @ts-expect-error - private method
const { lock } = EventDispatcher;
expect(queue.length).toBe(2);
expect(lock).toBeDefined();
expect(lock?.type).toBe('app');
});
it('Should clear event once its status is success', async () => {
// @ts-expect-error - private method
jest.spyOn(EventDispatcher, 'getEventStatus').mockReturnValueOnce('success');
EventDispatcher.dispatchEvent('app', ['--help']);
await wait(1050);
// @ts-expect-error - private method
const { queue } = EventDispatcher;
expect(queue.length).toBe(0);
});
it('Should clear event once its status is error', async () => {
// @ts-expect-error - private method
jest.spyOn(EventDispatcher, 'getEventStatus').mockReturnValueOnce('error');
EventDispatcher.dispatchEvent('app', ['--help']);
await wait(1050);
// @ts-expect-error - private method
const { queue } = EventDispatcher;
expect(queue.length).toBe(0);
});
it('should dispatch an event in the queue', () => {});
});
describe('EventDispatcher - dispatchEventAsync', () => {
it('Should dispatch an event and wait for it to finish', async () => {
// @ts-expect-error - private method
jest.spyOn(EventDispatcher, 'getEventStatus').mockReturnValueOnce('success');
const { success } = await EventDispatcher.dispatchEventAsync('app', ['--help']);
it('Should dispatch an event and wait for it to finish', async () => {});
expect(success).toBe(true);
});
it('Should dispatch an event and wait for it to finish with error', async () => {
// @ts-expect-error - private method
jest.spyOn(EventDispatcher, 'getEventStatus').mockReturnValueOnce('error');
const { success } = await EventDispatcher.dispatchEventAsync('app', ['--help']);
expect(success).toBe(false);
});
});
describe('EventDispatcher - runEvent', () => {
it('Should do nothing if there is a lock', async () => {
// @ts-expect-error - private method
EventDispatcher.lock = { id: '123', type: 'app', args: [] };
// @ts-expect-error - private method
await EventDispatcher.runEvent();
const file = fs.readFileSync(WATCH_FILE, 'utf8');
expect(file).toBe('');
});
it('Should do nothing if there is no event in queue', async () => {
// @ts-expect-error - private method
await EventDispatcher.runEvent();
const file = fs.readFileSync(WATCH_FILE, 'utf8');
expect(file).toBe('');
});
});
describe('EventDispatcher - getEventStatus', () => {
it('Should return success if event is not in the queue', async () => {
// @ts-expect-error - private method
EventDispatcher.queue = [];
// @ts-expect-error - private method
const status = EventDispatcher.getEventStatus('123');
expect(status).toBe('success');
});
it('Should return error if event is expired', async () => {
const dateFiveMinutesAgo = new Date(new Date().getTime() - 5 * 60 * 10000);
// @ts-expect-error - private method
EventDispatcher.queue = [{ id: '123', type: 'app', args: [], creationDate: dateFiveMinutesAgo }];
// @ts-expect-error - private method
const status = EventDispatcher.getEventStatus('123');
expect(status).toBe('error');
});
it('Should be waiting if line is not found in the file', async () => {
// @ts-expect-error - private method
EventDispatcher.queue = [{ id: '123', type: 'app', args: [], creationDate: new Date() }];
// @ts-expect-error - private method
const status = EventDispatcher.getEventStatus('123');
expect(status).toBe('waiting');
});
});
describe('EventDispatcher - clearEvent', () => {
it('Should clear event', async () => {
const event = { id: '123', type: 'app', args: [], creationDate: new Date() };
// @ts-expect-error - private method
EventDispatcher.queue = [event];
// @ts-expect-error - private method
EventDispatcher.clearEvent(event);
// @ts-expect-error - private method
const { queue } = EventDispatcher;
expect(queue.length).toBe(0);
});
});
describe('EventDispatcher - pollQueue', () => {
it('Should not create a new interval if one already exists', async () => {
// @ts-expect-error - private method
EventDispatcher.interval = 123;
// @ts-expect-error - private method
const id = EventDispatcher.pollQueue();
// @ts-expect-error - private method
const { interval } = EventDispatcher;
expect(interval).toBe(123);
expect(id).toBe(123);
clearInterval(interval);
clearInterval(id);
});
});
describe('EventDispatcher - collectLockStatusAndClean', () => {
it('Should do nothing if there is no lock', async () => {
// @ts-expect-error - private method
EventDispatcher.lock = null;
// @ts-expect-error - private method
EventDispatcher.collectLockStatusAndClean();
// @ts-expect-error - private method
const { lock } = EventDispatcher;
expect(lock).toBeNull();
});
it('Should dispatch an event and wait for it to finish with error', async () => {});
});

View file

@ -1,6 +1,7 @@
/* eslint-disable vars-on-top */
import cron from 'node-cron';
import fs from 'fs-extra';
import { Queue, QueueEvents } from 'bullmq';
import { eventResultSchema, eventSchema, SystemEvent } from '@runtipi/shared';
import { getConfig } from '@/server/core/TipiConfig';
import { Logger } from '../Logger';
declare global {
@ -8,54 +9,26 @@ declare global {
var EventDispatcher: EventDispatcher | undefined;
}
export const EVENT_TYPES = {
// System events
RESTART: 'restart',
UPDATE: 'update',
CLONE_REPO: 'clone_repo',
UPDATE_REPO: 'update_repo',
APP: 'app',
SYSTEM_INFO: 'system_info',
} as const;
export type EventType = (typeof EVENT_TYPES)[keyof typeof EVENT_TYPES];
type SystemEvent = {
id: string;
type: EventType;
args: string[];
creationDate: Date;
};
const EVENT_STATUS = {
RUNNING: 'running',
SUCCESS: 'success',
ERROR: 'error',
WAITING: 'waiting',
} as const;
type EventStatus = (typeof EVENT_STATUS)[keyof typeof EVENT_STATUS];
const WATCH_FILE = '/runtipi/state/events';
// File state example:
// restart 1631231231231 running "arg1 arg2"
class EventDispatcher {
private static instance: EventDispatcher | null;
private dispatcherId = EventDispatcher.generateId();
private queue;
private queue: SystemEvent[] = [];
private lock: SystemEvent | null = null;
private interval: NodeJS.Timer;
private intervals: NodeJS.Timer[] = [];
private queueEvents;
constructor() {
const timer = this.pollQueue();
this.interval = timer;
this.queue = new Queue('events', { connection: { host: getConfig().REDIS_HOST, port: 6379 } });
this.queueEvents = new QueueEvents('events', { connection: { host: getConfig().REDIS_HOST, port: 6379 } });
}
public async cleanRepeatableJobs() {
const repeatableJobs = await this.queue.getRepeatableJobs();
await Promise.all(
repeatableJobs.map(async (job) => {
await this.queue.removeRepeatableByKey(job.key);
}),
);
}
public static getInstance(): EventDispatcher {
@ -65,194 +38,56 @@ class EventDispatcher {
return EventDispatcher.instance;
}
/**
* Generate a random task id
*
* @returns {string} id - Randomly generated id
*/
static generateId() {
return Math.random().toString(36).substring(2, 9);
}
/**
* Collect lock status and clean queue if event is done
*/
private collectLockStatusAndClean() {
if (!this.lock) {
return;
}
const status = this.getEventStatus(this.lock.id);
if (status === 'running' || status === 'waiting') {
return;
}
this.clearEvent(this.lock, status);
this.lock = null;
}
/**
* Poll queue and run events
*
* @returns {NodeJS.Timer} - Interval timer
*/
private pollQueue() {
Logger.info(`EventDispatcher(${this.dispatcherId}): Polling queue...`);
if (!this.interval) {
const id = setInterval(() => {
this.runEvent();
this.collectLockStatusAndClean();
}, 1000);
this.intervals.push(id);
return id;
}
return this.interval;
}
/**
* Run event from the queue if there is no lock
*/
private async runEvent() {
if (this.lock) {
return;
}
const event = this.queue[0];
if (!event) {
return;
}
this.lock = event;
// Write event to state file
const args = event.args.join(' ');
const line = `${event.type} ${event.id} waiting ${args}`;
fs.writeFileSync(WATCH_FILE, `${line}`);
}
/**
* Check event status
*
* @param {string} id - Event id
* @returns {EventStatus} - Event status
*/
private getEventStatus(id: string): EventStatus {
const event = this.queue.find((e) => e.id === id);
if (!event) {
return 'success';
}
// if event was created more than 3 minutes ago, it's an error
if (new Date().getTime() - event.creationDate.getTime() > 5 * 60 * 1000) {
return 'error';
}
const file = fs.readFileSync(WATCH_FILE, 'utf8');
const lines = file?.split('\n') || [];
const line = lines.find((l) => l.startsWith(`${event.type} ${event.id}`));
if (!line) {
return 'waiting';
}
const status = line.split(' ')[2] as EventStatus;
return status;
private generateJobId(event: SystemEvent) {
return [event.type, Date.now()].join('_');
}
/**
* Dispatch an event to the queue
*
* @param {EventType} type - Event type
* @param {[string]} args - Event arguments
* @returns {SystemEvent} event - Event object
* @param {SystemEvent} event - Event object
*/
public dispatchEvent(type: EventType, args?: string[]): SystemEvent {
const event: SystemEvent = {
id: EventDispatcher.generateId(),
type,
args: args || [],
creationDate: new Date(),
};
public dispatchEvent(event: SystemEvent) {
const jobid = this.generateJobId(event);
this.queue.push(event);
return event;
}
/**
* Clears an event from the queue
*
* @param {SystemEvent} event - The event to clear
* @param {EventStatus} status - The status to consider the event to
*/
private clearEvent(event: SystemEvent, status: EventStatus = 'success') {
this.queue = this.queue.filter((e) => e.id !== event.id);
if (fs.existsSync(`/app/logs/${event.id}.log`)) {
const log = fs.readFileSync(`/app/logs/${event.id}.log`, 'utf8');
if (log && status === 'error') {
Logger.error(`EventDispatcher: ${event.type} ${event.id} failed with error: ${log}`);
} else if (log) {
Logger.info(`EventDispatcher: ${event.type} ${event.id} finished with message: ${log}`);
}
fs.unlinkSync(`/app/logs/${event.id}.log`);
}
fs.writeFileSync(WATCH_FILE, '');
return this.queue.add(jobid, eventSchema.parse(event));
}
/**
* Dispatch an event to the queue and wait for it to finish
*
* @param {EventType} type - Event type
* @param {[string[]]} args - Event arguments
* @param {SystemEvent} event - Event object
* @returns {Promise<{ success: boolean; stdout?: string }>} - Promise that resolves when the event is done
*/
public async dispatchEventAsync(type: EventType, args?: string[]): Promise<{ success: boolean; stdout?: string }> {
const event = this.dispatchEvent(type, args);
public async dispatchEventAsync(event: SystemEvent): Promise<{ success: boolean; stdout?: string }> {
Logger.info(`Dispatching event ${JSON.stringify(event)}`);
try {
const job = await this.dispatchEvent(event);
const result = await job.waitUntilFinished(this.queueEvents, 1000 * 60 * 5);
return new Promise((resolve) => {
const interval = setInterval(() => {
this.intervals.push(interval);
const status = this.getEventStatus(event.id);
// const isFailed = await job.isFailed();
let log = '';
if (fs.existsSync(`/app/logs/${event.id}.log`)) {
log = fs.readFileSync(`/app/logs/${event.id}.log`, 'utf8');
}
if (status === 'success') {
clearInterval(interval);
resolve({ success: true, stdout: log });
} else if (status === 'error') {
clearInterval(interval);
resolve({ success: false, stdout: log });
}
}, 100);
});
return eventResultSchema.parse(result);
} catch (e) {
Logger.error(`Event failed: ${e}`);
let message = 'Event failed';
if (e instanceof Error) {
message = e.message;
}
return { success: false, stdout: message };
}
}
public clearInterval() {
clearInterval(this.interval);
this.intervals.forEach((i) => clearInterval(i));
public async clear() {
await this.cleanRepeatableJobs();
await this.queue.obliterate({ force: true });
}
public clear() {
this.queue = [];
this.lock = null;
EventDispatcher.instance = null;
fs.writeFileSync(WATCH_FILE, '');
}
public scheduleEvent(event: SystemEvent, cronExpression: string) {
Logger.info(`Scheduling event ${JSON.stringify(event)} with cron expression ${cronExpression}`);
const jobid = this.generateJobId(event);
public scheduleEvent(params: { type: EventType; args?: string[]; cronExpression: string }) {
const { type, args, cronExpression } = params;
cron.schedule(cronExpression, async () => {
this.dispatchEvent(type, args);
});
this.queue.add(jobid, eventSchema.parse(event), { repeat: { pattern: cronExpression } });
}
}

View file

@ -1,2 +1 @@
export { EventDispatcherInstance as EventDispatcher } from './EventDispatcher';
export { EVENT_TYPES } from './EventDispatcher';

View file

@ -33,8 +33,8 @@ if (!dev) {
const handle = nextApp.getRequestHandler();
nextApp.prepare().then(async () => {
const app = express();
const authService = new AuthQueries(db);
const app = express();
app.disable('x-powered-by');
@ -61,8 +61,8 @@ nextApp.prepare().then(async () => {
});
app.listen(port, async () => {
await EventDispatcher.clear();
const appService = new AppServiceClass(db);
EventDispatcher.clear();
// Run database migrations
if (getConfig().NODE_ENV !== 'development') {
@ -71,12 +71,12 @@ nextApp.prepare().then(async () => {
setConfig('status', 'RUNNING');
// Clone and update apps repo
await EventDispatcher.dispatchEventAsync('clone_repo', [getConfig().appsRepoUrl]);
await EventDispatcher.dispatchEventAsync('update_repo', [getConfig().appsRepoUrl]);
await EventDispatcher.dispatchEventAsync({ type: 'repo', command: 'clone', url: getConfig().appsRepoUrl });
await EventDispatcher.dispatchEventAsync({ type: 'repo', command: 'update', url: getConfig().appsRepoUrl });
// Scheduled events
EventDispatcher.scheduleEvent({ type: 'update_repo', args: [getConfig().appsRepoUrl], cronExpression: '*/30 * * * *' });
EventDispatcher.scheduleEvent({ type: 'system_info', args: [], cronExpression: '* * * * *' });
EventDispatcher.scheduleEvent({ type: 'repo', command: 'update', url: getConfig().appsRepoUrl }, '*/30 * * * *');
EventDispatcher.scheduleEvent({ type: 'system', command: 'system_info' }, '* * * * *');
appService.startAllApps();

View file

@ -2,9 +2,9 @@ import fs from 'fs-extra';
import waitForExpect from 'wait-for-expect';
import { TestDatabase, clearDatabase, closeDatabase, createDatabase } from '@/server/tests/test-utils';
import { faker } from '@faker-js/faker';
import { getAppEnvMap } from '@/server/utils/env-generation';
import { castAppConfig } from '@/client/modules/Apps/helpers/castAppConfig';
import { AppServiceClass } from './apps.service';
import { EventDispatcher, EVENT_TYPES } from '../../core/EventDispatcher';
import { EventDispatcher } from '../../core/EventDispatcher';
import { getAllApps, getAppById, updateApp, createAppConfig, insertApp } from '../../tests/apps.factory';
import { setConfig } from '../../core/TipiConfig';
@ -27,21 +27,6 @@ afterAll(async () => {
});
describe('Install app', () => {
it('Should correctly generate env file for app', async () => {
// arrange
const appConfig = createAppConfig({ form_fields: [{ type: 'text', label: '', env_variable: 'TEST_FIELD', required: true }] });
// act
await AppsService.installApp(appConfig.id, { TEST_FIELD: 'test' });
const envMap = await getAppEnvMap(appConfig.id);
// assert
expect(envMap.get('TEST_FIELD')).toBe('test');
expect(envMap.get('APP_PORT')).toBe(appConfig.port.toString());
expect(envMap.get('APP_ID')).toBe(appConfig.id);
expect(envMap.get('APP_DOMAIN')).toBe(`localhost:${appConfig.port}`);
});
it('Should add app in database', async () => {
// arrange
const appConfig = createAppConfig({ form_fields: [{ type: 'text', label: '', env_variable: 'TEST_FIELD', required: true }] });
@ -68,8 +53,8 @@ describe('Install app', () => {
// assert
expect(spy.mock.calls.length).toBe(2);
expect(spy.mock.calls[0]).toEqual([EVENT_TYPES.APP, ['install', appConfig.id]]);
expect(spy.mock.calls[1]).toEqual([EVENT_TYPES.APP, ['start', appConfig.id]]);
expect(spy.mock.calls[0]).toEqual([{ appid: appConfig.id, command: 'install', form: {}, type: 'app' }]);
expect(spy.mock.calls[1]).toEqual([{ appid: appConfig.id, command: 'start', form: {}, type: 'app' }]);
spy.mockRestore();
});
@ -87,59 +72,6 @@ describe('Install app', () => {
expect(app).toBeNull();
});
it('Should throw if required form fields are missing', async () => {
// arrange
const appConfig = createAppConfig({ form_fields: [{ type: 'text', label: '', env_variable: 'TEST_FIELD', required: true }] });
// act & assert
await expect(AppsService.installApp(appConfig.id, {})).rejects.toThrowError('Variable TEST_FIELD is required');
});
it('Correctly generates a random value if the field has a "random" type', async () => {
// arrange
const appConfig = createAppConfig({ form_fields: [{ type: 'random', label: '', env_variable: 'RANDOM_FIELD', required: true }] });
// act
await AppsService.installApp(appConfig.id, {});
const envMap = await getAppEnvMap(appConfig.id);
// assert
expect(envMap.get('RANDOM_FIELD')).toBeDefined();
expect(envMap.get('RANDOM_FIELD')).toHaveLength(32);
});
it('Should correctly copy app from repos to apps folder', async () => {
// arrange
const appConfig = createAppConfig({});
// act
await AppsService.installApp(appConfig.id, {});
const appFolder = fs.readdirSync(`/runtipi/apps/${appConfig.id}`);
// assert
expect(appFolder).toBeDefined();
expect(appFolder.indexOf('docker-compose.yml')).toBeGreaterThanOrEqual(0);
});
it('Should cleanup any app folder existing before install', async () => {
// arrange
const appConfig = createAppConfig();
const MockFiles: Record<string, unknown> = {};
MockFiles[`/runtipi/apps/${appConfig.id}/docker-compose.yml`] = 'test';
MockFiles[`/runtipi/apps/${appConfig.id}/test.yml`] = 'test';
MockFiles[`/runtipi/apps/${appConfig.id}`] = ['test.yml', 'docker-compose.yml'];
// @ts-expect-error - Mocking fs
fs.__applyMockFiles(MockFiles);
// act
expect(fs.existsSync(`/runtipi/apps/${appConfig.id}/test.yml`)).toBe(true);
await AppsService.installApp(appConfig.id, {});
// assert
expect(fs.existsSync(`/runtipi/apps/${appConfig.id}/test.yml`)).toBe(false);
expect(fs.existsSync(`/runtipi/apps/${appConfig.id}/docker-compose.yml`)).toBe(true);
});
it('Should throw if app is exposed and domain is not provided', async () => {
// arrange
const appConfig = createAppConfig({ exposable: true });
@ -218,19 +150,6 @@ describe('Install app', () => {
await expect(AppsService.installApp(appConfig.id, {})).rejects.toThrowError(`App ${appConfig.id} has invalid config.json file`);
});
it('Should throw if config.json is not valid after folder copy', async () => {
// arrange
jest.spyOn(fs, 'copySync').mockImplementationOnce(() => {});
const appConfig = createAppConfig({});
const MockFiles: Record<string, unknown> = {};
MockFiles[`/runtipi/apps/${appConfig.id}/config.json`] = 'test';
// @ts-expect-error - Mocking fs
fs.__applyMockFiles(MockFiles);
// act & assert
await expect(AppsService.installApp(appConfig.id, {})).rejects.toThrowError(`App ${appConfig.id} has invalid config.json file`);
});
it('should throw if app is not exposed and config has force_expose set to true', async () => {
// arrange
const appConfig = createAppConfig({ force_expose: true });
@ -238,39 +157,6 @@ describe('Install app', () => {
// act & assert
await expect(AppsService.installApp(appConfig.id, {})).rejects.toThrowError();
});
it('should replace env variables in .templates files in data folder', async () => {
// arrange
const appConfig = createAppConfig({ form_fields: [{ env_variable: 'TEST', type: 'text', label: 'test', required: true }] });
await fs.promises.mkdir(`/runtipi/repos/repo-id/apps/${appConfig.id}/data`, { recursive: true });
await fs.promises.writeFile(`/runtipi/repos/repo-id/apps/${appConfig.id}/data/test.txt.template`, 'test {{TEST}}');
await fs.promises.writeFile(`/runtipi/repos/repo-id/apps/${appConfig.id}/data/test2.txt`, 'test {{TEST}}');
// act
await AppsService.installApp(appConfig.id, { TEST: 'test' });
// assert
const file = await fs.promises.readFile(`/app/storage/app-data/${appConfig.id}/data/test.txt`);
const file2 = await fs.promises.readFile(`/app/storage/app-data/${appConfig.id}/data/test2.txt`);
expect(file.toString()).toBe('test test');
expect(file2.toString()).toBe('test {{TEST}}');
});
it('should copy and replace env variables in deeply nested .templates files in data folder', async () => {
// arrange
const appConfig = createAppConfig({ form_fields: [{ env_variable: 'TEST', type: 'text', label: 'test', required: true }] });
await fs.promises.mkdir(`/runtipi/repos/repo-id/apps/${appConfig.id}/data`, { recursive: true });
await fs.promises.writeFile(`/runtipi/repos/repo-id/apps/${appConfig.id}/data/test.txt.template`, 'test {{TEST}}');
await fs.promises.mkdir(`/runtipi/repos/repo-id/apps/${appConfig.id}/data/test/test`, { recursive: true });
await fs.promises.writeFile(`/runtipi/repos/repo-id/apps/${appConfig.id}/data/test/test/test.txt.template`, 'test {{TEST}}');
// act
await AppsService.installApp(appConfig.id, { TEST: 'test' });
// assert
const file = await fs.promises.readFile(`/app/storage/app-data/${appConfig.id}/data/test/test/test.txt`);
expect(file.toString()).toBe('test test');
});
});
describe('Uninstall app', () => {
@ -298,8 +184,8 @@ describe('Uninstall app', () => {
// assert
expect(spy.mock.calls.length).toBe(2);
expect(spy.mock.calls[0]).toEqual([EVENT_TYPES.APP, ['stop', appConfig.id]]);
expect(spy.mock.calls[1]).toEqual([EVENT_TYPES.APP, ['uninstall', appConfig.id]]);
expect(spy.mock.calls[0]).toEqual([{ appid: appConfig.id, command: 'stop', form: {}, type: 'app' }]);
expect(spy.mock.calls[1]).toEqual([{ appid: appConfig.id, command: 'uninstall', form: {}, type: 'app' }]);
spy.mockRestore();
});
@ -323,7 +209,7 @@ describe('Uninstall app', () => {
});
describe('Start app', () => {
it('Should correctly dispatch event', async () => {
it('Should correctly dispatch start event', async () => {
// arrange
const appConfig = createAppConfig({});
await insertApp({}, appConfig, db);
@ -333,7 +219,7 @@ describe('Start app', () => {
await AppsService.startApp(appConfig.id);
// assert
expect(spy.mock.lastCall).toEqual([EVENT_TYPES.APP, ['start', appConfig.id]]);
expect(spy.mock.lastCall).toEqual([{ appid: appConfig.id, command: 'start', form: {}, type: 'app' }]);
spy.mockRestore();
});
@ -357,24 +243,6 @@ describe('Start app', () => {
spy.mockRestore();
});
it('should regenerate env file', async () => {
// arrange
const appConfig = createAppConfig({ form_fields: [{ type: 'text', label: '', required: true, env_variable: 'TEST_FIELD' }] });
await insertApp({ config: { TEST_FIELD: 'test' } }, appConfig, db);
fs.writeFileSync(`/app/storage/app-data/${appConfig.id}/app.env`, 'TEST=test\nAPP_PORT=3000');
// act
await AppsService.startApp(appConfig.id);
const envMap = await getAppEnvMap(appConfig.id);
// assert
expect(envMap.get('TEST_FIELD')).toBe('test');
expect(envMap.get('APP_PORT')).toBe(appConfig.port.toString());
expect(envMap.get('APP_ID')).toBe(appConfig.id);
expect(envMap.get('TEST')).toBe('test');
expect(envMap.get('APP_DOMAIN')).toBe(`localhost:${appConfig.port}`);
});
it('Should throw if start script fails', async () => {
// arrange
const appConfig = createAppConfig({});
@ -386,18 +254,6 @@ describe('Start app', () => {
const app = await getAppById(appConfig.id, db);
expect(app?.status).toBe('stopped');
});
it('Should throw if app has invalid config.json', async () => {
// arrange
const appConfig = createAppConfig({});
await insertApp({ status: 'stopped' }, appConfig, db);
await fs.promises.writeFile(`/runtipi/apps/${appConfig.id}/config.json`, 'test');
// act & assert
await expect(AppsService.startApp(appConfig.id)).rejects.toThrow(`App ${appConfig.id} has invalid config.json`);
const app = await getAppById(appConfig.id, db);
expect(app?.status).toBe('stopped');
});
});
describe('Stop app', () => {
@ -411,7 +267,7 @@ describe('Stop app', () => {
await AppsService.stopApp(appConfig.id);
// assert
expect(spy.mock.lastCall).toEqual([EVENT_TYPES.APP, ['stop', appConfig.id]]);
expect(spy.mock.lastCall).toEqual([{ appid: appConfig.id, command: 'stop', form: {}, type: 'app' }]);
});
it('Should throw if app is not installed', async () => {
@ -440,45 +296,17 @@ describe('Update app config', () => {
// act
await AppsService.updateAppConfig(appConfig.id, { TEST_FIELD: word });
const envMap = await getAppEnvMap(appConfig.id);
const app = await getAppById(appConfig.id, db);
const config = castAppConfig(app?.config);
// assert
expect(envMap.get('TEST_FIELD')).toBe(word);
expect(envMap.get('APP_PORT')).toBe(appConfig.port.toString());
expect(envMap.get('APP_ID')).toBe(appConfig.id);
expect(envMap.get('APP_DOMAIN')).toBe(`localhost:${appConfig.port}`);
});
it('Should throw if required field is missing', async () => {
// arrange
const appConfig = createAppConfig({ form_fields: [{ type: 'text', label: '', required: true, env_variable: 'TEST_FIELD' }] });
await insertApp({}, appConfig, db);
// act & assert
await expect(AppsService.updateAppConfig(appConfig.id, { TEST_FIELD: '' })).rejects.toThrowError('Variable TEST_FIELD is required');
expect(config.TEST_FIELD).toBe(word);
});
it('Should throw if app is not installed', async () => {
await expect(AppsService.updateAppConfig('test-app-2', { test: 'test' })).rejects.toThrowError('server-messages.errors.app-not-found');
});
it('Should not recreate random field if already present in .env', async () => {
// arrange
const field = faker.lorem.word();
const appConfig = createAppConfig({ form_fields: [{ type: 'random', label: '', required: false, env_variable: field }] });
await insertApp({}, appConfig, db);
const envFile = fs.readFileSync(`/app/storage/app-data/${appConfig.id}/app.env`).toString();
fs.writeFileSync(`/app/storage/app-data/${appConfig.id}/app.env`, `${envFile}\n${field}=test`);
// act
await AppsService.updateAppConfig(appConfig.id, { TEST_FIELD: 'test' });
const envMap = await getAppEnvMap(appConfig.id);
// assert
expect(envMap.get(field)).toBe('test');
});
it('Should throw if app is exposed and domain is not provided', async () => {
// arrange
const appConfig = createAppConfig({ exposable: true });
@ -509,17 +337,6 @@ describe('Update app config', () => {
await expect(AppsService.updateAppConfig(appConfig2.id, {}, true, domain)).rejects.toThrowError('server-messages.errors.domain-already-in-use');
});
it('Should throw if app has invalid config.json', async () => {
// arrange
const appConfig = createAppConfig({});
await insertApp({}, appConfig, db);
fs.writeFileSync(`/runtipi/apps/${appConfig.id}/config.json`, 'test');
fs.writeFileSync(`/app/storage/app-data/${appConfig.id}/config.json`, 'test');
// act & assert
await expect(AppsService.updateAppConfig(appConfig.id, {})).rejects.toThrowError(`App ${appConfig.id} has invalid config.json`);
});
it('should throw if app is not exposed and config has force_expose set to true', async () => {
// arrange
const appConfig = createAppConfig({ force_expose: true });

View file

@ -3,11 +3,12 @@ import { App } from '@/server/db/schema';
import { AppQueries } from '@/server/queries/apps/apps.queries';
import { TranslatedError } from '@/server/utils/errors';
import { Database } from '@/server/db';
import { checkAppRequirements, checkEnvFile, generateEnvFile, getAvailableApps, ensureAppFolder, AppInfo, getAppInfo, getUpdateInfo, copyDataDir } from './apps.helpers';
import { castAppConfig } from '@/client/modules/Apps/helpers/castAppConfig';
import { AppInfo } from '@runtipi/shared';
import { checkAppRequirements, getAvailableApps, getAppInfo, getUpdateInfo } from './apps.helpers';
import { getConfig } from '../../core/TipiConfig';
import { EventDispatcher } from '../../core/EventDispatcher';
import { Logger } from '../../core/Logger';
import { createFolder } from '../../common/fs.helpers';
import { notEmpty } from '../../common/typescript.helpers';
const sortApps = (a: AppInfo, b: AppInfo) => a.id.localeCompare(b.id);
@ -29,12 +30,6 @@ export class AppServiceClass {
this.queries = new AppQueries(p);
}
async regenerateEnvFile(app: App) {
ensureAppFolder(app.id);
await generateEnvFile(app);
await checkEnvFile(app.id);
}
/**
* This function starts all apps that are in the 'running' status.
* It finds all the running apps and starts them by regenerating the env file, checking the env file and dispatching the start event.
@ -50,12 +45,9 @@ export class AppServiceClass {
await Promise.all(
apps.map(async (app) => {
try {
// Regenerate env file
await this.regenerateEnvFile(app);
await this.queries.updateApp(app.id, { status: 'starting' });
EventDispatcher.dispatchEventAsync('app', ['start', app.id]).then(({ success }) => {
EventDispatcher.dispatchEventAsync({ type: 'app', command: 'start', appid: app.id, form: castAppConfig(app.config) }).then(({ success }) => {
if (success) {
this.queries.updateApp(app.id, { status: 'running' });
} else {
@ -83,11 +75,8 @@ export class AppServiceClass {
throw new TranslatedError('server-messages.errors.app-not-found', { id: appName });
}
// Regenerate env file
await this.regenerateEnvFile(app);
await this.queries.updateApp(appName, { status: 'starting' });
const { success, stdout } = await EventDispatcher.dispatchEventAsync('app', ['start', app.id]);
const { success, stdout } = await EventDispatcher.dispatchEventAsync({ type: 'app', command: 'start', appid: appName, form: castAppConfig(app.config) });
if (success) {
await this.queries.updateApp(appName, { status: 'running' });
@ -123,12 +112,8 @@ export class AppServiceClass {
throw new TranslatedError('server-messages.errors.domain-not-valid', { domain });
}
ensureAppFolder(id, true);
checkAppRequirements(id);
// Create app folder
createFolder(`/app/storage/app-data/${id}/data`);
const appInfo = getAppInfo(id);
if (!appInfo) {
@ -151,16 +136,10 @@ export class AppServiceClass {
}
}
const newApp = await this.queries.createApp({ id, status: 'installing', config: form, version: appInfo.tipi_version, exposed: exposed || false, domain: domain || null });
if (newApp) {
// Create env file
await generateEnvFile(newApp);
await copyDataDir(id);
}
await this.queries.createApp({ id, status: 'installing', config: form, version: appInfo.tipi_version, exposed: exposed || false, domain: domain || null });
// Run script
const { success, stdout } = await EventDispatcher.dispatchEventAsync('app', ['install', id]);
const { success, stdout } = await EventDispatcher.dispatchEventAsync({ type: 'app', command: 'install', appid: id, form });
if (!success) {
await this.queries.deleteApp(id);
@ -228,13 +207,14 @@ export class AppServiceClass {
}
}
const updatedApp = await this.queries.updateApp(id, { exposed: exposed || false, domain: domain || null, config: form });
const { success } = await EventDispatcher.dispatchEventAsync({ type: 'app', command: 'generate_env', appid: id, form });
if (updatedApp) {
await generateEnvFile(updatedApp);
if (success) {
const updatedApp = await this.queries.updateApp(id, { exposed: exposed || false, domain: domain || null, config: form });
return updatedApp;
}
return updatedApp;
throw new TranslatedError('server-messages.errors.app-failed-to-update', { id });
};
/**
@ -250,12 +230,10 @@ export class AppServiceClass {
throw new TranslatedError('server-messages.errors.app-not-found', { id });
}
await this.regenerateEnvFile(app);
// Run script
await this.queries.updateApp(id, { status: 'stopping' });
const { success, stdout } = await EventDispatcher.dispatchEventAsync('app', ['stop', id]);
const { success, stdout } = await EventDispatcher.dispatchEventAsync({ type: 'app', command: 'stop', appid: id, form: castAppConfig(app.config) });
if (success) {
await this.queries.updateApp(id, { status: 'stopped' });
@ -285,11 +263,9 @@ export class AppServiceClass {
await this.stopApp(id);
}
await this.regenerateEnvFile(app);
await this.queries.updateApp(id, { status: 'uninstalling' });
const { success, stdout } = await EventDispatcher.dispatchEventAsync('app', ['uninstall', id]);
const { success, stdout } = await EventDispatcher.dispatchEventAsync({ type: 'app', command: 'uninstall', appid: id, form: castAppConfig(app.config) });
if (!success) {
await this.queries.updateApp(id, { status: 'stopped' });
@ -336,11 +312,9 @@ export class AppServiceClass {
throw new TranslatedError('server-messages.errors.app-not-found', { id });
}
await this.regenerateEnvFile(app);
await this.queries.updateApp(id, { status: 'updating' });
const { success, stdout } = await EventDispatcher.dispatchEventAsync('app', ['update', id]);
const { success, stdout } = await EventDispatcher.dispatchEventAsync({ type: 'app', command: 'update', appid: id, form: castAppConfig(app.config) });
if (success) {
const appInfo = getAppInfo(app.id, app.status);

View file

@ -101,7 +101,7 @@ export class SystemServiceClass {
TipiConfig.setConfig('status', 'UPDATING');
this.dispatcher.dispatchEventAsync('update');
this.dispatcher.dispatchEvent({ type: 'system', command: 'update' });
return true;
};
@ -116,7 +116,7 @@ export class SystemServiceClass {
}
TipiConfig.setConfig('status', 'RESTARTING');
this.dispatcher.dispatchEventAsync('restart');
this.dispatcher.dispatchEvent({ type: 'system', command: 'restart' });
return true;
};

View file

@ -1,36 +1,5 @@
import webpush from 'web-push';
import fs from 'fs-extra';
/**
* Convert a string of environment variables to a Map
*
* @param {string} envString - String of environment variables
*/
export const envStringToMap = (envString: string) => {
const envMap = new Map<string, string>();
const envArray = envString.split('\n');
envArray.forEach((env) => {
const [key, value] = env.split('=');
if (key && value) {
envMap.set(key, value);
}
});
return envMap;
};
/**
* Convert a Map of environment variables to a valid string of environment variables
* that can be used in a .env file
*
* @param {Map<string, string>} envMap - Map of environment variables
*/
export const envMapToString = (envMap: Map<string, string>) => {
const envArray = Array.from(envMap).map(([key, value]) => `${key}=${value}`);
return envArray.join('\n');
};
/**
* This function reads the env file for the app with the provided id and returns a Map containing the key-value pairs of the environment variables.
* It reads the app.env file, splits it into individual environment variables, and stores them in a Map, with the environment variable name as the key and its value as the value.
@ -53,14 +22,3 @@ export const getAppEnvMap = async (id: string) => {
return new Map<string, string>();
}
};
/**
* Generate VAPID keys
*/
export const generateVapidKeys = () => {
const vapidKeys = webpush.generateVAPIDKeys();
return {
publicKey: vapidKeys.publicKey,
privateKey: vapidKeys.privateKey,
};
};

View file

@ -39,5 +39,5 @@ jest.mock('next/config', () => () => ({
}));
afterAll(() => {
EventDispatcher.clearInterval();
EventDispatcher.clear();
});