Skip to content

Commit

Permalink
Fix gRPC context propagation.
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Mar 27, 2023
1 parent 32ec9f6 commit 9cc82c2
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.grpc.bd.deadline;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.grpc.Deadline;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloRequest;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.test.QuarkusUnitTest;

public class ClientBlockingDeadlineTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addPackage(GreeterGrpc.class.getPackage()).addClasses(HelloService.class))
.withConfigurationResource("hello-config-deadline.properties");

@GrpcClient("hello-service")
GreeterGrpc.GreeterBlockingStub stub;

@Test
public void testCallOptions() {
Deadline deadline = stub.getCallOptions().getDeadline();
assertNotNull(deadline);
try {
//noinspection ResultOfMethodCallIgnored
stub.sayHello(HelloRequest.newBuilder().setName("Scaladar").build());
} catch (Exception e) {
Assertions.assertTrue(e instanceof StatusRuntimeException);
StatusRuntimeException sre = (StatusRuntimeException) e;
Status status = sre.getStatus();
Assertions.assertNotNull(status);
Assertions.assertEquals(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.grpc.bd.deadline;

import java.time.Duration;

import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import io.quarkus.grpc.GrpcService;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;

@GrpcService
public class HelloService extends GreeterGrpc.GreeterImplBase {

@Override
@Blocking
public void sayHello(HelloRequest request, StreamObserver<HelloReply> observer) {
Deadline deadline = Context.current().getDeadline();
if (deadline == null) {
throw new IllegalStateException("Null deadline");
}
Uni.createFrom()
.item(HelloReply.newBuilder().setMessage("OK").build())
.onItem()
.delayIt()
.by(Duration.ofMillis(400)).invoke(observer::onNext)
.invoke(observer::onCompleted)
.await()
.indefinitely();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.time.Duration;

import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.examples.helloworld.Greeter;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
Expand All @@ -13,6 +15,10 @@ public class HelloService implements Greeter {

@Override
public Uni<HelloReply> sayHello(HelloRequest request) {
Deadline deadline = Context.current().getDeadline();
if (deadline == null) {
throw new IllegalStateException("Null deadline");
}
return Uni.createFrom().item(HelloReply.newBuilder().setMessage("OK").build()).onItem().delayIt()
.by(Duration.ofMillis(400));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import io.quarkus.arc.InjectableContext;
import io.quarkus.arc.InjectableContext.ContextState;
import io.quarkus.arc.ManagedContext;
Expand All @@ -31,6 +35,7 @@
* For non-annotated methods, the interceptor acts as a pass-through.
*/
public class BlockingServerInterceptor implements ServerInterceptor, Function<String, Boolean> {
private static final Logger log = LoggerFactory.getLogger(BlockingServerInterceptor.class);

private final Vertx vertx;
private final Set<String> blockingMethods;
Expand Down Expand Up @@ -144,6 +149,12 @@ private void executeOnContextOrEnqueue(Consumer<ServerCall.Listener<ReqT>> consu
* @param consumer the original
*/
private void executeBlockingWithRequestContext(Consumer<ServerCall.Listener<ReqT>> consumer) {
ArcContainer container = Arc.container();
if (container == null) {
log.warn("Arc container is null, already shutdown? Ignoring execution ...");
return;
}

final Context grpcContext = Context.current();
Handler<Promise<Object>> blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate,
requestContextState, getRequestContext(), this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,28 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
setContextSafe(local, true);

// Must be sure to call next.startCall on the right context
return new ListenedOnDuplicatedContext<>(ehp, call, () -> next.startCall(call, headers), local);
return new ListenedOnDuplicatedContext<>(ehp, call, nextCall(call, headers, next), local);
} else {
log.warn("Unable to run on a duplicated context - interceptor not called on the Vert.x event loop");
return next.startCall(call, headers);
}
}

private <ReqT, RespT> Supplier<ServerCall.Listener<ReqT>> nextCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Must be sure to call next.startCall on the right context
io.grpc.Context current = io.grpc.Context.current();
return () -> {
io.grpc.Context previous = current.attach();
try {
return next.startCall(call, headers);
} finally {
current.detach(previous);
}
};
}

@Override
public int getPriority() {
return Interceptors.DUPLICATE_CONTEXT;
Expand Down

0 comments on commit 9cc82c2

Please sign in to comment.