Skip to content

Commit

Permalink
refactor(perf): don't use OutboundOpenInfo
Browse files Browse the repository at this point in the history
Instead of passing the command along, we store it in a buffer and retrieve it once the stream is upgraded.

Related: #3268.

Pull-Request: #3763.
  • Loading branch information
thomaseizinger authored Apr 28, 2023
1 parent 66466c4 commit 8445079
Showing 1 changed file with 32 additions and 20 deletions.
52 changes: 32 additions & 20 deletions protocols/perf/src/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct Handler {
>,
>,

requested_streams: VecDeque<Command>,

outbound: FuturesUnordered<BoxFuture<'static, Result<Event, std::io::Error>>>,

keep_alive: KeepAlive,
Expand All @@ -70,6 +72,7 @@ impl Handler {
pub fn new() -> Self {
Self {
queued_events: Default::default(),
requested_streams: Default::default(),
outbound: Default::default(),
keep_alive: KeepAlive::Yes,
}
Expand All @@ -88,17 +91,18 @@ impl ConnectionHandler for Handler {
type Error = Void;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = ReadyUpgrade<&'static [u8]>;
type OutboundOpenInfo = Command;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(DeniedUpgrade, ())
}

fn on_behaviour_event(&mut self, command: Self::InEvent) {
self.requested_streams.push_back(command);
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), command),
protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ()),
})
}

Expand All @@ -117,26 +121,34 @@ impl ConnectionHandler for Handler {
}) => void::unreachable(protocol),
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
info: Command { params, id },
}) => self.outbound.push(
crate::protocol::send_receive(params, protocol)
.map_ok(move |timers| Event {
id,
result: Ok(RunStats { params, timers }),
})
.boxed(),
),
info: (),
}) => {
let Command { id, params } = self
.requested_streams
.pop_front()
.expect("opened a stream without a pending command");
self.outbound.push(
crate::protocol::send_receive(params, protocol)
.map_ok(move |timers| Event {
id,
result: Ok(RunStats { params, timers }),
})
.boxed(),
);
}

ConnectionEvent::AddressChange(_) => {}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
info: Command { id, .. },
error,
}) => self
.queued_events
.push_back(ConnectionHandlerEvent::Custom(Event {
id,
result: Err(error),
})),
ConnectionEvent::DialUpgradeError(DialUpgradeError { info: (), error }) => {
let Command { id, .. } = self
.requested_streams
.pop_front()
.expect("requested stream without pending command");
self.queued_events
.push_back(ConnectionHandlerEvent::Custom(Event {
id,
result: Err(error),
}));
}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
match error {
ConnectionHandlerUpgrErr::Timeout => {}
Expand Down

0 comments on commit 8445079

Please sign in to comment.