Skip to content

Commit

Permalink
Test log of client side
Browse files Browse the repository at this point in the history
Fix for line#4625 .
  • Loading branch information
aki-s committed Mar 6, 2023
1 parent 3ccad1e commit ba76fa9
Show file tree
Hide file tree
Showing 2 changed files with 358 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client.grpc;

import static com.linecorp.armeria.internal.common.grpc.GrpcTestUtil.REQUEST_MESSAGE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.google.protobuf.ByteString;

import com.linecorp.armeria.client.ClientRequestContextCaptor;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.grpc.GrpcSerializationFormats;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.grpc.testing.Messages.Payload;
import com.linecorp.armeria.grpc.testing.Messages.SimpleRequest;
import com.linecorp.armeria.grpc.testing.Messages.SimpleResponse;
import com.linecorp.armeria.grpc.testing.Messages.StreamingOutputCallRequest;
import com.linecorp.armeria.grpc.testing.Messages.StreamingOutputCallResponse;
import com.linecorp.armeria.grpc.testing.TestServiceGrpc;
import com.linecorp.armeria.grpc.testing.TestServiceGrpc.TestServiceBlockingStub;
import com.linecorp.armeria.grpc.testing.TestServiceGrpc.TestServiceStub;
import com.linecorp.armeria.internal.common.grpc.StreamRecorder;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.server.logging.LoggingService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

class GrpcServiceImplErrorFromClientViewTest {

private static final StreamingOutputCallRequest STREAMING_OUTPUT_CALL_REQUEST_MESSAGE =
StreamingOutputCallRequest
.newBuilder()
.setPayload(
Payload.newBuilder().setBody(ByteString.copyFromUtf8("ping1")).build())
.build();
// Valid metadata has even count of binaryValues.
private static final Metadata validMetadata = InternalMetadata.newMetadata(
new byte[] { 0 }, new byte[] { 1 }, new byte[] { 2 }, new byte[] { 3 });
// 'usedNames' is 3, but size of 'binaryValues' is 2.
// 'usedNames' and size of 'binaryValues' must be equal, so this is corrupted metadata.
private static final Metadata corruptedMetadata =
InternalMetadata.newMetadata(3, new byte[] { 0 }, new byte[] { 1 });

@RegisterExtension
static ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
final GrpcService grpcService =
GrpcService
.builder()
.addService(
new ServerImplErrorAtMetadataService())
.build();
sb.service(grpcService);
sb.decorator(LoggingService.newDecorator());
}
};

@RegisterExtension
static ServerExtension serverWithBlockingExecutor = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
final GrpcService grpcService =
GrpcService.builder()
.addService(
new ServerImplErrorAtMetadataService())
.enableUnframedRequests(true)
.supportedSerializationFormats(
GrpcSerializationFormats.values())
.useBlockingTaskExecutor(true)
.build();
sb.service(grpcService);
sb.decorator(LoggingService.newDecorator());
}
};

// Normal case of #onError at Unary RPC.
@Test
void clientUnaryCall2() throws InterruptedException {
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
assertThatThrownBy(() -> {
final TestServiceBlockingStub client =
GrpcClients.newClient("http://127.0.0.1:" + serverWithBlockingExecutor.httpPort(),
TestServiceBlockingStub.class)
.withDeadlineAfter(200, TimeUnit.SECONDS);
client.unaryCall2(REQUEST_MESSAGE);
}).satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.ABORTED.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
assertThat(metadata).isNotEqualTo(validMetadata);
});
});

final RequestLog log = captor.get().log().whenComplete().join();
assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK);
assertThat(log.responseCause())
.isInstanceOf(StatusRuntimeException.class)
.satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.ABORTED.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
assertThat(metadata).isNotEqualTo(validMetadata);
});
});
}
}

// Error inside #onError at Unary RPC.
@Test
void clientUnaryCall() throws InterruptedException {
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
assertThatThrownBy(() -> {
final TestServiceBlockingStub client =
GrpcClients.newClient("http://127.0.0.1:" + serverWithBlockingExecutor.httpPort(),
TestServiceBlockingStub.class)
.withDeadlineAfter(2, TimeUnit.SECONDS);

client.unaryCall(REQUEST_MESSAGE);
}).satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.UNKNOWN.getCode());
assertThat(Status.trailersFromThrowable(cause)).isNotEqualTo(corruptedMetadata);
});

final RequestLog log = captor.get().log().whenComplete().join();
assertThat(log.responseStatus()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR);
assertThat(log.responseCause())
.satisfies(cause -> {
assertThat(cause).isInstanceOf(StatusRuntimeException.class);
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(
Status.UNKNOWN.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse();
});
});
}
}

// Error inside #onError at server streaming RPC
@Test
void clientUnaryCallServerStreamingOutputCall() throws InterruptedException {
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
assertThatThrownBy(() -> {
final TestServiceBlockingStub client = GrpcClients.newClient(
"http://127.0.0.1:" + serverWithBlockingExecutor.httpPort(),
TestServiceBlockingStub.class)
.withDeadlineAfter(60, TimeUnit.SECONDS);
final Iterator<StreamingOutputCallResponse> it = client.streamingOutputCall(
STREAMING_OUTPUT_CALL_REQUEST_MESSAGE);
while (it.hasNext()) {
it.next();
}
}).satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.CANCELLED.getCode());
assertThat(Status.trailersFromThrowable(cause)).isNotEqualTo(corruptedMetadata);
});

final RequestLog log = captor.get().log().whenComplete().join();
assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK);
assertThat(log.responseCause())
.isInstanceOf(StatusRuntimeException.class)
.satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(
Status.CANCELLED.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse();
});
});
}
}

// Error inside #onError at bidirectional streaming RPC
@Test
void fullDuplexCall() throws Exception {
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
final TestServiceStub client =
GrpcClients.newClient("http://127.0.0.1:" + server.httpPort(),
TestServiceStub.class)
.withDeadlineAfter(200, TimeUnit.SECONDS);

final StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
final StreamObserver<StreamingOutputCallRequest> requestObserer = client.fullDuplexCall(
responseObserver);
for (int i = 0; i < 3; i++) {
requestObserer.onNext(StreamingOutputCallRequest.newBuilder().setPayload(
Payload.newBuilder().setBody(ByteString.copyFromUtf8("ping" + i)).build()).build());
}
responseObserver.awaitCompletion();

assertThat(Status.fromThrowable(responseObserver.getError()).getCode()).isEqualTo(
Status.CANCELLED.getCode());
assertThat(Status.trailersFromThrowable(responseObserver.getError())).isNotEqualTo(
corruptedMetadata);

final RequestLog log = captor.get().log().whenComplete().join();
assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK);
assertThatThrownBy(() -> log.context().request().whenComplete().join())
.cause()
.isSameAs(log.responseCause())
.isInstanceOf(StatusRuntimeException.class)
.satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(
Status.CANCELLED.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse();
});
});
}
}

private static final class ServerImplErrorAtMetadataService extends TestServiceGrpc.TestServiceImplBase {
// blocking client
// No error happens when serializing metadata.
@Override
public void unaryCall2(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
responseObserver.onError(new StatusRuntimeException(Status.ABORTED, validMetadata));
}

// blocking client
// Error happens when serializing metadata.
@Override
public void unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
responseObserver.onError(new StatusRuntimeException(Status.ABORTED, corruptedMetadata));
}

// blocking client
// Error happens when serializing metadata.
@Override
public void streamingOutputCall(StreamingOutputCallRequest request,
StreamObserver<StreamingOutputCallResponse> responseObserver) {
responseObserver.onNext(StreamingOutputCallResponse.newBuilder().setPayload(
Payload.newBuilder().setBody(ByteString.copyFromUtf8("pong1")).build()).build());
if (request.equals(STREAMING_OUTPUT_CALL_REQUEST_MESSAGE)) {
responseObserver.onError(
new StatusRuntimeException(Status.FAILED_PRECONDITION, corruptedMetadata));
return; // onError must be the last call.
}
// The following lines are not reached.
responseObserver.onNext(StreamingOutputCallResponse.newBuilder().setPayload(
Payload.newBuilder().setBody(ByteString.copyFromUtf8("pong2")).build()).build());
responseObserver.onCompleted();
}

// Error happens when serializing metadata.
@Override
public StreamObserver<StreamingOutputCallRequest> fullDuplexCall(
StreamObserver<StreamingOutputCallResponse> responseObserver) {
return new StreamObserver<StreamingOutputCallRequest>() {
int cnt;

@Override
public void onNext(StreamingOutputCallRequest value) {
// Return error on the 3rd message.
if (++cnt == 3) {
responseObserver.onError(
new StatusRuntimeException(Status.FAILED_PRECONDITION, corruptedMetadata));
} else {
responseObserver.onNext(
StreamingOutputCallResponse
.newBuilder()
.setPayload(
Payload.newBuilder()
.setBody(ByteString.copyFromUtf8("pong" + cnt)).build())
.build());
}
}

@Override
public void onError(Throwable t) {
// no-op
}

@Override
public void onCompleted() {
// no-op
}
};
}
}

// Use this method to ensure to escape assertion in the constructor of Metadata.
private static boolean isSameSizeAndNameAndValues(Metadata a, Metadata b) {
try {
final Field sizeField = Metadata.class.getDeclaredField("size");
sizeField.setAccessible(true);

final Field namesAndValuesField = Metadata.class.getDeclaredField("namesAndValues");
namesAndValuesField.setAccessible(true);

return sizeField.get(a) == sizeField.get(b) &&
Arrays.equals((Object[]) namesAndValuesField.get(a),
(Object[]) namesAndValuesField.get(b));
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new RuntimeException("It has failed to mutate fields with reflection.", e);
}
}
}
Loading

0 comments on commit ba76fa9

Please sign in to comment.