feat: add graceful shutdown procedures upon SIGTERM (#923)

This commit is contained in:
Karol Sójko 2023-11-10 12:20:21 +01:00 committed by GitHub
parent 4855e1d5f5
commit c24353cc24
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
40 changed files with 147 additions and 141 deletions

View file

@ -22,5 +22,11 @@ void container.load().then((container) => {
const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.DomainEventSubscriber)
process.on('SIGTERM', () => {
logger.info('SIGTERM received. Stopping worker...')
subscriber.stop()
logger.info('Worker stopped.')
})
subscriber.start()
})

View file

@ -6,12 +6,12 @@ COMMAND=$1 && shift 1
case "$COMMAND" in
'start-worker' )
echo "[Docker] Starting Worker..."
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js
;;
'report' )
echo "[Docker] Starting Usage Report Generation..."
node docker/entrypoint-report.js
exec node docker/entrypoint-report.js
;;
* )

View file

@ -108,5 +108,12 @@ void container.load().then((container) => {
serverInstance.keepAliveTimeout = keepAliveTimeout
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server')
serverInstance.close(() => {
logger.info('HTTP server closed')
})
})
logger.info(`Server started on port ${process.env.PORT}`)
})

View file

@ -6,7 +6,7 @@ COMMAND=$1 && shift 1
case "$COMMAND" in
'start-web' )
echo "Starting Web..."
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js
;;
* )

View file

@ -6,4 +6,4 @@ sh supervisor/wait-for.sh localhost $AUTH_SERVER_PORT
sh supervisor/wait-for.sh localhost $FILES_SERVER_PORT
sh supervisor/wait-for.sh localhost $REVISIONS_SERVER_PORT
sh supervisor/wait-for.sh localhost $SYNCING_SERVER_PORT
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js

View file

@ -64,9 +64,14 @@ void container.load().then((container) => {
})
})
const serverInstance = server.build()
const serverInstance = server.build().listen(env.get('PORT'))
serverInstance.listen(env.get('PORT'))
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server')
serverInstance.close(() => {
logger.info('HTTP server closed')
})
})
logger.info(`Server started on port ${process.env.PORT}`)
})

View file

@ -22,5 +22,11 @@ void container.load().then((container) => {
const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.Auth_DomainEventSubscriber)
process.on('SIGTERM', () => {
logger.info('SIGTERM received. Stopping worker...')
subscriber.stop()
logger.info('Worker stopped.')
})
subscriber.start()
})

View file

@ -6,45 +6,45 @@ COMMAND=$1 && shift 1
case "$COMMAND" in
'start-web' )
echo "[Docker] Starting Web..."
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js
;;
'start-worker' )
echo "[Docker] Starting Worker..."
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js
;;
'cleanup' )
echo "[Docker] Starting Cleanup..."
node docker/entrypoint-cleanup.js
exec node docker/entrypoint-cleanup.js
;;
'stats' )
echo "[Docker] Starting Persisting Stats..."
node docker/entrypoint-stats.js
exec node docker/entrypoint-stats.js
;;
'email-daily-backup' )
echo "[Docker] Starting Email Daily Backup..."
node docker/entrypoint-backup.js daily
exec node docker/entrypoint-backup.js daily
;;
'email-weekly-backup' )
echo "[Docker] Starting Email Weekly Backup..."
node docker/entrypoint-backup.js weekly
exec node docker/entrypoint-backup.js weekly
;;
'email-backup' )
echo "[Docker] Starting Email Backup For Single User..."
EMAIL=$1 && shift 1
node docker/entrypoint-user-email-backup.js $EMAIL
exec node docker/entrypoint-user-email-backup.js $EMAIL
;;
'delete-accounts' )
echo "[Docker] Starting Accounts Deleting from CSV..."
FILE_NAME=$1 && shift 1
MODE=$1 && shift 1
node docker/entrypoint-delete-accounts.js $FILE_NAME $MODE
exec node docker/entrypoint-delete-accounts.js $FILE_NAME $MODE
;;
* )

View file

@ -3,4 +3,4 @@
set -euo pipefail
sh supervisor/wait-for.sh localhost $SYNCING_SERVER_PORT
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js

View file

@ -3,4 +3,4 @@
set -euo pipefail
sh supervisor/wait-for.sh localhost $AUTH_SERVER_PORT
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js

View file

@ -1,23 +0,0 @@
import 'reflect-metadata'
import * as IORedis from 'ioredis'
import { RedisDomainEventSubscriber } from './RedisDomainEventSubscriber'
describe('RedisDomainEventSubscriber', () => {
let redisClient: IORedis.Redis
const eventChannel = 'test-channel'
const createSubscriber = () => new RedisDomainEventSubscriber(redisClient, eventChannel)
beforeEach(() => {
redisClient = {} as jest.Mocked<IORedis.Redis>
redisClient.subscribe = jest.fn()
})
it('should start the subscription', () => {
createSubscriber().start()
expect(redisClient.subscribe).toHaveBeenCalledWith('test-channel')
})
})

View file

@ -1,14 +0,0 @@
import * as IORedis from 'ioredis'
import { DomainEventSubscriberInterface } from '@standardnotes/domain-events'
export class RedisDomainEventSubscriber implements DomainEventSubscriberInterface {
constructor(
private redisClient: IORedis.Redis,
private eventChannel: string,
) {}
start(): void {
void this.redisClient.subscribe(this.eventChannel)
}
}

View file

@ -1,31 +0,0 @@
import 'reflect-metadata'
import * as IORedis from 'ioredis'
import { RedisDomainEventSubscriberFactory } from './RedisDomainEventSubscriberFactory'
import { DomainEventMessageHandlerInterface } from '@standardnotes/domain-events'
import { RedisDomainEventSubscriber } from './RedisDomainEventSubscriber'
describe('RedisDomainEventSubscriberFactory', () => {
let redisClient: IORedis.Redis
let domainEventMessageHandler: DomainEventMessageHandlerInterface
const eventChannel = 'events'
const createFactory = () =>
new RedisDomainEventSubscriberFactory(redisClient, domainEventMessageHandler, eventChannel)
beforeEach(() => {
redisClient = {} as jest.Mocked<IORedis.Redis>
redisClient.on = jest.fn()
domainEventMessageHandler = {} as jest.Mocked<DomainEventMessageHandlerInterface>
domainEventMessageHandler.handleMessage = jest.fn()
})
it('should create an event subscriber', () => {
const subscriber = createFactory().create()
expect(subscriber).toBeInstanceOf(RedisDomainEventSubscriber)
expect(redisClient.on).toHaveBeenCalledWith('message', expect.any(Function))
})
})

View file

@ -1,29 +0,0 @@
import * as IORedis from 'ioredis'
import {
DomainEventSubscriberFactoryInterface,
DomainEventSubscriberInterface,
DomainEventMessageHandlerInterface,
} from '@standardnotes/domain-events'
import { RedisDomainEventSubscriber } from './RedisDomainEventSubscriber'
export class RedisDomainEventSubscriberFactory implements DomainEventSubscriberFactoryInterface {
constructor(
private redisClient: IORedis.Redis,
private domainEventMessageHandler: DomainEventMessageHandlerInterface,
private eventChannel: string,
) {}
create(): DomainEventSubscriberInterface {
const subscriber = new RedisDomainEventSubscriber(this.redisClient, this.eventChannel)
this.redisClient.on(
'message',
/* istanbul ignore next */
async (_channel: string, message: string) => await this.domainEventMessageHandler.handleMessage(message),
)
return subscriber
}
}

View file

@ -4,6 +4,8 @@ import { DomainEventSubscriberInterface, DomainEventMessageHandlerInterface } fr
import { Logger } from 'winston'
export class SQSDomainEventSubscriber implements DomainEventSubscriberInterface {
private consumer: Consumer | undefined
constructor(
private sqs: SQSClient,
private queueUrl: string,
@ -23,9 +25,18 @@ export class SQSDomainEventSubscriber implements DomainEventSubscriberInterface
sqsConsumer.on('error', this.handleError.bind(this))
sqsConsumer.on('processing_error', this.handleError.bind(this))
this.consumer = sqsConsumer
sqsConsumer.start()
}
stop(): void {
if (this.consumer && this.consumer.isRunning) {
this.logger.info('Stopping SQS consumer...')
this.consumer.stop()
}
}
async handleMessage(message: Message): Promise<void> {
await this.domainEventMessageHandler.handleMessage(<string>message.Body)
}

View file

@ -5,6 +5,7 @@ import { DomainEventSubscriberInterface, DomainEventMessageHandlerInterface } fr
import { Logger } from 'winston'
export class SQSOpenTelemetryDomainEventSubscriber implements DomainEventSubscriberInterface {
private consumer: Consumer | undefined
private currentSpan: OpenTelemetryApi.Span | undefined
constructor(
@ -28,9 +29,18 @@ export class SQSOpenTelemetryDomainEventSubscriber implements DomainEventSubscri
sqsConsumer.on('error', this.handleError.bind(this))
sqsConsumer.on('processing_error', this.handleError.bind(this))
this.consumer = sqsConsumer
sqsConsumer.start()
}
stop(): void {
if (this.consumer && this.consumer.isRunning) {
this.logger.info('Stopping SQS consumer...')
this.consumer.stop()
}
}
async startParentSpan(): Promise<void> {
const tracer = OpenTelemetryApi.trace.getTracer(`${this.serviceName}-domain-event-subscriber`)

View file

@ -7,8 +7,6 @@ export * from './OpenTelemetry/OpenTelemetryTracer'
export * from './OpenTelemetry/OpenTelemetryTracerInterface'
export * from './Redis/RedisDomainEventPublisher'
export * from './Redis/RedisDomainEventSubscriber'
export * from './Redis/RedisDomainEventSubscriberFactory'
export * from './Redis/RedisEventMessageHandler'
export * from './SNS/SNSDomainEventPublisher'

View file

@ -1,3 +1,4 @@
export interface DomainEventSubscriberInterface {
start(): void
stop(): void
}

View file

@ -6,7 +6,7 @@ COMMAND=$1 && shift 1
case "$COMMAND" in
'start-worker' )
echo "Starting Worker..."
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js
;;
* )

View file

@ -89,9 +89,14 @@ void container.load().then((container) => {
})
})
const serverInstance = server.build()
const serverInstance = server.build().listen(env.get('PORT'))
serverInstance.listen(env.get('PORT'))
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server')
serverInstance.close(() => {
logger.info('HTTP server closed')
})
})
logger.info(`Server started on port ${process.env.PORT}`)
})

View file

@ -22,5 +22,11 @@ void container.load().then((container) => {
const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.Files_DomainEventSubscriber)
process.on('SIGTERM', () => {
logger.info('SIGTERM received. Stopping worker...')
subscriber.stop()
logger.info('Worker stopped.')
})
subscriber.start()
})

View file

@ -6,12 +6,12 @@ COMMAND=$1 && shift 1
case "$COMMAND" in
'start-web' )
echo "Starting Web..."
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js
;;
'start-worker' )
echo "Starting Worker..."
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js
;;
* )

View file

@ -4,4 +4,4 @@ set -euo pipefail
sh supervisor/wait-for.sh $DB_HOST $DB_PORT
sh supervisor/wait-for.sh $REDIS_HOST $REDIS_PORT
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js

View file

@ -3,4 +3,4 @@
set -euo pipefail
sh supervisor/wait-for.sh localhost $SYNCING_SERVER_PORT
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js

View file

@ -174,7 +174,16 @@ export class HomeServer implements HomeServerInterface {
const port = env.get('PORT', true) ? +env.get('PORT', true) : 3000
this.serverInstance = server.build().listen(port)
const serverInstance = server.build().listen(port)
this.serverInstance = serverInstance
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server')
serverInstance.close(() => {
logger.info('HTTP server closed')
})
})
logger.info(`Server started on port ${port}. Log level: ${env.get('LOG_LEVEL', true)}.`)

View file

@ -43,9 +43,14 @@ void container.load().then((container) => {
})
})
const serverInstance = server.build()
const serverInstance = server.build().listen(env.get('PORT'))
serverInstance.listen(env.get('PORT'))
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server')
serverInstance.close(() => {
logger.info('HTTP server closed')
})
})
logger.info(`Server started on port ${process.env.PORT}`)
})

View file

@ -18,5 +18,11 @@ void container.load().then((container) => {
const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.Revisions_DomainEventSubscriber)
process.on('SIGTERM', () => {
logger.info('SIGTERM received. Stopping worker...')
subscriber.stop()
logger.info('Worker stopped.')
})
subscriber.start()
})

View file

@ -6,12 +6,12 @@ COMMAND=$1 && shift 1
case "$COMMAND" in
'start-web' )
echo "Starting Web..."
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js
;;
'start-worker' )
echo "Starting Worker..."
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js
;;
* )

View file

@ -3,4 +3,4 @@
set -euo pipefail
sh supervisor/wait-for.sh localhost $SYNCING_SERVER_PORT
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js

View file

@ -3,4 +3,4 @@
set -euo pipefail
sh supervisor/wait-for.sh localhost $SYNCING_SERVER_PORT
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js

View file

@ -22,5 +22,11 @@ void container.load().then((container) => {
const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.DomainEventSubscriber)
process.on('SIGTERM', () => {
logger.info('SIGTERM received. Stopping worker...')
subscriber.stop()
logger.info('Worker stopped.')
})
subscriber.start()
})

View file

@ -6,12 +6,12 @@ COMMAND=$1 && shift 1
case "$COMMAND" in
'start-worker' )
echo "Starting Worker..."
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js
;;
'verify-jobs' )
echo "Starting jobs verification..."
node docker/entrypoint-verify.js
exec node docker/entrypoint-verify.js
;;
* )

View file

@ -72,9 +72,14 @@ void container.load().then((container) => {
})
})
const serverInstance = server.build()
const serverInstance = server.build().listen(env.get('PORT'))
serverInstance.listen(env.get('PORT'))
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server')
serverInstance.close(() => {
logger.info('HTTP server closed')
})
})
logger.info(`Server started on port ${process.env.PORT}`)
})

View file

@ -18,5 +18,11 @@ void container.load().then((container) => {
const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.Sync_DomainEventSubscriber)
process.on('SIGTERM', () => {
logger.info('SIGTERM received. Stopping worker...')
subscriber.stop()
logger.info('Worker stopped.')
})
subscriber.start()
})

View file

@ -6,12 +6,12 @@ COMMAND=$1 && shift 1
case "$COMMAND" in
'start-web' )
echo "[Docker] Starting Web..."
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js
;;
'start-worker' )
echo "[Docker] Starting Worker..."
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js
;;
* )

View file

@ -4,4 +4,4 @@ set -euo pipefail
sh supervisor/wait-for.sh $DB_HOST $DB_PORT
sh supervisor/wait-for.sh $REDIS_HOST $REDIS_PORT
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js

View file

@ -3,4 +3,4 @@
set -euo pipefail
sh supervisor/wait-for.sh localhost $SYNCING_SERVER_PORT
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js

View file

@ -44,9 +44,14 @@ void container.load().then((container) => {
})
})
const serverInstance = server.build()
const serverInstance = server.build().listen(env.get('PORT'))
serverInstance.listen(env.get('PORT'))
process.on('SIGTERM', () => {
logger.info('SIGTERM signal received: closing HTTP server')
serverInstance.close(() => {
logger.info('HTTP server closed')
})
})
logger.info(`Server started on port ${process.env.PORT}`)
})

View file

@ -18,5 +18,11 @@ void container.load().then((container) => {
const subscriber = container.get<DomainEventSubscriberInterface>(TYPES.DomainEventSubscriber)
process.on('SIGTERM', () => {
logger.info('SIGTERM received. Stopping worker...')
subscriber.stop()
logger.info('Worker stopped.')
})
subscriber.start()
})

View file

@ -6,12 +6,12 @@ COMMAND=$1 && shift 1
case "$COMMAND" in
'start-web' )
echo "Starting Web..."
node docker/entrypoint-server.js
exec node docker/entrypoint-server.js
;;
'start-worker' )
echo "Starting Worker..."
node docker/entrypoint-worker.js
exec node docker/entrypoint-worker.js
;;
* )