Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Parquet Events Processor to SDK #618

Merged
merged 10 commits into from
Dec 5, 2024
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/event_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod raw_events;
58 changes: 58 additions & 0 deletions rust/processor/src/db/common/models/event_models/raw_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::utils::util::{standardize_address, truncate_str};
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use serde::{Deserialize, Serialize};

/// P99 currently is 303 so using 300 as a safe max length
pub const EVENT_TYPE_MAX_LENGTH: usize = 300;

#[derive(Clone, Debug, Deserialize, Serialize)]

Check warning on line 8 in rust/processor/src/db/common/models/event_models/raw_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/event_models/raw_events.rs#L8

Added line #L8 was not covered by tests
pub struct RawEvent {
pub sequence_number: i64,
pub creation_number: i64,
pub account_address: String,
pub transaction_version: i64,
pub transaction_block_height: i64,
pub type_: String,
pub data: String,
pub event_index: i64,
pub indexed_type: String,
pub block_timestamp: Option<chrono::NaiveDateTime>,
pub type_tag_bytes: Option<i64>,
pub total_bytes: Option<i64>,
}

pub trait EventConvertible {
fn from_raw(raw_item: &RawEvent) -> Self;
}

impl RawEvent {
pub fn from_raw_event(
event: &EventPB,
txn_version: i64,
txn_block_height: i64,
event_index: i64,
size_info: Option<&EventSizeInfo>,
block_timestamp: Option<chrono::NaiveDateTime>,
) -> RawEvent {
let type_tag_bytes = size_info.map_or(0, |info| info.type_tag_bytes as i64);
let total_bytes = size_info.map_or(0, |info| info.total_bytes as i64);
let event_type = event.type_str.to_string();

RawEvent {
sequence_number: event.sequence_number as i64,
creation_number: event.key.as_ref().unwrap().creation_number as i64,
account_address: standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
),
transaction_version: txn_version,
transaction_block_height: txn_block_height,
type_: event_type.clone(),
data: event.data.clone(),
event_index,
indexed_type: truncate_str(&event_type, EVENT_TYPE_MAX_LENGTH),
block_timestamp,
type_tag_bytes: Some(type_tag_bytes),
total_bytes: Some(total_bytes),
}
}
}
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod default_models;
pub mod event_models;
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/event_models/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod parquet_events;
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::{standardize_address, truncate_str},
db::common::models::event_models::raw_events::{
EventConvertible, RawEvent, EVENT_TYPE_MAX_LENGTH,
},
utils::util::truncate_str,
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use lazy_static::lazy_static;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

// p99 currently is 303 so using 300 as a safe max length
const EVENT_TYPE_MAX_LENGTH: usize = 300;
const DEFAULT_CREATION_NUMBER: i64 = 0;
const DEFAULT_SEQUENCE_NUMBER: i64 = 0;
// This is for future proofing. TODO: change when events v2 comes
const EVENT_VERSION: i8 = 1i8;

lazy_static! {
pub static ref DEFAULT_ACCOUNT_ADDRESS: String = "NULL_ACCOUNT_ADDRESS".to_string();
pub static ref DEFAULT_EVENT_TYPE: String = "NULL_EVENT_TYPE".to_string();
Expand Down Expand Up @@ -66,24 +69,15 @@
size_info: &EventSizeInfo,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let event_type: &str = event.type_str.as_ref();
Event {
account_address: standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
),
creation_number: event.key.as_ref().unwrap().creation_number as i64,
sequence_number: event.sequence_number as i64,
let raw = RawEvent::from_raw_event(
event,

Check warning on line 73 in rust/processor/src/db/parquet/models/event_models/parquet_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/event_models/parquet_events.rs#L72-L73

Added lines #L72 - L73 were not covered by tests
txn_version,
block_height,
event_type: event_type.to_string(),
data: event.data.clone(),
event_index,
indexed_type: truncate_str(event_type, EVENT_TYPE_MAX_LENGTH),
type_tag_bytes: size_info.type_tag_bytes as i64,
total_bytes: size_info.total_bytes as i64,
event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes
block_timestamp,
}
Some(size_info),
Some(block_timestamp),
);
Self::from_raw(&raw)

Check warning on line 80 in rust/processor/src/db/parquet/models/event_models/parquet_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/event_models/parquet_events.rs#L77-L80

Added lines #L77 - L80 were not covered by tests
}

// This function is added to handle the txn with events filtered, but event_size_info is not filtered.
Expand All @@ -106,7 +100,7 @@
indexed_type: truncate_str(&DEFAULT_EVENT_TYPE, EVENT_TYPE_MAX_LENGTH),
type_tag_bytes: size_info.type_tag_bytes as i64,
total_bytes: size_info.total_bytes as i64,
event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes
event_version: EVENT_VERSION,

Check warning on line 103 in rust/processor/src/db/parquet/models/event_models/parquet_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/event_models/parquet_events.rs#L103

Added line #L103 was not covered by tests
block_timestamp,
}
}
Expand Down Expand Up @@ -201,4 +195,24 @@
.collect()
}

impl EventConvertible for Event {
fn from_raw(raw: &RawEvent) -> Self {
Event {
txn_version: raw.transaction_version,
account_address: raw.account_address.clone(),
sequence_number: raw.sequence_number,
creation_number: raw.creation_number,
block_height: raw.transaction_block_height,
event_type: raw.type_.clone(),
data: raw.data.clone(),
event_index: raw.event_index,
indexed_type: raw.indexed_type.clone(),
type_tag_bytes: raw.type_tag_bytes.unwrap_or(0),
total_bytes: raw.total_bytes.unwrap_or(0),
event_version: EVENT_VERSION,
block_timestamp: raw.block_timestamp.unwrap(),
}
}

Check warning on line 215 in rust/processor/src/db/parquet/models/event_models/parquet_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/parquet/models/event_models/parquet_events.rs#L199-L215

Added lines #L199 - L215 were not covered by tests
}

pub type ParquetEventModel = Event;
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod default_models;
pub mod event_models;
38 changes: 23 additions & 15 deletions rust/processor/src/db/postgres/models/events_models/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
#![allow(clippy::extra_unused_lifetimes)]

use crate::{
db::common::models::event_models::raw_events::{EventConvertible, RawEvent},
schema::events,
utils::util::{standardize_address, truncate_str},
};
use aptos_protos::transaction::v1::Event as EventPB;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

// p99 currently is 303 so using 300 as a safe max length
const EVENT_TYPE_MAX_LENGTH: usize = 300;

#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version, event_index))]
#[diesel(table_name = events)]
Expand All @@ -36,20 +33,15 @@ impl Event {
transaction_block_height: i64,
event_index: i64,
) -> Self {
let t: &str = event.type_str.as_ref();
Event {
account_address: standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
),
creation_number: event.key.as_ref().unwrap().creation_number as i64,
sequence_number: event.sequence_number as i64,
let raw = RawEvent::from_raw_event(
event,
transaction_version,
transaction_block_height,
type_: t.to_string(),
data: serde_json::from_str(event.data.as_str()).unwrap(),
event_index,
indexed_type: truncate_str(t, EVENT_TYPE_MAX_LENGTH),
}
None,
None,
);
Self::from_raw(&raw)
}

pub fn from_events(
Expand All @@ -72,5 +64,21 @@ impl Event {
}
}

impl EventConvertible for Event {
fn from_raw(raw: &RawEvent) -> Self {
Event {
sequence_number: raw.sequence_number,
creation_number: raw.creation_number,
account_address: raw.account_address.clone(),
transaction_version: raw.transaction_version,
transaction_block_height: raw.transaction_block_height,
type_: raw.type_.clone(),
data: serde_json::from_str(&raw.data).unwrap(),
event_index: raw.event_index,
indexed_type: raw.indexed_type.clone(),
}
}
}

// Prevent conflicts with other things named `Event`
pub type EventModel = Event;
3 changes: 0 additions & 3 deletions rust/processor/src/db/postgres/models/events_models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,3 @@
// SPDX-License-Identifier: Apache-2.0

pub mod events;

// parquet model
pub mod parquet_events;
63 changes: 34 additions & 29 deletions rust/processor/src/processors/events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,35 +108,7 @@
let processing_start = std::time::Instant::now();
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();

let mut events = vec![];
for txn in &transactions {
let txn_version = txn.version as i64;
let block_height = txn.block_height as i64;
let txn_data = match txn.txn_data.as_ref() {
Some(data) => data,
None => {
tracing::warn!(
transaction_version = txn_version,
"Transaction data doesn't exist"
);
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["EventsProcessor"])
.inc();
continue;
},
};
let default = vec![];
let raw_events = match txn_data {
TxnData::BlockMetadata(tx_inner) => &tx_inner.events,
TxnData::Genesis(tx_inner) => &tx_inner.events,
TxnData::User(tx_inner) => &tx_inner.events,
TxnData::Validator(tx_inner) => &tx_inner.events,
_ => &default,
};

let txn_events = EventModel::from_events(raw_events, txn_version, block_height);
events.extend(txn_events);
}
let events = process_transactions(transactions);

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();
Expand Down Expand Up @@ -179,3 +151,36 @@
&self.connection_pool
}
}

pub fn process_transactions(transactions: Vec<Transaction>) -> Vec<EventModel> {
let mut events = vec![];
for txn in &transactions {
let txn_version = txn.version as i64;
let block_height = txn.block_height as i64;
let txn_data = match txn.txn_data.as_ref() {
Some(data) => data,
None => {
tracing::warn!(

Check warning on line 163 in rust/processor/src/processors/events_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/events_processor.rs#L163

Added line #L163 was not covered by tests
transaction_version = txn_version,
"Transaction data doesn't exist"

Check warning on line 165 in rust/processor/src/processors/events_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/events_processor.rs#L165

Added line #L165 was not covered by tests
);
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["EventsProcessor"])
.inc();
continue;

Check warning on line 170 in rust/processor/src/processors/events_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/events_processor.rs#L167-L170

Added lines #L167 - L170 were not covered by tests
},
};
let default = vec![];
let raw_events = match txn_data {
TxnData::BlockMetadata(tx_inner) => &tx_inner.events,
TxnData::Genesis(tx_inner) => &tx_inner.events,

Check warning on line 176 in rust/processor/src/processors/events_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/events_processor.rs#L175-L176

Added lines #L175 - L176 were not covered by tests
TxnData::User(tx_inner) => &tx_inner.events,
TxnData::Validator(tx_inner) => &tx_inner.events,
_ => &default,

Check warning on line 179 in rust/processor/src/processors/events_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/events_processor.rs#L179

Added line #L179 was not covered by tests
};

let txn_events = EventModel::from_events(raw_events, txn_version, block_height);
events.extend(txn_events);
}
events
}
Loading
Loading