From 11f9bce485c4f6fe466ff4bf5073d2414e43678c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Thu, 22 Feb 2024 16:13:55 -0500 Subject: [PATCH] Generate external transform wrappers using a script (#29834) --- ...eam_PostCommit_Python_Examples_Direct.json | 0 ..._PostCommit_Python_Xlang_Gcp_Dataflow.json | 1 + ...am_PostCommit_Python_Xlang_Gcp_Direct.json | 0 ...m_PostCommit_Python_Xlang_IO_Dataflow.json | 0 .github/workflows/README.md | 1 + ...m_PreCommit_Xlang_Generated_Transforms.yml | 114 +++++ .gitignore | 1 + CHANGES.md | 1 + build.gradle.kts | 5 + .../beam/gradle/BeamModulePlugin.groovy | 100 ++-- .../release/test/resources/mass_comment.txt | 2 + ...nerateSequenceSchemaTransformProvider.java | 2 +- sdks/python/MANIFEST.in | 1 + sdks/python/apache_beam/io/__init__.py | 1 + .../io/external/xlang_bigqueryio_it_test.py | 23 +- .../apache_beam/io/gcp/bigtableio_it_test.py | 22 +- .../transforms/external_transform_provider.py | 8 +- .../external_transform_provider_it_test.py | 413 ++++++++++++++++ .../external_transform_provider_test.py | 140 ------ .../apache_beam/transforms/xlang/__init__.py | 27 ++ sdks/python/build.gradle | 19 + sdks/python/gen_xlang_wrappers.py | 447 ++++++++++++++++++ sdks/python/pyproject.toml | 6 + sdks/python/pytest.ini | 1 + sdks/python/python_xlang_wrapper.template | 36 ++ sdks/python/setup.py | 49 +- .../python/test-suites/dataflow/common.gradle | 5 +- sdks/python/test-suites/direct/build.gradle | 7 + sdks/python/test-suites/direct/common.gradle | 8 +- sdks/python/test-suites/xlang/build.gradle | 56 +-- sdks/python/tox.ini | 4 +- sdks/standard_expansion_services.yaml | 77 +++ sdks/standard_external_transforms.yaml | 52 ++ 33 files changed, 1347 insertions(+), 282 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Python_Examples_Direct.json create mode 100644 .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json create mode 100644 
.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json create mode 100644 .github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml create mode 100644 sdks/python/apache_beam/transforms/external_transform_provider_it_test.py delete mode 100644 sdks/python/apache_beam/transforms/external_transform_provider_test.py create mode 100644 sdks/python/apache_beam/transforms/xlang/__init__.py create mode 100644 sdks/python/gen_xlang_wrappers.py create mode 100644 sdks/python/python_xlang_wrapper.template create mode 100644 sdks/standard_expansion_services.yaml create mode 100644 sdks/standard_external_transforms.yaml diff --git a/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json index e69de29bb2d1..8b137891791f 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json @@ -0,0 +1 @@ + diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.github/workflows/README.md b/.github/workflows/README.md index f882553ef9bf..42eaaaafdace 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -271,6 +271,7 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour | [ PreCommit Website ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml) | N/A |`Run 
Website PreCommit`| [![.github/workflows/beam_PreCommit_Website.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml?query=event%3Aschedule) | | [ PreCommit Website Stage GCS ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml) | N/A |`Run Website_Stage_GCS PreCommit`| [![.github/workflows/beam_PreCommit_Website_Stage_GCS.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml?query=event%3Aschedule) | | [ PreCommit Whitespace ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml) | N/A |`Run Whitespace PreCommit`| [![.github/workflows/beam_PreCommit_Whitespace.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml?query=event%3Aschedule) | +| [ PreCommit Xlang Generated Transforms ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml) | N/A |`Run Xlang_Generated_Transforms PreCommit`| [![.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml?query=event%3Aschedule) | ### PostCommit Jobs diff --git a/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml b/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml new file mode 100644 index 000000000000..f8d64eb5d4a6 --- /dev/null +++ b/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml @@ -0,0 +1,114 @@ +# Licensed to the Apache Software Foundation (ASF) 
under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: PreCommit Xlang Generated Transforms + +on: + push: + tags: ['v*'] + branches: ['master', 'release-*'] + paths: + - 'model/**' + - 'sdks/python/**' + - 'sdks/java/expansion-service/**' + - 'sdks/java/core/**' + - 'sdks/java/io/**' + - 'sdks/java/extensions/sql/**' + - 'release/**' + - '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml' + pull_request_target: + branches: ['master', 'release-*'] + paths: + - 'model/**' + - 'sdks/python/**' + - 'sdks/java/expansion-service/**' + - 'sdks/java/core/**' + - 'sdks/java/io/**' + - 'sdks/java/extensions/sql/**' + - 'release/**' + - 'release/trigger_all_tests.json' + - '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml' + issue_comment: + types: [created] + schedule: + - cron: '30 2/6 * * *' + workflow_dispatch: + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: read + checks: read + contents: read + deployments: read + id-token: none + issues: read + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +# This allows a subsequently queued workflow run to interrupt 
previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +env: + GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PreCommit_Xlang_Generated_Transforms: + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }}) + timeout-minutes: 120 + runs-on: ['self-hosted', ubuntu-20.04, main] + strategy: + fail-fast: false + matrix: + job_name: ['beam_PreCommit_Xlang_Generated_Transforms'] + job_phrase: ['Run Xlang_Generated_Transforms PreCommit'] + python_version: ['3.8'] + if: | + github.event_name == 'push' || + github.event_name == 'workflow_dispatch' || + github.event_name == 'pull_request_target' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + startsWith(github.event.comment.body, 'Run Xlang_Generated_Transforms PreCommit') + steps: + - uses: actions/checkout@v4 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + with: + java-version: 8 + python-version: ${{ matrix.python_version }} + - name: Set PY_VER_CLEAN + id: set_py_ver_clean + run: | + PY_VER=${{ matrix.python_version }} + PY_VER_CLEAN=${PY_VER//.} + echo "py_ver_clean=$PY_VER_CLEAN" >> $GITHUB_OUTPUT + - name: run Cross-Language Wrapper Validation script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: 
:sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit \ No newline at end of file diff --git a/.gitignore b/.gitignore index 3f7d7af44a8f..c76441078f3d 100644 --- a/.gitignore +++ b/.gitignore @@ -52,6 +52,7 @@ sdks/python/**/*.egg sdks/python/LICENSE sdks/python/NOTICE sdks/python/README.md +sdks/python/apache_beam/transforms/xlang/* sdks/python/apache_beam/portability/api/* sdks/python/apache_beam/yaml/docs/* sdks/python/nosetests*.xml diff --git a/CHANGES.md b/CHANGES.md index 249897b16975..2e23d72e664a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -58,6 +58,7 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). +* The Python SDK will now include automatically generated wrappers for external Java transforms! ([#29834](https://github.com/apache/beam/pull/29834)) ## I/Os diff --git a/build.gradle.kts b/build.gradle.kts index 67653e59c0d9..788d9a48dc04 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -647,6 +647,11 @@ tasks.register("checkSetup") { dependsOn(":examples:java:wordCount") } +// Generates external transform config +project.tasks.register("generateExternalTransformsConfig") { + dependsOn(":sdks:python:generateExternalTransformsConfig") +} + // Configure the release plugin to do only local work; the release manager determines what, if // anything, to push. On failure, the release manager can reset the branch without pushing. 
release { diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 5b73940b99dd..6434746fd3ab 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -321,19 +321,17 @@ class BeamModulePlugin implements Plugin { // A class defining the common properties in a given suite of cross-language tests // Properties are shared across runners and are used when creating a CrossLanguageUsingJavaExpansionConfiguration object - static class CrossLanguageTaskCommon { + static class CrossLanguageTask { // Used as the task name for cross-language String name - // The expansion service's project path (required) - String expansionProjectPath + // List of project paths for required expansion services + List expansionProjectPaths // Collect Python pipeline tests with this marker String collectMarker - // Job server startup task. - TaskProvider startJobServer - // Job server cleanup task. - TaskProvider cleanupJobServer - // any additional environment variables specific to the suite of tests + // Additional environment variables to set before running tests Map additionalEnvs + // Additional Python dependencies to install before running tests + List additionalDeps } // A class defining the configuration for CrossLanguageUsingJavaExpansion. @@ -349,18 +347,16 @@ class BeamModulePlugin implements Plugin { ] // Additional pytest options List pytestOptions = [] - // Job server startup task. - TaskProvider startJobServer - // Job server cleanup task. - TaskProvider cleanupJobServer // Number of parallel test runs. 
Integer numParallelTests = 1 - // Project path for the expansion service to start up - String expansionProjectPath + // List of project paths for required expansion services + List expansionProjectPaths // Collect Python pipeline tests with this marker String collectMarker // any additional environment variables to be exported Map additionalEnvs + // Additional Python dependencies to install before running tests + List additionalDeps } // A class defining the configuration for CrossLanguageValidatesRunner. @@ -2576,7 +2572,7 @@ class BeamModulePlugin implements Plugin { /** ***********************************************************************************************/ // Method to create the createCrossLanguageUsingJavaExpansionTask. // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter. - // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service + // This method creates a task that runs Python SDK test-suites that use external Java transforms project.ext.createCrossLanguageUsingJavaExpansionTask = { // This task won't work if the python build file doesn't exist. if (!project.project(":sdks:python").buildFile.exists()) { @@ -2586,49 +2582,29 @@ class BeamModulePlugin implements Plugin { def config = it ? 
it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration() project.evaluationDependsOn(":sdks:python") - project.evaluationDependsOn(config.expansionProjectPath) + for (path in config.expansionProjectPaths) { + project.evaluationDependsOn(path) + } project.evaluationDependsOn(":sdks:java:extensions:python") - // Setting up args to launch the expansion service def pythonDir = project.project(":sdks:python").projectDir - def javaExpansionPort = -1 // will be populated in setupTask - def expansionJar = project.project(config.expansionProjectPath).shadowJar.archivePath - def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath() - def expansionServiceOpts = [ - "group_id": project.name, - "java_expansion_service_jar": expansionJar, - "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile, - ] def usesDataflowRunner = config.pythonPipelineOptions.contains("--runner=TestDataflowRunner") || config.pythonPipelineOptions.contains("--runner=DataflowRunner") def javaContainerSuffix = getSupportedJavaVersion() - // 1. 
Builds the chosen expansion service jar and launches it - def setupTask = project.tasks.register(config.name+"Setup") { - dependsOn ':sdks:java:container:' + javaContainerSuffix + ':docker' - dependsOn project.project(config.expansionProjectPath).shadowJar.getPath() - dependsOn 'installGcpTest' + // Sets up, collects, and runs Python pipeline tests + project.tasks.register(config.name+"PythonUsingJava") { + group = "Verification" + description = "Runs Python SDK pipeline tests that use a Java expansion service" + // Each expansion service we use needs to be built before running these tests + // The built jars will be started up automatically using the BeamJarExpansionService utility + for (path in config.expansionProjectPaths) { + dependsOn project.project(path).shadowJar.getPath() + } + dependsOn ":sdks:java:container:$javaContainerSuffix:docker" + dependsOn "installGcpTest" if (usesDataflowRunner) { dependsOn ":sdks:python:test-suites:dataflow:py${project.ext.pythonVersion.replace('.', '')}:initializeForDataflowJob" } - doLast { - project.exec { - // Prepare a port to use for the expansion service - javaExpansionPort = getRandomPort() - expansionServiceOpts.put("java_port", javaExpansionPort) - // setup test env - def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts) - executable 'sh' - args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs" - } - } - } - - // 2. Sets up, collects, and runs Python pipeline tests - def pythonTask = project.tasks.register(config.name+"PythonUsingJava") { - group = "Verification" - description = "Runs Python SDK pipeline tests that use a Java expansion service" - dependsOn setupTask - dependsOn config.startJobServer doLast { def beamPythonTestPipelineOptions = [ "pipeline_opts": config.pythonPipelineOptions + (usesDataflowRunner ? 
[ @@ -2641,29 +2617,19 @@ class BeamModulePlugin implements Plugin { def cmdArgs = project.project(':sdks:python').mapToArgString(beamPythonTestPipelineOptions) project.exec { - environment "EXPANSION_JAR", expansionJar - environment "EXPANSION_PORT", javaExpansionPort - for (envs in config.additionalEnvs){ - environment envs.getKey(), envs.getValue() + // environment variable to indicate that jars have been built + environment "EXPANSION_JARS", config.expansionProjectPaths + String additionalDependencyCmd = "" + if (config.additionalDeps != null && !config.additionalDeps.isEmpty()){ + additionalDependencyCmd = "&& pip install ${config.additionalDeps.join(' ')} " } executable 'sh' - args '-c', ". ${project.ext.envdir}/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs" + args '-c', ". ${project.ext.envdir}/bin/activate " + + additionalDependencyCmd + + "&& cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs" } } } - - // 3. Shuts down the expansion service - def cleanupTask = project.tasks.register(config.name+'Cleanup', Exec) { - // teardown test env - executable 'sh' - args '-c', ". 
${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name}" - } - - setupTask.configure {finalizedBy cleanupTask} - config.startJobServer.configure {finalizedBy config.cleanupJobServer} - - cleanupTask.configure{mustRunAfter pythonTask} - config.cleanupJobServer.configure{mustRunAfter pythonTask} } /** ***********************************************************************************************/ diff --git a/scripts/ci/release/test/resources/mass_comment.txt b/scripts/ci/release/test/resources/mass_comment.txt index 93468b0c961b..1f6f340eb0b7 100644 --- a/scripts/ci/release/test/resources/mass_comment.txt +++ b/scripts/ci/release/test/resources/mass_comment.txt @@ -79,6 +79,7 @@ Run PythonDocs PreCommit Run PythonFormatter PreCommit Run PythonLint PreCommit Run Python_PVR_Flink PreCommit +Run Python_Xlang_Gcp_Direct PostCommit Run RAT PreCommit Run SQL PostCommit Run SQL PreCommit @@ -94,6 +95,7 @@ Run Twister2 ValidatesRunner Run Typescript PreCommit Run ULR Loopback ValidatesRunner Run Whitespace PreCommit +Run Xlang_Generated_Transforms PreCommit Run XVR_Direct PostCommit Run XVR_Flink PostCommit Run XVR_JavaUsingPython_Dataflow PostCommit diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java index 4b693f883fb7..d9dfc2a90bd8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java @@ -66,7 +66,7 @@ public List outputCollectionNames() { public String description() { return String.format( "Outputs a PCollection of Beam Rows, each containing a single INT64 " - + "number called \"value\". The count is produced from the given \"start\"" + + "number called \"value\". 
The count is produced from the given \"start\" " + "value and either up to the given \"end\" or until 2^63 - 1.%n" + "To produce an unbounded PCollection, simply do not specify an \"end\" value. " + "Unbounded sequences can specify a \"rate\" for output elements.%n" diff --git a/sdks/python/MANIFEST.in b/sdks/python/MANIFEST.in index 60b0989d9af7..d40273a9340d 100644 --- a/sdks/python/MANIFEST.in +++ b/sdks/python/MANIFEST.in @@ -16,6 +16,7 @@ # include gen_protos.py +include gen_xlang_wrappers.py include README.md include NOTICE include LICENSE diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py index 4945da97d90f..83d45d81a5a1 100644 --- a/sdks/python/apache_beam/io/__init__.py +++ b/sdks/python/apache_beam/io/__init__.py @@ -36,6 +36,7 @@ from apache_beam.io.gcp.bigquery import * from apache_beam.io.gcp.pubsub import * from apache_beam.io.gcp import gcsio + from apache_beam.transforms.xlang.io import * except ImportError: pass # pylint: enable=wrong-import-order, wrong-import-position diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index c1e9754526e8..cfbb411b4e5f 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -53,6 +53,10 @@ @pytest.mark.uses_gcp_java_expansion_service +@unittest.skipUnless( + os.environ.get('EXPANSION_JARS'), + "EXPANSION_JARS environment var is not provided, " + "indicating that jars have not been built") class BigQueryXlangStorageWriteIT(unittest.TestCase): BIGQUERY_DATASET = 'python_xlang_storage_write' @@ -112,10 +116,6 @@ def setUp(self): _LOGGER.info( "Created dataset %s in project %s", self.dataset_id, self.project) - self.assertTrue( - os.environ.get('EXPANSION_PORT'), "Expansion service port not found!") - self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) - def 
tearDown(self): try: _LOGGER.info( @@ -162,8 +162,7 @@ def run_storage_write_test( table=table_id, method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, schema=schema, - use_at_least_once=use_at_least_once, - expansion_service=self.expansion_service)) + use_at_least_once=use_at_least_once)) hamcrest_assert(p, bq_matcher) def test_all_types(self): @@ -243,8 +242,7 @@ def test_write_with_beam_rows(self): _ = ( p | beam.Create(row_elements) - | StorageWriteToBigQuery( - table=table_id, expansion_service=self.expansion_service)) + | StorageWriteToBigQuery(table=table_id)) hamcrest_assert(p, bq_matcher) def test_write_to_dynamic_destinations(self): @@ -268,8 +266,7 @@ def test_write_to_dynamic_destinations(self): table=lambda record: spec_with_project + str(record['int']), method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, schema=self.ALL_TYPES_SCHEMA, - use_at_least_once=False, - expansion_service=self.expansion_service)) + use_at_least_once=False)) hamcrest_assert(p, all_of(*bq_matchers)) def test_write_to_dynamic_destinations_with_beam_rows(self): @@ -303,8 +300,7 @@ def test_write_to_dynamic_destinations_with_beam_rows(self): | beam.io.WriteToBigQuery( table=lambda record: spec_with_project + str(record.my_int), method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=False, - expansion_service=self.expansion_service)) + use_at_least_once=False)) hamcrest_assert(p, all_of(*bq_matchers)) def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): @@ -335,8 +331,7 @@ def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): triggering_frequency=1, with_auto_sharding=auto_sharding, num_storage_api_streams=num_streams, - use_at_least_once=use_at_least_once, - expansion_service=self.expansion_service)) + use_at_least_once=use_at_least_once)) hamcrest_assert(p, bq_matcher) def skip_if_not_dataflow_runner(self) -> bool: diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py 
b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 867dca9a5e7e..13909cded1ff 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -58,10 +58,11 @@ def instance_prefix(instance): @pytest.mark.uses_gcp_java_expansion_service @pytest.mark.uses_transform_service -@unittest.skipUnless( - os.environ.get('EXPANSION_PORT'), - "EXPANSION_PORT environment var is not provided.") @unittest.skipIf(client is None, 'Bigtable dependencies are not installed') +@unittest.skipUnless( + os.environ.get('EXPANSION_JARS'), + "EXPANSION_JARS environment var is not provided, " + "indicating that jars have not been built") class TestReadFromBigTableIT(unittest.TestCase): INSTANCE = "bt-read-tests" TABLE_ID = "test-table" @@ -70,7 +71,6 @@ def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) self.args = self.test_pipeline.get_full_options_as_args() self.project = self.test_pipeline.get_option('project') - self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) instance_id = instance_prefix(self.INSTANCE) @@ -141,8 +141,7 @@ def test_read_xlang(self): | bigtableio.ReadFromBigtable( project_id=self.project, instance_id=self.instance.instance_id, - table_id=self.table.table_id, - expansion_service=self.expansion_service) + table_id=self.table.table_id) | "Extract cells" >> beam.Map(lambda row: row._cells)) assert_that(cells, equal_to(expected_cells)) @@ -150,10 +149,11 @@ def test_read_xlang(self): @pytest.mark.uses_gcp_java_expansion_service @pytest.mark.uses_transform_service -@unittest.skipUnless( - os.environ.get('EXPANSION_PORT'), - "EXPANSION_PORT environment var is not provided.") @unittest.skipIf(client is None, 'Bigtable dependencies are not installed') +@unittest.skipUnless( + os.environ.get('EXPANSION_JARS'), + "EXPANSION_JARS environment var is not provided, " + "indicating that jars have not been built") class 
TestWriteToBigtableXlangIT(unittest.TestCase): # These are integration tests for the cross-language write transform. INSTANCE = "bt-write-xlang" @@ -164,7 +164,6 @@ def setUpClass(cls): cls.test_pipeline = TestPipeline(is_integration_test=True) cls.project = cls.test_pipeline.get_option('project') cls.args = cls.test_pipeline.get_full_options_as_args() - cls.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) instance_id = instance_prefix(cls.INSTANCE) @@ -215,8 +214,7 @@ def run_pipeline(self, rows): project_id=self.project, instance_id=self.instance.instance_id, table_id=self.table.table_id, - use_cross_language=True, - expansion_service=self.expansion_service)) + use_cross_language=True)) def test_set_mutation(self): row1: DirectRow = DirectRow('key-1') diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py index 26cc31471e69..2799bd1b9e93 100644 --- a/sdks/python/apache_beam/transforms/external_transform_provider.py +++ b/sdks/python/apache_beam/transforms/external_transform_provider.py @@ -144,7 +144,7 @@ class ExternalTransformProvider: A :class:`ExternalTransform` subclass is generated for each external transform, and is named based on what can be inferred from the URN - (see :param urn_pattern). + (see the `urn_pattern` parameter). These classes are generated when :class:`ExternalTransformProvider` is initialized. 
We need to give it one or more expansion service addresses that @@ -256,7 +256,7 @@ def _create_wrappers(self): if skipped_urns: logging.info( - "Skipped URN(s) in %s that don't follow the pattern [%s]: %s", + "Skipped URN(s) in %s that don't follow the pattern \"%s\": %s", target, self._urn_pattern, skipped_urns) @@ -268,6 +268,10 @@ def get_available(self) -> List[Tuple[str, str]]: """Get a list of available ExternalTransform names and identifiers""" return list(self._name_to_urn.items()) + def get_all(self) -> Dict[str, ExternalTransform]: + """Get all ExternalTransform""" + return self._transforms + def get(self, name) -> ExternalTransform: """Get an ExternalTransform by its inferred class name""" return self._transforms[self._name_to_urn[name]] diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py new file mode 100644 index 000000000000..a53001c85fd3 --- /dev/null +++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py @@ -0,0 +1,413 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import importlib +import logging +import os +import secrets +import shutil +import time +import typing +import unittest +from os.path import dirname + +import numpy +import pytest +import yaml + +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN +from apache_beam.transforms.external_transform_provider import ExternalTransform +from apache_beam.transforms.external_transform_provider import ExternalTransformProvider +from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case +from apache_beam.transforms.external_transform_provider import infer_name_from_identifier +from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case +from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case +from apache_beam.transforms.xlang.io import GenerateSequence + + +class NameAndTypeUtilsTest(unittest.TestCase): + def test_snake_case_to_upper_camel_case(self): + test_cases = [("", ""), ("test", "Test"), ("test_name", "TestName"), + ("test_double_underscore", "TestDoubleUnderscore"), + ("TEST_CAPITALIZED", "TestCapitalized"), + ("_prepended_underscore", "PrependedUnderscore"), + ("appended_underscore_", "AppendedUnderscore")] + for case in test_cases: + self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0])) + + def test_snake_case_to_lower_camel_case(self): + test_cases = [("", ""), ("test", "test"), ("test_name", "testName"), + ("test_double_underscore", "testDoubleUnderscore"), + ("TEST_CAPITALIZED", "testCapitalized"), + ("_prepended_underscore", "prependedUnderscore"), + ("appended_underscore_", "appendedUnderscore")] + 
for case in test_cases: + self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0])) + + def test_camel_case_to_snake_case(self): + test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"), + ("TestDoubleUnderscore", + "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"), + ("BEGINNINGAllCaps", + "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"), + ("AllCapsMIDDLEWord", "all_caps_middle_word"), + ("lowerCamelCase", "lower_camel_case")] + for case in test_cases: + self.assertEqual(case[1], camel_case_to_snake_case(case[0])) + + def test_infer_name_from_identifier(self): + standard_test_cases = [ + ("beam:schematransform:org.apache.beam:transform:v1", "Transform"), + ("beam:schematransform:org.apache.beam:my_transform:v1", + "MyTransform"), ( + "beam:schematransform:org.apache.beam:my_transform:v2", + "MyTransformV2"), + ("beam:schematransform:org.apache.beam:fe_fi_fo_fum:v2", "FeFiFoFumV2"), + ("beam:schematransform:bad_match:my_transform:v1", None) + ] + for case in standard_test_cases: + self.assertEqual( + case[1], infer_name_from_identifier(case[0], STANDARD_URN_PATTERN)) + + custom_pattern_cases = [ + # (, , ) + ( + r"^custom:transform:([\w-]+):(\w+)$", + "custom:transform:my_transform:v1", + "MyTransformV1"), + ( + r"^org.user:([\w-]+):([\w-]+):([\w-]+):external$", + "org.user:some:custom_transform:we_made:external", + "SomeCustomTransformWeMade"), + ( + r"^([\w-]+):user.transforms", + "my_eXTErnal:user.transforms", + "MyExternal"), + (r"^([\w-]+):user.transforms", "my_external:badinput.transforms", None), + ] + for case in custom_pattern_cases: + self.assertEqual(case[2], infer_name_from_identifier(case[1], case[0])) + + +@pytest.mark.uses_io_java_expansion_service +@unittest.skipUnless( + os.environ.get('EXPANSION_JARS'), + "EXPANSION_JARS environment var is not provided, " + "indicating that jars have not been built") +class ExternalTransformProviderIT(unittest.TestCase): + def 
test_generate_sequence_config_schema_and_description(self): + provider = ExternalTransformProvider( + BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) + + self.assertTrue(( + 'GenerateSequence', + 'beam:schematransform:org.apache.beam:generate_sequence:v1' + ) in provider.get_available()) + + GenerateSequence = provider.get('GenerateSequence') + config_schema = GenerateSequence.configuration_schema + for param in ['start', 'end', 'rate']: + self.assertTrue(param in config_schema) + + description_substring = ( + "Outputs a PCollection of Beam Rows, each " + "containing a single INT64") + self.assertTrue(description_substring in GenerateSequence.description) + + def test_run_generate_sequence(self): + provider = ExternalTransformProvider( + BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) + + with beam.Pipeline() as p: + numbers = p | provider.GenerateSequence( + start=0, end=10) | beam.Map(lambda row: row.value) + + assert_that(numbers, equal_to([i for i in range(10)])) + + +@pytest.mark.xlang_wrapper_generation +@unittest.skipUnless( + os.environ.get('EXPANSION_JARS'), + "EXPANSION_JARS environment var is not provided, " + "indicating that jars have not been built") +class AutoGenerationScriptIT(unittest.TestCase): + """ + This class tests the generation and regeneration operations in + `gen_xlang_wrappers.py`. 
+ """ + + # tests cases will use GenerateSequence + GEN_SEQ_IDENTIFIER = \ + 'beam:schematransform:org.apache.beam:generate_sequence:v1' + + def setUp(self): + # import script from top-level sdks/python directory + self.sdk_dir = os.path.abspath(dirname(dirname(dirname(__file__)))) + spec = importlib.util.spec_from_file_location( + 'gen_xlang_wrappers', + os.path.join(self.sdk_dir, 'gen_xlang_wrappers.py')) + self.script = importlib.util.module_from_spec(spec) + spec.loader.exec_module(self.script) + args = TestPipeline(is_integration_test=True).get_full_options_as_args() + runner = PipelineOptions(args).get_all_options()['runner'] + if runner and "direct" not in runner.lower(): + self.skipTest( + "It is sufficient to run this test in the DirectRunner " + "test suite only.") + + self.test_dir_name = 'test_gen_script_%d_%s' % ( + int(time.time()), secrets.token_hex(3)) + self.test_dir = os.path.join( + os.path.abspath(dirname(__file__)), self.test_dir_name) + self.service_config_path = os.path.join( + self.test_dir, "test_expansion_service_config.yaml") + self.transform_config_path = os.path.join( + self.test_dir, "test_transform_config.yaml") + os.mkdir(self.test_dir) + + def tearDown(self): + shutil.rmtree(self.test_dir, ignore_errors=False) + + def delete_and_validate(self): + self.script.delete_generated_files(self.test_dir) + self.assertEqual(len(os.listdir(self.test_dir)), 0) + + def test_script_fails_with_invalid_destinations(self): + expansion_service_config = { + "gradle_target": 'sdks:java:io:expansion-service:shadowJar', + 'destinations': { + 'python': 'apache_beam/some_nonexistent_dir' + } + } + with self.assertRaises(ValueError): + self.create_and_check_transforms_config_exists(expansion_service_config) + + def test_pretty_types(self): + types = [ + typing.Optional[typing.List[str]], + numpy.int16, + str, + typing.Dict[str, numpy.float64], + typing.Optional[typing.Dict[str, typing.List[numpy.int64]]], + typing.Dict[int, typing.Optional[str]] + ] + + 
expected_type_names = [('List[str]', True), ('numpy.int16', False), + ('str', False), ('Dict[str, numpy.float64]', False), + ('Dict[str, List[numpy.int64]]', True), + ('Dict[int, Union[str, NoneType]]', False)] + + for i in range(len(types)): + self.assertEqual( + self.script.pretty_type(types[i]), expected_type_names[i]) + + def create_and_check_transforms_config_exists(self, expansion_service_config): + with open(self.service_config_path, 'w') as f: + yaml.dump([expansion_service_config], f) + + self.script.generate_transforms_config( + self.service_config_path, self.transform_config_path) + self.assertTrue(os.path.exists(self.transform_config_path)) + + def create_and_validate_transforms_config( + self, expansion_service_config, expected_name, expected_destination): + self.create_and_check_transforms_config_exists(expansion_service_config) + + with open(self.transform_config_path) as f: + configs = yaml.safe_load(f) + gen_seq_config = None + for config in configs: + if config['identifier'] == self.GEN_SEQ_IDENTIFIER: + gen_seq_config = config + self.assertIsNotNone(gen_seq_config) + self.assertEqual( + gen_seq_config['default_service'], + expansion_service_config['gradle_target']) + self.assertEqual(gen_seq_config['name'], expected_name) + self.assertEqual( + gen_seq_config['destinations']['python'], expected_destination) + self.assertIn("end", gen_seq_config['fields']) + self.assertIn("start", gen_seq_config['fields']) + self.assertIn("rate", gen_seq_config['fields']) + + def get_module(self, dest): + module_name = dest.replace('apache_beam/', '').replace('/', '_') + module = 'apache_beam.transforms.%s.%s' % (self.test_dir_name, module_name) + return importlib.import_module(module) + + def write_wrappers_to_destinations_and_validate( + self, destinations: typing.List[str]): + """ + Generate wrappers from the config path and validate all destinations are + included. + Then write wrappers to destinations and validate all destination paths + exist. 
+ + :return: Generated wrappers grouped by destination + """ + grouped_wrappers = self.script.get_wrappers_from_transform_configs( + self.transform_config_path) + for dest in destinations: + self.assertIn(dest, grouped_wrappers) + + # write to our test directory to avoid messing with other files + self.script.write_wrappers_to_destinations( + grouped_wrappers, self.test_dir, format_code=False) + + for dest in destinations: + self.assertTrue( + os.path.exists( + os.path.join( + self.test_dir, + dest.replace('apache_beam/', '').replace('/', '_') + ".py"))) + return grouped_wrappers + + def test_script_workflow(self): + expected_destination = 'apache_beam/transforms' + expansion_service_config = { + "gradle_target": 'sdks:java:io:expansion-service:shadowJar', + 'destinations': { + 'python': expected_destination + } + } + + self.create_and_validate_transforms_config( + expansion_service_config, 'GenerateSequence', expected_destination) + grouped_wrappers = self.write_wrappers_to_destinations_and_validate( + [expected_destination]) + # at least the GenerateSequence wrapper is set to this destination + self.assertGreaterEqual(len(grouped_wrappers[expected_destination]), 1) + + # check the wrapper exists in this destination and has correct properties + output_module = self.get_module(expected_destination) + self.assertTrue(hasattr(output_module, 'GenerateSequence')) + # Since our config isn't skipping any transforms, + # it should include these two Kafka IOs as well + self.assertTrue(hasattr(output_module, 'KafkaWrite')) + self.assertTrue(hasattr(output_module, 'KafkaRead')) + self.assertTrue( + isinstance(output_module.GenerateSequence(start=0), ExternalTransform)) + self.assertEqual( + output_module.GenerateSequence.identifier, self.GEN_SEQ_IDENTIFIER) + + self.delete_and_validate() + + def test_script_workflow_with_modified_transforms(self): + modified_name = 'ModifiedSequence' + modified_dest = 'apache_beam/io/gcp' + expansion_service_config = { + "gradle_target": 
'sdks:java:io:expansion-service:shadowJar', + 'destinations': { + 'python': 'apache_beam/transforms' + }, + 'transforms': { + 'beam:schematransform:org.apache.beam:generate_sequence:v1': { + 'name': modified_name, + 'destinations': { + 'python': modified_dest + } + } + } + } + + self.create_and_validate_transforms_config( + expansion_service_config, modified_name, modified_dest) + + grouped_wrappers = self.write_wrappers_to_destinations_and_validate( + [modified_dest]) + self.assertIn(modified_name, grouped_wrappers[modified_dest][0]) + self.assertEqual(len(grouped_wrappers[modified_dest]), 1) + + # check the modified wrapper exists in the modified destination + # and check it has the correct properties + output_module = self.get_module(modified_dest) + self.assertTrue( + isinstance(output_module.ModifiedSequence(start=0), ExternalTransform)) + self.assertEqual( + output_module.ModifiedSequence.identifier, self.GEN_SEQ_IDENTIFIER) + + self.delete_and_validate() + + def test_script_workflow_with_skipped_transform(self): + expansion_service_config = { + "gradle_target": 'sdks:java:io:expansion-service:shadowJar', + 'destinations': { + 'python': f'apache_beam/transforms/{self.test_dir_name}' + }, + 'skip_transforms': [ + 'beam:schematransform:org.apache.beam:generate_sequence:v1' + ] + } + + with open(self.service_config_path, 'w') as f: + yaml.dump([expansion_service_config], f) + + self.script.generate_transforms_config( + self.service_config_path, self.transform_config_path) + + # gen sequence shouldn't exist in the transform config + with open(self.transform_config_path) as f: + transforms = yaml.safe_load(f) + gen_seq_config = None + for transform in transforms: + if transform['identifier'] == self.GEN_SEQ_IDENTIFIER: + gen_seq_config = transform + self.assertIsNone(gen_seq_config) + + def test_run_pipeline_with_generated_transform(self): + with beam.Pipeline() as p: + numbers = ( + p | GenerateSequence(start=0, end=10) + | beam.Map(lambda row: row.value)) + 
assert_that(numbers, equal_to([i for i in range(10)])) + + def test_check_standard_external_transforms_config_in_sync(self): + """ + This test creates a transforms config file and checks it against + `sdks/standard_external_transforms.yaml`. Fails if the two configs don't + match. + + Fix by running `./gradlew generateExternalTransformsConfig` and + committing the changes. + """ + sdks_dir = os.path.abspath(dirname(self.sdk_dir)) + self.script.generate_transforms_config( + os.path.join(sdks_dir, 'standard_expansion_services.yaml'), + self.transform_config_path) + with open(self.transform_config_path) as f: + test_config = yaml.safe_load(f) + with open(os.path.join(sdks_dir, 'standard_external_transforms.yaml'), + 'r') as f: + standard_config = yaml.safe_load(f) + + self.assertEqual( + test_config, + standard_config, + "The standard xlang transforms config file " + "\"standard_external_transforms.yaml\" is out of sync! Please update" + "by running './gradlew generateExternalTransformsConfig'" + "and committing the changes.") + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_test.py deleted file mode 100644 index 36fe9b5c4bd6..000000000000 --- a/sdks/python/apache_beam/transforms/external_transform_provider_test.py +++ /dev/null @@ -1,140 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import logging -import os -import unittest - -import pytest - -import apache_beam as beam -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to -from apache_beam.transforms.external import BeamJarExpansionService -from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN -from apache_beam.transforms.external_transform_provider import ExternalTransformProvider -from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case -from apache_beam.transforms.external_transform_provider import infer_name_from_identifier -from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case -from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case - - -class NameUtilsTest(unittest.TestCase): - def test_snake_case_to_upper_camel_case(self): - test_cases = [("", ""), ("test", "Test"), ("test_name", "TestName"), - ("test_double_underscore", "TestDoubleUnderscore"), - ("TEST_CAPITALIZED", "TestCapitalized"), - ("_prepended_underscore", "PrependedUnderscore"), - ("appended_underscore_", "AppendedUnderscore")] - for case in test_cases: - self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0])) - - def test_snake_case_to_lower_camel_case(self): - test_cases = [("", ""), ("test", "test"), ("test_name", "testName"), - ("test_double_underscore", "testDoubleUnderscore"), - ("TEST_CAPITALIZED", "testCapitalized"), - ("_prepended_underscore", 
"prependedUnderscore"), - ("appended_underscore_", "appendedUnderscore")] - for case in test_cases: - self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0])) - - def test_camel_case_to_snake_case(self): - test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"), - ("TestDoubleUnderscore", - "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"), - ("BEGINNINGAllCaps", - "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"), - ("AllCapsMIDDLEWord", "all_caps_middle_word"), - ("lowerCamelCase", "lower_camel_case")] - for case in test_cases: - self.assertEqual(case[1], camel_case_to_snake_case(case[0])) - - def test_infer_name_from_identifier(self): - standard_test_cases = [ - ("beam:schematransform:org.apache.beam:transform:v1", "Transform"), - ("beam:schematransform:org.apache.beam:my_transform:v1", - "MyTransform"), ( - "beam:schematransform:org.apache.beam:my_transform:v2", - "MyTransformV2"), - ("beam:schematransform:org.apache.beam:fe_fi_fo_fum:v2", "FeFiFoFumV2"), - ("beam:schematransform:bad_match:my_transform:v1", None) - ] - for case in standard_test_cases: - self.assertEqual( - case[1], infer_name_from_identifier(case[0], STANDARD_URN_PATTERN)) - - custom_pattern_cases = [ - # (, , ) - ( - r"^custom:transform:([\w-]+):(\w+)$", - "custom:transform:my_transform:v1", - "MyTransformV1"), - ( - r"^org.user:([\w-]+):([\w-]+):([\w-]+):external$", - "org.user:some:custom_transform:we_made:external", - "SomeCustomTransformWeMade"), - ( - r"^([\w-]+):user.transforms", - "my_eXTErnal:user.transforms", - "MyExternal"), - (r"^([\w-]+):user.transforms", "my_external:badinput.transforms", None), - ] - for case in custom_pattern_cases: - self.assertEqual(case[2], infer_name_from_identifier(case[1], case[0])) - - -@pytest.mark.uses_io_java_expansion_service -@unittest.skipUnless( - os.environ.get('EXPANSION_PORT'), - "EXPANSION_PORT environment var is not provided.") -class ExternalTransformProviderTest(unittest.TestCase): - def setUp(self): - 
self.test_pipeline = TestPipeline(is_integration_test=True) - - def test_generate_sequence_config_schema_and_description(self): - provider = ExternalTransformProvider( - BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) - - self.assertTrue(( - 'GenerateSequence', - 'beam:schematransform:org.apache.beam:generate_sequence:v1' - ) in provider.get_available()) - - GenerateSequence = provider.get('GenerateSequence') - config_schema = GenerateSequence.configuration_schema - for param in ['start', 'end', 'rate']: - self.assertTrue(param in config_schema) - - description_substring = ( - "Outputs a PCollection of Beam Rows, each " - "containing a single INT64") - self.assertTrue(description_substring in GenerateSequence.description) - - def test_run_generate_sequence(self): - provider = ExternalTransformProvider( - BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar")) - - with beam.Pipeline() as p: - numbers = p | provider.GenerateSequence( - start=0, end=10) | beam.Map(lambda row: row.value) - - assert_that(numbers, equal_to([i for i in range(10)])) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() diff --git a/sdks/python/apache_beam/transforms/xlang/__init__.py b/sdks/python/apache_beam/transforms/xlang/__init__.py new file mode 100644 index 000000000000..ba896615cbc4 --- /dev/null +++ b/sdks/python/apache_beam/transforms/xlang/__init__.py @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +This package contains autogenerated Python wrappers for cross-language +transforms available in other Beam SDKs. + +For documentation on creating and using cross-language transforms, see: +https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms + +For more information on transform wrapper generation, see the top level script: +`gen_xlang_wrappers.py` +""" diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index 7f2bc7f5d423..85b26ca3a464 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -66,6 +66,25 @@ artifacts { distTarBall file: file("${buildDir}/${tarball}"), builtBy: sdist } +tasks.register("generateExternalTransformsConfig") { + description "Discovers external transforms and regenerates the config at sdks/standard_expansion_services.yaml" + + dependsOn buildPython + // Need to build all expansion services listed in sdks/standard_expansion_services.yaml + dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build" + dependsOn ":sdks:java:io:expansion-service:build" + // Keep this in-sync with pyproject.toml + def PyYaml = "'pyyaml>=3.12,<7.0.0'" + + doLast { + exec { + executable 'sh' + args '-c', "pip install $PyYaml && " + + "python gen_xlang_wrappers.py --cleanup --generate-config-only" + } + } +} + // Create Python wheels for given platform and Python version // build identifiers for cibuildwheel def platform_identifiers_map = [ diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py new file mode 100644 index 000000000000..a75fc05cba73 --- 
/dev/null +++ b/sdks/python/gen_xlang_wrappers.py @@ -0,0 +1,447 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Generates Python wrappers for external transforms (specifically, +SchemaTransforms) +""" + +import argparse +import datetime +import logging +import os +import shutil +import subprocess +import typing +from typing import Any +from typing import Dict +from typing import List +from typing import Union + +import yaml + +from gen_protos import LICENSE_HEADER +from gen_protos import PROJECT_ROOT +from gen_protos import PYTHON_SDK_ROOT + +SUPPORTED_SDK_DESTINATIONS = ['python'] +PYTHON_SUFFIX = "_et.py" +PY_WRAPPER_OUTPUT_DIR = os.path.join( + PYTHON_SDK_ROOT, 'apache_beam', 'transforms', 'xlang') + + +def generate_transforms_config(input_services, output_file): + """ + Generates a YAML file containing a list of transform configurations. + + Takes an input YAML file containing a list of expansion service gradle + targets. Each service must provide a `destinations` field that specifies the + default package (relative path) that generated wrappers should be imported + to. 
A default destination package is specified for each SDK, like so:: + + - gradle_target: 'sdks:java:io:expansion-service:shadowJar' + destinations: + python: 'apache_beam/io' + + We use :class:`ExternalTransformProvider` to discover external + transforms. Then, we extract the necessary details of each transform and + compile them into a new YAML file, which is later used to generate wrappers. + + Importing generated transforms to an existing package + ----------------------------------------------------- + When running the script on the config above, a new module will be created at + `apache_beam/transforms/xlang/io.py`. This contains all + generated wrappers that are set to destination 'apache_beam/io'. Finally, + to make these available to the `apache_beam.io` package (or any package + really), just add the following line to the package's `__init__.py` file:: + from apache_beam.transforms.xlang.io import * + + Modifying a transform's name and destination + -------------------------------------------- + Each service may also specify modifications for particular transform. + Currently, one can modify the generated wrapper's **name** and + **destination** package: + - By default, the transform's identifier is used to generate the wrapper + class name. This can be overriden by manually providing a name. + - By default, generated wrappers are made available to the package provided + by their respective expansion service. This can be overridden by + providing a relative path to a different package. 
+ + See the following example for what such modifications can look like:: + + - gradle_target: 'sdks:java:io:expansion-service:shadowJar' + destinations: + python: 'apache_beam/io' + transforms: + 'beam:schematransform:org.apache.beam:my_transform:v1': + name: 'MyCustomTransformName' + destinations: + python: 'apache_beam/io/gcp' + + For the above example, we would take the transform with identifier + `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer + a wrapper class name of `MyTransform` then write it to the module + `apache_beam/transforms/xlang/io.py`. With these modifications + however, we instead use the provided name `MyCustomTransformName` and write + it to `apache_beam/transforms/xlang/io_gcp.py`. + Similar to above, this can be made available by importing it in the + `__init__.py` file like so:: + from apache_beam.transforms.xlang.io_gcp import * + + Skipping transforms + ------------------- + To skip a particular transform, simply list its identifier in the + `skip_transforms` field, like so:: + + - gradle_target: 'sdks:java:io:expansion-service:shadowJar' + destinations: + python: 'apache_beam/io' + skip_transforms: + - 'beam:schematransform:org.apache.beam:some_transform:v1' + """ + from apache_beam.transforms.external import BeamJarExpansionService + from apache_beam.transforms.external_transform_provider import ExternalTransform + from apache_beam.transforms.external_transform_provider import ExternalTransformProvider + + transform_list: List[Dict[str, Any]] = [] + + with open(input_services) as f: + services = yaml.safe_load(f) + for service in services: + target = service['gradle_target'] + + if "destinations" not in service: + raise ValueError( + f"Expansion service with target '{target}' does not " + "specify any default destinations.") + service_destinations: Dict[str, str] = service['destinations'] + for sdk, dest in service_destinations.items(): + validate_sdks_destinations(sdk, dest, target) + + transforms_to_skip = 
service.get('skip_transforms', []) + + # use dynamic provider to discover and populate wrapper details + provider = ExternalTransformProvider(BeamJarExpansionService(target)) + discovered: Dict[str, ExternalTransform] = provider.get_all() + for identifier, wrapper in discovered.items(): + if identifier in transforms_to_skip: + continue + + transform_destinations = service_destinations.copy() + + # apply any modifications + modified_transform = {} + if 'transforms' in service and identifier in service['transforms']: + modified_transform = service['transforms'][identifier] + for sdk, dest in modified_transform.get('destinations', {}).items(): + validate_sdks_destinations(sdk, dest, target, identifier) + transform_destinations[sdk] = dest # override the destination + name = modified_transform.get('name', wrapper.__name__) + + fields = {} + for param in wrapper.configuration_schema.values(): + (tp, nullable) = pretty_type(param.type) + field_info = { + 'type': str(tp), + 'description': param.description, + 'nullable': nullable + } + fields[param.original_name] = field_info + + transform = { + 'identifier': identifier, + 'name': name, + 'destinations': transform_destinations, + 'default_service': target, + 'fields': fields, + 'description': wrapper.description + } + transform_list.append(transform) + + with open(output_file, 'w') as f: + f.write(LICENSE_HEADER.lstrip()) + f.write( + "# NOTE: This file is autogenerated and should " + "not be edited by hand.\n") + f.write( + "# Configs are generated based on the expansion service\n" + f"# configuration in {input_services.replace(PROJECT_ROOT, '')}.\n") + f.write("# Refer to gen_xlang_wrappers.py for more info.\n") + dt = datetime.datetime.now().date() + f.write(f"#\n# Last updated on: {dt}\n\n") + yaml.dump(transform_list, f) + logging.info("Successfully wrote transform configs to file: %s", output_file) + + +def validate_sdks_destinations(sdk, dest, service, identifier=None): + if identifier: + message = f"Identifier 
'{identifier}'" + else: + message = f"Service '{service}'" + if sdk not in SUPPORTED_SDK_DESTINATIONS: + raise ValueError( + message + " specifies a destination for an invalid SDK:" + f" '{sdk}'. The supported SDKs are {SUPPORTED_SDK_DESTINATIONS}") + if not os.path.isdir(os.path.join(PYTHON_SDK_ROOT, *dest.split('/'))): + raise ValueError( + message + f" specifies an invalid destination '{dest}'." + " Please make sure the destination is an existing directory.") + + +def pretty_type(tp): + """ + Takes a type and returns a tuple containing a pretty string representing it + and a bool signifying if it is nullable or not. + + For optional types, the contained type is unwrapped and returned. This does + not recurse however, so inner Optional types are not affected. + E.g. the input typing.Optional[typing.Dict[int, typing.Optional[str]]] will + return (Dict[int, Union[str, NoneType]], True) + """ + nullable = False + if (typing.get_origin(tp) is Union and type(None) in typing.get_args(tp)): + nullable = True + # only unwrap if it's a single nullable type. if the type is truly a union + # of multiple types, leave it alone. 
+ args = typing.get_args(tp) + if len(args) == 2: + tp = list(filter(lambda t: not isinstance(t, type(None)), args))[0] + + # TODO(ahmedabu98): Make this more generic to support other remote SDKs + # Potentially use Runner API types + if tp.__module__ == 'builtins': + tp = tp.__name__ + elif tp.__module__ == 'typing': + tp = str(tp).replace("typing.", "") + elif tp.__module__ == 'numpy': + tp = "%s.%s" % (tp.__module__, tp.__name__) + + return (tp, nullable) + + +def camel_case_to_snake_case(string): + """Convert camelCase to snake_case""" + arr = [] + word = [] + for i, n in enumerate(string): + # If seeing an upper letter after a lower letter, we just witnessed a word + # If seeing an upper letter and the next letter is lower, we may have just + # witnessed an all caps word + if n.isupper() and ((i > 0 and string[i - 1].islower()) or + (i + 1 < len(string) and string[i + 1].islower())): + arr.append(''.join(word)) + word = [n.lower()] + else: + word.append(n.lower()) + arr.append(''.join(word)) + return '_'.join(arr).strip('_') + + +def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]: + """ + Generates code for external transform wrapper classes (subclasses of + :class:`ExternalTransform`). + + Takes a YAML file containing a list of SchemaTransform configurations. For + each configuration, the code for a wrapper class is generated, along with any + documentation that may be included. + + Each configuration must include a destination file that the generated class + will be written to. + + Returns the generated classes, grouped by destination. + """ + from jinja2 import Environment + from jinja2 import FileSystemLoader + + env = Environment(loader=FileSystemLoader(PYTHON_SDK_ROOT)) + python_wrapper_template = env.get_template("python_xlang_wrapper.template") + + # maintain a list of wrappers to write in each file. if modified destinations + # are used, we may end up with multiple wrappers in one file. 
+ destinations: Dict[str, List[str]] = {} + + with open(config_file) as f: + transforms = yaml.safe_load(f) + for config in transforms: + default_service = config['default_service'] + description = config['description'] + destination = config['destinations']['python'] + name = config['name'] + fields = config['fields'] + identifier = config['identifier'] + + parameters = [] + for param, info in fields.items(): + pythonic_name = camel_case_to_snake_case(param) + param_details = { + "name": pythonic_name, + "type": info['type'], + "description": info['description'], + } + + if info['nullable']: + param_details["default"] = None + parameters.append(param_details) + + # Python syntax requires function definitions to have + # non-default parameters first + parameters = sorted(parameters, key=lambda p: 'default' in p) + default_service = f"BeamJarExpansionService(\"{default_service}\")" + + python_wrapper_class = python_wrapper_template.render( + class_name=name, + identifier=identifier, + parameters=parameters, + description=description, + default_expansion_service=default_service) + + if destination not in destinations: + destinations[destination] = [] + destinations[destination].append(python_wrapper_class) + + return destinations + + +def write_wrappers_to_destinations( + grouped_wrappers: Dict[str, List[str]], + output_dir=PY_WRAPPER_OUTPUT_DIR, + format_code=True): + """ + Takes a dictionary of generated wrapper code, grouped by destination. + For each destination, create a new file containing the respective wrapper + classes. Each file includes the Apache License header and relevant imports. + Note: the Jinja template should already follow linting and formatting rules. 
def delete_generated_files(root_dir):
  """Deletes every generated wrapper under ``root_dir``.

  Every entry of ``root_dir`` except ``__init__.py`` is removed: regular files
  and symlinks are unlinked, anything else is removed recursively.

  Args:
    root_dir: directory that holds the generated wrapper modules.
  """
  logging.info("Deleting external transform wrappers from dir %s", root_dir)
  # Build the list of entries to delete up front. Removing items from a list
  # while iterating over it skips the element that follows each removed item,
  # which would leave a stale wrapper behind and still report it as deleted.
  deleted_files = [
      entry for entry in os.listdir(root_dir) if entry != '__init__.py'
  ]
  for entry in deleted_files:
    path = os.path.join(root_dir, entry)
    if os.path.isfile(path) or os.path.islink(path):
      os.unlink(path)
    else:
      shutil.rmtree(path)
  logging.info("Successfully deleted files: %s", deleted_files)


def run_script(
    cleanup,
    generate_config_only,
    input_expansion_services,
    transforms_config_source):
  """Runs the wrapper generation steps selected by the arguments.

  Args:
    cleanup: if truthy, existing generated wrappers are deleted first.
    generate_config_only: if truthy, return before generating any wrappers.
    input_expansion_services: path to the expansion services YAML. Only used
      when ``transforms_config_source`` is not provided.
    transforms_config_source: path to an existing transforms config YAML. If
      falsy, a config is generated at sdks/standard_external_transforms.yaml
      under the project root and used instead.

  Raises:
    RuntimeError: if ``transforms_config_source`` is provided but does not
      exist.
  """
  # Validate a user-provided config before doing anything destructive, so a
  # bad path does not leave the output directory without any wrappers.
  if transforms_config_source and not os.path.exists(
      transforms_config_source):
    raise RuntimeError(
        "Could not find the provided transforms config "
        f"source: {transforms_config_source}")

  # Cleanup first if requested. This is needed to remove outdated wrappers.
  if cleanup:
    delete_generated_files(PY_WRAPPER_OUTPUT_DIR)

  # This step requires the expansion service.
  # Only generate a transforms config file if none are provided
  if not transforms_config_source:
    transforms_config_source = os.path.join(
        PROJECT_ROOT, 'sdks', 'standard_external_transforms.yaml')
    generate_transforms_config(
        input_services=input_expansion_services,
        output_file=transforms_config_source)

  if generate_config_only:
    return

  wrappers_grouped_by_destination = get_wrappers_from_transform_configs(
      transforms_config_source)

  write_wrappers_to_destinations(wrappers_grouped_by_destination)


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--cleanup',
      dest='cleanup',
      action='store_true',
      help="Whether to cleanup existing generated wrappers first.")
  parser.add_argument(
      '--generate-config-only',
      dest='generate_config_only',
      action='store_true',
      # Adjacent string literals are concatenated verbatim, so the separating
      # space has to be part of one of them.
      help="If set, will generate the transform config only without "
      "generating any wrappers.")
  parser.add_argument(
      '--input-expansion-services',
      dest='input_expansion_services',
      default=os.path.join(
          PROJECT_ROOT, 'sdks', 'standard_expansion_services.yaml'),
      help=(
          "Absolute path to the input YAML file that contains "
          "expansion service configs. Ignored if a transforms config "
          "source is provided."))
  parser.add_argument(
      '--transforms-config-source',
      dest='transforms_config_source',
      help=(
          "Absolute path to a source transforms config YAML file to "
          "generate wrapper modules from. If not provided, one will be "
          "created by this script."))
  args = parser.parse_args()

  run_script(
      args.cleanup,
      args.generate_config_only,
      args.input_expansion_services,
      args.transforms_config_source)
def generate_external_transform_wrappers():
  """Generates external transform wrappers by running gen_xlang_wrappers.py.

  The script is run with ``--cleanup`` against the
  ``standard_external_transforms.yaml`` file located one directory above this
  file's directory. If the script or the config is missing, generation is
  skipped as long as the ``apache_beam/transforms/xlang`` directory already
  contains more than one entry.

  Raises:
    RuntimeError: if the script or config is missing and no wrappers have
      been generated yet, or if the generation script exits with a non-zero
      status.
  """
  sdk_dir = os.path.abspath(os.path.dirname(__file__))
  script_path = os.path.join(sdk_dir, 'gen_xlang_wrappers.py')
  config_path = os.path.join(
      os.path.dirname(sdk_dir), 'standard_external_transforms.yaml')
  script_exists = os.path.exists(script_path)
  config_exists = os.path.exists(config_path)

  # we need both the script and the standard transforms config file.
  # at build time, we don't have access to apache_beam to discover and
  # retrieve external transforms, so the config file has to already exist
  if not script_exists or not config_exists:
    generated_transforms_dir = os.path.join(
        sdk_dir, 'apache_beam', 'transforms', 'xlang')

    # if exists, this directory will have at least its __init__.py file
    if (not os.path.exists(generated_transforms_dir) or
        len(os.listdir(generated_transforms_dir)) <= 1):
      missing = []
      if not script_exists:
        missing.append('the generation script `gen_xlang_wrappers.py`')
      if not config_exists:
        missing.append('the standard external transforms config')
      raise RuntimeError(
          'External transform wrappers have not been generated and ' +
          ' and '.join(missing) + ' could not be found')

    logging.info(
        'Skipping external transform wrapper generation as they '
        'are already generated.')
    return

  try:
    subprocess.run([
        sys.executable,
        script_path,
        '--cleanup',
        '--transforms-config-source',
        config_path
    ],
                   capture_output=True,
                   check=True)
  except subprocess.CalledProcessError as err:
    # stderr is captured as bytes; decode it so the message is readable.
    stderr = err.stderr
    if isinstance(stderr, bytes):
      stderr = stderr.decode('utf-8', errors='replace')
    # Exceptions do not apply %-formatting to extra arguments the way logging
    # calls do, so the message has to be formatted explicitly.
    raise RuntimeError(
        'Could not generate external transform wrappers due to '
        'error: %s' % stderr) from err
copy_tests_from_docs() @@ -385,7 +431,6 @@ def get_portability_package_data(): 'testcontainers[mysql]>=3.0.3,<4.0.0', 'cryptography>=41.0.2', 'hypothesis>5.0.0,<=7.0.0', - 'pyyaml>=3.12,<7.0.0', ], 'gcp': [ 'cachetools>=3.1.0,<6', diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index d29d85af3011..cadf3a6ae2c6 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -536,10 +536,8 @@ def dataflowRegion = project.findProperty('dataflowRegion') ?: 'us-central1' project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata -> createCrossLanguageUsingJavaExpansionTask( name: taskMetadata.name, - expansionProjectPath: taskMetadata.expansionProjectPath, + expansionProjectPaths: taskMetadata.expansionProjectPaths, collectMarker: taskMetadata.collectMarker, - startJobServer: taskMetadata.startJobServer, - cleanupJobServer: taskMetadata.cleanupJobServer, pythonPipelineOptions: [ "--runner=TestDataflowRunner", "--project=${dataflowProject}", @@ -548,6 +546,7 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata -> "--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest" ], pytestOptions: basicPytestOpts, + additionalDeps: taskMetadata.additionalDeps, additionalEnvs: taskMetadata.additionalEnvs ) } diff --git a/sdks/python/test-suites/direct/build.gradle b/sdks/python/test-suites/direct/build.gradle index ea643c3303aa..4b1025343985 100644 --- a/sdks/python/test-suites/direct/build.gradle +++ b/sdks/python/test-suites/direct/build.gradle @@ -42,3 +42,10 @@ task ioCrossLanguagePostCommit { dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(it)}:ioCrossLanguagePythonUsingJava") } } + +task crossLanguageWrapperValidationPreCommit { + // Different python versions may output types that look different and lead to + // false failures. 
To be consistent, we test on the lowest version only + def lowestSupportedVersion = getVersionsAsList('cross_language_validates_py_versions')[0] + dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(lowestSupportedVersion)}:xlangWrapperValidationPythonUsingJava") +} diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle index 657f7adf801d..b5680c2e1e9a 100644 --- a/sdks/python/test-suites/direct/common.gradle +++ b/sdks/python/test-suites/direct/common.gradle @@ -412,10 +412,8 @@ def gcpProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata -> createCrossLanguageUsingJavaExpansionTask( name: taskMetadata.name, - expansionProjectPath: taskMetadata.expansionProjectPath, + expansionProjectPaths: taskMetadata.expansionProjectPaths, collectMarker: taskMetadata.collectMarker, - startJobServer: taskMetadata.startJobServer, - cleanupJobServer: taskMetadata.cleanupJobServer, numParallelTests: 1, pythonPipelineOptions: [ "--runner=TestDirectRunner", @@ -426,6 +424,8 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata -> "--timeout=4500", // timeout of whole command execution "--color=yes", // console color "--log-cli-level=INFO" //log level info - ] + ], + additionalDeps: taskMetadata.additionalDeps, + additionalEnvs: taskMetadata.additionalEnvs ) } diff --git a/sdks/python/test-suites/xlang/build.gradle b/sdks/python/test-suites/xlang/build.gradle index 5a124ac20ce2..3065ad8377e3 100644 --- a/sdks/python/test-suites/xlang/build.gradle +++ b/sdks/python/test-suites/xlang/build.gradle @@ -16,57 +16,43 @@ * limitations under the License. 
*/ // This is a base file to set up cross language tests for different runners -import org.apache.beam.gradle.BeamModulePlugin -import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon +import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTask project.evaluationDependsOn(":sdks:python") -// Set up cross language tests -def envDir = project.project(":sdks:python").envdir -def jobPort = BeamModulePlugin.getRandomPort() -def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp" -def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid" - -def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) { - dependsOn ':sdks:python:installGcpTest' - - executable 'sh' - args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log" -} - -def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) { - executable 'sh' - args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop" -} - -// List of objects representing task metadata to create cross-language tasks from. -// Each object contains the minimum relevant metadata. -def xlangTasks = [] - // ******** Java GCP expansion service ******** // Note: this only runs cross-language tests that use the Java GCP expansion service -// To run tests that use another expansion service, create a new CrossLanguageTaskCommon with the +// To run tests that use another expansion service, create a new CrossLanguageTask with the // relevant fields as done here, then add it to `xlangTasks`. 
-def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service') +def gcpExpansionPath = project.project(':sdks:java:io:google-cloud-platform:expansion-service').getPath() +def ioExpansionPath = project.project(':sdks:java:io:expansion-service').getPath() // Properties that are common across runners. // Used to launch the expansion service, collect the right tests, and cleanup afterwards -def gcpXlangCommon = new CrossLanguageTaskCommon().tap { +def gcpXlang = new CrossLanguageTask().tap { name = "gcpCrossLanguage" - expansionProjectPath = gcpExpansionProject.getPath() + expansionProjectPaths = [gcpExpansionPath] collectMarker = "uses_gcp_java_expansion_service" - startJobServer = setupTask - cleanupJobServer = cleanupTask } -def ioXlangCommon = new CrossLanguageTaskCommon().tap { +def ioXlang = new CrossLanguageTask().tap { name = "ioCrossLanguage" - expansionProjectPath = project.project(':sdks:java:io:expansion-service').getPath() + expansionProjectPaths = [ioExpansionPath] collectMarker = "uses_io_java_expansion_service" - startJobServer = setupTask - cleanupJobServer = cleanupTask //See .test-infra/kafka/bitnami/README.md for setup instructions additionalEnvs = ["KAFKA_BOOTSTRAP_SERVER":project.findProperty('kafkaBootstrapServer')] } -xlangTasks.addAll(gcpXlangCommon, ioXlangCommon) +// This list should include all expansion service targets in sdks/standard_expansion_services.yaml +def servicesToGenerateFrom = [ioExpansionPath, gcpExpansionPath] +def xlangWrapperValidation = new CrossLanguageTask().tap { + name = "xlangWrapperValidation" + expansionProjectPaths = servicesToGenerateFrom + collectMarker = "xlang_wrapper_generation" + // update Jinja2 bounds in pyproject.toml as well + additionalDeps = ['\"Jinja2>=2.7.1,<4.0.0\"'] +} + +// List of task metadata objects to create cross-language tasks from. +// Each object contains the minimum relevant metadata.
+def xlangTasks = [gcpXlang, ioXlang, xlangWrapperValidation] ext.xlangTasks = xlangTasks \ No newline at end of file diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index f8e2b63a5091..ca35c383eea1 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -220,7 +220,7 @@ commands = # --azure_managed_identity_client_id "abc123" [testenv:py3-yapf] -# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml +# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml and pyproject.toml deps = yapf==0.29.0 commands = @@ -228,7 +228,7 @@ commands = time yapf --in-place --parallel --recursive apache_beam [testenv:py3-yapf-check] -# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml +# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml and pyproject.toml deps = yapf==0.29.0 commands = diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml new file mode 100644 index 000000000000..e9e6871be82f --- /dev/null +++ b/sdks/standard_expansion_services.yaml @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This file enumerates the standard Apache Beam expansion services. 
+# Each service must specify a package destination for each supported SDK, which +# is where generated wrappers will go by default. +# +# Individual transforms can modify their destination module as well as their +# generated wrapper class name. +# +# Transform identifiers listed in the `skip_transforms` field will be skipped. +# +# Any new gradle targets added here should also be added to: +# - sdks/python/build.gradle (as a dependency in the 'generateExternalTransformsConfig' task) +# - sdks/python/test-suites/xlang/build.gradle (look for 'servicesToGenerateFrom') +# +# Refer to sdks/python/gen_xlang_wrappers.py for more info. + +- gradle_target: 'sdks:java:io:expansion-service:shadowJar' + destinations: + python: 'apache_beam/io' + transforms: + 'beam:schematransform:org.apache.beam:kafka_write:v1': + name: 'WriteToKafka' + 'beam:schematransform:org.apache.beam:kafka_read:v1': + name: 'ReadFromKafka' + skip_transforms: + # Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py + - 'beam:schematransform:org.apache.beam:kafka_write:v1' + - 'beam:schematransform:org.apache.beam:kafka_read:v1' + +# TODO(ahmedabu98): Enable this service in a future PR +#- gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' +# destinations: +# python: 'apache_beam/io/gcp' +# transforms: +# 'beam:schematransform:org.apache.beam:spanner_cdc_read:v1': +# name: 'ReadFromSpannerChangeStreams' +# skip_transforms: +# # generate_sequence is already included in the Java IO expansion service +# - 'beam:schematransform:org.apache.beam:generate_sequence:v1' +# # Handwritten wrappers exist in apache_beam/io/gcp/pubsublite/ +# - 'beam:schematransform:org.apache.beam:pubsublite_read:v1' +# - 'beam:schematransform:org.apache.beam:pubsublite_write:v1' +# # Handwritten wrapper exists in apache_beam/io/gcp/spanner.py +# - 'beam:schematransform:org.apache.beam:spanner_write:v1' +# # Native IO exists in apache_beam/io/gcp/pubsub.py +# - 
'beam:schematransform:org.apache.beam:pubsub_read:v1' +# - 'beam:schematransform:org.apache.beam:pubsub_write:v1' +# # Native IO exists in apache_beam/io/gcp/bigquery.py +# - 'beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1' +# - 'beam:schematransform:org.apache.beam:bigquery_export_read:v1' +# - 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2' +# - 'beam:schematransform:org.apache.beam:bigquery_storage_read:v1' +## - 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2' +# # Handwritten wrappers exist in apache_beam/io/jdbc.py +# - 'beam:schematransform:org.apache.beam:jdbc_write:v1' +# - 'beam:schematransform:org.apache.beam:jdbc_read:v1' +# # Handwritten wrappers exist in apache_beam/io/gcp/bigtableio.py +# - 'beam:schematransform:org.apache.beam:bigtable_write:v1' +# - 'beam:schematransform:org.apache.beam:bigtable_read:v1' diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml new file mode 100644 index 000000000000..b43e93ab4919 --- /dev/null +++ b/sdks/standard_external_transforms.yaml @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# NOTE: This file is autogenerated and should not be edited by hand.
+# Configs are generated based on the expansion service +# configuration in /sdks/standard_expansion_services.yaml. +# Refer to gen_xlang_wrappers.py for more info. +# +# Last updated on: 2024-02-22 + +- default_service: sdks:java:io:expansion-service:shadowJar + description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 + number called "value". The count is produced from the given "start" value and + either up to the given "end" or until 2^63 - 1. + + To produce an unbounded PCollection, simply do not specify an "end" value. Unbounded + sequences can specify a "rate" for output elements. + + In all cases, the sequence of numbers is generated in parallel, so there is no + inherent ordering between the generated values' + destinations: + python: apache_beam/io + fields: + end: + description: The maximum number to generate (exclusive). Will be an unbounded + sequence if left unspecified. + nullable: true + type: numpy.int64 + rate: + description: Specifies the rate to generate a given number of elements per a + given number of seconds. Applicable only to unbounded sequences. + nullable: true + type: Row(seconds=typing.Union[numpy.int64, NoneType], elements=) + start: + description: The minimum number to generate (inclusive). + nullable: false + type: numpy.int64 + identifier: beam:schematransform:org.apache.beam:generate_sequence:v1 + name: GenerateSequence