diff --git a/engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSequence.java b/engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSequence.java index 5e9611e499f..a8d5e416e0e 100644 --- a/engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSequence.java +++ b/engine/rowset/src/main/java/io/deephaven/engine/rowset/RowSequence.java @@ -8,7 +8,6 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeyRanges; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; -import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.chunk.LongChunk; import io.deephaven.chunk.WritableLongChunk; import io.deephaven.util.datastructures.LongRangeAbortableConsumer; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java index 66800ca77c3..80469aea3e5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java @@ -1528,8 +1528,9 @@ private static QueryTable makeResult(QueryTable leftTable, Table rightTable, Row MatchPair[] columnsToAdd, boolean refreshing) { final Map> columnSources = new LinkedHashMap<>(leftTable.getColumnSourceMap()); Arrays.stream(columnsToAdd).forEach(mp -> { - final RedirectedColumnSource rightSource = - new RedirectedColumnSource<>(rowRedirection, rightTable.getColumnSource(mp.rightColumn())); + // note that we must always redirect the right-hand side, because unmatched rows will be redirected to null + final ColumnSource rightSource = + RedirectedColumnSource.alwaysRedirect(rowRedirection, rightTable.getColumnSource(mp.rightColumn())); if (refreshing) { rightSource.startTrackingPrevValues(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/CrossJoinHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/CrossJoinHelper.java index 972f81a0ec3..7f188cbcf22 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/CrossJoinHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/CrossJoinHelper.java @@ -177,7 +177,8 @@ private static QueryTable internalJoin( return makeResult(leftTable, rightTable, columnsToAdd, resultStateManager, resultRowSet.toTracking(), - cs -> new CrossJoinRightColumnSource<>(resultStateManager, cs, rightTable.isRefreshing())); + cs -> CrossJoinRightColumnSource.maybeWrap( + resultStateManager, cs, rightTable.isRefreshing())); } final LeftOnlyIncrementalChunkedCrossJoinStateManager jsm = @@ -190,7 +191,7 @@ private static QueryTable internalJoin( final TrackingWritableRowSet resultRowSet = jsm.buildLeftTicking(leftTable, rightTable, bucketingContext.rightSources).toTracking(); final QueryTable resultTable = makeResult(leftTable, rightTable, columnsToAdd, jsm, resultRowSet, - cs -> new CrossJoinRightColumnSource<>(jsm, cs, rightTable.isRefreshing())); + cs -> CrossJoinRightColumnSource.maybeWrap(jsm, cs, rightTable.isRefreshing())); jsm.startTrackingPrevValues(); final ModifiedColumnSet.Transformer leftTransformer = leftTable.newModifiedColumnSetTransformer( @@ -290,7 +291,7 @@ public void onUpdate(final TableUpdate upstream) { final TrackingWritableRowSet resultRowSet = jsm.build(leftTable, rightTable).toTracking(); final QueryTable resultTable = makeResult(leftTable, rightTable, columnsToAdd, jsm, resultRowSet, - cs -> new CrossJoinRightColumnSource<>(jsm, cs, rightTable.isRefreshing())); + cs -> CrossJoinRightColumnSource.maybeWrap(jsm, cs, rightTable.isRefreshing())); final ModifiedColumnSet.Transformer rightTransformer = rightTable.newModifiedColumnSetTransformer(resultTable, columnsToAdd); @@ -1045,7 +1046,7 @@ private static QueryTable zeroKeyColumnsJoin( } final QueryTable result = makeResult(leftTable, rightTable, columnsToAdd, crossJoinState, resultRowSet, - cs -> new BitMaskingColumnSource<>(crossJoinState, cs)); + cs -> BitMaskingColumnSource.maybeWrap(crossJoinState, cs)); if (leftTable.isRefreshing() || rightTable.isRefreshing()) { crossJoinState.startTrackingPrevious(); @@ -1409,8 +1410,7 @@ private static > QueryTable makeResult( final Map> columnSourceMap = new LinkedHashMap<>(); for (final Map.Entry> leftColumn : leftTable.getColumnSourceMap().entrySet()) { - final BitShiftingColumnSource wrappedSource = - new BitShiftingColumnSource<>(joinState, leftColumn.getValue()); + final ColumnSource wrappedSource = BitShiftingColumnSource.maybeWrap(joinState, leftColumn.getValue()); columnSourceMap.put(leftColumn.getKey(), wrappedSource); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/FlattenOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/FlattenOperation.java index 499cf580af1..ee526102699 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/FlattenOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/FlattenOperation.java @@ -43,7 +43,7 @@ public Result initialize(boolean usePrev, long beforeClock) { final long size = usePrev ? rowSet.sizePrev() : rowSet.size(); for (Map.Entry> entry : parent.getColumnSourceMap().entrySet()) { - resultColumns.put(entry.getKey(), new RedirectedColumnSource<>(rowRedirection, entry.getValue())); + resultColumns.put(entry.getKey(), RedirectedColumnSource.maybeRedirect(rowRedirection, entry.getValue())); } resultTable = new QueryTable(RowSetFactory.flat(size).toTracking(), resultColumns); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/NaturalJoinHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/NaturalJoinHelper.java index c1e5e02d4d0..0907d8f72b9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/NaturalJoinHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/NaturalJoinHelper.java @@ -440,8 +440,9 @@ private static QueryTable makeResult(@NotNull final QueryTable leftTable, final boolean rightRefreshingColumns) { final Map> columnSourceMap = new LinkedHashMap<>(leftTable.getColumnSourceMap()); for (MatchPair mp : columnsToAdd) { - final RedirectedColumnSource redirectedColumnSource = - new RedirectedColumnSource<>(rowRedirection, rightTable.getColumnSource(mp.rightColumn())); + // note that we must always redirect the right-hand side, because unmatched rows will be redirected to null + final ColumnSource redirectedColumnSource = + RedirectedColumnSource.alwaysRedirect(rowRedirection, rightTable.getColumnSource(mp.rightColumn())); if (rightRefreshingColumns) { redirectedColumnSource.startTrackingPrevValues(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index d6a0d575ec0..da1a97a1391 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -2297,7 +2297,7 @@ public Table ungroup(boolean nullFill, Collection columnsT ungroupedSource.initializeBase(initialBase); result = ungroupedSource; } else { - result = new BitShiftingColumnSource<>(shiftState, column); + result = BitShiftingColumnSource.maybeWrap(shiftState, column); } resultMap.put(name, result); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java index 18cc4aba062..374e0722c0d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java @@ -116,7 +116,7 @@ private QueryTable historicalSort(SortHelpers.SortMapping sortedKeys) { final Map> resultMap = new LinkedHashMap<>(); for (Map.Entry> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) { resultMap.put(stringColumnSourceEntry.getKey(), - new RedirectedColumnSource<>(sortMapping, stringColumnSourceEntry.getValue())); + RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue())); } resultTable = new QueryTable(resultRowSet, resultMap); @@ -148,7 +148,7 @@ private Result streamSort(@NotNull final SortHelpers.SortMapping ini final Map> resultMap = new LinkedHashMap<>(); for (Map.Entry> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) { resultMap.put(stringColumnSourceEntry.getKey(), - new RedirectedColumnSource<>(sortMapping, stringColumnSourceEntry.getValue())); + RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue())); } resultTable = new QueryTable(resultRowSet, resultMap); @@ -256,7 +256,7 @@ public Result initialize(boolean usePrev, long beforeClock) { for (Map.Entry> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) { resultMap.put(stringColumnSourceEntry.getKey(), - new RedirectedColumnSource<>(sortMapping, stringColumnSourceEntry.getValue())); + RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue())); } // noinspection unchecked @@ -288,13 +288,19 @@ public Result initialize(boolean usePrev, long beforeClock) { } } + /** + * Get the row redirection for a sort result. + * + * @param sortResult The sort result table; must be the direct result of a sort. + * @return The row redirection if at least one column required redirection, otherwise {@code null} + */ public static RowRedirection getRowRedirection(@NotNull final Table sortResult) { - final String firstColumnName = sortResult.getDefinition().getColumns().get(0).getName(); - final ColumnSource firstColumnSource = sortResult.getColumnSource(firstColumnName); - if (!(firstColumnSource instanceof RedirectedColumnSource)) { - return null; + for (final ColumnSource columnSource : sortResult.getColumnSources()) { + if (columnSource instanceof RedirectedColumnSource) { + return ((RedirectedColumnSource) columnSource).getRowRedirection(); + } } - return ((RedirectedColumnSource) firstColumnSource).getRowRedirection(); + return null; } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BaseAddOnlyFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BaseAddOnlyFirstOrLastChunkedOperator.java index b1c88357e58..dd547a33b64 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BaseAddOnlyFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/BaseAddOnlyFirstOrLastChunkedOperator.java @@ -35,9 +35,8 @@ abstract class BaseAddOnlyFirstOrLastChunkedOperator this.resultColumns = new LinkedHashMap<>(resultPairs.length); for (final MatchPair mp : resultPairs) { - // noinspection unchecked - resultColumns.put(mp.leftColumn(), - new RedirectedColumnSource(rowRedirection, originalTable.getColumnSource(mp.rightColumn()))); + resultColumns.put(mp.leftColumn(), RedirectedColumnSource.maybeRedirect( + rowRedirection, originalTable.getColumnSource(mp.rightColumn()))); } if (exposeRedirectionAs != null) { resultColumns.put(exposeRedirectionAs, redirections); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FirstOrLastChunkedOperator.java index 2769566928d..77144d84ea0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/FirstOrLastChunkedOperator.java @@ -44,9 +44,8 @@ public class FirstOrLastChunkedOperator this.resultColumns = new LinkedHashMap<>(resultPairs.length); for (final MatchPair mp : resultPairs) { - // noinspection unchecked - resultColumns.put(mp.leftColumn(), - new RedirectedColumnSource(rowRedirection, originalTable.getColumnSource(mp.rightColumn()))); + resultColumns.put(mp.leftColumn(), RedirectedColumnSource.maybeRedirect( + rowRedirection, originalTable.getColumnSource(mp.rightColumn()))); } exposeRedirections = exposeRedirectionAs != null; if (exposeRedirectionAs != null) { @@ -298,9 +297,8 @@ private class DuplicateOperator implements IterativeChunkedAggregationOperator { private DuplicateOperator(MatchPair[] resultPairs, Table table, String exposeRedirectionAs) { for (final MatchPair mp : resultPairs) { - // noinspection unchecked resultColumns.put(mp.leftColumn(), - new RedirectedColumnSource(rowRedirection, table.getColumnSource(mp.rightColumn()))); + RedirectedColumnSource.maybeRedirect(rowRedirection, table.getColumnSource(mp.rightColumn()))); } if (exposeRedirectionAs != null) { resultColumns.put(exposeRedirectionAs, redirections); @@ -466,9 +464,8 @@ private ComplementaryOperator(boolean isFirst, MatchPair[] resultPairs, Table ta this.resultColumns = new LinkedHashMap<>(resultPairs.length); for (final MatchPair mp : resultPairs) { - // noinspection unchecked resultColumns.put(mp.leftColumn(), - new RedirectedColumnSource(rowRedirection, table.getColumnSource(mp.rightColumn()))); + RedirectedColumnSource.maybeRedirect(rowRedirection, table.getColumnSource(mp.rightColumn()))); } exposeRedirections = exposeRedirectionAs != null; if (exposeRedirections) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java index 8c6f5fd55a2..90b672d798b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java @@ -140,7 +140,8 @@ public ColumnSource[] getKeyHashTableSources() { alternateKeySources[kci], mainKeySources[kci]); } // noinspection unchecked - keyHashTableSources[kci] = new RedirectedColumnSource(resultIndexToHashSlot, alternatingColumnSources[kci]); + keyHashTableSources[kci] = + RedirectedColumnSource.maybeRedirect(resultIndexToHashSlot, alternatingColumnSources[kci]); } return keyHashTableSources; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerTypedBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerTypedBase.java index f6ce5d37929..c0fc54b923d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerTypedBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerTypedBase.java @@ -76,7 +76,7 @@ public ColumnSource[] getKeyHashTableSources() { final ColumnSource[] keyHashTableSources = new ColumnSource[mainKeySources.length]; for (int kci = 0; kci < mainKeySources.length; ++kci) { // noinspection unchecked - keyHashTableSources[kci] = new RedirectedColumnSource(resultRowKeyToHashSlot, + keyHashTableSources[kci] = RedirectedColumnSource.maybeRedirect(resultRowKeyToHashSlot, new HashTableColumnSource(mainKeySources[kci], overflowKeySources[kci])); } return keyHashTableSources; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/SortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/SortedFirstOrLastChunkedOperator.java index daa30528726..c86df1511ca 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/SortedFirstOrLastChunkedOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/SortedFirstOrLastChunkedOperator.java @@ -49,9 +49,8 @@ public class SortedFirstOrLastChunkedOperator this.resultColumns = new LinkedHashMap<>(); for (final MatchPair mp : resultNames) { - // noinspection unchecked,rawtypes - resultColumns.put(mp.leftColumn(), - new RedirectedColumnSource(rowRedirection, originalTable.getColumnSource(mp.rightColumn()))); + resultColumns.put(mp.leftColumn(), RedirectedColumnSource.maybeRedirect( + rowRedirection, originalTable.getColumnSource(mp.rightColumn()))); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManagerOpenAddressedBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManagerOpenAddressedBase.java index 13a9134b9c7..7e5e2f86fa6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManagerOpenAddressedBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManagerOpenAddressedBase.java @@ -69,7 +69,7 @@ public ColumnSource[] getKeyHashTableSources() { final ColumnSource[] keyHashTableSources = new ColumnSource[mainKeySources.length]; for (int kci = 0; kci < mainKeySources.length; ++kci) { // noinspection unchecked - keyHashTableSources[kci] = new RedirectedColumnSource(resultIndexToHashSlot, mainKeySources[kci]); + keyHashTableSources[kci] = RedirectedColumnSource.maybeRedirect(resultIndexToHashSlot, mainKeySources[kci]); } return keyHashTableSources; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManagerTypedBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManagerTypedBase.java index 32a3a555d74..26d492d449f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManagerTypedBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManagerTypedBase.java @@ -123,7 +123,7 @@ public ColumnSource[] getKeyHashTableSources() { final ColumnSource[] keyHashTableSources = new ColumnSource[mainKeySources.length]; for (int kci = 0; kci < mainKeySources.length; ++kci) { // noinspection unchecked - keyHashTableSources[kci] = new RedirectedColumnSource(resultIndexToHashSlot, + keyHashTableSources[kci] = RedirectedColumnSource.maybeRedirect(resultIndexToHashSlot, new HashTableColumnSource(mainKeySources[kci], overflowKeySources[kci])); } return keyHashTableSources; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshotTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshotTable.java index e15c2567830..0cf2b3bfab0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshotTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshotTable.java @@ -195,7 +195,7 @@ public static InitialSnapshotTable setupInitialSnapshotTable( writableSources[ci] = ArrayBackedColumnSource.getMemoryColumnSource( 0, column.getDataType(), column.getComponentType()); finalColumns.put(column.getName(), - new WritableRedirectedColumnSource<>(rowRedirection, writableSources[ci], 0)); + WritableRedirectedColumnSource.maybeRedirect(rowRedirection, writableSources[ci], 0)); } // This table does not run, so we don't need to tell our row redirection or column source to start // tracking diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/QueryReplayGroupedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/QueryReplayGroupedTable.java index f03d51ac41a..7b4ee746350 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/QueryReplayGroupedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/QueryReplayGroupedTable.java @@ -34,7 +34,7 @@ private static Map> getResultSources(Map> result = new LinkedHashMap<>(); for (Map.Entry> stringEntry : input.entrySet()) { ColumnSource value = stringEntry.getValue(); - result.put(stringEntry.getKey(), new RedirectedColumnSource<>(rowRedirection, value)); + result.put(stringEntry.getKey(), RedirectedColumnSource.maybeRedirect(rowRedirection, value)); } return result; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java index 194c34bbc69..4e6ea764e14 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java @@ -167,7 +167,7 @@ public static SelectAndViewAnalyzer create(final Mode mode, final Map underlyingSource = sc.newDestInstance(rowSet.size()); - final WritableColumnSource scs = new WritableRedirectedColumnSource<>( + final WritableColumnSource scs = WritableRedirectedColumnSource.maybeRedirect( rowRedirection, underlyingSource, rowSet.size()); analyzer = analyzer.createLayerForSelect(rowSet, sc.getName(), sc, scs, underlyingSource, distinctDeps, @@ -183,7 +183,8 @@ public static SelectAndViewAnalyzer create(final Mode mode, final Map underlyingSource = null; if (rowRedirection != null) { underlyingSource = scs; - scs = new WritableRedirectedColumnSource<>(rowRedirection, underlyingSource, rowSet.intSize()); + scs = WritableRedirectedColumnSource.maybeRedirect( + rowRedirection, underlyingSource, rowSet.intSize()); } analyzer = analyzer.createLayerForSelect(rowSet, sc.getName(), sc, scs, underlyingSource, distinctDeps, diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java index e9e2f95294b..ba4d224f4c9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java @@ -48,7 +48,7 @@ final public class StaticFlattenLayer extends SelectAndViewAnalyzer { return; } - overriddenColumns.put(name, new RedirectedColumnSource<>(rowRedirection, cs)); + overriddenColumns.put(name, RedirectedColumnSource.maybeRedirect(rowRedirection, cs)); }); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BitMaskingColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BitMaskingColumnSource.java index 29e6b385f55..1f967af1d33 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BitMaskingColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BitMaskingColumnSource.java @@ -21,10 +21,25 @@ public class BitMaskingColumnSource extends AbstractColumnSource implements UngroupableColumnSource { + /** + * Wrap the innerSource if it is not agnostic to redirection. Otherwise, return the innerSource. + * + * @param shiftState The cross join shift state to use + * @param innerSource The column source to redirect + */ + public static ColumnSource maybeWrap( + final ZeroKeyCrossJoinShiftState shiftState, + @NotNull final ColumnSource innerSource) { + if (innerSource instanceof RowKeyAgnosticChunkSource) { + return innerSource; + } + return new BitMaskingColumnSource<>(shiftState, innerSource); + } + private final ZeroKeyCrossJoinShiftState shiftState; private final ColumnSource innerSource; - public BitMaskingColumnSource( + protected BitMaskingColumnSource( final ZeroKeyCrossJoinShiftState shiftState, @NotNull final ColumnSource innerSource) { super(innerSource.getType()); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BitShiftingColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BitShiftingColumnSource.java index 5fec3f136c7..4de026a49f9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BitShiftingColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BitShiftingColumnSource.java @@ -24,10 +24,27 @@ public class BitShiftingColumnSource extends AbstractColumnSource implements UngroupableColumnSource { + /** + * Wrap the innerSource if it is not agnostic to redirection. Otherwise, return the innerSource. + * + * @param shiftState The cross join shift state to use + * @param innerSource The column source to redirect + */ + public static ColumnSource maybeWrap( + @NotNull final CrossJoinShiftState shiftState, + @NotNull final ColumnSource innerSource) { + if (innerSource instanceof RowKeyAgnosticChunkSource) { + return innerSource; + } + return new BitShiftingColumnSource<>(shiftState, innerSource); + } + private final CrossJoinShiftState shiftState; private final ColumnSource innerSource; - public BitShiftingColumnSource(final CrossJoinShiftState shiftState, @NotNull final ColumnSource innerSource) { + protected BitShiftingColumnSource( + @NotNull final CrossJoinShiftState shiftState, + @NotNull final ColumnSource innerSource) { super(innerSource.getType()); this.shiftState = shiftState; this.innerSource = innerSource; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BooleanSingleValueSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BooleanSingleValueSource.java new file mode 100644 index 00000000000..885c44a8569 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/BooleanSingleValueSource.java @@ -0,0 +1,158 @@ +/** + * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending + */ +/* + * --------------------------------------------------------------------------------------------------------------------- + * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit CharacterSingleValueSource and regenerate + * --------------------------------------------------------------------------------------------------------------------- + */ +package io.deephaven.engine.table.impl.sources; + +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.WritableObjectChunk; + +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; +import io.deephaven.engine.updategraph.LogicalClock; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; +import io.deephaven.chunk.Chunk; +import io.deephaven.chunk.LongChunk; +import io.deephaven.engine.rowset.RowSequence; +import org.jetbrains.annotations.NotNull; + +import static io.deephaven.util.QueryConstants.NULL_BOOLEAN; + +/** + * Single value source for Boolean. + *

+ * The C-haracterSingleValueSource is replicated to all other types with + * io.deephaven.engine.table.impl.sources.Replicate. + * + * (C-haracter is deliberately spelled that way in order to prevent Replicate from altering this very comment). + */ +public class BooleanSingleValueSource extends SingleValueColumnSource implements MutableColumnSourceGetDefaults.ForBoolean { + + private Boolean current; + private transient Boolean prev; + + // region Constructor + public BooleanSingleValueSource() { + super(Boolean.class); + current = NULL_BOOLEAN; + prev = NULL_BOOLEAN; + } + // endregion Constructor + + @Override + public final void set(Boolean value) { + if (isTrackingPrevValues) { + final long currentStep = LogicalClock.DEFAULT.currentStep(); + if (changeTime < currentStep) { + prev = current; + changeTime = currentStep; + } + } + current = value; + } + + // region UnboxedSetter + // endregion UnboxedSetter + + @Override + public final void setNull() { + set(NULL_BOOLEAN); + } + + @Override + public final void set(long key, Boolean value) { + set(value); + } + + @Override + public final void setNull(long key) { + // region null set + set(NULL_BOOLEAN); + // endregion null set + } + + @Override + public final Boolean get(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_BOOLEAN; + } + return current; + } + + @Override + public final Boolean getPrev(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_BOOLEAN; + } + if (!isTrackingPrevValues || changeTime < LogicalClock.DEFAULT.currentStep()) { + return current; + } + return prev; + } + + @Override + public final void fillFromChunk(@NotNull FillFromContext context, @NotNull Chunk src, @NotNull RowSequence rowSequence) { + if (rowSequence.size() == 0) { + return; + } + // We can only hold one value anyway, so arbitrarily take the first value in the chunk and ignore the rest. + final ObjectChunk chunk = src.asObjectChunk(); + set(chunk.get(0)); + } + + @Override + public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Chunk src, @NotNull LongChunk keys) { + if (keys.size() == 0) { + return; + } + // We can only hold one value anyway, so arbitrarily take the first value in the chunk and ignore the rest. + final ObjectChunk chunk = src.asObjectChunk(); + set(chunk.get(0)); + } + + @Override + public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull RowSequence rowSequence) { + destination.setSize(rowSequence.intSize()); + destination.asWritableObjectChunk().fillWithValue(0, rowSequence.intSize(), current); + } + + @Override + public void fillPrevChunk(@NotNull FillContext context, + @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { + Boolean value = getPrev(0); // avoid duplicating the current vs prev logic in getPrev + destination.setSize(rowSequence.intSize()); + destination.asWritableObjectChunk().fillWithValue(0, rowSequence.intSize(), value); + } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableObjectChunk destChunk = dest.asWritableObjectChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_BOOLEAN : current); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + Boolean value = getPrev(0); // avoid duplicating the current vs prev logic in getPrev + final WritableObjectChunk destChunk = dest.asWritableObjectChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_BOOLEAN : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public boolean providesFillUnordered() { + return true; + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ByteSingleValueSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ByteSingleValueSource.java index 7ac7717420a..fa78a8ed06c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ByteSingleValueSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ByteSingleValueSource.java @@ -8,12 +8,12 @@ */ package io.deephaven.engine.table.impl.sources; +import io.deephaven.chunk.WritableByteChunk; +import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.rowset.chunkattributes.RowKeys; -import io.deephaven.util.QueryConstants; import io.deephaven.chunk.ByteChunk; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.LongChunk; @@ -89,11 +89,17 @@ public final void setNull(long key) { @Override public final byte getByte(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_BYTE; + } return current; } @Override public final byte getPrevByte(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_BYTE; + } if (!isTrackingPrevValues || changeTime < LogicalClock.DEFAULT.currentStep()) { return current; } @@ -119,4 +125,45 @@ public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Ch final ByteChunk chunk = src.asByteChunk(); set(chunk.get(0)); } + + @Override + public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull RowSequence rowSequence) { + destination.setSize(rowSequence.intSize()); + destination.asWritableByteChunk().fillWithValue(0, rowSequence.intSize(), current); + } + + @Override + public void fillPrevChunk(@NotNull FillContext context, + @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { + byte value = getPrevByte(0); // avoid duplicating the current vs prev logic in getPrevByte + destination.setSize(rowSequence.intSize()); + destination.asWritableByteChunk().fillWithValue(0, rowSequence.intSize(), value); + } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableByteChunk destChunk = dest.asWritableByteChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_BYTE : current); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + byte value = getPrevByte(0); // avoid duplicating the current vs prev logic in getPrevByte + final WritableByteChunk destChunk = dest.asWritableByteChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_BYTE : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CharacterSingleValueSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CharacterSingleValueSource.java index 330ed39e385..2fbd654e8b4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CharacterSingleValueSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CharacterSingleValueSource.java @@ -3,12 +3,12 @@ */ package io.deephaven.engine.table.impl.sources; +import io.deephaven.chunk.WritableCharChunk; +import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.rowset.chunkattributes.RowKeys; -import io.deephaven.util.QueryConstants; import io.deephaven.chunk.CharChunk; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.LongChunk; @@ -84,11 +84,17 @@ public final void setNull(long key) { @Override public final char getChar(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_CHAR; + } return current; } @Override public final char getPrevChar(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_CHAR; + } if (!isTrackingPrevValues || changeTime < LogicalClock.DEFAULT.currentStep()) { return current; } @@ -114,4 +120,45 @@ public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Ch final CharChunk chunk = src.asCharChunk(); set(chunk.get(0)); } + + @Override + public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull RowSequence rowSequence) { + destination.setSize(rowSequence.intSize()); + destination.asWritableCharChunk().fillWithValue(0, rowSequence.intSize(), current); + } + + @Override + public void fillPrevChunk(@NotNull FillContext context, + @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { + char value = getPrevChar(0); // avoid duplicating the current vs prev logic in getPrevChar + destination.setSize(rowSequence.intSize()); + destination.asWritableCharChunk().fillWithValue(0, rowSequence.intSize(), value); + } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableCharChunk destChunk = dest.asWritableCharChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_CHAR : current); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + char value = getPrevChar(0); // avoid duplicating the current vs prev logic in getPrevChar + final WritableCharChunk destChunk = dest.asWritableCharChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_CHAR : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CrossJoinRightColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CrossJoinRightColumnSource.java index d732c74ed04..f1cd6747890 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CrossJoinRightColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CrossJoinRightColumnSource.java @@ -36,13 +36,34 @@ import static io.deephaven.util.QueryConstants.*; public class CrossJoinRightColumnSource extends AbstractColumnSource implements UngroupableColumnSource { + + /** + * Wrap the innerSource if it is not agnostic to redirection. Otherwise, return the innerSource. + * + * @param crossJoinManager The cross join manager to use + * @param innerSource The column source to redirect + * @param rightIsLive Whether the right side is live + */ + public static ColumnSource maybeWrap( + @NotNull final CrossJoinStateManager crossJoinManager, + @NotNull final ColumnSource innerSource, + boolean rightIsLive) { + // Force wrapping if this is a leftOuterJoin or else we will not see the nulls; unless every row is null. + if ((!crossJoinManager.leftOuterJoin() && innerSource instanceof RowKeyAgnosticChunkSource) + || innerSource instanceof NullValueColumnSource) { + return innerSource; + } + return new CrossJoinRightColumnSource<>(crossJoinManager, innerSource, rightIsLive); + } + private final boolean rightIsLive; private final CrossJoinStateManager crossJoinManager; protected final ColumnSource innerSource; - - public CrossJoinRightColumnSource(@NotNull final CrossJoinStateManager crossJoinManager, - @NotNull final ColumnSource innerSource, boolean rightIsLive) { + protected CrossJoinRightColumnSource( + @NotNull final CrossJoinStateManager crossJoinManager, + @NotNull final ColumnSource innerSource, + boolean rightIsLive) { super(innerSource.getType()); this.rightIsLive = rightIsLive; this.crossJoinManager = crossJoinManager; @@ -397,7 +418,6 @@ private static class FillContext implements ColumnSource.FillContext { private final ResettableWritableChunk innerOrderedValuesSlice; private final DupExpandKernel dupExpandKernel; private final PermuteKernel permuteKernel; - private final boolean allowRightSideNulls; FillContext(final CrossJoinRightColumnSource cs, final int chunkCapacity, final SharedContext sharedContext) { @@ -420,7 +440,6 @@ private static class FillContext implements ColumnSource.FillContext { dupExpandKernel = DupExpandKernel.makeDupExpand(cs.getChunkType()); permuteKernel = PermuteKernel.makePermuteKernel(cs.getChunkType()); } - allowRightSideNulls = cs.crossJoinManager.leftOuterJoin(); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/DoubleSingleValueSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/DoubleSingleValueSource.java index b6e28cd2c91..8ac199e2820 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/DoubleSingleValueSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/DoubleSingleValueSource.java @@ -8,12 +8,12 @@ */ package io.deephaven.engine.table.impl.sources; +import io.deephaven.chunk.WritableDoubleChunk; +import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.rowset.chunkattributes.RowKeys; -import io.deephaven.util.QueryConstants; import io.deephaven.chunk.DoubleChunk; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.LongChunk; @@ -89,11 +89,17 @@ public final void setNull(long key) { @Override public final double getDouble(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_DOUBLE; + } return current; } @Override public final double getPrevDouble(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_DOUBLE; + } if (!isTrackingPrevValues || changeTime < LogicalClock.DEFAULT.currentStep()) { return current; } @@ -119,4 +125,45 @@ public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Ch final DoubleChunk chunk = src.asDoubleChunk(); set(chunk.get(0)); } + + @Override + public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull RowSequence rowSequence) { + destination.setSize(rowSequence.intSize()); + destination.asWritableDoubleChunk().fillWithValue(0, rowSequence.intSize(), current); + } + + @Override + public void fillPrevChunk(@NotNull FillContext context, + @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { + double value = getPrevDouble(0); // avoid duplicating the current vs prev logic in getPrevDouble + destination.setSize(rowSequence.intSize()); + destination.asWritableDoubleChunk().fillWithValue(0, rowSequence.intSize(), value); + } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableDoubleChunk destChunk = dest.asWritableDoubleChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_DOUBLE : current); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + double value = getPrevDouble(0); // avoid duplicating the current vs prev logic in getPrevDouble + final WritableDoubleChunk destChunk = dest.asWritableDoubleChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_DOUBLE : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/FillUnordered.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/FillUnordered.java index d40eff84d42..f12a524e71b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/FillUnordered.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/FillUnordered.java @@ -14,7 +14,16 @@ public interface FillUnordered { /** * Populates a contiguous portion of the given destination chunk with data corresponding to the keys from the given * {@link LongChunk}. + *

+ * It behaves as if the following code were executed: * + *

+     * destination.setSize(keys.size());
+     * for (int ii = 0; ii < keys.size(); ++ii) {
+     *     destination.set(ii, get(keys.get(ii)));
+     * }
+     * 
+ * * @param context A context containing all mutable/state related data used in retrieving the Chunk. * @param dest The chunk to be populated according to {@code keys} * @param keys A chunk of individual, not assumed to be ordered keys to be fetched @@ -27,7 +36,16 @@ void fillChunkUnordered( /** * Populates a contiguous portion of the given destination chunk with prev data corresponding to the keys from the * given {@link LongChunk}. + *

+ * It behaves as if the following code were executed: * + *

+     * destination.setSize(keys.size());
+     * for (int ii = 0; ii < keys.size(); ++ii) {
+     *     destination.set(ii, getPrev(keys.get(ii)));
+     * }
+     * 
+ * * @param context A context containing all mutable/state related data used in retrieving the Chunk. * @param dest The chunk to be populated according to {@code keys} * @param keys A chunk of individual, not assumed to be ordered keys to be fetched diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/FloatSingleValueSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/FloatSingleValueSource.java index 7c9aac9336a..aeb0d13293d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/FloatSingleValueSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/FloatSingleValueSource.java @@ -8,12 +8,12 @@ */ package io.deephaven.engine.table.impl.sources; +import io.deephaven.chunk.WritableFloatChunk; +import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.rowset.chunkattributes.RowKeys; -import io.deephaven.util.QueryConstants; import io.deephaven.chunk.FloatChunk; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.LongChunk; @@ -89,11 +89,17 @@ public final void setNull(long key) { @Override public final float getFloat(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_FLOAT; + } return current; } @Override public final float getPrevFloat(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_FLOAT; + } if (!isTrackingPrevValues || changeTime < LogicalClock.DEFAULT.currentStep()) { return current; } @@ -119,4 +125,45 @@ public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Ch final FloatChunk chunk = src.asFloatChunk(); set(chunk.get(0)); } + + @Override + public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull RowSequence rowSequence) { + destination.setSize(rowSequence.intSize()); + destination.asWritableFloatChunk().fillWithValue(0, rowSequence.intSize(), current); + } + + @Override + public void fillPrevChunk(@NotNull FillContext context, + @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { + float value = getPrevFloat(0); // avoid duplicating the current vs prev logic in getPrevFloat + destination.setSize(rowSequence.intSize()); + destination.asWritableFloatChunk().fillWithValue(0, rowSequence.intSize(), value); + } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableFloatChunk destChunk = dest.asWritableFloatChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_FLOAT : current); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + float value = getPrevFloat(0); // avoid duplicating the current vs prev logic in getPrevFloat + final WritableFloatChunk destChunk = dest.asWritableFloatChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_FLOAT : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/IntegerSingleValueSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/IntegerSingleValueSource.java index 6a1b5cbe69b..ab48582e797 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/IntegerSingleValueSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/IntegerSingleValueSource.java @@ -8,12 +8,12 @@ */ package io.deephaven.engine.table.impl.sources; +import io.deephaven.chunk.WritableIntChunk; +import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.rowset.chunkattributes.RowKeys; -import io.deephaven.util.QueryConstants; import io.deephaven.chunk.IntChunk; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.LongChunk; @@ -89,11 +89,17 @@ public final void setNull(long key) { @Override public final int getInt(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_INT; + } return current; } @Override public final int getPrevInt(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_INT; + } if (!isTrackingPrevValues || changeTime < LogicalClock.DEFAULT.currentStep()) { return current; } @@ -119,4 +125,45 @@ public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Ch final IntChunk chunk = src.asIntChunk(); set(chunk.get(0)); } + + @Override + public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull RowSequence rowSequence) { + destination.setSize(rowSequence.intSize()); + destination.asWritableIntChunk().fillWithValue(0, rowSequence.intSize(), current); + } + + @Override + public void fillPrevChunk(@NotNull FillContext context, + @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { + int value = getPrevInt(0); // avoid duplicating the current vs prev logic in getPrevInt + destination.setSize(rowSequence.intSize()); + destination.asWritableIntChunk().fillWithValue(0, rowSequence.intSize(), value); + } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableIntChunk destChunk = dest.asWritableIntChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_INT : current); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + int value = getPrevInt(0); // avoid duplicating the current vs prev logic in getPrevInt + final WritableIntChunk destChunk = dest.asWritableIntChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_INT : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/LongSingleValueSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/LongSingleValueSource.java index 7891841cda1..57907e9b332 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/LongSingleValueSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/LongSingleValueSource.java @@ -8,12 +8,12 @@ */ package io.deephaven.engine.table.impl.sources; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.rowset.chunkattributes.RowKeys; -import io.deephaven.util.QueryConstants; import io.deephaven.chunk.LongChunk; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.LongChunk; @@ -89,11 +89,17 @@ public final void setNull(long key) { @Override public final long getLong(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_LONG; + } return current; } @Override public final long getPrevLong(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_LONG; + } if (!isTrackingPrevValues || changeTime < LogicalClock.DEFAULT.currentStep()) { return current; } @@ -119,4 +125,45 @@ public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Ch final LongChunk chunk = src.asLongChunk(); set(chunk.get(0)); } + + @Override + public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull RowSequence rowSequence) { + destination.setSize(rowSequence.intSize()); + destination.asWritableLongChunk().fillWithValue(0, rowSequence.intSize(), current); + } + + @Override + public void fillPrevChunk(@NotNull FillContext context, + @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { + long value = getPrevLong(0); // avoid duplicating the current vs prev logic in getPrevLong + destination.setSize(rowSequence.intSize()); + destination.asWritableLongChunk().fillWithValue(0, rowSequence.intSize(), value); + } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableLongChunk destChunk = dest.asWritableLongChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_LONG : current); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + long value = getPrevLong(0); // avoid duplicating the current vs prev logic in getPrevLong + final WritableLongChunk destChunk = dest.asWritableLongChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_LONG : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/NullValueColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/NullValueColumnSource.java index e69eb0bb64d..da10ae46990 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/NullValueColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/NullValueColumnSource.java @@ -5,7 +5,9 @@ import io.deephaven.base.Pair; import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.LongChunk; import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.TableDefinition; @@ -26,7 +28,7 @@ * A column source that returns null for all keys. */ public class NullValueColumnSource extends AbstractColumnSource - implements ShiftData.ShiftCallback, InMemoryColumnSource { + implements ShiftData.ShiftCallback, InMemoryColumnSource, RowKeyAgnosticChunkSource { private static final KeyedObjectKey.Basic, Class>, NullValueColumnSource> KEY_TYPE = new KeyedObjectKey.Basic<>() { @Override @@ -192,4 +194,23 @@ public void fillPrevChunk(@NotNull FillContext context, @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { fillChunk(context, destination, rowSequence); } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull LongChunk keys) { + // note that we do not need to look for RowSequence.NULL_ROW_KEY; all values are null + destination.setSize(keys.size()); + destination.fillWithNullValue(0, keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull LongChunk keys) { + fillChunkUnordered(context, destination, keys); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ObjectSingleValueSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ObjectSingleValueSource.java index e06b0f04b5a..fd90fceb1a3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ObjectSingleValueSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ObjectSingleValueSource.java @@ -8,12 +8,12 @@ */ package io.deephaven.engine.table.impl.sources; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.rowset.chunkattributes.RowKeys; -import io.deephaven.util.QueryConstants; import io.deephaven.chunk.ObjectChunk; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.LongChunk; @@ -77,11 +77,17 @@ public final void setNull(long key) { @Override public final T get(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return null; + } return current; } @Override public final T getPrev(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return null; + } if (!isTrackingPrevValues || changeTime < LogicalClock.DEFAULT.currentStep()) { return current; } @@ -107,4 +113,45 @@ public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Ch final ObjectChunk chunk = src.asObjectChunk(); set(chunk.get(0)); } + + @Override + public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull RowSequence rowSequence) { + destination.setSize(rowSequence.intSize()); + destination.asWritableObjectChunk().fillWithValue(0, rowSequence.intSize(), current); + } + + @Override + public void fillPrevChunk(@NotNull FillContext context, + @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { + T value = getPrev(0); // avoid duplicating the current vs prev logic in getPrev + destination.setSize(rowSequence.intSize()); + destination.asWritableObjectChunk().fillWithValue(0, rowSequence.intSize(), value); + } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableObjectChunk destChunk = dest.asWritableObjectChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? null : current); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + T value = getPrev(0); // avoid duplicating the current vs prev logic in getPrev + final WritableObjectChunk destChunk = dest.asWritableObjectChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? null : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RedirectedColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RedirectedColumnSource.java index e10b4b7c1a8..1454a3a98ac 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RedirectedColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RedirectedColumnSource.java @@ -35,11 +35,41 @@ */ public class RedirectedColumnSource extends AbstractDeferredGroupingColumnSource implements UngroupableColumnSource { + /** + * Redirect the innerSource if it is not agnostic to redirection. Otherwise, return the innerSource. + * + * @param rowRedirection The row redirection to use + * @param innerSource The column source to redirect + */ + public static ColumnSource maybeRedirect( + @NotNull final RowRedirection rowRedirection, + @NotNull final ColumnSource innerSource) { + if (innerSource instanceof RowKeyAgnosticChunkSource) { + return innerSource; + } + return new RedirectedColumnSource<>(rowRedirection, innerSource); + } + + /** + * This factory method should be used when unmapped rows in the row redirection must be redirected to null values. + * For example, natural joins, left outer joins, and as-of joins must map unmatched rows to null values in + * right-side columns. + * + * @param rowRedirection The row redirection to use + * @param innerSource The column source to redirect + */ + public static ColumnSource alwaysRedirect( + @NotNull final RowRedirection rowRedirection, + @NotNull final ColumnSource innerSource) { + return new RedirectedColumnSource<>(rowRedirection, innerSource); + } + protected final RowRedirection rowRedirection; protected final ColumnSource innerSource; private final boolean ascendingMapping; - public RedirectedColumnSource(@NotNull final RowRedirection rowRedirection, + protected RedirectedColumnSource( + @NotNull final RowRedirection rowRedirection, @NotNull final ColumnSource innerSource) { super(innerSource.getType()); this.rowRedirection = rowRedirection; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowKeyAgnosticChunkSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowKeyAgnosticChunkSource.java new file mode 100644 index 00000000000..edfd6d0ca2a --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowKeyAgnosticChunkSource.java @@ -0,0 +1,14 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table.impl.sources; + +import io.deephaven.chunk.attributes.Any; + +/** + * This is a marker interface for chunk sources that are agnostic of the row key when evaluating the value for a given + * row key. + */ +public interface RowKeyAgnosticChunkSource extends FillUnordered { + +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ShortSingleValueSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ShortSingleValueSource.java index ae99200ca48..d30619ecda4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ShortSingleValueSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ShortSingleValueSource.java @@ -8,12 +8,12 @@ */ package io.deephaven.engine.table.impl.sources; +import io.deephaven.chunk.WritableShortChunk; +import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.rowset.chunkattributes.RowKeys; -import io.deephaven.util.QueryConstants; import io.deephaven.chunk.ShortChunk; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.LongChunk; @@ -89,11 +89,17 @@ public final void setNull(long key) { @Override public final short getShort(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_SHORT; + } return current; } @Override public final short getPrevShort(long rowKey) { + if (rowKey == RowSequence.NULL_ROW_KEY) { + return NULL_SHORT; + } if (!isTrackingPrevValues || changeTime < LogicalClock.DEFAULT.currentStep()) { return current; } @@ -119,4 +125,45 @@ public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Ch final ShortChunk chunk = src.asShortChunk(); set(chunk.get(0)); } + + @Override + public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk destination, + @NotNull RowSequence rowSequence) { + destination.setSize(rowSequence.intSize()); + destination.asWritableShortChunk().fillWithValue(0, rowSequence.intSize(), current); + } + + @Override + public void fillPrevChunk(@NotNull FillContext context, + @NotNull WritableChunk destination, @NotNull RowSequence rowSequence) { + short value = getPrevShort(0); // avoid duplicating the current vs prev logic in getPrevShort + destination.setSize(rowSequence.intSize()); + destination.asWritableShortChunk().fillWithValue(0, rowSequence.intSize(), value); + } + + @Override + public void fillChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableShortChunk destChunk = dest.asWritableShortChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_SHORT : current); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + short value = getPrevShort(0); // avoid duplicating the current vs prev logic in getPrevShort + final WritableShortChunk destChunk = dest.asWritableShortChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_SHORT : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/SingleValueColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/SingleValueColumnSource.java index 8be483202ce..599b3f830d1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/SingleValueColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/SingleValueColumnSource.java @@ -4,13 +4,15 @@ package io.deephaven.engine.table.impl.sources; import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSequence; import io.deephaven.engine.table.ChunkSink; import io.deephaven.engine.table.WritableColumnSource; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.util.ShiftData; public abstract class SingleValueColumnSource extends AbstractColumnSource - implements WritableColumnSource, ChunkSink, ShiftData.ShiftCallback, InMemoryColumnSource { + implements WritableColumnSource, ChunkSink, ShiftData.ShiftCallback, InMemoryColumnSource, + RowKeyAgnosticChunkSource { protected transient long changeTime; protected boolean isTrackingPrevValues; @@ -43,6 +45,8 @@ public static SingleValueColumnSource getSingleValueColumnSource(Class result = new LongSingleValueSource(); } else if (type == Short.class || type == short.class) { result = new ShortSingleValueSource(); + } else if (type == Boolean.class || type == boolean.class) { + result = new BooleanSingleValueSource(); } else { result = new ObjectSingleValueSource<>(type); } @@ -91,6 +95,13 @@ public void setNull() { throw new UnsupportedOperationException(); } + @Override + public final void setNull(RowSequence orderedKeys) { + if (!orderedKeys.isEmpty()) { + setNull(); + } + } + @Override public final void ensureCapacity(long capacity, boolean nullFilled) { // Do nothing diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/WritableRedirectedColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/WritableRedirectedColumnSource.java index edee7e4f34a..df90ac9deec 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/WritableRedirectedColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/WritableRedirectedColumnSource.java @@ -19,7 +19,25 @@ * {@link RowRedirection}. */ public class WritableRedirectedColumnSource extends RedirectedColumnSource implements WritableColumnSource { - private long maxInnerIndex; + /** + * Redirect the innerSource if it is not agnostic to redirection. Otherwise, return the innerSource. + * + * @param rowRedirection The row redirection to use + * @param innerSource The column source to redirect + * @param maxInnerIndex The maximum row key available in innerSource + */ + public static WritableColumnSource maybeRedirect( + @NotNull final RowRedirection rowRedirection, + @NotNull final WritableColumnSource innerSource, + final long maxInnerIndex) { + if (innerSource instanceof RowKeyAgnosticChunkSource) { + return innerSource; + } + return new WritableRedirectedColumnSource<>(rowRedirection, innerSource, maxInnerIndex); + } + + /** The maximum row key available in innerSource. */ + private final long maxInnerIndex; /** * Create a type-appropriate WritableRedirectedColumnSource for the supplied {@link WritableRowRedirection} and @@ -29,7 +47,8 @@ public class WritableRedirectedColumnSource extends RedirectedColumnSource * @param innerSource The column source to redirect * @param maxInnerIndex The maximum row key available in innerSource */ - public WritableRedirectedColumnSource(@NotNull final RowRedirection rowRedirection, + protected WritableRedirectedColumnSource( + @NotNull final RowRedirection rowRedirection, @NotNull final ColumnSource innerSource, final long maxInnerIndex) { super(rowRedirection, innerSource); @@ -140,7 +159,7 @@ public boolean allowsReinterpret(@NotNull Class ColumnSource doReinterpret( @NotNull Class alternateDataType) { - return new WritableRedirectedColumnSource<>( - rowRedirection, innerSource.reinterpret(alternateDataType), maxInnerIndex); + return WritableRedirectedColumnSource.maybeRedirect(rowRedirection, + (WritableColumnSource) innerSource.reinterpret(alternateDataType), maxInnerIndex); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantByteSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantByteSource.java index 05a54abd5fa..5fb13e6f403 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantByteSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantByteSource.java @@ -1,3 +1,6 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ /* * --------------------------------------------------------------------------------------------------------------------- * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit ImmutableConstantCharSource and regenerate @@ -7,9 +10,12 @@ import io.deephaven.engine.table.ColumnSource; +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.WritableByteChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; import io.deephaven.engine.table.impl.sources.*; @@ -27,7 +33,8 @@ */ public class ImmutableConstantByteSource extends AbstractColumnSource - implements ImmutableColumnSourceGetDefaults.ForByte, InMemoryColumnSource, ShiftData.ShiftCallback { + implements ImmutableColumnSourceGetDefaults.ForByte, ShiftData.ShiftCallback, InMemoryColumnSource, + RowKeyAgnosticChunkSource { private final byte value; @@ -80,4 +87,29 @@ protected ColumnSource doReinterpret( return (ColumnSource) new ByteAsBooleanColumnSource(this); } // endregion reinterpret + + @Override + public void fillChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableByteChunk destChunk = dest.asWritableByteChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_BYTE : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + fillChunkUnordered(context , dest, keys); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantCharSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantCharSource.java index c39a3d1f530..dee2302e1c8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantCharSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantCharSource.java @@ -1,8 +1,14 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ package io.deephaven.engine.table.impl.sources.immutable; +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.WritableCharChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; import io.deephaven.engine.table.impl.sources.*; @@ -20,7 +26,8 @@ */ public class ImmutableConstantCharSource extends AbstractColumnSource - implements ImmutableColumnSourceGetDefaults.ForChar, InMemoryColumnSource, ShiftData.ShiftCallback { + implements ImmutableColumnSourceGetDefaults.ForChar, ShiftData.ShiftCallback, InMemoryColumnSource, + RowKeyAgnosticChunkSource { private final char value; @@ -62,4 +69,29 @@ public final void shift(final long start, final long end, final long offset) {} // region reinterpret // endregion reinterpret + + @Override + public void fillChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableCharChunk destChunk = dest.asWritableCharChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_CHAR : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + fillChunkUnordered(context , dest, keys); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantDoubleSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantDoubleSource.java index 54c21ecd4c3..c8192949f45 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantDoubleSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantDoubleSource.java @@ -1,3 +1,6 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ /* * --------------------------------------------------------------------------------------------------------------------- * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit ImmutableConstantCharSource and regenerate @@ -5,9 +8,12 @@ */ package io.deephaven.engine.table.impl.sources.immutable; +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.WritableDoubleChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; import io.deephaven.engine.table.impl.sources.*; @@ -25,7 +31,8 @@ */ public class ImmutableConstantDoubleSource extends AbstractColumnSource - implements ImmutableColumnSourceGetDefaults.ForDouble, InMemoryColumnSource, ShiftData.ShiftCallback { + implements ImmutableColumnSourceGetDefaults.ForDouble, ShiftData.ShiftCallback, InMemoryColumnSource, + RowKeyAgnosticChunkSource { private final double value; @@ -67,4 +74,29 @@ public final void shift(final long start, final long end, final long offset) {} // region reinterpret // endregion reinterpret + + @Override + public void fillChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableDoubleChunk destChunk = dest.asWritableDoubleChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_DOUBLE : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + fillChunkUnordered(context , dest, keys); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantFloatSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantFloatSource.java index f09cfe25540..67e063c645a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantFloatSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantFloatSource.java @@ -1,3 +1,6 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ /* * --------------------------------------------------------------------------------------------------------------------- * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit ImmutableConstantCharSource and regenerate @@ -5,9 +8,12 @@ */ package io.deephaven.engine.table.impl.sources.immutable; +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.WritableFloatChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; import io.deephaven.engine.table.impl.sources.*; @@ -25,7 +31,8 @@ */ public class ImmutableConstantFloatSource extends AbstractColumnSource - implements ImmutableColumnSourceGetDefaults.ForFloat, InMemoryColumnSource, ShiftData.ShiftCallback { + implements ImmutableColumnSourceGetDefaults.ForFloat, ShiftData.ShiftCallback, InMemoryColumnSource, + RowKeyAgnosticChunkSource { private final float value; @@ -67,4 +74,29 @@ public final void shift(final long start, final long end, final long offset) {} // region reinterpret // endregion reinterpret + + @Override + public void fillChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableFloatChunk destChunk = dest.asWritableFloatChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_FLOAT : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + fillChunkUnordered(context , dest, keys); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantIntSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantIntSource.java index 3ed6604a411..8c9d8826f8a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantIntSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantIntSource.java @@ -1,3 +1,6 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ /* * --------------------------------------------------------------------------------------------------------------------- * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit ImmutableConstantCharSource and regenerate @@ -5,9 +8,12 @@ */ package io.deephaven.engine.table.impl.sources.immutable; +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.WritableIntChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; import io.deephaven.engine.table.impl.sources.*; @@ -25,7 +31,8 @@ */ public class ImmutableConstantIntSource extends AbstractColumnSource - implements ImmutableColumnSourceGetDefaults.ForInt, InMemoryColumnSource, ShiftData.ShiftCallback { + implements ImmutableColumnSourceGetDefaults.ForInt, ShiftData.ShiftCallback, InMemoryColumnSource, + RowKeyAgnosticChunkSource { private final int value; @@ -67,4 +74,29 @@ public final void shift(final long start, final long end, final long offset) {} // region reinterpret // endregion reinterpret + + @Override + public void fillChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableIntChunk destChunk = dest.asWritableIntChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_INT : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + fillChunkUnordered(context , dest, keys); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantLongSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantLongSource.java index fc269dd4014..361c45c9805 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantLongSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantLongSource.java @@ -1,3 +1,6 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ /* * --------------------------------------------------------------------------------------------------------------------- * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit ImmutableConstantCharSource and regenerate @@ -9,9 +12,12 @@ import io.deephaven.time.DateTime; +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; import io.deephaven.engine.table.impl.sources.*; @@ -29,7 +35,8 @@ */ public class ImmutableConstantLongSource extends AbstractColumnSource - implements ImmutableColumnSourceGetDefaults.ForLong, InMemoryColumnSource, ShiftData.ShiftCallback { + implements ImmutableColumnSourceGetDefaults.ForLong, ShiftData.ShiftCallback, InMemoryColumnSource, + RowKeyAgnosticChunkSource { private final long value; @@ -82,4 +89,29 @@ protected ColumnSource doReinterpret( return (ColumnSource) new LongAsDateTimeColumnSource(this); } // endregion reinterpret + + @Override + public void fillChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableLongChunk destChunk = dest.asWritableLongChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_LONG : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + fillChunkUnordered(context , dest, keys); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantObjectSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantObjectSource.java index d44b6df4e88..872874053b8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantObjectSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantObjectSource.java @@ -1,3 +1,6 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ /* * --------------------------------------------------------------------------------------------------------------------- * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit ImmutableConstantCharSource and regenerate @@ -5,9 +8,12 @@ */ package io.deephaven.engine.table.impl.sources.immutable; +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; import io.deephaven.engine.table.impl.sources.*; @@ -24,7 +30,8 @@ */ public class ImmutableConstantObjectSource extends AbstractColumnSource - implements ImmutableColumnSourceGetDefaults.ForObject, InMemoryColumnSource, ShiftData.ShiftCallback { + implements ImmutableColumnSourceGetDefaults.ForObject, ShiftData.ShiftCallback, InMemoryColumnSource, + RowKeyAgnosticChunkSource { private final T value; @@ -66,4 +73,29 @@ public final void shift(final long start, final long end, final long offset) {} // region reinterpret // endregion reinterpret + + @Override + public void fillChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableObjectChunk destChunk = dest.asWritableObjectChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? null : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + fillChunkUnordered(context , dest, keys); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantShortSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantShortSource.java index cdab3b09a31..e0a9f3c5317 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantShortSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/immutable/ImmutableConstantShortSource.java @@ -1,3 +1,6 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ /* * --------------------------------------------------------------------------------------------------------------------- * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit ImmutableConstantCharSource and regenerate @@ -5,9 +8,12 @@ */ package io.deephaven.engine.table.impl.sources.immutable; +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.WritableShortChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; import io.deephaven.engine.table.impl.sources.*; @@ -25,7 +31,8 @@ */ public class ImmutableConstantShortSource extends AbstractColumnSource - implements ImmutableColumnSourceGetDefaults.ForShort, InMemoryColumnSource, ShiftData.ShiftCallback { + implements ImmutableColumnSourceGetDefaults.ForShort, ShiftData.ShiftCallback, InMemoryColumnSource, + RowKeyAgnosticChunkSource { private final short value; @@ -67,4 +74,29 @@ public final void shift(final long start, final long end, final long offset) {} // region reinterpret // endregion reinterpret + + @Override + public void fillChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + final WritableShortChunk destChunk = dest.asWritableShortChunk(); + for (int ii = 0; ii < keys.size(); ++ii) { + destChunk.set(ii, keys.get(ii) == RowSequence.NULL_ROW_KEY ? NULL_SHORT : value); + } + destChunk.setSize(keys.size()); + } + + @Override + public void fillPrevChunkUnordered( + @NotNull FillContext context, + @NotNull WritableChunk dest, + @NotNull LongChunk keys) { + fillChunkUnordered(context , dest, keys); + } + + @Override + public boolean providesFillUnordered() { + return true; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java index 1fd0c385d46..24432a06d53 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java @@ -101,7 +101,7 @@ public BaseByteUpdateByOperator(@NotNull final MatchPair pair, // region create-dense this.maybeInnerSource = makeDenseSource(); // endregion create-dense - this.outputSource = new WritableRedirectedColumnSource(rowRedirection, maybeInnerSource, 0); + this.outputSource = WritableRedirectedColumnSource.maybeRedirect(rowRedirection, maybeInnerSource, 0); } else { this.maybeInnerSource = null; // region create-sparse diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java index 0e21db78eed..adcf7d101c7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java @@ -95,7 +95,7 @@ public BaseCharUpdateByOperator(@NotNull final MatchPair pair, // region create-dense this.maybeInnerSource = new CharacterArraySource(); // endregion create-dense - this.outputSource = new WritableRedirectedColumnSource(rowRedirection, maybeInnerSource, 0); + this.outputSource = WritableRedirectedColumnSource.maybeRedirect(rowRedirection, maybeInnerSource, 0); } else { this.maybeInnerSource = null; // region create-sparse diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java index 354860af0ee..c3cbd33c0a9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java @@ -98,7 +98,7 @@ public BaseDoubleUpdateByOperator(@NotNull final MatchPair pair, this.isRedirected = rowRedirection != null; if(rowRedirection != null) { this.maybeInnerSource = new DoubleArraySource(); - this.outputSource = new WritableRedirectedColumnSource(rowRedirection, maybeInnerSource, 0); + this.outputSource = WritableRedirectedColumnSource.maybeRedirect(rowRedirection, maybeInnerSource, 0); } else { this.maybeInnerSource = null; this.outputSource = new DoubleSparseArraySource(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java index 60ea52d2548..613a699f132 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java @@ -93,7 +93,7 @@ public BaseFloatUpdateByOperator(@NotNull final MatchPair pair, this.isRedirected = rowRedirection != null; if(rowRedirection != null) { this.maybeInnerSource = new FloatArraySource(); - this.outputSource = new WritableRedirectedColumnSource(rowRedirection, maybeInnerSource, 0); + this.outputSource = WritableRedirectedColumnSource.maybeRedirect(rowRedirection, maybeInnerSource, 0); } else { this.maybeInnerSource = null; this.outputSource = new FloatSparseArraySource(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java index 49871e2dad3..8adf21343bc 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java @@ -100,7 +100,7 @@ public BaseIntUpdateByOperator(@NotNull final MatchPair pair, // region create-dense this.maybeInnerSource = new IntegerArraySource(); // endregion create-dense - this.outputSource = new WritableRedirectedColumnSource(rowRedirection, maybeInnerSource, 0); + this.outputSource = WritableRedirectedColumnSource.maybeRedirect(rowRedirection, maybeInnerSource, 0); } else { this.maybeInnerSource = null; // region create-sparse diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java index cad71cddca9..623359b1e34 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java @@ -100,7 +100,7 @@ public BaseLongUpdateByOperator(@NotNull final MatchPair pair, // region create-dense this.maybeInnerSource = new LongArraySource(); // endregion create-dense - this.outputSource = new WritableRedirectedColumnSource(rowRedirection, maybeInnerSource, 0); + this.outputSource = WritableRedirectedColumnSource.maybeRedirect(rowRedirection, maybeInnerSource, 0); } else { this.maybeInnerSource = null; // region create-sparse diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java index 477c356fe30..a3bc08f8c3b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java @@ -102,7 +102,7 @@ public BaseObjectUpdateByOperator(@NotNull final MatchPair pair, // region create-dense this.maybeInnerSource = new ObjectArraySource(colType); // endregion create-dense - this.outputSource = new WritableRedirectedColumnSource(rowRedirection, maybeInnerSource, 0); + this.outputSource = WritableRedirectedColumnSource.maybeRedirect(rowRedirection, maybeInnerSource, 0); } else { this.maybeInnerSource = null; // region create-sparse diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java index 073cf9360eb..158e19f4ecf 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java @@ -100,7 +100,7 @@ public BaseShortUpdateByOperator(@NotNull final MatchPair pair, // region create-dense this.maybeInnerSource = new ShortArraySource(); // endregion create-dense - this.outputSource = new WritableRedirectedColumnSource(rowRedirection, maybeInnerSource, 0); + this.outputSource = WritableRedirectedColumnSource.maybeRedirect(rowRedirection, maybeInnerSource, 0); } else { this.maybeInnerSource = null; // region create-sparse diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ColumnsToRowsTransform.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ColumnsToRowsTransform.java index b8aab4bd16c..4089e955bbd 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ColumnsToRowsTransform.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/ColumnsToRowsTransform.java @@ -179,7 +179,7 @@ public static Table columnsToRows(final Table source, final String labelColumn, } expandSet.add(name); if (crossJoinShiftState != null) { - resultMap.put(name, new BitShiftingColumnSource<>(crossJoinShiftState, cs)); + resultMap.put(name, BitShiftingColumnSource.maybeWrap(crossJoinShiftState, cs)); } else { resultMap.put(name, cs); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableNaturalJoinTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableNaturalJoinTest.java index e5496b1fe67..3e59a74c830 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableNaturalJoinTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableNaturalJoinTest.java @@ -4,13 +4,16 @@ package io.deephaven.engine.table.impl; import io.deephaven.base.FileUtils; +import io.deephaven.chunk.ObjectChunk; import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetBuilderSequential; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.TrackingRowSet; +import io.deephaven.engine.table.ChunkSource; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.DataColumn; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.indexer.RowSetIndexer; @@ -1576,6 +1579,25 @@ public void testDHC3202_v2() { } } + public void testGetDirectAfterNaturalJoin() { + final Table sodiumLeft = emptyTable(3).updateView("Value=(i%5==0? null : i*2)", "ColLeft=`LeftOnlyContents`"); + final Table peppermintRight = + emptyTable(4).updateView("Value=(i%5==0? null : i)", "ColRight=`RightOnlyContents`"); + final Table vanillaVanilla = sodiumLeft.naturalJoin(peppermintRight, "Value"); + final String rightValue = "RightOnlyContents"; + + final ColumnSource colRightSource = vanillaVanilla.getColumnSource("ColRight"); + try (final ChunkSource.GetContext gc = colRightSource.makeGetContext(3)) { + final ObjectChunk ck = colRightSource.getChunk(gc, vanillaVanilla.getRowSet()).asObjectChunk(); + assertEquals(rightValue, ck.get(0)); + assertEquals(rightValue, ck.get(1)); + assertNull(ck.get(2)); + } + final DataColumn colRight = vanillaVanilla.getColumn("ColRight"); + assertEquals(rightValue, colRight.get(0)); + assertEquals(rightValue, colRight.get(1)); + assertNull(colRight.get(2)); + } private void diskBackedTestHarness(BiConsumer testFunction) throws IOException { final File leftDirectory = Files.createTempDirectory("QueryTableJoinTest-Left").toFile(); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/StreamTableAggregationTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/StreamTableAggregationTest.java index 94ca3417c60..6600b19d6bb 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/StreamTableAggregationTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/StreamTableAggregationTest.java @@ -80,7 +80,7 @@ private void doOperatorTest(@NotNull final UnaryOperator operator, final new WrappedRowSetWritableRowRedirection(streamInternalRowSet); streamSources = source.getColumnSourceMap().entrySet().stream().collect(Collectors.toMap( Map.Entry::getKey, - (entry -> new RedirectedColumnSource<>(streamRedirections, entry.getValue())), + (entry -> RedirectedColumnSource.maybeRedirect(streamRedirections, entry.getValue())), Assert::neverInvoked, LinkedHashMap::new)); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/StreamTableOperationsTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/StreamTableOperationsTest.java index 1f049210332..8cf1723cb5e 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/StreamTableOperationsTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/StreamTableOperationsTest.java @@ -79,7 +79,7 @@ private void doOperatorTest(@NotNull final UnaryOperator
operator, final new WrappedRowSetWritableRowRedirection(streamInternalRowSet); streamSources = source.getColumnSourceMap().entrySet().stream().collect(Collectors.toMap( Map.Entry::getKey, - (entry -> new RedirectedColumnSource<>(streamRedirections, entry.getValue())), + (entry -> RedirectedColumnSource.maybeRedirect(streamRedirections, entry.getValue())), Assert::neverInvoked, LinkedHashMap::new)); } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index 3a9d673c3ec..00d50e978a5 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -428,7 +428,7 @@ protected static LinkedHashMap> makeColumns( writableSources[ii] = ArrayBackedColumnSource.getMemoryColumnSource( 0, column.getDataType(), column.getComponentType()); finalColumns.put(column.getName(), - new WritableRedirectedColumnSource<>(emptyRowRedirection, writableSources[ii], 0)); + WritableRedirectedColumnSource.maybeRedirect(emptyRowRedirection, writableSources[ii], 0)); } return finalColumns; } diff --git a/replication/static/src/main/java/io/deephaven/replicators/ReplicateSourcesAndChunks.java b/replication/static/src/main/java/io/deephaven/replicators/ReplicateSourcesAndChunks.java index 2a2ca06acf1..76086c98f8e 100644 --- a/replication/static/src/main/java/io/deephaven/replicators/ReplicateSourcesAndChunks.java +++ b/replication/static/src/main/java/io/deephaven/replicators/ReplicateSourcesAndChunks.java @@ -139,6 +139,7 @@ private static void replicateSingleValues() throws IOException { charToAllButBoolean( "engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CharacterSingleValueSource.java"); replicateObjectSingleValue(); + replicateBooleanSingleValue(); } private static void replicateObjectSingleValue() throws IOException { @@ -160,11 +161,16 @@ private static void replicateObjectSingleValue() throws IOException { "Object current", "T current", "Object prev", "T prev", "ColumnSource<[?] extends Object>", "ColumnSource", + "getObject", "get", + "getPrevObject", "getPrev", "set\\(Object", "set(T", "set\\(long key, Object", "set(long key, T", "set\\(NULL_OBJECT", "set(null", "final ObjectChunk<[?] extends Values>", "final ObjectChunk", - "unbox\\((.*)\\)", "$1"); + "unbox\\((.*)\\)", "$1", + "NULL_OBJECT", "null", + "WritableObjectChunk<[?] super Values>", "WritableObjectChunk", + "Object value", "T value"); lines = ReplicationUtils.removeRegion(lines, "UnboxedSetter"); lines = ReplicationUtils.replaceRegion(lines, "Constructor", Arrays.asList( " public ObjectSingleValueSource(Class type) {", @@ -175,6 +181,36 @@ private static void replicateObjectSingleValue() throws IOException { FileUtils.writeLines(resultClassJavaFile, lines); } + private static void replicateBooleanSingleValue() throws IOException { + final String resultClassJavaPath = charToBoolean( + "engine/table/src/main/java/io/deephaven/engine/table/impl/sources/CharacterSingleValueSource.java"); + final File resultClassJavaFile = new File(resultClassJavaPath); + List lines = FileUtils.readLines(resultClassJavaFile, Charset.defaultCharset()); + lines = ReplicationUtils.addImport(lines, + "import io.deephaven.chunk.ObjectChunk;", + "import io.deephaven.chunk.WritableObjectChunk;"); + lines = ReplicationUtils.removeImport(lines, + "import io.deephaven.chunk.BooleanChunk;", + "import io.deephaven.chunk.WritableBooleanChunk;", + "import static io.deephaven.util.type.TypeUtils.unbox;"); + lines = globalReplacements(lines, + "boolean current", "Boolean current", + "boolean prev", "Boolean prev", + "super\\(boolean.class", "super(Boolean.class", + "set\\(long key, boolean", "set(long key, Boolean", + "getBoolean", "get", + "getPrevBoolean", "getPrev", + "boolean get", "Boolean get", + "boolean value", "Boolean value", + "final BooleanChunk<[?] extends Values>", "final ObjectChunk", + "final WritableBooleanChunk<[?] super Values>", "final WritableObjectChunk", + "asBooleanChunk\\(", "asObjectChunk(", + "asWritableBooleanChunk\\(", "asWritableObjectChunk(", + "unbox\\((.*)\\)", "$1"); + lines = ReplicationUtils.removeRegion(lines, "UnboxedSetter"); + FileUtils.writeLines(resultClassJavaFile, lines); + } + private static void replicateChunkColumnSource() throws IOException { charToAllButBoolean( "engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/CharChunkColumnSource.java");