Allow to bind KafkaClientMetrics for each decaton consumer #132
Thanks for PR! I think one of the downsides of enabling
That's because each decaton subscription creates metrics with its own label set, so unless we clean these metrics up at the end of the subscription's lifetime, garbage will keep accumulating. It wouldn't be a significant problem in most decaton use cases, where the subscription dies along with the application instance, but as a matter of good practice we should still clean up as much as we can.
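To illustrate the accumulation concern above, here is a minimal sketch using only the JDK. `ToyRegistry` and `ToySubscriptionMetrics` are hypothetical stand-ins invented for this example; they are not micrometer's or decaton's API. The sketch only shows why meters labelled per subscription stay in a shared registry unless the subscription removes them when it ends.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SubscriptionMeterCleanup {
    /** Hypothetical stand-in for a shared, long-lived meter registry. */
    static final class ToyRegistry {
        private final Map<String, Long> meters = new ConcurrentHashMap<>();

        void register(String id) { meters.put(id, 0L); }
        void remove(String id) { meters.remove(id); }
        int size() { return meters.size(); }
    }

    /** Registers meters labelled with a subscription id and removes them on close. */
    static final class ToySubscriptionMetrics implements AutoCloseable {
        private final ToyRegistry registry;
        private final Set<String> ids = ConcurrentHashMap.newKeySet();

        ToySubscriptionMetrics(ToyRegistry registry, String subscriptionId) {
            this.registry = registry;
            // Meter names are made up for illustration only.
            for (String name : new String[] {"records.consumed", "fetch.latency"}) {
                String id = name + "{subscription=" + subscriptionId + "}";
                registry.register(id);
                ids.add(id);
            }
        }

        @Override
        public void close() {
            // Remove exactly the meters this subscription registered.
            ids.forEach(registry::remove);
            ids.clear();
        }
    }

    /** Creates the given number of subscriptions and reports how many meters remain. */
    static int metersLeftAfter(int subscriptions, boolean closeEach) {
        ToyRegistry registry = new ToyRegistry();
        for (int i = 0; i < subscriptions; i++) {
            ToySubscriptionMetrics metrics = new ToySubscriptionMetrics(registry, "sub-" + i);
            if (closeEach) {
                metrics.close();
            }
        }
        return registry.size();
    }

    public static void main(String[] args) {
        System.out.println(metersLeftAfter(3, false)); // prints 6: two meters leaked per subscription
        System.out.println(metersLeftAfter(3, true));  // prints 0: each subscription cleaned up
    }
}
```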
yes, assuming people don't already monitor their Kafka client via JMX adapted (namely, micrometer's deprecated
If I understand what you mean correctly, isn't that exactly the
My first thought, but it was not convenient since it doesn't have the
Sounds good. Will add this flag.
I see. If we go the way I suggest in this PR, I will fix it to clean up the metrics.
Force-pushed from 8741661 to cefcab2.
I couldn't find a way to cleanup
No, they favored the way using
Indeed, with the current version, but it looks like it has been added in the latest?
Okay, I didn't know there was such a thing. I don't know why they would use or not use it :D
A-ha, nice. Let me try to integrate it.
Force-pushed from cefcab2 to 7036afb.
PTAL.
Wow, many tests started timing out... why?
Just guessing: the increased number of metrics? We might need profiling to be sure.
It turns out that calling I added commits to
public void bindClientMetrics(Consumer<String, byte[]> consumer) {
    if (kafkaClientMetrics != null) {
        closeClientMetrics();
In which case would this be effective?
Ah, none. Indeed, SubscriptionMetrics is only instantiated in the ProcessorSubscription constructor. Removing.
@@ -59,8 +59,8 @@
presetRetryProducerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "100");
}

@Setter(AccessLevel.NONE)
private ProcessorProperties.Builder<ProcessorProperties> propertiesBuilder;
private final ProcessorProperties.Builder<ProcessorProperties> propertiesBuilder =
avoid overwriting properties in SubscriptionBuilder - I can't see any good reason to do it
The intention is to provide a way to "clear" (reset) something set previously. So basically, these methods that take a collection of items replace the existing set rather than adding to it.
To keep this behavior consistent with other methods like consumerConfig, and backward compatible, I think we shouldn't change this.
I'm happy to add another kind of method like addProperties() (addProperty()) which supports it and is more explicit, though.
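The two behaviors discussed here can be sketched with a small builder. This is an illustration under assumptions, not decaton's SubscriptionBuilder: the class `ReplaceVsAddBuilder` and its use of plain strings are hypothetical, and `addProperties` only mirrors the method name floated in the comment above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReplaceVsAddBuilder {
    private final List<String> properties = new ArrayList<>();

    /** Replace semantics: discards anything set earlier; passing no values resets the list. */
    public ReplaceVsAddBuilder properties(String... values) {
        properties.clear();
        properties.addAll(Arrays.asList(values));
        return this;
    }

    /** Hypothetical additive variant: keeps earlier values and appends the new ones. */
    public ReplaceVsAddBuilder addProperties(String... values) {
        properties.addAll(Arrays.asList(values));
        return this;
    }

    /** Returns a defensive copy of the accumulated values. */
    public List<String> build() {
        return new ArrayList<>(properties);
    }

    public static void main(String[] args) {
        // Second call replaces the first.
        System.out.println(new ReplaceVsAddBuilder().properties("a").properties("b").build());    // prints [b]
        // Additive call keeps the earlier value.
        System.out.println(new ReplaceVsAddBuilder().properties("a").addProperties("b").build()); // prints [a, b]
        // Calling the replacing method with nothing acts as a reset.
        System.out.println(new ReplaceVsAddBuilder().properties("a").properties().build());       // prints []
    }
}
```

Keeping both methods makes the reset use case possible while giving callers an explicit way to accumulate values.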
I don't think that's what one would expect of a builder, but happy to oblige with your design constraints
@@ -62,6 +72,7 @@ private static Properties defaultProducerProps(String bootstrapServers) {
}

public static final String DEFAULT_GROUP_ID = "test-group";
public static final Duration DEFINITELY_TOO_SLOW = Duration.ofSeconds(20);
add timeouts of 20s in some places of the tests that were blocking forever, for easier debugging
Curious why the backtrace provided when the test itself times out (@Test(timeout = xxx)) isn't sufficient to know where it got stuck?
Because it was not exactly stuck. The Producer and Consumer were running and doing nothing.
LGTM
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
What's the intention of this change? Seems like waiting just 100ms here isn't an actual problem?
Just to avoid having to deal with InterruptedException. No strong preference anyway.
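For readers comparing the two pause styles in this thread, here is a small self-contained sketch. The class and method names (`PauseStyles`, `pauseWithSleep`, `pauseWithPark`) are made up for illustration. `Thread.sleep` throws the checked `InterruptedException`, so callers need the catch block seen in the diff, while `LockSupport.parkNanos` declares no checked exception. It may return early, for example on interrupt or spuriously, and it leaves the interrupt flag untouched.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class PauseStyles {
    /** Pause using Thread.sleep: the checked InterruptedException must be handled. */
    static void pauseWithSleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating as an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Pause using parkNanos: no checked exception, but the wait may end early. */
    static void pauseWithPark(long millis) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(millis));
    }

    public static void main(String[] args) {
        pauseWithSleep(100);
        pauseWithPark(100);

        // With the interrupt flag already set, parkNanos returns without throwing
        // and leaves the flag set for the caller to inspect.
        Thread.currentThread().interrupt();
        pauseWithPark(100);
        System.out.println(Thread.interrupted()); // prints true (and clears the flag)
    }
}
```

Because parking can end early, code that needs a guaranteed minimum delay would have to re-check elapsed time in a loop; for a best-effort 100 ms back-off either style works.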
Thank you!
LGTM except kawamuray's point.
Thanks for the reviews! Is the next release already scheduled?
It should be in the next few weeks, but if you're in a hurry, I can make a release now.
I'm not exactly in a hurry, but finer-grained releases usually make happier users!
Motivation
micrometer 1.4+ has deprecated the JMX-based metrics gathering (KafkaConsumerMetrics) in favour of converting Consumer exposed Metrics. See this PR.
Changes
Always bind KafkaClientMetrics, tagged with the right subscription id.
This should not be a breaking change since there is no reasonable way to have already bound KafkaClientMetrics for these consumers elsewhere.
Note: not doing this for KafkaProducerSupplier, which is overridable / customizable.
(I'd like advice on how to deal with the test failure - I don't understand why we have to manage micrometer registry cleanup manually)