Explorar o código

feat: add opentelemetry for scheduled tasks

Karol Sójko hai 1 ano
pai
achega
443235a861

+ 13 - 1
packages/analytics/bin/report.ts

@@ -1,8 +1,13 @@
 import 'reflect-metadata'
 
+import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra'
+import { EmailLevel, ServiceIdentifier } from '@standardnotes/domain-core'
+
+const sdk = new OpenTelemetrySDK(ServiceIdentifier.NAMES.AnalyticsScheduledTask)
+sdk.start()
+
 import { Logger } from 'winston'
 
-import { EmailLevel } from '@standardnotes/domain-core'
 import { DomainEventPublisherInterface } from '@standardnotes/domain-events'
 import { AnalyticsActivity } from '../src/Domain/Analytics/AnalyticsActivity'
 import { Period } from '../src/Domain/Time/Period'
@@ -270,6 +275,9 @@ void container.load().then((container) => {
 
   logger.info(`Sending report to following admins: ${adminEmails}`)
 
+  const tracer = new OpenTelemetryTracer()
+  tracer.startSpan(ServiceIdentifier.NAMES.AnalyticsScheduledTask, 'report')
+
   Promise.resolve(
     requestReport(
       analyticsStore,
@@ -285,11 +293,15 @@ void container.load().then((container) => {
     .then(() => {
       logger.info('Usage report generation complete')
 
+      tracer.stopSpan()
+
       process.exit(0)
     })
     .catch((error) => {
       logger.error(`Could not finish usage report generation: ${error.message}`)
 
+      tracer.stopSpanWithError(error)
+
       process.exit(1)
     })
 })

+ 13 - 0
packages/auth/bin/backup.ts

@@ -1,5 +1,11 @@
 import 'reflect-metadata'
 
+import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra'
+import { ServiceIdentifier } from '@standardnotes/domain-core'
+
+const sdk = new OpenTelemetrySDK(ServiceIdentifier.NAMES.AuthScheduledTask)
+sdk.start()
+
 import { Stream } from 'stream'
 
 import { Logger } from 'winston'
@@ -91,15 +97,22 @@ void container.load().then((container) => {
   const domainEventFactory: DomainEventFactoryInterface = container.get(TYPES.Auth_DomainEventFactory)
   const domainEventPublisher: DomainEventPublisherInterface = container.get(TYPES.Auth_DomainEventPublisher)
 
+  const tracer = new OpenTelemetryTracer()
+  tracer.startSpan(ServiceIdentifier.NAMES.AuthScheduledTask, 'backup')
+
   Promise.resolve(requestBackups(settingRepository, roleService, domainEventFactory, domainEventPublisher))
     .then(() => {
       logger.info(`${backupFrequency} ${backupProvider} backup requesting complete`)
 
+      tracer.stopSpan()
+
       process.exit(0)
     })
     .catch((error) => {
       logger.error(`Could not finish ${backupFrequency} ${backupProvider} backup requesting: ${error.message}`)
 
+      tracer.stopSpanWithError(error)
+
       process.exit(1)
     })
 })

+ 13 - 0
packages/auth/bin/cleanup.ts

@@ -1,5 +1,11 @@
 import 'reflect-metadata'
 
+import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra'
+import { ServiceIdentifier } from '@standardnotes/domain-core'
+
+const sdk = new OpenTelemetrySDK(ServiceIdentifier.NAMES.AuthScheduledTask)
+sdk.start()
+
 import { Logger } from 'winston'
 
 import { ContainerConfigLoader } from '../src/Bootstrap/Container'
@@ -30,15 +36,22 @@ void container.load().then((container) => {
   const cleanupSessionTraces: CleanupSessionTraces = container.get(TYPES.Auth_CleanupSessionTraces)
   const cleanupExpiredSessions: CleanupExpiredSessions = container.get(TYPES.Auth_CleanupExpiredSessions)
 
+  const tracer = new OpenTelemetryTracer()
+  tracer.startSpan(ServiceIdentifier.NAMES.AuthScheduledTask, 'cleanup')
+
   Promise.resolve(cleanup(cleanupSessionTraces, cleanupExpiredSessions))
     .then(() => {
       logger.info('Expired sessions and session traces cleaned.')
 
+      tracer.stopSpan()
+
       process.exit(0)
     })
     .catch((error) => {
       logger.error(`Could not clean sessions and session traces: ${error.message}`)
 
+      tracer.stopSpanWithError(error)
+
       process.exit(1)
     })
 })

+ 14 - 1
packages/auth/bin/stats.ts

@@ -1,5 +1,11 @@
 import 'reflect-metadata'
 
+import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra'
+import { ServiceIdentifier } from '@standardnotes/domain-core'
+
+const sdk = new OpenTelemetrySDK(ServiceIdentifier.NAMES.AuthScheduledTask)
+sdk.start()
+
 import { Logger } from 'winston'
 import { TimerInterface } from '@standardnotes/time'
 
@@ -15,11 +21,14 @@ void container.load().then((container) => {
 
   const logger: Logger = container.get(TYPES.Auth_Logger)
 
-  logger.info('Starting session traces cleanup')
+  logger.info('Starting statistics persistence...')
 
   const persistStats: PersistStatistics = container.get(TYPES.Auth_PersistStatistics)
   const timer: TimerInterface = container.get(TYPES.Auth_Timer)
 
+  const tracer = new OpenTelemetryTracer()
+  tracer.startSpan(ServiceIdentifier.NAMES.AuthScheduledTask, 'stats')
+
   Promise.resolve(
     persistStats.execute({
       sessionsInADay: timer.getUTCDateNDaysAgo(1),
@@ -28,11 +37,15 @@ void container.load().then((container) => {
     .then(() => {
       logger.info('Stats persisted.')
 
+      tracer.stopSpan()
+
       process.exit(0)
     })
     .catch((error) => {
       logger.error(`Could not persist stats: ${error.message}`)
 
+      tracer.stopSpanWithError(error)
+
       process.exit(1)
     })
 })

+ 13 - 1
packages/auth/bin/transition.ts

@@ -1,5 +1,11 @@
 import 'reflect-metadata'
 
+import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra'
+import { ServiceIdentifier, RoleName, TransitionStatus } from '@standardnotes/domain-core'
+
+const sdk = new OpenTelemetrySDK(ServiceIdentifier.NAMES.AuthScheduledTask)
+sdk.start()
+
 import { Logger } from 'winston'
 import * as dayjs from 'dayjs'
 import * as utc from 'dayjs/plugin/utc'
@@ -12,7 +18,6 @@ import { DomainEventFactoryInterface } from '../src/Domain/Event/DomainEventFact
 import { UserRepositoryInterface } from '../src/Domain/User/UserRepositoryInterface'
 import { TimerInterface } from '@standardnotes/time'
 import { TransitionStatusRepositoryInterface } from '../src/Domain/Transition/TransitionStatusRepositoryInterface'
-import { RoleName, TransitionStatus } from '@standardnotes/domain-core'
 
 const inputArgs = process.argv.slice(2)
 const startDateString = inputArgs[0]
@@ -124,6 +129,9 @@ void container.load().then((container) => {
     TYPES.Auth_TransitionStatusRepository,
   )
 
+  const tracer = new OpenTelemetryTracer()
+  tracer.startSpan(ServiceIdentifier.NAMES.AuthScheduledTask, 'transition')
+
   Promise.resolve(
     requestTransition(
       transitionStatusRepository,
@@ -137,6 +145,8 @@ void container.load().then((container) => {
     .then(() => {
       logger.info(`Finished transition request for users created between ${startDateString} and ${endDateString}`)
 
+      tracer.stopSpan()
+
       process.exit(0)
     })
     .catch((error) => {
@@ -144,6 +154,8 @@ void container.load().then((container) => {
         `Error while requesting transition for users created between ${startDateString} and ${endDateString}: ${error}`,
       )
 
+      tracer.stopSpanWithError(error)
+
       process.exit(1)
     })
 })

+ 13 - 1
packages/auth/bin/user_email_backup.ts

@@ -1,5 +1,11 @@
 import 'reflect-metadata'
 
+import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra'
+import { Email, ServiceIdentifier } from '@standardnotes/domain-core'
+
+const sdk = new OpenTelemetrySDK(ServiceIdentifier.NAMES.AuthScheduledTask)
+sdk.start()
+
 import { Logger } from 'winston'
 import * as dayjs from 'dayjs'
 import * as utc from 'dayjs/plugin/utc'
@@ -14,7 +20,6 @@ import { MuteFailedBackupsEmailsOption, SettingName } from '@standardnotes/setti
 import { RoleServiceInterface } from '../src/Domain/Role/RoleServiceInterface'
 import { PermissionName } from '@standardnotes/features'
 import { UserRepositoryInterface } from '../src/Domain/User/UserRepositoryInterface'
-import { Email } from '@standardnotes/domain-core'
 
 const inputArgs = process.argv.slice(2)
 const backupEmail = inputArgs[0]
@@ -80,17 +85,24 @@ void container.load().then((container) => {
   const domainEventFactory: DomainEventFactoryInterface = container.get(TYPES.Auth_DomainEventFactory)
   const domainEventPublisher: DomainEventPublisherInterface = container.get(TYPES.Auth_DomainEventPublisher)
 
+  const tracer = new OpenTelemetryTracer()
+  tracer.startSpan(ServiceIdentifier.NAMES.AuthScheduledTask, 'user_email_backup')
+
   Promise.resolve(
     requestBackups(userRepository, settingRepository, roleService, domainEventFactory, domainEventPublisher),
   )
     .then(() => {
       logger.info(`Email backup requesting complete for ${backupEmail}`)
 
+      tracer.stopSpan()
+
       process.exit(0)
     })
     .catch((error) => {
       logger.error(`Could not finish email backup requesting for ${backupEmail}: ${error.message}`)
 
+      tracer.stopSpanWithError(error)
+
       process.exit(1)
     })
 })

+ 3 - 0
packages/domain-core/src/Domain/Service/ServiceIdentifier.ts

@@ -5,9 +5,11 @@ import { ServiceIdentifierProps } from './ServiceIdentifierProps'
 export class ServiceIdentifier extends ValueObject<ServiceIdentifierProps> {
   static readonly NAMES = {
     AnalyticsWorker: 'AnalyticsWorker',
+    AnalyticsScheduledTask: 'AnalyticsScheduledTask',
     ApiGateway: 'ApiGateway',
     Auth: 'Auth',
     AuthWorker: 'AuthWorker',
+    AuthScheduledTask: 'AuthScheduledTask',
     SyncingServer: 'SyncingServer',
     SyncingServerWorker: 'SyncingServerWorker',
     Revisions: 'Revisions',
@@ -15,6 +17,7 @@ export class ServiceIdentifier extends ValueObject<ServiceIdentifierProps> {
     Files: 'Files',
     FilesWorker: 'FilesWorker',
     SchedulerWorker: 'SchedulerWorker',
+    SchedulerScheduledTask: 'SchedulerScheduledTask',
     Email: 'Email',
     EmailWorker: 'EmailWorker',
     Websockets: 'Websockets',

+ 42 - 0
packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryTracer.ts

@@ -0,0 +1,42 @@
+import * as OpenTelemetryApi from '@opentelemetry/api'
+
+import { OpenTelemetryTracerInterface } from './OpenTelemetryTracerInterface'
+
+export class OpenTelemetryTracer implements OpenTelemetryTracerInterface {
+  private parentSpan: OpenTelemetryApi.Span | undefined
+  private internalSpan: OpenTelemetryApi.Span | undefined
+
+  startSpan(parentSpanName: string, internalSpanName: string): void {
+    const tracer = OpenTelemetryApi.trace.getTracer(`${parentSpanName}-handler`)
+
+    this.parentSpan = tracer.startSpan(parentSpanName, { kind: OpenTelemetryApi.SpanKind.CONSUMER })
+    const ctx = OpenTelemetryApi.trace.setSpan(OpenTelemetryApi.context.active(), this.parentSpan)
+
+    this.internalSpan = tracer.startSpan(internalSpanName, { kind: OpenTelemetryApi.SpanKind.INTERNAL }, ctx)
+  }
+
+  stopSpan(): void {
+    if (this.internalSpan) {
+      this.internalSpan.end()
+      this.internalSpan = undefined
+    }
+
+    if (this.parentSpan) {
+      this.parentSpan.end()
+      this.parentSpan = undefined
+    }
+  }
+
+  stopSpanWithError(error: Error): void {
+    if (this.internalSpan) {
+      this.internalSpan.recordException(error)
+      this.internalSpan.end()
+      this.internalSpan = undefined
+    }
+
+    if (this.parentSpan) {
+      this.parentSpan.end()
+      this.parentSpan = undefined
+    }
+  }
+}

+ 5 - 0
packages/domain-events-infra/src/Infra/OpenTelemetry/OpenTelemetryTracerInterface.ts

@@ -0,0 +1,5 @@
+export interface OpenTelemetryTracerInterface {
+  startSpan(parentSpanName: string, internalSpanName: string): void
+  stopSpan(): void
+  stopSpanWithError(error: Error): void
+}

+ 0 - 28
packages/domain-events-infra/src/Infra/SQS/SQSDomainEventSubscriberFactory.spec.ts

@@ -1,28 +0,0 @@
-import 'reflect-metadata'
-
-import { SQSDomainEventSubscriberFactory } from './SQSDomainEventSubscriberFactory'
-import { DomainEventMessageHandlerInterface } from '@standardnotes/domain-events'
-import { Consumer } from 'sqs-consumer'
-import { SQSClient } from '@aws-sdk/client-sqs'
-
-describe('SQSDomainEventSubscriberFactory', () => {
-  let sqs: SQSClient
-  const queueUrl = 'https://queue-url'
-  let domainEventMessageHandler: DomainEventMessageHandlerInterface
-
-  const createFactory = () => new SQSDomainEventSubscriberFactory(sqs, queueUrl, domainEventMessageHandler)
-
-  beforeEach(() => {
-    sqs = {} as jest.Mocked<SQSClient>
-
-    domainEventMessageHandler = {} as jest.Mocked<DomainEventMessageHandlerInterface>
-    domainEventMessageHandler.handleMessage = jest.fn()
-    domainEventMessageHandler.handleError = jest.fn()
-  })
-
-  it('should create a domain event subscriber', () => {
-    const subscriber = createFactory().create()
-
-    expect(subscriber).toBeInstanceOf(Consumer)
-  })
-})

+ 0 - 60
packages/domain-events-infra/src/Infra/SQS/SQSEventMessageHandler.spec.ts

@@ -1,60 +0,0 @@
-import 'reflect-metadata'
-
-import { Logger } from 'winston'
-
-import { DomainEventHandlerInterface } from '@standardnotes/domain-events'
-
-import { SQSEventMessageHandler } from './SQSEventMessageHandler'
-
-describe('SQSEventMessageHandler', () => {
-  let handler: DomainEventHandlerInterface
-  let handlers: Map<string, DomainEventHandlerInterface>
-  let logger: Logger
-
-  const createHandler = () => new SQSEventMessageHandler(handlers, logger)
-
-  beforeEach(() => {
-    handler = {} as jest.Mocked<DomainEventHandlerInterface>
-    handler.handle = jest.fn()
-
-    handlers = new Map([['TEST', handler]])
-
-    logger = {} as jest.Mocked<Logger>
-    logger.debug = jest.fn()
-    logger.error = jest.fn()
-  })
-
-  it('should handle messages', async () => {
-    const sqsMessage = `{
-      "Message" : "eJyrViqpLEhVslIKcQ0OUdJRKkiszMlPTFGyqlZKy88HiiclFinV6iglF6UmlqSmOJYAhQwtzQ10DQyBKMTAwAqM9AwMDKOUagGlWhXt"
-    }`
-
-    await createHandler().handleMessage(sqsMessage)
-
-    expect(handler.handle).toHaveBeenCalledWith({
-      payload: {
-        foo: 'bar',
-      },
-      type: 'TEST',
-      createdAt: new Date(1),
-    })
-  })
-
-  it('should handle errors', async () => {
-    await createHandler().handleError(new Error('test'))
-
-    expect(logger.error).toHaveBeenCalled()
-  })
-
-  it('should tell if there is no handler for an event', async () => {
-    const sqsMessage = `{
-      "Message" : "eJyrViqpLEhVslIKcQ0OMVLSUSpIrMzJT0xRsqpWSsvPB0okJRYp1dYCAABHDLY="
-    }`
-
-    await createHandler().handleMessage(sqsMessage)
-
-    expect(logger.debug).toHaveBeenCalledWith('Event handler for event type TEST2 does not exist')
-
-    expect(handler.handle).not.toHaveBeenCalled()
-  })
-})

+ 12 - 25
packages/domain-events-infra/src/Infra/SQS/SQSOpenTelemetryEventMessageHandler.ts

@@ -1,15 +1,15 @@
 import { Logger } from 'winston'
 import * as zlib from 'zlib'
-import * as OpenTelemetryApi from '@opentelemetry/api'
 import {
   DomainEventHandlerInterface,
   DomainEventInterface,
   DomainEventMessageHandlerInterface,
 } from '@standardnotes/domain-events'
+import { OpenTelemetryTracer } from '../OpenTelemetry/OpenTelemetryTracer'
+import { OpenTelemetryTracerInterface } from '../OpenTelemetry/OpenTelemetryTracerInterface'
 
 export class SQSOpenTelemetryEventMessageHandler implements DomainEventMessageHandlerInterface {
-  private currentSpan: OpenTelemetryApi.Span | undefined
-  private internalSpan: OpenTelemetryApi.Span | undefined
+  private tracer: OpenTelemetryTracerInterface | undefined
 
   constructor(
     private serviceName: string,
@@ -35,35 +35,22 @@ export class SQSOpenTelemetryEventMessageHandler implements DomainEventMessageHa
 
     this.logger.debug(`Received event: ${domainEvent.type}`)
 
-    const tracer = OpenTelemetryApi.trace.getTracer('sqs-handler')
+    this.tracer = new OpenTelemetryTracer()
 
-    this.currentSpan = tracer.startSpan(this.serviceName, { kind: OpenTelemetryApi.SpanKind.CONSUMER })
-    const ctx = OpenTelemetryApi.trace.setSpan(OpenTelemetryApi.context.active(), this.currentSpan)
+    this.tracer.startSpan(this.serviceName, domainEvent.type)
 
-    this.internalSpan = tracer.startSpan(domainEvent.type, { kind: OpenTelemetryApi.SpanKind.INTERNAL }, ctx)
+    try {
+      await handler.handle(domainEvent)
+    } catch (error) {
+      this.tracer.stopSpanWithError(error as Error)
 
-    await handler.handle(domainEvent)
-
-    this.internalSpan.end()
-
-    this.currentSpan.end()
+      throw error
+    }
 
-    this.internalSpan = undefined
-    this.currentSpan = undefined
+    this.tracer.stopSpan()
   }
 
   async handleError(error: Error): Promise<void> {
-    if (this.internalSpan) {
-      this.internalSpan.recordException(error)
-      this.internalSpan.end()
-      this.internalSpan = undefined
-    }
-
-    if (this.currentSpan) {
-      this.currentSpan.end()
-      this.currentSpan = undefined
-    }
-
     this.logger.error('Error occured while handling SQS message: %O', error)
   }
 }

+ 2 - 0
packages/domain-events-infra/src/Infra/index.ts

@@ -3,6 +3,8 @@ export * from './DirectCall/DirectCallEventMessageHandler'
 
 export * from './OpenTelemetry/OpenTelemetrySDK'
 export * from './OpenTelemetry/OpenTelemetrySDKInterface'
+export * from './OpenTelemetry/OpenTelemetryTracer'
+export * from './OpenTelemetry/OpenTelemetryTracerInterface'
 
 export * from './Redis/RedisDomainEventPublisher'
 export * from './Redis/RedisDomainEventSubscriber'

+ 13 - 0
packages/scheduler/bin/verify.ts

@@ -1,5 +1,11 @@
 import 'reflect-metadata'
 
+import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra'
+import { ServiceIdentifier } from '@standardnotes/domain-core'
+
+const sdk = new OpenTelemetrySDK(ServiceIdentifier.NAMES.SchedulerScheduledTask)
+sdk.start()
+
 import { Logger } from 'winston'
 import * as dayjs from 'dayjs'
 import * as utc from 'dayjs/plugin/utc'
@@ -29,15 +35,22 @@ void container.load().then((container) => {
 
   const verifyPredicates: VerifyPredicates = container.get(TYPES.VerifyPredicates)
 
+  const tracer = new OpenTelemetryTracer()
+  tracer.startSpan(ServiceIdentifier.NAMES.SchedulerScheduledTask, 'verify')
+
   Promise.resolve(verifyJobs(now, verifyPredicates))
     .then(() => {
       logger.info('Verification of overdue jobs complete.')
 
+      tracer.stopSpan()
+
       process.exit(0)
     })
     .catch((error) => {
       logger.error(`Could not finish verification of overdue jobs: ${error.message}`)
 
+      tracer.stopSpanWithError(error)
+
       process.exit(1)
     })
 })