Migrate Parquet Events Processor to SDK (#618)
dermanyang authored Dec 5, 2024
1 parent 3c62913 commit b823242
Showing 19 changed files with 505 additions and 141 deletions.
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/event_models/mod.rs
@@ -0,0 +1 @@
pub mod raw_events;
58 changes: 58 additions & 0 deletions rust/processor/src/db/common/models/event_models/raw_events.rs
@@ -0,0 +1,58 @@
use crate::utils::util::{standardize_address, truncate_str};
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use serde::{Deserialize, Serialize};

/// P99 currently is 303 so using 300 as a safe max length
pub const EVENT_TYPE_MAX_LENGTH: usize = 300;

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RawEvent {
pub sequence_number: i64,
pub creation_number: i64,
pub account_address: String,
pub transaction_version: i64,
pub transaction_block_height: i64,
pub type_: String,
pub data: String,
pub event_index: i64,
pub indexed_type: String,
pub block_timestamp: Option<chrono::NaiveDateTime>,
pub type_tag_bytes: Option<i64>,
pub total_bytes: Option<i64>,
}

pub trait EventConvertible {
fn from_raw(raw_item: &RawEvent) -> Self;
}

impl RawEvent {
pub fn from_raw_event(
event: &EventPB,
txn_version: i64,
txn_block_height: i64,
event_index: i64,
size_info: Option<&EventSizeInfo>,
block_timestamp: Option<chrono::NaiveDateTime>,
) -> RawEvent {
let type_tag_bytes = size_info.map_or(0, |info| info.type_tag_bytes as i64);
let total_bytes = size_info.map_or(0, |info| info.total_bytes as i64);
let event_type = event.type_str.to_string();

RawEvent {
sequence_number: event.sequence_number as i64,
creation_number: event.key.as_ref().unwrap().creation_number as i64,
account_address: standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
),
transaction_version: txn_version,
transaction_block_height: txn_block_height,
type_: event_type.clone(),
data: event.data.clone(),
event_index,
indexed_type: truncate_str(&event_type, EVENT_TYPE_MAX_LENGTH),
block_timestamp,
type_tag_bytes: Some(type_tag_bytes),
total_bytes: Some(total_bytes),
}
}
}
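A minimal sketch of how the two storage paths are expected to call `RawEvent::from_raw_event`: the postgres path passes `None` for size info and block timestamp, while the parquet path supplies both. This sketch is not part of the commit; the test module, placeholder values, and protobuf field layout are assumptions based on the types imported above.

#[cfg(test)]
mod raw_event_sketch {
    // Hypothetical test module, not part of this commit.
    use super::*;
    use aptos_protos::transaction::v1::{Event as EventPB, EventKey, EventSizeInfo};

    #[test]
    fn builds_raw_event_for_both_storage_paths() {
        // Placeholder event; only the fields read by `from_raw_event` are populated.
        let event = EventPB {
            key: Some(EventKey {
                creation_number: 2,
                account_address: "0x1".to_string(),
            }),
            sequence_number: 7,
            type_str: "0x1::coin::DepositEvent".to_string(),
            data: "{\"amount\":\"100\"}".to_string(),
            ..Default::default()
        };

        // Postgres path: no size info and no block timestamp are available.
        let pg_raw = RawEvent::from_raw_event(&event, 100, 10, 0, None, None);
        assert_eq!(pg_raw.type_tag_bytes, Some(0));
        assert!(pg_raw.block_timestamp.is_none());

        // Parquet path: size info and block timestamp are threaded through.
        let size_info = EventSizeInfo {
            type_tag_bytes: 16,
            total_bytes: 128,
        };
        let parquet_raw = RawEvent::from_raw_event(
            &event,
            100,
            10,
            0,
            Some(&size_info),
            Some(chrono::Utc::now().naive_utc()),
        );
        assert_eq!(parquet_raw.total_bytes, Some(128));
    }
}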
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/mod.rs
@@ -1 +1,2 @@
pub mod default_models;
pub mod event_models;
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/event_models/mod.rs
@@ -0,0 +1 @@
pub mod parquet_events;
rust/processor/src/db/parquet/models/event_models/parquet_events.rs
@@ -2,21 +2,24 @@
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::{standardize_address, truncate_str},
db::common::models::event_models::raw_events::{
EventConvertible, RawEvent, EVENT_TYPE_MAX_LENGTH,
},
utils::util::truncate_str,
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use lazy_static::lazy_static;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

// p99 currently is 303 so using 300 as a safe max length
const EVENT_TYPE_MAX_LENGTH: usize = 300;
const DEFAULT_CREATION_NUMBER: i64 = 0;
const DEFAULT_SEQUENCE_NUMBER: i64 = 0;
// This is for future proofing. TODO: change when events v2 comes
const EVENT_VERSION: i8 = 1i8;

lazy_static! {
pub static ref DEFAULT_ACCOUNT_ADDRESS: String = "NULL_ACCOUNT_ADDRESS".to_string();
pub static ref DEFAULT_EVENT_TYPE: String = "NULL_EVENT_TYPE".to_string();
@@ -66,24 +69,15 @@ impl Event {
size_info: &EventSizeInfo,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let event_type: &str = event.type_str.as_ref();
Event {
account_address: standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
),
creation_number: event.key.as_ref().unwrap().creation_number as i64,
sequence_number: event.sequence_number as i64,
let raw = RawEvent::from_raw_event(
event,
txn_version,
block_height,
event_type: event_type.to_string(),
data: event.data.clone(),
event_index,
indexed_type: truncate_str(event_type, EVENT_TYPE_MAX_LENGTH),
type_tag_bytes: size_info.type_tag_bytes as i64,
total_bytes: size_info.total_bytes as i64,
event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes
block_timestamp,
}
Some(size_info),
Some(block_timestamp),
);
Self::from_raw(&raw)
}

// This function handles txns whose events have been filtered out but whose event_size_info has not been filtered.
@@ -106,7 +100,7 @@ impl Event {
indexed_type: truncate_str(&DEFAULT_EVENT_TYPE, EVENT_TYPE_MAX_LENGTH),
type_tag_bytes: size_info.type_tag_bytes as i64,
total_bytes: size_info.total_bytes as i64,
event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes
event_version: EVENT_VERSION,
block_timestamp,
}
}
@@ -201,4 +195,24 @@ fn handle_user_txn_type(
.collect()
}

impl EventConvertible for Event {
fn from_raw(raw: &RawEvent) -> Self {
Event {
txn_version: raw.transaction_version,
account_address: raw.account_address.clone(),
sequence_number: raw.sequence_number,
creation_number: raw.creation_number,
block_height: raw.transaction_block_height,
event_type: raw.type_.clone(),
data: raw.data.clone(),
event_index: raw.event_index,
indexed_type: raw.indexed_type.clone(),
type_tag_bytes: raw.type_tag_bytes.unwrap_or(0),
total_bytes: raw.total_bytes.unwrap_or(0),
event_version: EVENT_VERSION,
block_timestamp: raw.block_timestamp.unwrap(),
}
}
}

pub type ParquetEventModel = Event;
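One design point worth noting: the parquet `from_raw` unwraps `block_timestamp`, so the parquet path must always thread `Some(block_timestamp)` (and, in practice, `Some(size_info)`) through `RawEvent::from_raw_event`. Below is a hypothetical call-site sketch, not part of this commit, with module paths assumed from the file layout shown above.

// Hypothetical helper elsewhere in the processor crate; not part of this commit.
use crate::db::{
    common::models::event_models::raw_events::{EventConvertible, RawEvent},
    parquet::models::event_models::parquet_events::ParquetEventModel,
};
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};

fn to_parquet_event(
    event: &EventPB,
    txn_version: i64,
    block_height: i64,
    event_index: i64,
    size_info: &EventSizeInfo,
    block_timestamp: chrono::NaiveDateTime,
) -> ParquetEventModel {
    let raw = RawEvent::from_raw_event(
        event,
        txn_version,
        block_height,
        event_index,
        Some(size_info),       // parquet keeps the byte-size columns
        Some(block_timestamp), // required: `from_raw` unwraps the timestamp
    );
    ParquetEventModel::from_raw(&raw)
}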
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
@@ -1 +1,2 @@
pub mod default_models;
pub mod event_models;
38 changes: 23 additions & 15 deletions rust/processor/src/db/postgres/models/events_models/events.rs
@@ -4,16 +4,13 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
db::common::models::event_models::raw_events::{EventConvertible, RawEvent},
schema::events,
utils::util::{standardize_address, truncate_str},
};
use aptos_protos::transaction::v1::Event as EventPB;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

// p99 currently is 303 so using 300 as a safe max length
const EVENT_TYPE_MAX_LENGTH: usize = 300;

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, event_index))]
#[diesel(table_name = events)]
@@ -36,20 +33,15 @@ impl Event {
transaction_block_height: i64,
event_index: i64,
) -> Self {
let t: &str = event.type_str.as_ref();
Event {
account_address: standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
),
creation_number: event.key.as_ref().unwrap().creation_number as i64,
sequence_number: event.sequence_number as i64,
let raw = RawEvent::from_raw_event(
event,
transaction_version,
transaction_block_height,
type_: t.to_string(),
data: serde_json::from_str(event.data.as_str()).unwrap(),
event_index,
indexed_type: truncate_str(t, EVENT_TYPE_MAX_LENGTH),
}
None,
None,
);
Self::from_raw(&raw)
}

pub fn from_events(
@@ -72,5 +64,21 @@ impl Event {
}
}

impl EventConvertible for Event {
fn from_raw(raw: &RawEvent) -> Self {
Event {
sequence_number: raw.sequence_number,
creation_number: raw.creation_number,
account_address: raw.account_address.clone(),
transaction_version: raw.transaction_version,
transaction_block_height: raw.transaction_block_height,
type_: raw.type_.clone(),
data: serde_json::from_str(&raw.data).unwrap(),
event_index: raw.event_index,
indexed_type: raw.indexed_type.clone(),
}
}
}

// Prevent conflicts with other things named `Event`
pub type EventModel = Event;
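With both implementations in place, either storage model can be produced from the same shared representation. A small generic sketch, not part of this commit, of what `EventConvertible` enables:

// Hypothetical generic helper; not part of this commit.
use crate::db::common::models::event_models::raw_events::{EventConvertible, RawEvent};

// Works for both the postgres EventModel and the ParquetEventModel.
fn convert_events<T: EventConvertible>(raw_events: &[RawEvent]) -> Vec<T> {
    raw_events.iter().map(T::from_raw).collect()
}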
3 changes: 0 additions & 3 deletions rust/processor/src/db/postgres/models/events_models/mod.rs
@@ -2,6 +2,3 @@
// SPDX-License-Identifier: Apache-2.0

pub mod events;

// parquet model
pub mod parquet_events;
63 changes: 34 additions & 29 deletions rust/processor/src/processors/events_processor.rs
@@ -108,35 +108,7 @@ impl ProcessorTrait for EventsProcessor {
let processing_start = std::time::Instant::now();
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();

let mut events = vec![];
for txn in &transactions {
let txn_version = txn.version as i64;
let block_height = txn.block_height as i64;
let txn_data = match txn.txn_data.as_ref() {
Some(data) => data,
None => {
tracing::warn!(
transaction_version = txn_version,
"Transaction data doesn't exist"
);
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["EventsProcessor"])
.inc();
continue;
},
};
let default = vec![];
let raw_events = match txn_data {
TxnData::BlockMetadata(tx_inner) => &tx_inner.events,
TxnData::Genesis(tx_inner) => &tx_inner.events,
TxnData::User(tx_inner) => &tx_inner.events,
TxnData::Validator(tx_inner) => &tx_inner.events,
_ => &default,
};

let txn_events = EventModel::from_events(raw_events, txn_version, block_height);
events.extend(txn_events);
}
let events = process_transactions(transactions);

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();
@@ -179,3 +151,36 @@ impl ProcessorTrait for EventsProcessor {
&self.connection_pool
}
}

pub fn process_transactions(transactions: Vec<Transaction>) -> Vec<EventModel> {
let mut events = vec![];
for txn in &transactions {
let txn_version = txn.version as i64;
let block_height = txn.block_height as i64;
let txn_data = match txn.txn_data.as_ref() {
Some(data) => data,
None => {
tracing::warn!(
transaction_version = txn_version,
"Transaction data doesn't exist"
);
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["EventsProcessor"])
.inc();
continue;
},
};
let default = vec![];
let raw_events = match txn_data {
TxnData::BlockMetadata(tx_inner) => &tx_inner.events,
TxnData::Genesis(tx_inner) => &tx_inner.events,
TxnData::User(tx_inner) => &tx_inner.events,
TxnData::Validator(tx_inner) => &tx_inner.events,
_ => &default,
};

let txn_events = EventModel::from_events(raw_events, txn_version, block_height);
events.extend(txn_events);
}
events
}
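Because the extraction loop is now a standalone public function rather than being inlined in `process()`, other call sites (for example an SDK-based processor step or a test) can reuse it directly. A hypothetical in-crate sketch, with module paths assumed from the file layout in this commit:

// Hypothetical caller; not part of this commit.
use crate::{
    db::postgres::models::events_models::events::EventModel,
    processors::events_processor::process_transactions,
};
use aptos_protos::transaction::v1::Transaction;

// Extract postgres event models from a batch without the full ProcessorTrait machinery.
fn extract_event_models(transactions: Vec<Transaction>) -> Vec<EventModel> {
    process_transactions(transactions)
}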