Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TrafficController] correctly categorize spam #18324

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 84 additions & 55 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl ValidatorService {
async fn handle_transaction(
&self,
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
) -> WrappedServiceResponse<HandleTransactionResponse> {
let Self {
state,
consensus_adapter,
Expand Down Expand Up @@ -414,9 +414,14 @@ impl ValidatorService {
// to save more CPU.
return Err(error.into());
}
Ok(tonic::Response::new(info))
Ok((tonic::Response::new(info), Weight::zero()))
}

// In addition to the response from handling the certificates,
// returns a bool indicating whether the request should be tallied
// toward spam count. In general, this should be set to true for
// requests that are read-only and thus do not consume gas, such
// as when the transaction is already executed.
async fn handle_certificates(
&self,
certificates: NonEmpty<CertifiedTransaction>,
Expand All @@ -426,7 +431,7 @@ impl ValidatorService {
_include_auxiliary_data: bool,
epoch_store: &Arc<AuthorityPerEpochStore>,
wait_for_effects: bool,
) -> Result<Option<Vec<HandleCertificateResponseV3>>, tonic::Status> {
) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
// Validate if cert can be executed
// Fullnode does not serve handle_certificate call.
fp_ensure!(
Expand Down Expand Up @@ -472,13 +477,16 @@ impl ValidatorService {
None
};

return Ok(Some(vec![HandleCertificateResponseV3 {
effects: signed_effects.into_inner(),
events,
input_objects: None,
output_objects: None,
auxiliary_data: None,
}]));
return Ok((
Some(vec![HandleCertificateResponseV3 {
effects: signed_effects.into_inner(),
events,
input_objects: None,
output_objects: None,
auxiliary_data: None,
}]),
Weight::one(),
));
};
}

Expand Down Expand Up @@ -562,7 +570,7 @@ impl ValidatorService {
epoch_store,
);
}
return Ok(None);
return Ok((None, Weight::zero()));
}

// 4) Execute the certificates immediately if they contain only owned object transactions,
Expand Down Expand Up @@ -602,22 +610,24 @@ impl ValidatorService {
))
.await?;

Ok(Some(responses))
Ok((Some(responses), Weight::zero()))
}
}

type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;

impl ValidatorService {
async fn transaction_impl(
&self,
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
) -> WrappedServiceResponse<HandleTransactionResponse> {
self.handle_transaction(request).await
}

async fn submit_certificate_impl(
&self,
request: tonic::Request<CertifiedTransaction>,
) -> Result<tonic::Response<SubmitCertificateResponse>, tonic::Status> {
) -> WrappedServiceResponse<SubmitCertificateResponse> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let certificate = request.into_inner();
Self::transaction_validity_check(&epoch_store, certificate.data())?;
Expand All @@ -634,17 +644,20 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|executed| {
tonic::Response::new(SubmitCertificateResponse {
executed: executed.map(|mut x| x.remove(0)).map(Into::into),
})
.map(|(executed, spam_weight)| {
(
tonic::Response::new(SubmitCertificateResponse {
executed: executed.map(|mut x| x.remove(0)).map(Into::into),
}),
spam_weight,
)
})
}

async fn handle_certificate_v2_impl(
&self,
request: tonic::Request<CertifiedTransaction>,
) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
) -> WrappedServiceResponse<HandleCertificateResponseV2> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let certificate = request.into_inner();
Self::transaction_validity_check(&epoch_store, certificate.data())?;
Expand All @@ -661,19 +674,24 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|v| {
tonic::Response::new(
v.expect("handle_certificate should not return none with wait_for_effects=true")
.map(|(resp, spam_weight)| {
(
tonic::Response::new(
resp.expect(
"handle_certificate should not return none with wait_for_effects=true",
)
.remove(0)
.into(),
),
spam_weight,
)
})
}

async fn handle_certificate_v3_impl(
&self,
request: tonic::Request<HandleCertificateRequestV3>,
) -> Result<tonic::Response<HandleCertificateResponseV3>, tonic::Status> {
) -> WrappedServiceResponse<HandleCertificateResponseV3> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let request = request.into_inner();
Self::transaction_validity_check(&epoch_store, request.certificate.data())?;
Expand All @@ -690,10 +708,15 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|v| {
tonic::Response::new(
v.expect("handle_certificate should not return none with wait_for_effects=true")
.map(|(resp, spam_weight)| {
(
tonic::Response::new(
resp.expect(
"handle_certificate should not return none with wait_for_effects=true",
)
.remove(0),
),
spam_weight,
)
})
}
Expand Down Expand Up @@ -785,7 +808,7 @@ impl ValidatorService {
async fn handle_soft_bundle_certificates_v3_impl(
&self,
request: tonic::Request<HandleSoftBundleCertificatesRequestV3>,
) -> Result<tonic::Response<HandleSoftBundleCertificatesResponseV3>, tonic::Status> {
) -> WrappedServiceResponse<HandleSoftBundleCertificatesResponseV3> {
let epoch_store = self.state.load_epoch_store_one_call_per_task();
let request = request.into_inner();

Expand Down Expand Up @@ -815,10 +838,13 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|v| {
tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
responses: v.unwrap_or_default(),
})
.map(|(resp, spam_weight)| {
(
tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
responses: resp.unwrap_or_default(),
}),
spam_weight,
)
})
}

Expand Down Expand Up @@ -847,49 +873,48 @@ impl ValidatorService {
async fn object_info_impl(
&self,
request: tonic::Request<ObjectInfoRequest>,
) -> Result<tonic::Response<ObjectInfoResponse>, tonic::Status> {
) -> WrappedServiceResponse<ObjectInfoResponse> {
let request = request.into_inner();
let response = self.state.handle_object_info_request(request).await?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn transaction_info_impl(
&self,
request: tonic::Request<TransactionInfoRequest>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
) -> WrappedServiceResponse<TransactionInfoResponse> {
let request = request.into_inner();
let response = self.state.handle_transaction_info_request(request).await?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn checkpoint_impl(
&self,
request: tonic::Request<CheckpointRequest>,
) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
) -> WrappedServiceResponse<CheckpointResponse> {
let request = request.into_inner();
let response = self.state.handle_checkpoint_request(&request)?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn checkpoint_v2_impl(
&self,
request: tonic::Request<CheckpointRequestV2>,
) -> Result<tonic::Response<CheckpointResponseV2>, tonic::Status> {
) -> WrappedServiceResponse<CheckpointResponseV2> {
let request = request.into_inner();
let response = self.state.handle_checkpoint_request_v2(&request)?;
Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn get_system_state_object_impl(
&self,
_request: tonic::Request<SystemStateRequest>,
) -> Result<tonic::Response<SuiSystemState>, tonic::Status> {
) -> WrappedServiceResponse<SuiSystemState> {
let response = self
.state
.get_object_cache_reader()
.get_sui_system_state_object_unsafe()?;

Ok(tonic::Response::new(response))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
Expand All @@ -908,22 +933,27 @@ impl ValidatorService {
fn handle_traffic_resp<T>(
&self,
client: Option<IpAddr>,
response: &Result<tonic::Response<T>, tonic::Status>,
) {
let error: Option<SuiError> = if let Err(status) = response {
Some(SuiError::from(status.clone()))
} else {
None
wrapped_response: WrappedServiceResponse<T>,
) -> Result<tonic::Response<T>, tonic::Status> {
let (error, spam_weight, unwrapped_response) = match wrapped_response {
Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
Err(status) => (
Some(SuiError::from(status.clone())),
Weight::zero(),
Err(status.clone()),
),
};

if let Some(traffic_controller) = self.traffic_controller.clone() {
traffic_controller.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_weight: error.map(normalize).unwrap_or(Weight::zero()),
spam_weight,
timestamp: SystemTime::now(),
})
}
unwrapped_response
}
}

Expand All @@ -941,7 +971,7 @@ fn make_tonic_request_for_testing<T>(message: T) -> tonic::Request<T> {

// TODO: refine error matching here
fn normalize(err: SuiError) -> Weight {
match err {
match dbg!(err) {
SuiError::UserInputError { .. }
| SuiError::InvalidSignature { .. }
| SuiError::SignerSignatureAbsent { .. }
Expand All @@ -960,7 +990,7 @@ fn normalize(err: SuiError) -> Weight {
macro_rules! handle_with_decoration {
($self:ident, $func_name:ident, $request:ident) => {{
if $self.client_id_source.is_none() {
return $self.$func_name($request).await;
return $self.$func_name($request).await.map(|(result, _)| result);
}

let client = match $self.client_id_source.as_ref().unwrap() {
Expand Down Expand Up @@ -1024,11 +1054,10 @@ macro_rules! handle_with_decoration {

// check if either IP is blocked, in which case return early
$self.handle_traffic_req(client.clone()).await?;
// handle request
let response = $self.$func_name($request).await;
// handle response tallying
$self.handle_traffic_resp(client, &response);
response

// handle traffic tallying
let wrapped_response = $self.$func_name($request).await;
$self.handle_traffic_resp(client, wrapped_response)
}};
}

Expand Down
7 changes: 4 additions & 3 deletions crates/sui-core/src/traffic_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ async fn handle_error_tally(
metrics: Arc<TrafficControllerMetrics>,
mem_drainfile_present: bool,
) -> Result<(), reqwest::Error> {
if !tally.error_weight.is_sampled().await {
if !tally.error_weight.is_sampled() {
return Ok(());
}
let resp = policy.handle_tally(tally.clone());
Expand Down Expand Up @@ -364,7 +364,7 @@ async fn handle_spam_tally(
metrics: Arc<TrafficControllerMetrics>,
mem_drainfile_present: bool,
) -> Result<(), reqwest::Error> {
if !policy_config.spam_sample_rate.is_sampled().await {
if !(tally.spam_weight.is_sampled() && policy_config.spam_sample_rate.is_sampled()) {
return Ok(());
}
let resp = policy.handle_tally(tally.clone());
Expand Down Expand Up @@ -649,7 +649,8 @@ impl TrafficSim {
client,
// TODO add proxy IP for testing
None,
// TODO add weight adjustment
// TODO add weight adjustments
Weight::one(),
Weight::one(),
));
} else {
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-core/src/traffic_controller/policies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub struct TrafficTally {
pub direct: Option<IpAddr>,
pub through_fullnode: Option<IpAddr>,
pub error_weight: Weight,
pub spam_weight: Weight,
pub timestamp: SystemTime,
}

Expand All @@ -145,11 +146,13 @@ impl TrafficTally {
direct: Option<IpAddr>,
through_fullnode: Option<IpAddr>,
error_weight: Weight,
spam_weight: Weight,
) -> Self {
Self {
direct,
through_fullnode,
error_weight,
spam_weight,
timestamp: SystemTime::now(),
}
}
Expand Down Expand Up @@ -418,18 +421,21 @@ mod tests {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
error_weight: Weight::zero(),
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let bob = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(4, 3, 2, 1))),
error_weight: Weight::zero(),
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let charlie = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8))),
error_weight: Weight::zero(),
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};

Expand Down
Loading
Loading