Skip to content

Commit

Permalink
test fix
Browse files Browse the repository at this point in the history
  • Loading branch information
0xyijing committed Oct 9, 2024
1 parent 58bceb0 commit 5fa45e9
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 21 deletions.
34 changes: 26 additions & 8 deletions src/filters/erc20_transfer_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,41 @@ import { Web3Source } from '../data_sources/events/web3';
import { Event, Transaction } from '../entities';
import { getParseTxsAsync } from '../scripts/utils/web3_utils';

export async function filterERC20TransferEventsGetContext(events: Event[], web3Source: Web3Source): Promise<Event[]> {
export async function filterERC20TransferEventsGetContext(
events: Event[],
web3Source: Web3Source,
allowedTxnList?: Set<string>,
): Promise<Event[]> {
if (events.length > 0) {
const txHashes = events.map((log: Event) => log.transactionHash);
const txData = await getParseTxsAsync(web3Source, txHashes);
const filteredTxsHashes = txData.parsedTxs
.filter((tx: Transaction) => tx.quoteId)
.map((tx: Transaction) => tx.transactionHash);
let validTxHashSet: Set<string>;
if (allowedTxnList) {
validTxHashSet = allowedTxnList;
} else {
const txData = await getParseTxsAsync(web3Source, txHashes);
const filteredTxsHashes = txData.parsedTxs
.filter((tx: Transaction) => tx.quoteId)
.map((tx: Transaction) => tx.transactionHash);

validTxHashSet = new Set(filteredTxsHashes);
}

const validTxHashSet = new Set(filteredTxsHashes);
const filteredLogs = events.filter((log: Event) => validTxHashSet.has(log.transactionHash));
return filteredLogs.filter((e) => e !== null);
}
return [];
}

export function filterERC20TransferEvents(events: Event[], transaction: Transaction): Event[] {
if (transaction.quoteId) {
export function filterERC20TransferEvents(
events: Event[],
transaction: Transaction,
allowedTxnList?: Set<string>,
): Event[] {
if (allowedTxnList) {
if (allowedTxnList.has(transaction.transactionHash)) {
return events.filter((e) => e !== null);
}
} else if (transaction.quoteId) {
return events.filter((e) => e !== null);
}
return [];
Expand Down
48 changes: 35 additions & 13 deletions src/scripts/pull_and_save_block_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,17 @@ const provider = web3Factory.getRpcProvider({
});
const web3Source = new Web3Source(provider, EVM_RPC_URL);

function parseBlockTransactionsEvents(fullBlock: FullBlock): ParsedFullBlock {
function parseBlockTransactionsEvents(fullBlock: FullBlock, allowedTxnList?: Set<string>): ParsedFullBlock {
const parsedBlock = parseBlock({ ...fullBlock, transactions: [''] });

const usefullTxs: ParsedTransaction[] = fullBlock.transactions
.map((transaction: FullTransaction): ParsedTransaction | null => {
const parsedTransactionEvents = parseTransactionEvents(transaction);

if (parsedTransactionEvents.parsedTransaction !== null) {
return parsedTransactionEvents;
// Only parse transactions if they are in the allowed list or if no list is provided.
if (!allowedTxnList || allowedTxnList.has(transaction.hash)) {
const parsedTransactionEvents = parseTransactionEvents(transaction);
if (parsedTransactionEvents.parsedTransaction !== null) {
return parsedTransactionEvents;
}
}
return null;
})
Expand Down Expand Up @@ -270,6 +272,7 @@ async function getParseSaveBlocksTransactionsEvents(
producer: Producer | null,
newBlocks: EVMBlock[],
allowPartialSuccess: boolean,
allowedTxnList?: Set<string>,
): Promise<boolean> {
const blockNumbers = newBlocks.map((newBlock) => newBlock.number!);

Expand All @@ -280,20 +283,20 @@ async function getParseSaveBlocksTransactionsEvents(
const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers);

const filteredNewBlocksReceipts = newBlocksReceipts.filter(
(blockReciepts) => blockReciepts !== null && blockReciepts !== undefined,
(blockReceipts) => blockReceipts !== null && blockReceipts !== undefined,
);

if (newBlocksReceipts.length !== filteredNewBlocksReceipts.length) {
if (!allowPartialSuccess) {
return false;
}
const { nullOnlyAtEnd } = newBlocksReceipts.reduce(
(state, blockReciepts) => {
if (state.hasSeenNull && blockReciepts !== null) {
(state, blockReceipts) => {
if (state.hasSeenNull && blockReceipts !== null) {
state.nullOnlyAtEnd = false;
}

if (newBlocksReceipts === null) {
if (blockReceipts === null) {
state.hasSeenNull = true;
}
return state;
Expand All @@ -302,9 +305,9 @@ async function getParseSaveBlocksTransactionsEvents(
);

if (nullOnlyAtEnd) {
logger.info('Last block(s) reciepts not found, retrying that block(s) on the next run');
logger.info('Last block(s) receipts not found, retrying those block(s) on the next run');
} else {
logger.error("Missing intermideate block reciepts, can't continue. Retrying next run");
logger.error("Missing intermediate block receipts, can't continue. Retrying next run");
logger.error(newBlocksReceipts);
return false;
}
Expand All @@ -327,7 +330,9 @@ async function getParseSaveBlocksTransactionsEvents(
return { ...newBlocks[blockIndex], transactions: transactionsWithLogs };
});

const parsedFullBlocks = fullBlocks.map(parseBlockTransactionsEvents);
const parsedFullBlocks = fullBlocks.map((fullBlock) =>
parseBlockTransactionsEvents(fullBlock, allowedTxnList)
);

const eventTables = eventScrperProps
.filter((props) => props.enabled)
Expand Down Expand Up @@ -371,9 +376,26 @@ export class BlockEventsScraper {
const blockNumbers = oldestBlocksToBackfill.map(
(backfillBlock: { block_number: number }) => backfillBlock.block_number,
);
const allowedTxnListQuery = await connection.query(
`SELECT DISTINCT txn_hash
FROM ${SCHEMA}.tx_backfill
WHERE block_number IN (${blockNumbers.join(',')}) AND done = false`
);

const allowedTxnList = new Set(
allowedTxnListQuery.map((row: { txn_hash: string }) => row.txn_hash)
);

const newBlocks = await web3Source.getBatchBlockInfoAsync(blockNumbers, true);
const success = await getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks, false);

const success = await getParseSaveBlocksTransactionsEvents(
connection,
producer,
newBlocks,
false,
allowedTxnList
);

if (success) {
const newBlockNumbers = newBlocks.map((block) => block.number);
const queryRunner = connection.createQueryRunner();
Expand Down

0 comments on commit 5fa45e9

Please sign in to comment.