diff --git a/src/filters/erc20_transfer_events.ts b/src/filters/erc20_transfer_events.ts index ed5e3c7a..6e1a0e18 100644 --- a/src/filters/erc20_transfer_events.ts +++ b/src/filters/erc20_transfer_events.ts @@ -2,23 +2,41 @@ import { Web3Source } from '../data_sources/events/web3'; import { Event, Transaction } from '../entities'; import { getParseTxsAsync } from '../scripts/utils/web3_utils'; -export async function filterERC20TransferEventsGetContext(events: Event[], web3Source: Web3Source): Promise { +export async function filterERC20TransferEventsGetContext( + events: Event[], + web3Source: Web3Source, + allowedTxnList?: Set, +): Promise { if (events.length > 0) { const txHashes = events.map((log: Event) => log.transactionHash); - const txData = await getParseTxsAsync(web3Source, txHashes); - const filteredTxsHashes = txData.parsedTxs - .filter((tx: Transaction) => tx.quoteId) - .map((tx: Transaction) => tx.transactionHash); + let validTxHashSet: Set; + if (allowedTxnList) { + validTxHashSet = allowedTxnList; + } else { + const txData = await getParseTxsAsync(web3Source, txHashes); + const filteredTxsHashes = txData.parsedTxs + .filter((tx: Transaction) => tx.quoteId) + .map((tx: Transaction) => tx.transactionHash); + + validTxHashSet = new Set(filteredTxsHashes); + } - const validTxHashSet = new Set(filteredTxsHashes); const filteredLogs = events.filter((log: Event) => validTxHashSet.has(log.transactionHash)); return filteredLogs.filter((e) => e !== null); } return []; } -export function filterERC20TransferEvents(events: Event[], transaction: Transaction): Event[] { - if (transaction.quoteId) { +export function filterERC20TransferEvents( + events: Event[], + transaction: Transaction, + allowedTxnList?: Set, +): Event[] { + if (allowedTxnList) { + if (allowedTxnList.has(transaction.transactionHash)) { + return events.filter((e) => e !== null); + } + } else if (transaction.quoteId) { return events.filter((e) => e !== null); } return []; diff --git a/src/scripts/pull_and_save_block_events.ts b/src/scripts/pull_and_save_block_events.ts index ea804c4d..1660f75c 100644 --- a/src/scripts/pull_and_save_block_events.ts +++ b/src/scripts/pull_and_save_block_events.ts @@ -73,15 +73,17 @@ const provider = web3Factory.getRpcProvider({ }); const web3Source = new Web3Source(provider, EVM_RPC_URL); -function parseBlockTransactionsEvents(fullBlock: FullBlock): ParsedFullBlock { +function parseBlockTransactionsEvents(fullBlock: FullBlock, allowedTxnList?: Set): ParsedFullBlock { const parsedBlock = parseBlock({ ...fullBlock, transactions: [''] }); const usefullTxs: ParsedTransaction[] = fullBlock.transactions .map((transaction: FullTransaction): ParsedTransaction | null => { - const parsedTransactionEvents = parseTransactionEvents(transaction); - - if (parsedTransactionEvents.parsedTransaction !== null) { - return parsedTransactionEvents; + // Only parse transactions if they are in the allowed list or if no list is provided. + if (!allowedTxnList || allowedTxnList.has(transaction.hash)) { + const parsedTransactionEvents = parseTransactionEvents(transaction); + if (parsedTransactionEvents.parsedTransaction !== null) { + return parsedTransactionEvents; + } } return null; }) @@ -270,6 +272,7 @@ async function getParseSaveBlocksTransactionsEvents( producer: Producer | null, newBlocks: EVMBlock[], allowPartialSuccess: boolean, + allowedTxnList?: Set, ): Promise { const blockNumbers = newBlocks.map((newBlock) => newBlock.number!); @@ -280,7 +283,7 @@ async function getParseSaveBlocksTransactionsEvents( const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers); const filteredNewBlocksReceipts = newBlocksReceipts.filter( - (blockReciepts) => blockReciepts !== null && blockReciepts !== undefined, + (blockReceipts) => blockReceipts !== null && blockReceipts !== undefined, ); if (newBlocksReceipts.length !== filteredNewBlocksReceipts.length) { @@ -288,12 +291,12 @@ async function getParseSaveBlocksTransactionsEvents( return false; } const { nullOnlyAtEnd } = newBlocksReceipts.reduce( - (state, blockReciepts) => { - if (state.hasSeenNull && blockReciepts !== null) { + (state, blockReceipts) => { + if (state.hasSeenNull && blockReceipts !== null) { state.nullOnlyAtEnd = false; } - if (newBlocksReceipts === null) { + if (blockReceipts === null) { state.hasSeenNull = true; } return state; @@ -302,9 +305,9 @@ async function getParseSaveBlocksTransactionsEvents( ); if (nullOnlyAtEnd) { - logger.info('Last block(s) reciepts not found, retrying that block(s) on the next run'); + logger.info('Last block(s) receipts not found, retrying those block(s) on the next run'); } else { - logger.error("Missing intermideate block reciepts, can't continue. Retrying next run"); + logger.error("Missing intermediate block receipts, can't continue. Retrying next run"); logger.error(newBlocksReceipts); return false; } @@ -327,7 +330,9 @@ async function getParseSaveBlocksTransactionsEvents( return { ...newBlocks[blockIndex], transactions: transactionsWithLogs }; }); - const parsedFullBlocks = fullBlocks.map(parseBlockTransactionsEvents); + const parsedFullBlocks = fullBlocks.map((fullBlock) => + parseBlockTransactionsEvents(fullBlock, allowedTxnList) + ); const eventTables = eventScrperProps .filter((props) => props.enabled) @@ -371,9 +376,26 @@ export class BlockEventsScraper { const blockNumbers = oldestBlocksToBackfill.map( (backfillBlock: { block_number: number }) => backfillBlock.block_number, ); + const allowedTxnListQuery = await connection.query( + `SELECT DISTINCT txn_hash + FROM ${SCHEMA}.tx_backfill + WHERE block_number IN (${blockNumbers.join(',')}) AND done = false` + ); + + const allowedTxnList = new Set( + allowedTxnListQuery.map((row: { txn_hash: string }) => row.txn_hash) + ); const newBlocks = await web3Source.getBatchBlockInfoAsync(blockNumbers, true); - const success = await getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks, false); + + const success = await getParseSaveBlocksTransactionsEvents( + connection, + producer, + newBlocks, + false, + allowedTxnList + ); + if (success) { const newBlockNumbers = newBlocks.map((block) => block.number); const queryRunner = connection.createQueryRunner();