Skip to content

Commit

Permalink
feat: enable feature store online serving (#918)
Browse files Browse the repository at this point in the history
- [x] Added `_instantiate_featurestore_online_client`, and `read` method in `EntityType` class to enable feature store online serving
- [x] Enabled construct Pandas DataFrame for online serving result
- [x] Added unit tests
- [x] Added integration tests
  • Loading branch information
morgandu authored Jan 15, 2022
1 parent dfd2f8a commit b8f5f82
Show file tree
Hide file tree
Showing 5 changed files with 787 additions and 12 deletions.
154 changes: 151 additions & 3 deletions google/cloud/aiplatform/featurestore/entity_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
from google.cloud.aiplatform import base
from google.cloud.aiplatform.compat.types import (
entity_type as gca_entity_type,
feature_selector as gca_feature_selector,
featurestore_service as gca_featurestore_service,
featurestore_online_service as gca_featurestore_online_service,
io as gca_io,
)
from google.cloud.aiplatform import featurestore
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.utils import featurestore_utils


_LOGGER = base.Logger(__name__)
_ALL_FEATURE_IDS = "*"

Expand All @@ -40,7 +44,6 @@ class EntityType(base.VertexAiResourceNounWithFutureManager):

client_class = utils.FeaturestoreClientWithOverride

_is_client_prediction_client = False
_resource_noun = "entityTypes"
_getter_method = "get_entity_type"
_list_method = "list_entity_types"
Expand Down Expand Up @@ -114,6 +117,10 @@ def __init__(
else featurestore_id,
)

self._featurestore_online_client = self._instantiate_featurestore_online_client(
location=self.location, credentials=credentials,
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
Expand Down Expand Up @@ -157,7 +164,7 @@ def update(
self,
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
request_metadata: Sequence[Tuple[str, str]] = (),
) -> "EntityType":
"""Updates an existing managed entityType resource.
Expand Down Expand Up @@ -189,7 +196,7 @@ def update(
System reserved label keys are prefixed with
"aiplatform.googleapis.com/" and are immutable.
request_metadata (Sequence[Tuple[str, str]]):
Optional. Strings which should be sent along with the request as metadata.
Required. Strings which should be sent along with the request as metadata.
Returns:
EntityType - The updated entityType resource object.
"""
Expand Down Expand Up @@ -1138,3 +1145,144 @@ def ingest_from_gcs(
import_feature_values_request=import_feature_values_request,
request_metadata=request_metadata,
)

@staticmethod
def _instantiate_featurestore_online_client(
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
) -> utils.FeaturestoreOnlineServingClientWithOverride:
"""Helper method to instantiates featurestore online client.
Args:
location (str): The location of this featurestore.
credentials (google.auth.credentials.Credentials):
Optional custom credentials to use when interacting with
the featurestore online client.
Returns:
utils.FeaturestoreOnlineServingClientWithOverride:
Initialized featurestore online client with optional overrides.
"""
return initializer.global_config.create_client(
client_class=utils.FeaturestoreOnlineServingClientWithOverride,
credentials=credentials,
location_override=location,
)

def read(
self,
entity_ids: Union[str, List[str]],
feature_ids: Union[str, List[str]] = "*",
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
) -> "pd.DataFrame": # noqa: F821 - skip check for undefined name 'pd'
"""Reads feature values for given feature IDs of given entity IDs in this EntityType.
Args:
entity_ids (Union[str, List[str]]):
Required. ID for a specific entity, or a list of IDs of entities
to read Feature values of. The maximum number of IDs is 100 if a list.
feature_ids (Union[str, List[str]]):
Required. ID for a specific feature, or a list of IDs of Features in the EntityType
for reading feature values. Default to "*", where value of all features will be read.
request_metadata (Sequence[Tuple[str, str]]):
Optional. Strings which should be sent along with the request as metadata.
Returns:
pd.DataFrame: entities' feature values in DataFrame
"""

if isinstance(feature_ids, str):
feature_ids = [feature_ids]

feature_selector = gca_feature_selector.FeatureSelector(
id_matcher=gca_feature_selector.IdMatcher(ids=feature_ids)
)

if isinstance(entity_ids, str):
read_feature_values_request = gca_featurestore_online_service.ReadFeatureValuesRequest(
entity_type=self.resource_name,
entity_id=entity_ids,
feature_selector=feature_selector,
)
read_feature_values_response = self._featurestore_online_client.read_feature_values(
request=read_feature_values_request, metadata=request_metadata
)
header = read_feature_values_response.header
entity_views = [read_feature_values_response.entity_view]
elif isinstance(entity_ids, list):
streaming_read_feature_values_request = gca_featurestore_online_service.StreamingReadFeatureValuesRequest(
entity_type=self.resource_name,
entity_ids=entity_ids,
feature_selector=feature_selector,
)
streaming_read_feature_values_responses = [
response
for response in self._featurestore_online_client.streaming_read_feature_values(
request=streaming_read_feature_values_request,
metadata=request_metadata,
)
]
header = streaming_read_feature_values_responses[0].header
entity_views = [
response.entity_view
for response in streaming_read_feature_values_responses[1:]
]

feature_ids = [
feature_descriptor.id for feature_descriptor in header.feature_descriptors
]

return EntityType._construct_dataframe(
feature_ids=feature_ids, entity_views=entity_views,
)

@staticmethod
def _construct_dataframe(
feature_ids: List[str],
entity_views: List[
gca_featurestore_online_service.ReadFeatureValuesResponse.EntityView
],
) -> "pd.DataFrame": # noqa: F821 - skip check for undefined name 'pd'
"""Constructs a dataframe using the header and entity_views
Args:
feature_ids (List[str]):
Required. A list of feature ids corresponding to the feature values for each entity in entity_views.
entity_views (List[gca_featurestore_online_service.ReadFeatureValuesResponse.EntityView]):
Required. A list of Entity views with Feature values.
For each Entity view, it may be
the entity in the Featurestore if values for all
Features were requested, or a projection of the
entity in the Featurestore if values for only
some Features were requested.
Raises:
ImportError: If pandas is not installed when using this method.
Returns:
pd.DataFrame - entities feature values in DataFrame
)
"""

try:
import pandas as pd
except ImportError:
raise ImportError(
f"Pandas is not installed. Please install pandas to use "
f"{EntityType._construct_dataframe.__name__}"
)

data = []
for entity_view in entity_views:
entity_data = {"entity_id": entity_view.entity_id}
for feature_id, feature_data in zip(feature_ids, entity_view.data):
if feature_data._pb.HasField("value"):
value_type = feature_data.value._pb.WhichOneof("value")
feature_value = getattr(feature_data.value, value_type)
if hasattr(feature_value, "values"):
feature_value = feature_value.values
entity_data[feature_id] = feature_value
else:
entity_data[feature_id] = None
data.append(entity_data)

return pd.DataFrame(data=data, columns=["entity_id"] + feature_ids)
1 change: 0 additions & 1 deletion google/cloud/aiplatform/featurestore/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class Feature(base.VertexAiResourceNounWithFutureManager):

client_class = utils.FeaturestoreClientWithOverride

_is_client_prediction_client = False
_resource_noun = "features"
_getter_method = "get_feature"
_list_method = "list_features"
Expand Down
1 change: 0 additions & 1 deletion google/cloud/aiplatform/featurestore/featurestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class Featurestore(base.VertexAiResourceNounWithFutureManager):

client_class = utils.FeaturestoreClientWithOverride

_is_client_prediction_client = False
_resource_noun = "featurestores"
_getter_method = "get_featurestore"
_list_method = "list_featurestores"
Expand Down
18 changes: 18 additions & 0 deletions tests/system/aiplatform/test_featurestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from google.cloud import aiplatform
from tests.system.aiplatform import e2e_base

import pandas as pd

_TEST_USERS_ENTITY_TYPE_GCS_SRC = (
"gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/users.avro"
)
Expand Down Expand Up @@ -247,3 +249,19 @@ def test_search_features(self, shared_state):
assert (
len(list_searched_features) - shared_state["base_list_searched_features"]
) == 6

def test_online_reads(self, shared_state):
assert shared_state["user_entity_type"]
assert shared_state["movie_entity_type"]

user_entity_type = shared_state["user_entity_type"]
movie_entity_type = shared_state["movie_entity_type"]

user_entity_views = user_entity_type.read(entity_ids="alice")
assert type(user_entity_views) == pd.DataFrame

movie_entity_views = movie_entity_type.read(
entity_ids=["movie_01", "movie_04"],
feature_ids=[_TEST_MOVIE_TITLE_FEATURE_ID, _TEST_MOVIE_GENRES_FEATURE_ID],
)
assert type(movie_entity_views) == pd.DataFrame
Loading

0 comments on commit b8f5f82

Please sign in to comment.