Skip to content

Commit

Permalink
Merge pull request #145 from kureuil/introduce-error-type
Browse files Browse the repository at this point in the history
Introduce dedicated error types to the lapin-futures crate
  • Loading branch information
Keruspe authored Nov 17, 2018
2 parents c34e17d + 3148e07 commit b213113
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 116 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
### Unreleased

#### Breaking Changes

* **futures:**
* Introduce a new `Error` type, replacing occurences of `io::Error` in public APIs ([#145](https://github.com/sozu-proxy/lapin/pull/145))

### 0.14.1 (2018-11-16)

#### Housekeeping
Expand All @@ -8,7 +15,6 @@
#### Bug Fixes

* Fix heartbeat interval

### 0.14.0 (2018-10-17)

#### Housekeeping
Expand Down
1 change: 1 addition & 0 deletions futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ license = "MIT"
nom = "^4.0"
log = "^0.4"
bytes = "^0.4"
failure = "^0.1"
futures = "^0.1"
tokio-codec = "^0.1"
tokio-io = "^0.1"
Expand Down
12 changes: 7 additions & 5 deletions futures/examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#[macro_use] extern crate log;
extern crate lapin_futures as lapin;
extern crate failure;
extern crate futures;
extern crate tokio;
extern crate env_logger;

use failure::Error;
use futures::future::Future;
use futures::Stream;
use tokio::net::TcpStream;
Expand All @@ -18,13 +20,13 @@ fn main() {
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "127.0.0.1:5672".to_string()).parse().unwrap();

Runtime::new().unwrap().block_on_all(
TcpStream::connect(&addr).and_then(|stream| {
TcpStream::connect(&addr).map_err(Error::from).and_then(|stream| {
lapin::client::Client::connect(stream, ConnectionOptions {
frame_max: 65535,
..Default::default()
})
}).map_err(Error::from)
}).and_then(|(client, heartbeat)| {
tokio::spawn(heartbeat.map_err(|e| eprintln!("{:?}", e)));
tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {}", e)));

client.create_confirm_channel(ConfirmSelectOptions::default()).and_then(|channel| {
let id = channel.id;
Expand Down Expand Up @@ -82,7 +84,7 @@ fn main() {
c.basic_ack(message.delivery_tag, false)
})
})
})
}).map_err(|err| eprintln!("error: {:?}", err))
}).map_err(Error::from)
}).map_err(|err| eprintln!("An error occured: {}", err))
).expect("runtime exited with failure")
}
27 changes: 16 additions & 11 deletions futures/examples/consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ extern crate env_logger;
extern crate lapin_futures as lapin;
#[macro_use]
extern crate log;
extern crate failure;
extern crate futures;
extern crate tokio;

use std::io;

use failure::{err_msg, Error};
use futures::future::Future;
use futures::{IntoFuture, Stream};
use tokio::io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -36,7 +36,7 @@ fn create_consumer<T: AsyncRead + AsyncWrite + Sync + Send + 'static>(client: &C
println!("consumer '{}' got '{}'", n, std::str::from_utf8(&message.data).unwrap());
channel.basic_ack(message.delivery_tag, false)
})
}).map(|_| ()).map_err(move |err| eprintln!("got error in consumer '{}': {:?}", n, err))
}).map(|_| ()).map_err(move |err| eprintln!("got error in consumer '{}': {}", n, err))
}

fn main() {
Expand All @@ -47,19 +47,24 @@ fn main() {
// let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();

runtime.block_on_all(
TcpStream::connect(&addr).and_then(|stream| {
TcpStream::connect(&addr).map_err(Error::from).and_then(|stream| {
Client::connect(stream, ConnectionOptions {
frame_max: 65535,
heartbeat: 20,
..Default::default()
})
}).map_err(Error::from)
}).and_then(|(client, heartbeat)| {
tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {:?}", e)))
.into_future().map(|_| client).map_err(|_| io::Error::new(io::ErrorKind::Other, "spawn error"))
tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {}", e)))
.into_future()
.map(|_| client)
.map_err(|_| err_msg("Couldn't spawn the heartbeat task"))
}).and_then(|client| {
let _client = client.clone();
futures::stream::iter_ok(0..N_CONSUMERS).for_each(move |n| tokio::spawn(create_consumer(&_client, n)))
.into_future().map(move |_| client).map_err(|_| io::Error::new(io::ErrorKind::Other, "spawn error"))
futures::stream::iter_ok(0..N_CONSUMERS)
.for_each(move |n| tokio::spawn(create_consumer(&_client, n)))
.into_future()
.map(move |_| client)
.map_err(|_| err_msg("Couldn't spawn the consumer task"))
}).and_then(|client| {
client.create_confirm_channel(ConfirmSelectOptions::default()).and_then(move |channel| {
futures::stream::iter_ok((0..N_CONSUMERS).flat_map(|c| {
Expand All @@ -77,7 +82,7 @@ fn main() {
})
})
})
})
}).map_err(|err| eprintln!("error: {:?}", err))
}).map_err(Error::from)
}).map_err(|err| eprintln!("An error occured: {}", err))
).expect("runtime exited with failure");
}
38 changes: 21 additions & 17 deletions futures/examples/topic.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
extern crate env_logger;
extern crate failure;
extern crate lapin_futures as lapin;
extern crate log;
extern crate futures;
extern crate tokio;

use std::io;

use failure::{err_msg, Error};
use futures::future::Future;
use futures::IntoFuture;
use tokio::net::TcpStream;
Expand All @@ -21,27 +21,31 @@ fn main() {
let runtime = Runtime::new().unwrap();

runtime.block_on_all(
TcpStream::connect(&addr).and_then(|stream| {
TcpStream::connect(&addr).map_err(Error::from).and_then(|stream| {
Client::connect(stream, ConnectionOptions {
frame_max: 65535,
heartbeat: 20,
..Default::default()
})
}).map_err(Error::from)
}).and_then(|(client, heartbeat)| {
tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {:?}", e)))
.into_future().map(|_| client).map_err(|_| io::Error::new(io::ErrorKind::Other, "spawn error"))
tokio::spawn(heartbeat.map_err(|e| eprintln!("heartbeat error: {}", e)))
.into_future()
.map(|_| client)
.map_err(|_| err_msg("spawn error"))
}).and_then(|client| {
client.create_confirm_channel(ConfirmSelectOptions::default())
}).and_then(|channel| {
channel.clone().exchange_declare("hello_topic", "topic", ExchangeDeclareOptions::default(), FieldTable::new()).map(move |_| channel)
}).and_then(|channel| {
channel.clone().queue_declare("topic_queue", QueueDeclareOptions::default(), FieldTable::new()).map(move |_| channel)
}).and_then(|channel| {
channel.clone().queue_bind("topic_queue", "hello_topic", "*.foo.*", QueueBindOptions::default(), FieldTable::new()).map(move |_| channel)
}).and_then(|channel| {
channel.basic_publish("hello_topic", "hello.fooo.bar", b"hello".to_vec(), BasicPublishOptions::default(), BasicProperties::default()).map(|confirmation| {
println!("got confirmation of publication: {:?}", confirmation);
})
}).map_err(|err| eprintln!("error: {:?}", err))
.and_then(|channel| {
channel.clone().exchange_declare("hello_topic", "topic", ExchangeDeclareOptions::default(), FieldTable::new()).map(move |_| channel)
}).and_then(|channel| {
channel.clone().queue_declare("topic_queue", QueueDeclareOptions::default(), FieldTable::new()).map(move |_| channel)
}).and_then(|channel| {
channel.clone().queue_bind("topic_queue", "hello_topic", "*.foo.*", QueueBindOptions::default(), FieldTable::new()).map(move |_| channel)
}).and_then(|channel| {
channel.basic_publish("hello_topic", "hello.fooo.bar", b"hello".to_vec(), BasicPublishOptions::default(), BasicProperties::default()).map(|confirmation| {
println!("got confirmation of publication: {:?}", confirmation);
})
})
.map_err(Error::from)
}).map_err(|err| eprintln!("An error occured: {}", err))
).expect("runtime exited with failure");
}
Loading

0 comments on commit b213113

Please sign in to comment.