Ver Fonte

fix: add paging memory to integrity check

Karol Sójko há 1 ano atrás
pai
commit
e4ca310707

+ 2 - 0
packages/revisions/src/Domain/Transition/TransitionRepositoryInterface.ts

@@ -1,4 +1,6 @@
 export interface TransitionRepositoryInterface {
   getPagingProgress(userUuid: string): Promise<number>
   setPagingProgress(userUuid: string, progress: number): Promise<void>
+  getIntegrityProgress(userUuid: string): Promise<number>
+  setIntegrityProgress(userUuid: string, progress: number): Promise<void>
 }

+ 33 - 20
packages/revisions/src/Domain/UseCase/Transition/TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser/TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser.ts

@@ -42,7 +42,6 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
     if (migrationResult.isFailed()) {
       return Result.fail(migrationResult.getError())
     }
-    const revisionsToSkipInIntegrityCheck = migrationResult.getValue()
 
     this.logger.info(`[${dto.userUuid}] Revisions migrated`)
 
@@ -50,11 +49,11 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
 
     this.logger.info(`[${dto.userUuid}] Checking integrity between primary and secondary database`)
 
-    const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(
-      userUuid,
-      revisionsToSkipInIntegrityCheck,
-    )
+    const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid)
     if (integrityCheckResult.isFailed()) {
+      await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(userUuid.value, 1)
+      await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(userUuid.value, 1)
+
       return Result.fail(integrityCheckResult.getError())
     }
 
@@ -75,7 +74,7 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
     return Result.ok()
   }
 
-  private async migrateRevisionsForUser(userUuid: Uuid): Promise<Result<string[]>> {
+  private async migrateRevisionsForUser(userUuid: Uuid): Promise<Result<void>> {
     try {
       const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getPagingProgress(
         userUuid.value,
@@ -85,7 +84,6 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
 
       const totalRevisionsCountForUser = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
       const totalPages = Math.ceil(totalRevisionsCountForUser / this.pageSize)
-      const revisionsToSkipInIntegrityCheck = []
       for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
         await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(
           userUuid.value,
@@ -113,7 +111,6 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
                 this.logger.info(
                   `[${userUuid.value}] Revision ${revision.id.toString()} is older than revision in secondary database`,
                 )
-                revisionsToSkipInIntegrityCheck.push(revision.id.toString())
 
                 continue
               }
@@ -145,7 +142,7 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
         }
       }
 
-      return Result.ok(revisionsToSkipInIntegrityCheck)
+      return Result.ok()
     } catch (error) {
       return Result.fail(`Errored when migrating revisions for user ${userUuid.value}: ${(error as Error).message}`)
     }
@@ -171,11 +168,14 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
     await this.timer.sleep(twoSecondsInMilliseconds)
   }
 
-  private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(
-    userUuid: Uuid,
-    revisionsToSkipInIntegrityCheck: string[],
-  ): Promise<Result<boolean>> {
+  private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid: Uuid): Promise<Result<boolean>> {
     try {
+      const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getIntegrityProgress(
+        userUuid.value,
+      )
+
+      this.logger.info(`[${userUuid.value}] Checking integrity from page ${initialPage}`)
+
       const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
       const totalRevisionsCountForUserInSecondary = await (
         this.secondRevisionsRepository as RevisionRepositoryInterface
@@ -188,7 +188,12 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
       }
 
       const totalPages = Math.ceil(totalRevisionsCountForUserInPrimary / this.pageSize)
-      for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
+      for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
+        await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(
+          userUuid.value,
+          currentPage,
+        )
+
         const query = {
           userUuid: userUuid,
           offset: (currentPage - 1) * this.pageSize,
@@ -212,17 +217,25 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
             return Result.fail(`Revision ${revision.id.toString()} not found in secondary database`)
           }
 
-          if (revisionsToSkipInIntegrityCheck.includes(revision.id.toString())) {
+          if (revision.isIdenticalTo(revisionInSecondary)) {
             continue
           }
 
-          if (!revision.isIdenticalTo(revisionInSecondary)) {
-            return Result.fail(
-              `Revision ${revision.id.toString()} is not identical in primary and secondary database. Revision in primary database: ${JSON.stringify(
-                revision,
-              )}, revision in secondary database: ${JSON.stringify(revisionInSecondary)}`,
+          if (revisionInSecondary.props.dates.updatedAt > revision.props.dates.updatedAt) {
+            this.logger.info(
+              `[${
+                userUuid.value
+              }] Integrity check of revision ${revision.id.toString()} - is older than revision in secondary database`,
             )
+
+            continue
           }
+
+          return Result.fail(
+            `Revision ${revision.id.toString()} is not identical in primary and secondary database. Revision in primary database: ${JSON.stringify(
+              revision,
+            )}, revision in secondary database: ${JSON.stringify(revisionInSecondary)}`,
+          )
         }
       }
 

+ 16 - 1
packages/revisions/src/Infra/Redis/RedisTransitionRepository.ts

@@ -3,10 +3,25 @@ import * as IORedis from 'ioredis'
 import { TransitionRepositoryInterface } from '../../Domain/Transition/TransitionRepositoryInterface'
 
 export class RedisTransitionRepository implements TransitionRepositoryInterface {
-  private readonly PREFIX = 'transition-revisions-paging-progress'
+  private readonly PREFIX = 'transition-revisions-migration-progress'
+  private readonly INTEGRITY_PREFIX = 'transition-revisions-integrity-progress'
 
   constructor(private redisClient: IORedis.Redis) {}
 
+  async getIntegrityProgress(userUuid: string): Promise<number> {
+    const progress = await this.redisClient.get(`${this.INTEGRITY_PREFIX}:${userUuid}`)
+
+    if (progress === null) {
+      return 1
+    }
+
+    return parseInt(progress)
+  }
+
+  async setIntegrityProgress(userUuid: string, progress: number): Promise<void> {
+    await this.redisClient.setex(`${this.INTEGRITY_PREFIX}:${userUuid}`, 172_800, progress.toString())
+  }
+
   async getPagingProgress(userUuid: string): Promise<number> {
     const progress = await this.redisClient.get(`${this.PREFIX}:${userUuid}`)
 

+ 1 - 1
packages/revisions/src/Infra/TypeORM/SQL/SQLLegacyRevisionRepository.ts

@@ -30,7 +30,7 @@ export class SQLLegacyRevisionRepository implements RevisionRepositoryInterface
     const queryBuilder = this.ormRepository
       .createQueryBuilder('revision')
       .where('revision.user_uuid = :userUuid', { userUuid: dto.userUuid.value })
-      .orderBy('revision.uuid', 'ASC')
+      .orderBy('revision.created_at', 'ASC')
 
     if (dto.offset !== undefined) {
       queryBuilder.skip(dto.offset)

+ 2 - 0
packages/syncing-server/src/Domain/Transition/TransitionRepositoryInterface.ts

@@ -1,4 +1,6 @@
 export interface TransitionRepositoryInterface {
   getPagingProgress(userUuid: string): Promise<number>
   setPagingProgress(userUuid: string, progress: number): Promise<void>
+  getIntegrityProgress(userUuid: string): Promise<number>
+  setIntegrityProgress(userUuid: string, progress: number): Promise<void>
 }

+ 35 - 24
packages/syncing-server/src/Domain/UseCase/Transition/TransitionItemsFromPrimaryToSecondaryDatabaseForUser/TransitionItemsFromPrimaryToSecondaryDatabaseForUser.ts

@@ -43,7 +43,6 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
     if (migrationResult.isFailed()) {
       return Result.fail(migrationResult.getError())
     }
-    const itemsToSkipInIntegrityCheck = migrationResult.getValue()
 
     this.logger.info(`[${dto.userUuid}] Items migrated`)
 
@@ -51,11 +50,11 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
 
     this.logger.info(`[${dto.userUuid}] Checking integrity between primary and secondary database`)
 
-    const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(
-      userUuid,
-      itemsToSkipInIntegrityCheck,
-    )
+    const integrityCheckResult = await this.checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid)
     if (integrityCheckResult.isFailed()) {
+      await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(userUuid.value, 1)
+      await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(userUuid.value, 1)
+
       return Result.fail(integrityCheckResult.getError())
     }
 
@@ -81,7 +80,7 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
     await this.timer.sleep(twoSecondsInMilliseconds)
   }
 
-  private async migrateItemsForUser(userUuid: Uuid): Promise<Result<string[]>> {
+  private async migrateItemsForUser(userUuid: Uuid): Promise<Result<void>> {
     try {
       const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getPagingProgress(
         userUuid.value,
@@ -91,7 +90,6 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
 
       const totalItemsCountForUser = await this.primaryItemRepository.countAll({ userUuid: userUuid.value })
       const totalPages = Math.ceil(totalItemsCountForUser / this.pageSize)
-      const itemsToSkipInIntegrityCheck = []
       for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
         await (this.transitionStatusRepository as TransitionRepositoryInterface).setPagingProgress(
           userUuid.value,
@@ -102,7 +100,7 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
           userUuid: userUuid.value,
           offset: (currentPage - 1) * this.pageSize,
           limit: this.pageSize,
-          sortBy: 'uuid',
+          sortBy: 'created_at_timestamp',
           sortOrder: 'ASC',
         }
 
@@ -120,7 +118,6 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
               }
               if (itemInSecondary.props.timestamps.updatedAt > item.props.timestamps.updatedAt) {
                 this.logger.info(`[${userUuid.value}] Item ${item.uuid.value} is older than item in secondary database`)
-                itemsToSkipInIntegrityCheck.push(item.uuid.value)
 
                 continue
               }
@@ -143,7 +140,7 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
         }
       }
 
-      return Result.ok(itemsToSkipInIntegrityCheck)
+      return Result.ok()
     } catch (error) {
       return Result.fail((error as Error).message)
     }
@@ -161,11 +158,14 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
     }
   }
 
-  private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(
-    userUuid: Uuid,
-    itemsToSkipInIntegrityCheck: string[],
-  ): Promise<Result<boolean>> {
+  private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(userUuid: Uuid): Promise<Result<boolean>> {
     try {
+      const initialPage = await (this.transitionStatusRepository as TransitionRepositoryInterface).getIntegrityProgress(
+        userUuid.value,
+      )
+
+      this.logger.info(`[${userUuid.value}] Checking integrity from page ${initialPage}`)
+
       const totalItemsCountForUserInPrimary = await this.primaryItemRepository.countAll({ userUuid: userUuid.value })
       const totalItemsCountForUserInSecondary = await (
         this.secondaryItemRepository as ItemRepositoryInterface
@@ -180,12 +180,17 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
       }
 
       const totalPages = Math.ceil(totalItemsCountForUserInPrimary / this.pageSize)
-      for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
+      for (let currentPage = initialPage; currentPage <= totalPages; currentPage++) {
+        await (this.transitionStatusRepository as TransitionRepositoryInterface).setIntegrityProgress(
+          userUuid.value,
+          currentPage,
+        )
+
         const query: ItemQuery = {
           userUuid: userUuid.value,
           offset: (currentPage - 1) * this.pageSize,
           limit: this.pageSize,
-          sortBy: 'uuid',
+          sortBy: 'created_at_timestamp',
           sortOrder: 'ASC',
         }
 
@@ -197,19 +202,25 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
             return Result.fail(`Item ${item.uuid.value} not found in secondary database`)
           }
 
-          if (itemsToSkipInIntegrityCheck.includes(item.id.toString())) {
+          if (item.isIdenticalTo(itemInSecondary)) {
             continue
           }
 
-          if (!item.isIdenticalTo(itemInSecondary)) {
-            return Result.fail(
-              `Item ${
-                item.uuid.value
-              } is not identical in primary and secondary database. Item in primary database: ${JSON.stringify(
-                item,
-              )}, item in secondary database: ${JSON.stringify(itemInSecondary)}`,
+          if (itemInSecondary.props.timestamps.updatedAt > item.props.timestamps.updatedAt) {
+            this.logger.info(
+              `[${userUuid.value}] Integrity check of Item ${item.uuid.value} - is older than item in secondary database`,
             )
+
+            continue
           }
+
+          return Result.fail(
+            `Item ${
+              item.uuid.value
+            } is not identical in primary and secondary database. Item in primary database: ${JSON.stringify(
+              item,
+            )}, item in secondary database: ${JSON.stringify(itemInSecondary)}`,
+          )
         }
       }
 

+ 16 - 1
packages/syncing-server/src/Infra/Redis/RedisTransitionRepository.ts

@@ -3,10 +3,25 @@ import * as IORedis from 'ioredis'
 import { TransitionRepositoryInterface } from '../../Domain/Transition/TransitionRepositoryInterface'
 
 export class RedisTransitionRepository implements TransitionRepositoryInterface {
-  private readonly PREFIX = 'transition-items-paging-progress'
+  private readonly PREFIX = 'transition-items-migration-progress'
+  private readonly INTEGRITY_PREFIX = 'transition-items-integrity-progress'
 
   constructor(private redisClient: IORedis.Redis) {}
 
+  async getIntegrityProgress(userUuid: string): Promise<number> {
+    const progress = await this.redisClient.get(`${this.INTEGRITY_PREFIX}:${userUuid}`)
+
+    if (progress === null) {
+      return 1
+    }
+
+    return parseInt(progress)
+  }
+
+  async setIntegrityProgress(userUuid: string, progress: number): Promise<void> {
+    await this.redisClient.setex(`${this.INTEGRITY_PREFIX}:${userUuid}`, 172_800, progress.toString())
+  }
+
   async getPagingProgress(userUuid: string): Promise<number> {
     const progress = await this.redisClient.get(`${this.PREFIX}:${userUuid}`)