Skip to content

Commit

Permalink
Add check for used Cassandra partition columns
Browse files Browse the repository at this point in the history
Thank to this we don't drop partition Predicates, that does not
match in Cassandra, totally out of the query. This prediction droping
results in returning to many rows.
  • Loading branch information
s2lomon authored and ebyhr committed Feb 21, 2022
1 parent 921e189 commit a123f44
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

Expand All @@ -51,7 +54,6 @@ public CassandraPartitionManager(CassandraSession cassandraSession)
public CassandraPartitionResult getPartitions(CassandraTableHandle cassandraTableHandle, TupleDomain<ColumnHandle> tupleDomain)
{
CassandraTable table = cassandraSession.getTable(cassandraTableHandle.getSchemaTableName());
List<CassandraColumnHandle> partitionKeys = table.getPartitionKeyColumns();

// fetch the partitions
List<CassandraPartition> allPartitions = getCassandraPartitions(table, tupleDomain);
Expand All @@ -69,8 +71,14 @@ public CassandraPartitionResult getPartitions(CassandraTableHandle cassandraTabl
remainingTupleDomain = tupleDomain;
}
else {
List<ColumnHandle> partitionColumns = ImmutableList.copyOf(partitionKeys);
remainingTupleDomain = tupleDomain.filter((column, domain) -> !partitionColumns.contains(column));
Set<ColumnHandle> usedPartitionColumns = partitions.stream()
.flatMap(partition -> Optional.ofNullable(partition.getTupleDomain())
.flatMap(partitionTupleDomain -> partitionTupleDomain.getDomains()
.map(Map::keySet)
.map(Set::stream))
.orElse(Stream.empty()))
.collect(toImmutableSet());
remainingTupleDomain = tupleDomain.filter((column, domain) -> !usedPartitionColumns.contains(column));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,21 @@ public void testPushdownAllTypesPartitionKeyPredicate()
assertEquals(result.getRowCount(), 1);
}

@Test
public void testPartitionPushdownsWithNotMatchingPredicate()
throws Exception
{
String table = "partition_not_pushed_down_keys";
session.execute(format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, table));
session.execute(format("CREATE TABLE %s.%s ( id varchar, trino_filter_col int, PRIMARY KEY (id))", KEYSPACE, table));
session.execute(format("INSERT INTO %s.%s(id, trino_filter_col) VALUES ('2', 0)", KEYSPACE, table));
server.refreshSizeEstimates(KEYSPACE, table);

String sql = "SELECT 1 FROM " + table + " WHERE id = '1' AND trino_filter_col = 0";

assertThat(execute(sql).getMaterializedRows().size()).isEqualTo(0);
}

@Test
public void testPartitionKeyPredicate()
{
Expand Down

0 comments on commit a123f44

Please sign in to comment.