Add Parquet Events Processor #451

Merged: 15 commits, Jul 25, 2024
3 changes: 3 additions & 0 deletions rust/processor/src/db/common/models/events_models/mod.rs
@@ -2,3 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

pub mod events;

// parquet model
pub mod parquet_events;
118 changes: 118 additions & 0 deletions rust/processor/src/db/common/models/events_models/parquet_events.rs
@@ -0,0 +1,118 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::extra_unused_lifetimes)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
utils::util::{standardize_address, truncate_str},
};
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use itertools::Itertools;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

// p99 of event type length is currently 303, so 300 is used as a safe max length
const EVENT_TYPE_MAX_LENGTH: usize = 300;

#[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)]
pub struct Event {
pub txn_version: i64,
pub account_address: String,
pub sequence_number: i64,
pub creation_number: i64,
pub block_height: i64,
pub event_type: String,
pub data: String,
pub event_index: i64,
pub indexed_type: String,
pub type_tag_bytes: i64,
pub total_bytes: i64,
pub event_version: i8,
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for Event {
const TABLE_NAME: &'static str = "events";
}

impl HasVersion for Event {
fn version(&self) -> i64 {
self.txn_version
}
}

impl GetTimeStamp for Event {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

impl Event {
pub fn from_event(
event: &EventPB,
txn_version: i64,
block_height: i64,
event_index: i64,
size_info: &EventSizeInfo,
block_timestamp: chrono::NaiveDateTime,
) -> Self {
let event_type: &str = event.type_str.as_ref();
Event {
account_address: standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
),
creation_number: event.key.as_ref().unwrap().creation_number as i64,
sequence_number: event.sequence_number as i64,
txn_version,
block_height,
event_type: event_type.to_string(),
data: event.data.clone(),
event_index,
indexed_type: truncate_str(event_type, EVENT_TYPE_MAX_LENGTH),
type_tag_bytes: size_info.type_tag_bytes as i64,
total_bytes: size_info.total_bytes as i64,
event_version: 1i8, // For future-proofing. TODO: update when events v2 is introduced.
block_timestamp,
}
}

pub fn from_events(
events: &[EventPB],
txn_version: i64,
block_height: i64,
event_size_info: &[EventSizeInfo],
block_timestamp: chrono::NaiveDateTime,
) -> Vec<Self> {
// Ensure that lengths match, otherwise log and panic to investigate
if events.len() != event_size_info.len() {
tracing::error!(
events_len = events.len(),
event_size_info_len = event_size_info.len(),
txn_version,
"Length mismatch: events size does not match event_size_info size.",
);
panic!("Length mismatch: events len does not match event_size_info len");
}

events
.iter()
.zip_eq(event_size_info.iter())
.enumerate()
.map(|(index, (event, size_info))| {
Self::from_event(
event,
txn_version,
block_height,
index as i64,
size_info,
block_timestamp,
)
})
.collect::<Vec<ParquetEventModel>>()
}
}

pub type ParquetEventModel = Event;
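
Note (not part of the diff): a consuming processor would flatten each transaction's events into parquet rows with from_events before handing them to the upload handler. A minimal usage sketch, assuming the events, their size infos, and the transaction metadata have already been extracted upstream; the variable names here are illustrative, not taken from the PR:

// Hypothetical call site; all inputs are assumed to come from a decoded transaction.
let rows: Vec<ParquetEventModel> = ParquetEventModel::from_events(
    &events,            // &[EventPB] pulled from the transaction payload
    txn_version,        // i64 transaction version
    block_height,       // i64 block height
    &event_size_info,   // &[EventSizeInfo] matching the events one-to-one
    block_timestamp,    // chrono::NaiveDateTime of the block
);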
5 changes: 5 additions & 0 deletions rust/processor/src/processors/mod.rs
@@ -38,6 +38,7 @@ use crate::{
processors::parquet_processors::{
parquet_ans_processor::{ParquetAnsProcessor, ParquetAnsProcessorConfig},
parquet_default_processor::{ParquetDefaultProcessor, ParquetDefaultProcessorConfig},
parquet_events_processor::{ParquetEventsProcessor, ParquetEventsProcessorConfig},
parquet_fungible_asset_processor::{
ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
},
@@ -200,6 +201,7 @@ pub enum ProcessorConfig {
ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig),
ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig),
ParquetAnsProcessor(ParquetAnsProcessorConfig),
ParquetEventsProcessor(ParquetEventsProcessorConfig),
}

impl ProcessorConfig {
@@ -216,6 +218,7 @@
| ProcessorConfig::ParquetFungibleAssetProcessor(_)
| ProcessorConfig::ParquetTransactionMetadataProcessor(_)
| ProcessorConfig::ParquetAnsProcessor(_)
| ProcessorConfig::ParquetEventsProcessor(_)
)
}
}
@@ -250,10 +253,12 @@ pub enum Processor {
TokenV2Processor,
TransactionMetadataProcessor,
UserTransactionProcessor,
// Parquet processors
ParquetDefaultProcessor,
ParquetFungibleAssetProcessor,
ParquetTransactionMetadataProcessor,
ParquetAnsProcessor,
ParquetEventsProcessor,
}

#[cfg(test)]
9 changes: 8 additions & 1 deletion rust/processor/src/processors/parquet_processors/mod.rs
@@ -2,11 +2,18 @@ use std::time::Duration;

pub mod parquet_ans_processor;
pub mod parquet_default_processor;
pub mod parquet_events_processor;
pub mod parquet_fungible_asset_processor;
pub mod parquet_transaction_metadata_processor;

pub const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

pub trait UploadIntervalConfig {
pub trait ParquetProcessorTrait {
fn parquet_upload_interval_in_secs(&self) -> Duration;

fn set_google_credentials(&self, credentials: Option<String>) {
if let Some(credentials) = credentials {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}
}
}
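
Note (not part of the diff): the new ParquetEventsProcessorConfig itself does not appear in this excerpt. As a sketch only, it is assumed to mirror the sibling parquet configs shown below, which carry at least a google_application_credentials option and an upload interval, and to implement the renamed trait the same way:

// Sketch under assumptions: the field set is inferred from the other parquet
// configs in this PR, not from the actual parquet_events_processor.rs.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct ParquetEventsProcessorConfig {
    pub google_application_credentials: Option<String>,
    pub parquet_upload_interval: u64,
}

impl ParquetProcessorTrait for ParquetEventsProcessorConfig {
    fn parquet_upload_interval_in_secs(&self) -> Duration {
        Duration::from_secs(self.parquet_upload_interval)
    }
}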
rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS};
use super::ParquetProcessorTrait;
use crate::{
bq_analytics::{
create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
@@ -40,7 +40,7 @@ pub struct ParquetAnsProcessorConfig {
pub parquet_upload_interval: u64,
}

impl UploadIntervalConfig for ParquetAnsProcessorConfig {
impl ParquetProcessorTrait for ParquetAnsProcessorConfig {
fn parquet_upload_interval_in_secs(&self) -> Duration {
Duration::from_secs(self.parquet_upload_interval)
}
@@ -58,9 +58,7 @@ impl ParquetAnsProcessor {
config: ParquetAnsProcessorConfig,
new_gap_detector_sender: AsyncSender<ProcessingResult>,
) -> Self {
if let Some(credentials) = config.google_application_credentials.clone() {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}
config.set_google_credentials(config.google_application_credentials.clone());

let ans_primary_name_v2_sender = create_parquet_handler_loop::<AnsPrimaryNameV2>(
new_gap_detector_sender.clone(),
rust/processor/src/processors/parquet_processors/parquet_default_processor.rs
@@ -14,10 +14,7 @@ use crate::{
parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
},
gap_detectors::ProcessingResult,
processors::{
parquet_processors::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS},
ProcessorName, ProcessorTrait,
},
processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait},
utils::database::ArcDbPool,
};
use ahash::AHashMap;
@@ -42,7 +39,7 @@ pub struct ParquetDefaultProcessorConfig {
pub parquet_upload_interval: u64,
}

impl UploadIntervalConfig for ParquetDefaultProcessorConfig {
impl ParquetProcessorTrait for ParquetDefaultProcessorConfig {
fn parquet_upload_interval_in_secs(&self) -> Duration {
Duration::from_secs(self.parquet_upload_interval)
}
@@ -66,9 +63,7 @@ impl ParquetDefaultProcessor {
config: ParquetDefaultProcessorConfig,
new_gap_detector_sender: AsyncSender<ProcessingResult>,
) -> Self {
if let Some(credentials) = config.google_application_credentials.clone() {
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
}
config.set_google_credentials(config.google_application_credentials.clone());

let transaction_sender = create_parquet_handler_loop::<ParquetTransaction>(
new_gap_detector_sender.clone(),