
Commit

fix: use uppercase for struct sections, rename get_available_worker, add info logs, fix documentation
SantiagoPittella committed Nov 26, 2024
1 parent c6b5321 commit d11c8e8
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions bin/tx-prover/src/proxy/mod.rs
@@ -21,7 +21,7 @@ use crate::{
 /// The interval between attempts to check for available workers.
 const WORKER_CHECK_INTERVAL_MILLIS: Duration = Duration::from_millis(20);

-// LoadBalancer
+// LOAD BALANCER
 // ================================================================================================

 /// Load balancer that uses a round robin strategy
@@ -50,7 +50,7 @@ impl LoadBalancer {
     /// Gets an available worker and removes it from the list of available workers.
     ///
     /// If no worker is available, it will return None.
-    pub async fn get_available_worker(&self) -> Option<Backend> {
+    pub async fn pop_available_worker(&self) -> Option<Backend> {
         self.available_workers.write().await.pop()
     }

@@ -61,14 +61,15 @@
     pub async fn add_available_worker(&self, worker: Backend) {
         let mut available_workers = self.available_workers.write().await;
         assert!(!available_workers.contains(&worker), "Worker already available");
+        info!("Worker {} is now available", worker.addr);
         available_workers.push(worker);
     }
 }

 /// Rate limiter
 static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));

-// Request queue
+// REQUEST QUEUE
 // ================================================================================================

 /// Request queue holds the list of requests that are waiting to be processed by the workers.
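
For readers following the diff: pop_available_worker and add_available_worker together implement a small worker pool, where a worker is taken off the shared list while it serves a request and is pushed back once that request finishes. A minimal, self-contained sketch of that pattern, with a hypothetical Worker type standing in for pingora's Backend:

// Minimal sketch of the worker-pool pattern shown in the diff above.
// `Worker` is a stand-in for pingora's `Backend`; the surrounding proxy
// types are not reproduced here.
use tokio::sync::RwLock;

#[derive(PartialEq)]
struct Worker {
    addr: String,
}

struct WorkerPool {
    available_workers: RwLock<Vec<Worker>>,
}

impl WorkerPool {
    /// Takes a worker out of the pool, or returns None if all are busy.
    async fn pop_available_worker(&self) -> Option<Worker> {
        self.available_workers.write().await.pop()
    }

    /// Returns a worker to the pool once its request has completed.
    async fn add_available_worker(&self, worker: Worker) {
        let mut available_workers = self.available_workers.write().await;
        assert!(!available_workers.contains(&worker), "Worker already available");
        available_workers.push(worker);
    }
}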
@@ -110,7 +111,7 @@ impl RequestQueue {
 /// Shared state. It keeps track of the order of the requests to then assign them to the workers.
 static QUEUE: Lazy<RequestQueue> = Lazy::new(RequestQueue::new);

-// RequestContext
+// REQUEST CONTEXT
 // ================================================================================================

 /// Custom context for the request/response lifecycle
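
The fields of RequestQueue are not part of this diff; as a rough sketch under that caveat, the behavior its doc comment describes is a FIFO of request IDs behind an async lock. The field names and the uuid-based request ID below are assumptions, not the crate's actual definitions:

// Rough sketch of the FIFO the RequestQueue doc comment describes.
use std::collections::VecDeque;
use tokio::sync::RwLock;
use uuid::Uuid;

struct RequestQueue {
    queue: RwLock<VecDeque<Uuid>>,
}

impl RequestQueue {
    fn new() -> Self {
        Self { queue: RwLock::new(VecDeque::new()) }
    }

    /// Adds an incoming request to the back of the queue.
    async fn enqueue(&self, request_id: Uuid) {
        self.queue.write().await.push_back(request_id);
    }

    /// Returns the ID currently at the front of the queue, if any.
    async fn peek(&self) -> Option<Uuid> {
        self.queue.read().await.front().copied()
    }

    /// Removes the request at the front once a worker has been assigned.
    async fn dequeue(&self) -> Option<Uuid> {
        self.queue.write().await.pop_front()
    }
}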
@@ -202,10 +203,11 @@ impl ProxyHttp for LoadBalancer {
     /// Returns [HttpPeer] corresponding to the worker that will handle the current request.
     ///
     /// Here we enqueue the request and wait for it to be at the front of the queue and a worker
-    /// becomes available. We then set the SNI, timeouts, and enable HTTP/2.
+    /// becomes available, then we dequeue the request and process it. We then set the SNI,
+    /// timeouts, and enable HTTP/2.
     ///
-    /// Note that the request is not removed from the queue here. It will be returned later in
-    /// [Self::logging()] once the worker processes the it.
+    /// Note that the request will be assigned a worker here, and the worker will be removed from
+    /// the list of available workers once it reaches the [Self::logging] method.
     async fn upstream_peer(
         &self,
         _session: &mut Session,
@@ -224,9 +226,9 @@
             }

             // Check if there is an available worker
-            if let Some(worker) = self.get_available_worker().await {
+            if let Some(worker) = self.pop_available_worker().await {
+                info!("Worker {} picked up the request with ID: {}", worker.addr, request_id);
                 ctx.set_worker(worker);
-                info!("Worker picked up the request with ID: {}", request_id);
                 break;
             }
             info!("All workers are busy");
