Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate FA processor to SDK #543

Merged
merged 4 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ testing-transactions = { path = "testing-transactions" }

ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.86"
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" }
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "95d76e07dd66a20a0e50a9e6c559885bff7ab52b" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "95d76e07dd66a20a0e50a9e6c559885bff7ab52b" }
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" }
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }
aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "7246f0536d599789d7dd5e9fb776554abbe11eac" }
Expand Down
16 changes: 8 additions & 8 deletions rust/processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async fn insert_to_db(
Ok(())
}

fn insert_fungible_asset_activities_query(
pub fn insert_fungible_asset_activities_query(
items_to_insert: Vec<FungibleAssetActivity>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -186,7 +186,7 @@ fn insert_fungible_asset_activities_query(
)
}

fn insert_fungible_asset_metadata_query(
pub fn insert_fungible_asset_metadata_query(
items_to_insert: Vec<FungibleAssetMetadataModel>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -222,7 +222,7 @@ fn insert_fungible_asset_metadata_query(
)
}

fn insert_fungible_asset_balances_query(
pub fn insert_fungible_asset_balances_query(
items_to_insert: Vec<FungibleAssetBalance>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -239,7 +239,7 @@ fn insert_fungible_asset_balances_query(
)
}

fn insert_current_fungible_asset_balances_query(
pub fn insert_current_fungible_asset_balances_query(
items_to_insert: Vec<CurrentFungibleAssetBalance>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -269,7 +269,7 @@ fn insert_current_fungible_asset_balances_query(
)
}

fn insert_current_unified_fungible_asset_balances_v1_query(
pub fn insert_current_unified_fungible_asset_balances_v1_query(
items_to_insert: Vec<CurrentUnifiedFungibleAssetBalance>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -298,7 +298,7 @@ fn insert_current_unified_fungible_asset_balances_v1_query(
)
}

fn insert_current_unified_fungible_asset_balances_v2_query(
pub fn insert_current_unified_fungible_asset_balances_v2_query(
items_to_insert: Vec<CurrentUnifiedFungibleAssetBalance>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -328,7 +328,7 @@ fn insert_current_unified_fungible_asset_balances_v2_query(
)
}

fn insert_coin_supply_query(
pub fn insert_coin_supply_query(
items_to_insert: Vec<CoinSupply>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -451,7 +451,7 @@ impl ProcessorTrait for FungibleAssetProcessor {
}

/// V2 coin is called fungible assets and this flow includes all data from V1 in coin_processor
async fn parse_v2_coin(
pub async fn parse_v2_coin(
transactions: &[Transaction],
) -> (
Vec<FungibleAssetActivity>,
Expand Down
12 changes: 12 additions & 0 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ bitflags! {
}
}

impl TableFlags {
pub fn from_set(set: &HashSet<String>) -> Self {
let mut flags = TableFlags::empty();
for table in set {
if let Some(flag) = TableFlags::from_name(table) {
flags |= flag;
}
}
flags
}
}

pub struct Worker {
pub db_pool: ArcDbPool,
pub processor_config: ProcessorConfig,
Expand Down
8 changes: 7 additions & 1 deletion rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use super::{db_config::DbConfig, processor_config::ProcessorConfig};
use crate::processors::events_processor::EventsProcessor;
use crate::processors::{
events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor,
};
use anyhow::Result;
use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig;
use aptos_indexer_processor_sdk_server_framework::RunnableConfig;
Expand All @@ -24,6 +26,10 @@ impl RunnableConfig for IndexerProcessorConfig {
let events_processor = EventsProcessor::new(self.clone()).await?;
events_processor.run_processor().await
},
ProcessorConfig::FungibleAssetProcessor(_) => {
let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?;
fungible_asset_processor.run_processor().await
},
}
}

Expand Down
48 changes: 20 additions & 28 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::processors::events_processor::EventsProcessorConfig;
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;

/// This enum captures the configs for all the different processors that are defined.
///
Expand Down Expand Up @@ -34,7 +35,8 @@ use serde::{Deserialize, Serialize};
strum(serialize_all = "snake_case")
)]
pub enum ProcessorConfig {
EventsProcessor(EventsProcessorConfig),
EventsProcessor(DefaultProcessorConfig),
FungibleAssetProcessor(DefaultProcessorConfig),
}

impl ProcessorConfig {
Expand All @@ -44,33 +46,23 @@ impl ProcessorConfig {
self.into()
}
}
#[derive(Debug)]
// To ensure that the variants of ProcessorConfig and Processor line up, in the testing
// build path we derive EnumDiscriminants on this enum as well and make sure the two
// sets of variants match up in `test_processor_names_complete`.
#[cfg_attr(
test,
derive(strum::EnumDiscriminants),
strum_discriminants(
derive(strum::EnumVariantNames),
name(ProcessorDiscriminants),
strum(serialize_all = "snake_case")
)
)]
pub enum Processor {
EventsProcessor,
}

#[cfg(test)]
mod test {
use super::*;
use strum::VariantNames;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct DefaultProcessorConfig {
// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2)
#[serde(default = "AHashMap::new")]
pub per_table_chunk_sizes: AHashMap<String, usize>,
// Size of channel between steps
#[serde(default = "DefaultProcessorConfig::default_channel_size")]
pub channel_size: usize,
// String vector for deprecated tables to skip db writes
#[serde(default)]
pub deprecated_tables: HashSet<String>,
}

/// This test exists to make sure that when a new processor is added, it is added
/// to both Processor and ProcessorConfig. To make sure this passes, make sure the
/// variants are in the same order (lexicographical) and the names match.
#[test]
fn test_processor_names_complete() {
assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS);
impl DefaultProcessorConfig {
pub const fn default_channel_size() -> usize {
10
}
}
45 changes: 15 additions & 30 deletions rust/sdk-processor/src/processors/events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,15 @@ use crate::{
starting_version::get_starting_version,
},
};
use ahash::AHashMap;
use anyhow::Result;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::TransactionStreamStep,
traits::IntoRunnableStep,
};
use serde::{Deserialize, Serialize};
use tracing::{debug, info};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct EventsProcessorConfig {
// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2)
#[serde(default = "AHashMap::new")]
pub per_table_chunk_sizes: AHashMap<String, usize>,
// Size of channel between steps
#[serde(default = "EventsProcessorConfig::default_channel_size")]
pub channel_size: usize,
}

impl EventsProcessorConfig {
pub const fn default_channel_size() -> usize {
10
}
}

pub struct EventsProcessor {
pub config: IndexerProcessorConfig,
pub db_pool: ArcDbPool,
Expand Down Expand Up @@ -73,7 +54,7 @@ impl EventsProcessor {
pub async fn run_processor(self) -> Result<()> {
let processor_name = self.config.processor_config.name();

// (Optional) Run migrations
// Run migrations
match self.config.db_config {
DbConfig::PostgresConfig(ref postgres_config) => {
run_migrations(
Expand All @@ -84,19 +65,26 @@ impl EventsProcessor {
},
}

// (Optional) Merge the starting version from config and the latest processed version from the DB
// Merge the starting version from config and the latest processed version from the DB
let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?;

// (Optional) Check and update the ledger chain id to ensure we're indexing the correct chain
// Check and update the ledger chain id to ensure we're indexing the correct chain
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
.await?
.get_chain_id()
.await?;
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

let ProcessorConfig::EventsProcessor(events_processor_config) =
self.config.processor_config;
let channel_size = events_processor_config.channel_size;
let processor_config = match self.config.processor_config {
ProcessorConfig::EventsProcessor(processor_config) => processor_config,
_ => {
return Err(anyhow::anyhow!(
"Invalid processor config for EventsProcessor: {:?}",
self.config.processor_config
))
},
};
let channel_size = processor_config.channel_size;

// Define processor steps
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
Expand All @@ -105,7 +93,7 @@ impl EventsProcessor {
})
.await?;
let events_extractor = EventsExtractor {};
let events_storer = EventsStorer::new(self.db_pool.clone(), events_processor_config);
let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config);
let version_tracker = LatestVersionProcessedTracker::new(
self.db_pool.clone(),
starting_version,
Expand All @@ -125,12 +113,9 @@ impl EventsProcessor {
loop {
match buffer_receiver.recv().await {
Ok(txn_context) => {
if txn_context.data.is_empty() {
continue;
}
debug!(
"Finished processing events from versions [{:?}, {:?}]",
txn_context.start_version, txn_context.end_version,
txn_context.metadata.start_version, txn_context.metadata.end_version,
);
},
Err(e) => {
Expand Down
Loading
Loading