TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser.ts 14 KB


  1. /* istanbul ignore file */
  2. import { Result, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
  3. import { TimerInterface } from '@standardnotes/time'
  4. import { Logger } from 'winston'
  5. import { TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO } from './TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO'
  6. import { RevisionRepositoryInterface } from '../../../Revision/RevisionRepositoryInterface'
  7. import { Revision } from '../../../Revision/Revision'
  8. export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements UseCaseInterface<void> {
  9. constructor(
  10. private primaryRevisionsRepository: RevisionRepositoryInterface,
  11. private secondRevisionsRepository: RevisionRepositoryInterface | null,
  12. private timer: TimerInterface,
  13. private logger: Logger,
  14. private pageSize: number,
  15. ) {}
  16. async execute(dto: TransitionRevisionsFromPrimaryToSecondaryDatabaseForUserDTO): Promise<Result<void>> {
  17. this.logger.info(`[${dto.userUuid}] Transitioning revisions for user`)
  18. if (this.secondRevisionsRepository === null) {
  19. return Result.fail('Secondary revision repository is not set')
  20. }
  21. const userUuidOrError = Uuid.create(dto.userUuid)
  22. if (userUuidOrError.isFailed()) {
  23. return Result.fail(userUuidOrError.getError())
  24. }
  25. const userUuid = userUuidOrError.getValue()
  26. let newRevisionsInSecondaryCount = 0
  27. let updatedRevisionsInSecondary: string[] = []
  28. if (await this.hasAlreadyDataInSecondaryDatabase(userUuid)) {
  29. const { alreadyExistingInPrimary, newRevisionsInSecondary, updatedInSecondary } =
  30. await this.getNewRevisionsCreatedInSecondaryDatabase(userUuid)
  31. for (const existingRevisionUuid of alreadyExistingInPrimary) {
  32. this.logger.info(`[${dto.userUuid}] Removing revision ${existingRevisionUuid} from secondary database`)
  33. await (this.secondRevisionsRepository as RevisionRepositoryInterface).removeOneByUuid(
  34. Uuid.create(existingRevisionUuid).getValue(),
  35. userUuid,
  36. )
  37. }
  38. if (newRevisionsInSecondary.length > 0) {
  39. this.logger.info(
  40. `[${dto.userUuid}] Found ${newRevisionsInSecondary.length} new revisions in secondary database`,
  41. )
  42. }
  43. newRevisionsInSecondaryCount = newRevisionsInSecondary.length
  44. if (updatedInSecondary.length > 0) {
  45. this.logger.info(`[${dto.userUuid}] Found ${updatedInSecondary.length} updated revisions in secondary database`)
  46. }
  47. updatedRevisionsInSecondary = updatedInSecondary
  48. }
  49. const updatedRevisionsInSecondaryCount = updatedRevisionsInSecondary.length
  50. await this.allowForSecondaryDatabaseToCatchUp()
  51. const migrationTimeStart = this.timer.getTimestampInMicroseconds()
  52. this.logger.debug(`[${dto.userUuid}] Transitioning revisions`)
  53. const migrationResult = await this.migrateRevisionsForUser(userUuid, updatedRevisionsInSecondary)
  54. if (migrationResult.isFailed()) {
  55. if (newRevisionsInSecondaryCount === 0 && updatedRevisionsInSecondaryCount === 0) {
  56. const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.secondRevisionsRepository)
  57. if (cleanupResult.isFailed()) {
  58. this.logger.error(
  59. `[${dto.userUuid}] Failed to clean up secondary database revisions: ${cleanupResult.getError()}`,
  60. )
  61. }
  62. }
  63. return Result.fail(migrationResult.getError())
  64. }
  65. await this.allowForSecondaryDatabaseToCatchUp()
  66. const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(
  67. userUuid,
  68. newRevisionsInSecondaryCount,
  69. updatedRevisionsInSecondary,
  70. )
  71. if (integrityCheckResult.isFailed()) {
  72. if (newRevisionsInSecondaryCount === 0 && updatedRevisionsInSecondaryCount === 0) {
  73. const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.secondRevisionsRepository)
  74. if (cleanupResult.isFailed()) {
  75. this.logger.error(
  76. `[${dto.userUuid}] Failed to clean up secondary database revisions: ${cleanupResult.getError()}`,
  77. )
  78. }
  79. }
  80. return Result.fail(integrityCheckResult.getError())
  81. }
  82. const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.primaryRevisionsRepository)
  83. if (cleanupResult.isFailed()) {
  84. this.logger.error(`[${dto.userUuid}] Failed to clean up primary database revisions: ${cleanupResult.getError()}`)
  85. }
  86. const migrationTimeEnd = this.timer.getTimestampInMicroseconds()
  87. const migrationDuration = migrationTimeEnd - migrationTimeStart
  88. const migrationDurationTimeStructure = this.timer.convertMicrosecondsToTimeStructure(migrationDuration)
  89. this.logger.info(
  90. `[${dto.userUuid}] Transitioned revisions in ${migrationDurationTimeStructure.hours}h ${migrationDurationTimeStructure.minutes}m ${migrationDurationTimeStructure.seconds}s ${migrationDurationTimeStructure.milliseconds}ms`,
  91. )
  92. return Result.ok()
  93. }
  94. private async migrateRevisionsForUser(userUuid: Uuid, updatedRevisionsInSecondary: string[]): Promise<Result<void>> {
  95. try {
  96. const totalRevisionsCountForUser = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
  97. let totalRevisionsCountTransitionedToSecondary = 0
  98. const totalPages = Math.ceil(totalRevisionsCountForUser / this.pageSize)
  99. for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
  100. const query = {
  101. userUuid: userUuid,
  102. offset: (currentPage - 1) * this.pageSize,
  103. limit: this.pageSize,
  104. }
  105. const revisions = await this.primaryRevisionsRepository.findByUserUuid(query)
  106. for (const revision of revisions) {
  107. try {
  108. if (
  109. updatedRevisionsInSecondary.find((updatedRevisionUuid) => updatedRevisionUuid === revision.id.toString())
  110. ) {
  111. this.logger.info(
  112. `[${
  113. userUuid.value
  114. }] Skipping saving revision ${revision.id.toString()} as it was updated in secondary database`,
  115. )
  116. continue
  117. }
  118. this.logger.debug(
  119. `[${userUuid.value}]Transitioning revision #${
  120. totalRevisionsCountTransitionedToSecondary + 1
  121. }: ${revision.id.toString()} to secondary database`,
  122. )
  123. const didSave = await (this.secondRevisionsRepository as RevisionRepositoryInterface).insert(revision)
  124. if (!didSave) {
  125. return Result.fail(`Failed to save revision ${revision.id.toString()} to secondary database`)
  126. }
  127. totalRevisionsCountTransitionedToSecondary++
  128. } catch (error) {
  129. return Result.fail(
  130. `Errored when saving revision ${revision.id.toString()} to secondary database: ${
  131. (error as Error).message
  132. }`,
  133. )
  134. }
  135. }
  136. }
  137. this.logger.debug(
  138. `[${userUuid.value}] Transitioned ${totalRevisionsCountTransitionedToSecondary} revisions to secondary database`,
  139. )
  140. return Result.ok()
  141. } catch (error) {
  142. return Result.fail(`Errored when migrating revisions for user ${userUuid.value}: ${(error as Error).message}`)
  143. }
  144. }
  145. private async deleteRevisionsForUser(
  146. userUuid: Uuid,
  147. revisionRepository: RevisionRepositoryInterface,
  148. ): Promise<Result<void>> {
  149. try {
  150. await revisionRepository.removeByUserUuid(userUuid)
  151. return Result.ok()
  152. } catch (error) {
  153. return Result.fail(`Errored when deleting revisions for user ${userUuid.value}: ${(error as Error).message}`)
  154. }
  155. }
  156. private async allowForSecondaryDatabaseToCatchUp(): Promise<void> {
  157. const twoSecondsInMilliseconds = 2_000
  158. await this.timer.sleep(twoSecondsInMilliseconds)
  159. }
  160. private async hasAlreadyDataInSecondaryDatabase(userUuid: Uuid): Promise<boolean> {
  161. const totalRevisionsCountForUserInSecondary = await (
  162. this.secondRevisionsRepository as RevisionRepositoryInterface
  163. ).countByUserUuid(userUuid)
  164. const hasAlreadyDataInSecondaryDatabase = totalRevisionsCountForUserInSecondary > 0
  165. if (hasAlreadyDataInSecondaryDatabase) {
  166. this.logger.info(
  167. `[${userUuid.value}] User has already ${totalRevisionsCountForUserInSecondary} revisions in secondary database`,
  168. )
  169. }
  170. return hasAlreadyDataInSecondaryDatabase
  171. }
  172. private async getNewRevisionsCreatedInSecondaryDatabase(userUuid: Uuid): Promise<{
  173. alreadyExistingInPrimary: string[]
  174. newRevisionsInSecondary: string[]
  175. updatedInSecondary: string[]
  176. }> {
  177. const totalRevisionsCountForUser = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
  178. const totalPages = Math.ceil(totalRevisionsCountForUser / this.pageSize)
  179. const alreadyExistingInPrimary: string[] = []
  180. const newRevisionsInSecondary: string[] = []
  181. const updatedInSecondary: string[] = []
  182. for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
  183. const query = {
  184. userUuid: userUuid,
  185. offset: (currentPage - 1) * this.pageSize,
  186. limit: this.pageSize,
  187. }
  188. const revisions = await (this.secondRevisionsRepository as RevisionRepositoryInterface).findByUserUuid(query)
  189. for (const revision of revisions) {
  190. const { revisionInPrimary, newerRevisionInSecondary } =
  191. await this.checkIfRevisionExistsInPrimaryDatabase(revision)
  192. if (revisionInPrimary !== null) {
  193. alreadyExistingInPrimary.push(revision.id.toString())
  194. continue
  195. }
  196. if (newerRevisionInSecondary !== null) {
  197. updatedInSecondary.push(newerRevisionInSecondary.id.toString())
  198. continue
  199. }
  200. if (revisionInPrimary === null && newerRevisionInSecondary === null) {
  201. newRevisionsInSecondary.push(revision.id.toString())
  202. continue
  203. }
  204. }
  205. }
  206. return {
  207. alreadyExistingInPrimary,
  208. newRevisionsInSecondary,
  209. updatedInSecondary,
  210. }
  211. }
  212. private async checkIfRevisionExistsInPrimaryDatabase(
  213. revision: Revision,
  214. ): Promise<{ revisionInPrimary: Revision | null; newerRevisionInSecondary: Revision | null }> {
  215. const revisionInPrimary = await this.primaryRevisionsRepository.findOneByUuid(
  216. Uuid.create(revision.id.toString()).getValue(),
  217. revision.props.userUuid as Uuid,
  218. [],
  219. )
  220. if (revisionInPrimary === null) {
  221. return {
  222. revisionInPrimary: null,
  223. newerRevisionInSecondary: null,
  224. }
  225. }
  226. if (!revision.isIdenticalTo(revisionInPrimary)) {
  227. this.logger.error(
  228. `[${revision.props.userUuid
  229. ?.value}] Revision ${revision.id.toString()} is not identical in primary and secondary database. Revision in secondary database: ${JSON.stringify(
  230. revision,
  231. )}, revision in primary database: ${JSON.stringify(revisionInPrimary)}`,
  232. )
  233. return {
  234. revisionInPrimary: null,
  235. newerRevisionInSecondary:
  236. revision.props.dates.updatedAt > revisionInPrimary.props.dates.updatedAt ? revision : null,
  237. }
  238. }
  239. return {
  240. revisionInPrimary: revisionInPrimary,
  241. newerRevisionInSecondary: null,
  242. }
  243. }
  244. private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(
  245. userUuid: Uuid,
  246. newRevisionsInSecondaryCount: number,
  247. updatedRevisionsInSecondary: string[],
  248. ): Promise<Result<boolean>> {
  249. try {
  250. const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
  251. const totalPages = Math.ceil(totalRevisionsCountForUserInPrimary / this.pageSize)
  252. for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
  253. const query = {
  254. userUuid: userUuid,
  255. offset: (currentPage - 1) * this.pageSize,
  256. limit: this.pageSize,
  257. }
  258. const revisions = await this.primaryRevisionsRepository.findByUserUuid(query)
  259. for (const revision of revisions) {
  260. const revisionUuidOrError = Uuid.create(revision.id.toString())
  261. /* istanbul ignore if */
  262. if (revisionUuidOrError.isFailed()) {
  263. return Result.fail(revisionUuidOrError.getError())
  264. }
  265. const revisionUuid = revisionUuidOrError.getValue()
  266. const revisionInSecondary = await (
  267. this.secondRevisionsRepository as RevisionRepositoryInterface
  268. ).findOneByUuid(revisionUuid, userUuid, [])
  269. if (!revisionInSecondary) {
  270. return Result.fail(`Revision ${revision.id.toString()} not found in secondary database`)
  271. }
  272. if (
  273. updatedRevisionsInSecondary.find((updatedRevisionUuid) => updatedRevisionUuid === revision.id.toString())
  274. ) {
  275. this.logger.info(
  276. `[${
  277. userUuid.value
  278. }] Skipping integrity check for revision ${revision.id.toString()} as it was updated in secondary database`,
  279. )
  280. continue
  281. }
  282. if (!revision.isIdenticalTo(revisionInSecondary)) {
  283. return Result.fail(
  284. `Revision ${revision.id.toString()} is not identical in primary and secondary database. Revision in primary database: ${JSON.stringify(
  285. revision,
  286. )}, revision in secondary database: ${JSON.stringify(revisionInSecondary)}`,
  287. )
  288. }
  289. }
  290. }
  291. const totalRevisionsCountForUserInSecondary = await (
  292. this.secondRevisionsRepository as RevisionRepositoryInterface
  293. ).countByUserUuid(userUuid)
  294. if (
  295. totalRevisionsCountForUserInPrimary + newRevisionsInSecondaryCount !==
  296. totalRevisionsCountForUserInSecondary
  297. ) {
  298. return Result.fail(
  299. `Total revisions count for user ${userUuid.value} in primary database (${totalRevisionsCountForUserInPrimary} + ${newRevisionsInSecondaryCount}) does not match total revisions count in secondary database (${totalRevisionsCountForUserInSecondary})`,
  300. )
  301. }
  302. return Result.ok()
  303. } catch (error) {
  304. return Result.fail(
  305. `Errored when checking integrity between primary and secondary database: ${(error as Error).message}`,
  306. )
  307. }
  308. }
  309. }