From e6dd14ca0165dd64c9402167204b088e9d9e77b8 Mon Sep 17 00:00:00 2001 From: au_miner <358671982@qq.com> Date: Mon, 9 Dec 2024 20:46:50 +0800 Subject: [PATCH] [FLINK-36851][table] Introducing TemporalJoinITCase in the Async State API --- .../exec/stream/StreamExecTemporalJoin.java | 1 + .../stream/sql/TemporalJoinITCase.scala | 28 +++++++++++++++++-- ...StateStreamOperatorWithStateRetention.java | 5 +++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java index 09f8079110c28..45f7f77f44dcb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java @@ -74,6 +74,7 @@ name = "stream-exec-temporal-join", version = 1, producedTransformations = StreamExecTemporalJoin.TEMPORAL_JOIN_TRANSFORMATION, + consumedOptions = {"table.exec.async-state.enabled"}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15) public class StreamExecTemporalJoin extends ExecNodeBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala index 0809b9e1d52d2..5fb8b9d6251d4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala @@ -23,9 +23,9 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.factories.TestValuesTableFactory.{getResultsAsStrings, registerData} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase -import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.table.utils.LegacyRowExtension -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension +import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} @@ -34,11 +34,22 @@ import org.junit.jupiter.api.extension.{ExtendWith, RegisterExtension} import java.time.LocalDateTime import java.time.format.DateTimeParseException +import java.util import scala.collection.JavaConversions._ @ExtendWith(Array(classOf[ParameterizedTestExtension])) -class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(state) { +class TemporalJoinITCase(state: StateBackendMode, enableAsyncState: Boolean) + extends StreamingWithStateTestBase(state) { + + @BeforeEach + override def before(): Unit = { + super.before() + + tEnv.getConfig.set( + ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED, + Boolean.box(enableAsyncState)) + } @RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] = new EachCallbackWrapper[LegacyRowExtension](new LegacyRowExtension) @@ -795,3 +806,14 @@ class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTest TestValuesTableFactory.changelogRow(kind, objects.toArray: _*) } } + +object TemporalJoinITCase { + + @Parameters(name = "StateBackend={0}, EnableAsyncState={1}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(HEAP_BACKEND, Boolean.box(false)), + Array(HEAP_BACKEND, Boolean.box(true)), + Array(ROCKSDB_BACKEND, Boolean.box(false))) + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/asyncprocessing/BaseTwoInputAsyncStateStreamOperatorWithStateRetention.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/asyncprocessing/BaseTwoInputAsyncStateStreamOperatorWithStateRetention.java index 17a3b6083b4f4..e3f5cf2834ff6 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/asyncprocessing/BaseTwoInputAsyncStateStreamOperatorWithStateRetention.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/asyncprocessing/BaseTwoInputAsyncStateStreamOperatorWithStateRetention.java @@ -28,7 +28,10 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.api.SimpleTimerService; -import org.apache.flink.streaming.api.operators.*; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.table.data.RowData; import java.io.IOException;