From 7ef29e1af620e829b2f2030bc914c0f98d489ddb Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Thu, 21 Sep 2023 09:56:51 +0200 Subject: [PATCH] implementing history, block storage method changes after groovies review --- Cargo.lock | 15 ++ Cargo.toml | 2 + .../src/rpc_polling/poll_blocks.rs | 139 +----------------- core/src/commitment_utils.rs | 46 ++++++ core/src/lib.rs | 1 + core/src/stores/data_cache.rs | 10 +- core/src/structures/produced_block.rs | 130 +++++++++++++++- core/src/traits/block_storage_interface.rs | 15 +- history/Cargo.toml | 20 +++ .../src/block_stores/inmemory_block_store.rs | 76 ++++++++++ history/src/block_stores/mod.rs | 2 + .../multiple_strategy_block_store.rs | 139 ++++++++++++++++++ history/src/history.rs | 6 + history/src/lib.rs | 2 + history/tests/inmemory_block_store_tests.rs | 54 +++++++ history/tests/mod.rs | 2 + .../multiple_strategy_block_store_tests.rs | 139 ++++++++++++++++++ lite-rpc/Cargo.toml | 8 +- lite-rpc/src/bridge.rs | 34 ++++- lite-rpc/src/main.rs | 20 ++- lite-rpc/src/rpc.rs | 12 +- lite-rpc/src/service_spawner.rs | 2 +- services/src/data_caching_service.rs | 2 +- services/src/transaction_service.rs | 8 +- 24 files changed, 719 insertions(+), 165 deletions(-) create mode 100644 core/src/commitment_utils.rs create mode 100644 history/Cargo.toml create mode 100644 history/src/block_stores/inmemory_block_store.rs create mode 100644 history/src/block_stores/mod.rs create mode 100644 history/src/block_stores/multiple_strategy_block_store.rs create mode 100644 history/src/history.rs create mode 100644 history/src/lib.rs create mode 100644 history/tests/inmemory_block_store_tests.rs create mode 100644 history/tests/mod.rs create mode 100644 history/tests/multiple_strategy_block_store_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 5e9a3c58..73302df7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2458,6 +2458,7 @@ dependencies = [ "serde_json", "solana-lite-rpc-cluster-endpoints", "solana-lite-rpc-core", + "solana-lite-rpc-history", "solana-lite-rpc-services", "solana-rpc-client", "solana-rpc-client-api", @@ -4291,6 +4292,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-lite-rpc-history" +version = "0.2.3" +dependencies = [ + "async-trait", + "dashmap", + "solana-lite-rpc-core", + "solana-rpc-client", + "solana-rpc-client-api", + "solana-sdk", + "solana-transaction-status", + "tokio", +] + [[package]] name = "solana-lite-rpc-quic-forward-proxy" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 8a4e7efa..30f57aa2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "quic-forward-proxy", "quic-forward-proxy-integration-test", "cluster-endpoints", + "history", "bench" ] @@ -55,6 +56,7 @@ rustls = { version = "=0.20.8", default-features = false } solana-lite-rpc-services = {path = "services", version="0.2.3"} solana-lite-rpc-core = {path = "core", version="0.2.3"} solana-lite-rpc-cluster-endpoints = {path = "cluster-endpoints", version="0.2.3"} +solana-lite-rpc-history = {path = "history", version="0.2.3"} async-trait = "0.1.68" yellowstone-grpc-client = "1.9.0" diff --git a/cluster-endpoints/src/rpc_polling/poll_blocks.rs b/cluster-endpoints/src/rpc_polling/poll_blocks.rs index fd680e46..af269fd2 100644 --- a/cluster-endpoints/src/rpc_polling/poll_blocks.rs +++ b/cluster-endpoints/src/rpc_polling/poll_blocks.rs @@ -1,23 +1,12 @@ use anyhow::Context; use solana_client::nonblocking::rpc_client::RpcClient; use solana_lite_rpc_core::{ - structures::{ - produced_block::{ProducedBlock, TransactionInfo}, - slot_notification::SlotNotification, - }, + structures::{produced_block::ProducedBlock, slot_notification::SlotNotification}, AnyhowJoinHandle, }; use solana_rpc_client_api::config::RpcBlockConfig; -use solana_sdk::{ - borsh0_10::try_from_slice_unchecked, - commitment_config::CommitmentConfig, - compute_budget::{self, ComputeBudgetInstruction}, - slot_history::Slot, -}; -use solana_transaction_status::{ - option_serializer::OptionSerializer, RewardType, TransactionDetails, UiTransactionEncoding, - UiTransactionStatusMeta, -}; +use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot}; +use solana_transaction_status::{TransactionDetails, UiTransactionEncoding}; use std::{ sync::{atomic::AtomicU64, Arc}, time::Duration, @@ -42,126 +31,10 @@ pub async fn process_block( ) .await; - if block.is_err() { - return None; + match block { + Ok(block) => Some(ProducedBlock::from_ui_block(block, slot, commitment_config)), + Err(_) => None, } - let block = block.unwrap(); - - let Some(block_height) = block.block_height else { - return None; - }; - - let Some(txs) = block.transactions else { - return None; - }; - - let blockhash = block.blockhash; - let parent_slot = block.parent_slot; - - let txs = txs - .into_iter() - .filter_map(|tx| { - let Some(UiTransactionStatusMeta { - err, - compute_units_consumed, - .. - }) = tx.meta - else { - log::info!("Tx with no meta"); - return None; - }; - - let Some(tx) = tx.transaction.decode() else { - log::info!("Tx could not be decoded"); - return None; - }; - - let signature = tx.signatures[0].to_string(); - let cu_consumed = match compute_units_consumed { - OptionSerializer::Some(cu_consumed) => Some(cu_consumed), - _ => None, - }; - - let legacy_compute_budget = tx.message.instructions().iter().find_map(|i| { - if i.program_id(tx.message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { - units, - additional_fee, - }) = try_from_slice_unchecked(i.data.as_slice()) - { - return Some((units, additional_fee)); - } - } - None - }); - - let mut cu_requested = tx.message.instructions().iter().find_map(|i| { - if i.program_id(tx.message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = - try_from_slice_unchecked(i.data.as_slice()) - { - return Some(limit); - } - } - None - }); - - let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| { - if i.program_id(tx.message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = - try_from_slice_unchecked(i.data.as_slice()) - { - return Some(price); - } - } - - None - }); - - if let Some((units, additional_fee)) = legacy_compute_budget { - cu_requested = Some(units); - if additional_fee > 0 { - prioritization_fees = Some(((units * 1000) / additional_fee).into()) - } - }; - - Some(TransactionInfo { - signature, - err, - cu_requested, - prioritization_fees, - cu_consumed, - }) - }) - .collect(); - - let leader_id = if let Some(rewards) = block.rewards { - rewards - .iter() - .find(|reward| Some(RewardType::Fee) == reward.reward_type) - .map(|leader_reward| leader_reward.pubkey.clone()) - } else { - None - }; - - let block_time = block.block_time.unwrap_or(0) as u64; - - Some(ProducedBlock { - txs, - block_height, - leader_id, - blockhash, - parent_slot, - block_time, - slot, - commitment_config, - }) } pub fn poll_block( diff --git a/core/src/commitment_utils.rs b/core/src/commitment_utils.rs new file mode 100644 index 00000000..a3749bd1 --- /dev/null +++ b/core/src/commitment_utils.rs @@ -0,0 +1,46 @@ +use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[repr(C)] +pub enum Commitment { + Processed = 0, + Confirmed = 1, + Finalized = 2, +} + +impl From for Commitment { + #[allow(deprecated)] + fn from(value: CommitmentLevel) -> Self { + match value { + CommitmentLevel::Finalized | CommitmentLevel::Root | CommitmentLevel::Max => { + Commitment::Finalized + } + CommitmentLevel::Confirmed + | CommitmentLevel::Single + | CommitmentLevel::SingleGossip => Commitment::Confirmed, + CommitmentLevel::Processed | CommitmentLevel::Recent => Commitment::Processed, + } + } +} + +impl From for Commitment { + fn from(value: CommitmentConfig) -> Self { + value.commitment.into() + } +} + +impl Commitment { + pub fn into_commitment_level(&self) -> CommitmentLevel { + match self { + Commitment::Confirmed => CommitmentLevel::Confirmed, + Commitment::Processed => CommitmentLevel::Processed, + Commitment::Finalized => CommitmentLevel::Finalized, + } + } + + pub fn into_commiment_config(&self) -> CommitmentConfig { + CommitmentConfig { + commitment: self.into_commitment_level(), + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 9fd528b4..46073b61 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,3 +1,4 @@ +pub mod commitment_utils; pub mod keypair_loader; pub mod quic_connection; pub mod quic_connection_utils; diff --git a/core/src/stores/data_cache.rs b/core/src/stores/data_cache.rs index d0dd4fe4..11b5bd5e 100644 --- a/core/src/stores/data_cache.rs +++ b/core/src/stores/data_cache.rs @@ -28,7 +28,7 @@ pub struct SlotCache { /// The central data store for all data from the cluster. #[derive(Clone)] pub struct DataCache { - pub block_store: BlockInformationStore, + pub block_information_store: BlockInformationStore, pub txs: TxStore, pub tx_subs: SubscriptionStore, pub slot_cache: SlotCache, @@ -39,10 +39,10 @@ pub struct DataCache { impl DataCache { pub async fn clean(&self, ttl_duration: std::time::Duration) { let block_info = self - .block_store + .block_information_store .get_latest_block_info(CommitmentConfig::finalized()) .await; - self.block_store.clean().await; + self.block_information_store.clean().await; self.txs.clean(block_info.block_height); self.tx_subs.clean(ttl_duration); @@ -55,7 +55,7 @@ impl DataCache { self.txs .is_transaction_confirmed(&sent_transaction_info.signature) || self - .block_store + .block_information_store .get_latest_block(CommitmentConfig::processed()) .await .block_height @@ -64,7 +64,7 @@ impl DataCache { pub fn new_for_tests() -> Self { Self { - block_store: BlockInformationStore::new(BlockInformation { + block_information_store: BlockInformationStore::new(BlockInformation { block_height: 0, blockhash: Hash::new_unique().to_string(), cleanup_slot: 1000, diff --git a/core/src/structures/produced_block.rs b/core/src/structures/produced_block.rs index cd80bf45..586f7eb3 100644 --- a/core/src/structures/produced_block.rs +++ b/core/src/structures/produced_block.rs @@ -1,5 +1,12 @@ use solana_sdk::{ - commitment_config::CommitmentConfig, slot_history::Slot, transaction::TransactionError, + borsh0_10::try_from_slice_unchecked, + commitment_config::CommitmentConfig, + compute_budget::{self, ComputeBudgetInstruction}, + slot_history::Slot, + transaction::TransactionError, +}; +use solana_transaction_status::{ + option_serializer::OptionSerializer, RewardType, UiConfirmedBlock, UiTransactionStatusMeta, }; #[derive(Debug, Clone)] @@ -22,3 +29,124 @@ pub struct ProducedBlock { pub block_time: u64, pub commitment_config: CommitmentConfig, } + +impl ProducedBlock { + pub fn from_ui_block( + block: UiConfirmedBlock, + slot: Slot, + commitment_config: CommitmentConfig, + ) -> Self { + let block_height = block.block_height.unwrap_or_default(); + let txs = block.transactions.unwrap_or_default(); + + let blockhash = block.blockhash; + let parent_slot = block.parent_slot; + + let txs = txs + .into_iter() + .filter_map(|tx| { + let Some(UiTransactionStatusMeta { + err, + compute_units_consumed, + .. + }) = tx.meta + else { + // ignoring transaction + log::info!("Tx with no meta"); + return None; + }; + + let Some(tx) = tx.transaction.decode() else { + // ignoring transaction + log::info!("Tx could not be decoded"); + return None; + }; + + let signature = tx.signatures[0].to_string(); + let cu_consumed = match compute_units_consumed { + OptionSerializer::Some(cu_consumed) => Some(cu_consumed), + _ => None, + }; + + let legacy_compute_budget = tx.message.instructions().iter().find_map(|i| { + if i.program_id(tx.message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { + units, + additional_fee, + }) = try_from_slice_unchecked(i.data.as_slice()) + { + return Some((units, additional_fee)); + } + } + None + }); + + let mut cu_requested = tx.message.instructions().iter().find_map(|i| { + if i.program_id(tx.message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(limit); + } + } + None + }); + + let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| { + if i.program_id(tx.message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(price); + } + } + + None + }); + + if let Some((units, additional_fee)) = legacy_compute_budget { + cu_requested = Some(units); + if additional_fee > 0 { + prioritization_fees = Some(((units * 1000) / additional_fee).into()) + } + }; + + Some(TransactionInfo { + signature, + err, + cu_requested, + prioritization_fees, + cu_consumed, + }) + }) + .collect(); + + let leader_id = if let Some(rewards) = block.rewards { + rewards + .iter() + .find(|reward| Some(RewardType::Fee) == reward.reward_type) + .map(|leader_reward| leader_reward.pubkey.clone()) + } else { + None + }; + + let block_time = block.block_time.unwrap_or(0) as u64; + + ProducedBlock { + txs, + block_height, + leader_id, + blockhash, + parent_slot, + block_time, + slot, + commitment_config, + } + } +} diff --git a/core/src/traits/block_storage_interface.rs b/core/src/traits/block_storage_interface.rs index 57306c37..1db772e2 100644 --- a/core/src/traits/block_storage_interface.rs +++ b/core/src/traits/block_storage_interface.rs @@ -1,12 +1,17 @@ +use crate::structures::produced_block::ProducedBlock; use async_trait::async_trait; -use solana_sdk::{commitment_config::CommitmentLevel, slot_history::Slot}; -use solana_transaction_status::UiConfirmedBlock; -use std::sync::Arc; +use solana_rpc_client_api::config::RpcBlockConfig; +use solana_sdk::slot_history::Slot; +use std::{ops::Range, sync::Arc}; #[async_trait] pub trait BlockStorageInterface: Send + Sync { - async fn save(&self, slot: Slot, block: UiConfirmedBlock, commitment: CommitmentLevel); - async fn get(&self, slot: Slot) -> Option; + // will save a block + async fn save(&self, block: ProducedBlock); + // will get a block + async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Option; + // will get range of slots that are stored in the storage + async fn get_slot_range(&self) -> Range; } pub type BlockStorageImpl = Arc; diff --git a/history/Cargo.toml b/history/Cargo.toml new file mode 100644 index 00000000..65fdf78d --- /dev/null +++ b/history/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "solana-lite-rpc-history" +version = "0.2.3" +edition = "2021" +description = "History implementations used by solana lite rpc" +rust-version = "1.70.0" +repository = "https://github.com/blockworks-foundation/lite-rpc" +license = "AGPL" + +[dependencies] +solana-sdk = { workspace = true } +solana-transaction-status = { workspace = true } +solana-rpc-client = { workspace = true } + +dashmap = {workspace = true} +async-trait = { workspace = true } +tokio = "1.*" + +solana-lite-rpc-core = {workspace = true} +solana-rpc-client-api = {workspace = true} diff --git a/history/src/block_stores/inmemory_block_store.rs b/history/src/block_stores/inmemory_block_store.rs new file mode 100644 index 00000000..d97b28c0 --- /dev/null +++ b/history/src/block_stores/inmemory_block_store.rs @@ -0,0 +1,76 @@ +use async_trait::async_trait; +use solana_lite_rpc_core::{ + commitment_utils::Commitment, structures::produced_block::ProducedBlock, + traits::block_storage_interface::BlockStorageInterface, +}; +use solana_rpc_client_api::config::RpcBlockConfig; +use solana_sdk::slot_history::Slot; +use std::{collections::BTreeMap, ops::Range}; +use tokio::sync::RwLock; + +pub struct InmemoryBlockStore { + block_storage: RwLock>, + number_of_blocks_to_store: usize, +} + +impl InmemoryBlockStore { + pub fn new(number_of_blocks_to_store: usize) -> Self { + Self { + number_of_blocks_to_store, + block_storage: RwLock::new(BTreeMap::new()), + } + } + + pub async fn store(&self, block: ProducedBlock) { + let slot = block.slot; + let mut block_storage = self.block_storage.write().await; + let min_slot = match block_storage.first_key_value() { + Some((slot, _)) => *slot, + None => 0, + }; + if slot >= min_slot { + // overwrite block only if confirmation has changed + match block_storage.get_mut(&slot) { + Some(x) => { + let commitment_store = Commitment::from(x.commitment_config); + let commitment_block = Commitment::from(block.commitment_config); + let overwrite = commitment_block > commitment_store; + if overwrite { + *x = block; + } + } + None => { + block_storage.insert(slot, block); + } + } + if block_storage.len() > self.number_of_blocks_to_store { + block_storage.remove(&min_slot); + } + } + } +} + +#[async_trait] +impl BlockStorageInterface for InmemoryBlockStore { + async fn save(&self, block: ProducedBlock) { + self.store(block).await; + } + + async fn get(&self, slot: Slot, _: RpcBlockConfig) -> Option { + self.block_storage.read().await.get(&slot).cloned() + } + + async fn get_slot_range(&self) -> Range { + let lk = self.block_storage.read().await; + let first = lk.first_key_value(); + let last = lk.last_key_value(); + if let Some((first_slot, _)) = first { + let Some((last_slot, _)) = last else { + return Range::default(); + }; + *first_slot..(*last_slot + 1) + } else { + Range::default() + } + } +} diff --git a/history/src/block_stores/mod.rs b/history/src/block_stores/mod.rs new file mode 100644 index 00000000..c03e85de --- /dev/null +++ b/history/src/block_stores/mod.rs @@ -0,0 +1,2 @@ +pub mod inmemory_block_store; +pub mod multiple_strategy_block_store; diff --git a/history/src/block_stores/multiple_strategy_block_store.rs b/history/src/block_stores/multiple_strategy_block_store.rs new file mode 100644 index 00000000..2e80dc12 --- /dev/null +++ b/history/src/block_stores/multiple_strategy_block_store.rs @@ -0,0 +1,139 @@ +// A mixed block store, +// Stores confirmed blocks in memory +// Finalized blocks in long term storage of your choice +// Fetches legacy blocks from faithful + +use crate::block_stores::inmemory_block_store::InmemoryBlockStore; +use async_trait::async_trait; +use solana_lite_rpc_core::{ + commitment_utils::Commitment, + structures::produced_block::ProducedBlock, + traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface}, +}; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_rpc_client_api::config::RpcBlockConfig; +use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot}; +use std::{ + ops::Range, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +pub struct MultipleStrategyBlockStorage { + inmemory_for_storage: InmemoryBlockStore, // for confirmed blocks + persistent_block_storage: BlockStorageImpl, // for persistent block storage + faithful_rpc_client: Option>, // to fetch legacy blocks from faithful + last_confirmed_slot: Arc, +} + +impl MultipleStrategyBlockStorage { + pub fn new( + persistent_block_storage: BlockStorageImpl, + faithful_rpc_client: Option>, + number_of_slots_in_memory: usize, + ) -> Self { + Self { + inmemory_for_storage: InmemoryBlockStore::new(number_of_slots_in_memory), + persistent_block_storage, + faithful_rpc_client, + last_confirmed_slot: Arc::new(AtomicU64::new(0)), + } + } + + pub async fn get_in_memory_block(&self, slot: Slot) -> Option { + self.inmemory_for_storage + .get( + slot, + RpcBlockConfig { + encoding: None, + transaction_details: None, + rewards: None, + commitment: None, + max_supported_transaction_version: None, + }, + ) + .await + } +} + +#[async_trait] +impl BlockStorageInterface for MultipleStrategyBlockStorage { + async fn save(&self, block: ProducedBlock) { + let slot = block.slot; + let commitment = Commitment::from(block.commitment_config); + match commitment { + Commitment::Confirmed | Commitment::Processed => { + self.inmemory_for_storage.save(block).await; + } + Commitment::Finalized => { + let block_in_mem = self.get_in_memory_block(block.slot).await; + match block_in_mem { + Some(block_in_mem) => { + // check if inmemory blockhash is same as finalized, update it if they are not + // we can have two machines with same identity publishing two different blocks on same slot + if block_in_mem.blockhash != block.blockhash { + self.inmemory_for_storage.save(block.clone()).await; + } + } + None => self.inmemory_for_storage.save(block.clone()).await, + } + self.persistent_block_storage.save(block).await; + } + }; + if slot > self.last_confirmed_slot.load(Ordering::Relaxed) { + self.last_confirmed_slot.store(slot, Ordering::Relaxed); + } + } + + async fn get( + &self, + slot: solana_sdk::slot_history::Slot, + config: RpcBlockConfig, + ) -> Option { + let last_confirmed_slot = self.last_confirmed_slot.load(Ordering::Relaxed); + if slot > last_confirmed_slot { + None + } else { + let range = self.inmemory_for_storage.get_slot_range().await; + if range.contains(&slot) { + let block = self.inmemory_for_storage.get(slot, config).await; + if block.is_some() { + return block; + } + } + // TODO: Define what data is expected that is definetly not in persistant block storage like data after epoch - 1 + // check persistant block + let persistent_block_range = self.persistent_block_storage.get_slot_range().await; + if persistent_block_range.contains(&slot) { + self.persistent_block_storage.get(slot, config).await + } else if let Some(faithful_rpc_client) = self.faithful_rpc_client.clone() { + match faithful_rpc_client + .get_block_with_config(slot, config) + .await + { + Ok(block) => Some(ProducedBlock::from_ui_block( + block, + slot, + CommitmentConfig::finalized(), + )), + Err(_) => None, + } + } else { + None + } + } + } + + async fn get_slot_range(&self) -> Range { + let in_memory = self.inmemory_for_storage.get_slot_range().await; + // if faithful is available we assume that we have all the blocks + if self.faithful_rpc_client.is_some() { + 0..in_memory.end + } else { + let persistent_storage_range = self.persistent_block_storage.get_slot_range().await; + persistent_storage_range.start..in_memory.end + } + } +} diff --git a/history/src/history.rs b/history/src/history.rs new file mode 100644 index 00000000..cf806806 --- /dev/null +++ b/history/src/history.rs @@ -0,0 +1,6 @@ +use solana_lite_rpc_core::traits::block_storage_interface::BlockStorageInterface; +use std::sync::Arc; + +pub struct History { + pub block_storage: Arc, +} diff --git a/history/src/lib.rs b/history/src/lib.rs new file mode 100644 index 00000000..164ffbdc --- /dev/null +++ b/history/src/lib.rs @@ -0,0 +1,2 @@ +pub mod block_stores; +pub mod history; diff --git a/history/tests/inmemory_block_store_tests.rs b/history/tests/inmemory_block_store_tests.rs new file mode 100644 index 00000000..5d7a69eb --- /dev/null +++ b/history/tests/inmemory_block_store_tests.rs @@ -0,0 +1,54 @@ +use solana_lite_rpc_core::{ + structures::produced_block::ProducedBlock, + traits::block_storage_interface::BlockStorageInterface, +}; +use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore; +use solana_rpc_client_api::config::RpcBlockConfig; +use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash}; +use std::sync::Arc; + +pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> ProducedBlock { + ProducedBlock { + block_height: slot, + blockhash: Hash::new_unique().to_string(), + parent_slot: slot - 1, + txs: vec![], + block_time: 0, + commitment_config, + leader_id: None, + slot, + } +} + +#[tokio::test] +async fn inmemory_block_store_tests() { + // will store only 10 blocks + let store: Arc = Arc::new(InmemoryBlockStore::new(10)); + + // add 10 blocks + for i in 1..11 { + store + .save(create_test_block(i, CommitmentConfig::finalized())) + .await; + } + + // check if 10 blocks are added + for i in 1..11 { + assert!(store.get(i, RpcBlockConfig::default()).await.is_some()); + } + // add 11th block + store + .save(create_test_block(11, CommitmentConfig::finalized())) + .await; + + // can get 11th block + assert!(store.get(11, RpcBlockConfig::default()).await.is_some()); + // first block is removed + assert!(store.get(1, RpcBlockConfig::default()).await.is_none()); + + // cannot add old blocks + store + .save(create_test_block(1, CommitmentConfig::finalized())) + .await; + assert!(store.get(1, RpcBlockConfig::default()).await.is_none()); +} diff --git a/history/tests/mod.rs b/history/tests/mod.rs new file mode 100644 index 00000000..835c2e28 --- /dev/null +++ b/history/tests/mod.rs @@ -0,0 +1,2 @@ +mod inmemory_block_store_tests; +mod multiple_strategy_block_store_tests; diff --git a/history/tests/multiple_strategy_block_store_tests.rs b/history/tests/multiple_strategy_block_store_tests.rs new file mode 100644 index 00000000..f1bf6981 --- /dev/null +++ b/history/tests/multiple_strategy_block_store_tests.rs @@ -0,0 +1,139 @@ +use solana_lite_rpc_core::{ + structures::produced_block::ProducedBlock, + traits::block_storage_interface::BlockStorageInterface, +}; +use solana_lite_rpc_history::{ + block_stores::inmemory_block_store::InmemoryBlockStore, + block_stores::multiple_strategy_block_store::MultipleStrategyBlockStorage, +}; +use solana_rpc_client_api::config::RpcBlockConfig; +use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash}; +use std::sync::Arc; + +pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> ProducedBlock { + ProducedBlock { + block_height: slot, + blockhash: Hash::new_unique().to_string(), + parent_slot: slot - 1, + txs: vec![], + block_time: 0, + commitment_config, + leader_id: None, + slot, + } +} + +#[tokio::test] +async fn test_in_multiple_stategy_block_store() { + let persistent_store: Arc = Arc::new(InmemoryBlockStore::new(10)); + let number_of_slots_in_memory = 3; + let block_storage = MultipleStrategyBlockStorage::new( + persistent_store.clone(), + None, + number_of_slots_in_memory, + ); + + block_storage + .save(create_test_block(1235, CommitmentConfig::confirmed())) + .await; + block_storage + .save(create_test_block(1236, CommitmentConfig::confirmed())) + .await; + + assert!(block_storage + .get(1235, RpcBlockConfig::default()) + .await + .is_some()); + assert!(block_storage + .get(1236, RpcBlockConfig::default()) + .await + .is_some()); + assert!(persistent_store + .get(1235, RpcBlockConfig::default()) + .await + .is_none()); + assert!(persistent_store + .get(1236, RpcBlockConfig::default()) + .await + .is_none()); + + block_storage + .save(create_test_block(1235, CommitmentConfig::finalized())) + .await; + block_storage + .save(create_test_block(1236, CommitmentConfig::finalized())) + .await; + block_storage + .save(create_test_block(1237, CommitmentConfig::finalized())) + .await; + + assert!(block_storage + .get(1235, RpcBlockConfig::default()) + .await + .is_some()); + assert!(block_storage + .get(1236, RpcBlockConfig::default()) + .await + .is_some()); + assert!(block_storage + .get(1237, RpcBlockConfig::default()) + .await + .is_some()); + assert!(persistent_store + .get(1235, RpcBlockConfig::default()) + .await + .is_some()); + assert!(persistent_store + .get(1236, RpcBlockConfig::default()) + .await + .is_some()); + assert!(persistent_store + .get(1237, RpcBlockConfig::default()) + .await + .is_some()); + assert!(block_storage.get_in_memory_block(1237).await.is_some()); + + // blocks are replaced by finalized blocks + assert_eq!( + persistent_store + .get(1235, RpcBlockConfig::default()) + .await + .unwrap() + .blockhash, + block_storage + .get_in_memory_block(1235) + .await + .unwrap() + .blockhash + ); + assert_eq!( + persistent_store + .get(1236, RpcBlockConfig::default()) + .await + .unwrap() + .blockhash, + block_storage + .get_in_memory_block(1236) + .await + .unwrap() + .blockhash + ); + assert_eq!( + persistent_store + .get(1237, RpcBlockConfig::default()) + .await + .unwrap() + .blockhash, + block_storage + .get_in_memory_block(1237) + .await + .unwrap() + .blockhash + ); + + // no block yet added returns none + assert!(block_storage + .get(1238, RpcBlockConfig::default()) + .await + .is_none()); +} diff --git a/lite-rpc/Cargo.toml b/lite-rpc/Cargo.toml index a1e71e9d..192b6d9a 100644 --- a/lite-rpc/Cargo.toml +++ b/lite-rpc/Cargo.toml @@ -37,13 +37,15 @@ lazy_static = { workspace = true } dotenv = { workspace = true } async-channel = { workspace = true } quinn = { workspace = true } -solana-lite-rpc-core = { workspace = true } -solana-lite-rpc-services = { workspace = true } -solana-lite-rpc-cluster-endpoints = { workspace = true } async-trait = { workspace = true } tokio = { version = "1.28.2", features = ["full", "fs"]} tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] } chrono = { workspace = true } +solana-lite-rpc-core = { workspace = true } +solana-lite-rpc-services = { workspace = true } +solana-lite-rpc-cluster-endpoints = { workspace = true } +solana-lite-rpc-history = { workspace = true } + [dev-dependencies] bench = { path = "../bench" } diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index fc66b06a..366daba6 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -16,13 +16,17 @@ use solana_lite_rpc_core::{ stores::{block_information_store::BlockInformation, data_cache::DataCache, tx_store::TxProps}, AnyhowJoinHandle, }; +use solana_lite_rpc_history::history::History; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::{ - config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig}, + config::{ + RpcBlockConfig, RpcContextConfig, RpcEncodingConfigWrapper, RpcRequestAirdropConfig, + RpcSignatureStatusConfig, + }, response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo}, }; use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot}; -use solana_transaction_status::TransactionStatus; +use solana_transaction_status::{TransactionStatus, UiConfirmedBlock}; use std::{str::FromStr, sync::Arc}; use tokio::net::ToSocketAddrs; @@ -49,6 +53,7 @@ pub struct LiteBridge { // should be removed rpc_client: Arc, transaction_service: TransactionService, + history: History, } impl LiteBridge { @@ -56,11 +61,13 @@ impl LiteBridge { rpc_client: Arc, data_cache: DataCache, transaction_service: TransactionService, + history: History, ) -> Self { Self { rpc_client, data_cache, transaction_service, + history, } } @@ -159,7 +166,7 @@ impl LiteRpcServer for LiteBridge { .. } = self .data_cache - .block_store + .block_information_store .get_latest_block(commitment_config) .await; @@ -189,7 +196,7 @@ impl LiteRpcServer for LiteBridge { let (is_valid, slot) = self .data_cache - .block_store + .block_information_store .is_blockhash_valid(&blockhash, commitment) .await; @@ -218,7 +225,7 @@ impl LiteRpcServer for LiteBridge { context: RpcResponseContext { slot: self .data_cache - .block_store + .block_information_store .get_latest_block_info(CommitmentConfig::finalized()) .await .slot, @@ -288,7 +295,7 @@ impl LiteRpcServer for LiteBridge { let BlockInformation { slot, .. } = self .data_cache - .block_store + .block_information_store .get_latest_block(commitment_config) .await; Ok(slot) @@ -312,4 +319,19 @@ impl LiteRpcServer for LiteBridge { Ok(()) } + + async fn get_block( + &self, + slot: u64, + config: Option>, + ) -> crate::rpc::Result> { + let config = config.map_or(RpcBlockConfig::default(), |x| x.convert_to_current()); + let block = self.history.block_storage.get(slot, config).await; + if block.is_some() { + // TO DO Convert to UIConfirmed Block + Err(jsonrpsee::core::Error::HttpNotImplemented) + } else { + Ok(None) + } + } } diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 77113717..876882ef 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -29,6 +29,8 @@ use solana_lite_rpc_core::structures::{ }; use solana_lite_rpc_core::types::BlockStream; use solana_lite_rpc_core::AnyhowJoinHandle; +use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore; +use solana_lite_rpc_history::history::History; use solana_lite_rpc_services::data_caching_service::DataCachingService; use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath; use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig}; @@ -117,9 +119,10 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc) -> anyhow::R let finalized_block = get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await; - let block_store = BlockInformationStore::new(BlockInformation::from_block(&finalized_block)); + let block_information_store = + BlockInformationStore::new(BlockInformation::from_block(&finalized_block)); let data_cache = DataCache { - block_store, + block_information_store, cluster_info: ClusterInfo::default(), identity_stakes: IdentityStakes::new(validator_identity.pubkey()), slot_cache: SlotCache::new(finalized_block.slot), @@ -187,9 +190,18 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc) -> anyhow::R let support_service = tokio::spawn(async move { spawner.spawn_support_services().await }); + let history = History { + block_storage: Arc::new(InmemoryBlockStore::new(1024)), + }; + let bridge_service = tokio::spawn( - LiteBridge::new(rpc_client.clone(), data_cache.clone(), transaction_service) - .start(lite_rpc_http_addr, lite_rpc_ws_addr), + LiteBridge::new( + rpc_client.clone(), + data_cache.clone(), + transaction_service, + history, + ) + .start(lite_rpc_http_addr, lite_rpc_ws_addr), ); tokio::select! { res = tx_service_jh => { diff --git a/lite-rpc/src/rpc.rs b/lite-rpc/src/rpc.rs index 03e8bebb..2467aa3d 100644 --- a/lite-rpc/src/rpc.rs +++ b/lite-rpc/src/rpc.rs @@ -1,12 +1,13 @@ use jsonrpsee::core::SubscriptionResult; use jsonrpsee::proc_macros::rpc; use solana_rpc_client_api::config::{ - RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig, + RpcBlockConfig, RpcContextConfig, RpcEncodingConfigWrapper, RpcRequestAirdropConfig, + RpcSignatureStatusConfig, }; use solana_rpc_client_api::response::{Response as RpcResponse, RpcBlockhash, RpcVersionInfo}; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::slot_history::Slot; -use solana_transaction_status::TransactionStatus; +use solana_transaction_status::{TransactionStatus, UiConfirmedBlock}; use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig}; @@ -61,4 +62,11 @@ pub trait LiteRpc { signature: String, commitment_config: CommitmentConfig, ) -> SubscriptionResult; + + #[method(name = "getBlock")] + async fn get_block( + &self, + slot: u64, + config: Option>, + ) -> Result>; } diff --git a/lite-rpc/src/service_spawner.rs b/lite-rpc/src/service_spawner.rs index aa5b28f7..69669509 100644 --- a/lite-rpc/src/service_spawner.rs +++ b/lite-rpc/src/service_spawner.rs @@ -77,7 +77,7 @@ impl ServiceSpawner { ); service_builder.start( notifier, - self.data_cache.block_store.clone(), + self.data_cache.block_information_store.clone(), max_retries, slot_notifications, ) diff --git a/services/src/data_caching_service.rs b/services/src/data_caching_service.rs index 659d9812..9f881d69 100644 --- a/services/src/data_caching_service.rs +++ b/services/src/data_caching_service.rs @@ -53,7 +53,7 @@ impl DataCachingService { let block = block_notifier.recv().await.expect("Should recv blocks"); data_cache - .block_store + .block_information_store .add_block(BlockInformation::from_block(&block)) .await; diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index bf77e4f9..4a1af7f7 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -50,7 +50,7 @@ impl TransactionServiceBuilder { pub fn start( self, notifier: Option, - block_store: BlockInformationStore, + block_information_store: BlockInformationStore, max_retries: usize, slot_notifications: SlotStream, ) -> (TransactionService, AnyhowJoinHandle) { @@ -89,7 +89,7 @@ impl TransactionServiceBuilder { TransactionService { transaction_channel, replay_channel, - block_store, + block_information_store, max_retries, replay_offset: self.tx_replayer.retry_offset, }, @@ -102,7 +102,7 @@ impl TransactionServiceBuilder { pub struct TransactionService { pub transaction_channel: Sender, pub replay_channel: UnboundedSender, - pub block_store: BlockInformationStore, + pub block_information_store: BlockInformationStore, pub max_retries: usize, pub replay_offset: Duration, } @@ -126,7 +126,7 @@ impl TransactionService { last_valid_blockheight, .. }) = self - .block_store + .block_information_store .get_block_info(&tx.get_recent_blockhash().to_string()) else { bail!("Blockhash not found in block store".to_string());