Skip to content

Commit

Permalink
Deduplicate logic from select_with_strategy poll_next branches
Browse files Browse the repository at this point in the history
  • Loading branch information
414owen committed Mar 24, 2022
1 parent 53651ca commit bd8084d
Showing 1 changed file with 51 additions and 45 deletions.
96 changes: 51 additions & 45 deletions futures-util/src/stream/select_with_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ pub enum PollNext {
Right,
}

enum PollSide {
/// Poll the first stream.
Left,
/// Poll the second stream.
Right,
}

impl PollNext {
/// Toggle the value and return the old one.
#[must_use]
Expand All @@ -33,6 +26,13 @@ impl PollNext {

old
}

fn other(&self) -> PollNext {
match self {
PollNext::Left => PollNext::Right,
PollNext::Right => PollNext::Left,
}
}
}

impl Default for PollNext {
Expand All @@ -49,16 +49,16 @@ enum InternalState {
}

impl InternalState {
fn finish(&mut self, ps: PollSide) {
fn finish(&mut self, ps: &PollNext) {
match (&self, ps) {
(InternalState::Start, PollSide::Left) => {
(InternalState::Start, PollNext::Left) => {
*self = InternalState::LeftFinished;
}
(InternalState::Start, PollSide::Right) => {
(InternalState::Start, PollNext::Right) => {
*self = InternalState::RightFinished;
}
(InternalState::LeftFinished, PollSide::Right)
| (InternalState::RightFinished, PollSide::Left) => {
(InternalState::LeftFinished, PollNext::Right)
| (InternalState::RightFinished, PollNext::Left) => {
*self = InternalState::BothFinished;
}
_ => {}
Expand All @@ -69,6 +69,7 @@ impl InternalState {
pin_project! {
/// Stream for the [`select_with_strategy()`] function. See function docs for details.
#[must_use = "streams do nothing unless polled"]
#[project = SelectWithStrategyProj]
pub struct SelectWithStrategy<St1, St2, Clos, State> {
#[pin]
stream1: St1,
Expand Down Expand Up @@ -210,6 +211,41 @@ where
}
}

#[inline(always)]
fn poll_side<St1, St2, Clos, State>(select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, side: &PollNext, cx: &mut Context<'_>) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
match side {
PollNext::Left => select.stream1.as_mut().poll_next(cx),
PollNext::Right => select.stream2.as_mut().poll_next(cx),
}
}

#[inline(always)]
fn poll_inner<St1, St2, Clos, State>(select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, side: &PollNext, cx: &mut Context<'_>) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
match poll_side(select, side, cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
Poll::Ready(None) => {
select.internal_state.finish(side);
},
Poll::Pending => (),
};
let other = side.other();
match poll_side(select, &other, cx) {
Poll::Ready(None) => {
select.internal_state.finish(&other);
Poll::Ready(None)
},
a => a,
}
}

impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
where
St1: Stream,
Expand All @@ -219,42 +255,12 @@ where
type Item = St1::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
let this = self.project();
let mut this = self.project();

match this.internal_state {
InternalState::Start => match (this.clos)(this.state) {
PollNext::Left => {
match this.stream1.poll_next(cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
Poll::Ready(None) => {
this.internal_state.finish(PollSide::Left);
}
Poll::Pending => (),
};
match this.stream2.poll_next(cx) {
Poll::Ready(None) => {
this.internal_state.finish(PollSide::Right);
Poll::Ready(None)
}
a => a,
}
}
PollNext::Right => {
match this.stream2.poll_next(cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
Poll::Ready(None) => {
this.internal_state.finish(PollSide::Right);
}
Poll::Pending => (),
};
match this.stream1.poll_next(cx) {
Poll::Ready(None) => {
this.internal_state.finish(PollSide::Left);
Poll::Ready(None)
}
a => a,
}
}
PollNext::Left => poll_inner(&mut this, &PollNext::Left, cx),
PollNext::Right => poll_inner(&mut this, &PollNext::Right, cx),
},
InternalState::LeftFinished => match this.stream2.poll_next(cx) {
Poll::Ready(None) => {
Expand Down

0 comments on commit bd8084d

Please sign in to comment.