Skip to content

Commit

Permalink
Configurable routers, add query count based router
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalya committed Mar 7, 2024
1 parent 43c8142 commit 51997af
Show file tree
Hide file tree
Showing 23 changed files with 837 additions and 1,026 deletions.
18 changes: 18 additions & 0 deletions docs/operation.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,21 @@ To gracefully shutdown a single worker process, see
[this](https://trino.io/docs/current/admin/graceful-shutdown.html) for the
operations.

## Query routing options
- The default router selects the backend randomly to route the queries.
- To route queries to the least-loaded backend for a user, i.e. the backend with
the fewest queries running or queued from that particular user,
use `QueryCountBasedRouter`. It can be enabled by adding its module name
to the `modules` section of the config file, as shown below:
```
modules:
- io.trino.gateway.ha.module.HaGatewayProviderModule
- io.trino.gateway.ha.module.ClusterStateListenerModule
- io.trino.gateway.ha.module.ClusterStatsMonitorModule
- io.trino.gateway.ha.module.QueryCountBasedRouterProvider
```
- The router relies on the load statistics it receives from the clusters at regular intervals, i.e. the number of queries queued and running on each cluster. The interval can be configured as shown below; the default is 1 minute.
```
monitor:
taskDelaySeconds: 10
```
4 changes: 4 additions & 0 deletions gateway-ha/gateway-ha-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ server:
port: 8091
useForwardedHeaders: true

# This can be adjusted based on the coordinator state
monitor:
taskDelaySeconds: 10

modules:
- io.trino.gateway.ha.module.HaGatewayProviderModule
- io.trino.gateway.ha.module.ClusterStateListenerModule
Expand Down
46 changes: 35 additions & 11 deletions gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.dropwizard.lifecycle.Managed;
import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
import io.dropwizard.servlets.tasks.Task;
import io.trino.gateway.ha.module.RouterBaseModule;
import io.trino.gateway.ha.module.StochasticRoutingManagerProvider;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.ext.Provider;
import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature;
Expand All @@ -41,6 +43,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
Expand All @@ -66,6 +69,34 @@ public abstract class BaseApp<T extends AppConfiguration>
private final ImmutableList.Builder<Module> appModules = ImmutableList.builder();
private Injector injector;

/**
 * Reflectively instantiates the module named by {@code clazz}.
 *
 * <p>The class is looked up by name and must expose a constructor taking
 * {@code (configuration.getClass(), Environment)}; the created object is cast to
 * {@link AppModule}.
 *
 * @param clazz fully qualified class name of the module to load
 * @param configuration application configuration passed to the module constructor
 * @param environment environment passed to the module constructor
 * @return the newly created module, never {@code null}
 * @throws IllegalStateException if the module cannot be loaded, constructed or cast
 *         and {@code onFatalError} returns instead of terminating the application
 */
private AppModule newModule(String clazz, T configuration, Environment environment)
{
    try {
        logger.info("Trying to load module [{}]", clazz);
        Object module =
                Class.forName(clazz)
                        .getConstructor(configuration.getClass(), Environment.class)
                        .newInstance(configuration, environment);
        return (AppModule) module;
    }
    catch (Exception e) {
        logger.error("Could not instantiate module [{}]", clazz, e);
        // NOTE(review): onFatalError presumably terminates the application - confirm.
        onFatalError(e);
        // Every path of a non-void method must return or throw. Failing loudly here
        // also guarantees callers never receive (and register) a null module if
        // onFatalError is overridden to return normally.
        throw new IllegalStateException("Could not instantiate module [" + clazz + "]", e);
    }
}

/**
 * Makes sure the module list contains a router provider.
 *
 * <p>If none of the given modules is a {@link RouterBaseModule}, a warning is logged
 * and a {@link StochasticRoutingManagerProvider} is instantiated and appended to
 * {@code modules}. The list is left untouched otherwise.
 *
 * @param modules mutable list of already loaded modules; may be appended to
 * @param configuration application configuration used to build the fallback module
 * @param environment environment used to build the fallback module
 */
private void validateModules(List<AppModule> modules, T configuration, Environment environment)
{
    boolean routerConfigured = false;
    for (AppModule candidate : modules) {
        if (candidate instanceof RouterBaseModule) {
            routerConfigured = true;
            break;
        }
    }
    if (routerConfigured) {
        return;
    }

    logger.warn("Router provider doesn't exist in the config, using the StochasticRoutingManagerProvider");
    String fallbackClassName = StochasticRoutingManagerProvider.class.getCanonicalName();
    AppModule fallbackRouter = newModule(fallbackClassName, configuration, environment);
    modules.add(fallbackRouter);
}

protected BaseApp(String... basePackages)
{
final ConfigurationBuilder confBuilder = new ConfigurationBuilder();
Expand Down Expand Up @@ -171,18 +202,11 @@ protected List<AppModule> addModules(T configuration, Environment environment)
return modules;
}
for (String clazz : configuration.getModules()) {
try {
logger.info("Trying to load module [{}]", clazz);
Object ob =
Class.forName(clazz)
.getConstructor(configuration.getClass(), Environment.class)
.newInstance(configuration, environment);
modules.add((AppModule) ob);
}
catch (Exception e) {
logger.error("Could not instantiate module [" + clazz + "]", e);
}
modules.add(newModule(clazz, configuration, environment));
}

validateModules(modules, configuration, environment);

return modules;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@
public class ActiveClusterMonitor
implements Managed
{
public static final int MONITOR_TASK_DELAY_MIN = 1;
public static final int MONITOR_TASK_DELAY_SECONDS = 60;
public static final int DEFAULT_THREAD_POOL_SIZE = 20;
private static final Logger log = LoggerFactory.getLogger(ActiveClusterMonitor.class);

private final List<TrinoClusterStatsObserver> clusterStatsObservers;
private final GatewayBackendManager gatewayBackendManager;
private final int taskDelayMin;

private final int taskDelaySeconds;
private final ClusterStatsMonitor clusterStatsMonitor;
private volatile boolean monitorActive = true;
private final ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
Expand All @@ -52,9 +53,9 @@ public ActiveClusterMonitor(
{
this.clusterStatsObservers = clusterStatsObservers;
this.gatewayBackendManager = gatewayBackendManager;
this.taskDelayMin = monitorConfiguration.getTaskDelayMin();
this.taskDelaySeconds = monitorConfiguration.getTaskDelaySeconds();
this.clusterStatsMonitor = clusterStatsMonitor;
log.info("Running cluster monitor with task delay of {}", taskDelayMin);
log.info("Running cluster monitor with connection task delay of {} seconds", taskDelaySeconds);
}

/**
Expand All @@ -67,6 +68,7 @@ public void start()
() -> {
while (monitorActive) {
try {
log.info("Getting the stats for the active clusters");
List<ProxyBackendConfiguration> activeClusters =
gatewayBackendManager.getAllActiveBackends();
List<Future<ClusterStats>> futures = new ArrayList<>();
Expand All @@ -91,7 +93,7 @@ public void start()
log.error("Error performing backend monitor tasks", e);
}
try {
Thread.sleep(TimeUnit.MINUTES.toMillis(taskDelayMin));
Thread.sleep(TimeUnit.SECONDS.toMillis(taskDelaySeconds));
}
catch (Exception e) {
log.error("Error with monitor task", e);
Expand Down
Loading

0 comments on commit 51997af

Please sign in to comment.