diff --git a/rust/processor/src/db/common/models/events_models/parquet_events.rs b/rust/processor/src/db/common/models/events_models/parquet_events.rs index 7e318dc11..683b6a1f7 100644 --- a/rust/processor/src/db/common/models/events_models/parquet_events.rs +++ b/rust/processor/src/db/common/models/events_models/parquet_events.rs @@ -9,11 +9,19 @@ use crate::{ }; use allocative_derive::Allocative; use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo}; +use lazy_static::lazy_static; use parquet_derive::ParquetRecordWriter; use serde::{Deserialize, Serialize}; // p99 currently is 303 so using 300 as a safe max length const EVENT_TYPE_MAX_LENGTH: usize = 300; +const DEFAULT_CREATION_NUMBER: i64 = 0; +const DEFAULT_SEQUENCE_NUMBER: i64 = 0; +lazy_static! { + pub static ref DEFAULT_ACCOUNT_ADDRESS: String = "NULL_ACCOUNT_ADDRESS".to_string(); + pub static ref DEFAULT_EVENT_TYPE: String = "NULL_EVENT_TYPE".to_string(); + pub static ref DEFAULT_EVENT_DATA: String = "NULL_EVENT_DATA".to_string(); +} #[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)] pub struct Event { @@ -78,6 +86,31 @@ impl Event { } } + // This function is added to handle the txn with events filtered, but event_size_info is not filtered. + pub fn from_null_event( + txn_version: i64, + block_height: i64, + event_index: i64, + size_info: &EventSizeInfo, + block_timestamp: chrono::NaiveDateTime, + ) -> Self { + Event { + account_address: DEFAULT_ACCOUNT_ADDRESS.clone(), + creation_number: DEFAULT_CREATION_NUMBER, + sequence_number: DEFAULT_SEQUENCE_NUMBER, + txn_version, + block_height, + event_type: DEFAULT_EVENT_TYPE.clone(), + data: DEFAULT_EVENT_DATA.clone(), + event_index, + indexed_type: truncate_str(&DEFAULT_EVENT_TYPE, EVENT_TYPE_MAX_LENGTH), + type_tag_bytes: size_info.type_tag_bytes as i64, + total_bytes: size_info.total_bytes as i64, + event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes + block_timestamp, + } + } + pub fn from_events( events: &[EventPB], txn_version: i64, @@ -86,10 +119,12 @@ impl Event { block_timestamp: chrono::NaiveDateTime, is_user_txn_type: bool, ) -> Vec { + let mut temp_events = events.to_vec(); + // event_size_info will be used for user transactions only, no promises for other transactions. // If event_size_info is missing due to fewer or no events, it defaults to 0. // No need to backfill as event_size_info is primarily for debugging user transactions. - if events.len() != event_size_info.len() { + if temp_events.len() != event_size_info.len() { tracing::warn!( events_len = events.len(), event_size_info_len = event_size_info.len(), @@ -97,11 +132,16 @@ impl Event { "Length mismatch: events size does not match event_size_info size.", ); if is_user_txn_type { - panic!("Event size info is missing for user transaction.") + return handle_user_txn_type( + &mut temp_events, + txn_version, + event_size_info, + block_timestamp, + block_height, + ); } } - - events + temp_events .iter() .enumerate() .map(|(index, event)| { @@ -122,4 +162,43 @@ impl Event { } } +fn handle_user_txn_type( + temp_events: &mut Vec, + txn_version: i64, + event_size_info: &[EventSizeInfo], + block_timestamp: chrono::NaiveDateTime, + block_height: i64, +) -> Vec { + if event_size_info.is_empty() { + tracing::error!( + txn_version, + "Event size info is missing for user transaction." + ); + panic!("Event size info is missing for user transaction."); + } + // Add default events to temp_events until its length matches event_size_info length + tracing::info!( + txn_version, + "Events are empty but event_size_info is not empty." + ); + temp_events.resize(event_size_info.len(), EventPB::default()); + temp_events + .iter() + .enumerate() + .map(|(index, _event)| { + let size_info = event_size_info.get(index).unwrap_or(&EventSizeInfo { + type_tag_bytes: 0, + total_bytes: 0, + }); + Event::from_null_event( + txn_version, + block_height, + index as i64, + size_info, + block_timestamp, + ) + }) + .collect() +} + pub type ParquetEventModel = Event; diff --git a/rust/processor/src/gap_detectors/mod.rs b/rust/processor/src/gap_detectors/mod.rs index c93b19929..278175e20 100644 --- a/rust/processor/src/gap_detectors/mod.rs +++ b/rust/processor/src/gap_detectors/mod.rs @@ -155,6 +155,7 @@ pub async fn create_gap_detector_status_tracker_loop( { tracing::info!( last_processed_version = res.next_version_to_process, + processor_name, "Updating last processed version" ); processor