Skip to content

Commit

Permalink
count everything as spam on rpc nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith committed Jun 27, 2024
1 parent 0a75fd9 commit b4a59bf
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 167 deletions.
11 changes: 2 additions & 9 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {

// TODO: refine error matching here
fn normalize(err: SuiError) -> Weight {
match err {
match dbg!(err) {
SuiError::UserInputError { .. }
| SuiError::InvalidSignature { .. }
| SuiError::SignerSignatureAbsent { .. }
Expand All @@ -990,14 +990,7 @@ fn normalize(err: SuiError) -> Weight {
macro_rules! handle_with_decoration {
($self:ident, $func_name:ident, $request:ident) => {{
if $self.client_id_source.is_none() {
return match $self.$func_name($request).await {
Ok((result, _)) => {
Ok(result)
}
Err(err) => {
Err(err)
}
}
return $self.$func_name($request).await.map(|(result, _)| result);
}

let client = match $self.client_id_source.as_ref().unwrap() {
Expand Down
12 changes: 2 additions & 10 deletions crates/sui-core/src/traffic_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ async fn handle_error_tally(
metrics: Arc<TrafficControllerMetrics>,
mem_drainfile_present: bool,
) -> Result<(), reqwest::Error> {
if !tally.error_weight.is_sampled().await {
if !tally.error_weight.is_sampled() {
return Ok(());
}
let resp = policy.handle_tally(tally.clone());
Expand Down Expand Up @@ -364,15 +364,7 @@ async fn handle_spam_tally(
metrics: Arc<TrafficControllerMetrics>,
mem_drainfile_present: bool,
) -> Result<(), reqwest::Error> {
let sampled = futures::future::join_all(vec![
tally.spam_weight.is_sampled(),
policy_config.spam_sample_rate.is_sampled(),
])
.await
.into_iter()
.all(|sampled| sampled);

if !sampled {
if !(tally.spam_weight.is_sampled() && policy_config.spam_sample_rate.is_sampled()) {
return Ok(());
}
let resp = policy.handle_tally(tally.clone());
Expand Down
217 changes: 83 additions & 134 deletions crates/sui-e2e-tests/tests/traffic_control_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,23 @@
//! NB: Most tests in this module expect real network connections and interactions, thus
//! they should nearly all be tokio::test rather than simtest.
use core::panic;
use jsonrpsee::{
core::{client::ClientT, RpcResult},
rpc_params,
};
use std::fs::File;
use std::time::Duration;
use sui_core::authority_client::make_network_authority_clients_with_network_config;
use sui_core::authority_client::AuthorityAPI;
use sui_core::traffic_controller::{
nodefw_test_server::NodeFwTestServer, TrafficController, TrafficSim,
};
use sui_json_rpc_types::{
SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
};
use sui_macros::sim_test;
use sui_network::default_mysten_network_config;
use sui_swarm_config::network_config_builder::ConfigBuilder;
use sui_test_transaction_builder::batch_make_transfer_transactions;
use sui_types::{
Expand Down Expand Up @@ -157,11 +161,11 @@ async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> {
.build()
.await;

let context = &mut test_cluster.wallet;
let context = test_cluster.wallet;
let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client;
let mut txns = batch_make_transfer_transactions(context, txn_count).await;
let mut txns = batch_make_transfer_transactions(&context, txn_count as usize).await;
assert!(
txns.len() >= txn_count,
txns.len() >= txn_count as usize,
"Expect at least {} txns. Do we generate enough gas objects during genesis?",
txn_count,
);
Expand Down Expand Up @@ -202,24 +206,47 @@ async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> {
}

#[tokio::test]
async fn test_validator_traffic_control_spam_blocked() -> Result<(), anyhow::Error> {
async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Error> {
let n = 5;
let policy_config = PolicyConfig {
connection_blocklist_ttl_sec: 1,
// Test that any N requests will cause an IP to be added to the blocklist.
spam_policy_type: PolicyType::TestNConnIP(n - 1),
spam_sample_rate: Weight::one(),
error_policy_type: PolicyType::TestNConnIP(n - 1),
dry_run: false,
..Default::default()
};
let network_config = ConfigBuilder::new_with_temp_dir()
.with_policy_config(Some(policy_config))
.build();
let test_cluster = TestClusterBuilder::new()
let committee = network_config.committee_with_network();
let _test_cluster = TestClusterBuilder::new()
.set_network_config(network_config)
.build()
.await;
assert_validator_traffic_control_spam_blocked(test_cluster, n as usize).await
let local_clients = make_network_authority_clients_with_network_config(
&committee,
&default_mysten_network_config(),
)
.unwrap();
let (_, auth_client) = local_clients.first_key_value().unwrap();

// transaction signed using user wallet from a different chain/genesis,
// therefore we should fail with UserInputError
let other_cluster = TestClusterBuilder::new().build().await;

let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await;
let tx = txns.swap_remove(0);

// it should take no more than 4 requests to be added to the blocklist
for _ in 0..n {
let response = auth_client.handle_transaction(tx.clone(), None).await;
if let Err(err) = response {
if err.to_string().contains("Too many requests") {
return Ok(());
}
}
}
panic!("Expected spam policy to trigger within {n} requests");
}

#[tokio::test]
Expand All @@ -238,12 +265,12 @@ async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Erro
.build()
.await;

let context = &mut test_cluster.wallet;
let context = test_cluster.wallet;
let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client;

let mut txns = batch_make_transfer_transactions(context, txn_count).await;
let mut txns = batch_make_transfer_transactions(&context, txn_count as usize).await;
assert!(
txns.len() >= txn_count,
txns.len() >= txn_count as usize,
"Expect at least {} txns. Do we generate enough gas objects during genesis?",
txn_count,
);
Expand Down Expand Up @@ -291,15 +318,14 @@ async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Erro
}

#[tokio::test]
async fn test_validator_traffic_control_spam_delegated() -> Result<(), anyhow::Error> {
let n = 4;
async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::Error> {
let n = 5;
let port = 65000;
let policy_config = PolicyConfig {
connection_blocklist_ttl_sec: 120,
proxy_blocklist_ttl_sec: 120,
// Test that any N - 1 requests will cause an IP to be added to the blocklist.
spam_policy_type: PolicyType::TestNConnIP(n - 1),
spam_sample_rate: Weight::one(),
error_policy_type: PolicyType::TestNConnIP(n - 1),
dry_run: false,
..Default::default()
};
Expand All @@ -316,11 +342,47 @@ async fn test_validator_traffic_control_spam_delegated() -> Result<(), anyhow::E
.with_policy_config(Some(policy_config))
.with_firewall_config(Some(firewall_config))
.build();
let test_cluster = TestClusterBuilder::new()
let committee = network_config.committee_with_network();
let _test_cluster = TestClusterBuilder::new()
.set_network_config(network_config)
.build()
.await;
assert_validator_traffic_control_spam_delegated(test_cluster, n as usize, port).await
let local_clients = make_network_authority_clients_with_network_config(
&committee,
&default_mysten_network_config(),
)
.unwrap();
let (_, auth_client) = local_clients.first_key_value().unwrap();

// transaction signed using user wallet from a different chain/genesis,
// therefore we should fail with UserInputError
let other_cluster = TestClusterBuilder::new().build().await;

let mut txns = batch_make_transfer_transactions(&other_cluster.wallet, n as usize).await;
let tx = txns.swap_remove(0);

// start test firewall server
let mut server = NodeFwTestServer::new();
server.start(port).await;
// await for the server to start
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

// it should take no more than 4 requests to be added to the blocklist
for _ in 0..n {
let response = auth_client.handle_transaction(tx.clone(), None).await;
if let Err(err) = response {
if err.to_string().contains("Too many requests") {
return Ok(());
}
}
}
let fw_blocklist = server.list_addresses_rpc().await;
assert!(
!fw_blocklist.is_empty(),
"Expected blocklist to be non-empty"
);
server.stop().await;
Ok(())
}

#[tokio::test]
Expand Down Expand Up @@ -353,14 +415,14 @@ async fn test_fullnode_traffic_control_spam_delegated() -> Result<(), anyhow::Er

// start test firewall server
let mut server = NodeFwTestServer::new();
server.start(listen_port).await;
server.start(port).await;
// await for the server to start
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let context = &mut test_cluster.wallet;
let context = test_cluster.wallet;
let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client;
let mut txns = batch_make_transfer_transactions(context, txn_count).await;
let mut txns = batch_make_transfer_transactions(&context, txn_count as usize).await;
assert!(
txns.len() >= txn_count,
txns.len() >= txn_count as usize,
"Expect at least {} txns. Do we generate enough gas objects during genesis?",
txn_count,
);
Expand Down Expand Up @@ -700,116 +762,3 @@ async fn assert_validator_traffic_control_dry_run(
}
Ok(())
}

async fn assert_validator_traffic_control_spam_blocked(
mut test_cluster: TestCluster,
txn_count: usize,
) -> Result<(), anyhow::Error> {
let context = &mut test_cluster.wallet;
let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client;

let mut txns = batch_make_transfer_transactions(context, txn_count).await;
assert!(
txns.len() >= txn_count,
"Expect at least {} txns. Do we generate enough gas objects during genesis?",
txn_count,
);

let txn = txns.swap_remove(0);
let tx_digest = txn.digest();
let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures();
let params = rpc_params![
tx_bytes,
signatures,
SuiTransactionBlockResponseOptions::new(),
ExecuteTransactionRequestType::WaitForLocalExecution
];

let response: SuiTransactionBlockResponse = jsonrpc_client
.request("sui_executeTransactionBlock", params.clone())
.await
.unwrap();
let SuiTransactionBlockResponse {
digest,
confirmed_local_execution,
..
} = response;
assert_eq!(&digest, tx_digest);
assert!(confirmed_local_execution.unwrap());

// it should take no more than 4 requests to be added to the blocklist
for _ in 0..txn_count {
let response: RpcResult<SuiTransactionBlockResponse> = jsonrpc_client
.request("sui_getTransactionBlock", rpc_params![*tx_digest])
.await;
if let Err(err) = response {
// TODO: fix validator blocking error handling such that the error message
// is not misleading. The full error message currently is the following:
// Transaction execution failed due to issues with transaction inputs, please
// review the errors and try again: Too many requests.
assert!(
err.to_string().contains("Too many requests"),
"Error not due to spam policy"
);
return Ok(());
}
}
panic!("Expected spam policy to trigger within {txn_count} requests");
}

async fn assert_validator_traffic_control_spam_delegated(
mut test_cluster: TestCluster,
txn_count: usize,
listen_port: u16,
) -> Result<(), anyhow::Error> {
// start test firewall server
let mut server = NodeFwTestServer::new();
server.start(listen_port).await;
// await for the server to start
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let context = &mut test_cluster.wallet;
let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client;
let mut txns = batch_make_transfer_transactions(context, txn_count).await;
assert!(
txns.len() >= txn_count,
"Expect at least {} txns. Do we generate enough gas objects during genesis?",
txn_count,
);

let txn = txns.swap_remove(0);
let tx_digest = txn.digest();
let (tx_bytes, signatures) = txn.to_tx_bytes_and_signatures();
let params = rpc_params![
tx_bytes,
signatures,
SuiTransactionBlockResponseOptions::new(),
ExecuteTransactionRequestType::WaitForLocalExecution
];

// it should take no more than 4 requests to be added to the blocklist
let response: SuiTransactionBlockResponse = jsonrpc_client
.request("sui_executeTransactionBlock", params.clone())
.await
.unwrap();
let SuiTransactionBlockResponse {
digest,
confirmed_local_execution,
..
} = response;
assert_eq!(&digest, tx_digest);
assert!(confirmed_local_execution.unwrap());

for _ in 0..txn_count {
let response: RpcResult<SuiTransactionBlockResponse> = jsonrpc_client
.request("sui_getTransactionBlock", rpc_params![*tx_digest])
.await;
assert!(response.is_ok(), "Expected request to succeed");
}
let fw_blocklist = server.list_addresses_rpc().await;
assert!(
!fw_blocklist.is_empty(),
"Expected blocklist to be non-empty"
);
server.stop().await;
Ok(())
}
Loading

0 comments on commit b4a59bf

Please sign in to comment.