EventDispatcher.ts 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. /* eslint-disable vars-on-top */
  2. import cron from 'node-cron';
  3. import fs from 'fs-extra';
  4. import { Logger } from '../Logger';
  5. import { getConfig } from '../TipiConfig';
  /**
   * Ambient declaration of the `EventDispatcher` slot on the global object.
   * This module reads the slot when it is loaded and, outside production,
   * stores the dispatcher instance in it.
   */
  declare global {
    // `var` is required here: only `var` declarations become properties of the global object type.
    // eslint-disable-next-line no-var
    var EventDispatcher: EventDispatcher | undefined;
  }
  /**
   * Every event type that can be dispatched. The string values are what gets
   * written as the first field of the state-file line.
   */
  export const EVENT_TYPES = {
    // System events
    RESTART: 'restart',
    UPDATE: 'update',
    CLONE_REPO: 'clone_repo',
    UPDATE_REPO: 'update_repo',
    APP: 'app',
    SYSTEM_INFO: 'system_info',
  } as const;

  /** Names of the entries in EVENT_TYPES. */
  type EventTypeName = keyof typeof EVENT_TYPES;

  /** Union of the string values of EVENT_TYPES (`'restart' | 'update' | ...`). */
  export type EventType = (typeof EVENT_TYPES)[EventTypeName];
  /**
   * A single event held in the dispatcher queue.
   */
  type SystemEvent = {
    /** Identifier produced by `EventDispatcher.generateId()`; second field of the state-file line. */
    id: string;
    /** Kind of event; first field of the state-file line. */
    type: EventType;
    /** Arguments, joined with single spaces when the event is written to the state file. */
    args: string[];
    /** Moment the event was dispatched; used to detect events that took too long. */
    creationDate: Date;
  };
  /**
   * Status values an event can have. The status is the third space-separated
   * field of the event's line in the state file.
   */
  const EVENT_STATUS = {
    RUNNING: 'running',
    SUCCESS: 'success',
    ERROR: 'error',
    WAITING: 'waiting',
  } as const;

  /** Names of the entries in EVENT_STATUS. */
  type EventStatusName = keyof typeof EVENT_STATUS;

  /** Union of the string values of EVENT_STATUS. */
  type EventStatus = (typeof EVENT_STATUS)[EventStatusName];
  /**
   * State file through which events are handed over.
   *
   * The dispatcher writes one line per running event:
   *   <type> <id> <status> <arg1> <arg2> ...
   * for example:
   *   restart k3x9f2a waiting arg1 arg2
   * Fields are separated by single spaces and the arguments are not quoted.
   * The dispatcher itself only ever writes the `waiting` status (or empties the
   * file); presumably an external process rewrites the status field - confirm
   * against the consumer of this file.
   */
  const WATCH_FILE = '/runtipi/state/events';
  /**
   * Singleton queue that hands events over through the state file (WATCH_FILE).
   *
   * Events are processed one at a time: the head of the queue is written to the
   * state file with the `waiting` status and becomes the "lock". Once per second
   * the dispatcher re-reads the file; when the status of the locked event is
   * neither `running` nor `waiting`, the event is removed from the queue, its
   * log file (`/app/logs/<id>.log`) is logged and deleted, and the state file is
   * emptied so the next event can start.
   */
  class EventDispatcher {
    /** Maximum age of an event (measured from its creation) before it is reported as `error`. */
    private static readonly EVENT_TIMEOUT_MS = 5 * 60 * 1000;

    private static instance: EventDispatcher | null;

    private dispatcherId = EventDispatcher.generateId();

    /** Events waiting to run; the event currently running stays at index 0 until it is cleared. */
    private queue: SystemEvent[] = [];

    /** Event currently written to the state file, or null when nothing is in flight. */
    private lock: SystemEvent | null = null;

    /** Handle of the one-second queue poller. */
    private interval: ReturnType<typeof setInterval>;

    /** Every live timer created by this dispatcher (the poller and pending `dispatchEventAsync` waiters). */
    private intervals: ReturnType<typeof setInterval>[] = [];

    /** Ids of events that currently have a `dispatchEventAsync` waiter. */
    private awaited = new Set<string>();

    /**
     * Final status and log of awaited events that the poller cleared before
     * their waiter observed them. Without this record the waiter would find the
     * event gone from the queue and report `success` with an empty log, even
     * when the event actually failed.
     */
    private outcomes = new Map<string, { status: EventStatus; log: string }>();

    constructor() {
      this.interval = this.pollQueue();
    }

    /**
     * Return the shared dispatcher, creating it on first use.
     *
     * @returns {EventDispatcher} - The singleton instance
     */
    public static getInstance(): EventDispatcher {
      if (!EventDispatcher.instance) {
        EventDispatcher.instance = new EventDispatcher();
      }
      return EventDispatcher.instance;
    }

    /**
     * Generate a random task id
     *
     * The id is always exactly 7 characters of [0-9a-z]. `Math.random()` can
     * yield a short base-36 representation (e.g. 0.5 -> "0.i", 0 -> "0"), so the
     * result is right-padded with "0"; an empty or short id would corrupt the
     * space-separated state-file line.
     *
     * @returns {string} id - Randomly generated id
     */
    static generateId() {
      return Math.random().toString(36).substring(2, 9).padEnd(7, '0');
    }

    /**
     * Path of the log file associated with an event.
     *
     * @param {string} id - Event id
     * @returns {string} - Log file path
     */
    private static logPath(id: string) {
      return `/app/logs/${id}.log`;
    }

    /**
     * Collect lock status and clean queue if event is done
     */
    private collectLockStatusAndClean() {
      if (!this.lock) {
        return;
      }

      const status = this.getEventStatus(this.lock.id);
      if (status === 'running' || status === 'waiting') {
        return;
      }

      this.clearEvent(this.lock, status);
      this.lock = null;
    }

    /**
     * Poll queue and run events
     *
     * Starts the one-second poller unless one is already running.
     *
     * @returns - Handle of the poller timer
     */
    private pollQueue() {
      Logger.info(`EventDispatcher(${this.dispatcherId}): Polling queue...`);

      if (!this.interval) {
        const id = setInterval(() => {
          // runEvent is async: report a failed state-file write instead of
          // leaving a floating promise that ends in an unhandled rejection.
          // The lock stays set, so the event is eventually reported as `error`
          // by the timeout check in getEventStatus.
          this.runEvent().catch((error: unknown) => {
            const reason = error instanceof Error ? error.message : String(error);
            Logger.error(`EventDispatcher(${this.dispatcherId}): failed to write event to state file: ${reason}`);
          });
          this.collectLockStatusAndClean();
        }, 1000);
        this.intervals.push(id);
        return id;
      }

      return this.interval;
    }

    /**
     * Run event from the queue if there is no lock
     */
    private async runEvent() {
      if (this.lock) {
        return;
      }

      const event = this.queue[0];
      if (!event) {
        return;
      }

      this.lock = event;

      // Write event to state file
      const args = event.args.join(' ');
      const line = `${event.type} ${event.id} waiting ${args}`;
      fs.writeFileSync(WATCH_FILE, `${line}`);
    }

    /**
     * Check event status
     *
     * - An id that is no longer in the queue reports the outcome recorded when
     *   it was cleared, or `success` when nothing was recorded.
     * - A queued event older than EVENT_TIMEOUT_MS reports `error`.
     * - Otherwise the status is the third space-separated field of the event's
     *   line in the state file, or `waiting` when the file has no such line.
     *
     * @param {string} id - Event id
     * @returns {EventStatus} - Event status
     */
    private getEventStatus(id: string): EventStatus {
      const event = this.queue.find((e) => e.id === id);

      if (!event) {
        return this.outcomes.get(id)?.status ?? 'success';
      }

      // if event was created more than 5 minutes ago, it's an error
      if (new Date().getTime() - event.creationDate.getTime() > EventDispatcher.EVENT_TIMEOUT_MS) {
        return 'error';
      }

      const file = fs.readFileSync(WATCH_FILE, 'utf8');
      const lines = file?.split('\n') || [];
      // The trailing space keeps an id from matching a longer id that merely starts with it.
      const prefix = `${event.type} ${event.id} `;
      const line = lines.find((l) => l.startsWith(prefix));

      if (!line) {
        return 'waiting';
      }

      const status = line.split(' ')[2] as EventStatus;
      return status;
    }

    /**
     * Dispatch an event to the queue
     *
     * @param {EventType} type - Event type
     * @param {[string]} args - Event arguments
     * @returns {SystemEvent} event - Event object
     */
    public dispatchEvent(type: EventType, args?: string[]): SystemEvent {
      const event: SystemEvent = {
        id: EventDispatcher.generateId(),
        type,
        args: args || [],
        creationDate: new Date(),
      };

      this.queue.push(event);
      return event;
    }

    /**
     * Clears an event from the queue
     *
     * Also logs and deletes the event's log file when it exists, and empties the
     * state file. When a `dispatchEventAsync` caller is still waiting for the
     * event, its final status and log are remembered so that caller gets the
     * real result.
     *
     * @param {SystemEvent} event - The event to clear
     * @param {EventStatus} status - The status to consider the event to
     */
    private clearEvent(event: SystemEvent, status: EventStatus = 'success') {
      this.queue = this.queue.filter((e) => e.id !== event.id);

      const logPath = EventDispatcher.logPath(event.id);
      let log = '';

      if (fs.existsSync(logPath)) {
        log = fs.readFileSync(logPath, 'utf8');
        if (log && status === 'error') {
          Logger.error(`EventDispatcher: ${event.type} ${event.id} failed with error: ${log}`);
        } else if (log) {
          Logger.info(`EventDispatcher: ${event.type} ${event.id} finished with message: ${log}`);
        }
        fs.unlinkSync(logPath);
      }

      if (this.awaited.has(event.id)) {
        // Anything other than an explicit error is recorded as success so the
        // waiter always ends up with a status it can resolve on.
        this.outcomes.set(event.id, { status: status === 'error' ? 'error' : 'success', log });
      }

      fs.writeFileSync(WATCH_FILE, '');
    }

    /**
     * Dispatch an event to the queue and wait for it to finish
     *
     * The status is checked every 100 ms. The promise never rejects: failures
     * are reported through `success: false`.
     *
     * @param {EventType} type - Event type
     * @param {[string[]]} args - Event arguments
     * @returns - Promise that resolves when the event is done
     */
    public async dispatchEventAsync(type: EventType, args?: string[]): Promise<{ success: boolean; stdout?: string }> {
      const event = this.dispatchEvent(type, args);
      this.awaited.add(event.id);

      return new Promise((resolve) => {
        const timer = setInterval(() => {
          const status = this.getEventStatus(event.id);
          if (status !== 'success' && status !== 'error') {
            return;
          }

          // Prefer the outcome recorded by clearEvent: by then the log file has
          // already been deleted. Otherwise read the log straight from disk.
          const recorded = this.outcomes.get(event.id);
          let log = recorded ? recorded.log : '';
          if (!recorded && fs.existsSync(EventDispatcher.logPath(event.id))) {
            log = fs.readFileSync(EventDispatcher.logPath(event.id), 'utf8');
          }

          clearInterval(timer);
          this.intervals = this.intervals.filter((i) => i !== timer);
          this.awaited.delete(event.id);
          this.outcomes.delete(event.id);

          resolve({ success: status === 'success', stdout: log });
        }, 100);

        // Register the timer exactly once so clearInterval() can stop it.
        this.intervals.push(timer);
      });
    }

    /**
     * Stop the queue poller and every pending `dispatchEventAsync` waiter.
     * Promises of waiters stopped this way are left unresolved.
     */
    public clearInterval() {
      clearInterval(this.interval);
      this.intervals.forEach((i) => clearInterval(i));
      this.intervals = [];
    }

    /**
     * Reset the dispatcher: drop all queued events and recorded outcomes,
     * release the lock, forget the singleton and empty the state file.
     * Timers are not stopped; use `clearInterval()` for that.
     */
    public clear() {
      this.queue = [];
      this.lock = null;
      this.awaited.clear();
      this.outcomes.clear();
      EventDispatcher.instance = null;
      fs.writeFileSync(WATCH_FILE, '');
    }

    /**
     * Dispatch an event every time the given cron expression fires.
     *
     * @param {object} params - Schedule parameters
     * @param {EventType} params.type - Event type
     * @param {string[]} params.args - Event arguments
     * @param {string} params.cronExpression - Cron expression passed to `cron.schedule`
     */
    public scheduleEvent(params: { type: EventType; args?: string[]; cronExpression: string }) {
      const { type, args, cronExpression } = params;

      cron.schedule(cronExpression, async () => {
        this.dispatchEvent(type, args);
      });
    }
  }
  // Reuse the dispatcher already stored on the global object when there is one;
  // otherwise fall back to the class-level singleton.
  const existingDispatcher = global.EventDispatcher;
  export const EventDispatcherInstance = existingDispatcher || EventDispatcher.getInstance();

  // Outside production the instance is kept on the global object
  // (presumably so module reloads during development reuse it - confirm).
  const { NODE_ENV } = getConfig();
  if (NODE_ENV !== 'production') {
    global.EventDispatcher = EventDispatcherInstance;
  }