Skip to content

Commit

Permalink
add a raw table item struct to support both parquet and postgres with…
Browse files Browse the repository at this point in the history
…out duplicating the parsing code
  • Loading branch information
yuunlimm committed Nov 14, 2024
1 parent 60692df commit 2dfdb15
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 86 deletions.
107 changes: 107 additions & 0 deletions rust/processor/src/db/common/models/default_models/move_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,113 @@ pub struct TableMetadata {
pub value_type: String,
}

pub trait TableItemConvertible {
fn from_raw(raw_item: &RawTableItem) -> Self;
}

pub struct RawTableItem {
pub txn_version: i64,
pub block_timestamp: chrono::NaiveDateTime,
pub write_set_change_index: i64,
pub transaction_block_height: i64,
pub table_key: String,
pub table_handle: String,
pub decoded_key: String,
pub decoded_value: Option<String>,
pub is_deleted: bool,
}

impl RawTableItem {
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
write_set_change_index: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
txn_version,
write_set_change_index,
transaction_block_height,
table_key: write_table_item.key.to_string(),
table_handle: standardize_address(&write_table_item.handle.to_string()),
decoded_key: write_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: Some(write_table_item.data.as_ref().unwrap().value.clone()),
is_deleted: false,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&write_table_item.handle.to_string()),
key_hash: hash_str(&write_table_item.key.to_string()),
key: write_table_item.key.to_string(),
decoded_key: serde_json::from_str(
write_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),
decoded_value: serde_json::from_str(
write_table_item.data.as_ref().unwrap().value.as_str(),
)
.unwrap(),
last_transaction_version: txn_version,
is_deleted: false,
},
)
}

pub fn from_delete_table_item(
delete_table_item: &DeleteTableItem,
write_set_change_index: i64,
txn_version: i64,
transaction_block_height: i64,
block_timestamp: chrono::NaiveDateTime,
) -> (Self, CurrentTableItem) {
(
Self {
txn_version,
write_set_change_index,
transaction_block_height,
table_key: delete_table_item.key.to_string(),
table_handle: standardize_address(&delete_table_item.handle.to_string()),
decoded_key: delete_table_item.data.as_ref().unwrap().key.clone(),
decoded_value: None,
is_deleted: true,
block_timestamp,
},
CurrentTableItem {
table_handle: standardize_address(&delete_table_item.handle.to_string()),
key_hash: hash_str(&delete_table_item.key.to_string()),
key: delete_table_item.key.to_string(),
decoded_key: serde_json::from_str(
delete_table_item.data.as_ref().unwrap().key.as_str(),
)
.unwrap(),
decoded_value: None,
last_transaction_version: txn_version,
is_deleted: true,
},
)
}
}

impl TableItemConvertible for TableItem {
fn from_raw(raw_item: &RawTableItem) -> Self {
TableItem {
transaction_version: raw_item.txn_version,
write_set_change_index: raw_item.write_set_change_index,
transaction_block_height: raw_item.transaction_block_height,
key: raw_item.table_key.clone(),
table_handle: raw_item.table_handle.clone(),
decoded_key: serde_json::from_str(raw_item.decoded_key.as_str()).unwrap(),
decoded_value: raw_item
.decoded_value
.clone()
.map(|v| serde_json::from_str(v.as_str()).unwrap()),
is_deleted: raw_item.is_deleted,
}
}
}

impl TableItem {
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::default_models::move_tables::{RawTableItem, TableItemConvertible},
utils::util::{hash_str, standardize_address},
};
use allocative_derive::Allocative;
Expand Down Expand Up @@ -164,3 +165,19 @@ impl TableMetadata {
}
}
}

impl TableItemConvertible for TableItem {
fn from_raw(raw_item: &RawTableItem) -> Self {
TableItem {
txn_version: raw_item.txn_version,
write_set_change_index: raw_item.write_set_change_index,
transaction_block_height: raw_item.transaction_block_height,
table_key: raw_item.table_key.clone(),
table_handle: raw_item.table_handle.clone(),
decoded_key: raw_item.decoded_key.clone(),
decoded_value: raw_item.decoded_value.clone(),
is_deleted: raw_item.is_deleted,
block_timestamp: raw_item.block_timestamp,
}
}
}
29 changes: 22 additions & 7 deletions rust/processor/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait};
use crate::{
db::common::models::default_models::{
block_metadata_transactions::BlockMetadataTransactionModel,
move_tables::{CurrentTableItem, TableItem, TableMetadata},
move_tables::{
CurrentTableItem, RawTableItem, TableItem, TableItemConvertible, TableMetadata,
},
},
gap_detectors::ProcessingResult,
schema,
Expand Down Expand Up @@ -216,10 +218,14 @@ impl ProcessorTrait for DefaultProcessor {
let processing_start = std::time::Instant::now();
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
let flags = self.deprecated_tables;
let (block_metadata_transactions, table_items, current_table_items, table_metadata) =
let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) =
tokio::task::spawn_blocking(move || process_transactions(transactions, flags))
.await
.expect("Failed to spawn_blocking for TransactionModel::from_transactions");

let postgres_table_items: Vec<TableItem> =
raw_table_items.iter().map(TableItem::from_raw).collect();

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

Expand All @@ -229,7 +235,7 @@ impl ProcessorTrait for DefaultProcessor {
start_version,
end_version,
&block_metadata_transactions,
(&table_items, &current_table_items, &table_metadata),
(&postgres_table_items, &current_table_items, &table_metadata),
&self.per_table_chunk_sizes,
)
.await;
Expand All @@ -238,7 +244,7 @@ impl ProcessorTrait for DefaultProcessor {
// make it faster.
tokio::task::spawn(async move {
drop(block_metadata_transactions);
drop(table_items);
drop(postgres_table_items);
drop(current_table_items);
drop(table_metadata);
});
Expand Down Expand Up @@ -272,6 +278,7 @@ impl ProcessorTrait for DefaultProcessor {
}
}

// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet)
/// Processes a list of transactions and extracts relevant data into different models.
///
/// This function iterates over a list of transactions, extracting block metadata transactions,
Expand All @@ -297,7 +304,7 @@ pub fn process_transactions(
flags: TableFlags,
) -> (
Vec<BlockMetadataTransactionModel>,
Vec<TableItem>,
Vec<RawTableItem>,
Vec<CurrentTableItem>,
Vec<TableMetadata>,
) {
Expand All @@ -318,6 +325,10 @@ pub fn process_transactions(
.info
.as_ref()
.expect("Transaction info doesn't exist!");

#[allow(deprecated)]
let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0)
.expect("Txn Timestamp is invalid!");
let txn_data = match transaction.txn_data.as_ref() {
Some(txn_data) => txn_data,
None => {
Expand Down Expand Up @@ -349,11 +360,12 @@ pub fn process_transactions(
.expect("WriteSetChange must have a change")
{
WriteSetChangeEnum::WriteTableItem(inner) => {
let (ti, cti) = TableItem::from_write_table_item(
let (ti, cti) = RawTableItem::from_write_table_item(
inner,
index as i64,
version,
block_height,
block_timestamp,
);
table_items.push(ti);
current_table_items.insert(
Expand All @@ -366,11 +378,12 @@ pub fn process_transactions(
);
},
WriteSetChangeEnum::DeleteTableItem(inner) => {
let (ti, cti) = TableItem::from_delete_table_item(
let (ti, cti) = RawTableItem::from_delete_table_item(
inner,
index as i64,
version,
block_height,
block_timestamp,
);
table_items.push(ti);
current_table_items
Expand All @@ -391,9 +404,11 @@ pub fn process_transactions(
.sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash)));
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));

// TODO: remove this, since we are not going to deprecate this anytime soon?
if flags.contains(TableFlags::TABLE_ITEMS) {
table_items.clear();
}
// TODO: migrate to Parquet
if flags.contains(TableFlags::TABLE_METADATAS) {
table_metadata.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
let (
(move_resources, write_set_changes, transactions, table_items, move_modules),
transaction_version_to_struct_count,
) = tokio::task::spawn_blocking(move || process_transactions(transactions))
) = tokio::task::spawn_blocking(move || process_transactions_parquet(transactions))
.await
.expect("Failed to spawn_blocking for TransactionModel::from_transactions");

Expand Down Expand Up @@ -213,7 +213,8 @@ impl ProcessorTrait for ParquetDefaultProcessor {
}
}

pub fn process_transactions(
// TODO: Remove transaction_version_to_struct_count after migration
pub fn process_transactions_parquet(
transactions: Vec<Transaction>,
) -> (
(
Expand Down
1 change: 0 additions & 1 deletion rust/sdk-processor/src/steps/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod parquet_extractor_helper;
pub mod processor_status_saver;

pub use processor_status_saver::get_processor_status_saver;
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use async_trait::async_trait;
use processor::{
db::common::models::default_models::{
block_metadata_transactions::BlockMetadataTransactionModel,
move_tables::{CurrentTableItem, TableItem, TableMetadata},
move_tables::{CurrentTableItem, TableItem, TableItemConvertible, TableMetadata},
},
processors::default_processor::process_transactions,
worker::TableFlags,
Expand Down Expand Up @@ -47,13 +47,15 @@ impl Processable for DefaultExtractor {
ProcessorError,
> {
let flags = self.deprecated_table_flags;
let (block_metadata_transactions, table_items, current_table_items, table_metadata) =
let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) =
process_transactions(transactions.data, flags);
let postgres_table_items: Vec<TableItem> =
raw_table_items.iter().map(TableItem::from_raw).collect();

Ok(Some(TransactionContext {
data: (
block_metadata_transactions,
table_items,
postgres_table_items,
current_table_items,
table_metadata,
),
Expand Down
2 changes: 1 addition & 1 deletion rust/sdk-processor/src/steps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod default_processor;
pub mod events_processor;
pub mod fungible_asset_processor;
pub mod objects_processor;
mod parquet_default_processor;
pub mod parquet_default_processor;
pub mod stake_processor;
pub mod token_v2_processor;
pub mod user_transaction_processor;
Expand Down
Loading

0 comments on commit 2dfdb15

Please sign in to comment.