From 646b33c114e8ac4efa7c2ef76f2568faff830cf5 Mon Sep 17 00:00:00 2001 From: John Fallows Date: Mon, 12 Feb 2024 17:24:54 -0800 Subject: [PATCH] Require group host and port for coordinator-specific streams (#794) --- .../stream/KafkaCacheServerConsumerFactory.java | 12 +++++++----- .../binding/kafka/internal/KafkaFunctions.java | 14 ++++++++++++++ .../src/main/resources/META-INF/zilla/kafka.idl | 8 ++++---- .../commit.acknowledge.message.offset/client.rpt | 4 +++- .../commit.acknowledge.message.offset/server.rpt | 4 +++- .../unmerged.group.fetch.message.ack/client.rpt | 6 ++++-- .../unmerged.group.fetch.message.ack/server.rpt | 6 ++++-- .../binding/kafka/internal/KafkaFunctionsTest.java | 6 ++++++ 8 files changed, 45 insertions(+), 15 deletions(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerConsumerFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerConsumerFactory.java index deee0bf4dc..516e5a4363 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerConsumerFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerConsumerFactory.java @@ -651,11 +651,11 @@ private void doConsumerInitialBegin( this.receiver = newStream(this::onConsumerMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax, - traceId, authorization, 0L, - ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) + traceId, authorization, 0L, ex -> ex + .set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .group(g -> - g.groupId(groupId) + .group(g -> g + .groupId(groupId) .protocol("rebalance") .timeout(timeout) .metadataLen(metadata.sizeof()) @@ -1545,7 +1545,9 @@ private void doOffsetCommitInitialBegin( .offsetCommit(oc -> oc .groupId(delegate.fanout.groupId) .memberId(delegate.fanout.memberId) - .instanceId(delegate.fanout.instanceId)) + .instanceId(delegate.fanout.instanceId) + .host(delegate.fanout.host) + .port(delegate.fanout.port)) .build().sizeof())); state = KafkaState.openingInitial(state); } diff --git a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java index d6016eafc5..c2b215b83f 100644 --- a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java +++ b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java @@ -1482,6 +1482,20 @@ public KafkaConsumerBeginExBuilder consumerId( return this; } + public KafkaConsumerBeginExBuilder host( + String host) + { + consumerBeginExRW.host(host); + return this; + } + + public KafkaConsumerBeginExBuilder port( + int port) + { + consumerBeginExRW.port(port); + return this; + } + public KafkaConsumerBeginExBuilder timeout( int timeout) { diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index a3c0874d9d..62873faacb 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -467,8 +467,8 @@ scope kafka struct KafkaOffsetFetchBeginEx { string16 groupId; - string16 host = null; - int32 port = 0; + string16 host; + int32 port; string16 topic; KafkaTopicPartition[] partitions; } @@ -483,8 +483,8 @@ scope kafka string16 groupId; string16 memberId; string16 instanceId; - string16 host = null; - int32 port = 0; + string16 host; + int32 port; } struct KafkaOffsetCommitDataEx diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/client.rpt index a771dd38c1..072b950b50 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/client.rpt @@ -42,7 +42,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .groupId("client-1") .protocol("rebalance") .instanceId("zilla") - .host("localhost") + .host("broker1.example.com") .port(9092) .timeout(30000) .build() @@ -131,6 +131,8 @@ write zilla:begin.ext ${kafka:beginEx() .groupId("client-1") .memberId("memberId-1") .instanceId("zilla") + .host("broker1.example.com") + .port(9092) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/server.rpt index 48a34b2a83..2d99a56c80 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/server.rpt @@ -46,7 +46,7 @@ write zilla:begin.ext ${kafka:beginEx() .groupId("client-1") .protocol("rebalance") .instanceId("zilla") - .host("localhost") + .host("broker1.example.com") .port(9092) .timeout(30000) .build() @@ -131,6 +131,8 @@ read zilla:begin.ext ${kafka:beginEx() .groupId("client-1") .memberId("memberId-1") .instanceId("zilla") + .host("broker1.example.com") + .port(9092) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/client.rpt index 47cda8d305..0d306a2e59 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/client.rpt @@ -135,7 +135,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .group() .groupId("client-1") .protocol("rebalance") - .host("localhost") + .host("broker1.example.com") .port(9092) .timeout(30000) .build() @@ -195,7 +195,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .offsetFetch() .groupId("client-1") - .host("localhost") + .host("broker1.example.com") .port(9092) .topic("test") .partition(0) @@ -258,6 +258,8 @@ write zilla:begin.ext ${kafka:beginEx() .groupId("client-1") .memberId("memberId-1") .instanceId("zilla") + .host("broker1.example.com") + .port(9092) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/server.rpt index 82b57608e9..ada4fbf33e 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/server.rpt @@ -134,7 +134,7 @@ write zilla:begin.ext ${kafka:beginEx() .groupId("client-1") .protocol("rebalance") .instanceId("zilla") - .host("localhost") + .host("broker1.example.com") .port(9092) .timeout(30000) .build() @@ -190,7 +190,7 @@ read zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .offsetFetch() .groupId("client-1") - .host("localhost") + .host("broker1.example.com") .port(9092) .topic("test") .partition(0) @@ -247,6 +247,8 @@ read zilla:begin.ext ${kafka:beginEx() .groupId("client-1") .memberId("memberId-1") .instanceId("zilla") + .host("broker1.example.com") + .port(9092) .build() .build()} diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java index f996b0c30b..4f2b862f5f 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java @@ -4225,6 +4225,8 @@ public void shouldGenerateGroupBeginWithEmptyMetadataExtension() .group() .groupId("test") .protocol("roundrobin") + .host("broker1.example.com") + .port(9092) .timeout(10) .build() .build(); @@ -4237,6 +4239,7 @@ public void shouldGenerateGroupBeginWithEmptyMetadataExtension() final KafkaGroupBeginExFW groupBeginEx = beginEx.group(); assertEquals("test", groupBeginEx.groupId().asString()); assertEquals("roundrobin", groupBeginEx.protocol().asString()); + assertEquals("broker1.example.com", groupBeginEx.host().asString()); assertEquals(10, groupBeginEx.timeout()); } @@ -4248,6 +4251,8 @@ public void shouldGenerateConsumerBeginExtension() .consumer() .groupId("test") .consumerId("consumer-1") + .host("broker1.example.com") + .port(9092) .timeout(10000) .topic("topic") .partition(0) @@ -4261,6 +4266,7 @@ public void shouldGenerateConsumerBeginExtension() final KafkaConsumerBeginExFW consumerBeginEx = beginEx.consumer(); assertEquals("test", consumerBeginEx.groupId().asString()); + assertEquals("broker1.example.com", consumerBeginEx.host().asString()); assertEquals("topic", consumerBeginEx.topic().asString()); assertEquals(1, consumerBeginEx.partitionIds().fieldCount()); }