From 8060957bd8b0b81ab8bcabec45ac6c45625e35e6 Mon Sep 17 00:00:00 2001 From: rtso <8248583+rtso@users.noreply.github.com> Date: Mon, 30 Sep 2024 19:51:58 -0400 Subject: [PATCH] add dedup step --- rust/Cargo.lock | 14 +- rust/Cargo.toml | 4 +- .../v2_fungible_asset_balances.rs | 11 +- .../processors/fungible_asset_processor.rs | 4 + .../fungible_asset_deduper.rs | 121 ++++++++++++++++++ 5 files changed, 144 insertions(+), 10 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 49f7a9a8c..9ccee2ff9 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -147,7 +147,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "aptos-indexer-processor-sdk" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=4923bbb8bf82298bc85dc3d16971fef2f22fc0b0#4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "anyhow", "aptos-indexer-transaction-stream", @@ -179,7 +179,7 @@ dependencies = [ [[package]] name = "aptos-indexer-processor-sdk-server-framework" version = "1.0.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=4923bbb8bf82298bc85dc3d16971fef2f22fc0b0#4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "anyhow", "aptos-indexer-processor-sdk", @@ -212,10 +212,10 @@ dependencies = [ [[package]] name = "aptos-indexer-transaction-stream" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=4923bbb8bf82298bc85dc3d16971fef2f22fc0b0#4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "anyhow", - "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=4923bbb8bf82298bc85dc3d16971fef2f22fc0b0)", + "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba)", "aptos-protos 1.3.1 (git+https://github.com/aptos-labs/aptos-core.git?rev=5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb)", "chrono", "futures-util", @@ -240,7 +240,7 @@ dependencies = [ [[package]] name = "aptos-moving-average" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=4923bbb8bf82298bc85dc3d16971fef2f22fc0b0#4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "chrono", ] @@ -2203,7 +2203,7 @@ dependencies = [ [[package]] name = "instrumented-channel" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=4923bbb8bf82298bc85dc3d16971fef2f22fc0b0#4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "delegate", "derive_builder", @@ -4066,7 +4066,7 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "sample" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=4923bbb8bf82298bc85dc3d16971fef2f22fc0b0#4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=028e5d3df748fbb846f2058159bd1f8304c96aba#028e5d3df748fbb846f2058159bd1f8304c96aba" dependencies = [ "tracing", ] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 7e3666ad0..4f497b010 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -30,8 +30,8 @@ testing-transactions = { path = "testing-transactions" } ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.86" -aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" } -aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" } +aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" } +aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" } aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" } aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" } aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" } diff --git a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs index 41f9ffb18..3fef4fbcc 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs @@ -29,7 +29,10 @@ use bigdecimal::{BigDecimal, Zero}; use field_count::FieldCount; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; -use std::borrow::Borrow; +use std::{ + borrow::Borrow, + hash::{Hash, Hasher}, +}; // Storage id pub type CurrentFungibleAssetBalancePK = String; @@ -154,6 +157,12 @@ pub struct CurrentFungibleAssetBalance { pub token_standard: String, } +// impl Hash for CurrentFungibleAssetBalance { +// fn hash(&self, state: &mut H) { +// self.storage_id.hash(state); +// } +// } + /// Note that this used to be called current_unified_fungible_asset_balances_to_be_renamed /// and was renamed to current_fungible_asset_balances to facilitate migration #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, Default)] diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs index f2edaabaf..c484d8fa4 100644 --- a/rust/sdk-processor/src/processors/fungible_asset_processor.rs +++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs @@ -7,6 +7,7 @@ use crate::{ common::latest_processed_version_tracker::LatestVersionProcessedTracker, events_processor::{EventsExtractor, EventsStorer}, fungible_asset_processor::{ + fungible_asset_deduper::FungibleAssetDeduper, fungible_asset_extractor::FungibleAssetExtractor, fungible_asset_storer::FungibleAssetStorer, }, @@ -26,6 +27,7 @@ use aptos_indexer_processor_sdk::{ traits::IntoRunnableStep, }; use serde::{Deserialize, Serialize}; +use std::time::Duration; use tracing::{debug, info}; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -111,6 +113,7 @@ impl FungibleAssetProcessor { }) .await?; let fa_extractor = FungibleAssetExtractor {}; + let fa_deduper = FungibleAssetDeduper::new(Duration::from_secs(1)); let fa_storer = FungibleAssetStorer::new(self.db_pool.clone(), fa_config); let version_tracker = LatestVersionProcessedTracker::new( self.db_pool.clone(), @@ -123,6 +126,7 @@ impl FungibleAssetProcessor { transaction_stream.into_runnable_step(), ) .connect_to(fa_extractor.into_runnable_step(), channel_size) + .connect_to(fa_deduper.into_runnable_step(), channel_size) .connect_to(fa_storer.into_runnable_step(), channel_size) .connect_to(version_tracker.into_runnable_step(), channel_size) .end_and_return_output_receiver(channel_size); diff --git a/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_deduper.rs b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_deduper.rs index e69de29bb..8b9bed061 100644 --- a/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_deduper.rs +++ b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_deduper.rs @@ -0,0 +1,121 @@ +use ahash::AHashMap; +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{ + async_step::AsyncRunType, AsyncStep, NamedStep, PollableAsyncRunType, PollableAsyncStep, + Processable, + }, + types::transaction_context::{Context, TransactionContext}, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use diesel::associations::Identifiable; +use processor::db::common::models::{ + coin_models::coin_supply::CoinSupply, + fungible_asset_models::{ + v2_fungible_asset_activities::FungibleAssetActivity, + v2_fungible_asset_balances::{ + CurrentFungibleAssetBalance, CurrentFungibleAssetBalancePK, + CurrentUnifiedFungibleAssetBalance, FungibleAssetBalance, + }, + v2_fungible_metadata::FungibleAssetMetadataModel, + }, +}; +use std::{collections::BTreeSet, time::Duration}; + +pub struct FungibleAssetDeduper +where + Self: Sized + Send + 'static, +{ + // The duration to collect and dedup data before releasing it + poll_interval: Duration, + merged_cfab: AHashMap, + contexts: BTreeSet, +} + +impl FungibleAssetDeduper +where + Self: Sized + Send + 'static, +{ + pub fn new(poll_interval: Duration) -> Self { + Self { + poll_interval, + merged_cfab: AHashMap::new(), + contexts: BTreeSet::new(), + } + } +} + +#[async_trait] +impl Processable for FungibleAssetDeduper { + type Input = ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + ); + type Output = ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + ); + type RunType = PollableAsyncRunType; + + async fn process( + &mut self, + items: TransactionContext, + ) -> Result>, ProcessorError> { + let (_, _, _, cfab, _, _) = &items.data[0]; + + // Update transaction contexts + for context in items.context.iter() { + self.contexts.insert(context.clone()); + } + + // Dedup + for balance in cfab.iter() { + self.merged_cfab + .insert(balance.id().to_string(), balance.clone()); + } + + Ok(None) + } +} + +#[async_trait] +impl PollableAsyncStep for FungibleAssetDeduper { + fn poll_interval(&self) -> Duration { + self.poll_interval + } + + async fn poll( + &mut self, + ) -> Result>>, ProcessorError> { + let transaction_context = TransactionContext::new_with_contexts( + vec![( + vec![], + vec![], + vec![], + std::mem::take(&mut self.merged_cfab) + .values() + .cloned() + .collect::>(), + vec![], + vec![], + )], + self.contexts.clone(), + ); + Ok(Some(vec![transaction_context])) + } +} + +impl NamedStep for FungibleAssetDeduper { + fn name(&self) -> String { + "FungibleAssetDeduper".to_string() + } +}