Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize single-column FilterFunctions to only run on distinct values for DictionaryBlocks #13289

Merged

Conversation

oerling
Copy link

@oerling oerling commented Aug 27, 2019

Note that the DictionaryBlocks are produced by scan only for string columns. This therefore depends on the introduction of a selective slice dictionary reader.

@oerling oerling requested a review from mbasmanova August 27, 2019 00:59
@mbasmanova mbasmanova self-assigned this Aug 27, 2019
@mbasmanova mbasmanova requested a review from a team August 27, 2019 11:45
@mbasmanova mbasmanova added the aria Presto Aria performance improvements label Aug 27, 2019
Copy link
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oerling This would be a nice optimization. Thanks for contributing. I have some comments.

private Block previousDictionary;
private Page dictionaryPage;

private static final byte FILTER_NOT_EVALUATED = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static member variables go to the beginning of the class before instance member variables

// as the dictionary inside the block is physically the same.
private byte[] dictionaryResults;
private Block previousDictionary;
private Page dictionaryPage;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dictionaryPage variables is not used

// If the function has a single argument and this is a DictionaryBlock, we can cache results. The cache is valid as long
// as the dictionary inside the block is physically the same.
private byte[] dictionaryResults;
private Block previousDictionary;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Holding on to dictionary block will work with the current implementation of the readers, but strictly speaking this is not safe. SelectiveStreamReader.getBlockView API returns a temporary view into the data and the caller is not supposed to use this data after next call to SelectiveStreamReader.read. The data is both the block and the dictionary block it is referencing. Hence, it is not safe to hold on to a dictionary from the previous batch. How important is it to re-use filter result across batches? If important, let's think about ways to make it less fragile.

previousDictionary = dictionary;
int numEntries = dictionary.getPositionCount();
dictionaryPage = new Page(numEntries, dictionary);
if (dictionaryResults == null || dictionaryResults.length < numEntries) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is preferable to use Arrays.ensureCapacity method for allocating arrays.

            dictionaryResults = ensureCapacity(dictionaryResults, dictionary.getPositionCount());
            fill(dictionaryResults, FILTER_NOT_EVALUATED);

If it is important to avoid extra fill for newly allocated array, consider adding another ensureCapacity method that takes a value used to initialize new array or re-set existing one.

dictionaryResults = ensureCapacity(dictionaryResults, dictionary.getPositionCount(), FILTER_NOT_EVALUATED);

    public static byte[] ensureCapacity(byte[] buffer, int capacity, byte initialValue)
    {
        if (buffer == null || buffer.length < capacity) {
            byte[] newBuffer = new byte[capacity];
            if (initialValue != 0) {
                fill(buffer, initialValue);
            }
            return newBuffer;
        }

        fill(buffer, initialValue);
        return buffer;
    }

continue;
}
if (result == FILTER_PASSED) {
positions[outputCount++] = position;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to carry over any errors that occurred while evaluating earlier filters:

                    positions[outputCount] = position;
                    errors[outputCount] = errors[i];
                    outputCount++;

same below

int position = positions[i];
int dictionaryPosition = block.getId(position);
byte result = dictionaryResults[dictionaryPosition];
if (result == FILTER_FAILED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any particular reason not to use switch?

            switch (result) {
                case FILTER_FAILED:
                    break;
                case FILTER_PASSED:
                    positions[outputCount] = position;
                    errors[outputCount] = errors[i];
                    outputCount++;
                    break;
                case FILTER_NOT_EVALUATED:
                    try {
                        if (predicate.evaluate(session, page, position)) {
                            positions[outputCount++] = position;
                            errors[outputCount] = errors[i];
                            dictionaryResults[dictionaryPosition] = FILTER_PASSED;
                        }
                        else {
                            dictionaryResults[dictionaryPosition] = FILTER_FAILED;
                        }
                    }
                    catch (RuntimeException e) {
                        // We do not record errors in the dictionary results.
                        positions[outputCount] = position;
                        errors[outputCount] = e;    // keep last error
                        outputCount++;
                    };
                    break;
                default:
                    verify(false, "Unexpected filter result: " + result);
            }

@@ -334,6 +334,9 @@ public void testFilterFunctions()
// filter function on numeric and boolean columns
assertFilterProject("if(is_returned, linenumber, orderkey) % 5 = 0", "linenumber");

// Filter functions on dictionary encoded columns
assertQuery("SELECT orderkey, linenumber, shipmode, shipinstruct FROM lineitem WHERE shipmode LIKE '%R%' and shipinstruct LIKE '%CO%'");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails because there is no varchar reader yet. FilterFunction can be unit tested though. This might be a better way to test it anyway.

@oerling oerling force-pushed the filter-function-with-dictionary branch from 0ee7fdd to 19b30d6 Compare August 28, 2019 22:43
@oerling
Copy link
Author

oerling commented Aug 28, 2019 via email

@oerling
Copy link
Author

oerling commented Aug 28, 2019 via email

Copy link
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oerling Orri, thanks for adding unit test. Let's remove failing integration test, address comments and merge this. The integration test will be added in @bhhari's PR that introduces varchar reader.

@@ -53,6 +68,10 @@ public int filter(Page page, int[] positions, int positionCount, RuntimeExceptio
checkArgument(positionCount <= positions.length);
checkArgument(positionCount <= errors.length);

if (inputChannels.length == 1 && page.getBlock(0) instanceof DictionaryBlock && deterministic) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd put the cheapest check first, e.g. if (deterministic && ...)

}
}
catch (RuntimeException e) {
// We do not record errors in the dictionary results.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm my understanding: we'll keep re-evaluating the filter if it throws an error, right?

private static final long UNLUCKY = 13;

@Test
public void TestFilter()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

method names start with a lower case: TestFilter -> testFilter

checkFilter(filter, otherDictionary, otherDictionaryPositions, otherDictionaryPositions.length);

// Repeat test on a dictionary with different content to make sure that cached results are not reused.
Block numbers2 = makeNumbers(1, 1001);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numbers2 variable is not used; remove it

return array;
}

private static Block makeNumbers(long from, long to)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • input arguments can be integers; then toIntExact won't be needed
  • I think looping from 0 to count is easier to read
    private static Block makeNumbers(int from, int to)
    {
        int count = toIntExact(to - from);
        long[] array = new long[count];
        for (int i = 0; i < count; i++) {
            array[i] = from + i;
        }

        return new LongArrayBlock(count, Optional.empty(), array);
    }

ConnectorSession session = new TestingConnectorSession(ImmutableList.of());
FilterFunction filter = new FilterFunction(session, true, new IsOddPredicate());
checkFilter(filter, numbers, allPositions, allPositions.length);
Block dictionaryNumbers = new DictionaryBlock(numbers, makePositions(0, 1000, 1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use allPositions here

{
Block numbers = makeNumbers(0, 1000);
int[] allPositions = makePositions(0, 1000, 1);
ConnectorSession session = new TestingConnectorSession(ImmutableList.of());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I'd initialize session and filter in the beginning of the method and add an empty line after that. These variables are not changing and are the same for all the test cases here.
  • I'd put an empty line after each call to assertFilter
        ConnectorSession session = new TestingConnectorSession(ImmutableList.of());
        FilterFunction filter = new FilterFunction(session, true, new IsOddPredicate());

        Block numbers = makeNumbers(0, 1000);
...


// Repeat test on a dictionary with different content to make sure that cached results are not reused.
Block numbers2 = makeNumbers(1, 1001);
Block dictionary2Numbers = new DictionaryBlock(numbers, makePositions(0, 1000, 1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • use allPositions here
  • dictionary2Numbers appears to be the same as dictionaryNumbers (the pointers are different though) - is this intentional; the comment dictionary with different content appears misleading

checkFilter(filter, numbers, allPositions, allPositions.length);
Block dictionaryNumbers = new DictionaryBlock(numbers, makePositions(0, 1000, 1));
checkFilter(filter, dictionaryNumbers, allPositions, allPositions.length);
// Sparse coverage of the same dictionary
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm seeing that we are not testing the case where subsequent call is reusing previous results, but still evaluates some new values. To cover this, we need to run filter on sparse values first, then on all values.

        Block dictionaryNumbers = new DictionaryBlock(numbers, allPositions);

        // Sparse coverage of the dictionary values
        int[] sparsePositions = makePositions(1, 300, 3);
        assertFilter(filter, dictionaryNumbers, sparsePositions, sparsePositions.length);

        // Full coverage of the dictionary values
        assertFilter(filter, dictionaryNumbers, allPositions, allPositions.length);

// Sparse coverage of the same dictionary
int[] sparsePositions = makePositions(1, 300, 3);
checkFilter(filter, dictionaryNumbers, sparsePositions, sparsePositions.length);
// Test with a different dictionary over the same numbers. Results are reused.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Results are reused.- I think this is not correct because otherDictionary is a new object and therefore won't match any previous dictionary.

@mbasmanova
Copy link
Contributor

@oerling Commit title is a bit too long. How about shortening it to Optimize single-column filter functions applied to dictionary blocks and updating the description to

This optimization will apply to dictionary blocks produced by scanning low 
cardinality string columns.

@oerling oerling force-pushed the filter-function-with-dictionary branch 2 times, most recently from d0f0d52 to d385c51 Compare September 5, 2019 18:11
@oerling
Copy link
Author

oerling commented Sep 5, 2019 via email

Copy link
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oerling Looks good to me % one comment.

// Repeat test on a DictionaryBlock over different content to make sure that cached results are not reused.
Block numbers2 = makeNumbers(1, 1001);
Block dictionary2Numbers = new DictionaryBlock(numbers2, allPositions);
assertFilter(filter, dictionary2Numbers, allPositions, allPositions.length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use a1, a2, a3 names; here, I'd inline numbers2 and dictionary2Numbers to avoid naming problem: assertFilter(filter, new DictionaryBlock(makeNumbers(1, 1001), allPositions), allPositions, allPositions.length);

Applies the filter at most once on any of the distinct values in a DictionaryBlock over the same dictionary. This optimizes filtering of low cardinality string columns.
@oerling oerling force-pushed the filter-function-with-dictionary branch from d385c51 to a5f060e Compare September 5, 2019 20:43
@oerling
Copy link
Author

oerling commented Sep 5, 2019 via email

@mbasmanova mbasmanova requested a review from a team September 5, 2019 22:44
@mbasmanova mbasmanova merged commit 29bb4c6 into prestodb:master Sep 5, 2019
@mbasmanova
Copy link
Contributor

@oerling Thank you, Orri.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
aria Presto Aria performance improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants