Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use counting semaphore to poll blocks #189

Merged
merged 2 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 50 additions & 32 deletions cluster-endpoints/src/rpc_polling/poll_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use solana_transaction_status::{
option_serializer::OptionSerializer, RewardType, TransactionDetails, UiTransactionEncoding,
UiTransactionStatusMeta,
};
use std::{sync::Arc, time::Duration};
use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use tokio::sync::broadcast::{Receiver, Sender};

pub async fn process_block(
Expand Down Expand Up @@ -169,48 +172,63 @@ pub fn poll_block(
let task_spawner: AnyhowJoinHandle = tokio::spawn(async move {
let counting_semaphore = Arc::new(tokio::sync::Semaphore::new(1024));
let mut slot_notification = slot_notification;
let mut last_processed_slot = 0;
let current_slot = Arc::new(AtomicU64::new(0));
loop {
let SlotNotification { processed_slot, .. } = slot_notification
.recv()
.await
.context("Slot notification channel close")?;
let last_processed_slot = current_slot.load(std::sync::atomic::Ordering::Relaxed);
if processed_slot > last_processed_slot {
last_processed_slot = processed_slot;
let premit = counting_semaphore.clone().acquire_owned().await?;
let rpc_client = rpc_client.clone();
let block_notification_sender = block_notification_sender.clone();
tokio::spawn(async move {
// retry up to 1024 times, because the block for a slot may not be available immediately after the slot notification
for _ in 0..1024 {
if let Some(processed_block) = process_block(
rpc_client.as_ref(),
processed_slot,
CommitmentConfig::confirmed(),
)
.await
current_slot.store(processed_slot, std::sync::atomic::Ordering::Relaxed);

for slot in last_processed_slot + 1..processed_slot + 1 {
let premit = counting_semaphore.clone().acquire_owned().await?;
let rpc_client = rpc_client.clone();
let block_notification_sender = block_notification_sender.clone();
let current_slot = current_slot.clone();
tokio::spawn(async move {
let mut confirmed_slot_fetch = false;
while current_slot
.load(std::sync::atomic::Ordering::Relaxed)
.saturating_sub(slot)
< 32
{
let _ = block_notification_sender.send(processed_block);
break;
if let Some(processed_block) = process_block(
rpc_client.as_ref(),
slot,
CommitmentConfig::confirmed(),
)
.await
{
let _ = block_notification_sender.send(processed_block);
confirmed_slot_fetch = true;
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}

for _ in 0..1024 {
if let Some(processed_block) = process_block(
rpc_client.as_ref(),
processed_slot,
CommitmentConfig::finalized(),
)
.await
while confirmed_slot_fetch
&& current_slot
.load(std::sync::atomic::Ordering::Relaxed)
.saturating_sub(slot)
< 128
{
let _ = block_notification_sender.send(processed_block);
break;
if let Some(processed_block) = process_block(
rpc_client.as_ref(),
slot,
CommitmentConfig::finalized(),
)
.await
{
let _ = block_notification_sender.send(processed_block);
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
drop(premit)
});
drop(premit)
});
}
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion lite-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 200_000;

/// Roughly 25 slots elapse in 10s, so send to slightly more leaders than that
#[from_env]
pub const DEFAULT_FANOUT_SIZE: u64 = 32;
pub const DEFAULT_FANOUT_SIZE: u64 = 10;

#[from_env]
pub const MAX_RETRIES: usize = 40;
Expand Down
4 changes: 2 additions & 2 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
connection_timeout: Duration::from_secs(1),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200),
max_number_of_connections: 10,
max_number_of_connections: 8,
unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1),
number_of_transactions_per_unistream: 8,
number_of_transactions_per_unistream: 1,
},
tpu_connection_path,
};
Expand Down
11 changes: 6 additions & 5 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,17 @@ impl ActiveConnection {
let mut exit_oneshot_channel = exit_oneshot_channel;
let identity = self.identity;

let max_uni_stream_connections: u64 = compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
) as u64;
let number_of_transactions_per_unistream = self
.connection_parameters
.number_of_transactions_per_unistream;
let max_number_of_connections = self.connection_parameters.max_number_of_connections;

let max_uni_stream_connections: u64 = (compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
) * max_number_of_connections) as u64;

let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new(
Expand Down
10 changes: 1 addition & 9 deletions services/src/tpu_utils/tpu_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,14 @@ impl TpuService {
current_slot: Slot,
estimated_slot: Slot,
) -> anyhow::Result<()> {
let load_slot = if estimated_slot <= current_slot {
current_slot
} else if estimated_slot.saturating_sub(current_slot) > 8 {
estimated_slot - 8
} else {
current_slot
};

let fanout = self.config.fanout_slots;
let last_slot = estimated_slot + fanout;

let cluster_nodes = self.data_cache.cluster_info.cluster_nodes.clone();

let next_leaders = self
.leader_schedule
.get_slot_leaders(load_slot, last_slot)
.get_slot_leaders(current_slot, last_slot)
.await?;
// get next leader with its tpu port
let connections_to_keep = next_leaders
Expand Down