diff --git a/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java b/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java index 250e7bc5d..aebdf4a67 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java +++ b/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java @@ -13,7 +13,6 @@ */ package io.trino.gateway.baseapp; -import com.google.common.collect.MoreCollectors; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Scopes; @@ -44,6 +43,7 @@ import java.util.List; import java.util.Optional; +import static com.google.common.collect.MoreCollectors.toOptional; import static io.airlift.http.client.HttpClientBinder.httpClientBinder; import static io.airlift.jaxrs.JaxrsBinder.jaxrsBinder; import static java.lang.String.format; @@ -54,11 +54,11 @@ public class BaseApp implements Module { private static final Logger logger = Logger.get(BaseApp.class); - private final HaGatewayConfiguration haGatewayConfiguration; + private final HaGatewayConfiguration configuration; - public BaseApp(HaGatewayConfiguration haGatewayConfiguration) + public BaseApp(HaGatewayConfiguration configuration) { - this.haGatewayConfiguration = requireNonNull(haGatewayConfiguration); + this.configuration = requireNonNull(configuration, "configuration is null"); } private static Module newModule(String clazz, HaGatewayConfiguration configuration) @@ -89,7 +89,7 @@ private static void validateModules(List modules, HaGatewayConfiguration { Optional routerProvider = modules.stream() .filter(module -> module instanceof RouterBaseModule) - .collect(MoreCollectors.toOptional()); + .collect(toOptional()); if (routerProvider.isEmpty()) { logger.warn("Router provider doesn't exist in the config, using the StochasticRoutingManagerProvider"); String clazz = StochasticRoutingManagerProvider.class.getCanonicalName(); @@ -116,12 +116,12 @@ public static List addModules(HaGatewayConfiguration configuration) @Override public void configure(Binder binder) { - binder.bind(HaGatewayConfiguration.class).toInstance(haGatewayConfiguration); + binder.bind(HaGatewayConfiguration.class).toInstance(configuration); registerAuthFilters(binder); registerResources(binder); registerProxyResources(binder); jaxrsBinder(binder).bind(RoutingTargetHandler.class); - addManagedApps(this.haGatewayConfiguration, binder); + addManagedApps(configuration, binder); jaxrsBinder(binder).bind(AuthorizedExceptionMapper.class); binder.bind(ProxyHandlerStats.class).in(Scopes.SINGLETON); newExporter(binder).export(ProxyHandlerStats.class).withGeneratedName(); @@ -136,7 +136,7 @@ private static void addManagedApps(HaGatewayConfiguration configuration, Binder configuration.getManagedApps().forEach( clazz -> { try { - Class c = Class.forName(clazz); + Class c = Class.forName(clazz); binder.bind(c).in(Scopes.SINGLETON); } catch (Exception e) { diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ActiveClusterMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ActiveClusterMonitor.java index 369104650..779cdadab 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ActiveClusterMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ActiveClusterMonitor.java @@ -26,8 +26,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static java.util.Objects.requireNonNull; + public class ActiveClusterMonitor { public static final int MONITOR_TASK_DELAY_SECONDS = 60; @@ -39,9 +42,8 @@ public class ActiveClusterMonitor private final int taskDelaySeconds; private final ClusterStatsMonitor clusterStatsMonitor; - private volatile boolean monitorActive = true; private final ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE); - private final ExecutorService singleTaskExecutor = Executors.newSingleThreadExecutor(); + private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); @Inject public ActiveClusterMonitor( @@ -50,65 +52,54 @@ public ActiveClusterMonitor( MonitorConfiguration monitorConfiguration, ClusterStatsMonitor clusterStatsMonitor) { - this.clusterStatsObservers = clusterStatsObservers; - this.gatewayBackendManager = gatewayBackendManager; + this.clusterStatsMonitor = requireNonNull(clusterStatsMonitor, "clusterStatsMonitor is null"); + this.clusterStatsObservers = requireNonNull(clusterStatsObservers, "clusterStatsObservers is null"); + this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null"); this.taskDelaySeconds = monitorConfiguration.getTaskDelaySeconds(); - this.clusterStatsMonitor = clusterStatsMonitor; - log.info("Running cluster monitor with connection task delay of %d seconds", taskDelaySeconds); } - /** - * Run an app that queries all active trino clusters for stats. - */ @PostConstruct public void start() { - singleTaskExecutor.submit( - () -> { - while (monitorActive) { - try { - log.info("Getting the stats for the active clusters"); - List activeClusters = - gatewayBackendManager.getAllActiveBackends(); - List> futures = new ArrayList<>(); - for (ProxyBackendConfiguration backend : activeClusters) { - Future call = - executorService.submit(() -> clusterStatsMonitor.monitor(backend)); - futures.add(call); - } - List stats = new ArrayList<>(); - for (Future clusterStatsFuture : futures) { - ClusterStats clusterStats = clusterStatsFuture.get(); - stats.add(clusterStats); - } + log.info("Running cluster monitor with connection task delay of %d seconds", taskDelaySeconds); + scheduledExecutor.scheduleAtFixedRate(() -> { + try { + log.info("Getting stats for all active clusters"); + List activeClusters = + gatewayBackendManager.getAllActiveBackends(); + List> futures = new ArrayList<>(); + for (ProxyBackendConfiguration backend : activeClusters) { + Future call = executorService.submit(() -> clusterStatsMonitor.monitor(backend)); + futures.add(call); + } + List stats = new ArrayList<>(); + for (Future clusterStatsFuture : futures) { + ClusterStats clusterStats = clusterStatsFuture.get(); + stats.add(clusterStats); + } - if (clusterStatsObservers != null) { - for (TrinoClusterStatsObserver observer : clusterStatsObservers) { - observer.observe(stats); - } - } - } - catch (Exception e) { - log.error(e, "Error performing backend monitor tasks"); - } - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(taskDelaySeconds)); - } - catch (Exception e) { - log.error(e, "Error with monitor task"); - } + if (clusterStatsObservers != null) { + for (TrinoClusterStatsObserver observer : clusterStatsObservers) { + observer.observe(stats); } - }); + } + } + catch (Exception e) { + log.error(e, "Error performing backend monitor tasks"); + } + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(taskDelaySeconds)); + } + catch (Exception e) { + log.error(e, "Error with monitor task"); + } + }, 0, taskDelaySeconds, TimeUnit.SECONDS); } - /** - * Shut down the app. - */ @PreDestroy public void stop() { - this.monitorActive = false; - this.executorService.shutdown(); - this.singleTaskExecutor.shutdown(); + executorService.shutdownNow(); + scheduledExecutor.shutdownNow(); } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java index a948bc033..371eb3f83 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsHttpMonitor.java @@ -15,7 +15,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; import io.airlift.http.client.HttpStatus; import io.airlift.log.Logger; import io.trino.gateway.ha.config.BackendStateConfiguration; @@ -33,15 +32,18 @@ import java.util.List; import java.util.Map; +import static com.google.common.base.Strings.isNullOrEmpty; import static io.airlift.http.client.HttpStatus.fromStatusCode; import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_QUEUED_LIST_PATH; import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_STATS_PATH; import static io.trino.gateway.ha.handler.QueryIdCachingProxyHandler.UI_LOGIN_PATH; +import static java.util.Objects.requireNonNull; public class ClusterStatsHttpMonitor implements ClusterStatsMonitor { private static final Logger log = Logger.get(ClusterStatsHttpMonitor.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String SESSION_USER = "sessionUser"; private final String username; @@ -59,14 +61,13 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend); // Fetch Cluster level Stats. String response = queryCluster(backend, UI_API_STATS_PATH); - if (Strings.isNullOrEmpty(response)) { + if (isNullOrEmpty(response)) { log.error("Received null/empty response for %s", UI_API_STATS_PATH); return clusterStats.build(); } try { - HashMap result = new ObjectMapper().readValue(response, HashMap.class); - + HashMap result = OBJECT_MAPPER.readValue(response, new TypeReference<>() {}); int activeWorkers = (int) result.get("activeWorkers"); clusterStats .numWorkerNodes(activeWorkers) @@ -84,18 +85,14 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) // Fetch User Level Stats. Map clusterUserStats = new HashMap<>(); response = queryCluster(backend, UI_API_QUEUED_LIST_PATH); - if (Strings.isNullOrEmpty(response)) { + if (isNullOrEmpty(response)) { log.error("Received null/empty response for %s", UI_API_QUEUED_LIST_PATH); return clusterStats.build(); } try { - List> queries = new ObjectMapper().readValue(response, - new TypeReference>>() - { - }); - - for (Map q : queries) { - String user = (String) q.get(SESSION_USER); + List> queries = OBJECT_MAPPER.readValue(response, new TypeReference<>() {}); + for (Map query : queries) { + String user = (String) query.get(SESSION_USER); clusterUserStats.put(user, clusterUserStats.getOrDefault(user, 0) + 1); } } @@ -148,19 +145,15 @@ private String queryCluster(ProxyBackendConfiguration backend, String path) Call call = client.newCall(request); try (Response res = call.execute()) { - switch (fromStatusCode(res.code())) { - case HttpStatus.OK: - return res.body().string(); - case HttpStatus.UNAUTHORIZED: + return switch (fromStatusCode(res.code())) { + case HttpStatus.OK -> requireNonNull(res.body(), "body is null").string(); + case HttpStatus.UNAUTHORIZED -> { log.info("Unauthorized to fetch cluster stats"); - log.debug("username: %s, targetUrl: %s, cookieStore: %s", - username, - targetUrl, - client.cookieJar().loadForRequest(HttpUrl.parse(targetUrl))); - return null; - default: - return null; - } + log.debug("username: %s, targetUrl: %s, cookieStore: %s", username, targetUrl, client.cookieJar().loadForRequest(HttpUrl.parse(targetUrl))); + yield null; + } + default -> null; + }; } catch (IOException e) { log.warn(e, "Failed to fetch cluster stats"); diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java index 7326c444f..044cc2b12 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsInfoApiMonitor.java @@ -67,7 +67,7 @@ private TrinoStatus checkStatus(String baseUrl, int retriesRemaining) .build(); try { ServerInfo serverInfo = client.execute(request, SERVER_INFO_JSON_RESPONSE_HANDLER); - return serverInfo.isStarting() ? TrinoStatus.PENDING : TrinoStatus.HEALTHY; + return serverInfo.starting() ? TrinoStatus.PENDING : TrinoStatus.HEALTHY; } catch (UnexpectedResponseException e) { if (shouldRetry(e.getStatusCode())) { diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java index af4bffcea..6f8246d9b 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterStatsJdbcMonitor.java @@ -73,12 +73,12 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) return clusterStats.build(); // TODO Invalid configuration should fail } - try (Connection conn = DriverManager.getConnection(jdbcUrl, properties)) { - PreparedStatement stmt = SimpleTimeLimiter.create(Executors.newSingleThreadExecutor()).callWithTimeout( - () -> conn.prepareStatement(STATE_QUERY), 10, TimeUnit.SECONDS); - stmt.setString(1, (String) properties.get("user")); + try (Connection conn = DriverManager.getConnection(jdbcUrl, properties); + PreparedStatement statement = SimpleTimeLimiter.create(Executors.newSingleThreadExecutor()).callWithTimeout( + () -> conn.prepareStatement(STATE_QUERY), 10, TimeUnit.SECONDS)) { + statement.setString(1, (String) properties.get("user")); Map partialState = new HashMap<>(); - ResultSet rs = stmt.executeQuery(); + ResultSet rs = statement.executeQuery(); while (rs.next()) { partialState.put(rs.getString("state"), rs.getInt("count")); } @@ -91,10 +91,10 @@ public ClusterStats monitor(ProxyBackendConfiguration backend) .build(); } catch (TimeoutException e) { - log.error(e, "timed out fetching status for %s backend", url); + log.error(e, "Timed out fetching status for %s backend", url); } catch (Exception e) { - log.error(e, "could not fetch status for %s backend", url); + log.error(e, "Could not fetch status for %s backend", url); } return clusterStats.build(); } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java deleted file mode 100644 index cfd0d19ad..000000000 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthChecker.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.gateway.ha.clustermonitor; - -import io.trino.gateway.ha.notifier.Notifier; - -import java.util.List; - -import static java.lang.String.format; - -public class HealthChecker - implements TrinoClusterStatsObserver -{ - private static final int MAX_THRESHOLD_QUEUED_QUERY_COUNT = 100; - private final Notifier notifier; - - public HealthChecker(Notifier notifier) - { - this.notifier = notifier; - } - - @Override - public void observe(List clustersStats) - { - for (ClusterStats clusterStats : clustersStats) { - if (clusterStats.trinoStatus() == TrinoStatus.UNHEALTHY) { - notifyUnhealthyCluster(clusterStats); - } - else { - if (clusterStats.queuedQueryCount() > MAX_THRESHOLD_QUEUED_QUERY_COUNT) { - notifyForTooManyQueuedQueries(clusterStats); - } - if (clusterStats.numWorkerNodes() < 1) { - notifyForNoWorkers(clusterStats); - } - } - } - } - - private void notifyUnhealthyCluster(ClusterStats clusterStats) - { - notifier.sendNotification(format("%s - Cluster unhealthy", clusterStats.clusterId()), clusterStats.toString()); - } - - private void notifyForTooManyQueuedQueries(ClusterStats clusterStats) - { - notifier.sendNotification(format("%s - Too many queued queries", clusterStats.clusterId()), clusterStats.toString()); - } - - private void notifyForNoWorkers(ClusterStats clusterStats) - { - notifier.sendNotification(format("%s - Number of workers", clusterStats.clusterId()), clusterStats.toString()); - } -} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ServerInfo.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ServerInfo.java index 9ace5fa0c..a0cf94cb0 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ServerInfo.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ServerInfo.java @@ -13,27 +13,8 @@ */ package io.trino.gateway.ha.clustermonitor; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import static java.util.Objects.requireNonNull; - // based on https://github.com/trinodb/trino/blob/439/client/trino-client/src/main/java/io/trino/client/ServerInfo.java // without unused fields -public class ServerInfo +public record ServerInfo(boolean starting) { - private final Boolean starting; - - @JsonCreator - public ServerInfo( - @JsonProperty("starting") Boolean starting) - { - this.starting = requireNonNull(starting, "starting is null"); - } - - @JsonProperty - public boolean isStarting() - { - return starting; - } }