From f2e4709fb6d883d5a3e2ece02ead7310745262bb Mon Sep 17 00:00:00 2001 From: William Smith Date: Tue, 18 Jun 2024 17:41:43 -0700 Subject: [PATCH 1/3] [TrafficController] correctly categorize spam --- crates/sui-core/src/authority_server.rs | 143 ++++++++++++------ crates/sui-core/src/traffic_controller/mod.rs | 3 +- .../src/traffic_controller/policies.rs | 6 + crates/sui-json-rpc/src/axum_router.rs | 14 +- 4 files changed, 116 insertions(+), 50 deletions(-) diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index 23adc633564ee..80e9a84e6db91 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -346,7 +346,9 @@ impl ValidatorService { async fn handle_transaction( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { + let tally_spam = false; + let Self { state, consensus_adapter, @@ -414,9 +416,14 @@ impl ValidatorService { // to save more CPU. return Err(error.into()); } - Ok(tonic::Response::new(info)) + Ok((tonic::Response::new(info), tally_spam)) } + // In addition to the response from handling the certificates, + // returns a bool indicating whether the request should be tallied + // toward spam count. In general, this should be set to true for + // requests that are read-only and thus do not consume gas, such + // as when the transaction is already executed. async fn handle_certificates( &self, certificates: NonEmpty, @@ -426,7 +433,7 @@ impl ValidatorService { _include_auxiliary_data: bool, epoch_store: &Arc, wait_for_effects: bool, - ) -> Result>, tonic::Status> { + ) -> Result<(Option>, bool), tonic::Status> { // Validate if cert can be executed // Fullnode does not serve handle_certificate call. fp_ensure!( @@ -472,13 +479,16 @@ impl ValidatorService { None }; - return Ok(Some(vec![HandleCertificateResponseV3 { - effects: signed_effects.into_inner(), - events, - input_objects: None, - output_objects: None, - auxiliary_data: None, - }])); + return Ok(( + Some(vec![HandleCertificateResponseV3 { + effects: signed_effects.into_inner(), + events, + input_objects: None, + output_objects: None, + auxiliary_data: None, + }]), + true, + )); }; } @@ -562,7 +572,7 @@ impl ValidatorService { epoch_store, ); } - return Ok(None); + return Ok((None, false)); } // 4) Execute the certificates immediately if they contain only owned object transactions, @@ -602,22 +612,24 @@ impl ValidatorService { )) .await?; - Ok(Some(responses)) + Ok((Some(responses), false)) } } +type WrappedServiceResponse = Result<(tonic::Response, bool), tonic::Status>; + impl ValidatorService { async fn transaction_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { self.handle_transaction(request).await } async fn submit_certificate_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let epoch_store = self.state.load_epoch_store_one_call_per_task(); let certificate = request.into_inner(); Self::transaction_validity_check(&epoch_store, certificate.data())?; @@ -634,17 +646,20 @@ impl ValidatorService { ) .instrument(span) .await - .map(|executed| { - tonic::Response::new(SubmitCertificateResponse { - executed: executed.map(|mut x| x.remove(0)).map(Into::into), - }) + .map(|(executed, tally_spam)| { + ( + tonic::Response::new(SubmitCertificateResponse { + executed: executed.map(|mut x| x.remove(0)).map(Into::into), + }), + tally_spam, + ) }) } async fn handle_certificate_v2_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let epoch_store = self.state.load_epoch_store_one_call_per_task(); let certificate = request.into_inner(); Self::transaction_validity_check(&epoch_store, certificate.data())?; @@ -661,11 +676,16 @@ impl ValidatorService { ) .instrument(span) .await - .map(|v| { - tonic::Response::new( - v.expect("handle_certificate should not return none with wait_for_effects=true") + .map(|(resp, tally_spam)| { + ( + tonic::Response::new( + resp.expect( + "handle_certificate should not return none with wait_for_effects=true", + ) .remove(0) .into(), + ), + tally_spam, ) }) } @@ -673,7 +693,7 @@ impl ValidatorService { async fn handle_certificate_v3_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let epoch_store = self.state.load_epoch_store_one_call_per_task(); let request = request.into_inner(); Self::transaction_validity_check(&epoch_store, request.certificate.data())?; @@ -690,10 +710,15 @@ impl ValidatorService { ) .instrument(span) .await - .map(|v| { - tonic::Response::new( - v.expect("handle_certificate should not return none with wait_for_effects=true") + .map(|(resp, tally_spam)| { + ( + tonic::Response::new( + resp.expect( + "handle_certificate should not return none with wait_for_effects=true", + ) .remove(0), + ), + tally_spam, ) }) } @@ -785,7 +810,7 @@ impl ValidatorService { async fn handle_soft_bundle_certificates_v3_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let epoch_store = self.state.load_epoch_store_one_call_per_task(); let request = request.into_inner(); @@ -815,10 +840,13 @@ impl ValidatorService { ) .instrument(span) .await - .map(|v| { - tonic::Response::new(HandleSoftBundleCertificatesResponseV3 { - responses: v.unwrap_or_default(), - }) + .map(|(resp, tally_spam)| { + ( + tonic::Response::new(HandleSoftBundleCertificatesResponseV3 { + responses: resp.unwrap_or_default(), + }), + tally_spam, + ) }) } @@ -847,49 +875,48 @@ impl ValidatorService { async fn object_info_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_object_info_request(request).await?; - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), true)) } async fn transaction_info_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_transaction_info_request(request).await?; - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), true)) } async fn checkpoint_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_checkpoint_request(&request)?; - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), true)) } async fn checkpoint_v2_impl( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_checkpoint_request_v2(&request)?; - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), true)) } async fn get_system_state_object_impl( &self, _request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> WrappedServiceResponse { let response = self .state .get_object_cache_reader() .get_sui_system_state_object_unsafe()?; - - Ok(tonic::Response::new(response)) + Ok((tonic::Response::new(response), true)) } async fn handle_traffic_req(&self, client: Option) -> Result<(), tonic::Status> { @@ -909,6 +936,7 @@ impl ValidatorService { &self, client: Option, response: &Result, tonic::Status>, + tally_spam: bool, ) { let error: Option = if let Err(status) = response { Some(SuiError::from(status.clone())) @@ -921,6 +949,7 @@ impl ValidatorService { direct: client, through_fullnode: None, error_weight: error.map(normalize).unwrap_or(Weight::zero()), + tally_spam, timestamp: SystemTime::now(), }) } @@ -960,7 +989,14 @@ fn normalize(err: SuiError) -> Weight { macro_rules! handle_with_decoration { ($self:ident, $func_name:ident, $request:ident) => {{ if $self.client_id_source.is_none() { - return $self.$func_name($request).await; + return match $self.$func_name($request).await { + Ok((result, _)) => { + Ok(result) + } + Err(err) => { + Err(err) + } + } } let client = match $self.client_id_source.as_ref().unwrap() { @@ -1024,11 +1060,22 @@ macro_rules! handle_with_decoration { // check if either IP is blocked, in which case return early $self.handle_traffic_req(client.clone()).await?; - // handle request - let response = $self.$func_name($request).await; - // handle response tallying - $self.handle_traffic_resp(client, &response); - response + + // handle response tallying. + match $self.$func_name($request).await { + Ok((result, tally_spam)) => { + let response = Ok(result); + $self.handle_traffic_resp(client, &response, tally_spam); + response + } + Err(err) => { + // TODO: should we set tally_spam to true here for error + // case? + let response = Err(err); + $self.handle_traffic_resp(client, &response, false); + response + } + } }}; } diff --git a/crates/sui-core/src/traffic_controller/mod.rs b/crates/sui-core/src/traffic_controller/mod.rs index 0f7b38270faf2..dd1508fd073ed 100644 --- a/crates/sui-core/src/traffic_controller/mod.rs +++ b/crates/sui-core/src/traffic_controller/mod.rs @@ -364,7 +364,7 @@ async fn handle_spam_tally( metrics: Arc, mem_drainfile_present: bool, ) -> Result<(), reqwest::Error> { - if !policy_config.spam_sample_rate.is_sampled().await { + if tally.tally_spam && !policy_config.spam_sample_rate.is_sampled().await { return Ok(()); } let resp = policy.handle_tally(tally.clone()); @@ -651,6 +651,7 @@ impl TrafficSim { None, // TODO add weight adjustment Weight::one(), + true, )); } else { if !currently_blocked { diff --git a/crates/sui-core/src/traffic_controller/policies.rs b/crates/sui-core/src/traffic_controller/policies.rs index 64ebfe8cbb59b..0461b3312fda4 100644 --- a/crates/sui-core/src/traffic_controller/policies.rs +++ b/crates/sui-core/src/traffic_controller/policies.rs @@ -137,6 +137,7 @@ pub struct TrafficTally { pub direct: Option, pub through_fullnode: Option, pub error_weight: Weight, + pub tally_spam: bool, pub timestamp: SystemTime, } @@ -145,11 +146,13 @@ impl TrafficTally { direct: Option, through_fullnode: Option, error_weight: Weight, + tally_spam: bool, ) -> Self { Self { direct, through_fullnode, error_weight, + tally_spam, timestamp: SystemTime::now(), } } @@ -418,18 +421,21 @@ mod tests { direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))), through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), error_weight: Weight::zero(), + tally_spam: true, timestamp: SystemTime::now(), }; let bob = TrafficTally { direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))), through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(4, 3, 2, 1))), error_weight: Weight::zero(), + tally_spam: true, timestamp: SystemTime::now(), }; let charlie = TrafficTally { direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))), through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8))), error_weight: Weight::zero(), + tally_spam: true, timestamp: SystemTime::now(), }; diff --git a/crates/sui-json-rpc/src/axum_router.rs b/crates/sui-json-rpc/src/axum_router.rs index 5700f708f37e9..e199ffcf3ae26 100644 --- a/crates/sui-json-rpc/src/axum_router.rs +++ b/crates/sui-json-rpc/src/axum_router.rs @@ -165,11 +165,12 @@ async fn process_raw_request( return blocked_response; } } + let tally_spam = tally_spam_for_method(&request.method); let response = process_request(request, api_version, service.call_data()).await; // handle response tallying if let Some(traffic_controller) = &service.traffic_controller { - handle_traffic_resp(traffic_controller.clone(), client, &response); + handle_traffic_resp(traffic_controller.clone(), client, &response, tally_spam); } response } else if let Ok(_batch) = serde_json::from_str::>(raw_request) { @@ -197,16 +198,27 @@ async fn handle_traffic_req( } } +fn tally_spam_for_method(method: &str) -> bool { + // unless request requires gas payment, count against + // spam tally + match method { + "sui_executeTransactionBlock" | "sui_devInspectTransactionBlock" => false, + _ => true, + } +} + fn handle_traffic_resp( traffic_controller: Arc, client: Option, response: &MethodResponse, + tally_spam: bool, ) { let error = response.error_code.map(ErrorCode::from); traffic_controller.tally(TrafficTally { direct: client, through_fullnode: None, error_weight: error.map(normalize).unwrap_or(Weight::zero()), + tally_spam, timestamp: SystemTime::now(), }); } From 0a75fd974b028a575dacc13fcfb81d12840cc3d1 Mon Sep 17 00:00:00 2001 From: William Smith Date: Thu, 20 Jun 2024 16:06:59 -0700 Subject: [PATCH 2/3] introduce spam_weight --- crates/sui-core/src/authority_server.rs | 77 +++---- crates/sui-core/src/traffic_controller/mod.rs | 14 +- .../src/traffic_controller/policies.rs | 12 +- .../tests/traffic_control_tests.rs | 217 ++++++++++++++++-- crates/sui-json-rpc/src/axum_router.rs | 19 +- 5 files changed, 259 insertions(+), 80 deletions(-) diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index 80e9a84e6db91..a034fcbc0b31a 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -347,8 +347,6 @@ impl ValidatorService { &self, request: tonic::Request, ) -> WrappedServiceResponse { - let tally_spam = false; - let Self { state, consensus_adapter, @@ -416,7 +414,7 @@ impl ValidatorService { // to save more CPU. return Err(error.into()); } - Ok((tonic::Response::new(info), tally_spam)) + Ok((tonic::Response::new(info), Weight::zero())) } // In addition to the response from handling the certificates, @@ -433,7 +431,7 @@ impl ValidatorService { _include_auxiliary_data: bool, epoch_store: &Arc, wait_for_effects: bool, - ) -> Result<(Option>, bool), tonic::Status> { + ) -> Result<(Option>, Weight), tonic::Status> { // Validate if cert can be executed // Fullnode does not serve handle_certificate call. fp_ensure!( @@ -487,7 +485,7 @@ impl ValidatorService { output_objects: None, auxiliary_data: None, }]), - true, + Weight::one(), )); }; } @@ -572,7 +570,7 @@ impl ValidatorService { epoch_store, ); } - return Ok((None, false)); + return Ok((None, Weight::zero())); } // 4) Execute the certificates immediately if they contain only owned object transactions, @@ -612,11 +610,11 @@ impl ValidatorService { )) .await?; - Ok((Some(responses), false)) + Ok((Some(responses), Weight::zero())) } } -type WrappedServiceResponse = Result<(tonic::Response, bool), tonic::Status>; +type WrappedServiceResponse = Result<(tonic::Response, Weight), tonic::Status>; impl ValidatorService { async fn transaction_impl( @@ -646,12 +644,12 @@ impl ValidatorService { ) .instrument(span) .await - .map(|(executed, tally_spam)| { + .map(|(executed, spam_weight)| { ( tonic::Response::new(SubmitCertificateResponse { executed: executed.map(|mut x| x.remove(0)).map(Into::into), }), - tally_spam, + spam_weight, ) }) } @@ -676,7 +674,7 @@ impl ValidatorService { ) .instrument(span) .await - .map(|(resp, tally_spam)| { + .map(|(resp, spam_weight)| { ( tonic::Response::new( resp.expect( @@ -685,7 +683,7 @@ impl ValidatorService { .remove(0) .into(), ), - tally_spam, + spam_weight, ) }) } @@ -710,7 +708,7 @@ impl ValidatorService { ) .instrument(span) .await - .map(|(resp, tally_spam)| { + .map(|(resp, spam_weight)| { ( tonic::Response::new( resp.expect( @@ -718,7 +716,7 @@ impl ValidatorService { ) .remove(0), ), - tally_spam, + spam_weight, ) }) } @@ -840,12 +838,12 @@ impl ValidatorService { ) .instrument(span) .await - .map(|(resp, tally_spam)| { + .map(|(resp, spam_weight)| { ( tonic::Response::new(HandleSoftBundleCertificatesResponseV3 { responses: resp.unwrap_or_default(), }), - tally_spam, + spam_weight, ) }) } @@ -878,7 +876,7 @@ impl ValidatorService { ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_object_info_request(request).await?; - Ok((tonic::Response::new(response), true)) + Ok((tonic::Response::new(response), Weight::one())) } async fn transaction_info_impl( @@ -887,7 +885,7 @@ impl ValidatorService { ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_transaction_info_request(request).await?; - Ok((tonic::Response::new(response), true)) + Ok((tonic::Response::new(response), Weight::one())) } async fn checkpoint_impl( @@ -896,7 +894,7 @@ impl ValidatorService { ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_checkpoint_request(&request)?; - Ok((tonic::Response::new(response), true)) + Ok((tonic::Response::new(response), Weight::one())) } async fn checkpoint_v2_impl( @@ -905,7 +903,7 @@ impl ValidatorService { ) -> WrappedServiceResponse { let request = request.into_inner(); let response = self.state.handle_checkpoint_request_v2(&request)?; - Ok((tonic::Response::new(response), true)) + Ok((tonic::Response::new(response), Weight::one())) } async fn get_system_state_object_impl( @@ -916,7 +914,7 @@ impl ValidatorService { .state .get_object_cache_reader() .get_sui_system_state_object_unsafe()?; - Ok((tonic::Response::new(response), true)) + Ok((tonic::Response::new(response), Weight::one())) } async fn handle_traffic_req(&self, client: Option) -> Result<(), tonic::Status> { @@ -935,13 +933,15 @@ impl ValidatorService { fn handle_traffic_resp( &self, client: Option, - response: &Result, tonic::Status>, - tally_spam: bool, - ) { - let error: Option = if let Err(status) = response { - Some(SuiError::from(status.clone())) - } else { - None + wrapped_response: WrappedServiceResponse, + ) -> Result, tonic::Status> { + let (error, spam_weight, unwrapped_response) = match wrapped_response { + Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)), + Err(status) => ( + Some(SuiError::from(status.clone())), + Weight::zero(), + Err(status.clone()), + ), }; if let Some(traffic_controller) = self.traffic_controller.clone() { @@ -949,10 +949,11 @@ impl ValidatorService { direct: client, through_fullnode: None, error_weight: error.map(normalize).unwrap_or(Weight::zero()), - tally_spam, + spam_weight, timestamp: SystemTime::now(), }) } + unwrapped_response } } @@ -1061,21 +1062,9 @@ macro_rules! handle_with_decoration { // check if either IP is blocked, in which case return early $self.handle_traffic_req(client.clone()).await?; - // handle response tallying. - match $self.$func_name($request).await { - Ok((result, tally_spam)) => { - let response = Ok(result); - $self.handle_traffic_resp(client, &response, tally_spam); - response - } - Err(err) => { - // TODO: should we set tally_spam to true here for error - // case? - let response = Err(err); - $self.handle_traffic_resp(client, &response, false); - response - } - } + // handle traffic tallying + let wrapped_response = $self.$func_name($request).await; + $self.handle_traffic_resp(client, wrapped_response) }}; } diff --git a/crates/sui-core/src/traffic_controller/mod.rs b/crates/sui-core/src/traffic_controller/mod.rs index dd1508fd073ed..07a3e96eb5d11 100644 --- a/crates/sui-core/src/traffic_controller/mod.rs +++ b/crates/sui-core/src/traffic_controller/mod.rs @@ -364,7 +364,15 @@ async fn handle_spam_tally( metrics: Arc, mem_drainfile_present: bool, ) -> Result<(), reqwest::Error> { - if tally.tally_spam && !policy_config.spam_sample_rate.is_sampled().await { + let sampled = futures::future::join_all(vec![ + tally.spam_weight.is_sampled(), + policy_config.spam_sample_rate.is_sampled(), + ]) + .await + .into_iter() + .all(|sampled| sampled); + + if !sampled { return Ok(()); } let resp = policy.handle_tally(tally.clone()); @@ -649,9 +657,9 @@ impl TrafficSim { client, // TODO add proxy IP for testing None, - // TODO add weight adjustment + // TODO add weight adjustments + Weight::one(), Weight::one(), - true, )); } else { if !currently_blocked { diff --git a/crates/sui-core/src/traffic_controller/policies.rs b/crates/sui-core/src/traffic_controller/policies.rs index 0461b3312fda4..806206dc68028 100644 --- a/crates/sui-core/src/traffic_controller/policies.rs +++ b/crates/sui-core/src/traffic_controller/policies.rs @@ -137,7 +137,7 @@ pub struct TrafficTally { pub direct: Option, pub through_fullnode: Option, pub error_weight: Weight, - pub tally_spam: bool, + pub spam_weight: Weight, pub timestamp: SystemTime, } @@ -146,13 +146,13 @@ impl TrafficTally { direct: Option, through_fullnode: Option, error_weight: Weight, - tally_spam: bool, + spam_weight: Weight, ) -> Self { Self { direct, through_fullnode, error_weight, - tally_spam, + spam_weight, timestamp: SystemTime::now(), } } @@ -421,21 +421,21 @@ mod tests { direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))), through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), error_weight: Weight::zero(), - tally_spam: true, + spam_weight: Weight::one(), timestamp: SystemTime::now(), }; let bob = TrafficTally { direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))), through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(4, 3, 2, 1))), error_weight: Weight::zero(), - tally_spam: true, + spam_weight: Weight::one(), timestamp: SystemTime::now(), }; let charlie = TrafficTally { direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))), through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8))), error_weight: Weight::zero(), - tally_spam: true, + spam_weight: Weight::one(), timestamp: SystemTime::now(), }; diff --git a/crates/sui-e2e-tests/tests/traffic_control_tests.rs b/crates/sui-e2e-tests/tests/traffic_control_tests.rs index df12c95bee009..c46997a68c8f9 100644 --- a/crates/sui-e2e-tests/tests/traffic_control_tests.rs +++ b/crates/sui-e2e-tests/tests/traffic_control_tests.rs @@ -135,16 +135,16 @@ async fn test_validator_traffic_control_dry_run() -> Result<(), anyhow::Error> { .build() .await; - assert_traffic_control_dry_run(test_cluster, n as usize).await + assert_validator_traffic_control_dry_run(test_cluster, n as usize).await } #[tokio::test] async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> { - let n = 15; + let txn_count = 15; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 1, proxy_blocklist_ttl_sec: 5, - spam_policy_type: PolicyType::TestNConnIP(n - 1), + spam_policy_type: PolicyType::TestNConnIP(txn_count - 1), spam_sample_rate: Weight::one(), // This should never be invoked when set as an error policy // as we are not sending requests that error @@ -156,7 +156,49 @@ async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> { .with_fullnode_policy_config(Some(policy_config)) .build() .await; - assert_traffic_control_dry_run(test_cluster, n as usize).await + + let context = &mut test_cluster.wallet; + let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; + let mut txns = batch_make_transfer_transactions(context, txn_count).await; + assert!( + txns.len() >= txn_count, + "Expect at least {} txns. Do we generate enough gas objects during genesis?", + txn_count, + ); + + let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); + let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); + let params = rpc_params![ + tx_bytes, + signatures, + SuiTransactionBlockResponseOptions::new(), + ExecuteTransactionRequestType::WaitForLocalExecution + ]; + + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + + // it should take no more than 4 requests to be added to the blocklist + for _ in 0..txn_count { + let response: RpcResult = jsonrpc_client + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) + .await; + assert!( + response.is_ok(), + "Expected request to succeed in dry-run mode" + ); + } + Ok(()) } #[tokio::test] @@ -177,16 +219,16 @@ async fn test_validator_traffic_control_spam_blocked() -> Result<(), anyhow::Err .set_network_config(network_config) .build() .await; - assert_traffic_control_spam_blocked(test_cluster, n as usize).await + assert_validator_traffic_control_spam_blocked(test_cluster, n as usize).await } #[tokio::test] async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Error> { - let n = 15; + let txn_count = 15; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 3, // Test that any N requests will cause an IP to be added to the blocklist. - spam_policy_type: PolicyType::TestNConnIP(n - 1), + spam_policy_type: PolicyType::TestNConnIP(txn_count - 1), spam_sample_rate: Weight::one(), dry_run: false, ..Default::default() @@ -195,7 +237,57 @@ async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Erro .with_fullnode_policy_config(Some(policy_config)) .build() .await; - assert_traffic_control_spam_blocked(test_cluster, n as usize).await + + let context = &mut test_cluster.wallet; + let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; + + let mut txns = batch_make_transfer_transactions(context, txn_count).await; + assert!( + txns.len() >= txn_count, + "Expect at least {} txns. Do we generate enough gas objects during genesis?", + txn_count, + ); + + let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); + let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); + let params = rpc_params![ + tx_bytes, + signatures, + SuiTransactionBlockResponseOptions::new(), + ExecuteTransactionRequestType::WaitForLocalExecution + ]; + + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + + // it should take no more than 4 requests to be added to the blocklist + for _ in 0..txn_count { + let response: RpcResult = jsonrpc_client + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) + .await; + if let Err(err) = response { + // TODO: fix validator blocking error handling such that the error message + // is not misleading. The full error message currently is the following: + // Transaction execution failed due to issues with transaction inputs, please + // review the errors and try again: Too many requests. + assert!( + err.to_string().contains("Too many requests"), + "Error not due to spam policy" + ); + return Ok(()); + } + } + panic!("Expected spam policy to trigger within {txn_count} requests"); } #[tokio::test] @@ -228,18 +320,18 @@ async fn test_validator_traffic_control_spam_delegated() -> Result<(), anyhow::E .set_network_config(network_config) .build() .await; - assert_traffic_control_spam_delegated(test_cluster, n as usize, port).await + assert_validator_traffic_control_spam_delegated(test_cluster, n as usize, port).await } #[tokio::test] async fn test_fullnode_traffic_control_spam_delegated() -> Result<(), anyhow::Error> { - let n = 10; + let txn_count = 10; let port = 65001; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 120, proxy_blocklist_ttl_sec: 120, // Test that any N - 1 requests will cause an IP to be added to the blocklist. - spam_policy_type: PolicyType::TestNConnIP(n - 1), + spam_policy_type: PolicyType::TestNConnIP(txn_count - 1), spam_sample_rate: Weight::one(), dry_run: false, ..Default::default() @@ -258,7 +350,57 @@ async fn test_fullnode_traffic_control_spam_delegated() -> Result<(), anyhow::Er .with_fullnode_fw_config(Some(firewall_config.clone())) .build() .await; - assert_traffic_control_spam_delegated(test_cluster, n as usize, port).await + + // start test firewall server + let mut server = NodeFwTestServer::new(); + server.start(listen_port).await; + // await for the server to start + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + let context = &mut test_cluster.wallet; + let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; + let mut txns = batch_make_transfer_transactions(context, txn_count).await; + assert!( + txns.len() >= txn_count, + "Expect at least {} txns. Do we generate enough gas objects during genesis?", + txn_count, + ); + + let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); + let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); + let params = rpc_params![ + tx_bytes, + signatures, + SuiTransactionBlockResponseOptions::new(), + ExecuteTransactionRequestType::WaitForLocalExecution + ]; + + // it should take no more than 4 requests to be added to the blocklist + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + + for _ in 0..txn_count { + let response: RpcResult = jsonrpc_client + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) + .await; + assert!(response.is_ok(), "Expected request to succeed"); + } + let fw_blocklist = server.list_addresses_rpc().await; + assert!( + !fw_blocklist.is_empty(), + "Expected blocklist to be non-empty" + ); + server.stop().await; + Ok(()) } #[tokio::test] @@ -511,7 +653,7 @@ async fn assert_traffic_control_ok(mut test_cluster: TestCluster) -> Result<(), /// Test that in dry-run mode, actions that would otherwise /// lead to request blocking (in this case, a spammy client) /// are allowed to proceed. -async fn assert_traffic_control_dry_run( +async fn assert_validator_traffic_control_dry_run( mut test_cluster: TestCluster, txn_count: usize, ) -> Result<(), anyhow::Error> { @@ -525,6 +667,7 @@ async fn assert_traffic_control_dry_run( ); let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); let params = rpc_params![ tx_bytes, @@ -533,10 +676,22 @@ async fn assert_traffic_control_dry_run( ExecuteTransactionRequestType::WaitForLocalExecution ]; + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + // it should take no more than 4 requests to be added to the blocklist for _ in 0..txn_count { let response: RpcResult = jsonrpc_client - .request("sui_executeTransactionBlock", params.clone()) + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) .await; assert!( response.is_ok(), @@ -546,7 +701,7 @@ async fn assert_traffic_control_dry_run( Ok(()) } -async fn assert_traffic_control_spam_blocked( +async fn assert_validator_traffic_control_spam_blocked( mut test_cluster: TestCluster, txn_count: usize, ) -> Result<(), anyhow::Error> { @@ -561,6 +716,7 @@ async fn assert_traffic_control_spam_blocked( ); let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); let params = rpc_params![ tx_bytes, @@ -569,10 +725,22 @@ async fn assert_traffic_control_spam_blocked( ExecuteTransactionRequestType::WaitForLocalExecution ]; + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + // it should take no more than 4 requests to be added to the blocklist for _ in 0..txn_count { let response: RpcResult = jsonrpc_client - .request("sui_executeTransactionBlock", params.clone()) + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) .await; if let Err(err) = response { // TODO: fix validator blocking error handling such that the error message @@ -589,7 +757,7 @@ async fn assert_traffic_control_spam_blocked( panic!("Expected spam policy to trigger within {txn_count} requests"); } -async fn assert_traffic_control_spam_delegated( +async fn assert_validator_traffic_control_spam_delegated( mut test_cluster: TestCluster, txn_count: usize, listen_port: u16, @@ -609,6 +777,7 @@ async fn assert_traffic_control_spam_delegated( ); let txn = txns.swap_remove(0); + let tx_digest = txn.digest(); let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); let params = rpc_params![ tx_bytes, @@ -618,9 +787,21 @@ async fn assert_traffic_control_spam_delegated( ]; // it should take no more than 4 requests to be added to the blocklist + let response: SuiTransactionBlockResponse = jsonrpc_client + .request("sui_executeTransactionBlock", params.clone()) + .await + .unwrap(); + let SuiTransactionBlockResponse { + digest, + confirmed_local_execution, + .. + } = response; + assert_eq!(&digest, tx_digest); + assert!(confirmed_local_execution.unwrap()); + for _ in 0..txn_count { let response: RpcResult = jsonrpc_client - .request("sui_executeTransactionBlock", params.clone()) + .request("sui_getTransactionBlock", rpc_params![*tx_digest]) .await; assert!(response.is_ok(), "Expected request to succeed"); } diff --git a/crates/sui-json-rpc/src/axum_router.rs b/crates/sui-json-rpc/src/axum_router.rs index e199ffcf3ae26..c724ae1f4c8d0 100644 --- a/crates/sui-json-rpc/src/axum_router.rs +++ b/crates/sui-json-rpc/src/axum_router.rs @@ -165,13 +165,14 @@ async fn process_raw_request( return blocked_response; } } - let tally_spam = tally_spam_for_method(&request.method); - let response = process_request(request, api_version, service.call_data()).await; // handle response tallying + let method = request.method.to_string(); + let response = process_request(request, api_version, service.call_data()).await; if let Some(traffic_controller) = &service.traffic_controller { - handle_traffic_resp(traffic_controller.clone(), client, &response, tally_spam); + handle_traffic_resp(traffic_controller.clone(), client, &response, method); } + response } else if let Ok(_batch) = serde_json::from_str::>(raw_request) { MethodResponse::error( @@ -198,12 +199,12 @@ async fn handle_traffic_req( } } -fn tally_spam_for_method(method: &str) -> bool { +fn spam_weight_for_method(method: String) -> Weight { // unless request requires gas payment, count against // spam tally - match method { - "sui_executeTransactionBlock" | "sui_devInspectTransactionBlock" => false, - _ => true, + match method.as_str() { + "sui_executeTransactionBlock" | "sui_devInspectTransactionBlock" => Weight::zero(), + _ => Weight::one(), } } @@ -211,14 +212,14 @@ fn handle_traffic_resp( traffic_controller: Arc, client: Option, response: &MethodResponse, - tally_spam: bool, + method: String, ) { let error = response.error_code.map(ErrorCode::from); traffic_controller.tally(TrafficTally { direct: client, through_fullnode: None, error_weight: error.map(normalize).unwrap_or(Weight::zero()), - tally_spam, + spam_weight: spam_weight_for_method(method), timestamp: SystemTime::now(), }); } From b4a59bfe457f105903117939de15f803fdebbf16 Mon Sep 17 00:00:00 2001 From: William Smith Date: Fri, 21 Jun 2024 16:47:05 -0700 Subject: [PATCH 3/3] count everything as spam on rpc nodes --- crates/sui-core/src/authority_server.rs | 11 +- crates/sui-core/src/traffic_controller/mod.rs | 12 +- .../tests/traffic_control_tests.rs | 217 +++++++----------- crates/sui-json-rpc/src/axum_router.rs | 22 +- crates/sui-types/src/traffic_control.rs | 7 +- crates/test-cluster/src/lib.rs | 5 + 6 files changed, 107 insertions(+), 167 deletions(-) diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index a034fcbc0b31a..703bdbf33358b 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -971,7 +971,7 @@ fn make_tonic_request_for_testing(message: T) -> tonic::Request { // TODO: refine error matching here fn normalize(err: SuiError) -> Weight { - match err { + match dbg!(err) { SuiError::UserInputError { .. } | SuiError::InvalidSignature { .. } | SuiError::SignerSignatureAbsent { .. } @@ -990,14 +990,7 @@ fn normalize(err: SuiError) -> Weight { macro_rules! handle_with_decoration { ($self:ident, $func_name:ident, $request:ident) => {{ if $self.client_id_source.is_none() { - return match $self.$func_name($request).await { - Ok((result, _)) => { - Ok(result) - } - Err(err) => { - Err(err) - } - } + return $self.$func_name($request).await.map(|(result, _)| result); } let client = match $self.client_id_source.as_ref().unwrap() { diff --git a/crates/sui-core/src/traffic_controller/mod.rs b/crates/sui-core/src/traffic_controller/mod.rs index 07a3e96eb5d11..a133a90adaff0 100644 --- a/crates/sui-core/src/traffic_controller/mod.rs +++ b/crates/sui-core/src/traffic_controller/mod.rs @@ -330,7 +330,7 @@ async fn handle_error_tally( metrics: Arc, mem_drainfile_present: bool, ) -> Result<(), reqwest::Error> { - if !tally.error_weight.is_sampled().await { + if !tally.error_weight.is_sampled() { return Ok(()); } let resp = policy.handle_tally(tally.clone()); @@ -364,15 +364,7 @@ async fn handle_spam_tally( metrics: Arc, mem_drainfile_present: bool, ) -> Result<(), reqwest::Error> { - let sampled = futures::future::join_all(vec![ - tally.spam_weight.is_sampled(), - policy_config.spam_sample_rate.is_sampled(), - ]) - .await - .into_iter() - .all(|sampled| sampled); - - if !sampled { + if !(tally.spam_weight.is_sampled() && policy_config.spam_sample_rate.is_sampled()) { return Ok(()); } let resp = policy.handle_tally(tally.clone()); diff --git a/crates/sui-e2e-tests/tests/traffic_control_tests.rs b/crates/sui-e2e-tests/tests/traffic_control_tests.rs index c46997a68c8f9..9fd583c017c95 100644 --- a/crates/sui-e2e-tests/tests/traffic_control_tests.rs +++ b/crates/sui-e2e-tests/tests/traffic_control_tests.rs @@ -4,12 +4,15 @@ //! NB: Most tests in this module expect real network connections and interactions, thus //! they should nearly all be tokio::test rather than simtest. +use core::panic; use jsonrpsee::{ core::{client::ClientT, RpcResult}, rpc_params, }; use std::fs::File; use std::time::Duration; +use sui_core::authority_client::make_network_authority_clients_with_network_config; +use sui_core::authority_client::AuthorityAPI; use sui_core::traffic_controller::{ nodefw_test_server::NodeFwTestServer, TrafficController, TrafficSim, }; @@ -17,6 +20,7 @@ use sui_json_rpc_types::{ SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions, }; use sui_macros::sim_test; +use sui_network::default_mysten_network_config; use sui_swarm_config::network_config_builder::ConfigBuilder; use sui_test_transaction_builder::batch_make_transfer_transactions; use sui_types::{ @@ -157,11 +161,11 @@ async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> { .build() .await; - let context = &mut test_cluster.wallet; + let context = test_cluster.wallet; let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; - let mut txns = batch_make_transfer_transactions(context, txn_count).await; + let mut txns = batch_make_transfer_transactions(&context, txn_count as usize).await; assert!( - txns.len() >= txn_count, + txns.len() >= txn_count as usize, "Expect at least {} txns. Do we generate enough gas objects during genesis?", txn_count, ); @@ -202,24 +206,47 @@ async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> { } #[tokio::test] -async fn test_validator_traffic_control_spam_blocked() -> Result<(), anyhow::Error> { +async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Error> { let n = 5; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 1, // Test that any N requests will cause an IP to be added to the blocklist. - spam_policy_type: PolicyType::TestNConnIP(n - 1), - spam_sample_rate: Weight::one(), + error_policy_type: PolicyType::TestNConnIP(n - 1), dry_run: false, ..Default::default() }; let network_config = ConfigBuilder::new_with_temp_dir() .with_policy_config(Some(policy_config)) .build(); - let test_cluster = TestClusterBuilder::new() + let committee = network_config.committee_with_network(); + let _test_cluster = TestClusterBuilder::new() .set_network_config(network_config) .build() .await; - assert_validator_traffic_control_spam_blocked(test_cluster, n as usize).await + let local_clients = make_network_authority_clients_with_network_config( + &committee, + &default_mysten_network_config(), + ) + .unwrap(); + let (_, auth_client) = local_clients.first_key_value().unwrap(); + + // transaction signed using user wallet from a different chain/genesis, + // therefore we should fail with UserInputError + let other_cluster = TestClusterBuilder::new().build().await; + + let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await; + let tx = txns.swap_remove(0); + + // it should take no more than 4 requests to be added to the blocklist + for _ in 0..n { + let response = auth_client.handle_transaction(tx.clone(), None).await; + if let Err(err) = response { + if err.to_string().contains("Too many requests") { + return Ok(()); + } + } + } + panic!("Expected spam policy to trigger within {n} requests"); } #[tokio::test] @@ -238,12 +265,12 @@ async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Erro .build() .await; - let context = &mut test_cluster.wallet; + let context = test_cluster.wallet; let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; - let mut txns = batch_make_transfer_transactions(context, txn_count).await; + let mut txns = batch_make_transfer_transactions(&context, txn_count as usize).await; assert!( - txns.len() >= txn_count, + txns.len() >= txn_count as usize, "Expect at least {} txns. Do we generate enough gas objects during genesis?", txn_count, ); @@ -291,15 +318,14 @@ async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Erro } #[tokio::test] -async fn test_validator_traffic_control_spam_delegated() -> Result<(), anyhow::Error> { - let n = 4; +async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::Error> { + let n = 5; let port = 65000; let policy_config = PolicyConfig { connection_blocklist_ttl_sec: 120, proxy_blocklist_ttl_sec: 120, // Test that any N - 1 requests will cause an IP to be added to the blocklist. - spam_policy_type: PolicyType::TestNConnIP(n - 1), - spam_sample_rate: Weight::one(), + error_policy_type: PolicyType::TestNConnIP(n - 1), dry_run: false, ..Default::default() }; @@ -316,11 +342,47 @@ async fn test_validator_traffic_control_spam_delegated() -> Result<(), anyhow::E .with_policy_config(Some(policy_config)) .with_firewall_config(Some(firewall_config)) .build(); - let test_cluster = TestClusterBuilder::new() + let committee = network_config.committee_with_network(); + let _test_cluster = TestClusterBuilder::new() .set_network_config(network_config) .build() .await; - assert_validator_traffic_control_spam_delegated(test_cluster, n as usize, port).await + let local_clients = make_network_authority_clients_with_network_config( + &committee, + &default_mysten_network_config(), + ) + .unwrap(); + let (_, auth_client) = local_clients.first_key_value().unwrap(); + + // transaction signed using user wallet from a different chain/genesis, + // therefore we should fail with UserInputError + let other_cluster = TestClusterBuilder::new().build().await; + + let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await; + let tx = txns.swap_remove(0); + + // start test firewall server + let mut server = NodeFwTestServer::new(); + server.start(port).await; + // await for the server to start + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + // it should take no more than 4 requests to be added to the blocklist + for _ in 0..n { + let response = auth_client.handle_transaction(tx.clone(), None).await; + if let Err(err) = response { + if err.to_string().contains("Too many requests") { + return Ok(()); + } + } + } + let fw_blocklist = server.list_addresses_rpc().await; + assert!( + !fw_blocklist.is_empty(), + "Expected blocklist to be non-empty" + ); + server.stop().await; + Ok(()) } #[tokio::test] @@ -353,14 +415,14 @@ async fn test_fullnode_traffic_control_spam_delegated() -> Result<(), anyhow::Er // start test firewall server let mut server = NodeFwTestServer::new(); - server.start(listen_port).await; + server.start(port).await; // await for the server to start tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - let context = &mut test_cluster.wallet; + let context = test_cluster.wallet; let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; - let mut txns = batch_make_transfer_transactions(context, txn_count).await; + let mut txns = batch_make_transfer_transactions(&context, txn_count as usize).await; assert!( - txns.len() >= txn_count, + txns.len() >= txn_count as usize, "Expect at least {} txns. Do we generate enough gas objects during genesis?", txn_count, ); @@ -700,116 +762,3 @@ async fn assert_validator_traffic_control_dry_run( } Ok(()) } - -async fn assert_validator_traffic_control_spam_blocked( - mut test_cluster: TestCluster, - txn_count: usize, -) -> Result<(), anyhow::Error> { - let context = &mut test_cluster.wallet; - let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; - - let mut txns = batch_make_transfer_transactions(context, txn_count).await; - assert!( - txns.len() >= txn_count, - "Expect at least {} txns. Do we generate enough gas objects during genesis?", - txn_count, - ); - - let txn = txns.swap_remove(0); - let tx_digest = txn.digest(); - let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); - let params = rpc_params![ - tx_bytes, - signatures, - SuiTransactionBlockResponseOptions::new(), - ExecuteTransactionRequestType::WaitForLocalExecution - ]; - - let response: SuiTransactionBlockResponse = jsonrpc_client - .request("sui_executeTransactionBlock", params.clone()) - .await - .unwrap(); - let SuiTransactionBlockResponse { - digest, - confirmed_local_execution, - .. - } = response; - assert_eq!(&digest, tx_digest); - assert!(confirmed_local_execution.unwrap()); - - // it should take no more than 4 requests to be added to the blocklist - for _ in 0..txn_count { - let response: RpcResult = jsonrpc_client - .request("sui_getTransactionBlock", rpc_params![*tx_digest]) - .await; - if let Err(err) = response { - // TODO: fix validator blocking error handling such that the error message - // is not misleading. The full error message currently is the following: - // Transaction execution failed due to issues with transaction inputs, please - // review the errors and try again: Too many requests. - assert!( - err.to_string().contains("Too many requests"), - "Error not due to spam policy" - ); - return Ok(()); - } - } - panic!("Expected spam policy to trigger within {txn_count} requests"); -} - -async fn assert_validator_traffic_control_spam_delegated( - mut test_cluster: TestCluster, - txn_count: usize, - listen_port: u16, -) -> Result<(), anyhow::Error> { - // start test firewall server - let mut server = NodeFwTestServer::new(); - server.start(listen_port).await; - // await for the server to start - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - let context = &mut test_cluster.wallet; - let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client; - let mut txns = batch_make_transfer_transactions(context, txn_count).await; - assert!( - txns.len() >= txn_count, - "Expect at least {} txns. Do we generate enough gas objects during genesis?", - txn_count, - ); - - let txn = txns.swap_remove(0); - let tx_digest = txn.digest(); - let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures(); - let params = rpc_params![ - tx_bytes, - signatures, - SuiTransactionBlockResponseOptions::new(), - ExecuteTransactionRequestType::WaitForLocalExecution - ]; - - // it should take no more than 4 requests to be added to the blocklist - let response: SuiTransactionBlockResponse = jsonrpc_client - .request("sui_executeTransactionBlock", params.clone()) - .await - .unwrap(); - let SuiTransactionBlockResponse { - digest, - confirmed_local_execution, - .. - } = response; - assert_eq!(&digest, tx_digest); - assert!(confirmed_local_execution.unwrap()); - - for _ in 0..txn_count { - let response: RpcResult = jsonrpc_client - .request("sui_getTransactionBlock", rpc_params![*tx_digest]) - .await; - assert!(response.is_ok(), "Expected request to succeed"); - } - let fw_blocklist = server.list_addresses_rpc().await; - assert!( - !fw_blocklist.is_empty(), - "Expected blocklist to be non-empty" - ); - server.stop().await; - Ok(()) -} diff --git a/crates/sui-json-rpc/src/axum_router.rs b/crates/sui-json-rpc/src/axum_router.rs index c724ae1f4c8d0..f7ef9d7693b67 100644 --- a/crates/sui-json-rpc/src/axum_router.rs +++ b/crates/sui-json-rpc/src/axum_router.rs @@ -167,10 +167,9 @@ async fn process_raw_request( } // handle response tallying - let method = request.method.to_string(); let response = process_request(request, api_version, service.call_data()).await; if let Some(traffic_controller) = &service.traffic_controller { - handle_traffic_resp(traffic_controller.clone(), client, &response, method); + handle_traffic_resp(traffic_controller.clone(), client, &response); } response @@ -199,27 +198,24 @@ async fn handle_traffic_req( } } -fn spam_weight_for_method(method: String) -> Weight { - // unless request requires gas payment, count against - // spam tally - match method.as_str() { - "sui_executeTransactionBlock" | "sui_devInspectTransactionBlock" => Weight::zero(), - _ => Weight::one(), - } -} - fn handle_traffic_resp( traffic_controller: Arc, client: Option, response: &MethodResponse, - method: String, ) { let error = response.error_code.map(ErrorCode::from); traffic_controller.tally(TrafficTally { direct: client, through_fullnode: None, error_weight: error.map(normalize).unwrap_or(Weight::zero()), - spam_weight: spam_weight_for_method(method), + // For now, count everything as spam with equal weight + // on the rpc node side, including gas-charging endpoints + // such as `sui_executeTransactionBlock`, as this can enable + // node operators who wish to rate limit their transcation + // traffic and incentivize high volume clients to choose a + // suitable rpc provider (or run their own). Later we may want + // to provide a weight distribution based on the method being called. + spam_weight: Weight::one(), timestamp: SystemTime::now(), }); } diff --git a/crates/sui-types/src/traffic_control.rs b/crates/sui-types/src/traffic_control.rs index c5d5d41116f0a..e976081abcecd 100644 --- a/crates/sui-types/src/traffic_control.rs +++ b/crates/sui-types/src/traffic_control.rs @@ -53,7 +53,7 @@ impl Weight { self.0 } - pub async fn is_sampled(&self) -> bool { + pub fn is_sampled(&self) -> bool { let mut rng = rand::thread_rng(); let sample = rand::distributions::Uniform::new(0.0, 1.0).sample(&mut rng); sample <= self.value() @@ -201,6 +201,11 @@ pub struct PolicyConfig { #[serde(default = "default_channel_capacity")] pub channel_capacity: usize, #[serde(default = "default_spam_sample_rate")] + /// Note that this sample policy is applied on top of the + /// endpoint-specific sample policy (not configurable) which + /// weighs endpoints by the relative effort required to serve + /// them. Therefore a sample rate of N will yield an actual + /// sample rate <= N. pub spam_sample_rate: Weight, #[serde(default = "default_dry_run")] pub dry_run: bool, diff --git a/crates/test-cluster/src/lib.rs b/crates/test-cluster/src/lib.rs index 7b2b084832ab5..78f7d65a077cd 100644 --- a/crates/test-cluster/src/lib.rs +++ b/crates/test-cluster/src/lib.rs @@ -23,6 +23,7 @@ use sui_bridge::types::CertifiedBridgeAction; use sui_bridge::types::VerifiedCertifiedBridgeAction; use sui_bridge::utils::publish_and_register_coins_return_add_coins_on_sui_action; use sui_bridge::utils::wait_for_server_to_be_up; +use sui_config::genesis::Genesis; use sui_config::local_ip_utils::get_available_port; use sui_config::node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange}; use sui_config::{Config, SUI_CLIENT_CONFIG, SUI_NETWORK_CONFIG}; @@ -210,6 +211,10 @@ impl TestCluster { self.swarm.active_validators().map(|v| v.name()).collect() } + pub fn get_genesis(&self) -> Genesis { + self.swarm.config().genesis.clone() + } + pub fn stop_node(&self, name: &AuthorityName) { self.swarm.node(name).unwrap().stop(); }