tokio::time::Interval based approach
Batching at 10us doesn't work well enough; probably the batching deadline future becomes ready too soon. The batching factor is just 1.5.

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780b79c8dd6d007dbb120
problame committed Nov 20, 2024
1 parent f3ed569 commit 81d9970
Showing 2 changed files with 22 additions and 42 deletions.
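An aside on the approach (not part of the diff): the batching deadline is now driven by a long-lived tokio::time::Interval instead of a per-batch tokio::time::sleep. One documented property of Interval that fits the "ready too soon" observation above is that the first tick of a fresh interval completes immediately, and any tick whose deadline has already elapsed resolves as soon as it is polled. A minimal hedged sketch against plain tokio (the 10us period mirrors the batch timeout under test; everything else is made up):

```rust
use std::time::Duration;
use tokio::time::{self, Instant, MissedTickBehavior};

#[tokio::main]
async fn main() {
    // Assumed 10us period, standing in for server_side_batch_timeout.
    let mut itv = time::interval(Duration::from_micros(10));
    itv.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let start = Instant::now();
    itv.tick().await; // the first tick of a fresh Interval completes immediately
    println!("first tick after {:?}", start.elapsed());

    // Pretend the connection was busy for much longer than one period ...
    time::sleep(Duration::from_millis(1)).await;

    // ... so the current deadline has long passed: this tick resolves as soon
    // as it is polled, and Delay reschedules the next tick to now + period.
    let t = Instant::now();
    itv.tick().await;
    println!("overdue tick resolved after {:?}", t.elapsed());
}
```

With a 10us period, the current tick may well have already expired by the time the handler gets back to awaiting the deadline, so batches close almost immediately; that would be consistent with a batching factor of only 1.5.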
60 changes: 19 additions & 41 deletions pageserver/src/page_service.rs
@@ -316,6 +316,8 @@ struct PageServerHandler {

/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,

server_side_batch_interval: Option<tokio::time::Interval>,
}

struct Carry {
@@ -585,6 +587,11 @@ impl PageServerHandler {
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
server_side_batch_timeout,
server_side_batch_interval: server_side_batch_timeout.map(|timeout| {
let mut itv = tokio::time::interval(timeout);
itv.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
itv
}),
}
}
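A note on the set_missed_tick_behavior(MissedTickBehavior::Delay) call above (my reading of tokio's documented semantics, not something the commit states): with Delay, missed ticks are not replayed in a burst as they would be under the default Burst behavior; a single overdue tick is yielded and the schedule then resumes a full period from that point. A small sketch with a deliberately large period so the effect is visible:

```rust
use std::time::Duration;
use tokio::time::{self, Instant, MissedTickBehavior};

#[tokio::main]
async fn main() {
    let mut itv = time::interval(Duration::from_millis(10));
    itv.set_missed_tick_behavior(MissedTickBehavior::Delay);
    itv.tick().await; // first tick is immediate

    // Deliberately miss a few ticks.
    time::sleep(Duration::from_millis(35)).await;

    let t = Instant::now();
    itv.tick().await; // the single overdue tick resolves right away
    itv.tick().await; // ~10ms later; Delay does not burst-replay missed ticks
    println!("two ticks took {:?}", t.elapsed()); // roughly one period
}
```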

@@ -626,51 +633,22 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();

let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?

loop {
// Create a future that will become ready when we need to stop batching.
use futures::future::Either;
let batching_deadline = match (
&*maybe_carry as &Option<Carry>,
&mut batching_deadline_storage,
) {
(None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
(None, Some(_)) => unreachable!(),
(Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
(Some(carry), None) => {
match self.server_side_batch_timeout {
None => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]))
}
Some(batch_timeout) => {
// Take into consideration the time the carry spent waiting.
let batch_timeout =
batch_timeout.saturating_sub(carry.started_at.elapsed());
if batch_timeout.is_zero() {
// the timer doesn't support restarting with zero duration
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]));
} else {
batching_deadline_storage = Some(Box::pin(async move {
tokio::time::sleep(batch_timeout).await;
}));
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
}
}
let batching_deadline = match &*maybe_carry as &Option<Carry> {
None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
Some(carry) => match &mut self.server_side_batch_interval {
None => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]))
}
}
Some(interval) => Either::Right(interval.tick()),
},
};
let msg = tokio::select! {
biased;
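For readers skimming the hunk above, here is a heavily simplified, self-contained sketch of the control flow it implements: there is no deadline until something has been batched (futures::future::pending()), and once a batch exists, the next interval tick acts as the flush deadline, with the biased select! preferring new messages over the deadline. All names here (Msg, batching_loop, flush, the mpsc channel) are placeholders, not the pageserver's types:

```rust
use std::time::Duration;

use futures::future::Either;
use tokio::sync::mpsc;
use tokio::time::{self, MissedTickBehavior};

// Placeholder for the real page_service request type.
type Msg = u64;

async fn batching_loop(mut rx: mpsc::Receiver<Msg>, batch_timeout: Duration) {
    let mut interval = time::interval(batch_timeout);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    let mut batch: Vec<Msg> = Vec::new();

    loop {
        // No deadline while nothing is batched; otherwise the next interval
        // tick acts as the flush deadline.
        let batching_deadline = if batch.is_empty() {
            Either::Left(futures::future::pending())
        } else {
            Either::Right(interval.tick())
        };

        tokio::select! {
            biased; // prefer draining incoming messages over the deadline
            maybe_msg = rx.recv() => match maybe_msg {
                Some(msg) => batch.push(msg),
                None => {
                    // Connection closed: flush whatever is batched and stop.
                    if !batch.is_empty() {
                        flush(std::mem::take(&mut batch)).await;
                    }
                    return;
                }
            },
            _ = batching_deadline => {
                flush(std::mem::take(&mut batch)).await;
            }
        }
    }
}

async fn flush(batch: Vec<Msg>) {
    println!("flushing a batch of {} messages", batch.len());
}
```

Note that, unlike the previous sleep-based deadline, the tick is not anchored to carry.started_at; combined with the immediate-tick behavior sketched earlier, that is one plausible reading of the "ready too soon" note in the commit message. The remaining diff below adjusts the Python benchmark: TARGET_RUNTIME drops from 60 to 5 and 20us timeout cases are added for both the non-batchable and batchable workloads.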
@@ -9,7 +9,7 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import humantime_to_ms

TARGET_RUNTIME = 60
TARGET_RUNTIME = 5


@pytest.mark.parametrize(
@@ -18,10 +18,12 @@
# the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
(50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
(50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
(50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"),
(50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
# the next 4 cases demonstrate how batchable workloads benefit from batching
(50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
(50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
(50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"),
(50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
(50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
],
