From 792baa2645909571ad0e86fe17a2081aaa690d12 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Thu, 12 May 2022 14:46:24 -0700 Subject: [PATCH] move logical types def --- .../beam/model/pipeline/v1/beam_runner_api.proto | 7 ------- .../apache/beam/model/pipeline/v1/schema.proto | 16 ++++++++++++++++ .../sdk/schemas/logicaltypes/PythonCallable.java | 4 ++-- .../apache_beam/portability/common_urns.py | 3 ++- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 3974a4f48627..54d328cce82c 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1094,13 +1094,6 @@ message StandardCoders { } } -message LogicalTypes { - enum Enum { - // A URN for Python Callable logical type - PYTHON_CALLABLE = 0 [(beam_urn) = "beam:logical_type:python_callable:v1"]; - } -} - // A windowing strategy describes the window function, triggering, allowed // lateness, and accumulation mode for a PCollection. // diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto index b26fc8fef8d6..af3c72ebaf8c 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto @@ -31,6 +31,8 @@ option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v option java_package = "org.apache.beam.model.pipeline.v1"; option java_outer_classname = "SchemaApi"; +import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; + message Schema { // List of fields for this schema. Two fields may not share a name. repeated Field fields = 1; @@ -110,6 +112,20 @@ message LogicalType { FieldValue argument = 5; } +// Universally defined Logical types for Row schemas. +// These logical types are supposed to be understood by all SDKs. +message LogicalTypes { + enum Enum { + // A URN for Python Callable logical type + // - Representation type: STRING + // - Language type: In Python SDK, PythonCallableWithSource. + // In any other SDKs, a wrapper object for a string which + // can be evaluated to a Python Callable object. + PYTHON_CALLABLE = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:logical_type:python_callable:v1"]; + } +} + message Option { // REQUIRED. Identifier for the option. string name = 1; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java index ac0d918e7bc3..ea4e297515e9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/PythonCallable.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.schemas.logicaltypes; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.model.pipeline.v1.RunnerApi.LogicalTypes; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.LogicalType; @@ -30,7 +30,7 @@ @Experimental(Experimental.Kind.SCHEMAS) public class PythonCallable implements LogicalType { public static final String IDENTIFIER = - LogicalTypes.Enum.PYTHON_CALLABLE + SchemaApi.LogicalTypes.Enum.PYTHON_CALLABLE .getValueDescriptor() .getOptions() .getExtension(RunnerApi.beamUrn); diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 1cda8e832626..6a9ff62a5e81 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -22,10 +22,10 @@ from .api import beam_runner_api_pb2_urns from .api import external_transforms_pb2_urns from .api import metrics_pb2_urns +from .api import schema_pb2_urns from .api import standard_window_fns_pb2_urns BeamConstants = beam_runner_api_pb2_urns.BeamConstants -LogicalTypes = beam_runner_api_pb2_urns.LogicalTypes StandardArtifacts = beam_runner_api_pb2_urns.StandardArtifacts StandardCoders = beam_runner_api_pb2_urns.StandardCoders StandardDisplayData = beam_runner_api_pb2_urns.StandardDisplayData @@ -40,6 +40,7 @@ MonitoringInfo = metrics_pb2_urns.MonitoringInfo MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns +LogicalTypes = schema_pb2_urns.LogicalTypes FixedWindowsPayload = standard_window_fns_pb2_urns.FixedWindowsPayload GlobalWindowsPayload = standard_window_fns_pb2_urns.GlobalWindowsPayload SessionWindowsPayload = standard_window_fns_pb2_urns.SessionWindowsPayload