From 6edfcd453c7d9e502890262b0e1dc974f34a3522 Mon Sep 17 00:00:00 2001
From: Danny McCormick
Date: Mon, 18 Nov 2024 12:13:31 -0500
Subject: [PATCH] Revert "Enable Dataflow managed service for Python tests
 (#33134)" (#33154)

This reverts commit de6965aea6e1a6cc1c35875b7d9cbc097acfb655.
---
 ...m_PostCommit_Python_Xlang_IO_Dataflow.json |  2 +-
 ...eam_PostCommit_Python_Xlang_IO_Direct.json |  2 +-
 .../python/apache_beam/transforms/external.py |  1 -
 .../transforms/managed_iceberg_it_test.py     | 30 +++++++++++-------
 4 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
index c537844dc84a..b26833333238 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 3
+  "modification": 2
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
index b26833333238..e3d6056a5de9 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 1
 }
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index cdf5d874d7fa..83c439ca8ddd 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -923,7 +923,6 @@ def _normalize(coder_proto):
             for tag, pcoll in self._expanded_transform.outputs.items()
         },
-        annotations=self._expanded_transform.annotations,
         environment_id=self._expanded_transform.environment_id)
diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
index a09203f313eb..0dfa2aa19c51 100644
--- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
+++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
@@ -16,13 +16,15 @@
 #
 
 import os
+import secrets
+import shutil
+import tempfile
 import time
 import unittest
 
 import pytest
 
 import apache_beam as beam
-from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
@@ -33,15 +35,17 @@
     "EXPANSION_JARS environment var is not provided, "
     "indicating that jars have not been built")
 class ManagedIcebergIT(unittest.TestCase):
-  WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java"
-
   def setUp(self):
-    self.test_pipeline = TestPipeline(is_integration_test=True)
-    self.args = self.test_pipeline.get_full_options_as_args()
-    self.args.extend([
-        '--experiments=enable_managed_transforms',
-        '--dataflow_endpoint=https://dataflow-staging.sandbox.googleapis.com',
-    ])
+    self._tempdir = tempfile.mkdtemp()
+    if not os.path.exists(self._tempdir):
+      os.mkdir(self._tempdir)
+    test_warehouse_name = 'test_warehouse_%d_%s' % (
+        int(time.time()), secrets.token_hex(3))
+    self.warehouse_path = os.path.join(self._tempdir, test_warehouse_name)
+    os.mkdir(self.warehouse_path)
+
+  def tearDown(self):
+    shutil.rmtree(self._tempdir, ignore_errors=False)
 
   def _create_row(self, num: int):
     return beam.Row(
@@ -53,24 +57,24 @@ def _create_row(self, num: int):
 
   def test_write_read_pipeline(self):
     iceberg_config = {
-        "table": "test_iceberg_write_read.test_" + str(int(time.time())),
+        "table": "test.write_read",
         "catalog_name": "default",
         "catalog_properties": {
             "type": "hadoop",
-            "warehouse": self.WAREHOUSE,
+            "warehouse": f"file://{self.warehouse_path}",
         }
     }
 
     rows = [self._create_row(i) for i in range(100)]
     expected_dicts = [row.as_dict() for row in rows]
 
-    with beam.Pipeline(argv=self.args) as write_pipeline:
+    with beam.Pipeline() as write_pipeline:
       _ = (
           write_pipeline
           | beam.Create(rows)
          | beam.managed.Write(beam.managed.ICEBERG, config=iceberg_config))
 
-    with beam.Pipeline(argv=self.args) as read_pipeline:
+    with beam.Pipeline() as read_pipeline:
       output_dicts = (
           read_pipeline
          | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config)