Skip to content

Commit

Permalink
Improvements: Fixes and changes on TestScenarioRunner (#4901)
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki authored and philemone committed Oct 23, 2023
1 parent 4d59c44 commit f3e98f5
Show file tree
Hide file tree
Showing 19 changed files with 534 additions and 88 deletions.
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ lazy val flinkComponentsTestkit = (project in utils("flink-components-testkit"))
)
}
)
.dependsOn(componentsTestkit, flinkExecutor, flinkTestUtils)
.dependsOn(componentsTestkit, flinkExecutor, flinkTestUtils, defaultHelpers % "test")

//this should be only added in scope test - 'module % "test"'
lazy val liteComponentsTestkit = (project in utils("lite-components-testkit"))
Expand All @@ -1016,7 +1016,8 @@ lazy val liteComponentsTestkit = (project in utils("lite-components-testkit"))
requestResponseRuntime,
liteEngineRuntime,
liteKafkaComponents,
liteRequestResponseComponents
liteRequestResponseComponents,
defaultHelpers % "test"
)

lazy val commonUtils = (project in utils("utils"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import pl.touk.nussknacker.engine.process.{CheckpointConfig, ExecutionConfigPrep
import pl.touk.nussknacker.engine.resultcollector.{ProductionServiceInvocationCollector, ResultCollector}
import pl.touk.nussknacker.engine.splittedgraph.end.BranchEnd
import pl.touk.nussknacker.engine.splittedgraph.{SplittedNodesCollector, splittednode}
import pl.touk.nussknacker.engine.testmode.{SinkInvocationCollector, TestRunId, TestServiceInvocationCollector}
import pl.touk.nussknacker.engine.testmode.TestServiceInvocationCollector
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader
import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ThreadUtils}
import shapeless.syntax.typeable.typeableOps

import java.util.concurrent.TimeUnit
import scala.language.implicitConversions
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap

/*
This is main class where we translate Nussknacker model to Flink job.
Expand All @@ -59,26 +59,30 @@ class FlinkProcessRegistrar(

implicit def millisToTime(duration: Long): Time = Time.of(duration, TimeUnit.MILLISECONDS)

def register(
env: StreamExecutionEnvironment,
process: CanonicalProcess,
processVersion: ProcessVersion,
deploymentData: DeploymentData
): Unit =
register(env, process, processVersion, deploymentData, ProductionServiceInvocationCollector)

def register(
env: StreamExecutionEnvironment,
process: CanonicalProcess,
processVersion: ProcessVersion,
deploymentData: DeploymentData,
testRunId: Option[TestRunId] = None
resultCollector: ResultCollector
): Unit = {
usingRightClassloader(env) { userClassLoader =>
// TODO: move creation outside Registrar, together with refactoring SinkInvocationCollector...
val collector =
testRunId.map(new TestServiceInvocationCollector(_)).getOrElse(ProductionServiceInvocationCollector)

val processCompilation = compileProcess(process, processVersion, collector)
val processCompilation = compileProcess(process, processVersion, resultCollector)
val processWithDeps = processCompilation(UsedNodes.empty, userClassLoader)

streamExecutionEnvPreparer.preRegistration(env, processWithDeps, deploymentData)
val typeInformationDetection = TypeInformationDetectionUtils.forExecutionConfig(env.getConfig, userClassLoader)

val partCompilation = FlinkProcessRegistrar.partCompilation[FlinkProcessCompilerData](processCompilation) _
register(env, partCompilation, processWithDeps, testRunId, typeInformationDetection)
register(env, partCompilation, processWithDeps, resultCollector, typeInformationDetection)
streamExecutionEnvPreparer.postRegistration(env, processWithDeps, deploymentData)
}
}
Expand Down Expand Up @@ -111,7 +115,7 @@ class FlinkProcessRegistrar(
env: StreamExecutionEnvironment,
compiledProcessWithDeps: Option[ProcessPart] => ClassLoader => FlinkProcessCompilerData,
processWithDeps: FlinkProcessCompilerData,
testRunId: Option[TestRunId],
resultCollector: ResultCollector,
typeInformationDetection: TypeInformationDetection
): Unit = {

Expand Down Expand Up @@ -265,19 +269,19 @@ class FlinkProcessRegistrar(
val customNodeContext = nodeContext(nodeComponentInfoFrom(part), Left(contextBefore))
val withValuePrepared = sink.prepareValue(afterInterpretation, customNodeContext)
// TODO: maybe this logic should be moved to compiler instead?
val withSinkAdded = testRunId match {
case None =>
sink.registerSink(withValuePrepared, nodeContext(nodeComponentInfoFrom(part), Left(contextBefore)))
case Some(runId) =>
val withSinkAdded = resultCollector match {
case testResultCollector: TestServiceInvocationCollector =>
val typ = part.node.data.ref.typ
val collectingSink = SinkInvocationCollector(runId, part.id, typ)
val collectingSink = testResultCollector.createSinkInvocationCollector(part.id, typ)
withValuePrepared
.map(
(ds: ValueWithContext[sink.Value]) => ds.map(sink.prepareTestValue),
customNodeContext.valueWithContextInfo.forUnknown
)
// FIXME: ...
.addSink(new CollectingSinkFunction[AnyRef](compiledProcessWithDeps(None), collectingSink, part.id))
case _ =>
sink.registerSink(withValuePrepared, nodeContext(nodeComponentInfoFrom(part), Left(contextBefore)))
}

withSinkAdded.name(operatorName(metaData, part.node, "sink"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer
import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompiler
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder}
import pl.touk.nussknacker.engine.testmode.{
ResultsCollectingListener,
ResultsCollectingListenerHolder,
TestServiceInvocationCollector
}

import scala.util.Using

object FlinkTestMain extends FlinkRunner {

Expand Down Expand Up @@ -41,18 +47,16 @@ class FlinkTestMain(
val configuration: Configuration
) extends FlinkStubbedRunner {

def runTest[T](variableEncoder: Any => T): TestResults[T] = {
val env = createEnv
val collectingListener = ResultsCollectingListenerHolder.registerRun(variableEncoder)
try {
val registrar: FlinkProcessRegistrar = prepareRegistrar(collectingListener, scenarioTestData)
registrar.register(env, process, processVersion, deploymentData, Option(collectingListener.runId))
def runTest[T](variableEncoder: Any => T): TestResults[T] =
Using.resource(ResultsCollectingListenerHolder.registerRun(variableEncoder)) { collectingListener =>
val resultCollector = new TestServiceInvocationCollector(collectingListener.runId)
val registrar = prepareRegistrar(collectingListener, scenarioTestData)
val env = createEnv

registrar.register(env, process, processVersion, deploymentData, resultCollector)
execute(env, SavepointRestoreSettings.none())
collectingListener.results
} finally {
collectingListener.clean()
}
}

protected def prepareRegistrar[T](
collectingListener: ResultsCollectingListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer
import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompiler
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.testmode.TestRunId
import pl.touk.nussknacker.engine.testmode.{TestRunId, TestServiceInvocationCollector}

object FlinkVerificationMain extends FlinkRunner {

Expand All @@ -36,9 +36,11 @@ class FlinkVerificationMain(
) extends FlinkStubbedRunner {

def runTest(): Unit = {
val env = createEnv
val registrar = prepareRegistrar()
registrar.register(env, process, processVersion, deploymentData, Option(TestRunId("dummy")))
val resultCollector = new TestServiceInvocationCollector(TestRunId("dummy"))
val registrar = prepareRegistrar()
val env = createEnv

registrar.register(env, process, processVersion, deploymentData, resultCollector)
execute(env, SavepointRestoreSettings.forPath(savepointPath, true))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompiler
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.resultcollector.{ProductionServiceInvocationCollector, ResultCollector}
import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.testmode.TestRunId

object TestFlinkRunner {

def registerInEnvironmentWithModel(env: StreamExecutionEnvironment, modelData: ModelData)(
scenario: CanonicalProcess,
deploymentData: DeploymentData = DeploymentData.empty,
version: ProcessVersion = ProcessVersion.empty,
testRunId: Option[TestRunId] = None
resultCollector: ResultCollector = ProductionServiceInvocationCollector
): Unit = {
val registrar =
FlinkProcessRegistrar(new FlinkProcessCompiler(modelData), ExecutionConfigPreparer.unOptimizedChain(modelData))
registrar.register(env, scenario, version, deploymentData, testRunId)
registrar.register(env, scenario, version, deploymentData, resultCollector)
}

def registerInEnvironment(
Expand All @@ -35,10 +35,10 @@ object TestFlinkRunner {
scenario: CanonicalProcess,
deploymentData: DeploymentData = DeploymentData.empty,
version: ProcessVersion = ProcessVersion.empty,
testRunId: Option[TestRunId] = None
resultCollector: ResultCollector = ProductionServiceInvocationCollector
): Unit = {
val modelData = LocalModelData(config, configCreator)
registerInEnvironmentWithModel(env, modelData)(scenario, deploymentData, version, testRunId)
registerInEnvironmentWithModel(env, modelData)(scenario, deploymentData, version, resultCollector)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ class InterpreterTestRunner[F[_]: InterpreterShape: CapabilityTransformer: Effec
// in tests we don't send metrics anywhere
val testContext = LiteEngineRuntimeContextPreparer.noOp.prepare(testJobData(process))
val componentUseCase: ComponentUseCase = ComponentUseCase.TestRuntime
val testServiceInvocationCollector = new TestServiceInvocationCollector(collectingListener.runId)

// FIXME: validation??
val scenarioInterpreter = ScenarioInterpreterFactory.createInterpreter[F, Input, Res](
process,
modelData,
additionalListeners = List(collectingListener),
new TestServiceInvocationCollector(collectingListener.runId),
testServiceInvocationCollector,
componentUseCase
)(SynchronousExecutionContext.ctx, implicitly[InterpreterShape[F]], implicitly[CapabilityTransformer[F]]) match {
case Valid(interpreter) => interpreter
Expand All @@ -81,7 +82,7 @@ class InterpreterTestRunner[F[_]: InterpreterShape: CapabilityTransformer: Effec

val results = implicitly[EffectUnwrapper[F]].apply(scenarioInterpreter.invoke(inputs))

collectSinkResults(collectingListener.runId, results)
collectSinkResults(testServiceInvocationCollector, results)
collectingListener.results
} finally {
collectingListener.clean()
Expand All @@ -96,11 +97,16 @@ class InterpreterTestRunner[F[_]: InterpreterShape: CapabilityTransformer: Effec
JobData(process.metaData, processVersion)
}

private def collectSinkResults(runId: TestRunId, results: ResultType[EndResult[Res]]): Unit = {
private def collectSinkResults(
testServiceInvocationCollector: TestServiceInvocationCollector,
results: ResultType[EndResult[Res]]
): Unit = {
val successfulResults = results.value
successfulResults.foreach { result =>
val node = result.nodeId.id
SinkInvocationCollector(runId, node, node).collect(result.context, result.result)
testServiceInvocationCollector
.createSinkInvocationCollector(node, node)
.collect(result.context, result.result)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package pl.touk.nussknacker.engine.testmode

import cats.Monad
import pl.touk.nussknacker.engine.api.{Context, ContextId}
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.{CollectableAction, ToCollect, TransmissionNames}
import cats.implicits._
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.{CollectableAction, ToCollect, TransmissionNames}
import pl.touk.nussknacker.engine.api.{Context, ContextId, NodeId}
import pl.touk.nussknacker.engine.resultcollector.ResultCollector

import scala.language.higherKinds
Expand Down Expand Up @@ -39,10 +38,13 @@ class TestServiceInvocationCollector(testRunId: TestRunId) extends ResultCollect
}
}

def createSinkInvocationCollector(nodeId: String, ref: String): SinkInvocationCollector =
new SinkInvocationCollector(testRunId, nodeId, ref)

}

//TODO: this should be somehow expressed via ResultCollector/TestServiceInvocationCollector
case class SinkInvocationCollector(runId: TestRunId, nodeId: String, ref: String) {
final class SinkInvocationCollector(runId: TestRunId, nodeId: String, ref: String) extends Serializable {

def collect(context: Context, result: Any): Unit = {
ResultsCollectingListenerHolder.updateResults(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigValueFactory.fromAnyRef
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.process.EmptyProcessConfigCreator
import pl.touk.nussknacker.engine.api.process.{
EmptyProcessConfigCreator,
ExpressionConfig,
ProcessObjectDependencies,
WithCategories
}
import pl.touk.nussknacker.engine.testing.LocalModelData

object ModelWithTestExtensions {
Expand All @@ -21,7 +26,18 @@ object ModelWithTestExtensions {
s"components.${TestComponentsProvider.name}.${TestComponentsProvider.testRunIdConfig}",
fromAnyRef(testExtensionsHolder.runId.id)
)
val model = LocalModelData(configWithRunId, new EmptyProcessConfigCreator)

val configCreator = new EmptyProcessConfigCreator {
override def expressionConfig(processObjectDependencies: ProcessObjectDependencies) = {
val globalProcessVariables = globalVariables.map { case (key, value) =>
key -> WithCategories.anyCategory(value)
}

ExpressionConfig(globalProcessVariables, List.empty)
}
}

val model = LocalModelData(configWithRunId, configCreator)
try {
action(model)
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ import cats.data.ValidatedNel
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError
import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo
import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, WithCategories}
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseCase.{EngineRuntime, TestRuntime}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.resultcollector.{ProductionServiceInvocationCollector, ResultCollector}
import pl.touk.nussknacker.engine.testmode.{
ResultsCollectingListener,
ResultsCollectingListenerHolder,
TestServiceInvocationCollector
}
import pl.touk.nussknacker.engine.util.test.TestScenarioRunner.RunnerListResult

import scala.reflect.ClassTag
Expand Down Expand Up @@ -58,6 +64,28 @@ trait TestScenarioRunnerBuilder[R <: TestScenarioRunner, B <: TestScenarioRunner

}

object TestScenarioCollectorHandler {

def createHandler(componentUseCase: ComponentUseCase): TestScenarioCollectorHandler = {
val (resultCollector, resultsCollectingHolder) = if (ComponentUseCase.TestRuntime == componentUseCase) {
val collectingListener = ResultsCollectingListenerHolder.registerRun(identity)
(new TestServiceInvocationCollector(collectingListener.runId), Some(collectingListener))
} else {
(ProductionServiceInvocationCollector, None)
}

new TestScenarioCollectorHandler(resultCollector, resultsCollectingHolder)
}

final class TestScenarioCollectorHandler(
val resultCollector: ResultCollector,
private val resultsCollectingListener: Option[ResultsCollectingListener]
) extends AutoCloseable {
def close(): Unit = resultsCollectingListener.foreach(_.clean())
}

}

trait ClassBasedTestScenarioRunner extends TestScenarioRunner {
// TODO: add generate test data support
def runWithData[T: ClassTag, R](scenario: CanonicalProcess, data: List[T]): RunnerListResult[R]
Expand Down
Loading

0 comments on commit f3e98f5

Please sign in to comment.