diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 04a2dbd4faed..db0ac9362ab9 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -213,6 +213,17 @@ def test_expand_kafka_write(self): def test_sql(self): raise unittest.SkipTest("Requires an expansion service to execute.") + # The following tests require additional implementation in Prism. + + def test_custom_merging_window(self): + raise unittest.SkipTest("Requires Prism to support Custom Window Coders, and Merging Custom Windows. TODO(file issue)") + + def test_custom_window_type(self): + raise unittest.SkipTest("Requires Prism to support Custom Window Coders. TODO(file issue)") + + def test_pack_combiners(self): + raise unittest.SkipTest("Requires Prism to support coder: 'beam:coder:tuple:v1'. TODO(file issue)") + # Inherits all other tests. diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini index b10acaac71cd..b62a44aa25e7 100644 --- a/sdks/python/pytest.ini +++ b/sdks/python/pytest.ini @@ -49,6 +49,7 @@ markers = sickbay_direct: run without sickbay-direct sickbay_spark: run without sickbay-spark sickbay_flink: run without sickbay-flink + sickbay_prism: run without sickbay-prism sickbay_dataflow: run without sickbay-dataflow # Tests using this marker conflict with the xdist plugin in some way, such # as enabling save_main_session. diff --git a/sdks/python/test-suites/gradle.properties b/sdks/python/test-suites/gradle.properties index f8c04e0f5609..d027cd3144d3 100644 --- a/sdks/python/test-suites/gradle.properties +++ b/sdks/python/test-suites/gradle.properties @@ -47,5 +47,10 @@ samza_validates_runner_postcommit_py_versions=3.9,3.12 # spark runner test-suites spark_examples_postcommit_py_versions=3.9,3.12 +# prism runner test-suites +prism_validates_runner_precommit_py_versions=3.12 +prism_validates_runner_postcommit_py_versions=3.9,3.12 +prism_examples_postcommit_py_versions=3.9,3.12 + # cross language postcommit python test suites cross_language_validates_py_versions=3.9,3.12 diff --git a/sdks/python/test-suites/portable/build.gradle b/sdks/python/test-suites/portable/build.gradle index 390c39a10899..41cd88acfb6a 100644 --- a/sdks/python/test-suites/portable/build.gradle +++ b/sdks/python/test-suites/portable/build.gradle @@ -31,12 +31,31 @@ tasks.register("samzaValidatesRunner") { } } +tasks.register("prismValidatesRunner") { + getVersionsAsList('prism_validates_runner_postcommit_py_versions').each { + dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismValidatesRunner") + } +} + tasks.register("flinkValidatesRunnerPrecommit") { getVersionsAsList('flink_validates_runner_precommit_py_versions').each { dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:flinkValidatesRunner") } } +tasks.register("prismValidatesRunnerPrecommit") { + getVersionsAsList('prism_validates_runner_precommit_py_versions').each { + dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismValidatesRunner") + } +} + +// TODO merge with above once passing. Currently for convenience. +tasks.register("prismTriggerTranscript") { + getVersionsAsList('prism_validates_runner_precommit_py_versions').each { + dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismTriggerTranscript") + } +} + tasks.register("flinkExamplesPostCommit") { getVersionsAsList('flink_examples_postcommit_py_versions').each { dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:flinkExamples") @@ -48,3 +67,9 @@ tasks.register("sparkExamplesPostCommit") { dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:sparkExamples") } } + +tasks.register("prismExamplesPostCommit") { + getVersionsAsList('prism_examples_postcommit_py_versions').each { + dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismExamples") + } +} diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index 4f232c5b104f..5fd1b182a471 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -201,6 +201,56 @@ tasks.register("sparkValidatesRunner") { dependsOn 'sparkCompatibilityMatrixLOOPBACK' } +def createPrismRunnerTestTask(String workerType) { + def taskName = "prismCompatibilityMatrix${workerType}" + + def prismBin = "${rootDir}/runners/prism/build/tmp/prism" + def options = "--prism_bin=${prismBin} --environment_type=${workerType}" + if (workerType == 'PROCESS') { + options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh" + } + def task = toxTask(taskName, 'prism-runner-test', options) + task.configure { + dependsOn ":runners:prism:build" + // The Java SDK worker is required to execute external transforms. + def suffix = getSupportedJavaVersion() + dependsOn ":sdks:java:container:${suffix}:docker" + if (workerType == 'DOCKER') { + dependsOn pythonContainerTask + } else if (workerType == 'PROCESS') { + dependsOn createProcessWorker + } + } + return task +} + +createPrismRunnerTestTask('DOCKER') +createPrismRunnerTestTask('PROCESS') +createPrismRunnerTestTask('LOOPBACK') + +tasks.register("prismValidatesRunner") { + dependsOn 'prismCompatibilityMatrixLOOPBACK' +} + +tasks.register("prismTriggerTranscript") { + dependsOn 'setupVirtualenv' + dependsOn ':runners:prism:build' + def prismBin = "${rootDir}/runners/prism/build/tmp/prism" + doLast { + exec { + executable 'sh' + args '-c', """ + . ${envdir}/bin/activate \\ + && cd ${pythonRootDir} \\ + && pip install -e .[test] \\ + && pytest \\ + apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\ + --test-pipeline-options='--runner=PrismRunner --environment_type=LOOPBACK --prism_location=${prismBin}' + """ + } + } +} + project.tasks.register("preCommitPy${pythonVersionSuffix}") { dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker", ":runners:flink:${latestFlinkVersion}:job-server:shadowJar", @@ -283,6 +333,37 @@ project.tasks.register("sparkExamples") { } } +project.tasks.register("prismExamples") { + dependsOn = [ + 'setupVirtualenv', + 'installGcpTest', + ':runners:prism:build', + ] + def prismBin = "${rootDir}/runners/prism/build/tmp/prism" + doLast { + def testOpts = [ + "--log-cli-level=INFO", + ] + def pipelineOpts = [ + "--runner=PrismRunner", + "--project=apache-beam-testing", + "--environment_type=LOOPBACK", + "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it", + "--prism_location=${prismBin}", + ] + def cmdArgs = mapToArgString([ + "test_opts": testOpts, + "suite": "postCommitExamples-prism-py${pythonVersionSuffix}", + "pipeline_opts": pipelineOpts.join(" "), + "collect": "examples_postcommit and not sickbay_prism" + ]) + exec { + executable 'sh' + args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs" + } + } +} + project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { dependsOn = [ 'setupVirtualenv', diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index d733fd17fb6b..2dfe0670ed0f 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -285,6 +285,10 @@ extras = test commands = bash {toxinidir}/scripts/pytest_validates_runner.sh {envname} {toxinidir}/apache_beam/runners/portability/spark_runner_test.py {posargs} +[testenv:prism-runner-test] +extras = test +commands = + bash {toxinidir}/scripts/pytest_validates_runner.sh {envname} {toxinidir}/apache_beam/runners/portability/prism_runner_test.py {posargs} [testenv:py{39,310}-pyarrow-{3,9,10,11,12,13,14,15,16}] deps =