From e3505e43c1344df3222a3d8ab2476324fb014123 Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Mon, 8 Jul 2024 13:29:06 +0200 Subject: [PATCH] [NU-1683] Fixes memory leak in test mechanism introduced in #4901 --- docs/Changelog.md | 1 + .../nussknacker/engine/process/runner/FlinkTestMain.scala | 8 ++++++-- .../engine/testmode/ResultsCollectingListener.scala | 4 ++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index 569d348f6b0..5916ca4de54 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -21,6 +21,7 @@ * [#6245](https://github.com/TouK/nussknacker/pull/6245) Parameter validations defined in AdditionalUIConfigProvider now properly impact dynamic components. * [#6264](https://github.com/TouK/nussknacker/pull/6264) Fix for DatabaseLookupEnricher mixing fields values when it is connected to ignite db * [#6270](https://github.com/TouK/nussknacker/pull/6270) Resolved an issue with comparing remote versions +* [#6337](https://github.com/TouK/nussknacker/pull/6337) Fixes memory leak in test mechanism introduced in 1.13 version ([#4901](https://github.com/TouK/nussknacker/pull/4901)) 1.15.4 (5 July 2025) ------------------------- diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala index f83d74675cd..39c842f72f2 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMain.scala @@ -46,8 +46,9 @@ class FlinkTestMain( val configuration: Configuration ) extends FlinkStubbedRunner { - def runTest: TestResults[Json] = - Using.resource(ResultsCollectingListenerHolder.registerTestEngineListener) { collectingListener => + def runTest: TestResults[Json] = { + val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener + try { val resultCollector = new TestServiceInvocationCollector(collectingListener) val registrar = prepareRegistrar(collectingListener, scenarioTestData) val env = createEnv @@ -55,7 +56,10 @@ class FlinkTestMain( registrar.register(env, process, processVersion, deploymentData, resultCollector) execute(env, SavepointRestoreSettings.none()) collectingListener.results + } finally { + collectingListener.clean() } + } protected def prepareRegistrar( collectingListener: ResultsCollectingListener[Json], diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/ResultsCollectingListener.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/ResultsCollectingListener.scala index d36174f1da4..8fb97dfc98b 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/ResultsCollectingListener.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/testmode/ResultsCollectingListener.scala @@ -24,6 +24,10 @@ case class ResultsCollectingListener[T](holderClass: String, runId: TestRunId, v def results: TestResults[T] = ResultsCollectingListenerHolder.resultsForId(runId) + // Warning! close can't clean resources because listener is passed into each scenario subpart and it will be closed few times + // We have to use dedicated clean() method when we are sure that we consumed results instead. + override final def close(): Unit = {} + def clean(): Unit = ResultsCollectingListenerHolder.cleanResult(runId) override def nodeEntered(nodeId: String, context: Context, processMetaData: MetaData): Unit = {