Skip to content

Commit

Permalink
ES query throttling.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Hale committed Nov 15, 2023
1 parent 38ea84a commit 5234e55
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ public final class EvaluationConfig {
public static final String TRACK_BRANCH_OPERATORS_ONLY = "halyard.evaluation.trackBranchOperatorsOnly";
public static final String QUERY_CACHE_MAX_SIZE = "hayard.evaluation.maxQueryCacheSize";
public static final String QUERY_HISTORY_MAX_SIZE = "hayard.evaluation.maxQueryHistorySize";
public static final String SEARCH_MAX_REQUESTS = "halyard.evaluation.search.maxRequests";

public final int queryCacheSize;
public final boolean trackResultSize;
public final boolean trackResultTime;
public final boolean trackBranchOperatorsOnly;
public final int maxQueryHistorySize;
public final int maxSearchRequests;

EvaluationConfig(Configuration config) {
queryCacheSize = config.getInt(EvaluationConfig.QUERY_CACHE_MAX_SIZE, 100);
trackResultSize = config.getBoolean(EvaluationConfig.TRACK_RESULT_SIZE, false);
trackResultTime = config.getBoolean(EvaluationConfig.TRACK_RESULT_TIME, false);
trackBranchOperatorsOnly = config.getBoolean(TRACK_BRANCH_OPERATORS_ONLY, true);
maxQueryHistorySize = config.getInt(EvaluationConfig.QUERY_HISTORY_MAX_SIZE, 10);
maxSearchRequests = config.getInt(EvaluationConfig.SEARCH_MAX_REQUESTS, 20);
}
}
2 changes: 1 addition & 1 deletion sail/src/main/java/com/msd/gin/halyard/sail/HBaseSail.java
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ private Optional<SearchClient> getSearchClient() {
synchronized (this) {
localRef = searchClient;
if (localRef == null) {
localRef = esTransport.map(transport -> new SearchClient(new ElasticsearchClient(transport), esSettings.indexName));
localRef = esTransport.map(transport -> new SearchClient(new ElasticsearchClient(transport), esSettings.indexName, evaluationConfig.maxSearchRequests));
searchClient = localRef;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.msd.gin.halyard.sail.search;

import java.io.IOException;
import java.util.concurrent.Semaphore;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.GeoLocation;
Expand All @@ -14,22 +15,33 @@ public final class SearchClient {

private final ElasticsearchClient client;
private final String index;
private final Semaphore requests;

public SearchClient(ElasticsearchClient client, String index) {
public SearchClient(ElasticsearchClient client, String index, int maxRequests) {
this.client = client;
this.index = index;
this.requests = new Semaphore(maxRequests);
}

public SearchResponse<SearchDocument> search(String query, int limit, double minScore, int fuzziness, int slop) throws IOException {
return client.search(
s -> s.index(index).source(src -> src.filter(f -> f.includes(SearchDocument.REQUIRED_FIELDS)))
.query(q -> q.queryString(qs -> qs.query(query).defaultField(SearchDocument.LABEL_FIELD).fuzziness(Integer.toString(fuzziness)).phraseSlop(Double.valueOf(slop)))).minScore(minScore).size(limit),
SearchDocument.class);
requests.acquireUninterruptibly();
try {
return client.search(
s -> s.index(index).source(src -> src.filter(f -> f.includes(SearchDocument.REQUIRED_FIELDS)))
.query(q -> q.queryString(qs -> qs.query(query).defaultField(SearchDocument.LABEL_FIELD).fuzziness(Integer.toString(fuzziness)).phraseSlop(Double.valueOf(slop)))).minScore(minScore).size(limit),
SearchDocument.class);
} finally {
requests.release();
}
}

public SearchResponse<SearchDocument> search(double lat, double lon, double dist, String units) throws IOException {
return client.search(s -> s.index(index).source(src -> src.filter(f -> f.includes(SearchDocument.REQUIRED_FIELDS)))
.query(q -> q.geoDistance(gd -> gd.field(SearchDocument.LABEL_POINT_FIELD).location(GeoLocation.of(gl -> gl.latlon(ll -> ll.lat(lat).lon(lon)))).distance(dist + units))),
SearchDocument.class);
requests.acquireUninterruptibly();
try {
return client.search(s -> s.index(index).source(src -> src.filter(f -> f.includes(SearchDocument.REQUIRED_FIELDS)))
.query(q -> q.geoDistance(gd -> gd.field(SearchDocument.LABEL_POINT_FIELD).location(GeoLocation.of(gl -> gl.latlon(ll -> ll.lat(lat).lon(lon)))).distance(dist + units))), SearchDocument.class);
} finally {
requests.release();
}
}
}

0 comments on commit 5234e55

Please sign in to comment.