Skip to content

Commit

Permalink
Configurable routers, add query count based router
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalya committed Mar 12, 2024
1 parent 7c3bddc commit 6ebd81c
Show file tree
Hide file tree
Showing 22 changed files with 831 additions and 1,021 deletions.
18 changes: 18 additions & 0 deletions docs/operation.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,21 @@ To gracefully shutdown a single worker process, see
[this](https://trino.io/docs/current/admin/graceful-shutdown.html) for the
operations.

## Query routing options
- The default router selects the backend randomly to route the queries.
- If you want to route the queries to the least loaded backend for a user
i.e. backend with the least number of queries running or queued from a particular user,
then use `QueryCountBasedRouter`, it can be configured by adding the module name
to config file's modules section like below
```
modules:
- io.trino.gateway.ha.module.HaGatewayProviderModule
- io.trino.gateway.ha.module.ClusterStateListenerModule
- io.trino.gateway.ha.module.ClusterStatsMonitorModule
- io.trino.gateway.ha.module.QueryCountBasedRouterProvider
```
- The router works on the stats it receives from the clusters about the load i.e number queries queued and running on a cluster at regular intervals which can be configured like below. The default interval is 1 min
```
monitor:
taskDelaySeconds: 10
```
4 changes: 4 additions & 0 deletions gateway-ha/gateway-ha-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ server:
port: 8091
useForwardedHeaders: true

# This can be adjusted based on the coordinator state
monitor:
taskDelaySeconds: 10

modules:
- io.trino.gateway.ha.module.HaGatewayProviderModule
- io.trino.gateway.ha.module.ClusterStateListenerModule
Expand Down
48 changes: 37 additions & 11 deletions gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.codahale.metrics.health.HealthCheck;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MoreCollectors;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
Expand All @@ -29,6 +30,8 @@
import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
import io.dropwizard.servlets.tasks.Task;
import io.trino.gateway.ha.log.GatewayRequestLogFactory;
import io.trino.gateway.ha.module.RouterBaseModule;
import io.trino.gateway.ha.module.StochasticRoutingManagerProvider;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.ext.Provider;
import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature;
Expand All @@ -41,6 +44,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
Expand All @@ -63,6 +67,35 @@ public abstract class BaseApp<T extends AppConfiguration>
private final Reflections reflections;
private final ImmutableList.Builder<Module> appModules = ImmutableList.builder();

private AppModule newModule(String clazz, T configuration, Environment environment)
{
try {
logger.info("Trying to load module [%s]", clazz);
Object module =
Class.forName(clazz)
.getConstructor(configuration.getClass(), Environment.class)
.newInstance(configuration, environment);
return ((AppModule) module);
}
catch (Exception e) {
logger.error(e, "Could not instantiate module [%s]", clazz);
onFatalError(e);
}
return null;
}

private void validateModules(List<AppModule> modules, T configuration, Environment environment)
{
Optional<AppModule> routerProvider = modules.stream()
.filter(module -> module instanceof RouterBaseModule)
.collect(MoreCollectors.toOptional());
if (routerProvider.isEmpty()) {
logger.warn("Router provider doesn't exist in the config, using the StochasticRoutingManagerProvider");
String clazz = StochasticRoutingManagerProvider.class.getCanonicalName();
modules.add(newModule(clazz, configuration, environment));
}
}

protected BaseApp(String... basePackages)
{
final ConfigurationBuilder confBuilder = new ConfigurationBuilder();
Expand Down Expand Up @@ -139,18 +172,11 @@ protected List<AppModule> addModules(T configuration, Environment environment)
return modules;
}
for (String clazz : configuration.getModules()) {
try {
logger.info("Trying to load module [%s]", clazz);
Object ob =
Class.forName(clazz)
.getConstructor(configuration.getClass(), Environment.class)
.newInstance(configuration, environment);
modules.add((AppModule) ob);
}
catch (Exception e) {
logger.error(e, "Could not instantiate module [%s]", clazz);
}
modules.add(newModule(clazz, configuration, environment));
}

validateModules(modules, configuration, environment);

return modules;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
public class ActiveClusterMonitor
implements Managed
{
public static final int MONITOR_TASK_DELAY_MIN = 1;
public static final int MONITOR_TASK_DELAY_SECONDS = 60;
public static final int DEFAULT_THREAD_POOL_SIZE = 20;
private static final Logger log = Logger.get(ActiveClusterMonitor.class);

private final List<TrinoClusterStatsObserver> clusterStatsObservers;
private final GatewayBackendManager gatewayBackendManager;
private final int taskDelayMin;

private final int taskDelaySeconds;
private final ClusterStatsMonitor clusterStatsMonitor;
private volatile boolean monitorActive = true;
private final ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
Expand All @@ -51,9 +52,9 @@ public ActiveClusterMonitor(
{
this.clusterStatsObservers = clusterStatsObservers;
this.gatewayBackendManager = gatewayBackendManager;
this.taskDelayMin = monitorConfiguration.getTaskDelayMin();
this.taskDelaySeconds = monitorConfiguration.getTaskDelaySeconds();
this.clusterStatsMonitor = clusterStatsMonitor;
log.info("Running cluster monitor with task delay of %d", taskDelayMin);
log.info("Running cluster monitor with connection task delay of %d seconds", taskDelaySeconds);
}

/**
Expand All @@ -66,6 +67,7 @@ public void start()
() -> {
while (monitorActive) {
try {
log.info("Getting the stats for the active clusters");
List<ProxyBackendConfiguration> activeClusters =
gatewayBackendManager.getAllActiveBackends();
List<Future<ClusterStats>> futures = new ArrayList<>();
Expand All @@ -90,7 +92,7 @@ public void start()
log.error(e, "Error performing backend monitor tasks");
}
try {
Thread.sleep(TimeUnit.MINUTES.toMillis(taskDelayMin));
Thread.sleep(TimeUnit.SECONDS.toMillis(taskDelaySeconds));
}
catch (Exception e) {
log.error(e, "Error with monitor task");
Expand Down
Loading

0 comments on commit 6ebd81c

Please sign in to comment.