Skip to content

Commit

Permalink
Changes from self-review
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Jan 15, 2025
1 parent b69cdf2 commit e343f89
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.api.updateby.UpdateByControl;
import io.deephaven.api.updateby.UpdateByOperation;
import io.deephaven.api.updateby.spec.*;
import io.deephaven.base.verify.Require;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.ColumnDefinition;
Expand All @@ -23,7 +24,6 @@
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.impl.updateby.countwhere.CountFilter;
import io.deephaven.engine.table.impl.updateby.countwhere.CountWhereOperator;
import io.deephaven.engine.table.impl.updateby.delta.*;
import io.deephaven.engine.table.impl.updateby.em.*;
Expand Down Expand Up @@ -425,7 +425,7 @@ public Void visit(CumProdSpec cps) {

@Override
public Void visit(CumCountWhereSpec spec) {
ops.add(makeCumCountWhereOperator(tableDef, spec));
ops.add(makeCountWhereOperator(tableDef, spec));
return null;
}

Expand Down Expand Up @@ -554,7 +554,7 @@ public Void visit(@NotNull final RollingCountSpec spec) {

@Override
public Void visit(@NotNull final RollingCountWhereSpec spec) {
ops.add(makeRollingCountWhereOperator(tableDef, spec));
ops.add(makeCountWhereOperator(tableDef, spec));
return null;
}

Expand Down Expand Up @@ -1261,86 +1261,21 @@ private UpdateByOperator makeRollingCountOperator(@NotNull final MatchPair pair,
}
}

private UpdateByOperator makeCumCountWhereOperator(
/**
* This is used for Cum/Rolling CountWhere operators
*/
private UpdateByOperator makeCountWhereOperator(
@NotNull final TableDefinition tableDef,
@NotNull final CumCountWhereSpec rs) {
final WhereFilter[] whereFilters = WhereFilter.fromInternal(rs.filter());
@NotNull final UpdateBySpec spec) {

final List<String> inputColumnNameList = new ArrayList<>();
final Map<String, Integer> inputColumnMap = new HashMap<>();
final List<int[]> filterInputColumnIndicesList = new ArrayList<>();
Require.eqTrue(spec instanceof CumCountWhereSpec || spec instanceof RollingCountWhereSpec,
"spec instanceof CumCountWhereSpec || spec instanceof RollingCountWhereSpec");

// Verify all the columns in the where filters are present in the table def and valid for use.
for (final WhereFilter whereFilter : whereFilters) {
whereFilter.init(tableDef);
if (whereFilter.isRefreshing()) {
throw new UnsupportedOperationException("CumCountWhere does not support refreshing filters");
}
final boolean isCumulative = spec instanceof CumCountWhereSpec;

// Compute which input sources this filter will use.
final List<String> filterColumnName = whereFilter.getColumns();
final int inputColumnCount = whereFilter.getColumns().size();
final int[] inputColumnIndices = new int[inputColumnCount];
for (int ii = 0; ii < inputColumnCount; ++ii) {
final String inputColumnName = filterColumnName.get(ii);
final int inputColumnIndex = inputColumnMap.computeIfAbsent(inputColumnName, k -> {
inputColumnNameList.add(inputColumnName);
return inputColumnNameList.size() - 1;
});
inputColumnIndices[ii] = inputColumnIndex;
}
filterInputColumnIndicesList.add(inputColumnIndices);
}

// Gather the input column type info.
final String[] inputColumnNames = inputColumnNameList.toArray(String[]::new);
final ChunkType[] inputChunkTypes = new ChunkType[inputColumnNames.length];
final Class<?>[] inputColumnTypes = new Class[inputColumnNames.length];
final Class<?>[] inputComponentTypes = new Class[inputColumnNames.length];
for (int i = 0; i < inputColumnNames.length; i++) {
final ColumnDefinition<?> columnDef = tableDef.getColumn(inputColumnNames[i]);
inputColumnTypes[i] = columnDef.getDataType();
inputChunkTypes[i] = ChunkType.fromElementType(inputColumnTypes[i]);
inputComponentTypes[i] = columnDef.getComponentType();
}

// Create a dummy table we can use to initialize filters.
final Map<String, ColumnSource<?>> columnSourceMap = new LinkedHashMap<>();
for (int i = 0; i < inputColumnNames.length; i++) {
final ColumnDefinition<?> columnDef = tableDef.getColumn(inputColumnNames[i]);
final ColumnSource<?> source =
NullValueColumnSource.getInstance(columnDef.getDataType(), columnDef.getComponentType());
columnSourceMap.put(inputColumnNames[i], source);
}
final Table dummyTable = new QueryTable(RowSetFactory.empty().toTracking(), columnSourceMap);

final CountFilter[] countFilters =
CountFilter.createCountFilters(whereFilters, dummyTable, filterInputColumnIndicesList);

// If any filter is a standard WhereFilter, we need a chunk source table.
final boolean chunkSourceTableRequired =
Arrays.asList(countFilters).stream().anyMatch(filter -> filter.whereFilter() != null);

// Create a new column pair with the same name for the left and right columns
final MatchPair pair = new MatchPair(rs.column().name(), rs.column().name());

return new CountWhereOperator(
pair,
countFilters,
inputColumnNames,
inputChunkTypes,
inputColumnTypes,
inputComponentTypes,
chunkSourceTableRequired);
}

private UpdateByOperator makeRollingCountWhereOperator(
@NotNull final TableDefinition tableDef,
@NotNull final RollingCountWhereSpec rs) {
final long prevWindowScaleUnits = rs.revWindowScale().getTimeScaleUnits();
final long fwdWindowScaleUnits = rs.fwdWindowScale().getTimeScaleUnits();

final WhereFilter[] whereFilters = WhereFilter.fromInternal(rs.filter());
final WhereFilter[] whereFilters = isCumulative
? WhereFilter.fromInternal(((CumCountWhereSpec) spec).filter())
: WhereFilter.fromInternal(((RollingCountWhereSpec) spec).filter());

final List<String> inputColumnNameList = new ArrayList<>();
final Map<String, Integer> inputColumnMap = new HashMap<>();
Expand All @@ -1350,7 +1285,7 @@ private UpdateByOperator makeRollingCountWhereOperator(
for (final WhereFilter whereFilter : whereFilters) {
whereFilter.init(tableDef);
if (whereFilter.isRefreshing()) {
throw new UnsupportedOperationException("RollingCountWhere does not support refreshing filters");
throw new UnsupportedOperationException("CountWhere does not support refreshing filters");
}

// Compute which input sources this filter will use.
Expand Down Expand Up @@ -1390,35 +1325,56 @@ private UpdateByOperator makeRollingCountWhereOperator(
}
final Table dummyTable = new QueryTable(RowSetFactory.empty().toTracking(), columnSourceMap);

final CountFilter[] countFilters =
CountFilter.createCountFilters(whereFilters, dummyTable, filterInputColumnIndicesList);
final CountWhereOperator.CountFilter[] countFilters =
CountWhereOperator.CountFilter.createCountFilters(whereFilters, dummyTable,
filterInputColumnIndicesList);

// If any filter is a standard WhereFilter, we need a chunk source table.
final boolean chunkSourceTableRequired =
Arrays.asList(countFilters).stream().anyMatch(filter -> filter.whereFilter() != null);

final String[] affectingColumns;
if (rs.revWindowScale().timestampCol() == null) {
affectingColumns = inputColumnNames;
// Create a new column pair with the same name for the left and right columns
final String columnName = isCumulative
? ((CumCountWhereSpec) spec).column().name()
: ((RollingCountWhereSpec) spec).column().name();
final MatchPair pair = new MatchPair(columnName, columnName);

// Create and return the operator.
if (isCumulative) {
return new CountWhereOperator(
pair,
countFilters,
inputColumnNames,
inputChunkTypes,
inputColumnTypes,
inputComponentTypes,
chunkSourceTableRequired);
} else {
affectingColumns = ArrayUtils.add(inputColumnNames, rs.revWindowScale().timestampCol());
}
final RollingCountWhereSpec rs = (RollingCountWhereSpec) spec;

// Create a new column pair with the same name for the left and right columns
final MatchPair pair = new MatchPair(rs.column().name(), rs.column().name());
final String[] affectingColumns;
if (rs.revWindowScale().timestampCol() == null) {
affectingColumns = inputColumnNames;
} else {
affectingColumns = ArrayUtils.add(inputColumnNames, rs.revWindowScale().timestampCol());
}

return new CountWhereOperator(
pair,
affectingColumns,
rs.revWindowScale().timestampCol(),
prevWindowScaleUnits,
fwdWindowScaleUnits,
countFilters,
inputColumnNames,
inputChunkTypes,
inputColumnTypes,
inputComponentTypes,
chunkSourceTableRequired);
final long prevWindowScaleUnits = rs.revWindowScale().getTimeScaleUnits();
final long fwdWindowScaleUnits = rs.fwdWindowScale().getTimeScaleUnits();

return new CountWhereOperator(
pair,
affectingColumns,
rs.revWindowScale().timestampCol(),
prevWindowScaleUnits,
fwdWindowScaleUnits,
countFilters,
inputColumnNames,
inputChunkTypes,
inputColumnTypes,
inputComponentTypes,
chunkSourceTableRequired);
}
}

private UpdateByOperator makeRollingStdOperator(@NotNull final MatchPair pair,
Expand Down

This file was deleted.

Loading

0 comments on commit e343f89

Please sign in to comment.