revisions.ts 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import 'reflect-metadata'
  2. import 'newrelic'
  3. import { Logger } from 'winston'
  4. import { ContainerConfigLoader } from '../src/Bootstrap/Container'
  5. import TYPES from '../src/Bootstrap/Types'
  6. import { Env } from '../src/Bootstrap/Env'
  7. import { DomainEventFactoryInterface } from '../src/Domain/Event/DomainEventFactoryInterface'
  8. import { DomainEventPublisherInterface } from '@standardnotes/domain-events'
  9. import { ItemRepositoryInterface } from '../src/Domain/Item/ItemRepositoryInterface'
  10. import { Stream } from 'stream'
  /**
   * Requests a revisions ownership update for every item streamed from the repository.
   *
   * Items are streamed with only the `user_uuid` and `uuid` fields selected and restricted
   * to those created before 2022-11-23. For each row one "revisions ownership update
   * requested" domain event is built by the factory and published.
   *
   * Per-item failures (missing identifiers, publish errors) are logged and skipped so that
   * a single bad row does not abort the whole run.
   *
   * @param itemRepository - Source of the item stream.
   * @param domainEventFactory - Builds the ownership update event for an item.
   * @param domainEventPublisher - Publishes the built events.
   * @param logger - Receives per-item error reports.
   * @returns A promise that resolves once every streamed item has been handled and rejects
   * when either the source stream or the processing stream emits an error.
   */
  const fixRevisionsOwnership = async (
    itemRepository: ItemRepositoryInterface,
    domainEventFactory: DomainEventFactoryInterface,
    domainEventPublisher: DomainEventPublisherInterface,
    logger: Logger,
  ): Promise<void> => {
    const stream = await itemRepository.streamAll({
      createdBefore: new Date('2022-11-23'),
      selectFields: ['user_uuid', 'uuid'],
    })

    return new Promise((resolve, reject) => {
      const eventPublishingStream = new Stream.Transform({
        objectMode: true,
        transform: async (rawItemData, _encoding, callback) => {
          try {
            if (!rawItemData.item_user_uuid || !rawItemData.item_uuid) {
              logger.error('Could not process item %O', rawItemData)

              return callback()
            }

            await domainEventPublisher.publish(
              domainEventFactory.createRevisionsOwnershipUpdateRequestedEvent({
                userUuid: rawItemData.item_user_uuid,
                itemUuid: rawItemData.item_uuid,
              }),
            )
          } catch (error) {
            // Narrow instead of asserting: a non-Error throwable would otherwise log "undefined".
            const reason = error instanceof Error ? error.message : String(error)
            logger.error(`Could not process item ${rawItemData.item_uuid}: ${reason}`)
          }

          // Nothing is pushed downstream; the callback only signals the row has been handled.
          callback()
        },
      })

      // pipe() does not forward errors from the source to the destination, so the source
      // needs its own listener or a failed read would never settle this promise.
      stream.on('error', reject)

      stream.pipe(eventPublishingStream).on('finish', resolve).on('error', reject)
    })
  }
  // Script entry point: load the service container and environment, resolve the
  // collaborators, run the ownership fix and exit with 0 on success or 1 on failure.
  const container = new ContainerConfigLoader()
  void container.load().then(async (loadedContainer) => {
    const env = new Env()
    env.load()

    const logger: Logger = loadedContainer.get(TYPES.Logger)

    logger.info('Starting revisions ownership fixing')

    const itemRepository: ItemRepositoryInterface = loadedContainer.get(TYPES.ItemRepository)
    const domainEventFactory: DomainEventFactoryInterface = loadedContainer.get(TYPES.DomainEventFactory)
    const domainEventPublisher: DomainEventPublisherInterface = loadedContainer.get(TYPES.DomainEventPublisher)

    try {
      await fixRevisionsOwnership(itemRepository, domainEventFactory, domainEventPublisher, logger)

      logger.info('revisions ownership fix complete.')

      process.exit(0)
    } catch (error) {
      logger.error(`Could not finish revisions ownership fix: ${(error as Error).message}`)

      process.exit(1)
    }
  })