From ba76fa982413ac28121d141814b43b638fce92bf Mon Sep 17 00:00:00 2001 From: shunsuke-aki Date: Mon, 6 Mar 2023 18:05:12 +0900 Subject: [PATCH] Test log of client side Fix for https://github.com/line/armeria/issues/4625 . --- ...rpcServiceImplErrorFromClientViewTest.java | 325 ++++++++++++++++++ ...pcServiceImplErrorFromServerViewTest.java} | 55 +-- 2 files changed, 358 insertions(+), 22 deletions(-) create mode 100644 grpc/src/test/java/com/linecorp/armeria/client/grpc/GrpcServiceImplErrorFromClientViewTest.java rename grpc/src/test/java/com/linecorp/armeria/server/grpc/{GrpcServiceImplErrorTest.java => GrpcServiceImplErrorFromServerViewTest.java} (83%) diff --git a/grpc/src/test/java/com/linecorp/armeria/client/grpc/GrpcServiceImplErrorFromClientViewTest.java b/grpc/src/test/java/com/linecorp/armeria/client/grpc/GrpcServiceImplErrorFromClientViewTest.java new file mode 100644 index 000000000000..d4c67543ff41 --- /dev/null +++ b/grpc/src/test/java/com/linecorp/armeria/client/grpc/GrpcServiceImplErrorFromClientViewTest.java @@ -0,0 +1,325 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.grpc; + +import static com.linecorp.armeria.internal.common.grpc.GrpcTestUtil.REQUEST_MESSAGE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.google.protobuf.ByteString; + +import com.linecorp.armeria.client.ClientRequestContextCaptor; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.grpc.GrpcSerializationFormats; +import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.grpc.testing.Messages.Payload; +import com.linecorp.armeria.grpc.testing.Messages.SimpleRequest; +import com.linecorp.armeria.grpc.testing.Messages.SimpleResponse; +import com.linecorp.armeria.grpc.testing.Messages.StreamingOutputCallRequest; +import com.linecorp.armeria.grpc.testing.Messages.StreamingOutputCallResponse; +import com.linecorp.armeria.grpc.testing.TestServiceGrpc; +import com.linecorp.armeria.grpc.testing.TestServiceGrpc.TestServiceBlockingStub; +import com.linecorp.armeria.grpc.testing.TestServiceGrpc.TestServiceStub; +import com.linecorp.armeria.internal.common.grpc.StreamRecorder; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.server.logging.LoggingService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; + +import io.grpc.InternalMetadata; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +class GrpcServiceImplErrorFromClientViewTest { + + private static final StreamingOutputCallRequest STREAMING_OUTPUT_CALL_REQUEST_MESSAGE = + StreamingOutputCallRequest + .newBuilder() + .setPayload( + Payload.newBuilder().setBody(ByteString.copyFromUtf8("ping1")).build()) + .build(); + // Valid metadata has even count of binaryValues. + private static final Metadata validMetadata = InternalMetadata.newMetadata( + new byte[] { 0 }, new byte[] { 1 }, new byte[] { 2 }, new byte[] { 3 }); + // 'usedNames' is 3, but size of 'binaryValues' is 2. + // 'usedNames' and size of 'binaryValues' must be equal, so this is corrupted metadata. + private static final Metadata corruptedMetadata = + InternalMetadata.newMetadata(3, new byte[] { 0 }, new byte[] { 1 }); + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + final GrpcService grpcService = + GrpcService + .builder() + .addService( + new ServerImplErrorAtMetadataService()) + .build(); + sb.service(grpcService); + sb.decorator(LoggingService.newDecorator()); + } + }; + + @RegisterExtension + static ServerExtension serverWithBlockingExecutor = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + final GrpcService grpcService = + GrpcService.builder() + .addService( + new ServerImplErrorAtMetadataService()) + .enableUnframedRequests(true) + .supportedSerializationFormats( + GrpcSerializationFormats.values()) + .useBlockingTaskExecutor(true) + .build(); + sb.service(grpcService); + sb.decorator(LoggingService.newDecorator()); + } + }; + + // Normal case of #onError at Unary RPC. + @Test + void clientUnaryCall2() throws InterruptedException { + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThatThrownBy(() -> { + final TestServiceBlockingStub client = + GrpcClients.newClient("http://127.0.0.1:" + serverWithBlockingExecutor.httpPort(), + TestServiceBlockingStub.class) + .withDeadlineAfter(200, TimeUnit.SECONDS); + client.unaryCall2(REQUEST_MESSAGE); + }).satisfies(cause -> { + assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.ABORTED.getCode()); + assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> { + assertThat(metadata).isNotEqualTo(validMetadata); + }); + }); + + final RequestLog log = captor.get().log().whenComplete().join(); + assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK); + assertThat(log.responseCause()) + .isInstanceOf(StatusRuntimeException.class) + .satisfies(cause -> { + assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.ABORTED.getCode()); + assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> { + assertThat(metadata).isNotEqualTo(validMetadata); + }); + }); + } + } + + // Error inside #onError at Unary RPC. + @Test + void clientUnaryCall() throws InterruptedException { + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThatThrownBy(() -> { + final TestServiceBlockingStub client = + GrpcClients.newClient("http://127.0.0.1:" + serverWithBlockingExecutor.httpPort(), + TestServiceBlockingStub.class) + .withDeadlineAfter(2, TimeUnit.SECONDS); + + client.unaryCall(REQUEST_MESSAGE); + }).satisfies(cause -> { + assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.UNKNOWN.getCode()); + assertThat(Status.trailersFromThrowable(cause)).isNotEqualTo(corruptedMetadata); + }); + + final RequestLog log = captor.get().log().whenComplete().join(); + assertThat(log.responseStatus()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR); + assertThat(log.responseCause()) + .satisfies(cause -> { + assertThat(cause).isInstanceOf(StatusRuntimeException.class); + assertThat(Status.fromThrowable(cause).getCode()).isEqualTo( + Status.UNKNOWN.getCode()); + assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> { + assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse(); + }); + }); + } + } + + // Error inside #onError at server streaming RPC + @Test + void clientUnaryCallServerStreamingOutputCall() throws InterruptedException { + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThatThrownBy(() -> { + final TestServiceBlockingStub client = GrpcClients.newClient( + "http://127.0.0.1:" + serverWithBlockingExecutor.httpPort(), + TestServiceBlockingStub.class) + .withDeadlineAfter(60, TimeUnit.SECONDS); + final Iterator it = client.streamingOutputCall( + STREAMING_OUTPUT_CALL_REQUEST_MESSAGE); + while (it.hasNext()) { + it.next(); + } + }).satisfies(cause -> { + assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.CANCELLED.getCode()); + assertThat(Status.trailersFromThrowable(cause)).isNotEqualTo(corruptedMetadata); + }); + + final RequestLog log = captor.get().log().whenComplete().join(); + assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK); + assertThat(log.responseCause()) + .isInstanceOf(StatusRuntimeException.class) + .satisfies(cause -> { + assertThat(Status.fromThrowable(cause).getCode()).isEqualTo( + Status.CANCELLED.getCode()); + assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> { + assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse(); + }); + }); + } + } + + // Error inside #onError at bidirectional streaming RPC + @Test + void fullDuplexCall() throws Exception { + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + final TestServiceStub client = + GrpcClients.newClient("http://127.0.0.1:" + server.httpPort(), + TestServiceStub.class) + .withDeadlineAfter(200, TimeUnit.SECONDS); + + final StreamRecorder responseObserver = StreamRecorder.create(); + final StreamObserver requestObserer = client.fullDuplexCall( + responseObserver); + for (int i = 0; i < 3; i++) { + requestObserer.onNext(StreamingOutputCallRequest.newBuilder().setPayload( + Payload.newBuilder().setBody(ByteString.copyFromUtf8("ping" + i)).build()).build()); + } + responseObserver.awaitCompletion(); + + assertThat(Status.fromThrowable(responseObserver.getError()).getCode()).isEqualTo( + Status.CANCELLED.getCode()); + assertThat(Status.trailersFromThrowable(responseObserver.getError())).isNotEqualTo( + corruptedMetadata); + + final RequestLog log = captor.get().log().whenComplete().join(); + assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK); + assertThatThrownBy(() -> log.context().request().whenComplete().join()) + .cause() + .isSameAs(log.responseCause()) + .isInstanceOf(StatusRuntimeException.class) + .satisfies(cause -> { + assertThat(Status.fromThrowable(cause).getCode()).isEqualTo( + Status.CANCELLED.getCode()); + assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> { + assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse(); + }); + }); + } + } + + private static final class ServerImplErrorAtMetadataService extends TestServiceGrpc.TestServiceImplBase { + // blocking client + // No error happens when serializing metadata. + @Override + public void unaryCall2(SimpleRequest request, StreamObserver responseObserver) { + responseObserver.onError(new StatusRuntimeException(Status.ABORTED, validMetadata)); + } + + // blocking client + // Error happens when serializing metadata. + @Override + public void unaryCall(SimpleRequest request, StreamObserver responseObserver) { + responseObserver.onError(new StatusRuntimeException(Status.ABORTED, corruptedMetadata)); + } + + // blocking client + // Error happens when serializing metadata. + @Override + public void streamingOutputCall(StreamingOutputCallRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(StreamingOutputCallResponse.newBuilder().setPayload( + Payload.newBuilder().setBody(ByteString.copyFromUtf8("pong1")).build()).build()); + if (request.equals(STREAMING_OUTPUT_CALL_REQUEST_MESSAGE)) { + responseObserver.onError( + new StatusRuntimeException(Status.FAILED_PRECONDITION, corruptedMetadata)); + return; // onError must be the last call. + } + // The following lines are not reached. + responseObserver.onNext(StreamingOutputCallResponse.newBuilder().setPayload( + Payload.newBuilder().setBody(ByteString.copyFromUtf8("pong2")).build()).build()); + responseObserver.onCompleted(); + } + + // Error happens when serializing metadata. + @Override + public StreamObserver fullDuplexCall( + StreamObserver responseObserver) { + return new StreamObserver() { + int cnt; + + @Override + public void onNext(StreamingOutputCallRequest value) { + // Return error on the 3rd message. + if (++cnt == 3) { + responseObserver.onError( + new StatusRuntimeException(Status.FAILED_PRECONDITION, corruptedMetadata)); + } else { + responseObserver.onNext( + StreamingOutputCallResponse + .newBuilder() + .setPayload( + Payload.newBuilder() + .setBody(ByteString.copyFromUtf8("pong" + cnt)).build()) + .build()); + } + } + + @Override + public void onError(Throwable t) { + // no-op + } + + @Override + public void onCompleted() { + // no-op + } + }; + } + } + + // Use this method to ensure to escape assertion in the constructor of Metadata. + private static boolean isSameSizeAndNameAndValues(Metadata a, Metadata b) { + try { + final Field sizeField = Metadata.class.getDeclaredField("size"); + sizeField.setAccessible(true); + + final Field namesAndValuesField = Metadata.class.getDeclaredField("namesAndValues"); + namesAndValuesField.setAccessible(true); + + return sizeField.get(a) == sizeField.get(b) && + Arrays.equals((Object[]) namesAndValuesField.get(a), + (Object[]) namesAndValuesField.get(b)); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new RuntimeException("It has failed to mutate fields with reflection.", e); + } + } +} diff --git a/grpc/src/test/java/com/linecorp/armeria/server/grpc/GrpcServiceImplErrorTest.java b/grpc/src/test/java/com/linecorp/armeria/server/grpc/GrpcServiceImplErrorFromServerViewTest.java similarity index 83% rename from grpc/src/test/java/com/linecorp/armeria/server/grpc/GrpcServiceImplErrorTest.java rename to grpc/src/test/java/com/linecorp/armeria/server/grpc/GrpcServiceImplErrorFromServerViewTest.java index a6f173af5bf3..d038fe3283cd 100644 --- a/grpc/src/test/java/com/linecorp/armeria/server/grpc/GrpcServiceImplErrorTest.java +++ b/grpc/src/test/java/com/linecorp/armeria/server/grpc/GrpcServiceImplErrorFromServerViewTest.java @@ -29,6 +29,7 @@ import com.google.protobuf.ByteString; +import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.grpc.GrpcSerializationFormats; import com.linecorp.armeria.common.logging.RequestLog; import com.linecorp.armeria.grpc.testing.Messages.Payload; @@ -52,7 +53,7 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -class GrpcServiceImplErrorTest { +class GrpcServiceImplErrorFromServerViewTest { private static final StreamingOutputCallRequest STREAMING_OUTPUT_CALL_REQUEST_MESSAGE = StreamingOutputCallRequest @@ -125,7 +126,7 @@ static void tearDownChannel() { @Test void clientUnaryCall2() throws InterruptedException { assertThatThrownBy(() -> { - TestServiceBlockingStub client = TestServiceGrpc.newBlockingStub(blockingChannel); + final TestServiceBlockingStub client = TestServiceGrpc.newBlockingStub(blockingChannel); client.unaryCall2(REQUEST_MESSAGE); }).satisfies(cause -> { assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.ABORTED.getCode()); @@ -134,7 +135,9 @@ void clientUnaryCall2() throws InterruptedException { }); }); - RequestLog log = serverWithBlockingExecutor.requestContextCaptor().take().log().whenComplete().join(); + final RequestLog log = serverWithBlockingExecutor + .requestContextCaptor().take().log().whenComplete().join(); + assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK); assertThat(log.responseCause()) .isInstanceOf(StatusRuntimeException.class) .satisfies(cause -> { @@ -149,14 +152,16 @@ void clientUnaryCall2() throws InterruptedException { @Test void clientUnaryCall() throws InterruptedException { assertThatThrownBy(() -> { - TestServiceBlockingStub client = TestServiceGrpc.newBlockingStub(blockingChannel); + final TestServiceBlockingStub client = TestServiceGrpc.newBlockingStub(blockingChannel); client.unaryCall(REQUEST_MESSAGE); }).satisfies(cause -> { assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.UNKNOWN.getCode()); assertThat(Status.trailersFromThrowable(cause)).isNotEqualTo(corruptedMetadata); }); - RequestLog log = serverWithBlockingExecutor.requestContextCaptor().take().log().whenComplete().join(); + final RequestLog log = serverWithBlockingExecutor + .requestContextCaptor().take().log().whenComplete().join(); + assertThat(log.responseStatus()).isEqualTo(HttpStatus.UNKNOWN); assertThat(log.responseCause()) .isInstanceOf(ArrayIndexOutOfBoundsException.class); } @@ -165,8 +170,8 @@ void clientUnaryCall() throws InterruptedException { @Test void clientUnaryCallServerStreamingOutputCall() throws InterruptedException { assertThatThrownBy(() -> { - TestServiceBlockingStub client = TestServiceGrpc.newBlockingStub(blockingChannel); - Iterator it = client.streamingOutputCall( + final TestServiceBlockingStub client = TestServiceGrpc.newBlockingStub(blockingChannel); + final Iterator it = client.streamingOutputCall( STREAMING_OUTPUT_CALL_REQUEST_MESSAGE); while (it.hasNext()) { it.next(); @@ -176,7 +181,9 @@ void clientUnaryCallServerStreamingOutputCall() throws InterruptedException { assertThat(Status.trailersFromThrowable(cause)).isNotEqualTo(corruptedMetadata); }); - RequestLog log = serverWithBlockingExecutor.requestContextCaptor().take().log().whenComplete().join(); + final RequestLog log = serverWithBlockingExecutor + .requestContextCaptor().take().log().whenComplete().join(); + assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK); assertThat(log.responseCause()) .isInstanceOf(StatusRuntimeException.class) .satisfies(cause -> { @@ -191,9 +198,10 @@ void clientUnaryCallServerStreamingOutputCall() throws InterruptedException { // Error inside #onError at bidirectional streaming RPC @Test void fullDuplexCall() throws Exception { - TestServiceStub client = TestServiceGrpc.newStub(channel); - StreamRecorder responseObserver = StreamRecorder.create(); - StreamObserver requestObserer = client.fullDuplexCall(responseObserver); + final TestServiceStub client = TestServiceGrpc.newStub(channel); + final StreamRecorder responseObserver = StreamRecorder.create(); + final StreamObserver requestObserer = client.fullDuplexCall( + responseObserver); for (int i = 0; i < 3; i++) { requestObserer.onNext(StreamingOutputCallRequest.newBuilder().setPayload( Payload.newBuilder().setBody(ByteString.copyFromUtf8("ping" + i)).build()).build()); @@ -204,20 +212,18 @@ void fullDuplexCall() throws Exception { Status.INTERNAL.getCode()); assertThat(Status.trailersFromThrowable(responseObserver.getError())).isNotEqualTo(corruptedMetadata); - RequestLog log = server.requestContextCaptor().take().log().whenComplete().join(); - assertThatThrownBy( - () -> log.context().request().whenComplete().join()) + final RequestLog log = server.requestContextCaptor().take().log().whenComplete().join(); + assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK); + assertThatThrownBy(() -> log.context().request().whenComplete().join()) .cause() .isSameAs(log.responseCause()) .isInstanceOf(StatusRuntimeException.class) .satisfies(cause -> { assertThat(Status.fromThrowable(cause).getCode()).isEqualTo( Status.FAILED_PRECONDITION.getCode()); - assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> { - assertThat(metadata).isEqualTo(corruptedMetadata); - }); + assertThat(Status.trailersFromThrowable(cause)) + .satisfies(metadata -> assertThat(metadata).isEqualTo(corruptedMetadata)); }); - } private static final class ServerImplErrorAtMetadataService extends TestServiceGrpc.TestServiceImplBase { @@ -258,7 +264,7 @@ public void streamingOutputCall(StreamingOutputCallRequest request, public StreamObserver fullDuplexCall( StreamObserver responseObserver) { return new StreamObserver() { - int cnt = 0; + int cnt; @Override public void onNext(StreamingOutputCallRequest value) { @@ -267,9 +273,14 @@ public void onNext(StreamingOutputCallRequest value) { responseObserver.onError( new StatusRuntimeException(Status.FAILED_PRECONDITION, corruptedMetadata)); } else { - responseObserver.onNext(StreamingOutputCallResponse.newBuilder().setPayload( - Payload.newBuilder().setBody(ByteString.copyFromUtf8("pong" + cnt)).build()) - .build()); + responseObserver.onNext( + StreamingOutputCallResponse + .newBuilder() + .setPayload( + Payload.newBuilder() + .setBody(ByteString.copyFromUtf8("pong" + cnt)) + .build()) + .build()); } }