diff --git a/build.sbt b/build.sbt index 70e96249a9e..bd2b319c4a5 100644 --- a/build.sbt +++ b/build.sbt @@ -1003,7 +1003,7 @@ lazy val flinkComponentsTestkit = (project in utils("flink-components-testkit")) ) } ) - .dependsOn(componentsTestkit, flinkExecutor, flinkTestUtils) + .dependsOn(componentsTestkit, flinkExecutor, flinkTestUtils, defaultHelpers % "test") //this should be only added in scope test - 'module % "test"' lazy val liteComponentsTestkit = (project in utils("lite-components-testkit")) @@ -1016,7 +1016,8 @@ lazy val liteComponentsTestkit = (project in utils("lite-components-testkit")) requestResponseRuntime, liteEngineRuntime, liteKafkaComponents, - liteRequestResponseComponents + liteRequestResponseComponents, + defaultHelpers % "test" ) lazy val commonUtils = (project in utils("utils")) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala index db0640772a8..3d38bb268a5 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala @@ -32,14 +32,14 @@ import pl.touk.nussknacker.engine.process.{CheckpointConfig, ExecutionConfigPrep import pl.touk.nussknacker.engine.resultcollector.{ProductionServiceInvocationCollector, ResultCollector} import pl.touk.nussknacker.engine.splittedgraph.end.BranchEnd import pl.touk.nussknacker.engine.splittedgraph.{SplittedNodesCollector, splittednode} -import pl.touk.nussknacker.engine.testmode.{SinkInvocationCollector, TestRunId, TestServiceInvocationCollector} +import pl.touk.nussknacker.engine.testmode.TestServiceInvocationCollector +import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ThreadUtils} import shapeless.syntax.typeable.typeableOps import java.util.concurrent.TimeUnit import scala.language.implicitConversions -import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap /* This is main class where we translate Nussknacker model to Flink job. @@ -59,26 +59,30 @@ class FlinkProcessRegistrar( implicit def millisToTime(duration: Long): Time = Time.of(duration, TimeUnit.MILLISECONDS) + def register( + env: StreamExecutionEnvironment, + process: CanonicalProcess, + processVersion: ProcessVersion, + deploymentData: DeploymentData + ): Unit = + register(env, process, processVersion, deploymentData, ProductionServiceInvocationCollector) + def register( env: StreamExecutionEnvironment, process: CanonicalProcess, processVersion: ProcessVersion, deploymentData: DeploymentData, - testRunId: Option[TestRunId] = None + resultCollector: ResultCollector ): Unit = { usingRightClassloader(env) { userClassLoader => - // TODO: move creation outside Registrar, together with refactoring SinkInvocationCollector... - val collector = - testRunId.map(new TestServiceInvocationCollector(_)).getOrElse(ProductionServiceInvocationCollector) - - val processCompilation = compileProcess(process, processVersion, collector) + val processCompilation = compileProcess(process, processVersion, resultCollector) val processWithDeps = processCompilation(UsedNodes.empty, userClassLoader) streamExecutionEnvPreparer.preRegistration(env, processWithDeps, deploymentData) val typeInformationDetection = TypeInformationDetectionUtils.forExecutionConfig(env.getConfig, userClassLoader) val partCompilation = FlinkProcessRegistrar.partCompilation[FlinkProcessCompilerData](processCompilation) _ - register(env, partCompilation, processWithDeps, testRunId, typeInformationDetection) + register(env, partCompilation, processWithDeps, resultCollector, typeInformationDetection) streamExecutionEnvPreparer.postRegistration(env, processWithDeps, deploymentData) } } @@ -111,7 +115,7 @@ class FlinkProcessRegistrar( env: StreamExecutionEnvironment, compiledProcessWithDeps: Option[ProcessPart] => ClassLoader => FlinkProcessCompilerData, processWithDeps: FlinkProcessCompilerData, - testRunId: Option[TestRunId], + resultCollector: ResultCollector, typeInformationDetection: TypeInformationDetection ): Unit = { @@ -265,12 +269,10 @@ class FlinkProcessRegistrar( val customNodeContext = nodeContext(nodeComponentInfoFrom(part), Left(contextBefore)) val withValuePrepared = sink.prepareValue(afterInterpretation, customNodeContext) // TODO: maybe this logic should be moved to compiler instead? - val withSinkAdded = testRunId match { - case None => - sink.registerSink(withValuePrepared, nodeContext(nodeComponentInfoFrom(part), Left(contextBefore))) - case Some(runId) => + val withSinkAdded = resultCollector match { + case testResultCollector: TestServiceInvocationCollector => val typ = part.node.data.ref.typ - val collectingSink = SinkInvocationCollector(runId, part.id, typ) + val collectingSink = testResultCollector.createSinkInvocationCollector(part.id, typ) withValuePrepared .map( (ds: ValueWithContext[sink.Value]) => ds.map(sink.prepareTestValue), @@ -278,6 +280,8 @@ class FlinkProcessRegistrar( ) // FIXME: ... .addSink(new CollectingSinkFunction[AnyRef](compiledProcessWithDeps(None), collectingSink, part.id)) + case _ => + sink.registerSink(withValuePrepared, nodeContext(nodeComponentInfoFrom(part), Left(contextBefore))) } withSinkAdded.name(operatorName(metaData, part.node, "sink")) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index dde35e710a6..bf096a8514a 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -12,7 +12,13 @@ import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer import pl.touk.nussknacker.engine.process.compiler.TestFlinkProcessCompiler import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults -import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListener, ResultsCollectingListenerHolder} +import pl.touk.nussknacker.engine.testmode.{ + ResultsCollectingListener, + ResultsCollectingListenerHolder, + TestServiceInvocationCollector +} + +import scala.util.Using object FlinkTestMain extends FlinkRunner { @@ -41,18 +47,16 @@ class FlinkTestMain( val configuration: Configuration ) extends FlinkStubbedRunner { - def runTest[T](variableEncoder: Any => T): TestResults[T] = { - val env = createEnv - val collectingListener = ResultsCollectingListenerHolder.registerRun(variableEncoder) - try { - val registrar: FlinkProcessRegistrar = prepareRegistrar(collectingListener, scenarioTestData) - registrar.register(env, process, processVersion, deploymentData, Option(collectingListener.runId)) + def runTest[T](variableEncoder: Any => T): TestResults[T] = + Using.resource(ResultsCollectingListenerHolder.registerRun(variableEncoder)) { collectingListener => + val resultCollector = new TestServiceInvocationCollector(collectingListener.runId) + val registrar = prepareRegistrar(collectingListener, scenarioTestData) + val env = createEnv + + registrar.register(env, process, processVersion, deploymentData, resultCollector) execute(env, SavepointRestoreSettings.none()) collectingListener.results - } finally { - collectingListener.clean() } - } protected def prepareRegistrar[T]( collectingListener: ResultsCollectingListener, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala index 386dec48da3..d974bde3b53 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkVerificationMain.scala @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer import pl.touk.nussknacker.engine.process.compiler.VerificationFlinkProcessCompiler import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar -import pl.touk.nussknacker.engine.testmode.TestRunId +import pl.touk.nussknacker.engine.testmode.{TestRunId, TestServiceInvocationCollector} object FlinkVerificationMain extends FlinkRunner { @@ -36,9 +36,11 @@ class FlinkVerificationMain( ) extends FlinkStubbedRunner { def runTest(): Unit = { - val env = createEnv - val registrar = prepareRegistrar() - registrar.register(env, process, processVersion, deploymentData, Option(TestRunId("dummy"))) + val resultCollector = new TestServiceInvocationCollector(TestRunId("dummy")) + val registrar = prepareRegistrar() + val env = createEnv + + registrar.register(env, process, processVersion, deploymentData, resultCollector) execute(env, SavepointRestoreSettings.forPath(savepointPath, true)) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/TestFlinkRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/TestFlinkRunner.scala index 2cbfbfd9333..8dc5f70d89d 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/TestFlinkRunner.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/TestFlinkRunner.scala @@ -11,8 +11,8 @@ import pl.touk.nussknacker.engine.deployment.DeploymentData import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompiler import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar +import pl.touk.nussknacker.engine.resultcollector.{ProductionServiceInvocationCollector, ResultCollector} import pl.touk.nussknacker.engine.testing.LocalModelData -import pl.touk.nussknacker.engine.testmode.TestRunId object TestFlinkRunner { @@ -20,11 +20,11 @@ object TestFlinkRunner { scenario: CanonicalProcess, deploymentData: DeploymentData = DeploymentData.empty, version: ProcessVersion = ProcessVersion.empty, - testRunId: Option[TestRunId] = None + resultCollector: ResultCollector = ProductionServiceInvocationCollector ): Unit = { val registrar = FlinkProcessRegistrar(new FlinkProcessCompiler(modelData), ExecutionConfigPreparer.unOptimizedChain(modelData)) - registrar.register(env, scenario, version, deploymentData, testRunId) + registrar.register(env, scenario, version, deploymentData, resultCollector) } def registerInEnvironment( @@ -35,10 +35,10 @@ object TestFlinkRunner { scenario: CanonicalProcess, deploymentData: DeploymentData = DeploymentData.empty, version: ProcessVersion = ProcessVersion.empty, - testRunId: Option[TestRunId] = None + resultCollector: ResultCollector = ProductionServiceInvocationCollector ): Unit = { val modelData = LocalModelData(config, configCreator) - registerInEnvironmentWithModel(env, modelData)(scenario, deploymentData, version, testRunId) + registerInEnvironmentWithModel(env, modelData)(scenario, deploymentData, version, resultCollector) } } diff --git a/engine/lite/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/TestRunner.scala b/engine/lite/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/TestRunner.scala index 89b1db43717..e4fac6fccb3 100644 --- a/engine/lite/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/TestRunner.scala +++ b/engine/lite/runtime/src/main/scala/pl/touk/nussknacker/engine/lite/TestRunner.scala @@ -48,13 +48,14 @@ class InterpreterTestRunner[F[_]: InterpreterShape: CapabilityTransformer: Effec // in tests we don't send metrics anywhere val testContext = LiteEngineRuntimeContextPreparer.noOp.prepare(testJobData(process)) val componentUseCase: ComponentUseCase = ComponentUseCase.TestRuntime + val testServiceInvocationCollector = new TestServiceInvocationCollector(collectingListener.runId) // FIXME: validation?? val scenarioInterpreter = ScenarioInterpreterFactory.createInterpreter[F, Input, Res]( process, modelData, additionalListeners = List(collectingListener), - new TestServiceInvocationCollector(collectingListener.runId), + testServiceInvocationCollector, componentUseCase )(SynchronousExecutionContext.ctx, implicitly[InterpreterShape[F]], implicitly[CapabilityTransformer[F]]) match { case Valid(interpreter) => interpreter @@ -81,7 +82,7 @@ class InterpreterTestRunner[F[_]: InterpreterShape: CapabilityTransformer: Effec val results = implicitly[EffectUnwrapper[F]].apply(scenarioInterpreter.invoke(inputs)) - collectSinkResults(collectingListener.runId, results) + collectSinkResults(testServiceInvocationCollector, results) collectingListener.results } finally { collectingListener.clean() @@ -96,11 +97,16 @@ class InterpreterTestRunner[F[_]: InterpreterShape: CapabilityTransformer: Effec JobData(process.metaData, processVersion) } - private def collectSinkResults(runId: TestRunId, results: ResultType[EndResult[Res]]): Unit = { + private def collectSinkResults( + testServiceInvocationCollector: TestServiceInvocationCollector, + results: ResultType[EndResult[Res]] + ): Unit = { val successfulResults = results.value successfulResults.foreach { result => val node = result.nodeId.id - SinkInvocationCollector(runId, node, node).collect(result.context, result.result) + testServiceInvocationCollector + .createSinkInvocationCollector(node, node) + .collect(result.context, result.result) } } diff --git a/interpreter/src/main/scala/pl/touk/nussknacker/engine/testmode/TestServiceInvocationCollector.scala b/interpreter/src/main/scala/pl/touk/nussknacker/engine/testmode/TestServiceInvocationCollector.scala index 3ed5f321883..46f92445f0c 100644 --- a/interpreter/src/main/scala/pl/touk/nussknacker/engine/testmode/TestServiceInvocationCollector.scala +++ b/interpreter/src/main/scala/pl/touk/nussknacker/engine/testmode/TestServiceInvocationCollector.scala @@ -1,10 +1,9 @@ package pl.touk.nussknacker.engine.testmode import cats.Monad -import pl.touk.nussknacker.engine.api.{Context, ContextId} -import pl.touk.nussknacker.engine.api.test.InvocationCollectors.{CollectableAction, ToCollect, TransmissionNames} import cats.implicits._ -import pl.touk.nussknacker.engine.api.NodeId +import pl.touk.nussknacker.engine.api.test.InvocationCollectors.{CollectableAction, ToCollect, TransmissionNames} +import pl.touk.nussknacker.engine.api.{Context, ContextId, NodeId} import pl.touk.nussknacker.engine.resultcollector.ResultCollector import scala.language.higherKinds @@ -39,10 +38,13 @@ class TestServiceInvocationCollector(testRunId: TestRunId) extends ResultCollect } } + def createSinkInvocationCollector(nodeId: String, ref: String): SinkInvocationCollector = + new SinkInvocationCollector(testRunId, nodeId, ref) + } //TODO: this should be somehow expressed via ResultCollector/TestServiceInvocationCollector -case class SinkInvocationCollector(runId: TestRunId, nodeId: String, ref: String) { +final class SinkInvocationCollector(runId: TestRunId, nodeId: String, ref: String) extends Serializable { def collect(context: Context, result: Any): Unit = { ResultsCollectingListenerHolder.updateResults( diff --git a/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/ModelWithTestExtensions.scala b/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/ModelWithTestExtensions.scala index 5f3fd70e8e3..3b82c69cb3d 100644 --- a/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/ModelWithTestExtensions.scala +++ b/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/ModelWithTestExtensions.scala @@ -4,7 +4,12 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigValueFactory.fromAnyRef import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.component.ComponentDefinition -import pl.touk.nussknacker.engine.api.process.EmptyProcessConfigCreator +import pl.touk.nussknacker.engine.api.process.{ + EmptyProcessConfigCreator, + ExpressionConfig, + ProcessObjectDependencies, + WithCategories +} import pl.touk.nussknacker.engine.testing.LocalModelData object ModelWithTestExtensions { @@ -21,7 +26,18 @@ object ModelWithTestExtensions { s"components.${TestComponentsProvider.name}.${TestComponentsProvider.testRunIdConfig}", fromAnyRef(testExtensionsHolder.runId.id) ) - val model = LocalModelData(configWithRunId, new EmptyProcessConfigCreator) + + val configCreator = new EmptyProcessConfigCreator { + override def expressionConfig(processObjectDependencies: ProcessObjectDependencies) = { + val globalProcessVariables = globalVariables.map { case (key, value) => + key -> WithCategories.anyCategory(value) + } + + ExpressionConfig(globalProcessVariables, List.empty) + } + } + + val model = LocalModelData(configWithRunId, configCreator) try { action(model) } finally { diff --git a/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/TestScenarioRunner.scala b/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/TestScenarioRunner.scala index 26c4a85b30e..af4b1969f23 100644 --- a/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/TestScenarioRunner.scala +++ b/utils/components-testkit/src/main/scala/pl/touk/nussknacker/engine/util/test/TestScenarioRunner.scala @@ -4,9 +4,15 @@ import cats.data.ValidatedNel import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.api.context.ProcessCompilationError import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo -import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, WithCategories} +import pl.touk.nussknacker.engine.api.process.ComponentUseCase import pl.touk.nussknacker.engine.api.process.ComponentUseCase.{EngineRuntime, TestRuntime} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess +import pl.touk.nussknacker.engine.resultcollector.{ProductionServiceInvocationCollector, ResultCollector} +import pl.touk.nussknacker.engine.testmode.{ + ResultsCollectingListener, + ResultsCollectingListenerHolder, + TestServiceInvocationCollector +} import pl.touk.nussknacker.engine.util.test.TestScenarioRunner.RunnerListResult import scala.reflect.ClassTag @@ -58,6 +64,28 @@ trait TestScenarioRunnerBuilder[R <: TestScenarioRunner, B <: TestScenarioRunner } +object TestScenarioCollectorHandler { + + def createHandler(componentUseCase: ComponentUseCase): TestScenarioCollectorHandler = { + val (resultCollector, resultsCollectingHolder) = if (ComponentUseCase.TestRuntime == componentUseCase) { + val collectingListener = ResultsCollectingListenerHolder.registerRun(identity) + (new TestServiceInvocationCollector(collectingListener.runId), Some(collectingListener)) + } else { + (ProductionServiceInvocationCollector, None) + } + + new TestScenarioCollectorHandler(resultCollector, resultsCollectingHolder) + } + + final class TestScenarioCollectorHandler( + val resultCollector: ResultCollector, + private val resultsCollectingListener: Option[ResultsCollectingListener] + ) extends AutoCloseable { + def close(): Unit = resultsCollectingListener.foreach(_.clean()) + } + +} + trait ClassBasedTestScenarioRunner extends TestScenarioRunner { // TODO: add generate test data support def runWithData[T: ClassTag, R](scenario: CanonicalProcess, data: List[T]): RunnerListResult[R] diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala index 7c3c83d1f6d..8c3d532c8f6 100644 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala +++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala @@ -25,6 +25,7 @@ import pl.touk.nussknacker.engine.util.test.TestScenarioRunner.{RunnerListResult import pl.touk.nussknacker.engine.util.test._ import scala.reflect.ClassTag +import scala.util.Using private object testComponents { @@ -121,18 +122,27 @@ class FlinkTestScenarioRunner( // It's copied from registrar.register only for handling compilation errors.. // TODO: figure how to get compilation result on highest level - registrar.register? val compiler = new FlinkProcessCompilerWithTestComponents(testExtensionsHolder, modelData, componentUseCase) - val compileProcessData = compiler.compileProcess( - scenario, - ProcessVersion.empty, - ProductionServiceInvocationCollector, - getClass.getClassLoader - ) - compileProcessData.compileProcess().map { _ => - val registrar = FlinkProcessRegistrar(compiler, ExecutionConfigPreparer.unOptimizedChain(modelData)) - registrar.register(env, scenario, ProcessVersion.empty, DeploymentData.empty, testRunId = None) - env.executeAndWaitForFinished(scenario.id)() - RunUnitResult(errors = Nil) + Using.resource(TestScenarioCollectorHandler.createHandler(componentUseCase)) { testScenarioCollectorHandler => + val compileProcessData = compiler.compileProcess( + scenario, + ProcessVersion.empty, + testScenarioCollectorHandler.resultCollector, + getClass.getClassLoader + ) + + compileProcessData.compileProcess().map { _ => + val registrar = FlinkProcessRegistrar(compiler, ExecutionConfigPreparer.unOptimizedChain(modelData)) + registrar.register( + env, + scenario, + ProcessVersion.empty, + DeploymentData.empty, + testScenarioCollectorHandler.resultCollector + ) + env.executeAndWaitForFinished(scenario.id)() + RunUnitResult(errors = Nil) + } } } diff --git a/utils/flink-components-testkit/src/test/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunnerSpec.scala b/utils/flink-components-testkit/src/test/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunnerSpec.scala new file mode 100644 index 00000000000..09b220dc760 --- /dev/null +++ b/utils/flink-components-testkit/src/test/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunnerSpec.scala @@ -0,0 +1,133 @@ +package pl.touk.nussknacker.engine.flink.util.test + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api._ +import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo +import pl.touk.nussknacker.engine.api.process.ComponentUseCase +import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector +import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess +import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.spel.SpelExpressionEvaluationException +import pl.touk.nussknacker.engine.util.functions._ +import pl.touk.nussknacker.engine.util.test.TestScenarioRunner +import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage + +import java.time.{Clock, Instant, ZoneId} +import scala.concurrent.{ExecutionContext, Future} + +class FlinkTestScenarioRunnerSpec extends AnyFunSuite with Matchers with FlinkSpec with ValidatedValuesDetailedMessage { + + import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ + import pl.touk.nussknacker.engine.spel.Implicits._ + + test("should return service invoke value") { + val input = "input" + + val scenario: CanonicalProcess = + ScenarioBuilder + .streaming(getClass.getName) + .source("start", TestScenarioRunner.testDataSource) + .enricher("service", "output", TestService.ServiceId, "param" -> "#input") + .processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#output") + + val runResults = + TestScenarioRunner + .flinkBased(config, flinkMiniCluster) + .withExtraComponents(List(ComponentDefinition(TestService.ServiceId, TestService))) + .build() + .runWithData[String, String](scenario, List(input)) + + runResults.validValue.successes shouldBe List(input) + } + + test("should return service invoke mocked value for test runtime mode") { + val input = "input" + + val scenario: CanonicalProcess = + ScenarioBuilder + .streaming(getClass.getName) + .source("start", TestScenarioRunner.testDataSource) + .enricher("service", "output", TestService.ServiceId, "param" -> "#input") + .processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#output") + + val runResults = + TestScenarioRunner + .flinkBased(config, flinkMiniCluster) + .withExtraComponents(List(ComponentDefinition(TestService.ServiceId, TestService))) + .inTestRuntimeMode + .build() + .runWithData[String, String](scenario, List(input)) + + runResults.validValue.successes shouldBe List(TestService.MockedValued) + } + + test("should allowing use global variable - date helper") { + val now = Instant.now() + val dateHelper = new DateUtils(Clock.fixed(now, ZoneId.systemDefault())) + + val scenario: CanonicalProcess = + ScenarioBuilder + .streaming(getClass.getName) + .source("start", TestScenarioRunner.testDataSource) + .processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#DATE.now.toString") + + val runResults = + TestScenarioRunner + .flinkBased(config, flinkMiniCluster) + .withExtraGlobalVariables(Map("DATE" -> dateHelper)) + .build() + .runWithData[String, String](scenario, List("lcl")) + + runResults.validValue.successes shouldBe List(now.toString) + } + + // TODO: FlinkTestScenarioRunner doesn't collect errors, see FlinkTestScenarioRunner.collectResults + ignore("should catch exception during compilation in test run mode") { + val scenario = + ScenarioBuilder + .streaming(getClass.getName) + .source("start", TestScenarioRunner.testDataSource) + .filter("filter", "#input / 0 != 0") // intentional throwing of an exception + .processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input") + + val runResults = + TestScenarioRunner + .flinkBased(config, flinkMiniCluster) + .build() + .runWithData[Int, Int](scenario, List(10)) + + runResults.validValue.errors.map(_.throwable).map { exc => + exc.asInstanceOf[SpelExpressionEvaluationException].getMessage + } shouldBe List( + "Expression [#input / 0 != 0] evaluation failed, message: / by zero", + ) + } + + object TestService extends EagerService { + + val ServiceId = "testService" + + val MockedValued = "sample-mocked" + + @MethodToInvoke + def invoke(@ParamName("param") value: LazyParameter[String]): ServiceInvoker = new ServiceInvoker { + + override def invokeService(params: Map[String, Any])( + implicit ec: ExecutionContext, + collector: ServiceInvocationCollector, + contextId: ContextId, + componentUseCase: ComponentUseCase + ): Future[String] = { + collector.collect(s"test-service-$value", Option(MockedValued)) { + Future.successful(params("param").asInstanceOf[String]) + } + } + + } + + } + +} diff --git a/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/ConsumerRecordHelper.scala b/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/ConsumerRecordHelper.scala index 048855629dd..400dbd01e5d 100644 --- a/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/ConsumerRecordHelper.scala +++ b/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/ConsumerRecordHelper.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.engine.kafka -import io.circe.Decoder +import io.circe.{Decoder, Encoder} import pl.touk.nussknacker.engine.api.CirceUtil import java.nio.charset.StandardCharsets @@ -8,13 +8,27 @@ import scala.reflect.{ClassTag, classTag} object ConsumerRecordHelper { - def asJson[T: Decoder: ClassTag](data: Array[Byte]): T = { + // General helper's method - don't remove it + def asBytes[T: Encoder](value: T): Array[Byte] = + value match { + case null => null + case _ => asString(value).getBytes(StandardCharsets.UTF_8) + } + + def asString[T: Encoder](value: T): String = + value match { + case null => null + case str: String => str + case _ => implicitly[Encoder[T]].apply(value).noSpaces + } + + def asJson[T: Decoder: ClassTag](value: Array[Byte]): T = { val clazz = classTag[T].runtimeClass if (classOf[String].isAssignableFrom(clazz)) { - Option(data).map(value => new String(value, StandardCharsets.UTF_8)).orNull.asInstanceOf[T] + Option(value).map(value => new String(value, StandardCharsets.UTF_8)).orNull.asInstanceOf[T] } else { - CirceUtil.decodeJsonUnsafe[T](data) + CirceUtil.decodeJsonUnsafe[T](value) } } diff --git a/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaClient.scala b/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaClient.scala index c645a512b6e..d171a2515aa 100644 --- a/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaClient.scala +++ b/utils/kafka-test-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaClient.scala @@ -11,6 +11,7 @@ import java.time.Duration import java.util import java.util.{Collections, UUID} import scala.concurrent.{Future, Promise} +import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} class KafkaClient(kafkaAddress: String, id: String) extends LazyLogging { @@ -62,13 +63,9 @@ class KafkaClient(kafkaAddress: String, id: String) extends LazyLogging { timestamp: java.lang.Long = null, headers: Headers = KafkaRecordUtils.emptyHeaders ): Future[RecordMetadata] = { - val strContent = content match { - case str: String => str - case _ => implicitly[Encoder[T]].apply(content).noSpaces - } - - val promise = Promise[RecordMetadata]() - val record = createRecord(topic, key, strContent, partition, timestamp, headers) + val strContent = ConsumerRecordHelper.asString(content) + val promise = Promise[RecordMetadata]() + val record = createRecord(topic, key, strContent, partition, timestamp, headers) producer.send(record, producerCallback(promise)) promise.future } diff --git a/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunner.scala b/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunner.scala index 64c0a6160ec..2eba0898e0c 100644 --- a/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunner.scala +++ b/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunner.scala @@ -6,7 +6,14 @@ import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.api.context.ValidationContext import pl.touk.nussknacker.engine.api.context.transformation.{NodeDependencyValue, SingleInputGenericNodeTransformation} import pl.touk.nussknacker.engine.api.definition.{NodeDependency, TypedNodeDependency, WithExplicitTypesToExtract} -import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, SinkFactory, Source, SourceFactory, WithCategories} +import pl.touk.nussknacker.engine.api.process.{ + BasicContextInitializer, + ComponentUseCase, + SinkFactory, + Source, + SourceFactory, + WithCategories +} import pl.touk.nussknacker.engine.api.typed.typing import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -40,8 +47,8 @@ case class LiteTestScenarioRunnerBuilder( import TestScenarioRunner._ - override def withExtraComponents(extraComponents: List[ComponentDefinition]): LiteTestScenarioRunnerBuilder = - copy(components = extraComponents) + override def withExtraComponents(components: List[ComponentDefinition]): LiteTestScenarioRunnerBuilder = + copy(components = components) override def withExtraGlobalVariables( globalVariables: Map[String, AnyRef] @@ -104,7 +111,8 @@ private[test] class SimpleSourceFactory(result: TypingResult) override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])( implicit nodeId: NodeId ): NodeTransformationDefinition = { case TransformationStep(Nil, _) => - FinalResults(ValidationContext(Map(VariableConstants.InputVariableName -> result))) + val finalInitializer = new BasicContextInitializer(result) + FinalResults.forValidation(context, Nil, None)(finalInitializer.validationContext) } override def implementation( diff --git a/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/RequestResponseTestScenarioRunner.scala b/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/RequestResponseTestScenarioRunner.scala index 8cc65dd45d4..5fc931cde43 100644 --- a/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/RequestResponseTestScenarioRunner.scala +++ b/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/RequestResponseTestScenarioRunner.scala @@ -5,6 +5,7 @@ import cats.Id import cats.data.{NonEmptyList, ValidatedNel} import com.typesafe.config.{Config, ConfigFactory} import io.circe.Json +import org.everit.json.schema.TrueSchema import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.component.ComponentDefinition import pl.touk.nussknacker.engine.api.context.ProcessCompilationError @@ -15,7 +16,12 @@ import pl.touk.nussknacker.engine.lite.api.runtimecontext.LiteEngineRuntimeConte import pl.touk.nussknacker.engine.lite.util.test.SynchronousLiteInterpreter._ import pl.touk.nussknacker.engine.requestresponse.{RequestResponseHttpHandler, RequestResponseInterpreter} import pl.touk.nussknacker.engine.resultcollector.ProductionServiceInvocationCollector -import pl.touk.nussknacker.engine.util.test.{ModelWithTestExtensions, TestScenarioRunner, TestScenarioRunnerBuilder} +import pl.touk.nussknacker.engine.util.test.{ + ModelWithTestExtensions, + TestScenarioCollectorHandler, + TestScenarioRunner, + TestScenarioRunnerBuilder +} import scala.reflect.ClassTag @@ -37,6 +43,8 @@ object RequestResponseTestScenarioRunner { |} |""".stripMargin + val trueFieldSchema: String = TrueSchema.builder().build().toString + val sampleSchemas: Map[String, String] = Map("inputSchema" -> stringFieldSchema, "outputSchema" -> stringFieldSchema) } @@ -51,6 +59,7 @@ class RequestResponseTestScenarioRunner( def runWithRequests[T]( scenario: CanonicalProcess )(run: (HttpRequest => Either[NonEmptyList[ErrorType], Json]) => T): ValidatedNel[ProcessCompilationError, T] = { + val testScenarioCollectorHandler = TestScenarioCollectorHandler.createHandler(componentUseCase) ModelWithTestExtensions.withExtensions(config, components, globalVariables) { modelData => RequestResponseInterpreter[Id]( scenario, @@ -58,7 +67,7 @@ class RequestResponseTestScenarioRunner( LiteEngineRuntimeContextPreparer.noOp, modelData, additionalListeners = Nil, - resultCollector = ProductionServiceInvocationCollector, + resultCollector = testScenarioCollectorHandler.resultCollector, componentUseCase = componentUseCase ).map { interpreter => interpreter.open() @@ -69,6 +78,7 @@ class RequestResponseTestScenarioRunner( handler.invoke(req, entity) }) } finally { + testScenarioCollectorHandler.close() interpreter.close() } } @@ -87,7 +97,7 @@ case class RequestResponseTestScenarioRunnerBuilder( import TestScenarioRunner._ override def withExtraComponents( - extraComponents: List[ComponentDefinition] + components: List[ComponentDefinition] ): RequestResponseTestScenarioRunnerBuilder = copy(components = components) diff --git a/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/SynchronousLiteInterpreter.scala b/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/SynchronousLiteInterpreter.scala index bb2e72fc6e1..97fc141aac0 100644 --- a/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/SynchronousLiteInterpreter.scala +++ b/utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/SynchronousLiteInterpreter.scala @@ -15,8 +15,8 @@ import pl.touk.nussknacker.engine.lite.api.customComponentTypes.CapabilityTransf import pl.touk.nussknacker.engine.lite.api.interpreterTypes.{EndResult, ScenarioInputBatch} import pl.touk.nussknacker.engine.lite.api.runtimecontext.LiteEngineRuntimeContextPreparer import pl.touk.nussknacker.engine.lite.capabilities.FixedCapabilityTransformer -import pl.touk.nussknacker.engine.resultcollector.ProductionServiceInvocationCollector import pl.touk.nussknacker.engine.util.SynchronousExecutionContext +import pl.touk.nussknacker.engine.util.test.TestScenarioCollectorHandler import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} @@ -49,13 +49,13 @@ object SynchronousLiteInterpreter { componentUseCase: ComponentUseCase, runtimeContextPreparer: LiteEngineRuntimeContextPreparer = LiteEngineRuntimeContextPreparer.noOp ): SynchronousResult = { - + val testScenarioCollectorHandler = TestScenarioCollectorHandler.createHandler(componentUseCase) ScenarioInterpreterFactory .createInterpreter[Id, Any, AnyRef]( scenario, modelData, Nil, - ProductionServiceInvocationCollector, + testScenarioCollectorHandler.resultCollector, componentUseCase ) .map { interpreter => @@ -64,6 +64,7 @@ object SynchronousLiteInterpreter { val value: Id[ResultType[EndResult[AnyRef]]] = interpreter.invoke(data) value.run } finally { + testScenarioCollectorHandler.close() interpreter.close() } } diff --git a/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunnerSpec.scala b/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunnerSpec.scala index 2714e83a656..f9b41871bb8 100644 --- a/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunnerSpec.scala +++ b/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/LiteTestScenarioRunnerSpec.scala @@ -2,21 +2,28 @@ package pl.touk.nussknacker.engine.lite.util.test import com.typesafe.config.ConfigValueFactory.fromAnyRef import com.typesafe.config.{Config, ConfigFactory} +import org.scalatest.Inside import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion} +import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo import pl.touk.nussknacker.engine.api.process.ComponentUseCase.EngineRuntime import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName, Service} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.spel.Implicits._ +import pl.touk.nussknacker.engine.spel.SpelExpressionEvaluationException +import pl.touk.nussknacker.engine.util.functions.DateUtils import pl.touk.nussknacker.engine.util.test.{RunResult, TestScenarioRunner} import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage +import java.time.{Clock, Instant, ZoneId} import java.util import scala.concurrent.Future -class LiteTestScenarioRunnerSpec extends AnyFunSuite with Matchers with ValidatedValuesDetailedMessage { +class LiteTestScenarioRunnerSpec extends AnyFunSuite with Matchers with ValidatedValuesDetailedMessage with Inside { + + import LiteTestScenarioRunner._ test("should test custom component with lite") { val scenario = ScenarioBuilder @@ -52,6 +59,89 @@ class LiteTestScenarioRunnerSpec extends AnyFunSuite with Matchers with Validate result.validValue shouldBe RunResult.success("abc") } + test("should return service invoke value") { + val input = "input" + + val scenario = + ScenarioBuilder + .streamingLite("stream") + .source("source", TestScenarioRunner.testDataSource) + .enricher("service", "output", TestService.ServiceId, "param" -> "#input") + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#output") + + val runResults = + TestScenarioRunner + .liteBased(ConfigFactory.empty()) + .withExtraComponents(List(ComponentDefinition(TestService.ServiceId, TestService))) + .build() + .runWithData(scenario, List(input)) + + runResults.validValue.successes shouldBe List(input) + } + + test("should return service invoke mocked value for test runtime mode") { + val input = "input" + + val scenario = + ScenarioBuilder + .streamingLite("stream") + .source("source", TestScenarioRunner.testDataSource) + .enricher("service", "output", TestService.ServiceId, "param" -> "#input") + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#output") + + val runResults = + TestScenarioRunner + .liteBased(ConfigFactory.empty()) + .withExtraComponents(List(ComponentDefinition(TestService.ServiceId, TestService))) + .inTestRuntimeMode + .build() + .runWithData(scenario, List(input)) + + runResults.validValue.successes shouldBe List(TestService.MockedValued) + } + + test("should allowing use global variable - date helper") { + val now = Instant.now() + val dateHelper = new DateUtils(Clock.fixed(now, ZoneId.systemDefault())) + + val scenario = + ScenarioBuilder + .streaming(getClass.getName) + .source("start", TestScenarioRunner.testDataSource) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#DATE.now.toString") + + val runResults = + TestScenarioRunner + .liteBased(ConfigFactory.empty()) + .withExtraGlobalVariables(Map("DATE" -> dateHelper)) + .build() + .runWithData[String, String](scenario, List("input")) + + runResults.validValue.successes shouldBe List(now.toString) + } + + test("should catch exception during compilation in test run mode") { + val scenario = + ScenarioBuilder + .streaming(getClass.getName) + .source("start", TestScenarioRunner.testDataSource) + .filter("filter", "#input / 0 != 0") // intentional throwing of an exception + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#input") + + val runResults = + TestScenarioRunner + .liteBased(ConfigFactory.empty()) + .inTestRuntimeMode + .build() + .runWithData[Int, Int](scenario, List(10)) + + runResults.validValue.errors.map(_.throwable).map { exc => + exc.asInstanceOf[SpelExpressionEvaluationException].getMessage + } shouldBe List( + "Expression [#input / 0 != 0] evaluation failed, message: / by zero", + ) + } + } private case class SourceData(field: String) diff --git a/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/RequestResponseTestScenarioRunnerSpec.scala b/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/RequestResponseTestScenarioRunnerSpec.scala index bcc1c21ab96..3e34a07b6fd 100644 --- a/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/RequestResponseTestScenarioRunnerSpec.scala +++ b/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/RequestResponseTestScenarioRunnerSpec.scala @@ -9,37 +9,41 @@ import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName, Service} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.lite.util.test.RequestResponseTestScenarioRunner._ import pl.touk.nussknacker.engine.spel.Implicits._ +import pl.touk.nussknacker.engine.util.functions.DateUtils import pl.touk.nussknacker.engine.util.test.TestScenarioRunner +import java.time.{Clock, Instant, ZoneId} import scala.concurrent.Future class RequestResponseTestScenarioRunnerSpec extends AnyFunSuite with Matchers { private val failingComponent = "failing" - private val scenarioRunner: RequestResponseTestScenarioRunner = TestScenarioRunner - .requestResponseBased() - .withExtraComponents(List(ComponentDefinition(failingComponent, FailingService))) - .build() + private val baseRunner: RequestResponseTestScenarioRunner = + TestScenarioRunner + .requestResponseBased() + .withExtraComponents(List(ComponentDefinition(failingComponent, FailingService))) + .build() test("runs tests") { - val runner = scenarioRunner + val runner = baseRunner val scenario = ScenarioBuilder .requestResponse("test") .additionalFields(properties = RequestResponseTestScenarioRunner.sampleSchemas) .source("input", "request") .emptySink("output", "response", "Raw editor" -> "true", "Value" -> "{field1: #input.field1 + '-suffix'}") - runner.runWithRequests(scenario) { invoker => + val runResults = runner.runWithRequests(scenario) { invoker => invoker(HttpRequest(HttpMethods.POST, entity = Map("field1" -> "value").asJson.spaces2)) shouldBe Right( Map("field1" -> "value-suffix").asJson ) } + runResults.isValid shouldBe true // FIXME: verify false positives } test("runs failure handling") { - val runner = scenarioRunner + val runner = baseRunner val scenario = ScenarioBuilder .requestResponse("test") .additionalFields(properties = RequestResponseTestScenarioRunner.sampleSchemas) @@ -47,7 +51,7 @@ class RequestResponseTestScenarioRunnerSpec extends AnyFunSuite with Matchers { .enricher("fail", "output", failingComponent, "value" -> "#input.field1") .emptySink("output", "response", "value" -> "#output") - runner.runWithRequests(scenario) { invoker => + val runResults = runner.runWithRequests(scenario) { invoker => val firstError = invoker( HttpRequest(HttpMethods.POST, entity = Map("field1" -> FailingService.failTrigger).asJson.spaces2) ).swap.toOption.get.head @@ -59,6 +63,84 @@ class RequestResponseTestScenarioRunnerSpec extends AnyFunSuite with Matchers { } + runResults.isValid shouldBe true // FIXME: verify false positives + } + + test("should return service invoke value") { + val input = "input" + val scenario = + ScenarioBuilder + .requestResponse("test") + .additionalFields(properties = Map("inputSchema" -> stringFieldSchema, "outputSchema" -> trueFieldSchema)) + .source("input", "request") + .enricher("service", "output", TestService.ServiceId, "param" -> "#input.field1") + .emptySink("output", "response", "Raw editor" -> "true", "Value" -> "{field1: #output}") + + val runResults = + TestScenarioRunner + .requestResponseBased() + .withExtraComponents(List(ComponentDefinition(TestService.ServiceId, TestService))) + .build() + .runWithRequests(scenario) { invoker => + val result = invoker(HttpRequest(HttpMethods.POST, entity = Map("field1" -> input).asJson.spaces2)) + result shouldBe Right( + Map("field1" -> input).asJson + ) + } + + runResults.isValid shouldBe true // FIXME: verify false positives + } + + test("should return service invoke mocked value for test runtime mode") { + val input = "input" + val scenario = + ScenarioBuilder + .requestResponse("test") + .additionalFields(properties = Map("inputSchema" -> stringFieldSchema, "outputSchema" -> trueFieldSchema)) + .source("input", "request") + .enricher("service", "output", TestService.ServiceId, "param" -> "#input.field1") + .emptySink("output", "response", "Raw editor" -> "true", "Value" -> "{field1: #output}") + + val runResults = + TestScenarioRunner + .requestResponseBased() + .withExtraComponents(List(ComponentDefinition(TestService.ServiceId, TestService))) + .inTestRuntimeMode + .build() + .runWithRequests(scenario) { invoker => + val result = invoker(HttpRequest(HttpMethods.POST, entity = Map("field1" -> input).asJson.spaces2)) + result shouldBe Right( + Map("field1" -> TestService.MockedValued).asJson + ) + } + + runResults.isValid shouldBe true // FIXME: verify false positives + } + + test("should allowing use global variable - date helper") { + val now = Instant.now() + val dateHelper = new DateUtils(Clock.fixed(now, ZoneId.systemDefault())) + + val scenario = + ScenarioBuilder + .requestResponse("test") + .additionalFields(properties = Map("inputSchema" -> stringFieldSchema, "outputSchema" -> trueFieldSchema)) + .source("input", "request") + .emptySink("output", "response", "Raw editor" -> "true", "Value" -> "{field1: #DATE.now.toString}") + + val runResults = + TestScenarioRunner + .requestResponseBased() + .withExtraGlobalVariables(Map("DATE" -> dateHelper)) + .build() + .runWithRequests(scenario) { invoker => + val result = invoker(HttpRequest(HttpMethods.POST, entity = Map("field1" -> "input").asJson.spaces2)) + result shouldBe Right( + Map("field1" -> now.toString).asJson + ) + } + + runResults.isValid shouldBe true // FIXME: verify false positives } } diff --git a/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/TestService.scala b/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/TestService.scala new file mode 100644 index 00000000000..492f94294ac --- /dev/null +++ b/utils/lite-components-testkit/src/test/scala/pl/touk/nussknacker/engine/lite/util/test/TestService.scala @@ -0,0 +1,38 @@ +package pl.touk.nussknacker.engine.lite.util.test + +import pl.touk.nussknacker.engine.api.{ + ContextId, + EagerService, + LazyParameter, + MethodToInvoke, + ParamName, + ServiceInvoker +} +import pl.touk.nussknacker.engine.api.process.ComponentUseCase +import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector + +import scala.concurrent.{ExecutionContext, Future} + +object TestService extends EagerService { + + val ServiceId = "testService" + + val MockedValued = "sample-mocked" + + @MethodToInvoke + def invoke(@ParamName("param") value: LazyParameter[String]): ServiceInvoker = new ServiceInvoker { + + override def invokeService(params: Map[String, Any])( + implicit ec: ExecutionContext, + collector: ServiceInvocationCollector, + contextId: ContextId, + componentUseCase: ComponentUseCase + ): Future[String] = { + collector.collect(s"test-service-$value", Option(MockedValued)) { + Future.successful(params("param").asInstanceOf[String]) + } + } + + } + +}