Skip to content

Commit

Permalink
Catch Exception thrown at setting trailers
Browse files Browse the repository at this point in the history
Partial fix for line#4625 .
This fix sets 'RequestLog.request/responseCause()' only the case when Metadata was invalid.
If we catch Exception at com.linecorp.armeria.server.grpc.StreamingServerCall#doClose
and set them as new metadata, then they could be transferred to client side before timeout.
  • Loading branch information
aki-s committed Aug 16, 2023
1 parent b2e31ee commit 0214fab
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,14 @@ public static HttpHeaders statusToTrailers(
GrpcTrailersUtil.addStatusMessageToTrailers(
trailersBuilder, status.getCode().value(), status.getDescription());

MetadataUtil.fillHeaders(metadata, trailersBuilder);
try {
MetadataUtil.fillHeaders(metadata, trailersBuilder);
} catch (Exception e) {
// Catching exception is necessary, because server implementer could have set corrupted metadata.
return trailersBuilder
.set(GrpcHeaderNames.GRPC_STATUS, "13")
.build();
}

if (ctx.config().verboseResponses() && status.getCause() != null) {
final ThrowableProto proto = GrpcStatus.serializeThrowable(status.getCause());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server.grpc;

import static com.linecorp.armeria.internal.common.grpc.GrpcTestUtil.REQUEST_MESSAGE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.google.protobuf.ByteString;

import com.linecorp.armeria.client.ClientRequestContextCaptor;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.grpc.GrpcClients;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.grpc.protocol.GrpcHeaderNames;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.grpc.testing.Messages.Payload;
import com.linecorp.armeria.grpc.testing.Messages.SimpleRequest;
import com.linecorp.armeria.grpc.testing.Messages.SimpleResponse;
import com.linecorp.armeria.grpc.testing.Messages.StreamingOutputCallRequest;
import com.linecorp.armeria.grpc.testing.Messages.StreamingOutputCallResponse;
import com.linecorp.armeria.grpc.testing.TestServiceGrpc;
import com.linecorp.armeria.grpc.testing.TestServiceGrpc.TestServiceBlockingStub;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.logging.LoggingService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

class GrpcServiceImplErrorTest {

private static final StreamingOutputCallRequest STREAMING_OUTPUT_CALL_REQUEST_MESSAGE =
StreamingOutputCallRequest
.newBuilder()
.setPayload(
Payload.newBuilder().setBody(ByteString.copyFromUtf8("ping1")).build())
.build();
// Valid metadata has even count of binaryValues.
private static final Metadata validMetadata = InternalMetadata.newMetadata(
"key1".getBytes(StandardCharsets.US_ASCII),
"val1".getBytes(StandardCharsets.US_ASCII),
"key2".getBytes(StandardCharsets.US_ASCII),
"val2".getBytes(StandardCharsets.US_ASCII)
);
// 'usedNames' is 3, but size of 'binaryValues' is 2.
// 'usedNames' and size of 'binaryValues' must be equal, so this is corrupted metadata.
private static final Metadata corruptedMetadata =
InternalMetadata.newMetadata(
3, "key1".getBytes(StandardCharsets.US_ASCII), "val1".getBytes(StandardCharsets.US_ASCII));

@RegisterExtension
static ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
configureGrpcService(sb);
}
};

@RegisterExtension
static ServerExtension serverUsingCorruptedInterceptor = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
final ServerInterceptor corruptedServerInterceptor = new ServerInterceptor() {
@Override
public <I, O> Listener<I> interceptCall(ServerCall<I, O> call,
Metadata headers,
ServerCallHandler<I, O> next) {
return next.startCall(new SimpleForwardingServerCall<I, O>(call) {
@Override
public void close(Status status, Metadata trailers) {
super.close(status, corruptedMetadata);
}
}, headers);
}
};
configureGrpcService(sb, corruptedServerInterceptor);
}
};

// Normal case of #onError at Unary RPC, but metadata set in server interceptor is corrupted.
// Client cannot expect corrupted metadata is returned from server.
@Test
void clientUnaryCall2ForServerUsingCorruptedInterceptor() throws InterruptedException {
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
assertThatThrownBy(() -> {
final TestServiceBlockingStub client =
GrpcClients.newClient("http://127.0.0.1:" + serverUsingCorruptedInterceptor.httpPort(),
TestServiceBlockingStub.class);
client.unaryCall2(REQUEST_MESSAGE);
}).satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.INTERNAL.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse();
});
});

final RequestLog log = captor.get().log().whenComplete().join();
assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK);
assertThat(log.responseTrailers().get(GrpcHeaderNames.GRPC_STATUS)).isNull();
assertThat(log.responseHeaders().get(GrpcHeaderNames.GRPC_STATUS)).satisfies(grpcStatus -> {
assertThat(grpcStatus).isNotNull();
assertThat(grpcStatus).isEqualTo(String.valueOf(Status.INTERNAL.getCode().value()));
});
assertThat(log.responseCause())
.isInstanceOf(StatusRuntimeException.class)
.satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.INTERNAL.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse();
});
});
}
}

// Normal case of #onError at Unary RPC.
@Test
void clientUnaryCall2() throws InterruptedException {
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
assertThatThrownBy(() -> {
final TestServiceBlockingStub client =
GrpcClients.newClient("http://127.0.0.1:" + server.httpPort(),
TestServiceBlockingStub.class);
client.unaryCall2(REQUEST_MESSAGE);
}).satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.ABORTED.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
for (String key : validMetadata.keys()) {
final Key<String> metaKey = Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
assertThat(metadata.get(metaKey)).isEqualTo(validMetadata.get(metaKey));
}
});
});

final RequestLog log = captor.get().log().whenComplete().join();
assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK);
assertThat(log.responseTrailers().get(GrpcHeaderNames.GRPC_STATUS)).isNull();
assertThat(log.responseHeaders().get(GrpcHeaderNames.GRPC_STATUS)).satisfies(grpcStatus -> {
assertThat(grpcStatus).isNotNull();
assertThat(grpcStatus).isEqualTo(String.valueOf(Status.ABORTED.getCode().value()));
});
assertThat(log.responseCause())
.isInstanceOf(StatusRuntimeException.class)
.satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.ABORTED.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
for (String key : validMetadata.keys()) {
final Key<String> metaKey = Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
assertThat(metadata.get(metaKey)).isEqualTo(validMetadata.get(metaKey));
}
});
});
}
}

// Error inside #onError at Unary RPC.
// Client cannot expect corrupted metadata is returned from server.
@Test
void clientUnaryCall() throws InterruptedException {
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
assertThatThrownBy(() -> {
final TestServiceBlockingStub client =
GrpcClients.newClient("http://127.0.0.1:" + server.httpPort(),
TestServiceBlockingStub.class);

client.unaryCall(REQUEST_MESSAGE);
}).satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(Status.INTERNAL.getCode());
assertThat(Status.trailersFromThrowable(cause)).isNotEqualTo(corruptedMetadata);
});

final RequestLog log = captor.get().log().whenComplete().join();
assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK);
assertThat(log.responseTrailers().get(GrpcHeaderNames.GRPC_STATUS)).isNull();
assertThat(log.responseHeaders().get(GrpcHeaderNames.GRPC_STATUS)).satisfies(grpcStatus -> {
assertThat(grpcStatus).isNotNull();
assertThat(grpcStatus).isEqualTo(String.valueOf(Status.INTERNAL.getCode().value()));
});
assertThat(log.responseCause())
.satisfies(cause -> {
assertThat(cause).isInstanceOf(StatusRuntimeException.class);
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(
Status.INTERNAL.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse();
});
});
}
}

// Error inside #onError at server streaming RPC
// Client cannot expect corrupted metadata is returned from server.
@Test
void clientUnaryCallServerStreamingOutputCall() throws InterruptedException {
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
assertThatThrownBy(() -> {
final TestServiceBlockingStub client =
GrpcClients.newClient(
"http://127.0.0.1:" + server.httpPort(),
TestServiceBlockingStub.class);
final Iterator<StreamingOutputCallResponse> it = client.streamingOutputCall(
STREAMING_OUTPUT_CALL_REQUEST_MESSAGE);
while (it.hasNext()) {
it.next();
}
}).satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(
Status.INTERNAL.getCode());
assertThat(Status.trailersFromThrowable(cause)).isNotEqualTo(corruptedMetadata);
});

final RequestLog log = captor.get().log().whenComplete().join();
assertThat(log.responseStatus()).isEqualTo(HttpStatus.OK);
assertThat(log.responseHeaders().get(GrpcHeaderNames.GRPC_STATUS)).isNull();
assertThat(log.responseTrailers().get(GrpcHeaderNames.GRPC_STATUS)).satisfies(grpcStatus -> {
assertThat(grpcStatus).isNotNull();
assertThat(grpcStatus).isEqualTo(String.valueOf(Status.INTERNAL.getCode().value()));
});
assertThat(log.responseCause())
.isInstanceOf(StatusRuntimeException.class)
.satisfies(cause -> {
assertThat(Status.fromThrowable(cause).getCode()).isEqualTo(
Status.INTERNAL.getCode());
assertThat(Status.trailersFromThrowable(cause)).satisfies(metadata -> {
assertThat(isSameSizeAndNameAndValues(metadata, corruptedMetadata)).isFalse();
});
});
}
}

private static final class ServerImplErrorAtMetadataService extends TestServiceGrpc.TestServiceImplBase {
// blocking client
// No error happens when serializing metadata.
@Override
public void unaryCall2(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
responseObserver.onError(new StatusRuntimeException(Status.ABORTED, validMetadata));
}

// blocking client
// Error happens when serializing metadata.
@Override
public void unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
responseObserver.onError(new StatusRuntimeException(Status.ABORTED, corruptedMetadata));
}

// blocking client
// Error happens when serializing metadata.
@Override
public void streamingOutputCall(StreamingOutputCallRequest request,
StreamObserver<StreamingOutputCallResponse> responseObserver) {
responseObserver.onNext(StreamingOutputCallResponse.newBuilder().setPayload(
Payload.newBuilder().setBody(ByteString.copyFromUtf8("pong1")).build()).build());
if (request.equals(STREAMING_OUTPUT_CALL_REQUEST_MESSAGE)) {
responseObserver.onError(
new StatusRuntimeException(Status.FAILED_PRECONDITION, corruptedMetadata));
return; // onError must be the last call.
}
// The following lines are not reached.
responseObserver.onNext(StreamingOutputCallResponse.newBuilder().setPayload(
Payload.newBuilder().setBody(ByteString.copyFromUtf8("pong2")).build()).build());
responseObserver.onCompleted();
}
}

private static void configureGrpcService(
ServerBuilder sb, ServerInterceptor... interceptors) {
sb.service(GrpcService.builder().addService(new ServerImplErrorAtMetadataService())
.intercept(interceptors).build());
sb.decorator(LoggingService.newDecorator());
}

// Use this method to ensure to escape assertion in the constructor of Metadata.
private static boolean isSameSizeAndNameAndValues(Metadata a, Metadata b) {
try {
final Field sizeField = Metadata.class.getDeclaredField("size");
sizeField.setAccessible(true);

final Field namesAndValuesField = Metadata.class.getDeclaredField("namesAndValues");
namesAndValuesField.setAccessible(true);

return sizeField.get(a) == sizeField.get(b) &&
Arrays.equals((Object[]) namesAndValuesField.get(a),
(Object[]) namesAndValuesField.get(b));
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new RuntimeException("It has failed to mutate fields with reflection.", e);
}
}
}

0 comments on commit 0214fab

Please sign in to comment.