Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix parquet event processor to handle miss-matching events and event … #474

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use itertools::Itertools;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -85,23 +84,31 @@ impl Event {
block_height: i64,
event_size_info: &[EventSizeInfo],
block_timestamp: chrono::NaiveDateTime,
is_user_txn_type: bool,
) -> Vec<Self> {
// Ensure that lengths match, otherwise log and panic to investigate
// event_size_info will be used for user transactions only, no promises for other transactions.
// If event_size_info is missing due to fewer or no events, it defaults to 0.
// No need to backfill as event_size_info is primarily for debugging user transactions.
if events.len() != event_size_info.len() {
tracing::error!(
tracing::warn!(
events_len = events.len(),
event_size_info_len = event_size_info.len(),
txn_version,
"Length mismatch: events size does not match event_size_info size.",
);
panic!("Length mismatch: events len does not match event_size_info len");
if is_user_txn_type {
panic!("Event size info is missing for user transaction.")
}
}

events
.iter()
.zip_eq(event_size_info.iter())
.enumerate()
.map(|(index, (event, size_info))| {
.map(|(index, event)| {
let size_info = event_size_info.get(index).unwrap_or(&EventSizeInfo {
type_tag_bytes: 0,
total_bytes: 0,
});
Self::from_event(
event,
txn_version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,14 @@ impl ProcessorTrait for ParquetEventsProcessor {
},
};
let default = vec![];
let mut is_user_txn_type = false;
let raw_events = match txn_data {
TxnData::BlockMetadata(tx_inner) => &tx_inner.events,
TxnData::Genesis(tx_inner) => &tx_inner.events,
TxnData::User(tx_inner) => &tx_inner.events,
TxnData::User(tx_inner) => {
is_user_txn_type = true;
&tx_inner.events
},
TxnData::Validator(txn) => &txn.events,
_ => &default,
};
Expand All @@ -133,6 +137,7 @@ impl ProcessorTrait for ParquetEventsProcessor {
block_height,
size_info.event_size_info.as_slice(),
block_timestamp,
is_user_txn_type,
);
transaction_version_to_struct_count
.entry(txn_version)
Expand Down
Loading