Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure the router policy via router provider module #98

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/operation.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,21 @@ To gracefully shutdown a single worker process, see
[this](https://trino.io/docs/current/admin/graceful-shutdown.html) for the
operations.

## Query routing options
- The default router selects the backend randomly to route the queries.
- If you want to route the queries to the least loaded backend for a user,
i.e. the backend with the least number of queries running or queued from a particular user,
then use `QueryCountBasedRouter`. It can be configured by adding the module name
to the `modules` section of the config file, as shown below:
```
modules:
- io.trino.gateway.ha.module.HaGatewayProviderModule
- io.trino.gateway.ha.module.ClusterStateListenerModule
- io.trino.gateway.ha.module.ClusterStatsMonitorModule
- io.trino.gateway.ha.module.QueryCountBasedRouterProvider
```
- The router works on the load stats it receives from the clusters, i.e. the number of queries queued and running on each cluster. These stats are collected at regular intervals, which can be configured as shown below. The default interval is 1 minute.
```
monitor:
taskDelaySeconds: 10
```
4 changes: 4 additions & 0 deletions gateway-ha/gateway-ha-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ server:
port: 8091
useForwardedHeaders: true

# This can be adjusted based on the coordinator state
monitor:
taskDelaySeconds: 10

modules:
- io.trino.gateway.ha.module.HaGatewayProviderModule
- io.trino.gateway.ha.module.ClusterStateListenerModule
Expand Down
48 changes: 37 additions & 11 deletions gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.gateway.baseapp;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.MoreCollectors;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
Expand All @@ -27,6 +28,8 @@
import io.dropwizard.lifecycle.Managed;
import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
import io.trino.gateway.ha.log.GatewayRequestLogFactory;
import io.trino.gateway.ha.module.RouterBaseModule;
import io.trino.gateway.ha.module.StochasticRoutingManagerProvider;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.ext.Provider;
import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature;
Expand All @@ -39,6 +42,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
Expand All @@ -61,6 +65,35 @@ public abstract class BaseApp<T extends AppConfiguration>
private final Reflections reflections;
private final ImmutableList.Builder<Module> appModules = ImmutableList.builder();

/**
 * Reflectively creates the module whose class name is {@code clazz}.
 *
 * <p>The class is loaded by name and instantiated through its constructor taking
 * the runtime class of {@code configuration} and an {@link Environment}. The
 * resulting object is cast to {@link AppModule}.
 *
 * <p>Any exception raised while loading, constructing or casting is logged and
 * handed to {@code onFatalError}. If that call returns normally, this method
 * returns {@code null}.
 *
 * @param clazz fully qualified class name of the module to instantiate
 * @param configuration application configuration passed to the module constructor
 * @param environment environment passed to the module constructor
 * @return the new module, or {@code null} when instantiation failed and
 *         {@code onFatalError} returned
 */
private AppModule newModule(String clazz, T configuration, Environment environment)
{
    try {
        logger.info("Trying to load module [%s]", clazz);
        Class<?> moduleClass = Class.forName(clazz);
        Object instance = moduleClass
                .getConstructor(configuration.getClass(), Environment.class)
                .newInstance(configuration, environment);
        return (AppModule) instance;
    }
    catch (Exception e) {
        logger.error(e, "Could not instantiate module [%s]", clazz);
        onFatalError(e);
        // Only reached when onFatalError returns instead of terminating.
        return null;
    }
}

/**
 * Makes sure the module list contains a router provider.
 *
 * <p>Looks for a module that is an instance of {@link RouterBaseModule}. When none
 * is found, a warning is logged and a {@link StochasticRoutingManagerProvider},
 * created through {@code newModule}, is appended to {@code modules}.
 *
 * <p>The lookup uses {@code MoreCollectors.toOptional()}, which raises
 * {@link IllegalArgumentException} if more than one element matches, so at most
 * one router provider module may be present.
 *
 * @param modules the modules loaded so far; appended to in place when no router
 *                provider is present
 * @param configuration application configuration used to build the default provider
 * @param environment environment used to build the default provider
 */
private void validateModules(List<AppModule> modules, T configuration, Environment environment)
{
    Optional<AppModule> configuredRouter = modules.stream()
            .filter(RouterBaseModule.class::isInstance)
            .collect(MoreCollectors.toOptional());
    if (configuredRouter.isPresent()) {
        return;
    }

    logger.warn("Router provider doesn't exist in the config, using the StochasticRoutingManagerProvider");
    modules.add(newModule(StochasticRoutingManagerProvider.class.getCanonicalName(), configuration, environment));
}

protected BaseApp(String... basePackages)
{
final ConfigurationBuilder confBuilder = new ConfigurationBuilder();
Expand Down Expand Up @@ -135,18 +168,11 @@ protected List<AppModule> addModules(T configuration, Environment environment)
return modules;
}
for (String clazz : configuration.getModules()) {
try {
logger.info("Trying to load module [%s]", clazz);
Object ob =
Class.forName(clazz)
.getConstructor(configuration.getClass(), Environment.class)
.newInstance(configuration, environment);
modules.add((AppModule) ob);
}
catch (Exception e) {
logger.error(e, "Could not instantiate module [%s]", clazz);
}
modules.add(newModule(clazz, configuration, environment));
}

validateModules(modules, configuration, environment);

return modules;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
public class ActiveClusterMonitor
implements Managed
{
public static final int MONITOR_TASK_DELAY_MIN = 1;
public static final int MONITOR_TASK_DELAY_SECONDS = 60;
public static final int DEFAULT_THREAD_POOL_SIZE = 20;
private static final Logger log = Logger.get(ActiveClusterMonitor.class);

private final List<TrinoClusterStatsObserver> clusterStatsObservers;
private final GatewayBackendManager gatewayBackendManager;
private final int taskDelayMin;

private final int taskDelaySeconds;
private final ClusterStatsMonitor clusterStatsMonitor;
private volatile boolean monitorActive = true;
private final ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
Expand All @@ -51,9 +52,9 @@ public ActiveClusterMonitor(
{
this.clusterStatsObservers = clusterStatsObservers;
this.gatewayBackendManager = gatewayBackendManager;
this.taskDelayMin = monitorConfiguration.getTaskDelayMin();
this.taskDelaySeconds = monitorConfiguration.getTaskDelaySeconds();
this.clusterStatsMonitor = clusterStatsMonitor;
log.info("Running cluster monitor with task delay of %d", taskDelayMin);
log.info("Running cluster monitor with connection task delay of %d seconds", taskDelaySeconds);
}

/**
Expand All @@ -66,6 +67,7 @@ public void start()
() -> {
while (monitorActive) {
try {
log.info("Getting the stats for the active clusters");
List<ProxyBackendConfiguration> activeClusters =
gatewayBackendManager.getAllActiveBackends();
List<Future<ClusterStats>> futures = new ArrayList<>();
Expand All @@ -90,7 +92,7 @@ public void start()
log.error(e, "Error performing backend monitor tasks");
}
try {
Thread.sleep(TimeUnit.MINUTES.toMillis(taskDelayMin));
Thread.sleep(TimeUnit.SECONDS.toMillis(taskDelaySeconds));
}
catch (Exception e) {
log.error(e, "Error with monitor task");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ public ClusterStatsHttpMonitor(BackendStateConfiguration backendStateConfigurati
@Override
public ClusterStats monitor(ProxyBackendConfiguration backend)
{
ClusterStats.Builder clusterStats = ClusterStats.builder(backend.getName());

ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend);
Copy link
Member

@ebyhr ebyhr Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert unrelated change.

// Fetch Cluster level Stats.
String response = queryCluster(backend, UI_API_STATS_PATH);
if (Strings.isNullOrEmpty(response)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ClusterStatsJdbcMonitor(BackendStateConfiguration backendStateConfigurati
public ClusterStats monitor(ProxyBackendConfiguration backend)
{
String url = backend.getProxyTo();
ClusterStats.Builder clusterStats = ClusterStats.builder(backend.getName());
ClusterStats.Builder clusterStats = ClusterStatsMonitor.getClusterStatsBuilder(backend);
String jdbcUrl;
try {
URL parsedUrl = new URL(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,13 @@
public interface ClusterStatsMonitor
{
ClusterStats monitor(ProxyBackendConfiguration backend);

static ClusterStats.Builder getClusterStatsBuilder(ProxyBackendConfiguration backend)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to ClusterStats.

{
ClusterStats.Builder builder = ClusterStats.builder(backend.getName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to define the variable. This is a builder.

builder.proxyTo(backend.getProxyTo());
builder.externalUrl(backend.getExternalUrl());
builder.routingGroup(backend.getRoutingGroup());
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ public HealthCheckObserver(RoutingManager routingManager)
@Override
public void observe(java.util.List<ClusterStats> clustersStats)
{
for (ClusterStats clusterStats : clustersStats) {
routingManager.upateBackEndHealth(clusterStats.clusterId(), clusterStats.healthy());
routingManager.updateBackEndHealthDB(clusterStats);
}
routingManager.upateBackEndStats(clustersStats);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@

public class MonitorConfiguration
{
private int taskDelayMin = ActiveClusterMonitor.MONITOR_TASK_DELAY_MIN;
private int taskDelaySeconds = ActiveClusterMonitor.MONITOR_TASK_DELAY_SECONDS;

public MonitorConfiguration() {}

public int getTaskDelayMin()
public int getTaskDelaySeconds()
{
return this.taskDelayMin;
return this.taskDelaySeconds;
}

public void setTaskDelayMin(int taskDelayMin)
public void setTaskDelaySeconds(int taskDelaySeconds)
{
this.taskDelayMin = taskDelayMin;
this.taskDelaySeconds = taskDelaySeconds;
}
}
Loading
Loading