Sfoglia il codice sorgente

fix: retry failed revision transitions

Karol Sójko 1 anno fa
parent
commit
e535cd504c

+ 26 - 2
packages/auth/bin/transition.ts

@@ -11,6 +11,7 @@ import { DomainEventPublisherInterface } from '@standardnotes/domain-events'
 import { DomainEventFactoryInterface } from '../src/Domain/Event/DomainEventFactoryInterface'
 import { UserRepositoryInterface } from '../src/Domain/User/UserRepositoryInterface'
 import { RoleName } from '@standardnotes/domain-core'
+import { TransitionStatusRepositoryInterface } from '../src/Domain/Transition/TransitionStatusRepositoryInterface'
 
 const inputArgs = process.argv.slice(2)
 const startDateString = inputArgs[0]
@@ -18,6 +19,7 @@ const endDateString = inputArgs[1]
 
 const requestTransition = async (
   userRepository: UserRepositoryInterface,
+  transitionStatusRepository: TransitionStatusRepositoryInterface,
   logger: Logger,
   domainEventFactory: DomainEventFactoryInterface,
   domainEventPublisher: DomainEventPublisherInterface,
@@ -37,7 +39,10 @@ const requestTransition = async (
       continue
     }
 
-    const transitionRequestedEvent = domainEventFactory.createTransitionRequestedEvent({ userUuid: user.uuid })
+    const transitionRequestedEvent = domainEventFactory.createTransitionRequestedEvent({
+      userUuid: user.uuid,
+      type: 'items',
+    })
 
     usersTriggered += 1
 
@@ -47,6 +52,20 @@ const requestTransition = async (
   logger.info(
     `Triggered transition for ${usersTriggered} users created between ${startDateString} and ${endDateString}`,
   )
+
+  const revisionStatuses = await transitionStatusRepository.getStatuses('revisions')
+  const failedStatuses = revisionStatuses.filter((status) => status.status === 'FAILED')
+
+  logger.info(`Found ${failedStatuses.length} failed revision transitions`)
+
+  for (const status of failedStatuses) {
+    const transitionRequestedEvent = domainEventFactory.createTransitionRequestedEvent({
+      userUuid: status.userUuid,
+      type: 'revisions',
+    })
+
+    await domainEventPublisher.publish(transitionRequestedEvent)
+  }
 }
 
 const container = new ContainerConfigLoader('worker')
@@ -63,8 +82,13 @@ void container.load().then((container) => {
   const userRepository: UserRepositoryInterface = container.get(TYPES.Auth_UserRepository)
   const domainEventFactory: DomainEventFactoryInterface = container.get(TYPES.Auth_DomainEventFactory)
   const domainEventPublisher: DomainEventPublisherInterface = container.get(TYPES.Auth_DomainEventPublisher)
+  const transitionStatusRepository: TransitionStatusRepositoryInterface = container.get(
+    TYPES.Auth_TransitionStatusRepository,
+  )
 
-  Promise.resolve(requestTransition(userRepository, logger, domainEventFactory, domainEventPublisher))
+  Promise.resolve(
+    requestTransition(userRepository, transitionStatusRepository, logger, domainEventFactory, domainEventPublisher),
+  )
     .then(() => {
       logger.info(`Finished transition request for users created between ${startDateString} and ${endDateString}`)
 

+ 1 - 1
packages/auth/src/Domain/Event/DomainEventFactory.ts

@@ -33,7 +33,7 @@ import { DomainEventFactoryInterface } from './DomainEventFactoryInterface'
 export class DomainEventFactory implements DomainEventFactoryInterface {
   constructor(@inject(TYPES.Auth_Timer) private timer: TimerInterface) {}
 
-  createTransitionRequestedEvent(dto: { userUuid: string }): TransitionRequestedEvent {
+  createTransitionRequestedEvent(dto: { userUuid: string; type: 'items' | 'revisions' }): TransitionRequestedEvent {
     return {
       type: 'TRANSITION_REQUESTED',
       createdAt: this.timer.getUTCDate(),

+ 1 - 1
packages/auth/src/Domain/Event/DomainEventFactoryInterface.ts

@@ -90,5 +90,5 @@ export interface DomainEventFactoryInterface {
   }): StatisticPersistenceRequestedEvent
   createSessionCreatedEvent(dto: { userUuid: string }): SessionCreatedEvent
   createSessionRefreshedEvent(dto: { userUuid: string }): SessionRefreshedEvent
-  createTransitionRequestedEvent(dto: { userUuid: string }): TransitionRequestedEvent
+  createTransitionRequestedEvent(dto: { userUuid: string; type: 'items' | 'revisions' }): TransitionRequestedEvent
 }

+ 3 - 0
packages/auth/src/Domain/Transition/TransitionStatusRepositoryInterface.ts

@@ -9,4 +9,7 @@ export interface TransitionStatusRepositoryInterface {
     userUuid: string,
     transitionType: 'items' | 'revisions',
   ): Promise<'STARTED' | 'IN_PROGRESS' | 'FAILED' | null>
+  getStatuses(
+    transitionType: 'items' | 'revisions',
+  ): Promise<Array<{ userUuid: string; status: 'STARTED' | 'IN_PROGRESS' | 'FAILED' }>>
 }

+ 18 - 0
packages/auth/src/Infra/InMemory/InMemoryTransitionStatusRepository.ts

@@ -4,6 +4,24 @@ export class InMemoryTransitionStatusRepository implements TransitionStatusRepos
   private itemStatuses: Map<string, 'STARTED' | 'FAILED'> = new Map()
   private revisionStatuses: Map<string, 'STARTED' | 'FAILED'> = new Map()
 
+  async getStatuses(
+    transitionType: 'items' | 'revisions',
+  ): Promise<{ userUuid: string; status: 'STARTED' | 'FAILED' | 'IN_PROGRESS' }[]> {
+    const statuses: { userUuid: string; status: 'STARTED' | 'FAILED' | 'IN_PROGRESS' }[] = []
+
+    if (transitionType === 'items') {
+      for (const [userUuid, status] of this.itemStatuses) {
+        statuses.push({ userUuid, status })
+      }
+    } else {
+      for (const [userUuid, status] of this.revisionStatuses) {
+        statuses.push({ userUuid, status })
+      }
+    }
+
+    return statuses
+  }
+
   async updateStatus(
     userUuid: string,
     transitionType: 'items' | 'revisions',

+ 15 - 0
packages/auth/src/Infra/Redis/RedisTransitionStatusRepository.ts

@@ -7,6 +7,21 @@ export class RedisTransitionStatusRepository implements TransitionStatusReposito
 
   constructor(private redisClient: IORedis.Redis) {}
 
+  async getStatuses(
+    transitionType: 'items' | 'revisions',
+  ): Promise<{ userUuid: string; status: 'STARTED' | 'IN_PROGRESS' | 'FAILED' }[]> {
+    const keys = await this.redisClient.keys(`${this.PREFIX}:${transitionType}:*`)
+    const statuses = await Promise.all(
+      keys.map(async (key) => {
+        const userUuid = key.split(':')[2]
+        const status = (await this.redisClient.get(key)) as 'STARTED' | 'IN_PROGRESS' | 'FAILED'
+        return { userUuid, status }
+      }),
+    )
+
+    return statuses
+  }
+
   async updateStatus(
     userUuid: string,
     transitionType: 'items' | 'revisions',

+ 1 - 0
packages/domain-events/src/Domain/Event/TransitionRequestedEventPayload.ts

@@ -1,3 +1,4 @@
 export interface TransitionRequestedEventPayload {
   userUuid: string
+  type: 'items' | 'revisions'
 }

+ 12 - 0
packages/revisions/src/Bootstrap/Container.ts

@@ -68,6 +68,7 @@ import { SQLRevisionMetadataPersistenceMapper } from '../Mapping/Persistence/SQL
 import { SQLRevisionPersistenceMapper } from '../Mapping/Persistence/SQL/SQLRevisionPersistenceMapper'
 import { RemoveRevisionsFromSharedVault } from '../Domain/UseCase/RemoveRevisionsFromSharedVault/RemoveRevisionsFromSharedVault'
 import { ItemRemovedFromSharedVaultEventHandler } from '../Domain/Handler/ItemRemovedFromSharedVaultEventHandler'
+import { TransitionRequestedEventHandler } from '../Domain/Handler/TransitionRequestedEventHandler'
 
 export class ContainerConfigLoader {
   constructor(private mode: 'server' | 'worker' = 'server') {}
@@ -458,6 +459,16 @@ export class ContainerConfigLoader {
           container.get<winston.Logger>(TYPES.Revisions_Logger),
         ),
       )
+    container
+      .bind<TransitionRequestedEventHandler>(TYPES.Revisions_TransitionRequestedEventHandler)
+      .toConstantValue(
+        new TransitionRequestedEventHandler(
+          container.get<TriggerTransitionFromPrimaryToSecondaryDatabaseForUser>(
+            TYPES.Revisions_TriggerTransitionFromPrimaryToSecondaryDatabaseForUser,
+          ),
+          container.get<winston.Logger>(TYPES.Revisions_Logger),
+        ),
+      )
 
     const eventHandlers: Map<string, DomainEventHandlerInterface> = new Map([
       ['ITEM_DUMPED', container.get(TYPES.Revisions_ItemDumpedEventHandler)],
@@ -465,6 +476,7 @@ export class ContainerConfigLoader {
       ['REVISIONS_COPY_REQUESTED', container.get(TYPES.Revisions_RevisionsCopyRequestedEventHandler)],
       ['TRANSITION_STATUS_UPDATED', container.get(TYPES.Revisions_TransitionStatusUpdatedEventHandler)],
       ['ITEM_REMOVED_FROM_SHARED_VAULT', container.get(TYPES.Revisions_ItemRemovedFromSharedVaultEventHandler)],
+      ['TRANSITION_REQUESTED', container.get(TYPES.Revisions_TransitionRequestedEventHandler)],
     ])
 
     if (isConfiguredForHomeServer) {

+ 1 - 0
packages/revisions/src/Bootstrap/Types.ts

@@ -60,6 +60,7 @@ const TYPES = {
   Revisions_RevisionsCopyRequestedEventHandler: Symbol.for('Revisions_RevisionsCopyRequestedEventHandler'),
   Revisions_TransitionStatusUpdatedEventHandler: Symbol.for('Revisions_TransitionStatusUpdatedEventHandler'),
   Revisions_ItemRemovedFromSharedVaultEventHandler: Symbol.for('Revisions_ItemRemovedFromSharedVaultEventHandler'),
+  Revisions_TransitionRequestedEventHandler: Symbol.for('Revisions_TransitionRequestedEventHandler'),
   // Services
   Revisions_CrossServiceTokenDecoder: Symbol.for('Revisions_CrossServiceTokenDecoder'),
   Revisions_DomainEventSubscriberFactory: Symbol.for('Revisions_DomainEventSubscriberFactory'),

+ 27 - 0
packages/revisions/src/Domain/Handler/TransitionRequestedEventHandler.ts

@@ -0,0 +1,27 @@
+import { DomainEventHandlerInterface, TransitionRequestedEvent } from '@standardnotes/domain-events'
+import { Logger } from 'winston'
+
+import { TriggerTransitionFromPrimaryToSecondaryDatabaseForUser } from '../UseCase/Transition/TriggerTransitionFromPrimaryToSecondaryDatabaseForUser/TriggerTransitionFromPrimaryToSecondaryDatabaseForUser'
+
+export class TransitionRequestedEventHandler implements DomainEventHandlerInterface {
+  constructor(
+    private triggerTransitionFromPrimaryToSecondaryDatabaseForUser: TriggerTransitionFromPrimaryToSecondaryDatabaseForUser,
+    private logger: Logger,
+  ) {}
+
+  async handle(event: TransitionRequestedEvent): Promise<void> {
+    if (event.payload.type !== 'revisions') {
+      return
+    }
+
+    this.logger.info(`Handling transition requested event for user ${event.payload.userUuid}`)
+
+    const result = await this.triggerTransitionFromPrimaryToSecondaryDatabaseForUser.execute({
+      userUuid: event.payload.userUuid,
+    })
+
+    if (result.isFailed()) {
+      this.logger.error(`Failed to trigger transition for user ${event.payload.userUuid}`)
+    }
+  }
+}

+ 4 - 0
packages/syncing-server/src/Domain/Handler/TransitionRequestedEventHandler.ts

@@ -10,6 +10,10 @@ export class TransitionRequestedEventHandler implements DomainEventHandlerInterf
   ) {}
 
   async handle(event: TransitionRequestedEvent): Promise<void> {
+    if (event.payload.type !== 'items') {
+      return
+    }
+
     this.logger.info(`Handling transition requested event for user ${event.payload.userUuid}`)
 
     const result = await this.triggerTransitionFromPrimaryToSecondaryDatabaseForUser.execute({