From e4a911e8e9c7bc981ce616b5218cccb4e7cbaa34 Mon Sep 17 00:00:00 2001
From: Ivan Gorbachev
Date: Wed, 27 Nov 2024 14:40:53 +0000
Subject: [PATCH] finagle-core: Track queue delay and send latency in pipelining client

Problem

We need to measure the delays between submission to the Netty epoll loop
queue and task execution, as well as the latencies for message sending,
including socket writes.

Solution

Add epoll_queue_delay_ns and message_send_latency_ns to
PipeliningClientPushSession

Differential Revision: https://phabricator.twitter.biz/D1185421
---
 CHANGELOG.rst                                 |  2 +
 .../PipeliningClientPushSession.scala         | 38 ++++++++++++++++---
 .../scala/com/twitter/finagle/Memcached.scala |  3 +-
 3 files changed, 37 insertions(+), 6 deletions(-)

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 9a8ca0b09b8..b10c1cca230 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -21,6 +21,8 @@ Runtime Behavior Changes
   Previously it was `[0, dur)`, which could result in `next.duration < duration` for arbitrary
   long invocation chains. ``PHAB_ID=D1182252``
 * finagle-core: `Backoff.equalJittered` is now deprecated and falls back to `exponentialJittered`. ``PHAB_ID=D1182535``
+* finagle-core: `PipeliningClientPushSession` now collects stats `epoll_queue_delay_ns` and `message_send_latency_ns`.
+  ``PHAB_ID=D1185421``
 
 New Features
 ~~~~~~~~~~~~
diff --git a/finagle-core/src/main/scala/com/twitter/finagle/pushsession/PipeliningClientPushSession.scala b/finagle-core/src/main/scala/com/twitter/finagle/pushsession/PipeliningClientPushSession.scala
index d9dd1bfa6fc..6a2f0199961 100644
--- a/finagle-core/src/main/scala/com/twitter/finagle/pushsession/PipeliningClientPushSession.scala
+++ b/finagle-core/src/main/scala/com/twitter/finagle/pushsession/PipeliningClientPushSession.scala
@@ -5,6 +5,12 @@ import com.twitter.finagle.Failure
 import com.twitter.finagle.FailureFlags
 import com.twitter.finagle.Service
 import com.twitter.finagle.Status
+import com.twitter.finagle.stats.DefaultStatsReceiver
+import com.twitter.finagle.stats.HistogramFormat
+import com.twitter.finagle.stats.MetricBuilder
+import com.twitter.finagle.stats.MetricBuilder.HistogramType
+import com.twitter.finagle.stats.MetricUsageHint
+import com.twitter.finagle.stats.StatsReceiver
 import com.twitter.logging.Logger
 import com.twitter.util._
 
@@ -22,13 +28,29 @@ import com.twitter.util._
  * result of another request unless the connection is stuck, and does not
  * look like it will make progress. Use `stallTimeout` to configure this timeout.
  */
-final class PipeliningClientPushSession[In, Out](
+class PipeliningClientPushSession[In, Out](
   handle: PushChannelHandle[In, Out],
   stallTimeout: Duration,
-  timer: Timer)
+  timer: Timer,
+  statsReceiver: StatsReceiver = DefaultStatsReceiver)
     extends PushSession[In, Out](handle) { self =>
 
   private[this] val logger = Logger.get
+  private[this] val scopedStatsReceiver = statsReceiver.scope("pipelining_client")
+  private[this] val epollQueueDelay = scopedStatsReceiver.stat(
+    MetricBuilder(metricType = HistogramType)
+      .withHistogramFormat(HistogramFormat.FullSummary)
+      .withPercentiles(0.5, 0.9, 0.99, 0.999, 0.9999)
+      .withMetricUsageHints(Set(MetricUsageHint.HighContention))
+      .withName("epoll_queue_delay_ns")
+  )
+  private[this] val messageSendLatency = scopedStatsReceiver.stat(
+    MetricBuilder(metricType = HistogramType)
+      .withHistogramFormat(HistogramFormat.FullSummary)
+      .withPercentiles(0.5, 0.9, 0.99, 0.999, 0.9999)
+      .withMetricUsageHints(Set(MetricUsageHint.HighContention))
+      .withName("message_send_latency_ns")
+  )
 
   // used only within SerialExecutor
   private[this] val h_queue = new java.util.ArrayDeque[Promise[In]]()
@@ -93,7 +115,9 @@ final class PipeliningClientPushSession[In, Out](
           })
         }
       }
-      handle.serialExecutor.execute(new Runnable { def run(): Unit = handleDispatch(request, p) })
+
+      val requestStartTime = System.nanoTime()
+      handle.serialExecutor.execute(() => handleDispatch(request, p, requestStartTime))
       p
     }
 
@@ -123,12 +147,16 @@ final class PipeliningClientPushSession[In, Out](
     }
   }
 
-  private[this] def handleDispatch(request: Out, p: Promise[In]): Unit = {
+  private[this] def handleDispatch(request: Out, p: Promise[In], requestStartTime: Long): Unit = {
+    val handleStartTime = System.nanoTime()
+    epollQueueDelay.add(handleStartTime - requestStartTime)
     if (!h_running) p.setException(new ChannelClosedException(handle.remoteAddress))
     else {
       h_queue.offer(p)
       h_queueSize += 1
-      handle.sendAndForget(request)
+      handle.send(request) { _ =>
+        messageSendLatency.add(System.nanoTime() - handleStartTime)
+      }
     }
   }
 }
diff --git a/finagle-memcached/src/main/scala/com/twitter/finagle/Memcached.scala b/finagle-memcached/src/main/scala/com/twitter/finagle/Memcached.scala
index 86c1b88dc96..17a33f183ab 100644
--- a/finagle-memcached/src/main/scala/com/twitter/finagle/Memcached.scala
+++ b/finagle-memcached/src/main/scala/com/twitter/finagle/Memcached.scala
@@ -238,7 +238,8 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
       new PipeliningClientPushSession[Response, Command](
         handle,
         params[StalledPipelineTimeout].timeout,
-        params[finagle.param.Timer].timer
+        params[finagle.param.Timer].timer,
+        statsReceiver = params[Stats].statsReceiver,
       )
     )
   }
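
Usage sketch (illustration only, not part of the change): it shows how the new statsReceiver
parameter and the two histograms could be exercised from a test. The object and method names
(PipeliningStatsExample, sessionWithLocalStats, recordedDelays), the handle argument, and the
use of InMemoryStatsReceiver and JavaTimer are assumptions; only the constructor parameters and
the pipelining_client / epoll_queue_delay_ns / message_send_latency_ns names come from the patch
above.

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.memcached.protocol.{Command, Response}
import com.twitter.finagle.pushsession.{PipeliningClientPushSession, PushChannelHandle}
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.util.JavaTimer

object PipeliningStatsExample {

  // Hypothetical helper: builds a session against an in-memory StatsReceiver so the
  // two histograms recorded by this patch can be inspected after some traffic.
  def sessionWithLocalStats(
    handle: PushChannelHandle[Response, Command]
  ): (PipeliningClientPushSession[Response, Command], InMemoryStatsReceiver) = {
    val stats = new InMemoryStatsReceiver
    val session = new PipeliningClientPushSession[Response, Command](
      handle,
      stallTimeout = 10.seconds,
      timer = new JavaTimer(),
      statsReceiver = stats // parameter introduced by this patch
    )
    (session, stats)
  }

  // After requests have been dispatched through the session (see Memcached.scala above
  // for how a Service is built from it), the values land under the scoped stat names.
  def recordedDelays(stats: InMemoryStatsReceiver): (Seq[Float], Seq[Float]) =
    (
      stats.stats.getOrElse(Seq("pipelining_client", "epoll_queue_delay_ns"), Nil),
      stats.stats.getOrElse(Seq("pipelining_client", "message_send_latency_ns"), Nil)
    )
}

Both histograms are fed from System.nanoTime() deltas, so the recorded values are in
nanoseconds, matching the _ns suffix in their names.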