diff --git a/finagle-core/src/main/scala/com/twitter/finagle/context/BackupRequest.scala b/finagle-core/src/main/scala/com/twitter/finagle/context/BackupRequest.scala index 52074a8aa35..5096c6dc926 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/context/BackupRequest.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/context/BackupRequest.scala @@ -1,6 +1,8 @@ package com.twitter.finagle.context import com.twitter.io.Buf -import com.twitter.util.{Return, Throw, Try} +import com.twitter.util.Return +import com.twitter.util.Throw +import com.twitter.util.Try private[finagle] final class BackupRequest private () @@ -34,7 +36,7 @@ object BackupRequest { } } - private[finagle] val Ctx: Contexts.broadcast.Key[BackupRequest] = new Context + private[twitter] val Ctx: Contexts.broadcast.Key[BackupRequest] = new Context /** * Whether or not a request was initiated as a backup request. diff --git a/finagle-core/src/main/scala/com/twitter/finagle/context/MarshalledContext.scala b/finagle-core/src/main/scala/com/twitter/finagle/context/MarshalledContext.scala index 2dba2414df2..f9dfa8d4ccc 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/context/MarshalledContext.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/context/MarshalledContext.scala @@ -82,7 +82,7 @@ final class MarshalledContext private[context] extends Context { /** * The identifier used to lookup the key in the stored context. */ - private[context] final val lookupId: String = normalizeId(id) + private[finagle] final val lookupId: String = normalizeId(id) /** * Marshal an A-typed value into a Buf. diff --git a/finagle-core/src/main/scala/com/twitter/finagle/filter/ClearBroadcastContextFilter.scala b/finagle-core/src/main/scala/com/twitter/finagle/filter/ClearBroadcastContextFilter.scala new file mode 100644 index 00000000000..c7ae9dae330 --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/filter/ClearBroadcastContextFilter.scala @@ -0,0 +1,36 @@ +package com.twitter.finagle.filter + +import com.twitter.finagle.context.Contexts +import com.twitter.finagle._ +import com.twitter.util.Future + +/** + * ClearBroadcastContextFilter clears the broadcast context of all context keys except those specified. + */ +private[twitter] object ClearBroadcastContextFilter { + val role = Stack.Role("ClearBroadcastContextFilter") + + /** + * Creates a [[com.twitter.finagle.Stackable]] [[com.twitter.finagle.Filter]] that clears the + * broadcast context of all context keys except those specified. + */ + def module[Req, Rep]( + retainKeys: Set[Contexts.broadcast.Key[_]] + ): Stackable[ServiceFactory[Req, Rep]] = + new Stack.Module0[ServiceFactory[Req, Rep]] { + val role = ClearBroadcastContextFilter.role + val description = + s"Clears the broadcast context of all keys except: ${retainKeys.mkString(",")}" + val lookupIds = retainKeys.map(_.lookupId) + def make(next: ServiceFactory[Req, Rep]): ServiceFactory[Req, Rep] = { + val filter = new SimpleFilter[Req, Rep] { + def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = { + Contexts.broadcast.retainIds(lookupIds) { + service(request) + } + } + } + filter.andThen(next) + } + } +} diff --git a/finagle-core/src/main/scala/com/twitter/finagle/thrift/ClientId.scala b/finagle-core/src/main/scala/com/twitter/finagle/thrift/ClientId.scala index bfe23d553cf..de3263bad1d 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/thrift/ClientId.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/thrift/ClientId.scala @@ -36,7 +36,7 @@ object ClientId { // As a matter of legacy, we need to support the notion of // an empty client id. Old version of contexts could serialize // the absence of a client id with an empty buffer. - private[finagle] val clientIdCtx = + private[twitter] val clientIdCtx = new Contexts.broadcast.Key[Option[ClientId]]("com.twitter.finagle.thrift.ClientIdContext") { def marshal(clientId: Option[ClientId]): Buf = clientId match { case None => Buf.Empty diff --git a/finagle-core/src/test/scala/com/twitter/finagle/filter/ClearBroadcastContextFilterTest.scala b/finagle-core/src/test/scala/com/twitter/finagle/filter/ClearBroadcastContextFilterTest.scala new file mode 100644 index 00000000000..a7f23bdde6a --- /dev/null +++ b/finagle-core/src/test/scala/com/twitter/finagle/filter/ClearBroadcastContextFilterTest.scala @@ -0,0 +1,44 @@ +package com.twitter.finagle.filter + +import com.twitter.conversions.DurationOps._ +import com.twitter.finagle._ +import com.twitter.finagle.context.Contexts +import com.twitter.finagle.context.Deadline +import com.twitter.finagle.context.Requeues +import com.twitter.finagle.stack.nilStack +import com.twitter.io.Buf +import com.twitter.util.Await +import com.twitter.util.Future +import org.scalatest.funsuite.AnyFunSuite + +class ClearBroadcastContextFilterTest extends AnyFunSuite { + + test("clears context except for the configured keys") { + val clearContextFilter = + ClearBroadcastContextFilter.module[Unit, Set[String]](retainKeys = Set(Deadline)) + + val svcModule = new Stack.Module0[ServiceFactory[Unit, Set[String]]] { + val role = Stack.Role("svcModule") + val description = "" + + def make(next: ServiceFactory[Unit, Set[String]]) = + ServiceFactory.const( + Service.mk[Unit, Set[String]](_ => + Future.value(Contexts.broadcast + .marshal().map { + case (k, v) => + Buf.Utf8.unapply(k).get + }.toSet))) + } + + val factory = new StackBuilder[ServiceFactory[Unit, Set[String]]](nilStack[Unit, Set[String]]) + .push(svcModule) + .push(clearContextFilter) + .make(Stack.Params.empty) + + val svc: Service[Unit, Set[String]] = Await.result(factory(), 1.second) + Contexts.broadcast.let(Requeues, Requeues(5), Deadline, Deadline.ofTimeout(5.seconds)) { + assert(Await.result(svc(()), 1.second) == Set(Deadline.id)) + } + } +}