WIP: batching observability improvements
problame committed Nov 25, 2024
1 parent 6ec5ac1 commit c4f92a2
Showing 3 changed files with 83 additions and 51 deletions.
53 changes: 22 additions & 31 deletions pageserver/src/metrics.rs
@@ -1178,19 +1178,21 @@ pub(crate) mod virtual_file_io_engine {
});
}

struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
struct GlobalAndPerTimelineHistogramTimer<'a, 'c, I>
where
I: IntoIterator<Item = std::time::Instant> + ExactSizeIterator,
{
global_latency_histo: &'a Histogram,

// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<&'a Histogram>,

ctx: &'c RequestContext,
start: std::time::Instant,
starts: I,
op: SmgrQueryType,
count: usize,
}

impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> {
impl<I> Drop for GlobalAndPerTimelineHistogramTimer<'_, '_, I>
where
    I: IntoIterator<Item = std::time::Instant> + ExactSizeIterator,
{
fn drop(&mut self) {
let elapsed = self.start.elapsed();
let ex_throttled = self
@@ -1392,37 +1394,27 @@ impl SmgrQueryTimePerTimeline {
) -> Option<impl Drop + 'a> {
self.start_timer_many(op, 1, ctx)
}
pub(crate) fn start_timer_many<'c: 'a, 'a>(

pub(crate) fn start_timer_at<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
count: usize,
start: Instant,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a> {
let start = Instant::now();

self.global_started[op as usize].inc();

// We subtract time spent throttled from the observed latency.
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[op];
rate_limit.call(|| {
warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
});
}
}
self.start_timer_at_many(op, std::iter::once(start), ctx)
}

pub(crate) fn start_timer_at_many<'c: 'a, 'a, T>(
&'a self,
op: SmgrQueryType,
starts: T,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a>
where
T: IntoIterator<Item = Instant> + ExactSizeIterator,
{
let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) {
self.per_timeline_getpage_started.inc();
self.per_timeline_getpage_started.inc_by(starts.len() as u64);
Some(&self.per_timeline_getpage_latency)
} else {
None
@@ -1432,9 +1424,8 @@ impl SmgrQueryTimePerTimeline {
global_latency_histo: &self.global_latency[op as usize],
per_timeline_latency_histo,
ctx,
start,
op,
count,
starts,
})
}
}
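For context on the `start` → `starts` change above: the hunk does not show the updated `Drop` body, so below is a minimal sketch of how the generic timer could observe one latency sample per batched request, using the struct fields and bounds from the diff. It elides the throttled-time subtraction (`ex_throttled`) visible in the original `drop`, and is an assumption about where the WIP is headed, not the committed code.

```rust
impl<I> Drop for GlobalAndPerTimelineHistogramTimer<'_, '_, I>
where
    I: IntoIterator<Item = std::time::Instant> + ExactSizeIterator,
{
    fn drop(&mut self) {
        // Drain the per-request start timestamps and record one latency
        // sample per batched request, rather than one sample scaled by a
        // `count` field as before. (Throttle subtraction elided.)
        for start in &mut self.starts {
            let elapsed = start.elapsed();
            self.global_latency_histo.observe(elapsed.as_secs_f64());
            if let Some(per_timeline) = self.per_timeline_latency_histo {
                per_timeline.observe(elapsed.as_secs_f64());
            }
        }
    }
}
```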
69 changes: 55 additions & 14 deletions pageserver/src/page_service.rs
@@ -536,27 +536,31 @@ impl From<WaitLsnError> for QueryError {
enum BatchedFeMessage {
Exists {
span: Span,
received_at: Instant,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
received_at: Instant,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamNblocksRequest,
},
GetPage {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>,
},
DbSize {
span: Span,
received_at: Instant,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
received_at: Instant,
shard: timeline::handle::Handle<TenantManagerTypes>,
req: models::PagestreamGetSlruSegmentRequest,
},
@@ -628,6 +632,8 @@ impl PageServerHandler {
msg = pgb.read_message() => { msg }
};

let received_at = Instant::now();

let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => {
@@ -656,31 +662,51 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::Exists { span, shard, req }
BatchedFeMessage::Exists {
span,
received_at,
shard,
req,
}
}
PagestreamFeMessage::Nblocks(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::Nblocks { span, shard, req }
BatchedFeMessage::Nblocks {
span,
received_at,
shard,
req,
}
}
PagestreamFeMessage::DbSize(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::DbSize { span, shard, req }
BatchedFeMessage::DbSize {
span,
received_at,
shard,
req,
}
}
PagestreamFeMessage::GetSlruSegment(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
let shard = timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
BatchedFeMessage::GetSlruSegment { span, shard, req }
BatchedFeMessage::GetSlruSegment {
span,
received_at,
shard,
req,
}
}
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
@@ -743,7 +769,7 @@ impl PageServerHandler {
span,
shard,
effective_request_lsn,
pages: smallvec::smallvec![(rel, blkno)],
pages: smallvec::smallvec![(rel, blkno, received_at)],
}
}
};
@@ -830,7 +856,12 @@ impl PageServerHandler {
// invoke handler function
let (handler_results, span): (Vec<Result<PagestreamBeMessage, PageStreamError>>, _) =
match batch {
BatchedFeMessage::Exists { span, shard, req } => {
BatchedFeMessage::Exists {
span,
received_at,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
(
vec![
@@ -841,7 +872,12 @@ impl PageServerHandler {
span,
)
}
BatchedFeMessage::Nblocks { span, shard, req } => {
BatchedFeMessage::Nblocks {
span,
received_at,
shard,
req,
} => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
(
vec![
@@ -1409,11 +1445,12 @@ impl PageServerHandler {
&mut self,
timeline: &Timeline,
req: &PagestreamExistsRequest,
received_at: Instant,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
.start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at, ctx);

let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
@@ -1439,11 +1476,12 @@ impl PageServerHandler {
&mut self,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
received_at: Instant,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
.start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at, ctx);

let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
@@ -1469,11 +1507,14 @@ impl PageServerHandler {
&mut self,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
received_at: Instant,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
let _timer = timeline.query_metrics.start_timer_at(
metrics::SmgrQueryType::GetDbSize,
received_at,
ctx,
);

let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
@@ -1500,7 +1541,7 @@ impl PageServerHandler {
&mut self,
timeline: &Timeline,
effective_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>,
ctx: &RequestContext,
) -> Vec<Result<PagestreamBeMessage, PageStreamError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
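The batched getpage handler body is outside the visible hunks; the sketch below shows one plausible way it could glue the pieces together: the per-page `Instant`s recorded at receive time feed the new `start_timer_at_many` API, and the `(RelTag, BlockNumber)` pairs are re-collected so they can be passed by reference to `get_rel_page_at_lsn_batched`. The variable names and the exact call shape are assumptions, not the committed code.

```rust
// Hypothetical call site: the receive timestamps drive the latency
// timer (one sample per page), while the page keys are split out so
// they can be passed to the batched read by reference.
let _timer = timeline.query_metrics.start_timer_at_many(
    metrics::SmgrQueryType::GetPageAtLsn,
    pages.iter().map(|(_, _, received_at)| *received_at),
    ctx,
);
let page_keys: smallvec::SmallVec<[(RelTag, BlockNumber); 1]> =
    pages.iter().map(|(rel, blkno, _)| (*rel, *blkno)).collect();
let results = timeline
    .get_rel_page_at_lsn_batched(&page_keys, effective_lsn, ctx)
    .await;
```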
12 changes: 6 additions & 6 deletions pageserver/src/pgdatadir_mapping.rs
@@ -202,7 +202,7 @@ impl Timeline {
Version::Lsn(effective_lsn) => {
let pages = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
.get_rel_page_at_lsn_batched(&pages, effective_lsn, ctx)
.await;
assert_eq!(res.len(), 1);
res.into_iter().next().unwrap()
@@ -237,7 +237,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
pages: &smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
effective_lsn: Lsn,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
@@ -251,7 +251,7 @@
let result_slots = result.spare_capacity_mut();

let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
for (response_slot_idx, (tag, blknum)) in pages.into_iter().enumerate() {
for (response_slot_idx, (tag, blknum)) in pages.iter().enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
Expand All @@ -262,7 +262,7 @@ impl Timeline {
}

let nblocks = match self
.get_rel_size(tag, Version::Lsn(effective_lsn), ctx)
.get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
.await
{
Ok(nblocks) => nblocks,
@@ -273,7 +273,7 @@
}
};

if blknum >= nblocks {
if *blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag, blknum, effective_lsn, nblocks
@@ -283,7 +283,7 @@
continue;
}

let key = rel_block_to_key(tag, blknum);
let key = rel_block_to_key(*tag, *blknum);

let key_slots = keys_slots.entry(key).or_default();
key_slots.push(response_slot_idx);
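For intuition on the `keys_slots` bookkeeping in `get_rel_page_at_lsn_batched`: several response slots may request the same key, so each distinct key is resolved once and its value fanned out to every slot that asked for it, preserving the ordering of the input vector. A self-contained toy model of that pattern (simplified types, not the pageserver's):

```rust
use std::collections::BTreeMap;

// Toy model of the key -> slots fan-out: resolve each distinct key
// once, then write the value into every response slot that requested
// it, so results line up with the original request order.
fn fan_out(requests: &[u32]) -> Vec<u32> {
    let mut key_slots: BTreeMap<u32, Vec<usize>> = BTreeMap::new();
    for (slot_idx, key) in requests.iter().enumerate() {
        key_slots.entry(*key).or_default().push(slot_idx);
    }
    let mut results = vec![0u32; requests.len()];
    for (key, slots) in key_slots {
        let value = key.wrapping_mul(10); // stand-in for one page read
        for slot in slots {
            results[slot] = value;
        }
    }
    results
}
```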
