Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PrometheusHttpServer drops metrics with same name and different type #5078

Merged
merged 5 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package io.opentelemetry.exporter.prometheus;

import static java.util.stream.Collectors.joining;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
Expand All @@ -34,11 +36,14 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;

Expand All @@ -52,6 +57,7 @@ public final class PrometheusHttpServer implements Closeable, MetricReader {

private static final DaemonThreadFactory THREAD_FACTORY =
new DaemonThreadFactory("prometheus-http");
private static final Logger LOGGER = Logger.getLogger(PrometheusHttpServer.class.getName());

private final HttpServer server;
private final ExecutorService executor;
Expand All @@ -77,9 +83,10 @@ public static PrometheusHttpServerBuilder builder() {
} catch (IOException e) {
throw new UncheckedIOException("Could not create Prometheus HTTP server", e);
}
server.createContext("/", new MetricsHandler(() -> getMetricProducer().collectAllMetrics()));
server.createContext(
"/metrics", new MetricsHandler(() -> getMetricProducer().collectAllMetrics()));
MetricsHandler metricsHandler =
new MetricsHandler(() -> getMetricProducer().collectAllMetrics());
server.createContext("/", metricsHandler);
server.createContext("/metrics", metricsHandler);
server.createContext("/-/healthy", HealthHandler.INSTANCE);

executor = Executors.newFixedThreadPool(5, THREAD_FACTORY);
Expand Down Expand Up @@ -159,6 +166,9 @@ InetSocketAddress getAddress() {

private static class MetricsHandler implements HttpHandler {
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

private final Set<String> allConflictHeaderNames =
Collections.newSetFromMap(new ConcurrentHashMap<>());

private final Supplier<Collection<MetricData>> metricsSupplier;

private MetricsHandler(Supplier<Collection<MetricData>> metricsSupplier) {
Expand Down Expand Up @@ -190,7 +200,15 @@ public void handle(HttpExchange exchange) throws IOException {
} else {
out = exchange.getResponseBody();
}
serializer.write(metrics, out);
Set<String> conflictHeaderNames = serializer.write(metrics, out);
conflictHeaderNames.removeAll(allConflictHeaderNames);
if (conflictHeaderNames.size() > 0 && LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(
Level.WARNING,
"Metric conflict(s) detected. Multiple metrics with same name but different type: "
+ conflictHeaderNames.stream().collect(joining(",", "[", "]")));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goal is to log each name conflict that is detected once to avoid spammy logs.

allConflictHeaderNames.addAll(conflictHeaderNames);
}
}
exchange.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,25 @@
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/** Serializes metrics into Prometheus exposition formats. */
// Adapted from
// https://github.com/prometheus/client_java/blob/master/simpleclient_common/src/main/java/io/prometheus/client/exporter/common/TextFormat.java
abstract class Serializer {

static Serializer create(@Nullable String acceptHeader, Predicate<String> filter) {
if (acceptHeader == null) {
return new Prometheus004Serializer(filter);
Expand Down Expand Up @@ -100,61 +102,64 @@ abstract void writeExemplar(

abstract void writeEof(Writer writer) throws IOException;

final void write(Collection<MetricData> metrics, OutputStream output) throws IOException {
Map<InstrumentationScopeInfo, List<MetricData>> metricsByScope =
metrics.stream()
// Not supported in specification yet.
.filter(metric -> metric.getType() != MetricDataType.EXPONENTIAL_HISTOGRAM)
// PrometheusHttpServer#getAggregationTemporality specifies cumulative temporality for
// all instruments, but non-SDK MetricProducers may not conform. We drop delta
// temporality metrics to avoid the complexity of stateful transformation to cumulative.
.filter(metric -> !isDeltaTemporality(metric))
.filter(metric -> metricNameFilter.test(metricName(metric)))
.collect(
Collectors.groupingBy(
MetricData::getInstrumentationScopeInfo,
LinkedHashMap::new,
Collectors.toList()));
final Set<String> write(Collection<MetricData> metrics, OutputStream output) throws IOException {
Set<String> conflictMetricNames = new HashSet<>();
Map<String, List<MetricData>> metricsByName = new LinkedHashMap<>();
Set<InstrumentationScopeInfo> scopes = new LinkedHashSet<>();
// Iterate through metrics, filtering and grouping by headerName
for (MetricData metric : metrics) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A for loop is more appropriate because we now need to track the unique scopes we've seen and metric name conflicts, which we return to the caller for logging purposes.

// Not supported in specification yet.
if (metric.getType() == MetricDataType.EXPONENTIAL_HISTOGRAM) {
continue;
}
// PrometheusHttpServer#getAggregationTemporality specifies cumulative temporality for
// all instruments, but non-SDK MetricProducers may not conform. We drop delta
// temporality metrics to avoid the complexity of stateful transformation to cumulative.
if (isDeltaTemporality(metric)) {
continue;
}
PrometheusType prometheusType = PrometheusType.forMetric(metric);
String metricName = metricName(metric.getName(), prometheusType);
// Skip metrics which do not pass metricNameFilter
if (!metricNameFilter.test(metricName)) {
continue;
}
List<MetricData> metricsWithHeaderName =
metricsByName.computeIfAbsent(metricName, unused -> new ArrayList<>());
// Skip metrics with the same name but different type
if (metricsWithHeaderName.size() > 0
&& prometheusType != PrometheusType.forMetric(metricsWithHeaderName.get(0))) {
conflictMetricNames.add(metricName);
continue;
}

metricsWithHeaderName.add(metric);
scopes.add(metric.getInstrumentationScopeInfo());
}

Optional<Resource> optResource = metrics.stream().findFirst().map(MetricData::getResource);
try (Writer writer =
new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8))) {
if (optResource.isPresent()) {
writeResource(optResource.get(), writer);
}
for (Map.Entry<InstrumentationScopeInfo, List<MetricData>> entry :
metricsByScope.entrySet()) {
for (InstrumentationScopeInfo scope : scopes) {
writeScopeInfo(scope, writer);
}
for (Map.Entry<String, List<MetricData>> entry : metricsByName.entrySet()) {
write(entry.getValue(), entry.getKey(), writer);
}
writeEof(writer);
}
return conflictMetricNames;
}

private void write(
List<MetricData> metrics, InstrumentationScopeInfo instrumentationScopeInfo, Writer writer)
throws IOException {
writeScopeInfo(instrumentationScopeInfo, writer);
// Group metrics with the scope, name, but different types. This is a semantic error which the
// SDK warns about but passes through to exporters to handle.
Map<String, List<MetricData>> metricsByName =
metrics.stream()
.collect(
Collectors.groupingBy(
metric ->
headerName(
NameSanitizer.INSTANCE.apply(metric.getName()),
PrometheusType.forMetric(metric)),
LinkedHashMap::new,
Collectors.toList()));

for (Map.Entry<String, List<MetricData>> entry : metricsByName.entrySet()) {
write(entry.getValue(), entry.getKey(), writer);
}
}

private void write(List<MetricData> metrics, String headerName, Writer writer)
private void write(List<MetricData> metrics, String metricName, Writer writer)
throws IOException {
// Write header based on first metric
PrometheusType type = PrometheusType.forMetric(metrics.get(0));
MetricData first = metrics.get(0);
PrometheusType type = PrometheusType.forMetric(first);
String headerName = headerName(NameSanitizer.INSTANCE.apply(first.getName()), type);
String description = metrics.get(0).getDescription();

writer.write("# TYPE ");
Expand All @@ -171,21 +176,19 @@ private void write(List<MetricData> metrics, String headerName, Writer writer)

// Then write the metrics.
for (MetricData metric : metrics) {
write(metric, writer);
write(metric, metricName, writer);
}
}

private void write(MetricData metric, Writer writer) throws IOException {
String name = metricName(metric);

private void write(MetricData metric, String metricName, Writer writer) throws IOException {
for (PointData point : getPoints(metric)) {
switch (metric.getType()) {
case DOUBLE_SUM:
case DOUBLE_GAUGE:
writePoint(
writer,
metric.getInstrumentationScopeInfo(),
name,
metricName,
((DoublePointData) point).getValue(),
point.getAttributes(),
point.getEpochNanos());
Expand All @@ -195,18 +198,18 @@ private void write(MetricData metric, Writer writer) throws IOException {
writePoint(
writer,
metric.getInstrumentationScopeInfo(),
name,
metricName,
(double) ((LongPointData) point).getValue(),
point.getAttributes(),
point.getEpochNanos());
break;
case HISTOGRAM:
writeHistogram(
writer, metric.getInstrumentationScopeInfo(), name, (HistogramPointData) point);
writer, metric.getInstrumentationScopeInfo(), metricName, (HistogramPointData) point);
break;
case SUMMARY:
writeSummary(
writer, metric.getInstrumentationScopeInfo(), name, (SummaryPointData) point);
writer, metric.getInstrumentationScopeInfo(), metricName, (SummaryPointData) point);
break;
case EXPONENTIAL_HISTOGRAM:
throw new IllegalArgumentException("Can't happen");
Expand Down Expand Up @@ -648,9 +651,8 @@ static Collection<? extends PointData> getPoints(MetricData metricData) {
return Collections.emptyList();
}

private static String metricName(MetricData metric) {
PrometheusType type = PrometheusType.forMetric(metric);
String name = NameSanitizer.INSTANCE.apply(metric.getName());
private static String metricName(String rawMetricName, PrometheusType type) {
String name = NameSanitizer.INSTANCE.apply(rawMetricName);
if (type == PrometheusType.COUNTER) {
name = name + "_total";
}
Expand Down
1 change: 1 addition & 0 deletions exporters/prometheus/src/module/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@

requires transitive io.opentelemetry.sdk.metrics;
requires jdk.httpserver;
requires java.logging;
}
Loading