From 779e8dfef6ae8a62c3374888b7e3820dbcfec737 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:13:57 -0700 Subject: [PATCH 1/3] [#28187] Add gradle targets to execute python tests with prism. --- .../runners/portability/prism_runner_test.py | 11 +++ sdks/python/pytest.ini | 1 + sdks/python/test-suites/gradle.properties | 5 ++ sdks/python/test-suites/portable/build.gradle | 25 ++++++ .../python/test-suites/portable/common.gradle | 81 +++++++++++++++++++ sdks/python/tox.ini | 4 + 6 files changed, 127 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 04a2dbd4faed..db0ac9362ab9 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -213,6 +213,17 @@ def test_expand_kafka_write(self): def test_sql(self): raise unittest.SkipTest("Requires an expansion service to execute.") + # The following tests require additional implementation in Prism. + + def test_custom_merging_window(self): + raise unittest.SkipTest("Requires Prism to support Custom Window Coders, and Merging Custom Windows. TODO(file issue)") + + def test_custom_window_type(self): + raise unittest.SkipTest("Requires Prism to support Custom Window Coders. TODO(file issue)") + + def test_pack_combiners(self): + raise unittest.SkipTest("Requires Prism to support coder: 'beam:coder:tuple:v1'. TODO(file issue)") + # Inherits all other tests. diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini index b10acaac71cd..b62a44aa25e7 100644 --- a/sdks/python/pytest.ini +++ b/sdks/python/pytest.ini @@ -49,6 +49,7 @@ markers = sickbay_direct: run without sickbay-direct sickbay_spark: run without sickbay-spark sickbay_flink: run without sickbay-flink + sickbay_prism: run without sickbay-prism sickbay_dataflow: run without sickbay-dataflow # Tests using this marker conflict with the xdist plugin in some way, such # as enabling save_main_session. diff --git a/sdks/python/test-suites/gradle.properties b/sdks/python/test-suites/gradle.properties index f8c04e0f5609..d027cd3144d3 100644 --- a/sdks/python/test-suites/gradle.properties +++ b/sdks/python/test-suites/gradle.properties @@ -47,5 +47,10 @@ samza_validates_runner_postcommit_py_versions=3.9,3.12 # spark runner test-suites spark_examples_postcommit_py_versions=3.9,3.12 +# prism runner test-suites +prism_validates_runner_precommit_py_versions=3.12 +prism_validates_runner_postcommit_py_versions=3.9,3.12 +prism_examples_postcommit_py_versions=3.9,3.12 + # cross language postcommit python test suites cross_language_validates_py_versions=3.9,3.12 diff --git a/sdks/python/test-suites/portable/build.gradle b/sdks/python/test-suites/portable/build.gradle index 390c39a10899..41cd88acfb6a 100644 --- a/sdks/python/test-suites/portable/build.gradle +++ b/sdks/python/test-suites/portable/build.gradle @@ -31,12 +31,31 @@ tasks.register("samzaValidatesRunner") { } } +tasks.register("prismValidatesRunner") { + getVersionsAsList('prism_validates_runner_postcommit_py_versions').each { + dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismValidatesRunner") + } +} + tasks.register("flinkValidatesRunnerPrecommit") { getVersionsAsList('flink_validates_runner_precommit_py_versions').each { dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:flinkValidatesRunner") } } +tasks.register("prismValidatesRunnerPrecommit") { + getVersionsAsList('prism_validates_runner_precommit_py_versions').each { + dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismValidatesRunner") + } +} + +// TODO merge with above once passing. Currently for convenience. +tasks.register("prismTriggerTranscript") { + getVersionsAsList('prism_validates_runner_precommit_py_versions').each { + dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismTriggerTranscript") + } +} + tasks.register("flinkExamplesPostCommit") { getVersionsAsList('flink_examples_postcommit_py_versions').each { dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:flinkExamples") @@ -48,3 +67,9 @@ tasks.register("sparkExamplesPostCommit") { dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:sparkExamples") } } + +tasks.register("prismExamplesPostCommit") { + getVersionsAsList('prism_examples_postcommit_py_versions').each { + dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismExamples") + } +} diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index 4f232c5b104f..5fd1b182a471 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -201,6 +201,56 @@ tasks.register("sparkValidatesRunner") { dependsOn 'sparkCompatibilityMatrixLOOPBACK' } +def createPrismRunnerTestTask(String workerType) { + def taskName = "prismCompatibilityMatrix${workerType}" + + def prismBin = "${rootDir}/runners/prism/build/tmp/prism" + def options = "--prism_bin=${prismBin} --environment_type=${workerType}" + if (workerType == 'PROCESS') { + options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh" + } + def task = toxTask(taskName, 'prism-runner-test', options) + task.configure { + dependsOn ":runners:prism:build" + // The Java SDK worker is required to execute external transforms. + def suffix = getSupportedJavaVersion() + dependsOn ":sdks:java:container:${suffix}:docker" + if (workerType == 'DOCKER') { + dependsOn pythonContainerTask + } else if (workerType == 'PROCESS') { + dependsOn createProcessWorker + } + } + return task +} + +createPrismRunnerTestTask('DOCKER') +createPrismRunnerTestTask('PROCESS') +createPrismRunnerTestTask('LOOPBACK') + +tasks.register("prismValidatesRunner") { + dependsOn 'prismCompatibilityMatrixLOOPBACK' +} + +tasks.register("prismTriggerTranscript") { + dependsOn 'setupVirtualenv' + dependsOn ':runners:prism:build' + def prismBin = "${rootDir}/runners/prism/build/tmp/prism" + doLast { + exec { + executable 'sh' + args '-c', """ + . ${envdir}/bin/activate \\ + && cd ${pythonRootDir} \\ + && pip install -e .[test] \\ + && pytest \\ + apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\ + --test-pipeline-options='--runner=PrismRunner --environment_type=LOOPBACK --prism_location=${prismBin}' + """ + } + } +} + project.tasks.register("preCommitPy${pythonVersionSuffix}") { dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker", ":runners:flink:${latestFlinkVersion}:job-server:shadowJar", @@ -283,6 +333,37 @@ project.tasks.register("sparkExamples") { } } +project.tasks.register("prismExamples") { + dependsOn = [ + 'setupVirtualenv', + 'installGcpTest', + ':runners:prism:build', + ] + def prismBin = "${rootDir}/runners/prism/build/tmp/prism" + doLast { + def testOpts = [ + "--log-cli-level=INFO", + ] + def pipelineOpts = [ + "--runner=PrismRunner", + "--project=apache-beam-testing", + "--environment_type=LOOPBACK", + "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it", + "--prism_location=${prismBin}", + ] + def cmdArgs = mapToArgString([ + "test_opts": testOpts, + "suite": "postCommitExamples-prism-py${pythonVersionSuffix}", + "pipeline_opts": pipelineOpts.join(" "), + "collect": "examples_postcommit and not sickbay_prism" + ]) + exec { + executable 'sh' + args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs" + } + } +} + project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { dependsOn = [ 'setupVirtualenv', diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index d733fd17fb6b..2dfe0670ed0f 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -285,6 +285,10 @@ extras = test commands = bash {toxinidir}/scripts/pytest_validates_runner.sh {envname} {toxinidir}/apache_beam/runners/portability/spark_runner_test.py {posargs} +[testenv:prism-runner-test] +extras = test +commands = + bash {toxinidir}/scripts/pytest_validates_runner.sh {envname} {toxinidir}/apache_beam/runners/portability/prism_runner_test.py {posargs} [testenv:py{39,310}-pyarrow-{3,9,10,11,12,13,14,15,16}] deps = From 201bdb95aeba06bbdbcb4b21c4974470dbea44b7 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:19:09 -0700 Subject: [PATCH 2/3] Add issues for non-expansion service skipped tests. --- .../apache_beam/runners/portability/prism_runner_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index db0ac9362ab9..1578a626368b 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -216,13 +216,13 @@ def test_sql(self): # The following tests require additional implementation in Prism. def test_custom_merging_window(self): - raise unittest.SkipTest("Requires Prism to support Custom Window Coders, and Merging Custom Windows. TODO(file issue)") + raise unittest.SkipTest("Requires Prism to support Custom Window Coders, and Merging Custom Windows. https://github.com/apache/beam/issues/31921") def test_custom_window_type(self): - raise unittest.SkipTest("Requires Prism to support Custom Window Coders. TODO(file issue)") + raise unittest.SkipTest("Requires Prism to support Custom Window Coders. https://github.com/apache/beam/issues/31921") def test_pack_combiners(self): - raise unittest.SkipTest("Requires Prism to support coder: 'beam:coder:tuple:v1'. TODO(file issue)") + raise unittest.SkipTest("Requires Prism to support coder: 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636") # Inherits all other tests. From 8eb2309792985aed4818dc1749ff4b3b92561e7e Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 3 Oct 2024 15:09:25 -0700 Subject: [PATCH 3/3] Format diff. --- .../runners/portability/prism_runner_test.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 1578a626368b..bc72d551f966 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -216,13 +216,20 @@ def test_sql(self): # The following tests require additional implementation in Prism. def test_custom_merging_window(self): - raise unittest.SkipTest("Requires Prism to support Custom Window Coders, and Merging Custom Windows. https://github.com/apache/beam/issues/31921") + raise unittest.SkipTest( + "Requires Prism to support Custom Window " + + "Coders, and Merging Custom Windows. " + + "https://github.com/apache/beam/issues/31921") def test_custom_window_type(self): - raise unittest.SkipTest("Requires Prism to support Custom Window Coders. https://github.com/apache/beam/issues/31921") + raise unittest.SkipTest( + "Requires Prism to support Custom Window Coders." + + " https://github.com/apache/beam/issues/31921") def test_pack_combiners(self): - raise unittest.SkipTest("Requires Prism to support coder: 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636") + raise unittest.SkipTest( + "Requires Prism to support coder:" + + " 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636") # Inherits all other tests.