Skip to content

Commit

Permalink
Migrate parquet account txn processor to sdk (#626)
Browse files Browse the repository at this point in the history
### Description
migrating parquet account txn processor to sdk

### Test Plan
More rows in parquet are expected
![Screenshot 2024-12-11 at 4 19 37 PM](https://github.com/user-attachments/assets/b592f2bc-4845-4beb-bec8-d0f0bb881226)
![Screenshot 2024-12-11 at 4 14 03 PM](https://github.com/user-attachments/assets/d83c29d2-6fdb-4fc6-b004-71e5e7094fd9)
  • Loading branch information
yuunlimm authored Dec 12, 2024
1 parent 69a0ba1 commit 789a402
Show file tree
Hide file tree
Showing 16 changed files with 477 additions and 96 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod raw_account_transactions;
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
};
use ahash::AHashSet;
use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction};
use serde::{Deserialize, Serialize};

/// Key type for an account-transaction record: a `String` paired with an `i64`.
/// NOTE(review): presumably `(account_address, transaction_version)` — confirm
/// against the table's primary key before relying on the field order.
pub type AccountTransactionPK = (String, i64);

/// One (transaction version, account address) association plus a block timestamp.
///
/// The associated function `get_accounts` on this type computes which account
/// addresses a given transaction touched.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RawAccountTransaction {
/// Version of the transaction this record refers to.
pub transaction_version: i64,
/// Address of the account associated with the transaction.
pub account_address: String,
/// NOTE(review): presumably the timestamp of the block containing the
/// transaction — confirm against the code that populates this struct.
pub block_timestamp: chrono::NaiveDateTime,
}

impl RawAccountTransaction {
    /// Collects the set of account addresses that a transaction touched.
    ///
    /// An address is included when it:
    /// * is the signer of one of the signatures returned by
    ///   `UserTransaction::get_signatures` (user transactions only),
    /// * is the account address in the key of an emitted event, or
    /// * is the address of a written or deleted resource.
    ///
    /// When a written resource parses as an object, the object's owner address is
    /// included as well. Only this single level of redirection is followed.
    ///
    /// Returns an empty set when the transaction carries no `txn_data` (after
    /// incrementing `PROCESSOR_UNKNOWN_TYPE_COUNT` and logging a warning), or when
    /// the transaction is not a user, genesis, block metadata or validator
    /// transaction.
    ///
    /// # Panics
    ///
    /// Panics if the transaction info is missing, if a user transaction has no
    /// request, if an event has no key, if a write set change has no inner change,
    /// or if unwrapping the result of `ObjectWithMetadata::from_write_resource`
    /// fails.
    ///
    /// TODO: recursively find the parent account of an object
    /// TODO: include table items in the detection path
    pub fn get_accounts(transaction: &Transaction) -> AHashSet<String> {
        let version = transaction.version as i64;

        // Without a payload there is nothing to inspect: count it, warn, and bail out.
        let payload = if let Some(payload) = transaction.txn_data.as_ref() {
            payload
        } else {
            PROCESSOR_UNKNOWN_TYPE_COUNT
                .with_label_values(&["AccountTransaction"])
                .inc();
            tracing::warn!(
                transaction_version = transaction.version,
                "Transaction data doesn't exist",
            );
            return AHashSet::new();
        };

        let info = match transaction.info.as_ref() {
            Some(info) => info,
            None => panic!("Transaction info doesn't exist for version {}", version),
        };

        // Only user transactions carry signatures; the other supported kinds
        // contribute events alone.
        let (events, signatures) = match payload {
            TxnData::User(user) => {
                let request = user.request.as_ref().unwrap_or_else(|| {
                    panic!("User request doesn't exist for version {}", version)
                });
                let signatures = UserTransaction::get_signatures(
                    request,
                    version,
                    transaction.block_height as i64,
                );
                (&user.events, signatures)
            },
            TxnData::Genesis(genesis) => (&genesis.events, vec![]),
            TxnData::BlockMetadata(block_metadata) => (&block_metadata.events, vec![]),
            TxnData::Validator(validator) => (&validator.events, vec![]),
            _ => return AHashSet::new(),
        };

        let mut touched = AHashSet::new();

        // Every signer counts as touching the transaction.
        touched.extend(signatures.into_iter().map(|signature| signature.signer));

        // Event key addresses. Objects need no special handling here because the
        // resource pass below covers them.
        touched.extend(events.iter().map(|event| {
            standardize_address(event.key.as_ref().unwrap().account_address.as_str())
        }));

        for wsc in &info.changes {
            match wsc.change.as_ref().unwrap() {
                Change::DeleteResource(deleted) => {
                    // TODO: If the resource is an object, then we need to look for the
                    // latest owner. This isn't really possible right now given we have
                    // parallel threads, so it would be very difficult to ensure that we
                    // have the correct latest owner.
                    touched.insert(standardize_address(deleted.address.as_str()));
                },
                Change::WriteResource(written) => {
                    // The resource's own address always counts (this also covers
                    // partial deletes); for objects the owner is recorded as well.
                    touched.insert(standardize_address(written.address.as_str()));
                    if let Some(object) = ObjectWithMetadata::from_write_resource(written).unwrap()
                    {
                        touched.insert(object.object_core.get_owner_address());
                    }
                },
                _ => {},
            }
        }

        touched
    }
}
1 change: 1 addition & 0 deletions rust/processor/src/db/common/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod account_transaction_models;
pub mod ans_models;
pub mod default_models;
pub mod event_models;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub mod parquet_account_transactions;
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable};
use allocative_derive::Allocative;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

/// Key type for an account-transaction row: a `String` paired with an `i64`.
/// NOTE(review): presumably `(account_address, txn_version)` — confirm against
/// the table's primary key before relying on the field order.
pub type AccountTransactionPK = (String, i64);

/// Parquet row type associating a transaction version with an account address.
///
/// Derives `ParquetRecordWriter`, `FieldCount` and `Allocative` in addition to
/// the usual serde and std traits.
#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct AccountTransaction {
/// Version of the transaction; returned by `HasVersion::version`.
pub txn_version: i64,
/// Address of the account associated with the transaction.
pub account_address: String,
/// Timestamp returned by `GetTimeStamp::get_timestamp`.
/// Skipped by the `Allocative` derive — NOTE(review): presumably because
/// `chrono::NaiveDateTime` does not implement `Allocative`; confirm.
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for AccountTransaction {
    /// Name of the table that account-transaction rows belong to.
    ///
    /// This must be specific to this model: `ProcessorConfig` registers
    /// `AccountTransaction::TABLE_NAME` as the table of the parquet account
    /// transactions processor, so reusing another model's name (such as
    /// `"fungible_asset_activities"`) would mislabel these rows. The value matches
    /// the `account_transactions` table used by the Postgres model.
    const TABLE_NAME: &'static str = "account_transactions";
}

impl HasVersion for AccountTransaction {
/// Returns the transaction version stored on this row (`txn_version`).
fn version(&self) -> i64 {
self.txn_version
}
}

impl GetTimeStamp for AccountTransaction {
/// Returns a copy of this row's `block_timestamp`.
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}
1 change: 1 addition & 0 deletions rust/processor/src/db/parquet/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod account_transaction_models;
pub mod ans_models;
pub mod default_models;
pub mod event_models;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,7 @@
// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
db::postgres::models::{
object_models::v2_object_utils::ObjectWithMetadata, resources::FromWriteResource,
user_transactions_models::user_transactions::UserTransaction,
},
schema::account_transactions,
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
};
use ahash::AHashSet;
use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction};
use crate::schema::account_transactions;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

Expand All @@ -27,84 +17,3 @@ pub struct AccountTransaction {
pub transaction_version: i64,
pub account_address: String,
}

impl AccountTransaction {
/// Returns the set of account addresses touched by a transaction. An account may be
/// a user account, an object, or a resource account.
/// We consider all transactions that modify a resource or emit an event associated with a particular account.
/// We do 1 level of redirection for now (e.g. if it's an object, we also record the owner as account address).
/// We also consider transactions that the account signed or is part of as multi sig / multi agent.
///
/// Returns an empty set when the transaction has no data or is not a user, genesis,
/// block metadata or validator transaction.
///
/// Panics if the transaction info is missing, if a user transaction has no request,
/// if an event has no key, if a write set change has no inner change, or if
/// unwrapping the result of `ObjectWithMetadata::from_write_resource` fails.
///
/// TODO: recursively find the parent account of an object
/// TODO: include table items in the detection path
pub fn get_accounts(transaction: &Transaction) -> AHashSet<String> {
let txn_version = transaction.version as i64;
// A transaction without data is counted and logged, then ignored.
let txn_data = match transaction.txn_data.as_ref() {
Some(data) => data,
None => {
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&["AccountTransaction"])
.inc();
tracing::warn!(
transaction_version = transaction.version,
"Transaction data doesn't exist",
);
return AHashSet::new();
},
};
let transaction_info = transaction.info.as_ref().unwrap_or_else(|| {
panic!("Transaction info doesn't exist for version {}", txn_version)
});
let wscs = &transaction_info.changes;
// Only user transactions contribute signatures; the other supported kinds
// contribute events alone, and any other kind yields an empty set.
let (events, signatures) = match txn_data {
TxnData::User(inner) => (
&inner.events,
UserTransaction::get_signatures(
inner.request.as_ref().unwrap_or_else(|| {
panic!("User request doesn't exist for version {}", txn_version)
}),
txn_version,
transaction.block_height as i64,
),
),
TxnData::Genesis(inner) => (&inner.events, vec![]),
TxnData::BlockMetadata(inner) => (&inner.events, vec![]),
TxnData::Validator(inner) => (&inner.events, vec![]),
_ => {
return AHashSet::new();
},
};
let mut accounts = AHashSet::new();
// Record the signer of every signature.
for sig in signatures {
accounts.insert(sig.signer);
}
for event in events {
// Record event account address. We don't really have to worry about objects here
// because they are taken care of in the resource section below.
accounts.insert(standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
));
}
for wsc in wscs {
match wsc.change.as_ref().unwrap() {
Change::DeleteResource(res) => {
// Record resource account.
// TODO: If the resource is an object, then we need to look for the latest
// owner. This isn't really possible right now given we have parallel threads
// so it'll be very difficult to ensure that we have the correct latest owner.
accounts.insert(standardize_address(res.address.as_str()));
},
Change::WriteResource(res) => {
// Record resource account. If the resource is an object, then we record the
// owner as well.
// This handles partial deletes as well.
accounts.insert(standardize_address(res.address.as_str()));
if let Some(inner) = &ObjectWithMetadata::from_write_resource(res).unwrap() {
accounts.insert(inner.object_core.get_owner_address());
}
},
// Other change kinds are not considered.
_ => {},
}
}
accounts
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait};
use crate::{
db::postgres::models::account_transaction_models::account_transactions::AccountTransaction,
db::{
common::models::account_transaction_models::raw_account_transactions::RawAccountTransaction,
postgres::models::account_transaction_models::account_transactions::AccountTransaction,
},
gap_detectors::ProcessingResult,
schema,
utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool},
Expand Down Expand Up @@ -106,7 +109,7 @@ impl ProcessorTrait for AccountTransactionsProcessor {
.into_par_iter()
.map(|txn| {
let transaction_version = txn.version as i64;
let accounts = AccountTransaction::get_accounts(&txn);
let accounts = RawAccountTransaction::get_accounts(&txn);
accounts
.into_iter()
.map(|account_address| AccountTransaction {
Expand Down
6 changes: 6 additions & 0 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use super::{db_config::DbConfig, processor_config::ProcessorConfig};
use crate::{
parquet_processors::{
parquet_account_transactions_processor::ParquetAccountTransactionsProcessor,
parquet_default_processor::ParquetDefaultProcessor,
parquet_events_processor::ParquetEventsProcessor,
parquet_fungible_asset_processor::ParquetFungibleAssetProcessor,
Expand Down Expand Up @@ -106,6 +107,11 @@ impl RunnableConfig for IndexerProcessorConfig {
ParquetTransactionMetadataProcessor::new(self.clone()).await?;
parquet_transaction_metadata_processor.run_processor().await
},
ProcessorConfig::ParquetAccountTransactionsProcessor(_) => {
let parquet_account_transactions_processor =
ParquetAccountTransactionsProcessor::new(self.clone()).await?;
parquet_account_transactions_processor.run_processor().await
},
}
}

Expand Down
6 changes: 6 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ahash::AHashMap;
use processor::{
bq_analytics::generic_parquet_processor::NamedTable,
db::parquet::models::{
account_transaction_models::parquet_account_transactions::AccountTransaction,
default_models::{
parquet_block_metadata_transactions::BlockMetadataTransaction,
parquet_move_modules::MoveModule,
Expand Down Expand Up @@ -83,6 +84,7 @@ pub enum ProcessorConfig {
ParquetUserTransactionsProcessor(ParquetDefaultProcessorConfig),
ParquetFungibleAssetProcessor(ParquetDefaultProcessorConfig),
ParquetTransactionMetadataProcessor(ParquetDefaultProcessorConfig),
ParquetAccountTransactionsProcessor(ParquetDefaultProcessorConfig),
}

impl ProcessorConfig {
Expand All @@ -102,6 +104,7 @@ impl ProcessorConfig {
| ProcessorConfig::ParquetEventsProcessor(config)
| ProcessorConfig::ParquetUserTransactionsProcessor(config)
| ProcessorConfig::ParquetTransactionMetadataProcessor(config)
| ProcessorConfig::ParquetAccountTransactionsProcessor(config)
| ProcessorConfig::ParquetFungibleAssetProcessor(config) => {
// Get the processor name as a prefix
let processor_name = self.name();
Expand Down Expand Up @@ -157,6 +160,9 @@ impl ProcessorConfig {
ProcessorName::ParquetTransactionMetadataProcessor => {
HashSet::from([WriteSetSize::TABLE_NAME.to_string()])
},
ProcessorName::ParquetAccountTransactionsProcessor => {
HashSet::from([AccountTransaction::TABLE_NAME.to_string()])
},
_ => HashSet::new(), // Default case for unsupported processors
}
}
Expand Down
Loading

0 comments on commit 789a402

Please sign in to comment.