Skip to content

Commit

Permalink
Prevent KafkaMetrics' scheduler from stopping in case of errors
Browse files Browse the repository at this point in the history
fixed gh-2879
  • Loading branch information
jonatan-ivanov committed Nov 30, 2021
1 parent 022f3af commit 1a0657d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,71 +160,78 @@ private long getRefreshIntervalInMillis() {
* comparing meters last returned from the Kafka client.
*/
// NOTE(review): this is a diff hunk rendered without +/- markers and without
// indentation. The removed (pre-commit) lines and the added (post-commit) lines
// are interleaved, so most statements appear twice and the text is not
// compilable exactly as shown.
//
// Purpose: refresh the cached metrics from metricsSupplier and reconcile the
// given registry with them. Meters whose metric name is no longer supplied are
// removed, and numeric metrics outside the metadata groups are bound as meters.
void checkAndBindMetrics(MeterRegistry registry) {
this.metrics.set(this.metricsSupplier.get());
Map<MetricName, ? extends Metric> metrics = this.metrics.get();

// Nothing is done when the supplied metric names equal the previously seen set.
if (!currentMeters.equals(metrics.keySet())) {
Set<MetricName> metricsToRemove = currentMeters.stream()
.filter(metricName -> !metrics.containsKey(metricName))
.collect(Collectors.toSet());

for (MetricName metricName : metricsToRemove) {
Meter.Id id = meterIdForComparison(metricName);
Meter meter = registry.remove(id);
registeredMeters.remove(meter);
}
// Added side of the hunk: the whole refresh runs inside try/catch, so any
// exception is logged by the catch at the end of the method instead of being
// thrown to the caller.
try {
this.metrics.set(this.metricsSupplier.get());
Map<MetricName, ? extends Metric> metrics = this.metrics.get();

if (!currentMeters.equals(metrics.keySet())) {
Set<MetricName> metricsToRemove = currentMeters.stream()
.filter(metricName -> !metrics.containsKey(metricName))
.collect(Collectors.toSet());

for (MetricName metricName : metricsToRemove) {
Meter.Id id = meterIdForComparison(metricName);
Meter meter = registry.remove(id);
// Only touch registeredMeters when the registry actually returned a meter.
// NOTE(review): presumably remove(id) returns null when no such meter is
// registered - confirm against MeterRegistry.
if (meter != null) {
registeredMeters.remove(meter);
}
}

// Remember the metric names seen in this refresh for the next comparison.
currentMeters = new HashSet<>(metrics.keySet());
currentMeters = new HashSet<>(metrics.keySet());

// Index the registry's current meters by meter name for the tag comparison below.
Map<String, List<Meter>> registryMetersByNames = registry.getMeters().stream()
.collect(Collectors.groupingBy(meter -> meter.getId().getName()));
Map<String, List<Meter>> registryMetersByNames = registry.getMeters().stream()
.collect(Collectors.groupingBy(meter -> meter.getId().getName()));

metrics.forEach((name, metric) -> {
// Filter out non-numeric values
// Filter out metrics from groups that include metadata
if (!(metric.metricValue() instanceof Number) ||
METRIC_GROUP_APP_INFO.equals(name.group()) ||
METRIC_GROUP_METRICS_COUNT.equals(name.group())) {
return;
}
metrics.forEach((name, metric) -> {
// Filter out non-numeric values
// Filter out metrics from groups that include metadata
if (!(metric.metricValue() instanceof Number) ||
METRIC_GROUP_APP_INFO.equals(name.group()) ||
METRIC_GROUP_METRICS_COUNT.equals(name.group())) {
return;
}

String meterName = meterName(name);

// Kafka has metrics with lower number of tags (e.g. with/without topic or partition tag)
// Remove meters with lower number of tags
boolean hasLessTags = false;
for (Meter other : registryMetersByNames.getOrDefault(meterName, emptyList())) {
List<Tag> tags = other.getId().getTags();
List<Tag> meterTagsWithCommonTags = meterTags(name, true);
if (tags.size() < meterTagsWithCommonTags.size()) {
registry.remove(other);
registeredMeters.remove(other);
String meterName = meterName(name);

// Kafka has metrics with lower number of tags (e.g. with/without topic or partition tag)
// Remove meters with lower number of tags
boolean hasLessTags = false;
for (Meter other : registryMetersByNames.getOrDefault(meterName, emptyList())) {
List<Tag> tags = other.getId().getTags();
List<Tag> meterTagsWithCommonTags = meterTags(name, true);
if (tags.size() < meterTagsWithCommonTags.size()) {
registry.remove(other);
registeredMeters.remove(other);
}
// Check if already exists
// Same tag count: return if the registered meter carries all of these tags,
// otherwise stop scanning. A registered meter with more tags sets hasLessTags.
else if (tags.size() == meterTagsWithCommonTags.size())
if (tags.containsAll(meterTagsWithCommonTags)) return;
else break;
else hasLessTags = true;
}
// Check if already exists
else if (tags.size() == meterTagsWithCommonTags.size())
if (tags.containsAll(meterTagsWithCommonTags)) return;
else break;
else hasLessTags = true;
}
// Skip binding when a registered meter of the same name has more tags.
if (hasLessTags) return;
if (hasLessTags) return;

List<Tag> tags = meterTags(name);
try {
Meter meter = bindMeter(registry, metric.metricName(), meterName, tags);
List<Meter> meters = registryMetersByNames.computeIfAbsent(meterName, k -> new ArrayList<>());
meters.add(meter);
}
catch (Exception ex) {
String message = ex.getMessage();
if (message != null && message.contains("Prometheus requires")) {
warnThenDebugLogger.log("Failed to bind meter: " + meterName + " " + tags
+ ". However, this could happen and might be restored in the next refresh.");
List<Tag> tags = meterTags(name);
try {
Meter meter = bindMeter(registry, metric.metricName(), meterName, tags);
// Record the new meter in the local index so later metrics in this pass see it.
List<Meter> meters = registryMetersByNames.computeIfAbsent(meterName, k -> new ArrayList<>());
meters.add(meter);
}
else {
log.warn("Failed to bind meter: " + meterName + " " + tags + ".", ex);
// A bind failure is logged and the loop continues: messages containing
// "Prometheus requires" go through warnThenDebugLogger, anything else is
// logged at warn level together with the exception.
catch (Exception ex) {
String message = ex.getMessage();
if (message != null && message.contains("Prometheus requires")) {
warnThenDebugLogger.log("Failed to bind meter: " + meterName + " " + tags
+ ". However, this could happen and might be restored in the next refresh.");
}
else {
log.warn("Failed to bind meter: " + meterName + " " + tags + ".", ex);
}
}
}
});
});
}
}
// Any exception raised during the refresh is logged here and not rethrown.
catch (Exception e) {
log.warn("Failed to bind KafkaMetric", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,35 @@ void shouldRemoveOldMetersWithTags() {
assertThat(registry.getMeters()).hasSize(0);
}

/**
 * Regression test for #2879: a refresh must still complete when the meters it
 * would remove have already been removed from the registry by someone else.
 */
@Issue("#2879")
@Test
void removeShouldWorkForNonExistingMeters() {
    // The supplier always hands out this same mutable map, so later changes
    // to the map are visible on the next refresh.
    Map<MetricName, Metric> suppliedMetrics = new HashMap<>();
    Supplier<Map<MetricName, ? extends Metric>> metricsSupplier = () -> suppliedMetrics;
    MeterRegistry registry = new SimpleMeterRegistry();
    kafkaMetrics = new KafkaMetrics(metricsSupplier);
    kafkaMetrics.bindTo(registry);
    assertThat(registry.getMeters()).isEmpty();

    // One metric appears: the refresh registers exactly one meter.
    MetricName metricName = createMetricName("a");
    Metric kafkaMetric = createKafkaMetric(metricName);
    suppliedMetrics.put(metricName, kafkaMetric);
    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).hasSize(1);

    // The metric disappears AND the registry is emptied behind KafkaMetrics' back;
    // the following refresh has nothing left to remove and must not fail.
    suppliedMetrics.clear();
    registry.forEachMeter(registry::remove);
    kafkaMetrics.checkAndBindMetrics(registry);
    assertThat(registry.getMeters()).isEmpty();
}

/**
 * Regression test for #2879: a metrics supplier that throws must not make
 * {@code checkAndBindMetrics} throw. The test passes when the call returns normally.
 */
@Issue("#2879")
@Test
void checkAndBindMetricsShouldNotFail() {
    MeterRegistry registry = new SimpleMeterRegistry();
    Supplier<Map<MetricName, ? extends Metric>> failingSupplier = () -> {
        throw new RuntimeException("simulated");
    };
    kafkaMetrics = new KafkaMetrics(failingSupplier);
    kafkaMetrics.checkAndBindMetrics(registry);
}

@SuppressWarnings("unchecked")
private MetricName createMetricName(String name) {
return createMetricName(name, EMPTY_MAP);
Expand Down

0 comments on commit 1a0657d

Please sign in to comment.