Skip to content

Commit

Permalink
[SDK-parquet] parquet default processor extractor step
Browse files Browse the repository at this point in the history
  • Loading branch information
yuunlimm committed Nov 12, 2024
1 parent d475fad commit b4f46db
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl Transaction {
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) =
Self::from_transaction(txn);
txns.push(txn.clone());
// TODO: Remove once fully migrated
transaction_version_to_struct_count
.entry(txn.txn_version)
.and_modify(|e| *e += 1)
Expand All @@ -386,6 +387,7 @@ impl Transaction {
block_metadata_txns.push(a.clone());
}

// TODO: Remove once fully migrated
if !wsc_list.is_empty() {
transaction_version_to_struct_count
.entry(txn.txn_version)
Expand Down
49 changes: 49 additions & 0 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,55 @@ impl ParquetTypeStructs {
ParquetTypeEnum::MoveModule => ParquetTypeStructs::MoveModule(Vec::new()),
}
}

pub fn get_table_name(&self) -> &'static str {
match self {
ParquetTypeStructs::MoveResource(_) => "move_resources",
ParquetTypeStructs::WriteSetChange(_) => "write_set_changes",
ParquetTypeStructs::Transaction(_) => "parquet_transactions",
ParquetTypeStructs::TableItem(_) => "table_items",
ParquetTypeStructs::MoveModule(_) => "move_modules",
}
}
}

pub trait ParquetStruct {}

impl ParquetStruct for MoveResource {}
impl ParquetStruct for WriteSetChangeModel {}
impl ParquetStruct for ParquetTransaction {}
impl ParquetStruct for TableItem {}
impl ParquetStruct for MoveModule {}

impl ParquetTypeStructs {
pub fn get_type(&self) -> ParquetTypeEnum {
match self {
ParquetTypeStructs::MoveResource(_) => ParquetTypeEnum::MoveResource,
ParquetTypeStructs::WriteSetChange(_) => ParquetTypeEnum::WriteSetChange,
ParquetTypeStructs::Transaction(_) => ParquetTypeEnum::Transaction,
ParquetTypeStructs::TableItem(_) => ParquetTypeEnum::TableItem,
ParquetTypeStructs::MoveModule(_) => ParquetTypeEnum::MoveModule,
}
}

/// Get a vector of trait object references to the inner structs
pub fn get_structs(&self) -> Vec<&dyn ParquetStruct> {
match self {
ParquetTypeStructs::MoveResource(v) => {
v.iter().map(|s| s as &dyn ParquetStruct).collect()
},
ParquetTypeStructs::WriteSetChange(v) => {
v.iter().map(|s| s as &dyn ParquetStruct).collect()
},
ParquetTypeStructs::Transaction(v) => {
v.iter().map(|s| s as &dyn ParquetStruct).collect()
},
ParquetTypeStructs::TableItem(v) => v.iter().map(|s| s as &dyn ParquetStruct).collect(),
ParquetTypeStructs::MoveModule(v) => {
v.iter().map(|s| s as &dyn ParquetStruct).collect()
},
}
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use processor::{
processors::default_processor::process_transactions,
worker::TableFlags,
};
pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;

pub struct DefaultExtractor
where
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use crate::parquet_processors::{ParquetTypeEnum, ParquetTypeStructs};
use ahash::AHashMap;
use aptos_indexer_processor_sdk::{
aptos_protos::transaction::v1::Transaction,
traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
types::transaction_context::TransactionContext,
utils::errors::ProcessorError,
};
use async_trait::async_trait;
use processor::db::common::models::default_models::{
parquet_move_modules::MoveModule,
parquet_move_resources::MoveResource,
parquet_move_tables::TableItem,
parquet_transactions::TransactionModel,
parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
};
use std::collections::HashMap;

/// Extracts parquet data from transactions, allowing optional selection of specific tables.
pub struct ParquetDefaultExtractor
where
Self: Processable + Send + Sized + 'static,
{
pub opt_in_tables: Option<Vec<String>>,
}

impl ParquetDefaultExtractor {
fn add_if_opted_in(
&self,
map: &mut HashMap<ParquetTypeEnum, ParquetTypeStructs>,
enum_type: ParquetTypeEnum,
data: ParquetTypeStructs,
) {
if let Some(ref opt_in_tables) = self.opt_in_tables {
let table_name = enum_type.to_string();
if opt_in_tables.contains(&table_name) {
map.insert(enum_type, data);
}
} else {
// If there's no opt-in table, include all data
map.insert(enum_type, data);
}
}
}

type ParquetTypeMap = HashMap<ParquetTypeEnum, ParquetTypeStructs>;

#[async_trait]
impl Processable for ParquetDefaultExtractor {
type Input = Vec<Transaction>;
type Output = ParquetTypeMap;
type RunType = AsyncRunType;

async fn process(
&mut self,
transactions: TransactionContext<Self::Input>,
) -> anyhow::Result<Option<TransactionContext<ParquetTypeMap>>, ProcessorError> {
let (move_resources, write_set_changes, parquet_transactions, table_items, move_modules) =
process_transactions(transactions.data);

// Print the size of each extracted data type
println!("Processed data sizes:");
println!(" - MoveResources: {}", move_resources.len());
println!(" - WriteSetChanges: {}", write_set_changes.len());
println!(" - ParquetTransactions: {}", parquet_transactions.len());
println!(" - TableItems: {}", table_items.len());
println!(" - MoveModules: {}", move_modules.len());

let mut map: HashMap<ParquetTypeEnum, ParquetTypeStructs> = HashMap::new();
// Populate the map based on opt-in tables
self.add_if_opted_in(
&mut map,
ParquetTypeEnum::MoveResource,
ParquetTypeStructs::MoveResource(move_resources),
);
self.add_if_opted_in(
&mut map,
ParquetTypeEnum::WriteSetChange,
ParquetTypeStructs::WriteSetChange(write_set_changes),
);
self.add_if_opted_in(
&mut map,
ParquetTypeEnum::Transaction,
ParquetTypeStructs::Transaction(parquet_transactions),
);
self.add_if_opted_in(
&mut map,
ParquetTypeEnum::TableItem,
ParquetTypeStructs::TableItem(table_items),
);
self.add_if_opted_in(
&mut map,
ParquetTypeEnum::MoveModule,
ParquetTypeStructs::MoveModule(move_modules),
);
println!(
"Map populated with data for the following tables: {:?}",
map.keys().collect::<Vec<_>>()
);

Ok(Some(TransactionContext {
data: map,
metadata: transactions.metadata,
}))
}
}

pub fn process_transactions(
transactions: Vec<Transaction>,
) -> (
Vec<MoveResource>,
Vec<WriteSetChangeModel>,
Vec<TransactionModel>,
Vec<TableItem>,
Vec<MoveModule>,
) {
// this will be removed in the future.
let mut transaction_version_to_struct_count = AHashMap::new();
let (txns, _, write_set_changes, wsc_details) = TransactionModel::from_transactions(
&transactions,
&mut transaction_version_to_struct_count,
);

let mut move_modules = vec![];
let mut move_resources = vec![];
let mut table_items = vec![];

for detail in wsc_details {
match detail {
WriteSetChangeDetail::Module(module) => {
move_modules.push(module);
},
WriteSetChangeDetail::Resource(resource) => {
move_resources.push(resource);
},
WriteSetChangeDetail::Table(item, _, _) => {
table_items.push(item);
},
}
}

(
move_resources,
write_set_changes,
txns,
table_items,
move_modules,
)
}

impl AsyncStep for ParquetDefaultExtractor {}

impl NamedStep for ParquetDefaultExtractor {
fn name(&self) -> String {
"ParquetDefaultExtractor".to_string()
}
}

0 comments on commit b4f46db

Please sign in to comment.