Skip to content

Commit

Permalink
add(rpc): Add a tonic server in zebra-rpc (#8674)
Browse files Browse the repository at this point in the history
* adds a tonic server

* Adds a test stub, moves method impls to their own modules, minor fixes.

* Moves indexer rpc mod behind a feature, adds a config field for its listen address, and initializes the indexer RPC when zebrad starts

* Skips tonic_build() in zebra-rpc build script unless indexer-rpcs feature is selected, simplifies indexer.proto file, makes tonic deps optional

* formats zebra-rpc Cargo.toml

* Adds tokio_stream dependency, adds chain_tip_change field to IndexerRPC, and implements a simple version of the chain_tip_change RPC method

* passes latest chain tip to indexer::server::init from start cmd and  updates vectors test

* Update zebra-rpc/src/config.rs

* fixes a race condition in trusted_chain_sync_handles_forks_correctly
  • Loading branch information
arya2 authored Jul 16, 2024
1 parent 1238ec0 commit 14463a7
Show file tree
Hide file tree
Showing 15 changed files with 371 additions and 20 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5988,11 +5988,16 @@ dependencies = [
"jsonrpc-derive",
"jsonrpc-http-server",
"proptest",
"prost",
"rand 0.8.5",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"tonic 0.11.0",
"tonic-build 0.11.0",
"tonic-reflection",
"tower",
"tracing",
"zcash_address",
Expand Down
61 changes: 52 additions & 9 deletions zebra-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,23 @@ homepage = "https://zfnd.org/zebra/"
# crates.io is limited to 5 keywords and categories
keywords = ["zebra", "zcash"]
# Must be one of <https://crates.io/category_slugs>
categories = ["asynchronous", "cryptography::cryptocurrencies", "encoding", "network-programming"]
categories = [
"asynchronous",
"cryptography::cryptocurrencies",
"encoding",
"network-programming",
]

[features]

indexer-rpcs = [
"tonic-build",
"tonic",
"tonic-reflection",
"prost",
"tokio-stream",
]

# Production features that activate extra dependencies, or extra features in dependencies

# Mining RPC support
Expand All @@ -41,7 +54,10 @@ proptest-impl = [
]

[dependencies]
chrono = { version = "0.4.38", default-features = false, features = ["clock", "std"] }
chrono = { version = "0.4.38", default-features = false, features = [
"clock",
"std",
] }
futures = "0.3.30"

# lightwalletd sends JSON-RPC requests over HTTP 1.1
Expand All @@ -55,14 +71,26 @@ jsonrpc-http-server = "18.0.0"
serde_json = { version = "1.0.120", features = ["preserve_order"] }
indexmap = { version = "2.2.6", features = ["serde"] }

tokio = { version = "1.37.0", features = ["time", "rt-multi-thread", "macros", "tracing"] }
tokio = { version = "1.37.0", features = [
"time",
"rt-multi-thread",
"macros",
"tracing",
] }
tower = "0.4.13"

# indexer-rpcs dependencies
tonic = { version = "0.11.0", optional = true }
tonic-reflection = { version = "0.11.0", optional = true }
prost = { version = "0.12.6", optional = true }
tokio-stream = { version = "0.1.15", optional = true }

tracing = "0.1.39"

hex = { version = "0.4.3", features = ["serde"] }
serde = { version = "1.0.204", features = ["serde_derive"] }


zcash_primitives = { version = "0.15.0" }

# Experimental feature getblocktemplate-rpcs
Expand All @@ -73,13 +101,20 @@ zcash_address = { version = "0.3.2", optional = true }
# Test-only feature proptest-impl
proptest = { version = "1.4.0", optional = true }

zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["json-conversion"] }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [
"json-conversion",
] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38" }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = [
"rpc-client",
] }
zebra-script = { path = "../zebra-script", version = "1.0.0-beta.38" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38" }

[build-dependencies]
tonic-build = { version = "0.11.0", optional = true }

[dev-dependencies]
insta = { version = "1.39.0", features = ["redactions", "json", "ron"] }

Expand All @@ -88,9 +123,17 @@ proptest = "1.4.0"
thiserror = "1.0.61"
tokio = { version = "1.37.0", features = ["full", "tracing", "test-util"] }

zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = [
"proptest-impl",
] }

zebra-test = { path = "../zebra-test", version = "1.0.0-beta.38" }
15 changes: 15 additions & 0 deletions zebra-rpc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! Compile proto files
fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(feature = "indexer-rpcs")]
{
use std::{env, path::PathBuf};
let out_dir = env::var("OUT_DIR").map(PathBuf::from);
tonic_build::configure()
.type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]")
.file_descriptor_set_path(out_dir.unwrap().join("indexer_descriptor.bin"))
.compile(&["proto/indexer.proto"], &[""])?;
}

Ok(())
}
10 changes: 10 additions & 0 deletions zebra-rpc/proto/indexer.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";
package zebra.indexer.rpc;

// Used by methods that take no arguments.
message Empty {};

service Indexer {
// Notifies listeners of chain tip changes
rpc ChainTipChange(Empty) returns (stream Empty);
}
19 changes: 19 additions & 0 deletions zebra-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ pub struct Config {
/// They can also query your node's state.
pub listen_addr: Option<SocketAddr>,

/// IP address and port for the indexer RPC server.
///
/// Note: The indexer RPC server is disabled by default.
/// To enable the indexer RPC server, compile `zebrad` with the
/// `indexer` feature flag and set a listen address in the config:
/// ```toml
/// [rpc]
/// indexer_listen_addr = '127.0.0.1:8230'
/// ```
///
/// # Security
///
/// If you bind Zebra's indexer RPC port to a public IP address,
/// anyone on the internet can query your node's state.
pub indexer_listen_addr: Option<SocketAddr>,

/// The number of threads used to process RPC requests and responses.
///
/// Zebra's RPC server has a separate thread pool and a `tokio` executor for each thread.
Expand Down Expand Up @@ -65,6 +81,9 @@ impl Default for Config {
// Disable RPCs by default.
listen_addr: None,

// Disable indexer RPCs by default.
indexer_listen_addr: None,

// Use a single thread, so we can detect RPC port conflicts.
#[cfg(not(feature = "getblocktemplate-rpcs"))]
parallel_cpu_threads: 1,
Expand Down
13 changes: 13 additions & 0 deletions zebra-rpc/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! A tonic RPC server for Zebra's indexer API.
#[cfg(test)]
mod tests;

pub mod methods;
pub mod server;

// The generated indexer proto
tonic::include_proto!("zebra.indexer.rpc");

pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("indexer_descriptor");
57 changes: 57 additions & 0 deletions zebra-rpc/src/indexer/methods.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! Implements `Indexer` methods on the `IndexerRPC` type
use std::pin::Pin;

use futures::Stream;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Response, Status};
use tower::BoxError;

use zebra_chain::chain_tip::ChainTip;

use super::{indexer_server::Indexer, server::IndexerRPC, Empty};

/// The maximum number of messages that can be queued to be streamed to a client
const RESPONSE_BUFFER_SIZE: usize = 10_000;

#[tonic::async_trait]
impl<ReadStateService, Tip> Indexer for IndexerRPC<ReadStateService, Tip>
where
ReadStateService: tower::Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
type ChainTipChangeStream = Pin<Box<dyn Stream<Item = Result<Empty, Status>> + Send>>;

async fn chain_tip_change(
&self,
_: tonic::Request<Empty>,
) -> Result<Response<Self::ChainTipChangeStream>, Status> {
let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
let response_stream = ReceiverStream::new(response_receiver);
let mut chain_tip_change = self.chain_tip_change.clone();

tokio::spawn(async move {
// Notify the client of chain tip changes until the channel is closed
while let Ok(()) = chain_tip_change.best_tip_changed().await {
let tx = response_sender.clone();
tokio::spawn(async move { tx.send(Ok(Empty {})).await });
}

let _ = response_sender
.send(Err(Status::unavailable(
"chain_tip_change channel has closed",
)))
.await;
});

Ok(Response::new(Box::pin(response_stream)))
}
}
77 changes: 77 additions & 0 deletions zebra-rpc/src/indexer/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! A tonic RPC server for Zebra's indexer API.
use std::net::SocketAddr;

use tokio::task::JoinHandle;
use tonic::transport::{server::TcpIncoming, Server};
use tower::BoxError;
use zebra_chain::chain_tip::ChainTip;

use super::indexer_server::IndexerServer;

type ServerTask = JoinHandle<Result<(), BoxError>>;

/// Indexer RPC service.
pub struct IndexerRPC<ReadStateService, Tip>
where
ReadStateService: tower::Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
_read_state: ReadStateService,
pub(super) chain_tip_change: Tip,
}

/// Initializes the indexer RPC server
pub async fn init<ReadStateService, Tip>(
listen_addr: SocketAddr,
_read_state: ReadStateService,
chain_tip_change: Tip,
) -> Result<(ServerTask, SocketAddr), BoxError>
where
ReadStateService: tower::Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
<ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
let indexer_service = IndexerRPC {
_read_state,
chain_tip_change,
};

let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(crate::indexer::FILE_DESCRIPTOR_SET)
.build()
.unwrap();

tracing::info!("Trying to open indexer RPC endpoint at {}...", listen_addr,);

let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
let listen_addr = tcp_listener.local_addr()?;
let incoming = TcpIncoming::from_listener(tcp_listener, true, None)?;

let server_task: JoinHandle<Result<(), BoxError>> = tokio::spawn(async move {
Server::builder()
.add_service(reflection_service)
.add_service(IndexerServer::new(indexer_service))
.serve_with_incoming(incoming)
.await?;

Ok(())
});

Ok((server_task, listen_addr))
}
1 change: 1 addition & 0 deletions zebra-rpc/src/indexer/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod vectors;
Loading

0 comments on commit 14463a7

Please sign in to comment.