This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

TransactionScheduler: Clean already processed or old transactions from container #34233

Merged 1 commit on Nov 29, 2023
Changes from all commits
@@ -16,10 +16,12 @@ use {
TOTAL_BUFFERED_PACKETS,
},
crossbeam_channel::RecvTimeoutError,
+ solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
solana_measure::measure_us,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
- saturating_add_assign, timing::AtomicInterval, transaction::SanitizedTransaction,
+ clock::MAX_PROCESSING_AGE, saturating_add_assign, timing::AtomicInterval,
+ transaction::SanitizedTransaction,
},
std::{
sync::{Arc, RwLock},
@@ -128,7 +130,11 @@ impl SchedulerController {
let (_, clear_time_us) = measure_us!(self.clear_container());
saturating_add_assign!(self.timing_metrics.clear_time_us, clear_time_us);
}
- BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {}
+ BufferedPacketsDecision::ForwardAndHold => {
+     let (_, clean_time_us) = measure_us!(self.clean_queue());
+     saturating_add_assign!(self.timing_metrics.clean_time_us, clean_time_us);
+ }
+ BufferedPacketsDecision::Hold => {}
Contributor:
Nice to drop expired or processed txs when getting close to becoming leader. I assume doing this only on ForwardAndHold but not Hold is due to performance concerns? Depending on clean_time_us, maybe queue cleaning could be done more frequently, so the scheduler doesn't spend too much time cleaning instead of receiving when it's close to being leader.

Contributor Author:
Yeah, so Hold can happen for a few reasons, one of which is that it's our leader slot but the bank isn't ready yet. In that case, you'd expect to get the bank really soon, so we don't really want to do anything until the bank is there and we can schedule.

Previously, the TLMI (thread-local multi-iterator) would filter during ForwardAndHold before forwarding. So this PR is kind of re-introducing that behavior, but just not forwarding.
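
For readers skimming the thread, here are the two arms from the hunk above again, annotated with the reasoning from this exchange (the comments are editorial; only the code itself comes from the diff):

```rust
BufferedPacketsDecision::ForwardAndHold => {
    // A leader slot is coming up: spend a little time now dropping transactions
    // that are expired, already processed, or no longer sanitizable, so that
    // scheduling starts from a cleaner queue.
    let (_, clean_time_us) = measure_us!(self.clean_queue());
    saturating_add_assign!(self.timing_metrics.clean_time_us, clean_time_us);
}
BufferedPacketsDecision::Hold => {
    // Hold typically means the leader bank is about to arrive (e.g. it is our
    // slot but the bank isn't ready yet), so do nothing here and be ready to
    // schedule the moment the bank shows up.
}
```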

}

Ok(())
@@ -143,6 +149,53 @@ impl SchedulerController {
}
}

/// Clean unprocessable transactions from the queue. These will be transactions that are
/// expired, already processed, or are no longer sanitizable.
/// This only clears pending transactions, and does **not** clear in-flight transactions.
fn clean_queue(&mut self) {
// Clean up any transactions that have already been processed, are too old, or do not have
// valid nonce accounts.
const MAX_TRANSACTION_CHECKS: usize = 10_000;
let mut transaction_ids = Vec::with_capacity(MAX_TRANSACTION_CHECKS);

while let Some(id) = self.container.pop() {
Contributor:
Just curious, could container.size > MAX_TRANSACTION_CHECKS?

Contributor Author:
Yeah, it could be. My intent here was to keep the runtime of this fn very short by only cleaning the top of the queue, i.e. only transactions we're very likely to schedule early on during the leader slot.
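
If bounding this pass ever becomes necessary, a minimal sketch of capping the loop at MAX_TRANSACTION_CHECKS could look like the following (hypothetical; the loop in this PR pops every id, and `container.pop()` is assumed to behave exactly as in the diff):

```rust
// Hypothetical bounded variant: stop pulling ids once MAX_TRANSACTION_CHECKS
// have been collected, so the cleaning pass stays cheap close to the leader slot.
let mut transaction_ids = Vec::with_capacity(MAX_TRANSACTION_CHECKS);
while transaction_ids.len() < MAX_TRANSACTION_CHECKS {
    let Some(id) = self.container.pop() else {
        break; // queue exhausted before reaching the cap
    };
    transaction_ids.push(id);
}
```

Anything left in the queue would simply be picked up by a later cleaning pass.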

transaction_ids.push(id);
}

let bank = self.bank_forks.read().unwrap().working_bank();

const CHUNK_SIZE: usize = 128;
let mut error_counters = TransactionErrorMetrics::default();

for chunk in transaction_ids.chunks(CHUNK_SIZE) {
let lock_results = vec![Ok(()); chunk.len()];
let sanitized_txs: Vec<_> = chunk
.iter()
.map(|id| {
&self
.container
.get_transaction_ttl(&id.id)
.expect("transaction must exist")
.transaction
})
.collect();

let check_results = bank.check_transactions(
&sanitized_txs,
&lock_results,
MAX_PROCESSING_AGE,
&mut error_counters,
);

for ((result, _nonce), id) in check_results.into_iter().zip(chunk.iter()) {
if result.is_err() {
saturating_add_assign!(self.count_metrics.num_dropped_on_age_and_status, 1);
self.container.remove_by_id(&id.id);
}
}
}
}

/// Receives completed transactions from the workers and updates metrics.
fn receive_completed(&mut self) -> Result<(), SchedulerError> {
let ((num_transactions, num_retryable), receive_completed_time_us) =
@@ -275,6 +328,8 @@ struct SchedulerCountMetrics {
num_dropped_on_validate_locks: usize,
/// Number of transactions that were dropped due to clearing.
num_dropped_on_clear: usize,
/// Number of transactions that were dropped due to age and status checks.
num_dropped_on_age_and_status: usize,
/// Number of transactions that were dropped due to exceeded capacity.
num_dropped_on_capacity: usize,
}
@@ -311,6 +366,11 @@ impl SchedulerCountMetrics {
i64
),
("num_dropped_on_clear", self.num_dropped_on_clear, i64),
(
"num_dropped_on_age_and_status",
self.num_dropped_on_age_and_status,
i64
),
("num_dropped_on_capacity", self.num_dropped_on_capacity, i64)
);
}
@@ -326,6 +386,7 @@ impl SchedulerCountMetrics {
|| self.num_dropped_on_sanitization != 0
|| self.num_dropped_on_validate_locks != 0
|| self.num_dropped_on_clear != 0
|| self.num_dropped_on_age_and_status != 0
|| self.num_dropped_on_capacity != 0
}

@@ -340,6 +401,7 @@ impl SchedulerCountMetrics {
self.num_dropped_on_sanitization = 0;
self.num_dropped_on_validate_locks = 0;
self.num_dropped_on_clear = 0;
self.num_dropped_on_age_and_status = 0;
self.num_dropped_on_capacity = 0;
}
}
@@ -357,6 +419,8 @@ struct SchedulerTimingMetrics {
schedule_time_us: u64,
/// Time spent clearing transactions from the container.
clear_time_us: u64,
/// Time spent cleaning expired or processed transactions from the container.
clean_time_us: u64,
/// Time spent receiving completed transactions.
receive_completed_time_us: u64,
}
@@ -380,6 +444,7 @@ impl SchedulerTimingMetrics {
("buffer_time", self.buffer_time_us, i64),
("schedule_time", self.schedule_time_us, i64),
("clear_time", self.clear_time_us, i64),
("clean_time", self.clean_time_us, i64),
(
"receive_completed_time",
self.receive_completed_time_us,
@@ -389,9 +454,12 @@
}

fn reset(&mut self) {
self.decision_time_us = 0;
self.receive_time_us = 0;
self.buffer_time_us = 0;
self.schedule_time_us = 0;
self.clear_time_us = 0;
self.clean_time_us = 0;
self.receive_completed_time_us = 0;
}
}