Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove TableLayout from Accumulo connector #823

Merged
merged 1 commit into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,17 @@
import io.prestosql.plugin.accumulo.model.AccumuloColumnHandle;
import io.prestosql.plugin.accumulo.model.AccumuloSplit;
import io.prestosql.plugin.accumulo.model.AccumuloTableHandle;
import io.prestosql.plugin.accumulo.model.AccumuloTableLayoutHandle;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorHandleResolver;
import io.prestosql.spi.connector.ConnectorInsertTableHandle;
import io.prestosql.spi.connector.ConnectorOutputTableHandle;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;

public class AccumuloHandleResolver
implements ConnectorHandleResolver
{
@Override
public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
{
return AccumuloTableLayoutHandle.class;
}

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.prestosql.plugin.accumulo.metadata.AccumuloView;
import io.prestosql.plugin.accumulo.model.AccumuloColumnHandle;
import io.prestosql.plugin.accumulo.model.AccumuloTableHandle;
import io.prestosql.plugin.accumulo.model.AccumuloTableLayoutHandle;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
Expand All @@ -32,15 +31,15 @@
import io.prestosql.spi.connector.ConnectorOutputTableHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableLayout;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTableLayoutResult;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTableProperties;
import io.prestosql.spi.connector.ConnectorViewDefinition;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SchemaTablePrefix;
import io.prestosql.spi.connector.TableNotFoundException;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.statistics.ComputedStatistics;

import javax.inject.Inject;
Expand Down Expand Up @@ -250,24 +249,6 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
return null;
}

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(
ConnectorSession session,
ConnectorTableHandle table,
Constraint constraint,
Optional<Set<ColumnHandle>> desiredColumns)
{
AccumuloTableHandle tableHandle = (AccumuloTableHandle) table;
ConnectorTableLayout layout = new ConnectorTableLayout(new AccumuloTableLayoutHandle(tableHandle, constraint.getSummary()));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}

@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
{
return new ConnectorTableLayout(handle);
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
Expand Down Expand Up @@ -352,6 +333,41 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return columns.build();
}

@Override
public boolean usesLegacyTableLayouts()
{
return false;
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle table, Constraint constraint)
{
AccumuloTableHandle handle = (AccumuloTableHandle) table;

TupleDomain<ColumnHandle> oldDomain = handle.getConstraint();
TupleDomain<ColumnHandle> newDomain = oldDomain.intersect(constraint.getSummary());
if (oldDomain.equals(newDomain)) {
return Optional.empty();
}

handle = new AccumuloTableHandle(
handle.getSchema(),
handle.getTable(),
handle.getRowId(),
newDomain,
handle.isExternal(),
handle.getSerializerClassName(),
handle.getScanAuthorizations());

return Optional.of(new ConstraintApplicationResult<>(handle, constraint.getSummary()));
}

@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle handle)
{
return new ConnectorTableProperties();
}

private void checkNoRollback()
{
checkState(rollbackAction.get() == null, "Cannot begin a new write while in an existing one");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
import io.prestosql.plugin.accumulo.model.AccumuloColumnHandle;
import io.prestosql.plugin.accumulo.model.AccumuloSplit;
import io.prestosql.plugin.accumulo.model.AccumuloTableHandle;
import io.prestosql.plugin.accumulo.model.AccumuloTableLayoutHandle;
import io.prestosql.plugin.accumulo.model.TabletSplitMetadata;
import io.prestosql.plugin.accumulo.model.WrappedRange;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.FixedSplitSource;
import io.prestosql.spi.predicate.Domain;
Expand All @@ -53,30 +52,28 @@ public AccumuloSplitManager(AccumuloClient client)
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingStrategy splitSchedulingStrategy)
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableHandle tableHandle, SplitSchedulingStrategy splitSchedulingStrategy)
{
AccumuloTableLayoutHandle layoutHandle = (AccumuloTableLayoutHandle) layout;
AccumuloTableHandle tableHandle = layoutHandle.getTable();
AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;

String schemaName = tableHandle.getSchema();
String tableName = tableHandle.getTable();
String rowIdName = tableHandle.getRowId();
String schemaName = handle.getSchema();
String tableName = handle.getTable();
String rowIdName = handle.getRowId();

// Get non-row ID column constraints
List<AccumuloColumnConstraint> constraints = getColumnConstraints(rowIdName, layoutHandle.getConstraint());
List<AccumuloColumnConstraint> constraints = getColumnConstraints(rowIdName, handle.getConstraint());

// Get the row domain column range
Optional<Domain> rDom = getRangeDomain(rowIdName, layoutHandle.getConstraint());
Optional<Domain> rDom = getRangeDomain(rowIdName, handle.getConstraint());

// Call out to our client to retrieve all tablet split metadata using the row ID domain and the secondary index
List<TabletSplitMetadata> tabletSplits = client.getTabletSplits(session, schemaName, tableName, rDom, constraints, tableHandle.getSerializerInstance());
List<TabletSplitMetadata> tabletSplits = client.getTabletSplits(session, schemaName, tableName, rDom, constraints, handle.getSerializerInstance());

// Pack the tablet split metadata into a connector split
ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
for (TabletSplitMetadata splitMetadata : tabletSplits) {
AccumuloSplit split = new AccumuloSplit(
splitMetadata.getRanges().stream().map(WrappedRange::new).collect(Collectors.toList()),
constraints,
splitMetadata.getHostPort());
cSplits.add(split);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.prestosql.plugin.accumulo.Types;
import io.prestosql.plugin.accumulo.model.AccumuloColumnConstraint;
import io.prestosql.plugin.accumulo.model.AccumuloColumnHandle;
import io.prestosql.plugin.accumulo.serializers.AccumuloRowSerializer;
import io.prestosql.spi.PrestoException;
Expand Down Expand Up @@ -80,16 +79,14 @@ public AccumuloRecordCursor(
AccumuloRowSerializer serializer,
BatchScanner scanner,
String rowIdName,
List<AccumuloColumnHandle> columnHandles,
List<AccumuloColumnConstraint> constraints)
List<AccumuloColumnHandle> columnHandles)
{
this.columnHandles = requireNonNull(columnHandles, "columnHandles is null");
this.scanner = requireNonNull(scanner, "scanner is null");
this.serializer = requireNonNull(serializer, "serializer is null");
this.serializer.setRowIdName(requireNonNull(rowIdName, "rowIdName is null"));

requireNonNull(columnHandles, "columnHandles is null");
requireNonNull(constraints, "constraints is null");

if (retrieveOnlyRowIds(rowIdName)) {
this.scanner.addScanIterator(new IteratorSetting(1, "firstentryiter", FirstEntryInRowIterator.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.collect.Iterables;
import io.airlift.log.Logger;
import io.prestosql.plugin.accumulo.conf.AccumuloSessionProperties;
import io.prestosql.plugin.accumulo.model.AccumuloColumnConstraint;
import io.prestosql.plugin.accumulo.model.AccumuloColumnHandle;
import io.prestosql.plugin.accumulo.model.AccumuloSplit;
import io.prestosql.plugin.accumulo.model.AccumuloTableHandle;
Expand Down Expand Up @@ -54,7 +53,6 @@ public class AccumuloRecordSet
private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

private final List<AccumuloColumnHandle> columnHandles;
private final List<AccumuloColumnConstraint> constraints;
private final List<Type> columnTypes;
private final AccumuloRowSerializer serializer;
private final BatchScanner scanner;
Expand All @@ -71,7 +69,7 @@ public AccumuloRecordSet(
requireNonNull(session, "session is null");
requireNonNull(split, "split is null");
requireNonNull(username, "username is null");
constraints = requireNonNull(split.getConstraints(), "constraints is null");
requireNonNull(table, "table is null");

rowIdName = table.getRowId();

Expand Down Expand Up @@ -140,6 +138,6 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new AccumuloRecordCursor(serializer, scanner, rowIdName, columnHandles, constraints);
return new AccumuloRecordCursor(serializer, scanner, rowIdName, columnHandles);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,13 @@ public class AccumuloSplit
{
private final Optional<String> hostPort;
private final List<HostAddress> addresses;
private final List<AccumuloColumnConstraint> constraints;
private final List<WrappedRange> ranges;

@JsonCreator
public AccumuloSplit(
@JsonProperty("ranges") List<WrappedRange> ranges,
@JsonProperty("constraints") List<AccumuloColumnConstraint> constraints,
@JsonProperty("hostPort") Optional<String> hostPort)
{
this.constraints = ImmutableList.copyOf(requireNonNull(constraints, "constraints is null"));
this.hostPort = requireNonNull(hostPort, "hostPort is null");
this.ranges = ImmutableList.copyOf(requireNonNull(ranges, "ranges is null"));

Expand Down Expand Up @@ -73,12 +70,6 @@ public List<Range> getRanges()
return ranges.stream().map(WrappedRange::getRange).collect(Collectors.toList());
}

@JsonProperty
public List<AccumuloColumnConstraint> getConstraints()
{
return constraints;
}

@Override
public boolean isRemotelyAccessible()
{
Expand All @@ -103,7 +94,6 @@ public String toString()
return toStringHelper(this)
.add("addresses", addresses)
.add("numRanges", ranges.size())
.add("constraints", constraints)
.add("hostPort", hostPort)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import io.prestosql.plugin.accumulo.metadata.AccumuloTable;
import io.prestosql.plugin.accumulo.serializers.AccumuloRowSerializer;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorInsertTableHandle;
import io.prestosql.spi.connector.ConnectorOutputTableHandle;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.Objects;
import java.util.Optional;
Expand All @@ -40,12 +42,25 @@ public final class AccumuloTableHandle
private final String schema;
private final String serializerClassName;
private final String table;
private final TupleDomain<ColumnHandle> constraint;

public AccumuloTableHandle(
String schema,
String table,
String rowId,
boolean external,
String serializerClassName,
Optional<String> scanAuthorizations)
{
this(schema, table, rowId, TupleDomain.all(), external, serializerClassName, scanAuthorizations);
}

@JsonCreator
public AccumuloTableHandle(
@JsonProperty("schema") String schema,
@JsonProperty("table") String table,
@JsonProperty("rowId") String rowId,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint,
@JsonProperty("external") boolean external,
@JsonProperty("serializerClassName") String serializerClassName,
@JsonProperty("scanAuthorizations") Optional<String> scanAuthorizations)
Expand All @@ -56,6 +71,7 @@ public AccumuloTableHandle(
this.schema = requireNonNull(schema, "schema is null");
this.serializerClassName = requireNonNull(serializerClassName, "serializerClassName is null");
this.table = requireNonNull(table, "table is null");
this.constraint = requireNonNull(constraint, "constraints is null");
}

@JsonProperty
Expand Down Expand Up @@ -105,6 +121,12 @@ public boolean isExternal()
return external;
}

@JsonProperty
public TupleDomain<ColumnHandle> getConstraint()
{
return constraint;
}

public SchemaTableName toSchemaTableName()
{
return new SchemaTableName(schema, table);
Expand Down
Loading