-
Notifications
You must be signed in to change notification settings - Fork 83
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Migrate parquet txn metadata processor to sdk
- Loading branch information
Showing
12 changed files
with
319 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
pub mod default_models; | ||
pub mod fungible_asset_models; | ||
pub mod transaction_metadata_model; |
4 changes: 4 additions & 0 deletions
4
rust/processor/src/db/parquet/models/transaction_metadata_model/mod.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
// Copyright © Aptos Foundation | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
pub mod parquet_write_set_size_info; |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
179 changes: 179 additions & 0 deletions
179
rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
use crate::{ | ||
config::{ | ||
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, | ||
processor_config::ProcessorConfig, | ||
}, | ||
parquet_processors::{ | ||
initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, | ||
set_backfill_table_flag, ParquetTypeEnum, | ||
}, | ||
steps::{ | ||
common::{ | ||
parquet_version_tracker_step::ParquetVersionTrackerStep, | ||
processor_status_saver::get_processor_status_saver, | ||
}, | ||
parquet_transaction_metadata_processor::parquet_transaction_metadata_extractor::ParquetTransactionMetadataExtractor, | ||
}, | ||
utils::{ | ||
chain_id::check_or_update_chain_id, | ||
database::{run_migrations, ArcDbPool}, | ||
starting_version::get_min_last_success_version_parquet, | ||
}, | ||
}; | ||
use anyhow::Context; | ||
use aptos_indexer_processor_sdk::{ | ||
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, | ||
builder::ProcessorBuilder, | ||
common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, | ||
traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, | ||
}; | ||
use parquet::schema::types::Type; | ||
use processor::{ | ||
bq_analytics::generic_parquet_processor::HasParquetSchema, | ||
db::parquet::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize, | ||
}; | ||
use std::{collections::HashMap, sync::Arc}; | ||
use tracing::{debug, info}; | ||
|
||
pub struct ParquetTransactionMetadataProcessor { | ||
pub config: IndexerProcessorConfig, | ||
pub db_pool: ArcDbPool, | ||
} | ||
|
||
impl ParquetTransactionMetadataProcessor { | ||
pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result<Self> { | ||
let db_pool = initialize_database_pool(&config.db_config).await?; | ||
Ok(Self { config, db_pool }) | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl ProcessorTrait for ParquetTransactionMetadataProcessor { | ||
fn name(&self) -> &'static str { | ||
self.config.processor_config.name() | ||
} | ||
|
||
async fn run_processor(&self) -> anyhow::Result<()> { | ||
// Run Migrations | ||
let parquet_db_config = match self.config.db_config { | ||
DbConfig::ParquetConfig(ref parquet_config) => { | ||
run_migrations( | ||
parquet_config.connection_string.clone(), | ||
self.db_pool.clone(), | ||
) | ||
.await; | ||
parquet_config | ||
}, | ||
_ => { | ||
return Err(anyhow::anyhow!( | ||
"Invalid db config for ParquetTransactionMetadataProcessor {:?}", | ||
self.config.db_config | ||
)); | ||
}, | ||
}; | ||
|
||
// Check and update the ledger chain id to ensure we're indexing the correct chain | ||
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) | ||
.await? | ||
.get_chain_id() | ||
.await?; | ||
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; | ||
|
||
let parquet_processor_config = match self.config.processor_config.clone() { | ||
ProcessorConfig::ParquetTransactionMetadataProcessor(parquet_processor_config) => { | ||
parquet_processor_config | ||
}, | ||
_ => { | ||
return Err(anyhow::anyhow!( | ||
"Invalid processor configuration for ParquetTransactionMetadataProcessor {:?}", | ||
self.config.processor_config | ||
)); | ||
}, | ||
}; | ||
|
||
let processor_status_table_names = self | ||
.config | ||
.processor_config | ||
.get_processor_status_table_names() | ||
.context("Failed to get table names for the processor status table")?; | ||
|
||
let starting_version = get_min_last_success_version_parquet( | ||
&self.config, | ||
self.db_pool.clone(), | ||
processor_status_table_names, | ||
) | ||
.await?; | ||
println!("Starting version: {:?}", starting_version); | ||
|
||
// Define processor transaction stream config | ||
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { | ||
starting_version: Some(starting_version), | ||
..self.config.transaction_stream_config.clone() | ||
}) | ||
.await?; | ||
|
||
let backfill_table = set_backfill_table_flag(parquet_processor_config.backfill_table); | ||
let parquet_txn_metadata_extractor = ParquetTransactionMetadataExtractor { | ||
opt_in_tables: backfill_table, | ||
}; | ||
|
||
let gcs_client = | ||
initialize_gcs_client(parquet_db_config.google_application_credentials.clone()).await; | ||
|
||
let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = | ||
[(ParquetTypeEnum::WriteSetSize, WriteSetSize::schema())] | ||
.into_iter() | ||
.collect(); | ||
|
||
let default_size_buffer_step = initialize_parquet_buffer_step( | ||
gcs_client.clone(), | ||
parquet_type_to_schemas, | ||
parquet_processor_config.upload_interval, | ||
parquet_processor_config.max_buffer_size, | ||
parquet_db_config.bucket_name.clone(), | ||
parquet_db_config.bucket_root.clone(), | ||
self.name().to_string(), | ||
) | ||
.await | ||
.unwrap_or_else(|e| { | ||
panic!("Failed to initialize parquet buffer step: {:?}", e); | ||
}); | ||
|
||
let parquet_version_tracker_step = ParquetVersionTrackerStep::new( | ||
get_processor_status_saver(self.db_pool.clone(), self.config.clone()), | ||
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, | ||
); | ||
|
||
let channel_size = parquet_processor_config.channel_size; | ||
|
||
// Connect processor steps together | ||
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( | ||
transaction_stream.into_runnable_step(), | ||
) | ||
.connect_to( | ||
parquet_txn_metadata_extractor.into_runnable_step(), | ||
channel_size, | ||
) | ||
.connect_to(default_size_buffer_step.into_runnable_step(), channel_size) | ||
.connect_to( | ||
parquet_version_tracker_step.into_runnable_step(), | ||
channel_size, | ||
) | ||
.end_and_return_output_receiver(channel_size); | ||
|
||
loop { | ||
match buffer_receiver.recv().await { | ||
Ok(txn_context) => { | ||
debug!( | ||
"Finished processing versions [{:?}, {:?}]", | ||
txn_context.metadata.start_version, txn_context.metadata.end_version, | ||
); | ||
}, | ||
Err(e) => { | ||
info!("No more transactions in channel: {:?}", e); | ||
break Ok(()); | ||
}, | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 change: 1 addition & 0 deletions
1
rust/sdk-processor/src/steps/parquet_transaction_metadata_processor/mod.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
pub mod parquet_transaction_metadata_extractor; |
Oops, something went wrong.