Commit 472b366

add a raw table item struct to support both parquet and postgres without duplicating the parsing code

yuunlimm committed Nov 13, 2024
1 parent 9c6dc91
Showing 7 changed files with 193 additions and 71 deletions.
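The pattern in this commit, in miniature: the protobuf write-set change is parsed once into a storage-agnostic raw struct, and each sink converts from it through a small trait instead of re-running the parsing logic. Below is a self-contained sketch of that shape, not the crate's actual code: the field set is trimmed, the PostgresTableItem and ParquetTableItem names are illustrative stand-ins for the two TableItem structs in the diff, and serde_json is assumed as a dependency.

use serde_json::Value;

// Parsed once from the transaction stream; owns only plain data.
pub struct RawTableItem {
    pub txn_version: i64,
    pub table_handle: String,
    pub decoded_value: Option<String>,
    pub is_deleted: bool,
}

// Each storage target implements one conversion instead of duplicating parsing.
pub trait TableItemConvertible {
    fn from_raw(raw: &RawTableItem) -> Self;
}

// The Postgres row wants the value as structured JSON.
pub struct PostgresTableItem {
    pub transaction_version: i64,
    pub table_handle: String,
    pub decoded_value: Option<Value>,
    pub is_deleted: bool,
}

// The parquet row keeps the raw JSON string as-is.
pub struct ParquetTableItem {
    pub txn_version: i64,
    pub table_handle: String,
    pub decoded_value: Option<String>,
    pub is_deleted: bool,
}

impl TableItemConvertible for PostgresTableItem {
    fn from_raw(raw: &RawTableItem) -> Self {
        PostgresTableItem {
            transaction_version: raw.txn_version,
            table_handle: raw.table_handle.clone(),
            decoded_value: raw
                .decoded_value
                .as_deref()
                .map(|v| serde_json::from_str(v).expect("value must be valid JSON")),
            is_deleted: raw.is_deleted,
        }
    }
}

impl TableItemConvertible for ParquetTableItem {
    fn from_raw(raw: &RawTableItem) -> Self {
        ParquetTableItem {
            txn_version: raw.txn_version,
            table_handle: raw.table_handle.clone(),
            decoded_value: raw.decoded_value.clone(),
            is_deleted: raw.is_deleted,
        }
    }
}

fn main() {
    let raw = RawTableItem {
        txn_version: 1,
        table_handle: "0xabc".to_string(),
        decoded_value: Some(r#"{"value":"100"}"#.to_string()),
        is_deleted: false,
    };
    // One parse, two sinks.
    let pg = PostgresTableItem::from_raw(&raw);
    let pq = ParquetTableItem::from_raw(&raw);
    assert_eq!(pg.transaction_version, pq.txn_version);
}

This is the shape that lets the processors below call raw_table_items.iter().map(TableItem::from_raw).collect() for the Postgres path while the parquet path reuses the same RawTableItem values.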
105 changes: 105 additions & 0 deletions rust/processor/src/db/common/models/default_models/move_tables.rs
@@ -47,6 +47,111 @@ pub struct TableMetadata {
     pub value_type: String,
 }
 
+pub trait TableItemConvertible {
+    fn from_raw(raw_item: &RawTableItem) -> Self;
+}
+
+pub struct RawTableItem {
+    pub txn_version: i64,
+    pub block_timestamp: chrono::NaiveDateTime,
+    pub write_set_change_index: i64,
+    pub transaction_block_height: i64,
+    pub table_key: String,
+    pub table_handle: String,
+    pub decoded_key: String,
+    pub decoded_value: Option<String>,
+    pub is_deleted: bool,
+}
+
+impl RawTableItem {
+    pub fn from_write_table_item(
+        write_table_item: &WriteTableItem,
+        write_set_change_index: i64,
+        txn_version: i64,
+        transaction_block_height: i64,
+        block_timestamp: chrono::NaiveDateTime,
+    ) -> (Self, CurrentTableItem) {
+        (
+            Self {
+                txn_version,
+                write_set_change_index,
+                transaction_block_height,
+                table_key: write_table_item.key.to_string(),
+                table_handle: standardize_address(&write_table_item.handle.to_string()),
+                decoded_key: write_table_item.data.as_ref().unwrap().key.clone(),
+                decoded_value: Some(write_table_item.data.as_ref().unwrap().value.clone()),
+                is_deleted: false,
+                block_timestamp,
+            },
+            CurrentTableItem {
+                table_handle: standardize_address(&write_table_item.handle.to_string()),
+                key_hash: hash_str(&write_table_item.key.to_string()),
+                key: write_table_item.key.to_string(),
+                decoded_key: serde_json::from_str(
+                    write_table_item.data.as_ref().unwrap().key.as_str(),
+                )
+                .unwrap(),
+                decoded_value: serde_json::from_str(
+                    write_table_item.data.as_ref().unwrap().value.as_str(),
+                )
+                .unwrap(),
+                last_transaction_version: txn_version,
+                is_deleted: false,
+            },
+        )
+    }
+
+    pub fn from_delete_table_item(
+        delete_table_item: &DeleteTableItem,
+        write_set_change_index: i64,
+        txn_version: i64,
+        transaction_block_height: i64,
+        block_timestamp: chrono::NaiveDateTime,
+    ) -> (Self, CurrentTableItem) {
+        (
+            Self {
+                txn_version,
+                write_set_change_index,
+                transaction_block_height,
+                table_key: delete_table_item.key.to_string(),
+                table_handle: standardize_address(&delete_table_item.handle.to_string()),
+                decoded_key: delete_table_item.data.as_ref().unwrap().key.clone(),
+                decoded_value: None,
+                is_deleted: true,
+                block_timestamp,
+            },
+            CurrentTableItem {
+                table_handle: standardize_address(&delete_table_item.handle.to_string()),
+                key_hash: hash_str(&delete_table_item.key.to_string()),
+                key: delete_table_item.key.to_string(),
+                decoded_key: serde_json::from_str(
+                    delete_table_item.data.as_ref().unwrap().key.as_str(),
+                )
+                .unwrap(),
+                decoded_value: None,
+                last_transaction_version: txn_version,
+                is_deleted: true,
+            },
+        )
+    }
+}
+
+// In move_tables.rs
+impl TableItemConvertible for TableItem {
+    fn from_raw(raw_item: &RawTableItem) -> Self {
+        TableItem {
+            transaction_version: raw_item.txn_version,
+            write_set_change_index: raw_item.write_set_change_index,
+            transaction_block_height: raw_item.transaction_block_height,
+            key: raw_item.table_key.clone(),
+            table_handle: raw_item.table_handle.clone(),
+            decoded_key: serde_json::from_str(raw_item.decoded_key.as_str()).unwrap(),
+            decoded_value: raw_item
+                .decoded_value
+                .clone()
+                .map(|v| serde_json::from_str(v.as_str()).unwrap()),
+            is_deleted: raw_item.is_deleted,
+        }
+    }
+}
+
 impl TableItem {
     pub fn from_write_table_item(
         write_table_item: &WriteTableItem,
parquet_move_tables.rs

@@ -12,6 +12,7 @@ use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem};
 use field_count::FieldCount;
 use parquet_derive::ParquetRecordWriter;
 use serde::{Deserialize, Serialize};
+use crate::db::common::models::default_models::move_tables::{RawTableItem, TableItemConvertible};
 
 #[derive(
     Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
@@ -164,3 +165,21 @@ impl TableMetadata {
         }
     }
 }
+
+
+// In parquet_move_tables.rs
+impl TableItemConvertible for TableItem {
+    fn from_raw(raw_item: &RawTableItem) -> Self {
+        TableItem {
+            txn_version: raw_item.txn_version,
+            write_set_change_index: raw_item.write_set_change_index,
+            transaction_block_height: raw_item.transaction_block_height,
+            table_key: raw_item.table_key.clone(),
+            table_handle: raw_item.table_handle.clone(),
+            decoded_key: raw_item.decoded_key.clone(),
+            decoded_value: raw_item.decoded_value.clone(),
+            is_deleted: raw_item.is_deleted,
+            block_timestamp: raw_item.block_timestamp,
+        }
+    }
+}
33 changes: 27 additions & 6 deletions rust/processor/src/processors/default_processor.rs
@@ -29,6 +29,9 @@ use diesel::{
 use std::fmt::Debug;
 use tokio::join;
 use tracing::error;
+use crate::db::common::models::default_models::move_tables::RawTableItem;
+use crate::db::common::models::default_models::move_tables::TableItemConvertible;
+
 
 pub struct DefaultProcessor {
     connection_pool: ArcDbPool,
@@ -216,10 +219,16 @@ impl ProcessorTrait for DefaultProcessor {
         let processing_start = std::time::Instant::now();
         let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
         let flags = self.deprecated_tables;
-        let (block_metadata_transactions, table_items, current_table_items, table_metadata) =
+        let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) =
             tokio::task::spawn_blocking(move || process_transactions(transactions, flags))
                 .await
                 .expect("Failed to spawn_blocking for TransactionModel::from_transactions");
 
+        let postgres_table_items: Vec<TableItem> = raw_table_items
+            .iter()
+            .map(TableItem::from_raw)
+            .collect();
+
         let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
         let db_insertion_start = std::time::Instant::now();
 
@@ -229,7 +238,7 @@
             start_version,
             end_version,
             &block_metadata_transactions,
-            (&table_items, &current_table_items, &table_metadata),
+            (&postgres_table_items, &current_table_items, &table_metadata),
             &self.per_table_chunk_sizes,
         )
         .await;
@@ -238,7 +247,7 @@
         // make it faster.
         tokio::task::spawn(async move {
            drop(block_metadata_transactions);
-            drop(table_items);
+            drop(postgres_table_items);
            drop(current_table_items);
            drop(table_metadata);
        });
@@ -297,7 +306,7 @@ pub fn process_transactions(
     flags: TableFlags,
 ) -> (
     Vec<BlockMetadataTransactionModel>,
-    Vec<TableItem>,
+    Vec<RawTableItem>,
     Vec<CurrentTableItem>,
     Vec<TableMetadata>,
 ) {
@@ -318,6 +327,10 @@
             .info
             .as_ref()
             .expect("Transaction info doesn't exist!");
+
+        #[allow(deprecated)]
+        let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0)
+            .expect("Txn Timestamp is invalid!");
         let txn_data = match transaction.txn_data.as_ref() {
             Some(txn_data) => txn_data,
             None => {
@@ -349,12 +362,19 @@
                     .expect("WriteSetChange must have a change")
                 {
                     WriteSetChangeEnum::WriteTableItem(inner) => {
-                        let (ti, cti) = TableItem::from_write_table_item(
+                        let (ti, cti) = RawTableItem::from_write_table_item(
                             inner,
                             index as i64,
                             version,
                             block_height,
+                            block_timestamp,
                         );
                         table_items.push(ti);
                         current_table_items.insert(
                             (cti.table_handle.clone(), cti.key_hash.clone()),
@@ -366,11 +386,12 @@
                         );
                     },
                     WriteSetChangeEnum::DeleteTableItem(inner) => {
-                        let (ti, cti) = TableItem::from_delete_table_item(
+                        let (ti, cti) = RawTableItem::from_delete_table_item(
                             inner,
                             index as i64,
                             version,
                             block_height,
+                            block_timestamp,
                         );
                         table_items.push(ti);
                         current_table_items
@@ -155,7 +155,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
         let (
             (move_resources, write_set_changes, transactions, table_items, move_modules),
             transaction_version_to_struct_count,
-        ) = tokio::task::spawn_blocking(move || process_transactions(transactions))
+        ) = tokio::task::spawn_blocking(move || process_transactions2(transactions))
             .await
             .expect("Failed to spawn_blocking for TransactionModel::from_transactions");
 
@@ -213,7 +213,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
     }
 }
 
-pub fn process_transactions(
+pub fn process_transactions2(
     transactions: Vec<Transaction>,
 ) -> (
     (
@@ -8,7 +8,7 @@ use async_trait::async_trait;
 use processor::{
     db::common::models::default_models::{
         block_metadata_transactions::BlockMetadataTransactionModel,
-        move_tables::{CurrentTableItem, TableItem, TableMetadata},
+        move_tables::{CurrentTableItem, TableItem, TableItemConvertible, TableMetadata},
     },
     processors::default_processor::process_transactions,
     worker::TableFlags,
@@ -47,13 +47,15 @@
         ProcessorError,
     > {
         let flags = self.deprecated_table_flags;
-        let (block_metadata_transactions, table_items, current_table_items, table_metadata) =
+        let (block_metadata_transactions, raw_table_items, current_table_items, table_metadata) =
             process_transactions(transactions.data, flags);
+        let postgres_table_items: Vec<TableItem> =
+            raw_table_items.iter().map(TableItem::from_raw).collect();
 
         Ok(Some(TransactionContext {
             data: (
                 block_metadata_transactions,
-                table_items,
+                postgres_table_items,
                 current_table_items,
                 table_metadata,
             ),
2 changes: 1 addition & 1 deletion rust/sdk-processor/src/steps/mod.rs
@@ -5,7 +5,7 @@ pub mod default_processor;
 pub mod events_processor;
 pub mod fungible_asset_processor;
 pub mod objects_processor;
-mod parquet_default_processor;
+pub mod parquet_default_processor;
 pub mod stake_processor;
 pub mod token_v2_processor;
 pub mod user_transaction_processor;