From e3cb1faba46bbfd8741f6c827daa9438934dd710 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20S=C3=B3jko?= Date: Fri, 5 Jan 2024 13:06:55 +0100 Subject: [PATCH] fix(syncing-server): add traffic abuse check in gRPC coms --- packages/syncing-server/bin/server.ts | 7 ++ .../src/Infra/gRPC/SyncingServer.ts | 68 ++++++++++++++++++- 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/packages/syncing-server/bin/server.ts b/packages/syncing-server/bin/server.ts index beab82f7a..4a618afb2 100644 --- a/packages/syncing-server/bin/server.ts +++ b/packages/syncing-server/bin/server.ts @@ -29,6 +29,7 @@ import { SyncingServer } from '../src/Infra/gRPC/SyncingServer' import { SyncItems } from '../src/Domain/UseCase/Syncing/SyncItems/SyncItems' import { SyncResponseFactoryResolverInterface } from '../src/Domain/Item/SyncResponse/SyncResponseFactoryResolverInterface' import { SyncResponse20200115 } from '../src/Domain/Item/SyncResponse/SyncResponse20200115' +import { CheckForTrafficAbuse } from '../src/Domain/UseCase/Syncing/CheckForTrafficAbuse/CheckForTrafficAbuse' const container = new ContainerConfigLoader() void container.load().then((container) => { @@ -114,6 +115,12 @@ void container.load().then((container) => { container.get(TYPES.Sync_SyncItems), container.get(TYPES.Sync_SyncResponseFactoryResolver), container.get>(TYPES.Sync_SyncResponseGRPCMapper), + container.get(TYPES.Sync_CheckForTrafficAbuse), + container.get(TYPES.Sync_STRICT_ABUSE_PROTECTION), + container.get(TYPES.Sync_ITEM_OPERATIONS_ABUSE_TIMEFRAME_LENGTH_IN_MINUTES), + container.get(TYPES.Sync_ITEM_OPERATIONS_ABUSE_THRESHOLD), + container.get(TYPES.Sync_PAYLOAD_SIZE_ABUSE_THRESHOLD), + container.get(TYPES.Sync_PAYLOAD_SIZE_ABUSE_TIMEFRAME_LENGTH_IN_MINUTES), container.get(TYPES.Sync_Logger), ) diff --git a/packages/syncing-server/src/Infra/gRPC/SyncingServer.ts b/packages/syncing-server/src/Infra/gRPC/SyncingServer.ts index 3d8c15b59..e45667d9a 100644 --- a/packages/syncing-server/src/Infra/gRPC/SyncingServer.ts +++ b/packages/syncing-server/src/Infra/gRPC/SyncingServer.ts @@ -9,12 +9,20 @@ import { SyncItems } from '../../Domain/UseCase/Syncing/SyncItems/SyncItems' import { ApiVersion } from '../../Domain/Api/ApiVersion' import { SyncResponseFactoryResolverInterface } from '../../Domain/Item/SyncResponse/SyncResponseFactoryResolverInterface' import { SyncResponse20200115 } from '../../Domain/Item/SyncResponse/SyncResponse20200115' +import { CheckForTrafficAbuse } from '../../Domain/UseCase/Syncing/CheckForTrafficAbuse/CheckForTrafficAbuse' +import { Metric } from '../../Domain/Metrics/Metric' export class SyncingServer implements ISyncingServer { constructor( private syncItemsUseCase: SyncItems, private syncResponseFactoryResolver: SyncResponseFactoryResolverInterface, private mapper: MapperInterface, + protected checkForTrafficAbuse: CheckForTrafficAbuse, + private strictAbuseProtection: boolean, + private itemOperationsAbuseTimeframeLengthInMinutes: number, + private itemOperationsAbuseThreshold: number, + private payloadSizeAbuseThreshold: number, + private payloadSizeAbuseTimeframeLengthInMinutes: number, private logger: Logger, ) {} @@ -23,6 +31,63 @@ export class SyncingServer implements ISyncingServer { callback: grpc.sendUnaryData, ): Promise { try { + const userUuid = call.metadata.get('x-user-uuid').pop() as string + + const checkForItemOperationsAbuseResult = await this.checkForTrafficAbuse.execute({ + metricToCheck: Metric.NAMES.ItemOperation, + userUuid, + threshold: this.itemOperationsAbuseThreshold, + timeframeLengthInMinutes: this.itemOperationsAbuseTimeframeLengthInMinutes, + }) + if (checkForItemOperationsAbuseResult.isFailed()) { + this.logger.warn(checkForItemOperationsAbuseResult.getError(), { + userId: userUuid, + }) + if (this.strictAbuseProtection) { + const metadata = new grpc.Metadata() + metadata.set('x-sync-error-message', checkForItemOperationsAbuseResult.getError()) + metadata.set('x-sync-error-response-code', '429') + + return callback( + { + code: Status.INVALID_ARGUMENT, + message: checkForItemOperationsAbuseResult.getError(), + name: 'INVALID_ARGUMENT', + metadata, + }, + null, + ) + } + } + + const checkForPayloadSizeAbuseResult = await this.checkForTrafficAbuse.execute({ + metricToCheck: Metric.NAMES.ContentSizeUtilized, + userUuid, + threshold: this.payloadSizeAbuseThreshold, + timeframeLengthInMinutes: this.payloadSizeAbuseTimeframeLengthInMinutes, + }) + if (checkForPayloadSizeAbuseResult.isFailed()) { + this.logger.warn(checkForPayloadSizeAbuseResult.getError(), { + userId: userUuid, + }) + + if (this.strictAbuseProtection) { + const metadata = new grpc.Metadata() + metadata.set('x-sync-error-message', checkForPayloadSizeAbuseResult.getError()) + metadata.set('x-sync-error-response-code', '429') + + return callback( + { + code: Status.INVALID_ARGUMENT, + message: checkForPayloadSizeAbuseResult.getError(), + name: 'INVALID_ARGUMENT', + metadata, + }, + null, + ) + } + } + const itemHashesRPC = call.request.getItemsList() const itemHashes: ItemHash[] = [] for (const itemHash of itemHashesRPC) { @@ -39,7 +104,7 @@ export class SyncingServer implements ISyncingServer { created_at_timestamp: itemHash.hasCreatedAtTimestamp() ? itemHash.getCreatedAtTimestamp() : undefined, updated_at: itemHash.hasUpdatedAt() ? itemHash.getUpdatedAt() : undefined, updated_at_timestamp: itemHash.hasUpdatedAtTimestamp() ? itemHash.getUpdatedAtTimestamp() : undefined, - user_uuid: call.metadata.get('userUuid').pop() as string, + user_uuid: userUuid, key_system_identifier: itemHash.hasKeySystemIdentifier() ? (itemHash.getKeySystemIdentifier() as string) : null, @@ -72,7 +137,6 @@ export class SyncingServer implements ISyncingServer { } const apiVersion = call.request.hasApiVersion() ? (call.request.getApiVersion() as string) : ApiVersion.v20161215 - const userUuid = call.metadata.get('x-user-uuid').pop() as string const readOnlyAccess = call.metadata.get('x-read-only-access').pop() === 'true' if (readOnlyAccess) { this.logger.debug('Syncing with read-only access', {