Skip to content

Commit

Permalink
[home-mixer] don't offload scoredCandidateFeaturesCache
Browse files Browse the repository at this point in the history
scoredCandidateFeaturesCache accounts for 60% of offload tasks issued by client OffloadFilter.
These tasks are very short (p999 is 50us), nobody waits for their reply (it's a side effect) and therefore can be safely executed by a Netty thread without context switch.

JIRA Issues: STOR-8861

Differential Revision: https://phabricator.twitter.biz/D1190370
  • Loading branch information
Anton Ivanov authored and jenkins committed Dec 30, 2024
1 parent 7954703 commit 786f06c
Showing 1 changed file with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ object OffloadFilter {
private[finagle] def server[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new ServerModule[Req, Rep]

private[this] val offloadsDisabledLocal = new Local[Unit]

/**
* Disables offloads for the enclosed scope.
* The scope must return Unit,
* this ensures no callbacks can be added to the result.
*/
def withOffloadsDisabled(f: => Unit): Unit = {
offloadsDisabledLocal.let(()) {
f
}
}

final class Client[Req, Rep](pool: FuturePool, statsReceiver: StatsReceiver)
extends SimpleFilter[Req, Rep] {

Expand Down Expand Up @@ -98,21 +111,26 @@ object OffloadFilter {
// You would be surprised but this can happen. Same simulations report that we lose a race in
// about 1 in 1 000 000 of cases this way (0.0001%).
val response = service(request)
val shifted = Promise.interrupts[Rep](response)
response.respond { t =>
pool {
val startNs = System.nanoTime()
shifted.update(t)
applyTimeNs.add(System.nanoTime() - startNs)
}
if (offloadsDisabledLocal().isDefined) {
response

} else {
val shifted = Promise.interrupts[Rep](response)
response.respond { t =>
pool {
val startNs = System.nanoTime()
shifted.update(t)
applyTimeNs.add(System.nanoTime() - startNs)
}

val tracing = Trace()
if (tracing.isActivelyTracing) {
tracing.recordBinary(ClientAnnotationKey, pool.poolSize)
val tracing = Trace()
if (tracing.isActivelyTracing) {
tracing.recordBinary(ClientAnnotationKey, pool.poolSize)
}
}
}

shifted
shifted
}
}
}

Expand Down

0 comments on commit 786f06c

Please sign in to comment.