Skip to content

Commit

Permalink
introduce spam_weight
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith committed Jun 21, 2024
1 parent 2149cc3 commit aeb24ff
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 80 deletions.
77 changes: 33 additions & 44 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,6 @@ impl ValidatorService {
&self,
request: tonic::Request<Transaction>,
) -> WrappedServiceResponse<HandleTransactionResponse> {
let tally_spam = false;

let Self {
state,
consensus_adapter,
Expand Down Expand Up @@ -412,7 +410,7 @@ impl ValidatorService {
// to save more CPU.
return Err(error.into());
}
Ok((tonic::Response::new(info), tally_spam))
Ok((tonic::Response::new(info), Weight::zero()))
}

// In addition to the response from handling the certificates,
Expand All @@ -429,7 +427,7 @@ impl ValidatorService {
_include_auxiliary_data: bool,
epoch_store: &Arc<AuthorityPerEpochStore>,
wait_for_effects: bool,
) -> Result<(Option<Vec<HandleCertificateResponseV3>>, bool), tonic::Status> {
) -> Result<(Option<Vec<HandleCertificateResponseV3>>, Weight), tonic::Status> {
// Validate if cert can be executed
// Fullnode does not serve handle_certificate call.
fp_ensure!(
Expand Down Expand Up @@ -483,7 +481,7 @@ impl ValidatorService {
output_objects: None,
auxiliary_data: None,
}]),
true,
Weight::one(),
));
};
}
Expand Down Expand Up @@ -568,7 +566,7 @@ impl ValidatorService {
epoch_store,
);
}
return Ok((None, false));
return Ok((None, Weight::zero()));
}

// 4) Execute the certificates immediately if they contain only owned object transactions,
Expand Down Expand Up @@ -608,11 +606,11 @@ impl ValidatorService {
))
.await?;

Ok((Some(responses), false))
Ok((Some(responses), Weight::zero()))
}
}

type WrappedServiceResponse<T> = Result<(tonic::Response<T>, bool), tonic::Status>;
type WrappedServiceResponse<T> = Result<(tonic::Response<T>, Weight), tonic::Status>;

impl ValidatorService {
async fn transaction_impl(
Expand Down Expand Up @@ -642,12 +640,12 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|(executed, tally_spam)| {
.map(|(executed, spam_weight)| {
(
tonic::Response::new(SubmitCertificateResponse {
executed: executed.map(|mut x| x.remove(0)).map(Into::into),
}),
tally_spam,
spam_weight,
)
})
}
Expand All @@ -672,7 +670,7 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|(resp, tally_spam)| {
.map(|(resp, spam_weight)| {
(
tonic::Response::new(
resp.expect(
Expand All @@ -681,7 +679,7 @@ impl ValidatorService {
.remove(0)
.into(),
),
tally_spam,
spam_weight,
)
})
}
Expand All @@ -706,15 +704,15 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|(resp, tally_spam)| {
.map(|(resp, spam_weight)| {
(
tonic::Response::new(
resp.expect(
"handle_certificate should not return none with wait_for_effects=true",
)
.remove(0),
),
tally_spam,
spam_weight,
)
})
}
Expand Down Expand Up @@ -836,12 +834,12 @@ impl ValidatorService {
)
.instrument(span)
.await
.map(|(resp, tally_spam)| {
.map(|(resp, spam_weight)| {
(
tonic::Response::new(HandleSoftBundleCertificatesResponseV3 {
responses: resp.unwrap_or_default(),
}),
tally_spam,
spam_weight,
)
})
}
Expand Down Expand Up @@ -872,7 +870,7 @@ impl ValidatorService {
) -> WrappedServiceResponse<ObjectInfoResponse> {
let request = request.into_inner();
let response = self.state.handle_object_info_request(request).await?;
Ok((tonic::Response::new(response), true))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn transaction_info_impl(
Expand All @@ -881,7 +879,7 @@ impl ValidatorService {
) -> WrappedServiceResponse<TransactionInfoResponse> {
let request = request.into_inner();
let response = self.state.handle_transaction_info_request(request).await?;
Ok((tonic::Response::new(response), true))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn checkpoint_impl(
Expand All @@ -890,7 +888,7 @@ impl ValidatorService {
) -> WrappedServiceResponse<CheckpointResponse> {
let request = request.into_inner();
let response = self.state.handle_checkpoint_request(&request)?;
Ok((tonic::Response::new(response), true))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn checkpoint_v2_impl(
Expand All @@ -899,7 +897,7 @@ impl ValidatorService {
) -> WrappedServiceResponse<CheckpointResponseV2> {
let request = request.into_inner();
let response = self.state.handle_checkpoint_request_v2(&request)?;
Ok((tonic::Response::new(response), true))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn get_system_state_object_impl(
Expand All @@ -910,7 +908,7 @@ impl ValidatorService {
.state
.get_object_cache_reader()
.get_sui_system_state_object_unsafe()?;
Ok((tonic::Response::new(response), true))
Ok((tonic::Response::new(response), Weight::one()))
}

async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
Expand All @@ -929,24 +927,27 @@ impl ValidatorService {
fn handle_traffic_resp<T>(
&self,
client: Option<IpAddr>,
response: &Result<tonic::Response<T>, tonic::Status>,
tally_spam: bool,
) {
let error: Option<SuiError> = if let Err(status) = response {
Some(SuiError::from(status.clone()))
} else {
None
wrapped_response: WrappedServiceResponse<T>,
) -> Result<tonic::Response<T>, tonic::Status> {
let (error, spam_weight, unwrapped_response) = match wrapped_response {
Ok((result, spam_weight)) => (None, spam_weight.clone(), Ok(result)),
Err(status) => (
Some(SuiError::from(status.clone())),
Weight::zero(),
Err(status.clone()),
),
};

if let Some(traffic_controller) = self.traffic_controller.clone() {
traffic_controller.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_weight: error.map(normalize).unwrap_or(Weight::zero()),
tally_spam,
spam_weight,
timestamp: SystemTime::now(),
})
}
unwrapped_response
}
}

Expand Down Expand Up @@ -1055,21 +1056,9 @@ macro_rules! handle_with_decoration {
// check if either IP is blocked, in which case return early
$self.handle_traffic_req(client.clone()).await?;

// handle response tallying.
match $self.$func_name($request).await {
Ok((result, tally_spam)) => {
let response = Ok(result);
$self.handle_traffic_resp(client, &response, tally_spam);
response
}
Err(err) => {
// TODO: should we set tally_spam to true here for error
// case?
let response = Err(err);
$self.handle_traffic_resp(client, &response, false);
response
}
}
// handle traffic tallying
let wrapped_response = $self.$func_name($request).await;
$self.handle_traffic_resp(client, wrapped_response)
}};
}

Expand Down
14 changes: 11 additions & 3 deletions crates/sui-core/src/traffic_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,15 @@ async fn handle_spam_tally(
metrics: Arc<TrafficControllerMetrics>,
mem_drainfile_present: bool,
) -> Result<(), reqwest::Error> {
if tally.tally_spam && !policy_config.spam_sample_rate.is_sampled().await {
let sampled = futures::future::join_all(vec![
tally.spam_weight.is_sampled(),
policy_config.spam_sample_rate.is_sampled(),
])
.await
.into_iter()
.all(|sampled| sampled);

if !sampled {
return Ok(());
}
let resp = policy.handle_tally(tally.clone());
Expand Down Expand Up @@ -615,9 +623,9 @@ impl TrafficSim {
client,
// TODO add proxy IP for testing
None,
// TODO add weight adjustment
// TODO add weight adjustments
Weight::one(),
Weight::one(),
true,
));
} else {
if !currently_blocked {
Expand Down
12 changes: 6 additions & 6 deletions crates/sui-core/src/traffic_controller/policies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub struct TrafficTally {
pub direct: Option<IpAddr>,
pub through_fullnode: Option<IpAddr>,
pub error_weight: Weight,
pub tally_spam: bool,
pub spam_weight: Weight,
pub timestamp: SystemTime,
}

Expand All @@ -146,13 +146,13 @@ impl TrafficTally {
direct: Option<IpAddr>,
through_fullnode: Option<IpAddr>,
error_weight: Weight,
tally_spam: bool,
spam_weight: Weight,
) -> Self {
Self {
direct,
through_fullnode,
error_weight,
tally_spam,
spam_weight,
timestamp: SystemTime::now(),
}
}
Expand Down Expand Up @@ -421,21 +421,21 @@ mod tests {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
error_weight: Weight::zero(),
tally_spam: true,
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let bob = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(4, 3, 2, 1))),
error_weight: Weight::zero(),
tally_spam: true,
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};
let charlie = TrafficTally {
direct: Some(IpAddr::V4(Ipv4Addr::new(8, 7, 6, 5))),
through_fullnode: Some(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8))),
error_weight: Weight::zero(),
tally_spam: true,
spam_weight: Weight::one(),
timestamp: SystemTime::now(),
};

Expand Down
Loading

0 comments on commit aeb24ff

Please sign in to comment.