From 7d3dbe7c8846ce840eaa0dd84249ea90d0864705 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Tue, 26 Nov 2024 14:18:27 -0800 Subject: [PATCH 1/8] Init --- rust/processor/parser.yaml | 10 +- .../postgres/models/events_models/events.rs | 18 ++ .../db/postgres/models/events_models/mod.rs | 3 +- .../models/events_models/parquet_events.rs | 21 ++ .../models/events_models/raw_events.rs | 22 ++ .../src/processors/events_processor.rs | 63 +++--- .../parquet_events_processor.rs | 121 +++++----- .../src/parquet_processors/mod.rs | 17 +- .../parquet_events_processor.rs | 212 ++++++++++++++++++ .../src/steps/common/gcs_uploader.rs | 4 + .../src/steps/parquet_events_processor/mod.rs | 0 .../parquet_events_extractor.rs | 83 +++++++ 12 files changed, 479 insertions(+), 95 deletions(-) create mode 100644 rust/processor/src/db/postgres/models/events_models/raw_events.rs create mode 100644 rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs create mode 100644 rust/sdk-processor/src/steps/parquet_events_processor/mod.rs create mode 100644 rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs diff --git a/rust/processor/parser.yaml b/rust/processor/parser.yaml index 25ed06b54..373978ea9 100644 --- a/rust/processor/parser.yaml +++ b/rust/processor/parser.yaml @@ -4,7 +4,9 @@ health_check_port: 8084 server_config: processor_config: - type: default_processor - postgres_connection_string: postgresql://postgres:@localhost:5432/default_processor - indexer_grpc_data_service_address: http://127.0.0.1:50051 - auth_token: AUTH_TOKEN \ No newline at end of file + type: token_v2_processor + postgres_connection_string: postgresql://postgres:@localhost:5432/example + indexer_grpc_data_service_address: https://grpc.testnet.aptoslabs.com:443 + auth_token: "aptoslabs_No5KWpMQKvz_7ohYQejzxmgpL27BS9gK784GpA14EP9mb" + starting_version: 5985000000 + ending_version: 6000000000 \ No newline at end of file diff --git a/rust/processor/src/db/postgres/models/events_models/events.rs b/rust/processor/src/db/postgres/models/events_models/events.rs index 6747636ab..6eb00ed55 100644 --- a/rust/processor/src/db/postgres/models/events_models/events.rs +++ b/rust/processor/src/db/postgres/models/events_models/events.rs @@ -11,6 +11,8 @@ use aptos_protos::transaction::v1::Event as EventPB; use field_count::FieldCount; use serde::{Deserialize, Serialize}; +use super::raw_events::{EventConvertible, RawEvent}; + // p99 currently is 303 so using 300 as a safe max length const EVENT_TYPE_MAX_LENGTH: usize = 300; @@ -74,3 +76,19 @@ impl Event { // Prevent conflicts with other things named `Event` pub type EventModel = Event; + +impl EventConvertible for Event { + fn from_raw(raw_item: &RawEvent) -> Self { + Event { + sequence_number: raw_item.sequence_number, + creation_number: raw_item.creation_number, + account_address: raw_item.account_address.clone(), + transaction_version: raw_item.txn_version, + transaction_block_height: raw_item.block_height, + type_: raw_item.event_type.clone(), + data: serde_json::from_str(raw_item.data.as_str()).unwrap(), + event_index: raw_item.event_index, + indexed_type: truncate_str(&raw_item.event_type, EVENT_TYPE_MAX_LENGTH), + } + } +} diff --git a/rust/processor/src/db/postgres/models/events_models/mod.rs b/rust/processor/src/db/postgres/models/events_models/mod.rs index ce54f7734..cc40b5cef 100644 --- a/rust/processor/src/db/postgres/models/events_models/mod.rs +++ b/rust/processor/src/db/postgres/models/events_models/mod.rs @@ -2,6 +2,5 @@ // SPDX-License-Identifier: Apache-2.0 pub mod events; - -// parquet model pub mod parquet_events; +pub mod raw_events; diff --git a/rust/processor/src/db/postgres/models/events_models/parquet_events.rs b/rust/processor/src/db/postgres/models/events_models/parquet_events.rs index 683b6a1f7..4dff2f62b 100644 --- a/rust/processor/src/db/postgres/models/events_models/parquet_events.rs +++ b/rust/processor/src/db/postgres/models/events_models/parquet_events.rs @@ -13,6 +13,8 @@ use lazy_static::lazy_static; use parquet_derive::ParquetRecordWriter; use serde::{Deserialize, Serialize}; +use super::raw_events::{EventConvertible, RawEvent}; + // p99 currently is 303 so using 300 as a safe max length const EVENT_TYPE_MAX_LENGTH: usize = 300; const DEFAULT_CREATION_NUMBER: i64 = 0; @@ -201,4 +203,23 @@ fn handle_user_txn_type( .collect() } +impl EventConvertible for Event { + fn from_raw(raw_item: &RawEvent) -> Self { + Event { + sequence_number: raw_item.sequence_number, + creation_number: raw_item.creation_number, + account_address: raw_item.account_address.clone(), + txn_version: raw_item.txn_version, + block_height: raw_item.block_height, + event_type: raw_item.event_type.clone(), + data: raw_item.data.clone(), + event_index: raw_item.event_index, + indexed_type: truncate_str(&raw_item.event_type, EVENT_TYPE_MAX_LENGTH), + type_tag_bytes: raw_item.type_tag_bytes, + total_bytes: raw_item.total_bytes, + event_version: raw_item.event_version, + block_timestamp: raw_item.block_timestamp, + } + } +} pub type ParquetEventModel = Event; diff --git a/rust/processor/src/db/postgres/models/events_models/raw_events.rs b/rust/processor/src/db/postgres/models/events_models/raw_events.rs new file mode 100644 index 000000000..adaf44160 --- /dev/null +++ b/rust/processor/src/db/postgres/models/events_models/raw_events.rs @@ -0,0 +1,22 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RawEvent { + pub txn_version: i64, + pub account_address: String, + pub sequence_number: i64, + pub creation_number: i64, + pub block_height: i64, + pub event_type: String, + pub data: String, + pub event_index: i64, + pub indexed_type: String, + pub type_tag_bytes: i64, + pub total_bytes: i64, + pub event_version: i8, + pub block_timestamp: chrono::NaiveDateTime, +} + +pub trait EventConvertible { + fn from_raw(raw_item: &RawEvent) -> Self; +} diff --git a/rust/processor/src/processors/events_processor.rs b/rust/processor/src/processors/events_processor.rs index cbf196412..e7b37f93e 100644 --- a/rust/processor/src/processors/events_processor.rs +++ b/rust/processor/src/processors/events_processor.rs @@ -108,35 +108,7 @@ impl ProcessorTrait for EventsProcessor { let processing_start = std::time::Instant::now(); let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut events = vec![]; - for txn in &transactions { - let txn_version = txn.version as i64; - let block_height = txn.block_height as i64; - let txn_data = match txn.txn_data.as_ref() { - Some(data) => data, - None => { - tracing::warn!( - transaction_version = txn_version, - "Transaction data doesn't exist" - ); - PROCESSOR_UNKNOWN_TYPE_COUNT - .with_label_values(&["EventsProcessor"]) - .inc(); - continue; - }, - }; - let default = vec![]; - let raw_events = match txn_data { - TxnData::BlockMetadata(tx_inner) => &tx_inner.events, - TxnData::Genesis(tx_inner) => &tx_inner.events, - TxnData::User(tx_inner) => &tx_inner.events, - TxnData::Validator(tx_inner) => &tx_inner.events, - _ => &default, - }; - - let txn_events = EventModel::from_events(raw_events, txn_version, block_height); - events.extend(txn_events); - } + let events = process_transactions(transactions); let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -179,3 +151,36 @@ impl ProcessorTrait for EventsProcessor { &self.connection_pool } } + +pub fn process_transactions(transactions: Vec) -> Vec { + let mut events = vec![]; + for txn in &transactions { + let txn_version = txn.version as i64; + let block_height = txn.block_height as i64; + let txn_data = match txn.txn_data.as_ref() { + Some(data) => data, + None => { + tracing::warn!( + transaction_version = txn_version, + "Transaction data doesn't exist" + ); + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["EventsProcessor"]) + .inc(); + continue; + }, + }; + let default = vec![]; + let raw_events = match txn_data { + TxnData::BlockMetadata(tx_inner) => &tx_inner.events, + TxnData::Genesis(tx_inner) => &tx_inner.events, + TxnData::User(tx_inner) => &tx_inner.events, + TxnData::Validator(tx_inner) => &tx_inner.events, + _ => &default, + }; + + let txn_events = EventModel::from_events(raw_events, txn_version, block_height); + events.extend(txn_events); + } + events +} diff --git a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs index 229f694a5..8a9503053 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs @@ -90,62 +90,9 @@ impl ProcessorTrait for ParquetEventsProcessor { _: Option, ) -> anyhow::Result { let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut transaction_version_to_struct_count: AHashMap = AHashMap::new(); - - let mut events = vec![]; - for txn in &transactions { - let txn_version = txn.version as i64; - let block_height = txn.block_height as i64; - let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); - let size_info = match txn.size_info.as_ref() { - Some(size_info) => size_info, - None => { - warn!(version = txn.version, "Transaction size info not found"); - continue; - }, - }; - let txn_data = match txn.txn_data.as_ref() { - Some(data) => data, - None => { - tracing::warn!( - transaction_version = txn_version, - "Transaction data doesn't exist" - ); - PROCESSOR_UNKNOWN_TYPE_COUNT - .with_label_values(&["ParquetEventsProcessor"]) - .inc(); - - continue; - }, - }; - let default = vec![]; - let mut is_user_txn_type = false; - let raw_events = match txn_data { - TxnData::BlockMetadata(tx_inner) => &tx_inner.events, - TxnData::Genesis(tx_inner) => &tx_inner.events, - TxnData::User(tx_inner) => { - is_user_txn_type = true; - &tx_inner.events - }, - TxnData::Validator(txn) => &txn.events, - _ => &default, - }; - - let txn_events = ParquetEventModel::from_events( - raw_events, - txn_version, - block_height, - size_info.event_size_info.as_slice(), - block_timestamp, - is_user_txn_type, - ); - transaction_version_to_struct_count - .entry(txn_version) - .and_modify(|e| *e += txn_events.len() as i64) - .or_insert(txn_events.len() as i64); - - events.extend(txn_events); - } + + let (transaction_version_to_struct_count, events) = + process_transactions_parquet(transactions); let event_parquet_data = ParquetDataGeneric { data: events }; @@ -170,3 +117,65 @@ impl ProcessorTrait for ParquetEventsProcessor { &self.connection_pool } } + +pub fn process_transactions_parquet( + transactions: Vec, +) -> (AHashMap, Vec) { + let mut transaction_version_to_struct_count: AHashMap = AHashMap::new(); + + let mut events = vec![]; + for txn in &transactions { + let txn_version = txn.version as i64; + let block_height = txn.block_height as i64; + let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); + let size_info = match txn.size_info.as_ref() { + Some(size_info) => size_info, + None => { + warn!(version = txn.version, "Transaction size info not found"); + continue; + }, + }; + let txn_data = match txn.txn_data.as_ref() { + Some(data) => data, + None => { + tracing::warn!( + transaction_version = txn_version, + "Transaction data doesn't exist" + ); + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["ParquetEventsProcessor"]) + .inc(); + + continue; + }, + }; + let default = vec![]; + let mut is_user_txn_type = false; + let raw_events = match txn_data { + TxnData::BlockMetadata(tx_inner) => &tx_inner.events, + TxnData::Genesis(tx_inner) => &tx_inner.events, + TxnData::User(tx_inner) => { + is_user_txn_type = true; + &tx_inner.events + }, + TxnData::Validator(txn) => &txn.events, + _ => &default, + }; + + let txn_events = ParquetEventModel::from_events( + raw_events, + txn_version, + block_height, + size_info.event_size_info.as_slice(), + block_timestamp, + is_user_txn_type, + ); + transaction_version_to_struct_count + .entry(txn_version) + .and_modify(|e| *e += txn_events.len() as i64) + .or_insert(txn_events.len() as i64); + + events.extend(txn_events); + } + (transaction_version_to_struct_count, events) +} diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 218c9d8ac..18e280e6c 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -1,13 +1,17 @@ use aptos_indexer_processor_sdk::utils::errors::ProcessorError; -use processor::db::parquet::models::default_models::{ - parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, - parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, - parquet_write_set_changes::WriteSetChangeModel, +use processor::db::{ + parquet::models::default_models::{ + parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, + parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + postgres::models::events_models::parquet_events::Event, }; use serde::{Deserialize, Serialize}; use strum::{Display, EnumIter}; pub mod parquet_default_processor; +pub mod parquet_events_processor; /// Enum representing the different types of Parquet files that can be processed. #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)] @@ -33,6 +37,7 @@ pub enum ParquetTypeEnum { Transaction, TableItem, MoveModule, + Event, } #[derive(Clone, Debug, strum::EnumDiscriminants)] @@ -57,6 +62,7 @@ pub enum ParquetTypeStructs { Transaction(Vec), TableItem(Vec), MoveModule(Vec), + Event(Vec), } impl ParquetTypeStructs { @@ -67,6 +73,7 @@ impl ParquetTypeStructs { ParquetTypeEnum::Transaction => ParquetTypeStructs::Transaction(Vec::new()), ParquetTypeEnum::TableItem => ParquetTypeStructs::TableItem(Vec::new()), ParquetTypeEnum::MoveModule => ParquetTypeStructs::MoveModule(Vec::new()), + ParquetTypeEnum::Event => ParquetTypeStructs::Event(Vec::new()), } } @@ -77,6 +84,7 @@ impl ParquetTypeStructs { ParquetTypeStructs::Transaction(_) => "transactions", ParquetTypeStructs::TableItem(_) => "table_items", ParquetTypeStructs::MoveModule(_) => "move_modules", + ParquetTypeStructs::Event(_) => "events", } } @@ -87,6 +95,7 @@ impl ParquetTypeStructs { ParquetTypeStructs::Transaction(data) => allocative::size_of_unique(data), ParquetTypeStructs::TableItem(data) => allocative::size_of_unique(data), ParquetTypeStructs::MoveModule(data) => allocative::size_of_unique(data), + ParquetTypeStructs::Event(data) => allocative::size_of_unique(data), } } diff --git a/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs new file mode 100644 index 000000000..b8e6ae7f5 --- /dev/null +++ b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs @@ -0,0 +1,212 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + parquet_processors::ParquetTypeEnum, + steps::{ + common::{ + gcs_uploader::{create_new_writer, GCSUploader}, + parquet_buffer_step::ParquetBufferStep, + }, + parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{new_db_pool, run_migrations, ArcDbPool}, + starting_version::{get_min_last_success_version_parquet, get_starting_version}, + }, +}; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::TransactionStreamStep, + traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, +}; +use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; +use parquet::schema::types::Type; +use processor::{ + bq_analytics::generic_parquet_processor::HasParquetSchema, + db::{ + parquet::models::default_models::{ + parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, + parquet_move_tables::TableItem, + parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + postgres::models::events_models::events::Event, + }, + worker::TableFlags, +}; +use std::{collections::HashMap, sync::Arc, time::Duration}; +use tracing::{debug, info}; + +const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; + +pub struct ParquetDefaultProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, // for processor status +} + +impl ParquetDefaultProcessor { + pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { + match config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + let conn_pool = new_db_pool( + &postgres_config.connection_string, + Some(postgres_config.db_pool_size), + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create connection pool for PostgresConfig: {:?}", + e + ) + })?; + + Ok(Self { + config, + db_pool: conn_pool, + }) + }, + } + } +} + +#[async_trait::async_trait] +impl ProcessorTrait for ParquetDefaultProcessor { + fn name(&self) -> &'static str { + self.config.processor_config.name() + } + + async fn run_processor(&self) -> anyhow::Result<()> { + // Run Migrations + match self.config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + }, + } + + // Determine the processing mode (backfill or regular) + let is_backfill = self.config.backfill_config.is_some(); + + // TODO: Revisit when parquet version tracker is available. + // Query the starting version + let starting_version = if is_backfill { + get_starting_version(&self.config, self.db_pool.clone()).await? + } else { + // Regular mode logic: Fetch the minimum last successful version across all relevant tables + let table_names = self + .config + .processor_config + .get_table_names() + .context("Failed to get table names for the processor")?; + + get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) + .await? + }; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let parquet_processor_config = match self.config.processor_config.clone() { + ProcessorConfig::ParquetDefaultProcessor(parquet_processor_config) => { + parquet_processor_config + }, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor configuration for ParquetDefaultProcessor {:?}", + self.config.processor_config + )); + }, + }; + + // Define processor transaction stream config + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config.clone() + }) + .await?; + + let parquet_default_extractor = ParquetDefaultExtractor { + opt_in_tables: TableFlags::empty(), + }; + + let credentials = parquet_processor_config + .google_application_credentials + .clone(); + + if let Some(credentials) = credentials { + std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); + } + + let gcs_config = GcsClientConfig::default() + .with_auth() + .await + .expect("Failed to create GCS client config"); + + let gcs_client = Arc::new(GCSClient::new(gcs_config)); + + let parquet_type_to_schemas: HashMap> = + [(ParquetTypeEnum::Event, Event::schema())] + .into_iter() + .collect(); + + let parquet_type_to_writer = parquet_type_to_schemas + .iter() + .map(|(key, schema)| { + let writer = create_new_writer(schema.clone()).expect("Failed to create writer"); + (*key, writer) + }) + .collect(); + + let buffer_uploader = GCSUploader::new( + gcs_client.clone(), + parquet_type_to_schemas, + parquet_type_to_writer, + parquet_processor_config.bucket_name.clone(), + parquet_processor_config.bucket_root.clone(), + self.name().to_string(), + )?; + + let channel_size = parquet_processor_config.channel_size; + + let default_size_buffer_step = ParquetBufferStep::new( + Duration::from_secs(parquet_processor_config.parquet_upload_interval), + buffer_uploader, + parquet_processor_config.max_buffer_size, + ); + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(parquet_default_extractor.into_runnable_step(), channel_size) + .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .end_and_return_output_receiver(channel_size); + + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs index c6d8f435f..bb400a4fd 100644 --- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs +++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs @@ -61,6 +61,10 @@ impl Uploadable for GCSUploader { self.upload_generic(&modules[..], ParquetTypeEnum::MoveModule, table_name) .await }, + ParquetTypeStructs::Event(events) => { + self.upload_generic(&events[..], ParquetTypeEnum::Event, table_name) + .await + }, }; if let Err(e) = result { diff --git a/rust/sdk-processor/src/steps/parquet_events_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_events_processor/mod.rs new file mode 100644 index 000000000..e69de29bb diff --git a/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs b/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs new file mode 100644 index 000000000..e66aea86a --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs @@ -0,0 +1,83 @@ +use crate::{ + parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, + utils::parquet_extractor_helper::add_to_map_if_opted_in_for_backfill, +}; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::{ + common::models::default_models::raw_table_items::TableItemConvertible, + parquet::models::default_models::parquet_move_tables::TableItem, + }, + processors::parquet_processors::parquet_events_processor::process_transactions_parquet, + worker::TableFlags, +}; +use std::collections::HashMap; +use tracing::debug; + +/// Extracts parquet data from transactions, allowing optional selection of specific tables. +pub struct ParquetEventsExtractor +where + Self: Processable + Send + Sized + 'static, +{ + pub opt_in_tables: TableFlags, +} + +type ParquetTypeMap = HashMap; + +#[async_trait] +impl Processable for ParquetEventsExtractor { + type Input = Vec; + type Output = ParquetTypeMap; + type RunType = AsyncRunType; + + async fn process( + &mut self, + transactions: TransactionContext, + ) -> anyhow::Result>, ProcessorError> { + let (_, raw_table_items, _, _) = process_transactions(transactions.data.clone()); + + let parquet_table_items: Vec = + raw_table_items.iter().map(TableItem::from_raw).collect(); + + let (hmap, events) = process_transactions_parquet(transactions.data); + + let mut map: HashMap = HashMap::new(); + + // Array of tuples for each data type and its corresponding enum variant and flag + let data_types = [( + TableFlags::MOVE_RESOURCES, + ParquetTypeEnum::MoveResource, + ParquetTypeStructs::MoveResource(move_resources), + )]; + + // Populate the map based on opt-in tables + for (table_flag, enum_type, data) in data_types { + add_to_map_if_opted_in_for_backfill( + self.opt_in_tables, + &mut map, + table_flag, + enum_type, + data, + ); + } + + Ok(Some(TransactionContext { + data: map, + metadata: transactions.metadata, + })) + } +} + +impl AsyncStep for ParquetEventsExtractor {} + +impl NamedStep for ParquetEventsExtractor { + fn name(&self) -> String { + "ParquetEventsExtractor".to_string() + } +} From 5228492d11ec8e1441b3b9b52c38bddcfdbd459b Mon Sep 17 00:00:00 2001 From: dermanyang Date: Wed, 27 Nov 2024 14:04:09 -0800 Subject: [PATCH 2/8] Implement extractor and refactor models --- rust/processor/parser.yaml | 10 ++- .../postgres/models/events_models/events.rs | 51 ++++++--------- .../models/events_models/parquet_events.rs | 62 ++++++++----------- .../models/events_models/raw_events.rs | 52 +++++++++++++--- rust/processor/src/worker.rs | 3 + .../src/config/indexer_processor_config.rs | 9 ++- .../src/config/processor_config.rs | 8 ++- .../parquet_events_processor.rs | 29 +++------ rust/sdk-processor/src/steps/mod.rs | 1 + .../src/steps/parquet_events_processor/mod.rs | 1 + .../parquet_events_extractor.rs | 18 ++---- 11 files changed, 127 insertions(+), 117 deletions(-) diff --git a/rust/processor/parser.yaml b/rust/processor/parser.yaml index 373978ea9..25ed06b54 100644 --- a/rust/processor/parser.yaml +++ b/rust/processor/parser.yaml @@ -4,9 +4,7 @@ health_check_port: 8084 server_config: processor_config: - type: token_v2_processor - postgres_connection_string: postgresql://postgres:@localhost:5432/example - indexer_grpc_data_service_address: https://grpc.testnet.aptoslabs.com:443 - auth_token: "aptoslabs_No5KWpMQKvz_7ohYQejzxmgpL27BS9gK784GpA14EP9mb" - starting_version: 5985000000 - ending_version: 6000000000 \ No newline at end of file + type: default_processor + postgres_connection_string: postgresql://postgres:@localhost:5432/default_processor + indexer_grpc_data_service_address: http://127.0.0.1:50051 + auth_token: AUTH_TOKEN \ No newline at end of file diff --git a/rust/processor/src/db/postgres/models/events_models/events.rs b/rust/processor/src/db/postgres/models/events_models/events.rs index 6eb00ed55..6d472e341 100644 --- a/rust/processor/src/db/postgres/models/events_models/events.rs +++ b/rust/processor/src/db/postgres/models/events_models/events.rs @@ -3,19 +3,13 @@ #![allow(clippy::extra_unused_lifetimes)] -use crate::{ - schema::events, - utils::util::{standardize_address, truncate_str}, -}; +use crate::schema::events; use aptos_protos::transaction::v1::Event as EventPB; use field_count::FieldCount; use serde::{Deserialize, Serialize}; use super::raw_events::{EventConvertible, RawEvent}; -// p99 currently is 303 so using 300 as a safe max length -const EVENT_TYPE_MAX_LENGTH: usize = 300; - #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, event_index))] #[diesel(table_name = events)] @@ -38,20 +32,15 @@ impl Event { transaction_block_height: i64, event_index: i64, ) -> Self { - let t: &str = event.type_str.as_ref(); - Event { - account_address: standardize_address( - event.key.as_ref().unwrap().account_address.as_str(), - ), - creation_number: event.key.as_ref().unwrap().creation_number as i64, - sequence_number: event.sequence_number as i64, + let raw = RawEvent::from_raw_event( + event, transaction_version, transaction_block_height, - type_: t.to_string(), - data: serde_json::from_str(event.data.as_str()).unwrap(), event_index, - indexed_type: truncate_str(t, EVENT_TYPE_MAX_LENGTH), - } + None, + None, + ); + Self::from_raw(&raw) } pub fn from_events( @@ -74,21 +63,21 @@ impl Event { } } -// Prevent conflicts with other things named `Event` -pub type EventModel = Event; - impl EventConvertible for Event { - fn from_raw(raw_item: &RawEvent) -> Self { + fn from_raw(raw: &RawEvent) -> Self { Event { - sequence_number: raw_item.sequence_number, - creation_number: raw_item.creation_number, - account_address: raw_item.account_address.clone(), - transaction_version: raw_item.txn_version, - transaction_block_height: raw_item.block_height, - type_: raw_item.event_type.clone(), - data: serde_json::from_str(raw_item.data.as_str()).unwrap(), - event_index: raw_item.event_index, - indexed_type: truncate_str(&raw_item.event_type, EVENT_TYPE_MAX_LENGTH), + sequence_number: raw.sequence_number, + creation_number: raw.creation_number, + account_address: raw.account_address.clone(), + transaction_version: raw.transaction_version, + transaction_block_height: raw.transaction_block_height, + type_: raw.type_.clone(), + data: serde_json::from_str(&raw.data).unwrap(), + event_index: raw.event_index, + indexed_type: raw.indexed_type.clone(), } } } + +// Prevent conflicts with other things named `Event` +pub type EventModel = Event; diff --git a/rust/processor/src/db/postgres/models/events_models/parquet_events.rs b/rust/processor/src/db/postgres/models/events_models/parquet_events.rs index 4dff2f62b..5b52ddfcc 100644 --- a/rust/processor/src/db/postgres/models/events_models/parquet_events.rs +++ b/rust/processor/src/db/postgres/models/events_models/parquet_events.rs @@ -3,9 +3,10 @@ #![allow(clippy::extra_unused_lifetimes)] +use super::raw_events::{EventConvertible, RawEvent, EVENT_TYPE_MAX_LENGTH}; use crate::{ bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, - utils::util::{standardize_address, truncate_str}, + utils::util::truncate_str, }; use allocative_derive::Allocative; use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo}; @@ -13,12 +14,11 @@ use lazy_static::lazy_static; use parquet_derive::ParquetRecordWriter; use serde::{Deserialize, Serialize}; -use super::raw_events::{EventConvertible, RawEvent}; - -// p99 currently is 303 so using 300 as a safe max length -const EVENT_TYPE_MAX_LENGTH: usize = 300; const DEFAULT_CREATION_NUMBER: i64 = 0; const DEFAULT_SEQUENCE_NUMBER: i64 = 0; +// This is for future proofing. TODO: change when events v2 comes +const EVENT_VERSION: i8 = 1i8; + lazy_static! { pub static ref DEFAULT_ACCOUNT_ADDRESS: String = "NULL_ACCOUNT_ADDRESS".to_string(); pub static ref DEFAULT_EVENT_TYPE: String = "NULL_EVENT_TYPE".to_string(); @@ -68,24 +68,15 @@ impl Event { size_info: &EventSizeInfo, block_timestamp: chrono::NaiveDateTime, ) -> Self { - let event_type: &str = event.type_str.as_ref(); - Event { - account_address: standardize_address( - event.key.as_ref().unwrap().account_address.as_str(), - ), - creation_number: event.key.as_ref().unwrap().creation_number as i64, - sequence_number: event.sequence_number as i64, + let raw = RawEvent::from_raw_event( + event, txn_version, block_height, - event_type: event_type.to_string(), - data: event.data.clone(), event_index, - indexed_type: truncate_str(event_type, EVENT_TYPE_MAX_LENGTH), - type_tag_bytes: size_info.type_tag_bytes as i64, - total_bytes: size_info.total_bytes as i64, - event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes - block_timestamp, - } + Some(size_info), + Some(block_timestamp), + ); + Self::from_raw(&raw) } // This function is added to handle the txn with events filtered, but event_size_info is not filtered. @@ -108,7 +99,7 @@ impl Event { indexed_type: truncate_str(&DEFAULT_EVENT_TYPE, EVENT_TYPE_MAX_LENGTH), type_tag_bytes: size_info.type_tag_bytes as i64, total_bytes: size_info.total_bytes as i64, - event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes + event_version: EVENT_VERSION, block_timestamp, } } @@ -204,22 +195,23 @@ fn handle_user_txn_type( } impl EventConvertible for Event { - fn from_raw(raw_item: &RawEvent) -> Self { + fn from_raw(raw: &RawEvent) -> Self { Event { - sequence_number: raw_item.sequence_number, - creation_number: raw_item.creation_number, - account_address: raw_item.account_address.clone(), - txn_version: raw_item.txn_version, - block_height: raw_item.block_height, - event_type: raw_item.event_type.clone(), - data: raw_item.data.clone(), - event_index: raw_item.event_index, - indexed_type: truncate_str(&raw_item.event_type, EVENT_TYPE_MAX_LENGTH), - type_tag_bytes: raw_item.type_tag_bytes, - total_bytes: raw_item.total_bytes, - event_version: raw_item.event_version, - block_timestamp: raw_item.block_timestamp, + txn_version: raw.transaction_version, + account_address: raw.account_address.clone(), + sequence_number: raw.sequence_number, + creation_number: raw.creation_number, + block_height: raw.transaction_block_height, + event_type: raw.type_.clone(), + data: raw.data.clone(), + event_index: raw.event_index, + indexed_type: raw.indexed_type.clone(), + type_tag_bytes: raw.type_tag_bytes.unwrap_or(0), + total_bytes: raw.total_bytes.unwrap_or(0), + event_version: EVENT_VERSION, + block_timestamp: raw.block_timestamp.unwrap(), } } } + pub type ParquetEventModel = Event; diff --git a/rust/processor/src/db/postgres/models/events_models/raw_events.rs b/rust/processor/src/db/postgres/models/events_models/raw_events.rs index adaf44160..9ef0fdf30 100644 --- a/rust/processor/src/db/postgres/models/events_models/raw_events.rs +++ b/rust/processor/src/db/postgres/models/events_models/raw_events.rs @@ -1,22 +1,58 @@ +use crate::utils::util::{standardize_address, truncate_str}; +use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo}; use serde::{Deserialize, Serialize}; +/// P99 currently is 303 so using 300 as a safe max length +pub const EVENT_TYPE_MAX_LENGTH: usize = 300; + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct RawEvent { - pub txn_version: i64, - pub account_address: String, pub sequence_number: i64, pub creation_number: i64, - pub block_height: i64, - pub event_type: String, + pub account_address: String, + pub transaction_version: i64, + pub transaction_block_height: i64, + pub type_: String, pub data: String, pub event_index: i64, pub indexed_type: String, - pub type_tag_bytes: i64, - pub total_bytes: i64, - pub event_version: i8, - pub block_timestamp: chrono::NaiveDateTime, + pub block_timestamp: Option, + pub type_tag_bytes: Option, + pub total_bytes: Option, } pub trait EventConvertible { fn from_raw(raw_item: &RawEvent) -> Self; } + +impl RawEvent { + pub fn from_raw_event( + event: &EventPB, + txn_version: i64, + txn_block_height: i64, + event_index: i64, + size_info: Option<&EventSizeInfo>, + block_timestamp: Option, + ) -> RawEvent { + let type_tag_bytes = size_info.map_or(0, |info| info.type_tag_bytes as i64); + let total_bytes = size_info.map_or(0, |info| info.total_bytes as i64); + let event_type = event.type_str.to_string(); + + RawEvent { + sequence_number: event.sequence_number as i64, + creation_number: event.key.as_ref().unwrap().creation_number as i64, + account_address: standardize_address( + event.key.as_ref().unwrap().account_address.as_str(), + ), + transaction_version: txn_version, + transaction_block_height: txn_block_height, + type_: event_type.clone(), + data: event.data.clone(), + event_index, + indexed_type: truncate_str(&event_type, EVENT_TYPE_MAX_LENGTH), + block_timestamp, + type_tag_bytes: Some(type_tag_bytes), + total_bytes: Some(total_bytes), + } + } +} diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index e72a69a76..2c1cfe7da 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -111,6 +111,9 @@ bitflags! { // User transaction const SIGNATURES = 1 << 23; + + // Events + const EVENTS = 1 << 26; } } diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 21e171715..07e7c3019 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -3,7 +3,10 @@ use super::{db_config::DbConfig, processor_config::ProcessorConfig}; use crate::{ - parquet_processors::parquet_default_processor::ParquetDefaultProcessor, + parquet_processors::{ + parquet_default_processor::ParquetDefaultProcessor, + parquet_events_processor::ParquetEventsProcessor, + }, processors::{ account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor, default_processor::DefaultProcessor, events_processor::EventsProcessor, @@ -81,6 +84,10 @@ impl RunnableConfig for IndexerProcessorConfig { let parquet_default_processor = ParquetDefaultProcessor::new(self.clone()).await?; parquet_default_processor.run_processor().await }, + ProcessorConfig::ParquetEventsProcessor(_) => { + let parquet_events_processor = ParquetEventsProcessor::new(self.clone()).await?; + parquet_events_processor.run_processor().await + }, } } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 57d1af2c8..53d71b55a 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -55,6 +55,7 @@ pub enum ProcessorConfig { MonitoringProcessor(DefaultProcessorConfig), // ParquetProcessor ParquetDefaultProcessor(ParquetDefaultProcessorConfig), + ParquetEventsProcessor(ParquetDefaultProcessorConfig), } impl ProcessorConfig { @@ -261,9 +262,10 @@ mod tests { assert!(result.is_ok()); let table_names = result.unwrap(); - assert_eq!(table_names, vec![ - "parquet_default_processor.Transaction".to_string(), - ]); + assert_eq!( + table_names, + vec!["parquet_default_processor.Transaction".to_string(),] + ); } #[test] diff --git a/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs index b8e6ae7f5..f061b0107 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs @@ -9,7 +9,7 @@ use crate::{ gcs_uploader::{create_new_writer, GCSUploader}, parquet_buffer_step::ParquetBufferStep, }, - parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor, + parquet_events_processor::parquet_events_extractor::ParquetEventsExtractor, }, utils::{ chain_id::check_or_update_chain_id, @@ -28,28 +28,19 @@ use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClien use parquet::schema::types::Type; use processor::{ bq_analytics::generic_parquet_processor::HasParquetSchema, - db::{ - parquet::models::default_models::{ - parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, - parquet_move_tables::TableItem, - parquet_transactions::Transaction as ParquetTransaction, - parquet_write_set_changes::WriteSetChangeModel, - }, - postgres::models::events_models::events::Event, - }, - worker::TableFlags, + db::postgres::models::events_models::parquet_events::Event as EventPQ, worker::TableFlags, }; use std::{collections::HashMap, sync::Arc, time::Duration}; use tracing::{debug, info}; const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; -pub struct ParquetDefaultProcessor { +pub struct ParquetEventsProcessor { pub config: IndexerProcessorConfig, pub db_pool: ArcDbPool, // for processor status } -impl ParquetDefaultProcessor { +impl ParquetEventsProcessor { pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { match config.db_config { DbConfig::PostgresConfig(ref postgres_config) => { @@ -75,7 +66,7 @@ impl ParquetDefaultProcessor { } #[async_trait::async_trait] -impl ProcessorTrait for ParquetDefaultProcessor { +impl ProcessorTrait for ParquetEventsProcessor { fn name(&self) -> &'static str { self.config.processor_config.name() } @@ -119,12 +110,12 @@ impl ProcessorTrait for ParquetDefaultProcessor { check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; let parquet_processor_config = match self.config.processor_config.clone() { - ProcessorConfig::ParquetDefaultProcessor(parquet_processor_config) => { + ProcessorConfig::ParquetEventsProcessor(parquet_processor_config) => { parquet_processor_config }, _ => { return Err(anyhow::anyhow!( - "Invalid processor configuration for ParquetDefaultProcessor {:?}", + "Invalid processor configuration for ParquetEventsProcessor {:?}", self.config.processor_config )); }, @@ -137,7 +128,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { }) .await?; - let parquet_default_extractor = ParquetDefaultExtractor { + let parquet_events_extractor = ParquetEventsExtractor { opt_in_tables: TableFlags::empty(), }; @@ -157,7 +148,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { let gcs_client = Arc::new(GCSClient::new(gcs_config)); let parquet_type_to_schemas: HashMap> = - [(ParquetTypeEnum::Event, Event::schema())] + [(ParquetTypeEnum::Event, EventPQ::schema())] .into_iter() .collect(); @@ -190,7 +181,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( transaction_stream.into_runnable_step(), ) - .connect_to(parquet_default_extractor.into_runnable_step(), channel_size) + .connect_to(parquet_events_extractor.into_runnable_step(), channel_size) .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) .end_and_return_output_receiver(channel_size); diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index f41861537..8fd038d8f 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -6,6 +6,7 @@ pub mod events_processor; pub mod fungible_asset_processor; pub mod objects_processor; pub mod parquet_default_processor; +pub mod parquet_events_processor; pub mod stake_processor; pub mod token_v2_processor; pub mod user_transaction_processor; diff --git a/rust/sdk-processor/src/steps/parquet_events_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_events_processor/mod.rs index e69de29bb..83980cef2 100644 --- a/rust/sdk-processor/src/steps/parquet_events_processor/mod.rs +++ b/rust/sdk-processor/src/steps/parquet_events_processor/mod.rs @@ -0,0 +1 @@ +pub mod parquet_events_extractor; diff --git a/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs b/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs index e66aea86a..fcee26a61 100644 --- a/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs +++ b/rust/sdk-processor/src/steps/parquet_events_processor/parquet_events_extractor.rs @@ -10,15 +10,10 @@ use aptos_indexer_processor_sdk::{ }; use async_trait::async_trait; use processor::{ - db::{ - common::models::default_models::raw_table_items::TableItemConvertible, - parquet::models::default_models::parquet_move_tables::TableItem, - }, processors::parquet_processors::parquet_events_processor::process_transactions_parquet, worker::TableFlags, }; use std::collections::HashMap; -use tracing::debug; /// Extracts parquet data from transactions, allowing optional selection of specific tables. pub struct ParquetEventsExtractor @@ -40,20 +35,15 @@ impl Processable for ParquetEventsExtractor { &mut self, transactions: TransactionContext, ) -> anyhow::Result>, ProcessorError> { - let (_, raw_table_items, _, _) = process_transactions(transactions.data.clone()); - - let parquet_table_items: Vec = - raw_table_items.iter().map(TableItem::from_raw).collect(); - - let (hmap, events) = process_transactions_parquet(transactions.data); + let (_txn_ver_map, events) = process_transactions_parquet(transactions.data); let mut map: HashMap = HashMap::new(); // Array of tuples for each data type and its corresponding enum variant and flag let data_types = [( - TableFlags::MOVE_RESOURCES, - ParquetTypeEnum::MoveResource, - ParquetTypeStructs::MoveResource(move_resources), + TableFlags::EVENTS, + ParquetTypeEnum::Event, + ParquetTypeStructs::Event(events), )]; // Populate the map based on opt-in tables From d2cbef8b8bbd99c546f10b6929b6e9f1b3eeee0f Mon Sep 17 00:00:00 2001 From: dermanyang Date: Mon, 2 Dec 2024 13:03:08 -0800 Subject: [PATCH 3/8] update parquet type struct --- rust/sdk-processor/src/config/processor_config.rs | 3 ++- rust/sdk-processor/src/parquet_processors/mod.rs | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 68c42d9f2..cc08da329 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -79,7 +79,8 @@ impl ProcessorConfig { /// is useful for querying the status from the processor status table in the database. pub fn get_processor_status_table_names(&self) -> anyhow::Result> { match self { - ProcessorConfig::ParquetDefaultProcessor(config) => { + ProcessorConfig::ParquetDefaultProcessor(config) + | ProcessorConfig::ParquetEventsProcessor(config) => { // Get the processor name as a prefix let processor_name = self.name(); diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 39564bcad..f311a0a9e 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -198,6 +198,9 @@ impl ParquetTypeStructs { ) => { handle_append!(self_data, other_data) }, + (ParquetTypeStructs::Event(self_data), ParquetTypeStructs::Event(other_data)) => { + handle_append!(self_data, other_data) + }, _ => Err(ProcessorError::ProcessError { message: "Mismatched buffer types in append operation".to_string(), }), From c695fd0470d68024f5c5c7a18bee3eac5a93a2c5 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Wed, 4 Dec 2024 16:11:42 -0800 Subject: [PATCH 4/8] Refactor models to more specific directories --- rust/processor/src/db/common/models/event_models/mod.rs | 1 + .../models/event_models}/raw_events.rs | 0 rust/processor/src/db/common/models/mod.rs | 1 + rust/processor/src/db/parquet/models/event_models/mod.rs | 1 + .../models/event_models}/parquet_events.rs | 5 +++-- rust/processor/src/db/parquet/models/mod.rs | 1 + .../processor/src/db/postgres/models/events_models/events.rs | 2 +- rust/processor/src/db/postgres/models/events_models/mod.rs | 2 -- .../parquet_processors/parquet_events_processor.rs | 2 +- rust/sdk-processor/src/parquet_processors/mod.rs | 2 +- .../src/parquet_processors/parquet_events_processor.rs | 2 +- 11 files changed, 11 insertions(+), 8 deletions(-) create mode 100644 rust/processor/src/db/common/models/event_models/mod.rs rename rust/processor/src/db/{postgres/models/events_models => common/models/event_models}/raw_events.rs (100%) create mode 100644 rust/processor/src/db/parquet/models/event_models/mod.rs rename rust/processor/src/db/{postgres/models/events_models => parquet/models/event_models}/parquet_events.rs (98%) diff --git a/rust/processor/src/db/common/models/event_models/mod.rs b/rust/processor/src/db/common/models/event_models/mod.rs new file mode 100644 index 000000000..6a11811c2 --- /dev/null +++ b/rust/processor/src/db/common/models/event_models/mod.rs @@ -0,0 +1 @@ +pub mod raw_events; diff --git a/rust/processor/src/db/postgres/models/events_models/raw_events.rs b/rust/processor/src/db/common/models/event_models/raw_events.rs similarity index 100% rename from rust/processor/src/db/postgres/models/events_models/raw_events.rs rename to rust/processor/src/db/common/models/event_models/raw_events.rs diff --git a/rust/processor/src/db/common/models/mod.rs b/rust/processor/src/db/common/models/mod.rs index 16e0f058f..ff5758324 100644 --- a/rust/processor/src/db/common/models/mod.rs +++ b/rust/processor/src/db/common/models/mod.rs @@ -1 +1,2 @@ pub mod default_models; +pub mod event_models; diff --git a/rust/processor/src/db/parquet/models/event_models/mod.rs b/rust/processor/src/db/parquet/models/event_models/mod.rs new file mode 100644 index 000000000..7d33874ce --- /dev/null +++ b/rust/processor/src/db/parquet/models/event_models/mod.rs @@ -0,0 +1 @@ +pub mod parquet_events; diff --git a/rust/processor/src/db/postgres/models/events_models/parquet_events.rs b/rust/processor/src/db/parquet/models/event_models/parquet_events.rs similarity index 98% rename from rust/processor/src/db/postgres/models/events_models/parquet_events.rs rename to rust/processor/src/db/parquet/models/event_models/parquet_events.rs index 5b52ddfcc..df78889e5 100644 --- a/rust/processor/src/db/postgres/models/events_models/parquet_events.rs +++ b/rust/processor/src/db/parquet/models/event_models/parquet_events.rs @@ -2,8 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 #![allow(clippy::extra_unused_lifetimes)] - -use super::raw_events::{EventConvertible, RawEvent, EVENT_TYPE_MAX_LENGTH}; +use crate::db::common::models::event_models::raw_events::{ + EventConvertible, RawEvent, EVENT_TYPE_MAX_LENGTH, +}; use crate::{ bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, utils::util::truncate_str, diff --git a/rust/processor/src/db/parquet/models/mod.rs b/rust/processor/src/db/parquet/models/mod.rs index 16e0f058f..ff5758324 100644 --- a/rust/processor/src/db/parquet/models/mod.rs +++ b/rust/processor/src/db/parquet/models/mod.rs @@ -1 +1,2 @@ pub mod default_models; +pub mod event_models; diff --git a/rust/processor/src/db/postgres/models/events_models/events.rs b/rust/processor/src/db/postgres/models/events_models/events.rs index 6d472e341..35e1d9cc4 100644 --- a/rust/processor/src/db/postgres/models/events_models/events.rs +++ b/rust/processor/src/db/postgres/models/events_models/events.rs @@ -8,7 +8,7 @@ use aptos_protos::transaction::v1::Event as EventPB; use field_count::FieldCount; use serde::{Deserialize, Serialize}; -use super::raw_events::{EventConvertible, RawEvent}; +use crate::db::common::models::event_models::raw_events::{EventConvertible, RawEvent}; #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, event_index))] diff --git a/rust/processor/src/db/postgres/models/events_models/mod.rs b/rust/processor/src/db/postgres/models/events_models/mod.rs index cc40b5cef..9d363699e 100644 --- a/rust/processor/src/db/postgres/models/events_models/mod.rs +++ b/rust/processor/src/db/postgres/models/events_models/mod.rs @@ -2,5 +2,3 @@ // SPDX-License-Identifier: Apache-2.0 pub mod events; -pub mod parquet_events; -pub mod raw_events; diff --git a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs index 8a9503053..3c6b93409 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs @@ -6,7 +6,7 @@ use crate::{ create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric, ParquetProcessingResult, }, - db::postgres::models::events_models::parquet_events::{Event, ParquetEventModel}, + db::parquet::models::event_models::parquet_events::{Event, ParquetEventModel}, gap_detectors::ProcessingResult, processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait}, utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, database::ArcDbPool, util::parse_timestamp}, diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index f311a0a9e..717cbeea5 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use enum_dispatch::enum_dispatch; use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; use parquet::schema::types::Type; -use processor::db::postgres::models::events_models::parquet_events::Event; +use processor::db::parquet::models::event_models::parquet_events::Event; use processor::{ db::parquet::models::default_models::{ parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, diff --git a/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs index bb8ff9db4..992286f12 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs @@ -30,7 +30,7 @@ use aptos_indexer_processor_sdk::{ use parquet::schema::types::Type; use processor::{ bq_analytics::generic_parquet_processor::HasParquetSchema, - db::postgres::models::events_models::parquet_events::Event as EventPQ, + db::parquet::models::event_models::parquet_events::Event as EventPQ, }; use std::{collections::HashMap, sync::Arc}; use tracing::{debug, info}; From 4f2983a6c82143feb155cb3faf7b698b94cf7494 Mon Sep 17 00:00:00 2001 From: dermanyang Date: Wed, 4 Dec 2024 16:16:48 -0800 Subject: [PATCH 5/8] Add eventsPQ to name switch statement --- rust/sdk-processor/src/config/processor_config.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index cc08da329..c323de59b 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -8,10 +8,13 @@ use crate::{ use ahash::AHashMap; use processor::{ bq_analytics::generic_parquet_processor::NamedTable, - db::parquet::models::default_models::{ - parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, - parquet_move_tables::TableItem, parquet_transactions::Transaction, - parquet_write_set_changes::WriteSetChangeModel, + db::parquet::models::{ + default_models::{ + parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, + parquet_move_tables::TableItem, parquet_transactions::Transaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + event_models::parquet_events::Event as EventPQ, }, }; use serde::{Deserialize, Serialize}; @@ -115,6 +118,7 @@ impl ProcessorConfig { WriteSetChangeModel::TABLE_NAME.to_string(), TableItem::TABLE_NAME.to_string(), MoveModule::TABLE_NAME.to_string(), + EventPQ::TABLE_NAME.to_string(), ]), _ => HashSet::new(), // Default case for unsupported processors } From 2d4bdc477a4b625730ceaa0153572cc81947252f Mon Sep 17 00:00:00 2001 From: dermanyang Date: Wed, 4 Dec 2024 17:02:05 -0800 Subject: [PATCH 6/8] Formatting --- .../parquet/models/event_models/parquet_events.rs | 6 +++--- .../src/db/postgres/models/events_models/events.rs | 7 ++++--- rust/sdk-processor/src/parquet_processors/mod.rs | 14 ++++++++------ .../parquet_user_transaction_processor.rs | 0 4 files changed, 15 insertions(+), 12 deletions(-) create mode 100644 rust/sdk-processor/src/parquet_processors/parquet_user_transaction_processor.rs diff --git a/rust/processor/src/db/parquet/models/event_models/parquet_events.rs b/rust/processor/src/db/parquet/models/event_models/parquet_events.rs index df78889e5..14953890f 100644 --- a/rust/processor/src/db/parquet/models/event_models/parquet_events.rs +++ b/rust/processor/src/db/parquet/models/event_models/parquet_events.rs @@ -2,11 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 #![allow(clippy::extra_unused_lifetimes)] -use crate::db::common::models::event_models::raw_events::{ - EventConvertible, RawEvent, EVENT_TYPE_MAX_LENGTH, -}; use crate::{ bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + db::common::models::event_models::raw_events::{ + EventConvertible, RawEvent, EVENT_TYPE_MAX_LENGTH, + }, utils::util::truncate_str, }; use allocative_derive::Allocative; diff --git a/rust/processor/src/db/postgres/models/events_models/events.rs b/rust/processor/src/db/postgres/models/events_models/events.rs index 35e1d9cc4..d1e010484 100644 --- a/rust/processor/src/db/postgres/models/events_models/events.rs +++ b/rust/processor/src/db/postgres/models/events_models/events.rs @@ -3,13 +3,14 @@ #![allow(clippy::extra_unused_lifetimes)] -use crate::schema::events; +use crate::{ + db::common::models::event_models::raw_events::{EventConvertible, RawEvent}, + schema::events, +}; use aptos_protos::transaction::v1::Event as EventPB; use field_count::FieldCount; use serde::{Deserialize, Serialize}; -use crate::db::common::models::event_models::raw_events::{EventConvertible, RawEvent}; - #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, event_index))] #[diesel(table_name = events)] diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 717cbeea5..731f2abf9 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -11,16 +11,18 @@ use async_trait::async_trait; use enum_dispatch::enum_dispatch; use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; use parquet::schema::types::Type; -use processor::db::parquet::models::event_models::parquet_events::Event; use processor::{ - db::parquet::models::default_models::{ - parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, - parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, - parquet_write_set_changes::WriteSetChangeModel, + db::parquet::models::{ + default_models::{ + parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, + parquet_move_tables::TableItem, + parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + event_models::parquet_events::Event, }, worker::TableFlags, }; - #[allow(unused_imports)] use serde::{Deserialize, Serialize}; use std::{ diff --git a/rust/sdk-processor/src/parquet_processors/parquet_user_transaction_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_user_transaction_processor.rs new file mode 100644 index 000000000..e69de29bb From 23301eb38ca9999f9c3c2c701805097202c40bdb Mon Sep 17 00:00:00 2001 From: dermanyang Date: Thu, 5 Dec 2024 15:00:54 -0800 Subject: [PATCH 7/8] Merge main + formatting --- rust/processor/src/worker.rs | 2 +- .../src/config/processor_config.rs | 15 +++---- .../src/parquet_processors/mod.rs | 45 ++++--------------- .../src/steps/common/parquet_buffer_step.rs | 1 + 4 files changed, 16 insertions(+), 47 deletions(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index c51abf3d3..b1fb6d5b1 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -114,7 +114,7 @@ bitflags! { // More tables const CURRENT_TABLE_ITEMS = 1 << 24; const BLOCK_METADATA_TRANSACTIONS = 1 << 25; - + // Events const EVENTS = 1 << 24; } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index fc24dc397..3da369030 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -10,19 +10,14 @@ use processor::{ bq_analytics::generic_parquet_processor::NamedTable, db::parquet::models::{ default_models::{ - parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, - parquet_move_tables::TableItem, parquet_transactions::Transaction, + parquet_block_metadata_transactions::BlockMetadataTransaction, + parquet_move_modules::MoveModule, + parquet_move_resources::MoveResource, + parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata}, + parquet_transactions::Transaction, parquet_write_set_changes::WriteSetChangeModel, }, event_models::parquet_events::Event as EventPQ, - db::parquet::models::default_models::{ - parquet_block_metadata_transactions::BlockMetadataTransaction, - parquet_move_modules::MoveModule, - parquet_move_resources::MoveResource, - parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata}, - parquet_transactions::Transaction, - parquet_write_set_changes::WriteSetChangeModel, - event_models::parquet_events::Event as EventPQ, }, }; use serde::{Deserialize, Serialize}; diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 6c94d9dc4..31aa229b2 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -15,14 +15,15 @@ use processor::{ db::parquet::models::{ default_models::{ parquet_block_metadata_transactions::BlockMetadataTransaction, - parquet_move_modules::MoveModule, - parquet_move_resources::MoveResource, - parquet_move_tables::{CurrentTableItem, TableItem}, - parquet_table_metadata::TableMetadata, - parquet_transactions::Transaction as ParquetTransaction, - parquet_write_set_changes::WriteSetChangeModel, + parquet_move_modules::MoveModule, + parquet_move_resources::MoveResource, + parquet_move_tables::{CurrentTableItem, TableItem}, + parquet_table_metadata::TableMetadata, + parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, }, event_models::parquet_events::Event, + }, worker::TableFlags, }; #[allow(unused_imports)] @@ -59,11 +60,6 @@ const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; ) )] pub enum ParquetTypeEnum { - MoveResource, - WriteSetChange, - Transaction, - TableItem, - MoveModule, Event, MoveResources, WriteSetChanges, @@ -73,7 +69,6 @@ pub enum ParquetTypeEnum { CurrentTableItems, BlockMetadataTransactions, TableMetadata, - Event, } /// Trait for handling various Parquet types. @@ -131,7 +126,7 @@ impl_parquet_trait!( ); impl_parquet_trait!(TableMetadata, ParquetTypeEnum::TableMetadata); impl_parquet_trait!(Event, ParquetTypeEnum::Event); - + #[derive(Debug, Clone)] #[enum_dispatch(ParquetTypeTrait)] pub enum ParquetTypeStructs { @@ -163,28 +158,6 @@ impl ParquetTypeStructs { } } - pub fn get_table_name(&self) -> &'static str { - match self { - ParquetTypeStructs::MoveResource(_) => "move_resources", - ParquetTypeStructs::WriteSetChange(_) => "write_set_changes", - ParquetTypeStructs::Transaction(_) => "transactions", - ParquetTypeStructs::TableItem(_) => "table_items", - ParquetTypeStructs::MoveModule(_) => "move_modules", - ParquetTypeStructs::Event(_) => "events", - } - } - - pub fn calculate_size(&self) -> usize { - match self { - ParquetTypeStructs::MoveResource(data) => allocative::size_of_unique(data), - ParquetTypeStructs::WriteSetChange(data) => allocative::size_of_unique(data), - ParquetTypeStructs::Transaction(data) => allocative::size_of_unique(data), - ParquetTypeStructs::TableItem(data) => allocative::size_of_unique(data), - ParquetTypeStructs::MoveModule(data) => allocative::size_of_unique(data), - ParquetTypeStructs::Event(data) => allocative::size_of_unique(data), - } - } - /// Appends data to the current buffer within each ParquetTypeStructs variant. pub fn append(&mut self, other: ParquetTypeStructs) -> Result<(), ProcessorError> { macro_rules! handle_append { @@ -226,7 +199,7 @@ impl ParquetTypeStructs { handle_append!(self_data, other_data) }, (ParquetTypeStructs::Event(self_data), ParquetTypeStructs::Event(other_data)) => { - handle_append!(self_data, other_data) + handle_append!(self_data, other_data) }, ( ParquetTypeStructs::CurrentTableItem(self_data), diff --git a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs index b5f004131..9fd054f47 100644 --- a/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs +++ b/rust/sdk-processor/src/steps/common/parquet_buffer_step.rs @@ -1,3 +1,4 @@ +use crate::parquet_processors::ParquetTypeTrait; #[allow(unused_imports)] use crate::{ parquet_processors::{ParquetTypeEnum, ParquetTypeStructs}, From f584bfd20308d366163146c44778473a27ec977c Mon Sep 17 00:00:00 2001 From: dermanyang Date: Thu, 5 Dec 2024 15:04:45 -0800 Subject: [PATCH 8/8] start events flags at 30 to avoid conflicts --- rust/processor/src/worker.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index b1fb6d5b1..a812c4af8 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -111,12 +111,13 @@ bitflags! { // User transaction const SIGNATURES = 1 << 23; + // More tables const CURRENT_TABLE_ITEMS = 1 << 24; const BLOCK_METADATA_TRANSACTIONS = 1 << 25; // Events - const EVENTS = 1 << 24; + const EVENTS = 1 << 30; // start at 30 to avoid conflicts with other flags. } }