Skip to content

Commit

Permalink
[TrafficController] correctly categorize spam (#18324)
Browse files Browse the repository at this point in the history
## Description 

This PR redefines spam in the context of traffic controller as any
request type that does not consume gas. There are two such categories
that have bee considered so far - read api requests and transaction
execution requests that do not invoke the vm (which, to my knowledge,
should only happen on submitting a certificate for an already executed
transaction, at which point all validators simply read and return the
effects). Both cases are hadled here.

## Test plan 

Fixed existing tests to exercise the new logic

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
williampsmith authored Jun 27, 2024
1 parent e0a919d commit a025ca0
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 170 deletions.
139 changes: 84 additions & 55 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl ValidatorService {
async fn handle_transaction(
&self,
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
) -> WrappedServiceResponse<HandleTransactionResponse> {
let Self {
state,
consensus_adapter,
Expand Down Expand Up @@ -414,9 +414,14 @@ impl ValidatorService {
// to save more CPU.
return Err(error.into());
}
Ok(tonic::Response::new(info))
Ok((tonic::Response::new(info), Weight::zero()))
}

// In addition to the response from handling the certificates,
// returns a bool indicating whether the request should be tallied
// toward spam count. In general, this should be set to true for
// requests that are read-only and thus do not consume gas, such
// as when the transaction is already executed.
async fn handle_certificates(
&self,
certificates: NonEmpty<CertifiedTransaction>,
Expand All @@ -426,7 +431,7 @@ impl ValidatorService {
_include_auxiliary_data: bool,
epoch_store: &Arc<AuthorityPerEpochStore>,
wait_for_effects: bool,
) -> Result<Option<Vec<HandleCertificateResponseV3>>, tonic::Status> {
) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
// Validate if cert can be executed
// Fullnode does not serve handle_certificate call.
fp_ensure!(
Expand Down Expand Up @@ -472,13 +477,16 @@ impl ValidatorService {
None
};

return Ok(Some(vec![HandleCertificateResponseV3 {
effects: signed_effects.into_inner(),
events,
input_objects: None,
output_objects: None,
auxiliary_data: None,
}]));
return Ok((
Some(vec![HandleCertificateResponseV3 {
effects: signed_effects.into_inner(),
events,
input_objects: None,
output_objects: None,
auxiliary_data: None,
}]),
Weight::one(),
));
};
}

Expand Down Expand Up @@ -562,7 +570,7 @@ impl ValidatorService {
epoch_store,
);
}
return Ok(None);
return Ok((None, Weight::zero()));
}

// 4) Execute the certificates immediately if they contain only owned object transactions,
Expand Down Expand Up @@ -605,22 +613,24 @@ impl ValidatorService {
))
.await?;

Ok(Some(responses))
Ok((Some(responses), Weight::zero()))
}
}

type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;

impl ValidatorService {
async fn transaction_impl(
&self,
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
) -> WrappedServiceResponse<HandleTransactionResponse> {
self.handle_transaction(request).await
}

async fn submit_certificate_impl(
&self,
request: tonic::Request<CertifiedTransaction>,
) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
) -> WrappedServiceResponse<SubmitCertificateResponse> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let certificate = request.into_inner();
Self::transaction_validity_check(&epoch_store, certificate.data())?;
Expand All @@ -637,17 +647,20 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|executed| {
tonic::Response::new(SubmitCertificateResponse {
executed: executed.map(|mut x| x.remove(0)).map(Into::into),
})
.map(|(executed, spam_weight)| {
(
tonic::Response::new(SubmitCertificateResponse {
executed: executed.map(|mut x| x.remove(0)).map(Into::into),
}),
spam_weight,
)
})
}

async fn handle_certificate_v2_impl(
&self,
request: tonic::Request<CertifiedTransaction>,
) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
) -> WrappedServiceResponse<HandleCertificateResponseV2> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let certificate = request.into_inner();
Self::transaction_validity_check(&epoch_store, certificate.data())?;
Expand All @@ -664,19 +677,24 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|v| {
tonic::Response::new(
v.expect("handle_certificate should not return none with wait_for_effects=true")
.map(|(resp, spam_weight)| {
(
tonic::Response::new(
resp.expect(
"handle_certificate should not return none with wait_for_effects=true",
)
.remove(0)
.into(),
),
spam_weight,
)
})
}

async fn handle_certificate_v3_impl(
&self,
request: tonic::Request<HandleCertificateRequestV3>,
) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
) -> WrappedServiceResponse<HandleCertificateResponseV3> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let request = request.into_inner();
Self::transaction_validity_check(&epoch_store, request.certificate.data())?;
Expand All @@ -693,10 +711,15 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|v| {
tonic::Response::new(
v.expect("handle_certificate should not return none with wait_for_effects=true")
.map(|(resp, spam_weight)| {
(
tonic::Response::new(
resp.expect(
"handle_certificate should not return none with wait_for_effects=true",
)
.remove(0),
),
spam_weight,
)
})
}
Expand Down Expand Up @@ -788,7 +811,7 @@ impl ValidatorService {
async fn handle_soft_bundle_certificates_v3_impl(
&self,
request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let request = request.into_inner();

Expand Down Expand Up @@ -818,10 +841,13 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|v| {
tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
responses: v.unwrap_or_default(),
})
.map(|(resp, spam_weight)| {
(
tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
responses: resp.unwrap_or_default(),
}),
spam_weight,
)
})
}

Expand Down Expand Up @@ -850,49 +876,48 @@ impl ValidatorService {
async fn object_info_impl(
&self,
request: tonic::Request<ObjectInfoRequest>,
) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
) -> WrappedServiceResponse<ObjectInfoResponse> {
let request = request.into_inner();
let response = self.state.handle_object_info_request(request).await?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn transaction_info_impl(
&self,
request: tonic::Request<TransactionInfoRequest>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
) -> WrappedServiceResponse<TransactionInfoResponse> {
let request = request.into_inner();
let response = self.state.handle_transaction_info_request(request).await?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn checkpoint_impl(
&self,
request: tonic::Request<CheckpointRequest>,
) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
) -> WrappedServiceResponse<CheckpointResponse> {
let request = request.into_inner();
let response = self.state.handle_checkpoint_request(&request)?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn checkpoint_v2_impl(
&self,
request: tonic::Request<CheckpointRequestV2>,
) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
) -> WrappedServiceResponse<CheckpointResponseV2> {
let request = request.into_inner();
let response = self.state.handle_checkpoint_request_v2(&request)?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn get_system_state_object_impl(
&self,
_request: tonic::Request<SystemStateRequest>,
) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
) -> WrappedServiceResponse<SuiSystemState> {
let response = self
.state
.get_object_cache_reader()
.get_sui_system_state_object_unsafe()?;

Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
Expand All @@ -911,22 +936,27 @@ impl ValidatorService {
fn handle_traffic_resp<T>(
&self,
client: Option<IpAddr>,
response: &Result<tonic::Response<T>, tonic::Status>,
) {
let error: Option<SuiError> = if let Err(status) = response {
Some(SuiError::from(status.clone()))
} else {
None
wrapped_response: WrappedServiceResponse<T>,
) -> Result<tonic::Response<T>, tonic::Status> {
let (error, spam_weight, unwrapped_response) = match wrapped_response {
Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
Err(status) => (
Some(SuiError::from(status.clone())),
Weight::zero(),
Err(status.clone()),
),
};

if let Some(traffic_controller) = self.traffic_controller.clone() {
traffic_controller.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_weight: error.map(normalize).unwrap_or(Weight::zero()),
spam_weight,
timestamp: SystemTime::now(),
})
}
unwrapped_response
}
}

Expand All @@ -944,7 +974,7 @@ fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {

// TODO: refine error matching here
fn normalize(err: SuiError) -> Weight {
match err {
match dbg!(err) {
SuiError::UserInputError { .. }
| SuiError::InvalidSignature { .. }
| SuiError::SignerSignatureAbsent { .. }
Expand All @@ -963,7 +993,7 @@ fn normalize(err: SuiError) -> Weight {
macro_rules! handle_with_decoration {
($self:ident, $func_name:ident, $request:ident) => {{
if $self.client_id_source.is_none() {
return $self.$func_name($request).await;
return $self.$func_name($request).await.map(|(result, _)| result);
}

let client = match $self.client_id_source.as_ref().unwrap() {
Expand Down Expand Up @@ -1027,11 +1057,10 @@ macro_rules! handle_with_decoration {

// check if either IP is blocked, in which case return early
$self.handle_traffic_req(client.clone()).await?;
// handle request
let response = $self.$func_name($request).await;
// handle response tallying
$self.handle_traffic_resp(client, &response);
response

// handle traffic tallying
let wrapped_response = $self.$func_name($request).await;
$self.handle_traffic_resp(client, wrapped_response)
}};
}

Expand Down
7 changes: 4 additions & 3 deletions crates/sui-core/src/traffic_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ async fn handle_error_tally(
metrics: Arc<TrafficControllerMetrics>,
mem_drainfile_present: bool,
) -> Result<(), reqwest::Error> {
if !tally.error_weight.is_sampled().await {
if !tally.error_weight.is_sampled() {
return Ok(());
}
let resp = policy.handle_tally(tally.clone());
Expand Down Expand Up @@ -364,7 +364,7 @@ async fn handle_spam_tally(
metrics: Arc<TrafficControllerMetrics>,
mem_drainfile_present: bool,
) -> Result<(), reqwest::Error> {
if !policy_config.spam_sample_rate.is_sampled().await {
if !(tally.spam_weight.is_sampled() && policy_config.spam_sample_rate.is_sampled()) {
return Ok(());
}
let resp = policy.handle_tally(tally.clone());
Expand Down Expand Up @@ -649,7 +649,8 @@ impl TrafficSim {
client,
// TODO add proxy IP for testing
None,
// TODO add weight adjustment
// TODO add weight adjustments
Weight::one(),
Weight::one(),
));
} else {
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-core/src/traffic_controller/policies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub struct TrafficTally {
pub direct: Option<IpAddr>,
pub through_fullnode: Option<IpAddr>,
pub error_weight: Weight,
pub spam_weight: Weight,
pub timestamp: SystemTime,
}

Expand All @@ -145,11 +146,13 @@ impl TrafficTally {
direct: Option<IpAddr>,
through_fullnode: Option<IpAddr>,
error_weight: Weight,
spam_weight: Weight,
) -> Self {
Self {
direct,
through_fullnode,
error_weight,
spam_weight,
timestamp: SystemTime::now(),
}
}
Expand Down Expand Up @@ -418,18 +421,21 @@ mod tests {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
error_weight: Weight::zero(),
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let bob = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(4, 3, 2, 1))),
error_weight: Weight::zero(),
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let charlie = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8))),
error_weight: Weight::zero(),
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};

Expand Down
Loading

0 comments on commit a025ca0

Please sign in to comment.