Skip to content

Commit

Permalink
add a raw table item struct to support both parquet and postgres with…
Browse files Browse the repository at this point in the history
…out duplicating the parsing code
  • Loading branch information
yuunlimm committed Nov 14, 2024
1 parent 60692df commit ef311b0
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl CoinSupply {
}

// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
let table_item_model =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

// Return early if not aptos coin aggregator key
Expand Down
99 changes: 77 additions & 22 deletions rust/processor/src/db/common/models/default_models/move_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,42 @@ pub struct TableMetadata {
pub value_type: String,
}

impl TableItem {
/// Conversion from the storage-agnostic [`RawTableItem`] into a concrete
/// table item model (e.g. the Postgres or the Parquet `TableItem`).
pub trait TableItemConvertible {
/// Builds `Self` from a borrowed raw item; the raw item is not consumed.
fn from_raw(raw_item: &RawTableItem) -> Self;
}

/// RawTableItem is an intermediate struct that is converted into the Postgres or Parquet TableItem,
/// so the parsing of write set changes is written only once.
pub struct RawTableItem {
// Version of the transaction that produced this table item change.
pub txn_version: i64,
// Timestamp passed in by the caller alongside the transaction version.
pub block_timestamp: chrono::NaiveDateTime,
// Index of the change within the transaction's write set.
pub write_set_change_index: i64,
pub transaction_block_height: i64,
// String form of the table item key as it appears in the write set change.
pub table_key: String,
// Table handle after `standardize_address` normalization.
pub table_handle: String,
// Decoded key kept as the unparsed string from the change data; consumers
// that need JSON parse it themselves.
pub decoded_key: String,
// Decoded value as an unparsed string; `None` for deleted items.
pub decoded_value: Option<String>,
// `true` when built from a delete change, `false` when built from a write change.
pub is_deleted: bool,
}

impl RawTableItem {
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
write_set_change_index: i64,
transaction_version: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
transaction_version,
txn_version,
write_set_change_index,
transaction_block_height,
key: write_table_item.key.to_string(),
table_key: write_table_item.key.to_string(),
table_handle: standardize_address(&write_table_item.handle.to_string()),
decoded_key: serde_json::from_str(
write_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),
decoded_value: serde_json::from_str(
write_table_item.data.as_ref().unwrap().value.as_str(),
)
.unwrap(),
decoded_key: write_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: Some(write_table_item.data.as_ref().unwrap().value.clone()),
is_deleted: false,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&write_table_item.handle.to_string()),
Expand All @@ -83,7 +96,7 @@ impl TableItem {
write_table_item.data.as_ref().unwrap().value.as_str(),
)
.unwrap(),
last_transaction_version: transaction_version,
last_transaction_version: txn_version,
is_deleted: false,
},
)
Expand All @@ -92,23 +105,21 @@ impl TableItem {
pub fn from_delete_table_item(
delete_table_item: &DeleteTableItem,
write_set_change_index: i64,
transaction_version: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
transaction_version,
txn_version,
write_set_change_index,
transaction_block_height,
key: delete_table_item.key.to_string(),
table_key: delete_table_item.key.to_string(),
table_handle: standardize_address(&delete_table_item.handle.to_string()),
decoded_key: serde_json::from_str(
delete_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),

decoded_key: delete_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: None,
is_deleted: true,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&delete_table_item.handle.to_string()),
Expand All @@ -119,13 +130,57 @@ impl TableItem {
)
.unwrap(),
decoded_value: None,
last_transaction_version: transaction_version,
last_transaction_version: txn_version,
is_deleted: true,
},
)
}
}

impl TableItemConvertible for TableItem {
    /// Builds the table item model from a [`RawTableItem`].
    ///
    /// The raw item stores the decoded key and value as strings; here they are
    /// parsed with `serde_json::from_str`. A missing value (deleted item) stays
    /// `None`.
    ///
    /// # Panics
    ///
    /// Panics if `decoded_key`, or a present `decoded_value`, is not valid
    /// JSON for the target field type.
    fn from_raw(raw_item: &RawTableItem) -> Self {
        TableItem {
            transaction_version: raw_item.txn_version,
            write_set_change_index: raw_item.write_set_change_index,
            transaction_block_height: raw_item.transaction_block_height,
            key: raw_item.table_key.clone(),
            table_handle: raw_item.table_handle.clone(),
            decoded_key: serde_json::from_str(&raw_item.decoded_key).unwrap(),
            // Borrow the value as `&str` instead of cloning the whole string
            // just to parse it and throw the copy away.
            decoded_value: raw_item
                .decoded_value
                .as_deref()
                .map(|value| serde_json::from_str(value).unwrap()),
            is_deleted: raw_item.is_deleted,
        }
    }
}

impl TableItem {
    /// Builds a `TableItem` directly from a write table item change.
    ///
    /// The key is stringified, the handle is normalized with
    /// `standardize_address`, and the change's decoded key and value are
    /// parsed from JSON. The resulting item is never marked as deleted.
    ///
    /// # Panics
    ///
    /// Panics if the change carries no `data`, or if the decoded key or value
    /// fails to parse as JSON.
    pub fn from_write_table_item(
        write_table_item: &WriteTableItem,
        write_set_change_index: i64,
        transaction_version: i64,
        transaction_block_height: i64,
    ) -> Self {
        let key = write_table_item.key.to_string();
        let table_handle = standardize_address(&write_table_item.handle.to_string());

        // Both decoded fields come from the same optional payload.
        let data = write_table_item.data.as_ref().unwrap();
        let decoded_key = serde_json::from_str(data.key.as_str()).unwrap();
        let decoded_value = serde_json::from_str(data.value.as_str()).unwrap();

        Self {
            transaction_version,
            write_set_change_index,
            transaction_block_height,
            key,
            table_handle,
            decoded_key,
            decoded_value,
            is_deleted: false,
        }
    }
}

impl TableMetadata {
pub fn from_write_table_item(table_item: &WriteTableItem) -> Self {
Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::default_models::move_tables::{RawTableItem, TableItemConvertible},
utils::util::{hash_str, standardize_address},
};
use allocative_derive::Allocative;
Expand Down Expand Up @@ -164,3 +165,19 @@ impl TableMetadata {
}
}
}

impl TableItemConvertible for TableItem {
fn from_raw(raw_item: &RawTableItem) -> Self {
TableItem {
txn_version: raw_item.txn_version,
write_set_change_index: raw_item.write_set_change_index,
transaction_block_height: raw_item.transaction_block_height,
table_key: raw_item.table_key.clone(),
table_handle: raw_item.table_handle.clone(),
decoded_key: raw_item.decoded_key.clone(),
decoded_value: raw_item.decoded_value.clone(),
is_deleted: raw_item.is_deleted,
block_timestamp: raw_item.block_timestamp,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl CoinSupply {
}

// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
let table_item_model =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

// Return early if not aptos coin aggregator key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl CurrentDelegatorBalance {
let delegator_address = standardize_address(&write_table_item.key.to_string());

// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
let table_item_model =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

let shares: BigDecimal = table_item_model
Expand Down Expand Up @@ -171,7 +171,7 @@ impl CurrentDelegatorBalance {
};
let delegator_address = standardize_address(&write_table_item.key.to_string());
// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
let table_item_model =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

let shares: BigDecimal = table_item_model
Expand Down
29 changes: 22 additions & 7 deletions rust/processor/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait};
use crate::{
db::common::models::default_models::{
block_metadata_transactions::BlockMetadataTransactionModel,
move_tables::{CurrentTableItem, TableItem, TableMetadata},
move_tables::{
CurrentTableItem, RawTableItem, TableItem, TableItemConvertible, TableMetadata,
},
},
gap_detectors::ProcessingResult,
schema,
Expand Down Expand Up @@ -216,10 +218,14 @@ impl ProcessorTrait for DefaultProcessor {
let processing_start = std::time::Instant::now();
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
let flags = self.deprecated_tables;
let (block_metadata_transactions, table_items, current_table_items, table_metadata) =
let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) =
tokio::task::spawn_blocking(move || process_transactions(transactions, flags))
.await
.expect("Failed to spawn_blocking for TransactionModel::from_transactions");

let postgres_table_items: Vec<TableItem> =
raw_table_items.iter().map(TableItem::from_raw).collect();

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

Expand All @@ -229,7 +235,7 @@ impl ProcessorTrait for DefaultProcessor {
start_version,
end_version,
&block_metadata_transactions,
(&table_items, &current_table_items, &table_metadata),
(&postgres_table_items, &current_table_items, &table_metadata),
&self.per_table_chunk_sizes,
)
.await;
Expand All @@ -238,7 +244,7 @@ impl ProcessorTrait for DefaultProcessor {
// make it faster.
tokio::task::spawn(async move {
drop(block_metadata_transactions);
drop(table_items);
drop(postgres_table_items);
drop(current_table_items);
drop(table_metadata);
});
Expand Down Expand Up @@ -272,6 +278,7 @@ impl ProcessorTrait for DefaultProcessor {
}
}

// TODO: we can further optimize this by passing in a flag to selectively parse only the required data (e.g. table_items for parquet)
/// Processes a list of transactions and extracts relevant data into different models.
///
/// This function iterates over a list of transactions, extracting block metadata transactions,
Expand All @@ -297,7 +304,7 @@ pub fn process_transactions(
flags: TableFlags,
) -> (
Vec<BlockMetadataTransactionModel>,
Vec<TableItem>,
Vec<RawTableItem>,
Vec<CurrentTableItem>,
Vec<TableMetadata>,
) {
Expand All @@ -318,6 +325,10 @@ pub fn process_transactions(
.info
.as_ref()
.expect("Transaction info doesn't exist!");

#[allow(deprecated)]
let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0)
.expect("Txn Timestamp is invalid!");
let txn_data = match transaction.txn_data.as_ref() {
Some(txn_data) => txn_data,
None => {
Expand Down Expand Up @@ -349,11 +360,12 @@ pub fn process_transactions(
.expect("WriteSetChange must have a change")
{
WriteSetChangeEnum::WriteTableItem(inner) => {
let (ti, cti) = TableItem::from_write_table_item(
let (ti, cti) = RawTableItem::from_write_table_item(
inner,
index as i64,
version,
block_height,
block_timestamp,
);
table_items.push(ti);
current_table_items.insert(
Expand All @@ -366,11 +378,12 @@ pub fn process_transactions(
);
},
WriteSetChangeEnum::DeleteTableItem(inner) => {
let (ti, cti) = TableItem::from_delete_table_item(
let (ti, cti) = RawTableItem::from_delete_table_item(
inner,
index as i64,
version,
block_height,
block_timestamp,
);
table_items.push(ti);
current_table_items
Expand All @@ -391,9 +404,11 @@ pub fn process_transactions(
.sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash)));
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));

// TODO: remove this, since we are not going to deprecate this anytime soon?
if flags.contains(TableFlags::TABLE_ITEMS) {
table_items.clear();
}
// TODO: migrate to Parquet
if flags.contains(TableFlags::TABLE_METADATAS) {
table_metadata.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
let (
(move_resources, write_set_changes, transactions, table_items, move_modules),
transaction_version_to_struct_count,
) = tokio::task::spawn_blocking(move || process_transactions(transactions))
) = tokio::task::spawn_blocking(move || process_transactions_parquet(transactions))
.await
.expect("Failed to spawn_blocking for TransactionModel::from_transactions");

Expand Down Expand Up @@ -213,7 +213,8 @@ impl ProcessorTrait for ParquetDefaultProcessor {
}
}

pub fn process_transactions(
// TODO: Remove transaction_version_to_struct_count after migration
pub fn process_transactions_parquet(
transactions: Vec<Transaction>,
) -> (
(
Expand Down
39 changes: 0 additions & 39 deletions rust/sdk-processor/src/parquet_processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,45 +80,6 @@ impl ParquetTypeStructs {
}
}

/// Marker trait with no methods; lets the different parquet model structs be
/// handled uniformly as `&dyn ParquetStruct` trait objects.
pub trait ParquetStruct {}

// Opt each parquet model into the marker trait.
impl ParquetStruct for MoveResource {}
impl ParquetStruct for WriteSetChangeModel {}
impl ParquetStruct for ParquetTransaction {}
impl ParquetStruct for TableItem {}
impl ParquetStruct for MoveModule {}

impl ParquetTypeStructs {
    /// Returns the `ParquetTypeEnum` tag that corresponds to this variant.
    pub fn get_type(&self) -> ParquetTypeEnum {
        match self {
            Self::MoveResource(_) => ParquetTypeEnum::MoveResource,
            Self::WriteSetChange(_) => ParquetTypeEnum::WriteSetChange,
            Self::Transaction(_) => ParquetTypeEnum::Transaction,
            Self::TableItem(_) => ParquetTypeEnum::TableItem,
            Self::MoveModule(_) => ParquetTypeEnum::MoveModule,
        }
    }

    /// Borrows every struct held by this variant as a `ParquetStruct` trait object.
    pub fn get_structs(&self) -> Vec<&dyn ParquetStruct> {
        match self {
            Self::MoveResource(items) => items
                .iter()
                .map(|item| item as &dyn ParquetStruct)
                .collect(),
            Self::WriteSetChange(items) => items
                .iter()
                .map(|item| item as &dyn ParquetStruct)
                .collect(),
            Self::Transaction(items) => items
                .iter()
                .map(|item| item as &dyn ParquetStruct)
                .collect(),
            Self::TableItem(items) => items
                .iter()
                .map(|item| item as &dyn ParquetStruct)
                .collect(),
            Self::MoveModule(items) => items
                .iter()
                .map(|item| item as &dyn ParquetStruct)
                .collect(),
        }
    }
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion rust/sdk-processor/src/steps/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod parquet_extractor_helper;
pub mod processor_status_saver;

pub use processor_status_saver::get_processor_status_saver;
Loading

0 comments on commit ef311b0

Please sign in to comment.