Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert some classes to record in Cassandra #21645

Merged
merged 6 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public CassandraClusteringPredicatesExtractor(CassandraTypeManager cassandraType

public String getClusteringKeyPredicates()
{
return clusteringPushDownResult.getDomainQuery();
return clusteringPushDownResult.domainQuery();
}

public TupleDomain<ColumnHandle> getUnenforcedConstraints()
Expand Down Expand Up @@ -182,25 +182,17 @@ private String translateRangeIntoCql(CassandraColumnHandle columnHandle, Range r
return upperBoundPredicate;
}

private static class ClusteringPushDownResult
private record ClusteringPushDownResult(Set<ColumnHandle> fullyPushedColumnPredicates, String domainQuery)
{
private final Set<ColumnHandle> fullyPushedColumnPredicates;
private final String domainQuery;

public ClusteringPushDownResult(Set<ColumnHandle> fullyPushedColumnPredicates, String domainQuery)
private ClusteringPushDownResult
{
this.fullyPushedColumnPredicates = ImmutableSet.copyOf(requireNonNull(fullyPushedColumnPredicates, "fullyPushedColumnPredicates is null"));
this.domainQuery = requireNonNull(domainQuery);
fullyPushedColumnPredicates = ImmutableSet.copyOf(requireNonNull(fullyPushedColumnPredicates, "fullyPushedColumnPredicates is null"));
requireNonNull(domainQuery);
}

public boolean hasBeenFullyPushed(ColumnHandle column)
{
return fullyPushedColumnPredicates.contains(column);
}

public String getDomainQuery()
{
return domainQuery;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public CassandraTableHandle getTableHandle(ConnectorSession session, SchemaTable
{
requireNonNull(tableName, "tableName is null");
try {
return new CassandraTableHandle(cassandraSession.getTable(tableName).getTableHandle());
return new CassandraTableHandle(cassandraSession.getTable(tableName).tableHandle());
}
catch (TableNotFoundException | SchemaNotFoundException e) {
// table was not found
Expand Down Expand Up @@ -153,7 +153,7 @@ private static SchemaTableName getSchemaTableName(CassandraTableHandle handle)
private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
{
CassandraTable table = cassandraSession.getTable(tableName);
List<ColumnMetadata> columns = table.getColumns().stream()
List<ColumnMetadata> columns = table.columns().stream()
.map(CassandraColumnHandle::getColumnMetadata)
.collect(toList());
return new ConnectorTableMetadata(tableName, columns);
Expand Down Expand Up @@ -190,7 +190,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
requireNonNull(tableHandle, "tableHandle is null");
CassandraTable table = cassandraSession.getTable(getTableName(tableHandle));
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (CassandraColumnHandle columnHandle : table.getColumns()) {
for (CassandraColumnHandle columnHandle : table.columns()) {
columnHandles.put(cqlNameToSqlName(columnHandle.name()).toLowerCase(ENGLISH), columnHandle);
}
return columnHandles.buildOrThrow();
Expand Down Expand Up @@ -244,16 +244,16 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

String clusteringKeyPredicates = "";
TupleDomain<ColumnHandle> unenforcedConstraint;
if (partitionResult.isUnpartitioned() || partitionResult.isIndexedColumnPredicatePushdown()) {
if (partitionResult.unpartitioned() || partitionResult.indexedColumnPredicatePushdown()) {
// When the filter is missing at least one of the partition keys or when the table is not partitioned,
// use the raw unenforced constraint of the partitionResult
unenforcedConstraint = partitionResult.getUnenforcedConstraint();
unenforcedConstraint = partitionResult.unenforcedConstraint();
}
else {
CassandraClusteringPredicatesExtractor clusteringPredicatesExtractor = new CassandraClusteringPredicatesExtractor(
cassandraTypeManager,
cassandraSession.getTable(handle.getSchemaTableName()).getClusteringKeyColumns(),
partitionResult.getUnenforcedConstraint(),
cassandraSession.getTable(handle.getSchemaTableName()).clusteringKeyColumns(),
partitionResult.unenforcedConstraint(),
cassandraSession.getCassandraVersion());
clusteringKeyPredicates = clusteringPredicatesExtractor.getClusteringKeyPredicates();
unenforcedConstraint = clusteringPredicatesExtractor.getUnenforcedConstraints();
Expand All @@ -262,7 +262,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
Optional<List<CassandraPartition>> currentPartitions = handle.getPartitions();
if (currentPartitions.isPresent() &&
// TODO: we should skip only when new table handle does not narrow down enforced predicate
currentPartitions.get().containsAll(partitionResult.getPartitions()) &&
currentPartitions.get().containsAll(partitionResult.partitions()) &&
handle.getClusteringKeyPredicates().equals(clusteringKeyPredicates)) {
return Optional.empty();
}
Expand All @@ -271,7 +271,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
new ConstraintApplicationResult<>(new CassandraTableHandle(new CassandraNamedRelationHandle(
handle.getSchemaName(),
handle.getTableName(),
Optional.of(partitionResult.getPartitions()),
Optional.of(partitionResult.partitions()),
// TODO this should probably be AND-ed with handle.getClusteringKeyPredicates()
clusteringKeyPredicates)),
unenforcedConstraint,
Expand Down Expand Up @@ -385,7 +385,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
}

SchemaTableName schemaTableName = new SchemaTableName(table.getSchemaName(), table.getTableName());
List<CassandraColumnHandle> columns = cassandraSession.getTable(schemaTableName).getColumns();
List<CassandraColumnHandle> columns = cassandraSession.getTable(schemaTableName).columns();
List<String> columnNames = columns.stream()
.filter(columnHandle -> !isHiddenIdColumn(columnHandle))
.map(CassandraColumnHandle::name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private List<CassandraPartition> getCassandraPartitions(CassandraTable table, Tu
private List<Set<Object>> getPartitionKeysList(CassandraTable table, TupleDomain<ColumnHandle> tupleDomain)
{
ImmutableList.Builder<Set<Object>> partitionColumnValues = ImmutableList.builder();
for (CassandraColumnHandle columnHandle : table.getPartitionKeyColumns()) {
for (CassandraColumnHandle columnHandle : table.partitionKeyColumns()) {
Domain domain = tupleDomain.getDomains().get().get(columnHandle);

// if there is no constraint on a partition key, return an empty set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.cassandra;

import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.predicate.TupleDomain;

Expand All @@ -21,33 +22,20 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.util.Objects.requireNonNull;

public class CassandraPartitionResult
public record CassandraPartitionResult(List<CassandraPartition> partitions, TupleDomain<ColumnHandle> unenforcedConstraint)
{
private final List<CassandraPartition> partitions;
private final TupleDomain<ColumnHandle> unenforcedConstraint;

public CassandraPartitionResult(List<CassandraPartition> partitions, TupleDomain<ColumnHandle> unenforcedConstraint)
{
this.partitions = requireNonNull(partitions, "partitions is null");
this.unenforcedConstraint = requireNonNull(unenforcedConstraint, "unenforcedConstraint is null");
}

public List<CassandraPartition> getPartitions()
{
return partitions;
}

public TupleDomain<ColumnHandle> getUnenforcedConstraint()
public CassandraPartitionResult
{
return unenforcedConstraint;
partitions = ImmutableList.copyOf(requireNonNull(partitions, "partitions is null"));
requireNonNull(unenforcedConstraint, "unenforcedConstraint is null");
}

public boolean isUnpartitioned()
public boolean unpartitioned()
{
return partitions.size() == 1 && getOnlyElement(partitions).isUnpartitioned();
}

public boolean isIndexedColumnPredicatePushdown()
public boolean indexedColumnPredicatePushdown()
{
return partitions.size() == 1 && getOnlyElement(partitions).isIndexedColumnPredicatePushdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,10 @@ public CassandraTable getTable(SchemaTableName schemaTableName)

// column ordering
List<ExtraColumnMetadata> extras = extraColumnMetadataCodec.fromJson(columnOrderingString);
List<String> explicitColumnOrder = new ArrayList<>(ImmutableList.copyOf(transform(extras, ExtraColumnMetadata::getName)));
List<String> explicitColumnOrder = new ArrayList<>(ImmutableList.copyOf(transform(extras, ExtraColumnMetadata::name)));
hiddenColumns = extras.stream()
.filter(ExtraColumnMetadata::isHidden)
.map(ExtraColumnMetadata::getName)
.filter(ExtraColumnMetadata::hidden)
.map(ExtraColumnMetadata::name)
.collect(toImmutableSet());

// add columns not in the comment to the ordering
Expand Down Expand Up @@ -379,12 +379,12 @@ private Optional<CassandraColumnHandle> buildColumnHandle(RelationMetadata table
*
* @param table the table to get partitions from
* @param filterPrefixes the list of possible values for each partition key.
* Order of values should match {@link CassandraTable#getPartitionKeyColumns()}
* Order of values should match {@link CassandraTable#partitionKeyColumns()}
* @return list of {@link CassandraPartition}
*/
public List<CassandraPartition> getPartitions(CassandraTable table, List<Set<Object>> filterPrefixes)
{
List<CassandraColumnHandle> partitionKeyColumns = table.getPartitionKeyColumns();
List<CassandraColumnHandle> partitionKeyColumns = table.partitionKeyColumns();

if (filterPrefixes.size() != partitionKeyColumns.size()) {
return ImmutableList.of(CassandraPartition.UNPARTITIONED);
Expand Down Expand Up @@ -465,8 +465,8 @@ public ResultSet execute(Statement<?> statement)

private Iterable<Row> queryPartitionKeysWithInClauses(CassandraTable table, List<Set<Object>> filterPrefixes)
{
CassandraNamedRelationHandle tableHandle = table.getTableHandle();
List<CassandraColumnHandle> partitionKeyColumns = table.getPartitionKeyColumns();
CassandraNamedRelationHandle tableHandle = table.tableHandle();
List<CassandraColumnHandle> partitionKeyColumns = table.partitionKeyColumns();

Select partitionKeys = selectDistinctFrom(tableHandle, partitionKeyColumns)
.where(getInRelations(partitionKeyColumns, filterPrefixes));
Expand All @@ -477,8 +477,8 @@ private Iterable<Row> queryPartitionKeysWithInClauses(CassandraTable table, List

private Iterable<Row> queryPartitionKeysLegacyWithMultipleQueries(CassandraTable table, List<Set<Object>> filterPrefixes)
{
CassandraNamedRelationHandle tableHandle = table.getTableHandle();
List<CassandraColumnHandle> partitionKeyColumns = table.getPartitionKeyColumns();
CassandraNamedRelationHandle tableHandle = table.tableHandle();
List<CassandraColumnHandle> partitionKeyColumns = table.partitionKeyColumns();

Set<List<Object>> filterCombinations = Sets.cartesianProduct(filterPrefixes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public ConnectorSplitSource getSplits(
}
else {
CassandraPartitionResult partitionResult = partitionManager.getPartitions(cassandraTableHandle, TupleDomain.all());
partitions = partitionResult.getPartitions();
partitions = partitionResult.partitions();
clusteringKeyPredicates = extractClusteringKeyPredicates(partitionResult, cassandraTableHandle, cassandraSession);
}

Expand All @@ -124,23 +124,23 @@ public ConnectorSplitSource getSplits(

private String extractClusteringKeyPredicates(CassandraPartitionResult partitionResult, CassandraNamedRelationHandle tableHandle, CassandraSession session)
{
if (partitionResult.isUnpartitioned()) {
if (partitionResult.unpartitioned()) {
return "";
}

CassandraClusteringPredicatesExtractor clusteringPredicatesExtractor = new CassandraClusteringPredicatesExtractor(
cassandraTypeManager,
session.getTable(tableHandle.getSchemaTableName()).getClusteringKeyColumns(),
partitionResult.getUnenforcedConstraint(),
session.getTable(tableHandle.getSchemaTableName()).clusteringKeyColumns(),
partitionResult.unenforcedConstraint(),
session.getCassandraVersion());
return clusteringPredicatesExtractor.getClusteringKeyPredicates();
}

private List<ConnectorSplit> getSplitsByTokenRange(CassandraTable table, String partitionId, Optional<Long> sessionSplitsPerNode)
{
String schema = table.getTableHandle().getSchemaName();
String tableName = table.getTableHandle().getTableName();
String tokenExpression = table.getTokenExpression();
String schema = table.tableHandle().getSchemaName();
String tableName = table.tableHandle().getTableName();
String tokenExpression = table.tokenExpression();

ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder();
List<CassandraTokenSplitManager.TokenSplit> tokenSplits = tokenSplitMgr.getSplits(schema, tableName, sessionSplitsPerNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,35 @@

import java.util.List;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class CassandraTable
public record CassandraTable(CassandraNamedRelationHandle tableHandle, List<CassandraColumnHandle> columns)
{
private final CassandraNamedRelationHandle tableHandle;
private final List<CassandraColumnHandle> columns;

public CassandraTable(CassandraNamedRelationHandle tableHandle, List<CassandraColumnHandle> columns)
public CassandraTable
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
requireNonNull(tableHandle, "tableHandle is null");
columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
}

public List<CassandraColumnHandle> getColumns()
{
return columns;
}

public CassandraNamedRelationHandle getTableHandle()
{
return tableHandle;
}

public List<CassandraColumnHandle> getPartitionKeyColumns()
public List<CassandraColumnHandle> partitionKeyColumns()
{
return columns.stream()
.filter(CassandraColumnHandle::partitionKey)
.collect(toImmutableList());
}

public List<CassandraColumnHandle> getClusteringKeyColumns()
public List<CassandraColumnHandle> clusteringKeyColumns()
{
return columns.stream()
.filter(CassandraColumnHandle::clusteringKey)
.collect(toImmutableList());
}

public String getTokenExpression()
public String tokenExpression()
{
StringBuilder sb = new StringBuilder();
for (CassandraColumnHandle column : getPartitionKeyColumns()) {
for (CassandraColumnHandle column : partitionKeyColumns()) {
if (sb.length() == 0) {
sb.append("token(");
}
Expand All @@ -72,31 +58,4 @@ public String getTokenExpression()
sb.append(")");
return sb.toString();
}

@Override
public int hashCode()
{
return tableHandle.hashCode();
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (!(obj instanceof CassandraTable)) {
return false;
}
CassandraTable that = (CassandraTable) obj;
return this.tableHandle.equals(that.tableHandle);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("tableHandle", tableHandle)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public long getTotalPartitionsCount(String keyspace, String table, Optional<Long
}
List<SizeEstimate> estimates = session.getSizeEstimates(keyspace, table);
return estimates.stream()
.mapToLong(SizeEstimate::getPartitionsCount)
.mapToLong(SizeEstimate::partitionsCount)
.sum();
}

Expand Down
Loading