Skip to content

Commit

Permalink
Improvements: Fixes and changes on TestScenarioRunner (#4901)
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki authored and philemone committed Nov 13, 2023
1 parent d270456 commit 97c4b75
Show file tree
Hide file tree
Showing 21 changed files with 575 additions and 121 deletions.
71 changes: 36 additions & 35 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -623,18 +623,18 @@ lazy val flinkDeploymentManager = (project in flink("management"))
IntegrationTest / parallelExecution := false,
libraryDependencies ++= {
Seq(
"org.typelevel" %% "cats-core" % catsV % "provided",
"org.apache.flink" % "flink-streaming-java" % flinkV % flinkScope
"org.typelevel" %% "cats-core" % catsV % "provided",
"org.apache.flink" % "flink-streaming-java" % flinkV % flinkScope
excludeAll (
ExclusionRule("log4j", "log4j"),
ExclusionRule("org.slf4j", "slf4j-log4j12"),
ExclusionRule("com.esotericsoftware", "kryo-shaded"),
),
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % flinkScope,
"com.softwaremill.retry" %% "retry" % "0.3.6",
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "it,test",
"com.dimafeng" %% "testcontainers-scala-kafka" % testContainersScalaV % "it,test",
"com.github.tomakehurst" % "wiremock-jre8" % wireMockV % Test
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % flinkScope,
"com.softwaremill.retry" %% "retry" % "0.3.6",
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "it,test",
"com.dimafeng" %% "testcontainers-scala-kafka" % testContainersScalaV % "it,test",
"com.github.tomakehurst" % "wiremock-jre8" % wireMockV % Test
) ++ flinkLibScalaDeps(scalaVersion.value, Some(flinkScope))
},
// override scala-collection-compat from com.softwaremill.retry:retry
Expand Down Expand Up @@ -996,7 +996,7 @@ lazy val flinkComponentsTestkit = (project in utils("flink-components-testkit"))
)
}
)
.dependsOn(componentsTestkit, flinkExecutor, flinkTestUtils)
.dependsOn(componentsTestkit, flinkExecutor, flinkTestUtils, defaultHelpers % "test")

//this should be only added in scope test - 'module % "test"'
lazy val liteComponentsTestkit = (project in utils("lite-components-testkit"))
Expand All @@ -1009,7 +1009,8 @@ lazy val liteComponentsTestkit = (project in utils("lite-components-testkit"))
requestResponseRuntime,
liteEngineRuntime,
liteKafkaComponents,
liteRequestResponseComponents
liteRequestResponseComponents,
defaultHelpers % "test"
)

lazy val commonUtils = (project in utils("utils"))
Expand Down Expand Up @@ -1273,8 +1274,8 @@ lazy val liteEngineKafkaIntegrationTest: Project = (project in lite("integration
)
.value,
libraryDependencies ++= Seq(
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "it",
"com.dimafeng" %% "testcontainers-scala-kafka" % testContainersScalaV % "it"
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "it",
"com.dimafeng" %% "testcontainers-scala-kafka" % testContainersScalaV % "it"
)
)
.dependsOn(
Expand Down Expand Up @@ -1444,16 +1445,16 @@ lazy val componentsApi = (project in file("components-api"))
name := "nussknacker-components-api",
libraryDependencies ++= {
Seq(
"org.apache.commons" % "commons-text" % flinkCommonsTextV,
"org.typelevel" %% "cats-core" % catsV,
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV,
"com.typesafe" % "config" % configV,
"com.vdurmont" % "semver4j" % "3.1.0",
"javax.validation" % "validation-api" % javaxValidationApiV,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV,
"com.iheart" %% "ficus" % ficusV,
"org.springframework" % "spring-core" % springV,
"com.google.code.findbugs" % "jsr305" % findBugsV,
"org.apache.commons" % "commons-text" % flinkCommonsTextV,
"org.typelevel" %% "cats-core" % catsV,
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV,
"com.typesafe" % "config" % configV,
"com.vdurmont" % "semver4j" % "3.1.0",
"javax.validation" % "validation-api" % javaxValidationApiV,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV,
"com.iheart" %% "ficus" % ficusV,
"org.springframework" % "spring-core" % springV,
"com.google.code.findbugs" % "jsr305" % findBugsV,
"com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % sttpV
)
}
Expand Down Expand Up @@ -1526,10 +1527,10 @@ lazy val security = (project in file("security"))
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV,
"com.auth0" % "jwks-rsa" % "0.22.0", // a tool library for reading a remote JWK store, not an Auth0 service dependency

"com.softwaremill.sttp.tapir" %% "tapir-core" % tapirV,
"com.softwaremill.sttp.tapir" %% "tapir-json-circe" % tapirV,
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "it,test",
"com.github.dasniko" % "testcontainers-keycloak" % "2.5.0" % "it,test" excludeAll (
"com.softwaremill.sttp.tapir" %% "tapir-core" % tapirV,
"com.softwaremill.sttp.tapir" %% "tapir-json-circe" % tapirV,
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "it,test",
"com.github.dasniko" % "testcontainers-keycloak" % "2.5.0" % "it,test" excludeAll (
// we're using testcontainers-scala which requires a proper junit4 dependency
ExclusionRule("io.quarkus", "quarkus-junit4-mock")
)
Expand Down Expand Up @@ -1570,9 +1571,9 @@ lazy val processReports = (project in file("designer/processReports"))
name := "nussknacker-process-reports",
libraryDependencies ++= {
Seq(
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "it,test",
"com.dimafeng" %% "testcontainers-scala-influxdb" % testContainersScalaV % "it,test",
"org.influxdb" % "influxdb-java" % "2.23" % "it,test"
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "it,test",
"com.dimafeng" %% "testcontainers-scala-influxdb" % testContainersScalaV % "it,test",
"org.influxdb" % "influxdb-java" % "2.23" % "it,test"
)
}
)
Expand All @@ -1584,9 +1585,9 @@ lazy val httpUtils = (project in utils("http-utils"))
name := "nussknacker-http-utils",
libraryDependencies ++= {
Seq(
"com.softwaremill.sttp.client3" %% "core" % sttpV,
"com.softwaremill.sttp.client3" %% "json-common" % sttpV,
"com.softwaremill.sttp.client3" %% "circe" % sttpV,
"com.softwaremill.sttp.client3" %% "core" % sttpV,
"com.softwaremill.sttp.client3" %% "json-common" % sttpV,
"com.softwaremill.sttp.client3" %% "circe" % sttpV,
)
}
)
Expand All @@ -1604,13 +1605,13 @@ lazy val openapiComponents = (project in component("openapi"))
.settings(
name := "nussknacker-openapi",
libraryDependencies ++= Seq(
"io.swagger.core.v3" % "swagger-integration" % swaggerIntegrationV excludeAll (
"io.swagger.core.v3" % "swagger-integration" % swaggerIntegrationV excludeAll (
ExclusionRule(organization = "jakarta.activation"),
ExclusionRule(organization = "jakarta.validation")
),
"io.netty" % "netty-transport-native-epoll" % nettyV,
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"org.scalatest" %% "scalatest" % scalaTestV % "it,test"
"io.netty" % "netty-transport-native-epoll" % nettyV,
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"org.scalatest" %% "scalatest" % scalaTestV % "it,test"
),
)
.dependsOn(
Expand Down
6 changes: 6 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

1.12.0 (6 Oct 2023)
-------------------------
* [#4901](https://github.com/TouK/nussknacker/pull/4901) Improvements TestScenarioRunner:
* Run runner with proper prepared invocation collector for test mode
* Fix passing global variables on LiteTestScenarioRunner and RequestResponseTestScenarioRunner
* Add missing tests for passing global variables
* Fix bug with passing components on RequestResponseTestScenarioRunner
* Fix bug building source test context on LiteTestScenarioRunner
* [#4697](https://github.com/TouK/nussknacker/pull/4697) Change `api/parameters/*/validate` and `api/parameters/*/suggestions` endpoints.
* Use `processingType` instead of `processName`
* Add `scenarioName` parameter to `ParametersValidationRequest` used in `api/parameters/*/validate`
Expand Down
2 changes: 2 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ To see the biggest differences please consult the [changelog](Changelog.md).

### Other changes
* [#4860](https://github.com/TouK/nussknacker/pull/4860) In file-based configuration, the field `scenarioTypes.<scenarioType>.additionalPropertiesConfig` is renamed to `scenarioTypes.<scenarioType>.scenarioPropertiesConfig`
* [#4901](https://github.com/TouK/nussknacker/pull/4901) Improvements TestScenarioRunner:
* Changes at `FlinkProcessRegistrar.register` passing `resultCollector` instead of `testRunId`

## In version 1.12.x

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import pl.touk.nussknacker.engine.process.{CheckpointConfig, ExecutionConfigPrep
import pl.touk.nussknacker.engine.resultcollector.{ProductionServiceInvocationCollector, ResultCollector}
import pl.touk.nussknacker.engine.splittedgraph.end.BranchEnd
import pl.touk.nussknacker.engine.splittedgraph.{SplittedNodesCollector, splittednode}
import pl.touk.nussknacker.engine.testmode.{SinkInvocationCollector, TestRunId, TestServiceInvocationCollector}
import pl.touk.nussknacker.engine.testmode.TestServiceInvocationCollector
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader
import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ThreadUtils}
import shapeless.syntax.typeable.typeableOps

import java.util.concurrent.TimeUnit
import scala.language.implicitConversions
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap

/*
This is main class where we translate Nussknacker model to Flink job.
Expand All @@ -59,26 +59,30 @@ class FlinkProcessRegistrar(

implicit def millisToTime(duration: Long): Time = Time.of(duration, TimeUnit.MILLISECONDS)

def register(
env: StreamExecutionEnvironment,
process: CanonicalProcess,
processVersion: ProcessVersion,
deploymentData: DeploymentData
): Unit =
register(env, process, processVersion, deploymentData, ProductionServiceInvocationCollector)

def register(
env: StreamExecutionEnvironment,
process: CanonicalProcess,
processVersion: ProcessVersion,
deploymentData: DeploymentData,
testRunId: Option[TestRunId] = None
resultCollector: ResultCollector
): Unit = {
usingRightClassloader(env) { userClassLoader =>
// TODO: move creation outside Registrar, together with refactoring SinkInvocationCollector...
val collector =
testRunId.map(new TestServiceInvocationCollector(_)).getOrElse(ProductionServiceInvocationCollector)

val processCompilation = compileProcess(process, processVersion, collector)
val processCompilation = compileProcess(process, processVersion, resultCollector)
val processWithDeps = processCompilation(UsedNodes.empty, userClassLoader)

streamExecutionEnvPreparer.preRegistration(env, processWithDeps, deploymentData)
val typeInformationDetection = TypeInformationDetectionUtils.forExecutionConfig(env.getConfig, userClassLoader)

val partCompilation = FlinkProcessRegistrar.partCompilation[FlinkProcessCompilerData](processCompilation) _
register(env, partCompilation, processWithDeps, testRunId, typeInformationDetection)
register(env, partCompilation, processWithDeps, resultCollector, typeInformationDetection)
streamExecutionEnvPreparer.postRegistration(env, processWithDeps, deploymentData)
}
}
Expand Down Expand Up @@ -111,7 +115,7 @@ class FlinkProcessRegistrar(
env: StreamExecutionEnvironment,
compiledProcessWithDeps: Option[ProcessPart] => ClassLoader => FlinkProcessCompilerData,
processWithDeps: FlinkProcessCompilerData,
testRunId: Option[TestRunId],
resultCollector: ResultCollector,
typeInformationDetection: TypeInformationDetection
): Unit = {

Expand Down Expand Up @@ -265,19 +269,19 @@ class FlinkProcessRegistrar(
val customNodeContext = nodeContext(nodeComponentInfoFrom(part), Left(contextBefore))
val withValuePrepared = sink.prepareValue(afterInterpretation, customNodeContext)
// TODO: maybe this logic should be moved to compiler instead?
val withSinkAdded = testRunId match {
case None =>
sink.registerSink(withValuePrepared, nodeContext(nodeComponentInfoFrom(part), Left(contextBefore)))
case Some(runId) =>
val withSinkAdded = resultCollector match {
case testResultCollector: TestServiceInvocationCollector =>
val typ = part.node.data.ref.typ
val collectingSink = SinkInvocationCollector(runId, part.id, typ)
val collectingSink = testResultCollector.createSinkInvocationCollector(part.id, typ)
withValuePrepared
.map(
(ds: ValueWithContext[sink.Value]) => ds.map(sink.prepareTestValue),
customNodeContext.valueWithContextInfo.forUnknown
)
// FIXME: ...
.addSink(new CollectingSinkFunction[AnyRef](compiledProcessWithDeps(None), collectingSink, part.id))
case _ =>
sink.registerSink(withValuePrepared, nodeContext(nodeComponentInfoFrom(part), Left(contextBefore)))
}

withSinkAdded.name(operatorName(metaData, part.node, "sink"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer
import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompiler
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder}
import pl.touk.nussknacker.engine.testmode.{
ResultsCollectingListener,
ResultsCollectingListenerHolder,
TestServiceInvocationCollector
}

import scala.util.Using

object FlinkTestMain extends FlinkRunner {

Expand Down Expand Up @@ -41,18 +47,16 @@ class FlinkTestMain(
val configuration: Configuration
) extends FlinkStubbedRunner {

def runTest[T](variableEncoder: Any => T): TestResults[T] = {
val env = createEnv
val collectingListener = ResultsCollectingListenerHolder.registerRun(variableEncoder)
try {
val registrar: FlinkProcessRegistrar = prepareRegistrar(collectingListener, scenarioTestData)
registrar.register(env, process, processVersion, deploymentData, Option(collectingListener.runId))
def runTest[T](variableEncoder: Any => T): TestResults[T] =
Using.resource(ResultsCollectingListenerHolder.registerRun(variableEncoder)) { collectingListener =>
val resultCollector = new TestServiceInvocationCollector(collectingListener.runId)
val registrar = prepareRegistrar(collectingListener, scenarioTestData)
val env = createEnv

registrar.register(env, process, processVersion, deploymentData, resultCollector)
execute(env, SavepointRestoreSettings.none())
collectingListener.results
} finally {
collectingListener.clean()
}
}

protected def prepareRegistrar[T](
collectingListener: ResultsCollectingListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer
import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompiler
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.testmode.TestRunId
import pl.touk.nussknacker.engine.testmode.{TestRunId, TestServiceInvocationCollector}

object FlinkVerificationMain extends FlinkRunner {

Expand All @@ -36,9 +36,11 @@ class FlinkVerificationMain(
) extends FlinkStubbedRunner {

def runTest(): Unit = {
val env = createEnv
val registrar = prepareRegistrar()
registrar.register(env, process, processVersion, deploymentData, Option(TestRunId("dummy")))
val resultCollector = new TestServiceInvocationCollector(TestRunId("dummy"))
val registrar = prepareRegistrar()
val env = createEnv

registrar.register(env, process, processVersion, deploymentData, resultCollector)
execute(env, SavepointRestoreSettings.forPath(savepointPath, true))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompiler
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.resultcollector.{ProductionServiceInvocationCollector, ResultCollector}
import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.testmode.TestRunId

object TestFlinkRunner {

def registerInEnvironmentWithModel(env: StreamExecutionEnvironment, modelData: ModelData)(
scenario: CanonicalProcess,
deploymentData: DeploymentData = DeploymentData.empty,
version: ProcessVersion = ProcessVersion.empty,
testRunId: Option[TestRunId] = None
resultCollector: ResultCollector = ProductionServiceInvocationCollector
): Unit = {
val registrar =
FlinkProcessRegistrar(new FlinkProcessCompiler(modelData), ExecutionConfigPreparer.unOptimizedChain(modelData))
registrar.register(env, scenario, version, deploymentData, testRunId)
registrar.register(env, scenario, version, deploymentData, resultCollector)
}

def registerInEnvironment(
Expand All @@ -35,10 +35,10 @@ object TestFlinkRunner {
scenario: CanonicalProcess,
deploymentData: DeploymentData = DeploymentData.empty,
version: ProcessVersion = ProcessVersion.empty,
testRunId: Option[TestRunId] = None
resultCollector: ResultCollector = ProductionServiceInvocationCollector
): Unit = {
val modelData = LocalModelData(config, configCreator)
registerInEnvironmentWithModel(env, modelData)(scenario, deploymentData, version, testRunId)
registerInEnvironmentWithModel(env, modelData)(scenario, deploymentData, version, resultCollector)
}

}
Loading

0 comments on commit 97c4b75

Please sign in to comment.