From 06c76b4e930d76c6e38a9a78fce3ccd223de4cbd Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Tue, 12 Mar 2024 17:00:30 -0700 Subject: [PATCH] Updates the test suite to use the transform service (#30605) * Updates the test suite to use the transform service * Trigger the test suite * Fix formatting * Addressing reviewer comments --- ...am_PostCommit_TransformService_Direct.json | 3 +++ .../apache_beam/io/gcp/bigtableio_it_test.py | 16 +++++++++++-- .../expansion-service-container/boot.go | 24 +++++++++++++++---- 3 files changed, 36 insertions(+), 7 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_TransformService_Direct.json diff --git a/.github/trigger_files/beam_PostCommit_TransformService_Direct.json b/.github/trigger_files/beam_PostCommit_TransformService_Direct.json new file mode 100644 index 000000000000..c4edaa85a89d --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_TransformService_Direct.json @@ -0,0 +1,3 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run" +} diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 8c0ca29c4101..2445fdd5fcc0 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -93,6 +93,11 @@ def setUp(self): self.table = self.instance.table(self.TABLE_ID) self.table.create() _LOGGER.info("Created table [%s]", self.table.table_id) + if (os.environ.get('TRANSFORM_SERVICE_PORT')): + self._transform_service_address = ( + 'localhost:' + os.environ.get('TRANSFORM_SERVICE_PORT')) + else: + self._transform_service_address = None def tearDown(self): try: @@ -142,7 +147,8 @@ def test_read_xlang(self): | bigtableio.ReadFromBigtable( project_id=self.project, instance_id=self.instance.instance_id, - table_id=self.table.table_id) + table_id=self.table.table_id, + expansion_service=self._transform_service_address) | "Extract cells" >> beam.Map(lambda row: row._cells)) assert_that(cells, equal_to(expected_cells)) @@ -190,6 +196,11 @@ def setUp(self): (self.TABLE_ID, str(int(time.time())), secrets.token_hex(3))) self.table.create() _LOGGER.info("Created table [%s]", self.table.table_id) + if (os.environ.get('TRANSFORM_SERVICE_PORT')): + self._transform_service_address = ( + 'localhost:' + os.environ.get('TRANSFORM_SERVICE_PORT')) + else: + self._transform_service_address = None def tearDown(self): try: @@ -216,7 +227,8 @@ def run_pipeline(self, rows): project_id=self.project, instance_id=self.instance.instance_id, table_id=self.table.table_id, - use_cross_language=True)) + use_cross_language=True, + expansion_service=self._transform_service_address)) def test_set_mutation(self): row1: DirectRow = DirectRow('key-1') diff --git a/sdks/python/expansion-service-container/boot.go b/sdks/python/expansion-service-container/boot.go index ba56b349c4ea..a04b176f5d8c 100644 --- a/sdks/python/expansion-service-container/boot.go +++ b/sdks/python/expansion-service-container/boot.go @@ -97,8 +97,8 @@ func installExtraPackages(requirementsFile string) error { return nil } -func getUpdatedRequirementsFile(oldRequirementsFileName string, dependenciesDir string) (string, error) { - oldExtraPackages, err := getLines(filepath.Join(dependenciesDir, oldRequirementsFileName)) +func getUpdatedRequirementsFile(oldDependenciesRequirementsFile string, dependenciesDir string) (string, error) { + oldExtraPackages, err := getLines(oldDependenciesRequirementsFile) if err != nil { return "", err } @@ -145,9 +145,20 @@ func launchExpansionServiceProcess() error { args := []string{"-m", expansionServiceEntrypoint, "-p", strconv.Itoa(*port), "--fully_qualified_name_glob", "*"} - if *requirements_file != "" { - log.Printf("Received the requirements file %v", *requirements_file) - updatedRequirementsFileName, err := getUpdatedRequirementsFile(*requirements_file, *dependencies_dir) + // Requirements file with dependencies to install. + // Note that we have to look for the requirements file in the dependencies + // volume here not the requirements file at the top level. Latter provides + // Beam dependencies. + dependencies_requirements_file := filepath.Join(*dependencies_dir, *requirements_file) + dependencies_requirements_file_exists := false + if _, err := os.Stat(dependencies_requirements_file); err == nil { + dependencies_requirements_file_exists = true + } + + // We only try to install dependencies, if the requirements file exists. + if dependencies_requirements_file_exists { + log.Printf("Received the requirements file %s with extra packages.", dependencies_requirements_file) + updatedRequirementsFileName, err := getUpdatedRequirementsFile(dependencies_requirements_file, *dependencies_dir) if err != nil { return err } @@ -161,7 +172,10 @@ func launchExpansionServiceProcess() error { if err != nil { return err } + } else { + log.Printf("Requirements file %s was provided but not available.", dependencies_requirements_file) } + if err := execx.Execute(pythonVersion, args...); err != nil { return fmt.Errorf("could not start the expansion service: %s", err) }