diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index 8a7c9d288..2ab5c53b3 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -21,12 +21,13 @@ use crate::{ use bytes::{Buf, BytesMut}; use fallible_iterator::FallibleIterator; use futures_channel::mpsc; -use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt}; +use futures_util::{future, pin_mut, ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; use postgres_protocol::message::{backend::Message, frontend}; use postgres_types::BorrowToSql; use std::collections::HashMap; use std::fmt; +use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; #[cfg(feature = "runtime")] @@ -59,6 +60,17 @@ impl Responses { } } +impl Stream for Responses { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match ready!((*self).poll_next(cx)) { + Err(err) if err.is_closed() => Poll::Ready(None), + msg => Poll::Ready(Some(msg)), + } + } +} + /// A cache of type info and prepared statements for fetching type info /// (corresponding to the queries in the [prepare](prepare) module). #[derive(Default)]