Skip to content

Commit

Permalink
Fix FlatMapTuple typehint bug (#33307)
Browse files Browse the repository at this point in the history
* create unit test

* minimize to not using flatmaptuple

* fix by adding a tuple conersion in flatmaptuple

* add comment referring to ticket

* remove extra pipeline

* manually isort

* retrigger builder

* retrigger builder

* isort?

* try manually isorting again

* Revert "try manually isorting again"

This reverts commit a0fac32.

* manually fix isort
  • Loading branch information
hjtran authored Dec 6, 2024
1 parent d138b75 commit 1712964
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2238,7 +2238,7 @@ def FlatMapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name
if defaults or args or kwargs:
wrapper = lambda x, *args, **kwargs: fn(*(tuple(x) + args), **kwargs)
else:
wrapper = lambda x: fn(*x)
wrapper = lambda x: fn(*tuple(x))

# Proxy the type-hint information from the original function to this new
# wrapped function.
Expand Down
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/typehints/typed_pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import typing
import unittest
from typing import Tuple

import apache_beam as beam
from apache_beam import pvalue
Expand Down Expand Up @@ -999,5 +1000,22 @@ def filter_fn(element: int) -> bool:
self.assertEqual(th.output_types, ((int, ), {}))


class TestFlatMapTuple(unittest.TestCase):
def test_flatmaptuple(self):
# Regression test. See
# https://github.com/apache/beam/issues/33014

def identity(x: Tuple[str, int]) -> Tuple[str, int]:
return x

with beam.Pipeline() as p:
# Just checking that this doesn't raise an exception.
(
p
| "Generate input" >> beam.Create([('P1', [2])])
| "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs])
| "Identity" >> beam.Map(identity))


if __name__ == '__main__':
unittest.main()

0 comments on commit 1712964

Please sign in to comment.