diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py index ae08cdaf54cd..91d237c7de96 100644 --- a/sdks/python/apache_beam/dataframe/expressions.py +++ b/sdks/python/apache_beam/dataframe/expressions.py @@ -365,8 +365,10 @@ def __init__( self._preserves_partition_by = preserves_partition_by def placeholders(self): - return frozenset.union( - frozenset(), *[arg.placeholders() for arg in self.args()]) + if not hasattr(self, '_placeholders'): + self._placeholders = frozenset.union( + frozenset(), *[arg.placeholders() for arg in self.args()]) + return self._placeholders def args(self): return self._args diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py index 782dac53e2c6..92bb10225c78 100644 --- a/sdks/python/apache_beam/dataframe/io_test.py +++ b/sdks/python/apache_beam/dataframe/io_test.py @@ -117,6 +117,15 @@ def test_read_write_csv(self): self.assertCountEqual(['a,b,c', '1,2,3', '3,4,7'], set(self.read_all_lines(output + 'out.csv*'))) + def test_wide_csv_with_dtypes(self): + # Verify https://github.com/apache/beam/issues/31152 is resolved. + cols = ','.join(f'col{ix}' for ix in range(123)) + data = ','.join(str(ix) for ix in range(123)) + input = self.temp_dir({'tmp.csv': f'{cols}\n{data}'}) + with beam.Pipeline() as p: + pcoll = p | beam.io.ReadFromCsv(f'{input}tmp.csv', dtype=str) + assert_that(pcoll | beam.Map(max), equal_to(['99'])) + def test_sharding_parameters(self): data = pd.DataFrame({'label': ['11a', '37a', '389a'], 'rank': [0, 1, 2]}) output = self.temp_dir() diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index 2e815408314c..852b49c4e2ed 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -302,6 +302,7 @@ def __repr__(self, indent=0): self.outputs)) # First define some helper functions. + @_memoize def output_partitioning_in_stage(expr, stage): """Return the output partitioning of expr when computed in stage, or returns None if the expression cannot be computed in this stage.