fix: include handling updated items in revisions in secondary
This commit is contained in:
parent
179d8eaaa1
commit
fbcb45c3a2
2 changed files with 111 additions and 34 deletions
|
@ -30,9 +30,12 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
|
|||
const userUuid = userUuidOrError.getValue()
|
||||
|
||||
let newRevisionsInSecondaryCount = 0
|
||||
let updatedRevisionsInSecondary: Revision[] = []
|
||||
if (await this.hasAlreadyDataInSecondaryDatabase(userUuid)) {
|
||||
const newRevisions = await this.getNewRevisionsCreatedInSecondaryDatabase(userUuid)
|
||||
for (const existingRevision of newRevisions.alreadyExistingInPrimary) {
|
||||
const { alreadyExistingInPrimary, newRevisionsInSecondary, updatedInSecondary } =
|
||||
await this.getNewRevisionsCreatedInSecondaryDatabase(userUuid)
|
||||
|
||||
for (const existingRevision of alreadyExistingInPrimary) {
|
||||
this.logger.info(`Removing revision ${existingRevision.id.toString()} from secondary database`)
|
||||
await (this.secondRevisionsRepository as RevisionRepositoryInterface).removeOneByUuid(
|
||||
Uuid.create(existingRevision.id.toString()).getValue(),
|
||||
|
@ -40,24 +43,34 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
|
|||
)
|
||||
}
|
||||
|
||||
if (newRevisions.newRevisionsInSecondary.length > 0) {
|
||||
if (newRevisionsInSecondary.length > 0) {
|
||||
this.logger.info(
|
||||
`Found ${newRevisions.newRevisionsInSecondary.length} new revisions in secondary database for user ${userUuid.value}`,
|
||||
`Found ${newRevisionsInSecondary.length} new revisions in secondary database for user ${userUuid.value}`,
|
||||
)
|
||||
}
|
||||
|
||||
newRevisionsInSecondaryCount = newRevisions.newRevisionsInSecondary.length
|
||||
newRevisionsInSecondaryCount = newRevisionsInSecondary.length
|
||||
|
||||
if (updatedInSecondary.length > 0) {
|
||||
this.logger.info(
|
||||
`Found ${updatedInSecondary.length} updated revisions in secondary database for user ${userUuid.value}`,
|
||||
)
|
||||
}
|
||||
|
||||
updatedRevisionsInSecondary = updatedInSecondary
|
||||
}
|
||||
|
||||
const updatedRevisionsInSecondaryCount = updatedRevisionsInSecondary.length
|
||||
|
||||
await this.allowForSecondaryDatabaseToCatchUp()
|
||||
|
||||
const migrationTimeStart = this.timer.getTimestampInMicroseconds()
|
||||
|
||||
this.logger.debug(`Transitioning revisions for user ${userUuid.value}`)
|
||||
|
||||
const migrationResult = await this.migrateRevisionsForUser(userUuid)
|
||||
const migrationResult = await this.migrateRevisionsForUser(userUuid, updatedRevisionsInSecondary)
|
||||
if (migrationResult.isFailed()) {
|
||||
if (newRevisionsInSecondaryCount === 0) {
|
||||
if (newRevisionsInSecondaryCount === 0 && updatedRevisionsInSecondaryCount === 0) {
|
||||
const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.secondRevisionsRepository)
|
||||
if (cleanupResult.isFailed()) {
|
||||
this.logger.error(
|
||||
|
@ -76,7 +89,7 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
|
|||
newRevisionsInSecondaryCount,
|
||||
)
|
||||
if (integrityCheckResult.isFailed()) {
|
||||
if (newRevisionsInSecondaryCount === 0) {
|
||||
if (newRevisionsInSecondaryCount === 0 && updatedRevisionsInSecondaryCount === 0) {
|
||||
const cleanupResult = await this.deleteRevisionsForUser(userUuid, this.secondRevisionsRepository)
|
||||
if (cleanupResult.isFailed()) {
|
||||
this.logger.error(
|
||||
|
@ -107,7 +120,10 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
|
|||
return Result.ok()
|
||||
}
|
||||
|
||||
private async migrateRevisionsForUser(userUuid: Uuid): Promise<Result<void>> {
|
||||
private async migrateRevisionsForUser(
|
||||
userUuid: Uuid,
|
||||
updatedRevisionsInSecondary: Revision[],
|
||||
): Promise<Result<void>> {
|
||||
try {
|
||||
const totalRevisionsCountForUser = await this.primaryRevisionsRepository.countByUserUuid(userUuid)
|
||||
let totalRevisionsCountTransitionedToSecondary = 0
|
||||
|
@ -123,6 +139,14 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
|
|||
|
||||
for (const revision of revisions) {
|
||||
try {
|
||||
if (
|
||||
updatedRevisionsInSecondary.find(
|
||||
(updatedRevision) => updatedRevision.id.toString() === revision.id.toString(),
|
||||
)
|
||||
) {
|
||||
continue
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Transitioning revision #${
|
||||
totalRevisionsCountTransitionedToSecondary + 1
|
||||
|
@ -188,6 +212,7 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
|
|||
private async getNewRevisionsCreatedInSecondaryDatabase(userUuid: Uuid): Promise<{
|
||||
alreadyExistingInPrimary: Revision[]
|
||||
newRevisionsInSecondary: Revision[]
|
||||
updatedInSecondary: Revision[]
|
||||
}> {
|
||||
const revisions = await (this.secondRevisionsRepository as RevisionRepositoryInterface).findByUserUuid({
|
||||
userUuid: userUuid,
|
||||
|
@ -195,23 +220,35 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
|
|||
|
||||
const alreadyExistingInPrimary: Revision[] = []
|
||||
const newRevisionsInSecondary: Revision[] = []
|
||||
const updatedInSecondary: Revision[] = []
|
||||
|
||||
for (const revision of revisions) {
|
||||
const revisionExistsInPrimary = await this.checkIfRevisionExistsInPrimaryDatabase(revision)
|
||||
if (revisionExistsInPrimary) {
|
||||
const { revisionInPrimary, newerRevisionInSecondary } =
|
||||
await this.checkIfRevisionExistsInPrimaryDatabase(revision)
|
||||
if (revisionInPrimary !== null) {
|
||||
alreadyExistingInPrimary.push(revision)
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
if (newerRevisionInSecondary !== null) {
|
||||
updatedInSecondary.push(newerRevisionInSecondary)
|
||||
continue
|
||||
}
|
||||
if (revisionInPrimary === null && newerRevisionInSecondary === null) {
|
||||
newRevisionsInSecondary.push(revision)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
alreadyExistingInPrimary: alreadyExistingInPrimary,
|
||||
newRevisionsInSecondary: newRevisionsInSecondary,
|
||||
updatedInSecondary: updatedInSecondary,
|
||||
}
|
||||
}
|
||||
|
||||
private async checkIfRevisionExistsInPrimaryDatabase(revision: Revision): Promise<boolean> {
|
||||
private async checkIfRevisionExistsInPrimaryDatabase(
|
||||
revision: Revision,
|
||||
): Promise<{ revisionInPrimary: Revision | null; newerRevisionInSecondary: Revision | null }> {
|
||||
const revisionInPrimary = await this.primaryRevisionsRepository.findOneByUuid(
|
||||
Uuid.create(revision.id.toString()).getValue(),
|
||||
revision.props.userUuid as Uuid,
|
||||
|
@ -219,7 +256,10 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
|
|||
)
|
||||
|
||||
if (revisionInPrimary === null) {
|
||||
return false
|
||||
return {
|
||||
revisionInPrimary: null,
|
||||
newerRevisionInSecondary: null,
|
||||
}
|
||||
}
|
||||
|
||||
if (!revision.isIdenticalTo(revisionInPrimary)) {
|
||||
|
@ -229,10 +269,17 @@ export class TransitionRevisionsFromPrimaryToSecondaryDatabaseForUser implements
|
|||
)}, revision in primary database: ${JSON.stringify(revisionInPrimary)}`,
|
||||
)
|
||||
|
||||
return false
|
||||
return {
|
||||
revisionInPrimary: null,
|
||||
newerRevisionInSecondary:
|
||||
revision.props.dates.updatedAt > revisionInPrimary.props.dates.updatedAt ? revision : null,
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
return {
|
||||
revisionInPrimary: revisionInPrimary,
|
||||
newerRevisionInSecondary: null,
|
||||
}
|
||||
}
|
||||
|
||||
private async checkIntegrityBetweenPrimaryAndSecondaryDatabase(
|
||||
|
|
|
@ -31,29 +31,41 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
|
|||
const userUuid = userUuidOrError.getValue()
|
||||
|
||||
let newItemsInSecondaryCount = 0
|
||||
let updatedItemsInSecondary: Item[] = []
|
||||
if (await this.hasAlreadyDataInSecondaryDatabase(userUuid)) {
|
||||
const newItems = await this.getNewItemsCreatedInSecondaryDatabase(userUuid)
|
||||
for (const existingItem of newItems.alreadyExistingInPrimary) {
|
||||
const { alreadyExistingInPrimary, newItemsInSecondary, updatedInSecondary } =
|
||||
await this.getNewItemsCreatedInSecondaryDatabase(userUuid)
|
||||
|
||||
for (const existingItem of alreadyExistingInPrimary) {
|
||||
this.logger.info(`Removing item ${existingItem.uuid.value} from secondary database`)
|
||||
await (this.secondaryItemRepository as ItemRepositoryInterface).remove(existingItem)
|
||||
}
|
||||
|
||||
if (newItems.newItemsInSecondary.length > 0) {
|
||||
if (newItemsInSecondary.length > 0) {
|
||||
this.logger.info(
|
||||
`Found ${newItems.newItemsInSecondary.length} new items in secondary database for user ${userUuid.value}`,
|
||||
`Found ${newItemsInSecondary.length} new items in secondary database for user ${userUuid.value}`,
|
||||
)
|
||||
}
|
||||
|
||||
newItemsInSecondaryCount = newItems.newItemsInSecondary.length
|
||||
newItemsInSecondaryCount = newItemsInSecondary.length
|
||||
|
||||
if (updatedInSecondary.length > 0) {
|
||||
this.logger.info(
|
||||
`Found ${updatedInSecondary.length} updated items in secondary database for user ${userUuid.value}`,
|
||||
)
|
||||
}
|
||||
|
||||
updatedItemsInSecondary = updatedInSecondary
|
||||
}
|
||||
const updatedItemsInSecondaryCount = updatedItemsInSecondary.length
|
||||
|
||||
await this.allowForSecondaryDatabaseToCatchUp()
|
||||
|
||||
const migrationTimeStart = this.timer.getTimestampInMicroseconds()
|
||||
|
||||
const migrationResult = await this.migrateItemsForUser(userUuid)
|
||||
const migrationResult = await this.migrateItemsForUser(userUuid, updatedItemsInSecondary)
|
||||
if (migrationResult.isFailed()) {
|
||||
if (newItemsInSecondaryCount === 0) {
|
||||
if (newItemsInSecondaryCount === 0 && updatedItemsInSecondaryCount === 0) {
|
||||
const cleanupResult = await this.deleteItemsForUser(userUuid, this.secondaryItemRepository)
|
||||
if (cleanupResult.isFailed()) {
|
||||
this.logger.error(
|
||||
|
@ -72,7 +84,7 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
|
|||
newItemsInSecondaryCount,
|
||||
)
|
||||
if (integrityCheckResult.isFailed()) {
|
||||
if (newItemsInSecondaryCount === 0) {
|
||||
if (newItemsInSecondaryCount === 0 && updatedItemsInSecondaryCount === 0) {
|
||||
const cleanupResult = await this.deleteItemsForUser(userUuid, this.secondaryItemRepository)
|
||||
if (cleanupResult.isFailed()) {
|
||||
this.logger.error(
|
||||
|
@ -124,34 +136,46 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
|
|||
private async getNewItemsCreatedInSecondaryDatabase(userUuid: Uuid): Promise<{
|
||||
alreadyExistingInPrimary: Item[]
|
||||
newItemsInSecondary: Item[]
|
||||
updatedInSecondary: Item[]
|
||||
}> {
|
||||
const items = await (this.secondaryItemRepository as ItemRepositoryInterface).findAll({
|
||||
userUuid: userUuid.value,
|
||||
})
|
||||
|
||||
const alreadyExistingInPrimary: Item[] = []
|
||||
const updatedInSecondary: Item[] = []
|
||||
const newItemsInSecondary: Item[] = []
|
||||
|
||||
for (const item of items) {
|
||||
const itemExistsInPrimary = await this.checkIfItemExistsInPrimaryDatabase(item)
|
||||
if (itemExistsInPrimary) {
|
||||
const { itemInPrimary, newerItemInSecondary } = await this.checkIfItemExistsInPrimaryDatabase(item)
|
||||
if (itemInPrimary !== null) {
|
||||
alreadyExistingInPrimary.push(item)
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
if (newerItemInSecondary !== null) {
|
||||
updatedInSecondary.push(newerItemInSecondary)
|
||||
continue
|
||||
}
|
||||
if (itemInPrimary === null && newerItemInSecondary === null) {
|
||||
newItemsInSecondary.push(item)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
alreadyExistingInPrimary: alreadyExistingInPrimary,
|
||||
newItemsInSecondary: newItemsInSecondary,
|
||||
alreadyExistingInPrimary,
|
||||
newItemsInSecondary,
|
||||
updatedInSecondary,
|
||||
}
|
||||
}
|
||||
|
||||
private async checkIfItemExistsInPrimaryDatabase(item: Item): Promise<boolean> {
|
||||
private async checkIfItemExistsInPrimaryDatabase(
|
||||
item: Item,
|
||||
): Promise<{ itemInPrimary: Item | null; newerItemInSecondary: Item | null }> {
|
||||
const itemInPrimary = await this.primaryItemRepository.findByUuid(item.uuid)
|
||||
|
||||
if (itemInPrimary === null) {
|
||||
return false
|
||||
return { itemInPrimary: null, newerItemInSecondary: null }
|
||||
}
|
||||
|
||||
if (!item.isIdenticalTo(itemInPrimary)) {
|
||||
|
@ -161,13 +185,16 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
|
|||
)}, revision in primary database: ${JSON.stringify(itemInPrimary)}`,
|
||||
)
|
||||
|
||||
return false
|
||||
return {
|
||||
itemInPrimary: null,
|
||||
newerItemInSecondary: item.props.timestamps.updatedAt > itemInPrimary.props.timestamps.updatedAt ? item : null,
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
return { itemInPrimary: itemInPrimary, newerItemInSecondary: null }
|
||||
}
|
||||
|
||||
private async migrateItemsForUser(userUuid: Uuid): Promise<Result<void>> {
|
||||
private async migrateItemsForUser(userUuid: Uuid, updatedItemsInSecondary: Item[]): Promise<Result<void>> {
|
||||
try {
|
||||
const totalItemsCountForUser = await this.primaryItemRepository.countAll({ userUuid: userUuid.value })
|
||||
const totalPages = Math.ceil(totalItemsCountForUser / this.pageSize)
|
||||
|
@ -181,6 +208,9 @@ export class TransitionItemsFromPrimaryToSecondaryDatabaseForUser implements Use
|
|||
const items = await this.primaryItemRepository.findAll(query)
|
||||
|
||||
for (const item of items) {
|
||||
if (updatedItemsInSecondary.find((updatedItem) => updatedItem.uuid.equals(item.uuid))) {
|
||||
continue
|
||||
}
|
||||
await (this.secondaryItemRepository as ItemRepositoryInterface).save(item)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue