Skip to content

Commit

Permalink
Add config var
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso committed May 2, 2024
1 parent 6b76415 commit c6ea7ae
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
8 changes: 8 additions & 0 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ pub struct IndexerGrpcHttp2Config {

/// Indexer GRPC http2 ping timeout in seconds. Defaults to 10.
indexer_grpc_http2_ping_timeout_in_secs: u64,

/// Seconds before timeout for grpc connection.
indexer_grpc_connection_timeout_secs: u64,
}

impl IndexerGrpcHttp2Config {
Expand All @@ -124,13 +127,18 @@ impl IndexerGrpcHttp2Config {
pub fn grpc_http2_ping_timeout_in_secs(&self) -> Duration {
Duration::from_secs(self.indexer_grpc_http2_ping_timeout_in_secs)
}

pub fn grpc_connection_timeout_secs(&self) -> Duration {
Duration::from_secs(self.indexer_grpc_connection_timeout_secs)
}
}

impl Default for IndexerGrpcHttp2Config {
fn default() -> Self {
Self {
indexer_grpc_http2_ping_interval_in_secs: 30,
indexer_grpc_http2_ping_timeout_in_secs: 10,
indexer_grpc_connection_timeout_secs: 5,
}
}
}
10 changes: 8 additions & 2 deletions rust/processor/src/grpc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub async fn get_stream(
indexer_grpc_data_service_address: Url,
indexer_grpc_http2_ping_interval: Duration,
indexer_grpc_http2_ping_timeout: Duration,
indexer_grpc_reconnection_timeout_secs: Duration,
starting_version: u64,
ending_version: Option<u64>,
auth_token: String,
Expand Down Expand Up @@ -121,7 +122,7 @@ pub async fn get_stream(
let mut connect_retries = 0;
let connect_res = loop {
let res = timeout(
Duration::from_secs(5),
indexer_grpc_reconnection_timeout_secs,
RawDataClient::connect(channel.clone()),
)
.await;
Expand Down Expand Up @@ -182,7 +183,7 @@ pub async fn get_stream(
// Retry this connection a few times before giving up
let mut connect_retries = 0;
let stream_res = loop {
let timeout_res = timeout(Duration::from_secs(5), async {
let timeout_res = timeout(indexer_grpc_reconnection_timeout_secs, async {
let request = grpc_request_builder(
starting_version,
count,
Expand Down Expand Up @@ -235,6 +236,7 @@ pub async fn get_chain_id(
indexer_grpc_data_service_address: Url,
indexer_grpc_http2_ping_interval: Duration,
indexer_grpc_http2_ping_timeout: Duration,
indexer_grpc_reconnection_timeout_secs: Duration,
auth_token: String,
processor_name: String,
) -> u64 {
Expand All @@ -248,6 +250,7 @@ pub async fn get_chain_id(
indexer_grpc_data_service_address.clone(),
indexer_grpc_http2_ping_interval,
indexer_grpc_http2_ping_timeout,
indexer_grpc_reconnection_timeout_secs,
1,
Some(2),
auth_token.clone(),
Expand Down Expand Up @@ -304,6 +307,7 @@ pub async fn create_fetcher_loop(
indexer_grpc_data_service_address: Url,
indexer_grpc_http2_ping_interval: Duration,
indexer_grpc_http2_ping_timeout: Duration,
indexer_grpc_reconnection_timeout_secs: Duration,
starting_version: u64,
request_ending_version: Option<u64>,
auth_token: String,
Expand All @@ -324,6 +328,7 @@ pub async fn create_fetcher_loop(
indexer_grpc_data_service_address.clone(),
indexer_grpc_http2_ping_interval,
indexer_grpc_http2_ping_timeout,
indexer_grpc_reconnection_timeout_secs,
starting_version,
request_ending_version,
auth_token.clone(),
Expand Down Expand Up @@ -634,6 +639,7 @@ pub async fn create_fetcher_loop(
indexer_grpc_data_service_address.clone(),
indexer_grpc_http2_ping_interval,
indexer_grpc_http2_ping_timeout,
indexer_grpc_reconnection_timeout_secs,
next_version_to_fetch,
request_ending_version,
auth_token.clone(),
Expand Down
5 changes: 5 additions & 0 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub struct Worker {
}

impl Worker {
#[allow(clippy::too_many_arguments)]
pub async fn new(
processor_config: ProcessorConfig,
postgres_connection_string: String,
Expand Down Expand Up @@ -173,6 +174,7 @@ impl Worker {
self.indexer_grpc_data_service_address.clone(),
self.grpc_http2_config.grpc_http2_ping_interval_in_secs(),
self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(),
self.grpc_http2_config.grpc_connection_timeout_secs(),
self.auth_token.clone(),
processor_name.to_string(),
)
Expand All @@ -189,6 +191,8 @@ impl Worker {
self.grpc_http2_config.grpc_http2_ping_interval_in_secs();
let indexer_grpc_http2_ping_timeout =
self.grpc_http2_config.grpc_http2_ping_timeout_in_secs();
let indexer_grpc_reconnection_timeout_secs =
self.grpc_http2_config.grpc_connection_timeout_secs();
let pb_channel_txn_chunk_size = self.pb_channel_txn_chunk_size;

// Create a transaction fetcher thread that will continuously fetch transactions from the GRPC stream
Expand All @@ -212,6 +216,7 @@ impl Worker {
indexer_grpc_data_service_address.clone(),
indexer_grpc_http2_ping_interval,
indexer_grpc_http2_ping_timeout,
indexer_grpc_reconnection_timeout_secs,
starting_version,
request_ending_version,
auth_token.clone(),
Expand Down

0 comments on commit c6ea7ae

Please sign in to comment.