Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ABCI crate #750

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b910ffd
Update proto and tendermint deps to make way for abci crate
thanethomson Dec 15, 2020
db3e7ea
Add ABCI echo request
thanethomson Dec 15, 2020
241ddac
Re-export encode_varint and decode_varint for use in ABCI codec
thanethomson Dec 15, 2020
5c347b7
Add rudimentary Tokio-based ABCI echo server
thanethomson Dec 15, 2020
362f201
Update p2p prost dependency to be compatible with tendermint-proto
thanethomson Dec 16, 2020
03e25a5
Merge latest changes from master
thanethomson Dec 17, 2020
366c0bb
Merge latest changes from master
thanethomson Dec 23, 2020
dd9768d
Bump to v0.17.0
thanethomson Dec 23, 2020
20a8e41
Merge latest changes from master
thanethomson Jan 11, 2021
21ca9ae
Bump Tendermint crate dependency versions to v0.17.1
thanethomson Jan 11, 2021
dc6cc1a
Facilitate conversion of ABCI request/response inner types to the wra…
thanethomson Jan 12, 2021
37650c7
Refactor to clean up interface and add client functionality
thanethomson Jan 12, 2021
1630370
Allow developer to disable client code
thanethomson Jan 12, 2021
28d0f71
Add async-std client/server
thanethomson Jan 12, 2021
3753178
Test multiple echo requests/responses with client/server
thanethomson Jan 12, 2021
5a44bd1
Add ABCI info request/response
thanethomson Jan 13, 2021
febdf3e
Add support for ABCI info request/response
thanethomson Jan 13, 2021
cba4c45
Allow for direct construction of ABCI structs
thanethomson Jan 21, 2021
80773b5
Rename runtime-related features
thanethomson Jan 21, 2021
4942a81
Refactor over generic async runtime
thanethomson Jan 23, 2021
4383f26
Restructure errors and remove redundant ones
thanethomson Jan 23, 2021
6b6d7d5
Make async feature compulsory for Tokio/async-std-related features
thanethomson Jan 23, 2021
814ebd3
Reorganize runtime-specific exports
thanethomson Jan 23, 2021
104bae7
Structure appropriately for simple std/multi-threaded runtime
thanethomson Jan 24, 2021
4846ada
Merge from master and bump Tokio/prost/bytes versions
thanethomson Jan 24, 2021
1d86246
Ignore tests if async-std runtime is not enabled
thanethomson Jan 24, 2021
f3adbbe
Add rudimentary README
thanethomson Jan 24, 2021
1713d47
Fix comment in integration test
thanethomson Jan 24, 2021
e7b2f9c
Separate out runtime-dependent from runtime-independent code
thanethomson Jan 25, 2021
c67c718
Add cargo make Makefile
thanethomson Jan 25, 2021
d7c1540
Add crate description
thanethomson Jan 25, 2021
746fdbf
Add comment about which interface is exposed through the async featur…
thanethomson Jan 25, 2021
beddc04
Refactor to allow for simultaneous inclusion of both blocking and non…
thanethomson Jan 28, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]

members = [
"abci",
"light-client",
"light-node",
"p2p",
Expand Down
31 changes: 31 additions & 0 deletions abci/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "tendermint-abci"
version = "0.17.1"
authors = ["Thane Thomson <[email protected]>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
client = []
echo-app = []
async = [ "async-trait", "futures", "pin-project" ]
runtime-async-std = [ "async-channel", "async-std" ]
runtime-std = []
runtime-tokio = [ "tokio", "tokio-util" ]

[dependencies]
bytes = "0.6"
eyre = "0.6"
log = "0.4"
tendermint = { version = "0.17.1", path = "../tendermint" }
tendermint-proto = { version = "0.17.1", path = "../proto" }
thiserror = "1.0"

async-trait = { version = "0.1", optional = true }
async-channel = { version = "1.5", optional = true }
async-std = { version = "1.8", features = [ "attributes" ], optional = true }
futures = { version = "0.3", optional = true }
pin-project = { version = "1.0", optional = true }
tokio = { version = "0.3", features = [ "macros", "net", "rt", "sync" ], optional = true }
tokio-util = { version = "0.5", features = [ "codec" ], optional = true }
30 changes: 30 additions & 0 deletions abci/src/application.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//! ABCI application-related types.

#[cfg(feature = "echo-app")]
pub mod echo;

use tendermint::abci::{request, response};

/// ABCI server application interface.
pub trait Application: Send + Clone + 'static {
/// Request that the ABCI server echo back the same message sent to it.
fn echo(&self, req: request::Echo) -> response::Echo {
response::Echo {
message: req.message,
}
}

/// Receive information about the Tendermint node and respond with
/// information about the ABCI application.
fn info(&self, _req: request::Info) -> response::Info {
Default::default()
}

/// Generic handler for mapping incoming requests to responses.
fn handle(&self, req: request::Request) -> response::Response {
match req {
request::Request::Echo(echo) => response::Response::Echo(self.echo(echo)),
request::Request::Info(info) => response::Response::Info(self.info(info)),
}
}
}
41 changes: 41 additions & 0 deletions abci/src/application/echo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//! Trivial ABCI application that just implements echo functionality.

use crate::Application;
use tendermint::abci::{request, response};

/// Trivial ABCI application that just implements echo functionality.
#[derive(Clone)]
pub struct EchoApp {
data: String,
version: String,
app_version: u64,
}

impl EchoApp {
/// Constructor.
pub fn new<S: AsRef<str>>(data: S, version: S, app_version: u64) -> Self {
Self {
data: data.as_ref().to_owned(),
version: version.as_ref().to_owned(),
app_version,
}
}
}

impl Default for EchoApp {
fn default() -> Self {
EchoApp::new("Echo App", "0.0.1", 1)
}
}

impl Application for EchoApp {
fn info(&self, _req: request::Info) -> response::Info {
response::Info {
data: self.data.clone(),
version: self.version.clone(),
app_version: self.app_version,
last_block_height: 1,
last_block_app_hash: vec![],
}
}
}
48 changes: 48 additions & 0 deletions abci/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! ABCI clients for interacting with ABCI servers.

use crate::runtime::{ClientCodec, Runtime, TcpStream};
use crate::{Error, Result};
use std::convert::TryInto;
use tendermint::abci::request::RequestInner;
use tendermint::abci::{request, response};

/// A runtime-dependent ABCI client.
pub struct Client<Rt: Runtime> {
codec: Rt::ClientCodec,
}

#[cfg(feature = "async")]
impl<Rt: Runtime> Client<Rt> {
/// Connect to the ABCI server at the given network address.
pub async fn connect<S: AsRef<str>>(addr: S) -> Result<Self> {
let stream = Rt::TcpStream::connect(addr.as_ref()).await?;
Ok(Self {
codec: Rt::ClientCodec::from_tcp_stream(stream),
})
}

/// Request that the ABCI server echo back the message in the given
/// request.
pub async fn echo(&mut self, req: request::Echo) -> Result<response::Echo> {
self.perform(req).await
}

/// Provide information to the ABCI server about the Tendermint node in
/// exchange for information about the application.
pub async fn info(&mut self, req: request::Info) -> Result<response::Info> {
self.perform(req).await
}

async fn perform<Req: RequestInner>(&mut self, req: Req) -> Result<Req::Response> {
use futures::{SinkExt, StreamExt};

self.codec.send(req.into()).await?;
let res: std::result::Result<Req::Response, tendermint::Error> = self
.codec
.next()
.await
.ok_or(Error::ServerStreamTerminated)??
.try_into();
Ok(res?)
}
}
163 changes: 163 additions & 0 deletions abci/src/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
//! ABCI codec.

use crate::{Error, Result};
use bytes::{Buf, BufMut, BytesMut};
use tendermint::abci::request::Request;
use tendermint::abci::response::Response;
use tendermint_proto::Protobuf;

// The maximum number of bytes we expect in a varint. We use this to check if
// we're encountering a decoding error for a varint.
const MAX_VARINT_LENGTH: usize = 16;

/// Tendermint Socket Protocol encoder.
pub struct TspEncoder {}

impl TspEncoder {
/// Encode the given request to its raw wire-level representation and store
/// this in the given buffer.
#[cfg(feature = "client")]
pub fn encode_request(request: Request, mut dst: &mut BytesMut) -> Result<()> {
encode_length_delimited(|mut b| Ok(request.encode(&mut b)?), &mut dst)
}

/// Encode the given response to its raw wire-level representation and
/// store this in the given buffer.
pub fn encode_response(response: Response, mut dst: &mut BytesMut) -> Result<()> {
encode_length_delimited(|mut b| Ok(response.encode(&mut b)?), &mut dst)
}
}

/// Tendermint Socket Protocol decoder.
pub struct TspDecoder {
read_buf: BytesMut,
}

impl TspDecoder {
/// Constructor.
pub fn new() -> Self {
Self {
read_buf: BytesMut::new(),
}
}

/// Attempt to decode a request from the given buffer.
///
/// Returns `Ok(None)` if we don't yet have enough data to decode a full
/// request.
pub fn decode_request(&mut self, buf: &mut BytesMut) -> Result<Option<Request>> {
self.read_buf.put(buf);
decode_length_delimited(&mut self.read_buf, |mut b| Ok(Request::decode(&mut b)?))
}

/// Attempt to decode a response from the given buffer.
///
/// Returns `Ok(None)` if we don't yet have enough data to decode a full
/// response.
#[cfg(feature = "client")]
pub fn decode_response(&mut self, buf: &mut BytesMut) -> Result<Option<Response>> {
self.read_buf.put(buf);
decode_length_delimited(&mut self.read_buf, |mut b| Ok(Response::decode(&mut b)?))
}
}

// encode_varint and decode_varint will be removed once
// https://github.com/tendermint/tendermint/issues/5783 lands in Tendermint.
fn encode_varint<B: BufMut>(val: u64, mut buf: &mut B) {
tendermint_proto::encode_varint(val << 1, &mut buf);
}

fn decode_varint<B: Buf>(mut buf: &mut B) -> Result<u64> {
let len = tendermint_proto::decode_varint(&mut buf)
.map_err(|_| Error::Protobuf(tendermint_proto::Kind::DecodeMessage.into()))?;
Ok(len >> 1)
}

// Allows us to avoid having to re-export `prost::Message`.
// TODO(thane): Investigate a better approach here.
fn encode_length_delimited<F, B>(mut encode_fn: F, mut dst: &mut B) -> Result<()>
where
F: FnMut(&mut BytesMut) -> Result<()>,
B: BufMut,
{
let mut buf = BytesMut::new();
encode_fn(&mut buf)?;
let buf = buf.freeze();
encode_varint(buf.len() as u64, &mut dst);
dst.put(buf);
Ok(())
}

fn decode_length_delimited<F, T>(src: &mut BytesMut, mut decode_fn: F) -> Result<Option<T>>
where
F: FnMut(&mut BytesMut) -> Result<T>,
{
let src_len = src.len();
let mut tmp = src.clone().freeze();
let encoded_len = match decode_varint(&mut tmp) {
Ok(len) => len,
Err(e) => {
return if src_len <= MAX_VARINT_LENGTH {
// We've potentially only received a partial length delimiter
Ok(None)
} else {
Err(e)
};
}
};
let remaining = tmp.remaining() as u64;
if remaining < encoded_len {
// We don't have enough data yet to decode the entire message
Ok(None)
} else {
let delim_len = src_len - tmp.remaining();
// We only advance the source buffer once we're sure we have enough
// data to try to decode the result.
src.advance(delim_len + (encoded_len as usize));

let mut result_bytes = BytesMut::from(tmp.split_to(encoded_len as usize).as_ref());
Ok(Some(decode_fn(&mut result_bytes)?))
}
}

#[cfg(feature = "client")]
#[cfg(test)]
mod test {
use super::*;
use tendermint::abci::request::Echo;

#[test]
fn single_request() {
let request = Request::Echo(Echo {
message: "Hello TSP!".to_owned(),
});
let mut buf = BytesMut::new();
TspEncoder::encode_request(request.clone(), &mut buf).unwrap();

let mut decoder = TspDecoder::new();
let decoded_request = decoder.decode_request(&mut buf).unwrap().unwrap();

assert_eq!(request, decoded_request);
}

#[test]
fn multiple_requests() {
let requests = (0..5)
.map(|r| {
Request::Echo(Echo {
message: format!("Request {}", r),
})
})
.collect::<Vec<Request>>();
let mut buf = BytesMut::new();
requests
.iter()
.for_each(|request| TspEncoder::encode_request(request.clone(), &mut buf).unwrap());

let mut decoder = TspDecoder::new();
for request in requests {
let decoded = decoder.decode_request(&mut buf).unwrap().unwrap();
assert_eq!(decoded, request);
}
}
}
38 changes: 38 additions & 0 deletions abci/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! ABCI framework for building applications with Tendermint.

mod application;
#[cfg(any(
feature = "client",
feature = "runtime-tokio",
feature = "runtime-async-std"
))]
mod codec;
mod result;
pub mod runtime;
mod server;

// Client exports
#[cfg(feature = "client")]
mod client;

#[cfg(feature = "client")]
pub use client::Client;

#[cfg(all(feature = "client", feature = "runtime-tokio"))]
pub type TokioClient = Client<runtime::tokio::Tokio>;
#[cfg(all(feature = "client", feature = "runtime-async-std"))]
pub type AsyncStdClient = Client<runtime::async_std::AsyncStd>;

// Example applications
#[cfg(feature = "echo-app")]
pub use application::echo::EchoApp;

// Common exports
pub use application::Application;
pub use result::{Error, Result};
pub use server::Server;

#[cfg(feature = "runtime-tokio")]
pub type TokioServer<A> = Server<A, runtime::tokio::Tokio>;
#[cfg(feature = "runtime-async-std")]
pub type AsyncStdServer<A> = Server<A, runtime::async_std::AsyncStd>;
Loading