Skip to content

Commit

Permalink
feat(builtins): implement subnet.resolve [NET-549] (#1798)
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh authored Sep 21, 2023
1 parent 9e380ac commit 85665e2
Show file tree
Hide file tree
Showing 13 changed files with 797 additions and 16 deletions.
440 changes: 440 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ members = [
"crates/health",
"sorcerer",
"crates/nox-tests",
"crates/subnet-resolver",
"nox",
"aquamarine",
"particle-protocol",
Expand Down Expand Up @@ -90,6 +91,7 @@ spell-storage = { path = "spell-storage" }
particle-execution = { path = "particle-execution" }
system-services = { path = "crates/system-services" }
health = { path = "crates/health" }
subnet-resolver = { path = "crates/subnet-resolver" }

# spell
fluence-spell-dtos = "=0.5.20"
Expand Down
6 changes: 6 additions & 0 deletions crates/created-swarm/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ pub struct SwarmConfig {
pub enabled_system_services: Vec<String>,
pub extend_system_services: Vec<system_services::PackageDistro>,
pub http_port: u16,
pub connector_api_endpoint: Option<String>,
}

impl SwarmConfig {
Expand All @@ -272,6 +273,7 @@ impl SwarmConfig {
enabled_system_services: vec![],
extend_system_services: vec![],
http_port: 0,
connector_api_endpoint: None,
}
}
}
Expand Down Expand Up @@ -374,6 +376,10 @@ pub fn create_swarm_with_runtime<RT: AquaRuntime>(
})
.collect();

if let Some(endpoint) = config.connector_api_endpoint.clone() {
resolved.system_services.decider.network_api_endpoint = endpoint;
}

let management_kp = fluence_keypair::KeyPair::generate_ed25519();
let management_peer_id = libp2p::identity::Keypair::from(management_kp.clone())
.public()
Expand Down
2 changes: 2 additions & 0 deletions crates/nox-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ local-vm = { path = "../local-vm" }
control-macro = { path = "../control-macro" }
json-utils = { path = "../json-utils" }
system-services = { workspace = true }
subnet-resolver = { workspace = true }

log-utils = { workspace = true }
fluence-spell-dtos = { workspace = true }
Expand Down Expand Up @@ -56,3 +57,4 @@ base64 = { workspace = true }
tokio = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
mockito = "1.1.0"
113 changes: 113 additions & 0 deletions crates/nox-tests/tests/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use service_modules::load_module;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use subnet_resolver::SubnetResolveResult;
use test_constants::PARTICLE_TTL;
use test_utils::create_service;

Expand Down Expand Up @@ -2166,3 +2167,115 @@ async fn aliases_restart() {
panic!("incorrect args: expected array")
}
}

#[tokio::test]
async fn subnet_resolve() {
let expected_request = r#"{"jsonrpc":"2.0","id":0,"method":"eth_call","params":[{"data":"0xf3b6a45d","to":"0x6dD1aFfe90415C61AeDf5c0ACcA9Cf5fD5031517"},"latest"]}"#;
let expected_request: serde_json::Value =
serde_json::from_str(expected_request).expect("parse expected_request as json");

let jsonrpc = r#"
{
"jsonrpc": "2.0",
"id": 0,
"result": "0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000032b7083358039745e731fb9809204d9304b48797406593e180b4e5a762a47321400000000000000000000000000000000000000000000000000000000000000012623d2cc0692ce6cb68ab094f95daa06a92a36f3cf7190e9baf7dd61562899f4a510574bbf0159ca28b7fb191d252346d1a32f853a3f0b1c9c5e59e28cfd546c0000000000000000000000000b9b9ac40dc527ea6a98110b796b0007074d49dd0000000000000000000000000000000000000000000000000de0b6b3a7640000000000000000000000000000000000000000000000000000000000000000004fdbfb375f013a592c50174ad241c67a4cf1b9ec81c902900b75f801f83cd2657a00000000000000000000000000000000000000000000000000000000000000022623d2cc0692ce6cb68ab094f95daa06a92a36f3cf7190e9baf7dd61562899f400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000b9b9ac40dc527ea6a98110b796b0007074d49dd0000000000000000000000000000000000000000000000000de0b6b3a7640000000000000000000000000000000000000000000000000000000000000000004fec7c6fea91d971bc7c5ed340ec86265bb93386fff248e842a1a69a94b58d2d9e00000000000000000000000000000000000000000000000000000000000000032623d2cc0692ce6cb68ab094f95daa06a92a36f3cf7190e9baf7dd61562899f400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000b9b9ac40dc527ea6a98110b796b0007074d49dd0000000000000000000000000000000000000000000000000de0b6b3a7640000000000000000000000000000000000000000000000000000000000000000004f"
}
"#;

// Create a mock
let mut server = mockito::Server::new();
let url = server.url();
let mock = server
.mock("POST", "/")
.with_body_from_request(move |req| {
let body = req.body().expect("mock: get req body");
let body: serde_json::Value =
serde_json::from_slice(body).expect("mock: parse req body as json");
assert_eq!(
body, expected_request,
"invalid request. expected {}, got {}",
expected_request, body
);
jsonrpc.into()
})
// expect to receive this exact body in POST
// .match_body(r#"{"jsonrpc":"2.0","id":0,"method":"eth_getLogs","params":[{"fromBlock":"0x52","toBlock":"0x246","address":"0x6328bb918a01603adc91eae689b848a9ecaef26d","topics":["0x55e61a24ecdae954582245e5e611fb06905d6af967334fff4db72793bebc72a9","0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7"]}]}"#)
// expect exactly 1 POST request
.expect(1)
.with_status(200)
.with_header("content-type", "application/json")
.create();

let invalid_mock = server
.mock("POST", "/")
.expect(0)
.with_status(404)
.with_body("invalid mock was hit. Check that request body matches 'match_body' clause'")
.create();

let swarms = make_swarms_with_cfg(1, |mut cfg| {
cfg.connector_api_endpoint = Some(url.clone());
cfg
})
.await;

let mut client = ConnectedClient::connect_with_keypair(
swarms[0].multiaddr.clone(),
Some(swarms[0].management_keypair.clone()),
)
.await
.wrap_err("connect client")
.unwrap();

client.send_particle(
r#"
(seq
(call relay ("subnet" "resolve") ["0x6dD1aFfe90415C61AeDf5c0ACcA9Cf5fD5031517"] subnet)
(call %init_peer_id% ("op" "return") [subnet])
)
"#,
hashmap! {
"relay" => json!(client.node.to_string()),
},
);

let mut result = client.receive_args().await.unwrap();

let subnet: SubnetResolveResult = serde_json::from_value(result.remove(0)).unwrap();

assert!(subnet.success, "{:?}", subnet.error);
assert_eq!(subnet.error.len(), 0);
let pats: Vec<_> = subnet
.workers
.iter()
.map(|p| (p.pat_id.as_str(), p.host_id.as_str(), p.worker_id.clone()))
.collect();

assert_eq!(
pats,
vec![
(
"0x2b7083358039745e731fb9809204d9304b48797406593e180b4e5a762a473214",
"12D3KooWCPFLtcLwzT1k4gsacu3gkM2gYJTXdnTSfsPFZ67FrD4F",
vec!["12D3KooWLvhtdbBuFTzxvDXUGYcyxyeZrab1tZWEY4YY8K6PTjTH".to_string()],
),
(
"0xdbfb375f013a592c50174ad241c67a4cf1b9ec81c902900b75f801f83cd2657a",
"12D3KooWCPFLtcLwzT1k4gsacu3gkM2gYJTXdnTSfsPFZ67FrD4F",
vec![],
),
(
"0xec7c6fea91d971bc7c5ed340ec86265bb93386fff248e842a1a69a94b58d2d9e",
"12D3KooWCPFLtcLwzT1k4gsacu3gkM2gYJTXdnTSfsPFZ67FrD4F",
vec![],
),
]
);

// assert that there was no invalid requests
invalid_mock.assert();

// TODO: how to check request body?
// check that mock was called
mock.assert();
}
18 changes: 18 additions & 0 deletions crates/subnet-resolver/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "subnet-resolver"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ethabi = "18.0.0"
jsonrpsee = { version = "0.20.1", features = ["http-client", "macros"] }

libp2p-identity = { workspace = true, features = ["peerid"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
hex = "0.4.3"
eyre = { workspace = true }
serde = { workspace = true }
30 changes: 30 additions & 0 deletions crates/subnet-resolver/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use libp2p_identity::ParseError;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ChainDataError {
#[error("empty data, nothing to parse")]
Empty,
#[error("missing token for field '{0}'")]
MissingParsedToken(&'static str),
#[error("invalid token for field '{0}'")]
InvalidParsedToken(&'static str),
#[error("data is not a valid hex: '{0}'")]
DecodeHex(#[source] hex::FromHexError),
#[error(transparent)]
EthError(#[from] ethabi::Error),
}

#[derive(Error, Debug)]
pub enum ResolveSubnetError {
#[error("error encoding function: '{0}'")]
EncodeFunction(#[from] ethabi::Error),
#[error("error sending jsonrpc request: '{0}'")]
RpcError(#[from] jsonrpsee::core::error::Error),
#[error(transparent)]
ChainData(#[from] ChainDataError),
#[error("getPATs response is empty")]
Empty,
#[error("'{1}' from getPATs is not a valid PeerId")]
InvalidPeerId(#[source] ParseError, &'static str),
}
6 changes: 6 additions & 0 deletions crates/subnet-resolver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#![feature(try_blocks)]
mod error;
mod resolve;
mod utils;

pub use resolve::{resolve_subnet, SubnetResolveResult, Worker};
131 changes: 131 additions & 0 deletions crates/subnet-resolver/src/resolve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use crate::error::{ChainDataError, ResolveSubnetError};
use crate::utils::{decode_hex, next_opt, parse_peer_id};
use ethabi::ParamType::{Address, Array, FixedBytes, Tuple, Uint};
use ethabi::{Function, ParamType, StateMutability, Token};
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;

use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::runtime::Handle;

/// Parse data from chain. Accepts data with and without "0x" prefix.
pub fn parse_chain_data(data: &str) -> Result<Vec<Token>, ChainDataError> {
if data.is_empty() {
return Err(ChainDataError::Empty);
}
let data = decode_hex(data).map_err(ChainDataError::DecodeHex)?;
let signature: ParamType = Array(Box::new(Tuple(vec![
// bytes32 id
FixedBytes(32),
// uint256 index
Uint(256),
// bytes32 peerId
FixedBytes(32),
// bytes32 workerId
FixedBytes(32),
// address owner
Address,
// uint256 collateral
Uint(256),
// uint256 created
Uint(256),
])));
Ok(ethabi::decode(&[signature], &data)?)
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Worker {
pub pat_id: String,
pub host_id: String,
pub worker_id: Vec<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SubnetResolveResult {
pub success: bool,
pub workers: Vec<Worker>,
pub error: Vec<String>,
}

fn decode_pats(data: String) -> Result<Vec<Worker>, ResolveSubnetError> {
let tokens = parse_chain_data(&data)?;
let tokens = tokens.into_iter().next().ok_or(ResolveSubnetError::Empty)?;
let tokens = tokens
.into_array()
.ok_or(ChainDataError::InvalidParsedToken("response"))?;
let mut result = vec![];
for token in tokens {
let tuple = token
.into_tuple()
.ok_or(ChainDataError::InvalidParsedToken("tuple"))?;
let mut tuple = tuple.into_iter();

let pat_id = next_opt(&mut tuple, "pat_id", Token::into_fixed_bytes)?;
let pat_id = hex::encode(pat_id);

// skip 'index' field
let mut tuple = tuple.skip(1);

let peer_id = next_opt(&mut tuple, "compute_peer_id", Token::into_fixed_bytes)?;
let peer_id = parse_peer_id(peer_id)
.map_err(|e| ResolveSubnetError::InvalidPeerId(e, "compute_peer_id"))?;
let worker_id = next_opt(&mut tuple, "compute_worker_id", Token::into_fixed_bytes)?;
// if all bytes are 0, then worker_id is considered empty
let all_zeros = worker_id.iter().all(|b| *b == 0);
let worker_id = if all_zeros {
vec![]
} else {
let worker_id = parse_peer_id(worker_id)
.map_err(|e| ResolveSubnetError::InvalidPeerId(e, "worker_id"))?;
vec![worker_id.to_string()]
};

let pat = Worker {
pat_id: format!("0x{}", pat_id),
host_id: peer_id.to_string(),
worker_id,
};
result.push(pat);
}

Ok(result)
}

pub fn resolve_subnet(deal_id: String, api_endpoint: &str) -> SubnetResolveResult {
let res: Result<_, ResolveSubnetError> = try {
// Description of the `getPATs` function from the `chain.workers` smart contract on chain
#[allow(deprecated)]
let input = Function {
name: String::from("getPATs"),
inputs: vec![],
outputs: vec![],
constant: None,
state_mutability: StateMutability::View,
}
.encode_input(&[])?;
let input = format!("0x{}", hex::encode(input));
let client = HttpClientBuilder::default().build(api_endpoint)?;
let params = rpc_params![json!({ "data": input, "to": deal_id }), json!("latest")];
let response: Result<String, _> = tokio::task::block_in_place(move || {
Handle::current().block_on(async move { client.request("eth_call", params).await })
});

let pats = response?;

decode_pats(pats)?
};

match res {
Ok(workers) => SubnetResolveResult {
success: true,
workers,
error: vec![],
},
Err(err) => SubnetResolveResult {
success: false,
workers: vec![],
error: vec![format!("{}", err)],
},
}
}
30 changes: 30 additions & 0 deletions crates/subnet-resolver/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::error::ChainDataError;
use ethabi::Token;
use libp2p_identity::{ParseError, PeerId};

/// Static prefix of the PeerId. Protobuf encoding + multihash::identity + length and so on.
pub(crate) const PEER_ID_PREFIX: &[u8] = &[0, 36, 8, 1, 18, 32];

pub(crate) fn parse_peer_id(bytes: Vec<u8>) -> Result<PeerId, ParseError> {
let peer_id = [PEER_ID_PREFIX, &bytes].concat();

PeerId::from_bytes(&peer_id)
}

pub(crate) fn decode_hex(h: &str) -> Result<Vec<u8>, hex::FromHexError> {
let h = h.trim_start_matches("0x");
hex::decode(h)
}

pub(crate) fn next_opt<T>(
data_tokens: &mut impl Iterator<Item = Token>,
name: &'static str,
f: impl Fn(Token) -> Option<T>,
) -> Result<T, ChainDataError> {
let next = data_tokens
.next()
.ok_or(ChainDataError::MissingParsedToken(name))?;
let parsed = f(next).ok_or(ChainDataError::InvalidParsedToken(name))?;

Ok(parsed)
}
Loading

0 comments on commit 85665e2

Please sign in to comment.