Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use builders for GroupBy initialization and updates #2886

Merged
merged 29 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6302b68
use RowSetBuilderRandom instead of RowSet.insert calls
lbooker42 Aug 24, 2022
506b644
Merge branch 'main' of github.com:lbooker42/deephaven-core
lbooker42 Aug 24, 2022
779898e
PT tests passing, spotless applied
lbooker42 Aug 24, 2022
a28e758
wip
lbooker42 Aug 25, 2022
9cd408d
updated PT to use builders
lbooker42 Sep 14, 2022
ef8985c
minor cleanup
lbooker42 Sep 14, 2022
8d1fcbf
pull upstream
lbooker42 Sep 14, 2022
d747f02
bugfix, re-scope setter for initialized flag
lbooker42 Sep 14, 2022
903bc6c
ported Builder improvements
lbooker42 Sep 14, 2022
b139642
pull upstream changes
lbooker42 Sep 16, 2022
c4fd810
PR comments round 1 addressed
lbooker42 Sep 16, 2022
69dc9da
can we get auto-spotless?
lbooker42 Sep 16, 2022
a29e655
add'l PR comments addressed
lbooker42 Sep 19, 2022
51f4c81
fixed a silly comment
lbooker42 Sep 19, 2022
abd7e21
extracted the local class to allow general usage
lbooker42 Sep 19, 2022
ba3567f
Using sequential builders to accelerate initial GroupBy creation
lbooker42 Sep 20, 2022
396d6c8
merge upstream
lbooker42 Sep 20, 2022
7f2496f
bugfix: enable tracking on rowset after build()
lbooker42 Sep 20, 2022
d4f163a
changes to allow more efficient updates (WIP)
lbooker42 Sep 21, 2022
e1665c1
WIP
lbooker42 Sep 21, 2022
c7340fe
COB WIP
lbooker42 Sep 21, 2022
5ddc31e
Aggregation Tests passing, still a little messy
lbooker42 Sep 22, 2022
6be2f49
cleaned up a bit, tests still passing
lbooker42 Sep 22, 2022
efc5186
removed anyKeysModified from propagateUpdates() in chunked aggregation
lbooker42 Sep 22, 2022
1452d4f
revert changes to resetForStep
lbooker42 Sep 22, 2022
6cf9374
spotless for the win
lbooker42 Sep 22, 2022
839d215
another round of comments addressed
lbooker42 Sep 26, 2022
813d34f
more spotless
lbooker42 Sep 26, 2022
7c3fff6
Moved BitmapRandomBuilder to a better home
lbooker42 Sep 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,11 @@ ModifiedColumnSet[] getInputModifiedColumnSets(QueryTable input) {
* initialization.
*
* @param resultTable The result {@link QueryTable} after initialization
* @param maxDestination The maximum destination slot created during initialization
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
*/
void propagateInitialStateToOperators(@NotNull final QueryTable resultTable) {
void propagateInitialStateToOperators(@NotNull final QueryTable resultTable, final int maxDestination) {
for (final IterativeChunkedAggregationOperator operator : operators) {
operator.propagateInitialState(resultTable);
operator.propagateInitialState(resultTable, maxDestination);
}
}

Expand Down Expand Up @@ -266,7 +267,7 @@ UnaryOperator<ModifiedColumnSet>[] initializeRefreshing(@NotNull final QueryTabl
/**
* Allow all operators to reset any per-step internal state. Note that the arguments to this method should not be
* mutated in any way.
*
*
* @param upstream The upstream {@link TableUpdateImpl}
* @param startingDestinationsCount The number of used destinations at the beginning of this step
*/
Expand All @@ -280,7 +281,7 @@ void resetOperatorsForStep(@NotNull final TableUpdate upstream, final int starti
* Allow all operators to perform any internal state keeping needed for destinations that were added (went from 0
* keys to &gt; 0), removed (went from &gt; 0 keys to 0), or modified (keys added or removed, or keys modified) by
* this iteration. Note that the arguments to this method should not be mutated in any way.
*
*
* @param downstream The downstream {@link TableUpdate} (which does <em>not</em> have its {@link ModifiedColumnSet}
* finalized yet)
* @param newDestinations New destinations added on this update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final ByteChunk<? extends Values> values,
}

@Override
public void propagateInitialState(@NotNull final QueryTable resultTable) {
public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) {
copyStreamToResult(resultTable.getRowSet());
redirections = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private boolean addChunk(@NotNull final CharChunk<? extends Values> values,
}

@Override
public void propagateInitialState(@NotNull final QueryTable resultTable) {
public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) {
copyStreamToResult(resultTable.getRowSet());
redirections = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private static QueryTable aggregation(

// Construct the result table
final QueryTable result = new QueryTable(resultRowSet, resultColumnSourceMap);
ac.propagateInitialStateToOperators(result);
ac.propagateInitialStateToOperators(result, outputPosition.intValue());

if (input.isRefreshing()) {
assert keyColumnsCopied != null;
Expand Down Expand Up @@ -1593,7 +1593,7 @@ private static QueryTable staticGroupedAggregation(QueryTable withView, String k

final QueryTable result = new QueryTable(RowSetFactory.flat(responsiveGroups).toTracking(),
resultColumnSourceMap);
ac.propagateInitialStateToOperators(result);
ac.propagateInitialStateToOperators(result, responsiveGroups);

final ReverseLookupListener rll = ReverseLookupListener.makeReverseLookupListenerWithSnapshot(result, keyName);
ac.setReverseLookupFunction(k -> (int) rll.get(k));
Expand Down Expand Up @@ -1957,7 +1957,8 @@ private static QueryTable noKeyAggregation(

final QueryTable result = new QueryTable(RowSetFactory.flat(initialResultSize).toTracking(),
resultColumnSourceMap);
ac.propagateInitialStateToOperators(result);
// a zero-key aggregation always creates one result
ac.propagateInitialStateToOperators(result, 1);

if (table.isRefreshing()) {
ac.startTrackingPrevValues();
Expand Down Expand Up @@ -2412,7 +2413,7 @@ public void addRange(final long firstRowKey, final long lastRowKey) {
* We also know that we will only modify the rows that existed when we start, so that we can clamp the maximum key
* for the builder to the maximum output position without loss of fidelity.
*/
private static class BitmapRandomBuilder implements RowSetBuilderRandom {
public static class BitmapRandomBuilder implements RowSetBuilderRandom {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

/**
* An upper bound on {@code lastUsed}. That is, the highest bit index that may be used in {@code bitset}.
Expand All @@ -2434,7 +2435,7 @@ private static class BitmapRandomBuilder implements RowSetBuilderRandom {
*/
long[] bitset;

private BitmapRandomBuilder(int maxKey) {
public BitmapRandomBuilder(int maxKey) {
this.maxKey = maxKey;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final DoubleChunk<? extends Values> values,
}

@Override
public void propagateInitialState(@NotNull final QueryTable resultTable) {
public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) {
copyStreamToResult(resultTable.getRowSet());
redirections = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private boolean addChunk(@NotNull final FloatChunk<? extends Values> values,
}

@Override
public void propagateInitialState(@NotNull final QueryTable resultTable) {
public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) {
copyStreamToResult(resultTable.getRowSet());
redirections = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongConsumer;
import java.util.function.UnaryOperator;

import static io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource.BLOCK_SIZE;

/**
* An {@link IterativeChunkedAggregationOperator} used in the implementation of {@link Table#applyToAllBy}.
*/
class FormulaChunkedOperator implements StateChangeRecorder, IterativeChunkedAggregationOperator {
class FormulaChunkedOperator implements IterativeChunkedAggregationOperator {

private final GroupByChunkedOperator groupBy;
private final boolean delegateToBy;
Expand Down Expand Up @@ -101,16 +100,6 @@ class FormulaChunkedOperator implements StateChangeRecorder, IterativeChunkedAgg
}
}

@Override
public void startRecording(LongConsumer reincarnatedDestinationCallback, LongConsumer emptiedDestinationCallback) {
groupBy.startRecording(reincarnatedDestinationCallback, emptiedDestinationCallback);
}

@Override
public void finishRecording() {
groupBy.finishRecording();
}

@Override
public void addChunk(final BucketedContext bucketedContext, final Chunk<? extends Values> values,
@NotNull final LongChunk<? extends RowKeys> inputRowKeys,
Expand Down Expand Up @@ -257,9 +246,9 @@ public void ensureCapacity(final long tableSize) {
}

@Override
public void propagateInitialState(@NotNull final QueryTable resultTable) {
public void propagateInitialState(@NotNull final QueryTable resultTable, int startingDestinationsCount) {
if (delegateToBy) {
groupBy.propagateInitialState(resultTable);
groupBy.propagateInitialState(resultTable, startingDestinationsCount);
}

final Map<String, ? extends ColumnSource<?>> byResultColumns = groupBy.getResultColumns();
Expand Down
Loading