diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java index 044cc2b12..36c0dc520 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java @@ -27,9 +27,7 @@ import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler; import static io.airlift.http.client.Request.Builder.prepareGet; import static io.airlift.json.JsonCodec.jsonCodec; -import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY; -import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT; -import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; +import static io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor.shouldRetry; import static java.util.Objects.requireNonNull; public class ClusterStatsInfoApiMonitor @@ -88,16 +86,4 @@ private TrinoStatus checkStatus(String baseUrl, int retriesRemaining) } return TrinoStatus.UNHEALTHY; } - - public static boolean shouldRetry(int statusCode) - { - switch (statusCode) { - case HTTP_BAD_GATEWAY: - case HTTP_UNAVAILABLE: - case HTTP_GATEWAY_TIMEOUT: - return true; - default: - return false; - } - } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsMetricsMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsMetricsMonitor.java new file mode 100644 index 000000000..4fccd64d4 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsMetricsMonitor.java @@ -0,0 +1,201 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.clustermonitor; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.http.client.HttpClient; +import io.airlift.http.client.HttpUriBuilder; +import io.airlift.http.client.Request; +import io.airlift.http.client.Response; +import io.airlift.http.client.ResponseHandler; +import io.airlift.http.client.UnexpectedResponseException; +import io.airlift.log.Logger; +import io.trino.gateway.ha.config.BackendStateConfiguration; +import io.trino.gateway.ha.config.MonitorConfiguration; +import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import io.trino.gateway.ha.security.util.BasicCredentials; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Map; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; +import static io.airlift.http.client.Request.Builder.prepareGet; +import static io.airlift.http.client.ResponseHandlerUtils.propagate; +import static io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor.shouldRetry; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +public class ClusterStatsMetricsMonitor + implements ClusterStatsMonitor +{ + public static final String RUNNING_QUERIES_METRIC = "trino_execution_name_QueryManager_RunningQueries"; + public static final String QUEUED_QUERIES_METRIC = "trino_execution_name_QueryManager_QueuedQueries"; + private static final Logger log = Logger.get(ClusterStatsMetricsMonitor.class); + private final HttpClient client; + private final int retries; + private final MetricsResponseHandler metricsResponseHandler; + private final Header identityHeader; + private final String metricsEndpoint; + private final ImmutableSet metricNames; + private final Map metricMinimumValues; + private final Map metricMaximumValues; + + public ClusterStatsMetricsMonitor(HttpClient client, BackendStateConfiguration backendStateConfiguration, MonitorConfiguration monitorConfiguration) + { + this.client = requireNonNull(client, "client is null"); + retries = monitorConfiguration.getRetries(); + if (!isNullOrEmpty(backendStateConfiguration.getPassword())) { + identityHeader = new Header("Authorization", + new BasicCredentials(backendStateConfiguration.getUsername(), backendStateConfiguration.getPassword()).getBasicAuthHeader()); + } + else { + identityHeader = new Header("X-Trino-User", backendStateConfiguration.getUsername()); + } + metricsEndpoint = monitorConfiguration.getMetricsEndpoint(); + metricMinimumValues = monitorConfiguration.getMetricMinimumValues(); + metricMaximumValues = monitorConfiguration.getMetricMaximumValues(); + metricNames = ImmutableSet.builder() + .add(RUNNING_QUERIES_METRIC, QUEUED_QUERIES_METRIC) + .addAll(metricMinimumValues.keySet()) + .addAll(metricMaximumValues.keySet()) + .build(); + metricsResponseHandler = new MetricsResponseHandler(metricNames); + } + + private ClusterStats getUnhealthyStats(ProxyBackendConfiguration backend) + { + return ClusterStats.builder(backend.getName()) + .trinoStatus(TrinoStatus.UNHEALTHY) + .proxyTo(backend.getProxyTo()) + .externalUrl(backend.getExternalUrl()) + .routingGroup(backend.getRoutingGroup()) + .build(); + } + + @Override + public ClusterStats monitor(ProxyBackendConfiguration backend) + { + Map metrics = getMetrics(backend.getProxyTo(), retries); + if (metrics.isEmpty()) { + log.error(String.format("No metrics available for %s!", backend.getName())); + return getUnhealthyStats(backend); + } + + for (Map.Entry entry : metricMinimumValues.entrySet()) { + if (!metrics.containsKey(entry.getKey()) + || Float.parseFloat(metrics.get(entry.getKey())) < entry.getValue()) { + log.warn(String.format("Health metric value below min for cluster %s: %s=%s", backend.getName(), entry.getKey(), metrics.get(entry.getKey()))); + return getUnhealthyStats(backend); + } + } + + for (Map.Entry entry : metricMaximumValues.entrySet()) { + if (!metrics.containsKey(entry.getKey()) + || Float.parseFloat(metrics.get(entry.getKey())) > entry.getValue()) { + log.warn(String.format("Health metric value over max for cluster %s: %s=%s", backend.getName(), entry.getKey(), metrics.get(entry.getKey()))); + return getUnhealthyStats(backend); + } + } + return ClusterStats.builder(backend.getName()) + .trinoStatus(TrinoStatus.HEALTHY) + .runningQueryCount((int) Float.parseFloat(metrics.get(RUNNING_QUERIES_METRIC))) + .queuedQueryCount((int) Float.parseFloat(metrics.get(QUEUED_QUERIES_METRIC))) + .proxyTo(backend.getProxyTo()) + .externalUrl(backend.getExternalUrl()) + .routingGroup(backend.getRoutingGroup()) + .build(); + } + + private Map getMetrics(String baseUrl, int retriesRemaining) + { + HttpUriBuilder uri = uriBuilderFrom(URI.create(baseUrl)).appendPath(metricsEndpoint); + for (String metric : metricNames) { + uri.addParameter("name[]", metric); + } + + Request request = prepareGet() + .setUri(uri.build()) + .addHeader(identityHeader.name, identityHeader.value) + .addHeader("Content-Type", "application/openmetrics-text; version=1.0.0; charset=utf-8") + .build(); + try { + return client.execute(request, metricsResponseHandler); + } + catch (UnexpectedResponseException e) { + if (shouldRetry(e.getStatusCode())) { + if (retriesRemaining > 0) { + log.warn("Retrying health check on error: %s, ", e.toString()); + return getMetrics(baseUrl, retriesRemaining - 1); + } + else { + log.error("Encountered error %s, no retries remaining", e.toString()); + } + } + else { + log.error(e, "Health check failed with non-retryable response. %s\n%s", e.getMessage(), e.toString()); + } + } + catch (Exception e) { + log.error(e, "Exception checking %s for health", request.getUri()); + } + return ImmutableMap.of(); + } + + private static class MetricsResponseHandler + implements ResponseHandler, RuntimeException> + { + private final ImmutableSet requiredKeys; + + public MetricsResponseHandler(ImmutableSet requiredKeys) + { + this.requiredKeys = requireNonNull(requiredKeys); + } + + @Override + public Map handleException(Request request, Exception exception) + throws RuntimeException + { + throw propagate(request, exception); + } + + @Override + public Map handle(Request request, Response response) + throws RuntimeException + { + try { + String responseBody = new String(response.getInputStream().readAllBytes(), UTF_8); + Map metrics = Arrays.stream(responseBody.split("\n")) + .filter(s -> !s.startsWith("#")) + .collect(toImmutableMap(s -> s.split(" ")[0], s -> s.split(" ")[1])); + if (!metrics.keySet().containsAll(requiredKeys)) { + throw new UnexpectedResponseException( + String.format("Request is missing required keys: \n%s\nin response: '%s'", String.join("\n", requiredKeys), responseBody), + request, + response); + } + return metrics; + } + catch (IOException e) { + throw new UnexpectedResponseException(request, response); + } + } + } + + private record Header(String name, String value) {} +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsMonitor.java index 42c412105..2b1477936 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsMonitor.java @@ -15,6 +15,10 @@ import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import static java.net.HttpURLConnection.HTTP_BAD_GATEWAY; +import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT; +import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; + public interface ClusterStatsMonitor { ClusterStats monitor(ProxyBackendConfiguration backend); @@ -27,4 +31,12 @@ static ClusterStats.Builder getClusterStatsBuilder(ProxyBackendConfiguration bac builder.routingGroup(backend.getRoutingGroup()); return builder; } + + static boolean shouldRetry(int statusCode) + { + return switch (statusCode) { + case HTTP_BAD_GATEWAY, HTTP_UNAVAILABLE, HTTP_GATEWAY_TIMEOUT -> true; + default -> false; + }; + } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java index ad7835bd2..35ea334f7 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ClusterStatsMonitorType.java @@ -18,5 +18,6 @@ public enum ClusterStatsMonitorType NOOP, INFO_API, UI_API, - JDBC + JDBC, + METRICS } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java index 6ae1e9763..8037e26a4 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java @@ -13,14 +13,24 @@ */ package io.trino.gateway.ha.config; +import com.google.common.collect.ImmutableMap; import io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor; +import java.util.Map; + public class MonitorConfiguration { private int taskDelaySeconds = ActiveClusterMonitor.MONITOR_TASK_DELAY_SECONDS; private int retries; + private String metricsEndpoint = "/metrics"; + + // Require 1 node for health by default. This configuration only applies to the ClusterStatsMetricsMonitor + private Map metricMinimumValues = ImmutableMap.of("trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount", 1f); + + private Map metricMaximumValues = ImmutableMap.of(); + public MonitorConfiguration() {} public int getTaskDelaySeconds() @@ -42,4 +52,34 @@ public void setRetries(int retries) { this.retries = retries; } + + public String getMetricsEndpoint() + { + return metricsEndpoint; + } + + public void setMetricsEndpoint(String metricsEndpoint) + { + this.metricsEndpoint = metricsEndpoint; + } + + public Map getMetricMinimumValues() + { + return metricMinimumValues; + } + + public void setMetricMinimumValues(Map metricMinimumValues) + { + this.metricMinimumValues = metricMinimumValues; + } + + public Map getMetricMaximumValues() + { + return metricMaximumValues; + } + + public void setMetricMaximumValues(Map metricMaximumValues) + { + this.metricMaximumValues = metricMaximumValues; + } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java index b92ce01e9..63fb16eff 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/ClusterStatsMonitorModule.java @@ -20,6 +20,7 @@ import io.trino.gateway.ha.clustermonitor.ClusterStatsHttpMonitor; import io.trino.gateway.ha.clustermonitor.ClusterStatsInfoApiMonitor; import io.trino.gateway.ha.clustermonitor.ClusterStatsJdbcMonitor; +import io.trino.gateway.ha.clustermonitor.ClusterStatsMetricsMonitor; import io.trino.gateway.ha.clustermonitor.ClusterStatsMonitor; import io.trino.gateway.ha.clustermonitor.ForMonitor; import io.trino.gateway.ha.clustermonitor.NoopClusterStatsMonitor; @@ -51,6 +52,7 @@ public ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient httpCli case UI_API -> new ClusterStatsHttpMonitor(config.getBackendState()); case JDBC -> new ClusterStatsJdbcMonitor(config.getBackendState(), config.getMonitor()); case NOOP -> new NoopClusterStatsMonitor(); + case METRICS -> new ClusterStatsMetricsMonitor(httpClient, config.getBackendState(), config.getMonitor()); }; } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/security/util/BasicCredentials.java b/gateway-ha/src/main/java/io/trino/gateway/ha/security/util/BasicCredentials.java index 64005a67f..eb569e8eb 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/security/util/BasicCredentials.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/security/util/BasicCredentials.java @@ -44,6 +44,11 @@ public static BasicCredentials extractBasicAuthCredentials(ContainerRequestConte return extractBasicAuthCredentials(header); } + public String getBasicAuthHeader() + { + return String.format("Basic %s", encodeCredentials()); + } + public static BasicCredentials extractBasicAuthCredentials(String header) throws AuthenticationException { @@ -78,4 +83,9 @@ private static String decodeCredentials(String credentials) throw new AuthenticationException("Invalid base64 encoded credentials"); } } + + private String encodeCredentials() + { + return Base64.getEncoder().encodeToString(String.format("%s:%s", username, password).getBytes(ISO_8859_1)); + } } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java b/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java index 3a9ab15be..a869533d1 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterStatsMonitor.java @@ -13,6 +13,7 @@ */ package io.trino.gateway.ha.clustermonitor; +import com.google.common.collect.ImmutableMap; import io.airlift.http.client.HttpClientConfig; import io.airlift.http.client.jetty.JettyHttpClient; import io.trino.gateway.ha.config.BackendStateConfiguration; @@ -24,6 +25,7 @@ import org.junit.jupiter.api.TestInstance; import org.testcontainers.containers.TrinoContainer; +import java.util.Map; import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; @@ -70,6 +72,15 @@ void testInfoApiMonitor() testClusterStatsMonitor(ignored -> new ClusterStatsInfoApiMonitor(new JettyHttpClient(new HttpClientConfig()), monitorConfigurationWithRetries)); } + @Test + void testMetricsMonitor() + { + testClusterStatsMonitor(backendStateConfiguration -> new ClusterStatsMetricsMonitor( + new JettyHttpClient(new HttpClientConfig()), + backendStateConfiguration, + new MonitorConfiguration())); + } + private void testClusterStatsMonitor(Function monitorFactory) { BackendStateConfiguration backendStateConfiguration = new BackendStateConfiguration(); @@ -84,4 +95,40 @@ private void testClusterStatsMonitor(Function metricMinimumsFail = ImmutableMap.of("trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount", 100f); + testMetricsWithRange(metricMinimumsFail, ImmutableMap.of(), TrinoStatus.UNHEALTHY); + + Map metricMaximumsFail = ImmutableMap.of("trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount", 0.5f); + testMetricsWithRange(ImmutableMap.of(), metricMaximumsFail, TrinoStatus.UNHEALTHY); + + Map metricMinimumsPass = ImmutableMap.of("trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount", 0.5f); + testMetricsWithRange(metricMinimumsPass, ImmutableMap.of(), TrinoStatus.HEALTHY); + + Map metricMaximumsPass = ImmutableMap.of("trino_metadata_name_DiscoveryNodeManager_ActiveNodeCount", 100f); + testMetricsWithRange(ImmutableMap.of(), metricMaximumsPass, TrinoStatus.HEALTHY); + } + + private void testMetricsWithRange(Map metricMinimums, Map metricMaximums, TrinoStatus expected) + { + BackendStateConfiguration backendStateConfiguration = new BackendStateConfiguration(); + backendStateConfiguration.setUsername("test_user"); + + ProxyBackendConfiguration proxyBackend = new ProxyBackendConfiguration(); + proxyBackend.setProxyTo("http://localhost:" + trino.getMappedPort(8080)); + proxyBackend.setName("test_cluster"); + + MonitorConfiguration monitorConfiguration = new MonitorConfiguration(); + monitorConfiguration.setMetricMinimumValues(metricMinimums); + monitorConfiguration.setMetricMaximumValues(metricMaximums); + + ClusterStatsMonitor monitor = new ClusterStatsMetricsMonitor(new JettyHttpClient(new HttpClientConfig()), backendStateConfiguration, monitorConfiguration); + ClusterStats stats = monitor.monitor(proxyBackend); + assertThat(stats.clusterId()).isEqualTo("test_cluster"); + assertThat(stats.trinoStatus()).isEqualTo(expected); + } }