diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs index 63c59e94f..92a696f68 100644 --- a/rust/processor/src/config.rs +++ b/rust/processor/src/config.rs @@ -114,6 +114,9 @@ pub struct IndexerGrpcHttp2Config { /// Indexer GRPC http2 ping timeout in seconds. Defaults to 10. indexer_grpc_http2_ping_timeout_in_secs: u64, + + /// Seconds before timeout for grpc connection. + indexer_grpc_connection_timeout_secs: u64, } impl IndexerGrpcHttp2Config { @@ -124,6 +127,10 @@ impl IndexerGrpcHttp2Config { pub fn grpc_http2_ping_timeout_in_secs(&self) -> Duration { Duration::from_secs(self.indexer_grpc_http2_ping_timeout_in_secs) } + + pub fn grpc_connection_timeout_secs(&self) -> Duration { + Duration::from_secs(self.indexer_grpc_connection_timeout_secs) + } } impl Default for IndexerGrpcHttp2Config { @@ -131,6 +138,7 @@ impl Default for IndexerGrpcHttp2Config { Self { indexer_grpc_http2_ping_interval_in_secs: 30, indexer_grpc_http2_ping_timeout_in_secs: 10, + indexer_grpc_connection_timeout_secs: 5, } } } diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index 6665da62c..39c0c0942 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -74,6 +74,7 @@ pub async fn get_stream( indexer_grpc_data_service_address: Url, indexer_grpc_http2_ping_interval: Duration, indexer_grpc_http2_ping_timeout: Duration, + indexer_grpc_reconnection_timeout_secs: Duration, starting_version: u64, ending_version: Option, auth_token: String, @@ -121,7 +122,7 @@ pub async fn get_stream( let mut connect_retries = 0; let connect_res = loop { let res = timeout( - Duration::from_secs(5), + indexer_grpc_reconnection_timeout_secs, RawDataClient::connect(channel.clone()), ) .await; @@ -182,7 +183,7 @@ pub async fn get_stream( // Retry this connection a few times before giving up let mut connect_retries = 0; let stream_res = loop { - let timeout_res = timeout(Duration::from_secs(5), async { + let timeout_res = timeout(indexer_grpc_reconnection_timeout_secs, async { let request = grpc_request_builder( starting_version, count, @@ -235,6 +236,7 @@ pub async fn get_chain_id( indexer_grpc_data_service_address: Url, indexer_grpc_http2_ping_interval: Duration, indexer_grpc_http2_ping_timeout: Duration, + indexer_grpc_reconnection_timeout_secs: Duration, auth_token: String, processor_name: String, ) -> u64 { @@ -248,6 +250,7 @@ pub async fn get_chain_id( indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, + indexer_grpc_reconnection_timeout_secs, 1, Some(2), auth_token.clone(), @@ -304,6 +307,7 @@ pub async fn create_fetcher_loop( indexer_grpc_data_service_address: Url, indexer_grpc_http2_ping_interval: Duration, indexer_grpc_http2_ping_timeout: Duration, + indexer_grpc_reconnection_timeout_secs: Duration, starting_version: u64, request_ending_version: Option, auth_token: String, @@ -324,6 +328,7 @@ pub async fn create_fetcher_loop( indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, + indexer_grpc_reconnection_timeout_secs, starting_version, request_ending_version, auth_token.clone(), @@ -634,6 +639,7 @@ pub async fn create_fetcher_loop( indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, + indexer_grpc_reconnection_timeout_secs, next_version_to_fetch, request_ending_version, auth_token.clone(), diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index fbec6c4ab..24b70f356 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -66,6 +66,7 @@ pub struct Worker { } impl Worker { + #[allow(clippy::too_many_arguments)] pub async fn new( processor_config: ProcessorConfig, postgres_connection_string: String, @@ -173,6 +174,7 @@ impl Worker { self.indexer_grpc_data_service_address.clone(), self.grpc_http2_config.grpc_http2_ping_interval_in_secs(), self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(), + self.grpc_http2_config.grpc_connection_timeout_secs(), self.auth_token.clone(), processor_name.to_string(), ) @@ -189,6 +191,8 @@ impl Worker { self.grpc_http2_config.grpc_http2_ping_interval_in_secs(); let indexer_grpc_http2_ping_timeout = self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(); + let indexer_grpc_reconnection_timeout_secs = + self.grpc_http2_config.grpc_connection_timeout_secs(); let pb_channel_txn_chunk_size = self.pb_channel_txn_chunk_size; // Create a transaction fetcher thread that will continuously fetch transactions from the GRPC stream @@ -212,6 +216,7 @@ impl Worker { indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, + indexer_grpc_reconnection_timeout_secs, starting_version, request_ending_version, auth_token.clone(),