Skip to content

Commit

Permalink
add dedup step
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso committed Sep 30, 2024
1 parent 55c3294 commit 8060957
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 10 deletions.
14 changes: 7 additions & 7 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ testing-transactions = { path = "testing-transactions" }

ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.86"
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" }
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" }
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" }
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }
aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use bigdecimal::{BigDecimal, Zero};
use field_count::FieldCount;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::{
borrow::Borrow,
hash::{Hash, Hasher},
};

// Storage id
pub type CurrentFungibleAssetBalancePK = String;
Expand Down Expand Up @@ -154,6 +157,12 @@ pub struct CurrentFungibleAssetBalance {
pub token_standard: String,
}

// impl Hash for CurrentFungibleAssetBalance {
// fn hash<H: Hasher>(&self, state: &mut H) {
// self.storage_id.hash(state);
// }
// }

/// Note that this used to be called current_unified_fungible_asset_balances_to_be_renamed
/// and was renamed to current_fungible_asset_balances to facilitate migration
#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, Default)]
Expand Down
4 changes: 4 additions & 0 deletions rust/sdk-processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
common::latest_processed_version_tracker::LatestVersionProcessedTracker,
events_processor::{EventsExtractor, EventsStorer},
fungible_asset_processor::{
fungible_asset_deduper::FungibleAssetDeduper,
fungible_asset_extractor::FungibleAssetExtractor,
fungible_asset_storer::FungibleAssetStorer,
},
Expand All @@ -26,6 +27,7 @@ use aptos_indexer_processor_sdk::{
traits::IntoRunnableStep,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{debug, info};

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -111,6 +113,7 @@ impl FungibleAssetProcessor {
})
.await?;
let fa_extractor = FungibleAssetExtractor {};
let fa_deduper = FungibleAssetDeduper::new(Duration::from_secs(1));
let fa_storer = FungibleAssetStorer::new(self.db_pool.clone(), fa_config);
let version_tracker = LatestVersionProcessedTracker::new(
self.db_pool.clone(),
Expand All @@ -123,6 +126,7 @@ impl FungibleAssetProcessor {
transaction_stream.into_runnable_step(),
)
.connect_to(fa_extractor.into_runnable_step(), channel_size)
.connect_to(fa_deduper.into_runnable_step(), channel_size)
.connect_to(fa_storer.into_runnable_step(), channel_size)
.connect_to(version_tracker.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use ahash::AHashMap;
use aptos_indexer_processor_sdk::{
aptos_protos::transaction::v1::Transaction,
traits::{
async_step::AsyncRunType, AsyncStep, NamedStep, PollableAsyncRunType, PollableAsyncStep,
Processable,
},
types::transaction_context::{Context, TransactionContext},
utils::errors::ProcessorError,
};
use async_trait::async_trait;
use diesel::associations::Identifiable;
use processor::db::common::models::{
coin_models::coin_supply::CoinSupply,
fungible_asset_models::{
v2_fungible_asset_activities::FungibleAssetActivity,
v2_fungible_asset_balances::{
CurrentFungibleAssetBalance, CurrentFungibleAssetBalancePK,
CurrentUnifiedFungibleAssetBalance, FungibleAssetBalance,
},
v2_fungible_metadata::FungibleAssetMetadataModel,
},
};
use std::{collections::BTreeSet, time::Duration};

pub struct FungibleAssetDeduper
where
Self: Sized + Send + 'static,
{
// The duration to collect and dedup data before releasing it
poll_interval: Duration,
merged_cfab: AHashMap<CurrentFungibleAssetBalancePK, CurrentFungibleAssetBalance>,
contexts: BTreeSet<Context>,
}

impl FungibleAssetDeduper
where
Self: Sized + Send + 'static,
{
pub fn new(poll_interval: Duration) -> Self {
Self {
poll_interval,
merged_cfab: AHashMap::new(),
contexts: BTreeSet::new(),
}
}
}

#[async_trait]
impl Processable for FungibleAssetDeduper {
type Input = (
Vec<FungibleAssetActivity>,
Vec<FungibleAssetMetadataModel>,
Vec<FungibleAssetBalance>,
Vec<CurrentFungibleAssetBalance>,
Vec<CurrentUnifiedFungibleAssetBalance>,
Vec<CoinSupply>,
);
type Output = (
Vec<FungibleAssetActivity>,
Vec<FungibleAssetMetadataModel>,
Vec<FungibleAssetBalance>,
Vec<CurrentFungibleAssetBalance>,
Vec<CurrentUnifiedFungibleAssetBalance>,
Vec<CoinSupply>,
);
type RunType = PollableAsyncRunType;

async fn process(
&mut self,
items: TransactionContext<Self::Input>,
) -> Result<Option<TransactionContext<Self::Output>>, ProcessorError> {
let (_, _, _, cfab, _, _) = &items.data[0];

// Update transaction contexts
for context in items.context.iter() {
self.contexts.insert(context.clone());
}

// Dedup
for balance in cfab.iter() {
self.merged_cfab
.insert(balance.id().to_string(), balance.clone());
}

Ok(None)
}
}

#[async_trait]
impl PollableAsyncStep for FungibleAssetDeduper {
fn poll_interval(&self) -> Duration {
self.poll_interval
}

async fn poll(
&mut self,
) -> Result<Option<Vec<TransactionContext<Self::Output>>>, ProcessorError> {
let transaction_context = TransactionContext::new_with_contexts(
vec![(
vec![],
vec![],
vec![],
std::mem::take(&mut self.merged_cfab)
.values()
.cloned()
.collect::<Vec<_>>(),
vec![],
vec![],
)],
self.contexts.clone(),
);
Ok(Some(vec![transaction_context]))
}
}

impl NamedStep for FungibleAssetDeduper {
fn name(&self) -> String {
"FungibleAssetDeduper".to_string()
}
}

0 comments on commit 8060957

Please sign in to comment.