diff --git a/Cargo.toml b/Cargo.toml index 3541dc634..034bac036 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ + "abci", "light-client", "light-node", "p2p", diff --git a/abci/Cargo.toml b/abci/Cargo.toml new file mode 100644 index 000000000..afb05590e --- /dev/null +++ b/abci/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "tendermint-abci" +version = "0.17.1" +authors = ["Thane Thomson "] +edition = "2018" +description = """ + tendermint-abci provides a framework for building Tendermint applications + in Rust. + """ + +[features] +blocking = [] +client = [] +echo-app = [] +non-blocking = [ "async-trait", "futures", "pin-project" ] +runtime-async-std = [ "async-channel", "async-std" ] +runtime-std = [] +runtime-tokio = [ "tokio", "tokio-util" ] + +[dependencies] +bytes = "1.0" +eyre = "0.6" +log = "0.4" +tendermint = { version = "0.17.1", path = "../tendermint" } +tendermint-proto = { version = "0.17.1", path = "../proto" } +thiserror = "1.0" + +async-trait = { version = "0.1", optional = true } +async-channel = { version = "1.5", optional = true } +async-std = { version = "1.8", features = [ "attributes" ], optional = true } +futures = { version = "0.3", optional = true } +pin-project = { version = "1.0", optional = true } +tokio = { version = "1.0", features = [ "macros", "net", "rt", "sync" ], optional = true } +tokio-util = { version = "0.6", features = [ "codec" ], optional = true } diff --git a/abci/Makefile.toml b/abci/Makefile.toml new file mode 100644 index 000000000..3b00c6654 --- /dev/null +++ b/abci/Makefile.toml @@ -0,0 +1,22 @@ +# tendermint-abci makefile +# +# For use with https://github.com/sagiegurari/cargo-make + +[tasks.test-std] +command = "cargo" +args = ["test", "--features", "client,runtime-std,echo-app"] + +[tasks.test-async-std] +command = "cargo" +args = ["test", "--features", "async,client,runtime-async-std,echo-app"] + +[tasks.test-tokio] +command = "cargo" +args = ["test", "--features", "async,client,runtime-tokio,echo-app"] + +[tasks.test] +dependencies = [ + "test-std", + "test-async-std", + "test-tokio" +] diff --git a/abci/README.md b/abci/README.md new file mode 100644 index 000000000..835909a27 --- /dev/null +++ b/abci/README.md @@ -0,0 +1,90 @@ +## tendermint-abci + +[![Crate][crate-image]][crate-link] +[![Docs][docs-image]][docs-link] +[![Build Status][build-image]][build-link] +[![Audit Status][audit-image]][audit-link] +[![Apache 2.0 Licensed][license-image]][license-link] +![Rust Stable][rustc-image] + +**NB: Currently heavily under construction** + +Crate for creating ABCI applications for use with Tendermint. See the +[Tendermint ABCI docs][abci-docs] for more details on ABCI. + +## Requirements + +- The latest stable version of Rust + +## Features + +This crate exposes an interface that allows for the construction of ABCI +applications that run using the Rust standard library (i.e. simple +multi-threaded applications), as well as using various `async` runtimes. + +Currently, in terms of `async` runtimes, we support [Tokio] and [`async-std`]. + +These are all enabled/disabled by way of feature flags. + +## Testing + +To run integration tests for the Rust standard library (non-`async`, +multi-threaded), run: + +```bash +cargo test --features client,runtime-std,echo-app +``` + +To run all integration tests for all supported `async` runtimes, run: + +```bash +cargo test --all-features +``` + +To run Tokio-specific integration tests only, run: + +```bash +cargo test --features async,client,runtime-tokio,echo-app +``` + +To run `async-std`-specific integration tests only, run: + +```bash +cargo test --features async,client,runtime-async-std,echo-app +``` + +## License + +Copyright © 2020 Informal Systems + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use the files in this repository except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +[//]: # (badges) + +[crate-image]: https://img.shields.io/crates/v/tendermint-abci.svg +[crate-link]: https://crates.io/crates/tendermint-abci +[docs-image]: https://docs.rs/tendermint-abci/badge.svg +[docs-link]: https://docs.rs/tendermint-abci/ +[build-image]: https://github.com/informalsystems/tendermint-rs/workflows/Rust/badge.svg +[build-link]: https://github.com/informalsystems/tendermint-rs/actions?query=workflow%3ARust +[audit-image]: https://github.com/informalsystems/tendermint-rs/workflows/Audit-Check/badge.svg +[audit-link]: https://github.com/informalsystems/tendermint-rs/actions?query=workflow%3AAudit-Check +[license-image]: https://img.shields.io/badge/license-Apache2.0-blue.svg +[license-link]: https://github.com/informalsystems/tendermint-rs/blob/master/LICENSE +[rustc-image]: https://img.shields.io/badge/rustc-stable-blue.svg + +[//]: # (general links) + +[abci-docs]: https://docs.tendermint.com/master/spec/abci/ +[Tokio]: https://tokio.rs/ +[`async-std`]: https://async.rs/ diff --git a/abci/src/application.rs b/abci/src/application.rs new file mode 100644 index 000000000..1fa4860e2 --- /dev/null +++ b/abci/src/application.rs @@ -0,0 +1,30 @@ +//! ABCI application-related types. + +#[cfg(feature = "echo-app")] +pub mod echo; + +use tendermint::abci::{request, response}; + +/// ABCI server application interface. +pub trait Application: Send + Clone + 'static { + /// Request that the ABCI server echo back the same message sent to it. + fn echo(&self, req: request::Echo) -> response::Echo { + response::Echo { + message: req.message, + } + } + + /// Receive information about the Tendermint node and respond with + /// information about the ABCI application. + fn info(&self, _req: request::Info) -> response::Info { + Default::default() + } + + /// Generic handler for mapping incoming requests to responses. + fn handle(&self, req: request::Request) -> response::Response { + match req { + request::Request::Echo(echo) => response::Response::Echo(self.echo(echo)), + request::Request::Info(info) => response::Response::Info(self.info(info)), + } + } +} diff --git a/abci/src/application/echo.rs b/abci/src/application/echo.rs new file mode 100644 index 000000000..f5a5e5274 --- /dev/null +++ b/abci/src/application/echo.rs @@ -0,0 +1,41 @@ +//! Trivial ABCI application that just implements echo functionality. + +use crate::Application; +use tendermint::abci::{request, response}; + +/// Trivial ABCI application that just implements echo functionality. +#[derive(Clone)] +pub struct EchoApp { + data: String, + version: String, + app_version: u64, +} + +impl EchoApp { + /// Constructor. + pub fn new>(data: S, version: S, app_version: u64) -> Self { + Self { + data: data.as_ref().to_owned(), + version: version.as_ref().to_owned(), + app_version, + } + } +} + +impl Default for EchoApp { + fn default() -> Self { + EchoApp::new("Echo App", "0.0.1", 1) + } +} + +impl Application for EchoApp { + fn info(&self, _req: request::Info) -> response::Info { + response::Info { + data: self.data.clone(), + version: self.version.clone(), + app_version: self.app_version, + last_block_height: 1, + last_block_app_hash: vec![], + } + } +} diff --git a/abci/src/client.rs b/abci/src/client.rs new file mode 100644 index 000000000..00e47652d --- /dev/null +++ b/abci/src/client.rs @@ -0,0 +1,10 @@ +//! ABCI clients for interacting with ABCI servers. + +#[cfg(feature = "blocking")] +pub mod blocking; +#[cfg(feature = "non-blocking")] +pub mod non_blocking; + +/// The default size of the ABCI client's read buffer when reading responses +/// from the server. +pub const DEFAULT_CLIENT_READ_BUF_SIZE: usize = 1024; diff --git a/abci/src/client/blocking.rs b/abci/src/client/blocking.rs new file mode 100644 index 000000000..71884336e --- /dev/null +++ b/abci/src/client/blocking.rs @@ -0,0 +1,81 @@ +//! Blocking ABCI client. + +use crate::client::DEFAULT_CLIENT_READ_BUF_SIZE; +use crate::codec::blocking::Codec; +use crate::runtime::blocking::{Runtime, TcpStream}; +use crate::{Error, Result}; +use std::convert::TryInto; +use tendermint::abci::request::RequestInner; +use tendermint::abci::{request, response}; + +/// A runtime-dependent blocking ABCI client. +pub struct Client { + codec: Rt::ClientCodec, +} + +impl Client { + /// Request that the ABCI server echo back the message in the given + /// request. + pub fn echo(&mut self, req: request::Echo) -> Result { + self.perform(req) + } + + /// Provide information to the ABCI server about the Tendermint node in + /// exchange for information about the application. + pub fn info(&mut self, req: request::Info) -> Result { + self.perform(req) + } + + fn perform(&mut self, req: Req) -> Result { + self.codec.send(req.into())?; + match self.codec.next() { + Some(result) => { + let res = result?; + Ok(res.try_into()?) + } + None => Err(Error::ServerStreamTerminated), + } + } +} + +/// Builder for a blocking ABCI client. +pub struct ClientBuilder { + read_buf_size: usize, + _runtime: std::marker::PhantomData, +} + +impl ClientBuilder { + /// Constructor allowing customization of the client's read buffer size. + pub fn new(read_buf_size: usize) -> Self { + Self { + read_buf_size, + _runtime: Default::default(), + } + } + + /// Constructor for our [`Client`] instance, which attempts to connect to + /// the given address. + pub fn connect(self, addr: S) -> Result> + where + S: AsRef, + { + let stream = Rt::TcpStream::connect(addr.as_ref())?; + Ok(Client { + codec: Rt::ClientCodec::new(stream.into_inner(), self.read_buf_size), + }) + } +} + +impl Default for ClientBuilder { + fn default() -> Self { + Self { + read_buf_size: DEFAULT_CLIENT_READ_BUF_SIZE, + _runtime: Default::default(), + } + } +} + +#[cfg(feature = "runtime-std")] +/// Blocking ABCI client builder when using Rust's standard library as your +/// runtime. +pub type StdClientBuilder = ClientBuilder; diff --git a/abci/src/client/non_blocking.rs b/abci/src/client/non_blocking.rs new file mode 100644 index 000000000..318953a11 --- /dev/null +++ b/abci/src/client/non_blocking.rs @@ -0,0 +1,85 @@ +//! Non-blocking ABCI client. + +use crate::client::DEFAULT_CLIENT_READ_BUF_SIZE; +use crate::codec::non_blocking::Codec; +use crate::runtime::non_blocking::{Runtime, TcpStream}; +use crate::{Error, Result}; +use futures::{SinkExt, StreamExt}; +use std::convert::TryInto; +use tendermint::abci::request::RequestInner; +use tendermint::abci::{request, response}; + +/// A runtime-dependent non-blocking ABCI client. +pub struct Client { + codec: Rt::ClientCodec, +} + +impl Client { + /// Request that the ABCI server echo back the message in the given + /// request. + pub async fn echo(&mut self, req: request::Echo) -> Result { + self.perform(req).await + } + + /// Provide information to the ABCI server about the Tendermint node in + /// exchange for information about the application. + pub async fn info(&mut self, req: request::Info) -> Result { + self.perform(req).await + } + + async fn perform(&mut self, req: Req) -> Result { + self.codec.send(req.into()).await?; + let res: std::result::Result = self + .codec + .next() + .await + .ok_or(Error::ServerStreamTerminated)?? + .try_into(); + Ok(res?) + } +} + +/// Builder for a non-blocking ABCI client. +pub struct ClientBuilder { + read_buf_size: usize, + _runtime: std::marker::PhantomData, +} + +impl ClientBuilder { + /// Constructor allowing configuration of read buffer size. + pub fn new(read_buf_size: usize) -> Self { + Self { + read_buf_size, + _runtime: Default::default(), + } + } + + /// Connect to the ABCI server at the given network address. + pub async fn connect(self, addr: S) -> Result> + where + S: AsRef, + { + let stream = Rt::TcpStream::connect(addr.as_ref()).await?; + Ok(Client { + codec: Rt::ClientCodec::new(stream.into_inner(), self.read_buf_size), + }) + } +} + +impl Default for ClientBuilder { + fn default() -> Self { + Self { + read_buf_size: DEFAULT_CLIENT_READ_BUF_SIZE, + _runtime: Default::default(), + } + } +} + +#[cfg(feature = "runtime-tokio")] +/// Non-blocking ABCI client builder when using Tokio as your runtime. +pub type TokioClientBuilder = ClientBuilder; + +#[cfg(feature = "runtime-async-std")] +/// Non-blocking ABCI client builder when using `async-std` as your runtime. +pub type AsyncStdClientBuilder = + ClientBuilder; diff --git a/abci/src/codec.rs b/abci/src/codec.rs new file mode 100644 index 000000000..bb00993f6 --- /dev/null +++ b/abci/src/codec.rs @@ -0,0 +1,208 @@ +//! ABCI codec. + +#[cfg(feature = "blocking")] +pub mod blocking; +#[cfg(feature = "non-blocking")] +pub mod non_blocking; + +use crate::{Error, Result}; +use bytes::{Buf, BufMut, BytesMut}; +use tendermint::abci::request::Request; +use tendermint::abci::response::Response; +use tendermint_proto::Protobuf; + +/// The size of the server's read buffer for incoming messages. +pub const SERVER_READ_BUF_SIZE: usize = 1024 * 1024; + +/// The size of the client's read buffer for incoming messages. +pub const CLIENT_READ_BUF_SIZE: usize = 1024; + +// The maximum number of bytes we expect in a varint. We use this to check if +// we're encountering a decoding error for a varint. +const MAX_VARINT_LENGTH: usize = 16; + +/// A stateless encoder of `T` into its wire-level representation. +pub trait Encoder { + fn encode(value: T, dst: &mut BytesMut) -> Result<()>; +} + +/// Encodes [`tendermint::abci::Request`]s into their wire-level +/// representation as per the Tendermint Socket Protocol. +pub struct RequestEncoder; + +impl Encoder for RequestEncoder { + fn encode(value: Request, mut dst: &mut BytesMut) -> Result<()> { + encode_length_delimited(|mut b| Ok(value.encode(&mut b)?), &mut dst) + } +} + +/// Encodes [`tendermint::abci::Response`]s into their wire-level +/// representation as per the Tendermint Socket Protocol. +pub struct ResponseEncoder; + +impl Encoder for ResponseEncoder { + fn encode(value: Response, mut dst: &mut BytesMut) -> Result<()> { + encode_length_delimited(|mut b| Ok(value.encode(&mut b)?), &mut dst) + } +} + +/// A potentially stateful decoder of `T` from its wire-level representation. +pub trait Decoder { + fn decode(&mut self, buf: &mut BytesMut) -> Result>; +} + +/// Decodes [`tendermint::abci::Request`]s from their wire-level +/// representation as per the Tendermint Socket Protocol. +pub struct RequestDecoder { + read_buf: BytesMut, +} + +impl RequestDecoder { + /// Constructor. + pub fn new() -> Self { + Self { + read_buf: BytesMut::new(), + } + } +} + +impl Decoder for RequestDecoder { + fn decode(&mut self, buf: &mut BytesMut) -> Result> { + self.read_buf.put(buf); + decode_length_delimited(&mut self.read_buf, |mut b| Ok(Request::decode(&mut b)?)) + } +} + +impl Default for RequestDecoder { + fn default() -> Self { + Self::new() + } +} + +/// Decodes [`tendermint::abci::Response`]s from their wire-level +/// representation as per the Tendermint Socket Protocol. +pub struct ResponseDecoder { + read_buf: BytesMut, +} + +impl ResponseDecoder { + /// Constructor. + pub fn new() -> Self { + Self { + read_buf: BytesMut::new(), + } + } +} + +impl Decoder for ResponseDecoder { + fn decode(&mut self, buf: &mut BytesMut) -> Result> { + self.read_buf.put(buf); + decode_length_delimited(&mut self.read_buf, |mut b| Ok(Response::decode(&mut b)?)) + } +} + +impl Default for ResponseDecoder { + fn default() -> Self { + Self::new() + } +} + +// encode_varint and decode_varint will be removed once +// https://github.com/tendermint/tendermint/issues/5783 lands in Tendermint. +fn encode_varint(val: u64, mut buf: &mut B) { + tendermint_proto::encode_varint(val << 1, &mut buf); +} + +fn decode_varint(mut buf: &mut B) -> Result { + let len = tendermint_proto::decode_varint(&mut buf) + .map_err(|_| Error::Protobuf(tendermint_proto::Kind::DecodeMessage.into()))?; + Ok(len >> 1) +} + +// Allows us to avoid having to re-export `prost::Message`. +// TODO(thane): Investigate a better approach here. +fn encode_length_delimited(mut encode_fn: F, mut dst: &mut B) -> Result<()> +where + F: FnMut(&mut BytesMut) -> Result<()>, + B: BufMut, +{ + let mut buf = BytesMut::new(); + encode_fn(&mut buf)?; + let buf = buf.freeze(); + encode_varint(buf.len() as u64, &mut dst); + dst.put(buf); + Ok(()) +} + +fn decode_length_delimited(src: &mut BytesMut, mut decode_fn: F) -> Result> +where + F: FnMut(&mut BytesMut) -> Result, +{ + let src_len = src.len(); + let mut tmp = src.clone().freeze(); + let encoded_len = match decode_varint(&mut tmp) { + Ok(len) => len, + Err(e) => { + return if src_len <= MAX_VARINT_LENGTH { + // We've potentially only received a partial length delimiter + Ok(None) + } else { + Err(e) + }; + } + }; + let remaining = tmp.remaining() as u64; + if remaining < encoded_len { + // We don't have enough data yet to decode the entire message + Ok(None) + } else { + let delim_len = src_len - tmp.remaining(); + // We only advance the source buffer once we're sure we have enough + // data to try to decode the result. + src.advance(delim_len + (encoded_len as usize)); + + let mut result_bytes = BytesMut::from(tmp.split_to(encoded_len as usize).as_ref()); + Ok(Some(decode_fn(&mut result_bytes)?)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use tendermint::abci::request::Echo; + + #[test] + fn single_request() { + let request = Request::Echo(Echo { + message: "Hello TSP!".to_owned(), + }); + let mut buf = BytesMut::new(); + RequestEncoder::encode(request.clone(), &mut buf).unwrap(); + + let mut decoder = RequestDecoder::new(); + let decoded_request = decoder.decode(&mut buf).unwrap().unwrap(); + + assert_eq!(request, decoded_request); + } + + #[test] + fn multiple_requests() { + let requests = (0..5) + .map(|r| { + Request::Echo(Echo { + message: format!("Request {}", r), + }) + }) + .collect::>(); + let mut buf = BytesMut::new(); + requests + .iter() + .for_each(|request| RequestEncoder::encode(request.clone(), &mut buf).unwrap()); + + let mut decoder = RequestDecoder::new(); + for request in requests { + let decoded = decoder.decode(&mut buf).unwrap().unwrap(); + assert_eq!(decoded, request); + } + } +} diff --git a/abci/src/codec/blocking.rs b/abci/src/codec/blocking.rs new file mode 100644 index 000000000..2c501fe7d --- /dev/null +++ b/abci/src/codec/blocking.rs @@ -0,0 +1,120 @@ +//! Blocking API for frame en/decoding. + +use crate::codec::{ + Decoder, Encoder, RequestDecoder, RequestEncoder, ResponseDecoder, ResponseEncoder, +}; +use crate::Result; +use bytes::{Buf, BytesMut}; +use std::io::{Read, Write}; +use std::marker::PhantomData; +use tendermint::abci::request::Request; +use tendermint::abci::response::Response; + +/// An ABCI server decodes incoming [`tendermint::abci::request::Request`]s +/// and encodes outgoing [`tendermint::abci::response::Response`]s. The stream +/// `S` must implement [`std::io::Read`] and [`std::io::Write`]. +pub type ServerCodec = CodecImpl; + +/// An ABCI client encodes outgoing [`tendermint::abci::request::Request`]s +/// and decodes incoming [`tendermint::abci::response::Response`]s. The stream +/// `S` must implement [`std::io::Read`] and [`std::io::Write`]. +pub type ClientCodec = CodecImpl; + +/// A blocking codec that allows us to iterate over `S`, producing values of +/// type `Result`, and send values of type `O`. +pub trait Codec: Iterator> { + /// Constructor. + fn new(inner: S, read_buf_size: usize) -> Self; + + /// Send the given value out using this codec. + fn send(&mut self, value: O) -> Result<()>; +} + +/// Blocking adapter to convert an underlying I/O stream, which implements +/// [`std::io::Read`] and [`std::io::Write`], into an [`Iterator`] producing +/// entities of type `I` and allowing for sending of entities of type `O`. +/// +/// The blocking iterator terminates once the underlying reader terminates. +pub struct CodecImpl { + inner: S, + _encoder: PhantomData, + _outgoing: PhantomData, + decoder: D, + _incoming: PhantomData, + growable_read_buf: BytesMut, + growable_write_buf: BytesMut, + read_buf: Vec, +} + +impl Iterator for CodecImpl +where + D: Decoder, + S: Read, +{ + type Item = Result; + + fn next(&mut self) -> Option { + loop { + // Try to decode a request from our internal read buffer first + match self.decoder.decode(&mut self.growable_read_buf) { + Ok(req_opt) => { + if let Some(req) = req_opt { + return Some(Ok(req)); + } + } + Err(e) => return Some(Err(e)), + } + + // If we don't have enough data to decode a message, try to read + // more + let bytes_read = match self.inner.read(self.read_buf.as_mut()) { + Ok(br) => br, + Err(e) => return Some(Err(e.into())), + }; + if bytes_read == 0 { + // The stream terminated + return None; + } + self.growable_read_buf + .extend_from_slice(&self.read_buf[..bytes_read]); + } + } +} + +impl Codec for CodecImpl +where + E: Encoder, + D: Decoder + Default, + S: Read + Write, +{ + fn new(inner: S, read_buf_size: usize) -> Self { + Self { + inner, + _encoder: Default::default(), + _outgoing: Default::default(), + decoder: Default::default(), + _incoming: Default::default(), + growable_read_buf: BytesMut::new(), + growable_write_buf: BytesMut::new(), + read_buf: vec![0_u8; read_buf_size], + } + } + + fn send(&mut self, value: O) -> Result<()> { + E::encode(value, &mut self.growable_write_buf)?; + loop { + let bytes_written = self.inner.write(self.growable_write_buf.as_ref())?; + if bytes_written == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "failed to write to underlying writer", + ) + .into()); + } + self.growable_write_buf.advance(bytes_written); + if self.growable_write_buf.is_empty() { + return Ok(self.inner.flush()?); + } + } + } +} diff --git a/abci/src/codec/non_blocking.rs b/abci/src/codec/non_blocking.rs new file mode 100644 index 000000000..534a5a204 --- /dev/null +++ b/abci/src/codec/non_blocking.rs @@ -0,0 +1,161 @@ +//! `async` (futures)-compatible API for frame en/decoding. + +use crate::codec::{ + Decoder, Encoder, RequestDecoder, RequestEncoder, ResponseDecoder, ResponseEncoder, +}; +use crate::{Error, Result}; +use bytes::{Buf, BytesMut}; +use futures::task::{Context, Poll}; +use futures::{ready, AsyncRead, AsyncWrite, Sink, Stream}; +use pin_project::pin_project; +use std::marker::PhantomData; +use std::pin::Pin; +use tendermint::abci::request::Request; +use tendermint::abci::response::Response; + +/// An ABCI server decodes incoming [`tendermint::abci::request::Request`]s +/// and encodes outgoing [`tendermint::abci::response::Response`]s. The stream +/// `S` must implement [`futures::AsyncRead`] and [`futures::AsyncWrite`]. +pub type ServerCodec = CodecImpl; + +/// An ABCI client encodes outgoing [`tendermint::abci::request::Request`]s +/// and decodes incoming [`tendermint::abci::response::Response`]s. The stream +/// `S` must implement [`futures::AsyncRead`] and [`futures::AsyncWrite`]. +pub type ClientCodec = CodecImpl; + +/// A non-blocking codec that allows us to iterate over a [`futures::Stream`] +/// producing values of type `Result`. It also implements [`futures::Sink`], +/// allowing us to send values of type `O`. +pub trait Codec: Stream> + Sink + Send + Unpin { + /// Constructor. + fn new(inner: S, read_buf_size: usize) -> Self; +} + +/// Non-blocking adapter to convert an underlying I/O stream, which implements +/// [`futures::AsyncRead`] and [`futures::AsyncWrite`], into a +/// [`futures::Stream`] producing entities of type `I` and a [`futures::Sink`] +/// allowing for sending of entities of type `O`. +#[pin_project] +pub struct CodecImpl { + #[pin] + inner: S, + _encoder: PhantomData, + _output: PhantomData, + decoder: D, + _input: PhantomData, + growable_read_buf: BytesMut, + growable_write_buf: BytesMut, + read_buf: Vec, +} + +impl Stream for CodecImpl +where + D: Decoder, + S: AsyncRead, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let mut inner: Pin<&mut S> = this.inner; + let growable_read_buf: &mut BytesMut = this.growable_read_buf; + let read_buf: &mut Vec = this.read_buf; + let decoder: &mut D = this.decoder; + + loop { + // Try to decode another input value from our existing read buffer + // if we can without resorting to I/O. + match decoder.decode(growable_read_buf) { + Ok(res) => { + if let Some(val) = res { + return Poll::Ready(Some(Ok(val))); + } + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + + // If we don't have enough data to decode an input value, try to + // read some more from the underlying reader. + let bytes_read = match ready!(inner.as_mut().poll_read(cx, read_buf.as_mut())) { + Ok(br) => br, + Err(e) => return Poll::Ready(Some(Err(e.into()))), + }; + if bytes_read == 0 { + // The underlying stream terminated + return Poll::Ready(None); + } + growable_read_buf.extend_from_slice(&read_buf[..bytes_read]); + } + } +} + +impl Sink for CodecImpl +where + E: Encoder, + S: AsyncWrite, +{ + type Error = Error; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: O) -> Result<()> { + let write_buf: &mut BytesMut = self.project().growable_write_buf; + E::encode(item, write_buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let mut inner: Pin<&mut S> = this.inner; + let write_buf: &mut BytesMut = this.growable_write_buf; + + while !write_buf.is_empty() { + let bytes_written = match ready!(inner.as_mut().poll_write(cx, write_buf.as_ref())) { + Ok(bw) => bw, + Err(e) => return Poll::Ready(Err(e.into())), + }; + if bytes_written == 0 { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "failed to write to underlying stream", + ) + .into())); + } + write_buf.advance(bytes_written); + } + // Try to flush the underlying stream + ready!(inner.poll_flush(cx))?; + Poll::Ready(Ok(())) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().poll_flush(cx))?; + let inner: Pin<&mut S> = self.project().inner; + ready!(inner.poll_close(cx))?; + + Poll::Ready(Ok(())) + } +} + +impl Codec for CodecImpl +where + E: Encoder + Send, + D: Decoder + Default + Send, + O: Send, + I: Send, + S: AsyncRead + AsyncWrite + Send + Unpin, +{ + fn new(inner: S, read_buf_size: usize) -> Self { + Self { + inner, + _encoder: Default::default(), + _output: Default::default(), + decoder: Default::default(), + _input: Default::default(), + growable_read_buf: BytesMut::new(), + growable_write_buf: BytesMut::new(), + read_buf: vec![0_u8; read_buf_size], + } + } +} diff --git a/abci/src/lib.rs b/abci/src/lib.rs new file mode 100644 index 000000000..a7586f268 --- /dev/null +++ b/abci/src/lib.rs @@ -0,0 +1,37 @@ +//! ABCI framework for building applications with Tendermint. + +mod application; +#[cfg(feature = "client")] +pub mod client; +pub mod codec; +mod result; +pub mod runtime; +pub mod server; + +// Example applications +#[cfg(feature = "echo-app")] +pub use application::echo::EchoApp; + +// Common exports +pub use application::Application; +pub use result::{Error, Result}; + +// Runtime-specific convenience exports +#[cfg(all(feature = "blocking", feature = "runtime-std"))] +pub use server::blocking::StdServerBuilder; +#[cfg(all(feature = "non-blocking", feature = "runtime-async-std"))] +pub use server::non_blocking::AsyncStdServerBuilder; +#[cfg(all(feature = "non-blocking", feature = "runtime-tokio"))] +pub use server::non_blocking::TokioServerBuilder; + +#[cfg(feature = "client")] +mod client_exports { + #[cfg(all(feature = "blocking", feature = "runtime-std"))] + pub use super::client::blocking::StdClientBuilder; + #[cfg(all(feature = "non-blocking", feature = "runtime-async-std"))] + pub use super::client::non_blocking::AsyncStdClientBuilder; + #[cfg(all(feature = "non-blocking", feature = "runtime-tokio"))] + pub use super::client::non_blocking::TokioClientBuilder; +} +#[cfg(feature = "client")] +pub use client_exports::*; diff --git a/abci/src/result.rs b/abci/src/result.rs new file mode 100644 index 000000000..745e87e1c --- /dev/null +++ b/abci/src/result.rs @@ -0,0 +1,31 @@ +//! Results and errors relating to ABCI client/server operations. + +use thiserror::Error; + +/// Convenience type for results produced by the ABCI crate. +pub type Result = std::result::Result; + +/// The various errors produced by the ABCI crate. +#[derive(Debug, Error)] +pub enum Error { + #[error("Tendermint error")] + TendermintError(#[from] tendermint::Error), + + #[error("protocol buffers error")] + Protobuf(#[from] tendermint_proto::Error), + + #[error("network I/O error")] + NetworkIo(#[from] std::io::Error), + + #[error("channel send error: {0}")] + ChannelSend(String), + + #[error("failed to receive message from channel: {0}")] + ChannelRecv(String), + + #[error("sending end of channel closed unexpectedly")] + ChannelSenderClosed, + + #[error("server stream terminated unexpectedly")] + ServerStreamTerminated, +} diff --git a/abci/src/runtime.rs b/abci/src/runtime.rs new file mode 100644 index 000000000..c1453fe1c --- /dev/null +++ b/abci/src/runtime.rs @@ -0,0 +1,6 @@ +//! Abstractions for facilitating runtime-independent code. + +#[cfg(feature = "blocking")] +pub mod blocking; +#[cfg(feature = "non-blocking")] +pub mod non_blocking; diff --git a/abci/src/runtime/blocking.rs b/abci/src/runtime/blocking.rs new file mode 100644 index 000000000..619fbd409 --- /dev/null +++ b/abci/src/runtime/blocking.rs @@ -0,0 +1,64 @@ +//! Blocking API for blocking runtimes. + +#[cfg(feature = "runtime-std")] +pub mod runtime_std; + +use crate::codec::blocking::Codec; +use crate::Result; +use std::io::{Read, Write}; +use tendermint::abci::{request, response}; + +/// Implemented by each blocking runtime we support. +pub trait Runtime: 'static { + type TcpStream: TcpStream; + type TcpListener: TcpListener; + + // Crate-specific types + type ServerCodec: Codec< + <::TcpStream as TcpStream>::Inner, + request::Request, + response::Response, + >; + type ClientCodec: Codec< + <::TcpStream as TcpStream>::Inner, + response::Response, + request::Request, + >; + + /// Spawn a task in a separate thread without caring about its result. + fn spawn_and_forget(task: T) + where + T: FnOnce() + Send + 'static, + T::Output: Send; +} + +pub trait TcpStream: Sized + Send { + type Inner: Read + Write; + + fn connect(addr: &str) -> Result; + + fn into_inner(self) -> Self::Inner; +} + +pub trait TcpListener: Sized { + /// Bind this listener to the given address. + fn bind(addr: &str) -> Result; + + /// Returns the string representation of this listener's local address. + fn local_addr(&self) -> Result; + + /// Attempt to accept an incoming connection. + /// + /// On success, returns a TCP stream and a string representation of its address. + fn accept(&self) -> Result<(T, String)>; +} + +/// The sending end of a channel. +pub trait Sender { + fn send(&self, value: T) -> Result<()>; +} + +/// The receiving end of a channel. +pub trait Receiver { + fn recv(&self) -> Result; +} diff --git a/abci/src/runtime/blocking/runtime_std.rs b/abci/src/runtime/blocking/runtime_std.rs new file mode 100644 index 000000000..277dbbf26 --- /dev/null +++ b/abci/src/runtime/blocking/runtime_std.rs @@ -0,0 +1,59 @@ +//! Rust standard library-based runtime. + +use crate::codec::blocking::{ClientCodec, ServerCodec}; +use crate::runtime::blocking::{Runtime, TcpListener, TcpStream}; +use crate::Result; + +/// Rust standard library-based runtime. +pub struct Std; + +impl Runtime for Std { + type TcpStream = StdTcpStream; + type TcpListener = StdTcpListener; + + type ServerCodec = ServerCodec; + type ClientCodec = ClientCodec; + + fn spawn_and_forget(task: T) + where + T: FnOnce() + Send + 'static, + T::Output: Send, + { + let _ = std::thread::spawn(move || { + task(); + }); + } +} + +/// Rust standard library TCP stream ([`std::net::TcpStream`]). +pub struct StdTcpStream(std::net::TcpStream); + +impl TcpStream for StdTcpStream { + type Inner = std::net::TcpStream; + + fn connect(addr: &str) -> Result { + Ok(Self(std::net::TcpStream::connect(addr)?)) + } + + fn into_inner(self) -> Self::Inner { + self.0 + } +} + +/// Rust standard library TCP listener ([`std::net::TcpListener`]). +pub struct StdTcpListener(std::net::TcpListener); + +impl TcpListener for StdTcpListener { + fn bind(addr: &str) -> Result { + Ok(Self(std::net::TcpListener::bind(addr)?)) + } + + fn local_addr(&self) -> Result { + Ok(self.0.local_addr()?.to_string()) + } + + fn accept(&self) -> Result<(StdTcpStream, String)> { + let (stream, addr) = self.0.accept()?; + Ok((StdTcpStream(stream), addr.to_string())) + } +} diff --git a/abci/src/runtime/non_blocking.rs b/abci/src/runtime/non_blocking.rs new file mode 100644 index 000000000..5c6c7cf14 --- /dev/null +++ b/abci/src/runtime/non_blocking.rs @@ -0,0 +1,79 @@ +//! Non-blocking runtime interface. + +#[cfg(feature = "runtime-async-std")] +pub mod runtime_async_std; +#[cfg(feature = "runtime-tokio")] +pub mod runtime_tokio; + +use crate::codec::non_blocking::Codec; +use crate::Result; +use async_trait::async_trait; +use futures::{AsyncRead, AsyncWrite, Future}; +use tendermint::abci::{request, response}; + +/// Implemented by each non-blocking runtime we support. +pub trait Runtime: 'static { + type TcpStream: TcpStream; + type TcpListener: TcpListener; + + // Crate-specific types + type ServerCodec: Codec< + <::TcpStream as TcpStream>::Inner, + request::Request, + response::Response, + >; + type ClientCodec: Codec< + <::TcpStream as TcpStream>::Inner, + response::Response, + request::Request, + >; + type ChannelNotify: ChannelNotify; + + /// Spawn an asynchronous task without caring about its result. + fn spawn_and_forget(task: T) + where + T: Future + Send + 'static, + T::Output: Send + 'static; +} + +#[async_trait] +pub trait TcpStream: Sized + Send { + type Inner: AsyncRead + AsyncWrite; + + async fn connect(addr: &str) -> Result; + + fn into_inner(self) -> Self::Inner; +} + +#[async_trait] +pub trait TcpListener: Sized { + /// Bind this listener to the given address. + async fn bind(addr: &str) -> Result; + + /// Returns the string representation of this listener's local address. + fn local_addr(&self) -> Result; + + /// Attempt to accept an incoming connection. + async fn accept(&self) -> Result<(T, String)>; +} + +/// The sending end of a channel. +#[async_trait] +pub trait Sender { + async fn send(&self, value: T) -> Result<()>; +} + +/// The receiving end of a channel. +#[async_trait] +pub trait Receiver { + async fn recv(&mut self) -> Result; +} + +/// A simple notification channel. +pub trait ChannelNotify { + type Sender: Sender<()>; + type Receiver: Receiver<()>; + + /// Construct an unbounded channel. + fn unbounded() -> (Self::Sender, Self::Receiver); +} diff --git a/abci/src/runtime/non_blocking/runtime_async_std.rs b/abci/src/runtime/non_blocking/runtime_async_std.rs new file mode 100644 index 000000000..86087720c --- /dev/null +++ b/abci/src/runtime/non_blocking/runtime_async_std.rs @@ -0,0 +1,97 @@ +//! `async-std`-based non-blocking runtime. + +use crate::codec::non_blocking::{ClientCodec, ServerCodec}; +use crate::runtime::non_blocking::{ + ChannelNotify, Receiver, Runtime, Sender, TcpListener, TcpStream, +}; +use crate::{Error, Result}; +use async_trait::async_trait; +use futures::Future; + +pub struct AsyncStd; + +impl Runtime for AsyncStd { + type TcpStream = AsyncStdTcpStream; + type TcpListener = AsyncStdTcpListener; + + type ServerCodec = ServerCodec; + type ClientCodec = ClientCodec; + type ChannelNotify = AsyncStdChannelNotify; + + fn spawn_and_forget(task: T) + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let _ = async_std::task::spawn(task); + } +} + +pub struct AsyncStdTcpStream(async_std::net::TcpStream); + +#[async_trait] +impl TcpStream for AsyncStdTcpStream { + type Inner = async_std::net::TcpStream; + + async fn connect(addr: &str) -> Result { + Ok(Self(async_std::net::TcpStream::connect(addr).await?)) + } + + fn into_inner(self) -> Self::Inner { + self.0 + } +} + +pub struct AsyncStdTcpListener(async_std::net::TcpListener); + +#[async_trait] +impl TcpListener for AsyncStdTcpListener { + async fn bind(addr: &str) -> Result { + Ok(Self(async_std::net::TcpListener::bind(addr).await?)) + } + + fn local_addr(&self) -> Result { + Ok(self.0.local_addr()?.to_string()) + } + + async fn accept(&self) -> Result<(AsyncStdTcpStream, String)> { + let (stream, addr) = self.0.accept().await?; + Ok((AsyncStdTcpStream(stream), addr.to_string())) + } +} + +pub struct AsyncStdSender(async_channel::Sender); + +#[async_trait] +impl Sender for AsyncStdSender { + async fn send(&self, value: T) -> Result<()> { + self.0 + .send(value) + .await + .map_err(|e| Error::ChannelSend(e.to_string())) + } +} + +pub struct AsyncStdReceiver(async_channel::Receiver); + +#[async_trait] +impl Receiver for AsyncStdReceiver { + async fn recv(&mut self) -> Result { + self.0 + .recv() + .await + .map_err(|e| Error::ChannelRecv(e.to_string())) + } +} + +pub struct AsyncStdChannelNotify; + +impl ChannelNotify for AsyncStdChannelNotify { + type Sender = AsyncStdSender<()>; + type Receiver = AsyncStdReceiver<()>; + + fn unbounded() -> (Self::Sender, Self::Receiver) { + let (tx, rx) = async_channel::unbounded(); + (AsyncStdSender(tx), AsyncStdReceiver(rx)) + } +} diff --git a/abci/src/runtime/non_blocking/runtime_tokio.rs b/abci/src/runtime/non_blocking/runtime_tokio.rs new file mode 100644 index 000000000..11b2e156a --- /dev/null +++ b/abci/src/runtime/non_blocking/runtime_tokio.rs @@ -0,0 +1,135 @@ +//! Tokio-based non-blocking runtime. + +use crate::codec::non_blocking::{ClientCodec, ServerCodec}; +use crate::runtime::non_blocking::{ + ChannelNotify, Receiver, Runtime, Sender, TcpListener, TcpStream, +}; +use crate::{Error, Result}; +use async_trait::async_trait; +use futures::task::{Context, Poll}; +use futures::{ready, Future}; +use pin_project::pin_project; +use std::pin::Pin; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +pub struct Tokio; + +impl Runtime for Tokio { + type TcpStream = TokioTcpStream; + type TcpListener = TokioTcpListener; + + type ServerCodec = ServerCodec; + type ClientCodec = ClientCodec; + type ChannelNotify = TokioChannelNotify; + + fn spawn_and_forget(task: T) + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let _ = tokio::spawn(task); + } +} + +/// A wrapper for [`tokio::net::TcpStream`] that implements +/// [`futures::AsyncRead`] and [`futures::AsyncWrite`] to ensure compatibility +/// with our non-blocking (futures-based) interfaces. +#[pin_project] +pub struct FuturesTcpStream(#[pin] tokio::net::TcpStream); + +impl futures::AsyncRead for FuturesTcpStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let mut buf = ReadBuf::new(buf); + ready!(self.project().0.poll_read(cx, &mut buf)?); + Poll::Ready(Ok(buf.filled().len())) + } +} + +impl futures::AsyncWrite for FuturesTcpStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().0.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_shutdown(cx) + } +} + +pub struct TokioTcpStream(FuturesTcpStream); + +#[async_trait] +impl TcpStream for TokioTcpStream { + type Inner = FuturesTcpStream; + + async fn connect(addr: &str) -> Result { + Ok(Self(FuturesTcpStream( + tokio::net::TcpStream::connect(addr).await?, + ))) + } + + fn into_inner(self) -> Self::Inner { + self.0 + } +} + +pub struct TokioTcpListener(tokio::net::TcpListener); + +#[async_trait] +impl TcpListener for TokioTcpListener { + async fn bind(addr: &str) -> Result { + Ok(Self(tokio::net::TcpListener::bind(addr).await?)) + } + + fn local_addr(&self) -> Result { + Ok(self.0.local_addr()?.to_string()) + } + + async fn accept(&self) -> Result<(TokioTcpStream, String)> { + let (stream, addr) = self.0.accept().await?; + Ok((TokioTcpStream(FuturesTcpStream(stream)), addr.to_string())) + } +} + +pub struct TokioChannelNotify; + +impl ChannelNotify for TokioChannelNotify { + type Sender = TokioSender<()>; + type Receiver = TokioReceiver<()>; + + fn unbounded() -> (Self::Sender, Self::Receiver) { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + (TokioSender(tx), TokioReceiver(rx)) + } +} + +pub struct TokioSender(tokio::sync::mpsc::UnboundedSender); + +#[async_trait] +impl Sender for TokioSender { + async fn send(&self, value: T) -> Result<()> { + self.0 + .send(value) + .map_err(|e| Error::ChannelSend(e.to_string())) + } +} + +pub struct TokioReceiver(tokio::sync::mpsc::UnboundedReceiver); + +#[async_trait] +impl Receiver for TokioReceiver { + async fn recv(&mut self) -> Result { + self.0.recv().await.ok_or(Error::ChannelSenderClosed) + } +} diff --git a/abci/src/server.rs b/abci/src/server.rs new file mode 100644 index 000000000..617c9cf06 --- /dev/null +++ b/abci/src/server.rs @@ -0,0 +1,10 @@ +//! ABCI servers. + +#[cfg(feature = "blocking")] +pub mod blocking; +#[cfg(feature = "non-blocking")] +pub mod non_blocking; + +/// The default read buffer size for the server when reading incoming requests +/// from client connections. +pub const DEFAULT_SERVER_READ_BUF_SIZE: usize = 1024 * 1024; diff --git a/abci/src/server/blocking.rs b/abci/src/server/blocking.rs new file mode 100644 index 000000000..89577dde6 --- /dev/null +++ b/abci/src/server/blocking.rs @@ -0,0 +1,127 @@ +//! Blocking ABCI server. + +use crate::codec::blocking::Codec; +use crate::runtime::blocking::{Runtime, TcpListener, TcpStream}; +use crate::server::DEFAULT_SERVER_READ_BUF_SIZE; +use crate::{Application, Result}; +use log::{debug, error, info}; +use tendermint::abci::request; + +/// Runtime-dependent blocking ABCI server. +/// +/// Blocking servers, unfortunately, cannot be terminated gracefully since they +/// block on their listener. +pub struct Server { + app: App, + listener: Rt::TcpListener, + local_addr: String, + read_buf_size: usize, +} + +impl Server +where + App: Application, + Rt: Runtime, +{ + /// Start listening for incoming connections. + pub fn listen(self) -> Result<()> { + loop { + match self.listener.accept() { + Ok(r) => { + let (stream, addr) = r; + info!("Incoming connection from: {}", addr.to_string()); + self.spawn_client_handler(stream); + } + Err(e) => { + error!("Failed to accept incoming connection: {:?}", e); + } + } + } + } + + fn spawn_client_handler(&self, stream: Rt::TcpStream) { + let app_clone = self.app.clone(); + let read_buf_size = self.read_buf_size; + Rt::spawn_and_forget(move || Self::handle_client(stream, app_clone, read_buf_size)); + } + + fn handle_client(stream: Rt::TcpStream, app: App, read_buf_size: usize) { + let mut codec = Rt::ServerCodec::new(stream.into_inner(), read_buf_size); + loop { + let req: request::Request = match codec.next() { + Some(result) => match result { + Ok(r) => r, + Err(e) => { + error!("Failed to read request from client: {}", e); + return; + } + }, + None => { + info!("Client terminated connection"); + return; + } + }; + debug!("Got incoming request from client: {:?}", req); + let res = app.handle(req); + debug!("Sending outgoing response: {:?}", res); + if let Err(e) = codec.send(res) { + error!("Failed to write outgoing response to client: {:?}", e); + return; + } + } + } + + /// Get the local address for the server, once bound. + pub fn local_addr(&self) -> String { + self.local_addr.clone() + } +} + +/// Allows for construction and configuration of a blocking ABCI server. +pub struct ServerBuilder { + read_buf_size: usize, + _runtime: std::marker::PhantomData, +} + +impl ServerBuilder { + /// Constructor for a server builder that allows for customization of the + /// read buffer size. + pub fn new(read_buf_size: usize) -> Self { + Self { + read_buf_size, + _runtime: Default::default(), + } + } + + /// Constructor for a blocking ABCI server. + /// + /// Attempts to bind the specified application to the given network + /// address. + pub fn bind(self, addr: S, app: App) -> Result> + where + S: AsRef, + App: Application, + { + let listener = Rt::TcpListener::bind(addr.as_ref())?; + let local_addr = listener.local_addr()?; + Ok(Server { + app, + listener, + local_addr, + read_buf_size: self.read_buf_size, + }) + } +} + +impl Default for ServerBuilder { + fn default() -> Self { + Self { + read_buf_size: DEFAULT_SERVER_READ_BUF_SIZE, + _runtime: Default::default(), + } + } +} + +#[cfg(feature = "runtime-std")] +/// Convenience export for when using Rust's standard library as your runtime. +pub type StdServerBuilder = ServerBuilder; diff --git a/abci/src/server/non_blocking.rs b/abci/src/server/non_blocking.rs new file mode 100644 index 000000000..4ab0ac98c --- /dev/null +++ b/abci/src/server/non_blocking.rs @@ -0,0 +1,152 @@ +//! Non-blocking ABCI server. + +use crate::codec::non_blocking::Codec; +use crate::runtime::non_blocking::{ChannelNotify, Receiver, Runtime, TcpListener, TcpStream}; +use crate::server::DEFAULT_SERVER_READ_BUF_SIZE; +use crate::{Application, Result}; +use futures::{SinkExt, StreamExt}; +use log::{debug, error, info}; +use tendermint::abci::request; + +/// Non-blocking ABCI server for a specific application and runtime. +pub struct Server { + app: App, + listener: Rt::TcpListener, + local_addr: String, + term_rx: ::Receiver, + read_buf_size: usize, +} + +impl Server +where + App: Application, + Rt: Runtime, +{ + /// Start listening for incoming connections. + pub async fn listen(mut self) -> Result<()> { + use futures::FutureExt; + + loop { + futures::select! { + result = self.listener.accept().fuse() => match result { + Ok(r) => { + let (stream, addr) = r; + info!("Incoming connection from: {}", addr.to_string()); + self.spawn_client_handler(stream).await; + }, + Err(e) => { + error!("Failed to accept incoming connection: {:?}", e); + } + }, + _ = self.term_rx.recv().fuse() => { + info!("Server terminated"); + return Ok(()) + } + } + } + } + + async fn spawn_client_handler(&self, stream: Rt::TcpStream) { + Rt::spawn_and_forget(Self::handle_client( + stream, + self.app.clone(), + self.read_buf_size, + )); + } + + async fn handle_client(stream: Rt::TcpStream, app: App, read_buf_size: usize) { + let mut codec: Rt::ServerCodec = Rt::ServerCodec::new(stream.into_inner(), read_buf_size); + loop { + let req: request::Request = match codec.next().await { + Some(result) => match result { + Ok(r) => r, + Err(e) => { + error!("Failed to read request from client: {}", e); + return; + } + }, + None => { + info!("Client terminated connection"); + return; + } + }; + debug!("Got incoming request from client: {:?}", req); + let res = app.handle(req); + debug!("Sending outgoing response: {:?}", res); + if let Err(e) = codec.send(res).await { + error!("Failed to write outgoing response to client: {}", e); + return; + } + } + } + + /// Get the local address for the server, once bound. + pub fn local_addr(&self) -> String { + self.local_addr.clone() + } +} + +/// Allows for construction and configuration of a non-blocking ABCI server. +pub struct ServerBuilder { + read_buf_size: usize, + _runtime: std::marker::PhantomData, +} + +impl ServerBuilder { + /// Constructor allowing for customization of server parameters. + pub fn new(read_buf_size: usize) -> Self { + Self { + read_buf_size, + _runtime: Default::default(), + } + } + + /// Bind our ABCI application server to the given address. + /// + /// On success, returns our server and the sending end of a channel we can + /// use to terminate the server while it's listening. + pub async fn bind( + self, + addr: S, + app: App, + ) -> Result<( + Server, + ::Sender, + )> + where + S: AsRef, + App: Application, + { + let listener = Rt::TcpListener::bind(addr.as_ref()).await?; + let (term_tx, term_rx) = ::unbounded(); + let local_addr = listener.local_addr()?; + Ok(( + Server { + app, + listener, + local_addr, + term_rx, + read_buf_size: self.read_buf_size, + }, + term_tx, + )) + } +} + +impl Default for ServerBuilder { + fn default() -> Self { + Self { + read_buf_size: DEFAULT_SERVER_READ_BUF_SIZE, + _runtime: Default::default(), + } + } +} + +#[cfg(feature = "runtime-tokio")] +/// Convenience export for when using Tokio's runtime. +pub type TokioServerBuilder = ServerBuilder; + +#[cfg(feature = "runtime-async-std")] +/// Convenience export for when using `async-std`'s runtime. +pub type AsyncStdServerBuilder = + ServerBuilder; diff --git a/abci/tests/async_std.rs b/abci/tests/async_std.rs new file mode 100644 index 000000000..e89d2cfff --- /dev/null +++ b/abci/tests/async_std.rs @@ -0,0 +1,40 @@ +//! `async-std`-based ABCI client/server integration tests. + +#[cfg(all( + feature = "non-blocking", + feature = "runtime-async-std", + feature = "client", + feature = "echo-app" +))] +mod async_std_integration { + use tendermint::abci::request; + use tendermint_abci::runtime::non_blocking::Sender; + use tendermint_abci::{AsyncStdClientBuilder, AsyncStdServerBuilder, EchoApp}; + + #[async_std::test] + async fn echo() { + let requests = (0..5) + .map(|r| request::Echo { + message: format!("echo {}", r), + }) + .collect::>(); + let (server, term_tx) = AsyncStdServerBuilder::default() + .bind("127.0.0.1:0", EchoApp::default()) + .await + .unwrap(); + let server_addr = server.local_addr(); + let server_handle = async_std::task::spawn(async move { server.listen().await }); + + let mut client = AsyncStdClientBuilder::default() + .connect(server_addr) + .await + .unwrap(); + for req in requests { + let res = client.echo(req.clone()).await.unwrap(); + assert_eq!(res.message, req.message); + } + + term_tx.send(()).await.unwrap(); + server_handle.await.unwrap(); + } +} diff --git a/abci/tests/std.rs b/abci/tests/std.rs new file mode 100644 index 000000000..6efd2bfd4 --- /dev/null +++ b/abci/tests/std.rs @@ -0,0 +1,32 @@ +//! Rust standard library-based ABCI client/server integration tests. + +#[cfg(all( + feature = "blocking", + feature = "runtime-std", + feature = "client", + feature = "echo-app" +))] +mod std_integration { + use tendermint::abci::request; + use tendermint_abci::{EchoApp, StdClientBuilder, StdServerBuilder}; + + #[test] + fn echo() { + let requests = (0..5) + .map(|r| request::Echo { + message: format!("echo {}", r), + }) + .collect::>(); + let server = StdServerBuilder::default() + .bind("127.0.0.1:0", EchoApp::default()) + .unwrap(); + let server_addr = server.local_addr(); + let _ = std::thread::spawn(move || server.listen()); + + let mut client = StdClientBuilder::default().connect(server_addr).unwrap(); + for req in requests { + let res = client.echo(req.clone()).unwrap(); + assert_eq!(res.message, req.message); + } + } +} diff --git a/abci/tests/tokio.rs b/abci/tests/tokio.rs new file mode 100644 index 000000000..9e692dff0 --- /dev/null +++ b/abci/tests/tokio.rs @@ -0,0 +1,40 @@ +//! Tokio-based ABCI client/server integration tests. + +#[cfg(all( + feature = "non-blocking", + feature = "runtime-tokio", + feature = "client", + feature = "echo-app" +))] +mod tokio_integration { + use tendermint::abci::request; + use tendermint_abci::runtime::non_blocking::Sender; + use tendermint_abci::{EchoApp, TokioClientBuilder, TokioServerBuilder}; + + #[tokio::test] + async fn echo() { + let requests = (0..5) + .map(|r| request::Echo { + message: format!("echo {}", r), + }) + .collect::>(); + let (server, term_tx) = TokioServerBuilder::default() + .bind("127.0.0.1:0", EchoApp::default()) + .await + .unwrap(); + let server_addr = server.local_addr(); + let server_handle = tokio::spawn(async move { server.listen().await }); + + let mut client = TokioClientBuilder::default() + .connect(server_addr) + .await + .unwrap(); + for req in requests { + let res = client.echo(req.clone()).await.unwrap(); + assert_eq!(res.message, req.message); + } + + term_tx.send(()).await.unwrap(); + server_handle.await.unwrap().unwrap(); + } +} diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 8c180b763..768327f5f 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -22,6 +22,7 @@ use anomaly::BoxError; use bytes::{Buf, BufMut}; pub use error::{Error, Kind}; use prost::encoding::encoded_len_varint; +pub use prost::encoding::{decode_varint, encode_varint}; use prost::Message; use std::convert::{TryFrom, TryInto}; diff --git a/tendermint/src/abci.rs b/tendermint/src/abci.rs index 9a0f84761..3bf92ab77 100644 --- a/tendermint/src/abci.rs +++ b/tendermint/src/abci.rs @@ -13,6 +13,8 @@ mod gas; mod info; mod log; mod path; +pub mod request; +pub mod response; pub mod responses; pub mod tag; pub mod transaction; diff --git a/tendermint/src/abci/request.rs b/tendermint/src/abci/request.rs new file mode 100644 index 000000000..e557eb9f7 --- /dev/null +++ b/tendermint/src/abci/request.rs @@ -0,0 +1,68 @@ +//! ABCI requests. + +mod echo; +mod info; + +pub use echo::Echo; +pub use info::Info; + +use crate::abci::response::ResponseInner; +use crate::{Error, Kind}; +use std::convert::{TryFrom, TryInto}; +use tendermint_proto::abci::request::Value; +use tendermint_proto::abci::Request as RawRequest; +use tendermint_proto::Protobuf; + +/// ABCI request wrapper. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Request { + /// Request that the ABCI server echo a specific message back to the client. + Echo(Echo), + /// Return application info. + Info(Info), +} + +impl Protobuf for Request {} + +impl TryFrom for Request { + type Error = Error; + + fn try_from(raw: RawRequest) -> Result { + let value = raw.value.ok_or(Kind::MissingAbciRequestValue)?; + Ok(match value { + Value::Echo(raw_req) => Self::Echo(raw_req.try_into()?), + Value::Info(raw_req) => Self::Info(raw_req.try_into()?), + _ => unimplemented!(), + // Value::Flush(_) => {} + // Value::SetOption(_) => {} + // Value::InitChain(_) => {} + // Value::Query(_) => {} + // Value::BeginBlock(_) => {} + // Value::CheckTx(_) => {} + // Value::DeliverTx(_) => {} + // Value::EndBlock(_) => {} + // Value::Commit(_) => {} + // Value::ListSnapshots(_) => {} + // Value::OfferSnapshot(_) => {} + // Value::LoadSnapshotChunk(_) => {} + // Value::ApplySnapshotChunk(_) => {} + }) + } +} + +impl From for RawRequest { + fn from(request: Request) -> Self { + Self { + value: Some(match request { + Request::Echo(req) => Value::Echo(req.into()), + Request::Info(req) => Value::Info(req.into()), + }), + } + } +} + +/// The inner type of a [`Request`]. +pub trait RequestInner: TryFrom + Into + Send { + /// The corresponding response type for this request. + type Response: ResponseInner; +} diff --git a/tendermint/src/abci/request/echo.rs b/tendermint/src/abci/request/echo.rs new file mode 100644 index 000000000..cf021107f --- /dev/null +++ b/tendermint/src/abci/request/echo.rs @@ -0,0 +1,56 @@ +//! ABCI echo request. + +use crate::abci::request::{Request, RequestInner}; +use crate::abci::response; +use crate::{Error, Kind}; +use std::convert::TryFrom; +use tendermint_proto::abci::RequestEcho; +use tendermint_proto::Protobuf; + +/// Request that the ABCI server echo a message back the client. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Echo { + /// The message to echo back the client. + pub message: String, +} + +impl Protobuf for Echo {} + +impl TryFrom for Echo { + type Error = Error; + + fn try_from(raw: RequestEcho) -> Result { + Ok(Self { + message: raw.message, + }) + } +} + +impl From for RequestEcho { + fn from(request: Echo) -> Self { + Self { + message: request.message, + } + } +} + +impl RequestInner for Echo { + type Response = response::Echo; +} + +impl TryFrom for Echo { + type Error = Error; + + fn try_from(value: Request) -> Result { + match value { + Request::Echo(r) => Ok(r), + _ => Err(Kind::UnexpectedAbciRequestType("Echo".to_owned(), value).into()), + } + } +} + +impl From for Request { + fn from(req: Echo) -> Self { + Self::Echo(req) + } +} diff --git a/tendermint/src/abci/request/info.rs b/tendermint/src/abci/request/info.rs new file mode 100644 index 000000000..c179aab7d --- /dev/null +++ b/tendermint/src/abci/request/info.rs @@ -0,0 +1,79 @@ +//! ABCI info request. + +use crate::abci::request::{Request, RequestInner}; +use crate::abci::response; +use crate::{Error, Kind}; +use std::convert::TryFrom; +use tendermint_proto::abci::RequestInfo; +use tendermint_proto::Protobuf; + +/// Allows a Tendermint node to provide information about itself to the ABCI +/// server, in exchange for information about the server. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Info { + /// Tendermint software semantic version. + pub version: String, + /// Tendermint block protocol version. + pub block_version: u64, + /// Tendermint P2P protocol version. + pub p2p_version: u64, + /// Tendermint ABCI version. + pub abci_version: u64, +} + +impl Default for Info { + fn default() -> Self { + Self { + version: "".to_string(), + block_version: 0, + p2p_version: 0, + abci_version: 0, + } + } +} + +impl Protobuf for Info {} + +impl TryFrom for Info { + type Error = Error; + + fn try_from(value: RequestInfo) -> Result { + Ok(Self { + version: value.version, + block_version: value.block_version, + p2p_version: value.p2p_version, + abci_version: 0, + }) + } +} + +impl From for RequestInfo { + fn from(value: Info) -> Self { + Self { + version: value.version, + block_version: value.block_version, + p2p_version: value.p2p_version, + } + } +} + +impl RequestInner for Info { + type Response = response::Info; +} + +impl TryFrom for Info { + type Error = Error; + + fn try_from(value: Request) -> Result { + match value { + Request::Info(r) => Ok(r), + _ => Err(Kind::UnexpectedAbciRequestType("Info".to_owned(), value).into()), + } + } +} + +impl From for Request { + fn from(value: Info) -> Self { + Self::Info(value) + } +} diff --git a/tendermint/src/abci/response.rs b/tendermint/src/abci/response.rs new file mode 100644 index 000000000..994a38b84 --- /dev/null +++ b/tendermint/src/abci/response.rs @@ -0,0 +1,64 @@ +//! ABCI responses. + +mod echo; +mod info; + +pub use echo::Echo; +pub use info::Info; + +use crate::{Error, Kind}; +use std::convert::{TryFrom, TryInto}; +use tendermint_proto::abci::response::Value; +use tendermint_proto::abci::Response as RawResponse; +use tendermint_proto::Protobuf; + +/// ABCI response wrapper. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Response { + /// Echo response. + Echo(Echo), + /// Application info. + Info(Info), +} + +impl Protobuf for Response {} + +impl TryFrom for Response { + type Error = Error; + + fn try_from(raw: RawResponse) -> Result { + let value = raw.value.ok_or(Kind::MissingAbciResponseValue)?; + Ok(match value { + Value::Echo(raw_res) => Self::Echo(raw_res.try_into()?), + Value::Info(raw_res) => Self::Info(raw_res.try_into()?), + _ => unimplemented!(), + // Value::Flush(_) => {} + // Value::SetOption(_) => {} + // Value::InitChain(_) => {} + // Value::Query(_) => {} + // Value::BeginBlock(_) => {} + // Value::CheckTx(_) => {} + // Value::DeliverTx(_) => {} + // Value::EndBlock(_) => {} + // Value::Commit(_) => {} + // Value::ListSnapshots(_) => {} + // Value::OfferSnapshot(_) => {} + // Value::LoadSnapshotChunk(_) => {} + // Value::ApplySnapshotChunk(_) => {} + }) + } +} + +impl From for RawResponse { + fn from(request: Response) -> Self { + Self { + value: Some(match request { + Response::Echo(res) => Value::Echo(res.into()), + Response::Info(res) => Value::Info(res.into()), + }), + } + } +} + +/// The inner type of a [`Response`]. +pub trait ResponseInner: TryFrom + Into + Send {} diff --git a/tendermint/src/abci/response/echo.rs b/tendermint/src/abci/response/echo.rs new file mode 100644 index 000000000..42787c820 --- /dev/null +++ b/tendermint/src/abci/response/echo.rs @@ -0,0 +1,53 @@ +//! ABCI echo response. + +use crate::abci::response::{Response, ResponseInner}; +use crate::{Error, Kind}; +use std::convert::TryFrom; +use tendermint_proto::abci::ResponseEcho; +use tendermint_proto::Protobuf; + +/// ABCI echo response. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Echo { + /// The message to be echoed back to the client. + pub message: String, +} + +impl Protobuf for Echo {} + +impl TryFrom for Echo { + type Error = Error; + + fn try_from(raw: ResponseEcho) -> Result { + Ok(Self { + message: raw.message, + }) + } +} + +impl From for ResponseEcho { + fn from(response: Echo) -> Self { + Self { + message: response.message, + } + } +} + +impl ResponseInner for Echo {} + +impl TryFrom for Echo { + type Error = Error; + + fn try_from(value: Response) -> Result { + match value { + Response::Echo(res) => Ok(res), + _ => Err(Kind::UnexpectedAbciResponseType("Echo".to_owned(), value).into()), + } + } +} + +impl From for Response { + fn from(res: Echo) -> Self { + Self::Echo(res) + } +} diff --git a/tendermint/src/abci/response/info.rs b/tendermint/src/abci/response/info.rs new file mode 100644 index 000000000..a1fbe8a02 --- /dev/null +++ b/tendermint/src/abci/response/info.rs @@ -0,0 +1,82 @@ +//! ABCI info response. + +use crate::abci::response::{Response, ResponseInner}; +use crate::{Error, Kind}; +use std::convert::TryFrom; +use tendermint_proto::abci::ResponseInfo; +use tendermint_proto::Protobuf; + +/// Allows the ABCI app to provide information about itself back to the +/// Tendermint node. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Info { + /// Arbitrary (application-specific) information. + pub data: String, + /// Application software semantic version. + pub version: String, + /// Application protocol version. + pub app_version: u64, + /// Latest block for which the application has called Commit. + pub last_block_height: i64, + /// Latest result of Commit. + pub last_block_app_hash: Vec, +} + +impl Default for Info { + fn default() -> Self { + Self { + data: "".to_string(), + version: "".to_string(), + app_version: 0, + last_block_height: 0, + last_block_app_hash: vec![], + } + } +} + +impl Protobuf for Info {} + +impl TryFrom for Info { + type Error = Error; + + fn try_from(value: ResponseInfo) -> Result { + Ok(Self { + data: value.data, + version: value.version, + app_version: value.app_version, + last_block_height: value.last_block_height, + last_block_app_hash: value.last_block_app_hash, + }) + } +} + +impl From for ResponseInfo { + fn from(value: Info) -> Self { + Self { + data: value.data, + version: value.version, + app_version: value.app_version, + last_block_height: value.last_block_height, + last_block_app_hash: value.last_block_app_hash, + } + } +} + +impl ResponseInner for Info {} + +impl TryFrom for Info { + type Error = Error; + + fn try_from(value: Response) -> Result { + match value { + Response::Info(res) => Ok(res), + _ => Err(Kind::UnexpectedAbciResponseType("Info".to_owned(), value).into()), + } + } +} + +impl From for Response { + fn from(value: Info) -> Self { + Self::Info(value) + } +} diff --git a/tendermint/src/error.rs b/tendermint/src/error.rs index f5c678a48..a061c9cb2 100644 --- a/tendermint/src/error.rs +++ b/tendermint/src/error.rs @@ -196,6 +196,22 @@ pub enum Kind { /// Proposer not found in validator set #[error("proposer with address '{}' not found in validator set", _0)] ProposerNotFound(account::Id), + + /// ABCI request is missing its inner value property. + #[error("malformed ABCI request: request is missing its inner value")] + MissingAbciRequestValue, + + /// ABCI response is missing its inner value property. + #[error("malformed ABCI response: response is missing its inner value")] + MissingAbciResponseValue, + + /// The ABCI request type doesn't match our expectations. + #[error("expected ABCI request type {0}, but got {1:?}")] + UnexpectedAbciRequestType(String, crate::abci::request::Request), + + /// The ABCI response type doesn't match our expectations. + #[error("expected ABCI response type {0}, but got {1:?}")] + UnexpectedAbciResponseType(String, crate::abci::response::Response), } impl Kind {