Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate parquet txn metadata processor to sdk #646

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pub mod ans_models;
pub mod default_models;
pub mod event_models;
pub mod fungible_asset_models;
pub mod transaction_metadata_model;
pub mod user_transaction_models;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod parquet_write_set_size_info;
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,3 @@
pub mod event_size_info;
pub mod transaction_size_info;
pub mod write_set_size_info;

// parquet models
pub mod parquet_write_set_size_info;
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
ParquetProcessingResult,
},
db::postgres::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
db::parquet::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
gap_detectors::ProcessingResult,
processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait},
utils::{database::ArcDbPool, util::parse_timestamp},
Expand Down Expand Up @@ -92,31 +92,8 @@
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
let mut transaction_version_to_struct_count: AHashMap<i64, i64> = AHashMap::new();

let mut write_set_sizes = vec![];

for txn in &transactions {
let txn_version = txn.version as i64;
let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
let size_info = match txn.size_info.as_ref() {
Some(size_info) => size_info,
None => {
warn!(version = txn.version, "Transaction size info not found");
continue;
},
};
for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() {
write_set_sizes.push(WriteSetSize::from_transaction_info(
write_set_size_info,
txn_version,
index as i64,
block_timestamp,
));
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1)
.or_insert(1);
}
}
let write_set_sizes =
process_transaction(transactions, &mut transaction_version_to_struct_count);

Check warning on line 96 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L95-L96

Added lines #L95 - L96 were not covered by tests

let write_set_size_info_parquet_data = ParquetDataGeneric {
data: write_set_sizes,
Expand All @@ -143,3 +120,35 @@
&self.connection_pool
}
}

pub fn process_transaction(
transactions: Vec<Transaction>,
transaction_version_to_struct_count: &mut AHashMap<i64, i64>,
) -> Vec<WriteSetSize> {
let mut write_set_sizes = vec![];

Check warning on line 128 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L124-L128

Added lines #L124 - L128 were not covered by tests

for txn in transactions {
let txn_version = txn.version as i64;
let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
let size_info = match txn.size_info.as_ref() {
Some(size_info) => size_info,

Check warning on line 134 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L130-L134

Added lines #L130 - L134 were not covered by tests
None => {
warn!(version = txn.version, "Transaction size info not found");
continue;

Check warning on line 137 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L136-L137

Added lines #L136 - L137 were not covered by tests
},
};
for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() {
write_set_sizes.push(WriteSetSize::from_transaction_info(
write_set_size_info,
txn_version,
index as i64,
block_timestamp,
));
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1)
.or_insert(1);
}

Check warning on line 151 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L140-L151

Added lines #L140 - L151 were not covered by tests
}
write_set_sizes
}

Check warning on line 154 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L153-L154

Added lines #L153 - L154 were not covered by tests
6 changes: 6 additions & 0 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
parquet_default_processor::ParquetDefaultProcessor,
parquet_events_processor::ParquetEventsProcessor,
parquet_fungible_asset_processor::ParquetFungibleAssetProcessor,
parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor,
parquet_user_transaction_processor::ParquetUserTransactionsProcessor,
},
processors::{
Expand Down Expand Up @@ -100,6 +101,11 @@
ParquetFungibleAssetProcessor::new(self.clone()).await?;
parquet_fungible_asset_processor.run_processor().await
},
ProcessorConfig::ParquetTransactionMetadataProcessor(_) => {
let parquet_transaction_metadata_processor =
ParquetTransactionMetadataProcessor::new(self.clone()).await?;
parquet_transaction_metadata_processor.run_processor().await

Check warning on line 107 in rust/sdk-processor/src/config/indexer_processor_config.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/config/indexer_processor_config.rs#L105-L107

Added lines #L105 - L107 were not covered by tests
},
}
}

Expand Down
6 changes: 6 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
},
parquet_v2_fungible_metadata::FungibleAssetMetadataModel,
},
transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
user_transaction_models::parquet_user_transactions::UserTransaction,
},
};
Expand Down Expand Up @@ -81,6 +82,7 @@
ParquetEventsProcessor(ParquetDefaultProcessorConfig),
ParquetUserTransactionsProcessor(ParquetDefaultProcessorConfig),
ParquetFungibleAssetProcessor(ParquetDefaultProcessorConfig),
ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig),
}

impl ProcessorConfig {
Expand All @@ -99,6 +101,7 @@
ProcessorConfig::ParquetDefaultProcessor(config)
| ProcessorConfig::ParquetEventsProcessor(config)
| ProcessorConfig::ParquetUserTransactionsProcessor(config)
| ProcessorConfig::ParquetTransactionMetadataProcessor(config)

Check warning on line 104 in rust/sdk-processor/src/config/processor_config.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/config/processor_config.rs#L104

Added line #L104 was not covered by tests
| ProcessorConfig::ParquetFungibleAssetProcessor(config) => {
// Get the processor name as a prefix
let processor_name = self.name();
Expand Down Expand Up @@ -151,6 +154,9 @@
CurrentUnifiedFungibleAssetBalance::TABLE_NAME.to_string(),
FungibleAssetMetadataModel::TABLE_NAME.to_string(),
]),
ProcessorName::ParquetTransactionMetadataProcessor => {
HashSet::from([WriteSetSize::TABLE_NAME.to_string()])
},
_ => HashSet::new(), // Default case for unsupported processors
}
}
Expand Down
13 changes: 13 additions & 0 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
},
parquet_v2_fungible_metadata::FungibleAssetMetadataModel,
},
transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
user_transaction_models::parquet_user_transactions::UserTransaction,
},
postgres::models::ans_models::parquet_ans_lookup_v2::AnsPrimaryNameV2,
Expand All @@ -51,6 +52,7 @@
pub mod parquet_default_processor;
pub mod parquet_events_processor;
pub mod parquet_fungible_asset_processor;
pub mod parquet_transaction_metadata_processor;
pub mod parquet_user_transaction_processor;

const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
Expand Down Expand Up @@ -99,6 +101,8 @@
FungibleAssetBalances,
CurrentFungibleAssetBalances,
CurrentFungibleAssetBalancesLegacy,
// txn metadata,
WriteSetSize,
}

/// Trait for handling various Parquet types.
Expand Down Expand Up @@ -175,6 +179,7 @@
CurrentFungibleAssetBalance,
ParquetTypeEnum::CurrentFungibleAssetBalancesLegacy
);
impl_parquet_trait!(WriteSetSize, ParquetTypeEnum::WriteSetSize);

#[derive(Debug, Clone)]
#[enum_dispatch(ParquetTypeTrait)]
Expand All @@ -195,6 +200,7 @@
FungibleAssetBalance(Vec<FungibleAssetBalance>),
CurrentFungibleAssetBalance(Vec<CurrentFungibleAssetBalance>),
CurrentUnifiedFungibleAssetBalance(Vec<CurrentUnifiedFungibleAssetBalance>),
WriteSetSize(Vec<WriteSetSize>),
}

impl ParquetTypeStructs {
Expand Down Expand Up @@ -228,6 +234,7 @@
ParquetTypeEnum::CurrentFungibleAssetBalances => {
ParquetTypeStructs::CurrentUnifiedFungibleAssetBalance(Vec::new())
},
ParquetTypeEnum::WriteSetSize => ParquetTypeStructs::WriteSetSize(Vec::new()),

Check warning on line 237 in rust/sdk-processor/src/parquet_processors/mod.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/mod.rs#L237

Added line #L237 was not covered by tests
}
}

Expand Down Expand Up @@ -334,6 +341,12 @@
) => {
handle_append!(self_data, other_data)
},
(
ParquetTypeStructs::WriteSetSize(self_data),
ParquetTypeStructs::WriteSetSize(other_data),
) => {
handle_append!(self_data, other_data)

Check warning on line 348 in rust/sdk-processor/src/parquet_processors/mod.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/mod.rs#L345-L348

Added lines #L345 - L348 were not covered by tests
},
_ => Err(ProcessorError::ProcessError {
message: "Mismatched buffer types in append operation".to_string(),
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ impl ProcessorTrait for ParquetDefaultProcessor {
processor_status_table_names,
)
.await?;
println!("Starting version: {:?}", starting_version);

// Define processor transaction stream config
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use crate::{
config::{
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
processor_config::ProcessorConfig,
},
parquet_processors::{
initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step,
set_backfill_table_flag, ParquetTypeEnum,
},
steps::{
common::{
parquet_version_tracker_step::ParquetVersionTrackerStep,
processor_status_saver::get_processor_status_saver,
},
parquet_transaction_metadata_processor::parquet_transaction_metadata_extractor::ParquetTransactionMetadataExtractor,
},
utils::{
chain_id::check_or_update_chain_id,
database::{run_migrations, ArcDbPool},
starting_version::get_min_last_success_version_parquet,
},
};
use anyhow::Context;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS},
traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
};
use parquet::schema::types::Type;
use processor::{
bq_analytics::generic_parquet_processor::HasParquetSchema,
db::parquet::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
};
use std::{collections::HashMap, sync::Arc};
use tracing::{debug, info};

pub struct ParquetTransactionMetadataProcessor {
pub config: IndexerProcessorConfig,
pub db_pool: ArcDbPool,
}

impl ParquetTransactionMetadataProcessor {
pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result<Self> {
let db_pool = initialize_database_pool(&config.db_config).await?;
Ok(Self { config, db_pool })
}

Check warning on line 47 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L44-L47

Added lines #L44 - L47 were not covered by tests
}

#[async_trait::async_trait]
impl ProcessorTrait for ParquetTransactionMetadataProcessor {
fn name(&self) -> &'static str {
self.config.processor_config.name()
}

Check warning on line 54 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L52-L54

Added lines #L52 - L54 were not covered by tests

async fn run_processor(&self) -> anyhow::Result<()> {

Check warning on line 56 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L56

Added line #L56 was not covered by tests
// Run Migrations
let parquet_db_config = match self.config.db_config {
DbConfig::ParquetConfig(ref parquet_config) => {
run_migrations(
parquet_config.connection_string.clone(),
self.db_pool.clone(),
)
.await;
parquet_config

Check warning on line 65 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L58-L65

Added lines #L58 - L65 were not covered by tests
},
_ => {
return Err(anyhow::anyhow!(
"Invalid db config for ParquetTransactionMetadataProcessor {:?}",
self.config.db_config
));

Check warning on line 71 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L68-L71

Added lines #L68 - L71 were not covered by tests
},
};

// Check and update the ledger chain id to ensure we're indexing the correct chain
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
.await?
.get_chain_id()
.await?;
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

Check warning on line 80 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L76-L80

Added lines #L76 - L80 were not covered by tests

let parquet_processor_config = match self.config.processor_config.clone() {
ProcessorConfig::ParquetTransactionMetadataProcessor(parquet_processor_config) => {
parquet_processor_config

Check warning on line 84 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L82-L84

Added lines #L82 - L84 were not covered by tests
},
_ => {
return Err(anyhow::anyhow!(
"Invalid processor configuration for ParquetTransactionMetadataProcessor {:?}",
self.config.processor_config
));

Check warning on line 90 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L87-L90

Added lines #L87 - L90 were not covered by tests
},
};

let processor_status_table_names = self
.config
.processor_config
.get_processor_status_table_names()
.context("Failed to get table names for the processor status table")?;

Check warning on line 98 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L94-L98

Added lines #L94 - L98 were not covered by tests

let starting_version = get_min_last_success_version_parquet(
&self.config,
self.db_pool.clone(),
processor_status_table_names,
)
.await?;

Check warning on line 105 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L100-L105

Added lines #L100 - L105 were not covered by tests

// Define processor transaction stream config
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;

Check warning on line 112 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L108-L112

Added lines #L108 - L112 were not covered by tests

let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table);
let parquet_txn_metadata_extractor = ParquetTransactionMetadataExtractor {
opt_in_tables: backfill_table,
};

Check warning on line 117 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L114-L117

Added lines #L114 - L117 were not covered by tests

let gcs_client =
initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await;

Check warning on line 120 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L119-L120

Added lines #L119 - L120 were not covered by tests

let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> =
[(ParquetTypeEnum::WriteSetSize, WriteSetSize::schema())]
.into_iter()
.collect();

Check warning on line 125 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L122-L125

Added lines #L122 - L125 were not covered by tests

let default_size_buffer_step = initialize_parquet_buffer_step(
gcs_client.clone(),
parquet_type_to_schemas,
parquet_processor_config.upload_interval,
parquet_processor_config.max_buffer_size,
parquet_db_config.bucket_name.clone(),
parquet_db_config.bucket_root.clone(),
self.name().to_string(),
)
.await
.unwrap_or_else(|e| {
panic!("Failed to initialize parquet buffer step: {:?}", e);
});

let parquet_version_tracker_step = ParquetVersionTrackerStep::new(
get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
);

let channel_size = parquet_processor_config.channel_size;

// Connect processor steps together
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(
parquet_txn_metadata_extractor.into_runnable_step(),
channel_size,
)
.connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
.connect_to(
parquet_version_tracker_step.into_runnable_step(),
channel_size,
)
.end_and_return_output_receiver(channel_size);

Check warning on line 161 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L127-L161

Added lines #L127 - L161 were not covered by tests

loop {
match buffer_receiver.recv().await {
Ok(txn_context) => {
debug!(
"Finished processing versions [{:?}, {:?}]",

Check warning on line 167 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L164-L167

Added lines #L164 - L167 were not covered by tests
txn_context.metadata.start_version, txn_context.metadata.end_version,
);
},
Err(e) => {
info!("No more transactions in channel: {:?}", e);
break Ok(());

Check warning on line 173 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L171-L173

Added lines #L171 - L173 were not covered by tests
},
}
}
}

Check warning on line 177 in rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs#L177

Added line #L177 was not covered by tests
}
Loading
Loading