Skip to content

Commit

Permalink
async-timer based approach
Browse files Browse the repository at this point in the history
With this, 10us batching timeout works, but it has some other wrinkles:

- it uses the signal-based timer APIs instead of going through epoll (=> timerfd)
= it needs to make a syscall for each batch, which costs around 1-2us, so, probably significant CPU time wasted on this.
  • Loading branch information
problame committed Nov 20, 2024
1 parent af95320 commit 1639b26
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 3 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
async-timer = "0.7.4"
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
Expand Down
1 change: 1 addition & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-timer.workspace = true
bit_field.workspace = true
byteorder.workspace = true
bytes.workspace = true
Expand Down
17 changes: 14 additions & 3 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use async_timer::Oneshot;
use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
Expand All @@ -22,6 +23,7 @@ use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -316,6 +318,8 @@ struct PageServerHandler {

/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,

server_side_batch_timer: Pin<Box<async_timer::oneshot::Timer>>,
}

struct Carry {
Expand Down Expand Up @@ -585,6 +589,9 @@ impl PageServerHandler {
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
server_side_batch_timeout,
server_side_batch_timer: Box::pin(async_timer::oneshot::Timer::new(
Duration::from_secs(999),
)), // reset each iteration
}
}

Expand Down Expand Up @@ -661,9 +668,13 @@ impl PageServerHandler {
.msg
]));
} else {
batching_deadline_storage = Some(Box::pin(async move {
tokio::time::sleep(batch_timeout).await;
}));
std::future::poll_fn(|ctx| {
self.server_side_batch_timer
.restart(batch_timeout, ctx.waker());
std::task::Poll::Ready(())
})
.await;
batching_deadline_storage = Some(&mut self.server_side_batch_timer);
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
Expand Down

0 comments on commit 1639b26

Please sign in to comment.