Skip to content

Commit

Permalink
try interval-based impl to cross-chec
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Nov 20, 2024
1 parent 68550f0 commit 721643b
Showing 1 changed file with 15 additions and 43 deletions.
58 changes: 15 additions & 43 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -319,7 +318,7 @@ struct PageServerHandler {
/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,

server_side_batch_timer: Pin<Box<async_timer::timer::Platform>>,
server_side_batch_interval: Option<async_timer::Interval>,
}

struct Carry {
Expand Down Expand Up @@ -589,7 +588,8 @@ impl PageServerHandler {
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
server_side_batch_timeout,
server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration
server_side_batch_interval: server_side_batch_timeout
.map(|timeout| async_timer::Interval::new(timeout)),
}
}

Expand Down Expand Up @@ -631,50 +631,22 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();

let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?

loop {
// Create a future that will become ready when we need to stop batching.
use futures::future::Either;
let batching_deadline = match (
&*maybe_carry as &Option<Carry>,
&mut batching_deadline_storage,
) {
(None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
(None, Some(_)) => unreachable!(),
(Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
(Some(carry), None) => {
match self.server_side_batch_timeout {
None => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]))
}
Some(batch_timeout) => {
// Take into consideration the time the carry spent waiting.
let batch_timeout =
batch_timeout.saturating_sub(carry.started_at.elapsed());
if batch_timeout.is_zero() {
// the timer doesn't support restarting with zero duration
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]));
} else {
self.server_side_batch_timer.restart(batch_timeout);
batching_deadline_storage = Some(&mut self.server_side_batch_timer);
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
}
}
let batching_deadline = match &*maybe_carry as &Option<Carry> {
None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
Some(carry) => match &mut self.server_side_batch_interval {
None => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]))
}
}
Some(interval) => Either::Right(interval),
},
};
let msg = tokio::select! {
biased;
Expand Down

0 comments on commit 721643b

Please sign in to comment.