Skip to content

Commit

Permalink
Migrate parquet txn metadata processor to sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 10, 2024
1 parent e1a76bf commit e1b7479
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 29 deletions.
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod ans_models;
pub mod default_models;
pub mod event_models;
pub mod fungible_asset_models;
pub mod transaction_metadata_model;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod parquet_write_set_size_info;
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,3 @@
pub mod event_size_info;
pub mod transaction_size_info;
pub mod write_set_size_info;

// parquet models
pub mod parquet_write_set_size_info;
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
ParquetProcessingResult,
},
db::postgres::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
db::parquet::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
gap_detectors::ProcessingResult,
processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait},
utils::{database::ArcDbPool, util::parse_timestamp},
Expand Down Expand Up @@ -92,31 +92,8 @@ impl ProcessorTrait for ParquetTransactionMetadataProcessor {
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
let mut transaction_version_to_struct_count: AHashMap<i64, i64> = AHashMap::new();

let mut write_set_sizes = vec![];

for txn in &transactions {
let txn_version = txn.version as i64;
let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
let size_info = match txn.size_info.as_ref() {
Some(size_info) => size_info,
None => {
warn!(version = txn.version, "Transaction size info not found");
continue;
},
};
for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() {
write_set_sizes.push(WriteSetSize::from_transaction_info(
write_set_size_info,
txn_version,
index as i64,
block_timestamp,
));
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1)
.or_insert(1);
}
}
let write_set_sizes =
process_transaction(transactions, &mut transaction_version_to_struct_count);

Check warning on line 96 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L95-L96

Added lines #L95 - L96 were not covered by tests

let write_set_size_info_parquet_data = ParquetDataGeneric {
data: write_set_sizes,
Expand All @@ -143,3 +120,35 @@ impl ProcessorTrait for ParquetTransactionMetadataProcessor {
&self.connection_pool
}
}

pub fn process_transaction(
transactions: Vec<Transaction>,
transaction_version_to_struct_count: &mut AHashMap<i64, i64>,
) -> Vec<WriteSetSize> {
let mut write_set_sizes = vec![];

Check warning on line 128 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L124-L128

Added lines #L124 - L128 were not covered by tests

for txn in transactions {
let txn_version = txn.version as i64;
let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
let size_info = match txn.size_info.as_ref() {
Some(size_info) => size_info,

Check warning on line 134 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L130-L134

Added lines #L130 - L134 were not covered by tests
None => {
warn!(version = txn.version, "Transaction size info not found");
continue;

Check warning on line 137 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L136-L137

Added lines #L136 - L137 were not covered by tests
},
};
for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() {
write_set_sizes.push(WriteSetSize::from_transaction_info(
write_set_size_info,
txn_version,
index as i64,
block_timestamp,
));
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1)
.or_insert(1);
}

Check warning on line 151 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L140-L151

Added lines #L140 - L151 were not covered by tests
}
write_set_sizes
}

Check warning on line 154 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L153-L154

Added lines #L153 - L154 were not covered by tests
6 changes: 6 additions & 0 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
parquet_default_processor::ParquetDefaultProcessor,
parquet_events_processor::ParquetEventsProcessor,
parquet_fungible_asset_processor::ParquetFungibleAssetProcessor,
parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor,
},
processors::{
account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor,
Expand Down Expand Up @@ -94,6 +95,11 @@ impl RunnableConfig for IndexerProcessorConfig {
ParquetFungibleAssetProcessor::new(self.clone()).await?;
parquet_fungible_asset_processor.run_processor().await
},
ProcessorConfig::ParquetTransactionMetadataProcessor(_) => {
let parquet_transaction_metadata_processor =
ParquetTransactionMetadataProcessor::new(self.clone()).await?;
parquet_transaction_metadata_processor.run_processor().await

Check warning on line 101 in rust/sdk-processor/src/config/indexer_processor_config.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/config/indexer_processor_config.rs#L99-L101

Added lines #L99 - L101 were not covered by tests
},
}
}

Expand Down
6 changes: 6 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use processor::{
},
parquet_v2_fungible_metadata::FungibleAssetMetadataModel,
},
transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
},
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -79,6 +80,7 @@ pub enum ProcessorConfig {
ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
ParquetEventsProcessor(ParquetDefaultProcessorConfig),
ParquetFungibleAssetProcessor(ParquetDefaultProcessorConfig),
ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig),
}

impl ProcessorConfig {
Expand All @@ -96,6 +98,7 @@ impl ProcessorConfig {
match self {
ProcessorConfig::ParquetDefaultProcessor(config)
| ProcessorConfig::ParquetEventsProcessor(config)
| ProcessorConfig::ParquetTransactionMetadataProcessor(config)

Check warning on line 101 in rust/sdk-processor/src/config/processor_config.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/config/processor_config.rs#L101

Added line #L101 was not covered by tests
| ProcessorConfig::ParquetFungibleAssetProcessor(config) => {
// Get the processor name as a prefix
let processor_name = self.name();
Expand Down Expand Up @@ -145,6 +148,9 @@ impl ProcessorConfig {
CurrentUnifiedFungibleAssetBalance::TABLE_NAME.to_string(),
FungibleAssetMetadataModel::TABLE_NAME.to_string(),
]),
ProcessorName::ParquetTransactionMetadataProcessor => {
HashSet::from([WriteSetSize::TABLE_NAME.to_string()])
},
_ => HashSet::new(), // Default case for unsupported processors
}
}
Expand Down
14 changes: 14 additions & 0 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use processor::{
},
parquet_v2_fungible_metadata::FungibleAssetMetadataModel,
},
transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
},

postgres::models::ans_models::parquet_ans_lookup_v2::AnsPrimaryNameV2,
},
utils::table_flags::TableFlags,
Expand All @@ -50,6 +52,7 @@ pub mod parquet_ans_processor;
pub mod parquet_default_processor;
pub mod parquet_events_processor;
pub mod parquet_fungible_asset_processor;
pub mod parquet_transaction_metadata_processor;

const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

Expand Down Expand Up @@ -92,6 +95,8 @@ pub enum ParquetTypeEnum {
FungibleAssetBalances,
CurrentFungibleAssetBalances,
CurrentUnifiedFungibleAssetBalances,
// txn metadata,
WriteSetSize,
}

/// Trait for handling various Parquet types.
Expand Down Expand Up @@ -167,6 +172,7 @@ impl_parquet_trait!(
CurrentUnifiedFungibleAssetBalance,
ParquetTypeEnum::CurrentUnifiedFungibleAssetBalances
);
impl_parquet_trait!(WriteSetSize, ParquetTypeEnum::WriteSetSize);

#[derive(Debug, Clone)]
#[enum_dispatch(ParquetTypeTrait)]
Expand All @@ -186,6 +192,7 @@ pub enum ParquetTypeStructs {
FungibleAssetBalance(Vec<FungibleAssetBalance>),
CurrentFungibleAssetBalance(Vec<CurrentFungibleAssetBalance>),
CurrentUnifiedFungibleAssetBalance(Vec<CurrentUnifiedFungibleAssetBalance>),
WriteSetSize(Vec<WriteSetSize>),
}

impl ParquetTypeStructs {
Expand Down Expand Up @@ -218,6 +225,7 @@ impl ParquetTypeStructs {
ParquetTypeEnum::CurrentUnifiedFungibleAssetBalances => {
ParquetTypeStructs::CurrentUnifiedFungibleAssetBalance(Vec::new())
},
ParquetTypeEnum::WriteSetSize => ParquetTypeStructs::WriteSetSize(Vec::new()),

Check warning on line 228 in rust/sdk-processor/src/parquet_processors/mod.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/mod.rs#L228

Added line #L228 was not covered by tests
}
}

Expand Down Expand Up @@ -318,6 +326,12 @@ impl ParquetTypeStructs {
) => {
handle_append!(self_data, other_data)
},
(
ParquetTypeStructs::WriteSetSize(self_data),
ParquetTypeStructs::WriteSetSize(other_data),
) => {
handle_append!(self_data, other_data)

Check warning on line 333 in rust/sdk-processor/src/parquet_processors/mod.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/mod.rs#L330-L333

Added lines #L330 - L333 were not covered by tests
},
_ => Err(ProcessorError::ProcessError {
message: "Mismatched buffer types in append operation".to_string(),
}),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use crate::{
config::{
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
processor_config::ProcessorConfig,
},
parquet_processors::{
initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step,
set_backfill_table_flag, ParquetTypeEnum,
},
steps::{
common::{
parquet_version_tracker_step::ParquetVersionTrackerStep,
processor_status_saver::get_processor_status_saver,
},
parquet_transaction_metadata_processor::parquet_transaction_metadata_extractor::ParquetTransactionMetadataExtractor,
},
utils::{
chain_id::check_or_update_chain_id,
database::{run_migrations, ArcDbPool},
starting_version::get_min_last_success_version_parquet,
},
};
use anyhow::Context;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS},
traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
};
use parquet::schema::types::Type;
use processor::{
bq_analytics::generic_parquet_processor::HasParquetSchema,
db::parquet::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
};
use std::{collections::HashMap, sync::Arc};
use tracing::{debug, info};

/// SDK-based processor for the parquet transaction metadata pipeline.
///
/// Holds the full indexer configuration together with the database pool that
/// `run_processor` hands to the migration, chain-id check, starting-version lookup
/// and processor-status steps.
pub struct ParquetTransactionMetadataProcessor {
/// Full indexer configuration (db config, processor config, transaction stream config).
pub config: IndexerProcessorConfig,
/// Database connection pool, created from `config.db_config` in `new`.
pub db_pool: ArcDbPool,
}

impl ParquetTransactionMetadataProcessor {
    /// Creates the processor from `config`, initializing the database pool from
    /// `config.db_config`.
    ///
    /// # Errors
    ///
    /// Returns the error from `initialize_database_pool` if the pool cannot be created.
    pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result<Self> {
        let pool = initialize_database_pool(&config.db_config).await?;
        Ok(Self {
            config,
            db_pool: pool,
        })
    }
}

#[async_trait::async_trait]
impl ProcessorTrait for ParquetTransactionMetadataProcessor {
fn name(&self) -> &'static str {
self.config.processor_config.name()
}

Check warning on line 54 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L52-L54

Added lines #L52 - L54 were not covered by tests

async fn run_processor(&self) -> anyhow::Result<()> {

Check warning on line 56 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L56

Added line #L56 was not covered by tests
// Run Migrations
let parquet_db_config = match self.config.db_config {
DbConfig::ParquetConfig(ref parquet_config) => {
run_migrations(
parquet_config.connection_string.clone(),
self.db_pool.clone(),
)
.await;
parquet_config

Check warning on line 65 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L58-L65

Added lines #L58 - L65 were not covered by tests
},
_ => {
return Err(anyhow::anyhow!(
"Invalid db config for ParquetTransactionMetadataProcessor {:?}",
self.config.db_config
));

Check warning on line 71 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L68-L71

Added lines #L68 - L71 were not covered by tests
},
};

// Check and update the ledger chain id to ensure we're indexing the correct chain
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
.await?
.get_chain_id()
.await?;
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

Check warning on line 80 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L76-L80

Added lines #L76 - L80 were not covered by tests

let parquet_processor_config = match self.config.processor_config.clone() {
ProcessorConfig::ParquetTransactionMetadataProcessor(parquet_processor_config) => {
parquet_processor_config

Check warning on line 84 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L82-L84

Added lines #L82 - L84 were not covered by tests
},
_ => {
return Err(anyhow::anyhow!(
"Invalid processor configuration for ParquetTransactionMetadataProcessor {:?}",
self.config.processor_config
));

Check warning on line 90 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L87-L90

Added lines #L87 - L90 were not covered by tests
},
};

let processor_status_table_names = self
.config
.processor_config
.get_processor_status_table_names()
.context("Failed to get table names for the processor status table")?;

Check warning on line 98 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L94-L98

Added lines #L94 - L98 were not covered by tests

let starting_version = get_min_last_success_version_parquet(
&self.config,
self.db_pool.clone(),
processor_status_table_names,
)
.await?;
println!("Starting version: {:?}", starting_version);

Check warning on line 106 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L100-L106

Added lines #L100 - L106 were not covered by tests

// Define processor transaction stream config
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;

Check warning on line 113 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L109-L113

Added lines #L109 - L113 were not covered by tests

let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table);
let parquet_txn_metadata_extractor = ParquetTransactionMetadataExtractor {
opt_in_tables: backfill_table,
};

Check warning on line 118 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L115-L118

Added lines #L115 - L118 were not covered by tests

let gcs_client =
initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await;

Check warning on line 121 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L120-L121

Added lines #L120 - L121 were not covered by tests

let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> =
[(ParquetTypeEnum::WriteSetSize, WriteSetSize::schema())]
.into_iter()
.collect();

Check warning on line 126 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L123-L126

Added lines #L123 - L126 were not covered by tests

let default_size_buffer_step = initialize_parquet_buffer_step(
gcs_client.clone(),
parquet_type_to_schemas,
parquet_processor_config.upload_interval,
parquet_processor_config.max_buffer_size,
parquet_db_config.bucket_name.clone(),
parquet_db_config.bucket_root.clone(),
self.name().to_string(),
)
.await
.unwrap_or_else(|e| {
panic!("Failed to initialize parquet buffer step: {:?}", e);
});

let parquet_version_tracker_step = ParquetVersionTrackerStep::new(
get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
);

let channel_size = parquet_processor_config.channel_size;

// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(
parquet_txn_metadata_extractor.into_runnable_step(),
channel_size,
)
.connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
.connect_to(
parquet_version_tracker_step.into_runnable_step(),
channel_size,
)
.end_and_return_output_receiver(channel_size);

Check warning on line 162 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L128-L162

Added lines #L128 - L162 were not covered by tests

loop {
match buffer_receiver.recv().await {
Ok(txn_context) => {
debug!(
"Finished processing versions [{:?}, {:?}]",

Check warning on line 168 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L165-L168

Added lines #L165 - L168 were not covered by tests
txn_context.metadata.start_version, txn_context.metadata.end_version,
);
},
Err(e) => {
info!("No more transactions in channel: {:?}", e);
break Ok(());

Check warning on line 174 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L172-L174

Added lines #L172 - L174 were not covered by tests
},
}
}
}

Check warning on line 178 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L178

Added line #L178 was not covered by tests
}
Loading

0 comments on commit e1b7479

Please sign in to comment.