Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RowKey-Agnostic ColumnSource Optimizations #3329

Merged
merged 5 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeyRanges;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.util.datastructures.LongRangeAbortableConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1528,8 +1528,9 @@ private static QueryTable makeResult(QueryTable leftTable, Table rightTable, Row
MatchPair[] columnsToAdd, boolean refreshing) {
final Map<String, ColumnSource<?>> columnSources = new LinkedHashMap<>(leftTable.getColumnSourceMap());
Arrays.stream(columnsToAdd).forEach(mp -> {
final RedirectedColumnSource<?> rightSource =
new RedirectedColumnSource<>(rowRedirection, rightTable.getColumnSource(mp.rightColumn()));
// note that we must always redirect the right-hand side, because unmatched rows will be redirected to null
final ColumnSource<?> rightSource =
RedirectedColumnSource.alwaysRedirect(rowRedirection, rightTable.getColumnSource(mp.rightColumn()));
if (refreshing) {
rightSource.startTrackingPrevValues();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ private static QueryTable internalJoin(

return makeResult(leftTable, rightTable, columnsToAdd, resultStateManager,
resultRowSet.toTracking(),
cs -> new CrossJoinRightColumnSource<>(resultStateManager, cs, rightTable.isRefreshing()));
cs -> CrossJoinRightColumnSource.maybeWrap(
resultStateManager, cs, rightTable.isRefreshing()));
}

final LeftOnlyIncrementalChunkedCrossJoinStateManager jsm =
Expand All @@ -190,7 +191,7 @@ private static QueryTable internalJoin(
final TrackingWritableRowSet resultRowSet =
jsm.buildLeftTicking(leftTable, rightTable, bucketingContext.rightSources).toTracking();
final QueryTable resultTable = makeResult(leftTable, rightTable, columnsToAdd, jsm, resultRowSet,
cs -> new CrossJoinRightColumnSource<>(jsm, cs, rightTable.isRefreshing()));
cs -> CrossJoinRightColumnSource.maybeWrap(jsm, cs, rightTable.isRefreshing()));

jsm.startTrackingPrevValues();
final ModifiedColumnSet.Transformer leftTransformer = leftTable.newModifiedColumnSetTransformer(
Expand Down Expand Up @@ -290,7 +291,7 @@ public void onUpdate(final TableUpdate upstream) {
final TrackingWritableRowSet resultRowSet = jsm.build(leftTable, rightTable).toTracking();

final QueryTable resultTable = makeResult(leftTable, rightTable, columnsToAdd, jsm, resultRowSet,
cs -> new CrossJoinRightColumnSource<>(jsm, cs, rightTable.isRefreshing()));
cs -> CrossJoinRightColumnSource.maybeWrap(jsm, cs, rightTable.isRefreshing()));

final ModifiedColumnSet.Transformer rightTransformer =
rightTable.newModifiedColumnSetTransformer(resultTable, columnsToAdd);
Expand Down Expand Up @@ -1045,7 +1046,7 @@ private static QueryTable zeroKeyColumnsJoin(
}

final QueryTable result = makeResult(leftTable, rightTable, columnsToAdd, crossJoinState, resultRowSet,
cs -> new BitMaskingColumnSource<>(crossJoinState, cs));
cs -> BitMaskingColumnSource.maybeWrap(crossJoinState, cs));

if (leftTable.isRefreshing() || rightTable.isRefreshing()) {
crossJoinState.startTrackingPrevious();
Expand Down Expand Up @@ -1409,8 +1410,7 @@ private static <T extends ColumnSource<?>> QueryTable makeResult(
final Map<String, ColumnSource<?>> columnSourceMap = new LinkedHashMap<>();

for (final Map.Entry<String, ColumnSource<?>> leftColumn : leftTable.getColumnSourceMap().entrySet()) {
final BitShiftingColumnSource<?> wrappedSource =
new BitShiftingColumnSource<>(joinState, leftColumn.getValue());
final ColumnSource<?> wrappedSource = BitShiftingColumnSource.maybeWrap(joinState, leftColumn.getValue());
columnSourceMap.put(leftColumn.getKey(), wrappedSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {
final long size = usePrev ? rowSet.sizePrev() : rowSet.size();

for (Map.Entry<String, ColumnSource<?>> entry : parent.getColumnSourceMap().entrySet()) {
resultColumns.put(entry.getKey(), new RedirectedColumnSource<>(rowRedirection, entry.getValue()));
resultColumns.put(entry.getKey(), RedirectedColumnSource.maybeRedirect(rowRedirection, entry.getValue()));
}

resultTable = new QueryTable(RowSetFactory.flat(size).toTracking(), resultColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,9 @@ private static QueryTable makeResult(@NotNull final QueryTable leftTable,
final boolean rightRefreshingColumns) {
final Map<String, ColumnSource<?>> columnSourceMap = new LinkedHashMap<>(leftTable.getColumnSourceMap());
for (MatchPair mp : columnsToAdd) {
final RedirectedColumnSource<?> redirectedColumnSource =
new RedirectedColumnSource<>(rowRedirection, rightTable.getColumnSource(mp.rightColumn()));
// note that we must always redirect the right-hand side, because unmatched rows will be redirected to null
final ColumnSource<?> redirectedColumnSource =
RedirectedColumnSource.alwaysRedirect(rowRedirection, rightTable.getColumnSource(mp.rightColumn()));
if (rightRefreshingColumns) {
redirectedColumnSource.startTrackingPrevValues();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2297,7 +2297,7 @@ public Table ungroup(boolean nullFill, Collection<? extends ColumnName> columnsT
ungroupedSource.initializeBase(initialBase);
result = ungroupedSource;
} else {
result = new BitShiftingColumnSource<>(shiftState, column);
result = BitShiftingColumnSource.maybeWrap(shiftState, column);
}
resultMap.put(name, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private QueryTable historicalSort(SortHelpers.SortMapping sortedKeys) {
final Map<String, ColumnSource<?>> resultMap = new LinkedHashMap<>();
for (Map.Entry<String, ColumnSource<?>> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) {
resultMap.put(stringColumnSourceEntry.getKey(),
new RedirectedColumnSource<>(sortMapping, stringColumnSourceEntry.getValue()));
RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue()));
}

resultTable = new QueryTable(resultRowSet, resultMap);
Expand Down Expand Up @@ -148,7 +148,7 @@ private Result<QueryTable> streamSort(@NotNull final SortHelpers.SortMapping ini
final Map<String, ColumnSource<?>> resultMap = new LinkedHashMap<>();
for (Map.Entry<String, ColumnSource<?>> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) {
resultMap.put(stringColumnSourceEntry.getKey(),
new RedirectedColumnSource<>(sortMapping, stringColumnSourceEntry.getValue()));
RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue()));
}

resultTable = new QueryTable(resultRowSet, resultMap);
Expand Down Expand Up @@ -256,7 +256,7 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {

for (Map.Entry<String, ColumnSource<?>> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) {
resultMap.put(stringColumnSourceEntry.getKey(),
new RedirectedColumnSource<>(sortMapping, stringColumnSourceEntry.getValue()));
RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue()));
}

// noinspection unchecked
Expand Down Expand Up @@ -288,13 +288,19 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {
}
}

/**
* Get the row redirection for a sort result.
*
* @param sortResult The sort result table; <em>must</em> be the direct result of a sort.
* @return The row redirection if at least one column required redirection, otherwise {@code null}
*/
public static RowRedirection getRowRedirection(@NotNull final Table sortResult) {
final String firstColumnName = sortResult.getDefinition().getColumns().get(0).getName();
final ColumnSource<?> firstColumnSource = sortResult.getColumnSource(firstColumnName);
if (!(firstColumnSource instanceof RedirectedColumnSource)) {
return null;
for (final ColumnSource<?> columnSource : sortResult.getColumnSources()) {
if (columnSource instanceof RedirectedColumnSource) {
return ((RedirectedColumnSource<?>) columnSource).getRowRedirection();
}
}
return ((RedirectedColumnSource) firstColumnSource).getRowRedirection();
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ abstract class BaseAddOnlyFirstOrLastChunkedOperator

this.resultColumns = new LinkedHashMap<>(resultPairs.length);
for (final MatchPair mp : resultPairs) {
// noinspection unchecked
resultColumns.put(mp.leftColumn(),
new RedirectedColumnSource(rowRedirection, originalTable.getColumnSource(mp.rightColumn())));
resultColumns.put(mp.leftColumn(), RedirectedColumnSource.maybeRedirect(
rowRedirection, originalTable.getColumnSource(mp.rightColumn())));
}
if (exposeRedirectionAs != null) {
resultColumns.put(exposeRedirectionAs, redirections);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ public class FirstOrLastChunkedOperator

this.resultColumns = new LinkedHashMap<>(resultPairs.length);
for (final MatchPair mp : resultPairs) {
// noinspection unchecked
resultColumns.put(mp.leftColumn(),
new RedirectedColumnSource(rowRedirection, originalTable.getColumnSource(mp.rightColumn())));
resultColumns.put(mp.leftColumn(), RedirectedColumnSource.maybeRedirect(
rowRedirection, originalTable.getColumnSource(mp.rightColumn())));
}
exposeRedirections = exposeRedirectionAs != null;
if (exposeRedirectionAs != null) {
Expand Down Expand Up @@ -298,9 +297,8 @@ private class DuplicateOperator implements IterativeChunkedAggregationOperator {

private DuplicateOperator(MatchPair[] resultPairs, Table table, String exposeRedirectionAs) {
for (final MatchPair mp : resultPairs) {
// noinspection unchecked
resultColumns.put(mp.leftColumn(),
new RedirectedColumnSource(rowRedirection, table.getColumnSource(mp.rightColumn())));
RedirectedColumnSource.maybeRedirect(rowRedirection, table.getColumnSource(mp.rightColumn())));
}
if (exposeRedirectionAs != null) {
resultColumns.put(exposeRedirectionAs, redirections);
Expand Down Expand Up @@ -466,9 +464,8 @@ private ComplementaryOperator(boolean isFirst, MatchPair[] resultPairs, Table ta

this.resultColumns = new LinkedHashMap<>(resultPairs.length);
for (final MatchPair mp : resultPairs) {
// noinspection unchecked
resultColumns.put(mp.leftColumn(),
new RedirectedColumnSource(rowRedirection, table.getColumnSource(mp.rightColumn())));
RedirectedColumnSource.maybeRedirect(rowRedirection, table.getColumnSource(mp.rightColumn())));
}
exposeRedirections = exposeRedirectionAs != null;
if (exposeRedirections) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public ColumnSource[] getKeyHashTableSources() {
alternateKeySources[kci], mainKeySources[kci]);
}
// noinspection unchecked
keyHashTableSources[kci] = new RedirectedColumnSource(resultIndexToHashSlot, alternatingColumnSources[kci]);
keyHashTableSources[kci] =
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
RedirectedColumnSource.maybeRedirect(resultIndexToHashSlot, alternatingColumnSources[kci]);
}

return keyHashTableSources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ColumnSource[] getKeyHashTableSources() {
final ColumnSource[] keyHashTableSources = new ColumnSource[mainKeySources.length];
for (int kci = 0; kci < mainKeySources.length; ++kci) {
// noinspection unchecked
keyHashTableSources[kci] = new RedirectedColumnSource(resultRowKeyToHashSlot,
keyHashTableSources[kci] = RedirectedColumnSource.maybeRedirect(resultRowKeyToHashSlot,
new HashTableColumnSource(mainKeySources[kci], overflowKeySources[kci]));
}
return keyHashTableSources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ public class SortedFirstOrLastChunkedOperator

this.resultColumns = new LinkedHashMap<>();
for (final MatchPair mp : resultNames) {
// noinspection unchecked,rawtypes
resultColumns.put(mp.leftColumn(),
new RedirectedColumnSource(rowRedirection, originalTable.getColumnSource(mp.rightColumn())));
resultColumns.put(mp.leftColumn(), RedirectedColumnSource.maybeRedirect(
rowRedirection, originalTable.getColumnSource(mp.rightColumn())));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public ColumnSource[] getKeyHashTableSources() {
final ColumnSource[] keyHashTableSources = new ColumnSource[mainKeySources.length];
for (int kci = 0; kci < mainKeySources.length; ++kci) {
// noinspection unchecked
keyHashTableSources[kci] = new RedirectedColumnSource(resultIndexToHashSlot, mainKeySources[kci]);
keyHashTableSources[kci] = RedirectedColumnSource.maybeRedirect(resultIndexToHashSlot, mainKeySources[kci]);
}
return keyHashTableSources;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public ColumnSource[] getKeyHashTableSources() {
final ColumnSource[] keyHashTableSources = new ColumnSource[mainKeySources.length];
for (int kci = 0; kci < mainKeySources.length; ++kci) {
// noinspection unchecked
keyHashTableSources[kci] = new RedirectedColumnSource(resultIndexToHashSlot,
keyHashTableSources[kci] = RedirectedColumnSource.maybeRedirect(resultIndexToHashSlot,
new HashTableColumnSource(mainKeySources[kci], overflowKeySources[kci]));
}
return keyHashTableSources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public static InitialSnapshotTable setupInitialSnapshotTable(
writableSources[ci] = ArrayBackedColumnSource.getMemoryColumnSource(
0, column.getDataType(), column.getComponentType());
finalColumns.put(column.getName(),
new WritableRedirectedColumnSource<>(rowRedirection, writableSources[ci], 0));
WritableRedirectedColumnSource.maybeRedirect(rowRedirection, writableSources[ci], 0));
}
// This table does not run, so we don't need to tell our row redirection or column source to start
// tracking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private static Map<String, ColumnSource<?>> getResultSources(Map<String, ? exten
Map<String, ColumnSource<?>> result = new LinkedHashMap<>();
for (Map.Entry<String, ? extends ColumnSource<?>> stringEntry : input.entrySet()) {
ColumnSource<?> value = stringEntry.getValue();
result.put(stringEntry.getKey(), new RedirectedColumnSource<>(rowRedirection, value));
result.put(stringEntry.getKey(), RedirectedColumnSource.maybeRedirect(rowRedirection, value));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public static SelectAndViewAnalyzer create(final Mode mode, final Map<String, Co
}
case SELECT_REDIRECTED_STATIC: {
final WritableColumnSource<?> underlyingSource = sc.newDestInstance(rowSet.size());
final WritableColumnSource<?> scs = new WritableRedirectedColumnSource<>(
final WritableColumnSource<?> scs = WritableRedirectedColumnSource.maybeRedirect(
rowRedirection, underlyingSource, rowSet.size());
analyzer =
analyzer.createLayerForSelect(rowSet, sc.getName(), sc, scs, underlyingSource, distinctDeps,
Expand All @@ -183,7 +183,8 @@ public static SelectAndViewAnalyzer create(final Mode mode, final Map<String, Co
WritableColumnSource<?> underlyingSource = null;
if (rowRedirection != null) {
underlyingSource = scs;
scs = new WritableRedirectedColumnSource<>(rowRedirection, underlyingSource, rowSet.intSize());
scs = WritableRedirectedColumnSource.maybeRedirect(
rowRedirection, underlyingSource, rowSet.intSize());
}
analyzer =
analyzer.createLayerForSelect(rowSet, sc.getName(), sc, scs, underlyingSource, distinctDeps,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final public class StaticFlattenLayer extends SelectAndViewAnalyzer {
return;
}

overriddenColumns.put(name, new RedirectedColumnSource<>(rowRedirection, cs));
overriddenColumns.put(name, RedirectedColumnSource.maybeRedirect(rowRedirection, cs));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,25 @@

public class BitMaskingColumnSource<T> extends AbstractColumnSource<T> implements UngroupableColumnSource {

/**
* Wrap the innerSource if it is not agnostic to redirection. Otherwise, return the innerSource.
*
* @param shiftState The cross join shift state to use
* @param innerSource The column source to redirect
*/
public static <T> ColumnSource<T> maybeWrap(
final ZeroKeyCrossJoinShiftState shiftState,
@NotNull final ColumnSource<T> innerSource) {
if (innerSource instanceof RowKeyAgnosticChunkSource) {
return innerSource;
}
return new BitMaskingColumnSource<>(shiftState, innerSource);
}

private final ZeroKeyCrossJoinShiftState shiftState;
private final ColumnSource<T> innerSource;

public BitMaskingColumnSource(
protected BitMaskingColumnSource(
final ZeroKeyCrossJoinShiftState shiftState,
@NotNull final ColumnSource<T> innerSource) {
super(innerSource.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,27 @@

public class BitShiftingColumnSource<T> extends AbstractColumnSource<T> implements UngroupableColumnSource {

/**
* Wrap the innerSource if it is not agnostic to redirection. Otherwise, return the innerSource.
*
* @param shiftState The cross join shift state to use
* @param innerSource The column source to redirect
*/
public static <T> ColumnSource<T> maybeWrap(
@NotNull final CrossJoinShiftState shiftState,
@NotNull final ColumnSource<T> innerSource) {
if (innerSource instanceof RowKeyAgnosticChunkSource) {
return innerSource;
}
return new BitShiftingColumnSource<>(shiftState, innerSource);
}

private final CrossJoinShiftState shiftState;
private final ColumnSource<T> innerSource;

public BitShiftingColumnSource(final CrossJoinShiftState shiftState, @NotNull final ColumnSource<T> innerSource) {
protected BitShiftingColumnSource(
@NotNull final CrossJoinShiftState shiftState,
@NotNull final ColumnSource<T> innerSource) {
super(innerSource.getType());
this.shiftState = shiftState;
this.innerSource = innerSource;
Expand Down
Loading