From bdee45b6dab497f43058cf78dc98441be729bcd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Wed, 9 Jun 2021 12:32:38 +0200 Subject: [PATCH] gRPC: more logging on failures --- .../io/quarkus/grpc/runtime/ServerCalls.java | 75 +++++++++++++------ 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/extensions/grpc/stubs/src/main/java/io/quarkus/grpc/runtime/ServerCalls.java b/extensions/grpc/stubs/src/main/java/io/quarkus/grpc/runtime/ServerCalls.java index 997dd903f5e9c..18319d4d2c062 100644 --- a/extensions/grpc/stubs/src/main/java/io/quarkus/grpc/runtime/ServerCalls.java +++ b/extensions/grpc/stubs/src/main/java/io/quarkus/grpc/runtime/ServerCalls.java @@ -3,6 +3,8 @@ import java.util.function.Consumer; import java.util.function.Function; +import org.jboss.logging.Logger; + import io.grpc.Status; import io.grpc.StatusException; import io.grpc.StatusRuntimeException; @@ -13,6 +15,8 @@ import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor; public class ServerCalls { + private static final Logger log = Logger.getLogger(ServerCalls.class); + private static StreamCollector streamCollector = StreamCollector.NO_OP; private ServerCalls() { @@ -23,6 +27,13 @@ public static void oneToOne(I request, StreamObserver response, String trySetCompression(response, compression); try { Uni uni = implementation.apply(request); + if (uni == null) { + log.error("gRPC service method returned null instead of Uni. " + + "Please change the implementation to return a Uni object, either carrying a value or a failure," + + " or throw StatusRuntimeException"); + response.onError(Status.fromCode(Status.Code.INTERNAL).asException()); + return; + } uni.subscribe().with( new Consumer() { @Override @@ -46,26 +57,32 @@ public static void oneToMany(I request, StreamObserver response, Strin Function> implementation) { try { streamCollector.add(response); - implementation.apply(request) - .subscribe().with( - new Consumer() { - @Override - public void accept(O v) { - response.onNext(v); - } - }, - new Consumer() { - @Override - public void accept(Throwable throwable) { - onError(response, throwable); - } - }, - new Runnable() { - @Override - public void run() { - onCompleted(response); - } - }); + Multi returnValue = implementation.apply(request); + if (returnValue == null) { + log.error("gRPC service method returned null instead of Multi. " + + "Please change the implementation to return a Multi object or throw StatusRuntimeException"); + response.onError(Status.fromCode(Status.Code.INTERNAL).asException()); + return; + } + returnValue.subscribe().with( + new Consumer() { + @Override + public void accept(O v) { + response.onNext(v); + } + }, + new Consumer() { + @Override + public void accept(Throwable throwable) { + onError(response, throwable); + } + }, + new Runnable() { + @Override + public void run() { + onCompleted(response); + } + }); } catch (Throwable throwable) { onError(response, toStatusFailure(throwable)); } @@ -79,6 +96,13 @@ public static StreamObserver manyToOne(StreamObserver response, streamCollector.add(response); Uni uni = implementation.apply(input); + if (uni == null) { + log.error("gRPC service method returned null instead of Uni. " + + "Please change the implementation to return a Uni object, either carrying a value or a failure," + + " or throw StatusRuntimeException"); + response.onError(Status.fromCode(Status.Code.INTERNAL).asException()); + return null; + } uni.subscribe().with( new Consumer() { @Override @@ -106,8 +130,14 @@ public static StreamObserver manyToMany(StreamObserver response, streamCollector.add(response); UnicastProcessor input = UnicastProcessor.create(); StreamObserver pump = getStreamObserverFeedingProcessor(input); - Multi uni = implementation.apply(input); - uni.subscribe().with( + Multi multi = implementation.apply(input); + if (multi == null) { + log.error("gRPC service method returned null instead of Multi. " + + "Please change the implementation to return a Multi object or throw StatusRuntimeException"); + response.onError(Status.fromCode(Status.Code.INTERNAL).asException()); + return null; + } + multi.subscribe().with( new Consumer() { @Override public void accept(O v) { @@ -179,6 +209,7 @@ private static Throwable toStatusFailure(Throwable throwable) { if (throwable instanceof IllegalArgumentException) { return Status.INVALID_ARGUMENT.withDescription(desc).asException(); } + log.warn("gRPC service threw an exception other than StatusRuntimeException", throwable); return Status.fromThrowable(throwable) .withDescription(desc) .asException();