Skip to content

Commit

Permalink
Adds requiredTxnList for tx_backfill. (#160)
Browse files Browse the repository at this point in the history
* test fix

* local dev

* fix

* make prettier

* fix

* Restores docker-compose-dev.yaml

* Changes wording to requiredTxnList, as it forces these txns and allows any others.

* Changes wording to requiredTxnList in remaining file.

* Prettier.

---------

Co-authored-by: Andrés Elizondo <[email protected]>
  • Loading branch information
0xyijing and Andrés Elizondo authored Nov 8, 2024
1 parent 277747c commit c541a45
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 24 deletions.
8 changes: 6 additions & 2 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,12 @@ export type EventScraperProps = {
deleteOptions?: DeleteOptions;
tokenMetadataMap?: TokenMetadataMap;
postProcess?: any;
filterFunction?: (events: Event[], transaction: Transaction) => Event[];
filterFunctionGetContext?: (events: Event[], web3Source: Web3Source) => Promise<Event[]>;
filterFunction?: (events: Event[], transaction: Transaction, requiredTxnList?: Set<string>) => Event[];
filterFunctionGetContext?: (
events: Event[],
web3Source: Web3Source,
requiredTxnList?: Set<string>,
) => Promise<Event[]>;
};

export const eventScrperProps: EventScraperProps[] = [
Expand Down
39 changes: 30 additions & 9 deletions src/filters/erc20_transfer_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,45 @@ import { Web3Source } from '../data_sources/events/web3';
import { Event, Transaction } from '../entities';
import { getParseTxsAsync } from '../scripts/utils/web3_utils';

export async function filterERC20TransferEventsGetContext(events: Event[], web3Source: Web3Source): Promise<Event[]> {
export async function filterERC20TransferEventsGetContext(
events: Event[],
web3Source: Web3Source,
requiredTxnList?: Set<string>,
): Promise<Event[]> {
if (events.length > 0) {
const txHashes = events.map((log: Event) => log.transactionHash);
const txData = await getParseTxsAsync(web3Source, txHashes);
const filteredTxsHashes = txData.parsedTxs
.filter((tx: Transaction) => tx.quoteId)
.map((tx: Transaction) => tx.transactionHash);
let validTxHashSet: Set<string>;
if (requiredTxnList && requiredTxnList.size > 0) {
validTxHashSet = requiredTxnList;
} else {
const txData = await getParseTxsAsync(web3Source, txHashes);
const filteredTxsHashes = txData.parsedTxs
.filter((tx: Transaction) => tx.quoteId)
.map((tx: Transaction) => tx.transactionHash);

validTxHashSet = new Set(filteredTxsHashes);
}

const validTxHashSet = new Set(filteredTxsHashes);
const filteredLogs = events.filter((log: Event) => validTxHashSet.has(log.transactionHash));
return filteredLogs.filter((e) => e !== null);
}
return [];
}

export function filterERC20TransferEvents(events: Event[], transaction: Transaction): Event[] {
export function filterERC20TransferEvents(
events: Event[],
transaction: Transaction,
requiredTxnList?: Set<string>,
): Event[] {
const filteredEvents = new Set<Event>();

if (requiredTxnList && requiredTxnList.size > 0 && requiredTxnList.has(transaction.transactionHash)) {
events.filter((e) => e !== null).forEach((e) => filteredEvents.add(e));
}

if (transaction.quoteId) {
return events.filter((e) => e !== null);
events.filter((e) => e !== null).forEach((e) => filteredEvents.add(e));
}
return [];

return Array.from(filteredEvents);
}
45 changes: 32 additions & 13 deletions src/scripts/pull_and_save_block_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ const provider = web3Factory.getRpcProvider({
});
const web3Source = new Web3Source(provider, EVM_RPC_URL);

function parseBlockTransactionsEvents(fullBlock: FullBlock): ParsedFullBlock {
function parseBlockTransactionsEvents(fullBlock: FullBlock, requiredTxnList?: Set<string>): ParsedFullBlock {
const parsedBlock = parseBlock({ ...fullBlock, transactions: [''] });

const usefullTxs: ParsedTransaction[] = fullBlock.transactions
.map((transaction: FullTransaction): ParsedTransaction | null => {
const parsedTransactionEvents = parseTransactionEvents(transaction);

const parsedTransactionEvents = parseTransactionEvents(transaction, requiredTxnList);
if (parsedTransactionEvents.parsedTransaction !== null) {
return parsedTransactionEvents;
}
Expand All @@ -102,7 +101,7 @@ function parseBlockTransactionsEvents(fullBlock: FullBlock): ParsedFullBlock {
};
}

function parseTransactionEvents(transaction: FullTransaction): ParsedTransaction {
function parseTransactionEvents(transaction: FullTransaction, requiredTxnList?: Set<string>): ParsedTransaction {
if (transaction.input === '0x') {
return {
parsedTransaction: null,
Expand All @@ -122,7 +121,7 @@ function parseTransactionEvents(transaction: FullTransaction): ParsedTransaction
const parsedLogs = baseFilteredLogs.map((log: LogEntry) => props.parser(log));

const filteredLogs = props.filterFunction
? props.filterFunction(parsedLogs, parsedTransaction)
? props.filterFunction(parsedLogs, parsedTransaction, requiredTxnList)
: parsedLogs;

const postProcessedLogs = props.postProcess ? props.postProcess(filteredLogs) : filteredLogs;
Expand Down Expand Up @@ -270,6 +269,7 @@ async function getParseSaveBlocksTransactionsEvents(
producer: Producer | null,
newBlocks: EVMBlock[],
allowPartialSuccess: boolean,
requiredTxnList?: Set<string>,
): Promise<boolean> {
const blockNumbers = newBlocks.map((newBlock) => newBlock.number!);

Expand All @@ -280,20 +280,20 @@ async function getParseSaveBlocksTransactionsEvents(
const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers);

const filteredNewBlocksReceipts = newBlocksReceipts.filter(
(blockReciepts) => blockReciepts !== null && blockReciepts !== undefined,
(blockReceipts) => blockReceipts !== null && blockReceipts !== undefined,
);

if (newBlocksReceipts.length !== filteredNewBlocksReceipts.length) {
if (!allowPartialSuccess) {
return false;
}
const { nullOnlyAtEnd } = newBlocksReceipts.reduce(
(state, blockReciepts) => {
if (state.hasSeenNull && blockReciepts !== null) {
(state, blockReceipts) => {
if (state.hasSeenNull && blockReceipts !== null) {
state.nullOnlyAtEnd = false;
}

if (newBlocksReceipts === null) {
if (blockReceipts === null) {
state.hasSeenNull = true;
}
return state;
Expand All @@ -302,9 +302,9 @@ async function getParseSaveBlocksTransactionsEvents(
);

if (nullOnlyAtEnd) {
logger.info('Last block(s) reciepts not found, retrying that block(s) on the next run');
logger.info('Last block(s) receipts not found, retrying those block(s) on the next run');
} else {
logger.error("Missing intermideate block reciepts, can't continue. Retrying next run");
logger.error("Missing intermediate block receipts, can't continue. Retrying next run");
logger.error(newBlocksReceipts);
return false;
}
Expand All @@ -327,7 +327,9 @@ async function getParseSaveBlocksTransactionsEvents(
return { ...newBlocks[blockIndex], transactions: transactionsWithLogs };
});

const parsedFullBlocks = fullBlocks.map(parseBlockTransactionsEvents);
const parsedFullBlocks = fullBlocks.map((fullBlock) =>
parseBlockTransactionsEvents(fullBlock, requiredTxnList),
);

const eventTables = eventScrperProps
.filter((props) => props.enabled)
Expand Down Expand Up @@ -371,9 +373,26 @@ export class BlockEventsScraper {
const blockNumbers = oldestBlocksToBackfill.map(
(backfillBlock: { block_number: number }) => backfillBlock.block_number,
);
const requiredTxnListQuery = await connection.query(
`SELECT DISTINCT transaction_hash
FROM ${SCHEMA}.tx_backfill
WHERE block_number IN (${blockNumbers.join(',')}) AND done = false`,
);

const requiredTxnList = new Set<string>(
requiredTxnListQuery.map((row: { transaction_hash: string }) => row.transaction_hash),
);

const newBlocks = await web3Source.getBatchBlockInfoAsync(blockNumbers, true);
const success = await getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks, false);

const success = await getParseSaveBlocksTransactionsEvents(
connection,
producer,
newBlocks,
false,
requiredTxnList,
);

if (success) {
const newBlockNumbers = newBlocks.map((block) => block.number);
const queryRunner = connection.createQueryRunner();
Expand Down

0 comments on commit c541a45

Please sign in to comment.