Selaa lähdekoodia

feat: add custom tracing on workers

Karol Sójko 1 vuosi sitten
vanhempi
commit
65ced2cc7b

+ 0 - 4
.github/workflows/common-deploy.yml

@@ -38,10 +38,6 @@ jobs:
       run: |
         jq '(.containerDefinitions[] | select(.name=="${{ inputs.service_name }}-prod") | .environment[] | select(.name=="VERSION")).value = "${{ github.sha }}"' task-definition.json > tmp.json && mv tmp.json task-definition.json
 
-    - name: Fill in the new package version in the Amazon ECS task definition
-      run: |
-        jq '(.containerDefinitions[] | select(.name=="${{ inputs.service_name }}-prod") | .environment[] | select(.name=="npm_package_version")).value = "${{ github.sha }}"' task-definition.json > tmp.json && mv tmp.json task-definition.json
-
     - name: Fill in the new image ID in the Amazon ECS task definition
       id: task-def-prod
       uses: aws-actions/amazon-ecs-render-task-definition@v1

BIN
.yarn/cache/@opentelemetry-api-npm-1.6.0-58fdf34ce1-b8daefad2c.zip


+ 6 - 0
packages/auth/bin/worker.ts

@@ -8,6 +8,7 @@ import { Env } from '../src/Bootstrap/Env'
 import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
 import * as dayjs from 'dayjs'
 import * as utc from 'dayjs/plugin/utc'
+import { OpenTelemetrySDKInterface } from '@standardnotes/domain-events-infra'
 
 const container = new ContainerConfigLoader('worker')
 void container.load().then((container) => {
@@ -20,6 +21,11 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
+  if (!container.get<boolean>(TYPES.Auth_IS_CONFIGURED_FOR_HOME_SERVER_OR_SELF_HOSTING)) {
+    const openTelemetrySDK = container.get<OpenTelemetrySDKInterface>(TYPES.Auth_OpenTelemetrySDK)
+    openTelemetrySDK.start()
+  }
+
   const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(
     TYPES.Auth_DomainEventSubscriberFactory,
   )

+ 11 - 2
packages/auth/src/Bootstrap/Container.ts

@@ -94,6 +94,7 @@ import {
   SNSDomainEventPublisher,
   SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
+  SQSOpenTelemetryEventMessageHandler,
 } from '@standardnotes/domain-events-infra'
 import { GetUserSubscription } from '../Domain/UseCase/GetUserSubscription/GetUserSubscription'
 import { ChangeCredentials } from '../Domain/UseCase/ChangeCredentials/ChangeCredentials'
@@ -333,7 +334,11 @@ export class ContainerConfigLoader {
     if (!isConfiguredForHomeServerOrSelfHosting) {
       container
         .bind<OpenTelemetrySDKInterface>(TYPES.Auth_OpenTelemetrySDK)
-        .toConstantValue(new OpenTelemetrySDK(ServiceIdentifier.NAMES.Auth))
+        .toConstantValue(
+          new OpenTelemetrySDK(
+            this.mode === 'server' ? ServiceIdentifier.NAMES.Auth : ServiceIdentifier.NAMES.AuthWorker,
+          ),
+        )
     }
 
     if (!isConfiguredForInMemoryCache) {
@@ -1238,7 +1243,11 @@ export class ContainerConfigLoader {
     } else {
       container
         .bind<DomainEventMessageHandlerInterface>(TYPES.Auth_DomainEventMessageHandler)
-        .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Auth_Logger)))
+        .toConstantValue(
+          isConfiguredForHomeServerOrSelfHosting
+            ? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Auth_Logger))
+            : new SQSOpenTelemetryEventMessageHandler(eventHandlers, container.get(TYPES.Auth_Logger)),
+        )
 
       container
         .bind<DomainEventSubscriberFactoryInterface>(TYPES.Auth_DomainEventSubscriberFactory)

+ 56 - 0
packages/domain-events-infra/src/Infra/SQS/SQSOpenTelemetryEventMessageHandler.ts

@@ -0,0 +1,56 @@
+import { Logger } from 'winston'
+import * as zlib from 'zlib'
+import * as OpenTelemetryApi from '@opentelemetry/api'
+import {
+  DomainEventHandlerInterface,
+  DomainEventInterface,
+  DomainEventMessageHandlerInterface,
+} from '@standardnotes/domain-events'
+
+export class SQSOpenTelemetryEventMessageHandler implements DomainEventMessageHandlerInterface {
+  private currentSpan: OpenTelemetryApi.Span | undefined
+
+  constructor(
+    private handlers: Map<string, DomainEventHandlerInterface>,
+    private logger: Logger,
+  ) {}
+
+  async handleMessage(message: string): Promise<void> {
+    const messageParsed = JSON.parse(message)
+
+    const domainEventJson = zlib.unzipSync(Buffer.from(messageParsed.Message, 'base64')).toString()
+
+    const domainEvent: DomainEventInterface = JSON.parse(domainEventJson)
+
+    domainEvent.createdAt = new Date(domainEvent.createdAt)
+
+    const handler = this.handlers.get(domainEvent.type)
+    if (!handler) {
+      this.logger.debug(`Event handler for event type ${domainEvent.type} does not exist`)
+
+      return
+    }
+
+    this.logger.debug(`Received event: ${domainEvent.type}`)
+
+    const tracer = OpenTelemetryApi.trace.getTracer('sqs-handler')
+
+    this.currentSpan = tracer.startSpan(domainEvent.type)
+
+    await handler.handle(domainEvent)
+
+    this.currentSpan.end()
+
+    this.currentSpan = undefined
+  }
+
+  async handleError(error: Error): Promise<void> {
+    if (this.currentSpan) {
+      this.currentSpan.recordException(error)
+      this.currentSpan.end()
+      this.currentSpan = undefined
+    }
+
+    this.logger.error('Error occured while handling SQS message: %O', error)
+  }
+}

+ 1 - 0
packages/domain-events-infra/src/Infra/index.ts

@@ -15,3 +15,4 @@ export * from './SQS/SQSNewRelicBounceNotificiationHandler'
 export * from './SQS/SQSDomainEventSubscriberFactory'
 export * from './SQS/SQSEventMessageHandler'
 export * from './SQS/SQSNewRelicEventMessageHandler'
+export * from './SQS/SQSOpenTelemetryEventMessageHandler'

+ 7 - 1
packages/files/bin/worker.ts

@@ -8,8 +8,9 @@ import { Env } from '../src/Bootstrap/Env'
 import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
 import * as dayjs from 'dayjs'
 import * as utc from 'dayjs/plugin/utc'
+import { OpenTelemetrySDKInterface } from '@standardnotes/domain-events-infra'
 
-const container = new ContainerConfigLoader()
+const container = new ContainerConfigLoader('worker')
 void container.load().then((container) => {
   dayjs.extend(utc)
 
@@ -20,6 +21,11 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
+  if (!container.get<boolean>(TYPES.Files_IS_CONFIGURED_FOR_HOME_SERVER_OR_SELF_HOSTING)) {
+    const openTelemetrySDK = container.get<OpenTelemetrySDKInterface>(TYPES.Files_OpenTelemetrySDK)
+    openTelemetrySDK.start()
+  }
+
   const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(
     TYPES.Files_DomainEventSubscriberFactory,
   )

+ 13 - 2
packages/files/src/Bootstrap/Container.ts

@@ -21,6 +21,7 @@ import {
   SNSDomainEventPublisher,
   SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
+  SQSOpenTelemetryEventMessageHandler,
 } from '@standardnotes/domain-events-infra'
 import { StreamDownloadFile } from '../Domain/UseCase/StreamDownloadFile/StreamDownloadFile'
 import { FileDownloaderInterface } from '../Domain/Services/FileDownloaderInterface'
@@ -57,6 +58,8 @@ import { SharedVaultValetTokenAuthMiddleware } from '../Infra/InversifyExpress/M
 import { ServiceIdentifier } from '@standardnotes/domain-core'
 
 export class ContainerConfigLoader {
+  constructor(private mode: 'server' | 'worker' = 'server') {}
+
   async load(configuration?: {
     directCallDomainEventPublisher?: DirectCallDomainEventPublisher
     logger?: Transform
@@ -96,7 +99,11 @@ export class ContainerConfigLoader {
     if (!isConfiguredForHomeServerOrSelfHosting) {
       container
         .bind<OpenTelemetrySDKInterface>(TYPES.Files_OpenTelemetrySDK)
-        .toConstantValue(new OpenTelemetrySDK(ServiceIdentifier.NAMES.Files))
+        .toConstantValue(
+          new OpenTelemetrySDK(
+            this.mode === 'server' ? ServiceIdentifier.NAMES.Files : ServiceIdentifier.NAMES.FilesWorker,
+          ),
+        )
     }
 
     let logger: winston.Logger
@@ -306,7 +313,11 @@ export class ContainerConfigLoader {
     } else {
       container
         .bind<DomainEventMessageHandlerInterface>(TYPES.Files_DomainEventMessageHandler)
-        .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Files_Logger)))
+        .toConstantValue(
+          isConfiguredForHomeServerOrSelfHosting
+            ? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Files_Logger))
+            : new SQSOpenTelemetryEventMessageHandler(eventHandlers, container.get(TYPES.Files_Logger)),
+        )
       container
         .bind<DomainEventSubscriberFactoryInterface>(TYPES.Files_DomainEventSubscriberFactory)
         .toConstantValue(

+ 6 - 0
packages/revisions/bin/worker.ts

@@ -6,6 +6,7 @@ import TYPES from '../src/Bootstrap/Types'
 import { Env } from '../src/Bootstrap/Env'
 import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
 import { ContainerConfigLoader } from '../src/Bootstrap/Container'
+import { OpenTelemetrySDKInterface } from '@standardnotes/domain-events-infra'
 
 const container = new ContainerConfigLoader('worker')
 void container.load().then((container) => {
@@ -16,6 +17,11 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
+  if (!container.get<boolean>(TYPES.Revisions_IS_CONFIGURED_FOR_HOME_SERVER_OR_SELF_HOSTING)) {
+    const openTelemetrySDK = container.get<OpenTelemetrySDKInterface>(TYPES.Revisions_OpenTelemetrySDK)
+    openTelemetrySDK.start()
+  }
+
   const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(
     TYPES.Revisions_DomainEventSubscriberFactory,
   )

+ 11 - 2
packages/revisions/src/Bootstrap/Container.ts

@@ -42,6 +42,7 @@ import {
   SNSDomainEventPublisher,
   OpenTelemetrySDKInterface,
   OpenTelemetrySDK,
+  SQSOpenTelemetryEventMessageHandler,
 } from '@standardnotes/domain-events-infra'
 import { DumpRepositoryInterface } from '../Domain/Dump/DumpRepositoryInterface'
 import { AccountDeletionRequestedEventHandler } from '../Domain/Handler/AccountDeletionRequestedEventHandler'
@@ -161,7 +162,11 @@ export class ContainerConfigLoader {
     if (!isConfiguredForHomeServerOrSelfHosting) {
       container
         .bind<OpenTelemetrySDKInterface>(TYPES.Revisions_OpenTelemetrySDK)
-        .toConstantValue(new OpenTelemetrySDK(ServiceIdentifier.NAMES.Revisions))
+        .toConstantValue(
+          new OpenTelemetrySDK(
+            this.mode === 'server' ? ServiceIdentifier.NAMES.Revisions : ServiceIdentifier.NAMES.RevisionsWorker,
+          ),
+        )
     }
 
     if (!isConfiguredForHomeServer) {
@@ -525,7 +530,11 @@ export class ContainerConfigLoader {
     } else {
       container
         .bind<DomainEventMessageHandlerInterface>(TYPES.Revisions_DomainEventMessageHandler)
-        .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Revisions_Logger)))
+        .toConstantValue(
+          isConfiguredForHomeServerOrSelfHosting
+            ? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Revisions_Logger))
+            : new SQSOpenTelemetryEventMessageHandler(eventHandlers, container.get(TYPES.Revisions_Logger)),
+        )
 
       container
         .bind<DomainEventSubscriberFactoryInterface>(TYPES.Revisions_DomainEventSubscriberFactory)

+ 6 - 0
packages/syncing-server/bin/worker.ts

@@ -6,6 +6,7 @@ import TYPES from '../src/Bootstrap/Types'
 import { Env } from '../src/Bootstrap/Env'
 import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
 import { ContainerConfigLoader } from '../src/Bootstrap/Container'
+import { OpenTelemetrySDKInterface } from '@standardnotes/domain-events-infra'
 
 const container = new ContainerConfigLoader('worker')
 void container.load().then((container) => {
@@ -16,6 +17,11 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
+  if (!container.get<boolean>(TYPES.Sync_IS_CONFIGURED_FOR_HOME_SERVER_OR_SELF_HOSTING)) {
+    const openTelemetrySDK = container.get<OpenTelemetrySDKInterface>(TYPES.Sync_OpenTelemetrySDK)
+    openTelemetrySDK.start()
+  }
+
   const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(
     TYPES.Sync_DomainEventSubscriberFactory,
   )

+ 13 - 2
packages/syncing-server/src/Bootstrap/Container.ts

@@ -18,6 +18,7 @@ import {
   SNSDomainEventPublisher,
   SQSDomainEventSubscriberFactory,
   SQSEventMessageHandler,
+  SQSOpenTelemetryEventMessageHandler,
 } from '@standardnotes/domain-events-infra'
 import { DomainEventFactoryInterface } from '../Domain/Event/DomainEventFactoryInterface'
 import { DomainEventFactory } from '../Domain/Event/DomainEventFactory'
@@ -244,7 +245,13 @@ export class ContainerConfigLoader {
     if (!isConfiguredForHomeServerOrSelfHosting) {
       container
         .bind<OpenTelemetrySDKInterface>(TYPES.Sync_OpenTelemetrySDK)
-        .toConstantValue(new OpenTelemetrySDK(ServiceIdentifier.NAMES.SyncingServer))
+        .toConstantValue(
+          new OpenTelemetrySDK(
+            this.mode === 'server'
+              ? ServiceIdentifier.NAMES.SyncingServer
+              : ServiceIdentifier.NAMES.SyncingServerWorker,
+          ),
+        )
     }
 
     if (!isConfiguredForInMemoryCache) {
@@ -1168,7 +1175,11 @@ export class ContainerConfigLoader {
     } else {
       container
         .bind<DomainEventMessageHandlerInterface>(TYPES.Sync_DomainEventMessageHandler)
-        .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Sync_Logger)))
+        .toConstantValue(
+          isConfiguredForHomeServerOrSelfHosting
+            ? new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Sync_Logger))
+            : new SQSOpenTelemetryEventMessageHandler(eventHandlers, container.get(TYPES.Sync_Logger)),
+        )
     }
 
     container

+ 4 - 0
packages/websockets/bin/worker.ts

@@ -6,6 +6,7 @@ import { ContainerConfigLoader } from '../src/Bootstrap/Container'
 import TYPES from '../src/Bootstrap/Types'
 import { Env } from '../src/Bootstrap/Env'
 import { DomainEventSubscriberFactoryInterface } from '@standardnotes/domain-events'
+import { OpenTelemetrySDKInterface } from '@standardnotes/domain-events-infra'
 
 const container = new ContainerConfigLoader()
 void container.load().then((container) => {
@@ -16,6 +17,9 @@ void container.load().then((container) => {
 
   logger.info('Starting worker...')
 
+  const openTelemetrySDK = container.get<OpenTelemetrySDKInterface>(TYPES.WebSockets_OpenTelemetrySDK)
+  openTelemetrySDK.start()
+
   const subscriberFactory: DomainEventSubscriberFactoryInterface = container.get(TYPES.DomainEventSubscriberFactory)
   subscriberFactory.create().start()
 })

+ 9 - 3
packages/websockets/src/Bootstrap/Container.ts

@@ -22,7 +22,7 @@ import {
   OpenTelemetrySDK,
   OpenTelemetrySDKInterface,
   SQSDomainEventSubscriberFactory,
-  SQSEventMessageHandler,
+  SQSOpenTelemetryEventMessageHandler,
 } from '@standardnotes/domain-events-infra'
 import { ApiGatewayAuthMiddleware } from '../Controller/ApiGatewayAuthMiddleware'
 
@@ -42,6 +42,8 @@ import { WebSocketMessageRequestedEventHandler } from '../Domain/Handler/WebSock
 import { ServiceIdentifier } from '@standardnotes/domain-core'
 
 export class ContainerConfigLoader {
+  constructor(private mode: 'server' | 'worker' = 'server') {}
+
   async load(): Promise<Container> {
     const env: Env = new Env()
     env.load()
@@ -50,7 +52,11 @@ export class ContainerConfigLoader {
 
     container
       .bind<OpenTelemetrySDKInterface>(TYPES.WebSockets_OpenTelemetrySDK)
-      .toConstantValue(new OpenTelemetrySDK(ServiceIdentifier.NAMES.Websockets))
+      .toConstantValue(
+        new OpenTelemetrySDK(
+          this.mode === 'server' ? ServiceIdentifier.NAMES.Websockets : ServiceIdentifier.NAMES.WebsocketsWorker,
+        ),
+      )
 
     const redisUrl = env.get('REDIS_URL')
     const isRedisInClusterMode = redisUrl.indexOf(',') > 0
@@ -158,7 +164,7 @@ export class ContainerConfigLoader {
 
     container
       .bind<DomainEventMessageHandlerInterface>(TYPES.DomainEventMessageHandler)
-      .toConstantValue(new SQSEventMessageHandler(eventHandlers, container.get(TYPES.Logger)))
+      .toConstantValue(new SQSOpenTelemetryEventMessageHandler(eventHandlers, container.get(TYPES.Logger)))
     container
       .bind<DomainEventSubscriberFactoryInterface>(TYPES.DomainEventSubscriberFactory)
       .toConstantValue(