From a96f5eef5d15dadc74a1c02d0457384e64d26939 Mon Sep 17 00:00:00 2001 From: Assaf Bern Date: Tue, 5 Oct 2021 14:11:11 +0300 Subject: [PATCH] Use regexp_like function pushdown on Elasticsearch connector --- .../elasticsearch/CountQueryPageSource.java | 2 +- .../elasticsearch/ElasticsearchMetadata.java | 59 ++++++++++++++++++- .../ElasticsearchQueryBuilder.java | 6 +- .../ElasticsearchTableHandle.java | 23 +++++++- .../elasticsearch/ScanQueryPageSource.java | 2 +- .../BaseElasticsearchConnectorTest.java | 44 ++++++++++++++ .../TestElasticsearchMetadata.java | 31 ++++++++++ 7 files changed, 161 insertions(+), 6 deletions(-) create mode 100644 plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchMetadata.java diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/CountQueryPageSource.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/CountQueryPageSource.java index 7c84bf29a160..f76b86a47658 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/CountQueryPageSource.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/CountQueryPageSource.java @@ -42,7 +42,7 @@ public CountQueryPageSource(ElasticsearchClient client, ElasticsearchTableHandle long count = client.count( split.getIndex(), split.getShard(), - buildSearchQuery(table.getConstraint().transformKeys(ElasticsearchColumnHandle.class::cast), table.getQuery())); + buildSearchQuery(table.getConstraint().transformKeys(ElasticsearchColumnHandle.class::cast), table.getQuery(), table.getRegexes())); readTimeNanos = System.nanoTime() - start; if (table.getLimit().isPresent()) { diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java index 09fc08509ad9..78dbc0f37df7 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchMetadata.java @@ -20,6 +20,8 @@ import com.google.common.collect.ImmutableSet; import com.google.common.io.BaseEncoding; import io.airlift.json.ObjectMapperProvider; +import io.airlift.slice.Slice; +import io.trino.plugin.base.expression.ConnectorExpressions; import io.trino.plugin.elasticsearch.client.ElasticsearchClient; import io.trino.plugin.elasticsearch.client.IndexMetadata; import io.trino.plugin.elasticsearch.client.IndexMetadata.DateTimeType; @@ -53,6 +55,11 @@ import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; +import io.trino.spi.expression.Call; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.expression.Constant; +import io.trino.spi.expression.FunctionName; +import io.trino.spi.expression.Variable; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.ArrayType; @@ -64,12 +71,15 @@ import javax.inject.Inject; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -115,6 +125,9 @@ public class ElasticsearchMetadata new VarcharDecoder.Descriptor(PASSTHROUGH_QUERY_RESULT_COLUMN_NAME), false)); + // See https://www.elastic.co/guide/en/elasticsearch/reference/current/regexp-syntax.html + private static final char[] REGEXP_RESERVED_CHARACTERS = new char[] {'.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '#', '@', '&', '<', '>', '~'}; + private final Type ipAddressType; private final ElasticsearchClient client; private final String schemaName; @@ -488,6 +501,7 @@ public Optional> applyLimit(Connect handle.getSchema(), handle.getIndex(), handle.getConstraint(), + handle.getRegexes(), handle.getQuery(), OptionalLong.of(limit)); @@ -521,7 +535,35 @@ public Optional> applyFilter(C TupleDomain oldDomain = handle.getConstraint(); TupleDomain newDomain = oldDomain.intersect(TupleDomain.withColumnDomains(supported)); - if (oldDomain.equals(newDomain)) { + + ConnectorExpression oldExpression = constraint.getExpression(); + Map newRegexes = new HashMap<>(handle.getRegexes()); + List expressions = ConnectorExpressions.extractConjuncts(constraint.getExpression()); + List notHandledExpressions = new ArrayList<>(); + for (ConnectorExpression expression : expressions) { + if (expression instanceof Call) { + Call call = (Call) expression; + // TODO Support ESCAPE character when it's pushed down by the engine + if (new FunctionName("$like_pattern").equals(call.getFunctionName()) && call.getArguments().size() == 2 && + call.getArguments().get(0) instanceof Variable && call.getArguments().get(1) instanceof Constant) { + String columnName = ((Variable) call.getArguments().get(0)).getName(); + Object pattern = ((Constant) call.getArguments().get(1)).getValue(); + if (!newRegexes.containsKey(columnName) && pattern instanceof Slice) { + IndexMetadata metadata = client.getIndexMetadata(handle.getIndex()); + if (metadata.getSchema() + .getFields().stream() + .anyMatch(field -> columnName.equals(field.getName()) && field.getType() instanceof PrimitiveType && "keyword".equals(((PrimitiveType) field.getType()).getName()))) { + newRegexes.put(columnName, likeToRegexp(((Slice) pattern).toStringUtf8())); + continue; + } + } + } + } + notHandledExpressions.add(expression); + } + + ConnectorExpression newExpression = ConnectorExpressions.and(notHandledExpressions); + if (oldDomain.equals(newDomain) && oldExpression.equals(newExpression)) { return Optional.empty(); } @@ -530,10 +572,23 @@ public Optional> applyFilter(C handle.getSchema(), handle.getIndex(), newDomain, + newRegexes, handle.getQuery(), handle.getLimit()); - return Optional.of(new ConstraintApplicationResult<>(handle, TupleDomain.withColumnDomains(unsupported), false)); + return Optional.of(new ConstraintApplicationResult<>(handle, TupleDomain.withColumnDomains(unsupported), newExpression, false)); + } + + protected static String likeToRegexp(String like) + { + // TODO: This can be done more efficiently by using a state machine and iterating over characters (See io.trino.type.LikeFunctions.likePattern(String, char, boolean)) + String regexp = like.replaceAll(Pattern.quote("\\"), Matcher.quoteReplacement("\\\\")); // first, escape regexp's escape character + for (char c : REGEXP_RESERVED_CHARACTERS) { + regexp = regexp.replaceAll(Pattern.quote(String.valueOf(c)), Matcher.quoteReplacement("\\" + c)); + } + return regexp + .replaceAll("%", ".*") + .replaceAll("_", "."); } private static boolean isPassthroughQuery(ElasticsearchTableHandle table) diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchQueryBuilder.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchQueryBuilder.java index 4e8cdef67c4a..c474c0a334b3 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchQueryBuilder.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchQueryBuilder.java @@ -24,6 +24,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryStringQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.RegexpQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import java.time.Instant; @@ -54,7 +55,7 @@ public final class ElasticsearchQueryBuilder { private ElasticsearchQueryBuilder() {} - public static QueryBuilder buildSearchQuery(TupleDomain constraint, Optional query) + public static QueryBuilder buildSearchQuery(TupleDomain constraint, Optional query, Map regexes) { BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); if (constraint.getDomains().isPresent()) { @@ -68,6 +69,9 @@ public static QueryBuilder buildSearchQuery(TupleDomain queryBuilder.filter(new BoolQueryBuilder().must(((new RegexpQueryBuilder(name, value)))))); + query.map(QueryStringQueryBuilder::new) .ifPresent(queryBuilder::must); diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchTableHandle.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchTableHandle.java index e7583ebb8dbf..a381ef99b34d 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchTableHandle.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchTableHandle.java @@ -15,13 +15,16 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.predicate.TupleDomain; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; +import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -37,6 +40,7 @@ public enum Type private final String schema; private final String index; private final TupleDomain constraint; + private final Map regexes; private final Optional query; private final OptionalLong limit; @@ -48,6 +52,7 @@ public ElasticsearchTableHandle(Type type, String schema, String index, Optional this.query = requireNonNull(query, "query is null"); constraint = TupleDomain.all(); + regexes = ImmutableMap.of(); limit = OptionalLong.empty(); } @@ -57,6 +62,7 @@ public ElasticsearchTableHandle( @JsonProperty("schema") String schema, @JsonProperty("index") String index, @JsonProperty("constraint") TupleDomain constraint, + @JsonProperty("regexes") Map regexes, @JsonProperty("query") Optional query, @JsonProperty("limit") OptionalLong limit) { @@ -64,6 +70,7 @@ public ElasticsearchTableHandle( this.schema = requireNonNull(schema, "schema is null"); this.index = requireNonNull(index, "index is null"); this.constraint = requireNonNull(constraint, "constraint is null"); + this.regexes = ImmutableMap.copyOf(requireNonNull(regexes, "regexes is null")); this.query = requireNonNull(query, "query is null"); this.limit = requireNonNull(limit, "limit is null"); } @@ -92,6 +99,12 @@ public TupleDomain getConstraint() return constraint; } + @JsonProperty + public Map getRegexes() + { + return regexes; + } + @JsonProperty public OptionalLong getLimit() { @@ -118,6 +131,7 @@ public boolean equals(Object o) schema.equals(that.schema) && index.equals(that.index) && constraint.equals(that.constraint) && + regexes.equals(that.regexes) && query.equals(that.query) && limit.equals(that.limit); } @@ -125,7 +139,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(type, schema, index, constraint, query, limit); + return Objects.hash(type, schema, index, constraint, regexes, query, limit); } @Override @@ -135,6 +149,13 @@ public String toString() builder.append(type + ":" + index); StringBuilder attributes = new StringBuilder(); + if (!regexes.isEmpty()) { + attributes.append("regexes=["); + attributes.append(regexes.entrySet().stream() + .map(regex -> regex.getKey() + ":" + regex.getValue()) + .collect(Collectors.joining(", "))); + attributes.append("]"); + } limit.ifPresent(value -> attributes.append("limit=" + value)); query.ifPresent(value -> attributes.append("query" + value)); diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ScanQueryPageSource.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ScanQueryPageSource.java index 4baa34908f60..8bb5e60e9209 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ScanQueryPageSource.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ScanQueryPageSource.java @@ -111,7 +111,7 @@ public ScanQueryPageSource( SearchResponse searchResponse = client.beginSearch( split.getIndex(), split.getShard(), - buildSearchQuery(table.getConstraint().transformKeys(ElasticsearchColumnHandle.class::cast), table.getQuery()), + buildSearchQuery(table.getConstraint().transformKeys(ElasticsearchColumnHandle.class::cast), table.getQuery(), table.getRegexes()), needAllFields ? Optional.empty() : Optional.of(requiredFields), documentFields, sort, diff --git a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java index 7b21d3f5455d..bfd991e56a2c 100644 --- a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/BaseElasticsearchConnectorTest.java @@ -1041,6 +1041,50 @@ public void testNestedVariants() "VALUES 'value1', 'value2', 'value3', 'value4'"); } + @Test + public void testLike() + throws IOException + { + String indexName = "like_test"; + + @Language("JSON") + String mappings = "" + + "{" + + " \"properties\": { " + + " \"keyword_column\": { \"type\": \"keyword\" }," + + " \"text_column\": { \"type\": \"text\" }" + + " }" + + "}"; + + createIndex(indexName, mappings); + + index(indexName, ImmutableMap.builder() + .put("keyword_column", "so.me tex\\t") + .put("text_column", "so.me tex\\t") + .buildOrThrow()); + + // Add another document to make sure '.' is escaped and not treated as any character + index(indexName, ImmutableMap.builder() + .put("keyword_column", "soome tex\\t") + .put("text_column", "soome tex\\t") + .buildOrThrow()); + + assertThat(query("" + + "SELECT " + + "keyword_column " + + "FROM " + indexName + " " + + "WHERE keyword_column LIKE 's_.m%ex\\t'")) + .matches("VALUES VARCHAR 'so.me tex\\t'") + .isFullyPushedDown(); + + assertThat(query("" + + "SELECT " + + "text_column " + + "FROM " + indexName + " " + + "WHERE text_column LIKE 's_.m%ex\\t'")) + .matches("VALUES VARCHAR 'so.me tex\\t'"); + } + @Test public void testDataTypes() throws IOException diff --git a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchMetadata.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchMetadata.java new file mode 100644 index 000000000000..0d967f0001f5 --- /dev/null +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchMetadata.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.elasticsearch; + +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class TestElasticsearchMetadata +{ + @Test + public void testLikeToRegexp() + { + assertEquals(ElasticsearchMetadata.likeToRegexp("a_b_c"), "a.b.c"); + assertEquals(ElasticsearchMetadata.likeToRegexp("a%b%c"), "a.*b.*c"); + assertEquals(ElasticsearchMetadata.likeToRegexp("a%b_c"), "a.*b.c"); + assertEquals(ElasticsearchMetadata.likeToRegexp("a[b"), "a\\[b"); + assertEquals(ElasticsearchMetadata.likeToRegexp("a_\\b"), "a.\\\\b"); + } +}