Skip to content

Commit

Permalink
Change the config structure.
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 committed Apr 2, 2024
1 parent ef28f51 commit 03fb799
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 42 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ prost-types = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
server-framework = { workspace = true }
sha2 = { workspace = true }
sha3 = { workspace = true }
Expand Down
160 changes: 139 additions & 21 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
use ahash::AHashMap;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use server_framework::RunnableConfig;
use server_framework::{GenericConfig, RunnableConfig};
use std::time::Duration;
use url::Url;

Expand All @@ -34,10 +34,10 @@ pub struct IndexerGrpcProcessorConfig {
// Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight
pub db_pool_size: Option<u32>,
// Maximum number of batches "missing" before we assume we have an issue with gaps and abort
#[serde(default = "IndexerGrpcProcessorConfig::default_gap_detection_batch_size")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_gap_detection_batch_size")]
pub gap_detection_batch_size: u64,
// Number of protobuf transactions to send per chunk to the processor tasks
#[serde(default = "IndexerGrpcProcessorConfig::default_pb_channel_txn_chunk_size")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_pb_channel_txn_chunk_size")]
pub pb_channel_txn_chunk_size: usize,
// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2)
#[serde(default = "AHashMap::new")]
Expand All @@ -47,7 +47,16 @@ pub struct IndexerGrpcProcessorConfig {
pub transaction_filter: TransactionFilter,
}

impl IndexerGrpcProcessorConfig {
/// Top-level processor configuration in the V2 layout.
///
/// Settings are grouped into four sections: values shared by all processors
/// (`common`), database connectivity (`db_connection`), the indexer gRPC
/// endpoint (`indexer_grpc`), and the list of processors to run (`processors`).
/// Unknown top-level keys are rejected when deserializing.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerGrpcProcessorConfigV2 {
// Settings shared by every processor.
pub common: CommonConfig,
// Database URLs and pool sizing.
pub db_connection: DbConnectionConfig,
// How to reach the indexer gRPC data service.
pub indexer_grpc: IndexerGrpcConnectionConfig,
// Processors to run. NOTE: `RunnableConfig::run` currently accepts exactly one entry here.
pub processors: Vec<ProcessorConfigV2>,
}

impl IndexerGrpcProcessorConfigV2 {
pub const fn default_gap_detection_batch_size() -> u64 {
DEFAULT_GAP_DETECTION_BATCH_SIZE
}
Expand All @@ -68,23 +77,30 @@ impl IndexerGrpcProcessorConfig {
}

#[async_trait::async_trait]
impl RunnableConfig for IndexerGrpcProcessorConfig {
impl RunnableConfig for IndexerGrpcProcessorConfigV2 {
async fn run(&self) -> Result<()> {
// TODO(grao): Implement the following things.
if self.processors.len() != 1 {
unimplemented!("Only support 1 processor now.");
}
if self.db_connection.db_connection_urls.len() != 1 {
unimplemented!("Only support 1 db connection URL now.");
}
let mut worker = Worker::new(
self.processor_config.clone(),
self.postgres_connection_string.clone(),
self.indexer_grpc_data_service_address.clone(),
self.grpc_http2_config.clone(),
self.auth_token.clone(),
self.starting_version,
self.ending_version,
self.number_concurrent_processing_tasks,
self.db_pool_size,
self.gap_detection_batch_size,
self.pb_channel_txn_chunk_size,
self.per_table_chunk_sizes.clone(),
self.enable_verbose_logging,
self.transaction_filter.clone(),
self.processors[0].processor.clone(),
self.db_connection.db_connection_urls[0].clone(),
self.indexer_grpc.data_service_address.clone(),
self.indexer_grpc.http2_config.clone(),
self.indexer_grpc.auth_token.clone(),
self.common.version_config.starting_version,
self.common.version_config.ending_version,
self.common.number_concurrent_processing_tasks,
self.db_connection.db_pool_size_per_url,
self.common.gap_detection_batch_size,
self.common.pb_channel_txn_chunk_size,
self.common.per_table_chunk_sizes.clone(),
self.common.enable_verbose_logging,
self.common.transaction_filter.clone(),
)
.await
.context("Failed to build worker")?;
Expand All @@ -94,8 +110,8 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {

fn get_server_name(&self) -> String {
// Get the part before the first _ and trim to 12 characters.
let before_underscore = self
.processor_config
let before_underscore = self.processors[0]
.processor
.name()
.split('_')
.next()
Expand All @@ -104,6 +120,67 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
}
}

/// Settings shared by all processors; this is the `common` section of
/// `IndexerGrpcProcessorConfigV2`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct CommonConfig {
// Flattened: `starting_version` / `ending_version` appear as direct keys of this section
// rather than under a nested `version_config` key.
// NOTE(review): serde's documentation lists `deny_unknown_fields` as unsupported in
// combination with `flatten` -- confirm unknown keys are still rejected as intended.
#[serde(flatten)]
pub version_config: VersionConfig,
// Maximum number of batches "missing" before we assume we have an issue with gaps and abort
#[serde(default = "IndexerGrpcProcessorConfigV2::default_gap_detection_batch_size")]
pub gap_detection_batch_size: u64,
// Number of protobuf transactions to send per chunk to the processor tasks
#[serde(default = "IndexerGrpcProcessorConfigV2::default_pb_channel_txn_chunk_size")]
pub pb_channel_txn_chunk_size: usize,
// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2)
// TODO(grao): Revisit this config, it's probably better to have it on each processor config.
// Defaults to an empty map when the key is absent.
#[serde(default = "AHashMap::new")]
pub per_table_chunk_sizes: AHashMap<String, usize>,
// Number of tasks waiting to pull transaction batches from the channel and process them
pub number_concurrent_processing_tasks: Option<usize>,
// Falls back to `TransactionFilter::default()` when the key is absent.
#[serde(default)]
pub transaction_filter: TransactionFilter,
// Optional flag handed to the worker unchanged; `None` when the key is absent.
pub enable_verbose_logging: Option<bool>,
}

/// One entry of the `processors` list in the V2 config: the processor to run
/// plus any per-processor overrides of the common settings.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ProcessorConfigV2 {
    // Processor-specific configuration.
    pub processor: ProcessorConfig,
    // Per-processor overrides. Optional in the config file: when the key is absent this
    // falls back to `ConfigOverrides::default()` (no overrides), which is the same value
    // the V1 -> V2 conversion produces. Every field of `ConfigOverrides` is optional, so
    // requiring an explicit (possibly empty) `overrides:` key added nothing.
    #[serde(default)]
    pub overrides: ConfigOverrides,
}

/// Database connectivity settings; this is the `db_connection` section of the V2 config.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct DbConnectionConfig {
// Database connection strings. NOTE: `RunnableConfig::run` currently accepts exactly one URL.
pub db_connection_urls: Vec<String>,
// Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight
pub db_pool_size_per_url: Option<u32>,
}

/// Connection settings for the indexer gRPC data service; this is the
/// `indexer_grpc` section of the V2 config.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerGrpcConnectionConfig {
// TODO: Add TLS support.
// URL of the data service to connect to.
pub data_service_address: Url,
// Flattened: the HTTP/2 settings appear as direct keys of this section rather than
// under a nested `http2_config` key.
// NOTE(review): serde's documentation lists `deny_unknown_fields` as unsupported in
// combination with `flatten` -- confirm unknown keys are still rejected as intended.
#[serde(flatten)]
pub http2_config: IndexerGrpcHttp2Config,
// Auth token handed to the worker.
// NOTE(review): this struct derives `Debug` and `Serialize`, so the token is included
// verbatim in any debug print or serialized dump of the config -- confirm that is acceptable.
pub auth_token: String,
}

/// Per-processor overrides of the common settings. The derived `Default`
/// leaves every override unset.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigOverrides {
// Flattened: `starting_version` / `ending_version` appear as direct keys of the
// overrides section rather than under a nested `version_overrides` key.
// NOTE(review): `RunnableConfig::run` reads versions from `common.version_config` only;
// these overrides do not appear to be applied yet -- confirm.
#[serde(flatten)]
pub version_overrides: VersionConfig,
}

/// Optional transaction-version bounds for processing. Both fields default to
/// `None` when absent from the config.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct VersionConfig {
// Version to start processing from; handed to the worker unchanged.
pub starting_version: Option<u64>,
// Version to stop processing at; handed to the worker unchanged.
// NOTE(review): whether this bound is inclusive is not visible here -- confirm in the worker.
pub ending_version: Option<u64>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(default)]
Expand Down Expand Up @@ -134,3 +211,44 @@ impl Default for IndexerGrpcHttp2Config {
}
}
}

pub fn from_v1_config(
v1_config: GenericConfig<IndexerGrpcProcessorConfig>,
) -> GenericConfig<IndexerGrpcProcessorConfigV2> {
GenericConfig::<IndexerGrpcProcessorConfigV2> {
health_check_port: v1_config.health_check_port,
server_config: v1_config.server_config.into(),
}
}

impl From<IndexerGrpcProcessorConfig> for IndexerGrpcProcessorConfigV2 {
fn from(v1_config: IndexerGrpcProcessorConfig) -> Self {
IndexerGrpcProcessorConfigV2 {
common: CommonConfig {
version_config: VersionConfig {
starting_version: v1_config.starting_version,
ending_version: v1_config.ending_version,
},
gap_detection_batch_size: v1_config.gap_detection_batch_size,
pb_channel_txn_chunk_size: v1_config.pb_channel_txn_chunk_size,
per_table_chunk_sizes: v1_config.per_table_chunk_sizes,
number_concurrent_processing_tasks: v1_config.number_concurrent_processing_tasks,
enable_verbose_logging: v1_config.enable_verbose_logging,
transaction_filter: v1_config.transaction_filter,
},
db_connection: DbConnectionConfig {
db_connection_urls: vec![v1_config.postgres_connection_string],
db_pool_size_per_url: v1_config.db_pool_size,
},
indexer_grpc: IndexerGrpcConnectionConfig {
data_service_address: v1_config.indexer_grpc_data_service_address,
http2_config: v1_config.grpc_http2_config,
auth_token: v1_config.auth_token,
},
processors: vec![ProcessorConfigV2 {
processor: v1_config.processor_config,
overrides: Default::default(),
}],
}
}
}
2 changes: 1 addition & 1 deletion rust/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#[macro_use]
extern crate diesel;

pub use config::IndexerGrpcProcessorConfig;
pub use config::{from_v1_config, IndexerGrpcProcessorConfig, IndexerGrpcProcessorConfigV2};

mod config;
pub mod gap_detector;
Expand Down
24 changes: 21 additions & 3 deletions rust/processor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use anyhow::Result;
use clap::Parser;
use processor::IndexerGrpcProcessorConfig;
use processor::{from_v1_config, IndexerGrpcProcessorConfig, IndexerGrpcProcessorConfigV2};
use server_framework::ServerArgs;

const RUNTIME_WORKER_MULTIPLIER: usize = 2;
Expand All @@ -25,7 +25,25 @@ fn main() -> Result<()> {
.unwrap()
.block_on(async {
let args = ServerArgs::parse();
args.run::<IndexerGrpcProcessorConfig>(tokio::runtime::Handle::current())
.await
match args.load_generic_config::<IndexerGrpcProcessorConfigV2>() {
Ok(_) => {
args.run::<IndexerGrpcProcessorConfigV2>(tokio::runtime::Handle::current())
.await
},
Err(_) => {
let v1_config = args.load_generic_config::<IndexerGrpcProcessorConfig>()?;
let v2_config = from_v1_config(v1_config);
println!(
"You are using a deprecated config format, please migrate! The equivalent config in new format is: {}",
serde_yaml::to_string(&v2_config).unwrap()
);
args.pre_run();
server_framework::run_server_with_config(
v2_config,
tokio::runtime::Handle::current(),
)
.await
},
}
})
}
6 changes: 3 additions & 3 deletions rust/processor/src/processors/nft_metadata_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
database::{PgDbPool, PgPoolConnection},
util::{parse_timestamp, remove_null_bytes, standardize_address},
},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{write_set_change::Change, Transaction};
Expand All @@ -39,9 +39,9 @@ pub const CHUNK_SIZE: usize = 1000;
pub struct NftMetadataProcessorConfig {
pub pubsub_topic_name: String,
pub google_application_credentials: Option<String>,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}

Expand Down
6 changes: 3 additions & 3 deletions rust/processor/src/processors/objects_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool},
util::standardize_address,
},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::AHashMap;
use anyhow::bail;
Expand All @@ -30,9 +30,9 @@ use tracing::error;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectsProcessorConfig {
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}
pub struct ObjectsProcessor {
Expand Down
6 changes: 3 additions & 3 deletions rust/processor/src/processors/stake_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool},
util::{parse_timestamp, standardize_address},
},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::AHashMap;
use anyhow::bail;
Expand All @@ -39,9 +39,9 @@ use tracing::error;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct StakeProcessorConfig {
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}

Expand Down
6 changes: 3 additions & 3 deletions rust/processor/src/processors/token_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
},
schema,
utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::AHashMap;
use anyhow::bail;
Expand All @@ -36,9 +36,9 @@ use tracing::error;
#[serde(deny_unknown_fields)]
pub struct TokenProcessorConfig {
pub nft_points_contract: Option<String>,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}

Expand Down
6 changes: 3 additions & 3 deletions rust/processor/src/processors/token_v2_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool, PgPoolConnection},
util::{get_entry_function_from_user_request, parse_timestamp, standardize_address},
},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::{AHashMap, AHashSet};
use anyhow::bail;
Expand All @@ -51,9 +51,9 @@ use tracing::error;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct TokenV2ProcessorConfig {
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}

Expand Down
12 changes: 10 additions & 2 deletions rust/server-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,19 @@ impl ServerArgs {
where
C: RunnableConfig,
{
self.pre_run();
let config = self.load_generic_config::<C>()?;
run_server_with_config(config, handle).await
}

pub fn pre_run(&self) {
// Set up the server.
setup_logging();
setup_panic_handler();
let config = load::<GenericConfig<C>>(&self.config_path)?;
run_server_with_config(config, handle).await
}

/// Loads a `GenericConfig<C>` by delegating to `load` with `self.config_path`.
///
/// Any error from `load` is returned to the caller unchanged.
pub fn load_generic_config<C>(&self) -> Result<GenericConfig<C>>
where
    C: for<'de> Deserialize<'de>,
{
    let path = &self.config_path;
    load::<GenericConfig<C>>(path)
}
}

Expand Down

0 comments on commit 03fb799

Please sign in to comment.