Skip to content

Commit

Permalink
Optimize single-column FilterFunctions on DictionaryBlocks
Browse files Browse the repository at this point in the history
Applies the filter at most once to each distinct value across DictionaryBlocks that share the same dictionary (values whose evaluation throws are not cached and are evaluated again). This optimizes filtering of low-cardinality string columns.
  • Loading branch information
[email protected] authored and mbasmanova committed Sep 5, 2019
1 parent 67efdcf commit 29bb4c6
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,33 @@

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.DictionaryBlock;
import com.facebook.presto.spi.relation.Predicate;

import static com.facebook.presto.array.Arrays.ensureCapacity;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static java.util.Arrays.fill;
import static java.util.Objects.requireNonNull;

public class FilterFunction
{
// Per-dictionary-entry states stored in dictionaryResults. FILTER_NOT_EVALUATED is the value the cache is
// filled with whenever a new dictionary is seen; the other two record the predicate's outcome for that entry.
private static final byte FILTER_NOT_EVALUATED = 0;
private static final byte FILTER_PASSED = 1;
private static final byte FILTER_FAILED = 2;

// Session handed to the predicate on every evaluation.
private final ConnectorSession session;
// The predicate evaluated against page positions.
private final Predicate predicate;
// The dictionary result cache is only used when this flag is set.
private final boolean deterministic;
// Channels exposed through getInputChannels(); the dictionary path requires exactly one.
private final int[] inputChannels;

// If the function has a single argument and this is a DictionaryBlock, we can cache results. The cache is valid as long
// as the dictionary inside the block is physically the same.
// One state byte per dictionary entry, indexed by dictionary position.
private byte[] dictionaryResults;
// Dictionary the cached results belong to; compared by reference, not by content.
private Block previousDictionary;
// Single-block page wrapping previousDictionary, used to evaluate the predicate on dictionary entries.
private Page dictionaryPage;

public FilterFunction(ConnectorSession session, boolean deterministic, Predicate predicate)
{
this.session = requireNonNull(session, "session is null");
Expand All @@ -53,6 +68,10 @@ public int filter(Page page, int[] positions, int positionCount, RuntimeExceptio
checkArgument(positionCount <= positions.length);
checkArgument(positionCount <= errors.length);

if (deterministic && inputChannels.length == 1 && page.getBlock(0) instanceof DictionaryBlock) {
return filterWithDictionary(page, positions, positionCount, errors);
}

int outputCount = 0;
for (int i = 0; i < positionCount; i++) {
int position = positions[i];
Expand All @@ -73,6 +92,56 @@ public int filter(Page page, int[] positions, int positionCount, RuntimeExceptio
return outputCount;
}

/**
 * Filters the selected positions of a page whose first block is a {@link DictionaryBlock}, evaluating the
 * predicate against dictionary entries instead of page rows and remembering the outcome per entry.
 *
 * <p>Outcomes are cached in {@code dictionaryResults} for as long as successive blocks reference the very
 * same dictionary instance. Evaluations that throw are never cached: the position is kept, the exception is
 * stored in {@code errors}, and the entry is evaluated again the next time it is referenced.
 *
 * @param page page whose block 0 is a {@link DictionaryBlock}
 * @param positions selected positions; surviving positions are compacted to the front in place
 * @param positionCount number of valid entries in {@code positions}
 * @param errors errors aligned with {@code positions}; compacted in step with it
 * @return the number of surviving positions
 */
private int filterWithDictionary(Page page, int[] positions, int positionCount, RuntimeException[] errors)
{
    int kept = 0;
    DictionaryBlock dictionaryBlock = (DictionaryBlock) page.getBlock(0);
    Block currentDictionary = dictionaryBlock.getDictionary();

    // A different dictionary instance invalidates every cached outcome.
    if (previousDictionary != currentDictionary) {
        previousDictionary = currentDictionary;
        int entryCount = currentDictionary.getPositionCount();
        dictionaryPage = new Page(entryCount, currentDictionary);
        dictionaryResults = ensureCapacity(dictionaryResults, entryCount);
        fill(dictionaryResults, 0, entryCount, FILTER_NOT_EVALUATED);
    }

    for (int index = 0; index < positionCount; index++) {
        int position = positions[index];
        int dictionaryId = dictionaryBlock.getId(position);
        byte cached = dictionaryResults[dictionaryId];

        if (cached == FILTER_FAILED) {
            // Known to fail: drop the position.
            continue;
        }

        if (cached == FILTER_PASSED) {
            // Known to pass: keep the position together with whatever error it carried.
            positions[kept] = position;
            errors[kept] = errors[index];
            kept++;
            continue;
        }

        if (cached == FILTER_NOT_EVALUATED) {
            try {
                boolean passed = predicate.evaluate(session, dictionaryPage, dictionaryId);
                if (!passed) {
                    dictionaryResults[dictionaryId] = FILTER_FAILED;
                }
                else {
                    positions[kept] = position;
                    errors[kept] = errors[index];
                    kept++;
                    dictionaryResults[dictionaryId] = FILTER_PASSED;
                }
            }
            catch (RuntimeException e) {
                // The entry stays unevaluated in the cache; the position survives carrying the newest error.
                positions[kept] = position;
                errors[kept] = e;
                kept++;
            }
            continue;
        }

        verify(false, "Unexpected filter result: " + cached);
    }
    return kept;
}

public int[] getInputChannels()
{
return inputChannels;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.DictionaryBlock;
import com.facebook.presto.spi.block.LongArrayBlock;
import com.facebook.presto.spi.relation.Predicate;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.Optional;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

/**
 * Exercises {@code FilterFunction} on a plain block and on several {@link DictionaryBlock}s, checking that
 * results are correct both when dictionary results can be reused and when the dictionary changes.
 *
 * <p>The predicate under test keeps odd numbers and throws for the value {@link #UNLUCKY}.
 */
public class TestFilterFunction
{
    // Value for which IsOddPredicate throws instead of returning a result.
    private static final long UNLUCKY = 13;

    @Test
    public void testFilter()
    {
        ConnectorSession session = new TestingConnectorSession(ImmutableList.of());
        FilterFunction filter = new FilterFunction(session, true, new IsOddPredicate());

        Block numbers = makeNumbers(0, 1000);
        int[] allPositions = makePositions(0, 1000, 1);
        assertFilter(filter, numbers, allPositions, allPositions.length);

        Block dictionaryNumbers = new DictionaryBlock(numbers, allPositions);
        // Sparse coverage of the dictionary values
        int[] sparsePositions = makePositions(1, 300, 3);
        assertFilter(filter, dictionaryNumbers, sparsePositions, sparsePositions.length);

        // Full coverage of the dictionary values
        assertFilter(filter, dictionaryNumbers, allPositions, allPositions.length);

        // Test with a different DictionaryBlock over the same numbers. Results are reused. The DictionaryBlock covers the
        // values sparsely. The DictionaryBlock itself is accessed sparsely.
        DictionaryBlock otherDictionary = new DictionaryBlock(numbers, makePositions(1, 332, 3));
        int[] otherDictionaryPositions = makePositions(0, 150, 2);
        assertFilter(filter, otherDictionary, otherDictionaryPositions, otherDictionaryPositions.length);

        // Repeat test on a DictionaryBlock over different content to make sure that cached results are not reused.
        assertFilter(filter, new DictionaryBlock(makeNumbers(1, 1001), allPositions), allPositions, allPositions.length);
    }

    /**
     * Runs {@code filter} over the first {@code positionCount} entries of {@code inputPositions} and verifies
     * the surviving positions and their errors.
     *
     * <p>Expected outcome per input position: a value equal to {@link #UNLUCKY} survives with an
     * {@link UnluckyError}; any other odd value survives and keeps its pre-existing error, if it had one;
     * even values are dropped.
     *
     * @param filter the filter under test
     * @param input single block holding the values to filter
     * @param inputPositions positions to select, in increasing order; not modified
     * @param positionCount number of leading entries of {@code inputPositions} to use; must be positive
     */
    private static void assertFilter(FilterFunction filter, Block input, int[] inputPositions, int positionCount)
    {
        // Copy the positions array because filter mutates it.
        int[] positions = Arrays.copyOf(inputPositions, positionCount);
        // Sized from the largest selected position, which is at least positionCount for the increasing
        // position lists used in this test.
        RuntimeException[] errors = new RuntimeException[inputPositions[positionCount - 1] + 1];
        // Put a pre-existing error in the 1st half of the input.
        int numPreviousErrors = positionCount / 2;
        for (int i = 0; i < numPreviousErrors; i++) {
            errors[i] = new RuntimeException("Pre-existent error at " + positions[i]);
        }
        int lastErrorPosition = numPreviousErrors > 0 ? positions[numPreviousErrors - 1] : -1;

        // The page must span the whole block: the selected positions index into the block and may exceed
        // positionCount when the selection is sparse.
        Page page = new Page(input.getPositionCount(), input);
        int numHits = filter.filter(page, positions, positionCount, errors);

        int hitCounter = 0;
        // Only the first positionCount input positions were handed to the filter.
        for (int i = 0; i < positionCount; i++) {
            int position = inputPositions[i];
            long number = input.getLong(position);
            if (number == UNLUCKY) {
                // The predicate threw: the position is kept and carries the new error.
                assertEquals(positions[hitCounter], position);
                assertTrue(errors[hitCounter] instanceof UnluckyError);
                hitCounter++;
            }
            else if ((number & 1) == 1) {
                assertEquals(positions[hitCounter], position);
                if (position <= lastErrorPosition) {
                    // A pre-existing error must travel with the surviving position.
                    assertTrue(errors[hitCounter] instanceof RuntimeException);
                }
                else {
                    assertEquals(errors[hitCounter], null);
                }
                hitCounter++;
            }
        }
        assertEquals(numHits, hitCounter);
    }

    /** Marker exception thrown by {@link IsOddPredicate} for the {@link #UNLUCKY} value. */
    private static class UnluckyError
            extends RuntimeException
    {
    }

    /**
     * Returns {@code count} positions starting at {@code from} and spaced {@code step} apart.
     */
    private static int[] makePositions(int from, int count, int step)
    {
        int[] array = new int[count];
        for (int i = 0; i < count; i++) {
            array[i] = from + step * i;
        }
        return array;
    }

    /**
     * Returns a {@link LongArrayBlock} without nulls holding the consecutive values {@code from} (inclusive)
     * through {@code to} (exclusive).
     */
    private static Block makeNumbers(int from, int to)
    {
        int count = to - from;
        long[] array = new long[count];
        for (int i = 0; i < count; i++) {
            array[i] = from + i;
        }
        return new LongArrayBlock(count, Optional.empty(), array);
    }

    /** Single-channel predicate that passes odd values and throws {@link UnluckyError} for {@link #UNLUCKY}. */
    private static class IsOddPredicate
            implements Predicate
    {
        @Override
        public int[] getInputChannels()
        {
            return new int[] {0};
        }

        @Override
        public boolean evaluate(ConnectorSession session, Page page, int position)
        {
            long number = page.getBlock(0).getLong(position);
            if (number == UNLUCKY) {
                throw new UnluckyError();
            }
            return (number & 1) == 1;
        }
    }
}

0 comments on commit 29bb4c6

Please sign in to comment.