Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to propagate a custom context to CoroutineContext #66

Closed
ikhoon opened this issue Apr 18, 2020 · 15 comments
Closed

Provide a way to propagate a custom context to CoroutineContext #66

ikhoon opened this issue Apr 18, 2020 · 15 comments
Milestone

Comments

@ikhoon
Copy link
Contributor

ikhoon commented Apr 18, 2020

Currently, CoroutineContext could be customized with AbstractCoroutineServerImpl constructor.

val context: CoroutineContext = EmptyCoroutineContext

And it is combined with GrpcContextElement that captures a context from the current thread and propagates it.

context + GrpcContextElement.current(),

However, a CoroutinContext injected by users could not propagate any context from the current thread, because it was instantiated when the Coroutine stub is created.

We can use a hack that was suggested by @anuraaga, line/armeria#2669 (comment)

internal object ArmeriaContext: CoroutineContext {
    override fun <R> fold(initial: R, operation: (R, CoroutineContext.Element) -> R): R =
            EmptyCoroutineContext.fold(initial, operation)

    override fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? =
            EmptyCoroutineContext.get(key)

    override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext =
            EmptyCoroutineContext.minusKey(key)

    override fun plus(context: CoroutineContext): CoroutineContext {
        val requestCtx: RequestContext = RequestContext.current()
        return requestCtx.contextAwareExecutor().asCoroutineDispatcher() + context
    }
}

We don't think this is a nice approach. If AbstractCoroutineServerImpl get CoroutineContext from a method then constructor parameter, gRPC-Kotlin users can easily propagate their context to CoroutineContext.

abstract class AbstractCoroutineServerImpl(
        val defaultContext: CoroutineContext = EmptyCoroutineContext
) : BindableService {
    open fun context(): CoroutineContext = defaultContext
}

class UserService: XXXCoroutineImplBase() {
    override fun context(): CoroutineContext {
        return ServiceRequestContext.current().contextAwareExecutor().asCoroutineDispatcher()
    }
}

Do you think this change makes sense, I could make a PR. 😀

/cc @anuraaga

@anuraaga
Copy link

An alternative instead of a method override would be for the constructor parameter to accept a supplier of contexts instead of context, but the method seems a bit cleaner.

The key point here is context should often be scoped to a request, so having a constant parameter passed to the stub itself, which is essentially a Singleton, doesn't allow for modeling request-scoped contexts.

@lowasser
Copy link
Collaborator

lowasser commented Apr 21, 2020

I don't follow why, if you want context scoped to a request, you would not actively prefer to just write

class MyServer : MyServerCoroutineImplBase() {
  override suspend fun myRpc(request: MyRequest): MyResponse {
    return withContext(appropriateContext()) {
      ...
    }
  }
}

...which seems like a fairly standard approach. I don't follow your use case yet, as to why that wouldn't be appropriate here?

@anuraaga
Copy link

That technically works but is a lot of boilerplate if it has to happen for every method - we have created an appropriate context in our framework and would like to use it in a generic way. Otherwise we add this same code to every service method.

withContext(RequestContext.current().contextAwareExecutor().asCoroutineDispatcher())

With Java we'd probably use interceptors instead for something similar but unfortunately a Java interceptor can't work with coroutines.

@lowasser
Copy link
Collaborator

If the boilerplate is getting to be a lot, you can factor it out:

private inline suspend fun <T> withRequestContext(block: suspend CoroutineScope.() -> T): T =
   withContext(RequestContext.current().contentAwareExecutor().asCoroutineDispatcher()) {
     block()
   }

and then just write withRequestContext { ... } for each of your methods, which isn't significantly more boilerplate than the coroutineScope you have to wrap many other coroutine functions in.

It makes sense that you might want this behavior for this case where you have a context you're generating the same way for every RPC method, but I'm not certain that's actually a common thing, and I feel like it'd seem strange to any project that didn't have that precise practice?

One possibility might be to look into adding Kotlin-specific interceptors which would be more general, and could let you intercept the coroutines appropriately.

@trustin
Copy link

trustin commented Apr 23, 2020

Does this mean users (not an RPC framework but those who write business logic) have to wrap their logic with withRequestContext() for every service method? I guess that would be a pretty painful experience...

@ikhoon
Copy link
Contributor Author

ikhoon commented Apr 23, 2020

Technically, it is hard to use withRequetContext { ... } only. The current request context could be captured from thread local like gRPC Context.

That means coroutine stub should run under the context-aware executor that is request-scoped, so could not be created by the constructor.

If gRPC Kotlin supports Kotlin-specific interceptors, that sounds good to me. 😀

@ikhoon
Copy link
Contributor Author

ikhoon commented Apr 30, 2020

We can not use decorate all RPC methods with withRequestContext that should be used in a suspend function.
An RPC method that returns Flow is not a suspend function.

withRequestContext could not wrap inside of flow block. Because emit should happen the dispatchers of flow.
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/Builders.kt#L37-L45

@okue
Copy link

okue commented May 14, 2020

I think this is not a rare case.
Some users may want to propagate a logging context which is aware of an incoming request.

It would be happy for them if they can write like the following:

// https://github.com/Kotlin/kotlinx.coroutines/tree/master/integration/kotlinx-coroutines-slf4j
import kotlinx.coroutines.slf4j.MDCContext

class XxxService: XxxCoroutineImplBase() {
  override val context: CoroutineContext
    get() = MDCContext(requestAwareMdc())

  // gets a logging context from thread local or gRPC context
  private fun requestAwareMdc() = ...

  ...
}

@lowasser
Copy link
Collaborator

Flows would certainly have to use .flowOn(context).

To organize my thoughts: I remain surprised that the CoroutineContext API being suggested gets created anew for each request. To take SLF4J as a particular example, here is one approach I might've expected:

object MdcFromGrpcElement : 
  ThreadContextElement<MdcContextMap>,
  CoroutineContext.Element,
  CoroutineContext.Key<MdcFromGrpcElement> {
  override val key: CoroutineContext.Key<MdcFromGrpcElement> = this

  override fun updateThreadContext(context: CoroutineContext): MdcContextMap {
    val grpcContext = io.grpc.Context.current()
    val newMdcContextMap = ...
    val oldMdcContextMap = MDC.getCopyOfContextMap()
    MDC.setContextMap(newMdcContextMap)
    return oldMdcContextMap
  }

  override fun restoreThreadContext(context: CoroutineContext, oldState: MdcContextMap) {
    MDC.setContextMap(oldState)
  }
}

...in short, a singleton, which works just fine with gRPC-Kotlin's current API.

If these maps tended to be large and rebuilding them was expensive -- or for any other reason -- you might instead create an interceptor which did the work once and stored it in the gRPC Context, and then your singleton would get them pre-made from the gRPC context and install them.

@lowasser
Copy link
Collaborator

One possible API: a gRPC Context element to store a CoroutineContext add-on. Some advantages of that approach include:

  • these interceptors can be applied to servers, clients, and to different services, using a standard, preexisting, flexible mechanism that doesn't have any additional implicit state passing
  • assuming the interceptors properly add to the preexisting context, the story for stacking multiple interceptors should be very smooth

@davidjwiner davidjwiner added this to the v0.1.4 milestone Jun 2, 2020
@dariuszkuc
Copy link

Using suggested MdcFromGrpcElement approach does work but it is suboptimal. Unless I'm missing something there is no way in grpc-java (or Kotlin) to access Metadata from the Context (equivalent of Go metadata.FromIncomingContext). Since quite often you would specify tracing metadata through headers I needed to create custom interceptor that populates Context based on values from Metadata....

@jef
Copy link

jef commented Jun 12, 2020

Since quite often you would specify tracing metadata through headers I needed to create custom interceptor that populates Context based on values from Metadata....

This approach is pretty common I'd think.

@lowasser
Copy link
Collaborator

I'm moving forward on a design for an interceptor-based solution, where interceptors can inject CoroutineContext elements. There's no ambiguity about how they work, what information they have access to, or other thread-local information.

@bshaffer bshaffer modified the milestones: v0.1.4, v0.1.5 Jun 24, 2020
lowasser added a commit that referenced this issue Jul 15, 2020
…Contexts. (#143)

* Revert "Add a ServerInterceptor API for injecting custom CoroutineContext elements."

This reverts commit 989f1f1.

* Add CoroutineContextServerInterceptor, an API to add elements to the CoroutineContext of the server.
@lowasser
Copy link
Collaborator

Merged a mechanism to introduce CoroutineContext elements via an interceptor -- which can also be easily installed on multiple servers, which seem applicable to many of these use cases.

@ikhoon
Copy link
Contributor Author

ikhoon commented Jul 16, 2020

Thanks for your consideration and solve it. 🙇‍♂️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

9 participants