diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchHandleResolver.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchHandleResolver.java index 50b1803d8af6..5fbf64c1cbea 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchHandleResolver.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchHandleResolver.java @@ -17,7 +17,6 @@ import io.prestosql.spi.connector.ConnectorHandleResolver; import io.prestosql.spi.connector.ConnectorSplit; import io.prestosql.spi.connector.ConnectorTableHandle; -import io.prestosql.spi.connector.ConnectorTableLayoutHandle; import io.prestosql.spi.connector.ConnectorTransactionHandle; public class ElasticsearchHandleResolver @@ -29,12 +28,6 @@ public Class getTableHandleClass() return ElasticsearchTableHandle.class; } - @Override - public Class getTableLayoutHandleClass() - { - return ElasticsearchTableLayoutHandle.class; - } - @Override public Class getColumnHandleClass() { diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchMetadata.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchMetadata.java index fd88b1711402..243447336a4c 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchMetadata.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchMetadata.java @@ -20,21 +20,20 @@ import io.prestosql.spi.connector.ConnectorMetadata; import io.prestosql.spi.connector.ConnectorSession; import io.prestosql.spi.connector.ConnectorTableHandle; -import io.prestosql.spi.connector.ConnectorTableLayout; -import io.prestosql.spi.connector.ConnectorTableLayoutHandle; -import io.prestosql.spi.connector.ConnectorTableLayoutResult; import io.prestosql.spi.connector.ConnectorTableMetadata; +import io.prestosql.spi.connector.ConnectorTableProperties; import io.prestosql.spi.connector.Constraint; +import io.prestosql.spi.connector.ConstraintApplicationResult; import io.prestosql.spi.connector.SchemaTableName; import io.prestosql.spi.connector.SchemaTablePrefix; import io.prestosql.spi.connector.TableNotFoundException; +import io.prestosql.spi.predicate.TupleDomain; import javax.inject.Inject; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import static java.util.Objects.requireNonNull; @@ -67,20 +66,6 @@ public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaT return new ElasticsearchTableHandle(tableName.getSchemaName(), tableName.getTableName()); } - @Override - public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns) - { - ElasticsearchTableHandle handle = (ElasticsearchTableHandle) table; - ConnectorTableLayout layout = new ConnectorTableLayout(new ElasticsearchTableLayoutHandle(handle, constraint.getSummary())); - return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); - } - - @Override - public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) - { - return new ConnectorTableLayout(handle); - } - @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { @@ -145,6 +130,37 @@ public Map> listTableColumns(ConnectorSess return columns.build(); } + @Override + public boolean usesLegacyTableLayouts() + { + return false; + } + + @Override + public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) + { + return new ConnectorTableProperties(); + } + + @Override + public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle table, Constraint constraint) + { + ElasticsearchTableHandle handle = (ElasticsearchTableHandle) table; + + TupleDomain oldDomain = handle.getConstraint(); + TupleDomain newDomain = oldDomain.intersect(constraint.getSummary()); + if (oldDomain.equals(newDomain)) { + return Optional.empty(); + } + + handle = new ElasticsearchTableHandle( + handle.getSchemaName(), + handle.getTableName(), + handle.getConstraint()); + + return Optional.of(new ConstraintApplicationResult<>(handle, constraint.getSummary())); + } + private Optional getTableMetadata(SchemaTableName tableName) { ElasticsearchTableDescription table = client.getTable(tableName.getSchemaName(), tableName.getTableName()); diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchQueryBuilder.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchQueryBuilder.java index adec9880da40..7903d30aa8e6 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchQueryBuilder.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchQueryBuilder.java @@ -68,14 +68,14 @@ public class ElasticsearchQueryBuilder private final String index; private final String type; - public ElasticsearchQueryBuilder(List columnHandles, ElasticsearchConnectorConfig config, ElasticsearchSplit split) + public ElasticsearchQueryBuilder(List columnHandles, ElasticsearchConnectorConfig config, ElasticsearchSplit split, ElasticsearchTableHandle table) { requireNonNull(columnHandles, "columnHandles is null"); requireNonNull(config, "config is null"); requireNonNull(split, "split is null"); columns = columnHandles; - tupleDomain = split.getTupleDomain(); + tupleDomain = table.getConstraint(); index = split.getIndex(); shard = split.getShard(); type = split.getType(); diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordCursor.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordCursor.java index a37f61430abb..9d53f7b40cc6 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordCursor.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordCursor.java @@ -66,7 +66,7 @@ public class ElasticsearchRecordCursor private long totalBytes; private List fields; - public ElasticsearchRecordCursor(List columnHandles, ElasticsearchConnectorConfig config, ElasticsearchSplit split) + public ElasticsearchRecordCursor(List columnHandles, ElasticsearchConnectorConfig config, ElasticsearchSplit split, ElasticsearchTableHandle table) { requireNonNull(columnHandles, "columnHandle is null"); requireNonNull(config, "config is null"); @@ -80,7 +80,7 @@ public ElasticsearchRecordCursor(List columnHandles, for (int i = 0; i < columnHandles.size(); i++) { jsonPathToIndex.put(columnHandles.get(i).getColumnJsonPath(), i); } - this.builder = new ElasticsearchQueryBuilder(columnHandles, config, split); + this.builder = new ElasticsearchQueryBuilder(columnHandles, config, split, table); this.searchHits = sendElasticsearchQuery(builder).iterator(); } diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordSet.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordSet.java index 4f2c19f456bf..f2f7c57c9a45 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordSet.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordSet.java @@ -28,11 +28,13 @@ public class ElasticsearchRecordSet private final List columnHandles; private final List columnTypes; private final ElasticsearchSplit split; + private final ElasticsearchTableHandle table; private final ElasticsearchConnectorConfig config; - public ElasticsearchRecordSet(ElasticsearchSplit split, ElasticsearchConnectorConfig config, List columnHandles) + public ElasticsearchRecordSet(ElasticsearchSplit split, ElasticsearchTableHandle table, ElasticsearchConnectorConfig config, List columnHandles) { this.split = requireNonNull(split, "split is null"); + this.table = requireNonNull(table, "table is null"); this.config = requireNonNull(config, "config is null"); this.columnHandles = requireNonNull(columnHandles, "columnHandles is null"); this.columnTypes = columnHandles.stream() @@ -49,6 +51,6 @@ public List getColumnTypes() @Override public RecordCursor cursor() { - return new ElasticsearchRecordCursor(columnHandles, config, split); + return new ElasticsearchRecordCursor(columnHandles, config, split, table); } } diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordSetProvider.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordSetProvider.java index 9b7445854fb8..91216b75959c 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordSetProvider.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchRecordSetProvider.java @@ -43,13 +43,14 @@ public ElasticsearchRecordSetProvider(ElasticsearchConnectorConfig config) public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns) { requireNonNull(split, "split is null"); + requireNonNull(table, "table is null"); ElasticsearchSplit elasticsearchSplit = (ElasticsearchSplit) split; - + ElasticsearchTableHandle elasticsearchTable = (ElasticsearchTableHandle) table; ImmutableList.Builder handles = ImmutableList.builder(); for (ColumnHandle handle : columns) { handles.add((ElasticsearchColumnHandle) handle); } - return new ElasticsearchRecordSet(elasticsearchSplit, config, handles.build()); + return new ElasticsearchRecordSet(elasticsearchSplit, elasticsearchTable, config, handles.build()); } } diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchSplit.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchSplit.java index 34d7c3e35a84..89ca71f0a5e0 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchSplit.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchSplit.java @@ -17,9 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.prestosql.spi.HostAddress; -import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ConnectorSplit; -import io.prestosql.spi.predicate.TupleDomain; import java.util.List; @@ -34,7 +32,6 @@ public class ElasticsearchSplit private final int shard; private final String searchNode; private final int port; - private final TupleDomain tupleDomain; @JsonCreator public ElasticsearchSplit( @@ -42,15 +39,13 @@ public ElasticsearchSplit( @JsonProperty("type") String type, @JsonProperty("shard") int shard, @JsonProperty("searchNode") String searchNode, - @JsonProperty("port") int port, - @JsonProperty("tupleDomain") TupleDomain tupleDomain) + @JsonProperty("port") int port) { this.index = requireNonNull(index, "index is null"); this.type = requireNonNull(type, "index is null"); this.searchNode = requireNonNull(searchNode, "searchNode is null"); this.port = port; this.shard = shard; - this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null"); } @JsonProperty @@ -83,12 +78,6 @@ public int getPort() return port; } - @JsonProperty - public TupleDomain getTupleDomain() - { - return tupleDomain; - } - @Override public boolean isRemotelyAccessible() { @@ -116,7 +105,6 @@ public String toString() .addValue(shard) .addValue(port) .addValue(searchNode) - .addValue(tupleDomain) .toString(); } } diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchSplitManager.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchSplitManager.java index a48c59e1a5d8..a859f2e5f954 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchSplitManager.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchSplitManager.java @@ -18,7 +18,7 @@ import io.prestosql.spi.connector.ConnectorSplit; import io.prestosql.spi.connector.ConnectorSplitManager; import io.prestosql.spi.connector.ConnectorSplitSource; -import io.prestosql.spi.connector.ConnectorTableLayoutHandle; +import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.ConnectorTransactionHandle; import io.prestosql.spi.connector.FixedSplitSource; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; @@ -44,27 +44,25 @@ public ElasticsearchSplitManager(ElasticsearchClient client) } @Override - public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingStrategy splitSchedulingStrategy) + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy) { - ElasticsearchTableLayoutHandle layoutHandle = (ElasticsearchTableLayoutHandle) layout; - ElasticsearchTableHandle tableHandle = layoutHandle.getTable(); - ElasticsearchTableDescription table = client.getTable(tableHandle.getSchemaName(), tableHandle.getTableName()); + ElasticsearchTableHandle tableHandle = (ElasticsearchTableHandle) table; + ElasticsearchTableDescription tableDescription = client.getTable(tableHandle.getSchemaName(), tableHandle.getTableName()); verify(table != null, "Table no longer exists: %s", tableHandle.toString()); - List indices = client.getIndices(table); + List indices = client.getIndices(tableDescription); ImmutableList.Builder splits = ImmutableList.builder(); for (String index : indices) { - ClusterSearchShardsResponse response = client.getSearchShards(index, table); + ClusterSearchShardsResponse response = client.getSearchShards(index, tableDescription); DiscoveryNode[] nodes = response.getNodes(); for (ClusterSearchShardsGroup group : response.getGroups()) { int nodeIndex = group.getShardId().getId() % nodes.length; ElasticsearchSplit split = new ElasticsearchSplit( index, - table.getType(), + tableDescription.getType(), group.getShardId().getId(), nodes[nodeIndex].getHostName(), - nodes[nodeIndex].getAddress().getPort(), - layoutHandle.getTupleDomain()); + nodes[nodeIndex].getAddress().getPort()); splits.add(split); } } diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchTableHandle.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchTableHandle.java index 4c4e3c1d8e7d..7d1f0f2b41f7 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchTableHandle.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchTableHandle.java @@ -16,8 +16,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Joiner; +import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.SchemaTableName; +import io.prestosql.spi.predicate.TupleDomain; import java.util.Objects; @@ -28,15 +30,23 @@ public final class ElasticsearchTableHandle implements ConnectorTableHandle { private final SchemaTableName schemaTableName; + private final TupleDomain constraint; + + public ElasticsearchTableHandle(String schemaName, String tableName) + { + this(schemaName, tableName, TupleDomain.all()); + } @JsonCreator public ElasticsearchTableHandle( @JsonProperty("schemaName") String schemaName, - @JsonProperty("tableName") String tableName) + @JsonProperty("tableName") String tableName, + @JsonProperty("constraint") TupleDomain constraint) { requireNonNull(schemaName, "schemaName is null"); requireNonNull(tableName, "tableName is null"); this.schemaTableName = new SchemaTableName(schemaName.toLowerCase(ENGLISH), tableName.toLowerCase(ENGLISH)); + this.constraint = requireNonNull(constraint, "constraint is null"); } @JsonProperty @@ -51,6 +61,12 @@ public String getTableName() return schemaTableName.getTableName(); } + @JsonProperty + public TupleDomain getConstraint() + { + return constraint; + } + public SchemaTableName getSchemaTableName() { return schemaTableName; diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchTableLayoutHandle.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchTableLayoutHandle.java deleted file mode 100644 index 4284f75c22b2..000000000000 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchTableLayoutHandle.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.prestosql.elasticsearch; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import io.prestosql.spi.connector.ColumnHandle; -import io.prestosql.spi.connector.ConnectorTableLayoutHandle; -import io.prestosql.spi.predicate.TupleDomain; - -import java.util.Objects; - -import static java.util.Objects.requireNonNull; - -public class ElasticsearchTableLayoutHandle - implements ConnectorTableLayoutHandle -{ - private final ElasticsearchTableHandle table; - private final TupleDomain tupleDomain; - - @JsonCreator - public ElasticsearchTableLayoutHandle( - @JsonProperty("table") ElasticsearchTableHandle table, - @JsonProperty("tupleDomain") TupleDomain domain) - { - this.table = requireNonNull(table, "table is null"); - this.tupleDomain = requireNonNull(domain, "tupleDomain is null"); - } - - @JsonProperty - public ElasticsearchTableHandle getTable() - { - return table; - } - - @JsonProperty - public TupleDomain getTupleDomain() - { - return tupleDomain; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ElasticsearchTableLayoutHandle that = (ElasticsearchTableLayoutHandle) o; - return Objects.equals(table, that.table) && - Objects.equals(tupleDomain, that.tupleDomain); - } - - @Override - public int hashCode() - { - return Objects.hash(table, tupleDomain); - } - - @Override - public String toString() - { - return table.toString(); - } -}