diff --git a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java index 29bb3cc2ed..2241278ed8 100644 --- a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java +++ b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java @@ -1320,8 +1320,9 @@ protected void onKafkaData( if (grpcStatus != null && !HEADER_VALUE_GRPC_OK.value().equals(grpcStatus.value().value())) { + OctetsFW value = grpcStatus.value(); String16FW status = statusRW - .set(grpcStatus.value().buffer(), grpcStatus.offset(), grpcStatus.sizeof()) + .set(value.buffer(), value.offset(), value.sizeof()) .build(); doGrpcAbort(traceId, authorization, status); } @@ -1410,7 +1411,8 @@ private void doGrpcAbort( long authorization, String16FW status) { - if (GrpcKafkaState.replyOpened(state) && !GrpcKafkaState.replyClosed(state)) + if (GrpcKafkaState.replyOpening(state) && + !GrpcKafkaState.replyClosed(state)) { replySeq = correlater.replySeq; diff --git a/runtime/binding-grpc-kafka/src/test/java/io/aklivity/zilla/runtime/blinding/grpc/kafka/internal/stream/GrpcKafkaProduceProxyIT.java b/runtime/binding-grpc-kafka/src/test/java/io/aklivity/zilla/runtime/blinding/grpc/kafka/internal/stream/GrpcKafkaProduceProxyIT.java index 1470ed8a24..3128c2ebb2 100644 --- a/runtime/binding-grpc-kafka/src/test/java/io/aklivity/zilla/runtime/blinding/grpc/kafka/internal/stream/GrpcKafkaProduceProxyIT.java +++ b/runtime/binding-grpc-kafka/src/test/java/io/aklivity/zilla/runtime/blinding/grpc/kafka/internal/stream/GrpcKafkaProduceProxyIT.java @@ -68,6 +68,16 @@ public void shouldRejectUnaryRpc() throws Exception k3po.finish(); } + @Test + @Configuration("produce.proxy.rpc.yaml") + @Specification({ + "${grpc}/unary.rpc.error/client", + "${kafka}/unary.rpc.error/server"}) + public void shouldRejectUnaryRpcWithError() throws Exception + { + k3po.finish(); + } + @Test @Configuration("produce.proxy.rpc.yaml") @Specification({ diff --git a/runtime/binding-grpc/src/main/java/io/aklivity/zilla/runtime/binding/grpc/internal/stream/GrpcClientFactory.java b/runtime/binding-grpc/src/main/java/io/aklivity/zilla/runtime/binding/grpc/internal/stream/GrpcClientFactory.java index 419b50743d..3634c3f24e 100644 --- a/runtime/binding-grpc/src/main/java/io/aklivity/zilla/runtime/binding/grpc/internal/stream/GrpcClientFactory.java +++ b/runtime/binding-grpc/src/main/java/io/aklivity/zilla/runtime/binding/grpc/internal/stream/GrpcClientFactory.java @@ -113,6 +113,8 @@ public class GrpcClientFactory implements GrpcStreamFactory private final GrpcResetExFW.Builder grpcResetExRW = new GrpcResetExFW.Builder(); private final GrpcMessageFW.Builder grpcMessageRW = new GrpcMessageFW.Builder(); + private final GrpcAbortExFW grpcAbortedStatusRO; + private final MutableDirectBuffer writeBuffer; private final MutableDirectBuffer metadataBuffer; private final MutableDirectBuffer extBuffer; @@ -139,6 +141,11 @@ public GrpcClientFactory( this.grpcTypeId = context.supplyTypeId(GrpcBinding.NAME); this.bindings = new Long2ObjectHashMap<>(); this.helper = new HttpGrpcResponseHeaderHelper(); + + this.grpcAbortedStatusRO = grpcAbortExRW.wrap(new UnsafeBuffer(new byte[32]), 0, 32) + .typeId(grpcTypeId) + .status(HEADER_VALUE_GRPC_ABORTED) + .build(); } @Override @@ -234,6 +241,7 @@ private final class GrpcClient private int replyPad; private int state; + private String grpcStatus; private GrpcClient( MessageConsumer application, @@ -403,7 +411,14 @@ private void onAppWindow( replyPad = padding; state = GrpcState.openReply(state); - delegate.doNetWindow(traceId, authorization, budgetId, padding, replyAck, replyMax); + if (GrpcState.replyClosing(state)) + { + doAppAbortDeferred(traceId, authorization); + } + else + { + delegate.doNetWindow(traceId, authorization, budgetId, padding, replyAck, replyMax); + } assert replyAck <= replySeq; @@ -450,21 +465,49 @@ private void doAppData( assert replySeq <= replyAck + replyMax; } - private void doAppAbort( + private void doAppAbortDeferring( + long traceId, + long authorization, + String16FW grpcStatus) + { + this.grpcStatus = grpcStatus != null ? grpcStatus.asString() : null; + this.state = GrpcState.closingReply(state); + + if (GrpcState.replyOpened(state)) + { + doAppAbortDeferred(traceId, authorization); + } + } + + private void doAppAbortDeferred( long traceId, long authorization) { - if (!GrpcState.replyClosed(state)) + GrpcAbortExFW abortEx = grpcStatus != null + ? grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(grpcTypeId) + .status(grpcStatus) + .build() + : grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(grpcTypeId) + .status(HEADER_VALUE_GRPC_INTERNAL_ERROR) + .build(); + + doAppAbort(traceId, authorization, abortEx); + } + + private void doAppAbort( + long traceId, + long authorization, + Flyweight extension) + { + if (GrpcState.replyOpening(state) && + !GrpcState.replyClosed(state)) { state = GrpcState.closeReply(state); - GrpcAbortExFW abortEx = grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity()) - .typeId(grpcTypeId) - .status(HEADER_VALUE_GRPC_ABORTED) - .build(); - doAbort(application, originId, routedId, replyId, replySeq, replyAck, replyMax, - traceId, authorization, abortEx); + traceId, authorization, extension); } } @@ -500,15 +543,19 @@ private void doAppWindow( private void doAppReset( long traceId, - long authorization, - Flyweight extension) + long authorization) { if (!GrpcState.initialClosed(state)) { state = GrpcState.closeInitial(state); + GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(grpcTypeId) + .status(HEADER_VALUE_GRPC_ABORTED) + .build(); + doReset(application, originId, routedId, initialId, initialSeq, initialAck, initialMax, - traceId, authorization, extension); + traceId, authorization, resetEx); } } } @@ -725,21 +772,13 @@ private void onNetBegin( replyMax = maximum; state = GrpcState.openingReply(state); + delegate.doAppBegin(traceId, authorization, affinity); + if (!HTTP_HEADER_VALUE_STATUS_200.equals(status) || grpcStatus != null && !HEADER_VALUE_GRPC_OK.equals(grpcStatus)) { - final String16FW newGrpcStatus = grpcStatus == null ? HEADER_VALUE_GRPC_INTERNAL_ERROR : grpcStatus; - GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) - .typeId(grpcTypeId) - .status(newGrpcStatus) - .build(); - - delegate.doAppReset(traceId, authorization, resetEx); - doNetAbort(traceId, authorization); - } - else - { - delegate.doAppBegin(traceId, authorization, affinity); + delegate.doAppAbortDeferring(traceId, authorization, grpcStatus); + doNetReset(traceId, authorization); } } @@ -823,10 +862,9 @@ private void onNetEnd( } else { - delegate.doAppAbort(traceId, authorization); + delegate.doAppAbortDeferring(traceId, authorization, + grpcStatus != null ? grpcStatus.value() : HEADER_VALUE_GRPC_INTERNAL_ERROR); } - - } private void onNetAbort( @@ -846,7 +884,7 @@ private void onNetAbort( state = GrpcState.closeReply(state); - delegate.doAppAbort(traceId, authorization); + delegate.doAppAbort(traceId, authorization, grpcAbortedStatusRO); } private void onNetReset( @@ -857,12 +895,7 @@ private void onNetReset( state = GrpcState.closeInitial(state); - GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) - .typeId(grpcTypeId) - .status(HEADER_VALUE_GRPC_ABORTED) - .build(); - - delegate.doAppReset(traceId, authorization, resetEx); + delegate.doAppReset(traceId, authorization); } private void onNetWindow( diff --git a/runtime/binding-grpc/src/test/java/io/aklivity/zilla/runtime/binding/grpc/internal/streams/client/UnaryRpcIT.java b/runtime/binding-grpc/src/test/java/io/aklivity/zilla/runtime/binding/grpc/internal/streams/client/UnaryRpcIT.java index 7855ea51e3..f110c239c6 100644 --- a/runtime/binding-grpc/src/test/java/io/aklivity/zilla/runtime/binding/grpc/internal/streams/client/UnaryRpcIT.java +++ b/runtime/binding-grpc/src/test/java/io/aklivity/zilla/runtime/binding/grpc/internal/streams/client/UnaryRpcIT.java @@ -124,4 +124,25 @@ public void serverSendsWriteAbortOnOpenRequestResponse() throws Exception } + @Test + @Configuration("client.when.yaml") + @Specification({ + "${app}/server.send.write.abort.on.open.response/client", + "${net}/response.with.grpc.error/server" + }) + public void shouldAbortResponseWithGrpcError() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("client.when.yaml") + @Specification({ + "${app}/response.missing.grpc.status/client", + "${net}/response.missing.grpc.status/server", + }) + public void shouldAbortResponseMissingGrpcStatus() throws Exception + { + k3po.finish(); + } } diff --git a/runtime/binding-kafka-grpc/src/main/java/io/aklivity/zilla/runtime/binding/kafka/grpc/internal/stream/KafkaGrpcRemoteServerFactory.java b/runtime/binding-kafka-grpc/src/main/java/io/aklivity/zilla/runtime/binding/kafka/grpc/internal/stream/KafkaGrpcRemoteServerFactory.java index baa6d57abb..f8d41b4da3 100644 --- a/runtime/binding-kafka-grpc/src/main/java/io/aklivity/zilla/runtime/binding/kafka/grpc/internal/stream/KafkaGrpcRemoteServerFactory.java +++ b/runtime/binding-kafka-grpc/src/main/java/io/aklivity/zilla/runtime/binding/kafka/grpc/internal/stream/KafkaGrpcRemoteServerFactory.java @@ -786,9 +786,9 @@ private void doKafkaEnd( if (KafkaGrpcState.initialOpened(state) && !KafkaGrpcState.initialClosed(state)) { - initialSeq = delegate.initialSeq; - initialAck = delegate.initialAck; - initialMax = delegate.initialMax; + initialSeq = delegate.replySeq; + initialAck = delegate.replyAck; + initialMax = delegate.replyMax; state = KafkaGrpcState.closeInitial(state); doKafkaTombstone(traceId, authorization, HEADER_VALUE_GRPC_OK); @@ -806,9 +806,9 @@ private void doKafkaAbort( if (KafkaGrpcState.initialOpening(state) && !KafkaGrpcState.initialClosed(state)) { - initialSeq = delegate.initialSeq; - initialAck = delegate.initialAck; - initialMax = delegate.initialMax; + initialSeq = delegate.replySeq; + initialAck = delegate.replyAck; + initialMax = delegate.replyMax; state = KafkaGrpcState.closeInitial(state); doKafkaTombstone(traceId, authorization, status); @@ -1453,7 +1453,6 @@ private void onGrpcAbort( final String16FW status = abortEx != null ? abortEx.status() : HEADER_VALUE_GRPC_ABORTED; correlater.doKafkaAbort(traceId, authorization, status); - } private void onGrpcReset( @@ -1464,6 +1463,7 @@ private void onGrpcReset( final int maximum = reset.maximum(); final long traceId = reset.traceId(); final long authorization = reset.authorization(); + final OctetsFW extension = reset.extension(); assert acknowledge <= sequence; assert sequence <= initialSeq; @@ -1472,7 +1472,7 @@ private void onGrpcReset( initialAck = acknowledge; initialMax = maximum; - state = KafkaGrpcState.closingInitial(state); + state = KafkaGrpcState.closeInitial(state); cleanup(traceId, authorization); @@ -1501,7 +1501,7 @@ private void onGrpcWindow( initialBud = budgetId; initialPad = padding; initialCap = capabilities; - state = KafkaGrpcState.openReply(state); + state = KafkaGrpcState.openInitial(state); assert initialAck <= initialSeq; @@ -1592,7 +1592,7 @@ private void doGrpcBegin( OctetsFW service, OctetsFW method) { - state = KafkaGrpcState.openingReply(state); + state = KafkaGrpcState.openingInitial(state); grpc = newGrpcStream(this::onGrpcMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, authorization, affinity, server.condition.scheme(), server.condition.authority(), @@ -1621,7 +1621,8 @@ private void doGrpcAbort( long traceId, long authorization) { - if (KafkaGrpcState.replyOpened(state) && !KafkaGrpcState.replyClosed(state)) + if (KafkaGrpcState.initialOpening(state) && + !KafkaGrpcState.initialClosed(state)) { final GrpcAbortExFW grpcAbortEx = grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity()) @@ -1639,9 +1640,9 @@ private void doGrpcEnd( long traceId, long authorization) { - if (!KafkaGrpcState.replyClosed(state)) + if (!KafkaGrpcState.initialClosed(state)) { - state = KafkaGrpcState.closeReply(state); + state = KafkaGrpcState.closeInitial(state); doEnd(grpc, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, authorization); @@ -1666,8 +1667,7 @@ private void doGrpcReset( long traceId, long authorization) { - if (KafkaGrpcState.replyOpening(state) && - !KafkaGrpcState.replyClosed(state)) + if (!KafkaGrpcState.replyClosed(state)) { state = KafkaGrpcState.closeReply(state); @@ -1682,6 +1682,7 @@ private void doGrpcReset( } } } + private void doBegin( MessageConsumer receiver, long originId, diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.error/client.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.error/client.rpt new file mode 100644 index 0000000000..dfc9f9f631 --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.error/client.rpt @@ -0,0 +1,42 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/grpc0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${grpc:beginEx() + .typeId(zilla:id("grpc")) + .scheme("http") + .authority("localhost:8080") + .service("example.EchoService") + .method("EchoUnary") + .metadata("custom", "test") + .metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8") + .build()} +connected + +write ${grpc:protobuf() + .string(1, "Hello World") + .build()} +write flush + +write close + +read zilla:abort.ext ${grpc:abortEx() + .typeId(zilla:id("grpc")) + .status("9") + .build()} +read aborted diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.error/server.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.error/server.rpt new file mode 100644 index 0000000000..1d240780ec --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.error/server.rpt @@ -0,0 +1,42 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/grpc0" + option zilla:window 8192 + option zilla:transmission "half-duplex" +accepted + +read zilla:begin.ext ${grpc:matchBeginEx() + .typeId(zilla:id("grpc")) + .scheme("http") + .authority("localhost:8080") + .service("example.EchoService") + .method("EchoUnary") + .metadata("custom", "test") + .metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8") + .build()} +connected + +read ${grpc:protobuf() + .string(1, "Hello World") + .build()} + +read closed + +write zilla:abort.ext ${grpc:abortEx() + .typeId(zilla:id("grpc")) + .status("9") + .build()} +write abort diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.error/client.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.error/client.rpt new file mode 100644 index 0000000000..5574e9268d --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.error/client.rpt @@ -0,0 +1,110 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("requests") + .partition(-1, -2) + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.EchoService") + .header("zilla:method", "EchoUnary") + .header("zilla:reply-to", "responses") + .header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-479f2c3fb58bc3f04bbe15440a657670") + .build() + .build()} +write ${grpc:protobuf() + .string(1, "Hello World") + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.EchoService") + .header("zilla:method", "EchoUnary") + .header("zilla:reply-to", "responses") + .header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-479f2c3fb58bc3f04bbe15440a657670") + .build() + .build()} + +write flush + +write close +write notify SENT_ASYNC_REQUEST +read closed + +connect await SENT_ASYNC_REQUEST + "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("responses") + .partition(-1, -2) + .filter() + .header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-479f2c3fb58bc3f04bbe15440a657670") + .build() + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .partition(0, 1, 1) + .progress(0, 2) + .progress(1, 1) + .key("test") + .header("zilla:status", "9") + .build() + .build()} +read zilla:data.null + +read advised zilla:flush ${kafka:matchFlushEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .progress(0, 2, 2, 2) + .build() + .build()} + +write close +read closed diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.error/server.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.error/server.rpt new file mode 100644 index 0000000000..cd2dbd2925 --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.error/server.rpt @@ -0,0 +1,109 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("requests") + .partition(-1, -2) + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.EchoService") + .header("zilla:method", "EchoUnary") + .header("zilla:reply-to", "responses") + .header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-479f2c3fb58bc3f04bbe15440a657670") + .build() + .build()} + +read ${grpc:protobuf() + .string(1, "Hello World") + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.EchoService") + .header("zilla:method", "EchoUnary") + .header("zilla:reply-to", "responses") + .header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-479f2c3fb58bc3f04bbe15440a657670") + .build() + .build()} +read zilla:data.null + +read closed +write close + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("FETCH_ONLY") + .topic("responses") + .partition(-1, -2) + .filter() + .header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-479f2c3fb58bc3f04bbe15440a657670") + .build() + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .timestamp(kafka:timestamp()) + .partition(0, 1, 1) + .progress(0, 2) + .progress(1, 1) + .key("test") + .header("zilla:status", "9") + .build() + .build()} + +write flush + +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .merged() + .fetch() + .progress(0, 2, 2, 2) + .build() + .build()} + +read closed +write close diff --git a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java index c32b62a8fa..9c90117277 100644 --- a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java +++ b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java @@ -98,6 +98,15 @@ public void shouldExchangeMessageInUnary() throws Exception k3po.finish(); } + @Test + @Specification({ + "${grpc}/unary.rpc.error/client", + "${grpc}/unary.rpc.error/server"}) + public void shouldRejectUnaryRpcWithError() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${grpc}/unary.rpc.rejected/client", diff --git a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java index e2a9ff62d9..4776f9219b 100644 --- a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java +++ b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java @@ -107,6 +107,15 @@ public void shouldRejectUnaryRpc() throws Exception k3po.finish(); } + @Test + @Specification({ + "${kafka}/unary.rpc.error/client", + "${kafka}/unary.rpc.error/server"}) + public void shouldRejectUnaryRpcWithError() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${kafka}/unary.rpc.sent.write.abort/client", diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/response.missing.grpc.status/client.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/response.missing.grpc.status/client.rpt new file mode 100644 index 0000000000..fc13096d57 --- /dev/null +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/response.missing.grpc.status/client.rpt @@ -0,0 +1,45 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${grpc:beginEx() + .typeId(zilla:id("grpc")) + .scheme("http") + .authority("localhost:8080") + .service("example.EchoService") + .method("EchoUnary") + .metadata("custom", "test") + .build()} +connected + +write ${grpc:protobuf() + .string(1, "Hello World") + .build()} +write flush + +write close + +read ${grpc:protobuf() + .string(1, "Hello World") + .build()} + +read zilla:abort.ext ${grpc:abortEx() + .typeId(zilla:id("grpc")) + .status("13") + .build()} +read aborted diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/response.missing.grpc.status/server.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/response.missing.grpc.status/server.rpt new file mode 100644 index 0000000000..2bbfdf4e81 --- /dev/null +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/response.missing.grpc.status/server.rpt @@ -0,0 +1,46 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" +accepted + +read zilla:begin.ext ${grpc:matchBeginEx() + .typeId(zilla:id("grpc")) + .scheme("http") + .authority("localhost:8080") + .service("example.EchoService") + .method("EchoUnary") + .metadata("custom", "test") + .build()} +connected + +read ${grpc:protobuf() + .string(1, "Hello World") + .build()} + +read closed + +write ${grpc:protobuf() + .string(1, "Hello World") + .build()} +write flush + +write zilla:abort.ext ${grpc:abortEx() + .typeId(zilla:id("grpc")) + .status("13") + .build()} +write abort diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/server.send.write.abort.on.open.response/client.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/server.send.write.abort.on.open.response/client.rpt index 6ea70ed517..adbca04b17 100644 --- a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/server.send.write.abort.on.open.response/client.rpt +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/server.send.write.abort.on.open.response/client.rpt @@ -36,7 +36,7 @@ write close read zilla:abort.ext ${grpc:abortEx() .typeId(zilla:id("grpc")) - .status("10") + .status("9") .build()} read aborted diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/server.send.write.abort.on.open.response/server.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/server.send.write.abort.on.open.response/server.rpt index cdd5453625..f7289de25b 100644 --- a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/server.send.write.abort.on.open.response/server.rpt +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/application/unary.rpc/server.send.write.abort.on.open.response/server.rpt @@ -35,7 +35,7 @@ read closed write zilla:abort.ext ${grpc:abortEx() .typeId(zilla:id("grpc")) - .status("10") + .status("9") .build()} write flush diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.missing.grpc.status/client.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.missing.grpc.status/client.rpt new file mode 100644 index 0000000000..866729ae1b --- /dev/null +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.missing.grpc.status/client.rpt @@ -0,0 +1,53 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "POST") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/example.EchoService/EchoUnary") + .header("content-type", "application/grpc") + .header("te", "trailers") + .header("custom", "test") + .build()} +connected + +write ${grpc:message() + .string(1, "Hello World") + .build()} +write flush + +write close + +read zilla:begin.ext ${http:matchBeginEx() + .typeId(zilla:id("http")) + .header(":status", "200") + .header("content-type", "application/grpc") + .header("grpc-encoding", "identity") + .build()} + +read ${grpc:message() + .string(1, "Hello World") + .build()} + +read zilla:end.ext ${http:endEx() + .typeId(zilla:id("http")) + .build()} +read closed diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.missing.grpc.status/server.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.missing.grpc.status/server.rpt new file mode 100644 index 0000000000..d3ce8b85fe --- /dev/null +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.missing.grpc.status/server.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "half-duplex" +accepted + +read zilla:begin.ext ${http:matchBeginEx() + .typeId(zilla:id("http")) + .header(":method", "POST") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/example.EchoService/EchoUnary") + .header("content-type", "application/grpc") + .header("te", "trailers") + .header("custom", "test") + .build()} +connected + +read ${grpc:message() + .string(1, "Hello World") + .build()} + +read closed + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "200") + .header("content-type", "application/grpc") + .header("grpc-encoding", "identity") + .build()} +write flush + +write ${grpc:message() + .string(1, "Hello World") + .build()} +write flush + +write zilla:end.ext ${http:endEx() + .typeId(zilla:id("http")) + .build()} +write close diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.with.grpc.error/client.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.with.grpc.error/client.rpt new file mode 100644 index 0000000000..ffc2ab5410 --- /dev/null +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.with.grpc.error/client.rpt @@ -0,0 +1,47 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "POST") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/example.EchoService/EchoUnary") + .header("content-type", "application/grpc") + .header("te", "trailers") + .header("custom", "test") + .build()} +connected + +write ${grpc:message() + .string(1, "Hello World") + .build()} +write flush + +write close + +read zilla:begin.ext ${http:matchBeginEx() + .typeId(zilla:id("http")) + .header(":status", "200") + .header("content-type", "application/grpc") + .header("grpc-encoding", "identity") + .header("grpc-status", "9") + .build()} + +read closed diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.with.grpc.error/server.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.with.grpc.error/server.rpt new file mode 100644 index 0000000000..85625a5ced --- /dev/null +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/response.with.grpc.error/server.rpt @@ -0,0 +1,48 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "half-duplex" +accepted + +read zilla:begin.ext ${http:matchBeginEx() + .typeId(zilla:id("http")) + .header(":method", "POST") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/example.EchoService/EchoUnary") + .header("content-type", "application/grpc") + .header("te", "trailers") + .header("custom", "test") + .build()} +connected + +read ${grpc:message() + .string(1, "Hello World") + .build()} + +read closed + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "200") + .header("content-type", "application/grpc") + .header("grpc-encoding", "identity") + .header("grpc-status", "9") + .build()} +write flush + +write close diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/server.send.write.abort.on.open.response/client.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/server.send.write.abort.on.open.response/client.rpt index 70f18e1c69..d5af75fc51 100644 --- a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/server.send.write.abort.on.open.response/client.rpt +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/server.send.write.abort.on.open.response/client.rpt @@ -45,7 +45,7 @@ read zilla:begin.ext ${http:matchBeginEx() read zilla:end.ext ${http:endEx() .typeId(zilla:id("http")) - .trailer("grpc-status", "10") + .trailer("grpc-status", "9") .build()} read closed diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/server.send.write.abort.on.open.response/server.rpt b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/server.send.write.abort.on.open.response/server.rpt index 4a815657f8..f60d03ad9d 100644 --- a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/server.send.write.abort.on.open.response/server.rpt +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/streams/network/unary.rpc/server.send.write.abort.on.open.response/server.rpt @@ -46,6 +46,6 @@ write flush write zilla:end.ext ${http:endEx() .typeId(zilla:id("http")) - .trailer("grpc-status", "10") + .trailer("grpc-status", "9") .build()} write close diff --git a/specs/binding-grpc.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/streams/application/UnaryRpcIT.java b/specs/binding-grpc.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/streams/application/UnaryRpcIT.java index 95dcb86929..5621868579 100644 --- a/specs/binding-grpc.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/streams/application/UnaryRpcIT.java +++ b/specs/binding-grpc.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/streams/application/UnaryRpcIT.java @@ -94,4 +94,14 @@ public void serverSendsWriteAbortOnOpenRequestResponse() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/response.missing.grpc.status/client", + "${app}/response.missing.grpc.status/server", + }) + public void shouldAbortResponseMissingGrpcStatus() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-grpc.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/streams/network/UnaryRpcIT.java b/specs/binding-grpc.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/streams/network/UnaryRpcIT.java index 2d0cc03267..8bc47c27ad 100644 --- a/specs/binding-grpc.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/streams/network/UnaryRpcIT.java +++ b/specs/binding-grpc.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/streams/network/UnaryRpcIT.java @@ -86,6 +86,26 @@ public void shouldTimeoutOnNoResponse() throws Exception k3po.finish(); } + @Test + @Specification({ + "${net}/response.with.grpc.error/client", + "${net}/response.with.grpc.error/server", + }) + public void shouldAbortResponseOnGrpcError() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${net}/response.missing.grpc.status/client", + "${net}/response.missing.grpc.status/server", + }) + public void shouldAbortResponseMissingGrpcStatus() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${net}/server.send.read.abort.on.open.request/client",