Skip to content

Commit

Permalink
use tpu-client-next in send_transaction_service (#3515)
Browse files Browse the repository at this point in the history
* Add tpu-client-next to send_transaction_service

* rename with_option to new

* Update Cargo.lock

(cherry picked from commit 5c0f173)

# Conflicts:
#	banks-server/src/banks_server.rs
#	programs/sbf/Cargo.lock
#	rpc/src/rpc.rs
#	rpc/src/rpc_service.rs
#	send-transaction-service/src/send_transaction_service.rs
#	send-transaction-service/src/tpu_info.rs
#	send-transaction-service/src/transaction_client.rs
#	svm/examples/Cargo.lock
  • Loading branch information
KirillLykov authored and mergify[bot] committed Dec 3, 2024
1 parent d07fc9b commit 6c1270e
Show file tree
Hide file tree
Showing 17 changed files with 10,061 additions and 152 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion banks-server/src/banks_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use {
solana_send_transaction_service::{
send_transaction_service::{SendTransactionService, TransactionInfo},
tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
std::{
io,
Expand Down Expand Up @@ -453,17 +454,26 @@ pub async fn start_tcp_server(
.map(move |chan| {
let (sender, receiver) = unbounded();

<<<<<<< HEAD
SendTransactionService::new::<NullTpuInfo>(
tpu_addr,
&bank_forks,
None,
receiver,
&connection_cache,
5_000,
=======
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_addr,
None,
None,
>>>>>>> 5c0f173b88 (use tpu-client-next in send_transaction_service (#3515))
0,
exit.clone(),
);

SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone());

let server = BanksServer::new(
bank_forks.clone(),
block_commitment_cache.clone(),
Expand Down
44 changes: 43 additions & 1 deletion programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 24 additions & 53 deletions rpc/src/cluster_tpu_info.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use {
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
pubkey::Pubkey,
},
solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey},
solana_send_transaction_service::tpu_info::TpuInfo,
std::{
collections::HashMap,
Expand Down Expand Up @@ -50,7 +47,7 @@ impl TpuInfo for ClusterTpuInfo {
.collect();
}

fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
Expand All @@ -70,37 +67,23 @@ impl TpuInfo for ClusterTpuInfo {
unique_leaders
}

fn get_leader_tpus_with_slots(
&self,
max_count: u64,
protocol: Protocol,
) -> Vec<(&SocketAddr, Slot)> {
fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count)
.rev()
.filter_map(|future_slot| {
NUM_CONSECUTIVE_LEADER_SLOTS
.checked_mul(future_slot)
.and_then(|slots_in_the_future| {
recorder.leader_and_slot_after_n_slots(slots_in_the_future)
})
})
let leader_pubkeys: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
.collect();
drop(recorder);
let addrs_to_slots = leaders
.into_iter()
.filter_map(|(leader_id, leader_slot)| {
leader_pubkeys
.iter()
.filter_map(|leader_pubkey| {
self.recent_peers
.get(&leader_id)
.map(|(udp_tpu, quic_tpu)| match protocol {
Protocol::UDP => (udp_tpu, leader_slot),
Protocol::QUIC => (quic_tpu, leader_slot),
.get(leader_pubkey)
.map(|addr| match protocol {
Protocol::UDP => &addr.0,
Protocol::QUIC => &addr.1,
})
})
.collect::<HashMap<_, _>>();
let mut unique_leaders = Vec::from_iter(addrs_to_slots);
unique_leaders.sort_by_key(|(_addr, slot)| *slot);
unique_leaders
.collect()
}
}

Expand Down Expand Up @@ -275,12 +258,12 @@ mod test {
let first_leader =
solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
assert_eq!(
leader_info.get_leader_tpus(1, Protocol::UDP),
leader_info.get_unique_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
);
assert_eq!(
leader_info.get_leader_tpus_with_slots(1, Protocol::UDP),
vec![(&recent_peers.get(&first_leader).unwrap().0, 0)]
leader_info.get_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
);

let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
Expand All @@ -294,15 +277,12 @@ mod test {
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_leader_tpus(2, Protocol::UDP),
leader_info.get_unique_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
);
assert_eq!(
leader_info.get_leader_tpus_with_slots(2, Protocol::UDP),
leader_info.get_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
Expand All @@ -317,26 +297,17 @@ mod test {
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_leader_tpus(3, Protocol::UDP),
leader_info.get_unique_leader_tpus(3, Protocol::UDP),
expected_leader_sockets
);
// Only 2 leader tpus are returned always... so [0, 4, 8] isn't right here.
// This assumption is safe. After all, leader schedule generation must be deterministic.
assert_eq!(
leader_info.get_leader_tpus_with_slots(3, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

for x in 4..8 {
assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len());
assert!(
leader_info
.get_leader_tpus_with_slots(x, Protocol::UDP)
.len()
<= recent_peers.len()
leader_info.get_unique_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len()
);
assert_eq!(
leader_info.get_leader_tpus(x, Protocol::UDP).len(),
x as usize
);
}
}
Expand Down
37 changes: 27 additions & 10 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ use {
solana_runtime::commitment::CommitmentSlots,
solana_send_transaction_service::{
send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
solana_streamer::socket::SocketAddrSpace,
};
Expand Down Expand Up @@ -378,16 +379,21 @@ impl JsonRpcRequestProcessor {
.tpu(connection_cache.protocol())
.unwrap();
let (sender, receiver) = unbounded();
SendTransactionService::new::<NullTpuInfo>(

let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_address,
&bank_forks,
None,
<<<<<<< HEAD
receiver,
&connection_cache,
1000,
=======
None,
>>>>>>> 5c0f173b88 (use tpu-client-next in send_transaction_service (#3515))
1,
exit.clone(),
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
Expand Down Expand Up @@ -4385,7 +4391,9 @@ pub mod tests {
},
vote::state::VoteState,
},
solana_send_transaction_service::tpu_info::NullTpuInfo,
solana_send_transaction_service::{
tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient,
},
solana_transaction_status::{
EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta,
TransactionDetails,
Expand Down Expand Up @@ -6491,16 +6499,20 @@ pub mod tests {
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
SendTransactionService::new::<NullTpuInfo>(
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_address,
&bank_forks,
None,
<<<<<<< HEAD
receiver,
&connection_cache,
1000,
=======
None,
>>>>>>> 5c0f173b88 (use tpu-client-next in send_transaction_service (#3515))
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

let mut bad_transaction = system_transaction::transfer(
&mint_keypair,
Expand Down Expand Up @@ -6765,16 +6777,21 @@ pub mod tests {
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
SendTransactionService::new::<NullTpuInfo>(
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_address,
&bank_forks,
None,
<<<<<<< HEAD
receiver,
&connection_cache,
1000,
=======
None,
>>>>>>> 5c0f173b88 (use tpu-client-next in send_transaction_service (#3515))
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

assert_eq!(
request_processor.get_block_commitment(0),
RpcBlockCommitment {
Expand Down
21 changes: 19 additions & 2 deletions rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use {
exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash,
native_token::lamports_to_sol,
},
solana_send_transaction_service::send_transaction_service::{self, SendTransactionService},
solana_send_transaction_service::{
send_transaction_service::{self, SendTransactionService},
transaction_client::ConnectionCacheClient,
},
solana_storage_bigtable::CredentialType,
std::{
net::SocketAddr,
Expand Down Expand Up @@ -474,15 +477,29 @@ impl JsonRpcService {

let leader_info =
poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
<<<<<<< HEAD
let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
tpu_address,
&bank_forks,
leader_info,
receiver,
&connection_cache,
=======
let client = ConnectionCacheClient::new(
connection_cache,
tpu_address,
send_transaction_service_config.tpu_peers.clone(),
leader_info,
send_transaction_service_config.leader_forward_count,
);
let _send_transaction_service = SendTransactionService::new_with_config(
&bank_forks,
receiver,
client,
>>>>>>> 5c0f173b88 (use tpu-client-next in send_transaction_service (#3515))
send_transaction_service_config,
exit,
));
);

#[cfg(test)]
let test_request_processor = request_processor.clone();
Expand Down
Loading

0 comments on commit 6c1270e

Please sign in to comment.