Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement: FlinkTestScenarioRunner allow passing timestampAssigner #4828

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* `accessTokenIsJwt` Oidc configuration introduced in [#4283](https://github.com/TouK/nussknacker/pull/4283) is removed: `audience` configuration specifies that access token is a JWT as it was before this change
* [#4797](https://github.com/TouK/nussknacker/pull/4797) Ability to define the name of query parameter with access token that will be passed into tabs url
* [#4804](https://github.com/TouK/nussknacker/pull/4804) Improvement: Allow passing globalVariables on TestRunner
* [#4828](https://github.com/TouK/nussknacker/pull/4828) Improvement: Allow passing timestampAssigner at FlinkTestScenarioRunner

1.11.3 (11 Sep 2023)
-------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@ import com.typesafe.config.Config
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.process.{
ComponentUseCase,
EmptyProcessConfigCreator,
SourceFactory,
WithCategories
}
import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, EmptyProcessConfigCreator, SourceFactory}
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler
import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder
import pl.touk.nussknacker.engine.flink.util.source.CollectionSource
import pl.touk.nussknacker.engine.flink.util.test.testComponents.{
Expand All @@ -32,10 +28,13 @@ import scala.reflect.ClassTag

private object testComponents {

def testDataSourceComponent[T: ClassTag: TypeInformation](data: List[T]): ComponentDefinition = {
def testDataSourceComponent[T: ClassTag: TypeInformation](
data: List[T],
timestampAssigner: Option[TimestampWatermarkHandler[T]]
): ComponentDefinition = {
ComponentDefinition(
TestScenarioRunner.testDataSource,
SourceFactory.noParamFromClassTag[T](new CollectionSource[T](data, None, Typed.apply[T]))
SourceFactory.noParamFromClassTag[T](new CollectionSource[T](data, timestampAssigner, Typed.apply[T]))
)
}

Expand Down Expand Up @@ -64,7 +63,27 @@ class FlinkTestScenarioRunner(
override def runWithData[I: ClassTag, R](scenario: CanonicalProcess, data: List[I]): RunnerListResult[R] = {
implicit val typeInf: TypeInformation[I] =
TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]])
val testComponents = testDataSourceComponent(data) :: noopSourceComponent :: testResultServiceComponent :: Nil
runWithTestSourceComponent(scenario, testDataSourceComponent(data, None))
}

/**
* Can be used to test Flink aggregates where record timestamp is crucial
*/
def runWithDataAndTimestampAssigner[I: ClassTag, R](
scenario: CanonicalProcess,
data: List[I],
timestampAssigner: TimestampWatermarkHandler[I]
): RunnerListResult[R] = {
implicit val typeInf: TypeInformation[I] =
TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]])
runWithTestSourceComponent(scenario, testDataSourceComponent(data, Some(timestampAssigner)))
}

private def runWithTestSourceComponent[I: ClassTag, R](
scenario: CanonicalProcess,
testDataSourceComponent: ComponentDefinition
): RunnerListResult[R] = {
val testComponents = testDataSourceComponent :: noopSourceComponent :: testResultServiceComponent :: Nil
val testComponentHolder = TestExtensionsHolder.registerTestExtensions(components ++ testComponents, globalVariables)
run(scenario, testComponentHolder).map { _ =>
collectResults(testComponentHolder)
Expand All @@ -88,7 +107,7 @@ class FlinkTestScenarioRunner(
def runWithDataIgnoringResults[I: ClassTag](scenario: CanonicalProcess, data: List[I]): RunnerResult[Unit] = {
implicit val typeInf: TypeInformation[I] =
TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]])
val testComponents = testDataSourceComponent(data) :: noopSourceComponent :: Nil
val testComponents = testDataSourceComponent(data, None) :: noopSourceComponent :: Nil
val testComponentHolder = TestExtensionsHolder.registerTestExtensions(components ++ testComponents, globalVariables)
run(scenario, testComponentHolder)
}
Expand Down
Loading