diff --git a/packages/analytics/bin/report.ts b/packages/analytics/bin/report.ts index cd45e80f7..55c7036cc 100644 --- a/packages/analytics/bin/report.ts +++ b/packages/analytics/bin/report.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra' -import { EmailLevel, ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.AnalyticsScheduledTask }) -sdk.start() - import { Logger } from 'winston' import { DomainEventPublisherInterface } from '@standardnotes/domain-events' @@ -22,6 +16,7 @@ import { CalculateMonthlyRecurringRevenue } from '../src/Domain/UseCase/Calculat import { getBody, getSubject } from '../src/Domain/Email/DailyAnalyticsReport' import { TimerInterface } from '@standardnotes/time' import { StatisticMeasureName } from '../src/Domain/Statistics/StatisticMeasureName' +import { EmailLevel } from '@standardnotes/domain-core' const requestReport = async ( analyticsStore: AnalyticsStoreInterface, @@ -275,9 +270,6 @@ void container.load().then((container) => { logger.info(`Sending report to following admins: ${adminEmails}`) - const tracer = new OpenTelemetryTracer() - tracer.startSpan(ServiceIdentifier.NAMES.AnalyticsScheduledTask, 'report') - Promise.resolve( requestReport( analyticsStore, @@ -293,15 +285,11 @@ void container.load().then((container) => { .then(() => { logger.info('Usage report generation complete') - tracer.stopSpan() - process.exit(0) }) .catch((error) => { logger.error(`Could not finish usage report generation: ${error.message}`) - tracer.stopSpanWithError(error) - process.exit(1) }) }) diff --git a/packages/analytics/bin/worker.ts b/packages/analytics/bin/worker.ts index ba213113f..a09406d9b 100644 --- a/packages/analytics/bin/worker.ts +++ b/packages/analytics/bin/worker.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.AnalyticsWorker }) -sdk.start() - import { Logger } from 'winston' import { DomainEventSubscriberInterface } from '@standardnotes/domain-events' import * as dayjs from 'dayjs' diff --git a/packages/analytics/src/Bootstrap/Container.ts b/packages/analytics/src/Bootstrap/Container.ts index 7494ac030..1dd56ac51 100644 --- a/packages/analytics/src/Bootstrap/Container.ts +++ b/packages/analytics/src/Bootstrap/Container.ts @@ -7,7 +7,7 @@ import { DomainEventPublisherInterface, DomainEventSubscriberInterface, } from '@standardnotes/domain-events' -import { MapperInterface, ServiceIdentifier } from '@standardnotes/domain-core' +import { MapperInterface } from '@standardnotes/domain-core' // eslint-disable-next-line @typescript-eslint/no-var-requires const Mixpanel = require('mixpanel') @@ -16,9 +16,9 @@ import TYPES from './Types' import { AppDataSource } from './DataSource' import { DomainEventFactory } from '../Domain/Event/DomainEventFactory' import { - SNSOpenTelemetryDomainEventPublisher, + SNSDomainEventPublisher, + SQSDomainEventSubscriber, SQSEventMessageHandler, - SQSOpenTelemetryDomainEventSubscriber, } from '@standardnotes/domain-events-infra' import { Timer, TimerInterface } from '@standardnotes/time' import { PeriodKeyGeneratorInterface } from '../Domain/Time/PeriodKeyGeneratorInterface' @@ -139,9 +139,7 @@ export class ContainerConfigLoader { container .bind(TYPES.DomainEventPublisher) - .toConstantValue( - new SNSOpenTelemetryDomainEventPublisher(container.get(TYPES.SNS), container.get(TYPES.SNS_TOPIC_ARN)), - ) + .toConstantValue(new SNSDomainEventPublisher(container.get(TYPES.SNS), container.get(TYPES.SNS_TOPIC_ARN))) if (env.get('MIXPANEL_TOKEN', true)) { container.bind(TYPES.MixpanelClient).toConstantValue(Mixpanel.init(env.get('MIXPANEL_TOKEN', true))) } @@ -242,8 +240,7 @@ export class ContainerConfigLoader { container .bind(TYPES.DomainEventSubscriber) .toConstantValue( - new SQSOpenTelemetryDomainEventSubscriber( - ServiceIdentifier.NAMES.AnalyticsWorker, + new SQSDomainEventSubscriber( container.get(TYPES.SQS), container.get(TYPES.SQS_QUEUE_URL), container.get(TYPES.DomainEventMessageHandler), diff --git a/packages/api-gateway/bin/server.ts b/packages/api-gateway/bin/server.ts index a2f67802b..07e169f42 100644 --- a/packages/api-gateway/bin/server.ts +++ b/packages/api-gateway/bin/server.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.ApiGateway }) -sdk.start() - import '../src/Controller/LegacyController' import '../src/Controller/HealthCheckController' diff --git a/packages/auth/bin/backup.ts b/packages/auth/bin/backup.ts index 542417d7d..0cc871834 100644 --- a/packages/auth/bin/backup.ts +++ b/packages/auth/bin/backup.ts @@ -1,10 +1,6 @@ import 'reflect-metadata' -import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier, SettingName } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.AuthScheduledTask }) -sdk.start() +import { SettingName } from '@standardnotes/domain-core' import { Stream } from 'stream' @@ -106,24 +102,17 @@ void container.load().then((container) => { const domainEventPublisher: DomainEventPublisherInterface = container.get(TYPES.Auth_DomainEventPublisher) const getUserKeyParamsUseCase: GetUserKeyParams = container.get(TYPES.Auth_GetUserKeyParams) - const tracer = new OpenTelemetryTracer() - tracer.startSpan(ServiceIdentifier.NAMES.AuthScheduledTask, 'backup') - Promise.resolve( requestBackups(settingRepository, roleService, domainEventFactory, domainEventPublisher, getUserKeyParamsUseCase), ) .then(() => { logger.info(`${backupFrequency} ${backupProvider} backup requesting complete`) - tracer.stopSpan() - process.exit(0) }) .catch((error) => { logger.error(`Could not finish ${backupFrequency} ${backupProvider} backup requesting: ${error.message}`) - tracer.stopSpanWithError(error) - process.exit(1) }) }) diff --git a/packages/auth/bin/cleanup.ts b/packages/auth/bin/cleanup.ts index 0644309a8..daa1b35a3 100644 --- a/packages/auth/bin/cleanup.ts +++ b/packages/auth/bin/cleanup.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.AuthScheduledTask }) -sdk.start() - import { Logger } from 'winston' import { ContainerConfigLoader } from '../src/Bootstrap/Container' @@ -36,22 +30,15 @@ void container.load().then((container) => { const cleanupSessionTraces: CleanupSessionTraces = container.get(TYPES.Auth_CleanupSessionTraces) const cleanupExpiredSessions: CleanupExpiredSessions = container.get(TYPES.Auth_CleanupExpiredSessions) - const tracer = new OpenTelemetryTracer() - tracer.startSpan(ServiceIdentifier.NAMES.AuthScheduledTask, 'cleanup') - Promise.resolve(cleanup(cleanupSessionTraces, cleanupExpiredSessions)) .then(() => { logger.info('Expired sessions and session traces cleaned.') - tracer.stopSpan() - process.exit(0) }) .catch((error) => { logger.error(`Could not clean sessions and session traces: ${error.message}`) - tracer.stopSpanWithError(error) - process.exit(1) }) }) diff --git a/packages/auth/bin/server.ts b/packages/auth/bin/server.ts index 263a97b23..39d27a297 100644 --- a/packages/auth/bin/server.ts +++ b/packages/auth/bin/server.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.Auth }) -sdk.start() - import '../src/Infra/InversifyExpressUtils/AnnotatedAuthController' import '../src/Infra/InversifyExpressUtils/AnnotatedAuthenticatorsController' import '../src/Infra/InversifyExpressUtils/AnnotatedSessionsController' diff --git a/packages/auth/bin/stats.ts b/packages/auth/bin/stats.ts index 72a00ecee..7edddb4ed 100644 --- a/packages/auth/bin/stats.ts +++ b/packages/auth/bin/stats.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.AuthScheduledTask }) -sdk.start() - import { Logger } from 'winston' import { TimerInterface } from '@standardnotes/time' @@ -26,9 +20,6 @@ void container.load().then((container) => { const persistStats: PersistStatistics = container.get(TYPES.Auth_PersistStatistics) const timer: TimerInterface = container.get(TYPES.Auth_Timer) - const tracer = new OpenTelemetryTracer() - tracer.startSpan(ServiceIdentifier.NAMES.AuthScheduledTask, 'stats') - Promise.resolve( persistStats.execute({ sessionsInADay: timer.getUTCDateNDaysAgo(1), @@ -37,15 +28,11 @@ void container.load().then((container) => { .then(() => { logger.info('Stats persisted.') - tracer.stopSpan() - process.exit(0) }) .catch((error) => { logger.error(`Could not persist stats: ${error.message}`) - tracer.stopSpanWithError(error) - process.exit(1) }) }) diff --git a/packages/auth/bin/user_email_backup.ts b/packages/auth/bin/user_email_backup.ts index 8ebe484f2..16c36421b 100644 --- a/packages/auth/bin/user_email_backup.ts +++ b/packages/auth/bin/user_email_backup.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra' -import { Email, ServiceIdentifier, SettingName } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.AuthScheduledTask }) -sdk.start() - import { Logger } from 'winston' import * as dayjs from 'dayjs' import * as utc from 'dayjs/plugin/utc' @@ -21,6 +15,7 @@ import { RoleServiceInterface } from '../src/Domain/Role/RoleServiceInterface' import { PermissionName } from '@standardnotes/features' import { UserRepositoryInterface } from '../src/Domain/User/UserRepositoryInterface' import { GetUserKeyParams } from '../src/Domain/UseCase/GetUserKeyParams/GetUserKeyParams' +import { Email, SettingName } from '@standardnotes/domain-core' const inputArgs = process.argv.slice(2) const backupEmail = inputArgs[0] @@ -94,9 +89,6 @@ void container.load().then((container) => { const domainEventPublisher: DomainEventPublisherInterface = container.get(TYPES.Auth_DomainEventPublisher) const getUserKeyParamsUseCase: GetUserKeyParams = container.get(TYPES.Auth_GetUserKeyParams) - const tracer = new OpenTelemetryTracer() - tracer.startSpan(ServiceIdentifier.NAMES.AuthScheduledTask, 'user_email_backup') - Promise.resolve( requestBackups( userRepository, @@ -110,15 +102,11 @@ void container.load().then((container) => { .then(() => { logger.info(`Email backup requesting complete for ${backupEmail}`) - tracer.stopSpan() - process.exit(0) }) .catch((error) => { logger.error(`Could not finish email backup requesting for ${backupEmail}: ${error.message}`) - tracer.stopSpanWithError(error) - process.exit(1) }) }) diff --git a/packages/auth/bin/worker.ts b/packages/auth/bin/worker.ts index 4c1ef16c5..29d86acfb 100644 --- a/packages/auth/bin/worker.ts +++ b/packages/auth/bin/worker.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.AuthWorker }) -sdk.start() - import { Logger } from 'winston' import { ContainerConfigLoader } from '../src/Bootstrap/Container' diff --git a/packages/auth/src/Bootstrap/Container.ts b/packages/auth/src/Bootstrap/Container.ts index 56afad51e..3679272e9 100644 --- a/packages/auth/src/Bootstrap/Container.ts +++ b/packages/auth/src/Bootstrap/Container.ts @@ -79,9 +79,9 @@ import { ExtensionKeyGrantedEventHandler } from '../Domain/Handler/ExtensionKeyG import { DirectCallDomainEventPublisher, DirectCallEventMessageHandler, - SNSOpenTelemetryDomainEventPublisher, + SNSDomainEventPublisher, + SQSDomainEventSubscriber, SQSEventMessageHandler, - SQSOpenTelemetryDomainEventSubscriber, } from '@standardnotes/domain-events-infra' import { GetUserSubscription } from '../Domain/UseCase/GetUserSubscription/GetUserSubscription' import { ChangeCredentials } from '../Domain/UseCase/ChangeCredentials/ChangeCredentials' @@ -170,7 +170,6 @@ import { ControllerContainer, ControllerContainerInterface, MapperInterface, - ServiceIdentifier, SharedVaultUser, } from '@standardnotes/domain-core' import { SessionTracePersistenceMapper } from '../Mapping/SessionTracePersistenceMapper' @@ -379,10 +378,7 @@ export class ContainerConfigLoader { .toConstantValue( isConfiguredForHomeServer ? directCallDomainEventPublisher - : new SNSOpenTelemetryDomainEventPublisher( - container.get(TYPES.Auth_SNS), - container.get(TYPES.Auth_SNS_TOPIC_ARN), - ), + : new SNSDomainEventPublisher(container.get(TYPES.Auth_SNS), container.get(TYPES.Auth_SNS_TOPIC_ARN)), ) // Mapping @@ -1514,8 +1510,7 @@ export class ContainerConfigLoader { container .bind(TYPES.Auth_DomainEventSubscriber) .toConstantValue( - new SQSOpenTelemetryDomainEventSubscriber( - ServiceIdentifier.NAMES.AuthWorker, + new SQSDomainEventSubscriber( container.get(TYPES.Auth_SQS), container.get(TYPES.Auth_SQS_QUEUE_URL), container.get(TYPES.Auth_DomainEventMessageHandler), diff --git a/packages/domain-events-infra/src/Infra/SQS/SQSDomainEventSubscriber.ts b/packages/domain-events-infra/src/Infra/SQS/SQSDomainEventSubscriber.ts new file mode 100644 index 000000000..e48f67d93 --- /dev/null +++ b/packages/domain-events-infra/src/Infra/SQS/SQSDomainEventSubscriber.ts @@ -0,0 +1,36 @@ +import { Consumer } from 'sqs-consumer' +import { Message, SQSClient } from '@aws-sdk/client-sqs' +import { DomainEventSubscriberInterface, DomainEventMessageHandlerInterface } from '@standardnotes/domain-events' +import { Logger } from 'winston' + +export class SQSDomainEventSubscriber implements DomainEventSubscriberInterface { + constructor( + private sqs: SQSClient, + private queueUrl: string, + private domainEventMessageHandler: DomainEventMessageHandlerInterface, + private logger: Logger, + ) {} + + start(): void { + const sqsConsumer = Consumer.create({ + attributeNames: ['All'], + messageAttributeNames: ['All'], + queueUrl: this.queueUrl, + sqs: this.sqs, + handleMessage: this.handleMessage.bind(this), + }) + + sqsConsumer.on('error', this.handleError.bind(this)) + sqsConsumer.on('processing_error', this.handleError.bind(this)) + + sqsConsumer.start() + } + + async handleMessage(message: Message): Promise { + await this.domainEventMessageHandler.handleMessage(message.Body) + } + + handleError(error: Error): void { + this.logger.error('Error occured while handling SQS message: %O', error) + } +} diff --git a/packages/domain-events-infra/src/Infra/index.ts b/packages/domain-events-infra/src/Infra/index.ts index c3a6bcf19..9adabaa51 100644 --- a/packages/domain-events-infra/src/Infra/index.ts +++ b/packages/domain-events-infra/src/Infra/index.ts @@ -15,6 +15,7 @@ export * from './SNS/SNSDomainEventPublisher' export * from './SNS/SNSOpenTelemetryDomainEventPublisher' export * from './SQS/SQSBounceNotificiationHandler' +export * from './SQS/SQSDomainEventSubscriber' export * from './SQS/SQSDomainEventSubscriberFactory' export * from './SQS/SQSEventMessageHandler' export * from './SQS/SQSOpenTelemetryDomainEventSubscriber' diff --git a/packages/files/bin/server.ts b/packages/files/bin/server.ts index ecdf7e0e6..7ebb83ae8 100644 --- a/packages/files/bin/server.ts +++ b/packages/files/bin/server.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.Files }) -sdk.start() - import * as busboy from 'connect-busboy' import '../src/Infra/InversifyExpress/AnnotatedFallbackController' diff --git a/packages/files/bin/worker.ts b/packages/files/bin/worker.ts index bb45a1bbc..e36dd7af9 100644 --- a/packages/files/bin/worker.ts +++ b/packages/files/bin/worker.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.FilesWorker }) -sdk.start() - import { Logger } from 'winston' import { ContainerConfigLoader } from '../src/Bootstrap/Container' diff --git a/packages/files/src/Bootstrap/Container.ts b/packages/files/src/Bootstrap/Container.ts index a0f5e6e99..a5bd5299d 100644 --- a/packages/files/src/Bootstrap/Container.ts +++ b/packages/files/src/Bootstrap/Container.ts @@ -16,9 +16,9 @@ import { DomainEventFactory } from '../Domain/Event/DomainEventFactory' import { DirectCallDomainEventPublisher, DirectCallEventMessageHandler, - SNSOpenTelemetryDomainEventPublisher, + SNSDomainEventPublisher, + SQSDomainEventSubscriber, SQSEventMessageHandler, - SQSOpenTelemetryDomainEventSubscriber, } from '@standardnotes/domain-events-infra' import { StreamDownloadFile } from '../Domain/UseCase/StreamDownloadFile/StreamDownloadFile' import { FileDownloaderInterface } from '../Domain/Services/FileDownloaderInterface' @@ -52,7 +52,6 @@ import { S3FileMover } from '../Infra/S3/S3FileMover' import { FSFileMover } from '../Infra/FS/FSFileMover' import { MoveFile } from '../Domain/UseCase/MoveFile/MoveFile' import { SharedVaultValetTokenAuthMiddleware } from '../Infra/InversifyExpress/Middleware/SharedVaultValetTokenAuthMiddleware' -import { ServiceIdentifier } from '@standardnotes/domain-core' export class ContainerConfigLoader { async load(configuration?: { @@ -175,10 +174,7 @@ export class ContainerConfigLoader { container .bind(TYPES.Files_DomainEventPublisher) .toConstantValue( - new SNSOpenTelemetryDomainEventPublisher( - container.get(TYPES.Files_SNS), - container.get(TYPES.Files_SNS_TOPIC_ARN), - ), + new SNSDomainEventPublisher(container.get(TYPES.Files_SNS), container.get(TYPES.Files_SNS_TOPIC_ARN)), ) } @@ -301,8 +297,7 @@ export class ContainerConfigLoader { container .bind(TYPES.Files_DomainEventSubscriber) .toConstantValue( - new SQSOpenTelemetryDomainEventSubscriber( - ServiceIdentifier.NAMES.FilesWorker, + new SQSDomainEventSubscriber( container.get(TYPES.Files_SQS), container.get(TYPES.Files_SQS_QUEUE_URL), container.get(TYPES.Files_DomainEventMessageHandler), diff --git a/packages/revisions/bin/server.ts b/packages/revisions/bin/server.ts index 126c55876..aae68df3f 100644 --- a/packages/revisions/bin/server.ts +++ b/packages/revisions/bin/server.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.Revisions }) -sdk.start() - import * as cors from 'cors' import { urlencoded, json, Request, Response, NextFunction } from 'express' import * as winston from 'winston' diff --git a/packages/revisions/bin/worker.ts b/packages/revisions/bin/worker.ts index cadfe81d2..5016e1ef4 100644 --- a/packages/revisions/bin/worker.ts +++ b/packages/revisions/bin/worker.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.RevisionsWorker }) -sdk.start() - import { Logger } from 'winston' import TYPES from '../src/Bootstrap/Types' diff --git a/packages/revisions/src/Bootstrap/Container.ts b/packages/revisions/src/Bootstrap/Container.ts index af079de60..8740e8b96 100644 --- a/packages/revisions/src/Bootstrap/Container.ts +++ b/packages/revisions/src/Bootstrap/Container.ts @@ -1,9 +1,4 @@ -import { - ControllerContainer, - ControllerContainerInterface, - MapperInterface, - ServiceIdentifier, -} from '@standardnotes/domain-core' +import { ControllerContainer, ControllerContainerInterface, MapperInterface } from '@standardnotes/domain-core' import { Container, interfaces } from 'inversify' import { Repository } from 'typeorm' import * as winston from 'winston' @@ -34,7 +29,7 @@ import { SQSEventMessageHandler, DirectCallEventMessageHandler, DirectCallDomainEventPublisher, - SQSOpenTelemetryDomainEventSubscriber, + SQSDomainEventSubscriber, } from '@standardnotes/domain-events-infra' import { DumpRepositoryInterface } from '../Domain/Dump/DumpRepositoryInterface' import { AccountDeletionRequestedEventHandler } from '../Domain/Handler/AccountDeletionRequestedEventHandler' @@ -342,8 +337,7 @@ export class ContainerConfigLoader { container .bind(TYPES.Revisions_DomainEventSubscriber) .toConstantValue( - new SQSOpenTelemetryDomainEventSubscriber( - ServiceIdentifier.NAMES.RevisionsWorker, + new SQSDomainEventSubscriber( container.get(TYPES.Revisions_SQS), container.get(TYPES.Revisions_SQS_QUEUE_URL), container.get(TYPES.Revisions_DomainEventMessageHandler), diff --git a/packages/scheduler/bin/verify.ts b/packages/scheduler/bin/verify.ts index c786824cd..fdf6825c4 100644 --- a/packages/scheduler/bin/verify.ts +++ b/packages/scheduler/bin/verify.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK, OpenTelemetryTracer } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.SchedulerScheduledTask }) -sdk.start() - import { Logger } from 'winston' import * as dayjs from 'dayjs' import * as utc from 'dayjs/plugin/utc' @@ -35,22 +29,15 @@ void container.load().then((container) => { const verifyPredicates: VerifyPredicates = container.get(TYPES.VerifyPredicates) - const tracer = new OpenTelemetryTracer() - tracer.startSpan(ServiceIdentifier.NAMES.SchedulerScheduledTask, 'verify') - Promise.resolve(verifyJobs(now, verifyPredicates)) .then(() => { logger.info('Verification of overdue jobs complete.') - tracer.stopSpan() - process.exit(0) }) .catch((error) => { logger.error(`Could not finish verification of overdue jobs: ${error.message}`) - tracer.stopSpanWithError(error) - process.exit(1) }) }) diff --git a/packages/scheduler/bin/worker.ts b/packages/scheduler/bin/worker.ts index 2bd8017f7..a09406d9b 100644 --- a/packages/scheduler/bin/worker.ts +++ b/packages/scheduler/bin/worker.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.SchedulerWorker }) -sdk.start() - import { Logger } from 'winston' import { DomainEventSubscriberInterface } from '@standardnotes/domain-events' import * as dayjs from 'dayjs' diff --git a/packages/scheduler/src/Bootstrap/Container.ts b/packages/scheduler/src/Bootstrap/Container.ts index 367bc76be..e3b2ccf24 100644 --- a/packages/scheduler/src/Bootstrap/Container.ts +++ b/packages/scheduler/src/Bootstrap/Container.ts @@ -15,9 +15,9 @@ import TYPES from './Types' import { AppDataSource } from './DataSource' import { DomainEventFactory } from '../Domain/Event/DomainEventFactory' import { - SNSOpenTelemetryDomainEventPublisher, + SNSDomainEventPublisher, + SQSDomainEventSubscriber, SQSEventMessageHandler, - SQSOpenTelemetryDomainEventSubscriber, } from '@standardnotes/domain-events-infra' import { Timer, TimerInterface } from '@standardnotes/time' import { PredicateRepositoryInterface } from '../Domain/Predicate/PredicateRepositoryInterface' @@ -35,7 +35,6 @@ import { VerifyPredicates } from '../Domain/UseCase/VerifyPredicates/VerifyPredi import { UserRegisteredEventHandler } from '../Domain/Handler/UserRegisteredEventHandler' import { SubscriptionCancelledEventHandler } from '../Domain/Handler/SubscriptionCancelledEventHandler' import { ExitDiscountAppliedEventHandler } from '../Domain/Handler/ExitDiscountAppliedEventHandler' -import { ServiceIdentifier } from '@standardnotes/domain-core' export class ContainerConfigLoader { async load(): Promise { @@ -136,9 +135,7 @@ export class ContainerConfigLoader { container .bind(TYPES.DomainEventPublisher) - .toConstantValue( - new SNSOpenTelemetryDomainEventPublisher(container.get(TYPES.SNS), container.get(TYPES.SNS_TOPIC_ARN)), - ) + .toConstantValue(new SNSDomainEventPublisher(container.get(TYPES.SNS), container.get(TYPES.SNS_TOPIC_ARN))) const eventHandlers: Map = new Map([ ['PREDICATE_VERIFIED', container.get(TYPES.PredicateVerifiedEventHandler)], @@ -153,8 +150,7 @@ export class ContainerConfigLoader { container .bind(TYPES.DomainEventSubscriber) .toConstantValue( - new SQSOpenTelemetryDomainEventSubscriber( - ServiceIdentifier.NAMES.SchedulerWorker, + new SQSDomainEventSubscriber( container.get(TYPES.SQS), container.get(TYPES.SQS_QUEUE_URL), container.get(TYPES.DomainEventMessageHandler), diff --git a/packages/syncing-server/bin/server.ts b/packages/syncing-server/bin/server.ts index 6ec03da29..ece34c050 100644 --- a/packages/syncing-server/bin/server.ts +++ b/packages/syncing-server/bin/server.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.SyncingServer }) -sdk.start() - import '../src/Infra/InversifyExpressUtils/AnnotatedFallbackController' import '../src/Infra/InversifyExpressUtils/AnnotatedHealthCheckController' import '../src/Infra/InversifyExpressUtils/AnnotatedItemsController' diff --git a/packages/syncing-server/bin/worker.ts b/packages/syncing-server/bin/worker.ts index 5bd8e889b..783022b83 100644 --- a/packages/syncing-server/bin/worker.ts +++ b/packages/syncing-server/bin/worker.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.SyncingServerWorker }) -sdk.start() - import { Logger } from 'winston' import TYPES from '../src/Bootstrap/Types' diff --git a/packages/syncing-server/src/Bootstrap/Container.ts b/packages/syncing-server/src/Bootstrap/Container.ts index f1fcfd9ba..e2b490746 100644 --- a/packages/syncing-server/src/Bootstrap/Container.ts +++ b/packages/syncing-server/src/Bootstrap/Container.ts @@ -11,9 +11,9 @@ import { Item } from '../Domain/Item/Item' import { DirectCallDomainEventPublisher, DirectCallEventMessageHandler, - SNSOpenTelemetryDomainEventPublisher, + SNSDomainEventPublisher, + SQSDomainEventSubscriber, SQSEventMessageHandler, - SQSOpenTelemetryDomainEventSubscriber, } from '@standardnotes/domain-events-infra' import { DomainEventFactoryInterface } from '../Domain/Event/DomainEventFactoryInterface' import { DomainEventFactory } from '../Domain/Event/DomainEventFactory' @@ -58,7 +58,6 @@ import { ControllerContainer, ControllerContainerInterface, MapperInterface, - ServiceIdentifier, SharedVaultUser, } from '@standardnotes/domain-core' import { BaseItemsController } from '../Infra/InversifyExpressUtils/Base/BaseItemsController' @@ -252,7 +251,7 @@ export class ContainerConfigLoader { container .bind(TYPES.Sync_DomainEventPublisher) .toDynamicValue((context: interfaces.Context) => { - return new SNSOpenTelemetryDomainEventPublisher( + return new SNSDomainEventPublisher( context.container.get(TYPES.Sync_SNS), context.container.get(TYPES.Sync_SNS_TOPIC_ARN), ) @@ -1022,8 +1021,7 @@ export class ContainerConfigLoader { container .bind(TYPES.Sync_DomainEventSubscriber) .toConstantValue( - new SQSOpenTelemetryDomainEventSubscriber( - ServiceIdentifier.NAMES.SyncingServerWorker, + new SQSDomainEventSubscriber( container.get(TYPES.Sync_SQS), container.get(TYPES.Sync_SQS_QUEUE_URL), container.get(TYPES.Sync_DomainEventMessageHandler), diff --git a/packages/websockets/bin/server.ts b/packages/websockets/bin/server.ts index 9787383c5..7e6cb8c66 100644 --- a/packages/websockets/bin/server.ts +++ b/packages/websockets/bin/server.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.Websockets }) -sdk.start() - import '../src/Infra/InversifyExpressUtils/AnnotatedHealthCheckController' import '../src/Infra/InversifyExpressUtils/AnnotatedWebSocketsController' diff --git a/packages/websockets/bin/worker.ts b/packages/websockets/bin/worker.ts index a84e88f12..b3d34fff3 100644 --- a/packages/websockets/bin/worker.ts +++ b/packages/websockets/bin/worker.ts @@ -1,11 +1,5 @@ import 'reflect-metadata' -import { OpenTelemetrySDK } from '@standardnotes/domain-events-infra' -import { ServiceIdentifier } from '@standardnotes/domain-core' - -const sdk = new OpenTelemetrySDK({ serviceName: ServiceIdentifier.NAMES.WebsocketsWorker }) -sdk.start() - import { Logger } from 'winston' import { ContainerConfigLoader } from '../src/Bootstrap/Container' diff --git a/packages/websockets/src/Bootstrap/Container.ts b/packages/websockets/src/Bootstrap/Container.ts index 22ca436c7..9b822690b 100644 --- a/packages/websockets/src/Bootstrap/Container.ts +++ b/packages/websockets/src/Bootstrap/Container.ts @@ -18,7 +18,7 @@ import { RedisWebSocketsConnectionRepository } from '../Infra/Redis/RedisWebSock import { AddWebSocketsConnection } from '../Domain/UseCase/AddWebSocketsConnection/AddWebSocketsConnection' import { RemoveWebSocketsConnection } from '../Domain/UseCase/RemoveWebSocketsConnection/RemoveWebSocketsConnection' import { WebSocketsClientMessenger } from '../Infra/WebSockets/WebSocketsClientMessenger' -import { SQSEventMessageHandler, SQSOpenTelemetryDomainEventSubscriber } from '@standardnotes/domain-events-infra' +import { SQSDomainEventSubscriber, SQSEventMessageHandler } from '@standardnotes/domain-events-infra' import { ApiGatewayAuthMiddleware } from '../Controller/ApiGatewayAuthMiddleware' import { @@ -34,7 +34,6 @@ import { WebSocketsController } from '../Controller/WebSocketsController' import { WebSocketServerInterface } from '@standardnotes/api' import { ClientMessengerInterface } from '../Client/ClientMessengerInterface' import { WebSocketMessageRequestedEventHandler } from '../Domain/Handler/WebSocketMessageRequestedEventHandler' -import { ServiceIdentifier } from '@standardnotes/domain-core' export class ContainerConfigLoader { async load(): Promise { @@ -145,8 +144,7 @@ export class ContainerConfigLoader { container .bind(TYPES.DomainEventSubscriber) .toConstantValue( - new SQSOpenTelemetryDomainEventSubscriber( - ServiceIdentifier.NAMES.WebsocketsWorker, + new SQSDomainEventSubscriber( container.get(TYPES.SQS), container.get(TYPES.SQS_QUEUE_URL), container.get(TYPES.DomainEventMessageHandler),