Skip to content

Commit

Permalink
ROLL-13 Missing Tezos facade endpoints (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
matsakiv committed Jul 18, 2023
1 parent 185fe51 commit 46179e2
Show file tree
Hide file tree
Showing 15 changed files with 855 additions and 77 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion tezos_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ tezos_core = { git = "https://github.com/baking-bad/tezos-rust-sdk", branch = "d
tezos_operation = { git = "https://github.com/baking-bad/tezos-rust-sdk", branch = "develop", package = "tezos-operation", default-features = false, features = ["ed25519"] }
tezos_michelson = { git = "https://github.com/baking-bad/tezos-rust-sdk", branch = "develop", package = "tezos-michelson", default-features = false }
tezos_contract = { git = "https://github.com/baking-bad/tezos-rust-sdk", branch = "develop", package = "tezos-contract", default-features = false }
chrono = { version = "0.4", default-features = false }
ibig = { version = "0.3", features = ["std", "num-traits"], default-features = false }
tokio = { version = "1.25.0", features = ["process", "macros"] }
tokio-stream = "0.1.14"

[dev-dependencies]
tokio = { version = "1.25.0", features = ["process", "macros"] }
static_init = "1.0.3"
tezos_rpc = { git = "https://github.com/baking-bad/tezos-rust-sdk", branch = "develop", package = "tezos-rpc", features = ["http"] }
3 changes: 3 additions & 0 deletions tezos_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ async fn main() -> std::io::Result<()> {

let data = Data::new(client);
let host = Data::new(rpc_host);

tezos_node::rollup::rpc_client::run_block_updater(&data);

launch_node::<RollupRpcClient>(data, &args.rpc_addr, args.port, host).await
}
14 changes: 12 additions & 2 deletions tezos_node/src/rollup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub mod rpc_backend;
pub mod rpc_client;
pub mod rpc_helpers;

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use actix_web::web::Bytes;
use async_trait::async_trait;
use layered_store::StoreType;
use serde::Serialize;
Expand All @@ -26,18 +30,23 @@ use tezos_rpc::models::{
operation::Operation,
version::VersionInfo,
};
use tokio::sync::mpsc::Receiver;

use crate::Result;
pub use block_id::BlockId;

#[async_trait]
pub trait RollupClient {
pub trait RollupClient: Sync + Send {
async fn initialize(&mut self) -> Result<()>;
async fn store_get<T: StoreType>(&self, key: String, block_id: &BlockId) -> Result<T>;
async fn get_chain_id(&self) -> Result<ChainId>;
async fn get_version(&self) -> Result<VersionInfo>;
async fn is_chain_synced(&self) -> Result<bool>;
async fn inject_batch(&self, messages: Vec<Vec<u8>>) -> Result<()>;
fn get_ttl_blocks(&self) -> Result<Arc<Mutex<VecDeque<BlockHash>>>>;
fn create_channel(&self) -> Result<Receiver<Result<Bytes>>>;
async fn broadcast_to_channels(&self, data: Bytes) -> Result<()>;
fn channels_count(&self) -> usize;

async fn get_batch_head(&self, block_id: &BlockId) -> Result<Head> {
let head: Head = self.store_get("/head".into(), block_id).await?;
Expand Down Expand Up @@ -83,7 +92,7 @@ pub trait TezosFacade {
async fn get_block_header(&self, block_id: &BlockId) -> Result<FullHeader>;
async fn get_block_metadata(&self, block_id: &BlockId) -> Result<Metadata>;
async fn get_block_protocols(&self, block_id: &BlockId) -> Result<BlockProtocols>;
async fn get_live_blocks(&self, block_id: &BlockId) -> Result<Vec<BlockHash>>;
async fn get_live_blocks(&self, block_id: &BlockId) -> Result<VecDeque<BlockHash>>;
async fn get_contract(&self, block_id: &BlockId, address: &Address) -> Result<ContractInfo>;
async fn get_contract_balance(&self, block_id: &BlockId, address: &Address) -> Result<Mutez>;
async fn get_contract_counter(
Expand Down Expand Up @@ -140,6 +149,7 @@ pub trait TezosFacade {
async fn get_operation(&self, block_id: &BlockId, pass: i32, index: i32) -> Result<Operation>;
async fn get_operation_list(&self, block_id: &BlockId, pass: i32) -> Result<Vec<Operation>>;
async fn get_operation_list_list(&self, block_id: &BlockId) -> Result<Vec<Vec<Operation>>>;
async fn get_heads_main_channel(&self) -> Result<Receiver<Result<Bytes>>>;
}

#[async_trait]
Expand Down
20 changes: 14 additions & 6 deletions tezos_node/src/rollup/facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
//
// SPDX-License-Identifier: MIT

use actix_web::web::Bytes;
use async_trait::async_trait;
use michelson_vm::entrypoints::collect_entrypoints;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use tezos_core::types::encoded::{
Address, BlockHash, ContractAddress, Encoded, ImplicitAddress, OperationHash, PublicKey,
ScriptExprHash,
Expand All @@ -17,14 +18,15 @@ use tezos_rpc::models::{
contract::{ContractEntrypoints, ContractInfo, ContractScript},
operation::Operation,
};
use tokio::sync::mpsc::Receiver;

use crate::{
rollup::{BlockId, BlockProtocols, RollupClient, TezosFacade},
Error, Result,
};

#[async_trait]
impl<T: RollupClient + Sync + Send> TezosFacade for T {
impl<T: RollupClient + Send + Sync> TezosFacade for T {
async fn get_block_hash(&self, block_id: &BlockId) -> Result<BlockHash> {
match block_id {
BlockId::Hash(hash) => Ok(hash.clone()),
Expand Down Expand Up @@ -278,9 +280,15 @@ impl<T: RollupClient + Sync + Send> TezosFacade for T {
})
}

async fn get_live_blocks(&self, block_id: &BlockId) -> Result<Vec<BlockHash>> {
let receipt = self.get_batch_receipt(block_id).await?;
// TODO: ttl blocks
Ok(vec![receipt.header.predecessor])
async fn get_live_blocks(&self, _block_id: &BlockId) -> Result<VecDeque<BlockHash>> {
let live_blocks_ptr = self.get_ttl_blocks().unwrap();
let live_blocks = live_blocks_ptr.lock().unwrap();
// TODO: remove hashes after block_id?
//let block_hash = self.get_block_hash(block_id).await.unwrap();
Ok(live_blocks.clone())
}

async fn get_heads_main_channel(&self) -> Result<Receiver<Result<Bytes>>> {
self.create_channel()
}
}
51 changes: 49 additions & 2 deletions tezos_node/src/rollup/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
//
// SPDX-License-Identifier: MIT

use actix_web::web::Bytes;
use async_trait::async_trait;
use layered_store::{ephemeral::EphemeralCopy, StoreType};
use log::debug;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::Mutex;
use tezos_core::types::encoded::{ChainId, Encoded, OperationHash};
use std::{cell::RefCell, sync::Arc};
use tezos_core::types::encoded::{BlockHash, ChainId, Encoded, OperationHash};
use tezos_operation::operations::SignedOperation;
use tezos_proto::{
batcher::apply_batch,
Expand All @@ -19,6 +21,7 @@ use tezos_rpc::models::operation::Operation;
use tezos_rpc::models::version::{
AdditionalInfo, CommitInfo, NetworkVersion, Version, VersionInfo,
};
use tokio::sync::mpsc::{channel, Receiver, Sender};

use crate::{
rollup::{rpc_helpers::parse_operation, BlockId, RollupClient, TezosHelpers},
Expand All @@ -30,6 +33,8 @@ const CHAIN_ID: &str = "NetXP2FfcNxFANL";
pub struct RollupMockClient {
context: Mutex<RefCell<TezosEphemeralContext>>,
mempool: Mutex<RefCell<Vec<(OperationHash, SignedOperation)>>>,
ttl_blocks: Arc<Mutex<VecDeque<BlockHash>>>,
channels: Arc<Mutex<Vec<Sender<Result<Bytes>>>>>,
}

macro_rules! get_mut {
Expand All @@ -38,11 +43,15 @@ macro_rules! get_mut {
};
}

const MAX_TTL_BLOCKS_COUNT: usize = 60;

impl Default for RollupMockClient {
fn default() -> Self {
Self {
context: Mutex::new(RefCell::new(TezosEphemeralContext::default())),
mempool: Mutex::new(RefCell::new(Vec::new())),
ttl_blocks: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_TTL_BLOCKS_COUNT))),
channels: Arc::new(Mutex::new(Vec::new())),
}
}
}
Expand Down Expand Up @@ -114,6 +123,44 @@ impl RollupClient for RollupMockClient {
async fn inject_batch(&self, _messages: Vec<Vec<u8>>) -> Result<()> {
unreachable!()
}

fn create_channel(&self) -> Result<Receiver<Result<Bytes>>> {
const LONG_POLL_CHANNEL_SIZE: usize = 1;
let (tx, rx) = channel::<Result<Bytes>>(LONG_POLL_CHANNEL_SIZE);
let mut channels = self.channels.lock().unwrap();
channels.push(tx);
Ok(rx)
}

fn get_ttl_blocks(&self) -> Result<Arc<Mutex<VecDeque<BlockHash>>>> {
Ok(Arc::clone(&self.ttl_blocks))
}

async fn broadcast_to_channels(&self, data: Bytes) -> Result<()> {
let mut channels = self.channels.lock().unwrap();
let mut i = 0;
while i < channels.len() {
if channels[i].is_closed() {
channels.remove(i);
continue;
}

let value = data.clone();
if let Err(_) = channels[i].try_send(Ok(value)) {
channels.remove(i);
continue;
}

i += 1;
}

Ok(())
}

fn channels_count(&self) -> usize {
let channels_ptr = self.channels.lock().unwrap();
channels_ptr.len()
}
}

#[async_trait]
Expand Down
Loading

0 comments on commit 46179e2

Please sign in to comment.