From c31d81ca875637f8b586050cb3c80ae3f41a255d Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 9 Oct 2024 10:58:37 -0400 Subject: [PATCH] Invoke teardown when DoFn throws in portable runners (#32522) * Invoke teardown when DoFn throws in portable runners * update CHANGES.md * adjusted comment and logging --- .../beam_PostCommit_Java_PVR_Flink_Batch.json | 2 +- ...m_PostCommit_Java_PVR_Flink_Streaming.json | 2 +- .../beam_PostCommit_Java_PVR_Samza.json | 2 +- ..._PostCommit_Java_PVR_Spark3_Streaming.json | 2 +- .../beam_PostCommit_Java_PVR_Spark_Batch.json | 2 +- CHANGES.md | 1 + .../flink/job-server/flink_job_server.gradle | 1 - .../google-cloud-dataflow-java/build.gradle | 2 +- runners/samza/job-server/build.gradle | 3 ++- .../spark/job-server/spark_job_server.gradle | 2 -- .../harness/control/ProcessBundleHandler.java | 19 ++++++++++++++++++- 11 files changed, 27 insertions(+), 11 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json index b970762c8397..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test" + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Streaming.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/CHANGES.md b/CHANGES.md index fcb02d1d996a..b9d5f2c191c9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,6 +80,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 56a58df4fb09..1c610477a444 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -171,7 +171,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' - excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index df2270d3b653..4906d9cf9cb8 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -185,7 +185,7 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesGaugeMetrics', 'org.apache.beam.sdk.testing.UsesMultimapState', 'org.apache.beam.sdk.testing.UsesTestStream', - 'org.apache.beam.sdk.testing.UsesParDoLifecycle', + 'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner 'org.apache.beam.sdk.testing.UsesMetricsPusher', 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ] diff --git a/runners/samza/job-server/build.gradle b/runners/samza/job-server/build.gradle index f972f376e5c8..6fc8db98a4f9 100644 --- a/runners/samza/job-server/build.gradle +++ b/runners/samza/job-server/build.gradle @@ -90,7 +90,6 @@ def portableValidatesRunnerTask(String name, boolean docker) { excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' - excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' @@ -127,6 +126,8 @@ def portableValidatesRunnerTask(String name, boolean docker) { excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalid' // TODO(https://github.com/apache/beam/issues/21144) excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalidZeroAllowed' + // TODO(https://github.com/apache/beam/issues/32520) + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionIn*Stateful' // TODO(https://github.com/apache/beam/issues/21145) excludeTestsMatching 'org.apache.beam.sdk.transforms.DeduplicateTest.testEventTime' // TODO(https://github.com/apache/beam/issues/21146) diff --git a/runners/spark/job-server/spark_job_server.gradle b/runners/spark/job-server/spark_job_server.gradle index 6d2d4b2bafbf..5ed5f4277bf4 100644 --- a/runners/spark/job-server/spark_job_server.gradle +++ b/runners/spark/job-server/spark_job_server.gradle @@ -118,7 +118,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker, excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery' - excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' @@ -187,7 +186,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker, excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery' excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle' - excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 0d520dcf7f5c..c91d5ba71b89 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -596,7 +596,11 @@ public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.Instruction request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor); return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response); } catch (Exception e) { - // Make sure we clean-up from the active set of bundle processors. + // Make sure we clean up from the active set of bundle processors. + LOG.debug( + "Discard bundleProcessor for {} after exception: {}", + request.getProcessBundle().getProcessBundleDescriptorId(), + e.getMessage()); bundleProcessorCache.discard(bundleProcessor); throw e; } @@ -1168,6 +1172,18 @@ void discard() { if (this.bundleCache != null) { this.bundleCache.clear(); } + // setupFunctions are invoked in createBundleProcessor. Invoke teardownFunction here as the + // BundleProcessor is already removed from cache and won't be re-used. + for (ThrowingRunnable teardownFunction : Lists.reverse(this.getTearDownFunctions())) { + try { + teardownFunction.run(); + } catch (Throwable e) { + LOG.warn( + "Exceptions are thrown from DoFn.teardown method when trying to discard " + + "ProcessBundleHandler", + e); + } + } getMetricsEnvironmentStateForBundle().discard(); for (BeamFnDataOutboundAggregator aggregator : getOutboundAggregators().values()) { aggregator.discard(); @@ -1175,6 +1191,7 @@ void discard() { } } + // this is called in cachedBundleProcessors removal listener void shutdown() { for (ThrowingRunnable tearDownFunction : getTearDownFunctions()) { LOG.debug("Tearing down function {}", tearDownFunction);