diff --git a/docs/Changelog.md b/docs/Changelog.md index f230163f3be..8f7082f6510 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -27,6 +27,7 @@ * `accessTokenIsJwt` Oidc configuration introduced in [#4283](https://github.com/TouK/nussknacker/pull/4283) is removed: `audience` configuration specifies that access token is a JWT as it was before this change * [#4797](https://github.com/TouK/nussknacker/pull/4797) Ability to define the name of query parameter with access token that will be passed into tabs url * [#4804](https://github.com/TouK/nussknacker/pull/4804) Improvement: Allow passing globalVariables on TestRunner +* [#4828](https://github.com/TouK/nussknacker/pull/4828) Improvement: Allow passing timestampAssigner at FlinkTestScenarioRunner 1.11.3 (11 Sep 2023) ------------------------- diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala index 0c1b00dff60..7c3c83d1f6d 100644 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala +++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala @@ -4,16 +4,12 @@ import com.typesafe.config.Config import org.apache.flink.api.common.typeinfo.TypeInformation import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.component.ComponentDefinition -import pl.touk.nussknacker.engine.api.process.{ - ComponentUseCase, - EmptyProcessConfigCreator, - SourceFactory, - WithCategories -} +import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, EmptyProcessConfigCreator, SourceFactory} import pl.touk.nussknacker.engine.api.typed.typing import pl.touk.nussknacker.engine.api.typed.typing.Typed import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.DeploymentData +import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder import pl.touk.nussknacker.engine.flink.util.source.CollectionSource import pl.touk.nussknacker.engine.flink.util.test.testComponents.{ @@ -32,10 +28,13 @@ import scala.reflect.ClassTag private object testComponents { - def testDataSourceComponent[T: ClassTag: TypeInformation](data: List[T]): ComponentDefinition = { + def testDataSourceComponent[T: ClassTag: TypeInformation]( + data: List[T], + timestampAssigner: Option[TimestampWatermarkHandler[T]] + ): ComponentDefinition = { ComponentDefinition( TestScenarioRunner.testDataSource, - SourceFactory.noParamFromClassTag[T](new CollectionSource[T](data, None, Typed.apply[T])) + SourceFactory.noParamFromClassTag[T](new CollectionSource[T](data, timestampAssigner, Typed.apply[T])) ) } @@ -64,7 +63,27 @@ class FlinkTestScenarioRunner( override def runWithData[I: ClassTag, R](scenario: CanonicalProcess, data: List[I]): RunnerListResult[R] = { implicit val typeInf: TypeInformation[I] = TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]]) - val testComponents = testDataSourceComponent(data) :: noopSourceComponent :: testResultServiceComponent :: Nil + runWithTestSourceComponent(scenario, testDataSourceComponent(data, None)) + } + + /** + * Can be used to test Flink aggregates where record timestamp is crucial + */ + def runWithDataAndTimestampAssigner[I: ClassTag, R]( + scenario: CanonicalProcess, + data: List[I], + timestampAssigner: TimestampWatermarkHandler[I] + ): RunnerListResult[R] = { + implicit val typeInf: TypeInformation[I] = + TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]]) + runWithTestSourceComponent(scenario, testDataSourceComponent(data, Some(timestampAssigner))) + } + + private def runWithTestSourceComponent[I: ClassTag, R]( + scenario: CanonicalProcess, + testDataSourceComponent: ComponentDefinition + ): RunnerListResult[R] = { + val testComponents = testDataSourceComponent :: noopSourceComponent :: testResultServiceComponent :: Nil val testComponentHolder = TestExtensionsHolder.registerTestExtensions(components ++ testComponents, globalVariables) run(scenario, testComponentHolder).map { _ => collectResults(testComponentHolder) @@ -88,7 +107,7 @@ class FlinkTestScenarioRunner( def runWithDataIgnoringResults[I: ClassTag](scenario: CanonicalProcess, data: List[I]): RunnerResult[Unit] = { implicit val typeInf: TypeInformation[I] = TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]]) - val testComponents = testDataSourceComponent(data) :: noopSourceComponent :: Nil + val testComponents = testDataSourceComponent(data, None) :: noopSourceComponent :: Nil val testComponentHolder = TestExtensionsHolder.registerTestExtensions(components ++ testComponents, globalVariables) run(scenario, testComponentHolder) }