From 369032b63c4782bedf28cc5e5cd59348e392a4d3 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 14 Nov 2022 15:37:06 -0500 Subject: [PATCH 1/2] Fix Python PostCommit Examples on portable * Fix custom_ptransform pipeline options gets modified --- .../apache_beam/examples/cookbook/custom_ptransform.py | 9 +++++---- sdks/python/test-suites/portable/common.gradle | 5 +++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py index 4beab18aab1e..a922216a5220 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py @@ -118,11 +118,12 @@ def get_args(argv): def run(argv=None): known_args, pipeline_args = get_args(argv) - options = PipelineOptions(pipeline_args) - run_count1(known_args, options) - run_count2(known_args, options) - run_count3(known_args, options) + # pipeline initialization may modify PipelineOptions object. + # Create instances for each. + run_count1(known_args, PipelineOptions(pipeline_args)) + run_count2(known_args, PipelineOptions(pipeline_args)) + run_count3(known_args, PipelineOptions(pipeline_args)) if __name__ == '__main__': diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index 0eae96c8bec9..c87696793f0f 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -230,6 +230,11 @@ project.tasks.register("flinkExamples") { "--environment_type=LOOPBACK", "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it", "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}", + '--sdk_harness_log_level_overrides=' + + // suppress info level flink.runtime log flood + '{\\"org.apache.flink.runtime\\":\\"WARN\\",' + + // suppress full __metricscontainers log printed in FlinkPipelineRunner.createPortablePipelineResult + '\\"org.apache.beam.runners.flink.FlinkPipelineRunner\\":\\"WARN\\"}' ] def cmdArgs = mapToArgString([ "test_opts": testOpts, From 57ee1c5c4450f6ba2526e57593ac426e2e33bc6d Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 15 Nov 2022 15:41:10 -0500 Subject: [PATCH 2/2] Specify flinkConfDir --- .../jenkins/job_PostCommit_Python_Examples_Flink.groovy | 1 + sdks/python/test-suites/portable/common.gradle | 3 +++ 2 files changed, 4 insertions(+) diff --git a/.test-infra/jenkins/job_PostCommit_Python_Examples_Flink.groovy b/.test-infra/jenkins/job_PostCommit_Python_Examples_Flink.groovy index 779395bf7093..c1a44b8e9d43 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_Examples_Flink.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_Examples_Flink.groovy @@ -37,6 +37,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Examples_Flink', gradle { rootBuildScriptDir(commonJobProperties.checkoutDir) tasks(":sdks:python:test-suites:portable:flinkExamplesPostCommit") + switches("-PflinkConfDir=$WORKSPACE/src/runners/flink/src/test/resources") commonJobProperties.setGradleSwitches(delegate) } } diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index c87696793f0f..9585d0922046 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -317,6 +317,9 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { // suppress metric name collision warning logs '\\"org.apache.flink.runtime.metrics.groups\\":\\"ERROR\\"}' ] + if (project.hasProperty('flinkConfDir')) { + pipelineOpts += ["--flink-conf-dir=${project.property('flinkConfDir')}"] + } def cmdArgs = mapToArgString([ "test_opts": testOpts, "suite": "postCommitIT-flink-py${pythonVersionSuffix}",