fix: keep transition in-progress status alive

This commit is contained in:
Karol Sójko 2023-09-29 14:08:23 +02:00
parent 34b956b482
commit 032cde7723
No known key found for this signature in database
GPG key ID: C2F813669419D05F
8 changed files with 122 additions and 199 deletions

View file

@ -390,6 +390,8 @@ export class ContainerConfigLoader {
container.get<TimerInterface>(TYPES.Revisions_Timer),
container.get<winston.Logger>(TYPES.Revisions_Logger),
env.get('MIGRATION_BATCH_SIZE', true) ? +env.get('MIGRATION_BATCH_SIZE', true) : 100,
container.get<DomainEventPublisherInterface>(TYPES.Revisions_DomainEventPublisher),
container.get<DomainEventFactoryInterface>(TYPES.Revisions_DomainEventFactory),
),
)
container
@ -473,9 +475,6 @@ export class ContainerConfigLoader {
container.get<TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser>(
TYPES.Revisions_TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser,
),
container.get<RevisionRepositoryInterface>(TYPES.Revisions_SQLRevisionRepository),
container.get<DomainEventPublisherInterface>(TYPES.Revisions_DomainEventPublisher),
container.get<DomainEventFactoryInterface>(TYPES.Revisions_DomainEventFactory),
container.get<winston.Logger>(TYPES.Revisions_Logger),
),
)

View file

@ -1,20 +1,10 @@
import {
DomainEventHandlerInterface,
DomainEventPublisherInterface,
TransitionRequestedEvent,
} from '@standardnotes/domain-events'
import { DomainEventHandlerInterface, TransitionRequestedEvent } from '@standardnotes/domain-events'
import { Logger } from 'winston'
import { TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser } from '../UseCase/Transition/TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser/TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser'
import { DomainEventFactoryInterface } from '../Event/DomainEventFactoryInterface'
import { RevisionRepositoryInterface } from '../Revision/RevisionRepositoryInterface'
import { TransitionStatus, Uuid } from '@standardnotes/domain-core'
export class TransitionRequestedEventHandler implements DomainEventHandlerInterface {
constructor(
private transitionRevisionsFromPrimaryToSecondaryDatabaseForUser: TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser,
private primaryRevisionsRepository: RevisionRepositoryInterface,
private domainEventPublisher: DomainEventPublisherInterface,
private domainEventFactory: DomainEventFactoryInterface,
private logger: Logger,
) {}
@ -23,94 +13,13 @@ export class TransitionRequestedEventHandler implements DomainEventHandlerInterf
return
}
const userUuid = await this.getUserUuidFromEvent(event)
if (!userUuid) {
return
}
if (await this.isAlreadyMigrated(userUuid)) {
this.logger.info(`[${event.payload.userUuid}] User already migrated.`)
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.Verified,
transitionType: 'revisions',
transitionTimestamp: event.payload.timestamp,
}),
)
return
}
this.logger.info(`[${event.payload.userUuid}] Handling transition requested event`)
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.InProgress,
transitionType: 'revisions',
transitionTimestamp: event.payload.timestamp,
}),
)
const result = await this.transitionRevisionsFromPrimaryToSecondaryDatabaseForUser.execute({
userUuid: event.payload.userUuid,
timestamp: event.payload.timestamp,
})
if (result.isFailed()) {
this.logger.error(`[${event.payload.userUuid}] Failed to transition: ${result.getError()}`)
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.Failed,
transitionType: 'revisions',
transitionTimestamp: event.payload.timestamp,
}),
)
return
}
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.Verified,
transitionType: 'revisions',
transitionTimestamp: event.payload.timestamp,
}),
)
}
private async isAlreadyMigrated(userUuid: Uuid): Promise<boolean> {
const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
if (totalRevisionsCountForUserInPrimary > 0) {
this.logger.info(
`[${userUuid.value}] User has ${totalRevisionsCountForUserInPrimary} revisions in primary database.`,
)
}
return totalRevisionsCountForUserInPrimary === 0
}
private async getUserUuidFromEvent(event: TransitionRequestedEvent): Promise<Uuid | null> {
const userUuidOrError = Uuid.create(event.payload.userUuid)
if (userUuidOrError.isFailed()) {
this.logger.error(`[${event.payload.userUuid}] Failed to transition revisions: ${userUuidOrError.getError()}`)
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.Failed,
transitionType: 'revisions',
transitionTimestamp: event.payload.timestamp,
}),
)
return null
}
return userUuidOrError.getValue()
}
}

View file

@ -1,11 +1,13 @@
/* istanbul ignore file */
import { Result, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
import { Result, TransitionStatus, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
import { TimerInterface } from '@standardnotes/time'
import { Logger } from 'winston'
import { TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO } from './TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO'
import { RevisionRepositoryInterface } from '../../../Revision/RevisionRepositoryInterface'
import { TransitionRepositoryInterface } from '../../../Transition/TransitionRepositoryInterface'
import { DomainEventPublisherInterface } from '@standardnotes/domain-events'
import { DomainEventFactoryInterface } from '../../../Event/DomainEventFactoryInterface'
export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements UseCaseInterface<void> {
constructor(
@ -15,6 +17,8 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
private timer: TimerInterface,
private logger: Logger,
private pageSize: number,
private domainEventPublisher: DomainEventPublisherInterface,
private domainEventFactory: DomainEventFactoryInterface,
) {}
async execute(dto: TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO): Promise<Result<void>> {
@ -34,12 +38,24 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
}
const userUuid = userUuidOrError.getValue()
if (await this.isAlreadyMigrated(userUuid)) {
this.logger.info(`[${userUuid.value}] User already migrated.`)
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
return Result.ok()
}
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.InProgress, dto.timestamp)
const migrationTimeStart = this.timer.getTimestampInMicroseconds()
this.logger.info(`[${dto.userUuid}] Migrating revisions`)
const migrationResult = await this.migrateRevisionsForUser(userUuid)
const migrationResult = await this.migrateRevisionsForUser(userUuid, dto.timestamp)
if (migrationResult.isFailed()) {
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
return Result.fail(migrationResult.getError())
}
@ -54,11 +70,15 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(userUuid.value, 1)
await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(userUuid.value, 1)
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
return Result.fail(integrityCheckResult.getError())
}
const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.primaryRevisionsRepository)
if (cleanupResult.isFailed()) {
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
this.logger.error(`[${dto.userUuid}] Failed to clean up primary database revisions: ${cleanupResult.getError()}`)
}
@ -71,10 +91,12 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
`[${dto.userUuid}] Transitioned revisions in ${migrationDurationTimeStructure.hours}h ${migrationDurationTimeStructure.minutes}m ${migrationDurationTimeStructure.seconds}s ${migrationDurationTimeStructure.milliseconds}ms`,
)
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
return Result.ok()
}
private async migrateRevisionsForUser(userUuid: Uuid): Promise<Result<void>> {
private async migrateRevisionsForUser(userUuid: Uuid, timestamp: number): Promise<Result<void>> {
try {
const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getPagingProgress(
userUuid.value,
@ -85,6 +107,16 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
const totalRevisionsCountForUser = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
const totalPages = Math.ceil(totalRevisionsCountForUser / this.pageSize)
for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
const isPageInEvery10Percent = currentPage % Math.ceil(totalPages / 10) === 0
if (isPageInEvery10Percent) {
this.logger.info(
`[${userUuid.value}] Migrating revisions for user: ${Math.round(
(currentPage / totalPages) * 100,
)}% completed`,
)
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.InProgress, timestamp)
}
await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(
userUuid.value,
currentPage,
@ -246,4 +278,27 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
)
}
}
private async updateTransitionStatus(userUuid: Uuid, status: string, timestamp: number): Promise<void> {
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: userUuid.value,
status,
transitionType: 'revisions',
transitionTimestamp: timestamp,
}),
)
}
private async isAlreadyMigrated(userUuid: Uuid): Promise<boolean> {
const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
if (totalRevisionsCountForUserInPrimary > 0) {
this.logger.info(
`[${userUuid.value}] User has ${totalRevisionsCountForUserInPrimary} revisions in primary database.`,
)
}
return totalRevisionsCountForUserInPrimary === 0
}
}

View file

@ -1,3 +1,4 @@
export interface TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO {
userUuid: string
timestamp: number
}

View file

@ -879,6 +879,8 @@ export class ContainerConfigLoader {
container.get<TimerInterface>(TYPES.Sync_Timer),
container.get<Logger>(TYPES.Sync_Logger),
env.get('MIGRATION_BATCH_SIZE', true) ? +env.get('MIGRATION_BATCH_SIZE', true) : 100,
container.get<DomainEventPublisherInterface>(TYPES.Sync_DomainEventPublisher),
container.get<DomainEventFactoryInterface>(TYPES.Sync_DomainEventFactory),
),
)
container
@ -1049,12 +1051,9 @@ export class ContainerConfigLoader {
.bind<TransitionRequestedEventHandler>(TYPES.Sync_TransitionRequestedEventHandler)
.toConstantValue(
new TransitionRequestedEventHandler(
container.get<ItemRepositoryInterface>(TYPES.Sync_SQLItemRepository),
container.get<TransitionItemsFromPrimaryToSecondaryDatabaseForUser>(
TYPES.Sync_TransitionItemsFromPrimaryToSecondaryDatabaseForUser,
),
container.get<DomainEventPublisherInterface>(TYPES.Sync_DomainEventPublisher),
container.get<DomainEventFactoryInterface>(TYPES.Sync_DomainEventFactory),
container.get<Logger>(TYPES.Sync_Logger),
),
)

View file

@ -1,21 +1,11 @@
import {
DomainEventHandlerInterface,
DomainEventPublisherInterface,
TransitionRequestedEvent,
} from '@standardnotes/domain-events'
import { DomainEventHandlerInterface, TransitionRequestedEvent } from '@standardnotes/domain-events'
import { Logger } from 'winston'
import { TransitionStatus, Uuid } from '@standardnotes/domain-core'
import { TransitionItemsFromPrimaryToSecondaryDatabaseForUser } from '../UseCase/Transition/TransitionItemsFromPrimaryToSecondaryDatabaseForUser/TransitionItemsFromPrimaryToSecondaryDatabaseForUser'
import { ItemRepositoryInterface } from '../Item/ItemRepositoryInterface'
import { DomainEventFactoryInterface } from '../Event/DomainEventFactoryInterface'
export class TransitionRequestedEventHandler implements DomainEventHandlerInterface {
constructor(
private primaryItemRepository: ItemRepositoryInterface,
private transitionItemsFromPrimaryToSecondaryDatabaseForUser: TransitionItemsFromPrimaryToSecondaryDatabaseForUser,
private domainEventPublisher: DomainEventPublisherInterface,
private domainEventFactory: DomainEventFactoryInterface,
private logger: Logger,
) {}
@ -24,95 +14,13 @@ export class TransitionRequestedEventHandler implements DomainEventHandlerInterf
return
}
const userUuid = await this.getUserUuidFromEvent(event)
if (!userUuid) {
return
}
if (await this.isAlreadyMigrated(userUuid)) {
this.logger.info(`[${event.payload.userUuid}] User already migrated.`)
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.Verified,
transitionType: 'items',
transitionTimestamp: event.payload.timestamp,
}),
)
return
}
this.logger.info(`[${event.payload.userUuid}] Handling transition requested event`)
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.InProgress,
transitionType: 'items',
transitionTimestamp: event.payload.timestamp,
}),
)
const result = await this.transitionItemsFromPrimaryToSecondaryDatabaseForUser.execute({
userUuid: event.payload.userUuid,
timestamp: event.payload.timestamp,
})
if (result.isFailed()) {
this.logger.error(`[${event.payload.userUuid}] Failed to trigger transition: ${result.getError()}`)
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.Failed,
transitionType: 'items',
transitionTimestamp: event.payload.timestamp,
}),
)
return
}
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.Verified,
transitionType: 'items',
transitionTimestamp: event.payload.timestamp,
}),
)
}
private async isAlreadyMigrated(userUuid: Uuid): Promise<boolean> {
const totalItemsCountForUserInPrimary = await this.primaryItemRepository.countAll({
userUuid: userUuid.value,
})
if (totalItemsCountForUserInPrimary > 0) {
this.logger.info(`[${userUuid.value}] User has ${totalItemsCountForUserInPrimary} items in primary database.`)
}
return totalItemsCountForUserInPrimary === 0
}
private async getUserUuidFromEvent(event: TransitionRequestedEvent): Promise<Uuid | null> {
const userUuidOrError = Uuid.create(event.payload.userUuid)
if (userUuidOrError.isFailed()) {
this.logger.error(`[${event.payload.userUuid}] Failed to transition items: ${userUuidOrError.getError()}`)
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: event.payload.userUuid,
status: TransitionStatus.STATUSES.Failed,
transitionType: 'items',
transitionTimestamp: event.payload.timestamp,
}),
)
return null
}
return userUuidOrError.getValue()
}
}

View file

@ -1,12 +1,14 @@
/* istanbul ignore file */
import { TimerInterface } from '@standardnotes/time'
import { Result, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
import { Result, TransitionStatus, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
import { Logger } from 'winston'
import { TransitionItemsFromPrimaryToSecondaryDatabaseForUserDTO } from './TransitionItemsFromPrimaryToSecondaryDatabaseForUserDTO'
import { ItemRepositoryInterface } from '../../../Item/ItemRepositoryInterface'
import { ItemQuery } from '../../../Item/ItemQuery'
import { TransitionRepositoryInterface } from '../../../Transition/TransitionRepositoryInterface'
import { DomainEventPublisherInterface } from '@standardnotes/domain-events'
import { DomainEventFactoryInterface } from '../../../Event/DomainEventFactoryInterface'
export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements UseCaseInterface<void> {
constructor(
@ -16,6 +18,8 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
private timer: TimerInterface,
private logger: Logger,
private pageSize: number,
private domainEventPublisher: DomainEventPublisherInterface,
private domainEventFactory: DomainEventFactoryInterface,
) {}
async execute(dto: TransitionItemsFromPrimaryToSecondaryDatabaseForUserDTO): Promise<Result<void>> {
@ -35,12 +39,22 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
}
const userUuid = userUuidOrError.getValue()
if (await this.isAlreadyMigrated(userUuid)) {
this.logger.info(`[${userUuid.value}] User already migrated.`)
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
return Result.ok()
}
const migrationTimeStart = this.timer.getTimestampInMicroseconds()
this.logger.info(`[${dto.userUuid}] Migrating items`)
const migrationResult = await this.migrateItemsForUser(userUuid)
const migrationResult = await this.migrateItemsForUser(userUuid, dto.timestamp)
if (migrationResult.isFailed()) {
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
return Result.fail(migrationResult.getError())
}
@ -55,11 +69,15 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(userUuid.value, 1)
await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(userUuid.value, 1)
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
return Result.fail(integrityCheckResult.getError())
}
const cleanupResult = await this.deleteItemsForUser(userUuid, this.primaryItemRepository)
if (cleanupResult.isFailed()) {
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
this.logger.error(`[${dto.userUuid}] Failed to clean up primary database items: ${cleanupResult.getError()}`)
}
@ -72,6 +90,8 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
`[${dto.userUuid}] Transitioned items in ${migrationDurationTimeStructure.hours}h ${migrationDurationTimeStructure.minutes}m ${migrationDurationTimeStructure.seconds}s ${migrationDurationTimeStructure.milliseconds}ms`,
)
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
return Result.ok()
}
@ -80,7 +100,7 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
await this.timer.sleep(twoSecondsInMilliseconds)
}
private async migrateItemsForUser(userUuid: Uuid): Promise<Result<void>> {
private async migrateItemsForUser(userUuid: Uuid, timestamp: number): Promise<Result<void>> {
try {
const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getPagingProgress(
userUuid.value,
@ -91,6 +111,14 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
const totalItemsCountForUser = await this.primaryItemRepository.countAll({ userUuid: userUuid.value })
const totalPages = Math.ceil(totalItemsCountForUser / this.pageSize)
for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
const isPageInEvery10Percent = currentPage % Math.ceil(totalPages / 10) === 0
if (isPageInEvery10Percent) {
this.logger.info(
`[${userUuid.value}] Migrating items for user: ${Math.round((currentPage / totalPages) * 100)}% completed`,
)
await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.InProgress, timestamp)
}
await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(
userUuid.value,
currentPage,
@ -229,4 +257,27 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
return Result.fail((error as Error).message)
}
}
private async updateTransitionStatus(userUuid: Uuid, status: string, timestamp: number): Promise<void> {
await this.domainEventPublisher.publish(
this.domainEventFactory.createTransitionStatusUpdatedEvent({
userUuid: userUuid.value,
status,
transitionType: 'items',
transitionTimestamp: timestamp,
}),
)
}
private async isAlreadyMigrated(userUuid: Uuid): Promise<boolean> {
const totalItemsCountForUserInPrimary = await this.primaryItemRepository.countAll({
userUuid: userUuid.value,
})
if (totalItemsCountForUserInPrimary > 0) {
this.logger.info(`[${userUuid.value}] User has ${totalItemsCountForUserInPrimary} items in primary database.`)
}
return totalItemsCountForUserInPrimary === 0
}
}

View file

@ -1,3 +1,4 @@
export interface TransitionItemsFromPrimaryToSecondaryDatabaseForUserDTO {
userUuid: string
timestamp: number
}