Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK-parquet] parquet default processor extractor step #601

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl CoinSupply {
}

// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
let table_item_model =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

// Return early if not aptos coin aggregator key
Expand Down
97 changes: 75 additions & 22 deletions rust/processor/src/db/common/models/default_models/move_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,42 @@ pub struct TableMetadata {
pub value_type: String,
yuunlimm marked this conversation as resolved.
Show resolved Hide resolved
}

impl TableItem {
pub trait TableItemConvertible {
fn from_raw(raw_item: &RawTableItem) -> Self;
}

/// RawTableItem is a struct that will be used to converted into Postgres or Parquet TableItem
pub struct RawTableItem {
pub txn_version: i64,
pub block_timestamp: chrono::NaiveDateTime,
pub write_set_change_index: i64,
pub transaction_block_height: i64,
pub table_key: String,
pub table_handle: String,
pub decoded_key: String,
pub decoded_value: Option<String>,
pub is_deleted: bool,
}

impl RawTableItem {
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
write_set_change_index: i64,
transaction_version: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
transaction_version,
txn_version,
write_set_change_index,
transaction_block_height,
key: write_table_item.key.to_string(),
table_key: write_table_item.key.to_string(),
table_handle: standardize_address(&write_table_item.handle.to_string()),
decoded_key: serde_json::from_str(
write_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),
decoded_value: serde_json::from_str(
write_table_item.data.as_ref().unwrap().value.as_str(),
)
.unwrap(),
decoded_key: write_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: Some(write_table_item.data.as_ref().unwrap().value.clone()),
is_deleted: false,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&write_table_item.handle.to_string()),
Expand All @@ -83,7 +96,7 @@ impl TableItem {
write_table_item.data.as_ref().unwrap().value.as_str(),
)
.unwrap(),
last_transaction_version: transaction_version,
last_transaction_version: txn_version,
is_deleted: false,
},
)
Expand All @@ -92,23 +105,21 @@ impl TableItem {
pub fn from_delete_table_item(
delete_table_item: &DeleteTableItem,
write_set_change_index: i64,
transaction_version: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
transaction_version,
txn_version,
write_set_change_index,
transaction_block_height,
key: delete_table_item.key.to_string(),
table_key: delete_table_item.key.to_string(),
table_handle: standardize_address(&delete_table_item.handle.to_string()),
decoded_key: serde_json::from_str(
delete_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),

decoded_key: delete_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: None,
is_deleted: true,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&delete_table_item.handle.to_string()),
Expand All @@ -119,13 +130,55 @@ impl TableItem {
)
.unwrap(),
decoded_value: None,
last_transaction_version: transaction_version,
last_transaction_version: txn_version,
is_deleted: true,
},
)
}
}

impl TableItemConvertible for TableItem {
fn from_raw(raw_item: &RawTableItem) -> Self {
TableItem {
transaction_version: raw_item.txn_version,
write_set_change_index: raw_item.write_set_change_index,
transaction_block_height: raw_item.transaction_block_height,
key: raw_item.table_key.clone(),
table_handle: raw_item.table_handle.clone(),
decoded_key: serde_json::from_str(raw_item.decoded_key.as_str()).unwrap(),
decoded_value: raw_item
.decoded_value
.clone()
.map(|v| serde_json::from_str(v.as_str()).unwrap()),
is_deleted: raw_item.is_deleted,
}
}
}

impl TableItem {
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
write_set_change_index: i64,
transaction_version: i64,
transaction_block_height: i64,
) -> Self {
Self {
transaction_version,
write_set_change_index,
transaction_block_height,
key: write_table_item.key.to_string(),
table_handle: standardize_address(&write_table_item.handle.to_string()),
decoded_key: serde_json::from_str(write_table_item.data.as_ref().unwrap().key.as_str())
.unwrap(),
decoded_value: serde_json::from_str(
write_table_item.data.as_ref().unwrap().value.as_str(),
)
.unwrap(),
is_deleted: false,
}
}
yuunlimm marked this conversation as resolved.
Show resolved Hide resolved
}

impl TableMetadata {
pub fn from_write_table_item(table_item: &WriteTableItem) -> Self {
Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::default_models::move_tables::{RawTableItem, TableItemConvertible},
utils::util::{hash_str, standardize_address},
};
use allocative_derive::Allocative;
Expand Down Expand Up @@ -164,3 +165,19 @@ impl TableMetadata {
}
}
}

impl TableItemConvertible for TableItem {
fn from_raw(raw_item: &RawTableItem) -> Self {
TableItem {
txn_version: raw_item.txn_version,
write_set_change_index: raw_item.write_set_change_index,
transaction_block_height: raw_item.transaction_block_height,
table_key: raw_item.table_key.clone(),
table_handle: raw_item.table_handle.clone(),
decoded_key: raw_item.decoded_key.clone(),
decoded_value: raw_item.decoded_value.clone(),
is_deleted: raw_item.is_deleted,
block_timestamp: raw_item.block_timestamp,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl Transaction {
let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) =
Self::from_transaction(txn);
txns.push(txn.clone());
// TODO: Remove once fully migrated
transaction_version_to_struct_count
.entry(txn.txn_version)
.and_modify(|e| *e += 1)
Expand All @@ -386,6 +387,7 @@ impl Transaction {
block_metadata_txns.push(a.clone());
}

// TODO: Remove once fully migrated
if !wsc_list.is_empty() {
transaction_version_to_struct_count
.entry(txn.txn_version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl CoinSupply {
}

// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
let table_item_model =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

// Return early if not aptos coin aggregator key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl CurrentDelegatorBalance {
let delegator_address = standardize_address(&write_table_item.key.to_string());

// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
yuunlimm marked this conversation as resolved.
Show resolved Hide resolved
let table_item_model =
yuunlimm marked this conversation as resolved.
Show resolved Hide resolved
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

let shares: BigDecimal = table_item_model
Expand Down Expand Up @@ -171,7 +171,7 @@ impl CurrentDelegatorBalance {
};
let delegator_address = standardize_address(&write_table_item.key.to_string());
// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
let table_item_model =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

let shares: BigDecimal = table_item_model
Expand Down
29 changes: 22 additions & 7 deletions rust/processor/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait};
use crate::{
db::common::models::default_models::{
block_metadata_transactions::BlockMetadataTransactionModel,
move_tables::{CurrentTableItem, TableItem, TableMetadata},
move_tables::{
CurrentTableItem, RawTableItem, TableItem, TableItemConvertible, TableMetadata,
},
},
gap_detectors::ProcessingResult,
schema,
Expand Down Expand Up @@ -216,10 +218,14 @@ impl ProcessorTrait for DefaultProcessor {
let processing_start = std::time::Instant::now();
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
let flags = self.deprecated_tables;
let (block_metadata_transactions, table_items, current_table_items, table_metadata) =
let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) =
tokio::task::spawn_blocking(move || process_transactions(transactions, flags))
.await
.expect("Failed to spawn_blocking for TransactionModel::from_transactions");

let postgres_table_items: Vec<TableItem> =
raw_table_items.iter().map(TableItem::from_raw).collect();

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

Expand All @@ -229,7 +235,7 @@ impl ProcessorTrait for DefaultProcessor {
start_version,
end_version,
&block_metadata_transactions,
(&table_items, &current_table_items, &table_metadata),
(&postgres_table_items, &current_table_items, &table_metadata),
&self.per_table_chunk_sizes,
)
.await;
Expand All @@ -238,7 +244,7 @@ impl ProcessorTrait for DefaultProcessor {
// make it faster.
tokio::task::spawn(async move {
drop(block_metadata_transactions);
drop(table_items);
drop(postgres_table_items);
drop(current_table_items);
drop(table_metadata);
});
Expand Down Expand Up @@ -272,6 +278,7 @@ impl ProcessorTrait for DefaultProcessor {
}
}

// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet)
/// Processes a list of transactions and extracts relevant data into different models.
///
/// This function iterates over a list of transactions, extracting block metadata transactions,
Expand All @@ -297,7 +304,7 @@ pub fn process_transactions(
flags: TableFlags,
yuunlimm marked this conversation as resolved.
Show resolved Hide resolved
) -> (
Vec<BlockMetadataTransactionModel>,
Vec<TableItem>,
Vec<RawTableItem>,
Vec<CurrentTableItem>,
Vec<TableMetadata>,
) {
Expand All @@ -318,6 +325,10 @@ pub fn process_transactions(
.info
.as_ref()
.expect("Transaction info doesn't exist!");

#[allow(deprecated)]
let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0)
.expect("Txn Timestamp is invalid!");
let txn_data = match transaction.txn_data.as_ref() {
Some(txn_data) => txn_data,
None => {
Expand Down Expand Up @@ -349,11 +360,12 @@ pub fn process_transactions(
.expect("WriteSetChange must have a change")
{
WriteSetChangeEnum::WriteTableItem(inner) => {
let (ti, cti) = TableItem::from_write_table_item(
let (ti, cti) = RawTableItem::from_write_table_item(
inner,
index as i64,
version,
block_height,
block_timestamp,
);
table_items.push(ti);
current_table_items.insert(
Expand All @@ -366,11 +378,12 @@ pub fn process_transactions(
);
},
WriteSetChangeEnum::DeleteTableItem(inner) => {
let (ti, cti) = TableItem::from_delete_table_item(
let (ti, cti) = RawTableItem::from_delete_table_item(
inner,
index as i64,
version,
block_height,
block_timestamp,
);
table_items.push(ti);
current_table_items
Expand All @@ -391,9 +404,11 @@ pub fn process_transactions(
.sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash)));
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));

// TODO: remove this, since we are not going to deprecate this anytime soon?
if flags.contains(TableFlags::TABLE_ITEMS) {
table_items.clear();
}
// TODO: migrate to Parquet
if flags.contains(TableFlags::TABLE_METADATAS) {
table_metadata.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
let (
(move_resources, write_set_changes, transactions, table_items, move_modules),
transaction_version_to_struct_count,
) = tokio::task::spawn_blocking(move || process_transactions(transactions))
) = tokio::task::spawn_blocking(move || process_transactions_parquet(transactions))
.await
.expect("Failed to spawn_blocking for TransactionModel::from_transactions");

Expand Down Expand Up @@ -213,7 +213,8 @@ impl ProcessorTrait for ParquetDefaultProcessor {
}
}

pub fn process_transactions(
// TODO: Remove transaction_version_to_struct_count after migration
pub fn process_transactions_parquet(
transactions: Vec<Transaction>,
) -> (
(
Expand Down
10 changes: 10 additions & 0 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ impl ParquetTypeStructs {
ParquetTypeEnum::MoveModule => ParquetTypeStructs::MoveModule(Vec::new()),
}
}

pub fn get_table_name(&self) -> &'static str {
match self {
ParquetTypeStructs::MoveResource(_) => "move_resources",
ParquetTypeStructs::WriteSetChange(_) => "write_set_changes",
ParquetTypeStructs::Transaction(_) => "parquet_transactions",
ParquetTypeStructs::TableItem(_) => "table_items",
ParquetTypeStructs::MoveModule(_) => "move_modules",
}
}
}

#[cfg(test)]
Expand Down
Loading
Loading