Skip to content

Commit

Permalink
Make Kafka optional to Scrapers
Browse files Browse the repository at this point in the history
  • Loading branch information
Ktl-XV committed Dec 4, 2023
1 parent 8a961fd commit 8d1da5f
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 52 deletions.
9 changes: 5 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ services:
SCHEMA: 'events'
# FEAT_EXCLUSIVE_TOKENS_FROM_TRANSACTIONS: "true"
# TOKENS_FROM_TRANSACTIONS_START_BLOCK: 9193266
KAFKA_BROKERS: '${KAFKA_BROKERS}'
KAFKA_SSL: '${KAFKA_SSL}'
KAFKA_AUTH_USER: '${KAFKA_AUTH_USER}'
KAFKA_AUTH_PASSWORD: '${KAFKA_AUTH_PASSWORD}'
#KAFKA_BROKERS: '${KAFKA_BROKERS}'
#KAFKA_SSL: '${KAFKA_SSL}'
#KAFKA_AUTH_USER: '${KAFKA_AUTH_USER}'
#KAFKA_AUTH_PASSWORD: '${KAFKA_AUTH_PASSWORD}'
EP_DEPLOYMENT_BLOCK: 10247094
MAX_BLOCKS_TO_SEARCH: 1000
MAX_BLOCKS_TO_PULL: 1000
MAX_TX_TO_PULL: 1000
BLOCK_FINALITY_THRESHOLD: 0
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
FEAT_UNISWAP_V2_VIP_SWAP_EVENT: "true"
UNISWAP_V2_VIP_SWAP_SOURCES: "UniswapV2,SushiSwap"
UNISWAP_V2_VIP_SWAP_START_BLOCK: 10917104
Expand Down
17 changes: 11 additions & 6 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
DEFAULT_FEAT_LIMIT_ORDERS,
DEFAULT_FEAT_META_TRANSACTION_EXECUTED_EVENT,
DEFAULT_FEAT_NFT,
DEFAULT_FEAT_ONCHAIN_GOVERNANCE,
DEFAULT_FEAT_OTC_ORDERS,
DEFAULT_FEAT_PLP_SWAP_EVENT,
DEFAULT_FEAT_POLYGON_RFQM_PAYMENTS,
Expand All @@ -19,26 +20,28 @@ import {
DEFAULT_FEAT_UNISWAP_V2_PAIR_CREATED_EVENT,
DEFAULT_FEAT_UNISWAP_V2_SYNC_EVENT,
DEFAULT_FEAT_UNISWAP_V2_VIP_SWAP_EVENT,
DEFAULT_FEAT_UNISWAP_V3_VIP_SWAP_EVENT,
DEFAULT_FEAT_UNISWAP_V3_SWAP_EVENT,
DEFAULT_FEAT_UNISWAP_V3_VIP_SWAP_EVENT,
DEFAULT_FEAT_V3_FILL_EVENT,
DEFAULT_FEAT_V3_NATIVE_FILL,
DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_EVENT,
DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_TRANSFER_EVENT,
DEFAULT_LOCAL_POSTGRES_URI,
DEFAULT_MAX_BLOCKS_REORG,
DEFAULT_MAX_BLOCKS_TO_PULL,
DEFAULT_MAX_BLOCKS_TO_SEARCH,
DEFAULT_MAX_BLOCKS_REORG,
DEFAULT_MAX_TIME_TO_SEARCH,
DEFAULT_MAX_TX_TO_PULL,
DEFAULT_METRICS_PATH,
DEFAULT_MINUTES_BETWEEN_RUNS,
DEFAULT_PROMETHEUS_PORT,
DEFAULT_RESCRAPE_BLOCKS,
DEFAULT_STAKING_POOLS_JSON_URL,
DEFAULT_STAKING_POOLS_METADATA_JSON_URL,
DEFAULT_FEAT_ONCHAIN_GOVERNANCE,
DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_EVENT,
DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_TRANSFER_EVENT,
} from './constants';

import { logger } from './utils';

const throwError = (err: string) => {
throw new Error(err);
};
Expand Down Expand Up @@ -407,7 +410,7 @@ validateAddress(

export const KAFKA_BROKERS = process.env.KAFKA_BROKERS ? process.env.KAFKA_BROKERS.split(',') : [];
if (KAFKA_BROKERS.length === 0) {
throwError(`KAFKA_BROKERS is missing`);
logger.warn('KAFKA_BROKERS is empty, disabling kafka');
}
export const KAFKA_AUTH_USER = process.env.KAFKA_AUTH_USER!;
export const KAFKA_AUTH_PASSWORD = process.env.KAFKA_AUTH_PASSWORD!;
Expand All @@ -431,6 +434,8 @@ validateStartBlock(
FEAT_SOCKET_BRIDGE_EVENT,
);

export const RESCRAPE_BLOCKS = getIntConfig('RESCRAPE_BLOCKS', DEFAULT_RESCRAPE_BLOCKS);

function getBoolConfig(env: string, defaultValue: boolean): boolean {
if (Object.prototype.hasOwnProperty.call(process.env, env)) {
return process.env[env] === 'true';
Expand Down
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export const DEFAULT_MAX_BLOCKS_TO_PULL = 120;
export const DEFAULT_MAX_BLOCKS_TO_SEARCH = 120;
export const DEFAULT_MAX_TX_TO_PULL = 1000;
export const DEFAULT_BLOCK_FINALITY_THRESHOLD = 10;
export const DEFAULT_RESCRAPE_BLOCKS = 0;
export const DEFAULT_MINUTES_BETWEEN_RUNS = 3;
export const DEFAULT_STAKING_POOLS_JSON_URL =
'https://raw.githubusercontent.com/0xProject/0x-staking-pool-registry/master/staking_pools.json';
Expand Down
34 changes: 20 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,24 @@ import { startMetricsServer } from './utils/metrics';
import { TokenMetadataSingleton } from './tokenMetadataSingleton';
import { UniV2PoolSingleton } from './uniV2PoolSingleton';

const kafka = new Kafka({
clientId: 'event-pipeline',
brokers: KAFKA_BROKERS,
ssl: KAFKA_SSL,
sasl: KAFKA_SSL
? {
mechanism: 'plain',
username: KAFKA_AUTH_USER,
password: KAFKA_AUTH_PASSWORD,
}
: undefined,
});
let producer: Producer | null = null;

const producer = kafka.producer();
if (KAFKA_BROKERS.length > 0) {
const kafka = new Kafka({
clientId: 'event-pipeline',
brokers: KAFKA_BROKERS,
ssl: KAFKA_SSL,
sasl: KAFKA_SSL
? {
mechanism: 'plain',
username: KAFKA_AUTH_USER,
password: KAFKA_AUTH_PASSWORD,
}
: undefined,
});

producer = kafka.producer();
}

logger.info('App is running...');

Expand All @@ -68,7 +72,9 @@ chainIdChecker.checkChainId(CHAIN_ID);
// run pull and save events
createConnection(ormConfig as ConnectionOptions)
.then(async (connection) => {
await producer.connect();
if (producer) {
await producer.connect();
}
await TokenMetadataSingleton.getInstance(connection, producer);
if (FEAT_UNISWAP_V2_PAIR_CREATED_EVENT) {
await UniV2PoolSingleton.initInstance(connection);
Expand Down
4 changes: 2 additions & 2 deletions src/scripts/utils/event_abi_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { TokenMetadataMap, extractTokensFromLogs, getParseSaveTokensAsync, getPa

import { RawLogEntry } from 'ethereum-types';

import { CHAIN_NAME_LOWER, MAX_BLOCKS_REORG, MAX_BLOCKS_TO_SEARCH, SCHEMA } from '../../config';
import { CHAIN_NAME_LOWER, MAX_BLOCKS_REORG, MAX_BLOCKS_TO_SEARCH, RESCRAPE_BLOCKS, SCHEMA } from '../../config';
import { LastBlockProcessed } from '../../entities';
import { SCAN_END_BLOCK, RPC_LOGS_ERROR, SCAN_RESULTS, SCAN_START_BLOCK, SKIPPED_EVENTS } from '../../utils/metrics';

Expand Down Expand Up @@ -406,7 +406,7 @@ export const getStartBlockAsync = async (
};
}
return {
startBlockNumber: lastKnownBlockNumber + 1,
startBlockNumber: lastKnownBlockNumber - (RESCRAPE_BLOCKS - 1),
hasLatestBlockChanged: true,
reorgLikely: false,
};
Expand Down
2 changes: 1 addition & 1 deletion src/tokenMetadataSingleton.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class TokenMetadataSingleton {
this.tokens = [];
}

static async getInstance(connection: Connection, producer: Producer): Promise<TokenMetadataSingleton> {
static async getInstance(connection: Connection, producer: Producer | null): Promise<TokenMetadataSingleton> {
if (!TokenMetadataSingleton.instance) {
TokenMetadataSingleton.instance = new TokenMetadataSingleton();
const tmp = await connection
Expand Down
56 changes: 31 additions & 25 deletions src/utils/kafka_send.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface CommandMessage {
}

export async function kafkaSendRawAsync(
producer: Producer,
producer: Producer | null,
topic: string,
keyFields: string[],
payload: any[],
Expand All @@ -38,51 +38,57 @@ export async function kafkaSendRawAsync(
let currentSize = 0;
let messages = [];

for (const message of payload) {
const jsonMessage = JSON.stringify(message);
const keyValues = keyFields.map((keyField) => String(message[keyField]));
const key = keyValues.join('-');
const messageLength = jsonMessage.length;
if (producer !== null) {
for (const message of payload) {
const jsonMessage = JSON.stringify(message);
const keyValues = keyFields.map((keyField) => String(message[keyField]));
const key = keyValues.join('-');
const messageLength = jsonMessage.length;

if (currentSize + messageLength >= MAX_SIZE) {
await producer.send({
topic,
messages,
});
currentSize = 0;
messages = [];
if (currentSize + messageLength >= MAX_SIZE) {
await producer.send({
topic,
messages,
});
currentSize = 0;
messages = [];
}
currentSize += messageLength;
messages.push({ key, value: jsonMessage });
}
currentSize += messageLength;
messages.push({ key, value: jsonMessage });
}
await producer.send({
topic,
messages,
});
await producer.send({
topic,
messages,
});

logger.info(`Emitted ${payload.length} messages to ${topic}`);
logger.info(`Emitted ${payload.length} messages to ${topic}`);
}
}

export async function kafkaSendAsync(
producer: Producer,
producer: Producer | null,
topic: string,
keyFields: string[],
payload: any[],
): Promise<void> {
const dataPayload = payload.map((message) => {
return { type: 'data', message };
});
await kafkaSendRawAsync(producer, topic, keyFields, dataPayload);
if (producer != null) {
await kafkaSendRawAsync(producer, topic, keyFields, dataPayload);
}
}

export async function kafkaSendCommandAsync(
producer: Producer,
producer: Producer | null,
topic: string,
keyFields: string[],
payload: CommandMessage[],
): Promise<void> {
const commandPayload = payload.map((message) => {
return { type: 'command', message };
});
await kafkaSendRawAsync(producer, topic, keyFields, commandPayload);
if (producer != null) {
await kafkaSendRawAsync(producer, topic, keyFields, commandPayload);
}
}

0 comments on commit 8d1da5f

Please sign in to comment.