Skip to content

Commit

Permalink
Require group host and port for coordinator-specific streams (#794)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfallows authored Feb 13, 2024
1 parent 95bb4b5 commit 646b33c
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -651,11 +651,11 @@ private void doConsumerInitialBegin(

this.receiver = newStream(this::onConsumerMessage,
originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, 0L,
ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
traceId, authorization, 0L, ex -> ex
.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.group(g ->
g.groupId(groupId)
.group(g -> g
.groupId(groupId)
.protocol("rebalance")
.timeout(timeout)
.metadataLen(metadata.sizeof())
Expand Down Expand Up @@ -1545,7 +1545,9 @@ private void doOffsetCommitInitialBegin(
.offsetCommit(oc -> oc
.groupId(delegate.fanout.groupId)
.memberId(delegate.fanout.memberId)
.instanceId(delegate.fanout.instanceId))
.instanceId(delegate.fanout.instanceId)
.host(delegate.fanout.host)
.port(delegate.fanout.port))
.build().sizeof()));
state = KafkaState.openingInitial(state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,20 @@ public KafkaConsumerBeginExBuilder consumerId(
return this;
}

public KafkaConsumerBeginExBuilder host(
String host)
{
consumerBeginExRW.host(host);
return this;
}

public KafkaConsumerBeginExBuilder port(
int port)
{
consumerBeginExRW.port(port);
return this;
}

public KafkaConsumerBeginExBuilder timeout(
int timeout)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,8 @@ scope kafka
struct KafkaOffsetFetchBeginEx
{
string16 groupId;
string16 host = null;
int32 port = 0;
string16 host;
int32 port;
string16 topic;
KafkaTopicPartition[] partitions;
}
Expand All @@ -483,8 +483,8 @@ scope kafka
string16 groupId;
string16 memberId;
string16 instanceId;
string16 host = null;
int32 port = 0;
string16 host;
int32 port;
}

struct KafkaOffsetCommitDataEx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ read zilla:begin.ext ${kafka:matchBeginEx()
.groupId("client-1")
.protocol("rebalance")
.instanceId("zilla")
.host("localhost")
.host("broker1.example.com")
.port(9092)
.timeout(30000)
.build()
Expand Down Expand Up @@ -131,6 +131,8 @@ write zilla:begin.ext ${kafka:beginEx()
.groupId("client-1")
.memberId("memberId-1")
.instanceId("zilla")
.host("broker1.example.com")
.port(9092)
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ write zilla:begin.ext ${kafka:beginEx()
.groupId("client-1")
.protocol("rebalance")
.instanceId("zilla")
.host("localhost")
.host("broker1.example.com")
.port(9092)
.timeout(30000)
.build()
Expand Down Expand Up @@ -131,6 +131,8 @@ read zilla:begin.ext ${kafka:beginEx()
.groupId("client-1")
.memberId("memberId-1")
.instanceId("zilla")
.host("broker1.example.com")
.port(9092)
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ read zilla:begin.ext ${kafka:matchBeginEx()
.group()
.groupId("client-1")
.protocol("rebalance")
.host("localhost")
.host("broker1.example.com")
.port(9092)
.timeout(30000)
.build()
Expand Down Expand Up @@ -195,7 +195,7 @@ write zilla:begin.ext ${kafka:beginEx()
.typeId(zilla:id("kafka"))
.offsetFetch()
.groupId("client-1")
.host("localhost")
.host("broker1.example.com")
.port(9092)
.topic("test")
.partition(0)
Expand Down Expand Up @@ -258,6 +258,8 @@ write zilla:begin.ext ${kafka:beginEx()
.groupId("client-1")
.memberId("memberId-1")
.instanceId("zilla")
.host("broker1.example.com")
.port(9092)
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ write zilla:begin.ext ${kafka:beginEx()
.groupId("client-1")
.protocol("rebalance")
.instanceId("zilla")
.host("localhost")
.host("broker1.example.com")
.port(9092)
.timeout(30000)
.build()
Expand Down Expand Up @@ -190,7 +190,7 @@ read zilla:begin.ext ${kafka:beginEx()
.typeId(zilla:id("kafka"))
.offsetFetch()
.groupId("client-1")
.host("localhost")
.host("broker1.example.com")
.port(9092)
.topic("test")
.partition(0)
Expand Down Expand Up @@ -247,6 +247,8 @@ read zilla:begin.ext ${kafka:beginEx()
.groupId("client-1")
.memberId("memberId-1")
.instanceId("zilla")
.host("broker1.example.com")
.port(9092)
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4225,6 +4225,8 @@ public void shouldGenerateGroupBeginWithEmptyMetadataExtension()
.group()
.groupId("test")
.protocol("roundrobin")
.host("broker1.example.com")
.port(9092)
.timeout(10)
.build()
.build();
Expand All @@ -4237,6 +4239,7 @@ public void shouldGenerateGroupBeginWithEmptyMetadataExtension()
final KafkaGroupBeginExFW groupBeginEx = beginEx.group();
assertEquals("test", groupBeginEx.groupId().asString());
assertEquals("roundrobin", groupBeginEx.protocol().asString());
assertEquals("broker1.example.com", groupBeginEx.host().asString());
assertEquals(10, groupBeginEx.timeout());
}

Expand All @@ -4248,6 +4251,8 @@ public void shouldGenerateConsumerBeginExtension()
.consumer()
.groupId("test")
.consumerId("consumer-1")
.host("broker1.example.com")
.port(9092)
.timeout(10000)
.topic("topic")
.partition(0)
Expand All @@ -4261,6 +4266,7 @@ public void shouldGenerateConsumerBeginExtension()

final KafkaConsumerBeginExFW consumerBeginEx = beginEx.consumer();
assertEquals("test", consumerBeginEx.groupId().asString());
assertEquals("broker1.example.com", consumerBeginEx.host().asString());
assertEquals("topic", consumerBeginEx.topic().asString());
assertEquals(1, consumerBeginEx.partitionIds().fieldCount());
}
Expand Down

0 comments on commit 646b33c

Please sign in to comment.