Skip to content

Commit

Permalink
[FLINK-36851][table] Introducing TemporalJoinITCase in the Async Stat…
Browse files Browse the repository at this point in the history
…e API
  • Loading branch information
Au-Miner committed Dec 13, 2024
1 parent fd106e1 commit e6dd14c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
name = "stream-exec-temporal-join",
version = 1,
producedTransformations = StreamExecTemporalJoin.TEMPORAL_JOIN_TRANSFORMATION,
consumedOptions = {"table.exec.async-state.enabled"},
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
public class StreamExecTemporalJoin extends ExecNodeBase<RowData>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.factories.TestValuesTableFactory.{getResultsAsStrings, registerData}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.utils.LegacyRowExtension
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
import org.apache.flink.types.Row

import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
Expand All @@ -34,11 +34,22 @@ import org.junit.jupiter.api.extension.{ExtendWith, RegisterExtension}

import java.time.LocalDateTime
import java.time.format.DateTimeParseException
import java.util

import scala.collection.JavaConversions._

@ExtendWith(Array(classOf[ParameterizedTestExtension]))
class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(state) {
class TemporalJoinITCase(state: StateBackendMode, enableAsyncState: Boolean)
extends StreamingWithStateTestBase(state) {

@BeforeEach
override def before(): Unit = {
super.before()

tEnv.getConfig.set(
ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED,
Boolean.box(enableAsyncState))
}

@RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] =
new EachCallbackWrapper[LegacyRowExtension](new LegacyRowExtension)
Expand Down Expand Up @@ -795,3 +806,14 @@ class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTest
TestValuesTableFactory.changelogRow(kind, objects.toArray: _*)
}
}

object TemporalJoinITCase {

@Parameters(name = "StateBackend={0}, EnableAsyncState={1}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
Seq[Array[AnyRef]](
Array(HEAP_BACKEND, Boolean.box(false)),
Array(HEAP_BACKEND, Boolean.box(true)),
Array(ROCKSDB_BACKEND, Boolean.box(false)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.table.data.RowData;

import java.io.IOException;
Expand Down

0 comments on commit e6dd14c

Please sign in to comment.