Skip to content

Commit

Permalink
Remove TableLayout from Elasticsearch connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveen2112 committed Jun 4, 2019
1 parent bee2029 commit aa5667f
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.prestosql.spi.connector.ConnectorHandleResolver;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;

public class ElasticsearchHandleResolver
Expand All @@ -29,12 +28,6 @@ public Class<? extends ConnectorTableHandle> getTableHandleClass()
return ElasticsearchTableHandle.class;
}

@Override
public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
{
return ElasticsearchTableLayoutHandle.class;
}

@Override
public Class<? extends ColumnHandle> getColumnHandleClass()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@
import io.prestosql.spi.connector.ConnectorMetadata;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableLayout;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTableLayoutResult;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTableProperties;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SchemaTablePrefix;
import io.prestosql.spi.connector.TableNotFoundException;
import io.prestosql.spi.predicate.TupleDomain;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -67,20 +66,6 @@ public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaT
return new ElasticsearchTableHandle(tableName.getSchemaName(), tableName.getTableName());
}

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
ElasticsearchTableHandle handle = (ElasticsearchTableHandle) table;
ConnectorTableLayout layout = new ConnectorTableLayout(new ElasticsearchTableLayoutHandle(handle, constraint.getSummary()));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}

@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
{
return new ConnectorTableLayout(handle);
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
Expand Down Expand Up @@ -145,6 +130,37 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return columns.build();
}

@Override
public boolean usesLegacyTableLayouts()
{
return false;
}

@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
{
return new ConnectorTableProperties();
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle table, Constraint constraint)
{
ElasticsearchTableHandle handle = (ElasticsearchTableHandle) table;

TupleDomain<ColumnHandle> oldDomain = handle.getConstraint();
TupleDomain<ColumnHandle> newDomain = oldDomain.intersect(constraint.getSummary());
if (oldDomain.equals(newDomain)) {
return Optional.empty();
}

handle = new ElasticsearchTableHandle(
handle.getSchemaName(),
handle.getTableName(),
handle.getConstraint());

return Optional.of(new ConstraintApplicationResult<>(handle, constraint.getSummary()));
}

private Optional<ConnectorTableMetadata> getTableMetadata(SchemaTableName tableName)
{
ElasticsearchTableDescription table = client.getTable(tableName.getSchemaName(), tableName.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ public class ElasticsearchQueryBuilder
private final String index;
private final String type;

public ElasticsearchQueryBuilder(List<ElasticsearchColumnHandle> columnHandles, ElasticsearchConnectorConfig config, ElasticsearchSplit split)
public ElasticsearchQueryBuilder(List<ElasticsearchColumnHandle> columnHandles, ElasticsearchConnectorConfig config, ElasticsearchSplit split, ElasticsearchTableHandle table)
{
requireNonNull(columnHandles, "columnHandles is null");
requireNonNull(config, "config is null");
requireNonNull(split, "split is null");

columns = columnHandles;
tupleDomain = split.getTupleDomain();
tupleDomain = table.getConstraint();
index = split.getIndex();
shard = split.getShard();
type = split.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class ElasticsearchRecordCursor
private long totalBytes;
private List<Object> fields;

public ElasticsearchRecordCursor(List<ElasticsearchColumnHandle> columnHandles, ElasticsearchConnectorConfig config, ElasticsearchSplit split)
public ElasticsearchRecordCursor(List<ElasticsearchColumnHandle> columnHandles, ElasticsearchConnectorConfig config, ElasticsearchSplit split, ElasticsearchTableHandle table)
{
requireNonNull(columnHandles, "columnHandle is null");
requireNonNull(config, "config is null");
Expand All @@ -80,7 +80,7 @@ public ElasticsearchRecordCursor(List<ElasticsearchColumnHandle> columnHandles,
for (int i = 0; i < columnHandles.size(); i++) {
jsonPathToIndex.put(columnHandles.get(i).getColumnJsonPath(), i);
}
this.builder = new ElasticsearchQueryBuilder(columnHandles, config, split);
this.builder = new ElasticsearchQueryBuilder(columnHandles, config, split, table);
this.searchHits = sendElasticsearchQuery(builder).iterator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ public class ElasticsearchRecordSet
private final List<ElasticsearchColumnHandle> columnHandles;
private final List<Type> columnTypes;
private final ElasticsearchSplit split;
private final ElasticsearchTableHandle table;
private final ElasticsearchConnectorConfig config;

public ElasticsearchRecordSet(ElasticsearchSplit split, ElasticsearchConnectorConfig config, List<ElasticsearchColumnHandle> columnHandles)
public ElasticsearchRecordSet(ElasticsearchSplit split, ElasticsearchTableHandle table, ElasticsearchConnectorConfig config, List<ElasticsearchColumnHandle> columnHandles)
{
this.split = requireNonNull(split, "split is null");
this.table = requireNonNull(table, "table is null");
this.config = requireNonNull(config, "config is null");
this.columnHandles = requireNonNull(columnHandles, "columnHandles is null");
this.columnTypes = columnHandles.stream()
Expand All @@ -49,6 +51,6 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new ElasticsearchRecordCursor(columnHandles, config, split);
return new ElasticsearchRecordCursor(columnHandles, config, split, table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ public ElasticsearchRecordSetProvider(ElasticsearchConnectorConfig config)
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
{
requireNonNull(split, "split is null");
requireNonNull(table, "table is null");
ElasticsearchSplit elasticsearchSplit = (ElasticsearchSplit) split;

ElasticsearchTableHandle elasticsearchTable = (ElasticsearchTableHandle) table;
ImmutableList.Builder<ElasticsearchColumnHandle> handles = ImmutableList.builder();
for (ColumnHandle handle : columns) {
handles.add((ElasticsearchColumnHandle) handle);
}

return new ElasticsearchRecordSet(elasticsearchSplit, config, handles.build());
return new ElasticsearchRecordSet(elasticsearchSplit, elasticsearchTable, config, handles.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.List;

Expand All @@ -34,23 +32,20 @@ public class ElasticsearchSplit
private final int shard;
private final String searchNode;
private final int port;
private final TupleDomain<ColumnHandle> tupleDomain;

@JsonCreator
public ElasticsearchSplit(
@JsonProperty("index") String index,
@JsonProperty("type") String type,
@JsonProperty("shard") int shard,
@JsonProperty("searchNode") String searchNode,
@JsonProperty("port") int port,
@JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain)
@JsonProperty("port") int port)
{
this.index = requireNonNull(index, "index is null");
this.type = requireNonNull(type, "index is null");
this.searchNode = requireNonNull(searchNode, "searchNode is null");
this.port = port;
this.shard = shard;
this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
}

@JsonProperty
Expand Down Expand Up @@ -83,12 +78,6 @@ public int getPort()
return port;
}

@JsonProperty
public TupleDomain<ColumnHandle> getTupleDomain()
{
return tupleDomain;
}

@Override
public boolean isRemotelyAccessible()
{
Expand Down Expand Up @@ -116,7 +105,6 @@ public String toString()
.addValue(shard)
.addValue(port)
.addValue(searchNode)
.addValue(tupleDomain)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.FixedSplitSource;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
Expand All @@ -44,27 +44,25 @@ public ElasticsearchSplitManager(ElasticsearchClient client)
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingStrategy splitSchedulingStrategy)
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy)
{
ElasticsearchTableLayoutHandle layoutHandle = (ElasticsearchTableLayoutHandle) layout;
ElasticsearchTableHandle tableHandle = layoutHandle.getTable();
ElasticsearchTableDescription table = client.getTable(tableHandle.getSchemaName(), tableHandle.getTableName());
ElasticsearchTableHandle tableHandle = (ElasticsearchTableHandle) table;
ElasticsearchTableDescription tableDescription = client.getTable(tableHandle.getSchemaName(), tableHandle.getTableName());
verify(table != null, "Table no longer exists: %s", tableHandle.toString());

List<String> indices = client.getIndices(table);
List<String> indices = client.getIndices(tableDescription);
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
for (String index : indices) {
ClusterSearchShardsResponse response = client.getSearchShards(index, table);
ClusterSearchShardsResponse response = client.getSearchShards(index, tableDescription);
DiscoveryNode[] nodes = response.getNodes();
for (ClusterSearchShardsGroup group : response.getGroups()) {
int nodeIndex = group.getShardId().getId() % nodes.length;
ElasticsearchSplit split = new ElasticsearchSplit(
index,
table.getType(),
tableDescription.getType(),
group.getShardId().getId(),
nodes[nodeIndex].getHostName(),
nodes[nodeIndex].getAddress().getPort(),
layoutHandle.getTupleDomain());
nodes[nodeIndex].getAddress().getPort());
splits.add(split);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.Objects;

Expand All @@ -28,15 +30,23 @@ public final class ElasticsearchTableHandle
implements ConnectorTableHandle
{
private final SchemaTableName schemaTableName;
private final TupleDomain<ColumnHandle> constraint;

public ElasticsearchTableHandle(String schemaName, String tableName)
{
this(schemaName, tableName, TupleDomain.all());
}

@JsonCreator
public ElasticsearchTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName)
@JsonProperty("tableName") String tableName,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
{
requireNonNull(schemaName, "schemaName is null");
requireNonNull(tableName, "tableName is null");
this.schemaTableName = new SchemaTableName(schemaName.toLowerCase(ENGLISH), tableName.toLowerCase(ENGLISH));
this.constraint = requireNonNull(constraint, "constraint is null");
}

@JsonProperty
Expand All @@ -51,6 +61,12 @@ public String getTableName()
return schemaTableName.getTableName();
}

@JsonProperty
public TupleDomain<ColumnHandle> getConstraint()
{
return constraint;
}

public SchemaTableName getSchemaTableName()
{
return schemaTableName;
Expand Down
Loading

0 comments on commit aa5667f

Please sign in to comment.