Browse Source

feat: add wrapping sqs receive message with open telemetry

Karol Sójko 1 year ago
parent
commit
aba4f90485

+ 7 - 7
.pnp.cjs

@@ -5627,7 +5627,7 @@ const RAW_RUNTIME_STATE =
           ["opentelemetry-instrumentation-typeorm", "virtual:685a6222c3349423674bb7f0684ba34e2ab20912010f352e04dcf707a156e13183fc382e2417cb37a60f3e7b52fd0178c53181674890e1773eb83e190dc13378#npm:0.39.1"],\
           ["prettier", "npm:3.0.3"],\
           ["reflect-metadata", "npm:0.1.13"],\
-          ["sqs-consumer", "virtual:685a6222c3349423674bb7f0684ba34e2ab20912010f352e04dcf707a156e13183fc382e2417cb37a60f3e7b52fd0178c53181674890e1773eb83e190dc13378#npm:7.3.0"],\
+          ["sqs-consumer", "virtual:685a6222c3349423674bb7f0684ba34e2ab20912010f352e04dcf707a156e13183fc382e2417cb37a60f3e7b52fd0178c53181674890e1773eb83e190dc13378#npm:7.4.0-canary.0"],\
           ["ts-jest", "virtual:fd909b174d079e30b336c4ce72c38a88c1e447767b1a8dd7655e07719a1e31b97807f0931368724fc78897ff15e6a6d00b83316c0f76d11f85111f342e08bb79#npm:29.1.0"],\
           ["typescript", "patch:typescript@npm%3A5.0.4#optional!builtin<compat/typescript>::version=5.0.4&hash=b5f058"],\
           ["winston", "npm:3.9.0"]\
@@ -15107,17 +15107,17 @@ const RAW_RUNTIME_STATE =
       }]\
     ]],\
     ["sqs-consumer", [\
-      ["npm:7.3.0", {\
-        "packageLocation": "./.yarn/cache/sqs-consumer-npm-7.3.0-a47c08ef71-367ea2a6f3.zip/node_modules/sqs-consumer/",\
+      ["npm:7.4.0-canary.0", {\
+        "packageLocation": "./.yarn/cache/sqs-consumer-npm-7.4.0-canary.0-2eeb36c4ab-7ecff41c97.zip/node_modules/sqs-consumer/",\
         "packageDependencies": [\
-          ["sqs-consumer", "npm:7.3.0"]\
+          ["sqs-consumer", "npm:7.4.0-canary.0"]\
         ],\
         "linkType": "SOFT"\
       }],\
-      ["virtual:685a6222c3349423674bb7f0684ba34e2ab20912010f352e04dcf707a156e13183fc382e2417cb37a60f3e7b52fd0178c53181674890e1773eb83e190dc13378#npm:7.3.0", {\
-        "packageLocation": "./.yarn/__virtual__/sqs-consumer-virtual-bf07118bf0/0/cache/sqs-consumer-npm-7.3.0-a47c08ef71-367ea2a6f3.zip/node_modules/sqs-consumer/",\
+      ["virtual:685a6222c3349423674bb7f0684ba34e2ab20912010f352e04dcf707a156e13183fc382e2417cb37a60f3e7b52fd0178c53181674890e1773eb83e190dc13378#npm:7.4.0-canary.0", {\
+        "packageLocation": "./.yarn/__virtual__/sqs-consumer-virtual-6da87e1296/0/cache/sqs-consumer-npm-7.4.0-canary.0-2eeb36c4ab-7ecff41c97.zip/node_modules/sqs-consumer/",\
         "packageDependencies": [\
-          ["sqs-consumer", "virtual:685a6222c3349423674bb7f0684ba34e2ab20912010f352e04dcf707a156e13183fc382e2417cb37a60f3e7b52fd0178c53181674890e1773eb83e190dc13378#npm:7.3.0"],\
+          ["sqs-consumer", "virtual:685a6222c3349423674bb7f0684ba34e2ab20912010f352e04dcf707a156e13183fc382e2417cb37a60f3e7b52fd0178c53181674890e1773eb83e190dc13378#npm:7.4.0-canary.0"],\
           ["@aws-sdk/client-sqs", "npm:3.427.0"],\
           ["@types/aws-sdk__client-sqs", null],\
           ["debug", "virtual:ac3d8e680759ce54399273724d44e041d6c9b73454d191d411a8c44bb27e22f02aaf6ed9d3ad0ac1c298eac4833cff369c9c7b84c573016112c4f84be2cd8543#npm:4.3.4"]\

BIN
.yarn/cache/sqs-consumer-npm-7.3.0-a47c08ef71-367ea2a6f3.zip → .yarn/cache/sqs-consumer-npm-7.4.0-canary.0-2eeb36c4ab-7ecff41c97.zip


+ 4 - 3
packages/analytics/bin/worker.ts

@@ -7,7 +7,7 @@ const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.Analytic
 sdk.start()
 
 import { Logger } from 'winston'
-import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
+import { DomainEventSubscriberInterface } from '@standardnotes/domain-events'
 import * as dayjs from 'dayjs'
 import * as utc from 'dayjs/plugin/utc'
 
@@ -26,6 +26,7 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
-  const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(TYPES.DomainEventSubscriberFactory)
-  subscriberFactory.create().start()
+  const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.DomainEventSubscriber)
+
+  subscriber.start()
 })

+ 10 - 8
packages/analytics/src/Bootstrap/Container.ts

@@ -5,9 +5,9 @@ import {
   DomainEventHandlerInterface,
   DomainEventMessageHandlerInterface,
   DomainEventPublisherInterface,
-  DomainEventSubscriberFactoryInterface,
+  DomainEventSubscriberInterface,
 } from '@standardnotes/domain-events'
-import { MapperInterface } from '@standardnotes/domain-core'
+import { MapperInterface, ServiceIdentifier } from '@standardnotes/domain-core'
 // eslint-disable-next-line @typescript-eslint/no-var-requires
 const Mixpanel = require('mixpanel')
 
@@ -17,8 +17,8 @@ import { AppDataSource } from './DataSource'
 import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
 import {
   SNSOpenTelemetryDomainEventPublisher,
-  SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
+  SQSOpenTelemetryDomainEventSubscriber,
 } from '@standardnotes/domain-events-infra'
 import { Timer, TimerInterface } from '@standardnotes/time'
 import { PeriodKeyGeneratorInterface } from '../Domain/Time/PeriodKeyGeneratorInterface'
@@ -240,12 +240,14 @@ export class ContainerConfigLoader {
       .bind<DomainEventMessageHandlerInterface>(TYPES.DomainEventMessageHandler)
       .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Logger)))
     container
-      .bind<DomainEventSubscriberFactoryInterface>(TYPES.DomainEventSubscriberFactory)
+      .bind<DomainEventSubscriberInterface>(TYPES.DomainEventSubscriber)
       .toConstantValue(
-        new SQSDomainEventSubscriberFactory(
-          container.get(TYPES.SQS),
-          container.get(TYPES.SQS_QUEUE_URL),
-          container.get(TYPES.DomainEventMessageHandler),
+        new SQSOpenTelemetryDomainEventSubscriber(
+          ServiceIdentifier.NAMES.AnalyticsWorker,
+          container.get<SQSClient>(TYPES.SQS),
+          container.get<string>(TYPES.SQS_QUEUE_URL),
+          container.get<DomainEventMessageHandlerInterface>(TYPES.DomainEventMessageHandler),
+          container.get<winston.Logger>(TYPES.Logger),
         ),
       )
 

+ 1 - 1
packages/analytics/src/Bootstrap/Types.ts

@@ -42,7 +42,7 @@ const TYPES = {
   RevenueModificationMap: Symbol.for('RevenueModificationMap'),
   // Services
   DomainEventPublisher: Symbol.for('DomainEventPublisher'),
-  DomainEventSubscriberFactory: Symbol.for('DomainEventSubscriberFactory'),
+  DomainEventSubscriber: Symbol.for('DomainEventSubscriber'),
   DomainEventFactory: Symbol.for('DomainEventFactory'),
   DomainEventMessageHandler: Symbol.for('DomainEventMessageHandler'),
   AnalyticsStore: Symbol.for('AnalyticsStore'),

+ 4 - 5
packages/auth/bin/worker.ts

@@ -11,7 +11,7 @@ import { Logger } from 'winston'
 import { ContainerConfigLoader } from '../src/Bootstrap/Container'
 import TYPES from '../src/Bootstrap/Types'
 import { Env } from '../src/Bootstrap/Env'
-import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
+import { DomainEventSubscriberInterface } from '@standardnotes/domain-events'
 import * as dayjs from 'dayjs'
 import * as utc from 'dayjs/plugin/utc'
 
@@ -26,8 +26,7 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
-  const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(
-    TYPES.Auth_DomainEventSubscriberFactory,
-  )
-  subscriberFactory.create().start()
+  const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.Auth_DomainEventSubscriber)
+
+  subscriber.start()
 })

+ 10 - 7
packages/auth/src/Bootstrap/Container.ts

@@ -7,7 +7,7 @@ import {
   DomainEventHandlerInterface,
   DomainEventMessageHandlerInterface,
   DomainEventPublisherInterface,
-  DomainEventSubscriberFactoryInterface,
+  DomainEventSubscriberInterface,
 } from '@standardnotes/domain-events'
 import { TimerInterface, Timer } from '@standardnotes/time'
 import { UAParser } from 'ua-parser-js'
@@ -90,8 +90,8 @@ import {
   DirectCallDomainEventPublisher,
   DirectCallEventMessageHandler,
   SNSOpenTelemetryDomainEventPublisher,
-  SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
+  SQSOpenTelemetryDomainEventSubscriber,
 } from '@standardnotes/domain-events-infra'
 import { GetUserSubscription } from '../Domain/UseCase/GetUserSubscription/GetUserSubscription'
 import { ChangeCredentials } from '../Domain/UseCase/ChangeCredentials/ChangeCredentials'
@@ -188,6 +188,7 @@ import {
   ControllerContainer,
   ControllerContainerInterface,
   MapperInterface,
+  ServiceIdentifier,
   SharedVaultUser,
 } from '@standardnotes/domain-core'
 import { SessionTracePersistenceMapper } from '../Mapping/SessionTracePersistenceMapper'
@@ -1262,12 +1263,14 @@ export class ContainerConfigLoader {
         .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Auth_Logger)))
 
       container
-        .bind<DomainEventSubscriberFactoryInterface>(TYPES.Auth_DomainEventSubscriberFactory)
+        .bind<DomainEventSubscriberInterface>(TYPES.Auth_DomainEventSubscriber)
         .toConstantValue(
-          new SQSDomainEventSubscriberFactory(
-            container.get(TYPES.Auth_SQS),
-            container.get(TYPES.Auth_SQS_QUEUE_URL),
-            container.get(TYPES.Auth_DomainEventMessageHandler),
+          new SQSOpenTelemetryDomainEventSubscriber(
+            ServiceIdentifier.NAMES.AuthWorker,
+            container.get<SQSClient>(TYPES.Auth_SQS),
+            container.get<string>(TYPES.Auth_SQS_QUEUE_URL),
+            container.get<DomainEventMessageHandlerInterface>(TYPES.Auth_DomainEventMessageHandler),
+            container.get<winston.Logger>(TYPES.Auth_Logger),
           ),
         )
     }

+ 1 - 1
packages/auth/src/Bootstrap/Types.ts

@@ -218,7 +218,7 @@ const TYPES = {
   Auth_WebSocketConnectionTokenDecoder: Symbol.for('Auth_WebSocketConnectionTokenDecoder'),
   Auth_AuthenticationMethodResolver: Symbol.for('Auth_AuthenticationMethodResolver'),
   Auth_DomainEventPublisher: Symbol.for('Auth_DomainEventPublisher'),
-  Auth_DomainEventSubscriberFactory: Symbol.for('Auth_DomainEventSubscriberFactory'),
+  Auth_DomainEventSubscriber: Symbol.for('Auth_DomainEventSubscriber'),
   Auth_DomainEventFactory: Symbol.for('Auth_DomainEventFactory'),
   Auth_DomainEventMessageHandler: Symbol.for('Auth_DomainEventMessageHandler'),
   Auth_HTTPClient: Symbol.for('Auth_HTTPClient'),

+ 1 - 1
packages/domain-events-infra/package.json

@@ -43,7 +43,7 @@
     "ioredis": "^5.2.4",
     "opentelemetry-instrumentation-typeorm": "^0.39.1",
     "reflect-metadata": "^0.1.13",
-    "sqs-consumer": "^7.3.0",
+    "sqs-consumer": "7.4.0-canary.0",
     "winston": "^3.8.1"
   },
   "devDependencies": {

+ 58 - 0
packages/domain-events-infra/src/Infra/SQS/SQSOpenTelemetryDomainEventSubscriber.ts

@@ -0,0 +1,58 @@
+import { Consumer } from 'sqs-consumer'
+import * as OpenTelemetryApi from '@opentelemetry/api'
+import { Message, SQSClient } from '@aws-sdk/client-sqs'
+import { DomainEventSubscriberInterface, DomainEventMessageHandlerInterface } from '@standardnotes/domain-events'
+import { Logger } from 'winston'
+
+export class SQSOpenTelemetryDomainEventSubscriber implements DomainEventSubscriberInterface {
+  private currentSpan: OpenTelemetryApi.Span | undefined
+
+  constructor(
+    private serviceName: string,
+    private sqs: SQSClient,
+    private queueUrl: string,
+    private domainEventMessageHandler: DomainEventMessageHandlerInterface,
+    private logger: Logger,
+  ) {}
+
+  start(): void {
+    const sqsConsumer = Consumer.create({
+      attributeNames: ['All'],
+      messageAttributeNames: ['All'],
+      queueUrl: this.queueUrl,
+      sqs: this.sqs,
+      preReceiveMessageCallback: this.startParentSpan.bind(this),
+      handleMessage: this.handleMessage.bind(this),
+    })
+
+    sqsConsumer.on('error', this.handleError.bind(this))
+    sqsConsumer.on('processing_error', this.handleError.bind(this))
+
+    sqsConsumer.start()
+  }
+
+  async startParentSpan(): Promise<void> {
+    const tracer = OpenTelemetryApi.trace.getTracer(`${this.serviceName}-domain-event-subscriber`)
+
+    this.currentSpan = tracer.startSpan(this.serviceName, { kind: OpenTelemetryApi.SpanKind.CONSUMER })
+  }
+
+  async handleMessage(message: Message): Promise<void> {
+    await this.domainEventMessageHandler.handleMessage(<string>message.Body)
+
+    if (this.currentSpan) {
+      this.currentSpan.end()
+      this.currentSpan = undefined
+    }
+  }
+
+  handleError(error: Error): void {
+    this.logger.error('Error occured while handling SQS message: %O', error)
+
+    if (this.currentSpan) {
+      this.currentSpan.recordException(error)
+      this.currentSpan.end()
+      this.currentSpan = undefined
+    }
+  }
+}

+ 0 - 56
packages/domain-events-infra/src/Infra/SQS/SQSOpenTelemetryEventMessageHandler.ts

@@ -1,56 +0,0 @@
-import { Logger } from 'winston'
-import * as zlib from 'zlib'
-import {
-  DomainEventHandlerInterface,
-  DomainEventInterface,
-  DomainEventMessageHandlerInterface,
-} from '@standardnotes/domain-events'
-import { OpenTelemetryTracer } from '../OpenTelemetry/OpenTelemetryTracer'
-import { OpenTelemetryTracerInterface } from '../OpenTelemetry/OpenTelemetryTracerInterface'
-
-export class SQSOpenTelemetryEventMessageHandler implements DomainEventMessageHandlerInterface {
-  private tracer: OpenTelemetryTracerInterface | undefined
-
-  constructor(
-    private serviceName: string,
-    private handlers: Map<string, DomainEventHandlerInterface>,
-    private logger: Logger,
-  ) {}
-
-  async handleMessage(message: string): Promise<void> {
-    const messageParsed = JSON.parse(message)
-
-    const domainEventJson = zlib.unzipSync(Buffer.from(messageParsed.Message, 'base64')).toString()
-
-    const domainEvent: DomainEventInterface = JSON.parse(domainEventJson)
-
-    domainEvent.createdAt = new Date(domainEvent.createdAt)
-
-    const handler = this.handlers.get(domainEvent.type)
-    if (!handler) {
-      this.logger.debug(`Event handler for event type ${domainEvent.type} does not exist`)
-
-      return
-    }
-
-    this.logger.debug(`Received event: ${domainEvent.type}`)
-
-    this.tracer = new OpenTelemetryTracer()
-
-    this.tracer.startSpan(this.serviceName, domainEvent.type)
-
-    try {
-      await handler.handle(domainEvent)
-    } catch (error) {
-      this.tracer.stopSpanWithError(error as Error)
-
-      throw error
-    }
-
-    this.tracer.stopSpan()
-  }
-
-  async handleError(error: Error): Promise<void> {
-    this.logger.error('Error occured while handling SQS message: %O', error)
-  }
-}

+ 1 - 1
packages/domain-events-infra/src/Infra/index.ts

@@ -17,4 +17,4 @@ export * from './SNS/SNSOpenTelemetryDomainEventPublisher'
 export * from './SQS/SQSBounceNotificiationHandler'
 export * from './SQS/SQSDomainEventSubscriberFactory'
 export * from './SQS/SQSEventMessageHandler'
-export * from './SQS/SQSOpenTelemetryEventMessageHandler'
+export * from './SQS/SQSOpenTelemetryDomainEventSubscriber'

+ 4 - 5
packages/files/bin/worker.ts

@@ -11,7 +11,7 @@ import { Logger } from 'winston'
 import { ContainerConfigLoader } from '../src/Bootstrap/Container'
 import TYPES from '../src/Bootstrap/Types'
 import { Env } from '../src/Bootstrap/Env'
-import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
+import { DomainEventSubscriberInterface } from '@standardnotes/domain-events'
 import * as dayjs from 'dayjs'
 import * as utc from 'dayjs/plugin/utc'
 
@@ -26,8 +26,7 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
-  const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(
-    TYPES.Files_DomainEventSubscriberFactory,
-  )
-  subscriberFactory.create().start()
+  const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.Files_DomainEventSubscriber)
+
+  subscriber.start()
 })

+ 10 - 7
packages/files/src/Bootstrap/Container.ts

@@ -17,8 +17,8 @@ import {
   DirectCallDomainEventPublisher,
   DirectCallEventMessageHandler,
   SNSOpenTelemetryDomainEventPublisher,
-  SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
+  SQSOpenTelemetryDomainEventSubscriber,
 } from '@standardnotes/domain-events-infra'
 import { StreamDownloadFile } from '../Domain/UseCase/StreamDownloadFile/StreamDownloadFile'
 import { FileDownloaderInterface } from '../Domain/Services/FileDownloaderInterface'
@@ -40,7 +40,7 @@ import {
   DomainEventHandlerInterface,
   DomainEventMessageHandlerInterface,
   DomainEventPublisherInterface,
-  DomainEventSubscriberFactoryInterface,
+  DomainEventSubscriberInterface,
 } from '@standardnotes/domain-events'
 import { MarkFilesToBeRemoved } from '../Domain/UseCase/MarkFilesToBeRemoved/MarkFilesToBeRemoved'
 import { AccountDeletionRequestedEventHandler } from '../Domain/Handler/AccountDeletionRequestedEventHandler'
@@ -52,6 +52,7 @@ import { S3FileMover } from '../Infra/S3/S3FileMover'
 import { FSFileMover } from '../Infra/FS/FSFileMover'
 import { MoveFile } from '../Domain/UseCase/MoveFile/MoveFile'
 import { SharedVaultValetTokenAuthMiddleware } from '../Infra/InversifyExpress/Middleware/SharedVaultValetTokenAuthMiddleware'
+import { ServiceIdentifier } from '@standardnotes/domain-core'
 
 export class ContainerConfigLoader {
   async load(configuration?: {
@@ -298,12 +299,14 @@ export class ContainerConfigLoader {
         .bind<DomainEventMessageHandlerInterface>(TYPES.Files_DomainEventMessageHandler)
         .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Files_Logger)))
       container
-        .bind<DomainEventSubscriberFactoryInterface>(TYPES.Files_DomainEventSubscriberFactory)
+        .bind<DomainEventSubscriberInterface>(TYPES.Files_DomainEventSubscriber)
         .toConstantValue(
-          new SQSDomainEventSubscriberFactory(
-            container.get(TYPES.Files_SQS),
-            container.get(TYPES.Files_SQS_QUEUE_URL),
-            container.get(TYPES.Files_DomainEventMessageHandler),
+          new SQSOpenTelemetryDomainEventSubscriber(
+            ServiceIdentifier.NAMES.FilesWorker,
+            container.get<SQSClient>(TYPES.Files_SQS),
+            container.get<string>(TYPES.Files_SQS_QUEUE_URL),
+            container.get<DomainEventMessageHandlerInterface>(TYPES.Files_DomainEventMessageHandler),
+            container.get<winston.Logger>(TYPES.Files_Logger),
           ),
         )
     }

+ 1 - 1
packages/files/src/Bootstrap/Types.ts

@@ -52,7 +52,7 @@ const TYPES = {
 
   // Handlers
   Files_DomainEventMessageHandler: Symbol.for('Files_DomainEventMessageHandler'),
-  Files_DomainEventSubscriberFactory: Symbol.for('Files_DomainEventSubscriberFactory'),
+  Files_DomainEventSubscriber: Symbol.for('Files_DomainEventSubscriber'),
   Files_AccountDeletionRequestedEventHandler: Symbol.for('Files_AccountDeletionRequestedEventHandler'),
   Files_SharedSubscriptionInvitationCanceledEventHandler: Symbol.for(
     'Files_SharedSubscriptionInvitationCanceledEventHandler',

+ 4 - 5
packages/revisions/bin/worker.ts

@@ -10,7 +10,7 @@ import { Logger } from 'winston'
 
 import TYPES from '../src/Bootstrap/Types'
 import { Env } from '../src/Bootstrap/Env'
-import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
+import { DomainEventSubscriberInterface } from '@standardnotes/domain-events'
 import { ContainerConfigLoader } from '../src/Bootstrap/Container'
 
 const container = new ContainerConfigLoader('worker')
@@ -22,8 +22,7 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
-  const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(
-    TYPES.Revisions_DomainEventSubscriberFactory,
-  )
-  subscriberFactory.create().start()
+  const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.Revisions_DomainEventSubscriber)
+
+  subscriber.start()
 })

+ 18 - 11
packages/revisions/src/Bootstrap/Container.ts

@@ -1,4 +1,9 @@
-import { ControllerContainer, ControllerContainerInterface, MapperInterface } from '@standardnotes/domain-core'
+import {
+  ControllerContainer,
+  ControllerContainerInterface,
+  MapperInterface,
+  ServiceIdentifier,
+} from '@standardnotes/domain-core'
 import Redis from 'ioredis'
 import { Container, interfaces } from 'inversify'
 import { MongoRepository, Repository } from 'typeorm'
@@ -26,15 +31,15 @@ import { SQSClient, SQSClientConfig } from '@aws-sdk/client-sqs'
 import {
   DomainEventMessageHandlerInterface,
   DomainEventHandlerInterface,
-  DomainEventSubscriberFactoryInterface,
   DomainEventPublisherInterface,
+  DomainEventSubscriberInterface,
 } from '@standardnotes/domain-events'
 import {
   SQSEventMessageHandler,
-  SQSDomainEventSubscriberFactory,
   DirectCallEventMessageHandler,
   DirectCallDomainEventPublisher,
   SNSOpenTelemetryDomainEventPublisher,
+  SQSOpenTelemetryDomainEventSubscriber,
 } from '@standardnotes/domain-events-infra'
 import { DumpRepositoryInterface } from '../Domain/Dump/DumpRepositoryInterface'
 import { AccountDeletionRequestedEventHandler } from '../Domain/Handler/AccountDeletionRequestedEventHandler'
@@ -507,14 +512,16 @@ export class ContainerConfigLoader {
         .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Revisions_Logger)))
 
       container
-        .bind<DomainEventSubscriberFactoryInterface>(TYPES.Revisions_DomainEventSubscriberFactory)
-        .toDynamicValue((context: interfaces.Context) => {
-          return new SQSDomainEventSubscriberFactory(
-            context.container.get(TYPES.Revisions_SQS),
-            context.container.get(TYPES.Revisions_SQS_QUEUE_URL),
-            context.container.get(TYPES.Revisions_DomainEventMessageHandler),
-          )
-        })
+        .bind<DomainEventSubscriberInterface>(TYPES.Revisions_DomainEventSubscriber)
+        .toConstantValue(
+          new SQSOpenTelemetryDomainEventSubscriber(
+            ServiceIdentifier.NAMES.RevisionsWorker,
+            container.get<SQSClient>(TYPES.Revisions_SQS),
+            container.get<string>(TYPES.Revisions_SQS_QUEUE_URL),
+            container.get<DomainEventMessageHandlerInterface>(TYPES.Revisions_DomainEventMessageHandler),
+            container.get<winston.Logger>(TYPES.Revisions_Logger),
+          ),
+        )
     }
 
     // Inversify Controllers

+ 1 - 1
packages/revisions/src/Bootstrap/Types.ts

@@ -65,7 +65,7 @@ const TYPES = {
   Revisions_SharedVaultRemovedEventHandler: Symbol.for('Revisions_SharedVaultRemovedEventHandler'),
   // Services
   Revisions_CrossServiceTokenDecoder: Symbol.for('Revisions_CrossServiceTokenDecoder'),
-  Revisions_DomainEventSubscriberFactory: Symbol.for('Revisions_DomainEventSubscriberFactory'),
+  Revisions_DomainEventSubscriber: Symbol.for('Revisions_DomainEventSubscriber'),
   Revisions_DomainEventMessageHandler: Symbol.for('Revisions_DomainEventMessageHandler'),
   Revisions_DomainEventPublisher: Symbol.for('Revisions_DomainEventPublisher'),
   Revisions_DomainEventFactory: Symbol.for('Revisions_DomainEventFactory'),

+ 4 - 3
packages/scheduler/bin/worker.ts

@@ -7,7 +7,7 @@ const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.Schedule
 sdk.start()
 
 import { Logger } from 'winston'
-import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
+import { DomainEventSubscriberInterface } from '@standardnotes/domain-events'
 import * as dayjs from 'dayjs'
 import * as utc from 'dayjs/plugin/utc'
 
@@ -26,6 +26,7 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
-  const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(TYPES.DomainEventSubscriberFactory)
-  subscriberFactory.create().start()
+  const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.DomainEventSubscriber)
+
+  subscriber.start()
 })

+ 10 - 7
packages/scheduler/src/Bootstrap/Container.ts

@@ -7,7 +7,7 @@ import {
   DomainEventHandlerInterface,
   DomainEventMessageHandlerInterface,
   DomainEventPublisherInterface,
-  DomainEventSubscriberFactoryInterface,
+  DomainEventSubscriberInterface,
 } from '@standardnotes/domain-events'
 
 import { Env } from './Env'
@@ -16,8 +16,8 @@ import { AppDataSource } from './DataSource'
 import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
 import {
   SNSOpenTelemetryDomainEventPublisher,
-  SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
+  SQSOpenTelemetryDomainEventSubscriber,
 } from '@standardnotes/domain-events-infra'
 import { Timer, TimerInterface } from '@standardnotes/time'
 import { PredicateRepositoryInterface } from '../Domain/Predicate/PredicateRepositoryInterface'
@@ -35,6 +35,7 @@ import { VerifyPredicates } from '../Domain/UseCase/VerifyPredicates/VerifyPredi
 import { UserRegisteredEventHandler } from '../Domain/Handler/UserRegisteredEventHandler'
 import { SubscriptionCancelledEventHandler } from '../Domain/Handler/SubscriptionCancelledEventHandler'
 import { ExitDiscountAppliedEventHandler } from '../Domain/Handler/ExitDiscountAppliedEventHandler'
+import { ServiceIdentifier } from '@standardnotes/domain-core'
 
 export class ContainerConfigLoader {
   async load(): Promise<Container> {
@@ -150,12 +151,14 @@ export class ContainerConfigLoader {
       .bind<DomainEventMessageHandlerInterface>(TYPES.DomainEventMessageHandler)
       .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Logger)))
     container
-      .bind<DomainEventSubscriberFactoryInterface>(TYPES.DomainEventSubscriberFactory)
+      .bind<DomainEventSubscriberInterface>(TYPES.DomainEventSubscriber)
       .toConstantValue(
-        new SQSDomainEventSubscriberFactory(
-          container.get(TYPES.SQS),
-          container.get(TYPES.SQS_QUEUE_URL),
-          container.get(TYPES.DomainEventMessageHandler),
+        new SQSOpenTelemetryDomainEventSubscriber(
+          ServiceIdentifier.NAMES.SchedulerWorker,
+          container.get<SQSClient>(TYPES.SQS),
+          container.get<string>(TYPES.SQS_QUEUE_URL),
+          container.get<DomainEventMessageHandlerInterface>(TYPES.DomainEventMessageHandler),
+          container.get<winston.Logger>(TYPES.Logger),
         ),
       )
 

+ 1 - 1
packages/scheduler/src/Bootstrap/Types.ts

@@ -25,7 +25,7 @@ const TYPES = {
   ExitDiscountAppliedEventHandler: Symbol.for('ExitDiscountAppliedEventHandler'),
   // Services
   DomainEventPublisher: Symbol.for('DomainEventPublisher'),
-  DomainEventSubscriberFactory: Symbol.for('DomainEventSubscriberFactory'),
+  DomainEventSubscriber: Symbol.for('DomainEventSubscriber'),
   DomainEventFactory: Symbol.for('DomainEventFactory'),
   DomainEventMessageHandler: Symbol.for('DomainEventMessageHandler'),
   Timer: Symbol.for('Timer'),

+ 4 - 6
packages/syncing-server/bin/worker.ts

@@ -10,7 +10,7 @@ import { Logger } from 'winston'
 
 import TYPES from '../src/Bootstrap/Types'
 import { Env } from '../src/Bootstrap/Env'
-import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
+import { DomainEventSubscriberInterface } from '@standardnotes/domain-events'
 import { ContainerConfigLoader } from '../src/Bootstrap/Container'
 
 const container = new ContainerConfigLoader('worker')
@@ -20,11 +20,9 @@ void container.load().then((container) => {
 
   const logger: Logger = container.get(TYPES.Sync_Logger)
 
-  const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(
-    TYPES.Sync_DomainEventSubscriberFactory,
-  )
-
   logger.info('Starting worker...')
 
-  subscriberFactory.create().start()
+  const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.Sync_DomainEventSubscriber)
+
+  subscriber.start()
 })

+ 13 - 10
packages/syncing-server/src/Bootstrap/Container.ts

@@ -14,8 +14,8 @@ import {
   DirectCallDomainEventPublisher,
   DirectCallEventMessageHandler,
   SNSOpenTelemetryDomainEventPublisher,
-  SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
+  SQSOpenTelemetryDomainEventSubscriber,
 } from '@standardnotes/domain-events-infra'
 import { DomainEventFactoryInterface } from '../Domain/Event/DomainEventFactoryInterface'
 import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
@@ -43,8 +43,8 @@ import { ContentDecoder, ContentDecoderInterface } from '@standardnotes/common'
 import {
   DomainEventMessageHandlerInterface,
   DomainEventHandlerInterface,
-  DomainEventSubscriberFactoryInterface,
   DomainEventPublisherInterface,
+  DomainEventSubscriberInterface,
 } from '@standardnotes/domain-events'
 import axios, { AxiosInstance } from 'axios'
 import { ExtensionsHttpService } from '../Domain/Extension/ExtensionsHttpService'
@@ -60,6 +60,7 @@ import {
   ControllerContainer,
   ControllerContainerInterface,
   MapperInterface,
+  ServiceIdentifier,
   SharedVaultUser,
 } from '@standardnotes/domain-core'
 import { BaseItemsController } from '../Infra/InversifyExpressUtils/Base/BaseItemsController'
@@ -1140,14 +1141,16 @@ export class ContainerConfigLoader {
     }
 
     container
-      .bind<DomainEventSubscriberFactoryInterface>(TYPES.Sync_DomainEventSubscriberFactory)
-      .toDynamicValue((context: interfaces.Context) => {
-        return new SQSDomainEventSubscriberFactory(
-          context.container.get(TYPES.Sync_SQS),
-          context.container.get(TYPES.Sync_SQS_QUEUE_URL),
-          context.container.get(TYPES.Sync_DomainEventMessageHandler),
-        )
-      })
+      .bind<DomainEventSubscriberInterface>(TYPES.Sync_DomainEventSubscriber)
+      .toConstantValue(
+        new SQSOpenTelemetryDomainEventSubscriber(
+          ServiceIdentifier.NAMES.SyncingServerWorker,
+          container.get<SQSClient>(TYPES.Sync_SQS),
+          container.get<string>(TYPES.Sync_SQS_QUEUE_URL),
+          container.get<DomainEventMessageHandlerInterface>(TYPES.Sync_DomainEventMessageHandler),
+          container.get<Logger>(TYPES.Sync_Logger),
+        ),
+      )
 
     container
       .bind<ControllerContainerInterface>(TYPES.Sync_ControllerContainer)

+ 1 - 1
packages/syncing-server/src/Bootstrap/Types.ts

@@ -105,7 +105,7 @@ const TYPES = {
   // Services
   Sync_ContentDecoder: Symbol.for('Sync_ContentDecoder'),
   Sync_DomainEventPublisher: Symbol.for('Sync_DomainEventPublisher'),
-  Sync_DomainEventSubscriberFactory: Symbol.for('Sync_DomainEventSubscriberFactory'),
+  Sync_DomainEventSubscriber: Symbol.for('Sync_DomainEventSubscriber'),
   Sync_DomainEventFactory: Symbol.for('Sync_DomainEventFactory'),
   Sync_DomainEventMessageHandler: Symbol.for('Sync_DomainEventMessageHandler'),
   Sync_HTTPClient: Symbol.for('Sync_HTTPClient'),

+ 4 - 3
packages/websockets/bin/worker.ts

@@ -11,7 +11,7 @@ import { Logger } from 'winston'
 import { ContainerConfigLoader } from '../src/Bootstrap/Container'
 import TYPES from '../src/Bootstrap/Types'
 import { Env } from '../src/Bootstrap/Env'
-import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
+import { DomainEventSubscriberInterface } from '@standardnotes/domain-events'
 
 const container = new ContainerConfigLoader()
 void container.load().then((container) => {
@@ -22,6 +22,7 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
-  const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(TYPES.DomainEventSubscriberFactory)
-  subscriberFactory.create().start()
+  const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.DomainEventSubscriber)
+
+  subscriber.start()
 })

+ 10 - 7
packages/websockets/src/Bootstrap/Container.ts

@@ -9,7 +9,7 @@ import { Container } from 'inversify'
 import {
   DomainEventHandlerInterface,
   DomainEventMessageHandlerInterface,
-  DomainEventSubscriberFactoryInterface,
+  DomainEventSubscriberInterface,
 } from '@standardnotes/domain-events'
 import { Env } from './Env'
 import TYPES from './Types'
@@ -18,7 +18,7 @@ import { RedisWebSocketsConnectionRepository } from '../Infra/Redis/RedisWebSock
 import { AddWebSocketsConnection } from '../Domain/UseCase/AddWebSocketsConnection/AddWebSocketsConnection'
 import { RemoveWebSocketsConnection } from '../Domain/UseCase/RemoveWebSocketsConnection/RemoveWebSocketsConnection'
 import { WebSocketsClientMessenger } from '../Infra/WebSockets/WebSocketsClientMessenger'
-import { SQSDomainEventSubscriberFactory, SQSEventMessageHandler } from '@standardnotes/domain-events-infra'
+import { SQSEventMessageHandler, SQSOpenTelemetryDomainEventSubscriber } from '@standardnotes/domain-events-infra'
 import { ApiGatewayAuthMiddleware } from '../Controller/ApiGatewayAuthMiddleware'
 
 import {
@@ -34,6 +34,7 @@ import { WebSocketsController } from '../Controller/WebSocketsController'
 import { WebSocketServerInterface } from '@standardnotes/api'
 import { ClientMessengerInterface } from '../Client/ClientMessengerInterface'
 import { WebSocketMessageRequestedEventHandler } from '../Domain/Handler/WebSocketMessageRequestedEventHandler'
+import { ServiceIdentifier } from '@standardnotes/domain-core'
 
 export class ContainerConfigLoader {
   async load(): Promise<Container> {
@@ -142,12 +143,14 @@ export class ContainerConfigLoader {
       .bind<DomainEventMessageHandlerInterface>(TYPES.DomainEventMessageHandler)
       .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Logger)))
     container
-      .bind<DomainEventSubscriberFactoryInterface>(TYPES.DomainEventSubscriberFactory)
+      .bind<DomainEventSubscriberInterface>(TYPES.DomainEventSubscriber)
       .toConstantValue(
-        new SQSDomainEventSubscriberFactory(
-          container.get(TYPES.SQS),
-          container.get(TYPES.SQS_QUEUE_URL),
-          container.get(TYPES.DomainEventMessageHandler),
+        new SQSOpenTelemetryDomainEventSubscriber(
+          ServiceIdentifier.NAMES.WebsocketsWorker,
+          container.get<SQSClient>(TYPES.SQS),
+          container.get<string>(TYPES.SQS_QUEUE_URL),
+          container.get<DomainEventMessageHandlerInterface>(TYPES.DomainEventMessageHandler),
+          container.get<winston.Logger>(TYPES.Logger),
         ),
       )
 

+ 1 - 1
packages/websockets/src/Bootstrap/Types.ts

@@ -27,7 +27,7 @@ const TYPES = {
   // Services
   CrossServiceTokenDecoder: Symbol.for('CrossServiceTokenDecoder'),
   WebSocketConnectionTokenEncoder: Symbol.for('WebSocketConnectionTokenEncoder'),
-  DomainEventSubscriberFactory: Symbol.for('DomainEventSubscriberFactory'),
+  DomainEventSubscriber: Symbol.for('DomainEventSubscriber'),
   DomainEventMessageHandler: Symbol.for('DomainEventMessageHandler'),
   HTTPClient: Symbol.for('HTTPClient'),
   WebSocketsClientMessenger: Symbol.for('WebSocketsClientMessenger'),

+ 5 - 5
yarn.lock

@@ -4438,7 +4438,7 @@ __metadata:
     opentelemetry-instrumentation-typeorm: "npm:^0.39.1"
     prettier: "npm:^3.0.3"
     reflect-metadata: "npm:^0.1.13"
-    sqs-consumer: "npm:^7.3.0"
+    sqs-consumer: "npm:7.4.0-canary.0"
     ts-jest: "npm:^29.1.0"
     typescript: "npm:^5.0.4"
     winston: "npm:^3.8.1"
@@ -12735,15 +12735,15 @@ __metadata:
   languageName: node
   linkType: hard
 
-"sqs-consumer@npm:^7.3.0":
-  version: 7.3.0
-  resolution: "sqs-consumer@npm:7.3.0"
+"sqs-consumer@npm:7.4.0-canary.0":
+  version: 7.4.0-canary.0
+  resolution: "sqs-consumer@npm:7.4.0-canary.0"
   dependencies:
     "@aws-sdk/client-sqs": "npm:^3.363.0"
     debug: "npm:^4.3.4"
   peerDependencies:
     "@aws-sdk/client-sqs": ^3.363.0
-  checksum: 367ea2a6f3ea5da331efbb7052e643f1657aafa138c8404f336389effbbaba734858bd4daf29c86ab0b4c8a015863ddac6b19482302324f46d45a38fae731f54
+  checksum: 7ecff41c977775f5e61e9f58f79a86b3939bd9ade9cecd7532d8344c454ec059267645c87544d38dbb4c4403e65c908d51c69aee3776c009310c3ba79c1172c7
   languageName: node
   linkType: hard