-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement JoinGroup request as first class stream #844
Conversation
@@ -62,12 +63,20 @@ public KafkaClientFactory( | |||
final BindingHandler streamFactory = config.clientConnectionPool() ? connectionPool.streamFactory() : | |||
context.streamFactory(); | |||
|
|||
final Function<Integer, BindingHandler> streamFactorySupplier = d -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid auto-boxing, use IntFunction<BindingHandler>
instead, for all references too.
@@ -362,7 +381,7 @@ public MessageConsumer newStream( | |||
{ | |||
final long resolvedId = resolved.id; | |||
final List<KafkaServerConfig> servers = binding.servers(); | |||
final KafkaSaslConfig sasl = resolveSasl.apply(binding.sasl()); | |||
final KafkaSaslConfig sasl = resolveSaslSupplier.apply(0).apply(binding.sasl()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please define a constant private static final int NO_REBALANCE_DELAY = 0
and use it here instead of 0
.
@@ -413,6 +432,7 @@ public void onDetached( | |||
|
|||
private MessageConsumer newStream( | |||
MessageConsumer sender, | |||
int initialDelay, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the meaning already associated with initial
for streams, let's rename initialDelay
to rebalanceDelay
in this file.
Description
Move join group request stream out of coordinator stream and use a separate connection if the group.initial.rebalance.delay.ms > 0 otherwise use the connection pool since there won't be any delay.
Fixes #719