Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the config structure. #337

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ prost-types = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
server-framework = { workspace = true }
sha2 = { workspace = true }
sha3 = { workspace = true }
Expand Down
160 changes: 139 additions & 21 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
use ahash::AHashMap;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use server_framework::RunnableConfig;
use server_framework::{GenericConfig, RunnableConfig};
use std::time::Duration;
use url::Url;

Expand All @@ -34,10 +34,10 @@ pub struct IndexerGrpcProcessorConfig {
// Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight
pub db_pool_size: Option<u32>,
// Maximum number of batches "missing" before we assume we have an issue with gaps and abort
#[serde(default = "IndexerGrpcProcessorConfig::default_gap_detection_batch_size")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_gap_detection_batch_size")]
pub gap_detection_batch_size: u64,
// Number of protobuff transactions to send per chunk to the processor tasks
#[serde(default = "IndexerGrpcProcessorConfig::default_pb_channel_txn_chunk_size")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_pb_channel_txn_chunk_size")]
pub pb_channel_txn_chunk_size: usize,
// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2)
#[serde(default = "AHashMap::new")]
Expand All @@ -47,7 +47,16 @@ pub struct IndexerGrpcProcessorConfig {
pub transaction_filter: TransactionFilter,
}

impl IndexerGrpcProcessorConfig {
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerGrpcProcessorConfigV2 {
pub common: CommonConfig,
pub db_connection: DbConnectionConfig,
pub indexer_grpc: IndexerGrpcConnectionConfig,
pub processors: Vec<ProcessorConfigV2>,
}

impl IndexerGrpcProcessorConfigV2 {
pub const fn default_gap_detection_batch_size() -> u64 {
DEFAULT_GAP_DETECTION_BATCH_SIZE
}
Expand All @@ -68,23 +77,30 @@ impl IndexerGrpcProcessorConfig {
}

#[async_trait::async_trait]
impl RunnableConfig for IndexerGrpcProcessorConfig {
impl RunnableConfig for IndexerGrpcProcessorConfigV2 {
async fn run(&self) -> Result<()> {
// TODO(grao): Implement the following things.
if self.processors.len() != 1 {
unimplemented!("Only support 1 processor now.");
}
if self.db_connection.db_connection_urls.len() != 1 {
unimplemented!("Only support 1 db connection URL now.");
}
let mut worker = Worker::new(
self.processor_config.clone(),
self.postgres_connection_string.clone(),
self.indexer_grpc_data_service_address.clone(),
self.grpc_http2_config.clone(),
self.auth_token.clone(),
self.starting_version,
self.ending_version,
self.number_concurrent_processing_tasks,
self.db_pool_size,
self.gap_detection_batch_size,
self.pb_channel_txn_chunk_size,
self.per_table_chunk_sizes.clone(),
self.enable_verbose_logging,
self.transaction_filter.clone(),
self.processors[0].processor.clone(),
self.db_connection.db_connection_urls[0].clone(),
self.indexer_grpc.data_service_address.clone(),
self.indexer_grpc.http2_config.clone(),
self.indexer_grpc.auth_token.clone(),
self.common.version_config.starting_version,
self.common.version_config.ending_version,
self.common.number_concurrent_processing_tasks,
self.db_connection.db_pool_size_per_url,
self.common.gap_detection_batch_size,
self.common.pb_channel_txn_chunk_size,
self.common.per_table_chunk_sizes.clone(),
self.common.enable_verbose_logging,
self.common.transaction_filter.clone(),
)
.await
.context("Failed to build worker")?;
Expand All @@ -94,8 +110,8 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {

fn get_server_name(&self) -> String {
// Get the part before the first _ and trim to 12 characters.
let before_underscore = self
.processor_config
let before_underscore = self.processors[0]
.processor
.name()
.split('_')
.next()
Expand All @@ -104,6 +120,67 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct CommonConfig {
#[serde(flatten)]
pub version_config: VersionConfig,
// Maximum number of batches "missing" before we assume we have an issue with gaps and abort
#[serde(default = "IndexerGrpcProcessorConfigV2::default_gap_detection_batch_size")]
pub gap_detection_batch_size: u64,
// Number of protobuf transactions to send per chunk to the processor tasks
#[serde(default = "IndexerGrpcProcessorConfigV2::default_pb_channel_txn_chunk_size")]
pub pb_channel_txn_chunk_size: usize,
// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2)
// TODO(grao): Revisit this config, it's probably better to have it on each processor config.
#[serde(default = "AHashMap::new")]
pub per_table_chunk_sizes: AHashMap<String, usize>,
// Number of tasks waiting to pull transaction batches from the channel and process them
pub number_concurrent_processing_tasks: Option<usize>,
#[serde(default)]
pub transaction_filter: TransactionFilter,
pub enable_verbose_logging: Option<bool>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ProcessorConfigV2 {
pub processor: ProcessorConfig,
pub overrides: ConfigOverrides,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct DbConnectionConfig {
pub db_connection_urls: Vec<String>,
// Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight
pub db_pool_size_per_url: Option<u32>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerGrpcConnectionConfig {
// TODO: Add TLS support.
pub data_service_address: Url,
#[serde(flatten)]
pub http2_config: IndexerGrpcHttp2Config,
pub auth_token: String,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigOverrides {
#[serde(flatten)]
pub version_overrides: VersionConfig,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct VersionConfig {
pub starting_version: Option<u64>,
pub ending_version: Option<u64>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(default)]
Expand Down Expand Up @@ -134,3 +211,44 @@ impl Default for IndexerGrpcHttp2Config {
}
}
}

pub fn from_v1_config(
v1_config: GenericConfig<IndexerGrpcProcessorConfig>,
) -> GenericConfig<IndexerGrpcProcessorConfigV2> {
GenericConfig::<IndexerGrpcProcessorConfigV2> {
health_check_port: v1_config.health_check_port,
server_config: v1_config.server_config.into(),
}
}

impl From<IndexerGrpcProcessorConfig> for IndexerGrpcProcessorConfigV2 {
fn from(v1_config: IndexerGrpcProcessorConfig) -> Self {
IndexerGrpcProcessorConfigV2 {
common: CommonConfig {
version_config: VersionConfig {
starting_version: v1_config.starting_version,
ending_version: v1_config.ending_version,
},
gap_detection_batch_size: v1_config.gap_detection_batch_size,
pb_channel_txn_chunk_size: v1_config.pb_channel_txn_chunk_size,
per_table_chunk_sizes: v1_config.per_table_chunk_sizes,
number_concurrent_processing_tasks: v1_config.number_concurrent_processing_tasks,
enable_verbose_logging: v1_config.enable_verbose_logging,
transaction_filter: v1_config.transaction_filter,
},
db_connection: DbConnectionConfig {
db_connection_urls: vec![v1_config.postgres_connection_string],
db_pool_size_per_url: v1_config.db_pool_size,
},
indexer_grpc: IndexerGrpcConnectionConfig {
data_service_address: v1_config.indexer_grpc_data_service_address,
http2_config: v1_config.grpc_http2_config,
auth_token: v1_config.auth_token,
},
processors: vec![ProcessorConfigV2 {
processor: v1_config.processor_config,
overrides: Default::default(),
}],
}
}
}
2 changes: 1 addition & 1 deletion rust/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#[macro_use]
extern crate diesel;

pub use config::IndexerGrpcProcessorConfig;
pub use config::{from_v1_config, IndexerGrpcProcessorConfig, IndexerGrpcProcessorConfigV2};

mod config;
pub mod gap_detector;
Expand Down
24 changes: 21 additions & 3 deletions rust/processor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use anyhow::Result;
use clap::Parser;
use processor::IndexerGrpcProcessorConfig;
use processor::{from_v1_config, IndexerGrpcProcessorConfig, IndexerGrpcProcessorConfigV2};
use server_framework::ServerArgs;

const RUNTIME_WORKER_MULTIPLIER: usize = 2;
Expand All @@ -25,7 +25,25 @@ fn main() -> Result<()> {
.unwrap()
.block_on(async {
let args = ServerArgs::parse();
args.run::<IndexerGrpcProcessorConfig>(tokio::runtime::Handle::current())
.await
match args.load_generic_config::<IndexerGrpcProcessorConfigV2>() {
Ok(_) => {
args.run::<IndexerGrpcProcessorConfigV2>(tokio::runtime::Handle::current())
.await
},
Err(_) => {
let v1_config = args.load_generic_config::<IndexerGrpcProcessorConfig>()?;
let v2_config = from_v1_config(v1_config);
println!(
"You are using a deprecated config format, please migrate! The equivalent config in new format is: {}",
serde_yaml::to_string(&v2_config).unwrap()
);
args.pre_run();
server_framework::run_server_with_config(
v2_config,
tokio::runtime::Handle::current(),
)
.await
},
}
})
}
6 changes: 3 additions & 3 deletions rust/processor/src/processors/nft_metadata_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
database::{PgDbPool, PgPoolConnection},
util::{parse_timestamp, remove_null_bytes, standardize_address},
},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{write_set_change::Change, Transaction};
Expand All @@ -39,9 +39,9 @@ pub const CHUNK_SIZE: usize = 1000;
pub struct NftMetadataProcessorConfig {
pub pubsub_topic_name: String,
pub google_application_credentials: Option<String>,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}

Expand Down
6 changes: 3 additions & 3 deletions rust/processor/src/processors/objects_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool},
util::standardize_address,
},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::AHashMap;
use anyhow::bail;
Expand All @@ -30,9 +30,9 @@ use tracing::error;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectsProcessorConfig {
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}
pub struct ObjectsProcessor {
Expand Down
6 changes: 3 additions & 3 deletions rust/processor/src/processors/stake_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool},
util::{parse_timestamp, standardize_address},
},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::AHashMap;
use anyhow::bail;
Expand All @@ -39,9 +39,9 @@ use tracing::error;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct StakeProcessorConfig {
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}

Expand Down
6 changes: 3 additions & 3 deletions rust/processor/src/processors/token_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
},
schema,
utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::AHashMap;
use anyhow::bail;
Expand All @@ -36,9 +36,9 @@ use tracing::error;
#[serde(deny_unknown_fields)]
pub struct TokenProcessorConfig {
pub nft_points_contract: Option<String>,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}

Expand Down
6 changes: 3 additions & 3 deletions rust/processor/src/processors/token_v2_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool, PgPoolConnection},
util::{get_entry_function_from_user_request, parse_timestamp, standardize_address},
},
IndexerGrpcProcessorConfig,
IndexerGrpcProcessorConfigV2,
};
use ahash::{AHashMap, AHashSet};
use anyhow::bail;
Expand All @@ -51,9 +51,9 @@ use tracing::error;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct TokenV2ProcessorConfig {
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retries")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retries")]
pub query_retries: u32,
#[serde(default = "IndexerGrpcProcessorConfig::default_query_retry_delay_ms")]
#[serde(default = "IndexerGrpcProcessorConfigV2::default_query_retry_delay_ms")]
pub query_retry_delay_ms: u64,
}

Expand Down
12 changes: 10 additions & 2 deletions rust/server-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,19 @@ impl ServerArgs {
where
C: RunnableConfig,
{
self.pre_run();
let config = self.load_generic_config::<C>()?;
run_server_with_config(config, handle).await
}

pub fn pre_run(&self) {
// Set up the server.
setup_logging();
setup_panic_handler();
let config = load::<GenericConfig<C>>(&self.config_path)?;
run_server_with_config(config, handle).await
}

pub fn load_generic_config<C: for<'de> Deserialize<'de>>(&self) -> Result<GenericConfig<C>> {
load::<GenericConfig<C>>(&self.config_path)
}
}

Expand Down
Loading