From 663fae6b5ebdfed05e63d8f6b8bd78a3e6561b25 Mon Sep 17 00:00:00 2001 From: Nate Bauernfeind Date: Thu, 15 Aug 2024 17:21:48 -0600 Subject: [PATCH] perf: Remove Unnecessary Recursion from Select/Update (#5924) This reorganizes how select/update builds the resulting formula columns, column source maps, and work to be processed on the PUG. Instead of using recursion, it now builds things up in a dynamic-programming style without redoing work. As a result, select/update operations with thousands of columns now use significantly less memory and cpu to perform the same work. A simple 50k select-column update operation initializes in 2.5m (source starts empty and this is on an m2 mac) and has cycle times of ~650ms. --- .../java/io/deephaven/util/SimpleTypeMap.java | 43 +- .../impl/QueryCompilerRequestProcessor.java | 47 +- .../engine/table/impl/QueryTable.java | 88 +- .../table/impl/SelectOrUpdateListener.java | 19 +- .../table/impl/ShiftedColumnsFactory.java | 4 +- .../table/impl/lang/QueryLanguageParser.java | 36 +- .../impl/select/AbstractConditionFilter.java | 2 +- .../impl/select/AbstractFormulaColumn.java | 38 +- .../table/impl/select/DhFormulaColumn.java | 21 +- .../engine/table/impl/select/MatchFilter.java | 3 +- .../engine/table/impl/select/RangeFilter.java | 2 +- .../impl/select/analyzers/BaseLayer.java | 97 -- .../select/analyzers/ConstantColumnLayer.java | 63 +- .../select/analyzers/DependencyLayerBase.java | 74 +- .../select/analyzers/PreserveColumnLayer.java | 76 +- .../select/analyzers/RedirectionLayer.java | 91 +- .../analyzers/SelectAndViewAnalyzer.java | 1013 +++++++++++------ .../SelectAndViewAnalyzerWrapper.java | 128 --- .../select/analyzers/SelectColumnLayer.java | 250 ++-- .../analyzers/SelectOrViewColumnLayer.java | 17 +- .../select/analyzers/StaticFlattenLayer.java | 146 --- .../select/analyzers/ViewColumnLayer.java | 25 +- .../impl/select/codegen/FormulaAnalyzer.java | 64 +- .../impl/lang/TestQueryLanguageParser.java | 2 +- .../VarListChunkInputStreamGenerator.java | 12 +- .../barrage/chunk/VarListChunkReader.java | 2 + .../BoxedBooleanArrayExpansionKernel.java | 2 +- 27 files changed, 1146 insertions(+), 1219 deletions(-) delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzerWrapper.java delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java diff --git a/Util/src/main/java/io/deephaven/util/SimpleTypeMap.java b/Util/src/main/java/io/deephaven/util/SimpleTypeMap.java index 7b15c86f8b5..c1ab8b67abb 100644 --- a/Util/src/main/java/io/deephaven/util/SimpleTypeMap.java +++ b/Util/src/main/java/io/deephaven/util/SimpleTypeMap.java @@ -7,17 +7,56 @@ public final class SimpleTypeMap { - public static SimpleTypeMap create(V forBoolean, V forChar, V forByte, V forShort, V forInt, V forLong, - V forFloat, V forDouble, V forObject) { + /** + * Create a mapping from type {@link Class classes} to a value. 
+ * + * @param forBoolean The mapping for {@code boolean} types (note {@link Boolean} maps to {@code forObject}) + * @param forChar The mapping for {@code char} and {@link Character} types + * @param forByte The mapping for {@code byte} and {@link Byte} types + * @param forShort The mapping for {@code short} and {@link Short} types + * @param forInt The mapping for {@code int} and {@link Integer} types + * @param forLong The mapping for {@code long} and {@link Long} types + * @param forFloat The mapping for {@code float} and {@link Float} types + * @param forDouble The mapping for {@code double} and {@link Double} types + * @param forObject The mapping for all other types + * @return A SimpleTypeMap to the provided values + */ + public static SimpleTypeMap create( + V forBoolean, + V forChar, + V forByte, + V forShort, + V forInt, + V forLong, + V forFloat, + V forDouble, + V forObject) { final HashMap, V> map = new HashMap<>(); + map.put(boolean.class, forBoolean); + // Note: Booleans are treated as Objects, unlike other boxed primitives + map.put(char.class, forChar); + map.put(Character.class, forChar); + map.put(byte.class, forByte); + map.put(Byte.class, forByte); + map.put(short.class, forShort); + map.put(Short.class, forShort); + map.put(int.class, forInt); + map.put(Integer.class, forInt); + map.put(long.class, forLong); + map.put(Long.class, forLong); + map.put(float.class, forFloat); + map.put(Float.class, forFloat); + map.put(double.class, forDouble); + map.put(Double.class, forDouble); + return new SimpleTypeMap<>(map, forObject); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryCompilerRequestProcessor.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryCompilerRequestProcessor.java index ae321271be4..49e6e2e6b14 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryCompilerRequestProcessor.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryCompilerRequestProcessor.java @@ -4,12 +4,11 @@ package io.deephaven.engine.table.impl; import io.deephaven.UncheckedDeephavenException; -import io.deephaven.api.util.NameValidator; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.context.QueryCompiler; import io.deephaven.engine.context.QueryCompilerRequest; -import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; +import io.deephaven.engine.table.impl.select.codegen.FormulaAnalyzer; import io.deephaven.util.MultiException; import io.deephaven.util.SafeCloseable; import io.deephaven.util.CompletionStageFuture; @@ -18,43 +17,43 @@ import org.jetbrains.annotations.VisibleForTesting; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public interface QueryCompilerRequestProcessor { +public abstract class QueryCompilerRequestProcessor { /** * @return An immediate QueryCompilerRequestProcessor */ - static QueryCompilerRequestProcessor.ImmediateProcessor immediate() { + public static QueryCompilerRequestProcessor.ImmediateProcessor immediate() { return new ImmediateProcessor(); } /** * @return A batch QueryCompilerRequestProcessor */ - static QueryCompilerRequestProcessor.BatchProcessor batch() { + public static QueryCompilerRequestProcessor.BatchProcessor batch() { return new BatchProcessor(); } /** - * @return a CachingSupplier 
that supplies a snapshot of the current query scope variables + * @return a CachingSupplier that supplies a snapshot of current query scope variables and query library imports */ @VisibleForTesting - static CachingSupplier> newQueryScopeVariableSupplier() { - final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); - return new CachingSupplier<>(() -> Collections.unmodifiableMap( - queryScope.toMap((name, value) -> NameValidator.isValidQueryParameterName(name)))); + public static CachingSupplier newFormulaImportsSupplier() { + return new CachingSupplier<>(FormulaAnalyzer.Imports::new); } + private final CachingSupplier formulaImportsSupplier = newFormulaImportsSupplier(); + /** - * @return a lazily cached snapshot of the current query scope variables + * @return a lazily cached snapshot of current query scope variables and query library imports */ - Map getQueryScopeVariables(); + public final FormulaAnalyzer.Imports getFormulaImports() { + return formulaImportsSupplier.get(); + } /** * Submit a request for compilation. The QueryCompilerRequestProcessor is not required to immediately compile this @@ -62,24 +61,16 @@ static CachingSupplier> newQueryScopeVariableSupplier() { * * @param request the request to compile */ - CompletionStageFuture> submit(@NotNull QueryCompilerRequest request); + public abstract CompletionStageFuture> submit(@NotNull QueryCompilerRequest request); /** * A QueryCompilerRequestProcessor that immediately compiles requests. */ - class ImmediateProcessor implements QueryCompilerRequestProcessor { - - private final CachingSupplier> queryScopeVariableSupplier = newQueryScopeVariableSupplier(); - + public static class ImmediateProcessor extends QueryCompilerRequestProcessor { private ImmediateProcessor() { // force use of static factory method } - @Override - public Map getQueryScopeVariables() { - return queryScopeVariableSupplier.get(); - } - @Override public CompletionStageFuture> submit(@NotNull final QueryCompilerRequest request) { final String desc = "Compile: " + request.description(); @@ -108,20 +99,14 @@ public CompletionStageFuture> submit(@NotNull final QueryCompilerReques *
* The compile method must be called to actually compile the requests. */ - class BatchProcessor implements QueryCompilerRequestProcessor { + public static class BatchProcessor extends QueryCompilerRequestProcessor { private final List requests = new ArrayList<>(); private final List>> resolvers = new ArrayList<>(); - private final CachingSupplier> queryScopeVariableSupplier = newQueryScopeVariableSupplier(); private BatchProcessor() { // force use of static factory method } - @Override - public Map getQueryScopeVariables() { - return queryScopeVariableSupplier.get(); - } - @Override public CompletionStageFuture> submit(@NotNull final QueryCompilerRequest request) { final CompletionStageFuture.Resolver> resolver = CompletionStageFuture.make(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index be12408a63b..1c227ea3ae7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -43,7 +43,6 @@ import io.deephaven.engine.table.impl.remote.ConstructSnapshot; import io.deephaven.engine.table.impl.select.*; import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer; -import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper; import io.deephaven.engine.table.impl.snapshot.SnapshotIncrementalListener; import io.deephaven.engine.table.impl.snapshot.SnapshotInternalListener; import io.deephaven.engine.table.impl.snapshot.SnapshotUtils; @@ -1498,10 +1497,9 @@ public Table update(final Collection newColumns) { */ public SelectValidationResult validateSelect(final SelectColumn... selectColumns) { final SelectColumn[] clones = SelectColumn.copyFrom(selectColumns); - SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create( - this, SelectAndViewAnalyzer.Mode.SELECT_STATIC, columns, rowSet, getModifiedColumnSetForUpdates(), true, - false, clones); - return new SelectValidationResult(analyzerWrapper.getAnalyzer(), clones); + SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.createContext( + this, SelectAndViewAnalyzer.Mode.SELECT_STATIC, true, false, clones); + return new SelectValidationResult(analyzerContext.createAnalyzer(), clones); } private Table selectOrUpdate(Flavor flavor, final SelectColumn... selectColumns) { @@ -1526,18 +1524,16 @@ private Table selectOrUpdate(Flavor flavor, final SelectColumn... selectColumns) } } final boolean publishTheseSources = flavor == Flavor.Update; - final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create( - this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSources, true, - selectColumns); + final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = SelectAndViewAnalyzer.createContext( + this, mode, publishTheseSources, true, selectColumns); - final SelectAndViewAnalyzer analyzer = analyzerWrapper.getAnalyzer(); - final SelectColumn[] processedColumns = analyzerWrapper.getProcessedColumns() + final SelectAndViewAnalyzer analyzer = analyzerContext.createAnalyzer(); + final SelectColumn[] processedColumns = analyzerContext.getProcessedColumns() .toArray(SelectColumn[]::new); // Init all the rows by cooking up a fake Update final TableUpdate fakeUpdate = new TableUpdateImpl( - analyzer.alreadyFlattenedSources() ? 
RowSetFactory.flat(rowSet.size()) : rowSet.copy(), - RowSetFactory.empty(), RowSetFactory.empty(), + rowSet.copy(), RowSetFactory.empty(), RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.ALL); final CompletableFuture waitForResult = new CompletableFuture<>(); @@ -1558,8 +1554,10 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc new SelectAndViewAnalyzer.UpdateHelper(emptyRowSet, fakeUpdate)) { try { - analyzer.applyUpdate(fakeUpdate, emptyRowSet, updateHelper, jobScheduler, - liveResultCapture, analyzer.futureCompletionHandler(waitForResult)); + analyzer.applyUpdate( + fakeUpdate, emptyRowSet, updateHelper, jobScheduler, liveResultCapture, + () -> waitForResult.complete(null), + waitForResult::completeExceptionally); } catch (Exception e) { waitForResult.completeExceptionally(e); } @@ -1580,14 +1578,15 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc } } - final TrackingRowSet resultRowSet = - analyzer.flattenedResult() ? RowSetFactory.flat(rowSet.size()).toTracking() : rowSet; - resultTable = new QueryTable(resultRowSet, analyzerWrapper.getPublishedColumnResources()); + final TrackingRowSet resultRowSet = analyzer.flatResult() && !rowSet.isFlat() + ? RowSetFactory.flat(rowSet.size()).toTracking() + : rowSet; + resultTable = new QueryTable(resultRowSet, analyzerContext.getPublishedColumnSources()); if (liveResultCapture != null) { analyzer.startTrackingPrev(); - final Map effects = analyzerWrapper.calcEffects(); - final SelectOrUpdateListener soul = new SelectOrUpdateListener(updateDescription, this, - resultTable, effects, analyzer); + final Map effects = analyzerContext.calcEffects(); + final SelectOrUpdateListener soul = new SelectOrUpdateListener( + updateDescription, this, resultTable, effects, analyzer); liveResultCapture.transferTo(soul); addUpdateListener(soul); ConstituentDependency.install(resultTable, soul); @@ -1596,11 +1595,6 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc resultTable.setFlat(); } propagateDataIndexes(processedColumns, resultTable); - for (final ColumnSource columnSource : analyzer.getNewColumnSources().values()) { - if (columnSource instanceof PossiblyImmutableColumnSource) { - ((PossiblyImmutableColumnSource) columnSource).setImmutable(); - } - } } } propagateFlatness(resultTable); @@ -1610,10 +1604,10 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc } else { maybeCopyColumnDescriptions(resultTable); } - SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor = flavor == Flavor.Update - ? SelectAndViewAnalyzerWrapper.UpdateFlavor.Update - : SelectAndViewAnalyzerWrapper.UpdateFlavor.Select; - return analyzerWrapper.applyShiftsAndRemainingColumns(this, resultTable, updateFlavor); + SelectAndViewAnalyzer.UpdateFlavor updateFlavor = flavor == Flavor.Update + ? 
SelectAndViewAnalyzer.UpdateFlavor.Update + : SelectAndViewAnalyzer.UpdateFlavor.Select; + return analyzerContext.applyShiftsAndRemainingColumns(this, resultTable, updateFlavor); })); } @@ -1761,15 +1755,16 @@ updateDescription, sizeForInstrumentation(), () -> { createSnapshotControlIfRefreshing(OperationSnapshotControl::new); initializeWithSnapshot(humanReadablePrefix, sc, (usePrev, beforeClockValue) -> { final boolean publishTheseSources = flavor == Flavor.UpdateView; - final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create( - this, SelectAndViewAnalyzer.Mode.VIEW_EAGER, columns, rowSet, - getModifiedColumnSetForUpdates(), publishTheseSources, true, viewColumns); - final SelectColumn[] processedViewColumns = analyzerWrapper.getProcessedColumns() + final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = + SelectAndViewAnalyzer.createContext( + this, SelectAndViewAnalyzer.Mode.VIEW_EAGER, + publishTheseSources, true, viewColumns); + final SelectColumn[] processedViewColumns = analyzerContext.getProcessedColumns() .toArray(SelectColumn[]::new); QueryTable queryTable = new QueryTable( - rowSet, analyzerWrapper.getPublishedColumnResources()); + rowSet, analyzerContext.getPublishedColumnSources()); if (sc != null) { - final Map effects = analyzerWrapper.calcEffects(); + final Map effects = analyzerContext.calcEffects(); final TableUpdateListener listener = new ViewOrUpdateViewListener(updateDescription, this, queryTable, effects); sc.setListenerAndResult(listener, queryTable); @@ -1786,11 +1781,11 @@ updateDescription, sizeForInstrumentation(), () -> { } else { maybeCopyColumnDescriptions(queryTable); } - final SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor = + final SelectAndViewAnalyzer.UpdateFlavor updateFlavor = flavor == Flavor.UpdateView - ? SelectAndViewAnalyzerWrapper.UpdateFlavor.UpdateView - : SelectAndViewAnalyzerWrapper.UpdateFlavor.View; - queryTable = analyzerWrapper.applyShiftsAndRemainingColumns( + ? 
SelectAndViewAnalyzer.UpdateFlavor.UpdateView + : SelectAndViewAnalyzer.UpdateFlavor.View; + queryTable = analyzerContext.applyShiftsAndRemainingColumns( this, queryTable, updateFlavor); result.setValue(queryTable); @@ -1851,14 +1846,13 @@ public Table lazyUpdate(final Collection newColumns) { sizeForInstrumentation(), () -> { checkInitiateOperation(); - final SelectAndViewAnalyzerWrapper analyzerWrapper = SelectAndViewAnalyzer.create( - this, SelectAndViewAnalyzer.Mode.VIEW_LAZY, columns, rowSet, - getModifiedColumnSetForUpdates(), - true, true, selectColumns); - final SelectColumn[] processedColumns = analyzerWrapper.getProcessedColumns() + final SelectAndViewAnalyzer.AnalyzerContext analyzerContext = + SelectAndViewAnalyzer.createContext( + this, SelectAndViewAnalyzer.Mode.VIEW_LAZY, true, true, selectColumns); + final SelectColumn[] processedColumns = analyzerContext.getProcessedColumns() .toArray(SelectColumn[]::new); final QueryTable result = new QueryTable( - rowSet, analyzerWrapper.getPublishedColumnResources()); + rowSet, analyzerContext.getPublishedColumnSources()); if (isRefreshing()) { addUpdateListener(new ListenerImpl( "lazyUpdate(" + Arrays.deepToString(processedColumns) + ')', this, result)); @@ -1868,8 +1862,8 @@ public Table lazyUpdate(final Collection newColumns) { copySortableColumns(result, processedColumns); maybeCopyColumnDescriptions(result, processedColumns); - return analyzerWrapper.applyShiftsAndRemainingColumns( - this, result, SelectAndViewAnalyzerWrapper.UpdateFlavor.LazyUpdate); + return analyzerContext.applyShiftsAndRemainingColumns( + this, result, SelectAndViewAnalyzer.UpdateFlavor.LazyUpdate); }); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java index 285cdeabb94..d7236ce6010 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SelectOrUpdateListener.java @@ -15,8 +15,8 @@ import io.deephaven.engine.table.impl.util.JobScheduler; import io.deephaven.engine.table.impl.util.UpdateGraphJobScheduler; -import java.util.BitSet; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * A Shift-Aware listener for Select or Update. 
It uses the SelectAndViewAnalyzer to calculate how columns affect other @@ -29,8 +29,6 @@ class SelectOrUpdateListener extends BaseTable.ListenerImpl { private final SelectAndViewAnalyzer analyzer; private volatile boolean updateInProgress = false; - private final BitSet completedColumns = new BitSet(); - private final BitSet allNewColumns = new BitSet(); private final boolean enableParallelUpdate; /** @@ -61,7 +59,6 @@ class SelectOrUpdateListener extends BaseTable.ListenerImpl { (QueryTable.ENABLE_PARALLEL_SELECT_AND_UPDATE && getUpdateGraph().parallelismFactor() > 1)) && analyzer.allowCrossColumnParallelization(); - analyzer.setAllNewColumns(allNewColumns); } @Override @@ -76,7 +73,6 @@ public void onUpdate(final TableUpdate upstream) { // - create parallel arrays of pre-shift-keys and post-shift-keys so we can move them in chunks updateInProgress = true; - completedColumns.clear(); final TableUpdate acquiredUpdate = upstream.acquire(); final WritableRowSet toClear = resultRowSet.copyPrev(); @@ -91,15 +87,16 @@ public void onUpdate(final TableUpdate upstream) { jobScheduler = new ImmediateJobScheduler(); } + // do not allow a double-notify + final AtomicBoolean hasNotified = new AtomicBoolean(); analyzer.applyUpdate(acquiredUpdate, toClear, updateHelper, jobScheduler, this, - new SelectAndViewAnalyzer.SelectLayerCompletionHandler(allNewColumns, completedColumns) { - @Override - public void onAllRequiredColumnsCompleted() { + () -> { + if (!hasNotified.getAndSet(true)) { completionRoutine(acquiredUpdate, jobScheduler, toClear, updateHelper); } - - @Override - protected void onError(Exception error) { + }, + error -> { + if (!hasNotified.getAndSet(true)) { handleException(error); } }); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnsFactory.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnsFactory.java index 4ab1c70248c..c84deddd93e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnsFactory.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnsFactory.java @@ -117,7 +117,7 @@ import io.deephaven.engine.table.impl.select.FormulaColumn; import io.deephaven.engine.table.impl.select.WhereFilter; import io.deephaven.engine.table.impl.select.WhereFilterFactory; -import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper; +import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer; import io.deephaven.util.mutable.MutableInt; import org.jetbrains.annotations.NotNull; @@ -260,7 +260,7 @@ private static Pair getShiftedTableFilterPair( public static Table getShiftedColumnsTable( @NotNull final Table source, @NotNull FormulaColumn formulaColumn, - @NotNull SelectAndViewAnalyzerWrapper.UpdateFlavor updateFlavor) { + @NotNull SelectAndViewAnalyzer.UpdateFlavor updateFlavor) { String nuggetName = "getShiftedColumnsTable( " + formulaColumn + ", " + updateFlavor + ") "; return QueryPerformanceRecorder.withNugget(nuggetName, source.sizeForInstrumentation(), () -> { Table tableSoFar = source; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/lang/QueryLanguageParser.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/lang/QueryLanguageParser.java index 5cd2418c83b..d457a4ab0b3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/lang/QueryLanguageParser.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/lang/QueryLanguageParser.java @@ -182,6 +182,8 @@ public final 
class QueryLanguageParser extends GenericVisitorAdapter, Q * Create a QueryLanguageParser and parse the given {@code expression}. After construction, the * {@link QueryLanguageParser.Result result} of parsing the {@code expression} is available with the * {@link #getResult()}} method. + *
+ * Note that the provided Collections and Maps must not be mutated concurrently with or after construction. * * @param expression The query language expression to parse * @param packageImports Wildcard package imports @@ -190,9 +192,10 @@ public final class QueryLanguageParser extends GenericVisitorAdapter, Q * imported. * @param variables A map of the names of scope variables to their types * @param variableTypeArguments A map of the names of scope variables to their type arguments - * @param unboxArguments If true it will unbox the query scope arguments - * @param queryScopeVariables A mutable map of the names of query scope variables to their values + * @param queryScopeVariables A map of the names of query scope variables to their values * @param columnVariables A set of column variable names + * @param unboxArguments If true it will unbox the query scope arguments + * @param timeConversionResult The result of converting time literals in the expression * @throws QueryLanguageParseException If any exception or error is encountered */ public QueryLanguageParser( @@ -225,6 +228,8 @@ public QueryLanguageParser( * Create a QueryLanguageParser and parse the given {@code expression}. After construction, the * {@link QueryLanguageParser.Result result} of parsing the {@code expression} is available with the * {@link #getResult()}} method. + *
+ * Note that the provided Collections and Maps must not be mutated concurrently with or after construction. * * @param expression The query language expression to parse * @param packageImports Wildcard package imports @@ -247,6 +252,28 @@ public QueryLanguageParser( variableTypeArguments, null, null, true, null); } + /** + * Create a QueryLanguageParser and parse the given {@code expression}. After construction, the + * {@link QueryLanguageParser.Result result} of parsing the {@code expression} is available with the + * {@link #getResult()}} method. + *
+ * Note that the provided Collections and Maps must not be mutated concurrently with or after construction. + * + * @param expression The query language expression to parse + * @param packageImports Wildcard package imports + * @param classImports Individual class imports + * @param staticImports Wildcard static imports. All static variables and methods for the given classes are + * imported. + * @param variables A map of the names of scope variables to their types + * @param variableTypeArguments A map of the names of scope variables to their type arguments + * @param queryScopeVariables A map of the names of query scope variables to their values + * @param columnVariables A set of column variable names + * @param unboxArguments If true it will unbox the query scope arguments + * @param verifyIdempotence If true, the parser will verify that the result expression will not mutate when parsed + * @param pyCallableWrapperImplName The name of the PyCallableWrapper implementation to use + * @param timeConversionResult The result of converting time literals in the expression + * @throws QueryLanguageParseException If any exception or error is encountered + */ @VisibleForTesting QueryLanguageParser( String expression, @@ -264,9 +291,8 @@ public QueryLanguageParser( this.packageImports = packageImports == null ? Collections.emptySet() : Set.copyOf(packageImports); this.classImports = classImports == null ? Collections.emptySet() : Set.copyOf(classImports); this.staticImports = staticImports == null ? Collections.emptySet() : Set.copyOf(staticImports); - this.variables = variables == null ? Collections.emptyMap() : Map.copyOf(variables); - this.variableTypeArguments = - variableTypeArguments == null ? Collections.emptyMap() : Map.copyOf(variableTypeArguments); + this.variables = variables == null ? Collections.emptyMap() : variables; + this.variableTypeArguments = variableTypeArguments == null ? Collections.emptyMap() : variableTypeArguments; this.queryScopeVariables = queryScopeVariables == null ? new HashMap<>() : queryScopeVariables; this.columnVariables = columnVariables == null ? 
Collections.emptySet() : columnVariables; this.unboxArguments = unboxArguments; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractConditionFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractConditionFilter.java index e96bb529f1e..2007eee6526 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractConditionFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractConditionFilter.java @@ -89,7 +89,7 @@ public synchronized void init( try { final QueryLanguageParser.Result result = FormulaAnalyzer.parseFormula( formula, tableDefinition.getColumnNameMap(), outerToInnerNames, - compilationProcessor.getQueryScopeVariables(), unboxArguments); + compilationProcessor.getFormulaImports(), unboxArguments); formulaShiftColPair = result.getFormulaShiftColPair(); if (formulaShiftColPair != null) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractFormulaColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractFormulaColumn.java index 06cbd79fd70..ace064b9353 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractFormulaColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/AbstractFormulaColumn.java @@ -51,7 +51,7 @@ public abstract class AbstractFormulaColumn implements FormulaColumn { private Formula formula; protected QueryScopeParam[] params; protected Map> columnSources; - protected Map> columnDefinitions; + protected Map> columnDefinitions; private TrackingRowSet rowSet; protected Class returnedType; public static final String COLUMN_SUFFIX = "_"; @@ -90,12 +90,28 @@ public List initInputs( @NotNull final Map> columnsOfInterest) { this.rowSet = rowSet; - this.columnSources = columnsOfInterest; - if (usedColumns != null) { - return usedColumns; + if (usedColumns == null) { + initDef(extractDefinitions(columnsOfInterest), QueryCompilerRequestProcessor.immediate()); } + this.columnSources = filterColumnSources(columnsOfInterest); - return initDef(extractDefinitions(columnsOfInterest), QueryCompilerRequestProcessor.immediate()); + return usedColumns; + } + + private Map> filterColumnSources( + final Map> columnsOfInterest) { + if (usedColumns.isEmpty() && usedColumnArrays.isEmpty()) { + return Map.of(); + } + + final HashMap> sources = new HashMap<>(); + for (String columnName : usedColumns) { + sources.put(columnName, columnsOfInterest.get(columnName)); + } + for (String columnName : usedColumnArrays) { + sources.put(columnName, columnsOfInterest.get(columnName)); + } + return sources; } @Override @@ -119,28 +135,32 @@ public void validateSafeForRefresh(BaseTable sourceTable) { } protected void applyUsedVariables( - @NotNull final Map> columnDefinitionMap, + @NotNull final Map> parentColumnDefinitions, @NotNull final Set variablesUsed, @NotNull final Map possibleParams) { // the column definition map passed in is being mutated by the caller, so we need to make a copy - columnDefinitions = Map.copyOf(columnDefinitionMap); + columnDefinitions = new HashMap<>(); final List> paramsList = new ArrayList<>(); usedColumns = new ArrayList<>(); usedColumnArrays = new ArrayList<>(); for (String variable : variablesUsed) { + ColumnDefinition columnDefinition = parentColumnDefinitions.get(variable); if (variable.equals("i")) { usesI = true; } else if (variable.equals("ii")) { usesII = true; } else if (variable.equals("k")) { usesK = true; - } else if 
(columnDefinitions.get(variable) != null) { + } else if (columnDefinition != null) { + columnDefinitions.put(variable, columnDefinition); usedColumns.add(variable); } else { String strippedColumnName = variable.substring(0, Math.max(0, variable.length() - COLUMN_SUFFIX.length())); - if (variable.endsWith(COLUMN_SUFFIX) && columnDefinitions.get(strippedColumnName) != null) { + columnDefinition = parentColumnDefinitions.get(strippedColumnName); + if (variable.endsWith(COLUMN_SUFFIX) && columnDefinition != null) { + columnDefinitions.put(strippedColumnName, columnDefinition); usedColumnArrays.add(strippedColumnName); } else if (possibleParams.containsKey(variable)) { paramsList.add(new QueryScopeParam<>(variable, possibleParams.get(variable))); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DhFormulaColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DhFormulaColumn.java index 56a8aecfddc..7ed70800dfa 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DhFormulaColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DhFormulaColumn.java @@ -34,8 +34,7 @@ import io.deephaven.io.logger.Logger; import io.deephaven.util.CompletionStageFuture; import io.deephaven.util.type.TypeUtils; -import io.deephaven.vector.ObjectVector; -import io.deephaven.vector.Vector; +import io.deephaven.vector.VectorFactory; import org.jetbrains.annotations.NotNull; import org.jpy.PyObject; @@ -161,21 +160,7 @@ private static Map> makeNameToTypeDict(final String[] names, } public static Class getVectorType(Class declaredType) { - if (!io.deephaven.util.type.TypeUtils.isConvertibleToPrimitive(declaredType) || declaredType == boolean.class - || declaredType == Boolean.class) { - return ObjectVector.class; - } else { - final String declaredTypeSimpleName = - io.deephaven.util.type.TypeUtils.getUnboxedType(declaredType).getSimpleName(); - try { - return Class.forName(Vector.class.getPackage().getName() + '.' 
- + Character.toUpperCase(declaredTypeSimpleName.charAt(0)) - + declaredTypeSimpleName.substring(1) - + "Vector"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Unexpected exception for type " + declaredType, e); - } - } + return VectorFactory.forElementType(declaredType).vectorType(); } @Override @@ -195,7 +180,7 @@ public List initDef( try { final QueryLanguageParser.Result result = FormulaAnalyzer.parseFormula( formulaString, columnDefinitionMap, Collections.emptyMap(), - compilationRequestProcessor.getQueryScopeVariables()); + compilationRequestProcessor.getFormulaImports()); analyzedFormula = FormulaAnalyzer.analyze(formulaString, columnDefinitionMap, result); hasConstantValue = result.isConstantValueExpression(); formulaShiftColPair = result.getFormulaShiftColPair(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java index d4026f087aa..4a7921e39b1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java @@ -223,7 +223,8 @@ public synchronized void init( return; } final List valueList = new ArrayList<>(); - final Map queryScopeVariables = compilationProcessor.getQueryScopeVariables(); + final Map queryScopeVariables = + compilationProcessor.getFormulaImports().getQueryScopeVariables(); final ColumnTypeConvertor convertor = ColumnTypeConvertorFactory.getConvertor(column.getDataType()); for (String strValue : strValues) { convertor.convertValue(column, tableDefinition, strValue, queryScopeVariables, valueList::add); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/RangeFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/RangeFilter.java index 65d65b75e03..5236862c6d5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/RangeFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/RangeFilter.java @@ -183,7 +183,7 @@ public void init( try { boolean wasAnArrayType = convertor.convertValue( - def, tableDefinition, value, compilationProcessor.getQueryScopeVariables(), + def, tableDefinition, value, compilationProcessor.getFormulaImports().getQueryScopeVariables(), realValue::setValue); if (wasAnArrayType) { conversionError = diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java deleted file mode 100644 index fb79fc32e01..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/BaseLayer.java +++ /dev/null @@ -1,97 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.engine.table.impl.select.analyzers; - -import io.deephaven.base.log.LogOutput; -import io.deephaven.engine.liveness.LivenessNode; -import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.table.impl.util.JobScheduler; -import org.jetbrains.annotations.Nullable; - -import java.util.*; - -public class BaseLayer extends SelectAndViewAnalyzer { - private final Map> sources; - private final boolean publishTheseSources; - - BaseLayer(Map> sources, boolean publishTheseSources) 
{ - super(BASE_LAYER_INDEX); - this.sources = sources; - this.publishTheseSources = publishTheseSources; - } - - @Override - int getLayerIndexFor(String column) { - if (sources.containsKey(column)) { - return BASE_LAYER_INDEX; - } - throw new IllegalArgumentException("Unknown column: " + column); - } - - @Override - void setBaseBits(BitSet bitset) { - bitset.set(BASE_LAYER_INDEX); - } - - @Override - public void setAllNewColumns(BitSet bitset) { - bitset.set(BASE_LAYER_INDEX); - } - - @Override - void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy) { - mcsBuilder.setAll(remainingDepsToSatisfy.toArray(String[]::new)); - } - - @Override - final Map> getColumnSourcesRecurse(GetMode mode) { - // We specifically return a LinkedHashMap so the columns get populated in order - final Map> result = new LinkedHashMap<>(); - if (mode == GetMode.All || (mode == GetMode.Published && publishTheseSources)) { - result.putAll(sources); - } - return result; - } - - @Override - public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler, - @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) { - // nothing to do at the base layer - onCompletion.onLayerCompleted(BASE_LAYER_INDEX); - } - - @Override - final Map> calcDependsOnRecurse(boolean forcePublishAllSources) { - final Map> result = new HashMap<>(); - if (publishTheseSources || forcePublishAllSources) { - for (final String col : sources.keySet()) { - result.computeIfAbsent(col, dummy -> new HashSet<>()).add(col); - } - } - return result; - } - - @Override - public SelectAndViewAnalyzer getInner() { - return null; - } - - @Override - public void startTrackingPrev() { - // nothing to do - } - - @Override - public LogOutput append(LogOutput logOutput) { - return logOutput.append("{BaseLayer").append(", layerIndex=").append(getLayerIndex()).append("}"); - } - - @Override - public boolean allowCrossColumnParallelization() { - return true; - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java index c5eff7f3132..4738d944825 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ConstantColumnLayer.java @@ -5,41 +5,23 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.RowSequence; -import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.ChunkSource; import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.WritableColumnSource; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.impl.select.VectorChunkAdapter; -import io.deephaven.engine.table.impl.util.JobScheduler; -import org.jetbrains.annotations.Nullable; - -import java.util.Arrays; -import java.util.BitSet; public class ConstantColumnLayer extends SelectOrViewColumnLayer { - private final BitSet dependencyBitSet; - private final boolean flattenedResult; - private final boolean alreadyFlattenedSources; ConstantColumnLayer( - SelectAndViewAnalyzer inner, - String name, - SelectColumn sc, - 
WritableColumnSource ws, - String[] deps, - ModifiedColumnSet mcsBuilder, - boolean flattenedResult, - boolean alreadyFlattenedSources) { - super(inner, name, sc, ws, null, deps, mcsBuilder); - this.dependencyBitSet = new BitSet(); - this.flattenedResult = flattenedResult; - this.alreadyFlattenedSources = alreadyFlattenedSources; - Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set); + final SelectAndViewAnalyzer.AnalyzerContext context, + final SelectColumn sc, + final WritableColumnSource ws, + final String[] deps, + final ModifiedColumnSet mcsBuilder) { + super(context, sc, ws, null, deps, mcsBuilder); initialize(ws); } @@ -60,38 +42,17 @@ private void initialize(final WritableColumnSource writableSource) { } @Override - public void applyUpdate(final TableUpdate upstream, final RowSet toClear, final UpdateHelper helper, - final JobScheduler jobScheduler, @Nullable final LivenessNode liveResultOwner, - final SelectLayerCompletionHandler onCompletion) { - // Nothing to do at this level, but need to recurse because my inner layers might need to be called (e.g. - // because they are SelectColumnLayers) - inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner, - new SelectLayerCompletionHandler(dependencyBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // we don't need to do anything specific here; our result value is constant - onCompletion.onLayerCompleted(getLayerIndex()); - } - }); - } - - @Override - public LogOutput append(LogOutput logOutput) { - return logOutput.append("{ConstantColumnLayer: ").append(selectColumn.toString()).append("}"); + public boolean hasRefreshingLogic() { + return false; } @Override - public boolean flattenedResult() { - return flattenedResult; + boolean allowCrossColumnParallelization() { + return true; } @Override - public boolean alreadyFlattenedSources() { - return alreadyFlattenedSources; - } - - @Override - public boolean allowCrossColumnParallelization() { - return inner.allowCrossColumnParallelization(); + public LogOutput append(LogOutput logOutput) { + return logOutput.append("{ConstantColumnLayer: ").append(selectColumn.toString()).append("}"); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java index 67fa89b424a..7d00107d17e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/DependencyLayerBase.java @@ -3,7 +3,6 @@ // package io.deephaven.engine.table.impl.select.analyzers; -import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.vector.Vector; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.impl.select.SelectColumn; @@ -11,75 +10,38 @@ import java.util.*; -public abstract class DependencyLayerBase extends SelectAndViewAnalyzer { - final SelectAndViewAnalyzer inner; +public abstract class DependencyLayerBase extends SelectAndViewAnalyzer.Layer { final String name; final SelectColumn selectColumn; final boolean selectColumnHoldsVector; final ColumnSource columnSource; - // probably don't need this any more - private final String[] dependencies; final ModifiedColumnSet myModifiedColumnSet; - - DependencyLayerBase(SelectAndViewAnalyzer inner, String name, SelectColumn selectColumn, - ColumnSource 
columnSource, - String[] dependencies, ModifiedColumnSet mcsBuilder) { - super(inner.getLayerIndex() + 1); - this.inner = inner; - this.name = name; + final BitSet myLayerDependencySet; + + DependencyLayerBase( + final SelectAndViewAnalyzer.AnalyzerContext context, + final SelectColumn selectColumn, + final ColumnSource columnSource, + final String[] dependencies, + final ModifiedColumnSet mcsBuilder) { + super(context.getNextLayerIndex()); + this.name = selectColumn.getName(); this.selectColumn = selectColumn; selectColumnHoldsVector = Vector.class.isAssignableFrom(selectColumn.getReturnedType()); this.columnSource = columnSource; - this.dependencies = dependencies; - final Set remainingDepsToSatisfy = new HashSet<>(Arrays.asList(dependencies)); - inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy); + context.populateParentDependenciesMCS(mcsBuilder, dependencies); this.myModifiedColumnSet = mcsBuilder; + this.myLayerDependencySet = new BitSet(); + context.populateLayerDependencySet(myLayerDependencySet, dependencies); } @Override - void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy) { - // Later-defined columns override earlier-defined columns. So we satisfy column dependencies "on the way - // down" the recursion. - if (remainingDepsToSatisfy.remove(name)) { - // Caller had a dependency on us, so caller gets our dependencies - mcsBuilder.setAll(myModifiedColumnSet); - } - inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy); - } - - @Override - final Map> calcDependsOnRecurse(boolean forcePublishAllResources) { - final Map> result = inner.calcDependsOnRecurse(forcePublishAllResources); - final Set thisResult = new HashSet<>(); - for (final String dep : dependencies) { - final Set innerDependencies = result.get(dep); - if (innerDependencies == null) { - // There are no further expansions of 'dep', so add it as a dependency. - thisResult.add(dep); - } else { - // Instead of adding 'dep', add what 'dep' expands to. 
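                /*
                 * Editor's note, not part of the patch: a worked example of the transitive
                 * expansion this removed recursion performed. Given B = A + 1 (A a parent
                 * table column) and C = B * 2, the recursion reports C -> {A}: C's
                 * dependency on B is replaced by what B expands to. The replacement
                 * AnalyzerContext records the same mapping iteratively as each layer is
                 * added, so no per-column walk down an inner-layer chain is required.
                 */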
- thisResult.addAll(innerDependencies); - } - } - result.put(name, thisResult); - return result; - } - - @Override - public SelectAndViewAnalyzer getInner() { - return inner; - } - - @Override - int getLayerIndexFor(String column) { - if (name.equals(column)) { - return getLayerIndex(); - } - return inner.getLayerIndexFor(column); + Set getLayerColumnNames() { + return Set.of(name); } @Override - void setBaseBits(BitSet bitset) { - inner.setBaseBits(bitset); + public ModifiedColumnSet getModifiedColumnSet() { + return myModifiedColumnSet; } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java index 8d687cdc8c8..9b0f4b690f7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/PreserveColumnLayer.java @@ -4,69 +4,41 @@ package io.deephaven.engine.table.impl.select.analyzers; import io.deephaven.base.log.LogOutput; -import io.deephaven.engine.liveness.LivenessNode; -import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.table.impl.util.JobScheduler; -import org.jetbrains.annotations.Nullable; -import java.util.Arrays; -import java.util.BitSet; import java.util.Map; /** * A layer that copies a column from our input to our output. - * + *
* {@implNote This class is part of the Deephaven engine, and not intended for direct use.} */ final public class PreserveColumnLayer extends DependencyLayerBase { - private final BitSet dependencyBitSet; - PreserveColumnLayer(SelectAndViewAnalyzer inner, String name, SelectColumn sc, ColumnSource cs, String[] deps, - ModifiedColumnSet mcsBuilder) { - super(inner, name, sc, cs, deps, mcsBuilder); - this.dependencyBitSet = new BitSet(); - Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set); + PreserveColumnLayer( + final SelectAndViewAnalyzer.AnalyzerContext context, + final SelectColumn sc, + final ColumnSource cs, + final String[] deps, + final ModifiedColumnSet mcsBuilder) { + super(context, sc, cs, deps, mcsBuilder); } @Override - public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler, - @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) { - // Nothing to do at this level, but need to recurse because my inner layers might need to be called (e.g. - // because they are SelectColumnLayers) - inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner, - new SelectLayerCompletionHandler(dependencyBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // we don't need to do anything specific here - onCompletion.onLayerCompleted(getLayerIndex()); - } - }); + public boolean hasRefreshingLogic() { + return false; } @Override - Map> getColumnSourcesRecurse(GetMode mode) { - // our column is not a new column, so we need to make sure that we do not double enable previous tracking - final Map> result = inner.getColumnSourcesRecurse(mode); - switch (mode) { - case New: - // we have no new sources - break; - case Published: - case All: - result.put(name, columnSource); - break; - } - return result; + void populateColumnSources(final Map> result) { + result.put(name, columnSource); } @Override - public void startTrackingPrev() { - // nothing to do, here but the inner needs to be called - inner.startTrackingPrev(); + boolean allowCrossColumnParallelization() { + return true; } @Override @@ -74,24 +46,4 @@ public LogOutput append(LogOutput logOutput) { return logOutput.append("{PreserveColumnLayer: ").append(name).append(", layerIndex=").append(getLayerIndex()) .append("}"); } - - @Override - public boolean flattenedResult() { - // preserve layer is only flattened if the inner is flattened - // the "flattenedResult" means that we are flattening the table as part of select. For a pre-existing column, we - // could not preserve a layer while flattening, but if we are preserving a newly generated column; it is valid - // for the result to have been flattened as part of select. 
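        /*
         * Editor's note, not part of the patch: here "flattened" means the result is
         * re-keyed into position space. For example, a parent row set {5, 9, 12}
         * becomes {0, 1, 2} in the flattened result; a preserved pre-existing column
         * is still keyed at {5, 9, 12}, which is why preserving one forbids flattening.
         */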
- return inner.flattenedResult(); - } - - @Override - public boolean alreadyFlattenedSources() { - // a preserve layer is only already flattened if the inner is already flattened - return inner.alreadyFlattenedSources(); - } - - @Override - public boolean allowCrossColumnParallelization() { - return inner.allowCrossColumnParallelization(); - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java index 8a33acfa00d..46fe6ea4c31 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/RedirectionLayer.java @@ -4,82 +4,83 @@ package io.deephaven.engine.table.impl.select.analyzers; import io.deephaven.base.log.LogOutput; -import io.deephaven.base.verify.Assert; import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.ModifiedColumnSet; +import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.util.*; import io.deephaven.util.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableObject; import org.jetbrains.annotations.Nullable; import java.util.*; +import java.util.function.Consumer; /** * A layer that maintains the row redirection for future SelectColumnLayers. - * + *
* {@implNote This class is part of the Deephaven engine, and not intended for direct use.} */ -public final class RedirectionLayer extends SelectAndViewAnalyzer { - private final SelectAndViewAnalyzer inner; +public final class RedirectionLayer extends SelectAndViewAnalyzer.Layer { private final TrackingRowSet resultRowSet; private final WritableRowRedirection rowRedirection; private final WritableRowSet freeValues = RowSetFactory.empty(); + private final BitSet layerDependencySet = new BitSet(); private long maxInnerIndex; - RedirectionLayer(SelectAndViewAnalyzer inner, TrackingRowSet resultRowSet, WritableRowRedirection rowRedirection) { - super(REDIRECTION_LAYER_INDEX); - Assert.eq(inner.getLayerIndex(), "inner.getLayerIndex()", BASE_LAYER_INDEX); - this.inner = inner; + RedirectionLayer( + final SelectAndViewAnalyzer.AnalyzerContext context, + final TrackingRowSet resultRowSet, + final WritableRowRedirection rowRedirection) { + super(context.getNextLayerIndex()); this.resultRowSet = resultRowSet; this.rowRedirection = rowRedirection; this.maxInnerIndex = -1; } @Override - int getLayerIndexFor(String column) { - // Result columns' applyUpdate depend on the result of the redirection. - Assert.eq(inner.getLayerIndexFor(column), "inner.getLayerIndexFor(column)", BASE_LAYER_INDEX); - return REDIRECTION_LAYER_INDEX; + Set getLayerColumnNames() { + return Set.of(); } @Override - void setBaseBits(BitSet bitset) { - inner.setBaseBits(bitset); - bitset.set(REDIRECTION_LAYER_INDEX); + void populateColumnSources(final Map> result) { + // we don't generate any column sources, so we don't need to do anything here } @Override - public void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy) { - inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy); + ModifiedColumnSet getModifiedColumnSet() { + return ModifiedColumnSet.EMPTY; } @Override - public Map> getColumnSourcesRecurse(GetMode mode) { - return inner.getColumnSourcesRecurse(mode); + BitSet getLayerDependencySet() { + return layerDependencySet; } @Override - public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler, - @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) { - final BitSet baseLayerBitSet = new BitSet(); - inner.setBaseBits(baseLayerBitSet); - inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner, - new SelectLayerCompletionHandler(baseLayerBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // we only have a base layer underneath us, so we do not care about the bitSet; it is always - // empty - doApplyUpdate(upstream, toClear, helper, onCompletion); - } - }); + boolean allowCrossColumnParallelization() { + return true; + } + + @Override + public Runnable createUpdateHandler( + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final Runnable onSuccess, + final Consumer onError) { + // note that we process this layer directly because all subsequent layers depend on it + return () -> doApplyUpdate(upstream, onSuccess); } - private void doApplyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, - SelectLayerCompletionHandler onCompletion) { + private void doApplyUpdate( + final TableUpdate upstream, + final Runnable onSuccess) { // we need to remove the removed values from our row redirection, 
and add them to our free RowSet; so that // updating tables will not consume more space over the course of a day for abandoned rows final RowSetBuilderRandom innerToFreeBuilder = RowSetFactory.builderRandom(); @@ -150,32 +151,16 @@ private void doApplyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper he freeValues.removeRange(0, lastAllocated.get()); } - onCompletion.onLayerCompleted(REDIRECTION_LAYER_INDEX); - } - - @Override - public Map> calcDependsOnRecurse(boolean forcePublishAllResources) { - return inner.calcDependsOnRecurse(forcePublishAllResources); - } - - @Override - public SelectAndViewAnalyzer getInner() { - return inner; + onSuccess.run(); } @Override public void startTrackingPrev() { rowRedirection.startTrackingPrevValues(); - inner.startTrackingPrev(); } @Override public LogOutput append(LogOutput logOutput) { return logOutput.append("{RedirectionLayer").append(", layerIndex=").append(getLayerIndex()).append("}"); } - - @Override - public boolean allowCrossColumnParallelization() { - return inner.allowCrossColumnParallelization(); - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java index 099d70d4eb6..6efaf2425b2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java @@ -3,46 +3,60 @@ // package io.deephaven.engine.table.impl.select.analyzers; +import gnu.trove.map.TObjectIntMap; +import gnu.trove.map.hash.TObjectIntHashMap; import io.deephaven.base.Pair; +import io.deephaven.base.log.LogOutput; import io.deephaven.base.log.LogOutputAppendable; +import io.deephaven.base.verify.Assert; import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.engine.rowset.TrackingRowSet; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.MatchPair; import io.deephaven.engine.table.impl.QueryCompilerRequestProcessor; import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.ShiftedColumnsFactory; +import io.deephaven.engine.table.impl.TableUpdateImpl; import io.deephaven.engine.table.impl.select.FormulaColumn; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.impl.select.SourceColumn; import io.deephaven.engine.table.impl.select.SwitchColumn; import io.deephaven.engine.table.impl.sources.InMemoryColumnSource; +import io.deephaven.engine.table.impl.sources.PossiblyImmutableColumnSource; +import io.deephaven.engine.table.impl.sources.RedirectedColumnSource; import io.deephaven.engine.table.impl.sources.SingleValueColumnSource; import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource; import io.deephaven.engine.table.impl.util.InverseWrappedRowSetRowRedirection; import io.deephaven.engine.table.impl.util.JobScheduler; import io.deephaven.engine.table.impl.util.RowRedirection; +import io.deephaven.engine.table.impl.util.WrappedRowSetRowRedirection; import io.deephaven.engine.table.impl.util.WritableRowRedirection; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.util.SafeCloseable; import 
io.deephaven.util.SafeCloseablePair; import io.deephaven.vector.Vector; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.stream.Stream; -public abstract class SelectAndViewAnalyzer implements LogOutputAppendable { +public class SelectAndViewAnalyzer implements LogOutputAppendable { private static final Consumer> NOOP = ignore -> { }; public enum Mode { VIEW_LAZY, VIEW_EAGER, SELECT_STATIC, SELECT_REFRESHING, SELECT_REDIRECTED_REFRESHING, SELECT_REDIRECTED_STATIC } + public enum UpdateFlavor { + Select, View, Update, UpdateView, LazyUpdate + } public static void initializeSelectColumns( final Map> parentColumnMap, @@ -65,26 +79,23 @@ public static void initializeSelectColumns( } } - public static SelectAndViewAnalyzerWrapper create( - QueryTable sourceTable, Mode mode, Map> columnSources, - TrackingRowSet rowSet, ModifiedColumnSet parentMcs, boolean publishTheseSources, boolean useShiftedColumns, - SelectColumn... selectColumns) { - return create(sourceTable, mode, columnSources, rowSet, parentMcs, publishTheseSources, useShiftedColumns, - true, selectColumns); - } - - public static SelectAndViewAnalyzerWrapper create( - final QueryTable sourceTable, + public static AnalyzerContext createContext( + final QueryTable parentTable, final Mode mode, - final Map> columnSources, - TrackingRowSet rowSet, - final ModifiedColumnSet parentMcs, - final boolean publishTheseSources, + final boolean publishParentSources, boolean useShiftedColumns, - final boolean allowInternalFlatten, final SelectColumn... selectColumns) { - final UpdateGraph updateGraph = sourceTable.getUpdateGraph(); - SelectAndViewAnalyzer analyzer = createBaseLayer(columnSources, publishTheseSources); + final UpdateGraph updateGraph = parentTable.getUpdateGraph(); + + final Map> columnSources = parentTable.getColumnSourceMap(); + final TrackingRowSet rowSet = parentTable.getRowSet(); + + final boolean parentIsFlat = parentTable.isFlat(); + final boolean flatResult = !parentIsFlat + && (columnSources.isEmpty() || !publishParentSources) + && mode == Mode.SELECT_STATIC; + final AnalyzerContext context = new AnalyzerContext(parentTable, publishParentSources, flatResult); + final Map> columnDefinitions = new LinkedHashMap<>(); final RowRedirection rowRedirection; if (mode == Mode.SELECT_REDIRECTED_STATIC) { @@ -92,19 +103,12 @@ public static SelectAndViewAnalyzerWrapper create( } else if (mode == Mode.SELECT_REDIRECTED_REFRESHING && rowSet.size() < Integer.MAX_VALUE) { final WritableRowRedirection writableRowRedirection = WritableRowRedirection.FACTORY.createRowRedirection(rowSet.intSize()); - analyzer = analyzer.createRedirectionLayer(rowSet, writableRowRedirection); + context.addLayer(new RedirectionLayer(context, rowSet, writableRowRedirection)); rowRedirection = writableRowRedirection; } else { rowRedirection = null; } - List processedCols = new LinkedList<>(); - List remainingCols = null; - FormulaColumn shiftColumn = null; - boolean shiftColumnHasPositiveOffset = false; - - final HashSet resultColumns = new HashSet<>(); - // First pass to initialize all columns and to compile formulas in one batch. 
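As the comment above notes, formula compilation is now gathered into a single batch: each column's compile request is queued while the columns are initialized, and one compile() call at the end of the pass performs all of the work. A toy illustration of that collect-then-compile shape; ToyBatchProcessor and its methods are invented for this sketch and are not the engine's QueryCompilerRequestProcessor API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Supplier;

    // Invented stand-in for the batching pattern: callers enqueue compilation
    // work while iterating the select columns, then a single compile() call
    // performs it all at once instead of compiling once per column.
    final class ToyBatchProcessor<T> {
        private final List<Supplier<T>> pending = new ArrayList<>();

        void enqueue(final Supplier<T> request) {
            pending.add(request);
        }

        List<T> compile() {
            final List<T> results = new ArrayList<>(pending.size());
            for (final Supplier<T> request : pending) {
                results.add(request.get());
            }
            pending.clear();
            return results;
        }
    }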
final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch(); for (Map.Entry> entry : columnSources.entrySet()) { @@ -114,9 +118,10 @@ public static SelectAndViewAnalyzerWrapper create( columnDefinitions.put(name, cd); } + final Set resultColumnNames = new HashSet<>(); for (final SelectColumn sc : selectColumns) { - if (remainingCols != null) { - remainingCols.add(sc); + if (context.remainingCols != null) { + context.remainingCols.add(sc); continue; } @@ -126,55 +131,50 @@ public static SelectAndViewAnalyzerWrapper create( columnDefinitions.put(sc.getName(), cd); if (useShiftedColumns && hasConstantArrayAccess(sc)) { - remainingCols = new LinkedList<>(); - shiftColumn = sc instanceof FormulaColumn + context.remainingCols = new LinkedList<>(); + context.shiftColumn = sc instanceof FormulaColumn ? (FormulaColumn) sc : (FormulaColumn) ((SwitchColumn) sc).getRealColumn(); - shiftColumnHasPositiveOffset = hasPositiveOffsetConstantArrayAccess(sc); + context.shiftColumnHasPositiveOffset = hasPositiveOffsetConstantArrayAccess(sc); continue; } - processedCols.add(sc); + // In our first pass, determine whether any columns will be preserved so that we don't prematurely flatten. + final SourceColumn realColumn = tryToGetSourceColumn(sc); + + if (realColumn != null && !resultColumnNames.contains(realColumn.getSourceName())) { + // if we are preserving a column, then we cannot change key space + context.flatResult &= !shouldPreserve(columnSources.get(realColumn.getSourceName())); + } + + // TODO (deephaven#5760): If layers may define more than one column, we'll need to add all of them here. + resultColumnNames.add(sc.getName()); + + context.processedCols.add(sc); } compilationProcessor.compile(); // Second pass builds the analyzer and destination columns - final TrackingRowSet originalRowSet = rowSet; - boolean flatResult = rowSet.isFlat(); - // if we preserve a column, we set this to false - boolean flattenedResult = !flatResult - && allowInternalFlatten - && (columnSources.isEmpty() || !publishTheseSources) - && mode == Mode.SELECT_STATIC; - int numberOfInternallyFlattenedColumns = 0; - final HashMap> resultAlias = new HashMap<>(); - for (final SelectColumn sc : processedCols) { - - sc.initInputs(rowSet, analyzer.getAllColumnSources()); + for (final SelectColumn sc : context.processedCols) { - // When flattening the result, intermediate columns generate results in position space. When we discover - // that a select column depends on an intermediate result, then we must flatten all parent columns so - // that all dependent columns are in the same result-key space. - if (!flatResult && flattenedResult && Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream()) - .anyMatch(resultColumns::contains)) { - analyzer = analyzer.createStaticFlattenLayer(rowSet); - rowSet = RowSetFactory.flat(rowSet.size()).toTracking(); - flatResult = true; + // if this select column depends on result column then its updates must happen in result-key-space + // note: if flatResult is true then we are not preserving any parent columns + final boolean useResultKeySpace = context.flatResult + && Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream()) + .anyMatch(columnName -> context.getLayerIndexFor(columnName) != Layer.PARENT_TABLE_INDEX); - // we must re-initialize the column inputs as they may have changed post-flatten - sc.initInputs(rowSet, analyzer.getAllColumnSources()); - } + sc.initInputs(rowSet, useResultKeySpace ? 
context.allSourcesInResultKeySpace : context.allSources); - resultColumns.add(sc.getName()); - // this shadows any known alias + // TODO (deephaven-core#5760): If layers may define more than one column, we'll need to fix resultAlias. + // new columns shadow known aliases resultAlias.remove(sc.getName()); final Stream allDependencies = Stream.concat(sc.getColumns().stream(), sc.getColumnArrays().stream()); final String[] distinctDeps = allDependencies.distinct().toArray(String[]::new); - final ModifiedColumnSet mcsBuilder = new ModifiedColumnSet(parentMcs); + final ModifiedColumnSet mcsBuilder = new ModifiedColumnSet(parentTable.getModifiedColumnSetForUpdates()); if (useShiftedColumns && hasConstantArrayAccess(sc)) { // we use the first shifted column to split between processed columns and remaining columns @@ -183,104 +183,78 @@ public static SelectAndViewAnalyzerWrapper create( // shifted columns appear to not be safe for refresh, so we do not validate them until they are rewritten // using the intermediary shifted column - if (sourceTable.isRefreshing()) { - sc.validateSafeForRefresh(sourceTable); + if (parentTable.isRefreshing()) { + sc.validateSafeForRefresh(parentTable); } if (hasConstantValue(sc)) { final WritableColumnSource constViewSource = SingleValueColumnSource.getSingleValueColumnSource(sc.getReturnedType()); - analyzer = analyzer.createLayerForConstantView( - sc.getName(), sc, constViewSource, distinctDeps, mcsBuilder, flattenedResult, - flatResult && flattenedResult); + context.addLayer(new ConstantColumnLayer(context, sc, constViewSource, distinctDeps, mcsBuilder)); continue; } - final SourceColumn realColumn; - if (sc instanceof SourceColumn) { - realColumn = (SourceColumn) sc; - } else if ((sc instanceof SwitchColumn) && ((SwitchColumn) sc).getRealColumn() instanceof SourceColumn) { - realColumn = (SourceColumn) ((SwitchColumn) sc).getRealColumn(); - } else { - realColumn = null; - } - - if (realColumn != null && shouldPreserve(sc)) { - boolean sourceIsNew = resultColumns.contains(realColumn.getSourceName()); - if (!sourceIsNew) { - if (numberOfInternallyFlattenedColumns > 0) { - // we must preserve this column, but have already created an analyzer for the internally - // flattened column, therefore must start over without permitting internal flattening - return create(sourceTable, mode, columnSources, originalRowSet, parentMcs, publishTheseSources, - useShiftedColumns, false, selectColumns); - } else { - // we can not flatten future columns because we are preserving a column that may not be flat - flattenedResult = false; - } - } - - analyzer = analyzer.createLayerForPreserve( - sc.getName(), sc, sc.getDataView(), distinctDeps, mcsBuilder); - - continue; - } - - // look for an existing alias that can be preserved instead + final SourceColumn realColumn = tryToGetSourceColumn(sc); if (realColumn != null) { + if (shouldPreserve(sc.getDataView())) { + context.addLayer(new PreserveColumnLayer(context, sc, sc.getDataView(), distinctDeps, mcsBuilder)); + continue; + } + // look for an existing alias that can be preserved instead final ColumnSource alias = resultAlias.get(realColumn.getSourceName()); if (alias != null) { - analyzer = analyzer.createLayerForPreserve(sc.getName(), sc, alias, distinctDeps, mcsBuilder); + context.addLayer(new PreserveColumnLayer(context, sc, alias, distinctDeps, mcsBuilder)); continue; } } - // if this is a source column, then results are eligible for aliasing + // if this is a SourceColumn, then results are eligible for aliasing final Consumer> 
maybeCreateAlias = realColumn == null ? NOOP : cs -> resultAlias.put(realColumn.getSourceName(), cs); final long targetDestinationCapacity = - rowSet.isEmpty() ? 0 : (flattenedResult ? rowSet.size() : rowSet.lastRowKey() + 1); + rowSet.isEmpty() ? 0 : (context.flatResult ? rowSet.size() : rowSet.lastRowKey() + 1); switch (mode) { case VIEW_LAZY: { final ColumnSource viewCs = sc.getLazyView(); maybeCreateAlias.accept(viewCs); - analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder); + context.addLayer(new ViewColumnLayer(context, sc, viewCs, distinctDeps, mcsBuilder)); break; } case VIEW_EAGER: { final ColumnSource viewCs = sc.getDataView(); maybeCreateAlias.accept(viewCs); - analyzer = analyzer.createLayerForView(sc.getName(), sc, viewCs, distinctDeps, mcsBuilder); + context.addLayer(new ViewColumnLayer(context, sc, viewCs, distinctDeps, mcsBuilder)); break; } case SELECT_STATIC: { // We need to call newDestInstance because only newDestInstance has the knowledge to endow our // created array with the proper componentType (in the case of Vectors). - final WritableColumnSource scs = - flatResult || flattenedResult ? sc.newFlatDestInstance(targetDestinationCapacity) - : sc.newDestInstance(targetDestinationCapacity); + final WritableColumnSource scs = parentIsFlat || context.flatResult + ? sc.newFlatDestInstance(targetDestinationCapacity) + : sc.newDestInstance(targetDestinationCapacity); + maybeSetStaticColumnSourceImmutable(scs); maybeCreateAlias.accept(scs); - analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, null, - distinctDeps, mcsBuilder, false, flattenedResult, flatResult && flattenedResult); - if (flattenedResult) { - numberOfInternallyFlattenedColumns++; - } + context.addLayer(new SelectColumnLayer( + updateGraph, rowSet, context, sc, scs, null, distinctDeps, mcsBuilder, false, + useResultKeySpace)); break; } case SELECT_REDIRECTED_STATIC: { final WritableColumnSource underlyingSource = sc.newDestInstance(rowSet.size()); final WritableColumnSource scs = WritableRedirectedColumnSource.maybeRedirect( rowRedirection, underlyingSource, rowSet.size()); + maybeSetStaticColumnSourceImmutable(scs); maybeCreateAlias.accept(scs); - analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, - underlyingSource, distinctDeps, mcsBuilder, true, false, false); + context.addLayer(new SelectColumnLayer( + updateGraph, rowSet, context, sc, scs, underlyingSource, distinctDeps, mcsBuilder, true, + useResultKeySpace)); break; } case SELECT_REDIRECTED_REFRESHING: case SELECT_REFRESHING: { // We need to call newDestInstance because only newDestInstance has the knowledge to endow our // created array with the proper componentType (in the case of Vectors). 
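The targetDestinationCapacity expression a few lines above is where a flat result pays off: a flat destination is addressed by position and needs exactly rowSet.size() slots, while a keyed destination must reserve through lastRowKey() + 1 even when the row set is sparse. A self-contained restatement of that rule, with plain longs and an invented class name:

    // Flat results are dense (position-addressed); keyed results may be sparse
    // (row-key-addressed), so capacity must cover the last row key.
    final class CapacityRule {
        static long targetDestinationCapacity(final boolean flatResult, final long size, final long lastRowKey) {
            if (size == 0) {
                return 0;
            }
            return flatResult ? size : lastRowKey + 1;
        }

        public static void main(final String[] args) {
            // 1,000 rows whose keys run up to 9,999,999: flattening reserves
            // 1,000 slots instead of 10,000,000.
            System.out.println(targetDestinationCapacity(true, 1_000, 9_999_999));
            System.out.println(targetDestinationCapacity(false, 1_000, 9_999_999));
        }
    }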
- // TODO: use DeltaAwareColumnSource WritableColumnSource scs = sc.newDestInstance(targetDestinationCapacity); WritableColumnSource underlyingSource = null; if (rowRedirection != null) { @@ -289,8 +263,9 @@ public static SelectAndViewAnalyzerWrapper create( rowRedirection, underlyingSource, rowSet.intSize()); } maybeCreateAlias.accept(scs); - analyzer = analyzer.createLayerForSelect(updateGraph, rowSet, sc.getName(), sc, scs, - underlyingSource, distinctDeps, mcsBuilder, rowRedirection != null, false, false); + context.addLayer(new SelectColumnLayer( + updateGraph, rowSet, context, sc, scs, underlyingSource, distinctDeps, mcsBuilder, + rowRedirection != null, useResultKeySpace)); break; } default: @@ -298,8 +273,25 @@ public static SelectAndViewAnalyzerWrapper create( } } - return new SelectAndViewAnalyzerWrapper(analyzer, shiftColumn, shiftColumnHasPositiveOffset, remainingCols, - processedCols); + return context; + } + + private static void maybeSetStaticColumnSourceImmutable(final ColumnSource columnSource) { + if (columnSource instanceof PossiblyImmutableColumnSource) { + ((PossiblyImmutableColumnSource) columnSource).setImmutable(); + } + } + + private static @Nullable SourceColumn tryToGetSourceColumn(final SelectColumn sc) { + final SourceColumn realColumn; + if (sc instanceof SourceColumn) { + realColumn = (SourceColumn) sc; + } else if ((sc instanceof SwitchColumn) && ((SwitchColumn) sc).getRealColumn() instanceof SourceColumn) { + realColumn = (SourceColumn) ((SwitchColumn) sc).getRealColumn(); + } else { + realColumn = null; + } + return realColumn; } private static boolean hasConstantArrayAccess(final SelectColumn sc) { @@ -343,110 +335,439 @@ private static boolean hasConstantValue(final SelectColumn sc) { return false; } - private static boolean shouldPreserve(final SelectColumn sc) { - // we already know sc is a SourceColumn or switches to a SourceColumn - final ColumnSource sccs = sc.getDataView(); - return sccs instanceof InMemoryColumnSource && ((InMemoryColumnSource) sccs).isInMemory() - && !Vector.class.isAssignableFrom(sc.getReturnedType()); + private static boolean shouldPreserve(final ColumnSource columnSource) { + return columnSource instanceof InMemoryColumnSource && ((InMemoryColumnSource) columnSource).isInMemory() + && !Vector.class.isAssignableFrom(columnSource.getType()); } - static final int BASE_LAYER_INDEX = 0; - static final int REDIRECTION_LAYER_INDEX = 1; + /** The layers that make up this analyzer. */ + private final Layer[] layers; - /** - * The layerIndex is used to identify each layer uniquely within the bitsets for completion. - */ - private final int layerIndex; + /** Whether the result should be flat. */ + private final boolean flatResult; - public SelectAndViewAnalyzer(int layerIndex) { - this.layerIndex = layerIndex; - } + private final BitSet requiredLayers = new BitSet(); + private final BitSet remainingLayers = new BitSet(); - int getLayerIndex() { - return layerIndex; + private SelectAndViewAnalyzer( + final Layer[] layers, + final boolean flatResult) { + this.layers = layers; + this.flatResult = flatResult; + for (final Layer layer : layers) { + if (layer.hasRefreshingLogic()) { + requiredLayers.set(layer.getLayerIndex()); + } else { + this.layers[layer.getLayerIndex()] = null; + } + } } - /** - * Set the bits in bitset that represent the base layer and optional redirection layer. No other jobs can be - * executed until all of these bits are set. - * - * @param bitset the bitset to manipulate. 
- */ - abstract void setBaseBits(BitSet bitset); + public final static class AnalyzerContext { - /** - * Set the bits in bitset that represent all the new columns. This is used to identify when the select or update - * operation is complete. - * - * @param bitset the bitset to manipulate. - */ - public void setAllNewColumns(BitSet bitset) { - getInner().setAllNewColumns(bitset); - bitset.set(getLayerIndex()); - } + /** The analyzer that we are building. */ + private final List layers = new ArrayList<>(); + /** + * The sources that are available to the analyzer, including parent columns. Parent columns are in parent key + * space, others are in result key space. + */ + private final Map> allSources = new LinkedHashMap<>(); + /** The sources that are available to the analyzer, including parent columns, in result key space. */ + private final Map> allSourcesInResultKeySpace; + /** The sources that are published to the child table. */ + private final Map> publishedSources = new LinkedHashMap<>(); + /** A mapping from result column name to the layer index that created it. */ + private final TObjectIntMap columnToLayerIndex; + /** The select columns that have been processed so far. */ + private final List processedCols = new ArrayList<>(); + + /** A holder for the shift column, if any. */ + private FormulaColumn shiftColumn; + /** Whether the shift column has a positive offset. */ + private boolean shiftColumnHasPositiveOffset; + /** The columns that will need to be processed after the shift column. */ + private List remainingCols; + /** Whether the result should be flat. */ + private boolean flatResult; + /** The layer that will be used to process redirection, if we have one. */ + private int redirectionLayer = Layer.UNSET_INDEX; + + AnalyzerContext( + final QueryTable parentTable, + final boolean publishParentSources, + final boolean flatResult) { + final Map> parentSources = parentTable.getColumnSourceMap(); + columnToLayerIndex = new TObjectIntHashMap<>(parentSources.size(), 0.5f, Layer.UNSET_INDEX); + + this.flatResult = flatResult; + + allSources.putAll(parentSources); + for (final String columnName : allSources.keySet()) { + columnToLayerIndex.put(columnName, Layer.PARENT_TABLE_INDEX); + } - private static SelectAndViewAnalyzer createBaseLayer(Map> sources, - boolean publishTheseSources) { - return new BaseLayer(sources, publishTheseSources); - } + if (publishParentSources) { + publishedSources.putAll(parentSources); + } - private RedirectionLayer createRedirectionLayer(TrackingRowSet resultRowSet, - WritableRowRedirection rowRedirection) { - return new RedirectionLayer(this, resultRowSet, rowRedirection); - } + if (!flatResult) { + // result key space is the same as parent key space + allSourcesInResultKeySpace = allSources; + } else { + allSourcesInResultKeySpace = new HashMap<>(); - private StaticFlattenLayer createStaticFlattenLayer(TrackingRowSet parentRowSet) { - return new StaticFlattenLayer(this, parentRowSet); - } + final RowRedirection rowRedirection = new WrappedRowSetRowRedirection(parentTable.getRowSet()); + allSources.forEach((name, cs) -> allSourcesInResultKeySpace.put(name, + RedirectedColumnSource.maybeRedirect(rowRedirection, cs))); + } + } - private SelectAndViewAnalyzer createLayerForSelect( - UpdateGraph updateGraph, RowSet parentRowset, String name, SelectColumn sc, WritableColumnSource cs, - WritableColumnSource underlyingSource, String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder, - boolean isRedirected, boolean flattenResult, boolean alreadyFlattened) 
{ - return new SelectColumnLayer(updateGraph, parentRowset, this, name, sc, cs, underlyingSource, - parentColumnDependencies, - mcsBuilder, isRedirected, flattenResult, alreadyFlattened); - } + /** + * Add a layer to the analyzer. + * + * @param layer the layer to add + */ + void addLayer(final Layer layer) { + if (layer instanceof RedirectionLayer) { + if (redirectionLayer != Layer.UNSET_INDEX) { + throw new IllegalStateException("Cannot have more than one redirection layer"); + } + redirectionLayer = layers.size(); + } - private SelectAndViewAnalyzer createLayerForConstantView(String name, SelectColumn sc, WritableColumnSource cs, - String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder, boolean flattenResult, - boolean alreadyFlattened) { - return new ConstantColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder, flattenResult, - alreadyFlattened); - } + layer.populateColumnSources(allSources); + if (flatResult) { + layer.populateColumnSources(allSourcesInResultKeySpace); + } + layer.populateColumnSources(publishedSources); - private SelectAndViewAnalyzer createLayerForView(String name, SelectColumn sc, ColumnSource cs, - String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder) { - return new ViewColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder); - } + layers.add(layer); - private SelectAndViewAnalyzer createLayerForPreserve(String name, SelectColumn sc, ColumnSource cs, - String[] parentColumnDependencies, ModifiedColumnSet mcsBuilder) { - return new PreserveColumnLayer(this, name, sc, cs, parentColumnDependencies, mcsBuilder); - } + for (final String columnName : layer.getLayerColumnNames()) { + columnToLayerIndex.put(columnName, layer.getLayerIndex()); + } + } - abstract void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set remainingDepsToSatisfy); + /** + * @return the next layerIndex to use + */ + int getNextLayerIndex() { + return layers.size(); + } - enum GetMode { - All, New, Published - } + /** + * Return the layerIndex for a given string column. + * + * @param column the name of the column + * + * @return the layerIndex + */ + int getLayerIndexFor(String column) { + final int layerIndex = columnToLayerIndex.get(column); + if (layerIndex == Layer.UNSET_INDEX) { + throw new IllegalStateException("Column " + column + " not found in any layer of the analyzer"); + } + return layerIndex; + } - public final Map> getAllColumnSources() { - return getColumnSourcesRecurse(GetMode.All); - } + /** + * Populate the ModifiedColumnSet with all indirect/direct dependencies on the parent table. + * + * @param mcsBuilder the result ModifiedColumnSet to populate + * @param dependencies the immediate dependencies + */ + void populateParentDependenciesMCS( + final ModifiedColumnSet mcsBuilder, + final String[] dependencies) { + for (final String dep : dependencies) { + final int layerIndex = getLayerIndexFor(dep); + if (layerIndex == Layer.PARENT_TABLE_INDEX) { + // this is a preserved parent column + mcsBuilder.setAll(dep); + } else { + mcsBuilder.setAll(layers.get(layerIndex).getModifiedColumnSet()); + } + } + } - public final Map> getNewColumnSources() { - return getColumnSourcesRecurse(GetMode.New); - } + /** + * Populate the layer dependency set with the layer indices that the dependencies are in. 
+ * + * @param layerDependencySet the result bitset to populate + * @param dependencies the dependencies + */ + void populateLayerDependencySet( + final BitSet layerDependencySet, + final String[] dependencies) { + for (final String dep : dependencies) { + final int layerIndex = getLayerIndexFor(dep); + if (layerIndex != Layer.PARENT_TABLE_INDEX) { + // note that implicitly preserved columns do not belong to a layer. + layerDependencySet.or(layers.get(layerIndex).getLayerDependencySet()); + } + } + } + + /** + * Set the redirection layer in the bitset if the analyzer has any redirection. + * + * @param layerDependencies the result bitset to populate + */ + void setRedirectionLayer(final BitSet layerDependencies) { + if (redirectionLayer != Layer.UNSET_INDEX) { + layerDependencies.set(redirectionLayer); + } + } + + /** + * @return the column sources that are published through the child table + */ + public Map> getPublishedColumnSources() { + // Note that if we have a shift column that we forcibly publish all columns. + return shiftColumn == null ? publishedSources : allSources; + } + + /** + * @return the final analyzer + */ + public SelectAndViewAnalyzer createAnalyzer() { + return new SelectAndViewAnalyzer(layers.toArray(Layer[]::new), flatResult); + } + + /** + * @return which select columns were included in the result (not including the shift, or post-shift, columns) + */ + public List getProcessedColumns() { + return processedCols; + } + + /** + * @return whether the result should be flat + */ + public boolean isFlatResult() { + return flatResult; + } + + /** + * Our job here is to calculate the effects: a map from incoming column to a list of columns that it effects. We + * do this in two stages. In the first stage we create a map from column to (set of dependent columns). In the + * second stage we reverse that map. + * + * @return the effects map + */ + public Map calcEffects() { + final Map> resultMap = getPublishedColumnSources(); + + // Create the mapping from result column to dependent source columns. + final Map dependsOn = new HashMap<>(); + for (final String columnName : resultMap.keySet()) { + final int layerIndex = getLayerIndexFor(columnName); + final String[] dependencies; + if (layerIndex == Layer.PARENT_TABLE_INDEX) { + dependencies = new String[] {columnName}; + } else { + dependencies = layers.get(layerIndex).getModifiedColumnSet().dirtyColumnNames(); + } + dependsOn.put(columnName, dependencies); + } + + // Now create the mapping from source column to result columns. + final Map> effects = new HashMap<>(); + for (Map.Entry entry : dependsOn.entrySet()) { + final String depender = entry.getKey(); + for (final String dependee : entry.getValue()) { + effects.computeIfAbsent(dependee, dummy -> new ArrayList<>()).add(depender); + } + } + + // Convert effects type into result type + final Map result = new HashMap<>(); + for (Map.Entry> entry : effects.entrySet()) { + final String[] value = entry.getValue().toArray(String[]::new); + result.put(entry.getKey(), value); + } + return result; + } + + /** + * Shift columns introduce intermediary table operations. This method applies remaining work to the result built + * so far. 
+ * + * @param parentTable the source table + * @param resultSoFar the intermediate result + * @param updateFlavor the update flavor + * @return the final result + */ + public QueryTable applyShiftsAndRemainingColumns( + @NotNull final QueryTable parentTable, + @NotNull QueryTable resultSoFar, + final UpdateFlavor updateFlavor) { + if (shiftColumn != null) { + resultSoFar = (QueryTable) ShiftedColumnsFactory.getShiftedColumnsTable( + resultSoFar, shiftColumn, updateFlavor); + } + + // shift columns may introduce modifies that are not present in the original table; set these before using + if (parentTable.isRefreshing()) { + if (shiftColumn == null && parentTable.isAddOnly()) { + resultSoFar.setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, true); + } + if ((shiftColumn == null || !shiftColumnHasPositiveOffset) && parentTable.isAppendOnly()) { + // note if the shift offset is non-positive, then this result is still append-only + resultSoFar.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); + } + if (parentTable.hasAttribute(Table.TEST_SOURCE_TABLE_ATTRIBUTE)) { + // be convenient for test authors by propagating the test source table attribute + resultSoFar.setAttribute(Table.TEST_SOURCE_TABLE_ATTRIBUTE, true); + } + if (parentTable.isBlink()) { + // blink tables, although possibly not useful, can have shift columns + resultSoFar.setAttribute(Table.BLINK_TABLE_ATTRIBUTE, true); + } + } - public final Map> getPublishedColumnSources() { - return getColumnSourcesRecurse(GetMode.Published); + boolean isMultiStateSelect = shiftColumn != null || remainingCols != null; + if (isMultiStateSelect && (updateFlavor == UpdateFlavor.Select || updateFlavor == UpdateFlavor.View)) { + List newResultColumns = new LinkedList<>(); + for (SelectColumn processed : processedCols) { + newResultColumns.add(new SourceColumn(processed.getName())); + } + if (shiftColumn != null) { + newResultColumns.add(new SourceColumn(shiftColumn.getName())); + } + if (remainingCols != null) { + newResultColumns.addAll(remainingCols); + } + + if (updateFlavor == UpdateFlavor.Select) { + resultSoFar = (QueryTable) resultSoFar.select(newResultColumns); + } else { + resultSoFar = (QueryTable) resultSoFar.view(newResultColumns); + } + } else if (remainingCols != null) { + switch (updateFlavor) { + case Update: { + resultSoFar = (QueryTable) resultSoFar.update(remainingCols); + break; + } + case UpdateView: { + resultSoFar = (QueryTable) resultSoFar.updateView(remainingCols); + break; + } + case LazyUpdate: { + resultSoFar = (QueryTable) resultSoFar.lazyUpdate(remainingCols); + break; + } + default: + throw new IllegalStateException("Unexpected update flavor: " + updateFlavor); + } + } + + return resultSoFar; + } } - abstract Map> getColumnSourcesRecurse(GetMode mode); + static abstract class Layer implements LogOutputAppendable { + private static final BitSet EMPTY_BITSET = new BitSet(); + + public static final int UNSET_INDEX = -1; + public static final int PARENT_TABLE_INDEX = -2; + + /** + * The layerIndex is used to identify each layer uniquely within the bitsets for completion. 
+ */ + private final int layerIndex; + + Layer(int layerIndex) { + this.layerIndex = layerIndex; + } + + /** + * @return which index in the layer stack this layer is + */ + int getLayerIndex() { + return layerIndex; + } + + /** + * @return whether this layer has refreshing logic and needs to be updated + */ + boolean hasRefreshingLogic() { + return true; + } + + /** + * @return the modified column set of the parent table that this layer indirectly depends on + */ + ModifiedColumnSet getModifiedColumnSet() { + return failNoRefreshingLogic(); + } + + /** + * @return the layer dependency set indicating which layers this layer depends on + */ + BitSet getLayerDependencySet() { + return EMPTY_BITSET; + } + + @Override + public String toString() { + return new LogOutputStringImpl().append(this).toString(); + } + + void startTrackingPrev() { + // default is that there is nothing to do + } + + /** + * @return the column names created by this layer + */ + abstract Set getLayerColumnNames(); + + /** + * Populate the column sources for this layer. + * + * @param result the map to populate + */ + abstract void populateColumnSources(Map> result); + + /** + * @return true if this layer allows parallelization across columns + */ + abstract boolean allowCrossColumnParallelization(); + + /** + * Apply this update to this Layer. + * + * @param upstream the upstream update + * @param toClear rows that used to exist and no longer exist + * @param helper convenience class that memoizes reusable calculations for this update + * @param jobScheduler scheduler for parallel sub-tasks + * @param liveResultOwner {@link LivenessNode node} to be used to manage/unmanage results that happen to be + * {@link io.deephaven.engine.liveness.LivenessReferent liveness referents} + * @param onSuccess called when the update completed successfully + * @param onError called when the update failed + */ + Runnable createUpdateHandler( + TableUpdate upstream, + RowSet toClear, + UpdateHelper helper, + JobScheduler jobScheduler, + @Nullable LivenessNode liveResultOwner, + Runnable onSuccess, + Consumer onError) { + return failNoRefreshingLogic(); + } + + private T failNoRefreshingLogic() { + throw new UnsupportedOperationException(String.format( + "%s does not have any refreshing logic", this.getClass().getSimpleName())); + } + } public static class UpdateHelper implements SafeCloseable { private RowSet existingRows; + private TableUpdate upstreamInResultSpace; private SafeCloseablePair shiftedWithModifies; private SafeCloseablePair shiftedWithoutModifies; @@ -458,6 +779,21 @@ public UpdateHelper(RowSet parentRowSet, TableUpdate upstream) { this.upstream = upstream; } + /** + * Flatten the upstream update from the parent key space to the destination key space. We are guaranteed to be + * in STATIC_SELECT mode. 
+ * + * @return the flattened update + */ + TableUpdate resultKeySpaceUpdate() { + if (upstreamInResultSpace == null) { + upstreamInResultSpace = new TableUpdateImpl( + RowSetFactory.flat(upstream.added().size()), RowSetFactory.empty(), RowSetFactory.empty(), + RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY); + } + return upstreamInResultSpace; + } + private RowSet getExisting() { if (existingRows == null) { existingRows = parentRowSet.minus(upstream.added()); @@ -507,6 +843,10 @@ public void close() { shiftedWithoutModifies.close(); shiftedWithoutModifies = null; } + if (upstreamInResultSpace != null) { + upstreamInResultSpace.release(); + upstreamInResultSpace = null; + } } } @@ -519,195 +859,174 @@ public void close() { * @param jobScheduler scheduler for parallel sub-tasks * @param liveResultOwner {@link LivenessNode node} to be used to manage/unmanage results that happen to be * {@link io.deephaven.engine.liveness.LivenessReferent liveness referents} - * @param onCompletion Called when an inner column is complete. The outer layer should pass the {@code onCompletion} - */ - public abstract void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, - JobScheduler jobScheduler, @Nullable LivenessNode liveResultOwner, - SelectLayerCompletionHandler onCompletion); - - /** - * Our job here is to calculate the effects: a map from incoming column to a list of columns that it effects. We do - * this in two stages. In the first stage we create a map from column to (set of dependent columns). In the second - * stage we reverse that map. + * @param onSuccess called when the update completed successfully + * @param onError called when the update failed */ - public final Map calcEffects(boolean forcePublishAllResources) { - final Map> dependsOn = calcDependsOnRecurse(forcePublishAllResources); - - // Now create effects, which is the inverse of dependsOn: - // An entry W -> [X, Y, Z] in effects means that W affects X, Y, and Z - final Map> effects = new HashMap<>(); - for (Map.Entry> entry : dependsOn.entrySet()) { - final String depender = entry.getKey(); - for (final String dependee : entry.getValue()) { - effects.computeIfAbsent(dependee, dummy -> new ArrayList<>()).add(depender); + public void applyUpdate( + final TableUpdate upstream, + final RowSet toClear, + final UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final Runnable onSuccess, + final Consumer onError) { + + Assert.assertion(remainingLayers.isEmpty(), "remainingLayers.isEmpty()"); + remainingLayers.or(requiredLayers); + + final Runnable[] runners = new Runnable[layers.length]; + final UpdateScheduler scheduler = new UpdateScheduler(runners, onSuccess, onError); + + for (int ii = 0; ii < layers.length; ++ii) { + final Layer layer = layers[ii]; + if (layer != null) { + // TODO (deephaven-core#4896): this error handling allows concurrent layers to fail without ensuring + // that other tasks are finished. 
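The runners created here are deliberately not executed yet: the scheduler below fires a layer only once its dependency BitSet no longer intersects the set of still-incomplete layers. A self-contained toy of that readiness rule, using plain java.util.BitSet rather than the engine's Layer classes:

    import java.util.BitSet;

    // Toy model of the readiness rule: a layer may fire only when none of the
    // layers it depends on remain outstanding.
    final class ReadinessDemo {
        static boolean readyToFire(final BitSet layerDependencies, final BitSet remainingLayers) {
            return !layerDependencies.intersects(remainingLayers);
        }

        public static void main(final String[] args) {
            final BitSet remaining = new BitSet();
            remaining.set(0); // say the redirection layer has not completed yet

            final BitSet deps = new BitSet();
            deps.set(0); // this layer writes through the redirection

            System.out.println(readyToFire(deps, remaining)); // false
            remaining.clear(0); // redirection layer completes
            System.out.println(readyToFire(deps, remaining)); // true
        }
    }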
+ runners[ii] = layer.createUpdateHandler( + upstream, toClear, helper, jobScheduler, liveResultOwner, + () -> scheduler.onLayerComplete(layer.getLayerIndex()), onError); } } - // Convert effects type into result type - final Map result = new HashMap<>(); - for (Map.Entry> entry : effects.entrySet()) { - final String[] value = entry.getValue().toArray(String[]::new); - result.put(entry.getKey(), value); - } - return result; + + scheduler.tryToKickOffWork(); } - abstract Map> calcDependsOnRecurse(boolean forcePublishAllResources); + private class UpdateScheduler { + private final ReentrantLock runLock = new ReentrantLock(); - public abstract SelectAndViewAnalyzer getInner(); + private final Runnable[] runners; + private final Runnable onSuccess; + private final Consumer onError; - public abstract void startTrackingPrev(); + private volatile boolean needsRun; + /** whether we have already invoked onSuccess */ + private boolean updateComplete; - /** - * Was the result internally flattened? Only the STATIC_SELECT case flattens the result. If the result preserves any - * columns, then flattening is not permitted. Because all the other layers cannot internally flatten, the default - * implementation returns false. - */ - public boolean flattenedResult() { - return false; - } + public UpdateScheduler( + final Runnable[] runners, + final Runnable onSuccess, + final Consumer onError) { + this.runners = runners; + this.onSuccess = onSuccess; + this.onError = onError; + } - /** - * Have the column sources already been flattened? Only the STATIC_SELECT case flattens the result. A static flatten - * layer is only added if SelectColumn depends on an intermediate result. - */ - public boolean alreadyFlattenedSources() { - return false; - } + public void onLayerComplete(final int layerIndex) { + synchronized (remainingLayers) { + remainingLayers.set(layerIndex, false); + } - /** - * Return the layerIndex for a given string column. - * - *
- * This is executed recursively, because later columns in a select statement hide earlier columns. - *
- * - * @param column the name of the column - * - * @return the layerIndex - */ - abstract int getLayerIndexFor(String column); + tryToKickOffWork(); + } - /** - * Can all of our columns permit parallel updates? - */ - abstract public boolean allowCrossColumnParallelization(); + private void tryToKickOffWork() { + needsRun = true; + while (true) { + if (runLock.isHeldByCurrentThread() || !runLock.tryLock()) { + // do not permit re-entry or waiting on another thread doing exactly this work + return; + } - /** - * A class that handles the completion of one select column. The handlers are chained together; all downstream - * dependencies may execute when a column completes. - */ - public static abstract class SelectLayerCompletionHandler { - /** - * Note that the completed columns are shared among the entire chain of completion handlers. - */ - private final BitSet completedColumns; - private final SelectLayerCompletionHandler nextHandler; - private final BitSet requiredColumns; - private volatile boolean fired = false; + try { + if (needsRun) { + needsRun = false; + doKickOffWork(); + } + } catch (final Exception exception) { + try { + onError.accept(exception); + } catch (final Exception ignored) { + } + } finally { + runLock.unlock(); + } - /** - * Create a new completion handler that calls nextHandler after its own processing. The completedColumns BitSet - * is shared among all handlers. - * - * @param requiredColumns the columns required for this layer - * @param nextHandler the next handler to call - */ - SelectLayerCompletionHandler(BitSet requiredColumns, SelectLayerCompletionHandler nextHandler) { - this.requiredColumns = requiredColumns; - this.completedColumns = nextHandler.completedColumns; - this.nextHandler = nextHandler; + if (!needsRun) { + return; + } + } } - /** - * Create the final completion handler, which has no next handler. - * - * @param requiredColumns the columns required for this handler to fire - * @param completedColumns the set of completed columns, shared with all the other handlers - */ - public SelectLayerCompletionHandler(BitSet requiredColumns, BitSet completedColumns) { - this.requiredColumns = requiredColumns; - this.completedColumns = completedColumns; - this.nextHandler = null; - } + private void doKickOffWork() { + if (updateComplete) { + // we may have already completed the update, but are checking again due to the potential of a race + return; + } - /** - * Called when a single column is completed. - *
- * If we are ready, then we call {@link #onAllRequiredColumnsCompleted()}. - *
- * We may not be ready, but other columns downstream of us may be ready, so they are also notified (the - * nextHandler). - * - * @param completedColumn the layerIndex of the completedColumn - */ - void onLayerCompleted(int completedColumn) { - if (!fired) { + int nextLayer = 0; + while (nextLayer >= 0) { + boolean complete; boolean readyToFire = false; - synchronized (completedColumns) { - if (!fired) { - completedColumns.set(completedColumn); - if (requiredColumns.get(completedColumn) || requiredColumns.isEmpty()) { - readyToFire = requiredColumns.stream().allMatch(completedColumns::get); - if (readyToFire) { - fired = true; - } + Runnable runner = null; + synchronized (remainingLayers) { + complete = remainingLayers.isEmpty(); + nextLayer = complete ? -1 : remainingLayers.nextSetBit(nextLayer); + + if (nextLayer != -1) { + if ((runner = runners[nextLayer]) != null) { + readyToFire = !layers[nextLayer].getLayerDependencySet().intersects(remainingLayers); + } + + if (readyToFire) { + runners[nextLayer] = null; + } else { + ++nextLayer; } } } + if (readyToFire) { - onAllRequiredColumnsCompleted(); + runner.run(); + } else if (complete) { + updateComplete = true; + onSuccess.run(); + return; } } - if (nextHandler != null) { - nextHandler.onLayerCompleted(completedColumn); - } } + } - protected void onError(Exception error) { - if (nextHandler != null) { - nextHandler.onError(error); + public void startTrackingPrev() { + for (final Layer layer : layers) { + if (layer != null) { + layer.startTrackingPrev(); } } - - /** - * Called when all required columns are completed. - */ - protected abstract void onAllRequiredColumnsCompleted(); } /** - * Create a completion handler that signals a future when the update is completed. - * - * @param waitForResult a void future indicating success or failure - * - * @return a completion handler that will signal the future + * Is the result of this select/view flat? */ - public SelectLayerCompletionHandler futureCompletionHandler(CompletableFuture waitForResult) { - final BitSet completedColumns = new BitSet(); - final BitSet requiredColumns = new BitSet(); - - setAllNewColumns(requiredColumns); + public boolean flatResult() { + return flatResult; + } - return new SelectLayerCompletionHandler(requiredColumns, completedColumns) { - boolean errorOccurred = false; + /** + * Can all of our columns permit parallel updates? 
+ */ + public boolean allowCrossColumnParallelization() { + return Arrays.stream(layers) + .filter(Objects::nonNull) + .allMatch(Layer::allowCrossColumnParallelization); + } - @Override - public void onAllRequiredColumnsCompleted() { - if (errorOccurred) { - return; - } - waitForResult.complete(null); + @Override + public LogOutput append(LogOutput logOutput) { + logOutput = logOutput.append("SelectAndViewAnalyzer{"); + boolean first = true; + for (final Layer layer : layers) { + if (layer == null) { + continue; } - - @Override - protected void onError(Exception error) { - if (errorOccurred) { - return; - } - errorOccurred = true; - waitForResult.completeExceptionally(error); + if (first) { + first = false; + } else { + logOutput = logOutput.append(", "); } - }; + logOutput = logOutput.append(layer); + + } + return logOutput.append("}"); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzerWrapper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzerWrapper.java deleted file mode 100644 index ec4fcf6f534..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzerWrapper.java +++ /dev/null @@ -1,128 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.engine.table.impl.select.analyzers; - -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.impl.QueryTable; -import io.deephaven.engine.table.impl.ShiftedColumnsFactory; -import io.deephaven.engine.table.impl.select.FormulaColumn; -import io.deephaven.engine.table.impl.select.SelectColumn; -import io.deephaven.engine.table.impl.select.SourceColumn; -import org.jetbrains.annotations.NotNull; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class SelectAndViewAnalyzerWrapper { - public enum UpdateFlavor { - Select, View, Update, UpdateView, LazyUpdate - } - - private final SelectAndViewAnalyzer analyzer; - private final FormulaColumn shiftColumn; - private final boolean shiftColumnHasPositiveOffset; - private final List remainingCols; - private final List processedColumns; - - SelectAndViewAnalyzerWrapper( - SelectAndViewAnalyzer analyzer, - FormulaColumn shiftColumn, - boolean shiftColumnHasPositiveOffset, - List remainingCols, - List processedColumns) { - this.analyzer = analyzer; - this.shiftColumn = shiftColumn; - this.shiftColumnHasPositiveOffset = shiftColumnHasPositiveOffset; - this.remainingCols = remainingCols; - this.processedColumns = processedColumns; - } - - public final Map> getPublishedColumnResources() { - if (shiftColumn == null) { - return analyzer.getPublishedColumnSources(); - } else { - return analyzer.getAllColumnSources(); - } - } - - public final Map calcEffects() { - return analyzer.calcEffects(shiftColumn != null); - } - - public SelectAndViewAnalyzer getAnalyzer() { - return analyzer; - } - - public List getProcessedColumns() { - return processedColumns; - } - - public QueryTable applyShiftsAndRemainingColumns( - @NotNull QueryTable sourceTable, @NotNull QueryTable queryTable, UpdateFlavor updateFlavor) { - if (shiftColumn != null) { - queryTable = (QueryTable) ShiftedColumnsFactory.getShiftedColumnsTable( - queryTable, shiftColumn, updateFlavor); - } - - // shift columns may introduce modifies that are not present in the original table; set these before using - if 
(sourceTable.isRefreshing()) { - if (shiftColumn == null && sourceTable.isAddOnly()) { - queryTable.setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, true); - } - if ((shiftColumn == null || !shiftColumnHasPositiveOffset) && sourceTable.isAppendOnly()) { - // note if the shift offset is non-positive, then this result is still append-only - queryTable.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); - } - if (sourceTable.hasAttribute(Table.TEST_SOURCE_TABLE_ATTRIBUTE)) { - // be convenient for test authors by propagating the test source table attribute - queryTable.setAttribute(Table.TEST_SOURCE_TABLE_ATTRIBUTE, true); - } - if (sourceTable.isBlink()) { - // blink tables, although possibly not useful, can have shift columns - queryTable.setAttribute(Table.BLINK_TABLE_ATTRIBUTE, true); - } - } - - boolean isMultiStateSelect = shiftColumn != null || remainingCols != null; - if (isMultiStateSelect && (updateFlavor == UpdateFlavor.Select || updateFlavor == UpdateFlavor.View)) { - List newResultColumns = new LinkedList<>(); - for (SelectColumn processed : processedColumns) { - newResultColumns.add(new SourceColumn(processed.getName())); - } - if (shiftColumn != null) { - newResultColumns.add(new SourceColumn(shiftColumn.getName())); - } - if (remainingCols != null) { - newResultColumns.addAll(remainingCols); - } - - if (updateFlavor == UpdateFlavor.Select) { - queryTable = (QueryTable) queryTable.select(newResultColumns); - } else { - queryTable = (QueryTable) queryTable.view(newResultColumns); - } - } else if (remainingCols != null) { - switch (updateFlavor) { - case Update: { - queryTable = (QueryTable) queryTable.update(remainingCols); - break; - } - case UpdateView: { - queryTable = (QueryTable) queryTable.updateView(remainingCols); - break; - } - case LazyUpdate: { - queryTable = (QueryTable) queryTable.lazyUpdate(remainingCols); - break; - } - default: - throw new IllegalStateException("Unexpected update flavor: " + updateFlavor); - } - } - - return queryTable; - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java index b7177e9fe39..431d00ee7e7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java @@ -40,24 +40,22 @@ import static io.deephaven.chunk.util.pools.ChunkPoolConstants.LARGEST_POOLED_CHUNK_CAPACITY; final public class SelectColumnLayer extends SelectOrViewColumnLayer { - /** - * The same reference as super.columnSource, but as a WritableColumnSource and maybe reinterpreted - */ + /** The same reference as super.columnSource, but as a WritableColumnSource and maybe reinterpreted */ private final WritableColumnSource writableSource; - /** - * The execution context the select column layer was constructed in - */ + /** The execution context the select column layer was constructed in */ private final ExecutionContext executionContext; private final UpdateGraph updateGraph; - /** - * Our parent row set, used for ensuring capacity. 
- */ + /** Our parent row set, used for ensuring capacity */ private final RowSet parentRowSet; + /** Whether our result is redirected */ private final boolean isRedirected; + /** Whether our result is flattened */ private final boolean flattenedResult; - private final boolean alreadyFlattenedSources; + /** Whether our dependencies are in the result key space instead of parent key space */ + private final boolean sourcesAreInResultKeySpace; + /** Which layers we depend on */ private final BitSet dependencyBitSet; private final boolean canParallelizeThisColumn; private final boolean isSystemic; @@ -73,14 +71,22 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer { private ChunkSource.WithPrev chunkSource; SelectColumnLayer( - UpdateGraph updateGraph, RowSet parentRowSet, SelectAndViewAnalyzer inner, String name, SelectColumn sc, - WritableColumnSource ws, WritableColumnSource underlying, String[] deps, ModifiedColumnSet mcsBuilder, - boolean isRedirected, boolean flattenedResult, boolean alreadyFlattenedSources) { - super(inner, name, sc, ws, underlying, deps, mcsBuilder); + final UpdateGraph updateGraph, + final RowSet parentRowSet, + final SelectAndViewAnalyzer.AnalyzerContext context, + final SelectColumn sc, + final WritableColumnSource ws, + final WritableColumnSource underlying, + final String[] deps, + final ModifiedColumnSet mcsBuilder, + final boolean isRedirected, + final boolean sourcesAreInResultKeySpace) { + super(context, sc, ws, underlying, deps, mcsBuilder); this.updateGraph = updateGraph; this.parentRowSet = parentRowSet; this.writableSource = ReinterpretUtils.maybeConvertToWritablePrimitive(ws); this.isRedirected = isRedirected; + this.sourcesAreInResultKeySpace = sourcesAreInResultKeySpace; final ExecutionContext userSuppliedContext = ExecutionContext.getContextToRecord(); if (userSuppliedContext != null) { @@ -91,10 +97,16 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer { } dependencyBitSet = new BitSet(); - Arrays.stream(deps).mapToInt(inner::getLayerIndexFor).forEach(dependencyBitSet::set); + Arrays.stream(deps) + .mapToInt(context::getLayerIndexFor) + .filter(layerIndex -> layerIndex >= 0) + .forEach(dependencyBitSet::set); + if (isRedirected) { + // we cannot write to the redirected column until after the redirection has been updated + context.setRedirectionLayer(dependencyBitSet); + } - this.flattenedResult = flattenedResult; - this.alreadyFlattenedSources = alreadyFlattenedSources; + this.flattenedResult = context.isFlatResult(); // We can only parallelize this column if we are not redirected, our destination provides ensure previous, and // the select column is stateless @@ -136,9 +148,33 @@ private ChunkSource getChunkSource() { } @Override - public void applyUpdate(final TableUpdate upstream, final RowSet toClear, - final UpdateHelper helper, final JobScheduler jobScheduler, @Nullable final LivenessNode liveResultOwner, - final SelectLayerCompletionHandler onCompletion) { + public BitSet getLayerDependencySet() { + return dependencyBitSet; + } + + @Override + public Runnable createUpdateHandler( + final TableUpdate originalUpdate, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + final JobScheduler jobScheduler, + @Nullable final LivenessNode liveResultOwner, + final Runnable onSuccess, + final Consumer onError) { + final TableUpdate upstream; + if (!sourcesAreInResultKeySpace) { + upstream = originalUpdate; + } else { + // This better be the static fake update. 
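When sourcesAreInResultKeySpace is set (only possible for a static flattened select), the parent's update cannot be applied directly; the layer substitutes the synthetic flat update from UpdateHelper.resultKeySpaceUpdate() after checking that the caller handed it the expected shape. A plain-java restatement of the invariant the assertions below enforce; the class and method are invented for illustration:

    // The synthetic update reports every parent row as added at positions
    // 0..n-1, with nothing removed, modified, or shifted.
    final class StaticFakeUpdateCheck {
        static boolean isStaticFake(
                final long addedSize, final long removedSize, final long modifiedSize,
                final boolean hasShifts, final long parentSize) {
            return addedSize == parentSize
                    && removedSize == 0
                    && modifiedSize == 0
                    && !hasShifts;
        }
    }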
+ Assert.eqTrue(originalUpdate.added().size() == parentRowSet.size(), + "originalUpdate.added().size() == parentRowSet.size()"); + Assert.eqTrue(originalUpdate.removed().isEmpty(), "originalUpdate.removed.isEmpty()"); + Assert.eqTrue(originalUpdate.modified().isEmpty(), "originalUpdate.modified.isEmpty()"); + Assert.eqTrue(originalUpdate.shifted().empty(), "originalUpdate.shifted.empty()"); + + upstream = helper.resultKeySpaceUpdate(); + } + if (upstream.removed().isNonempty()) { if (isRedirected) { clearObjectsAtThisLevel(upstream.removed()); @@ -148,80 +184,80 @@ public void applyUpdate(final TableUpdate upstream, final RowSet toClear, } } - // recurse so that dependent intermediate columns are already updated - inner.applyUpdate(upstream, toClear, helper, jobScheduler, liveResultOwner, - new SelectLayerCompletionHandler(dependencyBitSet, onCompletion) { - @Override - public void onAllRequiredColumnsCompleted() { - // We don't want to bother with threads if we are going to process a small update - final long totalSize = upstream.added().size() + upstream.modified().size(); - - // If we have shifts, that makes everything nasty; so we do not want to deal with it - final boolean hasShifts = upstream.shifted().nonempty(); - - final boolean serialTableOperationsSafe = updateGraph.serialTableOperationsSafe() - || updateGraph.sharedLock().isHeldByCurrentThread() - || updateGraph.exclusiveLock().isHeldByCurrentThread(); - - if (canParallelizeThisColumn && jobScheduler.threadCount() > 1 && !hasShifts && - ((resultTypeIsTableOrRowSet && totalSize > 0) - || totalSize >= QueryTable.MINIMUM_PARALLEL_SELECT_ROWS)) { - final long divisionSize = resultTypeIsTableOrRowSet ? 1 - : Math.max(QueryTable.MINIMUM_PARALLEL_SELECT_ROWS, - (totalSize + jobScheduler.threadCount() - 1) / jobScheduler.threadCount()); - final List updates = new ArrayList<>(); - // divide up the additions and modifications - try (final RowSequence.Iterator rsAddIt = upstream.added().getRowSequenceIterator(); - final RowSequence.Iterator rsModIt = upstream.modified().getRowSequenceIterator()) { - while (rsAddIt.hasMore() || rsModIt.hasMore()) { - final TableUpdateImpl update = new TableUpdateImpl(); - update.modifiedColumnSet = upstream.modifiedColumnSet(); - update.shifted = RowSetShiftData.EMPTY; - update.removed = RowSetFactory.empty(); - - if (rsAddIt.hasMore()) { - update.added = rsAddIt.getNextRowSequenceWithLength(divisionSize).asRowSet(); - } else { - update.added = RowSetFactory.empty(); - } - - if (update.added.size() < divisionSize && rsModIt.hasMore()) { - update.modified = rsModIt - .getNextRowSequenceWithLength(divisionSize - update.added().size()) - .asRowSet(); - } else { - update.modified = RowSetFactory.empty(); - } - - updates.add(update); - } - } - - if (updates.isEmpty()) { - throw new IllegalStateException(); - } + return () -> { + // We don't want to bother with threads if we are going to process a small update + final long totalSize = upstream.added().size() + upstream.modified().size(); + + // If we have shifts, that makes everything nasty; so we do not want to deal with it + final boolean hasShifts = upstream.shifted().nonempty(); + + final boolean serialTableOperationsSafe = updateGraph.serialTableOperationsSafe() + || updateGraph.sharedLock().isHeldByCurrentThread() + || updateGraph.exclusiveLock().isHeldByCurrentThread(); + + if (canParallelizeThisColumn && jobScheduler.threadCount() > 1 && !hasShifts && + ((resultTypeIsTableOrRowSet && totalSize > 0) + || totalSize >= 
QueryTable.MINIMUM_PARALLEL_SELECT_ROWS)) { + final long divisionSize = resultTypeIsTableOrRowSet ? 1 + : Math.max(QueryTable.MINIMUM_PARALLEL_SELECT_ROWS, + (totalSize + jobScheduler.threadCount() - 1) / jobScheduler.threadCount()); + final List updates = new ArrayList<>(); + // divide up the additions and modifications + try (final RowSequence.Iterator rsAddIt = upstream.added().getRowSequenceIterator(); + final RowSequence.Iterator rsModIt = upstream.modified().getRowSequenceIterator()) { + while (rsAddIt.hasMore() || rsModIt.hasMore()) { + final TableUpdateImpl update = new TableUpdateImpl(); + update.modifiedColumnSet = upstream.modifiedColumnSet(); + update.shifted = RowSetShiftData.EMPTY; + update.removed = RowSetFactory.empty(); + + if (rsAddIt.hasMore()) { + update.added = rsAddIt.getNextRowSequenceWithLength(divisionSize).asRowSet(); + } else { + update.added = RowSetFactory.empty(); + } - jobScheduler.submit( - executionContext, - () -> prepareParallelUpdate(jobScheduler, upstream, toClear, helper, - liveResultOwner, onCompletion, this::onError, updates, - serialTableOperationsSafe), - SelectColumnLayer.this, this::onError); + if (update.added.size() < divisionSize && rsModIt.hasMore()) { + update.modified = rsModIt + .getNextRowSequenceWithLength(divisionSize - update.added().size()) + .asRowSet(); } else { - jobScheduler.submit( - executionContext, - () -> doSerialApplyUpdate(upstream, toClear, helper, liveResultOwner, onCompletion, - serialTableOperationsSafe), - SelectColumnLayer.this, this::onError); + update.modified = RowSetFactory.empty(); } + + updates.add(update); } - }); + } + + if (updates.isEmpty()) { + throw new IllegalStateException(); + } + + jobScheduler.submit( + executionContext, + () -> prepareParallelUpdate(jobScheduler, upstream, toClear, helper, liveResultOwner, onSuccess, + onError, updates, serialTableOperationsSafe), + SelectColumnLayer.this, onError); + } else { + jobScheduler.submit( + executionContext, + () -> doSerialApplyUpdate(upstream, toClear, helper, liveResultOwner, onSuccess, + serialTableOperationsSafe), + SelectColumnLayer.this, onError); + } + }; } - private void prepareParallelUpdate(final JobScheduler jobScheduler, final TableUpdate upstream, - final RowSet toClear, final UpdateHelper helper, @Nullable final LivenessNode liveResultOwner, - final SelectLayerCompletionHandler onCompletion, final Consumer onError, - final List splitUpdates, final boolean serialTableOperationsSafe) { + private void prepareParallelUpdate( + final JobScheduler jobScheduler, + final TableUpdate upstream, + final RowSet toClear, + final SelectAndViewAnalyzer.UpdateHelper helper, + @Nullable final LivenessNode liveResultOwner, + final Runnable onSuccess, + final Consumer onError, + final List splitUpdates, + final boolean serialTableOperationsSafe) { // we have to do removal and previous initialization before we can do any of the actual filling in multiple // threads to avoid concurrency problems with our destination column sources doEnsureCapacity(); @@ -250,13 +286,17 @@ private void prepareParallelUpdate(final JobScheduler jobScheduler, final TableU if (!isRedirected) { clearObjectsAtThisLevel(toClear); } - onCompletion.onLayerCompleted(getLayerIndex()); + onSuccess.run(); }, onError); } - private void doSerialApplyUpdate(final TableUpdate upstream, final RowSet toClear, final UpdateHelper helper, - @Nullable final LivenessNode liveResultOwner, final SelectLayerCompletionHandler onCompletion, + private void doSerialApplyUpdate( + final TableUpdate upstream, + 
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java
index 5fbef5b9d74..ebf2ac05ec2 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectOrViewColumnLayer.java
@@ -12,18 +12,20 @@
 public abstract class SelectOrViewColumnLayer extends DependencyLayerBase {
     private final ColumnSource<?> optionalUnderlying;
 
-    SelectOrViewColumnLayer(SelectAndViewAnalyzer inner, String name, SelectColumn sc,
-            ColumnSource<?> ws, ColumnSource<?> optionalUnderlying,
-            String[] deps, ModifiedColumnSet mcsBuilder) {
-        super(inner, name, sc, ws, deps, mcsBuilder);
+    SelectOrViewColumnLayer(
+            final SelectAndViewAnalyzer.AnalyzerContext context,
+            final SelectColumn sc,
+            final ColumnSource<?> ws,
+            final ColumnSource<?> optionalUnderlying,
+            final String[] deps,
+            final ModifiedColumnSet mcsBuilder) {
+        super(context, sc, ws, deps, mcsBuilder);
         this.optionalUnderlying = optionalUnderlying;
     }
 
     @Override
-    final Map<String, ColumnSource<?>> getColumnSourcesRecurse(GetMode mode) {
-        final Map<String, ColumnSource<?>> result = inner.getColumnSourcesRecurse(mode);
+    void populateColumnSources(final Map<String, ColumnSource<?>> result) {
         result.put(name, columnSource);
-        return result;
     }
 
     @Override
@@ -32,6 +34,5 @@ public void startTrackingPrev() {
         if (optionalUnderlying != null) {
             optionalUnderlying.startTrackingPrevValues();
         }
-        inner.startTrackingPrev();
     }
 }
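The populateColumnSources change above inverts the old recursive contract: instead of each layer allocating a fresh map and merging its inner layer's results on the way back up, the analyzer allocates one map and every layer deposits its own column into it. A minimal sketch of that accumulator pattern (Layer and the Object value type are illustrative stand-ins, not the engine's types):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public final class AccumulatorSketch {
        interface Layer {
            // each layer contributes only its own outputs to a shared, insertion-ordered map
            void populateColumnSources(Map<String, Object> result);
        }

        static Map<String, Object> getAllColumnSources(final List<Layer> layers) {
            // one allocation per query, instead of one allocation per layer per query
            final Map<String, Object> result = new LinkedHashMap<>();
            layers.forEach(layer -> layer.populateColumnSources(result));
            return result;
        }
    }

With thousands of layers this turns a roughly quadratic copy cascade into a single linear pass, which is the shape of the memory and CPU savings this patch targets.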
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java
deleted file mode 100644
index 25827b2ca19..00000000000
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/StaticFlattenLayer.java
+++ /dev/null
@@ -1,146 +0,0 @@
-//
-// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
-//
-package io.deephaven.engine.table.impl.select.analyzers;
-
-import io.deephaven.base.log.LogOutput;
-import io.deephaven.base.verify.Assert;
-import io.deephaven.engine.liveness.LivenessNode;
-import io.deephaven.engine.rowset.RowSet;
-import io.deephaven.engine.rowset.RowSetFactory;
-import io.deephaven.engine.rowset.RowSetShiftData;
-import io.deephaven.engine.rowset.TrackingRowSet;
-import io.deephaven.engine.table.ColumnDefinition;
-import io.deephaven.engine.table.ColumnSource;
-import io.deephaven.engine.table.ModifiedColumnSet;
-import io.deephaven.engine.table.TableUpdate;
-import io.deephaven.engine.table.impl.TableUpdateImpl;
-import io.deephaven.engine.table.impl.sources.RedirectedColumnSource;
-import io.deephaven.engine.table.impl.util.RowRedirection;
-import io.deephaven.engine.table.impl.util.WrappedRowSetRowRedirection;
-import io.deephaven.engine.table.impl.util.JobScheduler;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-final public class StaticFlattenLayer extends SelectAndViewAnalyzer {
-    private final SelectAndViewAnalyzer inner;
-    private final TrackingRowSet parentRowSet;
-    private final Map<String, ColumnSource<?>> overriddenColumns;
-
-    StaticFlattenLayer(SelectAndViewAnalyzer inner, TrackingRowSet parentRowSet) {
-        super(inner.getLayerIndex() + 1);
-        this.inner = inner;
-        this.parentRowSet = parentRowSet;
-        final HashSet<String> alreadyFlattenedColumns = new HashSet<>();
-        inner.getNewColumnSources().forEach((name, cs) -> {
-            alreadyFlattenedColumns.add(name);
-        });
-
-        final RowRedirection rowRedirection = new WrappedRowSetRowRedirection(parentRowSet);
-        overriddenColumns = new HashMap<>();
-        inner.getAllColumnSources().forEach((name, cs) -> {
-            if (alreadyFlattenedColumns.contains(name)) {
-                return;
-            }
-
-            overriddenColumns.put(name, RedirectedColumnSource.maybeRedirect(rowRedirection, cs));
-        });
-    }
-
-    @Override
-    void setBaseBits(BitSet bitset) {
-        inner.setBaseBits(bitset);
-    }
-
-    @Override
-    void populateModifiedColumnSetRecurse(ModifiedColumnSet mcsBuilder, Set<String> remainingDepsToSatisfy) {
-        inner.populateModifiedColumnSetRecurse(mcsBuilder, remainingDepsToSatisfy);
-    }
-
-    @Override
-    Map<String, ColumnSource<?>> getColumnSourcesRecurse(GetMode mode) {
-        final Map<String, ColumnSource<?>> innerColumns = inner.getColumnSourcesRecurse(mode);
-
-        if (overriddenColumns.keySet().stream().noneMatch(innerColumns::containsKey)) {
-            return innerColumns;
-        }
-
-        final Map<String, ColumnSource<?>> columns = new LinkedHashMap<>();
-        innerColumns.forEach((name, cs) -> columns.put(name, overriddenColumns.getOrDefault(name, cs)));
-        return columns;
-    }
-
-    @Override
-    public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler,
-            @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler onCompletion) {
-        // this must be the fake update used to initialize the result table
-        Assert.eqTrue(upstream.added().isFlat(), "upstream.added.isFlat()");
-        Assert.eq(upstream.added().size(), "upstream.added.size()", parentRowSet.size(), "parentRowSet.size()");
-        Assert.eqTrue(upstream.removed().isEmpty(), "upstream.removed.isEmpty()");
-        Assert.eqTrue(upstream.modified().isEmpty(), "upstream.modified.isEmpty()");
-
-        final BitSet baseLayerBitSet = new BitSet();
-        inner.setBaseBits(baseLayerBitSet);
-        final TableUpdate innerUpdate = new TableUpdateImpl(
-                parentRowSet.copy(), RowSetFactory.empty(), RowSetFactory.empty(),
-                RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY);
-        inner.applyUpdate(innerUpdate, toClear, helper, jobScheduler, liveResultOwner,
-                new SelectLayerCompletionHandler(baseLayerBitSet, onCompletion) {
-                    @Override
-                    public void onAllRequiredColumnsCompleted() {
-                        onCompletion.onLayerCompleted(getLayerIndex());
-                    }
-                });
-    }
-
-    @Override
-    Map<String, Set<String>> calcDependsOnRecurse(boolean forcePublishAllResources) {
-        return inner.calcDependsOnRecurse(forcePublishAllResources);
-    }
-
-    @Override
-    public SelectAndViewAnalyzer getInner() {
-        return inner;
-    }
-
-    @Override
-    int getLayerIndexFor(String column) {
-        if (overriddenColumns.containsKey(column)) {
-            return getLayerIndex();
-        }
-        return inner.getLayerIndexFor(column);
-    }
-
-    @Override
-    public void startTrackingPrev() {
-        throw new UnsupportedOperationException("StaticFlattenLayer is used in only non-refreshing scenarios");
-    }
-
-    @Override
-    public LogOutput append(LogOutput logOutput) {
-        return logOutput.append("{StaticFlattenLayer").append(", layerIndex=").append(getLayerIndex()).append("}");
-    }
-
-    @Override
-    public boolean allowCrossColumnParallelization() {
-        return inner.allowCrossColumnParallelization();
-    }
-
-    @Override
-    public boolean flattenedResult() {
-        // this layer performs a flatten, so the result is flattened
-        return true;
-    }
-
-    @Override
-    public boolean alreadyFlattenedSources() {
-        // this layer performs a flatten, so the sources are now flattened
-        return true;
-    }
-}
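For context on what the deleted layer did: it flattened a static result by wrapping the inner layer's column sources in redirections that translate the dense, flat position space back into the sparse parent row set. A toy illustration of that position-to-row-key translation (plain arrays here; the engine uses RowRedirection and RedirectedColumnSource):

    public final class FlattenRedirectionSketch {
        public static void main(String[] args) {
            // a sparse parent "row set": row keys with gaps, e.g. after filtering
            final long[] parentRowKeys = {3, 17, 42, 1_000};

            // a flattened consumer asks for position i; the redirection answers with
            // the parent row key, so unflattened sources can back a flattened table
            for (int position = 0; position < parentRowKeys.length; ++position) {
                System.out.println("flat position " + position + " -> row key " + parentRowKeys[position]);
            }
        }
    }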
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java
index 84bdda755a5..3019b5277b1 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/ViewColumnLayer.java
@@ -4,35 +4,30 @@
 package io.deephaven.engine.table.impl.select.analyzers;
 
 import io.deephaven.base.log.LogOutput;
-import io.deephaven.base.verify.Assert;
 import io.deephaven.configuration.Configuration;
-import io.deephaven.engine.liveness.LivenessNode;
 import io.deephaven.engine.liveness.LivenessReferent;
-import io.deephaven.engine.rowset.RowSet;
 import io.deephaven.engine.table.ColumnSource;
 import io.deephaven.engine.table.ModifiedColumnSet;
-import io.deephaven.engine.table.TableUpdate;
 import io.deephaven.engine.table.impl.select.SelectColumn;
-import io.deephaven.engine.table.impl.util.JobScheduler;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 
 final public class ViewColumnLayer extends SelectOrViewColumnLayer {
     private static final boolean ALLOW_LIVENESS_REFERENT_RESULTS = Configuration.getInstance()
             .getBooleanForClassWithDefault(ViewColumnLayer.class, "allowLivenessReferentResults", false);
 
-    ViewColumnLayer(SelectAndViewAnalyzer inner, String name, SelectColumn sc, ColumnSource<?> cs, String[] deps,
-            ModifiedColumnSet mcsBuilder) {
-        super(inner, name, sc, checkResultType(cs), null, deps, mcsBuilder);
+    ViewColumnLayer(
+            final SelectAndViewAnalyzer.AnalyzerContext context,
+            final SelectColumn sc,
+            final ColumnSource<?> cs,
+            final String[] deps,
+            final ModifiedColumnSet mcsBuilder) {
+        super(context, sc, checkResultType(cs), null, deps, mcsBuilder);
     }
 
     @Override
-    public void applyUpdate(TableUpdate upstream, RowSet toClear, UpdateHelper helper, JobScheduler jobScheduler,
-            @Nullable LivenessNode liveResultOwner, SelectLayerCompletionHandler completionHandler) {
-        // To be parallel with SelectColumnLayer, we would recurse here, but since this is ViewColumnLayer
-        // (and all my inner layers are ViewColumnLayer), there's nothing to do.
-        Assert.eqNull(completionHandler, "completionHandler");
+    public boolean hasRefreshingLogic() {
+        return false;
     }
 
     @Override
@@ -47,7 +42,7 @@ public boolean allowCrossColumnParallelization() {
         return false;
     }
 
-    private static ColumnSource checkResultType(@NotNull final ColumnSource cs) {
+    private static ColumnSource<?> checkResultType(@NotNull final ColumnSource<?> cs) {
         final Class<?> resultType = cs.getType();
         if (!ALLOW_LIVENESS_REFERENT_RESULTS && LivenessReferent.class.isAssignableFrom(resultType)) {
             throw new UnsupportedOperationException(String.format(
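checkResultType above is the guard that keeps live objects (tables, row sets, anything reference-counted) out of view columns, which have no mechanism to manage such results' liveness. The reflection idiom is plain java.lang.Class; here is a standalone sketch with a hypothetical marker interface standing in for LivenessReferent:

    public final class ResultTypeCheckSketch {
        // hypothetical stand-in for io.deephaven.engine.liveness.LivenessReferent
        interface LivenessReferent {}

        static Class<?> checkResultType(final Class<?> resultType, final boolean allowLivenessResults) {
            if (!allowLivenessResults && LivenessReferent.class.isAssignableFrom(resultType)) {
                throw new UnsupportedOperationException(
                        "view does not support results of type " + resultType.getCanonicalName());
            }
            return resultType;
        }

        public static void main(String[] args) {
            System.out.println(checkResultType(String.class, false)); // fine
            System.out.println(checkResultType(LivenessReferent.class, true)); // allowed by config
            // checkResultType(LivenessReferent.class, false) would throw
        }
    }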
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/codegen/FormulaAnalyzer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/codegen/FormulaAnalyzer.java
index fe162c1f305..0ef5b12f59e 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/codegen/FormulaAnalyzer.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/codegen/FormulaAnalyzer.java
@@ -3,6 +3,7 @@
 //
 package io.deephaven.engine.table.impl.select.codegen;
 
+import io.deephaven.api.util.NameValidator;
 import io.deephaven.engine.context.ExecutionContext;
 import io.deephaven.engine.context.QueryLibrary;
 import io.deephaven.engine.table.ColumnDefinition;
@@ -12,8 +13,6 @@
 import io.deephaven.vector.ObjectVector;
 import io.deephaven.engine.table.impl.select.DhFormulaColumn;
 import io.deephaven.engine.table.impl.select.formula.FormulaSourceDescriptor;
-import io.deephaven.engine.table.WritableColumnSource;
-import io.deephaven.engine.rowset.TrackingWritableRowSet;
 import io.deephaven.internal.log.LoggerFactory;
 import io.deephaven.io.logger.Logger;
 import org.jetbrains.annotations.NotNull;
@@ -28,9 +27,45 @@
 public class FormulaAnalyzer {
     private static final Logger log = LoggerFactory.getLogger(FormulaAnalyzer.class);
 
+    /**
+     * A container to hold a single copy of imports required to compile formulas for one operation.
+     */
+    public static final class Imports {
+        private final Map<String, Object> queryScopeVariables;
+        private final Collection<Package> packageImports;
+        private final Collection<Class<?>> classImports;
+        private final Collection<Class<?>> staticImports;
+
+        public Imports() {
+            final ExecutionContext context = ExecutionContext.getContext();
+            queryScopeVariables = Collections.unmodifiableMap(
+                    context.getQueryScope().toMap((name, value) -> NameValidator.isValidQueryParameterName(name)));
+            final QueryLibrary queryLibrary = context.getQueryLibrary();
+            packageImports = Set.copyOf(queryLibrary.getPackageImports());
+            classImports = Set.copyOf(queryLibrary.getClassImports());
+            staticImports = Set.copyOf(queryLibrary.getStaticImports());
+        }
+
+        public Map<String, Object> getQueryScopeVariables() {
+            return queryScopeVariables;
+        }
+
+        public Collection<Package> getPackageImports() {
+            return packageImports;
+        }
+
+        public Collection<Class<?>> getClassImports() {
+            return classImports;
+        }
+
+        public Collection<Class<?>> getStaticImports() {
+            return staticImports;
+        }
+    }
+
     public static Result analyze(final String rawFormulaString,
             final Map<String, ColumnDefinition<?>> columnDefinitionMap,
-            final QueryLanguageParser.Result queryLanguageResult) throws Exception {
+            final QueryLanguageParser.Result queryLanguageResult) {
 
         log.debug().append("Expression (after language conversion) : ")
                 .append(queryLanguageResult.getConvertedExpression())
@@ -75,7 +110,7 @@ public static Result analyze(final String rawFormulaString,
      * @param formulaString The raw formula string
      * @param availableColumns The columns available for use in the formula
      * @param columnRenames Outer to inner column name mapping
-     * @param queryScopeVariables The query scope variables
+     * @param imports The query scope variables, package, class, and static imports
      * @return The parsed formula {@link QueryLanguageParser.Result result}
      * @throws Exception If the formula cannot be parsed
      */
@@ -83,8 +118,8 @@ public static QueryLanguageParser.Result parseFormula(
             @NotNull final String formulaString,
             @NotNull final Map<String, ColumnDefinition<?>> availableColumns,
             @NotNull final Map<String, String> columnRenames,
-            @NotNull final Map<String, Object> queryScopeVariables) throws Exception {
-        return parseFormula(formulaString, availableColumns, columnRenames, queryScopeVariables, true);
+            @NotNull final Imports imports) throws Exception {
+        return parseFormula(formulaString, availableColumns, columnRenames, imports, true);
     }
 
     /**
@@ -93,7 +128,7 @@ public static QueryLanguageParser.Result parseFormula(
      * @param formulaString The raw formula string
      * @param availableColumns The columns available for use in the formula
     * @param columnRenames Outer to inner column name mapping
-     * @param queryScopeVariables The query scope variables
+     * @param imports The query scope variables, package, class, and static imports
      * @param unboxArguments If true it will unbox the query scope arguments
      * @return The parsed formula {@link QueryLanguageParser.Result result}
      * @throws Exception If the formula cannot be parsed
      */
@@ -102,7 +137,7 @@ public static QueryLanguageParser.Result parseFormula(
             @NotNull final String formulaString,
             @NotNull final Map<String, ColumnDefinition<?>> availableColumns,
             @NotNull final Map<String, String> columnRenames,
-            @NotNull final Map<String, Object> queryScopeVariables,
+            @NotNull final Imports imports,
             final boolean unboxArguments) throws Exception {
 
         final TimeLiteralReplacedExpression timeConversionResult =
@@ -177,7 +212,7 @@ public static QueryLanguageParser.Result parseFormula(
         }
 
         // Parameters come last.
-        for (Map.Entry<String, Object> param : queryScopeVariables.entrySet()) {
+        for (Map.Entry<String, Object> param : imports.queryScopeVariables.entrySet()) {
             if (possibleVariables.containsKey(param.getKey())) {
                 // Columns and column arrays take precedence over parameters.
                 continue;
@@ -200,13 +235,10 @@ public static QueryLanguageParser.Result parseFormula(
 
         possibleVariables.putAll(timeConversionResult.getNewVariables());
 
-        final QueryLibrary queryLibrary = ExecutionContext.getContext().getQueryLibrary();
-        final Set<Class<?>> classImports = new HashSet<>(queryLibrary.getClassImports());
-        classImports.add(TrackingWritableRowSet.class);
-        classImports.add(WritableColumnSource.class);
-        return new QueryLanguageParser(timeConversionResult.getConvertedFormula(), queryLibrary.getPackageImports(),
-                classImports, queryLibrary.getStaticImports(), possibleVariables, possibleVariableParameterizedTypes,
-                queryScopeVariables, columnVariables, unboxArguments, timeConversionResult).getResult();
+        return new QueryLanguageParser(timeConversionResult.getConvertedFormula(), imports.getPackageImports(),
+                imports.getClassImports(), imports.getStaticImports(), possibleVariables,
+                possibleVariableParameterizedTypes, imports.getQueryScopeVariables(), columnVariables, unboxArguments,
+                timeConversionResult).getResult();
     }
 
     public static class Result {
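Two things are worth pulling out of the FormulaAnalyzer diff. First, name resolution precedence is unchanged: columns and column arrays still shadow query scope parameters, which is why the loop above skips any parameter whose name is already bound. A minimal sketch of that precedence rule (plain string maps; the real code binds richer variable descriptors):

    import java.util.HashMap;
    import java.util.Map;

    public final class PrecedenceSketch {
        public static void main(String[] args) {
            final Map<String, String> possibleVariables = new HashMap<>();
            possibleVariables.put("Price", "column"); // columns are bound first
            final Map<String, String> parameters = Map.of("Price", "parameter", "rate", "parameter");

            // parameters come last: only names not already bound are added
            parameters.forEach(possibleVariables::putIfAbsent);

            System.out.println(possibleVariables); // {Price=column, rate=parameter} (order may vary)
        }
    }

Second, the Imports container means the scope and library are captured once per operation and shared by every formula compiled within it, rather than re-captured for each column.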
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/lang/TestQueryLanguageParser.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/lang/TestQueryLanguageParser.java
index 32523cbf766..4f1558f0c1d 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/lang/TestQueryLanguageParser.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/lang/TestQueryLanguageParser.java
@@ -3178,7 +3178,7 @@ private void check(String expression, String resultExpression, Class<?> resultTy
         final Map<String, Object> possibleParams;
         final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
         if (!(queryScope instanceof PoisonedQueryScope)) {
-            possibleParams = QueryCompilerRequestProcessor.newQueryScopeVariableSupplier().get();
+            possibleParams = QueryCompilerRequestProcessor.newFormulaImportsSupplier().get().getQueryScopeVariables();
         } else {
             possibleParams = null;
         }
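newFormulaImportsSupplier() (exercised by the test update above) hands callers a supplier rather than an Imports instance, so the snapshot is taken at most once, and only if someone actually needs it. A self-contained sketch of that memoization pattern (the engine's CachingSupplier is the real implementation; this stand-in only shows the semantics):

    import java.util.function.Supplier;

    public final class MemoizedSupplierSketch {
        static <T> Supplier<T> memoize(final Supplier<T> delegate) {
            return new Supplier<>() {
                private T value;

                @Override
                public synchronized T get() {
                    if (value == null) {
                        value = delegate.get(); // computed on first use only
                    }
                    return value;
                }
            };
        }

        public static void main(String[] args) {
            final Supplier<String> imports = memoize(() -> {
                System.out.println("snapshotting query scope (once)");
                return "imports-snapshot";
            });
            imports.get();
            imports.get(); // cached; no second snapshot
        }
    }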
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
index 8142c8d7d22..d85c2d6c3d4 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
@@ -46,11 +46,17 @@ private synchronized void computePayload() {
         final Class<?> myType = type.getComponentType();
         final Class<?> myComponentType = myType != null ? myType.getComponentType() : null;
 
-        ChunkType chunkType = ChunkType.fromElementType(myType);
-        if (chunkType == ChunkType.Boolean) {
-            // the internal payload is in bytes (to handle nulls), but the wire format is packed bits
+
+        final ChunkType chunkType;
+        if (myType == boolean.class || myType == Boolean.class) {
+            // Note: Internally booleans are passed around as bytes, but the wire format is packed bits.
             chunkType = ChunkType.Byte;
+        } else if (myType != null && !myType.isPrimitive()) {
+            chunkType = ChunkType.Object;
+        } else {
+            chunkType = ChunkType.fromElementType(myType);
         }
+
         final ArrayExpansionKernel kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, myType);
 
         offsets = WritableIntChunk.makeWritableChunk(chunk.size() + 1);
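The recurring comment in these barrage files captures the invariant behind all three changes: boxed Booleans travel through the engine as bytes so that null stays representable, while the Arrow wire format packs them as bits. A standalone sketch of the null-safe byte encoding (the sentinel value here is illustrative; defer to io.deephaven.util.BooleanUtils for the real constants):

    public final class BooleanAsByteSketch {
        // illustrative sentinel for a null Boolean; BooleanUtils defines the real one
        private static final byte NULL_BOOLEAN_AS_BYTE = (byte) -1;

        static byte booleanAsByte(final Boolean value) {
            return value == null ? NULL_BOOLEAN_AS_BYTE : (value ? (byte) 1 : (byte) 0);
        }

        public static void main(String[] args) {
            System.out.println(booleanAsByte(Boolean.TRUE));  // 1
            System.out.println(booleanAsByte(Boolean.FALSE)); // 0
            System.out.println(booleanAsByte(null));          // -1 (null sentinel)
        }
    }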
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java
index dddc6414b1e..4e5b8cb0bd7 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java
@@ -36,6 +36,8 @@ public VarListChunkReader(final StreamReaderOptions options, final TypeInfo type
         if (componentType == boolean.class || componentType == Boolean.class) {
             // Note: Internally booleans are passed around as bytes, but the wire format is packed bits.
             chunkType = ChunkType.Byte;
+        } else if (componentType != null && !componentType.isPrimitive()) {
+            chunkType = ChunkType.Object;
         } else {
             chunkType = ChunkType.fromElementType(componentType);
         }
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BoxedBooleanArrayExpansionKernel.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BoxedBooleanArrayExpansionKernel.java
index c6b901a7dd3..0a02ddb31f9 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BoxedBooleanArrayExpansionKernel.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/array/BoxedBooleanArrayExpansionKernel.java
@@ -47,7 +47,7 @@ public WritableChunk expand(final ObjectChunk source
                 continue;
             }
             for (int j = 0; j < row.length; ++j) {
-                final byte value = row[j] ? BooleanUtils.TRUE_BOOLEAN_AS_BYTE : BooleanUtils.FALSE_BOOLEAN_AS_BYTE;
+                final byte value = BooleanUtils.booleanAsByte(row[j]);
                 result.set(lenWritten + j, value);
             }
             lenWritten += row.length;
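Two closing observations. The BoxedBooleanArrayExpansionKernel change replaces a ternary that auto-unboxes row[j] (and would throw a NullPointerException on a null element) with the null-safe BooleanUtils.booleanAsByte. And the generator and reader now agree on a single three-way chunk-type rule, which a compact sketch makes easy to audit (the enum below is a minimal stand-in for io.deephaven.chunk.ChunkType, with most primitives elided):

    public final class ChunkTypeSelectionSketch {
        // minimal stand-in for io.deephaven.chunk.ChunkType
        enum ChunkType { Byte, Object, Int, Long, Double, Other }

        static ChunkType chunkTypeFor(final Class<?> componentType) {
            if (componentType == boolean.class || componentType == Boolean.class) {
                // booleans ride as bytes internally; the wire format packs bits
                return ChunkType.Byte;
            }
            if (componentType != null && !componentType.isPrimitive()) {
                // every boxed or reference component type uses Object chunks
                return ChunkType.Object;
            }
            return fromElementType(componentType); // primitive-specific chunk
        }

        static ChunkType fromElementType(final Class<?> componentType) {
            if (componentType == int.class) return ChunkType.Int;
            if (componentType == long.class) return ChunkType.Long;
            if (componentType == double.class) return ChunkType.Double;
            return ChunkType.Other; // remaining primitives elided for brevity
        }

        public static void main(String[] args) {
            System.out.println(chunkTypeFor(Boolean.class)); // Byte
            System.out.println(chunkTypeFor(String.class));  // Object
            System.out.println(chunkTypeFor(int.class));     // Int
        }
    }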