diff --git a/rust/processor/src/db/common/models/event_models/mod.rs b/rust/processor/src/db/common/models/event_models/mod.rs
new file mode 100644
index 000000000..6a11811c2
--- /dev/null
+++ b/rust/processor/src/db/common/models/event_models/mod.rs
@@ -0,0 +1 @@
+pub mod raw_events;
diff --git a/rust/processor/src/db/common/models/event_models/raw_events.rs b/rust/processor/src/db/common/models/event_models/raw_events.rs
new file mode 100644
index 000000000..9ef0fdf30
--- /dev/null
+++ b/rust/processor/src/db/common/models/event_models/raw_events.rs
@@ -0,0 +1,58 @@
+use crate::utils::util::{standardize_address, truncate_str};
+use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
+use serde::{Deserialize, Serialize};
+
+/// P99 currently is 303 so using 300 as a safe max length
+pub const EVENT_TYPE_MAX_LENGTH: usize = 300;
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct RawEvent {
+    pub sequence_number: i64,
+    pub creation_number: i64,
+    pub account_address: String,
+    pub transaction_version: i64,
+    pub transaction_block_height: i64,
+    pub type_: String,
+    pub data: String,
+    pub event_index: i64,
+    pub indexed_type: String,
+    pub block_timestamp: Option<chrono::NaiveDateTime>,
+    pub type_tag_bytes: Option<i64>,
+    pub total_bytes: Option<i64>,
+}
+
+pub trait EventConvertible {
+    fn from_raw(raw_item: &RawEvent) -> Self;
+}
+
+impl RawEvent {
+    pub fn from_raw_event(
+        event: &EventPB,
+        txn_version: i64,
+        txn_block_height: i64,
+        event_index: i64,
+        size_info: Option<&EventSizeInfo>,
+        block_timestamp: Option<chrono::NaiveDateTime>,
+    ) -> RawEvent {
+        let type_tag_bytes = size_info.map_or(0, |info| info.type_tag_bytes as i64);
+        let total_bytes = size_info.map_or(0, |info| info.total_bytes as i64);
+        let event_type = event.type_str.to_string();
+
+        RawEvent {
+            sequence_number: event.sequence_number as i64,
+            creation_number: event.key.as_ref().unwrap().creation_number as i64,
+            account_address: standardize_address(
+                event.key.as_ref().unwrap().account_address.as_str(),
+            ),
+
transaction_version: txn_version, + transaction_block_height: txn_block_height, + type_: event_type.clone(), + data: event.data.clone(), + event_index, + indexed_type: truncate_str(&event_type, EVENT_TYPE_MAX_LENGTH), + block_timestamp, + type_tag_bytes: Some(type_tag_bytes), + total_bytes: Some(total_bytes), + } + } +} diff --git a/rust/processor/src/db/common/models/mod.rs b/rust/processor/src/db/common/models/mod.rs index 16e0f058f..ff5758324 100644 --- a/rust/processor/src/db/common/models/mod.rs +++ b/rust/processor/src/db/common/models/mod.rs @@ -1 +1,2 @@ pub mod default_models; +pub mod event_models; diff --git a/rust/processor/src/db/parquet/models/event_models/mod.rs b/rust/processor/src/db/parquet/models/event_models/mod.rs new file mode 100644 index 000000000..7d33874ce --- /dev/null +++ b/rust/processor/src/db/parquet/models/event_models/mod.rs @@ -0,0 +1 @@ +pub mod parquet_events; diff --git a/rust/processor/src/db/postgres/models/events_models/parquet_events.rs b/rust/processor/src/db/parquet/models/event_models/parquet_events.rs similarity index 83% rename from rust/processor/src/db/postgres/models/events_models/parquet_events.rs rename to rust/processor/src/db/parquet/models/event_models/parquet_events.rs index 683b6a1f7..14953890f 100644 --- a/rust/processor/src/db/postgres/models/events_models/parquet_events.rs +++ b/rust/processor/src/db/parquet/models/event_models/parquet_events.rs @@ -2,10 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 #![allow(clippy::extra_unused_lifetimes)] - use crate::{ bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, - utils::util::{standardize_address, truncate_str}, + db::common::models::event_models::raw_events::{ + EventConvertible, RawEvent, EVENT_TYPE_MAX_LENGTH, + }, + utils::util::truncate_str, }; use allocative_derive::Allocative; use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo}; @@ -13,10 +15,11 @@ use lazy_static::lazy_static; use 
parquet_derive::ParquetRecordWriter; use serde::{Deserialize, Serialize}; -// p99 currently is 303 so using 300 as a safe max length -const EVENT_TYPE_MAX_LENGTH: usize = 300; const DEFAULT_CREATION_NUMBER: i64 = 0; const DEFAULT_SEQUENCE_NUMBER: i64 = 0; +// This is for future proofing. TODO: change when events v2 comes +const EVENT_VERSION: i8 = 1i8; + lazy_static! { pub static ref DEFAULT_ACCOUNT_ADDRESS: String = "NULL_ACCOUNT_ADDRESS".to_string(); pub static ref DEFAULT_EVENT_TYPE: String = "NULL_EVENT_TYPE".to_string(); @@ -66,24 +69,15 @@ impl Event { size_info: &EventSizeInfo, block_timestamp: chrono::NaiveDateTime, ) -> Self { - let event_type: &str = event.type_str.as_ref(); - Event { - account_address: standardize_address( - event.key.as_ref().unwrap().account_address.as_str(), - ), - creation_number: event.key.as_ref().unwrap().creation_number as i64, - sequence_number: event.sequence_number as i64, + let raw = RawEvent::from_raw_event( + event, txn_version, block_height, - event_type: event_type.to_string(), - data: event.data.clone(), event_index, - indexed_type: truncate_str(event_type, EVENT_TYPE_MAX_LENGTH), - type_tag_bytes: size_info.type_tag_bytes as i64, - total_bytes: size_info.total_bytes as i64, - event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes - block_timestamp, - } + Some(size_info), + Some(block_timestamp), + ); + Self::from_raw(&raw) } // This function is added to handle the txn with events filtered, but event_size_info is not filtered. @@ -106,7 +100,7 @@ impl Event { indexed_type: truncate_str(&DEFAULT_EVENT_TYPE, EVENT_TYPE_MAX_LENGTH), type_tag_bytes: size_info.type_tag_bytes as i64, total_bytes: size_info.total_bytes as i64, - event_version: 1i8, // this is for future proofing. 
TODO: change when events v2 comes + event_version: EVENT_VERSION, block_timestamp, } } @@ -201,4 +195,24 @@ fn handle_user_txn_type( .collect() } +impl EventConvertible for Event { + fn from_raw(raw: &RawEvent) -> Self { + Event { + txn_version: raw.transaction_version, + account_address: raw.account_address.clone(), + sequence_number: raw.sequence_number, + creation_number: raw.creation_number, + block_height: raw.transaction_block_height, + event_type: raw.type_.clone(), + data: raw.data.clone(), + event_index: raw.event_index, + indexed_type: raw.indexed_type.clone(), + type_tag_bytes: raw.type_tag_bytes.unwrap_or(0), + total_bytes: raw.total_bytes.unwrap_or(0), + event_version: EVENT_VERSION, + block_timestamp: raw.block_timestamp.unwrap(), + } + } +} + pub type ParquetEventModel = Event; diff --git a/rust/processor/src/db/parquet/models/mod.rs b/rust/processor/src/db/parquet/models/mod.rs index 16e0f058f..ff5758324 100644 --- a/rust/processor/src/db/parquet/models/mod.rs +++ b/rust/processor/src/db/parquet/models/mod.rs @@ -1 +1,2 @@ pub mod default_models; +pub mod event_models; diff --git a/rust/processor/src/db/postgres/models/events_models/events.rs b/rust/processor/src/db/postgres/models/events_models/events.rs index 6747636ab..d1e010484 100644 --- a/rust/processor/src/db/postgres/models/events_models/events.rs +++ b/rust/processor/src/db/postgres/models/events_models/events.rs @@ -4,16 +4,13 @@ #![allow(clippy::extra_unused_lifetimes)] use crate::{ + db::common::models::event_models::raw_events::{EventConvertible, RawEvent}, schema::events, - utils::util::{standardize_address, truncate_str}, }; use aptos_protos::transaction::v1::Event as EventPB; use field_count::FieldCount; use serde::{Deserialize, Serialize}; -// p99 currently is 303 so using 300 as a safe max length -const EVENT_TYPE_MAX_LENGTH: usize = 300; - #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, event_index))] 
#[diesel(table_name = events)] @@ -36,20 +33,15 @@ impl Event { transaction_block_height: i64, event_index: i64, ) -> Self { - let t: &str = event.type_str.as_ref(); - Event { - account_address: standardize_address( - event.key.as_ref().unwrap().account_address.as_str(), - ), - creation_number: event.key.as_ref().unwrap().creation_number as i64, - sequence_number: event.sequence_number as i64, + let raw = RawEvent::from_raw_event( + event, transaction_version, transaction_block_height, - type_: t.to_string(), - data: serde_json::from_str(event.data.as_str()).unwrap(), event_index, - indexed_type: truncate_str(t, EVENT_TYPE_MAX_LENGTH), - } + None, + None, + ); + Self::from_raw(&raw) } pub fn from_events( @@ -72,5 +64,21 @@ impl Event { } } +impl EventConvertible for Event { + fn from_raw(raw: &RawEvent) -> Self { + Event { + sequence_number: raw.sequence_number, + creation_number: raw.creation_number, + account_address: raw.account_address.clone(), + transaction_version: raw.transaction_version, + transaction_block_height: raw.transaction_block_height, + type_: raw.type_.clone(), + data: serde_json::from_str(&raw.data).unwrap(), + event_index: raw.event_index, + indexed_type: raw.indexed_type.clone(), + } + } +} + // Prevent conflicts with other things named `Event` pub type EventModel = Event; diff --git a/rust/processor/src/db/postgres/models/events_models/mod.rs b/rust/processor/src/db/postgres/models/events_models/mod.rs index ce54f7734..9d363699e 100644 --- a/rust/processor/src/db/postgres/models/events_models/mod.rs +++ b/rust/processor/src/db/postgres/models/events_models/mod.rs @@ -2,6 +2,3 @@ // SPDX-License-Identifier: Apache-2.0 pub mod events; - -// parquet model -pub mod parquet_events; diff --git a/rust/processor/src/processors/events_processor.rs b/rust/processor/src/processors/events_processor.rs index cbf196412..e7b37f93e 100644 --- a/rust/processor/src/processors/events_processor.rs +++ b/rust/processor/src/processors/events_processor.rs @@ 
-108,35 +108,7 @@ impl ProcessorTrait for EventsProcessor { let processing_start = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut events = vec![]; - for txn in &transactions { - let txn_version = txn.version as i64; - let block_height = txn.block_height as i64; - let txn_data = match txn.txn_data.as_ref() { - Some(data) => data, - None => { - tracing::warn!( - transaction_version = txn_version, - "Transaction data doesn't exist" - ); - PROCESSOR_UNKNOWN_TYPE_COUNT - .with_label_values(&["EventsProcessor"]) - .inc(); - continue; - }, - }; - let default = vec![]; - let raw_events = match txn_data { - TxnData::BlockMetadata(tx_inner) => &tx_inner.events, - TxnData::Genesis(tx_inner) => &tx_inner.events, - TxnData::User(tx_inner) => &tx_inner.events, - TxnData::Validator(tx_inner) => &tx_inner.events, - _ => &default, - }; - - let txn_events = EventModel::from_events(raw_events, txn_version, block_height); - events.extend(txn_events); - } + let events = process_transactions(transactions); let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -179,3 +151,36 @@ impl ProcessorTrait for EventsProcessor { &self.connection_pool } } + +pub fn process_transactions(transactions: Vec) -> Vec { + let mut events = vec![]; + for txn in &transactions { + let txn_version = txn.version as i64; + let block_height = txn.block_height as i64; + let txn_data = match txn.txn_data.as_ref() { + Some(data) => data, + None => { + tracing::warn!( + transaction_version = txn_version, + "Transaction data doesn't exist" + ); + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["EventsProcessor"]) + .inc(); + continue; + }, + }; + let default = vec![]; + let raw_events = match txn_data { + TxnData::BlockMetadata(tx_inner) => &tx_inner.events, + TxnData::Genesis(tx_inner) => &tx_inner.events, + TxnData::User(tx_inner) => &tx_inner.events, + 
TxnData::Validator(tx_inner) => &tx_inner.events, + _ => &default, + }; + + let txn_events = EventModel::from_events(raw_events, txn_version, block_height); + events.extend(txn_events); + } + events +} diff --git a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs index 229f694a5..3c6b93409 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs @@ -6,7 +6,7 @@ use crate::{ create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric, ParquetProcessingResult, }, - db::postgres::models::events_models::parquet_events::{Event, ParquetEventModel}, + db::parquet::models::event_models::parquet_events::{Event, ParquetEventModel}, gap_detectors::ProcessingResult, processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait}, utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, database::ArcDbPool, util::parse_timestamp}, @@ -90,62 +90,9 @@ impl ProcessorTrait for ParquetEventsProcessor { _: Option, ) -> anyhow::Result { let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut transaction_version_to_struct_count: AHashMap = AHashMap::new(); - - let mut events = vec![]; - for txn in &transactions { - let txn_version = txn.version as i64; - let block_height = txn.block_height as i64; - let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); - let size_info = match txn.size_info.as_ref() { - Some(size_info) => size_info, - None => { - warn!(version = txn.version, "Transaction size info not found"); - continue; - }, - }; - let txn_data = match txn.txn_data.as_ref() { - Some(data) => data, - None => { - tracing::warn!( - transaction_version = txn_version, - "Transaction data doesn't exist" - ); - PROCESSOR_UNKNOWN_TYPE_COUNT - 
.with_label_values(&["ParquetEventsProcessor"]) - .inc(); - - continue; - }, - }; - let default = vec![]; - let mut is_user_txn_type = false; - let raw_events = match txn_data { - TxnData::BlockMetadata(tx_inner) => &tx_inner.events, - TxnData::Genesis(tx_inner) => &tx_inner.events, - TxnData::User(tx_inner) => { - is_user_txn_type = true; - &tx_inner.events - }, - TxnData::Validator(txn) => &txn.events, - _ => &default, - }; - - let txn_events = ParquetEventModel::from_events( - raw_events, - txn_version, - block_height, - size_info.event_size_info.as_slice(), - block_timestamp, - is_user_txn_type, - ); - transaction_version_to_struct_count - .entry(txn_version) - .and_modify(|e| *e += txn_events.len() as i64) - .or_insert(txn_events.len() as i64); - - events.extend(txn_events); - } + + let (transaction_version_to_struct_count, events) = + process_transactions_parquet(transactions); let event_parquet_data = ParquetDataGeneric { data: events }; @@ -170,3 +117,65 @@ impl ProcessorTrait for ParquetEventsProcessor { &self.connection_pool } } + +pub fn process_transactions_parquet( + transactions: Vec, +) -> (AHashMap, Vec) { + let mut transaction_version_to_struct_count: AHashMap = AHashMap::new(); + + let mut events = vec![]; + for txn in &transactions { + let txn_version = txn.version as i64; + let block_height = txn.block_height as i64; + let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); + let size_info = match txn.size_info.as_ref() { + Some(size_info) => size_info, + None => { + warn!(version = txn.version, "Transaction size info not found"); + continue; + }, + }; + let txn_data = match txn.txn_data.as_ref() { + Some(data) => data, + None => { + tracing::warn!( + transaction_version = txn_version, + "Transaction data doesn't exist" + ); + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["ParquetEventsProcessor"]) + .inc(); + + continue; + }, + }; + let default = vec![]; + let mut is_user_txn_type = false; + let raw_events = 
match txn_data { + TxnData::BlockMetadata(tx_inner) => &tx_inner.events, + TxnData::Genesis(tx_inner) => &tx_inner.events, + TxnData::User(tx_inner) => { + is_user_txn_type = true; + &tx_inner.events + }, + TxnData::Validator(txn) => &txn.events, + _ => &default, + }; + + let txn_events = ParquetEventModel::from_events( + raw_events, + txn_version, + block_height, + size_info.event_size_info.as_slice(), + block_timestamp, + is_user_txn_type, + ); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += txn_events.len() as i64) + .or_insert(txn_events.len() as i64); + + events.extend(txn_events); + } + (transaction_version_to_struct_count, events) +} diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 1227395fe..a812c4af8 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -115,6 +115,9 @@ bitflags! { // More tables const CURRENT_TABLE_ITEMS = 1 << 24; const BLOCK_METADATA_TRANSACTIONS = 1 << 25; + + // Events + const EVENTS = 1 << 30; // start at 30 to avoid conflicts with other flags. 
} } diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 21e171715..07e7c3019 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -3,7 +3,10 @@ use super::{db_config::DbConfig, processor_config::ProcessorConfig}; use crate::{ - parquet_processors::parquet_default_processor::ParquetDefaultProcessor, + parquet_processors::{ + parquet_default_processor::ParquetDefaultProcessor, + parquet_events_processor::ParquetEventsProcessor, + }, processors::{ account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor, default_processor::DefaultProcessor, events_processor::EventsProcessor, @@ -81,6 +84,10 @@ impl RunnableConfig for IndexerProcessorConfig { let parquet_default_processor = ParquetDefaultProcessor::new(self.clone()).await?; parquet_default_processor.run_processor().await }, + ProcessorConfig::ParquetEventsProcessor(_) => { + let parquet_events_processor = ParquetEventsProcessor::new(self.clone()).await?; + parquet_events_processor.run_processor().await + }, } } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index ee1d4b583..3da369030 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -8,13 +8,16 @@ use crate::{ use ahash::AHashMap; use processor::{ bq_analytics::generic_parquet_processor::NamedTable, - db::parquet::models::default_models::{ - parquet_block_metadata_transactions::BlockMetadataTransaction, - parquet_move_modules::MoveModule, - parquet_move_resources::MoveResource, - parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata}, - parquet_transactions::Transaction, - parquet_write_set_changes::WriteSetChangeModel, + db::parquet::models::{ + default_models::{ + 
parquet_block_metadata_transactions::BlockMetadataTransaction, + parquet_move_modules::MoveModule, + parquet_move_resources::MoveResource, + parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata}, + parquet_transactions::Transaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + event_models::parquet_events::Event as EventPQ, }, }; use serde::{Deserialize, Serialize}; @@ -66,6 +69,7 @@ pub enum ProcessorConfig { MonitoringProcessor(DefaultProcessorConfig), // ParquetProcessor ParquetDefaultProcessor(ParquetDefaultProcessorConfig), + ParquetEventsProcessor(ParquetDefaultProcessorConfig), } impl ProcessorConfig { @@ -81,7 +85,8 @@ impl ProcessorConfig { /// is useful for querying the status from the processor status table in the database. pub fn get_processor_status_table_names(&self) -> anyhow::Result> { match self { - ProcessorConfig::ParquetDefaultProcessor(config) => { + ProcessorConfig::ParquetDefaultProcessor(config) + | ProcessorConfig::ParquetEventsProcessor(config) => { // Get the processor name as a prefix let processor_name = self.name(); @@ -116,6 +121,7 @@ impl ProcessorConfig { WriteSetChangeModel::TABLE_NAME.to_string(), TableItem::TABLE_NAME.to_string(), MoveModule::TABLE_NAME.to_string(), + EventPQ::TABLE_NAME.to_string(), BlockMetadataTransaction::TABLE_NAME.to_string(), CurrentTableItem::TABLE_NAME.to_string(), TableMetadata::TABLE_NAME.to_string(), diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 2df9a14e8..31aa229b2 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -12,14 +12,17 @@ use enum_dispatch::enum_dispatch; use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; use parquet::schema::types::Type; use processor::{ - db::parquet::models::default_models::{ - parquet_block_metadata_transactions::BlockMetadataTransaction, - 
parquet_move_modules::MoveModule, - parquet_move_resources::MoveResource, - parquet_move_tables::{CurrentTableItem, TableItem}, - parquet_table_metadata::TableMetadata, - parquet_transactions::Transaction as ParquetTransaction, - parquet_write_set_changes::WriteSetChangeModel, + db::parquet::models::{ + default_models::{ + parquet_block_metadata_transactions::BlockMetadataTransaction, + parquet_move_modules::MoveModule, + parquet_move_resources::MoveResource, + parquet_move_tables::{CurrentTableItem, TableItem}, + parquet_table_metadata::TableMetadata, + parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + event_models::parquet_events::Event, }, worker::TableFlags, }; @@ -33,6 +36,7 @@ use std::{ use strum::{Display, EnumIter}; pub mod parquet_default_processor; +pub mod parquet_events_processor; const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; @@ -56,6 +60,7 @@ const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; ) )] pub enum ParquetTypeEnum { + Event, MoveResources, WriteSetChanges, Transactions, @@ -120,6 +125,7 @@ impl_parquet_trait!( ParquetTypeEnum::BlockMetadataTransactions ); impl_parquet_trait!(TableMetadata, ParquetTypeEnum::TableMetadata); +impl_parquet_trait!(Event, ParquetTypeEnum::Event); #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] @@ -129,6 +135,7 @@ pub enum ParquetTypeStructs { Transaction(Vec), TableItem(Vec), MoveModule(Vec), + Event(Vec), CurrentTableItem(Vec), BlockMetadataTransaction(Vec), TableMetadata(Vec), @@ -147,9 +154,11 @@ impl ParquetTypeStructs { ParquetTypeStructs::BlockMetadataTransaction(Vec::new()) }, ParquetTypeEnum::TableMetadata => ParquetTypeStructs::TableMetadata(Vec::new()), + ParquetTypeEnum::Event => ParquetTypeStructs::Event(Vec::new()), } } + /// Appends data to the current buffer within each ParquetTypeStructs variant. 
pub fn append(&mut self, other: ParquetTypeStructs) -> Result<(), ProcessorError> { macro_rules! handle_append { ($self_data:expr, $other_data:expr) => {{ @@ -189,6 +198,9 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + (ParquetTypeStructs::Event(self_data), ParquetTypeStructs::Event(other_data)) => { + handle_append!(self_data, other_data) + }, ( ParquetTypeStructs::CurrentTableItem(self_data), ParquetTypeStructs::CurrentTableItem(other_data), diff --git a/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs new file mode 100644 index 000000000..992286f12 --- /dev/null +++ b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs @@ -0,0 +1,174 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + set_backfill_table_flag, ParquetTypeEnum, + }, + steps::{ + common::{ + parquet_version_tracker_step::ParquetVersionTrackerStep, + processor_status_saver::get_processor_status_saver, + }, + parquet_events_processor::parquet_events_extractor::ParquetEventsExtractor, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, + }, +}; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use parquet::schema::types::Type; +use processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + db::parquet::models::event_models::parquet_events::Event as EventPQ, +}; +use 
std::{collections::HashMap, sync::Arc}; +use tracing::{debug, info}; + +pub struct ParquetEventsProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, // for processor status +} + +impl ParquetEventsProcessor { + pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) + } +} +#[async_trait::async_trait] +impl ProcessorTrait for ParquetEventsProcessor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + async fn run_processor(&self) -> anyhow::Result<()> { + // Run Migrations + let parquet_db_config = match self.config.db_config { + DbConfig::ParquetConfig(ref parquet_config) => { + run_migrations( + parquet_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + parquet_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid db config for ParquetEventsProcessor {:?}", + self.config.db_config + )); + }, + }; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? 
+ .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let parquet_processor_config = match self.config.processor_config.clone() { + ProcessorConfig::ParquetEventsProcessor(parquet_processor_config) => { + parquet_processor_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor configuration for ParquetEventsProcessor {:?}", + self.config.processor_config + )); + }, + }; + + let processor_status_table_names = self + .config + .processor_config + .get_processor_status_table_names() + .context("Failed to get table names for the processor status table")?; + + let starting_version = get_min_last_success_version_parquet( + &self.config, + self.db_pool.clone(), + processor_status_table_names, + ) + .await?; + + // Define processor transaction stream config + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + + let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); + let parquet_events_extractor = ParquetEventsExtractor { + opt_in_tables: backfill_table, + }; + + let gcs_client = + initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; + + let parquet_type_to_schemas: HashMap> = + [(ParquetTypeEnum::Event, EventPQ::schema())] + .into_iter() + .collect(); + + let default_size_buffer_step = initialize_parquet_buffer_step( + gcs_client.clone(), + parquet_type_to_schemas, + parquet_processor_config.upload_interval, + parquet_processor_config.max_buffer_size, + parquet_db_config.bucket_name.clone(), + parquet_db_config.bucket_root.clone(), + self.name().to_string(), + ) + .await + .unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_processor_status_saver(self.db_pool.clone(), 
self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); + + let channel_size = parquet_processor_config.channel_size; + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(parquet_events_extractor.into_runnable_step(), channel_size) + .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/parquet_processors/parquet_user_transaction_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_user_transaction_processor.rs new file mode 100644 index 000000000..e69de29bb diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index f41861537..8fd038d8f 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -6,6 +6,7 @@ pub mod events_processor; pub mod fungible_asset_processor; pub mod objects_processor; pub mod parquet_default_processor; +pub mod parquet_events_processor; pub mod stake_processor; pub mod token_v2_processor; pub mod user_transaction_processor; diff --git a/rust/sdk-processor/src/steps/parquet_events_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_events_processor/mod.rs new file mode 100644 index 000000000..83980cef2 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_events_processor/mod.rs @@ -0,0 +1 @@ +pub mod parquet_events_extractor; diff --git a/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs 
b/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs
new file mode 100644
index 000000000..a76e22a87
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs
@@ -0,0 +1,65 @@
+use crate::{
+    parquet_processors::{ParquetTypeEnum, ParquetTypeStructs},
+    utils::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill,
+};
+use aptos_indexer_processor_sdk::{
+    aptos_protos::transaction::v1::Transaction,
+    traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
+    types::transaction_context::TransactionContext,
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use processor::{
+    processors::parquet_processors::parquet_events_processor::process_transactions_parquet,
+    worker::TableFlags,
+};
+use std::collections::HashMap;
+
+/// Extracts parquet data from transactions, allowing optional selection of specific tables.
+pub struct ParquetEventsExtractor
+where
+    Self: Processable + Send + Sized + 'static,
+{
+    pub opt_in_tables: TableFlags,
+}
+
+type ParquetTypeMap = HashMap<ParquetTypeEnum, ParquetTypeStructs>;
+
+#[async_trait]
+impl Processable for ParquetEventsExtractor {
+    type Input = Vec<Transaction>;
+    type Output = ParquetTypeMap;
+    type RunType = AsyncRunType;
+
+    async fn process(
+        &mut self,
+        transactions: TransactionContext<Self::Input>,
+    ) -> anyhow::Result<Option<TransactionContext<Self::Output>>, ProcessorError> {
+        let (_txn_ver_map, events) = process_transactions_parquet(transactions.data);
+
+        let mut map: HashMap<ParquetTypeEnum, ParquetTypeStructs> = HashMap::new();
+
+        // Array of tuples for each data type and its corresponding enum variant and flag
+        let data_types = [(
+            TableFlags::EVENTS,
+            ParquetTypeEnum::Event,
+            ParquetTypeStructs::Event(events),
+        )];
+
+        // Populate the map based on opt-in tables
+        add_to_map_if_opted_in_for_backfill(self.opt_in_tables, &mut map, data_types.to_vec());
+
+        Ok(Some(TransactionContext {
+            data: map,
+            metadata: transactions.metadata,
+        }))
+    }
+}
+
+impl AsyncStep for ParquetEventsExtractor {}
+
+impl NamedStep for 
ParquetEventsExtractor { + fn name(&self) -> String { + "ParquetEventsExtractor".to_string() + } +}