Skip to content

Commit

Permalink
implement Stream for Responses
Browse files Browse the repository at this point in the history
Signed-off-by: Petros Angelatos <[email protected]>
  • Loading branch information
petrosagg committed Jan 13, 2023
1 parent b7755b2 commit 47e3c45
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use crate::{
use bytes::{Buf, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
use futures_util::{future, pin_mut, ready, Stream, StreamExt, TryStreamExt};
use parking_lot::Mutex;
use postgres_protocol::message::{backend::Message, frontend};
use postgres_types::BorrowToSql;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -59,6 +60,17 @@ impl Responses {
}
}

impl Stream for Responses {
type Item = Result<Message, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!((*self).poll_next(cx)) {
Err(err) if err.is_closed() => Poll::Ready(None),
msg => Poll::Ready(Some(msg)),
}
}
}

/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [prepare](prepare) module).
#[derive(Default)]
Expand Down

0 comments on commit 47e3c45

Please sign in to comment.