Skip to content

Commit

Permalink
Improvement: FlinkTestScenarioRunner allow passing timestampAssigner (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki authored and philemone committed Oct 23, 2023
1 parent b266ec4 commit 4d59c44
Showing 1 changed file with 29 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@ import com.typesafe.config.Config
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.process.{
ComponentUseCase,
EmptyProcessConfigCreator,
SourceFactory,
WithCategories
}
import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, EmptyProcessConfigCreator, SourceFactory}
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler
import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder
import pl.touk.nussknacker.engine.flink.util.source.CollectionSource
import pl.touk.nussknacker.engine.flink.util.test.testComponents.{
Expand All @@ -32,10 +28,13 @@ import scala.reflect.ClassTag

private object testComponents {

def testDataSourceComponent[T: ClassTag: TypeInformation](data: List[T]): ComponentDefinition = {
def testDataSourceComponent[T: ClassTag: TypeInformation](
data: List[T],
timestampAssigner: Option[TimestampWatermarkHandler[T]]
): ComponentDefinition = {
ComponentDefinition(
TestScenarioRunner.testDataSource,
SourceFactory.noParamFromClassTag[T](new CollectionSource[T](data, None, Typed.apply[T]))
SourceFactory.noParamFromClassTag[T](new CollectionSource[T](data, timestampAssigner, Typed.apply[T]))
)
}

Expand Down Expand Up @@ -64,7 +63,27 @@ class FlinkTestScenarioRunner(
override def runWithData[I: ClassTag, R](scenario: CanonicalProcess, data: List[I]): RunnerListResult[R] = {
implicit val typeInf: TypeInformation[I] =
TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]])
val testComponents = testDataSourceComponent(data) :: noopSourceComponent :: testResultServiceComponent :: Nil
runWithTestSourceComponent(scenario, testDataSourceComponent(data, None))
}

/**
* Can be used to test Flink aggregates where record timestamp is crucial
*/
def runWithDataAndTimestampAssigner[I: ClassTag, R](
scenario: CanonicalProcess,
data: List[I],
timestampAssigner: TimestampWatermarkHandler[I]
): RunnerListResult[R] = {
implicit val typeInf: TypeInformation[I] =
TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]])
runWithTestSourceComponent(scenario, testDataSourceComponent(data, Some(timestampAssigner)))
}

private def runWithTestSourceComponent[I: ClassTag, R](
scenario: CanonicalProcess,
testDataSourceComponent: ComponentDefinition
): RunnerListResult[R] = {
val testComponents = testDataSourceComponent :: noopSourceComponent :: testResultServiceComponent :: Nil
val testComponentHolder = TestExtensionsHolder.registerTestExtensions(components ++ testComponents, globalVariables)
run(scenario, testComponentHolder).map { _ =>
collectResults(testComponentHolder)
Expand All @@ -88,7 +107,7 @@ class FlinkTestScenarioRunner(
def runWithDataIgnoringResults[I: ClassTag](scenario: CanonicalProcess, data: List[I]): RunnerResult[Unit] = {
implicit val typeInf: TypeInformation[I] =
TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]])
val testComponents = testDataSourceComponent(data) :: noopSourceComponent :: Nil
val testComponents = testDataSourceComponent(data, None) :: noopSourceComponent :: Nil
val testComponentHolder = TestExtensionsHolder.registerTestExtensions(components ++ testComponents, globalVariables)
run(scenario, testComponentHolder)
}
Expand Down

0 comments on commit 4d59c44

Please sign in to comment.