diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java index eb6f68a217090..0f37b6bda5f48 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.CodeGenUtils; import org.apache.flink.table.planner.codegen.CodeGeneratorContext; @@ -50,6 +51,7 @@ import org.apache.flink.table.runtime.operators.join.FlinkJoinType; import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator; import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator; +import org.apache.flink.table.runtime.operators.join.temporal.async.AsyncStateTemporalProcessTimeJoinOperator; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; @@ -72,6 +74,7 @@ name = "stream-exec-temporal-join", version = 1, producedTransformations = StreamExecTemporalJoin.TEMPORAL_JOIN_TRANSFORMATION, + consumedOptions = {"table.exec.async-state.enabled"}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15) public class StreamExecTemporalJoin extends ExecNodeBase @@ -276,12 +279,21 @@ private TwoInputStreamOperator createJoinOperator( isLeftOuterJoin); } else { if (isTemporalFunctionJoin) { - return new TemporalProcessTimeJoinOperator( - InternalTypeInfo.of(rightInputType), - generatedJoinCondition, - minRetentionTime, - maxRetentionTime, - isLeftOuterJoin); + if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) { + return new AsyncStateTemporalProcessTimeJoinOperator( + InternalTypeInfo.of(rightInputType), + generatedJoinCondition, + minRetentionTime, + maxRetentionTime, + isLeftOuterJoin); + } else { + return new TemporalProcessTimeJoinOperator( + InternalTypeInfo.of(rightInputType), + generatedJoinCondition, + minRetentionTime, + maxRetentionTime, + isLeftOuterJoin); + } } else { // The exsiting TemporalProcessTimeJoinOperator has already supported temporal table // join. diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala index 0809b9e1d52d2..5fb8b9d6251d4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala @@ -23,9 +23,9 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.factories.TestValuesTableFactory.{getResultsAsStrings, registerData} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase -import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.table.utils.LegacyRowExtension -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension +import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} @@ -34,11 +34,22 @@ import org.junit.jupiter.api.extension.{ExtendWith, RegisterExtension} import java.time.LocalDateTime import java.time.format.DateTimeParseException +import java.util import scala.collection.JavaConversions._ @ExtendWith(Array(classOf[ParameterizedTestExtension])) -class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(state) { +class TemporalJoinITCase(state: StateBackendMode, enableAsyncState: Boolean) + extends StreamingWithStateTestBase(state) { + + @BeforeEach + override def before(): Unit = { + super.before() + + tEnv.getConfig.set( + ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED, + Boolean.box(enableAsyncState)) + } @RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] = new EachCallbackWrapper[LegacyRowExtension](new LegacyRowExtension) @@ -795,3 +806,14 @@ class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTest TestValuesTableFactory.changelogRow(kind, objects.toArray: _*) } } + +object TemporalJoinITCase { + + @Parameters(name = "StateBackend={0}, EnableAsyncState={1}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(HEAP_BACKEND, Boolean.box(false)), + Array(HEAP_BACKEND, Boolean.box(true)), + Array(ROCKSDB_BACKEND, Boolean.box(false))) + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java index 1d69088485503..ceb891aa3d0d4 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/state/JoinRecordAsyncStateViews.java @@ -63,6 +63,7 @@ public static JoinRecordAsyncStateView create( inputSideSpec.getUniqueKeySelector(), ttlConfig); } + } else { return new InputSideHasNoUniqueKey(ctx, stateName, recordType, ttlConfig); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java index 62d28a77bae9d..d7be442fa8bb9 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java @@ -36,9 +36,9 @@ import java.util.Optional; /** - * An abstract {@link TwoInputStreamOperator} that allows its subclasses to clean up their state - * based on a TTL. This TTL should be specified in the provided {@code minRetentionTime} and {@code - * maxRetentionTime}. + * An abstract {@link TwoInputStreamOperator} that allows its subclasses to clean up their state in + * sync state based on a TTL. This TTL should be specified in the provided {@code minRetentionTime} + * and {@code maxRetentionTime}. * *

For each known key, this operator registers a timer (in processing time) to fire after the TTL * expires. When the timer fires, the subclass can decide which state to cleanup and what further @@ -115,7 +115,7 @@ protected void registerProcessingCleanupTimer() throws IOException { Optional currentCleanupTime = Optional.ofNullable(latestRegisteredCleanupTimer.value()); - if (!currentCleanupTime.isPresent() + if (currentCleanupTime.isEmpty() || (currentProcessingTime + minRetentionTime) > currentCleanupTime.get()) { updateCleanupTimer(currentProcessingTime, currentCleanupTime); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java index 1c0a61242cb6a..1d08c73182c82 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java @@ -33,10 +33,11 @@ import org.apache.flink.table.data.utils.JoinedRowData; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.join.temporal.utils.TemporalProcessTimeJoinHelper; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; /** - * The operator to temporal join a stream on processing time. + * The operator to temporal join a stream on processing time in sync state. * *

For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime)) and temporal table * join (FOR SYSTEM_TIME AS OF), they can reuse same processing-time operator implementation, the @@ -60,6 +61,8 @@ public class TemporalProcessTimeJoinOperator extends BaseTwoInputStreamOperatorW private transient GenericRowData rightNullRow; private transient TimestampedCollector collector; + private transient SyncStateTemporalProcessTimeJoinHelper temporalProcessTimeJoinHelper; + public TemporalProcessTimeJoinOperator( InternalTypeInfo rightType, GeneratedJoinCondition generatedJoinCondition, @@ -88,6 +91,7 @@ public void open() throws Exception { this.rightNullRow = new GenericRowData(rightType.toRowSize()); // consider watermark from left stream only. super.processWatermark2(Watermark.MAX_WATERMARK); + this.temporalProcessTimeJoinHelper = new SyncStateTemporalProcessTimeJoinHelper(); } @Override @@ -95,31 +99,13 @@ public void processElement1(StreamRecord element) throws Exception { RowData leftSideRow = element.getValue(); RowData rightSideRow = rightState.value(); - if (rightSideRow == null) { - if (isLeftOuterJoin) { - collectJoinedRow(leftSideRow, rightNullRow); - } else { - return; - } - } else { - if (joinCondition.apply(leftSideRow, rightSideRow)) { - collectJoinedRow(leftSideRow, rightSideRow); - } else { - if (isLeftOuterJoin) { - collectJoinedRow(leftSideRow, rightNullRow); - } - } + temporalProcessTimeJoinHelper.processElement1(leftSideRow, rightSideRow); + if (rightSideRow != null) { // register a cleanup timer only if the rightSideRow is not null registerProcessingCleanupTimer(); } } - private void collectJoinedRow(RowData leftRow, RowData rightRow) { - outRow.setRowKind(leftRow.getRowKind()); - outRow.replace(leftRow, rightRow); - collector.collect(outRow); - } - @Override public void processElement2(StreamRecord element) throws Exception { if (RowDataUtil.isAccumulateMsg(element.getValue())) { @@ -150,4 +136,10 @@ public void cleanupState(long time) { /** Invoked when an event-time timer fires. */ @Override public void onEventTime(InternalTimer timer) throws Exception {} + + private class SyncStateTemporalProcessTimeJoinHelper extends TemporalProcessTimeJoinHelper { + public SyncStateTemporalProcessTimeJoinHelper() { + super(isLeftOuterJoin, joinCondition, outRow, rightNullRow, collector); + } + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/async/AsyncStateTemporalProcessTimeJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/async/AsyncStateTemporalProcessTimeJoinOperator.java new file mode 100644 index 0000000000000..73272f8208732 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/async/AsyncStateTemporalProcessTimeJoinOperator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.temporal.async; + +import org.apache.flink.api.common.functions.DefaultOpenContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.v2.ValueStateDescriptor; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.RowDataUtil; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.join.temporal.utils.TemporalProcessTimeJoinHelper; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +/** + * The operator to temporal join a stream on processing time in async state. + * + *

For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime)) and temporal table + * join (FOR SYSTEM_TIME AS OF), they can reuse same processing-time operator implementation, the + * differences between them are: (1) The temporal TableFunction join only supports single column in + * primary key but temporal table join supports arbitrary columns in primary key. (2) The temporal + * TableFunction join only supports inner join, temporal table join supports both inner join and + * left outer join. + */ +public class AsyncStateTemporalProcessTimeJoinOperator + extends BaseTwoInputAsyncStateStreamOperatorWithStateRetention { + + private static final long serialVersionUID = 1L; + + private final boolean isLeftOuterJoin; + private final InternalTypeInfo rightType; + private final GeneratedJoinCondition generatedJoinCondition; + + private transient ValueState rightState; + private transient JoinCondition joinCondition; + + private transient JoinedRowData outRow; + private transient GenericRowData rightNullRow; + private transient TimestampedCollector collector; + + private transient AsyncStateTemporalProcessTimeJoinHelper temporalProcessTimeJoinHelper; + + public AsyncStateTemporalProcessTimeJoinOperator( + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + long minRetentionTime, + long maxRetentionTime, + boolean isLeftOuterJoin) { + super(minRetentionTime, maxRetentionTime); + this.rightType = rightType; + this.generatedJoinCondition = generatedJoinCondition; + this.isLeftOuterJoin = isLeftOuterJoin; + } + + @Override + public void open() throws Exception { + super.open(); + this.joinCondition = + generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()); + FunctionUtils.setFunctionRuntimeContext(joinCondition, getRuntimeContext()); + FunctionUtils.openFunction(joinCondition, DefaultOpenContext.INSTANCE); + + ValueStateDescriptor rightStateDesc = + new ValueStateDescriptor<>("right", rightType); + this.rightState = getRuntimeContext().getValueState(rightStateDesc); + this.collector = new TimestampedCollector<>(output); + this.outRow = new JoinedRowData(); + this.rightNullRow = new GenericRowData(rightType.toRowSize()); + // consider watermark from left stream only. + super.processWatermark2(Watermark.MAX_WATERMARK); + this.temporalProcessTimeJoinHelper = new AsyncStateTemporalProcessTimeJoinHelper(); + } + + @Override + public void processElement1(StreamRecord element) throws Exception { + RowData leftSideRow = element.getValue(); + // RowData rightSideRow = rightState.value(); + StateFuture rightSideRowFuture = rightState.asyncValue(); + + rightSideRowFuture.thenAccept( + rightSideRow -> { + temporalProcessTimeJoinHelper.processElement1(leftSideRow, rightSideRow); + if (rightSideRow != null) { + // register a cleanup timer only if the rightSideRow is not null + registerProcessingCleanupTimer(); + } + }); + } + + @Override + public void processElement2(StreamRecord element) throws Exception { + if (RowDataUtil.isAccumulateMsg(element.getValue())) { + StateFuture updateFuture = rightState.asyncUpdate(element.getValue()); + updateFuture.thenAccept(Void -> registerProcessingCleanupTimer()); + } else { + StateFuture clearFuture = rightState.asyncClear(); + clearFuture.thenAccept(Void -> cleanupLastTimer()); + } + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(joinCondition); + super.close(); + } + + /** + * The method to be called when a cleanup timer fires. + * + * @param time The timestamp of the fired timer. + */ + @Override + public StateFuture cleanupState(long time) { + return rightState.asyncClear(); + } + + /** Invoked when an event-time timer fires. */ + @Override + public void onEventTime(InternalTimer timer) throws Exception {} + + private class AsyncStateTemporalProcessTimeJoinHelper extends TemporalProcessTimeJoinHelper { + public AsyncStateTemporalProcessTimeJoinHelper() { + super(isLeftOuterJoin, joinCondition, outRow, rightNullRow, collector); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/async/BaseTwoInputAsyncStateStreamOperatorWithStateRetention.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/async/BaseTwoInputAsyncStateStreamOperatorWithStateRetention.java new file mode 100644 index 0000000000000..d37ddec9e9406 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/async/BaseTwoInputAsyncStateStreamOperatorWithStateRetention.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.temporal.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.v2.ValueStateDescriptor; +import org.apache.flink.streaming.api.SimpleTimerService; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.table.data.RowData; + +import java.io.IOException; +import java.util.Optional; + +/** + * An abstract {@link TwoInputStreamOperator} that allows its subclasses to clean up their state in + * async state based on a TTL. This TTL should be specified in the provided {@code minRetentionTime} + * and {@code maxRetentionTime}. + * + *

For each known key, this operator registers a timer (in processing time) to fire after the TTL + * expires. When the timer fires, the subclass can decide which state to cleanup and what further + * action to take. + * + *

This class takes care of maintaining at most one timer per key. + * + *

IMPORTANT NOTE TO USERS: When extending this class, do not use processing time timers + * in your business logic. The reason is that: + * + *

1) if your timers collide with clean up timers and you delete them, then state clean-up will + * not be performed, and + * + *

2) (this one is the reason why this class does not allow to override the onProcessingTime()) + * the onProcessingTime with your logic would be also executed on each clean up timer. + */ +@Internal +public abstract class BaseTwoInputAsyncStateStreamOperatorWithStateRetention + extends AbstractAsyncStateStreamOperator + implements TwoInputStreamOperator, + Triggerable { + + private static final long serialVersionUID = -5953921797477294258L; + + private static final String CLEANUP_TIMESTAMP = "cleanup-timestamp"; + private static final String TIMERS_STATE_NAME = "timers"; + + private final long minRetentionTime; + private final long maxRetentionTime; + protected final boolean stateCleaningEnabled; + + private transient ValueState latestRegisteredCleanupTimer; + private transient SimpleTimerService timerService; + + protected BaseTwoInputAsyncStateStreamOperatorWithStateRetention( + long minRetentionTime, long maxRetentionTime) { + this.minRetentionTime = minRetentionTime; + this.maxRetentionTime = maxRetentionTime; + this.stateCleaningEnabled = minRetentionTime > 1; + } + + @Override + public boolean useSplittableTimers() { + return true; + } + + @Override + public void open() throws Exception { + initializeTimerService(); + + if (stateCleaningEnabled) { + ValueStateDescriptor cleanupStateDescriptor = + new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, Types.LONG); + latestRegisteredCleanupTimer = + getRuntimeContext().getValueState(cleanupStateDescriptor); + } + } + + private void initializeTimerService() { + InternalTimerService internalTimerService = + getInternalTimerService(TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, this); + + timerService = new SimpleTimerService(internalTimerService); + } + + /** + * If the user has specified a {@code minRetentionTime} and {@code maxRetentionTime}, this + * method registers a cleanup timer for {@code currentProcessingTime + minRetentionTime}. + * + *

When this timer fires, the {@link #cleanupState(long)} method is called. + */ + protected StateFuture registerProcessingCleanupTimer() throws IOException { + if (!stateCleaningEnabled) { + return StateFutureUtils.completedFuture(null); + } + long currentProcessingTime = timerService.currentProcessingTime(); + StateFuture cleanupTimeFuture = latestRegisteredCleanupTimer.asyncValue(); + return cleanupTimeFuture.thenCompose( + cleanupTime -> { + Optional currentCleanupTime = Optional.ofNullable(cleanupTime); + + if (currentCleanupTime.isEmpty() + || (currentProcessingTime + minRetentionTime) + > currentCleanupTime.get()) { + return updateCleanupTimer(currentProcessingTime, currentCleanupTime); + } else { + return StateFutureUtils.completedFuture(null); + } + }); + } + + private StateFuture updateCleanupTimer( + long currentProcessingTime, Optional currentCleanupTime) throws IOException { + currentCleanupTime.ifPresent(aLong -> timerService.deleteProcessingTimeTimer(aLong)); + + long newCleanupTime = currentProcessingTime + maxRetentionTime; + timerService.registerProcessingTimeTimer(newCleanupTime); + return latestRegisteredCleanupTimer.asyncUpdate(newCleanupTime); + } + + protected StateFuture cleanupLastTimer() throws IOException { + if (!stateCleaningEnabled) { + return StateFutureUtils.completedFuture(null); + } + StateFuture cleanupTimeFuture = latestRegisteredCleanupTimer.asyncValue(); + return cleanupTimeFuture.thenCompose( + cleanupTime -> { + Optional currentCleanupTime = Optional.ofNullable(cleanupTime); + if (currentCleanupTime.isPresent()) { + timerService.deleteProcessingTimeTimer(currentCleanupTime.get()); + return latestRegisteredCleanupTimer.asyncClear(); + } + return StateFutureUtils.completedFuture(null); + }); + } + + /** The users of this class are not allowed to use processing time timers. See class javadoc. */ + @Override + public final void onProcessingTime(InternalTimer timer) + throws Exception { + if (stateCleaningEnabled) { + long timerTime = timer.getTimestamp(); + StateFuture cleanupTimeFuture = latestRegisteredCleanupTimer.asyncValue(); + + cleanupTimeFuture.thenAccept( + cleanupTime -> { + if (cleanupTime != null && cleanupTime == timerTime) { + StateFuture cleanupStateFuture = cleanupState(cleanupTime); + cleanupStateFuture.thenAccept( + Void -> latestRegisteredCleanupTimer.asyncClear()); + } + }); + } + } + + // ----------------- Abstract Methods ----------------- + + /** + * The method to be called when a cleanup timer fires. + * + * @param time The timestamp of the fired timer. + */ + public abstract StateFuture cleanupState(long time); +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/utils/TemporalProcessTimeJoinHelper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/utils/TemporalProcessTimeJoinHelper.java new file mode 100644 index 0000000000000..c3885a7a143e2 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/utils/TemporalProcessTimeJoinHelper.java @@ -0,0 +1,52 @@ +package org.apache.flink.table.runtime.operators.join.temporal.utils; + +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.JoinCondition; + +public class TemporalProcessTimeJoinHelper { + private final boolean isLeftOuterJoin; + private final JoinCondition joinCondition; + private final JoinedRowData outRow; + private final GenericRowData rightNullRow; + private final TimestampedCollector collector; + + public TemporalProcessTimeJoinHelper( + boolean isLeftOuterJoin, + JoinCondition joinCondition, + JoinedRowData outRow, + GenericRowData rightNullRow, + TimestampedCollector collector) { + this.isLeftOuterJoin = isLeftOuterJoin; + this.joinCondition = joinCondition; + this.outRow = outRow; + this.rightNullRow = rightNullRow; + this.collector = collector; + } + + public void processElement1(RowData leftSideRow, RowData rightSideRow) throws Exception { + if (rightSideRow == null) { + if (isLeftOuterJoin) { + collectJoinedRow(leftSideRow, rightNullRow); + } else { + return; + } + } else { + if (joinCondition.apply(leftSideRow, rightSideRow)) { + collectJoinedRow(leftSideRow, rightSideRow); + } else { + if (isLeftOuterJoin) { + collectJoinedRow(leftSideRow, rightNullRow); + } + } + } + } + + private void collectJoinedRow(RowData leftRow, RowData rightRow) { + outRow.setRowKind(leftRow.getRowKind()); + outRow.replace(leftRow, rightRow); + collector.collect(outRow); + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java index c998dce5b7ed2..8f45a5b2aedd7 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java @@ -19,18 +19,26 @@ package org.apache.flink.table.runtime.operators.join.temporal; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.join.temporal.async.AsyncStateTemporalProcessTimeJoinOperator; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.utils.HandwrittenSelectorUtil; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; @@ -39,6 +47,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord; /** Harness tests for {@link TemporalProcessTimeJoinOperator}. */ +@ExtendWith(ParameterizedTestExtension.class) class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase { private int keyIdx = 0; @@ -57,13 +66,19 @@ class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBa private RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputRowType.toRowFieldTypes()); + @Parameters(name = "enableAsyncState = {0}") + public static List enableAsyncState() { + return Arrays.asList(false, true); + } + + @Parameter private boolean enableAsyncState; + /** Test proctime temporal join. */ - @Test + @TestTemplate void testProcTimeTemporalJoin() throws Exception { - TemporalProcessTimeJoinOperator joinOperator = - new TemporalProcessTimeJoinOperator(rowType, joinCondition, 0, 0, false); - KeyedTwoInputStreamOperatorTestHarness testHarness = - createTestHarness(joinOperator); + TwoInputStreamOperator joinOperator = + createTemporalProcessTimeJoinOperator(rowType, joinCondition, 0, 0, false); + testHarness = createTestHarness(joinOperator); testHarness.open(); testHarness.setProcessingTime(1); testHarness.processElement1(insertRecord(1L, "1a1")); @@ -88,15 +103,14 @@ void testProcTimeTemporalJoin() throws Exception { } /** Test proctime temporal join when set idle state retention. */ - @Test + @TestTemplate void testProcTimeTemporalJoinWithStateRetention() throws Exception { final int minRetentionTime = 10; final int maxRetentionTime = minRetentionTime * 3 / 2; - TemporalProcessTimeJoinOperator joinOperator = - new TemporalProcessTimeJoinOperator( + TwoInputStreamOperator joinOperator = + createTemporalProcessTimeJoinOperator( rowType, joinCondition, minRetentionTime, maxRetentionTime, false); - KeyedTwoInputStreamOperatorTestHarness testHarness = - createTestHarness(joinOperator); + testHarness = createTestHarness(joinOperator); testHarness.open(); testHarness.setProcessingTime(1); testHarness.processElement1(insertRecord(1L, "1a1")); @@ -118,15 +132,14 @@ void testProcTimeTemporalJoinWithStateRetention() throws Exception { } /** Test proctime left temporal join when set idle state retention. */ - @Test + @TestTemplate void testLeftProcTimeTemporalJoinWithStateRetention() throws Exception { final int minRetentionTime = 10; final int maxRetentionTime = minRetentionTime * 3 / 2; - TemporalProcessTimeJoinOperator joinOperator = - new TemporalProcessTimeJoinOperator( + TwoInputStreamOperator joinOperator = + createTemporalProcessTimeJoinOperator( rowType, joinCondition, minRetentionTime, maxRetentionTime, true); - KeyedTwoInputStreamOperatorTestHarness testHarness = - createTestHarness(joinOperator); + testHarness = createTestHarness(joinOperator); testHarness.open(); testHarness.setProcessingTime(1); testHarness.processElement1(insertRecord(1L, "1a1")); @@ -150,12 +163,11 @@ void testLeftProcTimeTemporalJoinWithStateRetention() throws Exception { } /** Test proctime temporal join changelog stream. */ - @Test + @TestTemplate void testProcTimeTemporalJoinOnChangelog() throws Exception { - TemporalProcessTimeJoinOperator joinOperator = - new TemporalProcessTimeJoinOperator(rowType, joinCondition, 0, 0, false); - KeyedTwoInputStreamOperatorTestHarness testHarness = - createTestHarness(joinOperator); + TwoInputStreamOperator joinOperator = + createTemporalProcessTimeJoinOperator(rowType, joinCondition, 0, 0, false); + testHarness = createTestHarness(joinOperator); testHarness.open(); testHarness.setProcessingTime(1); testHarness.processElement1(insertRecord(1L, "1a1")); @@ -186,11 +198,36 @@ void testProcTimeTemporalJoinOnChangelog() throws Exception { testHarness.close(); } - private KeyedTwoInputStreamOperatorTestHarness - createTestHarness(TemporalProcessTimeJoinOperator temporalJoinOperator) + @Override + protected KeyedTwoInputStreamOperatorTestHarness + createTestHarness( + TwoInputStreamOperator temporalJoinOperator) throws Exception { return new KeyedTwoInputStreamOperatorTestHarness<>( temporalJoinOperator, keySelector, keySelector, keyType); } + + private TwoInputStreamOperator createTemporalProcessTimeJoinOperator( + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + long minRetentionTime, + long maxRetentionTime, + boolean isLeftOuterJoin) { + if (!enableAsyncState) { + return new TemporalProcessTimeJoinOperator( + rightType, + generatedJoinCondition, + minRetentionTime, + maxRetentionTime, + isLeftOuterJoin); + } else { + return new AsyncStateTemporalProcessTimeJoinOperator( + rightType, + generatedJoinCondition, + minRetentionTime, + maxRetentionTime, + isLeftOuterJoin); + } + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java index bff3b8ae73aaf..3956bb10d9ce6 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java @@ -20,13 +20,19 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; @@ -37,6 +43,14 @@ /** Harness tests for {@link TemporalRowTimeJoinOperatorTest}. */ class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase { + + @Parameters(name = "enableAsyncState = {0}") + public static List enableAsyncState() { + return Arrays.asList(false); + } + + @Parameter private boolean enableAsyncState; + /** Test rowtime temporal join. */ @Test void testRowTimeTemporalJoin() throws Exception { @@ -77,7 +91,7 @@ void testRowTimeLeftTemporalJoin() throws Exception { private void testRowTimeTemporalJoin(boolean isLeftOuterJoin, List expectedOutput) throws Exception { TemporalRowTimeJoinOperator joinOperator = - new TemporalRowTimeJoinOperator( + createTemporalRowTimeJoinOperator( rowType, rowType, joinCondition, 0, 0, 0, 0, isLeftOuterJoin); KeyedTwoInputStreamOperatorTestHarness testHarness = createTestHarness(joinOperator); @@ -125,7 +139,7 @@ void testRowTimeTemporalJoinWithStateRetention() throws Exception { final int minRetentionTime = 4; final int maxRetentionTime = minRetentionTime * 3 / 2; TemporalRowTimeJoinOperator joinOperator = - new TemporalRowTimeJoinOperator( + createTemporalRowTimeJoinOperator( rowType, rowType, joinCondition, @@ -230,7 +244,7 @@ void testRowTimeLeftTemporalJoinOnUpsertSource() throws Exception { private void testRowTimeTemporalJoinOnUpsertSource( boolean isLeftOuterJoin, List expectedOutput) throws Exception { TemporalRowTimeJoinOperator joinOperator = - new TemporalRowTimeJoinOperator( + createTemporalRowTimeJoinOperator( rowType, rowType, joinCondition, 0, 0, 0, 0, isLeftOuterJoin); KeyedTwoInputStreamOperatorTestHarness testHarness = createTestHarness(joinOperator); @@ -270,10 +284,33 @@ private void testRowTimeTemporalJoinOnUpsertSource( testHarness.close(); } - private KeyedTwoInputStreamOperatorTestHarness - createTestHarness(TemporalRowTimeJoinOperator temporalJoinOperator) throws Exception { + @Override + protected KeyedTwoInputStreamOperatorTestHarness + createTestHarness( + TwoInputStreamOperator temporalJoinOperator) + throws Exception { return new KeyedTwoInputStreamOperatorTestHarness<>( temporalJoinOperator, keySelector, keySelector, keyType); } + + private TemporalRowTimeJoinOperator createTemporalRowTimeJoinOperator( + InternalTypeInfo leftType, + InternalTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + int leftTimeAttribute, + int rightTimeAttribute, + long minRetentionTime, + long maxRetentionTime, + boolean isLeftOuterJoin) { + return new TemporalRowTimeJoinOperator( + leftType, + rightType, + generatedJoinCondition, + leftTimeAttribute, + rightTimeAttribute, + minRetentionTime, + maxRetentionTime, + isLeftOuterJoin); + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java index 8ba7a8741bf79..8b580a8b340ef 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java @@ -19,6 +19,8 @@ package org.apache.flink.table.runtime.operators.join.temporal; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; @@ -62,4 +64,12 @@ abstract class TemporalTimeJoinOperatorTestBase { HandwrittenSelectorUtil.getRowDataSelector( new int[] {keyIdx}, rowType.toRowFieldTypes()); protected TypeInformation keyType = keySelector.getProducedType(); + + protected KeyedTwoInputStreamOperatorTestHarness + testHarness; + + protected abstract KeyedTwoInputStreamOperatorTestHarness + createTestHarness( + TwoInputStreamOperator temporalJoinOperator) + throws Exception; }