Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable feature store batch serve to Pandas DataFrame; fix: read instances uri for batch serve #983

Merged
47 changes: 37 additions & 10 deletions google/cloud/aiplatform/featurestore/entity_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,21 @@ def __init__(
location=self.location, credentials=credentials,
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
def _get_featurestore_name(self) -> str:
"""Gets full qualified resource name of the managed featurestore in which this EntityType is."""
entity_type_name_components = self._parse_resource_name(self.resource_name)
return featurestore.Featurestore._format_resource_name(
project=entity_type_name_components["project"],
location=entity_type_name_components["location"],
featurestore=entity_type_name_components["featurestore"],
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
self.wait()
return self._get_featurestore_name()

def get_featurestore(self) -> "featurestore.Featurestore":
"""Retrieves the managed featurestore in which this EntityType is.

Expand All @@ -141,7 +146,7 @@ def get_featurestore(self) -> "featurestore.Featurestore":
"""
return featurestore.Featurestore(self.featurestore_name)

def get_feature(self, feature_id: str) -> "featurestore.Feature":
def _get_feature(self, feature_id: str) -> "featurestore.Feature":
"""Retrieves an existing managed feature in this EntityType.

Args:
Expand All @@ -151,7 +156,6 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature":
featurestore.Feature - The managed feature resource object.
"""
entity_type_name_components = self._parse_resource_name(self.resource_name)

return featurestore.Feature(
feature_name=featurestore.Feature._format_resource_name(
project=entity_type_name_components["project"],
Expand All @@ -162,6 +166,18 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature":
)
)

def get_feature(self, feature_id: str) -> "featurestore.Feature":
"""Retrieves an existing managed feature in this EntityType.

Args:
feature_id (str):
Required. The managed feature resource ID in this EntityType.
Returns:
featurestore.Feature - The managed feature resource object.
"""
self.wait()
return self._get_feature(feature_id=feature_id)

def update(
self,
description: Optional[str] = None,
Expand Down Expand Up @@ -202,6 +218,7 @@ def update(
Returns:
EntityType - The updated entityType resource object.
"""
self.wait()
update_mask = list()

if description:
Expand Down Expand Up @@ -380,6 +397,7 @@ def list_features(
Returns:
List[featurestore.Feature] - A list of managed feature resource objects.
"""
self.wait()
return featurestore.Feature.list(
entity_type_name=self.resource_name, filter=filter, order_by=order_by,
)
Expand All @@ -399,7 +417,7 @@ def delete_features(self, feature_ids: List[str], sync: bool = True,) -> None:
"""
features = []
for feature_id in feature_ids:
feature = self.get_feature(feature_id=feature_id)
feature = self._get_feature(feature_id=feature_id)
feature.delete(sync=False)
features.append(feature)

Expand Down Expand Up @@ -626,6 +644,7 @@ def create_feature(
featurestore.Feature - feature resource object

"""
self.wait()
return featurestore.Feature.create(
feature_id=feature_id,
value_type=value_type,
Expand Down Expand Up @@ -761,8 +780,9 @@ def batch_create_features(

return self

@staticmethod
def _validate_and_get_import_feature_values_request(
self,
entity_type_name: str,
feature_ids: List[str],
feature_time: Union[str, datetime.datetime],
data_source: Union[gca_io.AvroSource, gca_io.BigQuerySource, gca_io.CsvSource],
Expand All @@ -773,6 +793,8 @@ def _validate_and_get_import_feature_values_request(
) -> gca_featurestore_service.ImportFeatureValuesRequest:
"""Validates and get import feature values request.
Args:
entity_type_name (str):
Required. A fully-qualified entityType resource name.
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
Expand Down Expand Up @@ -840,7 +862,7 @@ def _validate_and_get_import_feature_values_request(
]

import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest(
entity_type=self.resource_name,
entity_type=entity_type_name,
feature_specs=feature_specs,
entity_id_field=entity_id_field,
disable_online_serving=disable_online_serving,
Expand Down Expand Up @@ -992,6 +1014,7 @@ def ingest_from_bq(
bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
entity_type_name=self.resource_name,
feature_ids=feature_ids,
feature_time=feature_time,
data_source=bigquery_source,
Expand Down Expand Up @@ -1114,6 +1137,7 @@ def ingest_from_gcs(
data_source = gca_io.AvroSource(gcs_source=gcs_source)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
entity_type_name=self.resource_name,
feature_ids=feature_ids,
feature_time=feature_time,
data_source=data_source,
Expand Down Expand Up @@ -1213,6 +1237,7 @@ def ingest_from_df(
project=self.project, credentials=self.credentials
)

self.wait()
entity_type_name_components = self._parse_resource_name(self.resource_name)
featurestore_id, entity_type_id = (
entity_type_name_components["featurestore"],
Expand All @@ -1222,6 +1247,8 @@ def ingest_from_df(
temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
"-", "_"
)

# TODO(b/216497263): Add support for resource project does not match initializer.global_config.project
temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[
:1024
]
Expand Down Expand Up @@ -1297,7 +1324,7 @@ def read(
Returns:
pd.DataFrame: entities' feature values in DataFrame
"""

self.wait()
if isinstance(feature_ids, str):
feature_ids = [feature_ids]

Expand Down Expand Up @@ -1339,7 +1366,7 @@ def read(
feature_descriptor.id for feature_descriptor in header.feature_descriptors
]

return EntityType._construct_dataframe(
return self._construct_dataframe(
feature_ids=feature_ids, entity_views=entity_views,
)

Expand Down
25 changes: 17 additions & 8 deletions google/cloud/aiplatform/featurestore/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,21 @@ def __init__(
else featurestore_id,
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this Feature is."""
def _get_featurestore_name(self) -> str:
"""Gets full qualified resource name of the managed featurestore in which this Feature is."""
feature_path_components = self._parse_resource_name(self.resource_name)

return featurestore.Featurestore._format_resource_name(
project=feature_path_components["project"],
location=feature_path_components["location"],
featurestore=feature_path_components["featurestore"],
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this Feature is."""
self.wait()
return self._get_featurestore_name()

def get_featurestore(self) -> "featurestore.Featurestore":
"""Retrieves the managed featurestore in which this Feature is.

Expand All @@ -141,18 +145,22 @@ def get_featurestore(self) -> "featurestore.Featurestore":
"""
return featurestore.Featurestore(featurestore_name=self.featurestore_name)

@property
def entity_type_name(self) -> str:
"""Full qualified resource name of the managed entityType in which this Feature is."""
def _get_entity_type_name(self) -> str:
"""Gets full qualified resource name of the managed entityType in which this Feature is."""
feature_path_components = self._parse_resource_name(self.resource_name)

return featurestore.EntityType._format_resource_name(
project=feature_path_components["project"],
location=feature_path_components["location"],
featurestore=feature_path_components["featurestore"],
entity_type=feature_path_components["entity_type"],
)

@property
def entity_type_name(self) -> str:
"""Full qualified resource name of the managed entityType in which this Feature is."""
self.wait()
return self._get_entity_type_name()

def get_entity_type(self) -> "featurestore.EntityType":
"""Retrieves the managed entityType in which this Feature is.

Expand Down Expand Up @@ -203,6 +211,7 @@ def update(
Returns:
Feature - The updated feature resource object.
"""
self.wait()
update_mask = list()

if description:
Expand Down
Loading