diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 19834c8f26..b4f11a73c7 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -640,10 +640,7 @@ def _make_inferences( ) for odfv in odfvs_to_update: - try: - odfv.infer_features() - except Exception as _: - odfv.infer_features(use_lists=False) + odfv.infer_features() odfvs_to_write = [ odfv for odfv in odfvs_to_update if odfv.write_to_online_store diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 88874235a6..0ae87b5e35 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -74,6 +74,7 @@ class OnDemandFeatureView(BaseFeatureView): tags: dict[str, str] owner: str write_to_online_store: bool + singleton: bool def __init__( # noqa: C901 self, @@ -98,6 +99,7 @@ def __init__( # noqa: C901 tags: Optional[dict[str, str]] = None, owner: str = "", write_to_online_store: bool = False, + singleton: bool = False, ): """ Creates an OnDemandFeatureView object. @@ -121,6 +123,8 @@ def __init__( # noqa: C901 of the primary maintainer. write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to the online store for faster retrieval. + singleton (optional): A boolean that indicates whether the transformation is executed on a singleton + (only applicable when mode="python"). """ super().__init__( name=name, @@ -204,6 +208,9 @@ def __init__( # noqa: C901 self.features = features self.feature_transformation = feature_transformation self.write_to_online_store = write_to_online_store + self.singleton = singleton + if self.singleton and self.mode != "python": + raise ValueError("Singleton is only supported for Python mode.") @property def proto_class(self) -> type[OnDemandFeatureViewProto]: @@ -221,6 +228,7 @@ def __copy__(self): tags=self.tags, owner=self.owner, write_to_online_store=self.write_to_online_store, + singleton=self.singleton, ) fv.entities = self.entities fv.features = self.features @@ -247,6 +255,7 @@ def __eq__(self, other): or self.feature_transformation != other.feature_transformation or self.write_to_online_store != other.write_to_online_store or sorted(self.entity_columns) != sorted(other.entity_columns) + or self.singleton != other.singleton ): return False @@ -328,6 +337,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: tags=self.tags, owner=self.owner, write_to_online_store=self.write_to_online_store, + singleton=self.singleton if self.singleton else False, ) return OnDemandFeatureViewProto(spec=spec, meta=meta) @@ -434,6 +444,9 @@ def from_proto( ] else: entity_columns = [] + singleton = False + if hasattr(on_demand_feature_view_proto.spec, "singleton"): + singleton = on_demand_feature_view_proto.spec.singleton on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, @@ -451,6 +464,7 @@ def from_proto( tags=dict(on_demand_feature_view_proto.spec.tags), owner=on_demand_feature_view_proto.spec.owner, write_to_online_store=write_to_online_store, + singleton=singleton, ) on_demand_feature_view_obj.entities = entities @@ -614,15 +628,18 @@ def transform_dict( feature_dict[full_feature_ref] = feature_dict[feature.name] columns_to_cleanup.append(str(full_feature_ref)) - output_dict: dict[str, Any] = self.feature_transformation.transform( - feature_dict - ) + if self.singleton and self.mode == "python": + output_dict: dict[str, Any] = ( + self.feature_transformation.transform_singleton(feature_dict) + ) + else: + output_dict = self.feature_transformation.transform(feature_dict) for feature_name in columns_to_cleanup: del output_dict[feature_name] return output_dict - def infer_features(self, use_lists=True) -> None: - random_input = self._construct_random_input(use_lists) + def infer_features(self) -> None: + random_input = self._construct_random_input(singleton=self.singleton) inferred_features = self.feature_transformation.infer_features(random_input) if self.features: @@ -644,7 +661,7 @@ def infer_features(self, use_lists=True) -> None: ) def _construct_random_input( - self, use_lists: bool = True + self, singleton: bool = False ) -> dict[str, Union[list[Any], Any]]: rand_dict_value: dict[ValueType, Union[list[Any], Any]] = { ValueType.BYTES: [str.encode("hello world")], @@ -664,10 +681,10 @@ def _construct_random_input( ValueType.BOOL_LIST: [[True]], ValueType.UNIX_TIMESTAMP_LIST: [[_utc_now()]], } - if not use_lists: + if singleton: rand_dict_value = {k: rand_dict_value[k][0] for k in rand_dict_value} - rand_missing_value = [None] if use_lists else None + rand_missing_value = [None] if singleton else None feature_dict = {} for feature_view_projection in self.source_feature_view_projections.values(): for feature in feature_view_projection.features: @@ -719,6 +736,7 @@ def on_demand_feature_view( tags: Optional[dict[str, str]] = None, owner: str = "", write_to_online_store: bool = False, + singleton: bool = False, ): """ Creates an OnDemandFeatureView object with the given user function as udf. @@ -737,6 +755,8 @@ def on_demand_feature_view( of the primary maintainer. write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to the online store for faster retrieval. + singleton (optional): A boolean that indicates whether the transformation is executed on a singleton + (only applicable when mode="python"). """ def mainify(obj) -> None: @@ -781,6 +801,7 @@ def decorator(user_function): owner=owner, write_to_online_store=write_to_online_store, entities=entities, + singleton=singleton, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py b/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py index a27c4fba3b..020515a6b8 100644 --- a/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py +++ b/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py @@ -20,7 +20,7 @@ from feast.protos.feast.core import Transformation_pb2 as feast_dot_core_dot_Transformation__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n$feast/core/OnDemandFeatureView.proto\x12\nfeast.core\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1c\x66\x65\x61st/core/FeatureView.proto\x1a&feast/core/FeatureViewProjection.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\x1a\x1b\x66\x65\x61st/core/DataSource.proto\x1a\x1f\x66\x65\x61st/core/Transformation.proto\"{\n\x13OnDemandFeatureView\x12\x31\n\x04spec\x18\x01 \x01(\x0b\x32#.feast.core.OnDemandFeatureViewSpec\x12\x31\n\x04meta\x18\x02 \x01(\x0b\x32#.feast.core.OnDemandFeatureViewMeta\"\xfd\x04\n\x17OnDemandFeatureViewSpec\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0f\n\x07project\x18\x02 \x01(\t\x12+\n\x08\x66\x65\x61tures\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x41\n\x07sources\x18\x04 \x03(\x0b\x32\x30.feast.core.OnDemandFeatureViewSpec.SourcesEntry\x12\x42\n\x15user_defined_function\x18\x05 \x01(\x0b\x32\x1f.feast.core.UserDefinedFunctionB\x02\x18\x01\x12\x43\n\x16\x66\x65\x61ture_transformation\x18\n \x01(\x0b\x32#.feast.core.FeatureTransformationV2\x12\x13\n\x0b\x64\x65scription\x18\x06 \x01(\t\x12;\n\x04tags\x18\x07 \x03(\x0b\x32-.feast.core.OnDemandFeatureViewSpec.TagsEntry\x12\r\n\x05owner\x18\x08 \x01(\t\x12\x0c\n\x04mode\x18\x0b \x01(\t\x12\x1d\n\x15write_to_online_store\x18\x0c \x01(\x08\x12\x10\n\x08\x65ntities\x18\r \x03(\t\x12\x31\n\x0e\x65ntity_columns\x18\x0e \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x1aJ\n\x0cSourcesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 \x01(\x0b\x32\x1a.feast.core.OnDemandSource:\x02\x38\x01\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x8c\x01\n\x17OnDemandFeatureViewMeta\x12\x35\n\x11\x63reated_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc8\x01\n\x0eOnDemandSource\x12/\n\x0c\x66\x65\x61ture_view\x18\x01 \x01(\x0b\x32\x17.feast.core.FeatureViewH\x00\x12\x44\n\x17\x66\x65\x61ture_view_projection\x18\x03 \x01(\x0b\x32!.feast.core.FeatureViewProjectionH\x00\x12\x35\n\x13request_data_source\x18\x02 \x01(\x0b\x32\x16.feast.core.DataSourceH\x00\x42\x08\n\x06source\"H\n\x13UserDefinedFunction\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04\x62ody\x18\x02 \x01(\x0c\x12\x11\n\tbody_text\x18\x03 \x01(\t:\x02\x18\x01\x42]\n\x10\x66\x65\x61st.proto.coreB\x18OnDemandFeatureViewProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n$feast/core/OnDemandFeatureView.proto\x12\nfeast.core\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1c\x66\x65\x61st/core/FeatureView.proto\x1a&feast/core/FeatureViewProjection.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\x1a\x1b\x66\x65\x61st/core/DataSource.proto\x1a\x1f\x66\x65\x61st/core/Transformation.proto\"{\n\x13OnDemandFeatureView\x12\x31\n\x04spec\x18\x01 \x01(\x0b\x32#.feast.core.OnDemandFeatureViewSpec\x12\x31\n\x04meta\x18\x02 \x01(\x0b\x32#.feast.core.OnDemandFeatureViewMeta\"\x90\x05\n\x17OnDemandFeatureViewSpec\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0f\n\x07project\x18\x02 \x01(\t\x12+\n\x08\x66\x65\x61tures\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x41\n\x07sources\x18\x04 \x03(\x0b\x32\x30.feast.core.OnDemandFeatureViewSpec.SourcesEntry\x12\x42\n\x15user_defined_function\x18\x05 \x01(\x0b\x32\x1f.feast.core.UserDefinedFunctionB\x02\x18\x01\x12\x43\n\x16\x66\x65\x61ture_transformation\x18\n \x01(\x0b\x32#.feast.core.FeatureTransformationV2\x12\x13\n\x0b\x64\x65scription\x18\x06 \x01(\t\x12;\n\x04tags\x18\x07 \x03(\x0b\x32-.feast.core.OnDemandFeatureViewSpec.TagsEntry\x12\r\n\x05owner\x18\x08 \x01(\t\x12\x0c\n\x04mode\x18\x0b \x01(\t\x12\x1d\n\x15write_to_online_store\x18\x0c \x01(\x08\x12\x10\n\x08\x65ntities\x18\r \x03(\t\x12\x31\n\x0e\x65ntity_columns\x18\x0e \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x12\x11\n\tsingleton\x18\x0f \x01(\x08\x1aJ\n\x0cSourcesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 \x01(\x0b\x32\x1a.feast.core.OnDemandSource:\x02\x38\x01\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x8c\x01\n\x17OnDemandFeatureViewMeta\x12\x35\n\x11\x63reated_timestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc8\x01\n\x0eOnDemandSource\x12/\n\x0c\x66\x65\x61ture_view\x18\x01 \x01(\x0b\x32\x17.feast.core.FeatureViewH\x00\x12\x44\n\x17\x66\x65\x61ture_view_projection\x18\x03 \x01(\x0b\x32!.feast.core.FeatureViewProjectionH\x00\x12\x35\n\x13request_data_source\x18\x02 \x01(\x0b\x32\x16.feast.core.DataSourceH\x00\x42\x08\n\x06source\"H\n\x13UserDefinedFunction\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04\x62ody\x18\x02 \x01(\x0c\x12\x11\n\tbody_text\x18\x03 \x01(\t:\x02\x18\x01\x42]\n\x10\x66\x65\x61st.proto.coreB\x18OnDemandFeatureViewProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -39,15 +39,15 @@ _globals['_ONDEMANDFEATUREVIEW']._serialized_start=243 _globals['_ONDEMANDFEATUREVIEW']._serialized_end=366 _globals['_ONDEMANDFEATUREVIEWSPEC']._serialized_start=369 - _globals['_ONDEMANDFEATUREVIEWSPEC']._serialized_end=1006 - _globals['_ONDEMANDFEATUREVIEWSPEC_SOURCESENTRY']._serialized_start=887 - _globals['_ONDEMANDFEATUREVIEWSPEC_SOURCESENTRY']._serialized_end=961 - _globals['_ONDEMANDFEATUREVIEWSPEC_TAGSENTRY']._serialized_start=963 - _globals['_ONDEMANDFEATUREVIEWSPEC_TAGSENTRY']._serialized_end=1006 - _globals['_ONDEMANDFEATUREVIEWMETA']._serialized_start=1009 - _globals['_ONDEMANDFEATUREVIEWMETA']._serialized_end=1149 - _globals['_ONDEMANDSOURCE']._serialized_start=1152 - _globals['_ONDEMANDSOURCE']._serialized_end=1352 - _globals['_USERDEFINEDFUNCTION']._serialized_start=1354 - _globals['_USERDEFINEDFUNCTION']._serialized_end=1426 + _globals['_ONDEMANDFEATUREVIEWSPEC']._serialized_end=1025 + _globals['_ONDEMANDFEATUREVIEWSPEC_SOURCESENTRY']._serialized_start=906 + _globals['_ONDEMANDFEATUREVIEWSPEC_SOURCESENTRY']._serialized_end=980 + _globals['_ONDEMANDFEATUREVIEWSPEC_TAGSENTRY']._serialized_start=982 + _globals['_ONDEMANDFEATUREVIEWSPEC_TAGSENTRY']._serialized_end=1025 + _globals['_ONDEMANDFEATUREVIEWMETA']._serialized_start=1028 + _globals['_ONDEMANDFEATUREVIEWMETA']._serialized_end=1168 + _globals['_ONDEMANDSOURCE']._serialized_start=1171 + _globals['_ONDEMANDSOURCE']._serialized_end=1371 + _globals['_USERDEFINEDFUNCTION']._serialized_start=1373 + _globals['_USERDEFINEDFUNCTION']._serialized_end=1445 # @@protoc_insertion_point(module_scope) diff --git a/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.pyi b/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.pyi index b2ec15b186..3380779c97 100644 --- a/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.pyi @@ -107,6 +107,7 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message): WRITE_TO_ONLINE_STORE_FIELD_NUMBER: builtins.int ENTITIES_FIELD_NUMBER: builtins.int ENTITY_COLUMNS_FIELD_NUMBER: builtins.int + SINGLETON_FIELD_NUMBER: builtins.int name: builtins.str """Name of the feature view. Must be unique. Not updated.""" project: builtins.str @@ -137,6 +138,7 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message): @property def entity_columns(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[feast.core.Feature_pb2.FeatureSpecV2]: """List of specifications for each entity defined as part of this feature view.""" + singleton: builtins.bool def __init__( self, *, @@ -153,9 +155,10 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message): write_to_online_store: builtins.bool = ..., entities: collections.abc.Iterable[builtins.str] | None = ..., entity_columns: collections.abc.Iterable[feast.core.Feature_pb2.FeatureSpecV2] | None = ..., + singleton: builtins.bool = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["feature_transformation", b"feature_transformation", "user_defined_function", b"user_defined_function"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "owner", b"owner", "project", b"project", "sources", b"sources", "tags", b"tags", "user_defined_function", b"user_defined_function", "write_to_online_store", b"write_to_online_store"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "owner", b"owner", "project", b"project", "singleton", b"singleton", "sources", b"sources", "tags", b"tags", "user_defined_function", b"user_defined_function", "write_to_online_store", b"write_to_online_store"]) -> None: ... global___OnDemandFeatureViewSpec = OnDemandFeatureViewSpec diff --git a/sdk/python/feast/transformation/pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py index ac31a4fa20..35e786aac8 100644 --- a/sdk/python/feast/transformation/pandas_transformation.py +++ b/sdk/python/feast/transformation/pandas_transformation.py @@ -35,6 +35,11 @@ def transform_arrow( def transform(self, input_df: pd.DataFrame) -> pd.DataFrame: return self.udf(input_df) + def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame: + raise ValueError( + "PandasTransformation does not support singleton transformations." + ) + def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]: df = pd.DataFrame.from_dict(random_input) output_df: pd.DataFrame = self.transform(df) diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index edc20bf803..ce2aaf2002 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -37,6 +37,14 @@ def transform(self, input_dict: dict) -> dict: output_dict = self.udf.__call__(input_dict) return {**input_dict, **output_dict} + def transform_singleton(self, input_dict: dict) -> dict: + # This flattens the list of elements to extract the first one + # in the case of a singleton element, it takes the value directly + # in the case of a list of lists, it takes the first list + input_dict = {k: v[0] for k, v in input_dict.items()} + output_dict = self.udf.__call__(input_dict) + return {**input_dict, **output_dict} + def infer_features(self, random_input: dict[str, Any]) -> list[Field]: output_dict: dict[str, Any] = self.transform(random_input) diff --git a/sdk/python/feast/transformation/substrait_transformation.py b/sdk/python/feast/transformation/substrait_transformation.py index 1de60aed00..47e2ced976 100644 --- a/sdk/python/feast/transformation/substrait_transformation.py +++ b/sdk/python/feast/transformation/substrait_transformation.py @@ -38,6 +38,11 @@ def table_provider(names, schema: pyarrow.Schema): ).read_all() return table.to_pandas() + def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame: + raise ValueError( + "SubstraitTransform does not support singleton transformations." + ) + def transform_ibis(self, table): return self.ibis_function(table) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 32cd2f606c..49af4fe717 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -570,6 +570,8 @@ def _augment_response_with_on_demand_transforms( proto_values.append( python_values_to_proto_values( feature_vector + if isinstance(feature_vector, list) + else [feature_vector] if odfv.mode == "python" else feature_vector.to_numpy(), feature_type, diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 521ad4391f..e8ab3040b4 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -162,6 +162,7 @@ def python_demo_view(inputs: dict[str, Any]) -> dict[str, Any]: Field(name="conv_rate_plus_acc_python_singleton", dtype=Float64) ], mode="python", + singleton=True, ) def python_singleton_view(inputs: dict[str, Any]) -> dict[str, Any]: output: dict[str, Any] = dict(conv_rate_plus_acc_python=float("-inf")) @@ -233,6 +234,8 @@ def test_python_singleton_view(self): entity_rows = [ { "driver_id": 1001, + "acc_rate": 0.25, + "conv_rate": 0.25, } ]