diff --git a/Cargo.lock b/Cargo.lock index a6e40117..2b4d502c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -550,9 +550,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.3" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "414dcefbc63d77c526a76b3afcf6fbb9b5e2791c19c3aa2297733208750c6e53" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "base64ct" @@ -1615,9 +1615,9 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] @@ -1768,6 +1768,29 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "geyser-grpc-connector" +version = "0.7.0+yellowstone.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94e6b95f0c048f9062479859095cf4fd28977c8dddc897360c6429ae392589f" +dependencies = [ + "anyhow", + "async-stream", + "base64 0.21.5", + "bincode", + "derive_more", + "futures", + "itertools", + "log", + "merge-streams", + "solana-sdk", + "tokio", + "tracing", + "url", + "yellowstone-grpc-client", + "yellowstone-grpc-proto", +] + [[package]] name = "gimli" version = "0.28.0" @@ -2081,9 +2104,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -2439,7 +2462,7 @@ dependencies = [ "anyhow", "async-channel", "async-trait", - "base64 0.21.3", + "base64 0.21.5", "bench", "bincode", "bs58", @@ -2547,6 +2570,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "merge-streams" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f84f6452969abd246e7ac1fe4fe75906c76e8ec88d898df9aef37e0f3b6a7c2" +dependencies = [ + "futures-core", + "pin-project", +] + [[package]] name = "merlin" version = "3.0.0" @@ -2975,9 +3008,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "percentage" @@ -3108,7 +3141,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" dependencies = [ - "base64 0.21.3", + "base64 0.21.5", "byteorder", "bytes", "fallible-iterator", @@ -3552,7 +3585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ "async-compression", - "base64 0.21.3", + "base64 0.21.5", "bytes", "encoding_rs", "futures-core", @@ -3707,7 +3740,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ - "base64 0.21.3", + "base64 0.21.5", ] [[package]] @@ -4070,7 +4103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "121e55656c2094950f374247e1303dd09517f1ed49c91bf60bf114760b286eb4" dependencies = [ "Inflector", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bs58", "bv", @@ -4247,7 +4280,7 @@ dependencies = [ "anyhow", "async-channel", "async-trait", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bs58", "bytes", @@ -4255,9 +4288,11 @@ dependencies = [ "dashmap", "derive_more", "futures", + "geyser-grpc-connector", "itertools", "lazy_static", "log", + "merge-streams", "prometheus", "quinn", "rustls 0.20.9", @@ -4284,7 +4319,7 @@ version = "0.2.3" dependencies = [ "anyhow", "async-trait", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bs58", "bytes", @@ -4317,7 +4352,7 @@ version = "0.2.3" dependencies = [ "anyhow", "async-trait", - "base64 0.21.3", + "base64 0.21.5", "bincode", "chrono", "dashmap", @@ -4342,7 +4377,7 @@ dependencies = [ "anyhow", "async-channel", "async-trait", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bs58", "bytes", @@ -4380,7 +4415,7 @@ dependencies = [ "anyhow", "async-channel", "async-trait", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bs58", "bytes", @@ -4421,7 +4456,7 @@ dependencies = [ "anyhow", "async-channel", "async-trait", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bs58", "bytes", @@ -4548,7 +4583,7 @@ dependencies = [ "ark-ff", "ark-serialize", "array-bytes", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bitflags 1.3.2", "blake3", @@ -4598,7 +4633,7 @@ version = "1.16.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "036d6ecf67a3a7c6dc74d4f7fa6ab321e7ce8feccb7c9dff8384a41d0a12345b" dependencies = [ - "base64 0.21.3", + "base64 0.21.5", "bincode", "eager", "enum-iterator", @@ -4709,7 +4744,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc51a85c6ff03bb4a3e1fde1e36dcb553b990f2b3e66aed941a31a6a7c20fa33" dependencies = [ "async-trait", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bs58", "indicatif", @@ -4734,7 +4769,7 @@ version = "1.16.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6756a1f89f509154644a958869c7cc6c70cc622f44faddf9b94612d8d2d8eed5" dependencies = [ - "base64 0.21.3", + "base64 0.21.5", "bs58", "jsonrpc-core", "reqwest", @@ -4770,7 +4805,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4106cda3d10833ba957dbd25fb841b50aeca7480ccf8f54859294716f54bcd4b" dependencies = [ "assert_matches", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bitflags 1.3.2", "borsh 0.10.3", @@ -4909,7 +4944,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "236dd4e43b8a7402bce250228e04c0c68d9493a3e19c71b377ccc7c4390fd969" dependencies = [ "Inflector", - "base64 0.21.3", + "base64 0.21.5", "bincode", "borsh 0.10.3", "bs58", @@ -4988,7 +5023,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "278c08e13bc04b6940997602909052524a375154b00cf0bfa934359a3bb7e6f0" dependencies = [ "aes-gcm-siv", - "base64 0.21.3", + "base64 0.21.5", "bincode", "bytemuck", "byteorder", @@ -5639,7 +5674,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64 0.21.3", + "base64 0.21.5", "bytes", "flate2", "h2", @@ -5893,9 +5928,9 @@ dependencies = [ [[package]] name = "url" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", "idna", diff --git a/cluster-endpoints/Cargo.toml b/cluster-endpoints/Cargo.toml index 7345d557..b105dfeb 100644 --- a/cluster-endpoints/Cargo.toml +++ b/cluster-endpoints/Cargo.toml @@ -8,6 +8,10 @@ repository = "https://github.com/blockworks-foundation/lite-rpc" license = "AGPL" [dependencies] +#geyser-grpc-connector = { path = "../../geyser-grpc-connector" } +#geyser-grpc-connector = { tag = "v0.5.0+yellowstone.1.11+solana.1.16.17", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" } +geyser-grpc-connector = "0.7.0+yellowstone.1.11" + solana-sdk = { workspace = true } solana-rpc-client-api = { workspace = true } solana-transaction-status = { workspace = true } @@ -24,6 +28,7 @@ bs58 = { workspace = true } base64 = { workspace = true } thiserror = { workspace = true } futures = { workspace = true } +merge-streams = "0.1.2" bytes = { workspace = true } anyhow = { workspace = true } log = { workspace = true } diff --git a/cluster-endpoints/src/grpc_inspect.rs b/cluster-endpoints/src/grpc_inspect.rs new file mode 100644 index 00000000..4fecd5d6 --- /dev/null +++ b/cluster-endpoints/src/grpc_inspect.rs @@ -0,0 +1,66 @@ +use log::{debug, warn}; +use solana_lite_rpc_core::types::BlockStream; +use solana_sdk::commitment_config::CommitmentConfig; +use tokio::sync::broadcast::error::RecvError; +use tokio::task::JoinHandle; + +pub fn block_debug_listen( + mut block_notifier: BlockStream, + commitment_config: CommitmentConfig, +) -> JoinHandle<()> { + tokio::spawn(async move { + let mut last_highest_slot_number = 0; + + loop { + match block_notifier.recv().await { + Ok(block) => { + if block.commitment_config != commitment_config { + continue; + } + + debug!( + "Saw block: {} @ {} with {} txs", + block.slot, + block.commitment_config.commitment, + block.transactions.len() + ); + + if last_highest_slot_number != 0 { + if block.parent_slot == last_highest_slot_number { + debug!( + "parent slot is correct ({} -> {})", + block.slot, block.parent_slot + ); + } else { + warn!( + "parent slot not correct ({} -> {})", + block.slot, block.parent_slot + ); + } + } + + if block.slot > last_highest_slot_number { + last_highest_slot_number = block.slot; + } else { + // note: ATM this fails very often (using the RPC poller) + warn!( + "Monotonic check failed - block {} is out of order, last highest was {}", + block.slot, last_highest_slot_number + ); + } + } // -- Ok + Err(RecvError::Lagged(missed_blocks)) => { + warn!( + "Could not keep up with producer - missed {} blocks", + missed_blocks + ); + } + Err(other_err) => { + panic!("Error receiving block: {:?}", other_err); + } + } + + // ... + } + }) +} diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs new file mode 100644 index 00000000..0f8884f8 --- /dev/null +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -0,0 +1,157 @@ +use crate::grpc_stream_utils::channelize_stream; +use crate::grpc_subscription::map_block_update; +use geyser_grpc_connector::grpc_subscription_autoreconnect::{ + create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, +}; +use geyser_grpc_connector::grpcmultiplex_fastestwins::{ + create_multiplexed_stream, FromYellowstoneExtractor, +}; +use log::info; +use merge_streams::MergeStreams; +use solana_lite_rpc_core::structures::produced_block::ProducedBlock; +use solana_lite_rpc_core::structures::slot_notification::SlotNotification; +use solana_lite_rpc_core::AnyhowJoinHandle; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::broadcast::Receiver; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::{ + SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdate, +}; + +struct BlockExtractor(CommitmentConfig); + +impl FromYellowstoneExtractor for BlockExtractor { + type Target = ProducedBlock; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { + match update.update_oneof { + Some(UpdateOneof::Block(update_block_message)) => { + let block = map_block_update(update_block_message, self.0); + Some((block.slot, block)) + } + _ => None, + } + } +} + +pub fn create_grpc_multiplex_blocks_subscription( + grpc_sources: Vec, +) -> (Receiver, AnyhowJoinHandle) { + info!("Setup grpc multiplexed blocks connection..."); + if grpc_sources.is_empty() { + info!("- no grpc connection configured"); + } + for grpc_source in &grpc_sources { + info!("- connection to {}", grpc_source); + } + + let _timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(5), + subscribe_timeout: Duration::from_secs(5), + }; + + let multiplex_stream_confirmed = { + let commitment_config = CommitmentConfig::confirmed(); + + let mut streams = Vec::new(); + for grpc_source in &grpc_sources { + let stream = create_geyser_reconnecting_stream( + grpc_source.clone(), + GeyserFilter(commitment_config).blocks_and_txs(), + ); + streams.push(stream); + } + + create_multiplexed_stream(streams, BlockExtractor(commitment_config)) + }; + + let multiplex_stream_finalized = { + let commitment_config = CommitmentConfig::finalized(); + + let mut streams = Vec::new(); + for grpc_source in &grpc_sources { + let stream = create_geyser_reconnecting_stream( + grpc_source.clone(), + GeyserFilter(commitment_config).blocks_and_txs(), + ); + streams.push(stream); + } + + create_multiplexed_stream(streams, BlockExtractor(commitment_config)) + }; + + let merged_stream_confirmed_finalize = + (multiplex_stream_confirmed, multiplex_stream_finalized).merge(); + + let (multiplexed_finalized_blocks, jh_channelizer) = + channelize_stream(merged_stream_confirmed_finalize); + + (multiplexed_finalized_blocks, jh_channelizer) +} + +struct SlotExtractor {} + +impl FromYellowstoneExtractor for crate::grpc_multiplex::SlotExtractor { + type Target = SlotNotification; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { + match update.update_oneof { + Some(UpdateOneof::Slot(update_slot_message)) => { + let slot = SlotNotification { + estimated_processed_slot: update_slot_message.slot, + processed_slot: update_slot_message.slot, + }; + Some((update_slot_message.slot, slot)) + } + _ => None, + } + } +} + +pub fn create_grpc_multiplex_slots_subscription( + grpc_sources: Vec, +) -> (Receiver, AnyhowJoinHandle) { + info!("Setup grpc multiplexed slots connection..."); + if grpc_sources.is_empty() { + info!("- no grpc connection configured"); + } + for grpc_source in &grpc_sources { + info!("- connection to {}", grpc_source); + } + + let multiplex_stream = { + let mut streams = Vec::new(); + for grpc_source in &grpc_sources { + let mut slots = HashMap::new(); + slots.insert( + "client".to_string(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + }, + ); + + let filter = SubscribeRequest { + slots, + accounts: Default::default(), + transactions: HashMap::new(), + entry: Default::default(), + blocks: HashMap::new(), + blocks_meta: HashMap::new(), + commitment: Some(yellowstone_grpc_proto::geyser::CommitmentLevel::Processed as i32), + accounts_data_slice: Default::default(), + ping: None, + }; + + let stream = create_geyser_reconnecting_stream(grpc_source.clone(), filter); + streams.push(stream); + } + + create_multiplexed_stream(streams, SlotExtractor {}) + }; + + let (multiplexed_stream, jh_channelizer) = channelize_stream(multiplex_stream); + + (multiplexed_stream, jh_channelizer) +} diff --git a/cluster-endpoints/src/grpc_stream_utils.rs b/cluster-endpoints/src/grpc_stream_utils.rs new file mode 100644 index 00000000..0589b896 --- /dev/null +++ b/cluster-endpoints/src/grpc_stream_utils.rs @@ -0,0 +1,36 @@ +use futures::{Stream, StreamExt}; +use log::{debug, trace}; +use std::pin::pin; +use tokio::spawn; +use tokio::sync::broadcast::error::SendError; +use tokio::sync::broadcast::Receiver; +use tokio::task::JoinHandle; + +pub fn channelize_stream( + grpc_source_stream: impl Stream + Send + 'static, +) -> (Receiver, JoinHandle>) +where + T: Clone + Send + 'static, +{ + let (tx, multiplexed_messages) = tokio::sync::broadcast::channel::(1000); + + let jh_channelizer = spawn(async move { + let mut source_stream = pin!(grpc_source_stream); + 'main_loop: while let Some(payload) = source_stream.next().await { + match tx.send(payload) { + Ok(receivers) => { + trace!("sent data to {} receivers", receivers); + } + Err(send_error) => match send_error { + SendError(_) => { + debug!("no active receivers - skipping message"); + continue 'main_loop; + } + }, + }; + } + panic!("channelizer task failed"); + }); + + (multiplexed_messages, jh_channelizer) +} diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 698954bd..13da5faa 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -1,17 +1,18 @@ +use crate::grpc_multiplex::{ + create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_slots_subscription, +}; use crate::{ - endpoint_stremers::EndpointStreaming, + endpoint_stremers::EndpointStreaming, grpc_inspect, rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info, }; -use anyhow::{bail, Context}; +use anyhow::Context; use futures::StreamExt; +use geyser_grpc_connector::grpc_subscription_autoreconnect::GrpcSourceConfig; use itertools::Itertools; use solana_client::nonblocking::rpc_client::RpcClient; use solana_lite_rpc_core::{ encoding::BASE64, - structures::{ - produced_block::{ProducedBlock, TransactionInfo}, - slot_notification::SlotNotification, - }, + structures::produced_block::{ProducedBlock, TransactionInfo}, AnyhowJoinHandle, }; use solana_sdk::{ @@ -32,12 +33,13 @@ use solana_transaction_status::{Reward, RewardType}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::broadcast::Sender; use yellowstone_grpc_client::GeyserGrpcClient; + use yellowstone_grpc_proto::prelude::{ subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, - SubscribeRequestFilterSlots, SubscribeUpdateBlock, + SubscribeUpdateBlock, }; -fn process_block( +pub fn map_block_update( block: SubscribeUpdateBlock, commitment_config: CommitmentConfig, ) -> ProducedBlock { @@ -298,7 +300,7 @@ pub fn create_block_processing_task( match update { UpdateOneof::Block(block) => { - let block = process_block(block, commitment_config); + let block = map_block_update(block, commitment_config); block_sx .send(block) .context("Grpc failed to send a block")?; @@ -319,110 +321,40 @@ pub fn create_block_processing_task( pub fn create_grpc_subscription( rpc_client: Arc, - grpc_addr: String, - grpc_x_token: Option, - expected_grpc_version: String, + grpc_sources: Vec, ) -> anyhow::Result<(EndpointStreaming, Vec)> { - let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(10); - let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(10); let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10); let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10); - let slot_task: AnyhowJoinHandle = { - let grpc_x_token = grpc_x_token.clone(); - let grpc_addr = grpc_addr.clone(); - tokio::spawn(async move { - loop { - let mut slots = HashMap::new(); - slots.insert( - "client".to_string(), - SubscribeRequestFilterSlots { - filter_by_commitment: Some(true), - }, - ); - // connect to grpc - let mut client = - GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?; - - let version = client.get_version().await?.version; - if version != expected_grpc_version { - log::warn!( - "Expected grpc version {:?}, got {:?}, continue", - expected_grpc_version, - version - ); - } - let mut stream = client - .subscribe_once( - slots, - Default::default(), - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - Some(CommitmentLevel::Processed), - Default::default(), - None, - ) - .await?; - - while let Some(message) = stream.next().await { - let message = message?; + // processed slot is required to keep up with leader schedule + let (slot_multiplex_channel, jh_multiplex_slotstream) = + create_grpc_multiplex_slots_subscription(grpc_sources.clone()); - let Some(update) = message.update_oneof else { - continue; - }; - - match update { - UpdateOneof::Slot(slot) => { - slot_sx - .send(SlotNotification { - estimated_processed_slot: slot.slot, - processed_slot: slot.slot, - }) - .context("Error sending slot notification")?; - } - UpdateOneof::Ping(_) => { - log::trace!("GRPC Ping"); - } - k => { - bail!("Unexpected update: {k:?}"); - } - }; - } - log::error!("Grpc slot subscription broken (resubscribing)"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }) - }; + let (block_multiplex_channel, jh_multiplex_blockstream) = + create_grpc_multiplex_blocks_subscription(grpc_sources); - let block_confirmed_task: AnyhowJoinHandle = create_block_processing_task( - grpc_addr.clone(), - grpc_x_token.clone(), - block_sx.clone(), - CommitmentLevel::Confirmed, + grpc_inspect::block_debug_listen( + block_multiplex_channel.resubscribe(), + CommitmentConfig::confirmed(), ); - let block_finalized_task: AnyhowJoinHandle = create_block_processing_task( - grpc_addr, - grpc_x_token, - block_sx, - CommitmentLevel::Finalized, + grpc_inspect::block_debug_listen( + block_multiplex_channel.resubscribe(), + CommitmentConfig::finalized(), ); let cluster_info_polling = poll_vote_accounts_and_cluster_info(rpc_client, cluster_info_sx, va_sx); let streamers = EndpointStreaming { - blocks_notifier, - slot_notifier, + blocks_notifier: block_multiplex_channel, + slot_notifier: slot_multiplex_channel, cluster_info_notifier, vote_account_notifier, }; let endpoint_tasks = vec![ - slot_task, - block_confirmed_task, - block_finalized_task, + jh_multiplex_slotstream, + jh_multiplex_blockstream, cluster_info_polling, ]; Ok((streamers, endpoint_tasks)) diff --git a/cluster-endpoints/src/lib.rs b/cluster-endpoints/src/lib.rs index 68b7e363..23060022 100644 --- a/cluster-endpoints/src/lib.rs +++ b/cluster-endpoints/src/lib.rs @@ -1,5 +1,11 @@ pub mod endpoint_stremers; +pub mod grpc_inspect; +pub mod grpc_multiplex; +pub mod grpc_stream_utils; pub mod grpc_subscription; pub mod json_rpc_leaders_getter; pub mod json_rpc_subscription; pub mod rpc_polling; + +pub use geyser_grpc_connector::grpc_subscription_autoreconnect; +pub use yellowstone_grpc_proto::geyser::CommitmentLevel; diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs index 9199f998..c01e5d62 100644 --- a/lite-rpc/src/cli.rs +++ b/lite-rpc/src/cli.rs @@ -42,10 +42,27 @@ pub struct Config { pub quic_proxy_addr: Option, #[serde(default)] pub use_grpc: bool, + #[serde(default = "Config::default_grpc_addr")] pub grpc_addr: String, #[serde(default)] pub grpc_x_token: Option, + + #[serde(default)] + pub grpc_addr2: Option, + #[serde(default)] + pub grpc_x_token2: Option, + + #[serde(default)] + pub grpc_addr3: Option, + #[serde(default)] + pub grpc_x_token3: Option, + + #[serde(default)] + pub grpc_addr4: Option, + #[serde(default)] + pub grpc_x_token4: Option, + /// postgres config #[serde(default)] pub postgres: Option, @@ -116,12 +133,45 @@ impl Config { .map(|_| true) .unwrap_or(config.use_grpc); + // source 1 config.grpc_addr = env::var("GRPC_ADDR").unwrap_or(config.grpc_addr); - config.grpc_x_token = env::var("GRPC_X_TOKEN") .map(Some) .unwrap_or(config.grpc_x_token); + assert!( + env::var("GRPC_ADDR1").is_err(), + "use GRPC_ADDR instead of GRPC_ADDR1" + ); + assert!( + env::var("GRPC_X_TOKEN1").is_err(), + "use GRPC_X_TOKEN instead of GRPC_X_TOKEN1" + ); + + // source 2 + config.grpc_addr2 = env::var("GRPC_ADDR2") + .map(Some) + .unwrap_or(config.grpc_addr2); + config.grpc_x_token2 = env::var("GRPC_X_TOKEN2") + .map(Some) + .unwrap_or(config.grpc_x_token2); + + // source 3 + config.grpc_addr3 = env::var("GRPC_ADDR3") + .map(Some) + .unwrap_or(config.grpc_addr3); + config.grpc_x_token3 = env::var("GRPC_X_TOKEN3") + .map(Some) + .unwrap_or(config.grpc_x_token3); + + // source 4 + config.grpc_addr4 = env::var("GRPC_ADDR4") + .map(Some) + .unwrap_or(config.grpc_addr4); + config.grpc_x_token4 = env::var("GRPC_X_TOKEN4") + .map(Some) + .unwrap_or(config.grpc_x_token4); + config.postgres = PostgresSessionConfig::new_from_env()?.or(config.postgres); Ok(config) @@ -166,4 +216,42 @@ impl Config { pub fn default_grpc_addr() -> String { DEFAULT_GRPC_ADDR.to_string() } + + pub fn get_grpc_sources(&self) -> Vec { + let mut sources: Vec = vec![]; + + sources.push(GrpcSource { + addr: self.grpc_addr.clone(), + x_token: self.grpc_x_token.clone(), + }); + + if self.grpc_addr2.is_some() { + sources.push(GrpcSource { + addr: self.grpc_addr2.clone().unwrap(), + x_token: self.grpc_x_token2.clone(), + }); + } + + if self.grpc_addr3.is_some() { + sources.push(GrpcSource { + addr: self.grpc_addr3.clone().unwrap(), + x_token: self.grpc_x_token3.clone(), + }); + } + + if self.grpc_addr4.is_some() { + sources.push(GrpcSource { + addr: self.grpc_addr4.clone().unwrap(), + x_token: self.grpc_x_token4.clone(), + }); + } + + sources + } +} + +#[derive(Debug, Clone)] +pub struct GrpcSource { + pub addr: String, + pub x_token: Option, } diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 6d5e7666..a3a89454 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -8,12 +8,16 @@ use lite_rpc::bridge::LiteBridge; use lite_rpc::cli::Config; use lite_rpc::postgres_logger::PostgresLogger; use lite_rpc::service_spawner::ServiceSpawner; -use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION}; +use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE; use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig; use crate::rpc_tester::RpcTester; +use log::info; use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming; use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription; +use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{ + GrpcConnectionTimeouts, GrpcSourceConfig, +}; use solana_lite_rpc_cluster_endpoints::json_rpc_leaders_getter::JsonRpcLeaderGetter; use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription; use solana_lite_rpc_core::keypair_loader::load_identity_keypair; @@ -81,6 +85,7 @@ pub async fn start_postgres( } pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow::Result<()> { + let grpc_sources = args.get_grpc_sources(); let Config { lite_rpc_ws_addr, lite_rpc_http_addr, @@ -92,8 +97,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: transaction_retry_after_secs, quic_proxy_addr, use_grpc, - grpc_addr, - grpc_x_token, .. } = args; @@ -108,23 +111,38 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr); let (subscriptions, cluster_endpoint_tasks) = if use_grpc { + info!("Creating geyser subscription..."); + + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(5), + subscribe_timeout: Duration::from_secs(5), + }; + create_grpc_subscription( rpc_client.clone(), - grpc_addr, - grpc_x_token, - GRPC_VERSION.to_string(), + grpc_sources + .iter() + .map(|s| { + GrpcSourceConfig::new(s.addr.clone(), s.x_token.clone(), None, timeouts.clone()) + }) + .collect(), )? } else { + info!("Creating RPC poll subscription..."); create_json_rpc_polling_subscription(rpc_client.clone())? }; + let EndpointStreaming { blocks_notifier, cluster_info_notifier, slot_notifier, vote_account_notifier, } = subscriptions; + let finalized_block = get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await; + info!("Got finalized block: {:?}", finalized_block.slot); let epoch_data = EpochCache::bootstrap_epoch(&rpc_client).await?; @@ -250,6 +268,8 @@ pub async fn main() -> anyhow::Result<()> { let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone())); let rpc_tester = tokio::spawn(RpcTester::new(rpc_client.clone()).start()); + info!("Use RPC address: {}", obfuscate_rpcurl(rpc_addr)); + let main = start_lite_rpc(config, rpc_client); tokio::select! { @@ -295,3 +315,11 @@ fn parse_host_port(host_port: &str) -> Result { Ok(addrs[0]) } } + +// http://mango.rpcpool.com/c232ab232ba2323 +fn obfuscate_rpcurl(rpc_addr: &str) -> String { + if rpc_addr.contains("rpcpool.com") { + return rpc_addr.replacen(char::is_numeric, "X", 99); + } + rpc_addr.to_string() +}