TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser.ts 14 KB


  1. /* istanbul ignore file */
  2. import { Result, TransitionStatus, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
  3. import { TimerInterface } from '@standardnotes/time'
  4. import { Logger } from 'winston'
  5. import { TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO } from './TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO'
  6. import { RevisionRepositoryInterface } from '../../../Revision/RevisionRepositoryInterface'
  7. import { TransitionRepositoryInterface } from '../../../Transition/TransitionRepositoryInterface'
  8. import { DomainEventPublisherInterface } from '@standardnotes/domain-events'
  9. import { DomainEventFactoryInterface } from '../../../Event/DomainEventFactoryInterface'
  10. export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements UseCaseInterface<void> {
  11. constructor(
  12. private primaryRevisionsRepository: RevisionRepositoryInterface,
  13. private secondRevisionsRepository: RevisionRepositoryInterface | null,
  14. private transitionStatusRepository: TransitionRepositoryInterface | null,
  15. private timer: TimerInterface,
  16. private logger: Logger,
  17. private pageSize: number,
  18. private domainEventPublisher: DomainEventPublisherInterface,
  19. private domainEventFactory: DomainEventFactoryInterface,
  20. ) {}
  21. async execute(dto: TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO): Promise<Result<void>> {
  22. this.logger.info(`[TRANSITION][${dto.userUuid}] Transitioning revisions for user`)
  23. if (this.secondRevisionsRepository === null) {
  24. return Result.fail('Secondary revision repository is not set')
  25. }
  26. if (this.transitionStatusRepository === null) {
  27. return Result.fail('Transition status repository is not set')
  28. }
  29. const userUuidOrError = Uuid.create(dto.userUuid)
  30. if (userUuidOrError.isFailed()) {
  31. return Result.fail(userUuidOrError.getError())
  32. }
  33. const userUuid = userUuidOrError.getValue()
  34. if (await this.isAlreadyMigrated(userUuid)) {
  35. this.logger.info(`[TRANSITION][${userUuid.value}] User already migrated.`)
  36. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
  37. return Result.ok()
  38. }
  39. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.InProgress, dto.timestamp)
  40. const migrationTimeStart = this.timer.getTimestampInMicroseconds()
  41. this.logger.info(`[TRANSITION][${dto.userUuid}] Migrating revisions`)
  42. const migrationResult = await this.migrateRevisionsForUser(userUuid, dto.timestamp)
  43. if (migrationResult.isFailed()) {
  44. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
  45. return Result.fail(migrationResult.getError())
  46. }
  47. this.logger.info(`[TRANSITION][${dto.userUuid}] Revisions migrated`)
  48. await this.allowForPrimaryDatabaseToCatchUp()
  49. this.logger.info(`[TRANSITION][${dto.userUuid}] Checking integrity between primary and secondary database`)
  50. const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid)
  51. if (integrityCheckResult.isFailed()) {
  52. await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(userUuid.value, 1)
  53. await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(userUuid.value, 1)
  54. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
  55. return Result.fail(integrityCheckResult.getError())
  56. }
  57. const cleanupResult = await this.deleteRevisionsForUser(
  58. userUuid,
  59. this.secondRevisionsRepository as RevisionRepositoryInterface,
  60. )
  61. if (cleanupResult.isFailed()) {
  62. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
  63. this.logger.error(
  64. `[TRANSITION][${dto.userUuid}] Failed to clean up secondary database revisions: ${cleanupResult.getError()}`,
  65. )
  66. }
  67. const migrationTimeEnd = this.timer.getTimestampInMicroseconds()
  68. const migrationDuration = migrationTimeEnd - migrationTimeStart
  69. const migrationDurationTimeStructure = this.timer.convertMicrosecondsToTimeStructure(migrationDuration)
  70. this.logger.info(
  71. `[TRANSITION][${dto.userUuid}] Transitioned revisions in ${migrationDurationTimeStructure.hours}h ${migrationDurationTimeStructure.minutes}m ${migrationDurationTimeStructure.seconds}s ${migrationDurationTimeStructure.milliseconds}ms`,
  72. )
  73. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
  74. return Result.ok()
  75. }
  76. private async migrateRevisionsForUser(userUuid: Uuid, timestamp: number): Promise<Result<void>> {
  77. try {
  78. const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getPagingProgress(
  79. userUuid.value,
  80. )
  81. this.logger.info(`[TRANSITION][${userUuid.value}] Migrating from page ${initialPage}`)
  82. const totalRevisionsCountForUser = await (
  83. this.secondRevisionsRepository as RevisionRepositoryInterface
  84. ).countByUserUuid(userUuid)
  85. this.logger.info(`[TRANSITION][${userUuid.value}] Total revisions count for user: ${totalRevisionsCountForUser}`)
  86. const totalPages = Math.ceil(totalRevisionsCountForUser / this.pageSize)
  87. this.logger.info(`[TRANSITION][${userUuid.value}] Total pages: ${totalPages}`)
  88. let insertedCount = 0
  89. let newerCount = 0
  90. let identicalCount = 0
  91. let updatedCount = 0
  92. let duplicatedCount = 0
  93. const processedUuids = new Set<string>()
  94. for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
  95. const isPageInEvery10Percent = currentPage % Math.ceil(totalPages / 10) === 0
  96. if (isPageInEvery10Percent) {
  97. this.logger.info(
  98. `[TRANSITION][${userUuid.value}] Migrating revisions for user: ${Math.round(
  99. (currentPage / totalPages) * 100,
  100. )}% completed`,
  101. )
  102. this.logger.info(
  103. `[TRANSITION][${userUuid.value}] Inserted ${insertedCount} revisions so far. Skipped ${newerCount} revisions because they were newer in primary database. Skipped ${identicalCount} revisions because they were identical in primary and secondary database. Updated ${updatedCount} revisions because they were older in primary database.`,
  104. )
  105. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.InProgress, timestamp)
  106. }
  107. await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(
  108. userUuid.value,
  109. currentPage,
  110. )
  111. const query = {
  112. userUuid: userUuid,
  113. offset: (currentPage - 1) * this.pageSize,
  114. limit: this.pageSize,
  115. }
  116. const revisions = await (this.secondRevisionsRepository as RevisionRepositoryInterface).findByUserUuid(query)
  117. for (const revision of revisions) {
  118. try {
  119. if (processedUuids.has(revision.id.toString())) {
  120. this.logger.warn(
  121. `[TRANSITION][${userUuid.value}] Revision ${revision.id.toString()} was already processed`,
  122. )
  123. duplicatedCount++
  124. } else {
  125. processedUuids.add(revision.id.toString())
  126. }
  127. const revisionInPrimary = await this.primaryRevisionsRepository.findOneByUuid(
  128. Uuid.create(revision.id.toString()).getValue(),
  129. revision.props.userUuid as Uuid,
  130. [],
  131. )
  132. if (!revisionInPrimary) {
  133. await this.primaryRevisionsRepository.insert(revision)
  134. insertedCount++
  135. } else {
  136. if (revisionInPrimary.props.dates.updatedAt > revision.props.dates.updatedAt) {
  137. this.logger.info(
  138. `[TRANSITION][${
  139. userUuid.value
  140. }] Revision ${revision.id.toString()} is older in secondary than revision in primary database`,
  141. )
  142. newerCount++
  143. continue
  144. }
  145. if (revisionInPrimary.isIdenticalTo(revision)) {
  146. identicalCount++
  147. continue
  148. }
  149. await this.primaryRevisionsRepository.update(revision)
  150. updatedCount++
  151. }
  152. } catch (error) {
  153. this.logger.error(
  154. `[TRANSITION][${
  155. userUuid.value
  156. }] Errored when saving revision ${revision.id.toString()} to primary database: ${
  157. (error as Error).message
  158. }`,
  159. )
  160. }
  161. }
  162. }
  163. this.logger.info(
  164. `[TRANSITION][${userUuid.value}] Inserted ${insertedCount} revisions. Skipped ${newerCount} revisions because they were newer in primary database. Skipped ${identicalCount} revisions because they were identical in primary and secondary database. Updated ${updatedCount} revisions because they were older in primary database.`,
  165. )
  166. if (duplicatedCount > 0) {
  167. this.logger.warn(`[TRANSITION][${userUuid.value}] Skipped ${duplicatedCount} duplicated revisions`)
  168. }
  169. return Result.ok()
  170. } catch (error) {
  171. return Result.fail(`Errored when migrating revisions for user ${userUuid.value}: ${(error as Error).message}`)
  172. }
  173. }
  174. private async deleteRevisionsForUser(
  175. userUuid: Uuid,
  176. revisionRepository: RevisionRepositoryInterface,
  177. ): Promise<Result<void>> {
  178. try {
  179. this.logger.info(`[TRANSITION][${userUuid.value}] Deleting all revisions from secondary database`)
  180. await revisionRepository.removeByUserUuid(userUuid)
  181. return Result.ok()
  182. } catch (error) {
  183. return Result.fail(`Errored when deleting revisions for user ${userUuid.value}: ${(error as Error).message}`)
  184. }
  185. }
  186. private async allowForPrimaryDatabaseToCatchUp(): Promise<void> {
  187. const delay = 1_000
  188. await this.timer.sleep(delay)
  189. }
  190. private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid: Uuid): Promise<Result<boolean>> {
  191. try {
  192. const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getIntegrityProgress(
  193. userUuid.value,
  194. )
  195. this.logger.info(`[TRANSITION][${userUuid.value}] Checking integrity from page ${initialPage}`)
  196. const totalRevisionsCountForUserInSecondary = await (
  197. this.secondRevisionsRepository as RevisionRepositoryInterface
  198. ).countByUserUuid(userUuid)
  199. const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
  200. if (totalRevisionsCountForUserInPrimary < totalRevisionsCountForUserInSecondary) {
  201. return Result.fail(
  202. `Total revisions count for user ${userUuid.value} in primary database (${totalRevisionsCountForUserInPrimary}) does not match total revisions count in secondary database (${totalRevisionsCountForUserInSecondary})`,
  203. )
  204. }
  205. const totalPages = Math.ceil(totalRevisionsCountForUserInPrimary / this.pageSize)
  206. for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
  207. await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(
  208. userUuid.value,
  209. currentPage,
  210. )
  211. const query = {
  212. userUuid: userUuid,
  213. offset: (currentPage - 1) * this.pageSize,
  214. limit: this.pageSize,
  215. }
  216. const revisions = await (this.secondRevisionsRepository as RevisionRepositoryInterface).findByUserUuid(query)
  217. for (const revision of revisions) {
  218. const revisionUuidOrError = Uuid.create(revision.id.toString())
  219. /* istanbul ignore if */
  220. if (revisionUuidOrError.isFailed()) {
  221. return Result.fail(revisionUuidOrError.getError())
  222. }
  223. const revisionUuid = revisionUuidOrError.getValue()
  224. const revisionInPrimary = await this.primaryRevisionsRepository.findOneByUuid(revisionUuid, userUuid, [])
  225. if (!revisionInPrimary) {
  226. return Result.fail(`Revision ${revision.id.toString()} not found in primary database`)
  227. }
  228. if (revisionInPrimary.props.dates.updatedAt > revision.props.dates.updatedAt) {
  229. this.logger.info(
  230. `[TRANSITION][${
  231. userUuid.value
  232. }] Integrity check of revision ${revision.id.toString()} - is older in secondary than revision in primary database`,
  233. )
  234. continue
  235. }
  236. if (revision.isIdenticalTo(revisionInPrimary)) {
  237. continue
  238. }
  239. return Result.fail(
  240. `Revision ${revision.id.toString()} is not identical in primary and secondary database. Revision in primary database: ${JSON.stringify(
  241. revisionInPrimary,
  242. )}, revision in secondary database: ${JSON.stringify(revision)}`,
  243. )
  244. }
  245. }
  246. return Result.ok()
  247. } catch (error) {
  248. return Result.fail(
  249. `Errored when checking integrity between primary and secondary database: ${(error as Error).message}`,
  250. )
  251. }
  252. }
  253. private async updateTransitionStatus(userUuid: Uuid, status: string, timestamp: number): Promise<void> {
  254. await this.domainEventPublisher.publish(
  255. this.domainEventFactory.createTransitionStatusUpdatedEvent({
  256. userUuid: userUuid.value,
  257. status,
  258. transitionType: 'revisions',
  259. transitionTimestamp: timestamp,
  260. }),
  261. )
  262. }
  263. private async isAlreadyMigrated(userUuid: Uuid): Promise<boolean> {
  264. const totalRevisionsCountForUserInSecondary = await (
  265. this.secondRevisionsRepository as RevisionRepositoryInterface
  266. ).countByUserUuid(userUuid)
  267. if (totalRevisionsCountForUserInSecondary > 0) {
  268. this.logger.info(
  269. `[TRANSITION][${userUuid.value}] User has ${totalRevisionsCountForUserInSecondary} revisions in secondary database.`,
  270. )
  271. }
  272. return totalRevisionsCountForUserInSecondary === 0
  273. }
  274. }