From 4b6465fbc13c36ac07a2ab7928cd25ab146551d8 Mon Sep 17 00:00:00 2001 From: Star Poon Date: Thu, 24 Oct 2024 12:05:08 +0900 Subject: [PATCH] Refactor ActiveClusterMonitor --- .../clustermonitor/ActiveClusterMonitor.java | 89 +++++++++---------- 1 file changed, 40 insertions(+), 49 deletions(-) diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ActiveClusterMonitor.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ActiveClusterMonitor.java index 369104650..779cdadab 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ActiveClusterMonitor.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ActiveClusterMonitor.java @@ -26,8 +26,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static java.util.Objects.requireNonNull; + public class ActiveClusterMonitor { public static final int MONITOR_TASK_DELAY_SECONDS = 60; @@ -39,9 +42,8 @@ public class ActiveClusterMonitor private final int taskDelaySeconds; private final ClusterStatsMonitor clusterStatsMonitor; - private volatile boolean monitorActive = true; private final ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE); - private final ExecutorService singleTaskExecutor = Executors.newSingleThreadExecutor(); + private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); @Inject public ActiveClusterMonitor( @@ -50,65 +52,54 @@ public ActiveClusterMonitor( MonitorConfiguration monitorConfiguration, ClusterStatsMonitor clusterStatsMonitor) { - this.clusterStatsObservers = clusterStatsObservers; - this.gatewayBackendManager = gatewayBackendManager; + this.clusterStatsMonitor = requireNonNull(clusterStatsMonitor, "clusterStatsMonitor is null"); + this.clusterStatsObservers = requireNonNull(clusterStatsObservers, "clusterStatsObservers is null"); + this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null"); this.taskDelaySeconds = monitorConfiguration.getTaskDelaySeconds(); - this.clusterStatsMonitor = clusterStatsMonitor; - log.info("Running cluster monitor with connection task delay of %d seconds", taskDelaySeconds); } - /** - * Run an app that queries all active trino clusters for stats. - */ @PostConstruct public void start() { - singleTaskExecutor.submit( - () -> { - while (monitorActive) { - try { - log.info("Getting the stats for the active clusters"); - List activeClusters = - gatewayBackendManager.getAllActiveBackends(); - List> futures = new ArrayList<>(); - for (ProxyBackendConfiguration backend : activeClusters) { - Future call = - executorService.submit(() -> clusterStatsMonitor.monitor(backend)); - futures.add(call); - } - List stats = new ArrayList<>(); - for (Future clusterStatsFuture : futures) { - ClusterStats clusterStats = clusterStatsFuture.get(); - stats.add(clusterStats); - } + log.info("Running cluster monitor with connection task delay of %d seconds", taskDelaySeconds); + scheduledExecutor.scheduleAtFixedRate(() -> { + try { + log.info("Getting stats for all active clusters"); + List activeClusters = + gatewayBackendManager.getAllActiveBackends(); + List> futures = new ArrayList<>(); + for (ProxyBackendConfiguration backend : activeClusters) { + Future call = executorService.submit(() -> clusterStatsMonitor.monitor(backend)); + futures.add(call); + } + List stats = new ArrayList<>(); + for (Future clusterStatsFuture : futures) { + ClusterStats clusterStats = clusterStatsFuture.get(); + stats.add(clusterStats); + } - if (clusterStatsObservers != null) { - for (TrinoClusterStatsObserver observer : clusterStatsObservers) { - observer.observe(stats); - } - } - } - catch (Exception e) { - log.error(e, "Error performing backend monitor tasks"); - } - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(taskDelaySeconds)); - } - catch (Exception e) { - log.error(e, "Error with monitor task"); - } + if (clusterStatsObservers != null) { + for (TrinoClusterStatsObserver observer : clusterStatsObservers) { + observer.observe(stats); } - }); + } + } + catch (Exception e) { + log.error(e, "Error performing backend monitor tasks"); + } + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(taskDelaySeconds)); + } + catch (Exception e) { + log.error(e, "Error with monitor task"); + } + }, 0, taskDelaySeconds, TimeUnit.SECONDS); } - /** - * Shut down the app. - */ @PreDestroy public void stop() { - this.monitorActive = false; - this.executorService.shutdown(); - this.singleTaskExecutor.shutdown(); + executorService.shutdownNow(); + scheduledExecutor.shutdownNow(); } }