Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure the router policy via router provider module #98

Merged
merged 1 commit into from
Apr 3, 2024

Conversation

vishalya
Copy link
Member

@vishalya vishalya commented Nov 24, 2023

  • This PR providers a way to configure the que length based router.
  • This router and it's logic was there, now it can be configured via the config file.
  • Please check the configuration.md, Router Policy section to understand on how to configure the current default router vs the queue length based router
  • HaGatewayProviderModule needed to be refactored and the db dependencies are moved into another base module, because of that the router module needs to be configured explicitly.
  • Hopefully this PR fixes Feature Request: Routing based on number of queued and running queries #77 and Load balancing scheme in Trino gateway? #84

@cla-bot cla-bot bot added the cla-signed label Nov 24, 2023
Copy link
Contributor

@willmostly willmostly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. My only real concern is that as config gets more complex, we should make sure we're giving understandable error messages to users and avoiding returning hundreds of lines of Guice injection errors.

docs/configuration.md Outdated Show resolved Hide resolved
There are 2 routers you can choose from
### Que length based
- This distributed the queries based on the number of queries running
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Queries are routed to the backend with the fewest queued queries. If multiple backends have zero queued queries, the backend with the fewest running queries is chosen.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should route it to the cluster with the fewest running queries.

Comment on lines 42 to 43
- io.trino.gateway.ha.module.QueueLengthListenerModule
- io.trino.gateway.ha.module.QueueLengthRouterProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The QueueLengthRouterProvider requires that the QueueLengthListenerModule is configured (right?). If it isn't, what kind of error does the end user see?

If its not an easy one to understand, we should add some checking in Baseapp to ensure that dependent modules are loaded. AppConfiguration.getModules() should give us the list of loaded modules.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my latest change, I am providing the BasicRouter by default and config can override it. Is that ok?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@mosabua
Copy link
Member

mosabua commented Jan 5, 2024

This needs a rebase before we can look at it again.

@vishalya
Copy link
Member Author

vishalya commented Jan 9, 2024

Yes, let me rebase and then I can also add a new router which is much simpler. Currently, we are testing it internally.

@vishalya
Copy link
Member Author

vishalya commented Feb 1, 2024

  • I have added a new router - QueryCountBasedRouter which is proved by QueryCountBasedRouterProvider. The clusterstats are supplied to the routers and any hence can be used by any new router in future. The new router can be enabled by adding the provider to config. The full modules section in the config would look like below
modules:
  - io.trino.gateway.ha.module.HaGatewayProviderModule
  - io.trino.gateway.ha.module.ClusterStateListenerModule
  - io.trino.gateway.ha.module.ClusterStatsMonitorModule
  - io.trino.gateway.ha.module.QueryCountBasedRouterProvider
  • The existing router could be used replace the last line with BasicRouterProvider. We can also set the existing router as default one, if none is provided explicitly in the config.
modules:
  - io.trino.gateway.ha.module.HaGatewayProviderModule
  - io.trino.gateway.ha.module.ClusterStateListenerModule
  - io.trino.gateway.ha.module.ClusterStatsMonitorModule
  - io.trino.gateway.ha.module.BasicRouterProvider
  • TrinoQueueLengthRoutingTable didn't seem to work that well, so I am dropping it and eventually we can get rid of that class all together.

  • The background task which collected stats can be configured to run with second as an unit, instead of the minute. It is a part of the monitor section of the config. It's not a mandatory parameter, as the previous default of 1 minute is respected with 60 second default. The old parameter taskDelayMin is removed.

monitor:
  connectionTimeout: 15
  taskDelaySeconds: 10

The logic for the QueryCountBasedRouter is in the comments.

@vishalya vishalya changed the title configure the router policy via router provider module Configure the router policy via router provider module Feb 2, 2024
Copy link
Contributor

@willmostly willmostly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few things to address, mainly around the locking and potential performance impacts

// The live stats refresh every few seconds, so we update the stats immediately
// so that they can be used for next queries to route
// We assume that if a user has queued queries then newly arriving queries
// for that user would also be queued
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We assume that if a user has queued queries then newly arriving queries

This is a strong assumption given trino resource group options such as queryType and queryText.

My main concern however is putting a synchronized method in the getBackend critical path. This could cause performance issues at high concurrency.

Copy link
Member Author

@vishalya vishalya Feb 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I hear you on the assumption, but one could only assume in absense of the live stats. Currently, the cluster stats interval can be set with a granularity of seconds (default is 60 secs), but we are doing ok with 10 sec interval. This assumption phase is applicable during that 10 seconds and when the fresh stats arrive we start afresh with them.
  • There is tremendous overall gain in the performance, the BasicRouter (the one on the main branch) checks db for every request and it's literally 50 to 100 times slower. I have verified that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean this is faster compared to the TrinoQueueLengthRoutingTable I assume?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even compared to the BasicRouter it's very fast, since BasicRouter keeps cheking db for the active/inactive state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This synchronized usage is incorrect.
Reads and writes on userQueuedCount and runningQueryCount must be synchronized.

Change from record class to normal class requires a lot of works on reimplementing getter/setters/equals. I think we can continue to use record class by using AtomicInteger and ConcurrentHashMap.

import io.trino.gateway.ha.router.HaRoutingManager;
import io.trino.gateway.ha.router.RoutingManager;

public class BasicRouterProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class name should reflect the router it provides, so HaRouterProvider. Tbh the HaRoutingManager could have a more meaningful name such as StochasticRoutingManager, this would be a good time to update it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so StochasticRoutingManager and StochasticRoutingManagerProvide? I am ok with that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Lock readLock = lock.readLock();
try {
readLock.lock();
filteredList = clusterStats.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a good use case for a Map<Array<ClusterStats>> or potentially com.google.common.collect.Table if you need to look up on various attributes vs filtering a list

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, If it becomes more complicated then yes, may be the next iteration when we get more stats for the cost based routing we can use it. At this time the filter and sort is just simple and clean IMO.

@alaturqua
Copy link
Contributor

Worked today on resolving the issues alongside @vishalya. The solution now operates as intended, effectively distributing queries based on the number of queued queries evenly across a routing group. A merge would greatly enhance our current setup and would be great feature for the community.

Copy link
Contributor

@willmostly willmostly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting close, just spotted a few more code improvements to make.

Can you additionally remove TrinoQueueLengthRoutingTable as part of this PR? It will be orphaned once this merged.


private void validateModules(List<AppModule> modules, T configuration, Environment environment)
{
Optional routerProvider = modules.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a <TypeParameter> to this declaration

ClusterStats monitor(ProxyBackendConfiguration backend);
abstract ClusterStats monitor(ProxyBackendConfiguration backend);

protected void populateClusterStats(ClusterStats.Builder stats, ProxyBackendConfiguration backend)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should return a ClusterStatsBuilder instead of modifying one passed as an argument. It looks like the builder passed is initialized with ClusterStats.builder(backend.getName()), so just change this to

ClusterStats.Builder getClusterStatsBuilder(ProxyBackendConfiguration backend)
{
   ClusterStats.Builder builder =  ProxyBackendConfiguration backend);
   //...
   return builder;
}

import io.trino.gateway.ha.router.HaRoutingManager;
import io.trino.gateway.ha.router.RoutingManager;

public class BasicRouterProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return Optional.empty();
}

Collections.sort(filteredList, new Comparator<ClusterStats>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace with Collections.max() instead of sort and get 0

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be Collections.min() but I got the idea.


Collections.sort(filteredList, new Comparator<ClusterStats>()
{
public int compare(ClusterStats lhs, ClusterStats rhs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClusterStats should extend Comparable, then you can implement compareTo using this logic. That way it can be reused in other routers.

Copy link
Member Author

@vishalya vishalya Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true! The comparison involves a user specific stats, so it won't be straight forward comparison, but I can a compare function and that can be resused and also makes the router code more readable with a simple lambda.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good, can you remove the comment block of the previous code?

{
Optional<ClusterStats> cluster = getClusterToRoute(user, routingGroup);
if (cluster.isPresent()) {
cluster.orElseThrow().updateLocalStats(user);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is orElseThrow used? isPresent() was just checked. If this is needed then an exception supplier should be passed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because the static check doesn't allow it, but I guess there should be a way around it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should be able to eliminate the isPresent condition if you use ifPresent for the updateLocalStats call and return a flatMap from cluster

Copy link
Contributor

@willmostly willmostly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few loose ends

{
Optional<ClusterStats> cluster = getClusterToRoute(user, routingGroup);
if (cluster.isPresent()) {
cluster.orElseThrow().updateLocalStats(user);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should be able to eliminate the isPresent condition if you use ifPresent for the updateLocalStats call and return a flatMap from cluster

.findFirst();
if (routerProvider.isEmpty()) {
logger.warn("Router provider doesn't exist in the config, using the StochasticRoutingManagerProvider");
String clazz = "io.trino.gateway.ha.module.StochasticRoutingManagerProvider";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this hard coded name could be missed in a refactor - use StochasticRoutingManagerProvider.class.getCanonicalName()


Collections.sort(filteredList, new Comparator<ClusterStats>()
{
public int compare(ClusterStats lhs, ClusterStats rhs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good, can you remove the comment block of the previous code?

Comment on lines 139 to 143
Optional<String> proxyUrl = getBackendForRoutingGroup(routingGroup, user);
if (proxyUrl.isPresent()) {
return proxyUrl.orElseThrow();
}
return provideAdhocBackend(user);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Optional<String> proxyUrl = getBackendForRoutingGroup(routingGroup, user);
if (proxyUrl.isPresent()) {
return proxyUrl.orElseThrow();
}
return provideAdhocBackend(user);
return getBackendForRoutingGroup(routingGroup, user).OrElse(provideAdhocBackend(user));

Copy link
Member

@ebyhr ebyhr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just skimmed.

@vishalya vishalya force-pushed the router_config branch 2 times, most recently from 51997af to 9fbc3e7 Compare March 7, 2024 17:13
@vishalya
Copy link
Member Author

vishalya commented Mar 7, 2024

I have taken care of the review comments, the rebase is coming next.

@vishalya
Copy link
Member Author

vishalya commented Mar 8, 2024

@ebyhr please take a look when you get a chance, changes and rebase is done.

Copy link
Member

@oneonestar oneonestar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments on the implementation of concurrency.

// The live stats refresh every few seconds, so we update the stats immediately
// so that they can be used for next queries to route
// We assume that if a user has queued queries then newly arriving queries
// for that user would also be queued
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This synchronized usage is incorrect.
Reads and writes on userQueuedCount and runningQueryCount must be synchronized.

Change from record class to normal class requires a lot of works on reimplementing getter/setters/equals. I think we can continue to use record class by using AtomicInteger and ConcurrentHashMap.

Comment on lines 33 to 34
private ArrayList<ClusterStats> clusterStats;
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The operations on clusterStats are read all and replace all.
ReentrantReadWriteLock can be replaced by AtomicReference<List<ClusterStats>>.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the read, thanks!
Both suggestions for the synchronization are good.
I might need to modify the flow a bit to use the record, with record I won't be able to modify the member attributes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I am going back to the class for ClusterStats as the router needs to update it, it can be achieved by copying the records back and forth, but it won't be performant.

  • I have moved the reading/sorting and updating of the cluster stats at the router level - inside QueryCountBasedRouter, for 2 reasons.

    1. The logic is specific to the router.

    2. The logic needs to read the stats, sort and the update the counters in the ClusterStats in a single transaction. This would avoid the race condition while processing simultaneous requests.

@oneonestar
Copy link
Member

oneonestar commented Mar 20, 2024

I have moved the reading/sorting and updating of the cluster stats at the router level - inside QueryCountBasedRouter, for 2 reasons.
The logic is specific to the router.
The logic needs to read the stats, sort and the update the counters in the ClusterStats in a single transaction. This would avoid the race condition while processing simultaneous requests.

Thanks for the refactoring. The logic is easier to follow now.

If I understand correctly, QueryCountBasedRouter does the following things:

  • store new ClusterStats received from observer
  • choose a backend for a query base on clusterStats
  • update the clusterStats with heuristic load

QueryCountBasedRouter is the only class that needs to update ClusterStats and read the updated ClusterStats.
The purpose of updating ClusterStats is to store the heuristic load of a new routed query.

We can keep ClusterStats as a Record class and store the heuristic load in another variable.
In this way, ClusterStats will stay as a read-only snapshot of a cluster's stats.

public class QueryCountBasedRouter {
    private List<ClusterStats> clusterStats;
    
    private Map<String, Integer> heuristicLoad; // store heuristic load on cluster
    // maybe HashBasedTable to store heuristic load on cluster per user
    
    public String provideBackendForRoutingGroup(...) {
        // make decision using clusterStats and heuristicLoad
        // update heuristicLoad after assigned a query to backend
    }
    public synchronized void upateBackEndStats(...) {
        // update clusterStats and clear heuristicLoad
    } 
}

Updates: Discussed the following in contributor meeting. The current implementation works well in production.
We could update this later if it doesn't work for some edge cases, but for now it's all good.

Another concern is how we choose a backend.
The current decision tree style of decision could off the mark by a lot.

For example, we have backend-1 and backend-2.

Case 1:
backend-1: 1 queued query for UserA
backend-2: 0 queued query for UserA
All the following queries (could be thousands) from UserA will route to backend-2, for the following 60 seconds before stats got refreshed.
Next time we refresh the stats, backend-2 will likely to have UserA's query queued and all queries now route to backend-1 and so on.

Case 2:
backend-1: 20 running queries, 1 queued query
backend-2: 100 running queries, 0 queued query
All the following queries will route to backend-2.

We have to assume a lot of the things to estimate the load.
* Number of worker / processing power of the backend clusters are roughly identical
* Resource group setting of the backend clusters are roughly identical
* The workload of each query are roughly identical

There are many assumptions that could go wrong. I think some kind of weighted round-robin is a safer choice.
How about something like this?

int clusterLoadForBackend1 = runningQueryCount()
                             + queuedQueryCount() * QUEUED_QUERY_WEIGHT
                             + userQueuedCount(user) *  USER_QUEUED_WEIGHT
                             + heuristicLoad;
...
return Integer.compare(clusterLoadForBackend1, clusterLoadForBackend2);

@vishalya
Copy link
Member Author

I have pushed the changes to copy the stats locally and the router now works with them as discussed.

Copy link
Member

@oneonestar oneonestar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some minor comments. Others LGTM.

I think coding style can be improved overtime.
Hope we can get this merged soon to avoid conflicts on migration to JDBI & Airlift.

return clusterStats;
}

class LocalStats
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add static

Comment on lines +35 to +38
List<LocalStats> clusterStats()
{
return clusterStats;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's prevent someone misuse this function in the future.

Suggested change
List<LocalStats> clusterStats()
{
return clusterStats;
}
@VisibleForTesting
synchronized List<LocalStats> clusterStats()
{
return new ArrayList<>(clusterStats);
}

return Optional.of(Collections.min(filteredList, (lhs, rhs) -> compareStats(lhs, rhs, user)));
}

private void updateLocalStats(LocalStats stats, String user)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add synchronized.

Copy link
Member

@mosabua mosabua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. As agreed with @willmostly @Chaho12 and others in the dev sync, we are merging this now and will address any code style and other minor issues in follow up PRs.

@mosabua mosabua merged commit aa2cbed into trinodb:main Apr 3, 2024
2 checks passed
@github-actions github-actions bot added this to the 8 milestone Apr 3, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

Feature Request: Routing based on number of queued and running queries
7 participants