Skip to content

Commit

Permalink
Fix handling of non-deterministic filters in SelectiveStreamReaders
Browse files Browse the repository at this point in the history
  • Loading branch information
bhhari authored and mbasmanova committed Aug 28, 2019
1 parent 47423bb commit 0ea8208
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -47,6 +48,7 @@ public class TestHivePushdownFilterQueries
" CASE WHEN orderkey % 3 = 0 THEN null ELSE CAST(commitdate AS TIMESTAMP) END AS commit_timestamp, \n" +
" CASE WHEN orderkey % 5 = 0 THEN null ELSE CAST(discount AS REAL) END AS discount_real, \n" +
" CASE WHEN orderkey % 7 = 0 THEN null ELSE CAST(tax AS REAL) END AS tax_real, \n" +
" CASE WHEN linenumber % 2 = 0 THEN null ELSE (CAST(day(shipdate) AS TINYINT) , CAST(month(shipdate) AS TINYINT)) END AS ship_day_month, " +
" CASE WHEN orderkey % 11 = 0 THEN null ELSE (orderkey, partkey, suppkey) END AS keys, \n" +
" CASE WHEN orderkey % 13 = 0 THEN null ELSE ((orderkey, partkey), (suppkey,), CASE WHEN orderkey % 17 = 0 THEN null ELSE (orderkey, partkey) END) END AS nested_keys, \n" +
" CASE WHEN orderkey % 17 = 0 THEN null ELSE (shipmode = 'AIR', returnflag = 'R') END as flags, \n" +
Expand All @@ -55,7 +57,8 @@ public class TestHivePushdownFilterQueries
" CASE WHEN orderkey % 31 = 0 THEN null ELSE (" +
" (CAST(day(shipdate) AS TINYINT), CAST(month(shipdate) AS TINYINT), CAST(year(shipdate) AS INTEGER)), " +
" (CAST(day(commitdate) AS TINYINT), CAST(month(commitdate) AS TINYINT), CAST(year(commitdate) AS INTEGER)), " +
" (CAST(day(receiptdate) AS TINYINT), CAST(month(receiptdate) AS TINYINT), CAST(year(receiptdate) AS INTEGER))) END AS dates \n" +
" (CAST(day(receiptdate) AS TINYINT), CAST(month(receiptdate) AS TINYINT), CAST(year(receiptdate) AS INTEGER))) END AS dates, \n" +
" CASE WHEN orderkey % 37 = 0 THEN null ELSE (CAST(shipdate AS TIMESTAMP), CAST(commitdate AS TIMESTAMP)) END AS timestamps \n" +
"FROM lineitem)\n";

protected TestHivePushdownFilterQueries()
Expand All @@ -73,7 +76,7 @@ private static QueryRunner createQueryRunner()
Optional.empty());

queryRunner.execute(noPushdownFilter(queryRunner.getDefaultSession()),
"CREATE TABLE lineitem_ex (linenumber, orderkey, partkey, suppkey, ship_by_air, is_returned, ship_day, ship_month, ship_timestamp, commit_timestamp, discount_real, tax_real, keys, nested_keys, flags, reals, info, dates) AS " +
"CREATE TABLE lineitem_ex (linenumber, orderkey, partkey, suppkey, ship_by_air, is_returned, ship_day, ship_month, ship_timestamp, commit_timestamp, discount_real, tax_real, ship_day_month, keys, nested_keys, flags, reals, info, dates, timestamps) AS " +
"SELECT linenumber, orderkey, partkey, suppkey, " +
" IF (linenumber % 5 = 0, null, shipmode = 'AIR') AS ship_by_air, " +
" IF (linenumber % 7 = 0, null, returnflag = 'R') AS is_returned, " +
Expand All @@ -83,6 +86,7 @@ private static QueryRunner createQueryRunner()
" IF (orderkey % 3 = 0, null, CAST(commitdate AS TIMESTAMP)) AS commit_timestamp, " +
" IF (orderkey % 5 = 0, null, CAST(discount AS REAL)) AS discount_real, " +
" IF (orderkey % 7 = 0, null, CAST(tax AS REAL)) AS tax_real, " +
" IF (linenumber % 2 = 0, null, ARRAY[CAST(day(shipdate) AS TINYINT), CAST(month(shipdate) AS TINYINT)]) AS ship_day_month, " +
" IF (orderkey % 11 = 0, null, ARRAY[orderkey, partkey, suppkey]), " +
" IF (orderkey % 13 = 0, null, ARRAY[ARRAY[orderkey, partkey], ARRAY[suppkey], IF (orderkey % 17 = 0, null, ARRAY[orderkey, partkey])]), " +
" IF (orderkey % 17 = 0, null, ARRAY[shipmode = 'AIR', returnflag = 'R']), " +
Expand All @@ -92,7 +96,8 @@ private static QueryRunner createQueryRunner()
" IF (orderkey % 31 = 0, NULL, ARRAY[" +
" CAST(ROW(day(shipdate), month(shipdate), year(shipdate)) AS ROW(day TINYINT, month TINYINT, year INTEGER)), " +
" CAST(ROW(day(commitdate), month(commitdate), year(commitdate)) AS ROW(day TINYINT, month TINYINT, year INTEGER)), " +
" CAST(ROW(day(receiptdate), month(receiptdate), year(receiptdate)) AS ROW(day TINYINT, month TINYINT, year INTEGER))]) " +
" CAST(ROW(day(receiptdate), month(receiptdate), year(receiptdate)) AS ROW(day TINYINT, month TINYINT, year INTEGER))]) ," +
" IF (orderkey % 37 = 0, NULL, ARRAY[CAST(shipdate AS TIMESTAMP), CAST(commitdate AS TIMESTAMP)]) AS timestamps " +
"FROM lineitem");

return queryRunner;
Expand Down Expand Up @@ -146,6 +151,8 @@ public void testBytes()
assertQueryUsingH2Cte("SELECT COUNT(*) FROM lineitem_ex WHERE ship_day is not null AND ship_month = 1");

assertQueryUsingH2Cte("SELECT ship_day, ship_month FROM lineitem_ex WHERE ship_day > 15 AND ship_month < 5 AND (ship_day + ship_month) < 20");

assertQueryUsingH2Cte("SELECT count(*) FROM lineitem_ex WHERE ship_day_month[2] = 12");
}

@Test
Expand Down Expand Up @@ -186,6 +193,10 @@ public void testTimestamps()
assertQueryReturnsEmptyResult("SELECT commit_timestamp, ship_timestamp FROM lineitem_ex WHERE year(ship_timestamp) - year(commit_timestamp) > 1");

assertQueryUsingH2Cte("SELECT commit_timestamp, ship_timestamp, orderkey FROM lineitem_ex WHERE year(commit_timestamp) > 1993 and year(ship_timestamp) > 1993 and year(ship_timestamp) - year(commit_timestamp) = 1");

assertQueryUsingH2Cte("SELECT count(*) from lineitem_ex where timestamps[1] > TIMESTAMP '1993-08-08 01:00:00'");

assertQueryUsingH2Cte("SELECT count(*) from lineitem_ex where year(timestamps[1]) != year(timestamps[2])");
}

@Test
Expand Down Expand Up @@ -397,6 +408,33 @@ public void testPathColumn()
assertQuerySucceeds(session, "SELECT linenumber, \"$path\" FROM lineitem WHERE length(\"$path\") % 2 = linenumber % 2");
}

//TODO add a correctness check for the results and move this test to TestOrcSelectiveReader
@Test
public void testArraysOfNulls()
{
getQueryRunner().execute("CREATE TABLE test_arrays_of_nulls AS " +
"SELECT orderkey, linenumber, " +
" CAST(ARRAY[null, null, null] AS ARRAY(BIGINT)) bigints, " +
" CAST(ARRAY[null, null, null] AS ARRAY(INTEGER)) integers, " +
" CAST(ARRAY[null, null, null] AS ARRAY(SMALLINT)) smallints, " +
" CAST(ARRAY[null, null, null] AS ARRAY(TINYINT)) tinyints, " +
" CAST(ARRAY[null, null, null] AS ARRAY(BOOLEAN)) booleans, " +
" CAST(ARRAY[null, null, null] AS ARRAY(TIMESTAMP)) timestamps, " +
" CAST(ARRAY[null, null, null] AS ARRAY(REAL)) floats " +
"FROM lineitem");

List<String> columnNames = ImmutableList.of("bigints", "integers", "tinyints", "smallints", "booleans", "floats");
try {
for (String columnName : columnNames) {
assertQuerySucceeds(getSession(), format("SELECT count(*) FROM test_arrays_of_nulls WHERE %s[1] IS NOT NULL", columnName));
assertQuerySucceeds(getSession(), format("SELECT count(*) FROM test_arrays_of_nulls WHERE %s[1] IS NULL", columnName));
}
}
finally {
getQueryRunner().execute("DROP TABLE test_arrays_of_nulls");
}
}

private void assertQueryUsingH2Cte(String query)
{
assertQueryUsingH2Cte(query, Function.identity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,20 @@ private int readAllNulls(int[] positions, int positionCount)
if (filter.testNull()) {
outputPositionCount++;
}
else {
outputPositionCount -= filter.getPrecedingPositionsToFail();
i += filter.getSucceedingPositionsToFail();
}
}
}
else if (nullsAllowed) {
outputPositionCount = positionCount;
allNulls = true;
}
else {
outputPositionCount = 0;
}

allNulls = true;
return positions[positionCount - 1] + 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ByteSelectiveStreamReader
private final boolean nullsAllowed;
private final boolean outputRequired;
private final LocalMemoryContext systemMemoryContext;
private final boolean nonDeterministicFilter;

private InputStreamSource<BooleanInputStream> presentStreamSource = missingStreamSource(BooleanInputStream.class);
private InputStreamSource<ByteInputStream> dataStreamSource = missingStreamSource(ByteInputStream.class);
Expand Down Expand Up @@ -87,7 +88,8 @@ public ByteSelectiveStreamReader(
this.filter = filter.orElse(null);
this.outputRequired = outputRequired;
this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
this.nullsAllowed = this.filter == null || this.filter.testNull();
this.nonDeterministicFilter = this.filter != null && !this.filter.isDeterministic();
this.nullsAllowed = this.filter == null || nonDeterministicFilter || this.filter.testNull();
}

@Override
Expand Down Expand Up @@ -184,7 +186,7 @@ private int readWithFilter(int[] positions, int positionCount)
}

if (presentStream != null && !presentStream.nextBit()) {
if (nullsAllowed) {
if ((nonDeterministicFilter && filter.testNull()) || nullsAllowed) {
if (outputRequired) {
nulls[outputPositionCount] = true;
}
Expand All @@ -206,6 +208,21 @@ private int readWithFilter(int[] positions, int positionCount)
}
}
streamPosition++;

if (filter != null) {
outputPositionCount -= filter.getPrecedingPositionsToFail();
int succeedingPositionsToFail = filter.getSucceedingPositionsToFail();
if (succeedingPositionsToFail > 0) {
int positionsToSkip = 0;
for (int j = 0; j < succeedingPositionsToFail; j++) {
i++;
int nextPosition = positions[i];
positionsToSkip += 1 + nextPosition - streamPosition;
streamPosition = nextPosition + 1;
}
skip(positionsToSkip);
}
}
}
return streamPosition;
}
Expand All @@ -215,14 +232,26 @@ private int readAllNulls(int[] positions, int positionCount)
{
presentStream.skip(positions[positionCount - 1]);

if (nullsAllowed) {
if (nonDeterministicFilter) {
outputPositionCount = 0;
for (int i = 0; i < positionCount; i++) {
if (filter.testNull()) {
outputPositionCount++;
}
else {
outputPositionCount -= filter.getPrecedingPositionsToFail();
i += filter.getSucceedingPositionsToFail();
}
}
}
else if (nullsAllowed) {
outputPositionCount = positionCount;
allNulls = true;
}
else {
outputPositionCount = 0;
}

allNulls = true;
return positions[positionCount - 1] + 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,20 @@ private int readAllNulls(int[] positions, int positionCount)
if (filter.testNull()) {
outputPositionCount++;
}
else {
outputPositionCount -= filter.getPrecedingPositionsToFail();
i += filter.getSucceedingPositionsToFail();
}
}
}
else if (nullsAllowed) {
outputPositionCount = positionCount;
allNulls = true;
}
else {
outputPositionCount = 0;
}

allNulls = true;
return positions[positionCount - 1] + 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,20 @@ private int readAllNulls(int[] positions, int positionCount)
if (filter.testNull()) {
outputPositionCount++;
}
else {
outputPositionCount -= filter.getPrecedingPositionsToFail();
i += filter.getSucceedingPositionsToFail();
}
}
}
else if (nullsAllowed) {
outputPositionCount = positionCount;
allNulls = true;
}
else {
outputPositionCount = 0;
}

allNulls = true;
return positions[positionCount - 1] + 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@ private int readWithFilter(int[] positions, int positionCount)
}
}
streamPosition++;

if (filter != null) {
outputPositionCount -= filter.getPrecedingPositionsToFail();

int succeedingPositionsToFail = filter.getSucceedingPositionsToFail();
if (succeedingPositionsToFail > 0) {
int positionsToSkip = 0;
for (int j = 0; j < succeedingPositionsToFail; j++) {
i++;
int nextPosition = positions[i];
positionsToSkip += 1 + nextPosition - streamPosition;
streamPosition = nextPosition + 1;
}
skip(positionsToSkip);
}
}
}
return streamPosition;
}
Expand All @@ -236,16 +252,20 @@ private int readAllNulls(int[] positions, int positionCount)
if (filter.testNull()) {
outputPositionCount++;
}
else {
outputPositionCount -= filter.getPrecedingPositionsToFail();
i += filter.getSucceedingPositionsToFail();
}
}
}
else if (nullsAllowed) {
outputPositionCount = positionCount;
allNulls = true;
}
else {
outputPositionCount = 0;
}

allNulls = true;
return positions[positionCount - 1] + 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import static com.google.common.collect.Iterables.cycle;
import static com.google.common.collect.Iterables.limit;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.reverse;
import static java.util.Collections.nCopies;
import static java.util.stream.Collectors.toList;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -114,10 +113,10 @@ public void testByteValues()
ImmutableMap.of(1, IS_NULL),
ImmutableMap.of(
0,
BigintRange.of(11, 15, false),
BigintRange.of(7, 17, false),
1,
BigintRange.of(11, Long.MAX_VALUE, false)));
tester.testRoundTripTypes(ImmutableList.of(TINYINT, TINYINT), ImmutableList.of(byteValues, reverse(byteValues)), filters);
BigintRange.of(12, 14, false)));
tester.testRoundTripTypes(ImmutableList.of(TINYINT, TINYINT), ImmutableList.of(byteValues, byteValues), filters);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.spi.type.CharType;
import com.facebook.presto.spi.type.DecimalType;
import com.facebook.presto.spi.type.RowType;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
import com.facebook.presto.testing.MaterializedResult;
Expand All @@ -48,6 +49,7 @@
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
Expand Down Expand Up @@ -402,6 +404,12 @@ private static Object[] mapArrayValues(ArrayType arrayType, Object[] values)
.toArray();
}

if (elementType instanceof TimestampType) {
return Arrays.stream(values)
.map(v -> v == null ? null : ((Timestamp) v).toLocalDateTime())
.toArray();
}

return values;
}

Expand Down

0 comments on commit 0ea8208

Please sign in to comment.