Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address issues for PartitionedTable.transform and PartitionedTable.partitionedTransform with static inputs and refreshing result constituents #3281

Merged
merged 2 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.util.annotations.FinalDefault;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Collection;
import java.util.Set;
Expand Down Expand Up @@ -200,16 +201,22 @@ default Proxy proxy() {
* Apply {@code transformer} to all constituent {@link Table tables}, and produce a new PartitionedTable containing
* the results.
* <p>
* This overload uses the {@link ExecutionContext#getContextToRecord enclosing ExecutionContext} and expects
* {@code transformer} to produce {@link Table#isRefreshing() refreshing} results if and only if this
* PartitionedTable's {@link #table() underlying table} is refreshing.
* <p>
*
* @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for an
* empty input table. It is required to install an ExecutionContext to access any
* QueryLibrary/QueryScope/QueryCompiler functionality from the {@code transformer}.
*
* @param transformer The {@link UnaryOperator} to apply to all constituent {@link Table tables}
* @return The new PartitionedTable containing the resulting constituents
* @throws IllegalStateException On instantiation or update if {@code !table().isRefreshing()} and
* {@code transformer} produces a refreshing result for any constituent
*/
default PartitionedTable transform(@NotNull UnaryOperator<Table> transformer) {
return transform(ExecutionContext.getContextToRecord(), transformer);
return transform(ExecutionContext.getContextToRecord(), transformer, table().isRefreshing());
}

/**
Expand All @@ -222,9 +229,20 @@ default PartitionedTable transform(@NotNull UnaryOperator<Table> transformer) {
*
* @param executionContext The ExecutionContext to use for the {@code transformer}
* @param transformer The {@link UnaryOperator} to apply to all constituent {@link Table tables}
* @param expectRefreshingResults Whether to expect that the results of applying {@code transformer} <em>may</em> be
* {@link Table#isRefreshing() refreshing}. If {@code true}, the resulting PartitionedTable will always be
* backed by a refreshing {@link #table() table}. This hint is important for transforms to static inputs that
* might produce refreshing output, in order to ensure correct liveness management; incorrectly specifying
* {@code false} will result in exceptions.
* @return The new PartitionedTable containing the resulting constituents
* @throws IllegalStateException On instantiation or update if
* {@code !table().isRefreshing() && !expectRefreshingResults} and {@code transformer} produces a refreshing
* result for any constituent
*/
PartitionedTable transform(ExecutionContext executionContext, @NotNull UnaryOperator<Table> transformer);
PartitionedTable transform(
@Nullable ExecutionContext executionContext,
@NotNull UnaryOperator<Table> transformer,
boolean expectRefreshingResults);

/**
* <p>
Expand All @@ -242,6 +260,10 @@ default PartitionedTable transform(@NotNull UnaryOperator<Table> transformer) {
* {@link ColumnSource#getType() data type} and {@link ColumnSource#getComponentType() component type}.</li>
* </ol>
* <p>
* This overload uses the {@link ExecutionContext#getContextToRecord enclosing ExecutionContext} and expects
* {@code transformer} to produce {@link Table#isRefreshing() refreshing} results if and only if {@code this} or
* {@code other} has a refreshing {@link #table() underlying table}.
* <p>
*
* @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for
* empty input tables. It is required to install an ExecutionContext to access any
Expand All @@ -250,11 +272,15 @@ default PartitionedTable transform(@NotNull UnaryOperator<Table> transformer) {
* @param other The other PartitionedTable to find constituents in
* @param transformer The {@link BinaryOperator} to apply to all pairs of constituent {@link Table tables}
* @return The new PartitionedTable containing the resulting constituents
* @throws IllegalStateException On instantiation or update if
* {@code !table().isRefreshing() && !other.table().isRefreshing()} and {@code transformer} produces a
* refreshing result for any constituent
*/
default PartitionedTable partitionedTransform(
@NotNull PartitionedTable other,
@NotNull BinaryOperator<Table> transformer) {
return partitionedTransform(other, ExecutionContext.getContextToRecord(), transformer);
return partitionedTransform(other, ExecutionContext.getContextToRecord(), transformer,
table().isRefreshing() || other.table().isRefreshing());
}

/**
Expand All @@ -281,12 +307,21 @@ default PartitionedTable partitionedTransform(
* @param other The other PartitionedTable to find constituents in
* @param executionContext The ExecutionContext to use for the {@code transformer}
* @param transformer The {@link BinaryOperator} to apply to all pairs of constituent {@link Table tables}
* @param expectRefreshingResults Whether to expect that the results of applying {@code transformer} <em>may</em> be
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
* {@link Table#isRefreshing() refreshing}. If {@code true}, the resulting PartitionedTable will always be
* backed by a refreshing {@link #table() table}. This hint is important for transforms to static inputs that
* might produce refreshing output, in order to ensure correct liveness management; incorrectly specifying
* {@code false} will result in exceptions.
* @return The new PartitionedTable containing the resulting constituents
* @throws IllegalStateException On instantiation or update if
* {@code !table().isRefreshing() && !other.table().isRefreshing() && !expectRefreshingResults} and
* {@code transformer} produces a refreshing result for any constituent
*/
PartitionedTable partitionedTransform(
@NotNull PartitionedTable other,
ExecutionContext executionContext,
@NotNull BinaryOperator<Table> transformer);
@Nullable ExecutionContext executionContext,
@NotNull BinaryOperator<Table> transformer,
boolean expectRefreshingResults);

/**
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.lang.ref.WeakReference;
import java.util.*;
Expand Down Expand Up @@ -240,17 +241,22 @@ public PartitionedTable sort(@NotNull final Collection<SortColumn> sortColumns)

@ConcurrentMethod
@Override
public PartitionedTableImpl transform(final ExecutionContext executionContext,
@NotNull final UnaryOperator<Table> transformer) {
public PartitionedTableImpl transform(
@Nullable final ExecutionContext executionContext,
@NotNull final UnaryOperator<Table> transformer,
final boolean expectRefreshingResults) {
final Table resultTable;
final TableDefinition resultConstituentDefinition;
final LivenessManager enclosingScope = LivenessScopeStack.peek();
try (final SafeCloseable ignored1 = executionContext == null ? null : executionContext.open();
final SafeCloseable ignored2 = LivenessScopeStack.open()) {

final Table asRefreshingIfNeeded = maybeCopyAsRefreshing(table, expectRefreshingResults);

// Perform the transformation
resultTable = table.update(new TableTransformationColumn(
constituentColumnName, executionContext, transformer));
resultTable = asRefreshingIfNeeded.update(new TableTransformationColumn(
constituentColumnName, executionContext,
asRefreshingIfNeeded.isRefreshing() ? transformer : assertResultsStatic(transformer)));
enclosingScope.manage(resultTable);

// Make sure we have a valid result constituent definition
Expand All @@ -273,8 +279,9 @@ public PartitionedTableImpl transform(final ExecutionContext executionContext,
@Override
public PartitionedTableImpl partitionedTransform(
@NotNull final PartitionedTable other,
final ExecutionContext executionContext,
@NotNull final BinaryOperator<Table> transformer) {
@Nullable final ExecutionContext executionContext,
@NotNull final BinaryOperator<Table> transformer,
final boolean expectRefreshingResults) {
// Check safety before doing any extra work
if (table.isRefreshing() || other.table().isRefreshing()) {
UpdateGraphProcessor.DEFAULT.checkInitiateTableOperation();
Expand All @@ -295,9 +302,12 @@ public PartitionedTableImpl partitionedTransform(
? table.naturalJoin(other.table(), joinPairs, joinAdditions)
.where(new MatchFilter(Inverted, RHS_CONSTITUENT, (Object) null))
: table.join(other.table(), joinPairs, joinAdditions);
resultTable = joined
.update(new BiTableTransformationColumn(
constituentColumnName, RHS_CONSTITUENT, executionContext, transformer))

final Table asRefreshingIfNeeded = maybeCopyAsRefreshing(joined, expectRefreshingResults);

resultTable = asRefreshingIfNeeded
.update(new BiTableTransformationColumn(constituentColumnName, RHS_CONSTITUENT, executionContext,
asRefreshingIfNeeded.isRefreshing() ? transformer : assertResultsStatic(transformer)))
.dropColumns(RHS_CONSTITUENT);
enclosingScope.manage(resultTable);

Expand All @@ -319,6 +329,37 @@ public PartitionedTableImpl partitionedTransform(
true);
}

private Table maybeCopyAsRefreshing(Table table, boolean expectRefreshingResults) {
if (!expectRefreshingResults || table.isRefreshing()) {
return table;
}
final Table copied = ((QueryTable) table.coalesce()).copy();
copied.setRefreshing(true);
return copied;
}

private static UnaryOperator<Table> assertResultsStatic(@NotNull final UnaryOperator<Table> wrapped) {
return (final Table table) -> {
final Table result = wrapped.apply(table);
if (result != null && result.isRefreshing()) {
throw new IllegalStateException("Static partitioned tables cannot contain refreshing constituents. "
+ "Did you mean to specify expectRefreshingResults=true for this transform?");
}
return result;
};
}

private static BinaryOperator<Table> assertResultsStatic(@NotNull final BinaryOperator<Table> wrapped) {
return (final Table table1, final Table table2) -> {
final Table result = wrapped.apply(table1, table2);
if (result != null && result.isRefreshing()) {
throw new IllegalStateException("Static partitioned tables cannot contain refreshing constituents. "
+ "Did you mean to specify expectRefreshingResults=true for this transform?");
}
return result;
};
}

// TODO (https://github.com/deephaven/deephaven-core/issues/2368): Consider adding transformWithKeys support

@ConcurrentMethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ private PartitionedTable.Proxy basicTransform(@NotNull final UnaryOperator<Table
private PartitionedTable.Proxy basicTransform(
final boolean requiresFullContext, @NotNull final UnaryOperator<Table> transformer) {
return new PartitionedTableProxyImpl(
target.transform(getOrCreateExecutionContext(requiresFullContext), transformer),
target.transform(
getOrCreateExecutionContext(requiresFullContext),
transformer,
target.table().isRefreshing()),
requireMatchingKeys,
sanityCheckJoins);
}
Expand All @@ -170,17 +173,19 @@ private PartitionedTable.Proxy complexTransform(
final ExecutionContext context = getOrCreateExecutionContext(requiresFullContext);
if (other instanceof Table) {
final Table otherTable = (Table) other;
if ((target.table().isRefreshing() || otherTable.isRefreshing()) && joinMatches != null) {
final boolean refreshingResults = target.table().isRefreshing() || otherTable.isRefreshing();
if (refreshingResults && joinMatches != null) {
UpdateGraphProcessor.DEFAULT.checkInitiateTableOperation();
}
return new PartitionedTableProxyImpl(
target.transform(context, ct -> transformer.apply(ct, otherTable)),
target.transform(context, ct -> transformer.apply(ct, otherTable), refreshingResults),
requireMatchingKeys,
sanityCheckJoins);
}
if (other instanceof PartitionedTable.Proxy) {
final PartitionedTable.Proxy otherProxy = (PartitionedTable.Proxy) other;
final PartitionedTable otherTarget = otherProxy.target();
final boolean refreshingResults = target.table().isRefreshing() || otherTarget.table().isRefreshing();

if (target.table().isRefreshing() || otherTarget.table().isRefreshing()) {
UpdateGraphProcessor.DEFAULT.checkInitiateTableOperation();
Expand All @@ -203,7 +208,7 @@ private PartitionedTable.Proxy complexTransform(
final PartitionedTable rhsToUse = maybeRewrap(validatedRhsTable, otherTarget);

return new PartitionedTableProxyImpl(
lhsToUse.partitionedTransform(rhsToUse, context, transformer),
lhsToUse.partitionedTransform(rhsToUse, context, transformer, refreshingResults),
requireMatchingKeys,
sanityCheckJoins);
}
Expand Down Expand Up @@ -350,8 +355,11 @@ private static DependentValidation nonOverlappingJoinKeysValidation(
// NB: At the moment, we are assuming that constituents appear only once per partitioned table in scenarios
// where overlapping join keys are concerning.
final AtomicLong sequenceCounter = new AtomicLong(0);
final PartitionedTable stamped = input.transform(null, table -> table
.updateView(new LongConstantColumn(ENCLOSING_CONSTITUENT.name(), sequenceCounter.getAndIncrement())));
final PartitionedTable stamped = input.transform(
null,
table -> table.updateView(
new LongConstantColumn(ENCLOSING_CONSTITUENT.name(), sequenceCounter.getAndIncrement())),
input.table().isRefreshing());
final Table merged = stamped.merge();
final Table mergedWithUniqueAgg = merged.aggAllBy(AggSpec.unique(), joinKeyColumnNames);
final Table overlappingJoinKeys = mergedWithUniqueAgg.where(Filter.isNull(ENCLOSING_CONSTITUENT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ private void validateBindingPartitionedTableConstituents(
final QueryTable coalesced = (QueryTable) table.coalesce();
addValidator(hardReferences, description, coalesced);
return coalesced;
});
}, true);
hardReferences.put(k.toString(), validated);
}
});
Expand Down
Loading