diff --git a/android-interop-testing/src/generated/debug/grpc/io/grpc/testing/integration/MetricsServiceGrpc.java b/android-interop-testing/src/generated/debug/grpc/io/grpc/testing/integration/MetricsServiceGrpc.java index adf0c204ce3b..375a4e4120bc 100644 --- a/android-interop-testing/src/generated/debug/grpc/io/grpc/testing/integration/MetricsServiceGrpc.java +++ b/android-interop-testing/src/generated/debug/grpc/io/grpc/testing/integration/MetricsServiceGrpc.java @@ -235,12 +235,8 @@ protected MetricsServiceBlockingV2Stub build( */ public io.grpc.stub.BlockingClientCall getAllGauges(io.grpc.testing.integration.Metrics.EmptyMessage request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getGetAllGaugesMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getGetAllGaugesMethod(), getCallOptions(), request); } /** diff --git a/android-interop-testing/src/generated/debug/grpc/io/grpc/testing/integration/TestServiceGrpc.java b/android-interop-testing/src/generated/debug/grpc/io/grpc/testing/integration/TestServiceGrpc.java index 2ecd22e53ccd..c57869a268e5 100644 --- a/android-interop-testing/src/generated/debug/grpc/io/grpc/testing/integration/TestServiceGrpc.java +++ b/android-interop-testing/src/generated/debug/grpc/io/grpc/testing/integration/TestServiceGrpc.java @@ -611,12 +611,8 @@ public io.grpc.testing.integration.Messages.SimpleResponse cacheableUnaryCall(io */ public io.grpc.stub.BlockingClientCall streamingOutputCall(io.grpc.testing.integration.Messages.StreamingOutputCallRequest request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getStreamingOutputCallMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getStreamingOutputCallMethod(), getCallOptions(), request); } /** @@ -627,7 +623,7 @@ public io.grpc.testing.integration.Messages.SimpleResponse cacheableUnaryCall(io */ public io.grpc.stub.BlockingClientCall streamingInputCall() { - return io.grpc.stub.ClientCalls.blockingBidiStreamingCall( + return io.grpc.stub.ClientCalls.blockingClientStreamingCall( getChannel(), getStreamingInputCallMethod(), getCallOptions()); } diff --git a/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/LoadBalancerStatsServiceGrpc.java b/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/LoadBalancerStatsServiceGrpc.java index c2645a221c87..fe8c30ba3c4a 100644 --- a/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/LoadBalancerStatsServiceGrpc.java +++ b/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/LoadBalancerStatsServiceGrpc.java @@ -15,7 +15,7 @@ public final class LoadBalancerStatsServiceGrpc { private LoadBalancerStatsServiceGrpc() {} - public static final String SERVICE_NAME = "grpc.testing.LoadBalancerStatsService"; + public static final java.lang.String SERVICE_NAME = "grpc.testing.LoadBalancerStatsService"; // Static method descriptors that strictly reflect the proto. private static volatile io.grpc.MethodDescriptor */ - public io.grpc.stub.BlockingClientCall + public io.grpc.stub.BlockingClientCall getAllGauges(io.grpc.testing.integration.Metrics.EmptyMessage request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getGetAllGaugesMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getGetAllGaugesMethod(), getCallOptions(), request); } /** diff --git a/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java b/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java index 3f64faed8eaf..79f0bdf0fab3 100644 --- a/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java +++ b/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java @@ -15,7 +15,7 @@ public final class ReconnectServiceGrpc { private ReconnectServiceGrpc() {} - public static final String SERVICE_NAME = "grpc.testing.ReconnectService"; + public static final java.lang.String SERVICE_NAME = "grpc.testing.ReconnectService"; // Static method descriptors that strictly reflect the proto. private static volatile io.grpc.MethodDescriptor */ - public io.grpc.stub.BlockingClientCall + public io.grpc.stub.BlockingClientCall streamingOutputCall(io.grpc.testing.integration.Messages.StreamingOutputCallRequest request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getStreamingOutputCallMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getStreamingOutputCallMethod(), getCallOptions(), request); } /** @@ -625,9 +621,9 @@ public io.grpc.testing.integration.Messages.SimpleResponse cacheableUnaryCall(io * The server returns the aggregated size of client payload as the result. * */ - public io.grpc.stub.BlockingClientCall + public io.grpc.stub.BlockingClientCall streamingInputCall() { - return io.grpc.stub.ClientCalls.blockingBidiStreamingCall( + return io.grpc.stub.ClientCalls.blockingClientStreamingCall( getChannel(), getStreamingInputCallMethod(), getCallOptions()); } @@ -638,7 +634,7 @@ public io.grpc.testing.integration.Messages.SimpleResponse cacheableUnaryCall(io * demonstrates the idea of full duplexing. * */ - public io.grpc.stub.BlockingClientCall + public io.grpc.stub.BlockingClientCall fullDuplexCall() { return io.grpc.stub.ClientCalls.blockingBidiStreamingCall( getChannel(), getFullDuplexCallMethod(), getCallOptions()); @@ -652,7 +648,7 @@ public io.grpc.testing.integration.Messages.SimpleResponse cacheableUnaryCall(io * first request. * */ - public io.grpc.stub.BlockingClientCall + public io.grpc.stub.BlockingClientCall halfDuplexCall() { return io.grpc.stub.ClientCalls.blockingBidiStreamingCall( getChannel(), getHalfDuplexCallMethod(), getCallOptions()); diff --git a/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java b/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java index a240da69835d..be3d1eab13af 100644 --- a/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java +++ b/android-interop-testing/src/generated/release/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java @@ -16,7 +16,7 @@ public final class UnimplementedServiceGrpc { private UnimplementedServiceGrpc() {} - public static final String SERVICE_NAME = "grpc.testing.UnimplementedService"; + public static final java.lang.String SERVICE_NAME = "grpc.testing.UnimplementedService"; // Static method descriptors that strictly reflect the proto. private static volatile io.grpc.MethodDescriptor streamingFromClient() { - return io.grpc.stub.ClientCalls.blockingBidiStreamingCall( + return io.grpc.stub.ClientCalls.blockingClientStreamingCall( getChannel(), getStreamingFromClientMethod(), getCallOptions()); } @@ -439,12 +439,8 @@ public io.grpc.benchmarks.proto.Messages.SimpleResponse unaryCall(io.grpc.benchm */ public io.grpc.stub.BlockingClientCall streamingFromServer(io.grpc.benchmarks.proto.Messages.SimpleRequest request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getStreamingFromServerMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getStreamingFromServerMethod(), getCallOptions(), request); } /** diff --git a/compiler/src/java_plugin/cpp/java_generator.cpp b/compiler/src/java_plugin/cpp/java_generator.cpp index 3daaeeeff3a3..2bb771e4e90b 100644 --- a/compiler/src/java_plugin/cpp/java_generator.cpp +++ b/compiler/src/java_plugin/cpp/java_generator.cpp @@ -806,23 +806,23 @@ static void PrintStub( " getChannel(), $method_method_name$(), getCallOptions(), $params$);\n"); break; case BLOCKING_V2_CALL: - if (client_streaming) { // used for both client and bidi - p->Print( + if (client_streaming) { // client and bidi streaming + if (server_streaming) { + (*vars)["calls_method"] = "io.grpc.stub.ClientCalls.blockingBidiStreamingCall"; + } else { + (*vars)["calls_method"] = "io.grpc.stub.ClientCalls.blockingClientStreamingCall"; + } + p->Print( *vars, - "return io.grpc.stub.ClientCalls.blockingBidiStreamingCall(\n" + "return $calls_method$(\n" " getChannel(), $method_method_name$(), getCallOptions());\n"); - } else if (server_streaming) { - p->Print( - *vars, - "$BlockingClientCall$<$input_type$, $output_type$> call =\n" - " io.grpc.stub.ClientCalls.blockingBidiStreamingCall(\n" - " getChannel(), $method_method_name$(), getCallOptions());\n" - "call.write(request);\n" - "call.halfClose();\n" - "return call;\n"); - } else { - (*vars)["calls_method"] = "io.grpc.stub.ClientCalls.blockingUnaryCall"; - (*vars)["params"] = "request"; + } else { // server streaming and unary + (*vars)["params"] = "request"; + if (server_streaming) { + (*vars)["calls_method"] = "io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall"; + } else { + (*vars)["calls_method"] = "io.grpc.stub.ClientCalls.blockingUnaryCall"; + } p->Print( *vars, diff --git a/compiler/src/test/golden/TestService.java.txt b/compiler/src/test/golden/TestService.java.txt index dc8160821a56..c98b772a4f50 100644 --- a/compiler/src/test/golden/TestService.java.txt +++ b/compiler/src/test/golden/TestService.java.txt @@ -593,12 +593,8 @@ public final class TestServiceGrpc { */ public io.grpc.stub.BlockingClientCall streamingOutputCall(io.grpc.testing.compiler.Test.StreamingOutputCallRequest request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getStreamingOutputCallMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getStreamingOutputCallMethod(), getCallOptions(), request); } /** @@ -609,7 +605,7 @@ public final class TestServiceGrpc { */ public io.grpc.stub.BlockingClientCall streamingInputCall() { - return io.grpc.stub.ClientCalls.blockingBidiStreamingCall( + return io.grpc.stub.ClientCalls.blockingClientStreamingCall( getChannel(), getStreamingInputCallMethod(), getCallOptions()); } diff --git a/compiler/src/testLite/golden/TestService.java.txt b/compiler/src/testLite/golden/TestService.java.txt index 341820bac0f0..408eb750abfd 100644 --- a/compiler/src/testLite/golden/TestService.java.txt +++ b/compiler/src/testLite/golden/TestService.java.txt @@ -585,12 +585,8 @@ public final class TestServiceGrpc { */ public io.grpc.stub.BlockingClientCall streamingOutputCall(io.grpc.testing.compiler.Test.StreamingOutputCallRequest request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getStreamingOutputCallMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getStreamingOutputCallMethod(), getCallOptions(), request); } /** @@ -601,7 +597,7 @@ public final class TestServiceGrpc { */ public io.grpc.stub.BlockingClientCall streamingInputCall() { - return io.grpc.stub.ClientCalls.blockingBidiStreamingCall( + return io.grpc.stub.ClientCalls.blockingClientStreamingCall( getChannel(), getStreamingInputCallMethod(), getCallOptions()); } diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/MetricsServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/MetricsServiceGrpc.java index c310a8f48abc..30f8fff0127e 100644 --- a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/MetricsServiceGrpc.java +++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/MetricsServiceGrpc.java @@ -237,12 +237,8 @@ protected MetricsServiceBlockingV2Stub build( */ public io.grpc.stub.BlockingClientCall getAllGauges(io.grpc.testing.integration.Metrics.EmptyMessage request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getGetAllGaugesMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getGetAllGaugesMethod(), getCallOptions(), request); } /** diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java index d9e6a7f05d4b..31905fa48787 100644 --- a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java +++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java @@ -619,12 +619,8 @@ public io.grpc.testing.integration.Messages.SimpleResponse cacheableUnaryCall(io */ public io.grpc.stub.BlockingClientCall streamingOutputCall(io.grpc.testing.integration.Messages.StreamingOutputCallRequest request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getStreamingOutputCallMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getStreamingOutputCallMethod(), getCallOptions(), request); } /** @@ -635,7 +631,7 @@ public io.grpc.testing.integration.Messages.SimpleResponse cacheableUnaryCall(io */ public io.grpc.stub.BlockingClientCall streamingInputCall() { - return io.grpc.stub.ClientCalls.blockingBidiStreamingCall( + return io.grpc.stub.ClientCalls.blockingClientStreamingCall( getChannel(), getStreamingInputCallMethod(), getCallOptions()); } diff --git a/services/src/generated/main/grpc/io/grpc/health/v1/HealthGrpc.java b/services/src/generated/main/grpc/io/grpc/health/v1/HealthGrpc.java index dc2fc45feb7c..1b14c98f28cb 100644 --- a/services/src/generated/main/grpc/io/grpc/health/v1/HealthGrpc.java +++ b/services/src/generated/main/grpc/io/grpc/health/v1/HealthGrpc.java @@ -283,12 +283,8 @@ public io.grpc.health.v1.HealthCheckResponse check(io.grpc.health.v1.HealthCheck */ public io.grpc.stub.BlockingClientCall watch(io.grpc.health.v1.HealthCheckRequest request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getWatchMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getWatchMethod(), getCallOptions(), request); } } diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 5271e7fbfaba..af1d43a47e7a 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -202,14 +202,14 @@ public static Iterator blockingServerStreamingCall( /** * Executes a server-streaming call returning a blocking {@link Iterator} over the - * response stream. The {@code call} should not be already started. After calling this method, - * {@code call} should no longer be used. + * response stream. * *

The returned iterator may throw {@link StatusRuntimeException} on error. * + *

Warning: the iterator can result in leaks if not completely consumed. + * * @return an iterator over the response stream. */ - // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. public static Iterator blockingServerStreamingCall( Channel channel, MethodDescriptor method, CallOptions callOptions, ReqT req) { ThreadlessExecutor executor = new ThreadlessExecutor(); @@ -221,6 +221,46 @@ public static Iterator blockingServerStreamingCall( return result; } + /** + * The better way to do server streaming calls.
+ * Call {@link BlockingClientCall#read()} for + * retrieving values. A {@code null} will be returned after the server has closed the stream. + *
+ * The methods {@link BlockingClientCall#hasNext()} and {@link + * BlockingClientCall#cancel(String, Throwable)} can be used for more extensive control. + *
+ *

Example usage:

+ *
+   * {@code  while ((response = call.read()) != null) { ... } }
+   * 
+ * or + *
+   * {@code
+   *   while (call.hasNext()) {
+   *     response = call.read();
+   *     ...
+   *   }
+   * }
+   * 
+ * + * @return A {@link BlockingClientCall} that has had the request sent and halfClose called + * @throws InterruptedException if it receives an interrupt while sending the request + */ + public static BlockingClientCall blockingV2ServerStreamingCall( + Channel channel, MethodDescriptor method, CallOptions callOptions, ReqT req) + throws InterruptedException { + io.grpc.stub.BlockingClientCall call = + blockingBidiStreamingCall(channel, method, callOptions); + call.write(req); + call.halfClose(); + return call; + } + + public static BlockingClientCall blockingClientStreamingCall( + Channel channel, MethodDescriptor method, CallOptions callOptions) { + return blockingBidiStreamingCall(channel, method, callOptions); + } + /** * Initiate a bidirectional-streaming {@link ClientCall} and returning a stream object * ({@link BlockingClientCall}) which can be used by the client to send and receive messages over diff --git a/testing-proto/src/generated/main/grpc/io/grpc/testing/protobuf/SimpleServiceGrpc.java b/testing-proto/src/generated/main/grpc/io/grpc/testing/protobuf/SimpleServiceGrpc.java index b7f3a58d6df7..0d7172a5eacd 100644 --- a/testing-proto/src/generated/main/grpc/io/grpc/testing/protobuf/SimpleServiceGrpc.java +++ b/testing-proto/src/generated/main/grpc/io/grpc/testing/protobuf/SimpleServiceGrpc.java @@ -363,7 +363,7 @@ public io.grpc.testing.protobuf.SimpleResponse unaryRpc(io.grpc.testing.protobuf */ public io.grpc.stub.BlockingClientCall clientStreamingRpc() { - return io.grpc.stub.ClientCalls.blockingBidiStreamingCall( + return io.grpc.stub.ClientCalls.blockingClientStreamingCall( getChannel(), getClientStreamingRpcMethod(), getCallOptions()); } @@ -374,12 +374,8 @@ public io.grpc.testing.protobuf.SimpleResponse unaryRpc(io.grpc.testing.protobuf */ public io.grpc.stub.BlockingClientCall serverStreamingRpc(io.grpc.testing.protobuf.SimpleRequest request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getServerStreamingRpcMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getServerStreamingRpcMethod(), getCallOptions(), request); } /** diff --git a/xds/src/generated/thirdparty/grpc/com/github/xds/service/orca/v3/OpenRcaServiceGrpc.java b/xds/src/generated/thirdparty/grpc/com/github/xds/service/orca/v3/OpenRcaServiceGrpc.java index 3d9fd477eebd..bd02fe198603 100644 --- a/xds/src/generated/thirdparty/grpc/com/github/xds/service/orca/v3/OpenRcaServiceGrpc.java +++ b/xds/src/generated/thirdparty/grpc/com/github/xds/service/orca/v3/OpenRcaServiceGrpc.java @@ -223,12 +223,8 @@ protected OpenRcaServiceBlockingV2Stub build( */ public io.grpc.stub.BlockingClientCall streamCoreMetrics(com.github.xds.service.orca.v3.OrcaLoadReportRequest request) throws java.lang.InterruptedException { - io.grpc.stub.BlockingClientCall call = - io.grpc.stub.ClientCalls.blockingBidiStreamingCall( - getChannel(), getStreamCoreMetricsMethod(), getCallOptions()); - call.write(request); - call.halfClose(); - return call; + return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall( + getChannel(), getStreamCoreMetricsMethod(), getCallOptions(), request); } }