Skip to content

Commit

Permalink
Migrate parquet txn metadata processor to sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Dec 5, 2024
1 parent 6052d0f commit 69a1b32
Show file tree
Hide file tree
Showing 13 changed files with 322 additions and 29 deletions.
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod default_models;
pub mod fungible_asset_models;
pub mod transaction_metadata_model;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod parquet_write_set_size_info;
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,3 @@
pub mod event_size_info;
pub mod transaction_size_info;
pub mod write_set_size_info;

// parquet models
pub mod parquet_write_set_size_info;
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
ParquetProcessingResult,
},
db::postgres::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
db::parquet::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
gap_detectors::ProcessingResult,
processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait},
utils::{database::ArcDbPool, util::parse_timestamp},
Expand Down Expand Up @@ -92,31 +92,8 @@ impl ProcessorTrait for ParquetTransactionMetadataProcessor {
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
let mut transaction_version_to_struct_count: AHashMap<i64, i64> = AHashMap::new();

let mut write_set_sizes = vec![];

for txn in &transactions {
let txn_version = txn.version as i64;
let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
let size_info = match txn.size_info.as_ref() {
Some(size_info) => size_info,
None => {
warn!(version = txn.version, "Transaction size info not found");
continue;
},
};
for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() {
write_set_sizes.push(WriteSetSize::from_transaction_info(
write_set_size_info,
txn_version,
index as i64,
block_timestamp,
));
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1)
.or_insert(1);
}
}
let write_set_sizes =
process_transaction(transactions, &mut transaction_version_to_struct_count);

Check warning on line 96 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L95-L96

Added lines #L95 - L96 were not covered by tests

let write_set_size_info_parquet_data = ParquetDataGeneric {
data: write_set_sizes,
Expand All @@ -143,3 +120,35 @@ impl ProcessorTrait for ParquetTransactionMetadataProcessor {
&self.connection_pool
}
}

/// Builds the `WriteSetSize` rows for a batch of transactions.
///
/// For every transaction that carries `size_info`, one `WriteSetSize` is created per entry of
/// `size_info.write_op_size_info` via `WriteSetSize::from_transaction_info`, tagged with the
/// transaction version, the entry's position in that list, and the parsed block timestamp.
/// Transactions without `size_info` are skipped after logging a warning.
///
/// `transaction_version_to_struct_count` is updated in place: the counter stored under a
/// transaction's version is incremented once for each row produced for that transaction.
///
/// # Panics
///
/// Panics if a transaction has no `timestamp`, because the timestamp is unwrapped before the
/// `size_info` check.
pub fn process_transaction(
transactions: Vec<Transaction>,
transaction_version_to_struct_count: &mut AHashMap<i64, i64>,
) -> Vec<WriteSetSize> {
let mut write_set_sizes = vec![];

Check warning on line 128 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L124-L128

Added lines #L124 - L128 were not covered by tests

for txn in transactions {
let txn_version = txn.version as i64;
// The timestamp is unwrapped here, ahead of the `size_info` check below, so a missing
// timestamp panics even for transactions that would otherwise be skipped.
let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
let size_info = match txn.size_info.as_ref() {
Some(size_info) => size_info,

Check warning on line 134 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L130-L134

Added lines #L130 - L134 were not covered by tests
None => {
warn!(version = txn.version, "Transaction size info not found");
continue;

Check warning on line 137 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L136-L137

Added lines #L136 - L137 were not covered by tests
},
};
// One output row per write-op size entry; `index` is the entry's position in the list.
for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() {
write_set_sizes.push(WriteSetSize::from_transaction_info(
write_set_size_info,
txn_version,
index as i64,
block_timestamp,
));
// Bump the per-version counter, starting it at 1 the first time a version is seen.
transaction_version_to_struct_count
.entry(txn_version)
.and_modify(|e| *e += 1)
.or_insert(1);
}

Check warning on line 151 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L140-L151

Added lines #L140 - L151 were not covered by tests
}
write_set_sizes
}

Check warning on line 154 in rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs#L153-L154

Added lines #L153 - L154 were not covered by tests
3 changes: 3 additions & 0 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ bitflags! {

const FUNGIBLE_ASSET_ACTIVITIES = 1 << 26;
const FUNGIBLE_ASSET_METADATA = 1 << 27;

// Transaction Metadata
const WRITE_SET_SIZE = 1 << 29;
}
}

Expand Down
6 changes: 6 additions & 0 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
parquet_processors::{
parquet_default_processor::ParquetDefaultProcessor,
parquet_fungible_asset_processor::ParquetFungibleAssetProcessor,
parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor,
},
processors::{
account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor,
Expand Down Expand Up @@ -89,6 +90,11 @@ impl RunnableConfig for IndexerProcessorConfig {
ParquetFungibleAssetProcessor::new(self.clone()).await?;
parquet_fungible_asset_processor.run_processor().await
},
ProcessorConfig::ParquetTransactionMetadataProcessor(_) => {
let parquet_transaction_metadata_processor =
ParquetTransactionMetadataProcessor::new(self.clone()).await?;
parquet_transaction_metadata_processor.run_processor().await

Check warning on line 96 in rust/sdk-processor/src/config/indexer_processor_config.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/config/indexer_processor_config.rs#L94-L96

Added lines #L94 - L96 were not covered by tests
},
}
}

Expand Down
6 changes: 6 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use processor::{
parquet_write_set_changes::WriteSetChangeModel,
},
fungible_asset_models::parquet_v2_fungible_asset_activities::FungibleAssetActivity,
transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
},
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -70,6 +71,7 @@ pub enum ProcessorConfig {
// ParquetProcessor
ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
ParquetFungibleAssetProcessor(ParquetDefaultProcessorConfig),
ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig),
}

impl ProcessorConfig {
Expand All @@ -86,6 +88,7 @@ impl ProcessorConfig {
pub fn get_processor_status_table_names(&self) -> anyhow::Result<Vec<String>> {
match self {
ProcessorConfig::ParquetDefaultProcessor(config)
| ProcessorConfig::ParquetTransactionMetadataProcessor(config)

Check warning on line 91 in rust/sdk-processor/src/config/processor_config.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/config/processor_config.rs#L91

Added line #L91 was not covered by tests
| ProcessorConfig::ParquetFungibleAssetProcessor(config) => {
// Get the processor name as a prefix
let processor_name = self.name();
Expand Down Expand Up @@ -128,6 +131,9 @@ impl ProcessorConfig {
ProcessorName::ParquetFungibleAssetProcessor => {
HashSet::from([FungibleAssetActivity::TABLE_NAME.to_string()])
},
ProcessorName::ParquetTransactionMetadataProcessor => {
HashSet::from([WriteSetSize::TABLE_NAME.to_string()])
},
_ => HashSet::new(), // Default case for unsupported processors
}
}
Expand Down
13 changes: 13 additions & 0 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use processor::{
},
parquet_v2_fungible_metadata::FungibleAssetMetadataModel,
},
transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
},
worker::TableFlags,
};
Expand All @@ -44,6 +45,7 @@ use strum::{Display, EnumIter};

pub mod parquet_default_processor;
pub mod parquet_fungible_asset_processor;
pub mod parquet_transaction_metadata_processor;

const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

Expand Down Expand Up @@ -82,6 +84,8 @@ pub enum ParquetTypeEnum {
FungibleAssetBalance,
CurrentFungibleAssetBalance,
CurrentUnifiedFungibleAssetBalance,
// Transaction metadata
WriteSetSize,
}

/// Trait for handling various Parquet types.
Expand Down Expand Up @@ -155,6 +159,7 @@ impl_parquet_trait!(
CurrentUnifiedFungibleAssetBalance,
ParquetTypeEnum::CurrentUnifiedFungibleAssetBalance
);
impl_parquet_trait!(WriteSetSize, ParquetTypeEnum::WriteSetSize);

#[derive(Debug, Clone)]
#[enum_dispatch(ParquetTypeTrait)]
Expand All @@ -172,6 +177,7 @@ pub enum ParquetTypeStructs {
FungibleAssetBalance(Vec<FungibleAssetBalance>),
CurrentFungibleAssetBalance(Vec<CurrentFungibleAssetBalance>),
CurrentUnifiedFungibleAssetBalance(Vec<CurrentUnifiedFungibleAssetBalance>),
WriteSetSize(Vec<WriteSetSize>),
}

impl ParquetTypeStructs {
Expand Down Expand Up @@ -202,6 +208,7 @@ impl ParquetTypeStructs {
ParquetTypeEnum::CurrentUnifiedFungibleAssetBalance => {
ParquetTypeStructs::CurrentUnifiedFungibleAssetBalance(Vec::new())
},
ParquetTypeEnum::WriteSetSize => ParquetTypeStructs::WriteSetSize(Vec::new()),

Check warning on line 211 in rust/sdk-processor/src/parquet_processors/mod.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/mod.rs#L211

Added line #L211 was not covered by tests
}
}

Expand Down Expand Up @@ -292,6 +299,12 @@ impl ParquetTypeStructs {
) => {
handle_append!(self_data, other_data)
},
(
ParquetTypeStructs::WriteSetSize(self_data),
ParquetTypeStructs::WriteSetSize(other_data),
) => {
handle_append!(self_data, other_data)

Check warning on line 306 in rust/sdk-processor/src/parquet_processors/mod.rs

View check run for this annotation

Codecov / codecov/patch

rust/sdk-processor/src/parquet_processors/mod.rs#L303-L306

Added lines #L303 - L306 were not covered by tests
},
_ => Err(ProcessorError::ProcessError {
message: "Mismatched buffer types in append operation".to_string(),
}),
Expand Down
Loading

0 comments on commit 69a1b32

Please sign in to comment.