Skip to content

Commit

Permalink
Addressed many PR comments, still WIP for reinterpreted columns
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Jan 17, 2025
1 parent dc0c99f commit f090edb
Show file tree
Hide file tree
Showing 12 changed files with 1,914 additions and 1,875 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,39 @@ protected void pop(int count) {
throw new UnsupportedOperationException("pop() must be overriden by rolling operators");
}

/**
* For cumulative operators only, this method will be called to pass the input chunk data to the operator and
* produce the output data values.
*
* @param inputKeys the keys for the input data rows (also matches the output keys)
* @param valueChunkArr the input data chunks needed by the operator for internal calculations
* @param tsChunk the timestamp chunk for the input data (if applicable)
* @param len the number of items in the input data chunks
*/
public abstract void accumulateCumulative(
RowSequence inputKeys,
Chunk<? extends Values>[] valueChunkArr,
LongChunk<? extends Values> tsChunk,
int len);

/**
* For windowed operators only, this method will be called to pass the input chunk data to the operator and
* produce the output data values. It is important to note that the size of the influencer (input) and affected
* (output) chunks are not likely be the same. We pass these sizes explicitly to the operators for the sake of
* the operators (such as {@link io.deephaven.engine.table.impl.updateby.countwhere.CountWhereOperator} with
* zero input columns) where no input chunks are provided but we must still process the exact number of input
* rows.
*
* @param inputKeys the keys for the input data rows (also matches the output keys)
* @param influencerValueChunkArr the input data chunks needed by the operator for internal calculations, these
* values will be pushed and popped into the current window
* @param affectedPosChunk the row positions of the affected rows
* @param influencerPosChunk the row positions of the influencer rows
* @param pushChunk a chunk containing the push instructions for each output row to be calculated
* @param popChunk a chunk containing the pop instructions for each output row to be calculated
* @param affectedCount how many affected (output) rows are being computed
* @param influencerCount how many influencer (input) rows are needed for the computation
*/
public abstract void accumulateRolling(
RowSequence inputKeys,
Chunk<? extends Values>[] influencerValueChunkArr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.updateby.countwhere.CountWhereOperator;
import io.deephaven.engine.table.impl.updateby.delta.*;
import io.deephaven.engine.table.impl.updateby.em.*;
Expand Down Expand Up @@ -1319,9 +1320,11 @@ private UpdateByOperator makeCountWhereOperator(
final Map<String, ColumnSource<?>> columnSourceMap = new LinkedHashMap<>();
for (int i = 0; i < inputColumnNames.length; i++) {
final ColumnDefinition<?> columnDef = tableDef.getColumn(inputColumnNames[i]);
final ColumnSource<?> source =
NullValueColumnSource.getInstance(columnDef.getDataType(), columnDef.getComponentType());
columnSourceMap.put(inputColumnNames[i], source);
// Reinterpret the column source to a primitive type if necessary so the filter is working with
// primitive types.
final ColumnSource<?> maybeReinterpretedSource = ReinterpretUtils.maybeConvertToPrimitive(
NullValueColumnSource.getInstance(columnDef.getDataType(), columnDef.getComponentType()));
columnSourceMap.put(inputColumnNames[i], maybeReinterpretedSource);
}
final Table dummyTable = new QueryTable(RowSetFactory.empty().toTracking(), columnSourceMap);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ private void applyFilters(final int chunkSize) {
} else {
filter.chunkFilter.filterAnd(valueChunks[0], resultsChunk);
}
continue;
} else if (filter.conditionFilter != null) {
if (!initialized) {
filter.conditionFilter.filter(conditionalFilterContext, valueChunks, chunkSize, resultsChunk);
Expand All @@ -306,19 +305,18 @@ private void applyFilters(final int chunkSize) {
filter.conditionFilter.filterAnd(conditionalFilterContext, valueChunks, chunkSize,
resultsChunk);
}
continue;
}

if (remainingRows == null) {
// This is the first WhereFilter to run, initialize the remainingRows RowSet
remainingRows = initialized
? buildFromBooleanChunk(resultsChunk, chunkSize)
: RowSetFactory.flat(chunkSize);
}
try (final RowSet ignored = remainingRows) {
remainingRows = filter.whereFilter.filter(remainingRows, flatRowSet, chunkSourceTable, false);
} else {
if (remainingRows == null) {
// This is the first WhereFilter to run, initialize the remainingRows RowSet
remainingRows = initialized
? buildFromBooleanChunk(resultsChunk, chunkSize)
: RowSetFactory.flat(chunkSize);
}
try (final RowSet ignored = remainingRows) {
remainingRows = filter.whereFilter.filter(remainingRows, flatRowSet, chunkSourceTable, false);
}
initialized = true;
}
initialized = true;
}

try (final RowSet ignored = remainingRows; final RowSet ignored2 = flatRowSet) {
Expand Down
Loading

0 comments on commit f090edb

Please sign in to comment.