浏览代码

feat: add opentelemetry tracing in distributed system

Karol Sójko 1 年之前
父节点
当前提交
72c9b28ebe
共有 22 个文件被更改,包括 180 次插入64 次删除
  1. 17 3
      packages/analytics/src/Bootstrap/Container.ts
  2. 1 0
      packages/analytics/src/Bootstrap/Types.ts
  3. 13 2
      packages/auth/src/Bootstrap/Container.ts
  4. 1 0
      packages/auth/src/Bootstrap/Types.ts
  5. 17 0
      packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryPropagation.ts
  6. 6 0
      packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryPropagationInterface.ts
  7. 3 3
      packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryTracer.ts
  8. 3 1
      packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryTracerInterface.ts
  9. 0 45
      packages/domain-events-infra/src/Infra/SNS/SNSDomainEventPublisher.spec.ts
  10. 48 0
      packages/domain-events-infra/src/Infra/SNS/SNSOpenTelemetryDomainEventPublisher.ts
  11. 8 1
      packages/domain-events-infra/src/Infra/SQS/SQSOpenTelemetryEventMessageHandler.ts
  12. 3 0
      packages/domain-events-infra/src/Infra/index.ts
  13. 4 0
      packages/domain-events/src/Domain/Event/DomainEventInterface.ts
  14. 13 2
      packages/files/src/Bootstrap/Container.ts
  15. 1 0
      packages/files/src/Bootstrap/Types.ts
  16. 10 2
      packages/revisions/src/Bootstrap/Container.ts
  17. 1 0
      packages/revisions/src/Bootstrap/Types.ts
  18. 17 3
      packages/scheduler/src/Bootstrap/Container.ts
  19. 1 0
      packages/scheduler/src/Bootstrap/Types.ts
  20. 10 2
      packages/syncing-server/src/Bootstrap/Container.ts
  21. 1 0
      packages/syncing-server/src/Bootstrap/Types.ts
  22. 2 0
      packages/websockets/src/Bootstrap/Container.ts

+ 17 - 3
packages/analytics/src/Bootstrap/Container.ts

@@ -4,6 +4,7 @@ import { Container } from 'inversify'
 import {
   DomainEventHandlerInterface,
   DomainEventMessageHandlerInterface,
+  DomainEventPublisherInterface,
   DomainEventSubscriberFactoryInterface,
 } from '@standardnotes/domain-events'
 import { MapperInterface, ServiceIdentifier } from '@standardnotes/domain-core'
@@ -15,7 +16,9 @@ import TYPES from './Types'
 import { AppDataSource } from './DataSource'
 import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
 import {
-  SNSDomainEventPublisher,
+  OpenTelemetryPropagation,
+  OpenTelemetryPropagationInterface,
+  SNSOpenTelemetryDomainEventPublisher,
   SQSDomainEventSubscriberFactory,
   SQSOpenTelemetryEventMessageHandler,
 } from '@standardnotes/domain-events-infra'
@@ -86,6 +89,10 @@ export class ContainerConfigLoader {
     })
     container.bind<winston.Logger>(TYPES.Logger).toConstantValue(logger)
 
+    container
+      .bind<OpenTelemetryPropagationInterface>(TYPES.OTEL_PROPAGATOR)
+      .toConstantValue(new OpenTelemetryPropagation())
+
     const snsConfig: SNSClientConfig = {
       apiVersion: 'latest',
       region: env.get('SNS_AWS_REGION', true),
@@ -137,8 +144,14 @@ export class ContainerConfigLoader {
     container.bind<TimerInterface>(TYPES.Timer).toConstantValue(new Timer())
 
     container
-      .bind<SNSDomainEventPublisher>(TYPES.DomainEventPublisher)
-      .toConstantValue(new SNSDomainEventPublisher(container.get(TYPES.SNS), container.get(TYPES.SNS_TOPIC_ARN)))
+      .bind<DomainEventPublisherInterface>(TYPES.DomainEventPublisher)
+      .toConstantValue(
+        new SNSOpenTelemetryDomainEventPublisher(
+          container.get<OpenTelemetryPropagationInterface>(TYPES.OTEL_PROPAGATOR),
+          container.get(TYPES.SNS),
+          container.get(TYPES.SNS_TOPIC_ARN),
+        ),
+      )
     if (env.get('MIXPANEL_TOKEN', true)) {
       container.bind<Mixpanel>(TYPES.MixpanelClient).toConstantValue(Mixpanel.init(env.get('MIXPANEL_TOKEN', true)))
     }
@@ -238,6 +251,7 @@ export class ContainerConfigLoader {
       .toConstantValue(
         new SQSOpenTelemetryEventMessageHandler(
           ServiceIdentifier.NAMES.AnalyticsWorker,
+          container.get<OpenTelemetryPropagationInterface>(TYPES.OTEL_PROPAGATOR),
           eventHandlers,
           container.get(TYPES.Logger),
         ),

+ 1 - 0
packages/analytics/src/Bootstrap/Types.ts

@@ -3,6 +3,7 @@ const TYPES = {
   Redis: Symbol.for('Redis'),
   SNS: Symbol.for('SNS'),
   SQS: Symbol.for('SQS'),
+  OTEL_PROPAGATOR: Symbol.for('OTEL_PROPAGATOR'),
   // env vars
   REDIS_URL: Symbol.for('REDIS_URL'),
   SNS_TOPIC_ARN: Symbol.for('SNS_TOPIC_ARN'),

+ 13 - 2
packages/auth/src/Bootstrap/Container.ts

@@ -89,7 +89,9 @@ import { ExtensionKeyGrantedEventHandler } from '../Domain/Handler/ExtensionKeyG
 import {
   DirectCallDomainEventPublisher,
   DirectCallEventMessageHandler,
-  SNSDomainEventPublisher,
+  OpenTelemetryPropagation,
+  OpenTelemetryPropagationInterface,
+  SNSOpenTelemetryDomainEventPublisher,
   SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
   SQSOpenTelemetryEventMessageHandler,
@@ -308,6 +310,10 @@ export class ContainerConfigLoader {
     }
     container.bind<winston.Logger>(TYPES.Auth_Logger).toConstantValue(logger)
 
+    container
+      .bind<OpenTelemetryPropagationInterface>(TYPES.Auth_OTEL_PROPAGATOR)
+      .toConstantValue(new OpenTelemetryPropagation())
+
     const appDataSource = new AppDataSource({ env, runMigrations: this.mode === 'server' })
     await appDataSource.initialize()
 
@@ -725,7 +731,11 @@ export class ContainerConfigLoader {
       .toConstantValue(
         isConfiguredForHomeServer
           ? directCallDomainEventPublisher
-          : new SNSDomainEventPublisher(container.get(TYPES.Auth_SNS), container.get(TYPES.Auth_SNS_TOPIC_ARN)),
+          : new SNSOpenTelemetryDomainEventPublisher(
+              container.get<OpenTelemetryPropagationInterface>(TYPES.Auth_OTEL_PROPAGATOR),
+              container.get(TYPES.Auth_SNS),
+              container.get(TYPES.Auth_SNS_TOPIC_ARN),
+            ),
       )
 
     // Middleware
@@ -1228,6 +1238,7 @@ export class ContainerConfigLoader {
             ? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Auth_Logger))
             : new SQSOpenTelemetryEventMessageHandler(
                 ServiceIdentifier.NAMES.AuthWorker,
+                container.get<OpenTelemetryPropagationInterface>(TYPES.Auth_OTEL_PROPAGATOR),
                 eventHandlers,
                 container.get(TYPES.Auth_Logger),
               ),

+ 1 - 0
packages/auth/src/Bootstrap/Types.ts

@@ -3,6 +3,7 @@ const TYPES = {
   Auth_Redis: Symbol.for('Auth_Redis'),
   Auth_SNS: Symbol.for('Auth_SNS'),
   Auth_SQS: Symbol.for('Auth_SQS'),
+  Auth_OTEL_PROPAGATOR: Symbol.for('Auth_OTEL_PROPAGATOR'),
   // Mapping
   Auth_SessionTracePersistenceMapper: Symbol.for('Auth_SessionTracePersistenceMapper'),
   Auth_AuthenticatorChallengePersistenceMapper: Symbol.for('Auth_AuthenticatorChallengePersistenceMapper'),

+ 17 - 0
packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryPropagation.ts

@@ -0,0 +1,17 @@
+import * as OpenTelemetryApi from '@opentelemetry/api'
+
+export class OpenTelemetryPropagation {
+  inject(): { traceparent?: string; tracestate?: string } {
+    const output = {}
+
+    OpenTelemetryApi.propagation.inject(OpenTelemetryApi.context.active(), output)
+
+    return output as { traceparent?: string; tracestate?: string }
+  }
+
+  extract(input: { traceparent?: string; tracestate?: string }): OpenTelemetryApi.Context {
+    const activeContext = OpenTelemetryApi.propagation.extract(OpenTelemetryApi.context.active(), input)
+
+    return activeContext
+  }
+}

+ 6 - 0
packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryPropagationInterface.ts

@@ -0,0 +1,6 @@
+import { Context } from '@opentelemetry/api'
+
+export interface OpenTelemetryPropagationInterface {
+  inject(): { traceparent?: string; tracestate?: string }
+  extract(input: { traceparent?: string; tracestate?: string }): Context
+}

+ 3 - 3
packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryTracer.ts

@@ -6,11 +6,11 @@ export class OpenTelemetryTracer implements OpenTelemetryTracerInterface {
   private parentSpan: OpenTelemetryApi.Span | undefined
   private internalSpan: OpenTelemetryApi.Span | undefined
 
-  startSpan(parentSpanName: string, internalSpanName: string): void {
+  startSpan(parentSpanName: string, internalSpanName: string, activeContext?: OpenTelemetryApi.Context): void {
     const tracer = OpenTelemetryApi.trace.getTracer(`${parentSpanName}-handler`)
 
-    this.parentSpan = tracer.startSpan(parentSpanName, { kind: OpenTelemetryApi.SpanKind.CONSUMER })
-    const ctx = OpenTelemetryApi.trace.setSpan(OpenTelemetryApi.context.active(), this.parentSpan)
+    this.parentSpan = tracer.startSpan(parentSpanName, { kind: OpenTelemetryApi.SpanKind.CONSUMER }, activeContext)
+    const ctx = OpenTelemetryApi.trace.setSpan(activeContext ?? OpenTelemetryApi.context.active(), this.parentSpan)
 
     this.internalSpan = tracer.startSpan(internalSpanName, { kind: OpenTelemetryApi.SpanKind.INTERNAL }, ctx)
   }

+ 3 - 1
packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryTracerInterface.ts

@@ -1,5 +1,7 @@
+import { Context } from '@opentelemetry/api'
+
 export interface OpenTelemetryTracerInterface {
-  startSpan(parentSpanName: string, internalSpanName: string): void
+  startSpan(parentSpanName: string, internalSpanName: string, activeContext?: Context): void
   stopSpan(): void
   stopSpanWithError(error: Error): void
 }

+ 0 - 45
packages/domain-events-infra/src/Infra/SNS/SNSDomainEventPublisher.spec.ts

@@ -1,45 +0,0 @@
-import 'reflect-metadata'
-
-import { DomainEventInterface, DomainEventService } from '@standardnotes/domain-events'
-import { SNSClient } from '@aws-sdk/client-sns'
-
-import { SNSDomainEventPublisher } from './SNSDomainEventPublisher'
-
-describe('SNSDomainEventPublisher', () => {
-  let sns: SNSClient
-  const topicArn = 'test-topic-arn'
-  let event: DomainEventInterface
-
-  const createPublisher = () => new SNSDomainEventPublisher(sns, topicArn)
-
-  beforeEach(() => {
-    sns = {} as jest.Mocked<SNSClient>
-    sns.send = jest.fn()
-
-    event = {} as jest.Mocked<DomainEventInterface>
-    event.type = 'TEST'
-    event.payload = { foo: 'bar' }
-    event.createdAt = new Date(1)
-    event.meta = {
-      correlation: {
-        userIdentifier: '1-2-3',
-        userIdentifierType: 'uuid',
-      },
-      origin: DomainEventService.Auth,
-    }
-  })
-
-  it('should publish a domain event', async () => {
-    await createPublisher().publish(event)
-
-    expect(sns.send).toHaveBeenCalled()
-  })
-
-  it('should publish a targeted domain event', async () => {
-    event.meta.target = DomainEventService.SyncingServer
-
-    await createPublisher().publish(event)
-
-    expect(sns.send).toHaveBeenCalled()
-  })
-})

+ 48 - 0
packages/domain-events-infra/src/Infra/SNS/SNSOpenTelemetryDomainEventPublisher.ts

@@ -0,0 +1,48 @@
+import * as zlib from 'zlib'
+import { MessageAttributeValue, PublishCommand, PublishCommandInput, SNSClient } from '@aws-sdk/client-sns'
+
+import { DomainEventInterface, DomainEventPublisherInterface } from '@standardnotes/domain-events'
+import { OpenTelemetryPropagationInterface } from '../OpenTelemetry/OpenTelemetryPropagationInterface'
+
+export class SNSOpenTelemetryDomainEventPublisher implements DomainEventPublisherInterface {
+  constructor(
+    private propagator: OpenTelemetryPropagationInterface,
+    private snsClient: SNSClient,
+    private topicArn: string,
+  ) {}
+
+  async publish(event: DomainEventInterface): Promise<void> {
+    const trace = this.propagator.inject()
+    event.meta.trace = trace
+
+    const message: PublishCommandInput = {
+      TopicArn: this.topicArn,
+      MessageAttributes: {
+        event: {
+          DataType: 'String',
+          StringValue: event.type,
+        },
+        compression: {
+          DataType: 'String',
+          StringValue: 'true',
+        },
+        origin: {
+          DataType: 'String',
+          StringValue: event.meta.origin,
+        },
+      },
+      Message: zlib.deflateSync(JSON.stringify(event)).toString('base64'),
+    }
+
+    if (event.meta.target !== undefined) {
+      ;(message.MessageAttributes as Record<string, MessageAttributeValue>).target = {
+        DataType: 'String',
+        StringValue: event.meta.target,
+      }
+    }
+
+    const command = new PublishCommand(message)
+
+    await this.snsClient.send(command)
+  }
+}

+ 8 - 1
packages/domain-events-infra/src/Infra/SQS/SQSOpenTelemetryEventMessageHandler.ts

@@ -7,12 +7,14 @@ import {
 } from '@standardnotes/domain-events'
 import { OpenTelemetryTracer } from '../OpenTelemetry/OpenTelemetryTracer'
 import { OpenTelemetryTracerInterface } from '../OpenTelemetry/OpenTelemetryTracerInterface'
+import { OpenTelemetryPropagationInterface } from '../OpenTelemetry/OpenTelemetryPropagationInterface'
 
 export class SQSOpenTelemetryEventMessageHandler implements DomainEventMessageHandlerInterface {
   private tracer: OpenTelemetryTracerInterface | undefined
 
   constructor(
     private serviceName: string,
+    private propagator: OpenTelemetryPropagationInterface,
     private handlers: Map<string, DomainEventHandlerInterface>,
     private logger: Logger,
   ) {}
@@ -37,7 +39,12 @@ export class SQSOpenTelemetryEventMessageHandler implements DomainEventMessageHa
 
     this.tracer = new OpenTelemetryTracer()
 
-    this.tracer.startSpan(this.serviceName, domainEvent.type)
+    let activeContext = undefined
+    if (domainEvent.meta.trace) {
+      activeContext = this.propagator.extract(domainEvent.meta.trace)
+    }
+
+    this.tracer.startSpan(this.serviceName, domainEvent.type, activeContext)
 
     try {
       await handler.handle(domainEvent)

+ 3 - 0
packages/domain-events-infra/src/Infra/index.ts

@@ -1,6 +1,8 @@
 export * from './DirectCall/DirectCallDomainEventPublisher'
 export * from './DirectCall/DirectCallEventMessageHandler'
 
+export * from './OpenTelemetry/OpenTelemetryPropagation'
+export * from './OpenTelemetry/OpenTelemetryPropagationInterface'
 export * from './OpenTelemetry/OpenTelemetrySDK'
 export * from './OpenTelemetry/OpenTelemetrySDKInterface'
 export * from './OpenTelemetry/OpenTelemetryTracer'
@@ -12,6 +14,7 @@ export * from './Redis/RedisDomainEventSubscriberFactory'
 export * from './Redis/RedisEventMessageHandler'
 
 export * from './SNS/SNSDomainEventPublisher'
+export * from './SNS/SNSOpenTelemetryDomainEventPublisher'
 
 export * from './SQS/SQSBounceNotificiationHandler'
 export * from './SQS/SQSDomainEventSubscriberFactory'

+ 4 - 0
packages/domain-events/src/Domain/Event/DomainEventInterface.ts

@@ -9,6 +9,10 @@ export interface DomainEventInterface {
       userIdentifier: string
       userIdentifierType: 'uuid' | 'email' | 'shared-vault-uuid'
     }
+    trace?: {
+      traceparent?: string
+      tracestate?: string
+    }
     origin: DomainEventService
     target?: DomainEventService
   }

+ 13 - 2
packages/files/src/Bootstrap/Container.ts

@@ -16,7 +16,9 @@ import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
 import {
   DirectCallDomainEventPublisher,
   DirectCallEventMessageHandler,
-  SNSDomainEventPublisher,
+  OpenTelemetryPropagation,
+  OpenTelemetryPropagationInterface,
+  SNSOpenTelemetryDomainEventPublisher,
   SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
   SQSOpenTelemetryEventMessageHandler,
@@ -98,6 +100,10 @@ export class ContainerConfigLoader {
 
     container.bind<TimerInterface>(TYPES.Files_Timer).toConstantValue(new Timer())
 
+    container
+      .bind<OpenTelemetryPropagationInterface>(TYPES.Files_OTEL_PROPAGATOR)
+      .toConstantValue(new OpenTelemetryPropagation())
+
     // services
     container
       .bind<TokenDecoderInterface<ValetTokenData>>(TYPES.Files_ValetTokenDecoder)
@@ -176,7 +182,11 @@ export class ContainerConfigLoader {
       container
         .bind<DomainEventPublisherInterface>(TYPES.Files_DomainEventPublisher)
         .toConstantValue(
-          new SNSDomainEventPublisher(container.get(TYPES.Files_SNS), container.get(TYPES.Files_SNS_TOPIC_ARN)),
+          new SNSOpenTelemetryDomainEventPublisher(
+            container.get<OpenTelemetryPropagationInterface>(TYPES.Files_OTEL_PROPAGATOR),
+            container.get(TYPES.Files_SNS),
+            container.get(TYPES.Files_SNS_TOPIC_ARN),
+          ),
         )
     }
 
@@ -300,6 +310,7 @@ export class ContainerConfigLoader {
             ? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Files_Logger))
             : new SQSOpenTelemetryEventMessageHandler(
                 ServiceIdentifier.NAMES.FilesWorker,
+                container.get<OpenTelemetryPropagationInterface>(TYPES.Files_OTEL_PROPAGATOR),
                 eventHandlers,
                 container.get(TYPES.Files_Logger),
               ),

+ 1 - 0
packages/files/src/Bootstrap/Types.ts

@@ -5,6 +5,7 @@ const TYPES = {
   Files_S3: Symbol.for('Files_S3'),
   Files_SNS: Symbol.for('Files_SNS'),
   Files_SQS: Symbol.for('Files_SQS'),
+  Files_OTEL_PROPAGATOR: Symbol.for('Files_OTEL_PROPAGATOR'),
 
   // use cases
   Files_UploadFileChunk: Symbol.for('Files_UploadFileChunk'),

+ 10 - 2
packages/revisions/src/Bootstrap/Container.ts

@@ -39,8 +39,10 @@ import {
   SQSDomainEventSubscriberFactory,
   DirectCallEventMessageHandler,
   DirectCallDomainEventPublisher,
-  SNSDomainEventPublisher,
   SQSOpenTelemetryEventMessageHandler,
+  OpenTelemetryPropagation,
+  SNSOpenTelemetryDomainEventPublisher,
+  OpenTelemetryPropagationInterface,
 } from '@standardnotes/domain-events-infra'
 import { DumpRepositoryInterface } from '../Domain/Dump/DumpRepositoryInterface'
 import { AccountDeletionRequestedEventHandler } from '../Domain/Handler/AccountDeletionRequestedEventHandler'
@@ -140,6 +142,10 @@ export class ContainerConfigLoader {
 
     container.bind<TimerInterface>(TYPES.Revisions_Timer).toDynamicValue(() => new Timer())
 
+    container
+      .bind<OpenTelemetryPropagationInterface>(TYPES.Revisions_OTEL_PROPAGATOR)
+      .toConstantValue(new OpenTelemetryPropagation())
+
     const appDataSource = new AppDataSource({ env, runMigrations: this.mode === 'server' })
     await appDataSource.initialize()
 
@@ -180,7 +186,8 @@ export class ContainerConfigLoader {
       container
         .bind<DomainEventPublisherInterface>(TYPES.Revisions_DomainEventPublisher)
         .toDynamicValue((context: interfaces.Context) => {
-          return new SNSDomainEventPublisher(
+          return new SNSOpenTelemetryDomainEventPublisher(
+            context.container.get<OpenTelemetryPropagationInterface>(TYPES.Revisions_OTEL_PROPAGATOR),
             context.container.get(TYPES.Revisions_SNS),
             context.container.get(TYPES.Revisions_SNS_TOPIC_ARN),
           )
@@ -515,6 +522,7 @@ export class ContainerConfigLoader {
             ? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Revisions_Logger))
             : new SQSOpenTelemetryEventMessageHandler(
                 ServiceIdentifier.NAMES.RevisionsWorker,
+                container.get<OpenTelemetryPropagationInterface>(TYPES.Revisions_OTEL_PROPAGATOR),
                 eventHandlers,
                 container.get(TYPES.Revisions_Logger),
               ),

+ 1 - 0
packages/revisions/src/Bootstrap/Types.ts

@@ -6,6 +6,7 @@ const TYPES = {
   Revisions_SNS: Symbol.for('Revisions_SNS'),
   Revisions_S3: Symbol.for('Revisions_S3'),
   Revisions_Env: Symbol.for('Revisions_Env'),
+  Revisions_OTEL_PROPAGATOR: Symbol.for('Revisions_OTEL_PROPAGATOR'),
   // Map
   Revisions_SQLLegacyRevisionMetadataPersistenceMapper: Symbol.for(
     'Revisions_SQLLegacyRevisionMetadataPersistenceMapper',

+ 17 - 3
packages/scheduler/src/Bootstrap/Container.ts

@@ -6,6 +6,7 @@ import { Container } from 'inversify'
 import {
   DomainEventHandlerInterface,
   DomainEventMessageHandlerInterface,
+  DomainEventPublisherInterface,
   DomainEventSubscriberFactoryInterface,
 } from '@standardnotes/domain-events'
 
@@ -14,7 +15,9 @@ import TYPES from './Types'
 import { AppDataSource } from './DataSource'
 import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
 import {
-  SNSDomainEventPublisher,
+  OpenTelemetryPropagation,
+  OpenTelemetryPropagationInterface,
+  SNSOpenTelemetryDomainEventPublisher,
   SQSDomainEventSubscriberFactory,
   SQSOpenTelemetryEventMessageHandler,
 } from '@standardnotes/domain-events-infra'
@@ -65,6 +68,10 @@ export class ContainerConfigLoader {
     })
     container.bind<winston.Logger>(TYPES.Logger).toConstantValue(logger)
 
+    container
+      .bind<OpenTelemetryPropagationInterface>(TYPES.Scheduler_OTEL_PROPAGATOR)
+      .toConstantValue(new OpenTelemetryPropagation())
+
     if (env.get('SNS_TOPIC_ARN', true)) {
       const snsConfig: SNSClientConfig = {
         apiVersion: 'latest',
@@ -134,8 +141,14 @@ export class ContainerConfigLoader {
     container.bind<JobDoneInterpreterInterface>(TYPES.JobDoneInterpreter).to(JobDoneInterpreter)
 
     container
-      .bind<SNSDomainEventPublisher>(TYPES.DomainEventPublisher)
-      .toConstantValue(new SNSDomainEventPublisher(container.get(TYPES.SNS), container.get(TYPES.SNS_TOPIC_ARN)))
+      .bind<DomainEventPublisherInterface>(TYPES.DomainEventPublisher)
+      .toConstantValue(
+        new SNSOpenTelemetryDomainEventPublisher(
+          container.get<OpenTelemetryPropagationInterface>(TYPES.Scheduler_OTEL_PROPAGATOR),
+          container.get(TYPES.SNS),
+          container.get(TYPES.SNS_TOPIC_ARN),
+        ),
+      )
 
     const eventHandlers: Map<string, DomainEventHandlerInterface> = new Map([
       ['PREDICATE_VERIFIED', container.get(TYPES.PredicateVerifiedEventHandler)],
@@ -149,6 +162,7 @@ export class ContainerConfigLoader {
       .toConstantValue(
         new SQSOpenTelemetryEventMessageHandler(
           ServiceIdentifier.NAMES.SchedulerWorker,
+          container.get<OpenTelemetryPropagationInterface>(TYPES.Scheduler_OTEL_PROPAGATOR),
           eventHandlers,
           container.get(TYPES.Logger),
         ),

+ 1 - 0
packages/scheduler/src/Bootstrap/Types.ts

@@ -3,6 +3,7 @@ const TYPES = {
   Redis: Symbol.for('Redis'),
   SNS: Symbol.for('SNS'),
   SQS: Symbol.for('SQS'),
+  Scheduler_OTEL_PROPAGATOR: Symbol.for('Scheduler_OTEL_PROPAGATOR'),
   // env vars
   REDIS_URL: Symbol.for('REDIS_URL'),
   SNS_TOPIC_ARN: Symbol.for('SNS_TOPIC_ARN'),

+ 10 - 2
packages/syncing-server/src/Bootstrap/Container.ts

@@ -13,7 +13,9 @@ import { Item } from '../Domain/Item/Item'
 import {
   DirectCallDomainEventPublisher,
   DirectCallEventMessageHandler,
-  SNSDomainEventPublisher,
+  OpenTelemetryPropagation,
+  OpenTelemetryPropagationInterface,
+  SNSOpenTelemetryDomainEventPublisher,
   SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
   SQSOpenTelemetryEventMessageHandler,
@@ -216,6 +218,10 @@ export class ContainerConfigLoader {
     }
     container.bind<winston.Logger>(TYPES.Sync_Logger).toConstantValue(logger)
 
+    container
+      .bind<OpenTelemetryPropagationInterface>(TYPES.Sync_OTEL_PROPAGATOR)
+      .toConstantValue(new OpenTelemetryPropagation())
+
     const appDataSource = new AppDataSource({ env, runMigrations: this.mode === 'server' })
     await appDataSource.initialize()
 
@@ -286,7 +292,8 @@ export class ContainerConfigLoader {
       container
         .bind<DomainEventPublisherInterface>(TYPES.Sync_DomainEventPublisher)
         .toDynamicValue((context: interfaces.Context) => {
-          return new SNSDomainEventPublisher(
+          return new SNSOpenTelemetryDomainEventPublisher(
+            context.container.get<OpenTelemetryPropagationInterface>(TYPES.Sync_OTEL_PROPAGATOR),
             context.container.get(TYPES.Sync_SNS),
             context.container.get(TYPES.Sync_SNS_TOPIC_ARN),
           )
@@ -1157,6 +1164,7 @@ export class ContainerConfigLoader {
             ? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Sync_Logger))
             : new SQSOpenTelemetryEventMessageHandler(
                 ServiceIdentifier.NAMES.SyncingServerWorker,
+                container.get<OpenTelemetryPropagationInterface>(TYPES.Sync_OTEL_PROPAGATOR),
                 eventHandlers,
                 container.get(TYPES.Sync_Logger),
               ),

+ 1 - 0
packages/syncing-server/src/Bootstrap/Types.ts

@@ -6,6 +6,7 @@ const TYPES = {
   Sync_SQS: Symbol.for('Sync_SQS'),
   Sync_S3: Symbol.for('Sync_S3'),
   Sync_Env: Symbol.for('Sync_Env'),
+  Sync_OTEL_PROPAGATOR: Symbol.for('Sync_OTEL_PROPAGATOR'),
   // Repositories
   Sync_ItemRepositoryResolver: Symbol.for('Sync_ItemRepositoryResolver'),
   Sync_SQLItemRepository: Symbol.for('Sync_SQLItemRepository'),

+ 2 - 0
packages/websockets/src/Bootstrap/Container.ts

@@ -19,6 +19,7 @@ import { AddWebSocketsConnection } from '../Domain/UseCase/AddWebSocketsConnecti
 import { RemoveWebSocketsConnection } from '../Domain/UseCase/RemoveWebSocketsConnection/RemoveWebSocketsConnection'
 import { WebSocketsClientMessenger } from '../Infra/WebSockets/WebSocketsClientMessenger'
 import {
+  OpenTelemetryPropagation,
   SQSDomainEventSubscriberFactory,
   SQSOpenTelemetryEventMessageHandler,
 } from '@standardnotes/domain-events-infra'
@@ -147,6 +148,7 @@ export class ContainerConfigLoader {
       .toConstantValue(
         new SQSOpenTelemetryEventMessageHandler(
           ServiceIdentifier.NAMES.WebsocketsWorker,
+          new OpenTelemetryPropagation(),
           eventHandlers,
           container.get(TYPES.Logger),
         ),