// TransitionItemsFromPrimaryToSecondaryDatabaseForUser.ts (12 KB)
  1. /* istanbul ignore file */
  2. import { TimerInterface } from '@standardnotes/time'
  3. import { Result, TransitionStatus, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
  4. import { Logger } from 'winston'
  5. import { TransitionItemsFromPrimaryToSecondaryDatabaseForUserDTO } from './TransitionItemsFromPrimaryToSecondaryDatabaseForUserDTO'
  6. import { ItemRepositoryInterface } from '../../../Item/ItemRepositoryInterface'
  7. import { ItemQuery } from '../../../Item/ItemQuery'
  8. import { TransitionRepositoryInterface } from '../../../Transition/TransitionRepositoryInterface'
  9. import { DomainEventPublisherInterface } from '@standardnotes/domain-events'
  10. import { DomainEventFactoryInterface } from '../../../Event/DomainEventFactoryInterface'
  11. export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements UseCaseInterface<void> {
  12. constructor(
  13. private primaryItemRepository: ItemRepositoryInterface,
  14. private secondaryItemRepository: ItemRepositoryInterface | null,
  15. private transitionStatusRepository: TransitionRepositoryInterface | null,
  16. private timer: TimerInterface,
  17. private logger: Logger,
  18. private pageSize: number,
  19. private domainEventPublisher: DomainEventPublisherInterface,
  20. private domainEventFactory: DomainEventFactoryInterface,
  21. ) {}
  22. async execute(dto: TransitionItemsFromPrimaryToSecondaryDatabaseForUserDTO): Promise<Result<void>> {
  23. this.logger.info(`[TRANSITION][${dto.userUuid}] Transitioning items`)
  24. if (this.secondaryItemRepository === null) {
  25. return Result.fail('Secondary item repository is not set')
  26. }
  27. if (this.transitionStatusRepository === null) {
  28. return Result.fail('Transition status repository is not set')
  29. }
  30. const userUuidOrError = Uuid.create(dto.userUuid)
  31. if (userUuidOrError.isFailed()) {
  32. return Result.fail(userUuidOrError.getError())
  33. }
  34. const userUuid = userUuidOrError.getValue()
  35. if (await this.isAlreadyMigrated(userUuid)) {
  36. this.logger.info(`[TRANSITION][${userUuid.value}] User already migrated.`)
  37. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
  38. return Result.ok()
  39. }
  40. const migrationTimeStart = this.timer.getTimestampInMicroseconds()
  41. this.logger.info(`[TRANSITION][${dto.userUuid}] Migrating items`)
  42. const migrationResult = await this.migrateItemsForUser(userUuid, dto.timestamp)
  43. if (migrationResult.isFailed()) {
  44. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
  45. return Result.fail(migrationResult.getError())
  46. }
  47. this.logger.info(`[TRANSITION][${dto.userUuid}] Items migrated`)
  48. await this.allowForPrimaryDatabaseToCatchUp()
  49. this.logger.info(`[TRANSITION][${dto.userUuid}] Checking integrity between primary and secondary database`)
  50. const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid)
  51. if (integrityCheckResult.isFailed()) {
  52. await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(userUuid.value, 1)
  53. await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(userUuid.value, 1)
  54. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
  55. return Result.fail(integrityCheckResult.getError())
  56. }
  57. const cleanupResult = await this.deleteItemsForUser(
  58. userUuid,
  59. this.secondaryItemRepository as ItemRepositoryInterface,
  60. )
  61. if (cleanupResult.isFailed()) {
  62. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Failed, dto.timestamp)
  63. this.logger.error(
  64. `[TRANSITION][${dto.userUuid}] Failed to clean up secondary database items: ${cleanupResult.getError()}`,
  65. )
  66. }
  67. const migrationTimeEnd = this.timer.getTimestampInMicroseconds()
  68. const migrationDuration = migrationTimeEnd - migrationTimeStart
  69. const migrationDurationTimeStructure = this.timer.convertMicrosecondsToTimeStructure(migrationDuration)
  70. this.logger.info(
  71. `[TRANSITION][${dto.userUuid}] Transitioned items in ${migrationDurationTimeStructure.hours}h ${migrationDurationTimeStructure.minutes}m ${migrationDurationTimeStructure.seconds}s ${migrationDurationTimeStructure.milliseconds}ms`,
  72. )
  73. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.Verified, dto.timestamp)
  74. return Result.ok()
  75. }
  76. private async allowForPrimaryDatabaseToCatchUp(): Promise<void> {
  77. const delay = 1_000
  78. await this.timer.sleep(delay)
  79. }
  80. private async migrateItemsForUser(userUuid: Uuid, timestamp: number): Promise<Result<void>> {
  81. try {
  82. const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getPagingProgress(
  83. userUuid.value,
  84. )
  85. this.logger.info(`[TRANSITION][${userUuid.value}] Migrating from page ${initialPage}`)
  86. const totalItemsCountForUser = await (this.secondaryItemRepository as ItemRepositoryInterface).countAll({
  87. userUuid: userUuid.value,
  88. })
  89. this.logger.info(`[TRANSITION][${userUuid.value}] Total items count for user: ${totalItemsCountForUser}`)
  90. const totalPages = Math.ceil(totalItemsCountForUser / this.pageSize)
  91. this.logger.info(`[TRANSITION][${userUuid.value}] Total pages: ${totalPages}`)
  92. let insertedCount = 0
  93. let updatedCount = 0
  94. let newerCount = 0
  95. let identicalCount = 0
  96. for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
  97. const isPageInEvery10Percent = currentPage % Math.ceil(totalPages / 10) === 0
  98. if (isPageInEvery10Percent) {
  99. this.logger.info(
  100. `[TRANSITION][${userUuid.value}] Migrating items for user: ${Math.round(
  101. (currentPage / totalPages) * 100,
  102. )}% completed`,
  103. )
  104. this.logger.info(
  105. `[TRANSITION][${userUuid.value}] Inserted items count: ${insertedCount}. Newer items count: ${newerCount}. Identical items count: ${identicalCount}. Updated items count: ${updatedCount}`,
  106. )
  107. await this.updateTransitionStatus(userUuid, TransitionStatus.STATUSES.InProgress, timestamp)
  108. }
  109. await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(
  110. userUuid.value,
  111. currentPage,
  112. )
  113. const query: ItemQuery = {
  114. userUuid: userUuid.value,
  115. offset: (currentPage - 1) * this.pageSize,
  116. limit: this.pageSize,
  117. sortBy: 'created_at_timestamp',
  118. sortOrder: 'ASC',
  119. }
  120. const items = await (this.secondaryItemRepository as ItemRepositoryInterface).findAll(query)
  121. for (const item of items) {
  122. try {
  123. const itemInPrimary = await this.primaryItemRepository.findByUuid(item.uuid)
  124. if (!itemInPrimary) {
  125. await this.primaryItemRepository.insert(item)
  126. insertedCount++
  127. } else {
  128. if (itemInPrimary.props.timestamps.updatedAt > item.props.timestamps.updatedAt) {
  129. this.logger.info(
  130. `[TRANSITION][${userUuid.value}] Item ${item.uuid.value} is older in secondary than item in primary database`,
  131. )
  132. newerCount++
  133. continue
  134. }
  135. if (itemInPrimary.isIdenticalTo(item)) {
  136. identicalCount++
  137. continue
  138. }
  139. await this.primaryItemRepository.update(item)
  140. updatedCount++
  141. }
  142. } catch (error) {
  143. this.logger.error(
  144. `[TRANSITION][${userUuid.value}] Errored when saving item ${item.uuid.value} to primary database: ${
  145. (error as Error).message
  146. }`,
  147. )
  148. }
  149. }
  150. }
  151. this.logger.info(
  152. `[TRANSITION][${userUuid.value}] Inserted items count: ${insertedCount}. Newer items count: ${newerCount}. Identical items count: ${identicalCount}. Updated items count: ${updatedCount}`,
  153. )
  154. return Result.ok()
  155. } catch (error) {
  156. return Result.fail((error as Error).message)
  157. }
  158. }
  159. private async deleteItemsForUser(userUuid: Uuid, itemRepository: ItemRepositoryInterface): Promise<Result<void>> {
  160. try {
  161. this.logger.info(`[TRANSITION][${userUuid.value}] Cleaning up secondary database items`)
  162. await itemRepository.deleteByUserUuidAndNotInSharedVault(userUuid)
  163. return Result.ok()
  164. } catch (error) {
  165. return Result.fail((error as Error).message)
  166. }
  167. }
  168. private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid: Uuid): Promise<Result<boolean>> {
  169. try {
  170. const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getIntegrityProgress(
  171. userUuid.value,
  172. )
  173. this.logger.info(`[TRANSITION][${userUuid.value}] Checking integrity from page ${initialPage}`)
  174. const totalItemsCountForUserInSecondary = await (
  175. this.secondaryItemRepository as ItemRepositoryInterface
  176. ).countAll({
  177. userUuid: userUuid.value,
  178. })
  179. const totalItemsCountForUserInPrimary = await this.primaryItemRepository.countAll({
  180. userUuid: userUuid.value,
  181. })
  182. if (totalItemsCountForUserInPrimary < totalItemsCountForUserInSecondary) {
  183. return Result.fail(
  184. `Total items count for user ${userUuid.value} in primary database (${totalItemsCountForUserInPrimary}) does not match total items count in secondary database (${totalItemsCountForUserInSecondary})`,
  185. )
  186. }
  187. const totalPages = Math.ceil(totalItemsCountForUserInPrimary / this.pageSize)
  188. for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
  189. await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(
  190. userUuid.value,
  191. currentPage,
  192. )
  193. const query: ItemQuery = {
  194. userUuid: userUuid.value,
  195. offset: (currentPage - 1) * this.pageSize,
  196. limit: this.pageSize,
  197. sortBy: 'created_at_timestamp',
  198. sortOrder: 'ASC',
  199. }
  200. const items = await (this.secondaryItemRepository as ItemRepositoryInterface).findAll(query)
  201. for (const item of items) {
  202. const itemInPrimary = await this.primaryItemRepository.findByUuid(item.uuid)
  203. if (!itemInPrimary) {
  204. return Result.fail(`Item ${item.uuid.value} not found in primary database`)
  205. }
  206. if (itemInPrimary.props.timestamps.updatedAt > item.props.timestamps.updatedAt) {
  207. this.logger.info(
  208. `[TRANSITION][${userUuid.value}] Integrity check of Item ${item.uuid.value} - is older in secondary than item in primary database`,
  209. )
  210. continue
  211. }
  212. if (item.isIdenticalTo(itemInPrimary)) {
  213. continue
  214. }
  215. return Result.fail(
  216. `Item ${
  217. item.uuid.value
  218. } is not identical in primary and secondary database. Item in secondary database: ${JSON.stringify(
  219. item,
  220. )}, item in primary database: ${JSON.stringify(itemInPrimary)}`,
  221. )
  222. }
  223. }
  224. return Result.ok()
  225. } catch (error) {
  226. return Result.fail((error as Error).message)
  227. }
  228. }
  229. private async updateTransitionStatus(userUuid: Uuid, status: string, timestamp: number): Promise<void> {
  230. await this.domainEventPublisher.publish(
  231. this.domainEventFactory.createTransitionStatusUpdatedEvent({
  232. userUuid: userUuid.value,
  233. status,
  234. transitionType: 'items',
  235. transitionTimestamp: timestamp,
  236. }),
  237. )
  238. }
  239. private async isAlreadyMigrated(userUuid: Uuid): Promise<boolean> {
  240. const totalItemsCountForUserInSecondary = await (this.secondaryItemRepository as ItemRepositoryInterface).countAll({
  241. userUuid: userUuid.value,
  242. })
  243. if (totalItemsCountForUserInSecondary > 0) {
  244. this.logger.info(
  245. `[TRANSITION][${userUuid.value}] User has ${totalItemsCountForUserInSecondary} items in secondary database.`,
  246. )
  247. }
  248. return totalItemsCountForUserInSecondary === 0
  249. }
  250. }