From b22c23ba6014c2020c97051d28efb0c771603369 Mon Sep 17 00:00:00 2001 From: Louis Kureuil Person Date: Sun, 4 Nov 2018 01:09:54 +0100 Subject: [PATCH 1/5] Introduce dedicated error types to the lapin-futures crate This change replaces all occurrences of io::Error by the newly created Error type, the only exception being for AMQPCodec. This change allows future-proofing the error that can be returned by the crate. For example by changing the internals of the Error type, it is now possible to provide users with a way of knowing what kind of error occurred without having to resort to parsing error messages. The only exception to this change concerns AMQPCodec: a new Error type was created just for it because tokio-codec's Encoder and Decoder traits impose a From bound of the error type. Because I didn't want to introduce a way to create an Error instance from outside the crate, I isolated the possible errors from this section into its own type. --- futures/Cargo.toml | 1 + futures/examples/client.rs | 8 ++-- futures/examples/consumers.rs | 16 ++++---- futures/examples/topic.rs | 20 +++++----- futures/src/channel.rs | 70 ++++++++++++++++----------------- futures/src/client.rs | 22 ++++++----- futures/src/consumer.rs | 8 ++-- futures/src/error.rs | 32 +++++++++++++++ futures/src/lib.rs | 19 ++++++--- futures/src/transport.rs | 73 ++++++++++++++++++++++------------- futures/tests/connection.rs | 8 ++-- 11 files changed, 174 insertions(+), 103 deletions(-) create mode 100644 futures/src/error.rs diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 03602464..cbd2e6d6 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -14,6 +14,7 @@ license = "MIT" nom = "^4.0" log = "^0.4" bytes = "^0.4" +failure = "^0.1" futures = "^0.1" tokio-codec = "^0.1" tokio-io = "^0.1" diff --git a/futures/examples/client.rs b/futures/examples/client.rs index 8ed4a0ab..cbb7344f 100644 --- a/futures/examples/client.rs +++ b/futures/examples/client.rs @@ -1,9 +1,11 @@ #[macro_use] extern crate log; extern crate lapin_futures as lapin; +extern crate failure; extern crate futures; extern crate tokio; extern crate env_logger; +use failure::Error; use futures::future::Future; use futures::Stream; use tokio::net::TcpStream; @@ -18,11 +20,11 @@ fn main() { let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "127.0.0.1:5672".to_string()).parse().unwrap(); Runtime::new().unwrap().block_on_all( - TcpStream::connect(&addr).and_then(|stream| { + TcpStream::connect(&addr).map_err(Error::from).and_then(|stream| { lapin::client::Client::connect(stream, ConnectionOptions { frame_max: 65535, ..Default::default() - }) + }).map_err(Error::from) }).and_then(|(client, heartbeat)| { tokio::spawn(heartbeat.map_err(|e| eprintln!("{:?}", e))); @@ -82,7 +84,7 @@ fn main() { c.basic_ack(message.delivery_tag, false) }) }) - }) + }).map_err(Error::from) }).map_err(|err| eprintln!("error: {:?}", err)) ).expect("runtime exited with failure") } diff --git a/futures/examples/consumers.rs b/futures/examples/consumers.rs index e89ad127..7514151c 100644 --- a/futures/examples/consumers.rs +++ b/futures/examples/consumers.rs @@ -2,11 +2,11 @@ extern crate env_logger; extern crate lapin_futures as lapin; #[macro_use] extern crate log; +extern crate failure; extern crate futures; extern crate tokio; -use std::io; - +use failure::{err_msg, Error}; use futures::future::Future; use futures::{IntoFuture, Stream}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -36,7 +36,7 @@ fn create_consumer(client: &C println!("consumer '{}' got '{}'", n, std::str::from_utf8(&message.data).unwrap()); channel.basic_ack(message.delivery_tag, false) }) - }).map(|_| ()).map_err(move |err| eprintln!("got error in consumer '{}': {:?}", n, err)) + }).map(|_| ()).map_err(move |err| eprintln!("got error in consumer '{}': {}", n, err)) } fn main() { @@ -47,19 +47,19 @@ fn main() { // let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); runtime.block_on_all( - TcpStream::connect(&addr).and_then(|stream| { + TcpStream::connect(&addr).map_err(Error::from).and_then(|stream| { Client::connect(stream, ConnectionOptions { frame_max: 65535, heartbeat: 20, ..Default::default() - }) + }).map_err(Error::from) }).and_then(|(client, heartbeat)| { tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {:?}", e))) - .into_future().map(|_| client).map_err(|_| io::Error::new(io::ErrorKind::Other, "spawn error")) + .into_future().map(|_| client).map_err(|_| err_msg("spawn error")) }).and_then(|client| { let _client = client.clone(); futures::stream::iter_ok(0..N_CONSUMERS).for_each(move |n| tokio::spawn(create_consumer(&_client, n))) - .into_future().map(move |_| client).map_err(|_| io::Error::new(io::ErrorKind::Other, "spawn error")) + .into_future().map(move |_| client).map_err(|_| err_msg("spawn error")) }).and_then(|client| { client.create_confirm_channel(ConfirmSelectOptions::default()).and_then(move |channel| { futures::stream::iter_ok((0..N_CONSUMERS).flat_map(|c| { @@ -77,7 +77,7 @@ fn main() { }) }) }) - }) + }).map_err(Error::from) }).map_err(|err| eprintln!("error: {:?}", err)) ).expect("runtime exited with failure"); } diff --git a/futures/examples/topic.rs b/futures/examples/topic.rs index 52fe0c23..1682fb9b 100644 --- a/futures/examples/topic.rs +++ b/futures/examples/topic.rs @@ -1,11 +1,11 @@ extern crate env_logger; +extern crate failure; extern crate lapin_futures as lapin; extern crate log; extern crate futures; extern crate tokio; -use std::io; - +use failure::{err_msg, Error}; use futures::future::Future; use futures::IntoFuture; use tokio::net::TcpStream; @@ -21,27 +21,27 @@ fn main() { let runtime = Runtime::new().unwrap(); runtime.block_on_all( - TcpStream::connect(&addr).and_then(|stream| { + TcpStream::connect(&addr).map_err(Error::from).and_then(|stream| { Client::connect(stream, ConnectionOptions { frame_max: 65535, heartbeat: 20, ..Default::default() - }) + }).map_err(Error::from) }).and_then(|(client, heartbeat)| { tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {:?}", e))) - .into_future().map(|_| client).map_err(|_| io::Error::new(io::ErrorKind::Other, "spawn error")) + .into_future().map(|_| client).map_err(|_| err_msg("spawn error")) }).and_then(|client| { - client.create_confirm_channel(ConfirmSelectOptions::default()) + client.create_confirm_channel(ConfirmSelectOptions::default()).map_err(Error::from) }).and_then(|channel| { - channel.clone().exchange_declare("hello_topic", "topic", ExchangeDeclareOptions::default(), FieldTable::new()).map(move |_| channel) + channel.clone().exchange_declare("hello_topic", "topic", ExchangeDeclareOptions::default(), FieldTable::new()).map(move |_| channel).map_err(Error::from) }).and_then(|channel| { - channel.clone().queue_declare("topic_queue", QueueDeclareOptions::default(), FieldTable::new()).map(move |_| channel) + channel.clone().queue_declare("topic_queue", QueueDeclareOptions::default(), FieldTable::new()).map(move |_| channel).map_err(Error::from) }).and_then(|channel| { - channel.clone().queue_bind("topic_queue", "hello_topic", "*.foo.*", QueueBindOptions::default(), FieldTable::new()).map(move |_| channel) + channel.clone().queue_bind("topic_queue", "hello_topic", "*.foo.*", QueueBindOptions::default(), FieldTable::new()).map(move |_| channel).map_err(Error::from) }).and_then(|channel| { channel.basic_publish("hello_topic", "hello.fooo.bar", b"hello".to_vec(), BasicPublishOptions::default(), BasicProperties::default()).map(|confirmation| { println!("got confirmation of publication: {:?}", confirmation); - }) + }).map_err(Error::from) }).map_err(|err| eprintln!("error: {:?}", err)) ).expect("runtime exited with failure"); } diff --git a/futures/src/channel.rs b/futures/src/channel.rs index 13cf2be2..b1e2e53d 100644 --- a/futures/src/channel.rs +++ b/futures/src/channel.rs @@ -1,4 +1,3 @@ -use std::io::{self,Error,ErrorKind}; use futures::{Async,Future,future,Poll,Stream,task}; use tokio_io::{AsyncRead,AsyncWrite}; use std::sync::{Arc,Mutex}; @@ -6,6 +5,7 @@ use lapin_async; use lapin_async::api::{ChannelState, RequestId}; use lapin_async::connection::Connection; +use error::Error; use transport::*; use message::BasicGetMessage; use types::FieldTable; @@ -259,7 +259,7 @@ pub struct ChannelFlowOptions { impl Channel { /// create a channel - pub fn create(transport: Arc>>) -> impl Future + Send + 'static { + pub fn create(transport: Arc>>) -> impl Future + Send + 'static { let channel_transport = transport.clone(); future::poll_fn(move || { @@ -270,7 +270,7 @@ impl Channel { transport: channel_transport.clone(), })) } else { - return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "The maximum number of channels for this connection has been reached")); + return Err(Error::new("The maximum number of channels for this connection has been reached")); } }).and_then(|channel| { let channel_id = channel.id; @@ -282,8 +282,8 @@ impl Channel { match transport.conn.get_state(channel_id) { Some(ChannelState::Connected) => return Ok(Async::Ready(())), - Some(ChannelState::Error) => return Err(io::Error::new(io::ErrorKind::Other, format!("Failed to open channel"))), - Some(ChannelState::Closed) => return Err(io::Error::new(io::ErrorKind::Other, format!("Failed to open channel"))), + Some(ChannelState::Error) => return Err(Error::new("Failed to open channel")), + Some(ChannelState::Closed) => return Err(Error::new("Failed to open channel")), _ => { task::current().notify(); return Ok(Async::NotReady); @@ -299,7 +299,7 @@ impl Channel { /// request access /// /// returns a future that resolves once the access is granted - pub fn access_request(&self, realm: &str, options: AccessRequestOptions) -> impl Future + Send + 'static { + pub fn access_request(&self, realm: &str, options: AccessRequestOptions) -> impl Future + Send + 'static { let channel_id = self.id; let realm = realm.to_string(); @@ -312,7 +312,7 @@ impl Channel { /// declares an exchange /// /// returns a future that resolves once the exchange is available - pub fn exchange_declare(&self, name: &str, exchange_type: &str, options: ExchangeDeclareOptions, arguments: FieldTable) -> impl Future + Send + 'static { + pub fn exchange_declare(&self, name: &str, exchange_type: &str, options: ExchangeDeclareOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); let exchange_type = exchange_type.to_string(); @@ -326,7 +326,7 @@ impl Channel { /// deletes an exchange /// /// returns a future that resolves once the exchange is deleted - pub fn exchange_delete(&self, name: &str, options: ExchangeDeleteOptions) -> impl Future + Send + 'static { + pub fn exchange_delete(&self, name: &str, options: ExchangeDeleteOptions) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); @@ -339,7 +339,7 @@ impl Channel { /// binds an exchange to another exchange /// /// returns a future that resolves once the exchanges are bound - pub fn exchange_bind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeBindOptions, arguments: FieldTable) -> impl Future + Send + 'static { + pub fn exchange_bind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeBindOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let destination = destination.to_string(); let source = source.to_string(); @@ -354,7 +354,7 @@ impl Channel { /// unbinds an exchange from another one /// /// returns a future that resolves once the exchanges are unbound - pub fn exchange_unbind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeUnbindOptions, arguments: FieldTable) -> impl Future + Send + 'static { + pub fn exchange_unbind(&self, destination: &str, source: &str, routing_key: &str, options: ExchangeUnbindOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let destination = destination.to_string(); let source = source.to_string(); @@ -372,7 +372,7 @@ impl Channel { /// /// the `mandatory` and `ìmmediate` options can be set to true, /// but the return message will not be handled - pub fn queue_declare(&self, name: &str, options: QueueDeclareOptions, arguments: FieldTable) -> impl Future + Send + 'static { + pub fn queue_declare(&self, name: &str, options: QueueDeclareOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); let transport = self.transport.clone(); @@ -401,7 +401,7 @@ impl Channel { /// binds a queue to an exchange /// /// returns a future that resolves once the queue is bound to the exchange - pub fn queue_bind(&self, name: &str, exchange: &str, routing_key: &str, options: QueueBindOptions, arguments: FieldTable) -> impl Future + Send + 'static { + pub fn queue_bind(&self, name: &str, exchange: &str, routing_key: &str, options: QueueBindOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); let exchange = exchange.to_string(); @@ -416,7 +416,7 @@ impl Channel { /// unbinds a queue from the exchange /// /// returns a future that resolves once the queue is unbound from the exchange - pub fn queue_unbind(&self, name: &str, exchange: &str, routing_key: &str, options: QueueUnbindOptions, arguments: FieldTable) -> impl Future + Send + 'static { + pub fn queue_unbind(&self, name: &str, exchange: &str, routing_key: &str, options: QueueUnbindOptions, arguments: FieldTable) -> impl Future + Send + 'static { let channel_id = self.id; let name = name.to_string(); let exchange = exchange.to_string(); @@ -428,7 +428,7 @@ impl Channel { } /// sets up confirm extension for this channel - pub fn confirm_select(&self, options: ConfirmSelectOptions) -> impl Future + Send + 'static { + pub fn confirm_select(&self, options: ConfirmSelectOptions) -> impl Future + Send + 'static { let channel_id = self.id; self.run_on_locked_transport("confirm_select", "Could not activate confirm extension", move |transport| { @@ -437,7 +437,7 @@ impl Channel { } /// specifies quality of service for a channel - pub fn basic_qos(&self, options: BasicQosOptions) -> impl Future + Send + 'static { + pub fn basic_qos(&self, options: BasicQosOptions) -> impl Future + Send + 'static { let channel_id = self.id; self.run_on_locked_transport("basic_qos", "Could not setup qos", move |transport| { @@ -450,7 +450,7 @@ impl Channel { /// the future's result is: /// - `Some(request_id)` if we're on a confirm channel and the message was ack'd /// - `None` if we're not on a confirm channel or the message was nack'd - pub fn basic_publish(&self, exchange: &str, routing_key: &str, payload: Vec, options: BasicPublishOptions, properties: BasicProperties) -> impl Future, Error = io::Error> + Send + 'static { + pub fn basic_publish(&self, exchange: &str, routing_key: &str, payload: Vec, options: BasicPublishOptions, properties: BasicProperties) -> impl Future, Error = Error> + Send + 'static { let channel_id = self.id; let exchange = exchange.to_string(); let routing_key = routing_key.to_string(); @@ -483,7 +483,7 @@ impl Channel { /// /// `Consumer` implements `futures::Stream`, so it can be used with any of /// the usual combinators - pub fn basic_consume(&self, queue: &Queue, consumer_tag: &str, options: BasicConsumeOptions, arguments: FieldTable) -> impl Future, Error = io::Error> + Send + 'static { + pub fn basic_consume(&self, queue: &Queue, consumer_tag: &str, options: BasicConsumeOptions, arguments: FieldTable) -> impl Future, Error = Error> + Send + 'static { let channel_id = self.id; let transport = self.transport.clone(); let consumer_tag = consumer_tag.to_string(); @@ -512,7 +512,7 @@ impl Channel { } /// acks a message - pub fn basic_ack(&self, delivery_tag: u64, multiple: bool) -> impl Future + Send + 'static { + pub fn basic_ack(&self, delivery_tag: u64, multiple: bool) -> impl Future + Send + 'static { let channel_id = self.id; self.run_on_locked_transport("basic_ack", "Could not ack message", move |transport| { @@ -521,7 +521,7 @@ impl Channel { } /// nacks a message - pub fn basic_nack(&self, delivery_tag: u64, multiple: bool, requeue: bool) -> impl Future + Send + 'static { + pub fn basic_nack(&self, delivery_tag: u64, multiple: bool, requeue: bool) -> impl Future + Send + 'static { let channel_id = self.id; self.run_on_locked_transport("basic_nack", "Could not nack message", move |transport| { @@ -530,7 +530,7 @@ impl Channel { } /// rejects a message - pub fn basic_reject(&self, delivery_tag: u64, requeue: bool) -> impl Future + Send + 'static { + pub fn basic_reject(&self, delivery_tag: u64, requeue: bool) -> impl Future + Send + 'static { let channel_id = self.id; self.run_on_locked_transport("basic_reject", "Could not reject message", move |transport| { @@ -539,7 +539,7 @@ impl Channel { } /// gets a message - pub fn basic_get(&self, queue: &str, options: BasicGetOptions) -> impl Future + Send + 'static { + pub fn basic_get(&self, queue: &str, options: BasicGetOptions) -> impl Future + Send + 'static { let channel_id = self.id; let _queue = queue.to_string(); let queue = queue.to_string(); @@ -560,7 +560,7 @@ impl Channel { Some(answer) => if answer { Ok(Async::Ready(Some(request_id))) } else { - Err(Error::new(ErrorKind::Other, "basic get returned empty")) + Err(Error::new("basic get returned empty")) }, None => { task::current().notify(); @@ -573,7 +573,7 @@ impl Channel { /// Purge a queue. /// /// This method removes all messages from a queue which are not awaiting acknowledgment. - pub fn queue_purge(&self, queue_name: &str, options: QueuePurgeOptions) -> impl Future + Send + 'static { + pub fn queue_purge(&self, queue_name: &str, options: QueuePurgeOptions) -> impl Future + Send + 'static { let channel_id = self.id; let queue_name = queue_name.to_string(); @@ -591,7 +591,7 @@ impl Channel { /// If the queue has consumers the server does not delete it but raises a channel exception instead. /// /// If `if_empty` is set, the server will only delete the queue if it has no messages. - pub fn queue_delete(&self, queue_name: &str, options: QueueDeleteOptions) -> impl Future + Send + 'static { + pub fn queue_delete(&self, queue_name: &str, options: QueueDeleteOptions) -> impl Future + Send + 'static { let channel_id = self.id; let queue_name = queue_name.to_string(); @@ -601,7 +601,7 @@ impl Channel { } /// closes the channel - pub fn close(&self, code: u16, message: &str) -> impl Future + Send + 'static { + pub fn close(&self, code: u16, message: &str) -> impl Future + Send + 'static { let channel_id = self.id; let message = message.to_string(); @@ -611,7 +611,7 @@ impl Channel { } /// ack a channel close - pub fn close_ok(&self) -> impl Future + Send + 'static { + pub fn close_ok(&self) -> impl Future + Send + 'static { let channel_id = self.id; self.run_on_locked_transport("close_ok", "Could not ack closed channel", move |transport| { @@ -620,7 +620,7 @@ impl Channel { } /// update a channel flow - pub fn channel_flow(&self, options: ChannelFlowOptions) -> impl Future + Send + 'static { + pub fn channel_flow(&self, options: ChannelFlowOptions) -> impl Future + Send + 'static { let channel_id = self.id; self.run_on_locked_transport("channel_flow", "Could not update channel flow", move |transport| { @@ -629,7 +629,7 @@ impl Channel { } /// ack an update to a channel flow - pub fn channel_flow_ok(&self, options: ChannelFlowOptions) -> impl Future + Send + 'static { + pub fn channel_flow_ok(&self, options: ChannelFlowOptions) -> impl Future + Send + 'static { let channel_id = self.id; self.run_on_locked_transport("channel_flow_ok", "Could not ack update to channel flow", move |transport| { @@ -637,9 +637,9 @@ impl Channel { }).map(|_| ()) } - fn run_on_locked_transport_full(&self, method: &str, error: &str, action: Action, finished: Finished, payload: Option<(Vec, BasicProperties)>) -> impl Future, Error = io::Error> + Send + 'static + fn run_on_locked_transport_full(&self, method: &str, error: &str, action: Action, finished: Finished, payload: Option<(Vec, BasicProperties)>) -> impl Future, Error = Error> + Send + 'static where Action: 'static + Send + FnOnce(&mut AMQPTransport) -> Result, lapin_async::error::Error>, - Finished: 'static + Send + Fn(&mut Connection, RequestId) -> Poll, io::Error> { + Finished: 'static + Send + Fn(&mut Connection, RequestId) -> Poll, Error> { trace!("run on locked transport; method={:?}", method); let channel_id = self.id; let transport = self.transport.clone(); @@ -661,7 +661,7 @@ impl Channel { // FnMut can be called several time and action which is an FnOnce can only be called // once (which is implemented as a ownership transfer). match action.take().unwrap()(&mut transport) { - Err(e) => Err(Error::new(ErrorKind::Other, format!("{}: {:?}", error, e))), + Err(e) => Err(Error::new(format!("{}: {:?}", error, e))), Ok(request_id) => { trace!("run on locked transport; method={:?} request_id={:?}", _method, request_id); @@ -689,7 +689,7 @@ impl Channel { }) } - fn run_on_lock_transport_basic_finished(conn: &mut Connection, request_id: RequestId) -> Poll, io::Error> { + fn run_on_lock_transport_basic_finished(conn: &mut Connection, request_id: RequestId) -> Poll, Error> { match conn.is_finished(request_id) { Some(answer) if answer => Ok(Async::Ready(Some(request_id))), _ => { @@ -699,14 +699,14 @@ impl Channel { } } - fn run_on_locked_transport(&self, method: &str, error: &str, action: Action) -> impl Future, Error = io::Error> + Send + 'static + fn run_on_locked_transport(&self, method: &str, error: &str, action: Action) -> impl Future, Error = Error> + Send + 'static where Action: 'static + Send + FnOnce(&mut AMQPTransport) -> Result, lapin_async::error::Error> { self.run_on_locked_transport_full(method, error, action, Self::run_on_lock_transport_basic_finished, None) } /// internal method to wait until a request succeeds - pub fn wait_for_answer(tr: &mut AMQPTransport, request_id: RequestId, finished: &Finished) -> Poll, io::Error> - where Finished: 'static + Send + Fn(&mut Connection, RequestId) -> Poll, io::Error> { + pub fn wait_for_answer(tr: &mut AMQPTransport, request_id: RequestId, finished: &Finished) -> Poll, Error> + where Finished: 'static + Send + Fn(&mut Connection, RequestId) -> Poll, Error> { trace!("wait for answer; request_id={:?}", request_id); tr.poll()?; trace!("wait for answer transport poll; request_id={:?} status=NotReady", request_id); diff --git a/futures/src/client.rs b/futures/src/client.rs index 51ed3454..db9442cb 100644 --- a/futures/src/client.rs +++ b/futures/src/client.rs @@ -1,7 +1,6 @@ use amq_protocol::uri::AMQPUri; use lapin_async; use std::default::Default; -use std::io; use std::str::FromStr; use futures::{future,task,Async,Future,Poll,Stream}; use futures::sync::oneshot; @@ -12,6 +11,7 @@ use std::time::{Duration,Instant}; use transport::*; use channel::{Channel, ConfirmSelectOptions}; +use error::Error; /// the Client structures connects to a server and creates channels //#[derive(Clone)] @@ -63,22 +63,22 @@ impl Default for ConnectionOptions { } impl FromStr for ConnectionOptions { - type Err = String; + type Err = Error; fn from_str(s: &str) -> Result { - let uri = AMQPUri::from_str(s)?; + let uri = AMQPUri::from_str(s).map_err(|e| Error::new(e.to_string()))?; Ok(ConnectionOptions::from_uri(uri)) } } pub type ConnectionConfiguration = lapin_async::connection::Configuration; -fn heartbeat_pulse(transport: Arc>>, heartbeat: u16, rx: oneshot::Receiver<()>) -> impl Future + Send + 'static { +fn heartbeat_pulse(transport: Arc>>, heartbeat: u16, rx: oneshot::Receiver<()>) -> impl Future + Send + 'static { let interval = if heartbeat == 0 { Err(()) } else { Ok(Interval::new(Instant::now(), Duration::from_secs(heartbeat.into())) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))) + .map_err(|err| Error::new(err.to_string()))) }; future::select_all(vec![ @@ -161,20 +161,24 @@ impl Client { /// # Example /// /// ``` + /// # extern crate failure; /// # extern crate lapin_futures; /// # extern crate tokio; /// # /// # use tokio::prelude::*; /// # /// # fn main() { + /// use failure::Error; + /// use lapin_futures::client::{Client, ConnectionOptions}; /// use tokio::net::TcpStream; /// use tokio::runtime::Runtime; - /// use lapin_futures::client::{Client, ConnectionOptions}; /// /// let addr = "127.0.0.1:5672".parse().unwrap(); /// let f = TcpStream::connect(&addr) + /// .map_err(Error::from) /// .and_then(|stream| { /// Client::connect(stream, ConnectionOptions::default()) + /// .map_err(Error::from) /// }) /// .and_then(|(client, mut heartbeat)| { /// let handle = heartbeat.handle().unwrap(); @@ -193,7 +197,7 @@ impl Client { /// # } /// ``` pub fn connect(stream: T, options: ConnectionOptions) -> - impl Future + Send + 'static>), Error = io::Error> + Send + 'static + impl Future + Send + 'static>), Error = Error> + Send + 'static { AMQPTransport::connect(stream, options).and_then(|transport| { debug!("got client service"); @@ -216,13 +220,13 @@ impl Client { /// creates a new channel /// /// returns a future that resolves to a `Channel` once the method succeeds - pub fn create_channel(&self) -> impl Future, Error = io::Error> + Send + 'static { + pub fn create_channel(&self) -> impl Future, Error = Error> + Send + 'static { Channel::create(self.transport.clone()) } /// returns a future that resolves to a `Channel` once the method succeeds /// the channel will support RabbitMQ's confirm extension - pub fn create_confirm_channel(&self, options: ConfirmSelectOptions) -> impl Future, Error = io::Error> + Send + 'static { + pub fn create_confirm_channel(&self, options: ConfirmSelectOptions) -> impl Future, Error = Error> + Send + 'static { //FIXME: maybe the confirm channel should be a separate type //especially, if we implement transactions, the methods should be available on the original channel //but not on the confirm channel. And the basic publish method should have different results diff --git a/futures/src/consumer.rs b/futures/src/consumer.rs index 5df49289..98e0d637 100644 --- a/futures/src/consumer.rs +++ b/futures/src/consumer.rs @@ -2,9 +2,9 @@ use futures::{Async,Poll,Stream,task}; use lapin_async::consumer::ConsumerSubscriber; use tokio_io::{AsyncRead,AsyncWrite}; use std::collections::VecDeque; -use std::io; use std::sync::{Arc,Mutex}; +use error::Error; use message::Delivery; use transport::*; @@ -76,16 +76,16 @@ impl Consumer { impl Stream for Consumer { type Item = Delivery; - type Error = io::Error; + type Error = Error; - fn poll(&mut self) -> Poll, io::Error> { + fn poll(&mut self) -> Poll, Error> { trace!("consumer poll; consumer_tag={:?} polling transport", self.consumer_tag); let mut transport = lock_transport!(self.transport); transport.poll()?; let mut inner = match self.inner.lock() { Ok(inner) => inner, Err(_) => if self.inner.is_poisoned() { - return Err(io::Error::new(io::ErrorKind::Other, "Consumer mutex is poisoned")) + return Err(Error::new("Consumer mutex is poisoned")) } else { task::current().notify(); return Ok(Async::NotReady) diff --git a/futures/src/error.rs b/futures/src/error.rs new file mode 100644 index 00000000..3750a1ca --- /dev/null +++ b/futures/src/error.rs @@ -0,0 +1,32 @@ +use std::fmt; +use failure::{Backtrace, Context, Fail}; + +/// The type of error that can be returned in this crate. +#[derive(Debug)] +pub struct Error { + inner: Context, +} + +impl Error { + pub(crate) fn new(message: impl Into) -> Self { + Error { + inner: Context::new(message.into()) + } + } +} + +impl Fail for Error { + fn cause(&self) -> Option<&Fail> { + self.inner.cause() + } + + fn backtrace(&self) -> Option<&Backtrace> { + self.inner.backtrace() + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.inner, f) + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 93de954f..5a7ac305 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -18,9 +18,11 @@ //! ```rust,no_run //! #[macro_use] extern crate log; //! extern crate lapin_futures as lapin; +//! extern crate failure; //! extern crate futures; //! extern crate tokio; //! +//! use failure::Error; //! use futures::future::Future; //! use futures::Stream; //! use tokio::net::TcpStream; @@ -33,16 +35,18 @@ //! let addr = "127.0.0.1:5672".parse().unwrap(); //! //! Runtime::new().unwrap().block_on_all( -//! TcpStream::connect(&addr).and_then(|stream| { +//! TcpStream::connect(&addr).map_err(Error::from).and_then(|stream| { //! //! // connect() returns a future of an AMQP Client //! // that resolves once the handshake is done //! lapin::client::Client::connect(stream, ConnectionOptions::default()) +//! .map_err(Error::from) //! }).and_then(|(client, _ /* heartbeat */)| { //! //! // create_channel returns a future that is resolved //! // once the channel is successfully created //! client.create_channel() +//! .map_err(Error::from) //! }).and_then(|channel| { //! let id = channel.id; //! info!("created channel with id: {}", id); @@ -54,7 +58,7 @@ //! info!("channel {} declared queue {}", id, "hello"); //! //! channel.basic_publish("", "hello", b"hello from tokio".to_vec(), BasicPublishOptions::default(), BasicProperties::default()) -//! }) +//! }).map_err(Error::from) //! }) //! ).expect("runtime failure"); //! } @@ -65,9 +69,11 @@ //! ```rust,no_run //! #[macro_use] extern crate log; //! extern crate lapin_futures as lapin; +//! extern crate failure; //! extern crate futures; //! extern crate tokio; //! +//! use failure::Error; //! use futures::future::Future; //! use futures::Stream; //! use tokio::net::TcpStream; @@ -80,11 +86,12 @@ //! let addr = "127.0.0.1:5672".parse().unwrap(); //! //! Runtime::new().unwrap().block_on_all( -//! TcpStream::connect(&addr).and_then(|stream| { +//! TcpStream::connect(&addr).map_err(Error::from).and_then(|stream| { //! //! // connect() returns a future of an AMQP Client //! // that resolves once the handshake is done //! lapin::client::Client::connect(stream, ConnectionOptions::default()) +//! .map_err(Error::from) //! }).and_then(|(client, heartbeat)| { //! // The heartbeat future should be run in a dedicated thread so that nothing can prevent it from //! // dispatching events on time. @@ -94,7 +101,7 @@ //! //! // create_channel returns a future that is resolved //! // once the channel is successfully created -//! client.create_channel() +//! client.create_channel().map_err(Error::from) //! }).and_then(|channel| { //! let id = channel.id; //! info!("created channel with id: {}", id); @@ -115,7 +122,7 @@ //! info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap()); //! ch.basic_ack(message.delivery_tag, false) //! }) -//! }) +//! }).map_err(Error::from) //! }) //! ).expect("runtime failure"); //! } @@ -125,6 +132,7 @@ extern crate amq_protocol; extern crate cookie_factory; extern crate bytes; +#[macro_use] extern crate failure; extern crate futures; extern crate lapin_async; #[macro_use] extern crate log; @@ -137,6 +145,7 @@ extern crate tokio_timer; pub mod client; pub mod channel; pub mod consumer; +pub mod error; pub mod queue; pub mod message; pub mod types; diff --git a/futures/src/transport.rs b/futures/src/transport.rs index 749e06fe..9ce2faf0 100644 --- a/futures/src/transport.rs +++ b/futures/src/transport.rs @@ -6,13 +6,31 @@ use nom::Offset; use cookie_factory::GenError; use bytes::{BufMut, BytesMut}; use std::cmp; +use std::io; use std::iter::repeat; -use std::io::{self,Error,ErrorKind}; use futures::{Async,AsyncSink,Poll,Sink,StartSend,Stream,Future,future}; use tokio_codec::{Decoder,Encoder,Framed}; use tokio_io::{AsyncRead,AsyncWrite}; + use channel::BasicProperties; use client::ConnectionOptions; +use error::Error; + +#[derive(Fail, Debug)] +pub enum CodecError { + #[fail(display = "IO Error: {}", _0)] + IoError(io::Error), + #[fail(display = "Couldn't parse incoming frame")] + ParseError, + #[fail(display = "Couldn't generate outcoming frame")] + GenerationError, +} + +impl From for CodecError { + fn from(err: io::Error) -> Self { + CodecError::IoError(err) + } +} /// During my testing, it appeared to be the "best" value. /// To fine-tune it, use a queue with a very large amount of messages, consume it and compare the @@ -30,15 +48,15 @@ pub struct AMQPCodec { impl Decoder for AMQPCodec { type Item = AMQPFrame; - type Error = io::Error; + type Error = CodecError; - fn decode(&mut self, buf: &mut BytesMut) -> Result, io::Error> { + fn decode(&mut self, buf: &mut BytesMut) -> Result, CodecError> { let (consumed, f) = match parse_frame(buf) { Err(e) => { if e.is_incomplete() { return Ok(None); } else { - return Err(io::Error::new(io::ErrorKind::Other, format!("parse error: {:?}", e))); + return Err(CodecError::ParseError); } }, Ok((i, frame)) => { @@ -56,7 +74,7 @@ impl Decoder for AMQPCodec { impl Encoder for AMQPCodec { type Item = AMQPFrame; - type Error = io::Error; + type Error = CodecError; fn encode(&mut self, frame: AMQPFrame, buf: &mut BytesMut) -> Result<(), Self::Error> { let frame_max = cmp::max(self.frame_max, 8192) as usize; @@ -87,7 +105,7 @@ impl Encoder for AMQPCodec { }, Err(e) => { error!("error generating frame: {:?}", e); - return Err(Error::new(ErrorKind::InvalidData, "could not generate")); + return Err(CodecError::GenerationError); } } } @@ -109,17 +127,16 @@ impl AMQPTransport /// starts the connection process /// /// returns a future of a `AMQPTransport` that is connected - pub fn connect(stream: T, options: ConnectionOptions) -> impl Future, Error = io::Error> + Send + 'static { + pub fn connect(stream: T, options: ConnectionOptions) -> impl Future, Error = Error> + Send + 'static { let mut conn = Connection::new(); conn.set_credentials(&options.username, &options.password); conn.set_vhost(&options.vhost); conn.set_frame_max(options.frame_max); conn.set_heartbeat(options.heartbeat); - future::result(conn.connect()).map_err(|e| { - let err = format!("Failed to connect: {:?}", e); - Error::new(ErrorKind::ConnectionAborted, err) - }).and_then(|_| { + future::result(conn.connect()) + .map_err(|e| Error::new(format!("Failed to connect: {:?}", e))) + .and_then(|_| { let codec = AMQPCodec { frame_max: conn.configuration.frame_max, }; @@ -156,7 +173,7 @@ impl AMQPTransport } /// Preemptively send an heartbeat frame - pub fn send_heartbeat(&mut self) -> Poll<(), io::Error> { + pub fn send_heartbeat(&mut self) -> Poll<(), Error> { if let Some(frame) = self.heartbeat.take() { self.conn.frame_queue.push_front(frame); } @@ -176,14 +193,13 @@ impl AMQPTransport /// /// * In case of error, it will return `Err(e)` /// * If the socket was closed, it will return `Ok(Async::Ready(()))` - fn poll_recv(&mut self) -> Poll<(), io::Error> { + fn poll_recv(&mut self) -> Poll<(), Error> { for _ in 0..POLL_RECV_LIMIT { match self.upstream.poll() { Ok(Async::Ready(Some(frame))) => { trace!("transport poll_recv; frame={:?}", frame); if let Err(e) = self.conn.handle_frame(frame) { - let err = format!("failed to handle frame: {:?}", e); - return Err(io::Error::new(io::ErrorKind::Other, err)); + return Err(Error::new(format!("failed to handle frame: {:?}", e))); } }, Ok(Async::Ready(None)) => { @@ -196,7 +212,7 @@ impl AMQPTransport }, Err(e) => { error!("transport poll_recv; status=Err({:?})", e); - return Err(From::from(e)); + return Err(Error::new(e.to_string())); }, }; } @@ -204,17 +220,20 @@ impl AMQPTransport } /// Poll the network to send outcoming frames. - fn poll_send(&mut self) -> Poll<(), io::Error> { + fn poll_send(&mut self) -> Poll<(), Error> { while let Some(frame) = self.conn.next_frame() { trace!("transport poll_send; frame={:?}", frame); - match self.start_send(frame)? { - AsyncSink::Ready => { + match self.start_send(frame) { + Ok(AsyncSink::Ready) => { trace!("transport poll_send; status=Ready"); }, - AsyncSink::NotReady(frame) => { + Ok(AsyncSink::NotReady(frame)) => { trace!("transport poll_send; status=NotReady"); self.conn.frame_queue.push_front(frame); return Ok(Async::NotReady); + }, + Err(e) => { + return Err(Error::new(e.to_string())); } } } @@ -227,13 +246,13 @@ impl Stream for AMQPTransport T: Send, T: 'static { type Item = (); - type Error = io::Error; + type Error = Error; - fn poll(&mut self) -> Poll, io::Error> { + fn poll(&mut self) -> Poll, Error> { trace!("transport poll"); if let Async::Ready(()) = self.poll_recv()? { trace!("poll transport; status=Ready"); - return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "The connection was closed by the remote peer")); + return Err(Error::new("The connection was closed by the remote peer")); } self.poll_send().map(|r| r.map(Some)) } @@ -244,16 +263,18 @@ impl Sink for AMQPTransport T: Send, T: 'static { type SinkItem = AMQPFrame; - type SinkError = io::Error; + type SinkError = Error; fn start_send(&mut self, frame: Self::SinkItem) -> StartSend { trace!("transport start_send; frame={:?}", frame); self.upstream.start_send(frame) + .map_err(|e| Error::new(e.to_string())) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { trace!("transport poll_complete"); self.upstream.poll_complete() + .map_err(|e| Error::new(e.to_string())) } } @@ -271,7 +292,7 @@ impl Future for AMQPTransportConnector T: 'static { type Item = AMQPTransport; - type Error = io::Error; + type Error = Error; fn poll(&mut self) -> Poll { trace!("connector poll; has_transport={:?}", !self.transport.is_none()); @@ -295,7 +316,7 @@ macro_rules! lock_transport ( match $t.lock() { Ok(t) => t, Err(_) => if $t.is_poisoned() { - return Err(io::Error::new(io::ErrorKind::Other, "Transport mutex is poisoned")) + return Err(Error::new("Transport mutex is poisoned")) } else { task::current().notify(); return Ok(Async::NotReady) diff --git a/futures/tests/connection.rs b/futures/tests/connection.rs index 58582149..d3f98855 100644 --- a/futures/tests/connection.rs +++ b/futures/tests/connection.rs @@ -1,9 +1,11 @@ #[macro_use] extern crate log; extern crate lapin_futures as lapin; +extern crate failure; extern crate futures; extern crate tokio; extern crate env_logger; +use failure::Error; use futures::Stream; use futures::future::Future; use tokio::net::TcpStream; @@ -20,8 +22,8 @@ fn connection() { let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "127.0.0.1:5672".to_string()).parse().unwrap(); Runtime::new().unwrap().block_on_all( - TcpStream::connect(&addr).and_then(|stream| { - lapin::client::Client::connect(stream, ConnectionOptions::default()) + TcpStream::connect(&addr).map_err(Error::from).and_then(|stream| { + lapin::client::Client::connect(stream, ConnectionOptions::default()).map_err(Error::from) }).and_then(|(client, _)| { client.create_channel().and_then(|channel| { @@ -62,7 +64,7 @@ fn connection() { ch2.queue_delete("hello", QueueDeleteOptions::default()) }) }) - }) + }).map_err(Error::from) }) ).expect("runtime failure"); } From 6305ad89a3042cf2b4ee22756ba9b0224803bdd6 Mon Sep 17 00:00:00 2001 From: Louis Kureuil Person Date: Sat, 17 Nov 2018 16:35:31 +0100 Subject: [PATCH 2/5] Limit conversions to failure::Error in examples Errors are now printed using the `Display` formatter instead of the `Debug` formatter. --- futures/examples/client.rs | 4 ++-- futures/examples/consumers.rs | 15 ++++++++++----- futures/examples/topic.rs | 32 ++++++++++++++++++-------------- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/futures/examples/client.rs b/futures/examples/client.rs index cbb7344f..bb56d92f 100644 --- a/futures/examples/client.rs +++ b/futures/examples/client.rs @@ -26,7 +26,7 @@ fn main() { ..Default::default() }).map_err(Error::from) }).and_then(|(client, heartbeat)| { - tokio::spawn(heartbeat.map_err(|e| eprintln!("{:?}", e))); + tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {}", e))); client.create_confirm_channel(ConfirmSelectOptions::default()).and_then(|channel| { let id = channel.id; @@ -85,6 +85,6 @@ fn main() { }) }) }).map_err(Error::from) - }).map_err(|err| eprintln!("error: {:?}", err)) + }).map_err(|err| eprintln!("An error occured: {}", err)) ).expect("runtime exited with failure") } diff --git a/futures/examples/consumers.rs b/futures/examples/consumers.rs index 7514151c..362103c1 100644 --- a/futures/examples/consumers.rs +++ b/futures/examples/consumers.rs @@ -54,12 +54,17 @@ fn main() { ..Default::default() }).map_err(Error::from) }).and_then(|(client, heartbeat)| { - tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {:?}", e))) - .into_future().map(|_| client).map_err(|_| err_msg("spawn error")) + tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {}", e))) + .into_future() + .map(|_| client) + .map_err(|_| err_msg("Couldn't spawn the heartbeat task")) }).and_then(|client| { let _client = client.clone(); - futures::stream::iter_ok(0..N_CONSUMERS).for_each(move |n| tokio::spawn(create_consumer(&_client, n))) - .into_future().map(move |_| client).map_err(|_| err_msg("spawn error")) + futures::stream::iter_ok(0..N_CONSUMERS) + .for_each(move |n| tokio::spawn(create_consumer(&_client, n))) + .into_future() + .map(move |_| client) + .map_err(|_| err_msg("Couldn't spawn the consumer task")) }).and_then(|client| { client.create_confirm_channel(ConfirmSelectOptions::default()).and_then(move |channel| { futures::stream::iter_ok((0..N_CONSUMERS).flat_map(|c| { @@ -78,6 +83,6 @@ fn main() { }) }) }).map_err(Error::from) - }).map_err(|err| eprintln!("error: {:?}", err)) + }).map_err(|err| eprintln!("An error occured: {}", err)) ).expect("runtime exited with failure"); } diff --git a/futures/examples/topic.rs b/futures/examples/topic.rs index 1682fb9b..e9d7c8ab 100644 --- a/futures/examples/topic.rs +++ b/futures/examples/topic.rs @@ -28,20 +28,24 @@ fn main() { ..Default::default() }).map_err(Error::from) }).and_then(|(client, heartbeat)| { - tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {:?}", e))) - .into_future().map(|_| client).map_err(|_| err_msg("spawn error")) + tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {}", e))) + .into_future() + .map(|_| client) + .map_err(|_| err_msg("spawn error")) }).and_then(|client| { - client.create_confirm_channel(ConfirmSelectOptions::default()).map_err(Error::from) - }).and_then(|channel| { - channel.clone().exchange_declare("hello_topic", "topic", ExchangeDeclareOptions::default(), FieldTable::new()).map(move |_| channel).map_err(Error::from) - }).and_then(|channel| { - channel.clone().queue_declare("topic_queue", QueueDeclareOptions::default(), FieldTable::new()).map(move |_| channel).map_err(Error::from) - }).and_then(|channel| { - channel.clone().queue_bind("topic_queue", "hello_topic", "*.foo.*", QueueBindOptions::default(), FieldTable::new()).map(move |_| channel).map_err(Error::from) - }).and_then(|channel| { - channel.basic_publish("hello_topic", "hello.fooo.bar", b"hello".to_vec(), BasicPublishOptions::default(), BasicProperties::default()).map(|confirmation| { - println!("got confirmation of publication: {:?}", confirmation); - }).map_err(Error::from) - }).map_err(|err| eprintln!("error: {:?}", err)) + client.create_confirm_channel(ConfirmSelectOptions::default()) + .and_then(|channel| { + channel.clone().exchange_declare("hello_topic", "topic", ExchangeDeclareOptions::default(), FieldTable::new()).map(move |_| channel) + }).and_then(|channel| { + channel.clone().queue_declare("topic_queue", QueueDeclareOptions::default(), FieldTable::new()).map(move |_| channel) + }).and_then(|channel| { + channel.clone().queue_bind("topic_queue", "hello_topic", "*.foo.*", QueueBindOptions::default(), FieldTable::new()).map(move |_| channel) + }).and_then(|channel| { + channel.basic_publish("hello_topic", "hello.fooo.bar", b"hello".to_vec(), BasicPublishOptions::default(), BasicProperties::default()).map(|confirmation| { + println!("got confirmation of publication: {:?}", confirmation); + }) + }) + .map_err(Error::from) + }).map_err(|err| eprintln!("An error occured: {}", err)) ).expect("runtime exited with failure"); } From 02a3b0a050870430b455d155542de127455e87ed Mon Sep 17 00:00:00 2001 From: Louis Kureuil Person Date: Sat, 17 Nov 2018 16:51:23 +0100 Subject: [PATCH 3/5] Display the cause for a GenerationError in AMQPCodec --- futures/src/transport.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/futures/src/transport.rs b/futures/src/transport.rs index 9ce2faf0..00e1adfc 100644 --- a/futures/src/transport.rs +++ b/futures/src/transport.rs @@ -22,8 +22,8 @@ pub enum CodecError { IoError(io::Error), #[fail(display = "Couldn't parse incoming frame")] ParseError, - #[fail(display = "Couldn't generate outcoming frame")] - GenerationError, + #[fail(display = "Couldn't generate outcoming frame: {:?}", _0)] + GenerationError(GenError), } impl From for CodecError { @@ -105,7 +105,7 @@ impl Encoder for AMQPCodec { }, Err(e) => { error!("error generating frame: {:?}", e); - return Err(CodecError::GenerationError); + return Err(CodecError::GenerationError(e)); } } } From f1d327f9805d98cdf8a07480f327643499c0165d Mon Sep 17 00:00:00 2001 From: Louis Kureuil Person Date: Sat, 17 Nov 2018 17:13:13 +0100 Subject: [PATCH 4/5] Add a changelog entry about the new Error type --- CHANGELOG.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1376ddca..eca6b125 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +### Unreleased + +#### Breaking Changes + +* **futures:** + * Introduce a new `Error` type, replacing occurences of `io::Error` in public APIs ([#145](https://github.com/sozu-proxy/lapin/pull/145)) + ### 0.14.1 (2018-11-16) #### Housekeeping @@ -8,7 +15,6 @@ #### Bug Fixes * Fix heartbeat interval - ### 0.14.0 (2018-10-17) #### Housekeeping From 3148e07e248390f34ee8c5463e320ec41d8abb13 Mon Sep 17 00:00:00 2001 From: Louis Kureuil Person Date: Sat, 17 Nov 2018 17:20:19 +0100 Subject: [PATCH 5/5] Display the cause for a ParseError in AMQPCodec --- futures/src/transport.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/futures/src/transport.rs b/futures/src/transport.rs index 00e1adfc..e1921cdb 100644 --- a/futures/src/transport.rs +++ b/futures/src/transport.rs @@ -20,8 +20,8 @@ use error::Error; pub enum CodecError { #[fail(display = "IO Error: {}", _0)] IoError(io::Error), - #[fail(display = "Couldn't parse incoming frame")] - ParseError, + #[fail(display = "Couldn't parse incoming frame: {}", _0)] + ParseError(String), #[fail(display = "Couldn't generate outcoming frame: {:?}", _0)] GenerationError(GenError), } @@ -56,7 +56,7 @@ impl Decoder for AMQPCodec { if e.is_incomplete() { return Ok(None); } else { - return Err(CodecError::ParseError); + return Err(CodecError::ParseError(format!("{:?}", e))); } }, Ok((i, frame)) => {