Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding support for native Python transformations on a single dictionary #4724

Merged
merged 5 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ message OnDemandFeatureViewSpec {
repeated string entities = 13;
// List of specifications for each entity defined as part of this feature view.
repeated FeatureSpecV2 entity_columns = 14;
bool singleton = 15;
}

message OnDemandFeatureViewMeta {
Expand Down
49 changes: 38 additions & 11 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class OnDemandFeatureView(BaseFeatureView):
tags: dict[str, str]
owner: str
write_to_online_store: bool
singleton: bool

def __init__( # noqa: C901
self,
Expand All @@ -98,6 +99,7 @@ def __init__( # noqa: C901
tags: Optional[dict[str, str]] = None,
owner: str = "",
write_to_online_store: bool = False,
singleton: bool = False,
):
"""
Creates an OnDemandFeatureView object.
Expand All @@ -121,6 +123,8 @@ def __init__( # noqa: C901
of the primary maintainer.
write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to
the online store for faster retrieval.
singleton (optional): A boolean that indicates whether the transformation is executed on a singleton
(only applicable when mode="python").
"""
super().__init__(
name=name,
Expand Down Expand Up @@ -204,6 +208,9 @@ def __init__( # noqa: C901
self.features = features
self.feature_transformation = feature_transformation
self.write_to_online_store = write_to_online_store
self.singleton = singleton
if self.singleton and self.mode != "python":
raise ValueError("Singleton is only supported for Python mode.")

@property
def proto_class(self) -> type[OnDemandFeatureViewProto]:
Expand All @@ -221,6 +228,7 @@ def __copy__(self):
tags=self.tags,
owner=self.owner,
write_to_online_store=self.write_to_online_store,
singleton=self.singleton,
)
fv.entities = self.entities
fv.features = self.features
Expand All @@ -247,6 +255,7 @@ def __eq__(self, other):
or self.feature_transformation != other.feature_transformation
or self.write_to_online_store != other.write_to_online_store
or sorted(self.entity_columns) != sorted(other.entity_columns)
or self.singleton != other.singleton
):
return False

Expand Down Expand Up @@ -328,6 +337,7 @@ def to_proto(self) -> OnDemandFeatureViewProto:
tags=self.tags,
owner=self.owner,
write_to_online_store=self.write_to_online_store,
singleton=self.singleton if self.singleton else False,
)

return OnDemandFeatureViewProto(spec=spec, meta=meta)
Expand Down Expand Up @@ -434,6 +444,9 @@ def from_proto(
]
else:
entity_columns = []
singleton = False
if hasattr(on_demand_feature_view_proto.spec, "singleton"):
singleton = on_demand_feature_view_proto.spec.singleton

on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
Expand All @@ -451,6 +464,7 @@ def from_proto(
tags=dict(on_demand_feature_view_proto.spec.tags),
owner=on_demand_feature_view_proto.spec.owner,
write_to_online_store=write_to_online_store,
singleton=singleton,
)

on_demand_feature_view_obj.entities = entities
Expand Down Expand Up @@ -614,17 +628,19 @@ def transform_dict(
feature_dict[full_feature_ref] = feature_dict[feature.name]
columns_to_cleanup.append(str(full_feature_ref))

output_dict: dict[str, Any] = self.feature_transformation.transform(
feature_dict
)
if self.singleton and self.mode == "python":
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robhowley see here

output_dict: dict[str, Any] = (
self.feature_transformation.transform_singleton(feature_dict)
)
else:
output_dict = self.feature_transformation.transform(feature_dict)
for feature_name in columns_to_cleanup:
del output_dict[feature_name]
return output_dict

def infer_features(self) -> None:
inferred_features = self.feature_transformation.infer_features(
self._construct_random_input()
)
random_input = self._construct_random_input(singleton=self.singleton)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robhowley and here

inferred_features = self.feature_transformation.infer_features(random_input)

if self.features:
missing_features = []
Expand All @@ -644,8 +660,10 @@ def infer_features(self) -> None:
f"Could not infer Features for the feature view '{self.name}'.",
)

def _construct_random_input(self) -> dict[str, list[Any]]:
rand_dict_value: dict[ValueType, list[Any]] = {
def _construct_random_input(
self, singleton: bool = False
) -> dict[str, Union[list[Any], Any]]:
rand_dict_value: dict[ValueType, Union[list[Any], Any]] = {
ValueType.BYTES: [str.encode("hello world")],
ValueType.STRING: ["hello world"],
ValueType.INT32: [1],
Expand All @@ -663,20 +681,25 @@ def _construct_random_input(self) -> dict[str, list[Any]]:
ValueType.BOOL_LIST: [[True]],
ValueType.UNIX_TIMESTAMP_LIST: [[_utc_now()]],
}
if singleton:
rand_dict_value = {k: rand_dict_value[k][0] for k in rand_dict_value}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robhowley and here


rand_missing_value = [None] if singleton else None
feature_dict = {}
for feature_view_projection in self.source_feature_view_projections.values():
for feature in feature_view_projection.features:
feature_dict[f"{feature_view_projection.name}__{feature.name}"] = (
rand_dict_value.get(feature.dtype.to_value_type(), [None])
rand_dict_value.get(
feature.dtype.to_value_type(), rand_missing_value
)
)
feature_dict[f"{feature.name}"] = rand_dict_value.get(
feature.dtype.to_value_type(), [None]
feature.dtype.to_value_type(), rand_missing_value
)
for request_data in self.source_request_sources.values():
for field in request_data.schema:
feature_dict[f"{field.name}"] = rand_dict_value.get(
field.dtype.to_value_type(), [None]
field.dtype.to_value_type(), rand_missing_value
)

return feature_dict
Expand Down Expand Up @@ -713,6 +736,7 @@ def on_demand_feature_view(
tags: Optional[dict[str, str]] = None,
owner: str = "",
write_to_online_store: bool = False,
singleton: bool = False,
):
"""
Creates an OnDemandFeatureView object with the given user function as udf.
Expand All @@ -731,6 +755,8 @@ def on_demand_feature_view(
of the primary maintainer.
write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to
the online store for faster retrieval.
singleton (optional): A boolean that indicates whether the transformation is executed on a singleton
(only applicable when mode="python").
"""

def mainify(obj) -> None:
Expand Down Expand Up @@ -775,6 +801,7 @@ def decorator(user_function):
owner=owner,
write_to_online_store=write_to_online_store,
entities=entities,
singleton=singleton,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
Expand Down
24 changes: 12 additions & 12 deletions sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message):
WRITE_TO_ONLINE_STORE_FIELD_NUMBER: builtins.int
ENTITIES_FIELD_NUMBER: builtins.int
ENTITY_COLUMNS_FIELD_NUMBER: builtins.int
SINGLETON_FIELD_NUMBER: builtins.int
name: builtins.str
"""Name of the feature view. Must be unique. Not updated."""
project: builtins.str
Expand Down Expand Up @@ -137,6 +138,7 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message):
@property
def entity_columns(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[feast.core.Feature_pb2.FeatureSpecV2]:
"""List of specifications for each entity defined as part of this feature view."""
singleton: builtins.bool
def __init__(
self,
*,
Expand All @@ -153,9 +155,10 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message):
write_to_online_store: builtins.bool = ...,
entities: collections.abc.Iterable[builtins.str] | None = ...,
entity_columns: collections.abc.Iterable[feast.core.Feature_pb2.FeatureSpecV2] | None = ...,
singleton: builtins.bool = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["feature_transformation", b"feature_transformation", "user_defined_function", b"user_defined_function"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "owner", b"owner", "project", b"project", "sources", b"sources", "tags", b"tags", "user_defined_function", b"user_defined_function", "write_to_online_store", b"write_to_online_store"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "owner", b"owner", "project", b"project", "singleton", b"singleton", "sources", b"sources", "tags", b"tags", "user_defined_function", b"user_defined_function", "write_to_online_store", b"write_to_online_store"]) -> None: ...

global___OnDemandFeatureViewSpec = OnDemandFeatureViewSpec

Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/transformation/pandas_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def transform_arrow(
def transform(self, input_df: pd.DataFrame) -> pd.DataFrame:
return self.udf(input_df)

def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame:
raise ValueError(
"PandasTransformation does not support singleton transformations."
)

def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
df = pd.DataFrame.from_dict(random_input)
output_df: pd.DataFrame = self.transform(df)
Expand Down
33 changes: 24 additions & 9 deletions sdk/python/feast/transformation/python_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,39 @@ def transform(self, input_dict: dict) -> dict:
output_dict = self.udf.__call__(input_dict)
return {**input_dict, **output_dict}

def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
output_dict: dict[str, list[Any]] = self.transform(random_input)
def transform_singleton(self, input_dict: dict) -> dict:
# This flattens the list of elements to extract the first one
# in the case of a singleton element, it takes the value directly
# in the case of a list of lists, it takes the first list
input_dict = {k: v[0] for k, v in input_dict.items()}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

v[0] might cause error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't but I'll add a test for it

output_dict = self.udf.__call__(input_dict)
return {**input_dict, **output_dict}

def infer_features(self, random_input: dict[str, Any]) -> list[Field]:
output_dict: dict[str, Any] = self.transform(random_input)

fields = []
for feature_name, feature_value in output_dict.items():
if len(feature_value) <= 0:
raise TypeError(
f"Failed to infer type for feature '{feature_name}' with value "
+ f"'{feature_value}' since no items were returned by the UDF."
)
if isinstance(feature_value, list):
if len(feature_value) <= 0:
raise TypeError(
f"Failed to infer type for feature '{feature_name}' with value "
+ f"'{feature_value}' since no items were returned by the UDF."
)
inferred_type = type(feature_value[0])
inferred_value = feature_value[0]
else:
inferred_type = type(feature_value)
inferred_value = feature_value

fields.append(
Field(
name=feature_name,
dtype=from_value_type(
python_type_to_feast_value_type(
feature_name,
value=feature_value[0],
type_name=type(feature_value[0]).__name__,
value=inferred_value,
type_name=inferred_type.__name__,
)
),
)
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/transformation/substrait_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def table_provider(names, schema: pyarrow.Schema):
).read_all()
return table.to_pandas()

def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame:
raise ValueError(
"SubstraitTransform does not support singleton transformations."
)

def transform_ibis(self, table):
return self.ibis_function(table)

Expand Down
Loading
Loading