diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java index 7ba2924ddb..126c431fc8 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientGroupFactory.java @@ -359,14 +359,14 @@ public MessageConsumer newStream( timeout, groupMembership, sasl); - newStream = newGroup::onApplication; + newStream = newGroup::onStream; groupStreams.put(groupId, newGroup); } else if (HIGHLANDER_PROTOCOL.equals(protocol)) { - group.onApplicationMigrate(begin, application); - newStream = group::onApplication; + group.onStreamMigrate(begin, application); + newStream = group::onStream; } } } @@ -1229,7 +1229,7 @@ private final class KafkaGroupStream private final String protocol; private final long resolvedId; - private MessageConsumer application; + private MessageConsumer sender; private String host; private String nodeId; private int port; @@ -1257,7 +1257,7 @@ private final class KafkaGroupStream private int topicMetadataLimit; KafkaGroupStream( - MessageConsumer application, + MessageConsumer sender, long originId, long routedId, long initialId, @@ -1269,7 +1269,7 @@ private final class KafkaGroupStream GroupMembership groupMembership, KafkaSaslConfig sasl) { - this.application = application; + this.sender = sender; this.originId = originId; this.routedId = routedId; this.initialId = initialId; @@ -1286,7 +1286,7 @@ private final class KafkaGroupStream this.metadataBuffer = new UnsafeBuffer(new byte[2048]); } - private void onApplication( + private void onStream( int msgTypeId, DirectBuffer buffer, int index, @@ -1296,38 +1296,38 @@ private void onApplication( { case BeginFW.TYPE_ID: final BeginFW begin = beginRO.wrap(buffer, index, index + length); - onApplicationBegin(begin); + onStreamBegin(begin); break; case DataFW.TYPE_ID: final DataFW data = dataRO.wrap(buffer, index, index + length); - onApplicationData(data); + onStreamData(data); break; case FlushFW.TYPE_ID: final FlushFW flush = flushRO.wrap(buffer, index, index + length); - onApplicationFlush(flush); + onStreamFlush(flush); break; case EndFW.TYPE_ID: final EndFW end = endRO.wrap(buffer, index, index + length); - onApplicationEnd(end); + onStreamEnd(end); break; case AbortFW.TYPE_ID: final AbortFW abort = abortRO.wrap(buffer, index, index + length); - onApplicationAbort(abort); + onStreamAbort(abort); break; case WindowFW.TYPE_ID: final WindowFW window = windowRO.wrap(buffer, index, index + length); - onApplicationWindow(window); + onStreamWindow(window); break; case ResetFW.TYPE_ID: final ResetFW reset = resetRO.wrap(buffer, index, index + length); - onApplicationReset(reset); + onStreamReset(reset); break; default: break; } } - private void onApplicationBegin( + private void onStreamBegin( BeginFW begin) { final long traceId = begin.traceId(); @@ -1356,10 +1356,10 @@ private void onApplicationBegin( clusterClient.doNetworkBeginIfNecessary(traceId, authorization, affinity); } - doApplicationWindow(traceId, 0L, 0, 0, 0); + doStreamWindow(traceId, 0L, 0, 0, 0); } - private void onApplicationData( + private void onStreamData( DataFW data) { final long traceId = data.traceId(); @@ -1368,7 +1368,7 @@ private void onApplicationData( 
coordinatorClient.doSyncRequest(traceId, budgetId, data.payload()); } - private void onApplicationEnd( + private void onStreamEnd( EndFW end) { final long traceId = end.traceId(); @@ -1378,7 +1378,7 @@ private void onApplicationEnd( coordinatorClient.doLeaveGroupRequest(traceId); } - private void onApplicationFlush( + private void onStreamFlush( FlushFW flush) { final long sequence = flush.sequence(); @@ -1427,7 +1427,7 @@ private void onApplicationFlush( } } - private void onApplicationAbort( + private void onStreamAbort( AbortFW abort) { final long traceId = abort.traceId(); @@ -1438,10 +1438,10 @@ private void onApplicationAbort( clusterClient.doNetworkAbort(traceId); coordinatorClient.doNetworkAbort(traceId); - cleanupApplication(traceId, EMPTY_OCTETS); + cleanupStream(traceId, ERROR_NONE); } - private void onApplicationWindow( + private void onStreamWindow( WindowFW window) { final long sequence = window.sequence(); @@ -1463,7 +1463,7 @@ private void onApplicationWindow( assert replyAck <= replySeq; } - private void onApplicationReset( + private void onStreamReset( ResetFW reset) { final long traceId = reset.traceId(); @@ -1473,22 +1473,22 @@ private void onApplicationReset( clusterClient.doNetworkReset(traceId); } - private boolean isApplicationReplyOpen() + private boolean isStreamReplyOpen() { return KafkaState.replyOpening(state); } - private void doApplicationBeginIfNecessary( + private void doStreamBeginIfNecessary( long traceId, long authorization) { if (!KafkaState.replyOpening(state)) { - doApplicationBegin(traceId, authorization); + doStreamBegin(traceId, authorization); } } - private void doApplicationBegin( + private void doStreamBegin( long traceId, long authorization) { @@ -1503,11 +1503,11 @@ private void doApplicationBegin( .timeout(timeout)) .build(); - doBegin(application, originId, routedId, replyId, replySeq, replyAck, replyMax, + doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization, affinity, kafkaBeginEx); } - private void doApplicationData( + private void doStreamData( long traceId, long authorization, DirectBuffer buffer, @@ -1518,12 +1518,12 @@ private void doApplicationData( if (length > 0) { - doData(application, originId, routedId, replyId, replySeq, replyAck, replyMax, + doData(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization, replyBudgetId, reserved, buffer, offset, length, EMPTY_EXTENSION); } else { - doDataEmpty(application, originId, routedId, replyId, replySeq, replyAck, replyMax, + doDataEmpty(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization, replyBudgetId, reserved, EMPTY_EXTENSION); } @@ -1532,7 +1532,7 @@ private void doApplicationData( assert replyAck <= replySeq; } - private void doApplicationFlush( + private void doStreamFlush( long traceId, long authorization, Consumer extension) @@ -1541,34 +1541,34 @@ private void doApplicationFlush( { final int reserved = replyPad; - doFlush(application, originId, routedId, replyId, replySeq, replyAck, replyMax, + doFlush(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization, replyBudgetId, reserved, extension); } } - private void doApplicationEnd( + private void doStreamEnd( long traceId) { if (!KafkaState.replyClosed(state)) { state = KafkaState.closedReply(state); - doEnd(application, originId, routedId, replyId, replySeq, replyAck, replyMax, + doEnd(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, 0, 
EMPTY_EXTENSION); } } - private void doApplicationAbort( + private void doStreamAbort( long traceId) { if (!KafkaState.replyClosed(state)) { state = KafkaState.closedReply(state); - doAbort(application, originId, routedId, replyId, replySeq, replyAck, replyMax, + doAbort(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, 0, EMPTY_EXTENSION); } } - private void doApplicationWindow( + private void doStreamWindow( long traceId, long budgetId, int minInitialNoAck, @@ -1586,48 +1586,57 @@ private void doApplicationWindow( state = KafkaState.openedInitial(state); - doWindow(application, originId, routedId, initialId, initialSeq, initialAck, initialMax, + doWindow(sender, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, clusterClient.authorization, budgetId, minInitialPad); } } - private void doApplicationReset( + private void doStreamReset( long traceId, Flyweight extension) { state = KafkaState.closedInitial(state); - doReset(application, originId, routedId, initialId, initialSeq, initialAck, initialMax, + doReset(sender, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, clusterClient.authorization, extension); } - private void doApplicationAbortIfNecessary( + private void doStreamAbortIfNecessary( long traceId) { if (!KafkaState.replyClosed(state)) { - doApplicationAbort(traceId); + doStreamAbort(traceId); } } - private void doApplicationResetIfNecessary( + private void doStreamResetIfNecessary( long traceId, Flyweight extension) { if (!KafkaState.initialClosed(state)) { - doApplicationReset(traceId, extension); + doStreamReset(traceId, extension); } } private void onNotCoordinatorError( long traceId, - long authority) + long authorization) + { + clusterClient.doNetworkBeginIfNecessary(traceId, authorization, affinity); + } + + private void onLeaveGroup( + long traceId) { - clusterClient.doNetworkBeginIfNecessary(traceId, authority, affinity); + doStreamEnd(traceId); + + groupMembership.memberIds.remove(groupId); + groupStreams.remove(groupId); } - private void cleanupApplication( + private void cleanupStream( long traceId, int error) { @@ -1637,20 +1646,13 @@ private void cleanupApplication( .error(error) .build(); - cleanupApplication(traceId, kafkaResetEx); - } - - private void cleanupApplication( - long traceId, - Flyweight extension) - { - doApplicationResetIfNecessary(traceId, extension); - doApplicationAbortIfNecessary(traceId); + doStreamResetIfNecessary(traceId, kafkaResetEx); + doStreamAbortIfNecessary(traceId); groupStreams.remove(groupId); } - private void onApplicationMigrate( + private void onStreamMigrate( BeginFW begin, MessageConsumer application) { @@ -1660,10 +1662,10 @@ private void onApplicationMigrate( final long affinity = begin.affinity(); final long traceId = begin.traceId(); - doApplicationResetIfNecessary(traceId, EMPTY_OCTETS); - doApplicationAbortIfNecessary(traceId); + doStreamResetIfNecessary(traceId, EMPTY_OCTETS); + doStreamAbortIfNecessary(traceId); - this.application = application; + this.sender = application; this.originId = originId; this.routedId = routedId; this.initialId = initialId; @@ -2282,7 +2284,7 @@ protected void onDecodeSaslHandshakeResponse( decoder = decodeClusterSaslAuthenticateResponse; break; default: - delegate.cleanupApplication(traceId, errorCode); + delegate.cleanupStream(traceId, errorCode); doNetworkEnd(traceId, authorization); break; } @@ -2301,7 +2303,7 @@ protected void onDecodeSaslAuthenticateResponse( decoder = decodeFindCoordinatorResponse; break; 
default: - delegate.cleanupApplication(traceId, errorCode); + delegate.cleanupStream(traceId, errorCode); doNetworkEnd(traceId, authorization); break; } @@ -2360,7 +2362,7 @@ private void onError( doNetworkAbort(traceId); doNetworkReset(traceId); - delegate.cleanupApplication(traceId, EMPTY_OCTETS); + delegate.cleanupStream(traceId, ERROR_EXISTS); } private void cleanupDecodeSlotIfNecessary() @@ -2463,11 +2465,7 @@ public void onDecodeResource( assert resource.equals(delegate.nodeId); break; default: - final KafkaResetExFW resetEx = kafkaResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) - .typeId(kafkaTypeId) - .error(errorCode) - .build(); - delegate.cleanupApplication(traceId, resetEx); + delegate.cleanupStream(traceId, errorCode); doNetworkEnd(traceId, authorization); break; } @@ -3024,7 +3022,7 @@ protected void onDecodeSaslHandshakeResponse( decoder = decodeSaslAuthenticateResponse; break; default: - delegate.cleanupApplication(traceId, errorCode); + delegate.cleanupStream(traceId, errorCode); doNetworkEnd(traceId, authorization); break; } @@ -3043,7 +3041,7 @@ protected void onDecodeSaslAuthenticateResponse( decoder = decodeDescribeResponse; break; default: - delegate.cleanupApplication(traceId, errorCode); + delegate.cleanupStream(traceId, errorCode); doNetworkEnd(traceId, authorization); break; } @@ -3293,7 +3291,7 @@ private void onNetworkEnd( cleanupDecodeSlotIfNecessary(); - if (!delegate.isApplicationReplyOpen()) + if (!delegate.isStreamReplyOpen()) { onError(traceId); } @@ -3592,7 +3590,7 @@ private void doEncodeJoinGroupRequest( decoder = decodeJoinGroupResponse; - delegate.doApplicationBeginIfNecessary(traceId, authorization); + delegate.doStreamBeginIfNecessary(traceId, authorization); } private int doGenerateSubscriptionMetadata() @@ -4132,7 +4130,7 @@ protected void onDecodeSaslHandshakeResponse( decoder = decodeCoordinatorSaslAuthenticateResponse; break; default: - delegate.cleanupApplication(traceId, errorCode); + delegate.cleanupStream(traceId, errorCode); doNetworkEnd(traceId, authorization); break; } @@ -4151,7 +4149,7 @@ protected void onDecodeSaslAuthenticateResponse( decoder = decodeJoinGroupResponse; break; default: - delegate.cleanupApplication(traceId, errorCode); + delegate.cleanupStream(traceId, errorCode); doNetworkEnd(traceId, authorization); break; } @@ -4200,7 +4198,7 @@ private void onJoinGroupResponse( if (nextJoinGroupRequestId == nextJoinGroupResponseId) { - delegate.doApplicationFlush(traceId, authorization, + delegate.doStreamFlush(traceId, authorization, ex -> ex.set((b, o, l) -> kafkaFlushExRW.wrap(b, o, l) .typeId(kafkaTypeId) .group(g -> g.leaderId(leaderId) @@ -4312,12 +4310,12 @@ private void onSyncGroupResponse( Array32FW topicAssignment = topicAssignmentBuilder.build(); - delegate.doApplicationData(traceId, authorization, topicAssignment.buffer(), topicAssignment.offset(), + delegate.doStreamData(traceId, authorization, topicAssignment.buffer(), topicAssignment.offset(), topicAssignment.sizeof()); } else { - delegate.doApplicationData(traceId, authorization, EMPTY_OCTETS.buffer(), EMPTY_OCTETS.offset(), + delegate.doStreamData(traceId, authorization, EMPTY_OCTETS.buffer(), EMPTY_OCTETS.offset(), EMPTY_OCTETS.sizeof()); } @@ -4354,13 +4352,10 @@ private void onLeaveGroupResponse( long traceId, long authorization) { - delegate.groupMembership.memberIds.remove(delegate.groupId); - doNetworkEnd(traceId, authorization); doNetworkReset(traceId); - delegate.doApplicationEnd(traceId); - delegate.doApplicationResetIfNecessary(traceId, EMPTY_OCTETS); 
+ delegate.onLeaveGroup(traceId); } private void onRebalanceError( @@ -4389,7 +4384,7 @@ private void onError( doNetworkAbort(traceId); doNetworkReset(traceId); - delegate.cleanupApplication(traceId, EMPTY_OCTETS); + delegate.cleanupStream(traceId, ERROR_EXISTS); } private void cleanupDecodeSlotIfNecessary() diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientGroupIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientGroupIT.java index 4f5e9d2a74..0fefda6763 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientGroupIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientGroupIT.java @@ -176,4 +176,14 @@ public void shouldIgnoreHeartbeatBeforeHandshakeComplete() throws Exception { k3po.finish(); } + + @Test + @Configuration("client.yaml") + @Specification({ + "${app}/rebalance.multiple.members.with.same.group.id/client", + "${net}/rebalance.multiple.members.with.same.group.id/server"}) + public void shouldRebalanceMultipleMembersWithSameGroupId() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/rebalance.multiple.members.with.same.group.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/rebalance.multiple.members.with.same.group.id/client.rpt new file mode 100644 index 0000000000..ca60faf5e2 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/rebalance.multiple.members.with.same.group.id/client.rpt @@ -0,0 +1,96 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(45000) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(30000) + .build() + .build()} + +read advised zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .group() + .leaderId("memberId-1") + .memberId("memberId-1") + .members("memberId-1") + .build() + .build()} + +write zilla:data.empty +write flush + +read zilla:data.empty + +write close +read closed + +read notify ROUTED_BROKER_SERVER + +connect await ROUTED_BROKER_SERVER "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(45000) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(30000) + .build() + .build()} + +read advised zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .group() + .leaderId("memberId-2") + .memberId("memberId-2") + .members("memberId-2") + .build() + .build()} + +write zilla:data.empty +write flush + +read zilla:data.empty diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/rebalance.multiple.members.with.same.group.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/rebalance.multiple.members.with.same.group.id/server.rpt new file mode 100644 index 0000000000..d741071698 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/rebalance.multiple.members.with.same.group.id/server.rpt @@ -0,0 +1,98 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(45000) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(30000) + .build() + .build()} +write flush + +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .group() + .leaderId("memberId-1") + .memberId("memberId-1") + .members("memberId-1") + .build() + .build()} + +read zilla:data.empty + +write zilla:data.empty +write flush + +read closed +write close + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(45000) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(30000) + .build() + .build()} +write flush + +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .group() + .leaderId("memberId-2") + .memberId("memberId-2") + .members("memberId-2") + .build() + .build()} + +read zilla:data.empty + +write zilla:data.empty +write flush diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/rebalance.multiple.members.with.same.group.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/rebalance.multiple.members.with.same.group.id/client.rpt new file mode 100644 index 0000000000..ea6d69890c --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/rebalance.multiple.members.with.same.group.id/client.rpt @@ -0,0 +1,312 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+# + +property networkConnectWindow 8192 +property instanceId ${kafka:randomBytes(42)} + +property newRequestId ${kafka:newRequestId()} +property fetchWaitMax 500 +property fetchBytesMax 65535 +property partitionBytesMax 8192 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 22 # size + 10s # find coordinator + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 4s "test" # "session" coordinator key + [0x00] # coordinator group type + +read 35 # size + (int:newRequestId) + 0 # throttle time + 0s # no error + 4s "none" # error message none + 0 # coordinator node + 9s "localhost" # host + 9092 # port + +write 82 # size + 32s # describe configs + 0s # v0 + ${newRequestId} + -1s # no client id + 1 # resources + [0x04] # broker resource + 1s "0" # "node" topic + 2 # configs + 28s "group.min.session.timeout.ms" # name + 28s "group.max.session.timeout.ms" # name + +read 103 # size + (int:newRequestId) + 0 + 1 # resources + 0s # no error + -1s # error message + [0x04] # broker resource + 1s "0" # "0" nodeId + 2 # configs + 28s "group.min.session.timeout.ms" # name + 4s "6000" # value + [0x00] # not read only + [0x00] # not default + [0x00] # not sensitive + 28s "group.max.session.timeout.ms" # name + 5s "30000" # value + [0x00] # not read only + [0x00] # not default + [0x00] # not sensitive + +write 22 # size + 10s # find coordinator + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 4s "test" # "session" coordinator key + [0x00] # coordinator group type + +read 35 # size + (int:newRequestId) + 0 # throttle time + 0s # no error + 4s "none" # error message none + 0 # coordinator node + 9s "localhost" # host + 9092 # port + +write 82 # size + 32s # describe configs + 0s # v0 + ${newRequestId} + -1s # no client id + 1 # resources + [0x04] # broker resource + 1s "0" # "node" topic + 2 # configs + 28s "group.min.session.timeout.ms" # name + 28s "group.max.session.timeout.ms" # name + +read 103 # size + (int:newRequestId) + 0 + 1 # resources + 0s # no error + -1s # error message + [0x04] # broker resource + 1s "0" # "0" nodeId + 2 # configs + 28s "group.min.session.timeout.ms" # name + 4s "6000" # value + [0x00] # not read only + [0x00] # not default + [0x00] # not sensitive + 28s "group.max.session.timeout.ms" # name + 5s "30000" # value + [0x00] # not read only + [0x00] # not default + [0x00] # not sensitive + +read notify ROUTED_DESCRIBE_SERVER + +connect await ROUTED_DESCRIBE_SERVER + "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 119 # size + 11s # join group + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 4s "test" # consumer group + 30000 # session timeout + 4000 # rebalance timeout + 0s # consumer group member + 42s ${instanceId} # group instance id + 8s "consumer" # protocol type + 1 # group protocol + 10s "highlander" # protocol name + 14 # metadata size + ${kafka:randomBytes(14)} # metadata + +read 34 # size + (int:newRequestId) + 0 # throttle time + 79s # member id required + -1 # generated id + 0s # protocol name + 0s # leader id + 10s "memberId-1" # consumer member group id + 0 # members + +write 129 # size + 11s # join group + 5s # v5 + ${newRequestId} + 5s "zilla" # no client id + 4s "test" # consumer group + 30000 # session timeout + 4000 # rebalance timeout + 10s "memberId-1" # consumer group member + 42s ${instanceId} # group instance id 
+ 8s "consumer" # protocol type + 1 # group protocol + 10s "highlander" # protocol name + 14 # metadata size + ${kafka:randomBytes(14)} # metadata + +read 128 # size + (int:newRequestId) + 0 # throttle time + 0s # no error + 3 # generated id + 10s "highlander" # protocol name + 10s "memberId-1" # leader id + 10s "memberId-1" # consumer member group id + 1 # members + 10s "memberId-1" # consumer member group id + 42s [0..42] # group instance id + 14 # metadata size + 2s # version + 0 # topics + 0 # userdata + 0 # partitions + +write 101 # size + 14s # sync group + 3s # v3 + ${newRequestId} + 5s "zilla" # no client id + 4s "test" # consumer group + 3 # generation id + 10s "memberId-1" # consumer member group id + 42s ${instanceId} # group instance id + 1 # assignments + 10s "memberId-1" # consumer member group id + 0 # metadata + +read 14 # size + (int:newRequestId) + 0 # throttle time + 0s # no error + 0 # assignment + +write 81 # size + 13s # leave group + 3s # v3 + ${newRequestId} + 5s "zilla" # client id + 4s "test" # consumer group + 1 # assignments + 10s "memberId-1" # consumer member group id + 42s ${instanceId} # group instance id + +read 70 # size + (int:newRequestId) + 0 # throttle time + 0s # no error + 1 # assignments + 10s "memberId-1" # consumer member group id + 42s [0..42] # group instance id + +write 119 # size + 11s # join group + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 4s "test" # consumer group + 30000 # session timeout + 4000 # rebalance timeout + 0s # consumer group member + 42s ${instanceId} # group instance id + 8s "consumer" # protocol type + 1 # group protocol + 10s "highlander" # protocol name + 14 # metadata size + ${kafka:randomBytes(14)} # metadata + +read 34 # size + (int:newRequestId) + 0 # throttle time + 79s # member id required + -1 # generated id + 0s # protocol name + 0s # leader id + 10s "memberId-2" # consumer member group id + 0 # members + +write 129 # size + 11s # join group + 5s # v5 + ${newRequestId} + 5s "zilla" # no client id + 4s "test" # consumer group + 30000 # session timeout + 4000 # rebalance timeout + 10s "memberId-2" # consumer group member + 42s ${instanceId} # group instance id + 8s "consumer" # protocol type + 1 # group protocol + 10s "highlander" # protocol name + 14 # metadata size + ${kafka:randomBytes(14)} # metadata + +read 128 # size + (int:newRequestId) + 0 # throttle time + 0s # no error + 3 # generated id + 10s "highlander" # protocol name + 10s "memberId-2" # leader id + 10s "memberId-2" # consumer member group id + 1 # members + 10s "memberId-2" # consumer member group id + 42s [0..42] # group instance id + 14 # metadata size + 2s # version + 0 # topics + 0 # userdata + 0 # partitions + +write 101 # size + 14s # sync group + 3s # v3 + ${newRequestId} + 5s "zilla" # no client id + 4s "test" # consumer group + 3 # generation id + 10s "memberId-2" # consumer member group id + 42s ${instanceId} # group instance id + 1 # assignments + 10s "memberId-2" # consumer member group id + 0 # metadata + +read 14 # size + (int:newRequestId) + 0 # throttle time + 0s # no error + 0 # assignment diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/rebalance.multiple.members.with.same.group.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/rebalance.multiple.members.with.same.group.id/server.rpt new file mode 100644 index 0000000000..c56a29d2b4 --- /dev/null +++ 
b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/rebalance.multiple.members.with.same.group.id/server.rpt @@ -0,0 +1,313 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property instanceId ${kafka:randomBytes(42)} + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 22 # size + 10s # find coordinator + 1s # v1 + (int:newRequestId) + 5s "zilla" # client id + 4s "test" # "test" coordinator key + [0x00] # coordinator group type + +write 35 # size + ${newRequestId} + 0 # throttle time + 0s # no error + 4s "none" # error message none + 0 # coordinator node + 9s "localhost" # host + 9092 # port + +read 82 # size + 32s # describe configs + 0s # v0 + (int:requestId) + -1s # no client id + 1 # resources + [0x04] # broker resource + 1s "0" # "node" topic + 2 # configs + 28s "group.min.session.timeout.ms" # name + 28s "group.max.session.timeout.ms" # name + +write 103 # size + ${requestId} + 0 + 1 # resources + 0s # no error + -1s # error message + [0x04] # broker resource + 1s "0" # "0" nodeId + 2 # configs + 28s "group.min.session.timeout.ms" # name + 4s "6000" # value + [0x00] # not read only + [0x00] # not default + [0x00] # not sensitive + 28s "group.max.session.timeout.ms" # name + 5s "30000" # value + [0x00] # not read only + [0x00] # not default + [0x00] # not sensitive + +read 22 # size + 10s # find coordinator + 1s # v1 + (int:newRequestId) + 5s "zilla" # client id + 4s "test" # "test" coordinator key + [0x00] # coordinator group type + +write 35 # size + ${newRequestId} + 0 # throttle time + 0s # no error + 4s "none" # error message none + 0 # coordinator node + 9s "localhost" # host + 9092 # port + +read 82 # size + 32s # describe configs + 0s # v0 + (int:requestId) + -1s # no client id + 1 # resources + [0x04] # broker resource + 1s "0" # "node" topic + 2 # configs + 28s "group.min.session.timeout.ms" # name + 28s "group.max.session.timeout.ms" # name + +write 103 # size + ${requestId} + 0 + 1 # resources + 0s # no error + -1s # error message + [0x04] # broker resource + 1s "0" # "0" nodeId + 2 # configs + 28s "group.min.session.timeout.ms" # name + 4s "6000" # value + [0x00] # not read only + [0x00] # not default + [0x00] # not sensitive + 28s "group.max.session.timeout.ms" # name + 5s "30000" # value + [0x00] # not read only + [0x00] # not default + [0x00] # not sensitive + + +accepted + +connected + +read 119 # size + 11s # join group + 5s # v5 + (int:newRequestId) + 5s "zilla" # client id + 4s "test" # consumer group + 30000 # session timeout + 4000 # rebalance timeout + 0s # consumer group member + 42s [0..42] # group instance id + 8s "consumer" # protocol type + 1 # group protocol + 10s "highlander" # protocol name + 14 # metadata size + [0..14] # metadata + + +write 34 # size + ${newRequestId} + 0 # throttle time + 79s # member id 
required + -1 # generated id + 0s # protocol name + 0s # leader id + 10s "memberId-1" # consumer member group id + 0 # members + +read 129 # size + 11s # join group + 5s # v5 + (int:newRequestId) + 5s "zilla" # no client id + 4s "test" # consumer group + 30000 # session timeout + 4000 # rebalance timeout + 10s "memberId-1" # consumer group member + 42s [0..42] # group instance id + 8s "consumer" # protocol type + 1 # group protocol + 10s "highlander" # protocol name + 14 # metadata size + [0..14] # metadata + +write 128 # size + ${newRequestId} + 0 # throttle time + 0s # no error + 3 # generated id + 10s "highlander" # protocol name + 10s "memberId-1" # leader id + 10s "memberId-1" # consumer member group id + 1 # members + 10s "memberId-1" # consumer member group id + 42s ${instanceId} # group instance id + 14 # metadata size + 2s # version + 0 # topics + 0 # userdata + 0 # partitions + +read 101 # size + 14s # sync group + 3s # v3 + (int:newRequestId) + 5s "zilla" # no client id + 4s "test" # consumer group + 3 # generation id + 10s "memberId-1" # consumer member group id + 42s [0..42] # group instance id + 1 # assignments + 10s "memberId-1" # consumer member group id + 0 # metadata + +write 14 # size + ${newRequestId} + 0 # throttle time + 0s # no error + 0 # assignment + +read 81 # size + 13s # leave group + 3s # v3 + (int:newRequestId) + 5s "zilla" # client id + 4s "test" # consumer group + 1 # assignments + 10s "memberId-1" # consumer member group id + 42s [0..42] # group instance id + +write 70 # size + ${newRequestId} + 0 # throttle time + 0s # no error + 1 # assignments + 10s "memberId-1" # consumer member group id + 42s ${instanceId} # group instance id + +#Second try + +read 119 # size + 11s # join group + 5s # v5 + (int:newRequestId) + 5s "zilla" # client id + 4s "test" # consumer group + 30000 # session timeout + 4000 # rebalance timeout + 0s # consumer group member + 42s [0..42] # group instance id + 8s "consumer" # protocol type + 1 # group protocol + 10s "highlander" # protocol name + 14 # metadata size + [0..14] # metadata + + +write 34 # size + ${newRequestId} + 0 # throttle time + 79s # member id required + -1 # generated id + 0s # protocol name + 0s # leader id + 10s "memberId-2" # consumer member group id + 0 # members + + +read 129 # size + 11s # join group + 5s # v5 + (int:newRequestId) + 5s "zilla" # no client id + 4s "test" # consumer group + 30000 # session timeout + 4000 # rebalance timeout + 10s "memberId-2" # consumer group member + 42s [0..42] # group instance id + 8s "consumer" # protocol type + 1 # group protocol + 10s "highlander" # protocol name + 14 # metadata size + [0..14] # metadata + +write 128 # size + ${newRequestId} + 0 # throttle time + 0s # no error + 3 # generated id + 10s "highlander" # protocol name + 10s "memberId-2" # leader id + 10s "memberId-2" # consumer member group id + 1 # members + 10s "memberId-2" # consumer member group id + 42s ${instanceId} # group instance id + 14 # metadata size + 2s # version + 0 # topics + 0 # userdata + 0 # partitions + +read 101 # size + 14s # sync group + 3s # v3 + (int:newRequestId) + 5s "zilla" # no client id + 4s "test" # consumer group + 3 # generation id + 10s "memberId-2" # consumer member group id + 42s [0..42] # group instance id + 1 # assignments + 10s "memberId-2" # consumer member group id + 0 # metadata + +write 14 # size + ${newRequestId} + 0 # throttle time + 0s # no error + 0 # assignment + + + + + diff --git 
a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/GroupIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/GroupIT.java index 56c078e0b4..bec56f7a7a 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/GroupIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/GroupIT.java @@ -134,4 +134,13 @@ public void shouldReassignOnNewTopic() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/rebalance.multiple.members.with.same.group.id/client", + "${app}/rebalance.multiple.members.with.same.group.id/server"}) + public void shouldRebalanceMultipleMembersWithSameGroupId() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/GroupIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/GroupIT.java index 7b2f7f5bf9..13493d5c45 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/GroupIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/GroupIT.java @@ -152,4 +152,14 @@ public void shouldIgnoreHeartbeatBeforeHandshakeComplete() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${net}/rebalance.multiple.members.with.same.group.id/client", + "${net}/rebalance.multiple.members.with.same.group.id/server"}) + public void shouldRebalanceMultipleMembersWithSameGroupId() throws Exception + { + k3po.finish(); + } + }
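
Reviewer note: the behavioral core of this patch is that a second application stream using the same group id under the "highlander" protocol migrates the existing group stream to the new sender (onStreamMigrate), and that a completed LeaveGroup response now flows through onLeaveGroup, which ends the stream and drops both the remembered member id and the groupStreams entry so the next join starts clean. The sketch below is a hypothetical, simplified model of that lifecycle, assuming illustrative names (GroupStreamRegistry, GroupStream, Membership) that are not the Zilla runtime types in KafkaClientGroupFactory; it only mirrors the map bookkeeping exercised by the new rebalance.multiple.members.with.same.group.id scripts.

// Hypothetical sketch only; not the Zilla implementation.
import java.util.HashMap;
import java.util.Map;

public final class GroupStreamRegistry
{
    static final String HIGHLANDER_PROTOCOL = "highlander";

    // member id assigned by the coordinator, remembered per group id
    static final class Membership
    {
        final Map<String, String> memberIds = new HashMap<>();
    }

    static final class GroupStream
    {
        final String groupId;
        String sender; // the application stream currently attached to this group

        GroupStream(String groupId, String sender)
        {
            this.groupId = groupId;
            this.sender = sender;
        }

        // Mirrors onStreamMigrate: network-facing state is kept,
        // only the application-facing sender is replaced.
        void migrate(String newSender)
        {
            this.sender = newSender;
        }
    }

    private final Map<String, GroupStream> groupStreams = new HashMap<>();
    private final Membership membership = new Membership();

    // A new application stream arrives for groupId: reuse the live group stream
    // when the "highlander" protocol is in effect, otherwise create a fresh one.
    GroupStream attach(String groupId, String protocol, String sender)
    {
        GroupStream group = groupStreams.get(groupId);
        if (group == null)
        {
            group = new GroupStream(groupId, sender);
            groupStreams.put(groupId, group);
        }
        else if (HIGHLANDER_PROTOCOL.equals(protocol))
        {
            group.migrate(sender); // last member wins; the previous sender is reset/aborted
        }
        return group;
    }

    // Mirrors onLeaveGroup: once the coordinator acknowledges LeaveGroup, the stream ends
    // and both the remembered member id and the group stream entry are removed, so the next
    // join for the same group id negotiates a fresh member (memberId-2 in the new scripts).
    void onLeaveGroup(String groupId)
    {
        membership.memberIds.remove(groupId);
        groupStreams.remove(groupId);
    }
}

Under this reading, the first client in the new scripts joins as memberId-1, leaves, and the second client with the same group id rejoins as memberId-2, which is exactly the sequence the network-level client.rpt and server.rpt drive.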