Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x Support creating routes from standard gRPC bindable services #7384

Merged
merged 3 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.protobuf.Descriptors;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCallHandler;
import io.grpc.ServerMethodDefinition;
import io.grpc.stub.ServerCalls;

class Grpc<ReqT, ResT> extends GrpcRoute {
Expand Down Expand Up @@ -77,6 +78,23 @@ static <ReqT, ResT> Grpc<ReqT, ResT> clientStream(Descriptors.FileDescriptor pro
return grpc(proto, serviceName, methodName, ServerCalls.asyncClientStreamingCall(method));
}

/**
* Create a {@link io.helidon.nima.grpc.webserver.Grpc gRPC route} from a {@link io.grpc.ServerMethodDefinition}.
*
* @param definition the {@link io.grpc.ServerMethodDefinition} representing the method to execute
* @param proto an optional protocol buffer {@link com.google.protobuf.Descriptors.FileDescriptor}
* containing the service definition
* @param <ReqT> the request type
* @param <ResT> the response type
*
* @return a {@link io.helidon.nima.grpc.webserver.Grpc gRPC route} created
* from the {@link io.grpc.ServerMethodDefinition}
*/
static <ReqT, ResT> Grpc<ReqT, ResT> methodDefinition(ServerMethodDefinition<ReqT, ResT> definition,
Descriptors.FileDescriptor proto) {
return grpc(definition.getMethodDescriptor(), definition.getServerCallHandler(), proto);
}

@Override
Grpc<?, ?> toGrpc(HttpPrologue grpcPrologue) {
return this;
Expand Down Expand Up @@ -131,6 +149,42 @@ private static <ResT, ReqT> Grpc<ReqT, ResT> grpc(Descriptors.FileDescriptor pro
return new Grpc<>(grpcDesc.build(), PathMatchers.exact(path), requestType, responsetype, callHandler);
}


/**
* Create a {@link io.helidon.nima.grpc.webserver.Grpc gRPC route} from a {@link io.grpc.MethodDescriptor}.
*
* @param grpcDesc the {@link io.grpc.MethodDescriptor} describing the method to execute
* @param callHandler the {@link io.grpc.ServerCallHandler} that will execute the method
* @param proto an optional protocol buffer {@link com.google.protobuf.Descriptors.FileDescriptor} containing
* the service definition
* @param <ReqT> the request type
* @param <ResT> the response type
*
* @return a {@link io.helidon.nima.grpc.webserver.Grpc gRPC route} created
* from the {@link io.grpc.ServerMethodDefinition}
*/
private static <ResT, ReqT> Grpc<ReqT, ResT> grpc(MethodDescriptor<ReqT, ResT> grpcDesc,
ServerCallHandler<ReqT, ResT> callHandler,
Descriptors.FileDescriptor proto) {

Class<ReqT> requestType = null;
Class<ResT> responsetype = null;
String serviceName = grpcDesc.getServiceName();

if (proto != null && serviceName != null) {
Descriptors.ServiceDescriptor svc = proto.findServiceByName(serviceName);
Descriptors.MethodDescriptor mtd = svc.findMethodByName(grpcDesc.getBareMethodName());
/*
We have to use reflection here
- to load the class
- to invoke a static method on it
*/
requestType = load(getClassName(mtd.getInputType()));
responsetype = load(getClassName(mtd.getOutputType()));
}
return new Grpc<>(grpcDesc, PathMatchers.exact(grpcDesc.getFullMethodName()), requestType, responsetype, callHandler);
}

private static String getClassName(Descriptors.Descriptor descriptor) {
Descriptors.FileDescriptor fd = descriptor.getFile();
String outerClass = getOuterClass(fd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,16 @@ public void sendMessage(RES message) {
public void close(Status status, Metadata trailers) {
// todo ignoring trailers
WritableHeaders<?> writable = WritableHeaders.create();
writable.set(GrpcStatus.OK);
Http2Headers http2Headers = Http2Headers.create(writable);

// write the expected gRPC headers for content type and status
writable.set(GRPC_CONTENT_TYPE);
writable.set(Http.Headers.create(GrpcStatus.STATUS_NAME, status.getCode().value()));
String description = status.getDescription();
if (description != null) {
writable.set(Http.Headers.create(GrpcStatus.MESSAGE_NAME, description));
}

Http2Headers http2Headers = Http2Headers.create(writable).status(Http.Status.OK_200);
streamWriter.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,9 @@
import io.helidon.nima.webserver.Routing;

import com.google.protobuf.Descriptors;
import io.grpc.BindableService;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.stub.ServerCalls;

/**
Expand Down Expand Up @@ -181,6 +184,35 @@ public <ReqT, ResT> Builder clientStream(Descriptors.FileDescriptor proto,
return route(Grpc.clientStream(proto, serviceName, methodName, method));
}

/**
* Add all the routes for a {@link BindableService} service.
*
* @param proto the proto descriptor
* @param service the {@link BindableService} to add routes for
*
* @return updated builder
*/
public Builder service(Descriptors.FileDescriptor proto, BindableService service) {
for (ServerMethodDefinition<?, ?> method : service.bindService().getMethods()) {
route(Grpc.methodDefinition(method, proto));
}
return this;
}

/**
* Add all the routes for the {@link ServerServiceDefinition} service.
*
* @param service the {@link ServerServiceDefinition} to add routes for
*
* @return updated builder
*/
public Builder service(ServerServiceDefinition service) {
for (ServerMethodDefinition<?, ?> method : service.getMethods()) {
route(Grpc.methodDefinition(method, null));
}
return this;
}

private Builder route(GrpcRoute route) {
routes.add(route);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public final class GrpcStatus {
* grpc status header name.
*/
public static final HeaderName STATUS_NAME = HeaderNames.createFromLowercase("grpc-status");
/**
* grpc status message header name.
*/
public static final HeaderName MESSAGE_NAME = HeaderNames.createFromLowercase("grpc-message");
/**
* The operation completed successfully.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.tests.integration.grpc.webserver;

import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.helidon.nima.grpc.strings.StringServiceGrpc;
import io.helidon.nima.grpc.strings.Strings;
import io.helidon.nima.grpc.webserver.CollectingObserver;

import io.grpc.stub.StreamObserver;

import static io.helidon.nima.grpc.webserver.ResponseHelper.complete;
import static io.helidon.nima.grpc.webserver.ResponseHelper.stream;

public class BindableStringService
extends StringServiceGrpc.StringServiceImplBase {

@Override
public void upper(Strings.StringMessage request, StreamObserver<Strings.StringMessage> observer) {
String requestText = request.getText();
complete(observer, Strings.StringMessage.newBuilder()
.setText(requestText.toUpperCase(Locale.ROOT))
.build());
}

@Override
public void lower(Strings.StringMessage request, StreamObserver<Strings.StringMessage> observer) {
String requestText = request.getText();
complete(observer, Strings.StringMessage.newBuilder()
.setText(requestText.toLowerCase(Locale.ROOT))
.build());
}

@Override
public void split(Strings.StringMessage request, StreamObserver<Strings.StringMessage> observer) {
String[] parts = request.getText().split(" ");
stream(observer, Stream.of(parts).map(this::response));
}

@Override
public StreamObserver<Strings.StringMessage> join(StreamObserver<Strings.StringMessage> observer) {
return new CollectingObserver<>(
Collectors.joining(" "),
observer,
Strings.StringMessage::getText,
this::response);
}

@Override
public StreamObserver<Strings.StringMessage> echo(StreamObserver<Strings.StringMessage> observer) {
return new StreamObserver<>() {
public void onNext(Strings.StringMessage value) {
observer.onNext(value);
}

public void onError(Throwable t) {
t.printStackTrace();
}

public void onCompleted() {
observer.onCompleted();
}
};
}

private Strings.StringMessage response(String text) {
return Strings.StringMessage.newBuilder().setText(text).build();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,8 @@ public Descriptors.FileDescriptor proto() {

@Override
public void update(Routing router) {
router.unary("Upper", this::grpcUnary)
router.unary("Upper", this::grpcUnaryUpper)
.unary("Lower", this::grpcUnaryLower)
.bidi("Echo", this::grpcBidi)
.serverStream("Split", this::grpcServerStream)
.clientStream("Join", this::grpcClientStream);
Expand Down Expand Up @@ -74,13 +75,20 @@ public void onCompleted() {
};
}

private void grpcUnary(StringMessage request, StreamObserver<StringMessage> observer) {
private void grpcUnaryUpper(StringMessage request, StreamObserver<StringMessage> observer) {
String requestText = request.getText();
complete(observer, StringMessage.newBuilder()
.setText(requestText.toUpperCase(Locale.ROOT))
.build());
}

private void grpcUnaryLower(StringMessage request, StreamObserver<StringMessage> observer) {
String requestText = request.getText();
complete(observer, StringMessage.newBuilder()
.setText(requestText.toLowerCase(Locale.ROOT))
.build());
}

private StringMessage response(String text) {
return StringMessage.newBuilder().setText(text).build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,22 +42,17 @@
import static org.hamcrest.Matchers.contains;

@ServerTest
class GrpcServerTest {
abstract class BaseStringServiceTest {
private final int port;

private ManagedChannel channel;
private StringServiceGrpc.StringServiceBlockingStub blockingStub;
private StringServiceGrpc.StringServiceStub stub;
protected ManagedChannel channel;
protected StringServiceGrpc.StringServiceBlockingStub blockingStub;
protected StringServiceGrpc.StringServiceStub stub;

GrpcServerTest(WebServer server) {
BaseStringServiceTest(WebServer server) {
this.port = server.port();
}

@SetUpRoute
static void routing(Router.RouterBuilder<?> router) {
router.addRouting(GrpcRouting.builder().service(new StringService()));
}

@BeforeEach
void beforeEach() {
channel = ManagedChannelBuilder.forAddress("localhost", port)
Expand All @@ -81,14 +76,23 @@ void afterEach() throws InterruptedException {
}

@RepeatedTest(20)
void testUnary() {
void testUnaryUpper() {
String text = "lower case original";
StringMessage request = StringMessage.newBuilder().setText(text).build();
StringMessage response = blockingStub.upper(request);

assertThat(response.getText(), is(text.toUpperCase(Locale.ROOT)));
}

@RepeatedTest(20)
void testUnaryLower() {
String text = "UPPER CASE ORIGINAL";
StringMessage request = StringMessage.newBuilder().setText(text).build();
StringMessage response = blockingStub.lower(request);

assertThat(response.getText(), is(text.toLowerCase(Locale.ROOT)));
}

@RepeatedTest(20)
void testBidi() throws Throwable {
List<String> valuesToStream = List.of("A", "B", "C", "D");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.tests.integration.grpc.webserver;

import io.helidon.nima.grpc.strings.Strings;
import io.helidon.nima.grpc.webserver.GrpcRouting;
import io.helidon.nima.testing.junit5.webserver.ServerTest;
import io.helidon.nima.testing.junit5.webserver.SetUpRoute;
import io.helidon.nima.webserver.Router;
import io.helidon.nima.webserver.WebServer;

@ServerTest
class BindableGrpcServiceTest
extends BaseStringServiceTest {
BindableGrpcServiceTest(WebServer server) {
super(server);
}

@SetUpRoute
static void routing(Router.RouterBuilder<?> router) {
router.addRouting(GrpcRouting.builder().service(Strings.getDescriptor(), new BindableStringService()));
}
}
Loading