Browse Source

fix(syncing-server): paging through already existing items

Karol Sójko 1 năm trước cách đây
mục cha
commit
e4fcd738c3

+ 43 - 42
packages/revisions/src/Domain/UseCase/Transition/TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser/TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser.ts

@@ -30,15 +30,15 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
     const userUuid = userUuidOrError.getValue()
 
     let newRevisionsInSecondaryCount = 0
-    let updatedRevisionsInSecondary: Revision[] = []
+    let updatedRevisionsInSecondary: string[] = []
     if (await this.hasAlreadyDataInSecondaryDatabase(userUuid)) {
       const { alreadyExistingInPrimary, newRevisionsInSecondary, updatedInSecondary } =
         await this.getNewRevisionsCreatedInSecondaryDatabase(userUuid)
 
-      for (const existingRevision of alreadyExistingInPrimary) {
-        this.logger.info(`Removing revision ${existingRevision.id.toString()} from secondary database`)
+      for (const existingRevisionUuid of alreadyExistingInPrimary) {
+        this.logger.info(`Removing revision ${existingRevisionUuid} from secondary database`)
         await (this.secondRevisionsRepository as RevisionRepositoryInterface).removeOneByUuid(
-          Uuid.create(existingRevision.id.toString()).getValue(),
+          Uuid.create(existingRevisionUuid).getValue(),
           userUuid,
         )
       }
@@ -121,10 +121,7 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
     return Result.ok()
   }
 
-  private async migrateRevisionsForUser(
-    userUuid: Uuid,
-    updatedRevisionsInSecondary: Revision[],
-  ): Promise<Result<void>> {
+  private async migrateRevisionsForUser(userUuid: Uuid, updatedRevisionsInSecondary: string[]): Promise<Result<void>> {
     try {
       const totalRevisionsCountForUser = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
       let totalRevisionsCountTransitionedToSecondary = 0
@@ -141,9 +138,7 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
         for (const revision of revisions) {
           try {
             if (
-              updatedRevisionsInSecondary.find(
-                (updatedRevision) => updatedRevision.id.toString() === revision.id.toString(),
-              )
+              updatedRevisionsInSecondary.find((updatedRevisionUuid) => updatedRevisionUuid === revision.id.toString())
             ) {
               this.logger.info(
                 `Skipping saving revision ${revision.id.toString()} as it was updated in secondary database`,
@@ -215,39 +210,47 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
   }
 
   private async getNewRevisionsCreatedInSecondaryDatabase(userUuid: Uuid): Promise<{
-    alreadyExistingInPrimary: Revision[]
-    newRevisionsInSecondary: Revision[]
-    updatedInSecondary: Revision[]
+    alreadyExistingInPrimary: string[]
+    newRevisionsInSecondary: string[]
+    updatedInSecondary: string[]
   }> {
-    const revisions = await (this.secondRevisionsRepository as RevisionRepositoryInterface).findByUserUuid({
-      userUuid: userUuid,
-    })
-
-    const alreadyExistingInPrimary: Revision[] = []
-    const newRevisionsInSecondary: Revision[] = []
-    const updatedInSecondary: Revision[] = []
-
-    for (const revision of revisions) {
-      const { revisionInPrimary, newerRevisionInSecondary } =
-        await this.checkIfRevisionExistsInPrimaryDatabase(revision)
-      if (revisionInPrimary !== null) {
-        alreadyExistingInPrimary.push(revision)
-        continue
+    const totalRevisionsCountForUser = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
+    const totalPages = Math.ceil(totalRevisionsCountForUser / this.pageSize)
+
+    const alreadyExistingInPrimary: string[] = []
+    const newRevisionsInSecondary: string[] = []
+    const updatedInSecondary: string[] = []
+
+    for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
+      const query = {
+        userUuid: userUuid,
+        offset: (currentPage - 1) * this.pageSize,
+        limit: this.pageSize,
       }
-      if (newerRevisionInSecondary !== null) {
-        updatedInSecondary.push(newerRevisionInSecondary)
-        continue
-      }
-      if (revisionInPrimary === null && newerRevisionInSecondary === null) {
-        newRevisionsInSecondary.push(revision)
-        continue
+
+      const revisions = await (this.secondRevisionsRepository as RevisionRepositoryInterface).findByUserUuid(query)
+      for (const revision of revisions) {
+        const { revisionInPrimary, newerRevisionInSecondary } =
+          await this.checkIfRevisionExistsInPrimaryDatabase(revision)
+        if (revisionInPrimary !== null) {
+          alreadyExistingInPrimary.push(revision.id.toString())
+          continue
+        }
+        if (newerRevisionInSecondary !== null) {
+          updatedInSecondary.push(newerRevisionInSecondary.id.toString())
+          continue
+        }
+        if (revisionInPrimary === null && newerRevisionInSecondary === null) {
+          newRevisionsInSecondary.push(revision.id.toString())
+          continue
+        }
       }
     }
 
     return {
-      alreadyExistingInPrimary: alreadyExistingInPrimary,
-      newRevisionsInSecondary: newRevisionsInSecondary,
-      updatedInSecondary: updatedInSecondary,
+      alreadyExistingInPrimary,
+      newRevisionsInSecondary,
+      updatedInSecondary,
     }
   }
 
@@ -290,7 +293,7 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
   private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(
     userUuid: Uuid,
     newRevisionsInSecondaryCount: number,
-    updatedRevisionsInSecondary: Revision[],
+    updatedRevisionsInSecondary: string[],
   ): Promise<Result<boolean>> {
     try {
       const totalRevisionsCountForUserInPrimary = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
@@ -321,9 +324,7 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
           }
 
           if (
-            updatedRevisionsInSecondary.find(
-              (updatedRevision) => updatedRevision.id.toString() === revision.id.toString(),
-            )
+            updatedRevisionsInSecondary.find((updatedRevisionUuid) => updatedRevisionUuid === revision.id.toString())
           ) {
             this.logger.info(
               `Skipping integrity check for revision ${revision.id.toString()} as it was updated in secondary database`,

+ 1 - 0
packages/syncing-server/src/Domain/Item/ItemRepositoryInterface.ts

@@ -15,6 +15,7 @@ export interface ItemRepositoryInterface {
   findByUuidAndUserUuid(uuid: string, userUuid: string): Promise<Item | null>
   findByUuid(uuid: Uuid): Promise<Item | null>
   remove(item: Item): Promise<void>
+  removeByUuid(uuid: Uuid): Promise<void>
   save(item: Item): Promise<void>
   markItemsAsDeleted(itemUuids: Array<string>, updatedAtTimestamp: number): Promise<void>
   updateContentSize(itemUuid: string, contentSize: number): Promise<void>

+ 46 - 31
packages/syncing-server/src/Domain/UseCase/Transition/TransitionItemsFromPrimaryToSecondaryDatabaseForUser/TransitionItemsFromPrimaryToSecondaryDatabaseForUser.ts

@@ -31,14 +31,16 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
     const userUuid = userUuidOrError.getValue()
 
     let newItemsInSecondaryCount = 0
-    let updatedItemsInSecondary: Item[] = []
+    let updatedItemsInSecondary: string[] = []
     if (await this.hasAlreadyDataInSecondaryDatabase(userUuid)) {
       const { alreadyExistingInPrimary, newItemsInSecondary, updatedInSecondary } =
         await this.getNewItemsCreatedInSecondaryDatabase(userUuid)
 
-      for (const existingItem of alreadyExistingInPrimary) {
-        this.logger.info(`Removing item ${existingItem.uuid.value} from secondary database`)
-        await (this.secondaryItemRepository as ItemRepositoryInterface).remove(existingItem)
+      for (const existingItemUuid of alreadyExistingInPrimary) {
+        this.logger.info(`Removing item ${existingItemUuid} from secondary database`)
+        await (this.secondaryItemRepository as ItemRepositoryInterface).removeByUuid(
+          Uuid.create(existingItemUuid).getValue(),
+        )
       }
 
       if (newItemsInSecondary.length > 0) {
@@ -135,31 +137,40 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
   }
 
   private async getNewItemsCreatedInSecondaryDatabase(userUuid: Uuid): Promise<{
-    alreadyExistingInPrimary: Item[]
-    newItemsInSecondary: Item[]
-    updatedInSecondary: Item[]
+    alreadyExistingInPrimary: string[]
+    newItemsInSecondary: string[]
+    updatedInSecondary: string[]
   }> {
-    const items = await (this.secondaryItemRepository as ItemRepositoryInterface).findAll({
-      userUuid: userUuid.value,
-    })
-
-    const alreadyExistingInPrimary: Item[] = []
-    const updatedInSecondary: Item[] = []
-    const newItemsInSecondary: Item[] = []
-
-    for (const item of items) {
-      const { itemInPrimary, newerItemInSecondary } = await this.checkIfItemExistsInPrimaryDatabase(item)
-      if (itemInPrimary !== null) {
-        alreadyExistingInPrimary.push(item)
-        continue
-      }
-      if (newerItemInSecondary !== null) {
-        updatedInSecondary.push(newerItemInSecondary)
-        continue
+    const alreadyExistingInPrimary: string[] = []
+    const updatedInSecondary: string[] = []
+    const newItemsInSecondary: string[] = []
+
+    const totalItemsCountForUser = await this.primaryItemRepository.countAll({ userUuid: userUuid.value })
+    const totalPages = Math.ceil(totalItemsCountForUser / this.pageSize)
+    for (let currentPage = 1; currentPage <= totalPages; currentPage++) {
+      const query: ItemQuery = {
+        userUuid: userUuid.value,
+        offset: (currentPage - 1) * this.pageSize,
+        limit: this.pageSize,
+        sortOrder: 'ASC',
+        sortBy: 'uuid',
       }
-      if (itemInPrimary === null && newerItemInSecondary === null) {
-        newItemsInSecondary.push(item)
-        continue
+
+      const items = await (this.secondaryItemRepository as ItemRepositoryInterface).findAll(query)
+      for (const item of items) {
+        const { itemInPrimary, newerItemInSecondary } = await this.checkIfItemExistsInPrimaryDatabase(item)
+        if (itemInPrimary !== null) {
+          alreadyExistingInPrimary.push(item.id.toString())
+          continue
+        }
+        if (newerItemInSecondary !== null) {
+          updatedInSecondary.push(newerItemInSecondary.id.toString())
+          continue
+        }
+        if (itemInPrimary === null && newerItemInSecondary === null) {
+          newItemsInSecondary.push(item.id.toString())
+          continue
+        }
       }
     }
 
@@ -195,7 +206,7 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
     return { itemInPrimary: itemInPrimary, newerItemInSecondary: null }
   }
 
-  private async migrateItemsForUser(userUuid: Uuid, updatedItemsInSecondary: Item[]): Promise<Result<void>> {
+  private async migrateItemsForUser(userUuid: Uuid, updatedItemsInSecondary: string[]): Promise<Result<void>> {
     try {
       const totalItemsCountForUser = await this.primaryItemRepository.countAll({ userUuid: userUuid.value })
       const totalPages = Math.ceil(totalItemsCountForUser / this.pageSize)
@@ -204,12 +215,14 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
           userUuid: userUuid.value,
           offset: (currentPage - 1) * this.pageSize,
           limit: this.pageSize,
+          sortBy: 'uuid',
+          sortOrder: 'ASC',
         }
 
         const items = await this.primaryItemRepository.findAll(query)
 
         for (const item of items) {
-          if (updatedItemsInSecondary.find((updatedItem) => updatedItem.uuid.equals(item.uuid))) {
+          if (updatedItemsInSecondary.find((updatedItemUuid) => item.uuid.value === updatedItemUuid)) {
             this.logger.info(`Skipping saving item ${item.uuid.value} as it was updated in secondary database`)
 
             continue
@@ -237,7 +250,7 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
   private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(
     userUuid: Uuid,
     newItemsInSecondaryCount: number,
-    updatedItemsInSecondary: Item[],
+    updatedItemsInSecondary: string[],
   ): Promise<Result<boolean>> {
     try {
       const totalItemsCountForUserInPrimary = await this.primaryItemRepository.countAll({ userUuid: userUuid.value })
@@ -259,6 +272,8 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
           userUuid: userUuid.value,
           offset: (currentPage - 1) * this.pageSize,
           limit: this.pageSize,
+          sortBy: 'uuid',
+          sortOrder: 'ASC',
         }
 
         const items = await this.primaryItemRepository.findAll(query)
@@ -269,7 +284,7 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
             return Result.fail(`Item ${item.uuid.value} not found in secondary database`)
           }
 
-          if (updatedItemsInSecondary.find((updatedItem) => updatedItem.uuid.equals(item.uuid))) {
+          if (updatedItemsInSecondary.find((updatedItemUuid) => item.uuid.value === updatedItemUuid)) {
             this.logger.info(
               `Skipping integrity check for item ${item.uuid.value} as it was updated in secondary database`,
             )

+ 4 - 0
packages/syncing-server/src/Infra/TypeORM/MongoDBItemRepository.ts

@@ -17,6 +17,10 @@ export class MongoDBItemRepository implements ItemRepositoryInterface {
     private logger: Logger,
   ) {}
 
+  async removeByUuid(uuid: Uuid): Promise<void> {
+    await this.mongoRepository.deleteOne({ _id: { $eq: BSON.UUID.createFromHexString(uuid.value) } })
+  }
+
   async deleteByUserUuid(userUuid: string): Promise<void> {
     await this.mongoRepository.deleteMany({ userUuid })
   }

+ 9 - 0
packages/syncing-server/src/Infra/TypeORM/SQLLegacyItemRepository.ts

@@ -16,6 +16,15 @@ export class SQLLegacyItemRepository implements ItemRepositoryInterface {
     protected logger: Logger,
   ) {}
 
+  async removeByUuid(uuid: Uuid): Promise<void> {
+    await this.ormRepository
+      .createQueryBuilder('item')
+      .delete()
+      .from('items')
+      .where('uuid = :uuid', { uuid: uuid.value })
+      .execute()
+  }
+
   async save(item: Item): Promise<void> {
     const persistence = this.mapper.toProjection(item)