Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(state): implement jsonrpc for the state network #1175

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 241 additions & 2 deletions trin-state/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@ use discv5::{enr::NodeId, Enr};
use portalnet::overlay_service::OverlayRequestError;
use serde_json::{json, Value};
use tokio::sync::mpsc;
use tracing::error;
use trin_storage::ContentStore;

use crate::network::StateNetwork;
use ethportal_api::{
jsonrpsee::core::Serialize,
types::{
distance::Distance,
jsonrpc::{endpoints::StateEndpoint, request::StateJsonRpcRequest},
portal::{FindNodesInfo, PongInfo},
portal::{AcceptInfo, FindNodesInfo, PongInfo},
portal_wire::Content,
query_trace::QueryTrace,
state::{ContentInfo, TraceContentInfo},
},
utils::bytes::hex_encode,
ContentValue, OverlayContentKey, StateContentKey, StateContentValue,
};

/// Handles State network JSON-RPC requests
Expand Down Expand Up @@ -42,7 +49,43 @@ impl StateRequestHandler {
recursive_find_nodes(network, node_id).await
}
StateEndpoint::DataRadius => radius(network),
_ => Err("Not implemented".to_string()),
StateEndpoint::LocalContent(content_key) => local_content(network, content_key).await,
StateEndpoint::FindContent(enr, content_key) => {
find_content(network, enr, content_key).await
}
StateEndpoint::RecursiveFindContent(content_key) => {
recursive_find_content(network, content_key, /* is_trace= */ false).await
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, didn't know we could inline comments this way 💡

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I've mentioned in a previous pr, I don't love it. But, I guess in this case especially I see the usefulness. And creating a let is_trace=false; variable does seem overkill. So, maybe I'll have to change my mind about this

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They might look weird at first, but they grow on you.

They shouldn't be overused, but they have their use cases.

}
StateEndpoint::TraceRecursiveFindContent(content_key) => {
recursive_find_content(network, content_key, /* is_trace= */ true).await
}
StateEndpoint::Store(content_key, content_value) => {
store(network, content_key, content_value).await
}
StateEndpoint::Offer(enr, content_key, content_value) => {
offer(network, enr, content_key, content_value).await
}
StateEndpoint::Gossip(content_key, content_value) => {
gossip(
network,
content_key,
content_value,
/* is_trace= */ false,
)
.await
}
StateEndpoint::TraceGossip(content_key, content_value) => {
gossip(
network,
content_key,
content_value,
/* is_trace= */ true,
)
.await
}
StateEndpoint::PaginateLocalContentKeys(_, _) => {
Err("Pagination not implemented for state network".to_string())
}
};

let _ = request.resp.send(response);
Expand Down Expand Up @@ -114,6 +157,202 @@ async fn recursive_find_nodes(
Ok(json!(nodes))
}

fn local_storage_lookup(
network: &Arc<StateNetwork>,
content_key: &StateContentKey,
) -> Result<Option<Vec<u8>>, String> {
network
.overlay
.store
.read()
.get(content_key)
.map_err(|err| err.to_string())
}

async fn local_content(
network: Arc<StateNetwork>,
content_key: StateContentKey,
) -> Result<Value, String> {
match local_storage_lookup(&network, &content_key) {
Ok(Some(content)) => Ok(Value::String(hex_encode(content))),
Ok(None) => {
let err = json!({
"message": "Content not found in local storage",
});
Err(err.to_string())
}
Err(err) => Err(format!(
"LocalContent failed: error while looking for content key in local storage: {err:?}",
)),
}
}

async fn find_content(
network: Arc<StateNetwork>,
enr: Enr,
content_key: StateContentKey,
) -> Result<Value, String> {
let result = network
.overlay
.send_find_content(enr, content_key.into())
.await
.and_then(|(content, utp_transfer)| match content {
Content::ConnectionId(id) => Err(OverlayRequestError::Failure(format!(
"FindContent request returned a connection id ({id:?}) instead of conducting utp transfer."
))),
Content::Content(content) => Ok(json!({
"content": hex_encode(content),
"utpTransfer": utp_transfer,
})),
Content::Enrs(enrs) => Ok(json!({
"enrs": enrs,
})),
});
to_json_result("FindContent", result)
}

async fn recursive_find_content(
network: Arc<StateNetwork>,
content_key: StateContentKey,
is_trace: bool,
) -> Result<Value, String> {
let local_content = match local_storage_lookup(&network, &content_key) {
Ok(data) => data,
Err(err) => {
error!(
error = %err,
content.key = %content_key,
"Error checking local store for content",
);
None
}
};
let (content_bytes, utp_transfer, trace) = match local_content {
Some(value) => {
let trace = if is_trace {
let local_enr = network.overlay.local_enr();
let mut trace = QueryTrace::new(&local_enr, content_key.content_id());
trace.node_responded_with_content(&local_enr);
Some(trace)
} else {
None
};
(value, false, trace)
}
None => network
.overlay
.lookup_content(content_key.clone(), is_trace)
.await
.map_err(|err| err.to_string())?
.map_err(|err| match err {
OverlayRequestError::ContentNotFound {
message,
utp,
trace,
} => {
let err = json!({
"message": format!("{message}: utp: {utp}"),
"trace": trace
});
err.to_string()
}
_ => {
error!(
error = %err,
content.key = %content_key,
"Error looking up content",
);
err.to_string()
}
})?,
};

let content =
StateContentValue::decode(content_bytes.as_ref()).map_err(|err| err.to_string())?;

if is_trace {
Ok(json!(TraceContentInfo {
content,
utp_transfer,
trace: trace.ok_or("Content query trace requested but none provided.".to_string())?,
}))
} else {
Ok(json!(ContentInfo::Content {
content,
utp_transfer
}))
}
}

async fn store(
network: Arc<StateNetwork>,
content_key: StateContentKey,
content_value: StateContentValue,
) -> Result<Value, String> {
to_json_result(
"Store",
network
.overlay
.store
.write()
.put(content_key, content_value.encode())
.map(|_| true)
.map_err(|err| OverlayRequestError::Failure(err.to_string())),
)
}

async fn offer(
network: Arc<StateNetwork>,
enr: Enr,
content_key: StateContentKey,
content_value: Option<StateContentValue>,
) -> Result<Value, String> {
if let Some(content_value) = content_value {
to_json_result(
"Populate Offer",
network
.overlay
.send_populated_offer(enr, content_key.into(), content_value.encode())
.await
.map(|accept| AcceptInfo {
content_keys: accept.content_keys,
}),
)
} else {
to_json_result(
"Offer",
network
.overlay
.send_offer(vec![content_key.into()], enr)
.await
.map(|accept| AcceptInfo {
content_keys: accept.content_keys,
}),
)
}
}

async fn gossip(
network: Arc<StateNetwork>,
content_key: StateContentKey,
content_value: StateContentValue,
is_trace: bool,
) -> Result<Value, String> {
if is_trace {
Ok(json!(
network
.overlay
.propagate_gossip_trace(content_key, content_value.encode())
.await
))
} else {
Ok(network
.overlay
.propagate_gossip(vec![(content_key, content_value.encode())])
.into())
}
}

fn to_json_result(
request: &str,
result: Result<impl Serialize, OverlayRequestError>,
Expand Down
Loading