diff --git a/Cargo.lock b/Cargo.lock index c7af140f7df2..140117a7a24f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1664,6 +1664,12 @@ dependencies = [ "utils", ] +[[package]] +name = "diatomic-waker" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab03c107fafeb3ee9f5925686dbb7a73bc76e3932abb0d2b365cb64b169cf04c" + [[package]] name = "diesel" version = "2.2.3" @@ -4022,7 +4028,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" dependencies = [ "base64 0.20.0", "byteorder", @@ -4041,7 +4047,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" dependencies = [ "bytes", "fallible-iterator", @@ -6227,7 +6233,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" dependencies = [ "async-trait", "byteorder", @@ -6759,6 +6765,7 @@ dependencies = [ "chrono", "const_format", "criterion", + "diatomic-waker", "fail", "futures", "git-version", diff --git a/Cargo.toml b/Cargo.toml index dbda9305351f..c3b87744d8a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ comfy-table = "7.1" const_format = "0.2" crc32c = "0.6" dashmap = { version = "5.5.0", features = ["raw-api"] } +diatomic-waker = { version = "0.2.3" } either = "1.8" enum-map = "2.4.2" enumset = "1.0.12" diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index ee20613d6db3..a0a6dedcddaa 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -109,8 +109,7 @@ pub struct ConfigToml { pub virtual_file_io_mode: Option, #[serde(skip_serializing_if = "Option::is_none")] pub no_sync: Option, - #[serde(with = "humantime_serde")] - pub server_side_batch_timeout: Option, + pub page_service_pipelining: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -127,6 +126,21 @@ pub struct DiskUsageEvictionTaskConfig { pub eviction_order: EvictionOrder, } +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct PageServicePipeliningConfig { + /// Causes runtime errors if larger than max get_vectored batch size. 
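+    ///
+    /// Illustrative `pageserver.toml` form of this config (a sketch, using the defaults set
+    /// further down in this file and the kebab-case mode names defined just below):
+    /// `page_service_pipelining = { max_batch_size = 32, protocol_pipelining_mode = "concurrent-futures" }`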
+ pub max_batch_size: NonZeroUsize, + pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum PageServiceProtocolPipeliningMode { + ConcurrentFutures, + Tasks, +} + pub mod statvfs { pub mod mock { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -319,8 +333,6 @@ pub mod defaults { pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512; - - pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None; } impl Default for ConfigToml { @@ -401,10 +413,12 @@ impl Default for ConfigToml { ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB), l0_flush: None, virtual_file_io_mode: None, - server_side_batch_timeout: DEFAULT_SERVER_SIDE_BATCH_TIMEOUT - .map(|duration| humantime::parse_duration(duration).unwrap()), tenant_config: TenantConfigToml::default(), no_sync: None, + page_service_pipelining: Some(PageServicePipeliningConfig { + max_batch_size: NonZeroUsize::new(32).unwrap(), + protocol_pipelining_mode: PageServiceProtocolPipeliningMode::ConcurrentFutures, + }), } } } diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 545317f95871..d80e23013ed8 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -19,6 +19,7 @@ bincode.workspace = true bytes.workspace = true camino.workspace = true chrono.workspace = true +diatomic-waker.workspace = true git-version.workspace = true hex = { workspace = true, features = ["serde"] } humantime.workspace = true diff --git a/libs/utils/src/sync.rs b/libs/utils/src/sync.rs index 2ee8f3544911..7aa26e24bcc4 100644 --- a/libs/utils/src/sync.rs +++ b/libs/utils/src/sync.rs @@ -1,3 +1,5 @@ pub mod heavier_once_cell; pub mod gate; + +pub mod spsc_fold; diff --git a/libs/utils/src/sync/spsc_fold.rs b/libs/utils/src/sync/spsc_fold.rs new file mode 100644 index 000000000000..28e8e4b8ade7 --- /dev/null +++ b/libs/utils/src/sync/spsc_fold.rs @@ -0,0 +1,391 @@ +use core::{future::poll_fn, task::Poll}; +use std::sync::{Arc, Mutex}; + +use diatomic_waker::DiatomicWaker; + +pub struct Sender { + state: Arc>, +} + +pub struct Receiver { + state: Arc>, +} + +struct Inner { + wake_receiver: DiatomicWaker, + wake_sender: DiatomicWaker, + value: Mutex>, +} + +enum State { + NoData, + HasData(T), + TryFoldFailed, // transient state + SenderWaitsForReceiverToConsume(T), + SenderGone(Option), + ReceiverGone, + AllGone, + SenderDropping, // transient state + ReceiverDropping, // transient state +} + +pub fn channel() -> (Sender, Receiver) { + let inner = Inner { + wake_receiver: DiatomicWaker::new(), + wake_sender: DiatomicWaker::new(), + value: Mutex::new(State::NoData), + }; + + let state = Arc::new(inner); + ( + Sender { + state: state.clone(), + }, + Receiver { state }, + ) +} + +#[derive(Debug, thiserror::Error)] +pub enum SendError { + #[error("receiver is gone")] + ReceiverGone, +} + +impl Sender { + /// # Panics + /// + /// If `try_fold` panics, any subsequent call to `send` panic. 
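+    ///
+    /// # Example
+    ///
+    /// Illustrative sketch of the fold-on-send behaviour (see `test_send_recv_with_fold`
+    /// below for the authoritative version):
+    ///
+    /// ```ignore
+    /// let (mut tx, mut rx) = channel();
+    /// tx.send(1, |acc, v| { *acc += v; Ok(()) }).await?;
+    /// tx.send(2, |acc, v| { *acc += v; Ok(()) }).await?;
+    /// assert_eq!(rx.recv().await?, 3);
+    /// ```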
+ pub async fn send(&mut self, value: T, try_fold: F) -> Result<(), SendError> + where + F: Fn(&mut T, T) -> Result<(), T>, + { + let mut value = Some(value); + poll_fn(|cx| { + let mut guard = self.state.value.lock().unwrap(); + match &mut *guard { + State::NoData => { + *guard = State::HasData(value.take().unwrap()); + self.state.wake_receiver.notify(); + Poll::Ready(Ok(())) + } + State::HasData(_) => { + let State::HasData(acc_mut) = &mut *guard else { + unreachable!("this match arm guarantees that the guard is HasData"); + }; + match try_fold(acc_mut, value.take().unwrap()) { + Ok(()) => { + // no need to wake receiver, if it was waiting it already + // got a wake-up when we transitioned from NoData to HasData + Poll::Ready(Ok(())) + } + Err(unfoldable_value) => { + value = Some(unfoldable_value); + let State::HasData(acc) = + std::mem::replace(&mut *guard, State::TryFoldFailed) + else { + unreachable!("this match arm guarantees that the guard is HasData"); + }; + *guard = State::SenderWaitsForReceiverToConsume(acc); + // SAFETY: send is single threaded due to `&mut self` requirement, + // therefore register is not concurrent. + unsafe { + self.state.wake_sender.register(cx.waker()); + } + Poll::Pending + } + } + } + State::TryFoldFailed => { + unreachable!(); + } + State::SenderWaitsForReceiverToConsume(_data) => { + // Really, we shouldn't be polled until receiver has consumed and wakes us. + Poll::Pending + } + State::ReceiverGone => Poll::Ready(Err(SendError::ReceiverGone)), + State::SenderGone(_) => { + unreachable!(); + } + State::AllGone => { + unreachable!(); + } + State::SenderDropping => { + unreachable!(); + } + State::ReceiverDropping => { + unreachable!(); + } + } + }) + .await + } +} + +impl Drop for Sender { + fn drop(&mut self) { + let Ok(mut guard) = self.state.value.lock() else { + return; + }; + *guard = match std::mem::replace(&mut *guard, State::SenderDropping) { + State::NoData => State::SenderGone(None), + State::HasData(data) | State::SenderWaitsForReceiverToConsume(data) => { + State::SenderGone(Some(data)) + } + State::TryFoldFailed => unreachable!(), + State::SenderGone(_) => unreachable!(), + State::ReceiverGone => State::AllGone, + State::AllGone => unreachable!(), + State::SenderDropping => unreachable!(), + State::ReceiverDropping => unreachable!(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum RecvError { + #[error("sender is gone")] + SenderGone, +} + +impl Receiver { + pub async fn recv(&mut self) -> Result { + poll_fn(|cx| { + let mut guard = self.state.value.lock().unwrap(); + match &mut *guard { + State::NoData => { + // SAFETY: recv is single threaded due to `&mut self` requirement, + // therefore register is not concurrent. 
+ unsafe { + self.state.wake_receiver.register(cx.waker()); + } + Poll::Pending + } + guard @ State::HasData(_) => { + let State::HasData(data) = std::mem::replace(guard, State::NoData) else { + unreachable!("this match arm guarantees that the guard is HasData"); + }; + self.state.wake_sender.notify(); + Poll::Ready(Ok(data)) + } + State::TryFoldFailed => { + unreachable!(); + } + guard @ State::SenderWaitsForReceiverToConsume(_) => { + let State::SenderWaitsForReceiverToConsume(data) = + std::mem::replace(guard, State::NoData) else { + unreachable!( + "this match arm guarantees that the guard is SenderWaitsForReceiverToConsume" + ); + }; + self.state.wake_sender.notify(); + Poll::Ready(Ok(data)) + } + guard @ State::SenderGone(Some(_)) => { + let State::SenderGone(Some(data)) = + std::mem::replace(guard, State::SenderGone(None)) + else { + unreachable!( + "this match arm guarantees that the guard is SenderGone(Some(_))" + ); + }; + Poll::Ready(Ok(data)) + } + State::SenderGone(None) => Poll::Ready(Err(RecvError::SenderGone)), + State::ReceiverGone => { + unreachable!(); + } + State::AllGone => { + unreachable!(); + } + State::SenderDropping => { + unreachable!(); + } + State::ReceiverDropping => { + unreachable!(); + } + } + }) + .await + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + let Ok(mut guard) = self.state.value.lock() else { + return; + }; + *guard = match std::mem::replace(&mut *guard, State::ReceiverDropping) { + State::NoData => State::ReceiverGone, + State::HasData(_) | State::SenderWaitsForReceiverToConsume(_) => State::ReceiverGone, + State::TryFoldFailed => unreachable!(), + State::SenderGone(_) => State::AllGone, + State::ReceiverGone => unreachable!(), + State::AllGone => unreachable!(), + State::SenderDropping => unreachable!(), + State::ReceiverDropping => unreachable!(), + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX); + + #[tokio::test] + async fn test_send_recv() { + let (mut sender, mut receiver) = channel(); + + sender + .send(42, |acc, val| { + *acc += val; + Ok(()) + }) + .await + .unwrap(); + + let received = receiver.recv().await.unwrap(); + assert_eq!(received, 42); + } + + #[tokio::test] + async fn test_send_recv_with_fold() { + let (mut sender, mut receiver) = channel(); + + sender + .send(1, |acc, val| { + *acc += val; + Ok(()) + }) + .await + .unwrap(); + sender + .send(2, |acc, val| { + *acc += val; + Ok(()) + }) + .await + .unwrap(); + + let received = receiver.recv().await.unwrap(); + assert_eq!(received, 3); + } + + #[tokio::test(start_paused = true)] + async fn test_sender_waits_for_receiver_if_try_fold_fails() { + let (mut sender, mut receiver) = channel(); + + sender.send(23, |_, _| panic!("first send")).await.unwrap(); + + let send_fut = sender.send(42, |_, val| Err(val)); + let mut send_fut = std::pin::pin!(send_fut); + + tokio::select! { + _ = tokio::time::sleep(FOREVER) => {}, + _ = &mut send_fut => { + panic!("send should not complete"); + }, + } + + let val = receiver.recv().await.unwrap(); + assert_eq!(val, 23); + + tokio::select! 
{ + _ = tokio::time::sleep(FOREVER) => { + panic!("receiver should have consumed the value"); + }, + _ = &mut send_fut => { }, + } + + let val = receiver.recv().await.unwrap(); + assert_eq!(val, 42); + } + + #[tokio::test(start_paused = true)] + async fn test_sender_errors_if_waits_for_receiver_and_receiver_drops() { + let (mut sender, receiver) = channel(); + + sender.send(23, |_, _| unreachable!()).await.unwrap(); + + let send_fut = sender.send(42, |_, val| Err(val)); + let send_fut = std::pin::pin!(send_fut); + + drop(receiver); + + let result = send_fut.await; + assert!(matches!(result, Err(SendError::ReceiverGone))); + } + + #[tokio::test(start_paused = true)] + async fn test_receiver_errors_if_waits_for_sender_and_sender_drops() { + let (sender, mut receiver) = channel::<()>(); + + let recv_fut = receiver.recv(); + let recv_fut = std::pin::pin!(recv_fut); + + drop(sender); + + let result = recv_fut.await; + assert!(matches!(result, Err(RecvError::SenderGone))); + } + + #[tokio::test(start_paused = true)] + async fn test_receiver_errors_if_waits_for_sender_and_sender_drops_with_data() { + let (mut sender, mut receiver) = channel(); + + sender.send(42, |_, _| unreachable!()).await.unwrap(); + + { + let recv_fut = receiver.recv(); + let recv_fut = std::pin::pin!(recv_fut); + + drop(sender); + + let val = recv_fut.await.unwrap(); + assert_eq!(val, 42); + } + + let result = receiver.recv().await; + assert!(matches!(result, Err(RecvError::SenderGone))); + } + + #[tokio::test(start_paused = true)] + async fn test_receiver_waits_for_sender_if_no_data() { + let (mut sender, mut receiver) = channel(); + + let recv_fut = receiver.recv(); + let mut recv_fut = std::pin::pin!(recv_fut); + + tokio::select! { + _ = tokio::time::sleep(FOREVER) => {}, + _ = &mut recv_fut => { + panic!("recv should not complete"); + }, + } + + sender.send(42, |_, _| Ok(())).await.unwrap(); + + let val = recv_fut.await.unwrap(); + assert_eq!(val, 42); + } + + #[tokio::test] + async fn test_receiver_gone_while_nodata() { + let (mut sender, receiver) = channel(); + drop(receiver); + + let result = sender.send(42, |_, _| Ok(())).await; + assert!(matches!(result, Err(SendError::ReceiverGone))); + } + + #[tokio::test] + async fn test_sender_gone_while_nodata() { + let (sender, mut receiver) = super::channel::(); + drop(sender); + + let result = receiver.recv().await; + assert!(matches!(result, Err(RecvError::SenderGone))); + } +} diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f7be6ecaabd4..86c3621cf00a 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -183,9 +183,7 @@ pub struct PageServerConf { /// Optionally disable disk syncs (unsafe!) pub no_sync: bool, - /// Maximum amount of time for which a get page request request - /// might be held up for request merging. 
- pub server_side_batch_timeout: Option, + pub page_service_pipelining: Option, } /// Token for authentication to safekeepers @@ -340,9 +338,9 @@ impl PageServerConf { concurrent_tenant_warmup, concurrent_tenant_size_logical_size_queries, virtual_file_io_engine, - server_side_batch_timeout, tenant_config, no_sync, + page_service_pipelining, } = config_toml; let mut conf = PageServerConf { @@ -382,7 +380,7 @@ impl PageServerConf { image_compression, timeline_offloading, ephemeral_bytes_per_memory_kb, - server_side_batch_timeout, + page_service_pipelining, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index ef6711397a8c..ff6af3566c82 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -356,6 +356,25 @@ async fn timed( } } +/// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled. +async fn timed_after_cancellation( + fut: Fut, + name: &str, + warn_at: std::time::Duration, + cancel: &CancellationToken, +) -> ::Output { + let mut fut = std::pin::pin!(fut); + + tokio::select! { + _ = cancel.cancelled() => { + timed(fut, name, warn_at).await + } + ret = &mut fut => { + ret + } + } +} + #[cfg(test)] mod timed_tests { use super::timed; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3472dda378d6..c8ea3a8ca709 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -7,6 +7,7 @@ use bytes::Buf; use futures::FutureExt; use itertools::Itertools; use once_cell::sync::OnceCell; +use pageserver_api::config::{PageServicePipeliningConfig, PageServiceProtocolPipeliningMode}; use pageserver_api::models::{self, TenantState}; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, @@ -16,12 +17,15 @@ use pageserver_api::models::{ PagestreamProtocolVersion, }; use pageserver_api::shard::TenantShardId; -use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError}; +use postgres_backend::{ + is_expected_io_error, AuthType, PostgresBackend, PostgresBackendReader, QueryError, +}; use pq_proto::framed::ConnectionError; use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; use std::io; +use std::num::NonZeroUsize; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -32,6 +36,7 @@ use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; +use utils::sync::spsc_fold; use utils::{ auth::{Claims, Scope, SwappableJwtAuth}, id::{TenantId, TimelineId}, @@ -40,7 +45,6 @@ use utils::{ }; use crate::auth::check_permission; -use crate::basebackup; use crate::basebackup::BasebackupError; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; @@ -58,6 +62,7 @@ use crate::tenant::timeline::{self, WaitLsnError}; use crate::tenant::GetTimelineError; use crate::tenant::PageReconstructError; use crate::tenant::Timeline; +use crate::{basebackup, timed_after_cancellation}; use pageserver_api::key::rel_block_to_key; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; @@ -105,7 +110,7 @@ pub fn spawn( pg_auth, tcp_listener, conf.pg_auth_type, - conf.server_side_batch_timeout, + conf.page_service_pipelining.clone(), libpq_ctx, cancel.clone(), ) @@ -154,7 +159,7 @@ pub async fn libpq_listener_main( 
auth: Option>, listener: tokio::net::TcpListener, auth_type: AuthType, - server_side_batch_timeout: Option, + pipelining_config: Option, listener_ctx: RequestContext, listener_cancel: CancellationToken, ) -> Connections { @@ -185,7 +190,7 @@ pub async fn libpq_listener_main( local_auth, socket, auth_type, - server_side_batch_timeout, + pipelining_config.clone(), connection_ctx, connections_cancel.child_token(), )); @@ -213,7 +218,7 @@ async fn page_service_conn_main( auth: Option>, socket: tokio::net::TcpStream, auth_type: AuthType, - server_side_batch_timeout: Option, + pipelining_config: Option, connection_ctx: RequestContext, cancel: CancellationToken, ) -> ConnectionHandlerResult { @@ -256,7 +261,7 @@ async fn page_service_conn_main( // a while: we will tear down this PageServerHandler and instantiate a new one if/when // they reconnect. socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms))); - let socket = std::pin::pin!(socket); + let socket = Box::pin(socket); fail::fail_point!("ps::connection-start::pre-login"); @@ -267,7 +272,7 @@ async fn page_service_conn_main( let mut conn_handler = PageServerHandler::new( tenant_manager, auth, - server_side_batch_timeout, + pipelining_config, connection_ctx, cancel.clone(), ); @@ -283,7 +288,7 @@ async fn page_service_conn_main( info!("Postgres client disconnected ({io_error})"); Ok(()) } else { - let tenant_id = conn_handler.timeline_handles.tenant_id(); + let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id(); Err(io_error).context(format!( "Postgres connection error for tenant_id={:?} client at peer_addr={}", tenant_id, peer_addr @@ -291,7 +296,7 @@ async fn page_service_conn_main( } } other => { - let tenant_id = conn_handler.timeline_handles.tenant_id(); + let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id(); other.context(format!( "Postgres query error for tenant_id={:?} client peer_addr={}", tenant_id, peer_addr @@ -312,15 +317,10 @@ struct PageServerHandler { cancel: CancellationToken, - timeline_handles: TimelineHandles, - - /// See [`PageServerConf::server_side_batch_timeout`] - server_side_batch_timeout: Option, -} + /// None only while pagestream protocol is being processed. + timeline_handles: Option, -struct Carry { - msg: BatchedFeMessage, - started_at: Instant, + pipelining_config: Option, } struct TimelineHandles { @@ -537,10 +537,12 @@ impl From for QueryError { enum BatchedFeMessage { Exists { span: Span, + shard: timeline::handle::Handle, req: models::PagestreamExistsRequest, }, Nblocks { span: Span, + shard: timeline::handle::Handle, req: models::PagestreamNblocksRequest, }, GetPage { @@ -551,10 +553,12 @@ enum BatchedFeMessage { }, DbSize { span: Span, + shard: timeline::handle::Handle, req: models::PagestreamDbSizeRequest, }, GetSlruSegment { span: Span, + shard: timeline::handle::Handle, req: models::PagestreamGetSlruSegmentRequest, }, RespondError { @@ -563,18 +567,11 @@ enum BatchedFeMessage { }, } -enum BatchOrEof { - /// In the common case, this has one entry. - /// At most, it has two entries: the first is the leftover batch, the second is an error. 
- Batch(smallvec::SmallVec<[BatchedFeMessage; 1]>), - Eof, -} - impl PageServerHandler { pub fn new( tenant_manager: Arc, auth: Option>, - server_side_batch_timeout: Option, + pipelining_config: Option, connection_ctx: RequestContext, cancel: CancellationToken, ) -> Self { @@ -582,9 +579,9 @@ impl PageServerHandler { auth, claims: None, connection_ctx, - timeline_handles: TimelineHandles::new(tenant_manager), + timeline_handles: Some(TimelineHandles::new(tenant_manager)), cancel, - server_side_batch_timeout, + pipelining_config, } } @@ -612,256 +609,351 @@ impl PageServerHandler { ) } - #[instrument(skip_all, level = tracing::Level::TRACE)] - async fn read_batch_from_connection( - &mut self, - pgb: &mut PostgresBackend, - tenant_id: &TenantId, - timeline_id: &TimelineId, - maybe_carry: &mut Option, + async fn pagestream_read_message( + pgb: &mut PostgresBackendReader, + tenant_id: TenantId, + timeline_id: TimelineId, + timeline_handles: &mut TimelineHandles, + cancel: &CancellationToken, ctx: &RequestContext, - ) -> Result + parent_span: Span, + ) -> Result>, QueryError> where - IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, { - debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); + let msg = tokio::select! { + biased; + _ = cancel.cancelled() => { + return Err(QueryError::Shutdown) + } + msg = pgb.read_message() => { msg } + }; - let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell? + let copy_data_bytes = match msg? { + Some(FeMessage::CopyData(bytes)) => bytes, + Some(FeMessage::Terminate) => { + return Ok(None); + } + Some(m) => { + return Err(QueryError::Other(anyhow::anyhow!( + "unexpected message: {m:?} during COPY" + ))); + } + None => { + return Ok(None); + } // client disconnected + }; + trace!("query: {copy_data_bytes:?}"); - loop { - // Create a future that will become ready when we need to stop batching. - use futures::future::Either; - let batching_deadline = match ( - &*maybe_carry as &Option, - &mut batching_deadline_storage, - ) { - (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched - (None, Some(_)) => unreachable!(), - (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran - (Some(carry), None) => { - match self.server_side_batch_timeout { - None => { - return Ok(BatchOrEof::Batch(smallvec::smallvec![ - maybe_carry - .take() - .expect("we already checked it's Some") - .msg - ])) - } - Some(batch_timeout) => { - // Take into consideration the time the carry spent waiting. - let batch_timeout = - batch_timeout.saturating_sub(carry.started_at.elapsed()); - if batch_timeout.is_zero() { - // the timer doesn't support restarting with zero duration - return Ok(BatchOrEof::Batch(smallvec::smallvec![ - maybe_carry - .take() - .expect("we already checked it's Some") - .msg - ])); - } else { - batching_deadline_storage = Some(Box::pin(async move { - tokio::time::sleep(batch_timeout).await; - })); - Either::Right( - batching_deadline_storage.as_mut().expect("we just set it"), - ) - } - } - } - } - }; - let msg = tokio::select! 
{ - biased; - _ = self.cancel.cancelled() => { - return Err(QueryError::Shutdown) - } - _ = batching_deadline => { - return Ok(BatchOrEof::Batch(smallvec::smallvec![maybe_carry.take().expect("per construction of batching_deadline").msg])); - } - msg = pgb.read_message() => { msg } - }; + fail::fail_point!("ps::handle-pagerequest-message"); - let msg_start = Instant::now(); + // parse request + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; - // Rest of this loop body is trying to batch `msg` into `batch`. - // If we can add msg to batch we continue into the next loop iteration. - // If we can't add msg to batch batch, we carry `msg` over to the next call. + let batched_msg = match neon_fe_msg { + PagestreamFeMessage::Exists(req) => { + let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); + let shard = timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .instrument(span.clone()) // sets `shard_id` field + .await?; + BatchedFeMessage::Exists { span, shard, req } + } + PagestreamFeMessage::Nblocks(req) => { + let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); + let shard = timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .instrument(span.clone()) // sets `shard_id` field + .await?; + BatchedFeMessage::Nblocks { span, shard, req } + } + PagestreamFeMessage::DbSize(req) => { + let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); + let shard = timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .instrument(span.clone()) // sets `shard_id` field + .await?; + BatchedFeMessage::DbSize { span, shard, req } + } + PagestreamFeMessage::GetSlruSegment(req) => { + let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); + let shard = timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .instrument(span.clone()) // sets `shard_id` field + .await?; + BatchedFeMessage::GetSlruSegment { span, shard, req } + } + PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + request_lsn, + not_modified_since, + rel, + blkno, + }) => { + let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %request_lsn); - let copy_data_bytes = match msg? { - Some(FeMessage::CopyData(bytes)) => bytes, - Some(FeMessage::Terminate) => { - return Ok(BatchOrEof::Eof); - } - Some(m) => { - return Err(QueryError::Other(anyhow::anyhow!( - "unexpected message: {m:?} during COPY" - ))); + macro_rules! respond_error { + ($error:expr) => {{ + let error = BatchedFeMessage::RespondError { + span, + error: $error, + }; + Ok(Some(Box::new(error))) + }}; } - None => { - return Ok(BatchOrEof::Eof); - } // client disconnected - }; - trace!("query: {copy_data_bytes:?}"); - fail::fail_point!("ps::handle-pagerequest-message"); + let key = rel_block_to_key(rel, blkno); + let shard = match timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Page(key)) + .instrument(span.clone()) // sets `shard_id` field + .await + { + Ok(tl) => tl, + Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { + // We already know this tenant exists in general, because we resolved it at + // start of connection. 
Getting a NotFound here indicates that the shard containing + // the requested page is not present on this node: the client's knowledge of shard->pageserver + // mapping is out of date. + // + // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via + // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration + // and talk to a different pageserver. + return respond_error!(PageStreamError::Reconnect( + "getpage@lsn request routed to wrong shard".into() + )); + } + Err(e) => { + return respond_error!(e.into()); + } + }; + let effective_request_lsn = match Self::wait_or_get_last_lsn( + &shard, + request_lsn, + not_modified_since, + &shard.get_latest_gc_cutoff_lsn(), + ctx, + ) + // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait + .await + { + Ok(lsn) => lsn, + Err(e) => { + return respond_error!(e); + } + }; + BatchedFeMessage::GetPage { + span, + shard, + effective_request_lsn, + pages: smallvec::smallvec![(rel, blkno)], + } + } + }; + Ok(Some(Box::new(batched_msg))) + } - // parse request - let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + /// Post-condition: `batch` is Some() + #[instrument(skip_all, level = tracing::Level::TRACE)] + #[allow(clippy::boxed_local)] + fn pagestream_do_batch( + max_batch_size: NonZeroUsize, + batch: &mut Box, + this_msg: Box, + ) -> Result<(), Box> { + debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - let this_msg = match neon_fe_msg { - PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists { - span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn), - req, + match (&mut **batch, *this_msg) { + // something batched already, let's see if we can add this message to the batch + ( + BatchedFeMessage::GetPage { + span: _, + shard: accum_shard, + pages: ref mut accum_pages, + effective_request_lsn: accum_lsn, }, - PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks { - span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn), - req, + // would be nice to have box pattern here + BatchedFeMessage::GetPage { + span: _, + shard: this_shard, + pages: this_pages, + effective_request_lsn: this_lsn, }, - PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize { - span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn), - req, - }, - PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment { - span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn), - req, - }, - PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - request_lsn, - not_modified_since, - rel, - blkno, - }) => { - // shard_id is filled in by the handler - let span = tracing::info_span!( - "handle_get_page_at_lsn_request_batched", - %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, - batch_size = tracing::field::Empty, batch_id = tracing::field::Empty - ); - - macro_rules! 
current_batch_and_error { - ($error:expr) => {{ - let error = BatchedFeMessage::RespondError { - span, - error: $error, - }; - let batch_and_error = match maybe_carry.take() { - Some(carry) => smallvec::smallvec![carry.msg, error], - None => smallvec::smallvec![error], - }; - Ok(BatchOrEof::Batch(batch_and_error)) - }}; - } + ) if (|| { + assert_eq!(this_pages.len(), 1); + if accum_pages.len() >= max_batch_size.get() { + trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size"); + assert_eq!(accum_pages.len(), max_batch_size.get()); + return false; + } + if (accum_shard.tenant_shard_id, accum_shard.timeline_id) + != (this_shard.tenant_shard_id, this_shard.timeline_id) + { + trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch"); + // TODO: we _could_ batch & execute each shard seperately (and in parallel). + // But the current logic for keeping responses in order does not support that. + return false; + } + // the vectored get currently only supports a single LSN, so, bounce as soon + // as the effective request_lsn changes + if *accum_lsn != this_lsn { + trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed"); + return false; + } + true + })() => + { + // ok to batch + accum_pages.extend(this_pages); + Ok(()) + } + // something batched already but this message is unbatchable + (_, this_msg) => { + // by default, don't continue batching + Err(Box::new(this_msg)) // TODO: avoid re-box + } + } + } - let key = rel_block_to_key(rel, blkno); - let shard = match self - .timeline_handles - .get(*tenant_id, *timeline_id, ShardSelector::Page(key)) - .instrument(span.clone()) - .await - { - Ok(tl) => tl, - Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { - // We already know this tenant exists in general, because we resolved it at - // start of connection. Getting a NotFound here indicates that the shard containing - // the requested page is not present on this node: the client's knowledge of shard->pageserver - // mapping is out of date. - // - // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via - // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration - // and talk to a different pageserver. 
- return current_batch_and_error!(PageStreamError::Reconnect( - "getpage@lsn request routed to wrong shard".into() - )); - } - Err(e) => { - return current_batch_and_error!(e.into()); - } - }; - let effective_request_lsn = match Self::wait_or_get_last_lsn( - &shard, - request_lsn, - not_modified_since, - &shard.get_latest_gc_cutoff_lsn(), - ctx, + #[instrument(level = tracing::Level::DEBUG, skip_all)] + async fn pagesteam_handle_batched_message( + &mut self, + pgb_writer: &mut PostgresBackend, + batch: BatchedFeMessage, + ctx: &RequestContext, + ) -> Result<(), QueryError> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + { + // invoke handler function + let (handler_results, span): (Vec>, _) = + match batch { + BatchedFeMessage::Exists { span, shard, req } => { + fail::fail_point!("ps::handle-pagerequest-message::exists"); + ( + vec![ + self.handle_get_rel_exists_request(&shard, &req, ctx) + .instrument(span.clone()) + .await, + ], + span, ) - // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait - .await - { - Ok(lsn) => lsn, - Err(e) => { - return current_batch_and_error!(e); - } - }; - BatchedFeMessage::GetPage { + } + BatchedFeMessage::Nblocks { span, shard, req } => { + fail::fail_point!("ps::handle-pagerequest-message::nblocks"); + ( + vec![ + self.handle_get_nblocks_request(&shard, &req, ctx) + .instrument(span.clone()) + .await, + ], span, - shard, - effective_request_lsn, - pages: smallvec::smallvec![(rel, blkno)], - } + ) + } + BatchedFeMessage::GetPage { + span, + shard, + effective_request_lsn, + pages, + } => { + fail::fail_point!("ps::handle-pagerequest-message::getpage"); + ( + { + let npages = pages.len(); + trace!(npages, "handling getpage request"); + let res = self + .handle_get_page_at_lsn_request_batched( + &shard, + effective_request_lsn, + pages, + ctx, + ) + .instrument(span.clone()) + .await; + assert_eq!(res.len(), npages); + res + }, + span, + ) + } + BatchedFeMessage::DbSize { span, shard, req } => { + fail::fail_point!("ps::handle-pagerequest-message::dbsize"); + ( + vec![ + self.handle_db_size_request(&shard, &req, ctx) + .instrument(span.clone()) + .await, + ], + span, + ) + } + BatchedFeMessage::GetSlruSegment { span, shard, req } => { + fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); + ( + vec![ + self.handle_get_slru_segment_request(&shard, &req, ctx) + .instrument(span.clone()) + .await, + ], + span, + ) + } + BatchedFeMessage::RespondError { span, error } => { + // We've already decided to respond with an error, so we don't need to + // call the handler. 
+ (vec![Err(error)], span) } }; - // - // batch - // - match (maybe_carry.as_mut(), this_msg) { - (None, this_msg) => { - *maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start }); - } - ( - Some(Carry { msg: BatchedFeMessage::GetPage { - span: _, - shard: accum_shard, - pages: ref mut accum_pages, - effective_request_lsn: accum_lsn, - }, started_at: _}), - BatchedFeMessage::GetPage { - span: _, - shard: this_shard, - pages: this_pages, - effective_request_lsn: this_lsn, - }, - ) if async { - assert_eq!(this_pages.len(), 1); - if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { - trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size"); - assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize); - return false; - } - if (accum_shard.tenant_shard_id, accum_shard.timeline_id) - != (this_shard.tenant_shard_id, this_shard.timeline_id) - { - trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch"); - // TODO: we _could_ batch & execute each shard seperately (and in parallel). - // But the current logic for keeping responses in order does not support that. - return false; + // Map handler result to protocol behavior. + // Some handler errors cause exit from pagestream protocol. + // Other handler errors are sent back as an error message and we stay in pagestream protocol. + for handler_result in handler_results { + let response_msg = match handler_result { + Err(e) => match &e { + PageStreamError::Shutdown => { + // If we fail to fulfil a request during shutdown, which may be _because_ of + // shutdown, then do not send the error to the client. Instead just drop the + // connection. + span.in_scope(|| info!("dropping connection due to shutdown")); + return Err(QueryError::Shutdown); } - // the vectored get currently only supports a single LSN, so, bounce as soon - // as the effective request_lsn changes - if *accum_lsn != this_lsn { - trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed"); - return false; + PageStreamError::Reconnect(reason) => { + span.in_scope(|| info!("handler requested reconnect: {reason}")); + return Err(QueryError::Reconnect); } - true - } - .await => - { - // ok to batch - accum_pages.extend(this_pages); - } - (Some(carry), this_msg) => { - // by default, don't continue batching - let carry = std::mem::replace(carry, - Carry { - msg: this_msg, - started_at: msg_start, + PageStreamError::Read(_) + | PageStreamError::LsnTimeout(_) + | PageStreamError::NotFound(_) + | PageStreamError::BadRequest(_) => { + // print the all details to the log with {:#}, but for the client the + // error message is enough. Do not log if shutting down, as the anyhow::Error + // here includes cancellation which is not an error. + let full = utils::error::report_compact_sources(&e); + span.in_scope(|| { + error!("error reading relation or page version: {full:#}") }); - return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg])); - } + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + } + }, + Ok(response_msg) => response_msg, + }; + + // marshal & transmit response message + pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; + } + tokio::select! { + biased; + _ = self.cancel.cancelled() => { + // We were requested to shut down. + info!("shutdown request received in page handler"); + return Err(QueryError::Shutdown) + } + res = pgb_writer.flush() => { + res?; } } + Ok(()) } /// Pagestream sub-protocol handler. 
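For orientation, the pipelined read path added below wires these helpers together roughly as follows (a sketch in terms of the names defined in this file, not literal patch content):

// pgb_reader --pagestream_read_message()--> tokio mpsc channel (depth 1)
//            --spsc_fold::Sender::send(.., pagestream_do_batch)--> current batch
//            --pagesteam_handle_batched_message()--> pgb_writer
//
// The spsc_fold stage keeps folding consecutive GetPage requests into the
// pending batch (same shard/timeline, same effective request LSN, at most
// max_batch_size pages) while the executor is still busy with the previous one.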
@@ -883,7 +975,7 @@ impl PageServerHandler { ctx: RequestContext, ) -> Result<(), QueryError> where - IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); @@ -899,165 +991,287 @@ impl PageServerHandler { } } - let mut carry: Option = None; + let pgb_reader = pgb + .split() + .context("implementation error: split pgb into reader and writer")?; + let timeline_handles = self + .timeline_handles + .take() + .expect("implementation error: timeline_handles should not be locked"); + + let request_span = info_span!("request", shard_id = tracing::field::Empty); + let (pgb_reader, timeline_handles) = match self.pipelining_config.clone() { + Some(pipelining_config) => { + self.handle_pagerequests_pipelined( + pgb, + pgb_reader, + tenant_id, + timeline_id, + timeline_handles, + request_span, + pipelining_config, + &ctx, + ) + .await + } + None => { + self.handle_pagerequests_serial( + pgb, + pgb_reader, + tenant_id, + timeline_id, + timeline_handles, + request_span, + &ctx, + ) + .await + } + }?; + + debug!("pagestream subprotocol shut down cleanly"); + + pgb.unsplit(pgb_reader) + .context("implementation error: unsplit pgb")?; + + let replaced = self.timeline_handles.replace(timeline_handles); + assert!(replaced.is_none()); + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn handle_pagerequests_serial( + &mut self, + pgb_writer: &mut PostgresBackend, + mut pgb_reader: PostgresBackendReader, + tenant_id: TenantId, + timeline_id: TimelineId, + mut timeline_handles: TimelineHandles, + request_span: Span, + ctx: &RequestContext, + ) -> Result<(PostgresBackendReader, TimelineHandles), QueryError> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, + { loop { - let maybe_batched = self - .read_batch_from_connection(pgb, &tenant_id, &timeline_id, &mut carry, &ctx) - .await?; - let batched = match maybe_batched { - BatchOrEof::Batch(b) => b, - BatchOrEof::Eof => { - break; + let msg = Self::pagestream_read_message( + &mut pgb_reader, + tenant_id, + timeline_id, + &mut timeline_handles, + &self.cancel, + ctx, + request_span.clone(), + ) + .await?; + let msg = match msg { + Some(msg) => msg, + None => { + debug!("pagestream subprotocol end observed"); + return Ok((pgb_reader, timeline_handles)); } }; + self.pagesteam_handle_batched_message(pgb_writer, *msg, ctx) + .await?; + } + } - for batch in batched { - // invoke handler function - let (handler_results, span): ( - Vec>, - _, - ) = match batch { - BatchedFeMessage::Exists { span, req } => { - fail::fail_point!("ps::handle-pagerequest-message::exists"); - ( - vec![ - self.handle_get_rel_exists_request( - tenant_id, - timeline_id, - &req, - &ctx, - ) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::Nblocks { span, req } => { - fail::fail_point!("ps::handle-pagerequest-message::nblocks"); - ( - vec![ - self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::GetPage { - span, - shard, - effective_request_lsn, - pages, - } => { - fail::fail_point!("ps::handle-pagerequest-message::getpage"); - ( - { - let npages = pages.len(); - trace!(npages, "handling getpage request"); - let res = self - .handle_get_page_at_lsn_request_batched( - &shard, - effective_request_lsn, - pages, - &ctx, - ) - .instrument(span.clone()) - .await; - assert_eq!(res.len(), npages); - res - }, - span, - ) - } - 
BatchedFeMessage::DbSize { span, req } => { - fail::fail_point!("ps::handle-pagerequest-message::dbsize"); - ( - vec![ - self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::GetSlruSegment { span, req } => { - fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); - ( - vec![ - self.handle_get_slru_segment_request( - tenant_id, - timeline_id, - &req, - &ctx, - ) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::RespondError { span, error } => { - // We've already decided to respond with an error, so we don't need to - // call the handler. - (vec![Err(error)], span) + /// # Cancel-Safety + /// + /// May leak tokio tasks if not polled to completion. + #[allow(clippy::too_many_arguments)] + async fn handle_pagerequests_pipelined( + &mut self, + pgb_writer: &mut PostgresBackend, + pgb_reader: PostgresBackendReader, + tenant_id: TenantId, + timeline_id: TimelineId, + mut timeline_handles: TimelineHandles, + request_span: Span, + pipelining_config: PageServicePipeliningConfig, + ctx: &RequestContext, + ) -> Result<(PostgresBackendReader, TimelineHandles), QueryError> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, + { + // + // We construct a pipeline of + // - Reading: read messages from pgb + // - Batching: fill the current batch + // - Execution: take the current batch, execute it using get_vectored, and send the response. + // + + let PageServicePipeliningConfig { + max_batch_size, + protocol_pipelining_mode, + } = pipelining_config; + + let cancel = self.cancel.clone(); + + // + // Create Reading future. + // + + let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); + let read_messages = { + let cancel = self.cancel.child_token(); + let ctx = ctx.attached_child(); + async move { + scopeguard::defer! { + debug!("exiting"); + } + let mut pgb_reader = pgb_reader; + loop { + let msg = Self::pagestream_read_message( + &mut pgb_reader, + tenant_id, + timeline_id, + &mut timeline_handles, + &cancel, + &ctx, + request_span.clone(), + ) + .await?; + let msg = match msg { + Some(msg) => msg, + None => { + debug!("pagestream subprotocol end observed"); + break; + } + }; + match requests_tx.send(msg).await { + Ok(()) => {} + Err(tokio::sync::mpsc::error::SendError(_)) => { + debug!("downstream is gone"); + break; + } } + } + Ok((pgb_reader, timeline_handles)) + } + } + .instrument(tracing::info_span!("read_messages")); + + // + // Create Batching future. + // + let (mut batch_tx, mut batch_rx) = spsc_fold::channel(); + let batcher = async move { + scopeguard::defer! { + debug!("exiting"); + } + loop { + let maybe_req = requests_rx.recv().await; + let Some(req) = maybe_req else { + break; }; + let send_res = batch_tx + .send(req, |batch, req| { + Self::pagestream_do_batch(max_batch_size, batch, req) + }) + .await; + match send_res { + Ok(()) => {} + Err(spsc_fold::SendError::ReceiverGone) => { + debug!("downstream is gone"); + break; + } + } + } + } + .instrument(tracing::info_span!("batcher")); - // Map handler result to protocol behavior. - // Some handler errors cause exit from pagestream protocol. - // Other handler errors are sent back as an error message and we stay in pagestream protocol. 
- for handler_result in handler_results { - let response_msg = match handler_result { - Err(e) => match &e { - PageStreamError::Shutdown => { - // If we fail to fulfil a request during shutdown, which may be _because_ of - // shutdown, then do not send the error to the client. Instead just drop the - // connection. - span.in_scope(|| info!("dropping connection due to shutdown")); - return Err(QueryError::Shutdown); - } - PageStreamError::Reconnect(reason) => { - span.in_scope(|| info!("handler requested reconnect: {reason}")); - return Err(QueryError::Reconnect); - } - PageStreamError::Read(_) - | PageStreamError::LsnTimeout(_) - | PageStreamError::NotFound(_) - | PageStreamError::BadRequest(_) => { - // print the all details to the log with {:#}, but for the client the - // error message is enough. Do not log if shutting down, as the anyhow::Error - // here includes cancellation which is not an error. - let full = utils::error::report_compact_sources(&e); - span.in_scope(|| { - error!("error reading relation or page version: {full:#}") - }); - PagestreamBeMessage::Error(PagestreamErrorResponse { - message: e.to_string(), - }) - } - }, - Ok(response_msg) => response_msg, - }; + // + // Create Executor future. + // - // marshal & transmit response message - pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; - } - tokio::select! { - biased; - _ = self.cancel.cancelled() => { - // We were requested to shut down. - info!("shutdown request received in page handler"); - return Err(QueryError::Shutdown) - } - res = pgb.flush() => { - res?; + let executor = async { + scopeguard::defer! { + debug!("exiting"); + }; + loop { + let batch = match batch_rx.recv().await { + Ok(batch) => batch, + Err(spsc_fold::RecvError::SenderGone) => { + debug!("upstream gone"); + break; } + }; + self.pagesteam_handle_batched_message(pgb_writer, *batch, ctx) + .await?; + } + Ok(()) + } + .instrument(tracing::info_span!("executor")); + + // + // Execute the stages until they exit. + // + // We can either run the pipeline as concurrent futures or we can + // run it in separate tokio tasks. + // + // In any case, we need to be responsive to cancellation (self.cancel). + // The style chosen here is that cancellation must propagate through the + // pipeline: if any stage dies, the whole pipeline dies. + // + // If the client communicates intent to end the pagestream sub-protocol, + // the Reader stage shuts down the pipeline cleanly by sending a `None` + // through the pipeline, resulting in all stages exiting cleanly after + // the last response has been produced. + // + // Unclean pipeline shutdown is initiated by Reader or Executor returning + // a QueryError. This bubbles up to the caller, which will shut down the connection. + + macro_rules! 
with_noise_on_slow_cancel { + ($fut:ident) => { + timed_after_cancellation( + $fut, + std::stringify!($fut), + Duration::from_millis(100), + &cancel, + ) + }; + } + + let read_messages_res; + let executor_res; + match protocol_pipelining_mode { + PageServiceProtocolPipeliningMode::ConcurrentFutures => { + (read_messages_res, _, executor_res) = { + tokio::join!( + with_noise_on_slow_cancel!(read_messages), + with_noise_on_slow_cancel!(batcher), + with_noise_on_slow_cancel!(executor), + ) } } + PageServiceProtocolPipeliningMode::Tasks => { + // cancelled via sensitivity to self.cancel + let read_messages_task = tokio::task::spawn(read_messages); + // cancelled when it observes read_messages_task disconnect the channel + let batcher_task = tokio::task::spawn(batcher); + let read_messages_task_res; + let batcher_task_res; + (read_messages_task_res, batcher_task_res, executor_res) = tokio::join!( + with_noise_on_slow_cancel!(read_messages_task), + with_noise_on_slow_cancel!(batcher_task), + with_noise_on_slow_cancel!(executor), // not in a separate task + ); + read_messages_res = read_messages_task_res + .context("read_messages task panicked, check logs for details")?; + let _: () = + batcher_task_res.context("batcher task panicked, check logs for details")?; + } + } + + match (read_messages_res, executor_res) { + (Err(e), _) | (_, Err(e)) => { + let e: QueryError = e; + Err(e) + } + (Ok((pgb_reader, timeline_handles)), Ok(())) => Ok((pgb_reader, timeline_handles)), } - Ok(()) } /// Helper function to handle the LSN from client request. @@ -1160,6 +1374,8 @@ impl PageServerHandler { { let timeline = self .timeline_handles + .as_mut() + .unwrap() .get( tenant_shard_id.tenant_id, timeline_id, @@ -1194,22 +1410,17 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_get_rel_exists_request( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, + timeline: &Timeline, req: &PagestreamExistsRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetRelExists, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - &timeline, + timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1229,23 +1440,17 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_get_nblocks_request( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, + timeline: &Timeline, req: &PagestreamNblocksRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; - let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetRelSize, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - &timeline, + timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1265,23 +1470,17 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_db_size_request( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, + timeline: &Timeline, req: &PagestreamDbSizeRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; - let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetDbSize, ctx); let latest_gc_cutoff_lsn 
= timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - &timeline, + timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1329,23 +1528,17 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_get_slru_segment_request( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, + timeline: &Timeline, req: &PagestreamGetSlruSegmentRequest, ctx: &RequestContext, ) -> Result { - let timeline = self - .timeline_handles - .get(tenant_id, timeline_id, ShardSelector::Zero) - .await?; - let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - &timeline, + timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -1403,6 +1596,8 @@ impl PageServerHandler { let timeline = self .timeline_handles + .as_mut() + .unwrap() .get(tenant_id, timeline_id, ShardSelector::Zero) .await?; @@ -1745,7 +1940,7 @@ impl PageServiceCmd { impl postgres_backend::Handler for PageServerHandler where - IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, { fn check_auth_jwt( &mut self, diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py new file mode 100644 index 000000000000..669ce32d57d2 --- /dev/null +++ b/test_runner/performance/pageserver/test_page_service_batching.py @@ -0,0 +1,373 @@ +import dataclasses +import json +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Optional, Union + +import pytest +from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn +from fixtures.utils import humantime_to_ms + +TARGET_RUNTIME = 30 + + +@dataclass +class PageServicePipeliningConfig: + max_batch_size: int + protocol_pipelining_mode: str + + +PROTOCOL_PIPELINING_MODES = ["concurrent-futures", "tasks"] + +NON_BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None] +for max_batch_size in [1, 32]: + for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES: + NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)) + +BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None] +for max_batch_size in [1, 2, 4, 8, 16, 32]: + for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES: + BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)) + + +@pytest.mark.parametrize( + "tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name", + [ + # non-batchable workloads + # (A separate benchmark will consider latency). 
+ *[ + ( + 50, + config, + TARGET_RUNTIME, + 1, + 128, + f"not batchable {dataclasses.asdict(config) if config else None}", + ) + for config in NON_BATCHABLE + ], + # batchable workloads should show throughput and CPU efficiency improvements + *[ + ( + 50, + config, + TARGET_RUNTIME, + 100, + 128, + f"batchable {dataclasses.asdict(config) if config else None}", + ) + for config in BATCHABLE + ], + ], +) +def test_throughput( + neon_env_builder: NeonEnvBuilder, + zenbenchmark: NeonBenchmarker, + tablesize_mib: int, + pipelining_config: None | PageServicePipeliningConfig, + target_runtime: int, + effective_io_concurrency: int, + readhead_buffer_size: int, + name: str, +): + """ + Do a bunch of sequential scans with varying compute and pipelining configurations. + Primary performance metrics are the achieved batching factor and throughput (wall clock time). + Resource utilization is also interesting - we currently measure CPU time. + + The test is a fixed-runtime based type of test (target_runtime). + Hence, the results are normalized to the number of iterations completed within target runtime. + + If the compute doesn't provide pipeline depth (effective_io_concurrency=1), + performance should be about identical in all configurations. + Pipelining can still yield improvements in these scenarios because it parses the + next request while the current one is still being executed. + + If the compute provides pipeline depth (effective_io_concurrency=100), then + pipelining configs, especially with max_batch_size>1 should yield dramatic improvements + in all performance metrics. + """ + + # + # record perf-related parameters as metrics to simplify processing of results + # + params: dict[str, tuple[Union[float, int], dict[str, Any]]] = {} + + params.update( + { + "tablesize_mib": (tablesize_mib, {"unit": "MiB"}), + "pipelining_enabled": (1 if pipelining_config else 0, {}), + # target_runtime is just a polite ask to the workload to run for this long + "effective_io_concurrency": (effective_io_concurrency, {}), + "readhead_buffer_size": (readhead_buffer_size, {}), + # name is not a metric, we just use it to identify the test easily in the `test_...[...]`` notation + } + ) + if pipelining_config: + params.update( + { + f"pipelining_config.{k}": (v, {}) + for k, v in dataclasses.asdict(pipelining_config).items() + } + ) + + log.info("params: %s", params) + + for param, (value, kwargs) in params.items(): + zenbenchmark.record( + param, + metric_value=value, + unit=kwargs.pop("unit", ""), + report=MetricReport.TEST_PARAM, + **kwargs, + ) + + # + # Setup + # + + env = neon_env_builder.init_start() + ps_http = env.pageserver.http_client() + endpoint = env.endpoints.create_start("main") + conn = endpoint.connect() + cur = conn.cursor() + + cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends + cur.execute(f"SET effective_io_concurrency={effective_io_concurrency}") + cur.execute( + f"SET neon.readahead_buffer_size={readhead_buffer_size}" + ) # this is the current default value, but let's hard-code that + + cur.execute("CREATE EXTENSION IF NOT EXISTS neon;") + cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;") + + log.info("Filling the table") + cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)") + tablesize = tablesize_mib * 1024 * 1024 + npages = tablesize // (8 * 1024) + cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,)) + # TODO: can we force postgres to do sequential scans? 
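+    # (No index is created on t and parallel workers are disabled above, so the
+    #  sum() scans in workload() below should already plan as plain sequential scans.)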
+ + # + # Run the workload, collect `Metrics` before and after, calculate difference, normalize. + # + + @dataclass + class Metrics: + time: float + pageserver_getpage_count: float + pageserver_vectored_get_count: float + compute_getpage_count: float + pageserver_cpu_seconds_total: float + + def __sub__(self, other: "Metrics") -> "Metrics": + return Metrics( + time=self.time - other.time, + pageserver_getpage_count=self.pageserver_getpage_count + - other.pageserver_getpage_count, + pageserver_vectored_get_count=self.pageserver_vectored_get_count + - other.pageserver_vectored_get_count, + compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count, + pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total + - other.pageserver_cpu_seconds_total, + ) + + def normalize(self, by) -> "Metrics": + return Metrics( + time=self.time / by, + pageserver_getpage_count=self.pageserver_getpage_count / by, + pageserver_vectored_get_count=self.pageserver_vectored_get_count / by, + compute_getpage_count=self.compute_getpage_count / by, + pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by, + ) + + def get_metrics() -> Metrics: + with conn.cursor() as cur: + cur.execute( + "select value from neon_perf_counters where metric='getpage_wait_seconds_count';" + ) + compute_getpage_count = cur.fetchall()[0][0] + pageserver_metrics = ps_http.get_metrics() + return Metrics( + time=time.time(), + pageserver_getpage_count=pageserver_metrics.query_one( + "pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"} + ).value, + pageserver_vectored_get_count=pageserver_metrics.query_one( + "pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"} + ).value, + compute_getpage_count=compute_getpage_count, + pageserver_cpu_seconds_total=pageserver_metrics.query_one( + "libmetrics_process_cpu_seconds_highres" + ).value, + ) + + def workload() -> Metrics: + start = time.time() + iters = 0 + while time.time() - start < target_runtime or iters < 2: + log.info("Seqscan %d", iters) + if iters == 1: + # round zero for warming up + before = get_metrics() + cur.execute( + "select clear_buffer_cache()" + ) # TODO: what about LFC? 
doesn't matter right now because LFC isn't enabled by default in tests
+            cur.execute("select sum(data::bigint) from t")
+            assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
+            iters += 1
+        after = get_metrics()
+        return (after - before).normalize(iters - 1)
+
+    env.pageserver.patch_config_toml_nonrecursive(
+        {"page_service_pipelining": dataclasses.asdict(pipelining_config)}
+        if pipelining_config is not None
+        else {}
+    )
+    env.pageserver.restart()
+    metrics = workload()
+
+    log.info("Results: %s", metrics)
+
+    #
+    # Sanity-checks on the collected data
+    #
+    # assert that getpage counts roughly match between compute and ps
+    assert metrics.pageserver_getpage_count == pytest.approx(
+        metrics.compute_getpage_count, rel=0.01
+    )
+
+    #
+    # Record the results
+    #
+
+    for metric, value in dataclasses.asdict(metrics).items():
+        zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
+
+    zenbenchmark.record(
+        "perfmetric.batching_factor",
+        metrics.pageserver_getpage_count / metrics.pageserver_vectored_get_count,
+        unit="",
+        report=MetricReport.HIGHER_IS_BETTER,
+    )
+
+
+PRECISION_CONFIGS: list[Optional[PageServicePipeliningConfig]] = [None]
+for max_batch_size in [1, 32]:
+    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
+        PRECISION_CONFIGS.append(
+            PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)
+        )
+
+
+@pytest.mark.parametrize(
+    "pipelining_config,name",
+    [(config, f"{dataclasses.asdict(config) if config else None}") for config in PRECISION_CONFIGS],
+)
+def test_latency(
+    neon_env_builder: NeonEnvBuilder,
+    zenbenchmark: NeonBenchmarker,
+    pg_bin: PgBin,
+    pipelining_config: Optional[PageServicePipeliningConfig],
+    name: str,
+):
+    """
+    Measure the latency impact of pipelining in un-batchable workloads.
+
+    An ideal implementation should not increase average or tail latencies for such workloads.
+
+    We don't have support in pagebench to create queue depth yet.
+    => https://github.com/neondatabase/neon/issues/9837
+    """
+
+    #
+    # Setup
+    #
+
+    def patch_ps_config(ps_config):
+        if pipelining_config is not None:
+            ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)
+
+    neon_env_builder.pageserver_config_override = patch_ps_config
+
+    env = neon_env_builder.init_start()
+    endpoint = env.endpoints.create_start("main")
+    conn = endpoint.connect()
+    cur = conn.cursor()
+
+    cur.execute("SET max_parallel_workers_per_gather=0")  # disable parallel backends
+    cur.execute("SET effective_io_concurrency=1")
+
+    cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
+    cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
+
+    log.info("Filling the table")
+    cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
+    tablesize = 50 * 1024 * 1024
+    npages = tablesize // (8 * 1024)
+    cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
+    # TODO: can we force postgres to do sequential scans?
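+    # A sketch of what the override above amounts to in the pageserver's TOML config
+    # (assuming max_batch_size=32 and the "concurrent-futures" mode; the exact
+    # serialization is up to the pageserver config code):
+    #   [page_service_pipelining]
+    #   max_batch_size = 32
+    #   protocol_pipelining_mode = "concurrent-futures"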
+ + cur.close() + conn.close() + + wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline) + + endpoint.stop() + + for sk in env.safekeepers: + sk.stop() + + # + # Run single-threaded pagebench (TODO: dedup with other benchmark code) + # + + env.pageserver.allowed_errors.append( + # https://github.com/neondatabase/neon/issues/6925 + r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*" + ) + + ps_http = env.pageserver.http_client() + + cmd = [ + str(env.neon_binpath / "pagebench"), + "get-page-latest-lsn", + "--mgmt-api-endpoint", + ps_http.base_url, + "--page-service-connstring", + env.pageserver.connstr(password=None), + "--num-clients", + "1", + "--runtime", + "10s", + ] + log.info(f"command: {' '.join(cmd)}") + basepath = pg_bin.run_capture(cmd, with_command_header=False) + results_path = Path(basepath + ".stdout") + log.info(f"Benchmark results at: {results_path}") + + with open(results_path) as f: + results = json.load(f) + log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}") + + total = results["total"] + + metric = "latency_mean" + zenbenchmark.record( + metric, + metric_value=humantime_to_ms(total[metric]), + unit="ms", + report=MetricReport.LOWER_IS_BETTER, + ) + + metric = "latency_percentiles" + for k, v in total[metric].items(): + zenbenchmark.record( + f"{metric}.{k}", + metric_value=humantime_to_ms(v), + unit="ms", + report=MetricReport.LOWER_IS_BETTER, + ) diff --git a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py deleted file mode 100644 index 272446b73c6d..000000000000 --- a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py +++ /dev/null @@ -1,196 +0,0 @@ -import dataclasses -import time -from dataclasses import dataclass -from typing import Any, Optional - -import pytest -from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker -from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnvBuilder -from fixtures.utils import humantime_to_ms - -TARGET_RUNTIME = 60 - - -@pytest.mark.parametrize( - "tablesize_mib, batch_timeout, target_runtime, effective_io_concurrency, readhead_buffer_size, name", - [ - # the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout - (50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"), - (50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"), - (50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"), - # the next 4 cases demonstrate how batchable workloads benefit from batching - (50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"), - (50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"), - (50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"), - (50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"), - ], -) -def test_getpage_merge_smoke( - neon_env_builder: NeonEnvBuilder, - zenbenchmark: NeonBenchmarker, - tablesize_mib: int, - batch_timeout: Optional[str], - target_runtime: int, - effective_io_concurrency: int, - readhead_buffer_size: int, - name: str, -): - """ - Do a bunch of sequential scans and ensure that the pageserver does some merging. 
- """ - - # - # record perf-related parameters as metrics to simplify processing of results - # - params: dict[str, tuple[float | int, dict[str, Any]]] = {} - - params.update( - { - "tablesize_mib": (tablesize_mib, {"unit": "MiB"}), - "batch_timeout": ( - -1 if batch_timeout is None else 1e3 * humantime_to_ms(batch_timeout), - {"unit": "us"}, - ), - # target_runtime is just a polite ask to the workload to run for this long - "effective_io_concurrency": (effective_io_concurrency, {}), - "readhead_buffer_size": (readhead_buffer_size, {}), - # name is not a metric - } - ) - - log.info("params: %s", params) - - for param, (value, kwargs) in params.items(): - zenbenchmark.record( - param, - metric_value=value, - unit=kwargs.pop("unit", ""), - report=MetricReport.TEST_PARAM, - **kwargs, - ) - - # - # Setup - # - - env = neon_env_builder.init_start() - ps_http = env.pageserver.http_client() - endpoint = env.endpoints.create_start("main") - conn = endpoint.connect() - cur = conn.cursor() - - cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends - cur.execute(f"SET effective_io_concurrency={effective_io_concurrency}") - cur.execute( - f"SET neon.readahead_buffer_size={readhead_buffer_size}" - ) # this is the current default value, but let's hard-code that - - cur.execute("CREATE EXTENSION IF NOT EXISTS neon;") - cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;") - - log.info("Filling the table") - cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)") - tablesize = tablesize_mib * 1024 * 1024 - npages = tablesize // (8 * 1024) - cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,)) - # TODO: can we force postgres to do sequential scans? - - # - # Run the workload, collect `Metrics` before and after, calculate difference, normalize. 
- # - - @dataclass - class Metrics: - time: float - pageserver_getpage_count: float - pageserver_vectored_get_count: float - compute_getpage_count: float - pageserver_cpu_seconds_total: float - - def __sub__(self, other: "Metrics") -> "Metrics": - return Metrics( - time=self.time - other.time, - pageserver_getpage_count=self.pageserver_getpage_count - - other.pageserver_getpage_count, - pageserver_vectored_get_count=self.pageserver_vectored_get_count - - other.pageserver_vectored_get_count, - compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count, - pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total - - other.pageserver_cpu_seconds_total, - ) - - def normalize(self, by) -> "Metrics": - return Metrics( - time=self.time / by, - pageserver_getpage_count=self.pageserver_getpage_count / by, - pageserver_vectored_get_count=self.pageserver_vectored_get_count / by, - compute_getpage_count=self.compute_getpage_count / by, - pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by, - ) - - def get_metrics() -> Metrics: - with conn.cursor() as cur: - cur.execute( - "select value from neon_perf_counters where metric='getpage_wait_seconds_count';" - ) - compute_getpage_count = cur.fetchall()[0][0] - pageserver_metrics = ps_http.get_metrics() - return Metrics( - time=time.time(), - pageserver_getpage_count=pageserver_metrics.query_one( - "pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"} - ).value, - pageserver_vectored_get_count=pageserver_metrics.query_one( - "pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"} - ).value, - compute_getpage_count=compute_getpage_count, - pageserver_cpu_seconds_total=pageserver_metrics.query_one( - "libmetrics_process_cpu_seconds_highres" - ).value, - ) - - def workload() -> Metrics: - start = time.time() - iters = 0 - while time.time() - start < target_runtime or iters < 2: - log.info("Seqscan %d", iters) - if iters == 1: - # round zero for warming up - before = get_metrics() - cur.execute( - "select clear_buffer_cache()" - ) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests - cur.execute("select sum(data::bigint) from t") - assert cur.fetchall()[0][0] == npages * (npages + 1) // 2 - iters += 1 - after = get_metrics() - return (after - before).normalize(iters - 1) - - env.pageserver.patch_config_toml_nonrecursive({"server_side_batch_timeout": batch_timeout}) - env.pageserver.restart() - metrics = workload() - - log.info("Results: %s", metrics) - - # - # Sanity-checks on the collected data - # - # assert that getpage counts roughly match between compute and ps - assert metrics.pageserver_getpage_count == pytest.approx( - metrics.compute_getpage_count, rel=0.01 - ) - - # - # Record the results - # - - for metric, value in dataclasses.asdict(metrics).items(): - zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM) - - zenbenchmark.record( - "perfmetric.batching_factor", - metrics.pageserver_getpage_count / metrics.pageserver_vectored_get_count, - unit="", - report=MetricReport.HIGHER_IS_BETTER, - )