TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. /* istanbul ignore file */
  2. import { Result, TransitionStatus, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
  3. import { TimerInterface } from '@standardnotes/time'
  4. import { Logger } from 'winston'
  5. import { TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO } from './TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO'
  6. import { RevisionRepositoryInterface } from '../../../Revision/RevisionRepositoryInterface'
  7. import { TransitionRepositoryInterface } from '../../../Transition/TransitionRepositoryInterface'
  8. import { DomainEventPublisherInterface } from '@standardnotes/domain-events'
  9. import { DomainEventFactoryInterface } from '../../../Event/DomainEventFactoryInterface'
  10. export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements UseCaseInterface<void> {
  11. constructor(
  12. private primaryRevisionsRepository: RevisionRepositoryInterface,
  13. private secondRevisionsRepository: RevisionRepositoryInterface | null,
  14. private transitionStatusRepository: TransitionRepositoryInterface | null,
  15. private timer: TimerInterface,
  16. private logger: Logger,
  17. private pageSize: number,
  18. private domainEventPublisher: DomainEventPublisherInterface,
  19. private domainEventFactory: DomainEventFactoryInterface,
  20. ) {}
  21. async execute(dto: TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO): Promise<Result<void>> {
  22. this.logger.info(`[${dto.userUuid}] Transitioning revisions for user`)
  23. if (this.secondRevisionsRepository === null) {
  24. return Result.fail('Secondary revision repository is not set')
  25. }
  26. if (this.transitionStatusRepository === null) {
  27. return Result.fail('Transition status repository is not set')
  28. }
  29. const userUuidOrError = Uuid.create(dto.userUuid)
  30. if (userUuidOrError.isFailed()) {
  31. return Result.fail(userUuidOrError.getError())
  32. }
  33. const userUuid = userUuidOrError.getValue()
  34. if (await this.isAlreadyMigrated(userUuid)) {
  35. this.logger.info(`[${userUuid.value}] User already migrated.`)
  36. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
  37. return Result.ok()
  38. }
  39. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.InProgress, dto.timestamp)
  40. const migrationTimeStart = this.timer.getTimestampInMicroseconds()
  41. this.logger.info(`[${dto.userUuid}] Migrating revisions`)
  42. const migrationResult = await this.migrateRevisionsForUser(userUuid, dto.timestamp)
  43. if (migrationResult.isFailed()) {
  44. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
  45. return Result.fail(migrationResult.getError())
  46. }
  47. this.logger.info(`[${dto.userUuid}] Revisions migrated`)
  48. await this.allowForPrimaryDatabaseToCatchUp()
  49. this.logger.info(`[${dto.userUuid}] Checking integrity between primary and secondary database`)
  50. const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid)
  51. if (integrityCheckResult.isFailed()) {
  52. await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(userUuid.value, 1)
  53. await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(userUuid.value, 1)
  54. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
  55. return Result.fail(integrityCheckResult.getError())
  56. }
  57. const cleanupResult = await this.deleteRevisionsForUser(
  58. userUuid,
  59. this.secondRevisionsRepository as RevisionRepositoryInterface,
  60. )
  61. if (cleanupResult.isFailed()) {
  62. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
  63. this.logger.error(
  64. `[${dto.userUuid}] Failed to clean up secondary database revisions: ${cleanupResult.getError()}`,
  65. )
  66. }
  67. const migrationTimeEnd = this.timer.getTimestampInMicroseconds()
  68. const migrationDuration = migrationTimeEnd - migrationTimeStart
  69. const migrationDurationTimeStructure = this.timer.convertMicrosecondsToTimeStructure(migrationDuration)
  70. this.logger.info(
  71. `[${dto.userUuid}] Transitioned revisions in ${migrationDurationTimeStructure.hours}h ${migrationDurationTimeStructure.minutes}m ${migrationDurationTimeStructure.seconds}s ${migrationDurationTimeStructure.milliseconds}ms`,
  72. )
  73. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
  74. return Result.ok()
  75. }
  76. private async migrateRevisionsForUser(userUuid: Uuid, timestamp: number): Promise<Result<void>> {
  77. try {
  78. const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getPagingProgress(
  79. userUuid.value,
  80. )
  81. this.logger.info(`[${userUuid.value}] Migrating from page ${initialPage}`)
  82. const totalRevisionsCountForUser = await (
  83. this.secondRevisionsRepository as RevisionRepositoryInterface
  84. ).countByUserUuid(userUuid)
  85. const totalPages = Math.ceil(totalRevisionsCountForUser / this.pageSize)
  86. for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
  87. const isPageInEvery10Percent = currentPage % Math.ceil(totalPages / 10) === 0
  88. if (isPageInEvery10Percent) {
  89. this.logger.info(
  90. `[${userUuid.value}] Migrating revisions for user: ${Math.round(
  91. (currentPage / totalPages) * 100,
  92. )}% completed`,
  93. )
  94. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.InProgress, timestamp)
  95. }
  96. await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(
  97. userUuid.value,
  98. currentPage,
  99. )
  100. const query = {
  101. userUuid: userUuid,
  102. offset: (currentPage - 1) * this.pageSize,
  103. limit: this.pageSize,
  104. }
  105. const revisions = await (this.secondRevisionsRepository as RevisionRepositoryInterface).findByUserUuid(query)
  106. for (const revision of revisions) {
  107. try {
  108. const revisionInPrimary = await this.primaryRevisionsRepository.findOneByUuid(
  109. Uuid.create(revision.id.toString()).getValue(),
  110. revision.props.userUuid as Uuid,
  111. [],
  112. )
  113. if (revisionInPrimary !== null) {
  114. if (revisionInPrimary.props.dates.updatedAt > revision.props.dates.updatedAt) {
  115. this.logger.info(
  116. `[${
  117. userUuid.value
  118. }] Revision ${revision.id.toString()} is older in secondary than revision in primary database`,
  119. )
  120. continue
  121. }
  122. if (revisionInPrimary.isIdenticalTo(revision)) {
  123. continue
  124. }
  125. this.logger.info(
  126. `[${
  127. userUuid.value
  128. }] Removing revision ${revision.id.toString()} in primary database as it is not identical to revision in secondary database`,
  129. )
  130. await this.primaryRevisionsRepository.removeOneByUuid(
  131. Uuid.create(revisionInPrimary.id.toString()).getValue(),
  132. revisionInPrimary.props.userUuid as Uuid,
  133. )
  134. await this.allowForPrimaryDatabaseToCatchUp()
  135. }
  136. const didSave = await this.primaryRevisionsRepository.insert(revision)
  137. if (!didSave) {
  138. this.logger.error(`Failed to save revision ${revision.id.toString()} to primary database`)
  139. }
  140. } catch (error) {
  141. this.logger.error(
  142. `Errored when saving revision ${revision.id.toString()} to primary database: ${(error as Error).message}`,
  143. )
  144. }
  145. }
  146. }
  147. return Result.ok()
  148. } catch (error) {
  149. return Result.fail(`Errored when migrating revisions for user ${userUuid.value}: ${(error as Error).message}`)
  150. }
  151. }
  152. private async deleteRevisionsForUser(
  153. userUuid: Uuid,
  154. revisionRepository: RevisionRepositoryInterface,
  155. ): Promise<Result<void>> {
  156. try {
  157. this.logger.info(`[${userUuid.value}] Deleting all revisions from secondary database`)
  158. await revisionRepository.removeByUserUuid(userUuid)
  159. return Result.ok()
  160. } catch (error) {
  161. return Result.fail(`Errored when deleting revisions for user ${userUuid.value}: ${(error as Error).message}`)
  162. }
  163. }
  164. private async allowForPrimaryDatabaseToCatchUp(): Promise<void> {
  165. const twoSecondsInMilliseconds = 2_000
  166. await this.timer.sleep(twoSecondsInMilliseconds)
  167. }
  168. private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid: Uuid): Promise<Result<boolean>> {
  169. try {
  170. const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getIntegrityProgress(
  171. userUuid.value,
  172. )
  173. this.logger.info(`[${userUuid.value}] Checking integrity from page ${initialPage}`)
  174. const totalRevisionsCountForUserInSecondary = await (
  175. this.secondRevisionsRepository as RevisionRepositoryInterface
  176. ).countByUserUuid(userUuid)
  177. const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
  178. if (totalRevisionsCountForUserInPrimary < totalRevisionsCountForUserInSecondary) {
  179. return Result.fail(
  180. `Total revisions count for user ${userUuid.value} in primary database (${totalRevisionsCountForUserInPrimary}) does not match total revisions count in secondary database (${totalRevisionsCountForUserInSecondary})`,
  181. )
  182. }
  183. const totalPages = Math.ceil(totalRevisionsCountForUserInPrimary / this.pageSize)
  184. for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
  185. await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(
  186. userUuid.value,
  187. currentPage,
  188. )
  189. const query = {
  190. userUuid: userUuid,
  191. offset: (currentPage - 1) * this.pageSize,
  192. limit: this.pageSize,
  193. }
  194. const revisions = await (this.secondRevisionsRepository as RevisionRepositoryInterface).findByUserUuid(query)
  195. for (const revision of revisions) {
  196. const revisionUuidOrError = Uuid.create(revision.id.toString())
  197. /* istanbul ignore if */
  198. if (revisionUuidOrError.isFailed()) {
  199. return Result.fail(revisionUuidOrError.getError())
  200. }
  201. const revisionUuid = revisionUuidOrError.getValue()
  202. const revisionInPrimary = await this.primaryRevisionsRepository.findOneByUuid(revisionUuid, userUuid, [])
  203. if (!revisionInPrimary) {
  204. return Result.fail(`Revision ${revision.id.toString()} not found in primary database`)
  205. }
  206. if (revisionInPrimary.props.dates.updatedAt > revision.props.dates.updatedAt) {
  207. this.logger.info(
  208. `[${
  209. userUuid.value
  210. }] Integrity check of revision ${revision.id.toString()} - is older in secondary than revision in primary database`,
  211. )
  212. continue
  213. }
  214. if (revision.isIdenticalTo(revisionInPrimary)) {
  215. continue
  216. }
  217. return Result.fail(
  218. `Revision ${revision.id.toString()} is not identical in primary and secondary database. Revision in primary database: ${JSON.stringify(
  219. revisionInPrimary,
  220. )}, revision in secondary database: ${JSON.stringify(revision)}`,
  221. )
  222. }
  223. }
  224. return Result.ok()
  225. } catch (error) {
  226. return Result.fail(
  227. `Errored when checking integrity between primary and secondary database: ${(error as Error).message}`,
  228. )
  229. }
  230. }
  231. private async updateTransitionStatus(userUuid: Uuid, status: string, timestamp: number): Promise<void> {
  232. await this.domainEventPublisher.publish(
  233. this.domainEventFactory.createTransitionStatusUpdatedEvent({
  234. userUuid: userUuid.value,
  235. status,
  236. transitionType: 'revisions',
  237. transitionTimestamp: timestamp,
  238. }),
  239. )
  240. }
  241. private async isAlreadyMigrated(userUuid: Uuid): Promise<boolean> {
  242. const totalRevisionsCountForUserInSecondary = await (
  243. this.secondRevisionsRepository as RevisionRepositoryInterface
  244. ).countByUserUuid(userUuid)
  245. if (totalRevisionsCountForUserInSecondary > 0) {
  246. this.logger.info(
  247. `[${userUuid.value}] User has ${totalRevisionsCountForUserInSecondary} revisions in primary database.`,
  248. )
  249. }
  250. return totalRevisionsCountForUserInSecondary === 0
  251. }
  252. }