Skip to content

Commit

Permalink
Use regexp_like function pushdown on Elasticsearch connector
Browse files Browse the repository at this point in the history
  • Loading branch information
assaf2 committed Mar 1, 2022
1 parent 8d6f162 commit a96f5ee
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public CountQueryPageSource(ElasticsearchClient client, ElasticsearchTableHandle
long count = client.count(
split.getIndex(),
split.getShard(),
buildSearchQuery(table.getConstraint().transformKeys(ElasticsearchColumnHandle.class::cast), table.getQuery()));
buildSearchQuery(table.getConstraint().transformKeys(ElasticsearchColumnHandle.class::cast), table.getQuery(), table.getRegexes()));
readTimeNanos = System.nanoTime() - start;

if (table.getLimit().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.io.BaseEncoding;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.slice.Slice;
import io.trino.plugin.base.expression.ConnectorExpressions;
import io.trino.plugin.elasticsearch.client.ElasticsearchClient;
import io.trino.plugin.elasticsearch.client.IndexMetadata;
import io.trino.plugin.elasticsearch.client.IndexMetadata.DateTimeType;
Expand Down Expand Up @@ -53,6 +55,11 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.expression.Call;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
import io.trino.spi.expression.FunctionName;
import io.trino.spi.expression.Variable;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.ArrayType;
Expand All @@ -64,12 +71,15 @@

import javax.inject.Inject;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -115,6 +125,9 @@ public class ElasticsearchMetadata
new VarcharDecoder.Descriptor(PASSTHROUGH_QUERY_RESULT_COLUMN_NAME),
false));

// Characters that likeToRegexp prefixes with a backslash so that they are matched literally
// instead of being interpreted as regexp operators. The backslash itself is not listed here
// because likeToRegexp escapes it separately.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/regexp-syntax.html
private static final char[] REGEXP_RESERVED_CHARACTERS = new char[] {'.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '#', '@', '&', '<', '>', '~'};

private final Type ipAddressType;
private final ElasticsearchClient client;
private final String schemaName;
Expand Down Expand Up @@ -488,6 +501,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
handle.getSchema(),
handle.getIndex(),
handle.getConstraint(),
handle.getRegexes(),
handle.getQuery(),
OptionalLong.of(limit));

Expand Down Expand Up @@ -521,7 +535,35 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

TupleDomain<ColumnHandle> oldDomain = handle.getConstraint();
TupleDomain<ColumnHandle> newDomain = oldDomain.intersect(TupleDomain.withColumnDomains(supported));
if (oldDomain.equals(newDomain)) {

ConnectorExpression oldExpression = constraint.getExpression();
Map<String, String> newRegexes = new HashMap<>(handle.getRegexes());
List<ConnectorExpression> expressions = ConnectorExpressions.extractConjuncts(constraint.getExpression());
List<ConnectorExpression> notHandledExpressions = new ArrayList<>();
for (ConnectorExpression expression : expressions) {
if (expression instanceof Call) {
Call call = (Call) expression;
// TODO Support ESCAPE character when it's pushed down by the engine
if (new FunctionName("$like_pattern").equals(call.getFunctionName()) && call.getArguments().size() == 2 &&
call.getArguments().get(0) instanceof Variable && call.getArguments().get(1) instanceof Constant) {
String columnName = ((Variable) call.getArguments().get(0)).getName();
Object pattern = ((Constant) call.getArguments().get(1)).getValue();
if (!newRegexes.containsKey(columnName) && pattern instanceof Slice) {
IndexMetadata metadata = client.getIndexMetadata(handle.getIndex());
if (metadata.getSchema()
.getFields().stream()
.anyMatch(field -> columnName.equals(field.getName()) && field.getType() instanceof PrimitiveType && "keyword".equals(((PrimitiveType) field.getType()).getName()))) {
newRegexes.put(columnName, likeToRegexp(((Slice) pattern).toStringUtf8()));
continue;
}
}
}
}
notHandledExpressions.add(expression);
}

ConnectorExpression newExpression = ConnectorExpressions.and(notHandledExpressions);
if (oldDomain.equals(newDomain) && oldExpression.equals(newExpression)) {
return Optional.empty();
}

Expand All @@ -530,10 +572,23 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
handle.getSchema(),
handle.getIndex(),
newDomain,
newRegexes,
handle.getQuery(),
handle.getLimit());

return Optional.of(new ConstraintApplicationResult<>(handle, TupleDomain.withColumnDomains(unsupported), false));
return Optional.of(new ConstraintApplicationResult<>(handle, TupleDomain.withColumnDomains(unsupported), newExpression, false));
}

/**
 * Translates a SQL {@code LIKE} pattern (without an ESCAPE character) into a regexp.
 * <p>
 * The backslash and every character listed in {@code REGEXP_RESERVED_CHARACTERS} are prefixed
 * with a backslash so that they match literally. The LIKE wildcard {@code %} becomes {@code .*}
 * and the LIKE wildcard {@code _} becomes {@code .}. All other characters are copied unchanged.
 *
 * @param like the LIKE pattern; a backslash in it is treated as an ordinary character
 * @return the regexp equivalent of {@code like}
 */
protected static String likeToRegexp(String like)
{
    // Each input character maps to its output independently of its neighbours, so one pass
    // with a StringBuilder produces the same result as a chain of whole-string replacements
    StringBuilder regexp = new StringBuilder(like.length() * 2);
    for (int i = 0; i < like.length(); i++) {
        char c = like.charAt(i);
        if (c == '%') {
            regexp.append(".*");
        }
        else if (c == '_') {
            regexp.append('.');
        }
        else {
            if (c == '\\' || isRegexpReserved(c)) {
                regexp.append('\\');
            }
            regexp.append(c);
        }
    }
    return regexp.toString();
}

// Tells whether the character is one of REGEXP_RESERVED_CHARACTERS
private static boolean isRegexpReserved(char c)
{
    for (char reserved : REGEXP_RESERVED_CHARACTERS) {
        if (reserved == c) {
            return true;
        }
    }
    return false;
}

private static boolean isPassthroughQuery(ElasticsearchTableHandle table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.RegexpQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;

import java.time.Instant;
Expand Down Expand Up @@ -54,7 +55,7 @@ public final class ElasticsearchQueryBuilder
{
private ElasticsearchQueryBuilder() {}

public static QueryBuilder buildSearchQuery(TupleDomain<ElasticsearchColumnHandle> constraint, Optional<String> query)
public static QueryBuilder buildSearchQuery(TupleDomain<ElasticsearchColumnHandle> constraint, Optional<String> query, Map<String, String> regexes)
{
BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
if (constraint.getDomains().isPresent()) {
Expand All @@ -68,6 +69,9 @@ public static QueryBuilder buildSearchQuery(TupleDomain<ElasticsearchColumnHandl
}
}
}

regexes.forEach((name, value) -> queryBuilder.filter(new BoolQueryBuilder().must(((new RegexpQueryBuilder(name, value))))));

query.map(QueryStringQueryBuilder::new)
.ifPresent(queryBuilder::must);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.predicate.TupleDomain;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

Expand All @@ -37,6 +40,7 @@ public enum Type
private final String schema;
private final String index;
private final TupleDomain<ColumnHandle> constraint;
private final Map<String, String> regexes;
private final Optional<String> query;
private final OptionalLong limit;

Expand All @@ -48,6 +52,7 @@ public ElasticsearchTableHandle(Type type, String schema, String index, Optional
this.query = requireNonNull(query, "query is null");

constraint = TupleDomain.all();
regexes = ImmutableMap.of();
limit = OptionalLong.empty();
}

Expand All @@ -57,13 +62,15 @@ public ElasticsearchTableHandle(
@JsonProperty("schema") String schema,
@JsonProperty("index") String index,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint,
@JsonProperty("regexes") Map<String, String> regexes,
@JsonProperty("query") Optional<String> query,
@JsonProperty("limit") OptionalLong limit)
{
this.type = requireNonNull(type, "type is null");
this.schema = requireNonNull(schema, "schema is null");
this.index = requireNonNull(index, "index is null");
this.constraint = requireNonNull(constraint, "constraint is null");
this.regexes = ImmutableMap.copyOf(requireNonNull(regexes, "regexes is null"));
this.query = requireNonNull(query, "query is null");
this.limit = requireNonNull(limit, "limit is null");
}
Expand Down Expand Up @@ -92,6 +99,12 @@ public TupleDomain<ColumnHandle> getConstraint()
return constraint;
}

/**
 * Returns the regular expressions attached to this table handle, keyed by column name.
 * Exposed as a JSON property so the map is serialized together with the handle.
 */
@JsonProperty
public Map<String, String> getRegexes()
{
    return this.regexes;
}

@JsonProperty
public OptionalLong getLimit()
{
Expand All @@ -118,14 +131,15 @@ public boolean equals(Object o)
schema.equals(that.schema) &&
index.equals(that.index) &&
constraint.equals(that.constraint) &&
regexes.equals(that.regexes) &&
query.equals(that.query) &&
limit.equals(that.limit);
}

// Computes the hash code of the table handle from its fields.
@Override
public int hashCode()
{
// NOTE(review): this looks like the pre-change line of the diff (it omits regexes) — confirm against the commit
return Objects.hash(type, schema, index, constraint, query, limit);
// NOTE(review): this looks like the post-change line of the diff; it adds regexes, which equals() also compares
return Objects.hash(type, schema, index, constraint, regexes, query, limit);
}

@Override
Expand All @@ -135,6 +149,13 @@ public String toString()
builder.append(type + ":" + index);

StringBuilder attributes = new StringBuilder();
if (!regexes.isEmpty()) {
attributes.append("regexes=[");
attributes.append(regexes.entrySet().stream()
.map(regex -> regex.getKey() + ":" + regex.getValue())
.collect(Collectors.joining(", ")));
attributes.append("]");
}
limit.ifPresent(value -> attributes.append("limit=" + value));
query.ifPresent(value -> attributes.append("query" + value));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public ScanQueryPageSource(
SearchResponse searchResponse = client.beginSearch(
split.getIndex(),
split.getShard(),
buildSearchQuery(table.getConstraint().transformKeys(ElasticsearchColumnHandle.class::cast), table.getQuery()),
buildSearchQuery(table.getConstraint().transformKeys(ElasticsearchColumnHandle.class::cast), table.getQuery(), table.getRegexes()),
needAllFields ? Optional.empty() : Optional.of(requiredFields),
documentFields,
sort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,50 @@ public void testNestedVariants()
"VALUES 'value1', 'value2', 'value3', 'value4'");
}

/**
 * Checks LIKE filtering against an index with one keyword column and one text column.
 * The query on the keyword column must return the matching row and be fully pushed down;
 * the query on the text column is only required to return the matching row.
 */
@Test
public void testLike()
        throws IOException
{
    String likeIndex = "like_test";

    @Language("JSON")
    String mappings = "" +
            "{" +
            " \"properties\": { " +
            " \"keyword_column\": { \"type\": \"keyword\" }," +
            " \"text_column\": { \"type\": \"text\" }" +
            " }" +
            "}";

    createIndex(likeIndex, mappings);

    // The second document differs from the first only where the first has a '.'. It is there
    // to make sure '.' is escaped and not treated as any character
    String[] documentValues = {"so.me tex\\t", "soome tex\\t"};
    for (String value : documentValues) {
        index(likeIndex, ImmutableMap.<String, Object>builder()
                .put("keyword_column", value)
                .put("text_column", value)
                .buildOrThrow());
    }

    String expectedRows = "VALUES VARCHAR 'so.me tex\\t'";

    String keywordQuery = "SELECT keyword_column FROM " + likeIndex + " WHERE keyword_column LIKE 's_.m%ex\\t'";
    assertThat(query(keywordQuery))
            .matches(expectedRows)
            .isFullyPushedDown();

    String textQuery = "SELECT text_column FROM " + likeIndex + " WHERE text_column LIKE 's_.m%ex\\t'";
    assertThat(query(textQuery))
            .matches(expectedRows);
}

@Test
public void testDataTypes()
throws IOException
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.elasticsearch;

import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;

/**
 * Unit tests for the static LIKE-to-regexp translation in {@link ElasticsearchMetadata}.
 */
public class TestElasticsearchMetadata
{
    @Test
    public void testLikeToRegexp()
    {
        // LIKE wildcards
        assertLikeToRegexp("a_b_c", "a.b.c");
        assertLikeToRegexp("a%b%c", "a.*b.*c");
        assertLikeToRegexp("a%b_c", "a.*b.c");
        // characters that must be escaped in the regexp
        assertLikeToRegexp("a[b", "a\\[b");
        assertLikeToRegexp("a_\\b", "a.\\\\b");
    }

    // Asserts that the given LIKE pattern is translated to the expected regexp
    private static void assertLikeToRegexp(String like, String expectedRegexp)
    {
        assertEquals(ElasticsearchMetadata.likeToRegexp(like), expectedRegexp);
    }
}

0 comments on commit a96f5ee

Please sign in to comment.