Allow 1 count block request to return 0 blocks #5554

Merged · 2 commits · Apr 12, 2024
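In a nutshell, inferred from the title and the diff below: `expected_responses()` is renamed to `max_responses()` to make clear it is an upper bound rather than an exact count, and a new `expect_exactly_one_response()` helper marks the protocols (Status, Ping, MetaData) whose streams must deliver exactly one chunk. All other requests may now legally return fewer chunks than requested, including zero, which is what lets a one-count block request return no blocks. A hedged sketch of the resulting early-close decision, with simplified names rather than code from the diff:

fn outcome_on_early_close(expect_exactly_one_response: bool) -> Result<&'static str, &'static str> {
    if expect_exactly_one_response {
        // Status/Ping/MetaData must answer exactly once, so a stream that
        // closes before the response arrives is still an error.
        Err("RPCError::IncompleteStream")
    } else {
        // Streamed requests (BlocksByRange/Root, BlobsByRange/Root) may end
        // early -- zero chunks is a valid reply.
        Ok("RPCReceived::EndOfStream")
    }
}
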
81 changes: 40 additions & 41 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -163,7 +163,7 @@ struct InboundInfo<E: EthSpec> {
/// Protocol of the original request we received from the peer.
protocol: Protocol,
/// Responses that the peer is still expecting from us.
remaining_chunks: u64,
max_remaining_chunks: u64,
/// Useful for timing how long each request took to process. Currently only used by
/// BlocksByRange.
request_start_time: Instant,
@@ -180,7 +180,7 @@ struct OutboundInfo<Id, E: EthSpec> {
/// Info over the protocol this substream is handling.
proto: Protocol,
/// Number of chunks to be seen from the peer's response.
remaining_chunks: Option<u64>,
max_remaining_chunks: Option<u64>,
/// `Id` as given by the application that sent the request.
req_id: Id,
}
@@ -471,7 +471,7 @@ where
// Process one more message if one exists.
if let Some(message) = info.pending_items.pop_front() {
// If this is the last chunk, terminate the stream.
let last_chunk = info.remaining_chunks <= 1;
let last_chunk = info.max_remaining_chunks <= 1;
let fut =
send_message_to_inbound_substream(substream, message, last_chunk)
.boxed();
@@ -537,7 +537,8 @@ where
{
// The substream is still active, decrement the remaining
// chunks expected.
info.remaining_chunks = info.remaining_chunks.saturating_sub(1);
info.max_remaining_chunks =
info.max_remaining_chunks.saturating_sub(1);

// If this substream has not ended, we reset the timer.
// Each chunk is allowed RESPONSE_TIMEOUT to be sent.
@@ -552,7 +553,7 @@ where
// Process one more message if one exists.
if let Some(message) = info.pending_items.pop_front() {
// If this is the last chunk, terminate the stream.
let last_chunk = info.remaining_chunks <= 1;
let last_chunk = info.max_remaining_chunks <= 1;
let fut = send_message_to_inbound_substream(
substream, message, last_chunk,
)
@@ -664,15 +665,19 @@ where
request,
} => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
if request.expected_responses() > 1 && !response.close_after() {
if request.expect_exactly_one_response() || response.close_after() {
// either this is a single response request or this response closes the
// stream
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
} else {
let substream_entry = entry.get_mut();
let delay_key = &substream_entry.delay_key;
// chunks left after this one
let remaining_chunks = substream_entry
.remaining_chunks
let max_remaining_chunks = substream_entry
.max_remaining_chunks
.map(|count| count.saturating_sub(1))
.unwrap_or_else(|| 0);
if remaining_chunks == 0 {
if max_remaining_chunks == 0 {
// this is the last expected message, close the stream as all expected chunks have been received
substream_entry.state = OutboundSubstreamState::Closing(substream);
} else {
@@ -682,14 +687,10 @@ where
substream,
request,
};
substream_entry.remaining_chunks = Some(remaining_chunks);
substream_entry.max_remaining_chunks = Some(max_remaining_chunks);
self.outbound_substreams_delay
.reset(delay_key, self.resp_timeout);
}
} else {
// either this is a single response request or this response closes the
// stream
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
}

// Check what type of response we got and report it accordingly
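As a self-contained sketch, the chunk accounting in the hunk above works like this (entry type simplified; same saturating-decrement semantics as the diff):

struct OutboundEntry {
    // `None` means no multi-response budget was recorded for this request.
    max_remaining_chunks: Option<u64>,
}

// Returns true when all expected chunks have arrived and the substream
// should transition to Closing.
fn chunk_received(entry: &mut OutboundEntry) -> bool {
    // Chunks left after this one; a missing budget counts as exhausted.
    let max_remaining_chunks = entry
        .max_remaining_chunks
        .map(|count| count.saturating_sub(1))
        .unwrap_or(0);
    entry.max_remaining_chunks = Some(max_remaining_chunks);
    max_remaining_chunks == 0
}
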
@@ -725,7 +726,16 @@ where
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
// notify the application error
if request.expected_responses() > 1 {
if request.expect_exactly_one_response() {
// return an error, stream should not have closed early.
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Err(HandlerErr::Outbound {
id: request_id,
proto: request.versioned_protocol().protocol(),
error: RPCError::IncompleteStream,
}),
));
} else {
// return an end of stream result
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Ok(RPCReceived::EndOfStream(
@@ -734,16 +744,6 @@ where
)),
));
}

// else we return an error, stream should not have closed early.
let outbound_err = HandlerErr::Outbound {
id: request_id,
proto: request.versioned_protocol().protocol(),
error: RPCError::IncompleteStream,
};
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Err(outbound_err),
));
}
Poll::Pending => {
entry.get_mut().state =
@@ -880,10 +880,10 @@ where
}

let (req, substream) = substream;
let expected_responses = req.expected_responses();
let max_responses = req.max_responses();

// store requests that expect responses
if expected_responses > 0 {
if max_responses > 0 {
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
// Store the stream and tag the output.
let delay_key = self
@@ -894,14 +894,13 @@
self.current_inbound_substream_id,
InboundInfo {
state: awaiting_stream,
pending_items: VecDeque::with_capacity(std::cmp::min(
expected_responses,
128,
) as usize),
pending_items: VecDeque::with_capacity(
std::cmp::min(max_responses, 128) as usize
),
delay_key: Some(delay_key),
protocol: req.versioned_protocol().protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
max_remaining_chunks: max_responses,
},
);
} else {
@@ -948,8 +947,14 @@ where
}

// add the stream to substreams if we expect a response, otherwise drop the stream.
let expected_responses = request.expected_responses();
if expected_responses > 0 {
let max_responses = request.max_responses();
if max_responses > 0 {
let max_remaining_chunks = if request.expect_exactly_one_response() {
// Currently enforced only for multiple responses
None
} else {
Some(max_responses)
};
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
@@ -958,12 +963,6 @@
substream: Box::new(substream),
request,
};
let expected_responses = if expected_responses > 1 {
// Currently enforced only for multiple responses
Some(expected_responses)
} else {
None
};
if self
.outbound_substreams
.insert(
Expand All @@ -972,7 +971,7 @@ where
state: awaiting_stream,
delay_key,
proto,
remaining_chunks: expected_responses,
max_remaining_chunks,
req_id: id,
},
)
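One detail worth noting from this file: the inbound `pending_items` queue is pre-sized to `min(max_responses, 128)`, so a request with a huge response budget cannot force a large up-front allocation. A minimal sketch of that sizing rule (the element type is a stand-in):

use std::collections::VecDeque;

fn pending_items_queue<T>(max_responses: u64) -> VecDeque<T> {
    // Capacity is capped at 128 no matter how many responses the request
    // may elicit; the queue can still grow beyond this if needed.
    VecDeque::with_capacity(std::cmp::min(max_responses, 128) as usize)
}
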
21 changes: 0 additions & 21 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -483,27 +483,6 @@ impl<E: EthSpec> RPCCodedResponse<E> {
RPCCodedResponse::Error(code, err)
}

/// Specifies which response allows for multiple chunks for the stream handler.
pub fn multiple_responses(&self) -> bool {
match self {
RPCCodedResponse::Success(resp) => match resp {
RPCResponse::Status(_) => false,
RPCResponse::BlocksByRange(_) => true,
RPCResponse::BlocksByRoot(_) => true,
RPCResponse::BlobsByRange(_) => true,
RPCResponse::BlobsByRoot(_) => true,
RPCResponse::Pong(_) => false,
RPCResponse::MetaData(_) => false,
RPCResponse::LightClientBootstrap(_) => false,
RPCResponse::LightClientOptimisticUpdate(_) => false,
RPCResponse::LightClientFinalityUpdate(_) => false,
},
RPCCodedResponse::Error(_, _) => true,
// Stream terminations are part of responses that have chunks
RPCCodedResponse::StreamTermination(_) => true,
}
}

/// Returns true if this response always terminates the stream.
pub fn close_after(&self) -> bool {
!matches!(self, RPCCodedResponse::Success(_))
17 changes: 15 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/outbound.rs
@@ -91,8 +91,8 @@ impl<E: EthSpec> OutboundRequest<E> {
}
/* These functions are used in the handler for stream management */

/// Number of responses expected for this request.
pub fn expected_responses(&self) -> u64 {
/// Maximum number of responses expected for this request.
pub fn max_responses(&self) -> u64 {
match self {
OutboundRequest::Status(_) => 1,
OutboundRequest::Goodbye(_) => 0,
@@ -105,6 +105,19 @@
}
}

pub fn expect_exactly_one_response(&self) -> bool {
match self {
OutboundRequest::Status(_) => true,
OutboundRequest::Goodbye(_) => false,
OutboundRequest::BlocksByRange(_) => false,
OutboundRequest::BlocksByRoot(_) => false,
OutboundRequest::BlobsByRange(_) => false,
OutboundRequest::BlobsByRoot(_) => false,
OutboundRequest::Ping(_) => true,
OutboundRequest::MetaData(_) => true,
}
}
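
Together with `max_responses()`, this helper is the crux of the PR: a request can have a response budget of 1 without requiring exactly one response. An illustrative check, using a hypothetical helper that is not part of the diff:

// A BlocksByRoot request for a single root has max_responses() == 1, yet
// its stream may end after zero blocks without an IncompleteStream error.
fn zero_responses_is_valid<E: EthSpec>(req: &OutboundRequest<E>) -> bool {
    !req.expect_exactly_one_response()
}
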

/// Gives the corresponding `SupportedProtocol` to this request.
pub fn versioned_protocol(&self) -> SupportedProtocol {
match self {
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/protocol.rs
@@ -654,8 +654,8 @@ pub enum InboundRequest<E: EthSpec> {
impl<E: EthSpec> InboundRequest<E> {
/* These functions are used in the handler for stream management */

/// Number of responses expected for this request.
pub fn expected_responses(&self) -> u64 {
/// Maximum number of responses expected for this request.
pub fn max_responses(&self) -> u64 {
match self {
InboundRequest::Status(_) => 1,
InboundRequest::Goodbye(_) => 0,
12 changes: 6 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
@@ -228,16 +228,16 @@ impl RPCRateLimiterBuilder {

pub trait RateLimiterItem {
fn protocol(&self) -> Protocol;
fn expected_responses(&self) -> u64;
fn max_responses(&self) -> u64;
}

impl<E: EthSpec> RateLimiterItem for super::InboundRequest<E> {
fn protocol(&self) -> Protocol {
self.versioned_protocol().protocol()
}

fn expected_responses(&self) -> u64 {
self.expected_responses()
fn max_responses(&self) -> u64 {
self.max_responses()
}
}

Expand All @@ -246,8 +246,8 @@ impl<E: EthSpec> RateLimiterItem for super::OutboundRequest<E> {
self.versioned_protocol().protocol()
}

fn expected_responses(&self) -> u64 {
self.expected_responses()
fn max_responses(&self) -> u64 {
self.max_responses()
}
}
impl RPCRateLimiter {
@@ -299,7 +299,7 @@ impl RPCRateLimiter {
request: &Item,
) -> Result<(), RateLimitedErr> {
let time_since_start = self.init_time.elapsed();
let tokens = request.expected_responses().max(1);
let tokens = request.max_responses().max(1);

let check =
|limiter: &mut Limiter<PeerId>| limiter.allows(time_since_start, peer_id, tokens);
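
The rate limiter keeps the same behavior under the new name: a request is charged one token per response it may elicit, floored at one so that zero-response requests such as Goodbye still cost a token. A sketch of that charge, reusing the `RateLimiterItem` trait from this file:

fn tokens_to_charge<Item: RateLimiterItem>(request: &Item) -> u64 {
    // Goodbye has max_responses() == 0 but must still consume one token.
    request.max_responses().max(1)
}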