From 716de2e47c4963b091b9e8ba0f9a07bce0edafbd Mon Sep 17 00:00:00 2001 From: Francisco Arceo Date: Wed, 6 Nov 2024 06:18:18 -0500 Subject: [PATCH] feat: Adding support for native Python transformations on a single dictionary (#4724) * feat: Adding support for native Python transformations on a dictionary Signed-off-by: Francisco Javier Arceo * Updated type checking and added exception handling to try basic dict...not an ideal solution Signed-off-by: Francisco Javier Arceo * updated tests Signed-off-by: Francisco Javier Arceo * adding protos Signed-off-by: Francisco Javier Arceo * fixed unit test Signed-off-by: Francisco Javier Arceo --------- Signed-off-by: Francisco Javier Arceo --- protos/feast/core/OnDemandFeatureView.proto | 1 + sdk/python/feast/on_demand_feature_view.py | 49 ++++++++++---- .../feast/core/OnDemandFeatureView_pb2.py | 24 +++---- .../feast/core/OnDemandFeatureView_pb2.pyi | 5 +- .../transformation/pandas_transformation.py | 5 ++ .../transformation/python_transformation.py | 33 +++++++--- .../substrait_transformation.py | 5 ++ sdk/python/feast/utils.py | 2 + .../test_on_demand_python_transformation.py | 66 ++++++++++++------- 9 files changed, 134 insertions(+), 56 deletions(-) diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index 789a9011d4..3ed8ffe4ae 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -69,6 +69,7 @@ message OnDemandFeatureViewSpec { repeated string entities = 13; // List of specifications for each entity defined as part of this feature view. repeated FeatureSpecV2 entity_columns = 14; + bool singleton = 15; } message OnDemandFeatureViewMeta { diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 1b75d23ed4..0ae87b5e35 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -74,6 +74,7 @@ class OnDemandFeatureView(BaseFeatureView): tags: dict[str, str] owner: str write_to_online_store: bool + singleton: bool def __init__( # noqa: C901 self, @@ -98,6 +99,7 @@ def __init__( # noqa: C901 tags: Optional[dict[str, str]] = None, owner: str = "", write_to_online_store: bool = False, + singleton: bool = False, ): """ Creates an OnDemandFeatureView object. @@ -121,6 +123,8 @@ def __init__( # noqa: C901 of the primary maintainer. write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to the online store for faster retrieval. + singleton (optional): A boolean that indicates whether the transformation is executed on a singleton + (only applicable when mode="python"). """ super().__init__( name=name, @@ -204,6 +208,9 @@ def __init__( # noqa: C901 self.features = features self.feature_transformation = feature_transformation self.write_to_online_store = write_to_online_store + self.singleton = singleton + if self.singleton and self.mode != "python": + raise ValueError("Singleton is only supported for Python mode.") @property def proto_class(self) -> type[OnDemandFeatureViewProto]: @@ -221,6 +228,7 @@ def __copy__(self): tags=self.tags, owner=self.owner, write_to_online_store=self.write_to_online_store, + singleton=self.singleton, ) fv.entities = self.entities fv.features = self.features @@ -247,6 +255,7 @@ def __eq__(self, other): or self.feature_transformation != other.feature_transformation or self.write_to_online_store != other.write_to_online_store or sorted(self.entity_columns) != sorted(other.entity_columns) + or self.singleton != other.singleton ): return False @@ -328,6 +337,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: tags=self.tags, owner=self.owner, write_to_online_store=self.write_to_online_store, + singleton=self.singleton if self.singleton else False, ) return OnDemandFeatureViewProto(spec=spec, meta=meta) @@ -434,6 +444,9 @@ def from_proto( ] else: entity_columns = [] + singleton = False + if hasattr(on_demand_feature_view_proto.spec, "singleton"): + singleton = on_demand_feature_view_proto.spec.singleton on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, @@ -451,6 +464,7 @@ def from_proto( tags=dict(on_demand_feature_view_proto.spec.tags), owner=on_demand_feature_view_proto.spec.owner, write_to_online_store=write_to_online_store, + singleton=singleton, ) on_demand_feature_view_obj.entities = entities @@ -614,17 +628,19 @@ def transform_dict( feature_dict[full_feature_ref] = feature_dict[feature.name] columns_to_cleanup.append(str(full_feature_ref)) - output_dict: dict[str, Any] = self.feature_transformation.transform( - feature_dict - ) + if self.singleton and self.mode == "python": + output_dict: dict[str, Any] = ( + self.feature_transformation.transform_singleton(feature_dict) + ) + else: + output_dict = self.feature_transformation.transform(feature_dict) for feature_name in columns_to_cleanup: del output_dict[feature_name] return output_dict def infer_features(self) -> None: - inferred_features = self.feature_transformation.infer_features( - self._construct_random_input() - ) + random_input = self._construct_random_input(singleton=self.singleton) + inferred_features = self.feature_transformation.infer_features(random_input) if self.features: missing_features = [] @@ -644,8 +660,10 @@ def infer_features(self) -> None: f"Could not infer Features for the feature view '{self.name}'.", ) - def _construct_random_input(self) -> dict[str, list[Any]]: - rand_dict_value: dict[ValueType, list[Any]] = { + def _construct_random_input( + self, singleton: bool = False + ) -> dict[str, Union[list[Any], Any]]: + rand_dict_value: dict[ValueType, Union[list[Any], Any]] = { ValueType.BYTES: [str.encode("hello world")], ValueType.STRING: ["hello world"], ValueType.INT32: [1], @@ -663,20 +681,25 @@ def _construct_random_input(self) -> dict[str, list[Any]]: ValueType.BOOL_LIST: [[True]], ValueType.UNIX_TIMESTAMP_LIST: [[_utc_now()]], } + if singleton: + rand_dict_value = {k: rand_dict_value[k][0] for k in rand_dict_value} + rand_missing_value = [None] if singleton else None feature_dict = {} for feature_view_projection in self.source_feature_view_projections.values(): for feature in feature_view_projection.features: feature_dict[f"{feature_view_projection.name}__{feature.name}"] = ( - rand_dict_value.get(feature.dtype.to_value_type(), [None]) + rand_dict_value.get( + feature.dtype.to_value_type(), rand_missing_value + ) ) feature_dict[f"{feature.name}"] = rand_dict_value.get( - feature.dtype.to_value_type(), [None] + feature.dtype.to_value_type(), rand_missing_value ) for request_data in self.source_request_sources.values(): for field in request_data.schema: feature_dict[f"{field.name}"] = rand_dict_value.get( - field.dtype.to_value_type(), [None] + field.dtype.to_value_type(), rand_missing_value ) return feature_dict @@ -713,6 +736,7 @@ def on_demand_feature_view( tags: Optional[dict[str, str]] = None, owner: str = "", write_to_online_store: bool = False, + singleton: bool = False, ): """ Creates an OnDemandFeatureView object with the given user function as udf. @@ -731,6 +755,8 @@ def on_demand_feature_view( of the primary maintainer. write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to the online store for faster retrieval. + singleton (optional): A boolean that indicates whether the transformation is executed on a singleton + (only applicable when mode="python"). """ def mainify(obj) -> None: @@ -775,6 +801,7 @@ def decorator(user_function): owner=owner, write_to_online_store=write_to_online_store, entities=entities, + singleton=singleton, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py b/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py index a27c4fba3b..020515a6b8 100644 --- a/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py +++ b/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py @@ -20,7 +20,7 @@ from feast.protos.feast.core import Transformation_pb2 as feast_dot_core_dot_Transformation__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n$feast/core/OnDemandFeatureView.proto\x12\nfeast.core\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1c\x66\x65\x61st/core/FeatureView.proto\x1a&feast/core/FeatureViewProjection.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\x1a\x1b\x66\x65\x61st/core/DataSource.proto\x1a\x1f\x66\x65\x61st/core/Transformation.proto\"{\n\x13OnDemandFeatureView\x12\x31\n\x04spec\x18\x01 \x01(\x0b\x32#.feast.core.OnDemandFeatureViewSpec\x12\x31\n\x04meta\x18\x02 \x01(\x0b\x32#.feast.core.OnDemandFeatureViewMeta\"\xfd\x04\n\x17OnDemandFeatureViewSpec\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0f\n\x07project\x18\x02 \x01(\t\x12+\n\x08\x66\x65\x61tures\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x41\n\x07sources\x18\x04 \x03(\x0b\x32\x30.feast.core.OnDemandFeatureViewSpec.SourcesEntry\x12\x42\n\x15user_defined_function\x18\x05 \x01(\x0b\x32\x1f.feast.core.UserDefinedFunctionB\x02\x18\x01\x12\x43\n\x16\x66\x65\x61ture_transformation\x18\n \x01(\x0b\x32#.feast.core.FeatureTransformationV2\x12\x13\n\x0b\x64\x65scription\x18\x06 \x01(\t\x12;\n\x04tags\x18\x07 \x03(\x0b\x32-.feast.core.OnDemandFeatureViewSpec.TagsEntry\x12\r\n\x05owner\x18\x08 \x01(\t\x12\x0c\n\x04mode\x18\x0b \x01(\t\x12\x1d\n\x15write_to_online_store\x18\x0c \x01(\x08\x12\x10\n\x08\x65ntities\x18\r \x03(\t\x12\x31\n\x0e\x65ntity_columns\x18\x0e \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x1aJ\n\x0cSourcesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 \x01(\x0b\x32\x1a.feast.core.OnDemandSource:\x02\x38\x01\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x8c\x01\n\x17OnDemandFeatureViewMeta\x12\x35\n\x11\x63reated_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc8\x01\n\x0eOnDemandSource\x12/\n\x0c\x66\x65\x61ture_view\x18\x01 \x01(\x0b\x32\x17.feast.core.FeatureViewH\x00\x12\x44\n\x17\x66\x65\x61ture_view_projection\x18\x03 \x01(\x0b\x32!.feast.core.FeatureViewProjectionH\x00\x12\x35\n\x13request_data_source\x18\x02 \x01(\x0b\x32\x16.feast.core.DataSourceH\x00\x42\x08\n\x06source\"H\n\x13UserDefinedFunction\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04\x62ody\x18\x02 \x01(\x0c\x12\x11\n\tbody_text\x18\x03 \x01(\t:\x02\x18\x01\x42]\n\x10\x66\x65\x61st.proto.coreB\x18OnDemandFeatureViewProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n$feast/core/OnDemandFeatureView.proto\x12\nfeast.core\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1c\x66\x65\x61st/core/FeatureView.proto\x1a&feast/core/FeatureViewProjection.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\x1a\x1b\x66\x65\x61st/core/DataSource.proto\x1a\x1f\x66\x65\x61st/core/Transformation.proto\"{\n\x13OnDemandFeatureView\x12\x31\n\x04spec\x18\x01 \x01(\x0b\x32#.feast.core.OnDemandFeatureViewSpec\x12\x31\n\x04meta\x18\x02 \x01(\x0b\x32#.feast.core.OnDemandFeatureViewMeta\"\x90\x05\n\x17OnDemandFeatureViewSpec\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0f\n\x07project\x18\x02 \x01(\t\x12+\n\x08\x66\x65\x61tures\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x41\n\x07sources\x18\x04 \x03(\x0b\x32\x30.feast.core.OnDemandFeatureViewSpec.SourcesEntry\x12\x42\n\x15user_defined_function\x18\x05 \x01(\x0b\x32\x1f.feast.core.UserDefinedFunctionB\x02\x18\x01\x12\x43\n\x16\x66\x65\x61ture_transformation\x18\n \x01(\x0b\x32#.feast.core.FeatureTransformationV2\x12\x13\n\x0b\x64\x65scription\x18\x06 \x01(\t\x12;\n\x04tags\x18\x07 \x03(\x0b\x32-.feast.core.OnDemandFeatureViewSpec.TagsEntry\x12\r\n\x05owner\x18\x08 \x01(\t\x12\x0c\n\x04mode\x18\x0b \x01(\t\x12\x1d\n\x15write_to_online_store\x18\x0c \x01(\x08\x12\x10\n\x08\x65ntities\x18\r \x03(\t\x12\x31\n\x0e\x65ntity_columns\x18\x0e \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x11\n\tsingleton\x18\x0f \x01(\x08\x1aJ\n\x0cSourcesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 \x01(\x0b\x32\x1a.feast.core.OnDemandSource:\x02\x38\x01\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x8c\x01\n\x17OnDemandFeatureViewMeta\x12\x35\n\x11\x63reated_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc8\x01\n\x0eOnDemandSource\x12/\n\x0c\x66\x65\x61ture_view\x18\x01 \x01(\x0b\x32\x17.feast.core.FeatureViewH\x00\x12\x44\n\x17\x66\x65\x61ture_view_projection\x18\x03 \x01(\x0b\x32!.feast.core.FeatureViewProjectionH\x00\x12\x35\n\x13request_data_source\x18\x02 \x01(\x0b\x32\x16.feast.core.DataSourceH\x00\x42\x08\n\x06source\"H\n\x13UserDefinedFunction\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04\x62ody\x18\x02 \x01(\x0c\x12\x11\n\tbody_text\x18\x03 \x01(\t:\x02\x18\x01\x42]\n\x10\x66\x65\x61st.proto.coreB\x18OnDemandFeatureViewProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -39,15 +39,15 @@ _globals['_ONDEMANDFEATUREVIEW']._serialized_start=243 _globals['_ONDEMANDFEATUREVIEW']._serialized_end=366 _globals['_ONDEMANDFEATUREVIEWSPEC']._serialized_start=369 - _globals['_ONDEMANDFEATUREVIEWSPEC']._serialized_end=1006 - _globals['_ONDEMANDFEATUREVIEWSPEC_SOURCESENTRY']._serialized_start=887 - _globals['_ONDEMANDFEATUREVIEWSPEC_SOURCESENTRY']._serialized_end=961 - _globals['_ONDEMANDFEATUREVIEWSPEC_TAGSENTRY']._serialized_start=963 - _globals['_ONDEMANDFEATUREVIEWSPEC_TAGSENTRY']._serialized_end=1006 - _globals['_ONDEMANDFEATUREVIEWMETA']._serialized_start=1009 - _globals['_ONDEMANDFEATUREVIEWMETA']._serialized_end=1149 - _globals['_ONDEMANDSOURCE']._serialized_start=1152 - _globals['_ONDEMANDSOURCE']._serialized_end=1352 - _globals['_USERDEFINEDFUNCTION']._serialized_start=1354 - _globals['_USERDEFINEDFUNCTION']._serialized_end=1426 + _globals['_ONDEMANDFEATUREVIEWSPEC']._serialized_end=1025 + _globals['_ONDEMANDFEATUREVIEWSPEC_SOURCESENTRY']._serialized_start=906 + _globals['_ONDEMANDFEATUREVIEWSPEC_SOURCESENTRY']._serialized_end=980 + _globals['_ONDEMANDFEATUREVIEWSPEC_TAGSENTRY']._serialized_start=982 + _globals['_ONDEMANDFEATUREVIEWSPEC_TAGSENTRY']._serialized_end=1025 + _globals['_ONDEMANDFEATUREVIEWMETA']._serialized_start=1028 + _globals['_ONDEMANDFEATUREVIEWMETA']._serialized_end=1168 + _globals['_ONDEMANDSOURCE']._serialized_start=1171 + _globals['_ONDEMANDSOURCE']._serialized_end=1371 + _globals['_USERDEFINEDFUNCTION']._serialized_start=1373 + _globals['_USERDEFINEDFUNCTION']._serialized_end=1445 # @@protoc_insertion_point(module_scope) diff --git a/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.pyi b/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.pyi index b2ec15b186..3380779c97 100644 --- a/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.pyi @@ -107,6 +107,7 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message): WRITE_TO_ONLINE_STORE_FIELD_NUMBER: builtins.int ENTITIES_FIELD_NUMBER: builtins.int ENTITY_COLUMNS_FIELD_NUMBER: builtins.int + SINGLETON_FIELD_NUMBER: builtins.int name: builtins.str """Name of the feature view. Must be unique. Not updated.""" project: builtins.str @@ -137,6 +138,7 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message): @property def entity_columns(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[feast.core.Feature_pb2.FeatureSpecV2]: """List of specifications for each entity defined as part of this feature view.""" + singleton: builtins.bool def __init__( self, *, @@ -153,9 +155,10 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message): write_to_online_store: builtins.bool = ..., entities: collections.abc.Iterable[builtins.str] | None = ..., entity_columns: collections.abc.Iterable[feast.core.Feature_pb2.FeatureSpecV2] | None = ..., + singleton: builtins.bool = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["feature_transformation", b"feature_transformation", "user_defined_function", b"user_defined_function"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "owner", b"owner", "project", b"project", "sources", b"sources", "tags", b"tags", "user_defined_function", b"user_defined_function", "write_to_online_store", b"write_to_online_store"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "owner", b"owner", "project", b"project", "singleton", b"singleton", "sources", b"sources", "tags", b"tags", "user_defined_function", b"user_defined_function", "write_to_online_store", b"write_to_online_store"]) -> None: ... global___OnDemandFeatureViewSpec = OnDemandFeatureViewSpec diff --git a/sdk/python/feast/transformation/pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py index ac31a4fa20..35e786aac8 100644 --- a/sdk/python/feast/transformation/pandas_transformation.py +++ b/sdk/python/feast/transformation/pandas_transformation.py @@ -35,6 +35,11 @@ def transform_arrow( def transform(self, input_df: pd.DataFrame) -> pd.DataFrame: return self.udf(input_df) + def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame: + raise ValueError( + "PandasTransformation does not support singleton transformations." + ) + def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]: df = pd.DataFrame.from_dict(random_input) output_df: pd.DataFrame = self.transform(df) diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index 7e7c6e8bc3..ce2aaf2002 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -37,24 +37,39 @@ def transform(self, input_dict: dict) -> dict: output_dict = self.udf.__call__(input_dict) return {**input_dict, **output_dict} - def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]: - output_dict: dict[str, list[Any]] = self.transform(random_input) + def transform_singleton(self, input_dict: dict) -> dict: + # This flattens the list of elements to extract the first one + # in the case of a singleton element, it takes the value directly + # in the case of a list of lists, it takes the first list + input_dict = {k: v[0] for k, v in input_dict.items()} + output_dict = self.udf.__call__(input_dict) + return {**input_dict, **output_dict} + + def infer_features(self, random_input: dict[str, Any]) -> list[Field]: + output_dict: dict[str, Any] = self.transform(random_input) fields = [] for feature_name, feature_value in output_dict.items(): - if len(feature_value) <= 0: - raise TypeError( - f"Failed to infer type for feature '{feature_name}' with value " - + f"'{feature_value}' since no items were returned by the UDF." - ) + if isinstance(feature_value, list): + if len(feature_value) <= 0: + raise TypeError( + f"Failed to infer type for feature '{feature_name}' with value " + + f"'{feature_value}' since no items were returned by the UDF." + ) + inferred_type = type(feature_value[0]) + inferred_value = feature_value[0] + else: + inferred_type = type(feature_value) + inferred_value = feature_value + fields.append( Field( name=feature_name, dtype=from_value_type( python_type_to_feast_value_type( feature_name, - value=feature_value[0], - type_name=type(feature_value[0]).__name__, + value=inferred_value, + type_name=inferred_type.__name__, ) ), ) diff --git a/sdk/python/feast/transformation/substrait_transformation.py b/sdk/python/feast/transformation/substrait_transformation.py index 1de60aed00..47e2ced976 100644 --- a/sdk/python/feast/transformation/substrait_transformation.py +++ b/sdk/python/feast/transformation/substrait_transformation.py @@ -38,6 +38,11 @@ def table_provider(names, schema: pyarrow.Schema): ).read_all() return table.to_pandas() + def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame: + raise ValueError( + "SubstraitTransform does not support singleton transformations." + ) + def transform_ibis(self, table): return self.ibis_function(table) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 32cd2f606c..49af4fe717 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -570,6 +570,8 @@ def _augment_response_with_on_demand_transforms( proto_values.append( python_values_to_proto_values( feature_vector + if isinstance(feature_vector, list) + else [feature_vector] if odfv.mode == "python" else feature_vector.to_numpy(), feature_type, diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 530bf1fa0a..a0c33fadfd 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -126,12 +126,10 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame: ) def python_view(inputs: dict[str, Any]) -> dict[str, Any]: output: dict[str, Any] = { - "conv_rate_plus_acc_python": [ - conv_rate + acc_rate - for conv_rate, acc_rate in zip( - inputs["conv_rate"], inputs["acc_rate"] - ) - ] + "conv_rate_plus_acc_python": conv_rate + acc_rate + for conv_rate, acc_rate in zip( + inputs["conv_rate"], inputs["acc_rate"] + ) } return output @@ -166,6 +164,7 @@ def python_demo_view(inputs: dict[str, Any]) -> dict[str, Any]: Field(name="conv_rate_plus_acc_python_singleton", dtype=Float64) ], mode="python", + singleton=True, ) def python_singleton_view(inputs: dict[str, Any]) -> dict[str, Any]: output: dict[str, Any] = dict(conv_rate_plus_acc_python=float("-inf")) @@ -204,21 +203,6 @@ def python_stored_writes_feature_view( } return output - with pytest.raises(TypeError): - # Note the singleton view will fail as the type is - # expected to be a list which can be confirmed in _infer_features_dict - self.store.apply( - [ - driver, - driver_stats_source, - driver_stats_fv, - pandas_view, - python_view, - python_singleton_view, - driver_stats_entity_less_fv, - ] - ) - self.store.apply( [ driver, @@ -226,6 +210,7 @@ def python_stored_writes_feature_view( driver_stats_fv, pandas_view, python_view, + python_singleton_view, python_demo_view, driver_stats_entity_less_fv, python_stored_writes_feature_view, @@ -239,11 +224,46 @@ def python_stored_writes_feature_view( ] assert driver_stats_entity_less_fv.entity_columns == [DUMMY_ENTITY_FIELD] - assert len(self.store.list_all_feature_views()) == 6 + assert len(self.store.list_all_feature_views()) == 7 assert len(self.store.list_feature_views()) == 2 - assert len(self.store.list_on_demand_feature_views()) == 4 + assert len(self.store.list_on_demand_feature_views()) == 5 assert len(self.store.list_stream_feature_views()) == 0 + def test_setup(self): + pass + + def test_python_singleton_view(self): + entity_rows = [ + { + "driver_id": 1001, + "acc_rate": 0.25, + "conv_rate": 0.25, + } + ] + + online_python_response = self.store.get_online_features( + entity_rows=entity_rows, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "python_singleton_view:conv_rate_plus_acc_python_singleton", + ], + ).to_dict() + + assert sorted(list(online_python_response.keys())) == sorted( + [ + "driver_id", + "acc_rate", + "conv_rate", + "conv_rate_plus_acc_python_singleton", + ] + ) + + assert online_python_response["conv_rate_plus_acc_python_singleton"][0] == ( + online_python_response["conv_rate"][0] + + online_python_response["acc_rate"][0] + ) + def test_python_pandas_parity(self): entity_rows = [ {