TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser.ts 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import { Result, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
  2. import { TimerInterface } from '@standardnotes/time'
  3. import { Logger } from 'winston'
  4. import { TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO } from './TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO'
  5. import { RevisionRepositoryInterface } from '../../../Revision/RevisionRepositoryInterface'
  6. export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements UseCaseInterface<void> {
  7. constructor(
  8. private primaryRevisionsRepository: RevisionRepositoryInterface,
  9. private secondRevisionsRepository: RevisionRepositoryInterface | null,
  10. private timer: TimerInterface,
  11. private logger: Logger,
  12. private pageSize: number,
  13. ) {}
  14. async execute(dto: TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO): Promise<Result<void>> {
  15. if (this.secondRevisionsRepository === null) {
  16. return Result.fail('Secondary revision repository is not set')
  17. }
  18. const userUuidOrError = Uuid.create(dto.userUuid)
  19. if (userUuidOrError.isFailed()) {
  20. return Result.fail(userUuidOrError.getError())
  21. }
  22. const userUuid = userUuidOrError.getValue()
  23. if (await this.isAlreadyMigrated(userUuid)) {
  24. this.logger.info(`Revisions for user ${userUuid.value} are already migrated`)
  25. return Result.ok()
  26. }
  27. const migrationTimeStart = this.timer.getTimestampInMicroseconds()
  28. this.logger.debug(`Transitioning revisions for user ${userUuid.value}`)
  29. const migrationResult = await this.migrateRevisionsForUser(userUuid)
  30. if (migrationResult.isFailed()) {
  31. const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.secondRevisionsRepository)
  32. if (cleanupResult.isFailed()) {
  33. this.logger.error(
  34. `Failed to clean up secondary database revisions for user ${userUuid.value}: ${cleanupResult.getError()}`,
  35. )
  36. }
  37. return Result.fail(migrationResult.getError())
  38. }
  39. await this.allowForSecondaryDatabaseToCatchUp()
  40. const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid)
  41. if (integrityCheckResult.isFailed()) {
  42. const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.secondRevisionsRepository)
  43. if (cleanupResult.isFailed()) {
  44. this.logger.error(
  45. `Failed to clean up secondary database revisions for user ${userUuid.value}: ${cleanupResult.getError()}`,
  46. )
  47. }
  48. return Result.fail(integrityCheckResult.getError())
  49. }
  50. const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.primaryRevisionsRepository)
  51. if (cleanupResult.isFailed()) {
  52. this.logger.error(
  53. `Failed to clean up primary database revisions for user ${userUuid.value}: ${cleanupResult.getError()}`,
  54. )
  55. }
  56. const migrationTimeEnd = this.timer.getTimestampInMicroseconds()
  57. const migrationDuration = migrationTimeEnd - migrationTimeStart
  58. const migrationDurationTimeStructure = this.timer.convertMicrosecondsToTimeStructure(migrationDuration)
  59. this.logger.info(
  60. `Transitioned revisions for user ${userUuid.value} in ${migrationDurationTimeStructure.hours}h ${migrationDurationTimeStructure.minutes}m ${migrationDurationTimeStructure.seconds}s ${migrationDurationTimeStructure.milliseconds}ms`,
  61. )
  62. return Result.ok()
  63. }
  64. private async migrateRevisionsForUser(userUuid: Uuid): Promise<Result<void>> {
  65. try {
  66. const totalRevisionsCountForUser = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
  67. let totalRevisionsCountTransitionedToSecondary = 0
  68. const totalPages = Math.ceil(totalRevisionsCountForUser / this.pageSize)
  69. for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
  70. const query = {
  71. userUuid: userUuid,
  72. offset: (currentPage - 1) * this.pageSize,
  73. limit: this.pageSize,
  74. }
  75. const revisions = await this.primaryRevisionsRepository.findByUserUuid(query)
  76. for (const revision of revisions) {
  77. try {
  78. this.logger.debug(
  79. `Transitioning revision #${
  80. totalRevisionsCountTransitionedToSecondary + 1
  81. }: ${revision.id.toString()} to secondary database`,
  82. )
  83. const didSave = await (this.secondRevisionsRepository as RevisionRepositoryInterface).insert(revision)
  84. if (!didSave) {
  85. return Result.fail(`Failed to save revision ${revision.id.toString()} to secondary database`)
  86. }
  87. totalRevisionsCountTransitionedToSecondary++
  88. } catch (error) {
  89. return Result.fail(
  90. `Errored when saving revision ${revision.id.toString()} to secondary database: ${
  91. (error as Error).message
  92. }`,
  93. )
  94. }
  95. }
  96. }
  97. this.logger.debug(`Transitioned ${totalRevisionsCountTransitionedToSecondary} revisions to secondary database`)
  98. return Result.ok()
  99. } catch (error) {
  100. return Result.fail(`Errored when migrating revisions for user ${userUuid.value}: ${(error as Error).message}`)
  101. }
  102. }
  103. private async deleteRevisionsForUser(
  104. userUuid: Uuid,
  105. revisionRepository: RevisionRepositoryInterface,
  106. ): Promise<Result<void>> {
  107. try {
  108. await revisionRepository.removeByUserUuid(userUuid)
  109. return Result.ok()
  110. } catch (error) {
  111. return Result.fail(`Errored when deleting revisions for user ${userUuid.value}: ${(error as Error).message}`)
  112. }
  113. }
  114. private async allowForSecondaryDatabaseToCatchUp(): Promise<void> {
  115. const twoSecondsInMilliseconds = 2_000
  116. await this.timer.sleep(twoSecondsInMilliseconds)
  117. }
  118. private async isAlreadyMigrated(userUuid: Uuid): Promise<boolean> {
  119. const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
  120. return totalRevisionsCountForUserInPrimary === 0
  121. }
  122. private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid: Uuid): Promise<Result<boolean>> {
  123. try {
  124. const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
  125. const totalPages = Math.ceil(totalRevisionsCountForUserInPrimary / this.pageSize)
  126. for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
  127. const query = {
  128. userUuid: userUuid,
  129. offset: (currentPage - 1) * this.pageSize,
  130. limit: this.pageSize,
  131. }
  132. const revisions = await this.primaryRevisionsRepository.findByUserUuid(query)
  133. for (const revision of revisions) {
  134. const revisionUuidOrError = Uuid.create(revision.id.toString())
  135. /* istanbul ignore if */
  136. if (revisionUuidOrError.isFailed()) {
  137. return Result.fail(revisionUuidOrError.getError())
  138. }
  139. const revisionUuid = revisionUuidOrError.getValue()
  140. const revisionInSecondary = await (
  141. this.secondRevisionsRepository as RevisionRepositoryInterface
  142. ).findOneByUuid(revisionUuid, userUuid, [])
  143. if (!revisionInSecondary) {
  144. return Result.fail(`Revision ${revision.id.toString()} not found in secondary database`)
  145. }
  146. if (!revision.isIdenticalTo(revisionInSecondary)) {
  147. return Result.fail(
  148. `Revision ${revision.id.toString()} is not identical in primary and secondary database. Revision in primary database: ${JSON.stringify(
  149. revision,
  150. )}, revision in secondary database: ${JSON.stringify(revisionInSecondary)}`,
  151. )
  152. }
  153. }
  154. }
  155. const totalRevisionsCountForUserInSecondary = await (
  156. this.secondRevisionsRepository as RevisionRepositoryInterface
  157. ).countByUserUuid(userUuid)
  158. if (totalRevisionsCountForUserInPrimary !== totalRevisionsCountForUserInSecondary) {
  159. return Result.fail(
  160. `Total revisions count for user ${userUuid.value} in primary database (${totalRevisionsCountForUserInPrimary}) does not match total revisions count in secondary database (${totalRevisionsCountForUserInSecondary})`,
  161. )
  162. }
  163. return Result.ok()
  164. } catch (error) {
  165. return Result.fail(
  166. `Errored when checking integrity between primary and secondary database: ${(error as Error).message}`,
  167. )
  168. }
  169. }
  170. }