From b0484e792bc615b7494b5195e3ef3d464ec7e426 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 26 Aug 2022 16:34:13 -0400 Subject: [PATCH] Fix logical type with same language type gets completely hidden introduced in #22679 --- .../io/external/xlang_jdbcio_it_test.py | 6 ++++++ sdks/python/apache_beam/typehints/schemas.py | 15 ++++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index d4d58476b5e2..62620122e598 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -36,6 +36,8 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.typehints.schemas import LogicalType +from apache_beam.typehints.schemas import MillisInstant from apache_beam.utils.timestamp import Timestamp # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -192,6 +194,10 @@ def test_xlang_jdbc_read(self, database): self.engine.execute( "INSERT INTO {} VALUES({},'{}')".format(table_name, i, strtime)) + # Register MillisInstant logical type to override the mapping from Timestamp + # originally handled by MicrosInstant. + LogicalType.register_logical_type(MillisInstant) + with TestPipeline() as p: p.not_use_test_runner_api = True result = ( diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index c98b5a6e84fc..dc572b4d07dd 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -588,6 +588,7 @@ def to_language_type(self, value): def register_logical_type(cls, logical_type_cls): """Register an implementation of LogicalType.""" cls._known_logical_types.add(logical_type_cls.urn(), logical_type_cls) + return logical_type_cls @classmethod def from_typing(cls, typ): @@ -661,9 +662,16 @@ class MillisInstant(NoArgumentLogicalType[Timestamp, np.int64]): """Millisecond-precision instant logical type handles values consistent with that encoded by ``InstantCoder`` in the Java SDK. - This class handles Timestamp language type as :class:`MicrosInstant`, but it - only provides millisecond precision, because it is aimed to handle data - encoded by Java sdk's InstantCoder which has same precision level. + This class handles :class:`apache_beam.utils.timestamp.Timestamp` language + type as :class:`MicrosInstant`, but it only provides millisecond precision, + because it is aimed to handle data encoded by Java sdk's InstantCoder which + has same precision level. + + Timestamp is handled by `MicrosInstant` by default. In some scenario, such as + read from cross-language transform with rows containing InstantCoder encoded + timestamps, one may need to override the mapping of Timetamp to MillisInstant. + To do this, re-register this class with + :func:`~LogicalType.register_logical_type`. """ @classmethod def representation_type(cls): @@ -722,6 +730,7 @@ def to_language_type(self, value): @LogicalType.register_logical_type class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): + """A logical type for PythonCallableSource objects.""" @classmethod def urn(cls): return common_urns.python_callable.urn