diff --git a/runtime/binding-ws/src/main/java/io/aklivity/zilla/runtime/binding/ws/internal/stream/WsClientFactory.java b/runtime/binding-ws/src/main/java/io/aklivity/zilla/runtime/binding/ws/internal/stream/WsClientFactory.java index 06afcaf08a..5ef7592981 100644 --- a/runtime/binding-ws/src/main/java/io/aklivity/zilla/runtime/binding/ws/internal/stream/WsClientFactory.java +++ b/runtime/binding-ws/src/main/java/io/aklivity/zilla/runtime/binding/ws/internal/stream/WsClientFactory.java @@ -58,6 +58,7 @@ import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.FlushFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.HttpBeginExFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.ResetFW; +import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.SignalFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.WindowFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.WsBeginExFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.WsDataExFW; @@ -65,6 +66,7 @@ import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.binding.BindingHandler; import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer; +import io.aklivity.zilla.runtime.engine.concurrent.Signaler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; public final class WsClientFactory implements WsStreamFactory @@ -77,6 +79,7 @@ public final class WsClientFactory implements WsStreamFactory private static final String WEBSOCKET_UPGRADE = "websocket"; private static final String WEBSOCKET_VERSION_13 = "13"; private static final int MAXIMUM_HEADER_SIZE = 14; + private static final int PONG_SIGNAL_ID = 1; private static final DirectBuffer CLOSE_PAYLOAD = new UnsafeBuffer(new byte[0]); @@ -84,6 +87,7 @@ public final class WsClientFactory implements WsStreamFactory private final BeginFW beginRO = new BeginFW(); private final DataFW dataRO = new DataFW(); + private final SignalFW signalRO = new SignalFW(); private final EndFW endRO = new EndFW(); private final AbortFW abortRO = new AbortFW(); private final FlushFW flushRO = new FlushFW(); @@ -106,6 +110,8 @@ public final class WsClientFactory implements WsStreamFactory private final ResetFW.Builder resetRW = new ResetFW.Builder(); private final ChallengeFW.Builder challengeRW = new ChallengeFW.Builder(); + private final OctetsFW.Builder payloadRW = new OctetsFW.Builder(); + private final OctetsFW payloadRO = new OctetsFW(); private final HttpBeginExFW httpBeginExRO = new HttpBeginExFW(); @@ -118,9 +124,11 @@ public final class WsClientFactory implements WsStreamFactory private final WsHeaderFW.Builder wsHeaderRW = new WsHeaderFW.Builder(); private final MutableDirectBuffer writeBuffer; + private final MutableDirectBuffer extBuffer; private final BindingHandler streamFactory; private final LongUnaryOperator supplyInitialId; private final LongUnaryOperator supplyReplyId; + private final Signaler signaler; private final Long2ObjectHashMap bindings; private final int wsTypeId; @@ -131,10 +139,12 @@ public WsClientFactory( EngineContext context) { this.writeBuffer = context.writeBuffer(); + this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); this.streamFactory = context.streamFactory(); this.supplyInitialId = context::supplyInitialId; this.supplyReplyId = context::supplyReplyId; this.bindings = new Long2ObjectHashMap<>(); + this.signaler = context.signaler(); this.wsTypeId = context.supplyTypeId(WsBinding.NAME); this.httpTypeId = context.supplyTypeId("http"); } @@ -713,6 +723,7 @@ private final class WsClient private int statusLength; private MutableDirectBuffer status; + private int pingReceived; private WsClient( long originId, @@ -931,6 +942,10 @@ private void onNetMessage( final DataFW data = dataRO.wrap(buffer, index, index + length); onNetData(data); break; + case SignalFW.TYPE_ID: + final SignalFW signal = signalRO.wrap(buffer, index, index + length); + onNetSignal(signal); + break; case EndFW.TYPE_ID: final EndFW end = endRO.wrap(buffer, index, index + length); onNetEnd(end); @@ -1069,6 +1084,23 @@ private void onNetData( } } + private void onNetSignal( + SignalFW signal) + { + final int signalId = signal.signalId(); + final long traceId = signal.traceId(); + final OctetsFW payload = signal.payload(); + + assert signalId == PONG_SIGNAL_ID; + + if (--pingReceived == 0) + { + final int reserved = payload.sizeof() + MAXIMUM_HEADER_SIZE + replyPad; + + doNetData(traceId, decodeAuthorization, initialBudgetId, reserved, payload, 0x8a); + } + } + private void onNetEnd( EndFW end) { @@ -1251,6 +1283,9 @@ private int decodeHeader( case 0x08: this.decodeState = this::decodeClose; break; + case 0x09: + this.decodeState = this::decodePing; + break; case 0x0a: this.decodeState = this::decodePong; break; @@ -1395,6 +1430,44 @@ private int decodeClose( } } + private int decodePing( + final DirectBuffer buffer, + final int offset, + final int length) + { + if (payloadLength > MAXIMUM_CONTROL_FRAME_PAYLOAD_SIZE) + { + doNetReset(decodeTraceId, decodeAuthorization); + doAppAbort(decodeTraceId, decodeAuthorization, STATUS_PROTOCOL_ERROR); + return length; + } + else + { + final int decodeBytes = Math.min(length, payloadLength - payloadProgress); + + OctetsFW payload = payloadRO.wrap(buffer, offset, offset + decodeBytes); + + OctetsFW.Builder payloadBuilder = payloadRW.wrap(extBuffer, 0, extBuffer.capacity()); + payloadBuilder.set(payload); + xor(extBuffer, 0, payload.sizeof(), maskingKey); + OctetsFW unmaskedPayload = payloadBuilder.build(); + + pingReceived++; + signaler.signalNow(originId, routedId, initialId, decodeTraceId, PONG_SIGNAL_ID, 0, + unmaskedPayload.value(), 0, unmaskedPayload.sizeof()); + + payloadProgress += decodeBytes; + maskingKey = rotateMaskingKey(maskingKey, decodeBytes); + + if (payloadProgress == payloadLength) + { + this.decodeState = this::decodeHeader; + } + + return decodeBytes; + } + } + private int decodePong( final DirectBuffer buffer, final int offset, diff --git a/runtime/binding-ws/src/main/java/io/aklivity/zilla/runtime/binding/ws/internal/stream/WsServerFactory.java b/runtime/binding-ws/src/main/java/io/aklivity/zilla/runtime/binding/ws/internal/stream/WsServerFactory.java index 2e2d1071b9..0a061797e3 100644 --- a/runtime/binding-ws/src/main/java/io/aklivity/zilla/runtime/binding/ws/internal/stream/WsServerFactory.java +++ b/runtime/binding-ws/src/main/java/io/aklivity/zilla/runtime/binding/ws/internal/stream/WsServerFactory.java @@ -19,6 +19,7 @@ import static io.aklivity.zilla.runtime.binding.ws.internal.types.codec.WsHeaderFW.STATUS_PROTOCOL_ERROR; import static io.aklivity.zilla.runtime.binding.ws.internal.types.codec.WsHeaderFW.STATUS_UNEXPECTED_CONDITION; import static io.aklivity.zilla.runtime.binding.ws.internal.util.WsMaskUtil.xor; +import static io.aklivity.zilla.runtime.engine.concurrent.Signaler.NO_CANCEL_ID; import static java.nio.ByteOrder.BIG_ENDIAN; import static java.nio.ByteOrder.nativeOrder; import static java.nio.charset.StandardCharsets.US_ASCII; @@ -56,6 +57,7 @@ import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.FlushFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.HttpBeginExFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.ResetFW; +import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.SignalFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.WindowFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.WsBeginExFW; import io.aklivity.zilla.runtime.binding.ws.internal.types.stream.WsDataExFW; @@ -63,6 +65,7 @@ import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.binding.BindingHandler; import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer; +import io.aklivity.zilla.runtime.engine.concurrent.Signaler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; public final class WsServerFactory implements WsStreamFactory @@ -74,12 +77,15 @@ public final class WsServerFactory implements WsStreamFactory private static final String WEBSOCKET_VERSION_13 = "13"; private static final int MAXIMUM_HEADER_SIZE = 14; + private static final int PONG_SIGNAL_ID = 1; + private static final DirectBuffer CLOSE_PAYLOAD = new UnsafeBuffer(new byte[0]); private final MessageDigest sha1 = initSHA1(); private final BeginFW beginRO = new BeginFW(); private final DataFW dataRO = new DataFW(); + private final SignalFW signalRO = new SignalFW(); private final EndFW endRO = new EndFW(); private final AbortFW abortRO = new AbortFW(); private final FlushFW flushRO = new FlushFW(); @@ -98,6 +104,7 @@ public final class WsServerFactory implements WsStreamFactory private final WsDataExFW.Builder wsDataExRW = new WsDataExFW.Builder(); private final WsEndExFW.Builder wsEndExRW = new WsEndExFW.Builder(); + private final OctetsFW.Builder payloadRW = new OctetsFW.Builder(); private final WindowFW.Builder windowRW = new WindowFW.Builder(); private final ResetFW.Builder resetRW = new ResetFW.Builder(); private final ChallengeFW.Builder challengeRW = new ChallengeFW.Builder(); @@ -113,9 +120,11 @@ public final class WsServerFactory implements WsStreamFactory private final WsHeaderFW.Builder wsHeaderRW = new WsHeaderFW.Builder(); private final MutableDirectBuffer writeBuffer; + private final MutableDirectBuffer extBuffer; private final BindingHandler streamFactory; private final LongUnaryOperator supplyInitialId; private final LongUnaryOperator supplyReplyId; + private final Signaler signaler; private final Long2ObjectHashMap bindings; private final int wsTypeId; @@ -126,10 +135,12 @@ public WsServerFactory( EngineContext context) { this.writeBuffer = context.writeBuffer(); + this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); this.streamFactory = context.streamFactory(); this.supplyInitialId = context::supplyInitialId; this.supplyReplyId = context::supplyReplyId; this.bindings = new Long2ObjectHashMap<>(); + this.signaler = context.signaler(); this.wsTypeId = context.supplyTypeId(WsBinding.NAME); this.httpTypeId = context.supplyTypeId("http"); } @@ -284,6 +295,9 @@ private final class WsServer private int replyMax; private int replyPad; + private long pongId = NO_CANCEL_ID; + private int pingReceived; + private WsServer( MessageConsumer receiver, long originId, @@ -506,6 +520,10 @@ private void onNetMessage( final DataFW data = dataRO.wrap(buffer, index, index + length); onNetData(data); break; + case SignalFW.TYPE_ID: + final SignalFW signal = signalRO.wrap(buffer, index, index + length); + onNetSignal(signal); + break; case EndFW.TYPE_ID: final EndFW end = endRO.wrap(buffer, index, index + length); onNetEnd(end); @@ -600,6 +618,23 @@ private void onNetData( } } + private void onNetSignal( + SignalFW signal) + { + final int signalId = signal.signalId(); + final long traceId = signal.traceId(); + final OctetsFW payload = signal.payload(); + + assert signalId == PONG_SIGNAL_ID; + + if (--pingReceived == 0) + { + final int reserved = payload.sizeof() + MAXIMUM_HEADER_SIZE + replyPad; + + doNetData(traceId, decodeAuthorization, replyBudgetId, reserved, payload, 0x8a); + } + } + private void onNetEnd( EndFW end) { @@ -747,6 +782,9 @@ private int decodeHeader( case 0x08: this.decodeState = this::decodeClose; break; + case 0x09: + this.decodeState = this::decodePing; + break; case 0x0a: this.decodeState = this::decodePong; break; @@ -904,6 +942,44 @@ private int decodePong( } } + private int decodePing( + final DirectBuffer buffer, + final int offset, + final int length) + { + if (payloadLength > MAXIMUM_CONTROL_FRAME_PAYLOAD_SIZE) + { + doNetReset(decodeTraceId, decodeAuthorization); + stream.doAppAbort(decodeTraceId, decodeAuthorization, STATUS_PROTOCOL_ERROR); + return length; + } + else + { + final int decodeBytes = Math.min(length, payloadLength - payloadProgress); + + OctetsFW payload = payloadRO.wrap(buffer, offset, offset + decodeBytes); + + OctetsFW.Builder payloadBuilder = payloadRW.wrap(extBuffer, 0, extBuffer.capacity()); + payloadBuilder.set(payload); + xor(extBuffer, 0, payload.sizeof(), maskingKey); + OctetsFW unmaskedPayload = payloadBuilder.build(); + + pingReceived++; + signaler.signalNow(originId, routedId, replyId, decodeTraceId, PONG_SIGNAL_ID, 0, + unmaskedPayload.value(), 0, unmaskedPayload.sizeof()); + + payloadProgress += decodeBytes; + maskingKey = rotateMaskingKey(maskingKey, decodeBytes); + + if (payloadProgress == payloadLength) + { + this.decodeState = this::decodeHeader; + } + + return decodeBytes; + } + } + private int decodeUnexpected( final DirectBuffer directBuffer, final int offset, diff --git a/runtime/binding-ws/src/test/java/io/aklivity/zilla/runtime/binding/ws/internal/streams/client/ControlIT.java b/runtime/binding-ws/src/test/java/io/aklivity/zilla/runtime/binding/ws/internal/streams/client/ControlIT.java new file mode 100644 index 0000000000..70b48b76be --- /dev/null +++ b/runtime/binding-ws/src/test/java/io/aklivity/zilla/runtime/binding/ws/internal/streams/client/ControlIT.java @@ -0,0 +1,70 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.ws.internal.streams.client; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.rules.RuleChain.outerRule; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.kaazing.k3po.junit.annotation.Specification; +import org.kaazing.k3po.junit.rules.K3poRule; + +import io.aklivity.zilla.runtime.engine.test.EngineRule; +import io.aklivity.zilla.runtime.engine.test.annotation.Configuration; + +public class ControlIT +{ + private final K3poRule k3po = new K3poRule() + .addScriptRoot("net", "io/aklivity/zilla/specs/binding/ws/streams/network/control") + .addScriptRoot("app", "io/aklivity/zilla/specs/binding/ws/streams/application/control"); + + private final TestRule timeout = new DisableOnDebug(new Timeout(10, SECONDS)); + + private final EngineRule engine = new EngineRule() + .directory("target/zilla-itests") + .countersBufferCapacity(4096) + .configurationRoot("io/aklivity/zilla/specs/binding/ws/config") + .external("net0") + .clean(); + + @Rule + public final TestRule chain = outerRule(engine).around(k3po).around(timeout); + + + @Test + @Configuration("client.when.yaml") + @Specification({ + "${app}/server.send.ping/handshake.request", + "${net}/server.send.ping.payload.length.0/handshake.response.and.frame" }) + public void shouldReceiveClientPingFrameWithEmptyPayload() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("client.when.yaml") + @Specification({ + "${app}/server.send.ping/handshake.request", + "${net}/server.send.ping.payload.length.125/handshake.response.and.frame" }) + public void shouldReceiveClientPingFrameWithPayload() throws Exception + { + k3po.finish(); + } +} diff --git a/runtime/binding-ws/src/test/java/io/aklivity/zilla/runtime/binding/ws/internal/streams/server/ControlIT.java b/runtime/binding-ws/src/test/java/io/aklivity/zilla/runtime/binding/ws/internal/streams/server/ControlIT.java index b56df19049..85d713574a 100644 --- a/runtime/binding-ws/src/test/java/io/aklivity/zilla/runtime/binding/ws/internal/streams/server/ControlIT.java +++ b/runtime/binding-ws/src/test/java/io/aklivity/zilla/runtime/binding/ws/internal/streams/server/ControlIT.java @@ -78,4 +78,24 @@ public void shouldRejectClientPongFrameWithPayloadTooLong() throws Exception { k3po.finish(); } + + @Test + @Configuration("server.yaml") + @Specification({ + "${net}/client.send.ping.payload.length.0/handshake.request.and.frame", + "${app}/client.send.ping.payload.length.0/handshake.response" }) + public void shouldReceiveClientPingFrameWithEmptyPayload() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("server.yaml") + @Specification({ + "${net}/client.send.ping.payload.length.125/handshake.request.and.frame", + "${app}/client.send.ping.payload.length.125/handshake.response" }) + public void shouldReceiveClientPingFrameWithPayload() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/invalid.session.timeout/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/invalid.session.timeout/client.rpt index 8b7cd79203..b8c4ca1662 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/invalid.session.timeout/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/invalid.session.timeout/client.rpt @@ -101,7 +101,7 @@ write 82 # size ${newRequestId} 5s "zilla" # client id 4s "test" # consumer group - 0 # session timeout + 6000 # session timeout 4000 # rebalance timeout 0s # consumer group member 5s "zilla" # group instance id diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/invalid.session.timeout/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/invalid.session.timeout/server.rpt index 040f4bd4dd..e2f1393ede 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/invalid.session.timeout/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/invalid.session.timeout/server.rpt @@ -92,7 +92,7 @@ read 82 # size (int:newRequestId) 5s "zilla" # client id 4s "test" # consumer group - 0 # session timeout + 6000 # session timeout 4000 # rebalance timeout 0s # consumer group member 5s "zilla" # group instance id diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.0/handshake.request.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.0/handshake.request.rpt new file mode 100644 index 0000000000..05a0ac9046 --- /dev/null +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.0/handshake.request.rpt @@ -0,0 +1,23 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +connected + +# connection established diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.0/handshake.response.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.0/handshake.response.rpt new file mode 100644 index 0000000000..169c951a6a --- /dev/null +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.0/handshake.response.rpt @@ -0,0 +1,24 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" +accepted + +connected + +# connection established diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.125/handshake.request.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.125/handshake.request.rpt new file mode 100644 index 0000000000..05a0ac9046 --- /dev/null +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.125/handshake.request.rpt @@ -0,0 +1,23 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +connected + +# connection established diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.125/handshake.response.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.125/handshake.response.rpt new file mode 100644 index 0000000000..169c951a6a --- /dev/null +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/client.send.ping.payload.length.125/handshake.response.rpt @@ -0,0 +1,24 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" +accepted + +connected + +# connection established diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/server.send.ping/handshake.request.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/server.send.ping/handshake.request.rpt new file mode 100644 index 0000000000..05a0ac9046 --- /dev/null +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/server.send.ping/handshake.request.rpt @@ -0,0 +1,23 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +connected + +# connection established diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/server.send.ping/handshake.response.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/server.send.ping/handshake.response.rpt new file mode 100644 index 0000000000..169c951a6a --- /dev/null +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/application/control/server.send.ping/handshake.response.rpt @@ -0,0 +1,24 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" +accepted + +connected + +# connection established diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.0/handshake.request.and.frame.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.0/handshake.request.and.frame.rpt index 6819239f08..2365e61380 100644 --- a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.0/handshake.request.and.frame.rpt +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.0/handshake.request.and.frame.rpt @@ -14,25 +14,34 @@ # under the License. # -property key ${ws:handshakeKey()} +property newHandshakeKey ${ws:handshakeKey()} property writeMask ${http:randomBytes(4)} -connect "http://localhost:8080/echo" -connected +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "GET") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/echo") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-key", newHandshakeKey) + .header("sec-websocket-version", "13") + .build()} -write http:method "GET" -write http:version "HTTP/1.1" -write http:host -write http:header "Upgrade" "websocket" -write http:header "Connection" "Upgrade" -write http:header "Sec-WebSocket-Key" ${key} -write http:header "Sec-WebSocket-Version" "13" +connected -read http:status "101" /.+/ -read http:version "HTTP/1.1" -read http:header "Upgrade" /(?i:websocket)/ -read http:header "Connection" /(?i:Upgrade)/ -read http:header "Sec-WebSocket-Accept" ${ws:handshakeHash(key)} +read zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "101") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-accept", ws:handshakeHash(newHandshakeKey)) + .build()} write [0x89 0x80] ${writeMask} diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.0/handshake.response.and.frame.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.0/handshake.response.and.frame.rpt index e9a4f71822..791956c8cc 100644 --- a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.0/handshake.response.and.frame.rpt +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.0/handshake.response.and.frame.rpt @@ -14,23 +14,33 @@ # under the License. # -accept "http://localhost:8080/echo" +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "half-duplex" accepted -connected -read http:method "GET" -read http:version "HTTP/1.1" -read http:header "Host" "localhost:8080" -read http:header "Upgrade" /(?i:websocket)/ -read http:header "Connection" /(?i:Upgrade)/ -read http:header "Sec-WebSocket-Key" /(?[a-zA-Z0-9+\/=]{24})/ -read http:header "Sec-WebSocket-Version" "13" +read zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "GET") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/echo") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-key", newHandshakeKey) + .header("sec-websocket-version", "13") + .build()} + +connected -write http:status "101" "Switching Protocols" -write http:version "HTTP/1.1" -write http:header "Upgrade" "websocket" -write http:header "Connection" "Upgrade" -write http:header "Sec-WebSocket-Accept" ${ws:handshakeHash(key)} +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "101") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-accept", ws:handshakeHash(newHandshakeKey)) + .build()} +write flush read [0x89 0x80] [0..4] diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.125/handshake.request.and.frame.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.125/handshake.request.and.frame.rpt index 629496478b..6f0bba4790 100644 --- a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.125/handshake.request.and.frame.rpt +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.125/handshake.request.and.frame.rpt @@ -14,26 +14,35 @@ # under the License. # -property key ${ws:handshakeKey()} +property newHandshakeKey ${ws:handshakeKey()} property writeMask ${http:randomBytes(4)} property client125 ${http:randomBytesUTF8(125)} -connect "http://localhost:8080/echo" -connected +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "GET") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/echo") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-key", newHandshakeKey) + .header("sec-websocket-version", "13") + .build()} -write http:method "GET" -write http:version "HTTP/1.1" -write http:host -write http:header "Upgrade" "websocket" -write http:header "Connection" "Upgrade" -write http:header "Sec-WebSocket-Key" ${key} -write http:header "Sec-WebSocket-Version" "13" +connected -read http:status "101" /.+/ -read http:version "HTTP/1.1" -read http:header "Upgrade" /(?i:websocket)/ -read http:header "Connection" /(?i:Upgrade)/ -read http:header "Sec-WebSocket-Accept" ${ws:handshakeHash(key)} +read zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "101") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-accept", ws:handshakeHash(newHandshakeKey)) + .build()} write [0x89 0xfd] ${writeMask} write option mask ${writeMask} diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.125/handshake.response.and.frame.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.125/handshake.response.and.frame.rpt index 6769138638..ee9ce5d52a 100644 --- a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.125/handshake.response.and.frame.rpt +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/client.send.ping.payload.length.125/handshake.response.and.frame.rpt @@ -14,27 +14,37 @@ # under the License. # -accept "http://localhost:8080/echo" +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "half-duplex" accepted -connected -read http:method "GET" -read http:version "HTTP/1.1" -read http:header "Host" "localhost:8080" -read http:header "Upgrade" /(?i:websocket)/ -read http:header "Connection" /(?i:Upgrade)/ -read http:header "Sec-WebSocket-Key" /(?[a-zA-Z0-9+\/=]{24})/ -read http:header "Sec-WebSocket-Version" "13" +read zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "GET") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/echo") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-key", newHandshakeKey) + .header("sec-websocket-version", "13") + .build()} + +connected -write http:status "101" "Switching Protocols" -write http:version "HTTP/1.1" -write http:header "Upgrade" "websocket" -write http:header "Connection" "Upgrade" -write http:header "Sec-WebSocket-Accept" ${ws:handshakeHash(key)} +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "101") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-accept", ws:handshakeHash(newHandshakeKey)) + .build()} +write flush -read [0x89 0xfd] ([0..4] :readMask) -read option mask ${readMask} -read ([0..125] :server125) +read [0x89 0xfd] ([0..4] :writeMask) +read option mask ${writeMask} +read ([0..125] :client125) read option mask [0x00 0x00 0x00 0x00] -write [0x8a 0x7d] ${server125} +write [0x8a 0x7d] ${client125} diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.0/handshake.request.and.frame.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.0/handshake.request.and.frame.rpt index 0699c728f5..154a2ef9f0 100644 --- a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.0/handshake.request.and.frame.rpt +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.0/handshake.request.and.frame.rpt @@ -14,25 +14,37 @@ # under the License. # -property key ${ws:handshakeKey()} +property location 'http://localhost:8080/echo' +property newHandshakeKey ${ws:handshakeKey()} property writeMask ${http:randomBytes(4)} -connect "http://localhost:8080/echo" -connected +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "GET") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/echo") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-key", newHandshakeKey) + .header("sec-websocket-version", "13") + .header("sec-websocket-protocol", "primary") + .build()} -write http:method "GET" -write http:version "HTTP/1.1" -write http:host -write http:header "Upgrade" "websocket" -write http:header "Connection" "Upgrade" -write http:header "Sec-WebSocket-Key" ${key} -write http:header "Sec-WebSocket-Version" "13" +read zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "101") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-accept", ws:handshakeHash(newHandshakeKey)) + .header("sec-websocket-protocol", "primary") + .build()} -read http:status "101" /.+/ -read http:version "HTTP/1.1" -read http:header "Upgrade" /(?i:websocket)/ -read http:header "Connection" /(?i:Upgrade)/ -read http:header "Sec-WebSocket-Accept" ${ws:handshakeHash(key)} +connected read [0x89 0x00] diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.0/handshake.response.and.frame.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.0/handshake.response.and.frame.rpt index 4c967d6f26..3f79e26f32 100644 --- a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.0/handshake.response.and.frame.rpt +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.0/handshake.response.and.frame.rpt @@ -14,23 +14,34 @@ # under the License. # -accept "http://localhost:8080/echo" +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" accepted -connected -read http:method "GET" -read http:version "HTTP/1.1" -read http:header "Host" "localhost:8080" -read http:header "Upgrade" /(?i:websocket)/ -read http:header "Connection" /(?i:Upgrade)/ -read http:header "Sec-WebSocket-Key" /(?[a-zA-Z0-9+\/=]{24})/ -read http:header "Sec-WebSocket-Version" "13" +read zilla:begin.ext ${zilla:id("http")} + [0xd4 0x00 0x00 0x00] + [0x09 0x00 0x00 0x00] + [0x07] ":method" [0x03 0x00] "GET" + [0x07] ":scheme" [0x04 0x00] "http" + [0x0a] ":authority" [0x0e 0x00] "localhost:8080" + [0x05] ":path" [0x05 0x00] "/echo" + [0x07] "upgrade" [0x09 0x00] "websocket" + [0x0a] "connection" [0x07 0x00] "upgrade" + [0x11] "sec-websocket-key" [0x18 0x00] /(?[a-zA-Z0-9+\/=]{24})/ + [0x15] "sec-websocket-version" [0x02 0x00] "13" + [0x16] "sec-websocket-protocol" [0x07 0x00] "primary" + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "101") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-accept", ws:handshakeHash(key)) + .header("sec-websocket-protocol", "primary") + .build()} -write http:status "101" "Switching Protocols" -write http:version "HTTP/1.1" -write http:header "Upgrade" "websocket" -write http:header "Connection" "Upgrade" -write http:header "Sec-WebSocket-Accept" ${ws:handshakeHash(key)} +connected write [0x89 0x00] diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.125/handshake.request.and.frame.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.125/handshake.request.and.frame.rpt index a47c42bf9a..80d5d6316d 100644 --- a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.125/handshake.request.and.frame.rpt +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.125/handshake.request.and.frame.rpt @@ -14,25 +14,37 @@ # under the License. # -property key ${ws:handshakeKey()} +property newHandshakeKey ${ws:handshakeKey()} property writeMask ${http:randomBytes(4)} -connect "http://localhost:8080/echo" +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":method", "GET") + .header(":scheme", "http") + .header(":authority", "localhost:8080") + .header(":path", "/echo") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-key", newHandshakeKey) + .header("sec-websocket-version", "13") + .header("sec-websocket-protocol", "primary") + .build()} + +read zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "101") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-accept", ws:handshakeHash(newHandshakeKey)) + .header("sec-websocket-protocol", "primary") + .build()} + connected -write http:method "GET" -write http:version "HTTP/1.1" -write http:host -write http:header "Upgrade" "websocket" -write http:header "Connection" "Upgrade" -write http:header "Sec-WebSocket-Key" ${key} -write http:header "Sec-WebSocket-Version" "13" - -read http:status "101" /.+/ -read http:version "HTTP/1.1" -read http:header "Upgrade" /(?i:websocket)/ -read http:header "Connection" /(?i:Upgrade)/ -read http:header "Sec-WebSocket-Accept" ${ws:handshakeHash(key)} read [0x89 0x7d] ([0..125] :client125) diff --git a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.125/handshake.response.and.frame.rpt b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.125/handshake.response.and.frame.rpt index 87dc012b1a..0af0bd8683 100644 --- a/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.125/handshake.response.and.frame.rpt +++ b/specs/binding-ws.spec/src/main/scripts/io/aklivity/zilla/specs/binding/ws/streams/network/control/server.send.ping.payload.length.125/handshake.response.and.frame.rpt @@ -16,23 +16,34 @@ property server125 ${http:randomBytesUTF8(125)} -accept "http://localhost:8080/echo" +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" accepted -connected -read http:method "GET" -read http:version "HTTP/1.1" -read http:header "Host" "localhost:8080" -read http:header "Upgrade" /(?i:websocket)/ -read http:header "Connection" /(?i:Upgrade)/ -read http:header "Sec-WebSocket-Key" /(?[a-zA-Z0-9+\/=]{24})/ -read http:header "Sec-WebSocket-Version" "13" +read zilla:begin.ext ${zilla:id("http")} + [0xd4 0x00 0x00 0x00] + [0x09 0x00 0x00 0x00] + [0x07] ":method" [0x03 0x00] "GET" + [0x07] ":scheme" [0x04 0x00] "http" + [0x0a] ":authority" [0x0e 0x00] "localhost:8080" + [0x05] ":path" [0x05 0x00] "/echo" + [0x07] "upgrade" [0x09 0x00] "websocket" + [0x0a] "connection" [0x07 0x00] "upgrade" + [0x11] "sec-websocket-key" [0x18 0x00] /(?[a-zA-Z0-9+\/=]{24})/ + [0x15] "sec-websocket-version" [0x02 0x00] "13" + [0x16] "sec-websocket-protocol" [0x07 0x00] "primary" + +write zilla:begin.ext ${http:beginEx() + .typeId(zilla:id("http")) + .header(":status", "101") + .header("upgrade", "websocket") + .header("connection", "upgrade") + .header("sec-websocket-accept", ws:handshakeHash(key)) + .header("sec-websocket-protocol", "primary") + .build()} -write http:status "101" "Switching Protocols" -write http:version "HTTP/1.1" -write http:header "Upgrade" "websocket" -write http:header "Connection" "Upgrade" -write http:header "Sec-WebSocket-Accept" ${ws:handshakeHash(key)} +connected write [0x89 0x7d] ${server125}