Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle filtered events and make it null events when there are events … #476

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ use crate::{
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use lazy_static::lazy_static;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

// p99 currently is 303 so using 300 as a safe max length
const EVENT_TYPE_MAX_LENGTH: usize = 300;
const DEFAULT_CREATION_NUMBER: i64 = 0;
const DEFAULT_SEQUENCE_NUMBER: i64 = 0;
lazy_static! {
pub static ref DEFAULT_ACCOUNT_ADDRESS: String = "NULL_ACCOUNT_ADDRESS".to_string();
pub static ref DEFAULT_EVENT_TYPE: String = "NULL_EVENT_TYPE".to_string();
pub static ref DEFAULT_EVENT_DATA: String = "NULL_EVENT_DATA".to_string();
}

#[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)]
pub struct Event {
Expand Down Expand Up @@ -78,6 +86,31 @@ impl Event {
}
}

// This function is added to handle the txn with events filtered, but event_size_info is not filtered.
pub fn from_null_event(
txn_version: i64,
block_height: i64,
event_index: i64,
size_info: &EventSizeInfo,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
Event {
account_address: DEFAULT_ACCOUNT_ADDRESS.clone(),
creation_number: DEFAULT_CREATION_NUMBER,
sequence_number: DEFAULT_SEQUENCE_NUMBER,
txn_version,
block_height,
event_type: DEFAULT_EVENT_TYPE.clone(),
data: DEFAULT_EVENT_DATA.clone(),
event_index,
indexed_type: truncate_str(&DEFAULT_EVENT_TYPE, EVENT_TYPE_MAX_LENGTH),
type_tag_bytes: size_info.type_tag_bytes as i64,
total_bytes: size_info.total_bytes as i64,
event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes
block_timestamp,
}
}

pub fn from_events(
events: &[EventPB],
txn_version: i64,
Expand All @@ -86,22 +119,29 @@ impl Event {
block_timestamp: chrono::NaiveDateTime,
is_user_txn_type: bool,
) -> Vec<Self> {
let mut temp_events = events.to_vec();

// event_size_info will be used for user transactions only, no promises for other transactions.
// If event_size_info is missing due to fewer or no events, it defaults to 0.
// No need to backfill as event_size_info is primarily for debugging user transactions.
if events.len() != event_size_info.len() {
if temp_events.len() != event_size_info.len() {
tracing::warn!(
events_len = events.len(),
event_size_info_len = event_size_info.len(),
txn_version,
"Length mismatch: events size does not match event_size_info size.",
);
if is_user_txn_type {
panic!("Event size info is missing for user transaction.")
return handle_user_txn_type(
&mut temp_events,
txn_version,
event_size_info,
block_timestamp,
block_height,
);
}
}

events
temp_events
.iter()
.enumerate()
.map(|(index, event)| {
Expand All @@ -122,4 +162,43 @@ impl Event {
}
}

fn handle_user_txn_type(
temp_events: &mut Vec<EventPB>,
txn_version: i64,
event_size_info: &[EventSizeInfo],
block_timestamp: chrono::NaiveDateTime,
block_height: i64,
) -> Vec<Event> {
if event_size_info.is_empty() {
tracing::error!(
txn_version,
"Event size info is missing for user transaction."
);
panic!("Event size info is missing for user transaction.");
}
// Add default events to temp_events until its length matches event_size_info length
tracing::info!(
txn_version,
"Events are empty but event_size_info is not empty."
);
temp_events.resize(event_size_info.len(), EventPB::default());
temp_events
.iter()
.enumerate()
.map(|(index, _event)| {
let size_info = event_size_info.get(index).unwrap_or(&EventSizeInfo {
type_tag_bytes: 0,
total_bytes: 0,
});
Event::from_null_event(
txn_version,
block_height,
index as i64,
size_info,
block_timestamp,
)
})
.collect()
}

pub type ParquetEventModel = Event;
1 change: 1 addition & 0 deletions rust/processor/src/gap_detectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ pub async fn create_gap_detector_status_tracker_loop(
{
tracing::info!(
last_processed_version = res.next_version_to_process,
processor_name,
"Updating last processed version"
);
processor
Expand Down
Loading