Skip to content

Commit

Permalink
handle filtered events and make it null events when there are events …
Browse files Browse the repository at this point in the history
…size info
  • Loading branch information
yuunlimm committed Jul 29, 2024
1 parent ee05799 commit 8465ddf
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ use crate::{
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use lazy_static::lazy_static;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

// p99 currently is 303 so using 300 as a safe max length
const EVENT_TYPE_MAX_LENGTH: usize = 300;
const DEFAULT_CREATION_NUMBER: i64 = 0;
const DEFAULT_SEQUENCE_NUMBER: i64 = 0;
lazy_static! {
pub static ref DEFAULT_ACCOUNT_ADDRESS: String = "NULL_ACCOUNT_ADDRESS".to_string();
pub static ref DEFAULT_EVENT_TYPE: String = "NULL_EVENT_TYPE".to_string();
pub static ref DEFAULT_EVENT_DATA: String = "NULL_EVENT_DATA".to_string();
}

#[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)]
pub struct Event {
Expand Down Expand Up @@ -78,6 +86,31 @@ impl Event {
}
}

// This function is added to handle the txn with events filtered, but event_size_info is not filtered.
pub fn from_null_event(
txn_version: i64,
block_height: i64,
event_index: i64,
size_info: &EventSizeInfo,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
Event {
account_address: DEFAULT_ACCOUNT_ADDRESS.clone(),
creation_number: DEFAULT_CREATION_NUMBER,
sequence_number: DEFAULT_SEQUENCE_NUMBER,
txn_version,
block_height,
event_type: DEFAULT_EVENT_TYPE.clone(),
data: DEFAULT_EVENT_DATA.clone(),
event_index,
indexed_type: truncate_str(&DEFAULT_EVENT_TYPE, EVENT_TYPE_MAX_LENGTH),
type_tag_bytes: size_info.type_tag_bytes as i64,
total_bytes: size_info.total_bytes as i64,
event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes
block_timestamp,
}
}

pub fn from_events(
events: &[EventPB],
txn_version: i64,
Expand All @@ -86,22 +119,29 @@ impl Event {
block_timestamp: chrono::NaiveDateTime,
is_user_txn_type: bool,
) -> Vec<Self> {
let mut temp_events = events.to_vec();

// event_size_info will be used for user transactions only, no promises for other transactions.
// If event_size_info is missing due to fewer or no events, it defaults to 0.
// No need to backfill as event_size_info is primarily for debugging user transactions.
if events.len() != event_size_info.len() {
if temp_events.len() != event_size_info.len() {
tracing::warn!(
events_len = events.len(),
event_size_info_len = event_size_info.len(),
txn_version,
"Length mismatch: events size does not match event_size_info size.",
);
if is_user_txn_type {
panic!("Event size info is missing for user transaction.")
return handle_user_txn_type(
&mut temp_events,
txn_version,
event_size_info,
block_timestamp,
block_height,
);
}
}

events
temp_events
.iter()
.enumerate()
.map(|(index, event)| {
Expand All @@ -122,4 +162,43 @@ impl Event {
}
}

fn handle_user_txn_type(
temp_events: &mut Vec<EventPB>,
txn_version: i64,
event_size_info: &[EventSizeInfo],
block_timestamp: chrono::NaiveDateTime,
block_height: i64,
) -> Vec<Event> {
if event_size_info.is_empty() {
tracing::error!(
txn_version,
"Event size info is missing for user transaction."
);
panic!("Event size info is missing for user transaction.");
}
// Add default events to temp_events until its length matches event_size_info length
tracing::info!(
txn_version,
"Events are empty but event_size_info is not empty."
);
temp_events.resize(event_size_info.len(), EventPB::default());
temp_events
.iter()
.enumerate()
.map(|(index, _event)| {
let size_info = event_size_info.get(index).unwrap_or(&EventSizeInfo {
type_tag_bytes: 0,
total_bytes: 0,
});
Event::from_null_event(
txn_version,
block_height,
index as i64,
size_info,
block_timestamp,
)
})
.collect()
}

pub type ParquetEventModel = Event;
1 change: 1 addition & 0 deletions rust/processor/src/gap_detectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ pub async fn create_gap_detector_status_tracker_loop(
{
tracing::info!(
last_processed_version = res.next_version_to_process,
processor_name,
"Updating last processed version"
);
processor
Expand Down

0 comments on commit 8465ddf

Please sign in to comment.