From ce72b557ec463d559892f6d49848c6bf2d444f34 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Mon, 27 Mar 2023 17:26:39 +0200 Subject: [PATCH] Fix gRPC context propagation. --- .../client/bd/ClientBlockingDeadlineTest.java | 46 +++++++++++++++++++ .../quarkus/grpc/client/bd/HelloService.java | 35 ++++++++++++++ .../grpc/client/deadline/HelloService.java | 6 +++ .../blocking/BlockingServerInterceptor.java | 17 ++++++- .../GrpcDuplicatedContextGrpcInterceptor.java | 17 ++++++- .../BlockingServerInterceptorTest.java | 31 ++++++++----- 6 files changed, 139 insertions(+), 13 deletions(-) create mode 100644 extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/ClientBlockingDeadlineTest.java create mode 100644 extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/HelloService.java diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/ClientBlockingDeadlineTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/ClientBlockingDeadlineTest.java new file mode 100644 index 00000000000000..3455abda0e0fa0 --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/ClientBlockingDeadlineTest.java @@ -0,0 +1,46 @@ +package io.quarkus.grpc.client.bd; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.grpc.Deadline; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloRequest; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.test.QuarkusUnitTest; + +public class ClientBlockingDeadlineTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class) + .addPackage(GreeterGrpc.class.getPackage()).addClasses(HelloService.class)) + .withConfigurationResource("hello-config-deadline.properties"); + + @GrpcClient("hello-service") + GreeterGrpc.GreeterBlockingStub stub; + + @Test + public void testCallOptions() { + Deadline deadline = stub.getCallOptions().getDeadline(); + assertNotNull(deadline); + try { + //noinspection ResultOfMethodCallIgnored + stub.sayHello(HelloRequest.newBuilder().setName("Scaladar").build()); + } catch (Exception e) { + Assertions.assertTrue(e instanceof StatusRuntimeException); + StatusRuntimeException sre = (StatusRuntimeException) e; + Status status = sre.getStatus(); + Assertions.assertNotNull(status); + Assertions.assertEquals(Status.DEADLINE_EXCEEDED.getCode(), status.getCode()); + } + } +} diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/HelloService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/HelloService.java new file mode 100644 index 00000000000000..c3cda52bb6df15 --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/bd/HelloService.java @@ -0,0 +1,35 @@ +package io.quarkus.grpc.client.bd; + +import java.time.Duration; + +import io.grpc.Context; +import io.grpc.Deadline; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.stub.StreamObserver; +import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class HelloService extends GreeterGrpc.GreeterImplBase { + + @Override + @Blocking + public void sayHello(HelloRequest request, StreamObserver observer) { + Deadline deadline = Context.current().getDeadline(); + if (deadline == null) { + throw new IllegalStateException("Null deadline"); + } + Uni.createFrom() + .item(HelloReply.newBuilder().setMessage("OK").build()) + .onItem() + .delayIt() + .by(Duration.ofMillis(400)).invoke(observer::onNext) + .invoke(observer::onCompleted) + .await() + .indefinitely(); + } + +} diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/deadline/HelloService.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/deadline/HelloService.java index 2d2f018cbcf186..c44b698ba5df6f 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/deadline/HelloService.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/client/deadline/HelloService.java @@ -2,6 +2,8 @@ import java.time.Duration; +import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.examples.helloworld.Greeter; import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; @@ -13,6 +15,10 @@ public class HelloService implements Greeter { @Override public Uni sayHello(HelloRequest request) { + Deadline deadline = Context.current().getDeadline(); + if (deadline == null) { + throw new IllegalStateException("Null deadline"); + } return Uni.createFrom().item(HelloReply.newBuilder().setMessage("OK").build()).onItem().delayIt() .by(Duration.ofMillis(400)); } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java index 69e99da2654c61..5dc950193aedcb 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java @@ -10,6 +10,9 @@ import java.util.function.Consumer; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.grpc.Context; import io.grpc.Metadata; import io.grpc.ServerCall; @@ -31,6 +34,7 @@ * For non-annotated methods, the interceptor acts as a pass-through. */ public class BlockingServerInterceptor implements ServerInterceptor, Function { + private static final Logger log = LoggerFactory.getLogger(BlockingServerInterceptor.class); private final Vertx vertx; private final Set blockingMethods; @@ -107,9 +111,11 @@ private class ReplayListener extends ServerCall.Listener { private ServerCall.Listener delegate; private final Queue>> incomingEvents = new LinkedList<>(); private boolean isConsumingFromIncomingEvents = false; + private final Context grpcContext; private ReplayListener(InjectableContext.ContextState requestContextState) { this.requestContextState = requestContextState; + this.grpcContext = Context.current(); } /** @@ -144,7 +150,11 @@ private void executeOnContextOrEnqueue(Consumer> consu * @param consumer the original */ private void executeBlockingWithRequestContext(Consumer> consumer) { - final Context grpcContext = Context.current(); + if (!isExecutable()) { + log.warn("Not executable, already shutdown? Ignoring execution ..."); + return; + } + Handler> blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate, requestContextState, getRequestContext(), this); if (devMode) { @@ -189,6 +199,11 @@ public void onReady() { } // protected for tests + + protected boolean isExecutable() { + return Arc.container() != null; + } + protected ManagedContext getRequestContext() { return Arc.container().requestContext(); } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java index e02cf3001f1723..675f6338aaa11f 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcDuplicatedContextGrpcInterceptor.java @@ -47,13 +47,28 @@ public ServerCall.Listener interceptCall(ServerCall(ehp, call, () -> next.startCall(call, headers), local); + return new ListenedOnDuplicatedContext<>(ehp, call, nextCall(call, headers, next), local); } else { log.warn("Unable to run on a duplicated context - interceptor not called on the Vert.x event loop"); return next.startCall(call, headers); } } + private Supplier> nextCall(ServerCall call, + Metadata headers, + ServerCallHandler next) { + // Must be sure to call next.startCall on the right context + io.grpc.Context current = io.grpc.Context.current(); + return () -> { + io.grpc.Context previous = current.attach(); + try { + return next.startCall(call, headers); + } finally { + current.detach(previous); + } + }; + } + @Override public int getPriority() { return Interceptors.DUPLICATE_CONTEXT; diff --git a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java index 96a27e392e6eb1..78c0e3875802e2 100644 --- a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java +++ b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java @@ -35,6 +35,11 @@ void setup() { ManagedContext requestContext = mock(ManagedContext.class); when(requestContext.getState()).thenReturn(contextState); blockingServerInterceptor = new BlockingServerInterceptor(vertx, Collections.singletonList("blocking"), false) { + @Override + protected boolean isExecutable() { + return true; + } + @Override protected ManagedContext getRequestContext() { return requestContext; @@ -53,21 +58,25 @@ void testContextPropagation() throws Exception { // setting grpc context final Context context = Context.current().withValue(USERNAME, "my-user"); + Context previous = context.attach(); + try { + final ServerCall.Listener listener = blockingServerInterceptor.interceptCall(serverCall, null, serverCallHandler); + serverCallHandler.awaitSetup(); - final ServerCall.Listener listener = blockingServerInterceptor.interceptCall(serverCall, null, serverCallHandler); - serverCallHandler.awaitSetup(); + // simulate GRPC call + context.wrap(() -> listener.onMessage("hello")).run(); - // simulate GRPC call - context.wrap(() -> listener.onMessage("hello")).run(); + // await for the message to be received + serverCallHandler.await(); - // await for the message to be received - serverCallHandler.await(); + // check that the thread is a worker thread + assertThat(serverCallHandler.threadName).contains("vert.x").contains("worker"); - // check that the thread is a worker thread - assertThat(serverCallHandler.threadName).contains("vert.x").contains("worker"); - - // check that the context was propagated correctly - assertThat(serverCallHandler.contextUserName).isEqualTo("my-user"); + // check that the context was propagated correctly + assertThat(serverCallHandler.contextUserName).isEqualTo("my-user"); + } finally { + context.detach(previous); + } } static class BlockingServerCallHandler implements ServerCallHandler {