Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-19059]: Support non-time retract mode for OverAggregate operator #25753

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
import org.apache.flink.table.catalog.{ManagedTableListener, ResolvedCatalogBaseTable}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
Expand Down Expand Up @@ -247,9 +248,33 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val children = visitChildren(cep, ModifyKindSetTrait.INSERT_ONLY, "Match Recognize")
createNewNode(cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

case over: StreamPhysicalOverAggregate =>
// OverAggregate can only support insert for row-time/proc-time sort keys
var overRequiredTrait = ModifyKindSetTrait.INSERT_ONLY
val builder = ModifyKindSet
.newBuilder()
.addContainedKind(ModifyKind.INSERT)

// All aggregates are computed over the same window and order by is supported for only 1 field
val orderKeyIndex =
over.logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
val orderKeyType = over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
if (
!FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType)
&& !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
) {
// Only non row-time/proc-time sort can support UPDATES
builder.addContainedKind(ModifyKind.UPDATE)
builder.addContainedKind(ModifyKind.DELETE)
overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES
}
val children = visitChildren(over, overRequiredTrait)
val providedTrait = new ModifyKindSetTrait(builder.build())
createNewNode(over, children, providedTrait, requiredTrait, requester)

case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate =>
// TemporalSort, OverAggregate, IntervalJoin only support consuming insert-only
_: StreamPhysicalPythonOverAggregate =>
// TemporalSort, IntervalJoin only support consuming insert-only
// and producing insert-only changes
val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
Expand Down Expand Up @@ -469,19 +494,18 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
_: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
_: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase |
_: StreamPhysicalWindowAggregate =>
// Aggregate, TableAggregate, Limit, GroupWindowAggregate, WindowAggregate,
_: StreamPhysicalWindowAggregate | _: StreamPhysicalOverAggregate =>
// Aggregate, TableAggregate, OverAggregate, Limit, GroupWindowAggregate, WindowAggregate,
// and WindowTableAggregate requires update_before if there are updates
val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
val children = visitChildren(rel, requiredChildTrait)
// use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
createNewNode(rel, children, requiredTrait)

case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
_: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
// WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, OverAggregate,
// WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP,
// and IntervalJoin, WindowJoin require nothing about UpdateKind.
val children = visitChildren(rel, UpdateKindTrait.NONE)
createNewNode(rel, children, requiredTrait)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;

import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

Expand All @@ -36,7 +36,7 @@ public OverWindowRestoreTest() {
@Override
protected Stream<String> getSavepointPaths(
TableTestProgram program, ExecNodeMetadata metadata) {
if (metadata.version() == 1) {
if (metadata.version() == 1 && program.equals(OverWindowTestPrograms.LAG_OVER_FUNCTION)) {
return Stream.concat(
super.getSavepointPaths(program, metadata),
// See src/test/resources/restore-tests/stream-exec-over-aggregate_1/over
Expand All @@ -50,6 +50,11 @@ protected Stream<String> getSavepointPaths(

@Override
public List<TableTestProgram> programs() {
return Collections.singletonList(OverWindowTestPrograms.LAG_OVER_FUNCTION);
return Arrays.asList(
OverWindowTestPrograms.LAG_OVER_FUNCTION,
OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_RETRACT_MODE,
OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE,
OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE_MULTIPLE_AGGS,
OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_NO_PARTITION_BY);
}
}
Loading