From 80f72bd306df4cc1dff9f669c97dbb4f759dc4e0 Mon Sep 17 00:00:00 2001 From: roy-dydx <133032749+roy-dydx@users.noreply.github.com> Date: Thu, 22 Aug 2024 12:29:48 -0400 Subject: [PATCH] Add instance id to socks metrics (#2137) (cherry picked from commit 76bd1c81affcd9d4bb95f9646fd629d31f199ed5) --- indexer/Dockerfile.bazooka.remote | 1 + .../base/__tests__/instance-id.test.ts | 87 +++++++++++++++++++ indexer/packages/base/package.json | 6 +- indexer/packages/base/src/config.ts | 3 +- indexer/packages/base/src/index.ts | 1 + indexer/packages/base/src/instance-id.ts | 51 +++++++++++ indexer/packages/base/tsconfig.json | 3 +- indexer/pnpm-lock.yaml | 7 +- indexer/services/socks/src/helpers/wss.ts | 13 ++- indexer/services/socks/src/index.ts | 16 +++- .../socks/src/lib/message-forwarder.ts | 43 ++++++++- .../services/socks/src/lib/subscription.ts | 8 ++ indexer/services/socks/src/websocket/index.ts | 25 +++++- 13 files changed, 248 insertions(+), 16 deletions(-) create mode 100644 indexer/packages/base/__tests__/instance-id.test.ts create mode 100644 indexer/packages/base/src/instance-id.ts diff --git a/indexer/Dockerfile.bazooka.remote b/indexer/Dockerfile.bazooka.remote index 0d7a711e06..c392b957d9 100644 --- a/indexer/Dockerfile.bazooka.remote +++ b/indexer/Dockerfile.bazooka.remote @@ -11,6 +11,7 @@ COPY ./patches ./patches # Copy bazooka and imported packages COPY ./packages/base/ ./packages/base/ +COPY ./packages/dev/ ./packages/dev/ COPY ./packages/postgres/ ./packages/postgres/ COPY ./packages/v4-protos/ ./packages/v4-protos/ COPY ./packages/kafka/ ./packages/kafka/ diff --git a/indexer/packages/base/__tests__/instance-id.test.ts b/indexer/packages/base/__tests__/instance-id.test.ts new file mode 100644 index 0000000000..58eef877fd --- /dev/null +++ b/indexer/packages/base/__tests__/instance-id.test.ts @@ -0,0 +1,87 @@ +import { setInstanceId, getInstanceId, resetForTests } from '../src/instance-id'; +import { axiosRequest } from '../src/axios'; +import { asMock } from '@dydxprotocol-indexer/dev'; +import logger from '../src/logger'; +import config from '../src/config'; + +jest.mock('../src/axios', () => ({ + ...(jest.requireActual('../src/axios') as object), + axiosRequest: jest.fn(), +})); + +describe('instance-id', () => { + describe('setInstanceId', () => { + const defaultTaskArn = 'defaultTaskArn'; + const defaultResponse = { + TaskARN: defaultTaskArn, + }; + const ecsUrl = config.ECS_CONTAINER_METADATA_URI_V4; + + beforeEach(() => { + config.ECS_CONTAINER_METADATA_URI_V4 = ecsUrl; + resetForTests(); + jest.resetAllMocks(); + jest.restoreAllMocks(); + asMock(axiosRequest).mockResolvedValue(defaultResponse); + }); + + afterAll(() => { + jest.clearAllMocks(); + jest.restoreAllMocks(); + }); + + it('should set instance id to task ARN in staging', async () => { + jest.spyOn(config, 'isStaging').mockReturnValueOnce(true); + config.ECS_CONTAINER_METADATA_URI_V4 = 'url'; + await setInstanceId(); + + expect(getInstanceId()).toEqual(defaultTaskArn); + }); + + it('should set instance id to task ARN in production', async () => { + jest.spyOn(config, 'isProduction').mockReturnValueOnce(true); + config.ECS_CONTAINER_METADATA_URI_V4 = 'url'; + await setInstanceId(); + + expect(getInstanceId()).toEqual(defaultTaskArn); + }); + + it('should not call metadata endpoint if not production or staging', async () => { + config.ECS_CONTAINER_METADATA_URI_V4 = 'url'; + await setInstanceId(); + + expect(getInstanceId()).not.toEqual(defaultTaskArn); + expect(asMock(axiosRequest)).not.toHaveBeenCalled(); + }); + + it('should not set instance id if already set', async () => { + jest.spyOn(config, 'isStaging').mockReturnValue(true); + config.ECS_CONTAINER_METADATA_URI_V4 = 'url'; + await setInstanceId(); + const instanceId = getInstanceId(); + await setInstanceId(); + + expect(getInstanceId()).toEqual(instanceId); + expect(axiosRequest).toHaveBeenCalledTimes(1); + }); + + it('should log error and set instance id to uuid if request errors', async () => { + jest.spyOn(config, 'isStaging').mockReturnValue(true); + config.ECS_CONTAINER_METADATA_URI_V4 = 'url'; + const loggerErrorSpy = jest.spyOn(logger, 'error'); + const emptyInstanceId = getInstanceId(); + asMock(axiosRequest).mockRejectedValueOnce(new Error()); + await setInstanceId(); + + expect(loggerErrorSpy).toHaveBeenCalledTimes(1); + expect(getInstanceId()).not.toEqual(emptyInstanceId); + }); + + it('should not call metadata endpoint if url is empty', async () => { + jest.spyOn(config, 'isStaging').mockReturnValue(true); + await setInstanceId(); + + expect(axiosRequest).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/indexer/packages/base/package.json b/indexer/packages/base/package.json index 316f38de61..b4c9e344eb 100644 --- a/indexer/packages/base/package.json +++ b/indexer/packages/base/package.json @@ -2,7 +2,7 @@ "name": "@dydxprotocol-indexer/base", "version": "0.0.1", "description": "", - "main": "build/index.js", + "main": "build/src/index.js", "devDependencies": { "@dydxprotocol-indexer/dev": "workspace:^0.0.1", "@types/big.js": "^6.1.5", @@ -10,6 +10,7 @@ "@types/lodash": "^4.14.182", "@types/traverse": "^0.6.32", "express": "^4.18.1", + "jest": "^28.1.2", "typescript": "^4.7.4" }, "scripts": { @@ -18,7 +19,7 @@ "build": "rm -rf build/ && tsc", "build:prod": "pnpm run build", "build:watch": "pnpm run build -- --watch", - "test": "echo \"Error: no test specified\"" + "test": "NODE_ENV=test jest --runInBand --forceExit" }, "repository": { "type": "git", @@ -38,6 +39,7 @@ "hot-shots": "^9.1.0", "lodash": "^4.17.21", "traverse": "^0.6.6", + "uuid": "^8.3.2", "winston": "^3.8.1", "winston-transport": "^4.5.0", "@bugsnag/core": "^7.18.0", diff --git a/indexer/packages/base/src/config.ts b/indexer/packages/base/src/config.ts index 65d7b2663c..1d88cc3284 100644 --- a/indexer/packages/base/src/config.ts +++ b/indexer/packages/base/src/config.ts @@ -30,7 +30,7 @@ export const baseConfigSchema = { SEND_BUGSNAG_ERRORS: parseBoolean({ default: true, }), - SERVICE_NAME: parseString(), + SERVICE_NAME: parseString({ default: '' }), // Optional environment variables. NODE_ENV: parseString({ default: null }), @@ -38,6 +38,7 @@ export const baseConfigSchema = { STATSD_HOST: parseString({ default: 'localhost' }), STATSD_PORT: parseInteger({ default: 8125 }), LOG_LEVEL: parseString({ default: 'debug' }), + ECS_CONTAINER_METADATA_URI_V4: parseString({ default: '' }), }; export default parseSchema(baseConfigSchema); diff --git a/indexer/packages/base/src/index.ts b/indexer/packages/base/src/index.ts index 2bd3784a9c..5ec5dae958 100644 --- a/indexer/packages/base/src/index.ts +++ b/indexer/packages/base/src/index.ts @@ -13,6 +13,7 @@ export * from './constants'; export * from './bugsnag'; export * from './stats-util'; export * from './date-helpers'; +export * from './instance-id'; // Do this outside logger.ts to avoid a dependency cycle with logger transports that may trigger // additional logging. diff --git a/indexer/packages/base/src/instance-id.ts b/indexer/packages/base/src/instance-id.ts new file mode 100644 index 0000000000..5db6dfdba9 --- /dev/null +++ b/indexer/packages/base/src/instance-id.ts @@ -0,0 +1,51 @@ +import { v4 as uuidv4 } from 'uuid'; + +import { axiosRequest } from './axios'; +import config from './config'; +import logger from './logger'; + +let INSTANCE_ID: string = ''; + +export function getInstanceId(): string { + return INSTANCE_ID; +} + +export async function setInstanceId(): Promise { + if (INSTANCE_ID !== '') { + return; + } + if (config.ECS_CONTAINER_METADATA_URI_V4 !== '' && + ( + config.isProduction() || config.isStaging() + ) + ) { + // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html + const taskUrl = `${config.ECS_CONTAINER_METADATA_URI_V4}/task`; + try { + const response = await axiosRequest({ + method: 'GET', + url: taskUrl, + }) as { TaskARN: string }; + INSTANCE_ID = response.TaskARN; + } catch (error) { + logger.error({ + at: 'instance-id#setInstanceId', + message: 'Failed to retrieve task arn from metadata endpoint. Falling back to uuid.', + error, + taskUrl, + }); + INSTANCE_ID = uuidv4(); + } + } else { + INSTANCE_ID = uuidv4(); + + } +} + +// Exported for tests +export function resetForTests(): void { + if (!config.isTest()) { + throw new Error(`resetForTests() cannot be called for env: ${config.NODE_ENV}`); + } + INSTANCE_ID = ''; +} diff --git a/indexer/packages/base/tsconfig.json b/indexer/packages/base/tsconfig.json index 8914796c0b..ece83b94c0 100644 --- a/indexer/packages/base/tsconfig.json +++ b/indexer/packages/base/tsconfig.json @@ -4,6 +4,7 @@ "outDir": "build" }, "include": [ - "src" + "src", + "__tests__" ] } \ No newline at end of file diff --git a/indexer/pnpm-lock.yaml b/indexer/pnpm-lock.yaml index 31eb93dbe1..d4bf518059 100644 --- a/indexer/pnpm-lock.yaml +++ b/indexer/pnpm-lock.yaml @@ -28,9 +28,11 @@ importers: dotenv-flow: ^3.2.0 express: ^4.18.1 hot-shots: ^9.1.0 + jest: ^28.1.2 lodash: ^4.17.21 traverse: ^0.6.6 typescript: ^4.7.4 + uuid: ^8.3.2 winston: ^3.8.1 winston-transport: ^4.5.0 dependencies: @@ -45,6 +47,7 @@ importers: hot-shots: 9.1.0 lodash: 4.17.21 traverse: 0.6.6 + uuid: 8.3.2 winston: 3.8.1 winston-transport: 4.5.0 devDependencies: @@ -54,6 +57,7 @@ importers: '@types/lodash': 4.14.182 '@types/traverse': 0.6.32 express: 4.18.1 + jest: 28.1.2 typescript: 4.7.4 packages/compliance: @@ -10001,7 +10005,7 @@ packages: pretty-format: 28.1.1 slash: 3.0.0 strip-json-comments: 3.1.1 - ts-node: 10.8.2_2ee97d30e4a239eb38d57e3751ee8d16 + ts-node: 10.8.2_2dd5d46eecda2aef953638919121af58 transitivePeerDependencies: - supports-color @@ -13144,6 +13148,7 @@ packages: typescript: 4.9.5 v8-compile-cache-lib: 3.0.1 yn: 3.1.1 + dev: true /ts-node/10.8.2_4ea55324100c26d4019c6e6bcc89fac6: resolution: {integrity: sha512-LYdGnoGddf1D6v8REPtIH+5iq/gTDuZqv2/UJUU7tKjuEU8xVZorBM+buCGNjj+pGEud+sOoM4CX3/YzINpENA==} diff --git a/indexer/services/socks/src/helpers/wss.ts b/indexer/services/socks/src/helpers/wss.ts index c835129598..8e6f21e253 100644 --- a/indexer/services/socks/src/helpers/wss.ts +++ b/indexer/services/socks/src/helpers/wss.ts @@ -1,4 +1,4 @@ -import { stats, logger } from '@dydxprotocol-indexer/base'; +import { stats, getInstanceId, logger } from '@dydxprotocol-indexer/base'; import WebSocket from 'ws'; import config from '../config'; @@ -105,6 +105,7 @@ export function sendMessageString( 1, config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, { + instance: getInstanceId(), reason: WEBSOCKET_NOT_OPEN, readyState: ws.readyState.toString(), }, @@ -118,7 +119,10 @@ export function sendMessageString( `${config.SERVICE_NAME}.ws_send.error`, 1, config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, - { code: (error as WssError)?.code }, + { + instance: getInstanceId(), + code: (error as WssError)?.code, + }, ); const errorLog = { // type is InfoObject in node-service-base at: 'wss#sendMessage', @@ -148,7 +152,10 @@ export function sendMessageString( stats.increment( `${config.SERVICE_NAME}.ws_send.stream_destroyed_errors`, 1, - { action: 'close' }, + { + action: 'close', + instance: getInstanceId(), + }, ); } else { logger.error(closeErrorLog); diff --git a/indexer/services/socks/src/index.ts b/indexer/services/socks/src/index.ts index b05a1e83b0..809c0b2a0d 100644 --- a/indexer/services/socks/src/index.ts +++ b/indexer/services/socks/src/index.ts @@ -1,4 +1,6 @@ -import { logger, startBugsnag, wrapBackgroundTask } from '@dydxprotocol-indexer/base'; +import { + logger, setInstanceId, getInstanceId, startBugsnag, wrapBackgroundTask, +} from '@dydxprotocol-indexer/base'; import { startConsumer } from '@dydxprotocol-indexer/kafka'; import { blockHeightRefresher, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres'; @@ -39,6 +41,18 @@ async function start(): Promise { startBugsnag(); + logger.info({ + at: 'index#start', + message: 'Getting instance id...', + }); + + await setInstanceId(); + + logger.info({ + at: 'index#start', + message: `Got instance id ${getInstanceId()}.`, + }); + // Initialize PerpetualMarkets and BlockHeight cache await Promise.all([ blockHeightRefresher.updateBlockHeight(), diff --git a/indexer/services/socks/src/lib/message-forwarder.ts b/indexer/services/socks/src/lib/message-forwarder.ts index 9e277f39b5..c31325934c 100644 --- a/indexer/services/socks/src/lib/message-forwarder.ts +++ b/indexer/services/socks/src/lib/message-forwarder.ts @@ -1,5 +1,6 @@ import { stats, + getInstanceId, logger, InfoObject, } from '@dydxprotocol-indexer/base'; @@ -98,7 +99,11 @@ export class MessageForwarder { const batch: Batch = payload.batch; const topic: string = batch.topic; const partition: string = batch.partition.toString(); - const metricTags: Record = { topic, partition }; + const metricTags: Record = { + topic, + partition, + instance: getInstanceId(), + }; if (batch.isEmpty()) { logger.error({ at: 'on-batch#onBatch', @@ -195,6 +200,7 @@ export class MessageForwarder { start - Number(message.timestamp), config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, { + instance: getInstanceId(), topic, }, ); @@ -227,6 +233,7 @@ export class MessageForwarder { end - startForwardMessage, config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, { + instance: getInstanceId(), topic, channel: String(messageToForward.channel), }, @@ -239,6 +246,7 @@ export class MessageForwarder { startForwardMessage - Number(originalMessageTimestamp), config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, { + instance: getInstanceId(), topic, event_type: String(message.headers?.event_type), }, @@ -252,6 +260,9 @@ export class MessageForwarder { `${config.SERVICE_NAME}.message_to_forward`, 1, config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, + { + instance: getInstanceId(), + }, ); if (!this.subscriptions.subscriptions[message.channel] && @@ -323,6 +334,9 @@ export class MessageForwarder { `${config.SERVICE_NAME}.forward_to_client_success`, numClientsForwarded, config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, + { + instance: getInstanceId(), + }, ); forwardedToSubscribers = true; } @@ -333,6 +347,9 @@ export class MessageForwarder { `${config.SERVICE_NAME}.forward_message_with_subscribers`, 1, config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, + { + instance: getInstanceId(), + }, ); } } @@ -425,13 +442,25 @@ export class MessageForwarder { message: 'Attempted to forward batched messages, but connection did not exist', connectionId, }); - stats.increment(`${config.SERVICE_NAME}.forward_to_client_batch_error`, 1); + stats.increment( + `${config.SERVICE_NAME}.forward_to_client_batch_error`, + 1, + { + instance: getInstanceId(), + }, + ); this.subscriptions.unsubscribe(connectionId, channel, id); return; } this.index.connections[connectionId].messageId += 1; - stats.increment(`${config.SERVICE_NAME}.forward_to_client_batch_success`, 1); + stats.increment( + `${config.SERVICE_NAME}.forward_to_client_batch_success`, + 1, + { + instance: getInstanceId(), + }, + ); sendMessage( connection.ws, connectionId, @@ -475,7 +504,13 @@ export class MessageForwarder { message: 'Attempted to forward message, but connection did not exist', connectionId, }); - stats.increment(`${config.SERVICE_NAME}.forward_to_client_error`, 1); + stats.increment( + `${config.SERVICE_NAME}.forward_to_client_error`, + 1, + { + instance: getInstanceId(), + }, + ); this.subscriptions.unsubscribe(connectionId, message.channel, message.id); return 0; } diff --git a/indexer/services/socks/src/lib/subscription.ts b/indexer/services/socks/src/lib/subscription.ts index c684367f59..48e3f7f8ec 100644 --- a/indexer/services/socks/src/lib/subscription.ts +++ b/indexer/services/socks/src/lib/subscription.ts @@ -1,5 +1,6 @@ import { AxiosSafeServerError, + getInstanceId, logger, stats, } from '@dydxprotocol-indexer/base'; @@ -179,6 +180,7 @@ export class Subscriptions { 1, undefined, { + instance: getInstanceId(), channel, }, ); @@ -189,6 +191,7 @@ export class Subscriptions { Date.now() - startGetInitialResponse, undefined, { + instance: getInstanceId(), channel, }, ); @@ -280,12 +283,16 @@ export class Subscriptions { `${config.SERVICE_NAME}.subscriptions.channel_size`, this.subscribedIdsPerChannel[channel].size, { + instance: getInstanceId(), channel, }, ); stats.timing( `${config.SERVICE_NAME}.subscribe_send_message`, Date.now() - startSend, + { + instance: getInstanceId(), + }, ); } @@ -341,6 +348,7 @@ export class Subscriptions { `${config.SERVICE_NAME}.subscriptions.channel_size`, this.subscribedIdsPerChannel[channel].size, { + instance: getInstanceId(), channel, }, ); diff --git a/indexer/services/socks/src/websocket/index.ts b/indexer/services/socks/src/websocket/index.ts index ad2d7edf83..795f6a7527 100644 --- a/indexer/services/socks/src/websocket/index.ts +++ b/indexer/services/socks/src/websocket/index.ts @@ -1,5 +1,5 @@ import { - InfoObject, logger, safeJsonStringify, stats, + InfoObject, getInstanceId, logger, safeJsonStringify, stats, } from '@dydxprotocol-indexer/base'; import { v4 as uuidv4 } from 'uuid'; import WebSocket from 'ws'; @@ -108,10 +108,19 @@ export class Index { headers: req.headers, numConcurrentConnections, }); - stats.increment(`${config.SERVICE_NAME}.num_connections`, 1); + stats.increment( + `${config.SERVICE_NAME}.num_connections`, + 1, + { + instance: getInstanceId(), + }, + ); stats.gauge( `${config.SERVICE_NAME}.num_concurrent_connections`, numConcurrentConnections, + { + instance: getInstanceId(), + }, ); try { @@ -179,7 +188,11 @@ export class Index { stats.increment( `${config.SERVICE_NAME}.num_disconnects`, 1, - { code: String(code), reason: String(reason) }, + { + code: String(code), + reason: String(reason), + instance: getInstanceId(), + }, ); this.disconnect(connectionId); @@ -216,6 +229,9 @@ export class Index { `${config.SERVICE_NAME}.on_message`, 1, config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, + { + instance: getInstanceId(), + }, ); if (!this.connections[connectionId]) { logger.info({ @@ -324,6 +340,9 @@ export class Index { `${config.SERVICE_NAME}.message_received_${parsed.type}`, 1, config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, + { + instance: getInstanceId(), + }, ); }