Skip to content

Commit

Permalink
skip writing deprecated tables in default processor (#394)
Browse files Browse the repository at this point in the history
* skip writing deprecated tables in default processor

* lint

* update readme

* use bitflags to skip struct parsing

* lint

* fix directory refac

* move flag check to default processor

* nin: fix the reorder

---------

Co-authored-by: CapCap <[email protected]>
  • Loading branch information
yuunlimm and CapCap authored Jun 13, 2024
1 parent 51092e4 commit a79298a
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 50 deletions.
11 changes: 6 additions & 5 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ base64 = "0.13.0"
bb8 = "0.8.1"
bcs = { git = "https://github.com/aptos-labs/bcs.git", rev = "d31fab9d81748e2594be5cd5cdf845786a30562d" }
bigdecimal = { version = "0.4.0", features = ["serde"] }
bitflags = "2.5.0"
chrono = { version = "0.4.19", features = ["clock", "serde"] }
clap = { version = "4.3.5", features = ["derive", "unstable-styles"] }
# Do NOT enable the postgres feature here, it is conditionally enabled in a feature
Expand Down
1 change: 1 addition & 0 deletions rust/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aptos-protos = { workspace = true }
async-trait = { workspace = true }
bcs = { workspace = true }
bigdecimal = { workspace = true }
bitflags = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
diesel = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions rust/processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc
- "0x07"
# Skip all transactions that aren't user transactions
focus_user_transactions: false
deprecated_tables: [
"MOVE_RESOURCES",
]
```

#### Config Explanation
Expand Down
5 changes: 4 additions & 1 deletion rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ahash::AHashMap;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use server_framework::RunnableConfig;
use std::time::Duration;
use std::{collections::HashSet, time::Duration};
use url::Url;

pub const QUERY_DEFAULT_RETRIES: u32 = 5;
Expand Down Expand Up @@ -49,6 +49,8 @@ pub struct IndexerGrpcProcessorConfig {

#[serde(default)]
pub transaction_filter: TransactionFilter,
// String vector for deprecated tables to skip db writes
pub deprecated_tables: HashSet<String>,
}

impl IndexerGrpcProcessorConfig {
Expand Down Expand Up @@ -95,6 +97,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
self.enable_verbose_logging,
self.transaction_filter.clone(),
self.grpc_response_item_timeout_in_secs,
self.deprecated_tables.clone(),
)
.await
.context("Failed to build worker")?;
Expand Down
63 changes: 26 additions & 37 deletions rust/processor/src/db/common/models/default_models/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,15 @@ impl Transaction {
.timestamp
.as_ref()
.expect("Transaction timestamp doesn't exist!");

let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes(
&transaction_info.changes,
version,
block_height,
);

match txn_data {
TxnData::User(user_txn) => {
let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes(
&transaction_info.changes,
version,
block_height,
);
let payload = user_txn
.request
.as_ref()
Expand All @@ -202,7 +204,6 @@ impl Transaction {
.expect("Getting payload failed.");
let payload_cleaned = get_clean_payload(payload, version);
let payload_type = get_payload_type(payload);

(
Self::from_transaction_info_with_data(
transaction_info,
Expand All @@ -220,11 +221,6 @@ impl Transaction {
)
},
TxnData::Genesis(genesis_txn) => {
let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes(
&transaction_info.changes,
version,
block_height,
);
let payload = genesis_txn.payload.as_ref().unwrap();
let payload_cleaned = get_clean_writeset(payload, version);
// It's genesis so no big deal
Expand All @@ -245,34 +241,27 @@ impl Transaction {
wsc_detail,
)
},
TxnData::BlockMetadata(block_metadata_txn) => {
let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes(
&transaction_info.changes,
TxnData::BlockMetadata(block_metadata_txn) => (
Self::from_transaction_info_with_data(
transaction_info,
None,
None,
version,
transaction_type,
block_metadata_txn.events.len() as i64,
block_height,
);
(
Self::from_transaction_info_with_data(
transaction_info,
None,
None,
version,
transaction_type,
block_metadata_txn.events.len() as i64,
block_height,
epoch,
),
Some(BlockMetadataTransaction::from_transaction(
block_metadata_txn,
version,
block_height,
epoch,
timestamp,
)),
wsc,
wsc_detail,
)
},
epoch,
),
Some(BlockMetadataTransaction::from_transaction(
block_metadata_txn,
version,
block_height,
epoch,
timestamp,
)),
wsc,
wsc_detail,
),
TxnData::StateCheckpoint(_) => (
Self::from_transaction_info_with_data(
transaction_info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl WriteSetChange {
.change
.as_ref()
.expect("WriteSetChange must have a change");

match change {
WriteSetChangeEnum::WriteModule(inner) => (
Self {
Expand Down
29 changes: 25 additions & 4 deletions rust/processor/src/processors/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
},
schema,
utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool},
worker::TableFlags,
};
use ahash::AHashMap;
use anyhow::bail;
Expand All @@ -30,13 +31,19 @@ use tracing::error;
pub struct DefaultProcessor {
connection_pool: ArcDbPool,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
}

impl DefaultProcessor {
pub fn new(connection_pool: ArcDbPool, per_table_chunk_sizes: AHashMap<String, usize>) -> Self {
pub fn new(
connection_pool: ArcDbPool,
per_table_chunk_sizes: AHashMap<String, usize>,
deprecated_tables: TableFlags,
) -> Self {
Self {
connection_pool,
per_table_chunk_sizes,
deprecated_tables,
}
}
}
Expand Down Expand Up @@ -82,6 +89,7 @@ async fn insert_to_db(
txns,
get_config_table_chunk_size::<TransactionModel>("transactions", per_table_chunk_sizes),
);

let bmt_res = execute_in_chunks(
conn.clone(),
insert_block_metadata_transactions_query,
Expand All @@ -91,6 +99,7 @@ async fn insert_to_db(
per_table_chunk_sizes,
),
);

let wst_res = execute_in_chunks(
conn.clone(),
insert_write_set_changes_query,
Expand All @@ -100,6 +109,7 @@ async fn insert_to_db(
per_table_chunk_sizes,
),
);

let mm_res = execute_in_chunks(
conn.clone(),
insert_move_modules_query,
Expand Down Expand Up @@ -313,15 +323,15 @@ impl ProcessorTrait for DefaultProcessor {
) -> anyhow::Result<ProcessingResult> {
let processing_start = std::time::Instant::now();
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
let flags = self.deprecated_tables;
let (
txns,
block_metadata_transactions,
write_set_changes,
(move_modules, move_resources, table_items, current_table_items, table_metadata),
) = tokio::task::spawn_blocking(move || process_transactions(transactions))
) = tokio::task::spawn_blocking(move || process_transactions(transactions, flags))
.await
.expect("Failed to spawn_blocking for TransactionModel::from_transactions");

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();

Expand Down Expand Up @@ -373,6 +383,7 @@ impl ProcessorTrait for DefaultProcessor {

fn process_transactions(
transactions: Vec<Transaction>,
flags: TableFlags,
) -> (
Vec<crate::db::common::models::default_models::transactions::Transaction>,
Vec<BlockMetadataTransaction>,
Expand All @@ -385,7 +396,7 @@ fn process_transactions(
Vec<TableMetadata>,
),
) {
let (txns, block_metadata_txns, write_set_changes, wsc_details) =
let (mut txns, block_metadata_txns, mut write_set_changes, wsc_details) =
TransactionModel::from_transactions(&transactions);
let mut block_metadata_transactions = vec![];
for block_metadata_txn in block_metadata_txns {
Expand Down Expand Up @@ -426,6 +437,16 @@ fn process_transactions(
.sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash)));
table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle));

if flags.contains(TableFlags::MOVE_RESOURCES) {
move_resources.clear();
}
if flags.contains(TableFlags::TRANSACTIONS) {
txns.clear();
}
if flags.contains(TableFlags::WRITE_SET_CHANGES) {
write_set_changes.clear();
}

(
txns,
block_metadata_transactions,
Expand Down
Loading

0 comments on commit a79298a

Please sign in to comment.