Sfoglia il codice sorgente

feat: add sending notifications to user via websockets (#799)

* feat: add sending notifications to user via websockets

* fix: notification added for user event payload structure
Karol Sójko 1 anno fa
parent
commit
c0722b173b

+ 8 - 0
packages/domain-events/src/Domain/Event/NotificationAddedForUserEvent.ts

@@ -0,0 +1,8 @@
+import { DomainEventInterface } from './DomainEventInterface'
+
+import { NotificationAddedForUserEventPayload } from './NotificationAddedForUserEventPayload'
+
+export interface NotificationAddedForUserEvent extends DomainEventInterface {
+  type: 'NOTIFICATION_ADDED_FOR_USER'
+  payload: NotificationAddedForUserEventPayload
+}

+ 10 - 0
packages/domain-events/src/Domain/Event/NotificationAddedForUserEventPayload.ts

@@ -0,0 +1,10 @@
+export interface NotificationAddedForUserEventPayload {
+  notification: {
+    uuid: string
+    user_uuid: string
+    type: string
+    payload: string
+    created_at_timestamp: number
+    updated_at_timestamp: number
+  }
+}

+ 2 - 0
packages/domain-events/src/Domain/index.ts

@@ -42,6 +42,8 @@ export * from './Event/ListedAccountRequestedEvent'
 export * from './Event/ListedAccountRequestedEventPayload'
 export * from './Event/MuteEmailsSettingChangedEvent'
 export * from './Event/MuteEmailsSettingChangedEventPayload'
+export * from './Event/NotificationAddedForUserEvent'
+export * from './Event/NotificationAddedForUserEventPayload'
 export * from './Event/PaymentFailedEvent'
 export * from './Event/PaymentFailedEventPayload'
 export * from './Event/PaymentsAccountDeletedEvent'

+ 16 - 1
packages/syncing-server/src/Bootstrap/Container.ts

@@ -161,6 +161,7 @@ import { TriggerTransitionFromPrimaryToSecondaryDatabaseForUser } from '../Domai
 import { SQLItem } from '../Infra/TypeORM/SQLItem'
 import { SQLItemPersistenceMapper } from '../Mapping/Persistence/SQLItemPersistenceMapper'
 import { SQLItemRepository } from '../Infra/TypeORM/SQLItemRepository'
+import { SendEventToClient } from '../Domain/UseCase/Syncing/SendEventToClient/SendEventToClient'
 
 export class ContainerConfigLoader {
   private readonly DEFAULT_CONTENT_SIZE_TRANSFER_LIMIT = 10_000_000
@@ -580,10 +581,24 @@ export class ContainerConfigLoader {
           container.get(TYPES.Sync_DomainEventFactory),
         ),
       )
+    container
+      .bind<SendEventToClient>(TYPES.Sync_SendEventToClient)
+      .toConstantValue(
+        new SendEventToClient(
+          container.get<DomainEventFactoryInterface>(TYPES.Sync_DomainEventFactory),
+          container.get<DomainEventPublisherInterface>(TYPES.Sync_DomainEventPublisher),
+        ),
+      )
     container
       .bind<AddNotificationForUser>(TYPES.Sync_AddNotificationForUser)
       .toConstantValue(
-        new AddNotificationForUser(container.get(TYPES.Sync_NotificationRepository), container.get(TYPES.Sync_Timer)),
+        new AddNotificationForUser(
+          container.get<NotificationRepositoryInterface>(TYPES.Sync_NotificationRepository),
+          container.get<TimerInterface>(TYPES.Sync_Timer),
+          container.get<DomainEventFactoryInterface>(TYPES.Sync_DomainEventFactory),
+          container.get<SendEventToClient>(TYPES.Sync_SendEventToClient),
+          container.get<Logger>(TYPES.Sync_Logger),
+        ),
       )
     container
       .bind<AddNotificationsForUsers>(TYPES.Sync_AddNotificationsForUsers)

+ 1 - 0
packages/syncing-server/src/Bootstrap/Types.ts

@@ -87,6 +87,7 @@ const TYPES = {
   Sync_TriggerTransitionFromPrimaryToSecondaryDatabaseForUser: Symbol.for(
     'Sync_TriggerTransitionFromPrimaryToSecondaryDatabaseForUser',
   ),
+  Sync_SendEventToClient: Symbol.for('Sync_SendEventToClient'),
   // Handlers
   Sync_AccountDeletionRequestedEventHandler: Symbol.for('Sync_AccountDeletionRequestedEventHandler'),
   Sync_DuplicateItemSyncedEventHandler: Symbol.for('Sync_DuplicateItemSyncedEventHandler'),

+ 41 - 0
packages/syncing-server/src/Domain/Event/DomainEventFactory.ts

@@ -5,8 +5,10 @@ import {
   EmailRequestedEvent,
   ItemDumpedEvent,
   ItemRevisionCreationRequestedEvent,
+  NotificationAddedForUserEvent,
   RevisionsCopyRequestedEvent,
   TransitionStatusUpdatedEvent,
+  WebSocketMessageRequestedEvent,
 } from '@standardnotes/domain-events'
 import { TimerInterface } from '@standardnotes/time'
 import { DomainEventFactoryInterface } from './DomainEventFactoryInterface'
@@ -14,6 +16,45 @@ import { DomainEventFactoryInterface } from './DomainEventFactoryInterface'
 export class DomainEventFactory implements DomainEventFactoryInterface {
   constructor(private timer: TimerInterface) {}
 
+  createNotificationAddedForUserEvent(dto: {
+    notification: {
+      uuid: string
+      user_uuid: string
+      type: string
+      payload: string
+      created_at_timestamp: number
+      updated_at_timestamp: number
+    }
+  }): NotificationAddedForUserEvent {
+    return {
+      type: 'NOTIFICATION_ADDED_FOR_USER',
+      createdAt: this.timer.getUTCDate(),
+      meta: {
+        correlation: {
+          userIdentifier: dto.notification.user_uuid,
+          userIdentifierType: 'uuid',
+        },
+        origin: DomainEventService.SyncingServer,
+      },
+      payload: dto,
+    }
+  }
+
+  createWebSocketMessageRequestedEvent(dto: { userUuid: string; message: string }): WebSocketMessageRequestedEvent {
+    return {
+      type: 'WEB_SOCKET_MESSAGE_REQUESTED',
+      createdAt: this.timer.getUTCDate(),
+      meta: {
+        correlation: {
+          userIdentifier: dto.userUuid,
+          userIdentifierType: 'uuid',
+        },
+        origin: DomainEventService.SyncingServer,
+      },
+      payload: dto,
+    }
+  }
+
   createTransitionStatusUpdatedEvent(dto: {
     userUuid: string
     transitionType: 'items' | 'revisions'

+ 13 - 0
packages/syncing-server/src/Domain/Event/DomainEventFactoryInterface.ts

@@ -3,11 +3,24 @@ import {
   EmailRequestedEvent,
   ItemDumpedEvent,
   ItemRevisionCreationRequestedEvent,
+  NotificationAddedForUserEvent,
   RevisionsCopyRequestedEvent,
   TransitionStatusUpdatedEvent,
+  WebSocketMessageRequestedEvent,
 } from '@standardnotes/domain-events'
 
 export interface DomainEventFactoryInterface {
+  createWebSocketMessageRequestedEvent(dto: { userUuid: string; message: string }): WebSocketMessageRequestedEvent
+  createNotificationAddedForUserEvent(dto: {
+    notification: {
+      uuid: string
+      user_uuid: string
+      type: string
+      payload: string
+      created_at_timestamp: number
+      updated_at_timestamp: number
+    }
+  }): NotificationAddedForUserEvent
   createTransitionStatusUpdatedEvent(dto: {
     userUuid: string
     transitionType: 'items' | 'revisions'

+ 36 - 1
packages/syncing-server/src/Domain/UseCase/Messaging/AddNotificationForUser/AddNotificationForUser.spec.ts

@@ -4,13 +4,21 @@ import { NotificationPayload, NotificationType, Result, Uuid } from '@standardno
 import { NotificationRepositoryInterface } from '../../../Notifications/NotificationRepositoryInterface'
 import { Notification } from '../../../Notifications/Notification'
 import { AddNotificationForUser } from './AddNotificationForUser'
+import { DomainEventFactoryInterface } from '../../../Event/DomainEventFactoryInterface'
+import { SendEventToClient } from '../../Syncing/SendEventToClient/SendEventToClient'
+import { NotificationAddedForUserEvent } from '@standardnotes/domain-events'
+import { Logger } from 'winston'
 
 describe('AddNotificationForUser', () => {
   let notificationRepository: NotificationRepositoryInterface
   let timer: TimerInterface
   let payload: NotificationPayload
+  let domainEventFactory: DomainEventFactoryInterface
+  let sendEventToClientUseCase: SendEventToClient
+  let logger: Logger
 
-  const createUseCase = () => new AddNotificationForUser(notificationRepository, timer)
+  const createUseCase = () =>
+    new AddNotificationForUser(notificationRepository, timer, domainEventFactory, sendEventToClientUseCase, logger)
 
   beforeEach(() => {
     notificationRepository = {} as jest.Mocked<NotificationRepositoryInterface>
@@ -24,6 +32,17 @@ describe('AddNotificationForUser', () => {
       type: NotificationType.create(NotificationType.TYPES.RemovedFromSharedVault).getValue(),
       version: '1.0',
     }).getValue()
+
+    domainEventFactory = {} as jest.Mocked<DomainEventFactoryInterface>
+    domainEventFactory.createNotificationAddedForUserEvent = jest.fn().mockReturnValue({
+      type: 'NOTIFICATION_ADDED_FOR_USER',
+    } as jest.Mocked<NotificationAddedForUserEvent>)
+
+    sendEventToClientUseCase = {} as jest.Mocked<SendEventToClient>
+    sendEventToClientUseCase.execute = jest.fn().mockReturnValue(Result.ok())
+
+    logger = {} as jest.Mocked<Logger>
+    logger.error = jest.fn()
   })
 
   it('should save notification', async () => {
@@ -84,4 +103,20 @@ describe('AddNotificationForUser', () => {
 
     mock.mockRestore()
   })
+
+  it('should log error if event could not be sent to client', async () => {
+    sendEventToClientUseCase.execute = jest.fn().mockReturnValue(Result.fail('Oops'))
+
+    const useCase = createUseCase()
+
+    const result = await useCase.execute({
+      userUuid: '0e8c3c7e-3f1a-4f7a-9b5a-5b2b0a7d4b1e',
+      type: NotificationType.TYPES.RemovedFromSharedVault,
+      payload,
+      version: '1.0',
+    })
+
+    expect(result.isFailed()).toBeFalsy()
+    expect(logger.error).toHaveBeenCalled()
+  })
 })

+ 31 - 1
packages/syncing-server/src/Domain/UseCase/Messaging/AddNotificationForUser/AddNotificationForUser.ts

@@ -4,9 +4,18 @@ import { TimerInterface } from '@standardnotes/time'
 import { AddNotificationForUserDTO } from './AddNotificationForUserDTO'
 import { NotificationRepositoryInterface } from '../../../Notifications/NotificationRepositoryInterface'
 import { Notification } from '../../../Notifications/Notification'
+import { SendEventToClient } from '../../Syncing/SendEventToClient/SendEventToClient'
+import { DomainEventFactoryInterface } from '../../../Event/DomainEventFactoryInterface'
+import { Logger } from 'winston'
 
 export class AddNotificationForUser implements UseCaseInterface<Notification> {
-  constructor(private notificationRepository: NotificationRepositoryInterface, private timer: TimerInterface) {}
+  constructor(
+    private notificationRepository: NotificationRepositoryInterface,
+    private timer: TimerInterface,
+    private domainEventFactory: DomainEventFactoryInterface,
+    private sendEventToClientUseCase: SendEventToClient,
+    private logger: Logger,
+  ) {}
 
   async execute(dto: AddNotificationForUserDTO): Promise<Result<Notification>> {
     const userUuidOrError = Uuid.create(dto.userUuid)
@@ -37,6 +46,27 @@ export class AddNotificationForUser implements UseCaseInterface<Notification> {
 
     await this.notificationRepository.save(notification)
 
+    const event = this.domainEventFactory.createNotificationAddedForUserEvent({
+      notification: {
+        uuid: notification.id.toString(),
+        user_uuid: notification.props.userUuid.value,
+        type: notification.props.type.value,
+        payload: notification.props.payload.toString(),
+        created_at_timestamp: notification.props.timestamps.createdAt,
+        updated_at_timestamp: notification.props.timestamps.updatedAt,
+      },
+    })
+
+    const result = await this.sendEventToClientUseCase.execute({
+      userUuid: userUuid.value,
+      event,
+    })
+    if (result.isFailed()) {
+      this.logger.error(
+        `Failed to send notification added event to client for user ${userUuid.value}: ${result.getError()}`,
+      )
+    }
+
     return Result.ok(notification)
   }
 }

+ 56 - 0
packages/syncing-server/src/Domain/UseCase/Syncing/SendEventToClient/SendEventToClient.spec.ts

@@ -0,0 +1,56 @@
+import {
+  DomainEventInterface,
+  DomainEventPublisherInterface,
+  WebSocketMessageRequestedEvent,
+} from '@standardnotes/domain-events'
+import { DomainEventFactoryInterface } from '../../../Event/DomainEventFactoryInterface'
+import { SendEventToClient } from './SendEventToClient'
+
+describe('SendEventToClient', () => {
+  let domainEventFactory: DomainEventFactoryInterface
+  let domainEventPublisher: DomainEventPublisherInterface
+
+  const createUseCase = () => new SendEventToClient(domainEventFactory, domainEventPublisher)
+
+  beforeEach(() => {
+    domainEventFactory = {} as jest.Mocked<DomainEventFactoryInterface>
+    domainEventFactory.createWebSocketMessageRequestedEvent = jest
+      .fn()
+      .mockReturnValue({} as jest.Mocked<WebSocketMessageRequestedEvent>)
+
+    domainEventPublisher = {} as jest.Mocked<DomainEventPublisherInterface>
+    domainEventPublisher.publish = jest.fn()
+  })
+
+  it('should publish a WebSocketMessageRequestedEvent', async () => {
+    const useCase = createUseCase()
+
+    await useCase.execute({
+      userUuid: '00000000-0000-0000-0000-000000000000',
+      event: {
+        type: 'test',
+      } as jest.Mocked<DomainEventInterface>,
+    })
+
+    expect(domainEventFactory.createWebSocketMessageRequestedEvent).toHaveBeenCalledWith({
+      userUuid: '00000000-0000-0000-0000-000000000000',
+      message: JSON.stringify({
+        type: 'test',
+      }),
+    })
+    expect(domainEventPublisher.publish).toHaveBeenCalledWith({} as jest.Mocked<WebSocketMessageRequestedEvent>)
+  })
+
+  it('should return a failed result if user uuid is invalid', async () => {
+    const useCase = createUseCase()
+
+    const result = await useCase.execute({
+      userUuid: 'invalid',
+      event: {
+        type: 'test',
+      } as jest.Mocked<DomainEventInterface>,
+    })
+
+    expect(result.isFailed()).toBe(true)
+  })
+})

+ 29 - 0
packages/syncing-server/src/Domain/UseCase/Syncing/SendEventToClient/SendEventToClient.ts

@@ -0,0 +1,29 @@
+import { Result, UseCaseInterface, Uuid } from '@standardnotes/domain-core'
+import { DomainEventPublisherInterface } from '@standardnotes/domain-events'
+
+import { SendEventToClientDTO } from './SendEventToClientDTO'
+import { DomainEventFactoryInterface } from '../../../Event/DomainEventFactoryInterface'
+
+export class SendEventToClient implements UseCaseInterface<void> {
+  constructor(
+    private domainEventFactory: DomainEventFactoryInterface,
+    private domainEventPublisher: DomainEventPublisherInterface,
+  ) {}
+
+  async execute(dto: SendEventToClientDTO): Promise<Result<void>> {
+    const userUuidOrError = Uuid.create(dto.userUuid)
+    if (userUuidOrError.isFailed()) {
+      return Result.fail(userUuidOrError.getError())
+    }
+    const userUuid = userUuidOrError.getValue()
+
+    const event = this.domainEventFactory.createWebSocketMessageRequestedEvent({
+      userUuid: userUuid.value,
+      message: JSON.stringify(dto.event),
+    })
+
+    await this.domainEventPublisher.publish(event)
+
+    return Result.ok()
+  }
+}

+ 6 - 0
packages/syncing-server/src/Domain/UseCase/Syncing/SendEventToClient/SendEventToClientDTO.ts

@@ -0,0 +1,6 @@
+import { DomainEventInterface } from '@standardnotes/domain-events'
+
+export interface SendEventToClientDTO {
+  userUuid: string
+  event: DomainEventInterface
+}