Skip to content

Commit

Permalink
Add StructSelectiveStreamReader
Browse files Browse the repository at this point in the history
This is basic implementation of SelectiveStreamReader for ROW type that
supports the following:
- extracting only specified rows;
- top-level IS NULL and IS NOT NULL filters;
- subfield pruning
- structs nested within other structs or arrays.

Support for range filters on nested fields will be added in future commits.
  • Loading branch information
mbasmanova committed Aug 23, 2019
1 parent 0a41532 commit 2af400e
Show file tree
Hide file tree
Showing 7 changed files with 624 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -49,7 +50,12 @@ public class TestHivePushdownFilterQueries
" CASE WHEN orderkey % 11 = 0 THEN null ELSE (orderkey, partkey, suppkey) END AS keys, \n" +
" CASE WHEN orderkey % 13 = 0 THEN null ELSE ((orderkey, partkey), (suppkey,), CASE WHEN orderkey % 17 = 0 THEN null ELSE (orderkey, partkey) END) END AS nested_keys, \n" +
" CASE WHEN orderkey % 17 = 0 THEN null ELSE (shipmode = 'AIR', returnflag = 'R') END as flags, \n" +
" CASE WHEN orderkey % 19 = 0 THEN null ELSE (CAST(discount AS REAL), CAST(tax AS REAL)) END as reals \n" +
" CASE WHEN orderkey % 19 = 0 THEN null ELSE (CAST(discount AS REAL), CAST(tax AS REAL)) END as reals, \n" +
" CASE WHEN orderkey % 23 = 0 THEN null ELSE (orderkey, linenumber, (CAST(day(shipdate) as TINYINT), CAST(month(shipdate) AS TINYINT), CAST(year(shipdate) AS INTEGER))) END AS info, \n" +
" CASE WHEN orderkey % 31 = 0 THEN null ELSE (" +
" (CAST(day(shipdate) AS TINYINT), CAST(month(shipdate) AS TINYINT), CAST(year(shipdate) AS INTEGER)), " +
" (CAST(day(commitdate) AS TINYINT), CAST(month(commitdate) AS TINYINT), CAST(year(commitdate) AS INTEGER)), " +
" (CAST(day(receiptdate) AS TINYINT), CAST(month(receiptdate) AS TINYINT), CAST(year(receiptdate) AS INTEGER))) END AS dates \n" +
"FROM lineitem)\n";

protected TestHivePushdownFilterQueries()
Expand All @@ -67,7 +73,7 @@ private static QueryRunner createQueryRunner()
Optional.empty());

queryRunner.execute(noPushdownFilter(queryRunner.getDefaultSession()),
"CREATE TABLE lineitem_ex (linenumber, orderkey, partkey, suppkey, ship_by_air, is_returned, ship_day, ship_month, ship_timestamp, commit_timestamp, discount_real, tax_real, keys, nested_keys, flags, reals) AS " +
"CREATE TABLE lineitem_ex (linenumber, orderkey, partkey, suppkey, ship_by_air, is_returned, ship_day, ship_month, ship_timestamp, commit_timestamp, discount_real, tax_real, keys, nested_keys, flags, reals, info, dates) AS " +
"SELECT linenumber, orderkey, partkey, suppkey, " +
" IF (linenumber % 5 = 0, null, shipmode = 'AIR') AS ship_by_air, " +
" IF (linenumber % 7 = 0, null, returnflag = 'R') AS is_returned, " +
Expand All @@ -80,7 +86,13 @@ private static QueryRunner createQueryRunner()
" IF (orderkey % 11 = 0, null, ARRAY[orderkey, partkey, suppkey]), " +
" IF (orderkey % 13 = 0, null, ARRAY[ARRAY[orderkey, partkey], ARRAY[suppkey], IF (orderkey % 17 = 0, null, ARRAY[orderkey, partkey])]), " +
" IF (orderkey % 17 = 0, null, ARRAY[shipmode = 'AIR', returnflag = 'R']), " +
" IF (orderkey % 19 = 0, null, ARRAY[CAST(discount AS REAL), CAST(tax AS REAL)]) " +
" IF (orderkey % 19 = 0, null, ARRAY[CAST(discount AS REAL), CAST(tax AS REAL)]), " +
" IF (orderkey % 23 = 0, null, CAST(ROW(orderkey, linenumber, ROW(day(shipdate), month(shipdate), year(shipdate))) " +
" AS ROW(orderkey BIGINT, linenumber INTEGER, shipdate ROW(ship_day TINYINT, ship_month TINYINT, ship_year INTEGER)))), " +
" IF (orderkey % 31 = 0, NULL, ARRAY[" +
" CAST(ROW(day(shipdate), month(shipdate), year(shipdate)) AS ROW(day TINYINT, month TINYINT, year INTEGER)), " +
" CAST(ROW(day(commitdate), month(commitdate), year(commitdate)) AS ROW(day TINYINT, month TINYINT, year INTEGER)), " +
" CAST(ROW(day(receiptdate), month(receiptdate), year(receiptdate)) AS ROW(day TINYINT, month TINYINT, year INTEGER))]) " +
"FROM lineitem");

return queryRunner;
Expand Down Expand Up @@ -258,6 +270,31 @@ public void testArrays()
assertFilterProjectFails("nested_keys[2][5] > 0", "orderkey", "Array subscript out of bounds");
}

@Test
public void testStructs()
{
assertQueryUsingH2Cte("SELECT orderkey, info, dates FROM lineitem_ex");

Function<String, String> rewriter = query -> query.replaceAll("info.orderkey", "info[1]")
.replaceAll("info.linenumber", "info[2]")
.replaceAll("info.shipdate.ship_year", "info[3][3]")
.replaceAll("info.shipdate", "info[3]")
.replaceAll("dates\\[1\\].day", "dates[1][1]");

assertQueryUsingH2Cte("SELECT info.orderkey FROM lineitem_ex", rewriter);
assertQueryUsingH2Cte("SELECT info.orderkey, info.linenumber FROM lineitem_ex", rewriter);

assertQueryUsingH2Cte("SELECT info.linenumber, info.shipdate.ship_year FROM lineitem_ex WHERE orderkey < 1000", rewriter);

assertQueryUsingH2Cte("SELECT info.orderkey FROM lineitem_ex WHERE info IS NULL", rewriter);
assertQueryUsingH2Cte("SELECT info.orderkey FROM lineitem_ex WHERE info IS NOT NULL", rewriter);

assertQueryUsingH2Cte("SELECT info, dates FROM lineitem_ex WHERE info.orderkey % 7 = 0", rewriter);
assertQueryUsingH2Cte("SELECT info.orderkey, info.shipdate FROM lineitem_ex WHERE info.orderkey % 7 = 0", rewriter);

assertQueryUsingH2Cte("SELECT dates FROM lineitem_ex WHERE dates[1].day % 2 = 0", rewriter);
}

private void assertFilterProject(String filter, String projections)
{
assertQueryUsingH2Cte(format("SELECT * FROM lineitem_ex WHERE %s", filter));
Expand Down Expand Up @@ -362,7 +399,12 @@ public void testPathColumn()

private void assertQueryUsingH2Cte(String query)
{
assertQuery(query, WITH_LINEITEM_EX + toH2(query));
assertQueryUsingH2Cte(query, Function.identity());
}

private void assertQueryUsingH2Cte(String query, Function<String, String> rewriter)
{
assertQuery(query, WITH_LINEITEM_EX + toH2(rewriter.apply(query)));
}

private static String toH2(String query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static SelectiveStreamReader createStreamReader(
case LIST:
return new ListSelectiveStreamReader(streamDescriptor, filters, requiredSubfields, null, 0, outputType, hiveStorageTimeZone, systemMemoryContext);
case STRUCT:
return new StructSelectiveStreamReader(streamDescriptor, filters, requiredSubfields, outputType, hiveStorageTimeZone, systemMemoryContext);
case MAP:
case DECIMAL:
case UNION:
Expand Down Expand Up @@ -134,6 +135,7 @@ public static SelectiveStreamReader createNestedStreamReader(
Optional<ListFilter> childFilter = parentFilter.map(HierarchicalFilter::getChild).map(ListFilter.class::cast);
return new ListSelectiveStreamReader(streamDescriptor, ImmutableMap.of(), ImmutableList.of(), childFilter.orElse(null), level, outputType, hiveStorageTimeZone, systemMemoryContext.newAggregatedMemoryContext());
case STRUCT:
return new StructSelectiveStreamReader(streamDescriptor, ImmutableMap.of(), ImmutableList.of(), outputType, hiveStorageTimeZone, systemMemoryContext.newAggregatedMemoryContext());
case MAP:
case UNION:
default:
Expand Down
Loading

0 comments on commit 2af400e

Please sign in to comment.