Skip to content

Commit

Permalink
fix: optional_sync
Browse files Browse the repository at this point in the history
  • Loading branch information
morgandu committed Feb 4, 2022
1 parent 472de1e commit 290b07d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 30 deletions.
46 changes: 34 additions & 12 deletions google/cloud/aiplatform/featurestore/entity_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,21 @@ def __init__(
location=self.location, credentials=credentials,
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
self.wait()
def _get_featurestore_name(self) -> str:
"""Gets full qualified resource name of the managed featurestore in which this EntityType is."""
entity_type_name_components = self._parse_resource_name(self.resource_name)
return featurestore.Featurestore._format_resource_name(
project=entity_type_name_components["project"],
location=entity_type_name_components["location"],
featurestore=entity_type_name_components["featurestore"],
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
self.wait()
return self._get_featurestore_name()

def get_featurestore(self) -> "featurestore.Featurestore":
"""Retrieves the managed featurestore in which this EntityType is.
Expand All @@ -142,7 +146,7 @@ def get_featurestore(self) -> "featurestore.Featurestore":
"""
return featurestore.Featurestore(self.featurestore_name)

def get_feature(self, feature_id: str) -> "featurestore.Feature":
def _get_feature(self, feature_id: str) -> "featurestore.Feature":
"""Retrieves an existing managed feature in this EntityType.
Args:
Expand All @@ -151,9 +155,7 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature":
Returns:
featurestore.Feature - The managed feature resource object.
"""
self.wait()
entity_type_name_components = self._parse_resource_name(self.resource_name)

return featurestore.Feature(
feature_name=featurestore.Feature._format_resource_name(
project=entity_type_name_components["project"],
Expand All @@ -164,6 +166,18 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature":
)
)

def get_feature(self, feature_id: str) -> "featurestore.Feature":
"""Retrieves an existing managed feature in this EntityType.
Args:
feature_id (str):
Required. The managed feature resource ID in this EntityType.
Returns:
featurestore.Feature - The managed feature resource object.
"""
self.wait()
return self._get_feature(feature_id=feature_id)

def update(
self,
description: Optional[str] = None,
Expand Down Expand Up @@ -204,6 +218,7 @@ def update(
Returns:
EntityType - The updated entityType resource object.
"""
self.wait()
update_mask = list()

if description:
Expand Down Expand Up @@ -382,6 +397,7 @@ def list_features(
Returns:
List[featurestore.Feature] - A list of managed feature resource objects.
"""
self.wait()
return featurestore.Feature.list(
entity_type_name=self.resource_name, filter=filter, order_by=order_by,
)
Expand All @@ -401,7 +417,7 @@ def delete_features(self, feature_ids: List[str], sync: bool = True,) -> None:
"""
features = []
for feature_id in feature_ids:
feature = self.get_feature(feature_id=feature_id)
feature = self._get_feature(feature_id=feature_id)
feature.delete(sync=False)
features.append(feature)

Expand Down Expand Up @@ -628,6 +644,7 @@ def create_feature(
featurestore.Feature - feature resource object
"""
self.wait()
return featurestore.Feature.create(
feature_id=feature_id,
value_type=value_type,
Expand Down Expand Up @@ -763,8 +780,9 @@ def batch_create_features(

return self

@staticmethod
def _validate_and_get_import_feature_values_request(
self,
entity_type_name: str,
feature_ids: List[str],
feature_time: Union[str, datetime.datetime],
data_source: Union[gca_io.AvroSource, gca_io.BigQuerySource, gca_io.CsvSource],
Expand All @@ -775,6 +793,8 @@ def _validate_and_get_import_feature_values_request(
) -> gca_featurestore_service.ImportFeatureValuesRequest:
"""Validates and get import feature values request.
Args:
entity_type_name (str):
Required. A fully-qualified entityType resource name.
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
Expand Down Expand Up @@ -842,7 +862,7 @@ def _validate_and_get_import_feature_values_request(
]

import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest(
entity_type=self.resource_name,
entity_type=entity_type_name,
feature_specs=feature_specs,
entity_id_field=entity_id_field,
disable_online_serving=disable_online_serving,
Expand Down Expand Up @@ -994,6 +1014,7 @@ def ingest_from_bq(
bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
entity_type_name=self.resource_name,
feature_ids=feature_ids,
feature_time=feature_time,
data_source=bigquery_source,
Expand Down Expand Up @@ -1116,6 +1137,7 @@ def ingest_from_gcs(
data_source = gca_io.AvroSource(gcs_source=gcs_source)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
entity_type_name=self.resource_name,
feature_ids=feature_ids,
feature_time=feature_time,
data_source=data_source,
Expand Down Expand Up @@ -1302,7 +1324,7 @@ def read(
Returns:
pd.DataFrame: entities' feature values in DataFrame
"""

self.wait()
if isinstance(feature_ids, str):
feature_ids = [feature_ids]

Expand Down Expand Up @@ -1344,7 +1366,7 @@ def read(
feature_descriptor.id for feature_descriptor in header.feature_descriptors
]

return EntityType._construct_dataframe(
return self._construct_dataframe(
feature_ids=feature_ids, entity_views=entity_views,
)

Expand Down
27 changes: 17 additions & 10 deletions google/cloud/aiplatform/featurestore/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,21 @@ def __init__(
else featurestore_id,
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this Feature is."""
self.wait()
def _get_featurestore_name(self) -> str:
"""Gets full qualified resource name of the managed featurestore in which this Feature is."""
feature_path_components = self._parse_resource_name(self.resource_name)

return featurestore.Featurestore._format_resource_name(
project=feature_path_components["project"],
location=feature_path_components["location"],
featurestore=feature_path_components["featurestore"],
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this Feature is."""
self.wait()
return self._get_featurestore_name()

def get_featurestore(self) -> "featurestore.Featurestore":
"""Retrieves the managed featurestore in which this Feature is.
Expand All @@ -142,19 +145,22 @@ def get_featurestore(self) -> "featurestore.Featurestore":
"""
return featurestore.Featurestore(featurestore_name=self.featurestore_name)

@property
def entity_type_name(self) -> str:
"""Full qualified resource name of the managed entityType in which this Feature is."""
self.wait()
def _get_entity_type_name(self) -> str:
"""Gets full qualified resource name of the managed entityType in which this Feature is."""
feature_path_components = self._parse_resource_name(self.resource_name)

return featurestore.EntityType._format_resource_name(
project=feature_path_components["project"],
location=feature_path_components["location"],
featurestore=feature_path_components["featurestore"],
entity_type=feature_path_components["entity_type"],
)

@property
def entity_type_name(self) -> str:
"""Full qualified resource name of the managed entityType in which this Feature is."""
self.wait()
return self._get_entity_type_name()

def get_entity_type(self) -> "featurestore.EntityType":
"""Retrieves the managed entityType in which this Feature is.
Expand Down Expand Up @@ -205,6 +211,7 @@ def update(
Returns:
Feature - The updated feature resource object.
"""
self.wait()
update_mask = list()

if description:
Expand Down
31 changes: 24 additions & 7 deletions google/cloud/aiplatform/featurestore/featurestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,18 @@ def get_entity_type(self, entity_type_id: str) -> "featurestore.EntityType":
featurestore.EntityType - The managed entityType resource object.
"""
self.wait()
featurestore_name_components = self._parse_resource_name(self.resource_name)
return self._get_entity_type(entity_type_id=entity_type_id)

def _get_entity_type(self, entity_type_id: str) -> "featurestore.EntityType":
"""Retrieves an existing managed entityType in this Featurestore.
Args:
entity_type_id (str):
Required. The managed entityType resource ID in this Featurestore.
Returns:
featurestore.EntityType - The managed entityType resource object.
"""
featurestore_name_components = self._parse_resource_name(self.resource_name)
return featurestore.EntityType(
entity_type_name=featurestore.EntityType._format_resource_name(
project=featurestore_name_components["project"],
Expand Down Expand Up @@ -225,6 +235,7 @@ def _update(
Returns:
Featurestore - The updated featurestore resource object.
"""
self.wait()
update_mask = list()

if labels:
Expand Down Expand Up @@ -314,6 +325,7 @@ def list_entity_types(
Returns:
List[featurestore.EntityType] - A list of managed entityType resource objects.
"""
self.wait()
return featurestore.EntityType.list(
featurestore_name=self.resource_name, filter=filter, order_by=order_by,
)
Expand All @@ -338,7 +350,7 @@ def delete_entity_types(
"""
entity_types = []
for entity_type_id in entity_type_ids:
entity_type = self.get_entity_type(entity_type_id=entity_type_id)
entity_type = self._get_entity_type(entity_type_id=entity_type_id)
entity_type.delete(force=force, sync=False)
entity_types.append(entity_type)

Expand Down Expand Up @@ -551,6 +563,7 @@ def create_entity_type(
featurestore.EntityType - EntityType resource object
"""
self.wait()
return featurestore.EntityType.create(
entity_type_id=entity_type_id,
featurestore_name=self.resource_name,
Expand Down Expand Up @@ -595,8 +608,9 @@ def _batch_read_feature_values(

return self

@staticmethod
def _validate_and_get_read_instances(
self, read_instances_uri: str,
read_instances_uri: str,
) -> Union[gca_io.BigQuerySource, gca_io.CsvSource]:
"""Gets read_instances
Expand Down Expand Up @@ -629,6 +643,7 @@ def _validate_and_get_read_instances(

def _validate_and_get_batch_read_feature_values_request(
self,
featurestore_name: str,
serving_feature_ids: Dict[str, List[str]],
destination: Union[
gca_io.BigQueryDestination,
Expand All @@ -642,6 +657,8 @@ def _validate_and_get_batch_read_feature_values_request(
"""Validates and gets batch_read_feature_values_request
Args:
featurestore_name (str):
Required. A fully-qualified featurestore resource name.
serving_feature_ids (Dict[str, List[str]]):
Required. A user defined dictionary to define the entity_types and their features for batch serve/read.
The keys of the dictionary are the serving entity_type ids and
Expand Down Expand Up @@ -680,9 +697,7 @@ def _validate_and_get_batch_read_feature_values_request(
Returns:
gca_featurestore_service.BatchReadFeatureValuesRequest: batch read feature values request
"""

self.wait()
featurestore_name_components = self._parse_resource_name(self.resource_name)
featurestore_name_components = self._parse_resource_name(featurestore_name)

feature_destination_fields = feature_destination_fields or {}

Expand Down Expand Up @@ -720,7 +735,7 @@ def _validate_and_get_batch_read_feature_values_request(
entity_type_specs.append(entity_type_spec)

batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest(
featurestore=self.resource_name, entity_type_specs=entity_type_specs,
featurestore=featurestore_name, entity_type_specs=entity_type_specs,
)

if isinstance(destination, gca_io.BigQueryDestination):
Expand Down Expand Up @@ -843,6 +858,7 @@ def batch_serve_to_bq(
read_instances = self._validate_and_get_read_instances(read_instances_uri)

batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request(
featurestore_name=self.resource_name,
serving_feature_ids=serving_feature_ids,
destination=gca_io.BigQueryDestination(
output_uri=bq_destination_output_uri
Expand Down Expand Up @@ -989,6 +1005,7 @@ def batch_serve_to_gcs(
read_instances = self._validate_and_get_read_instances(read_instances_uri)

batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request(
featurestore_name=self.resource_name,
serving_feature_ids=serving_feature_ids,
destination=destination,
feature_destination_fields=feature_destination_fields,
Expand Down
5 changes: 4 additions & 1 deletion tests/unit/aiplatform/test_featurestores.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ def test_validate_and_get_batch_read_feature_values_request(
featurestore_name=_TEST_FEATURESTORE_NAME
)
expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest(
featurestore=my_featurestore.resource_name,
featurestore=_TEST_FEATURESTORE_NAME,
destination=gca_featurestore_service.FeatureValueDestination(
bigquery_destination=_TEST_BQ_DESTINATION,
),
Expand All @@ -1107,6 +1107,7 @@ def test_validate_and_get_batch_read_feature_values_request(
assert (
expected_batch_read_feature_values_request
== my_featurestore._validate_and_get_batch_read_feature_values_request(
featurestore_name=my_featurestore.resource_name,
serving_feature_ids=serving_feature_ids,
destination=_TEST_BQ_DESTINATION,
read_instances=_TEST_BQ_SOURCE,
Expand Down Expand Up @@ -1560,6 +1561,7 @@ def test_validate_and_get_import_feature_values_request_with_source_fields(self)
assert (
true_import_feature_values_request
== my_entity_type._validate_and_get_import_feature_values_request(
entity_type_name=my_entity_type.resource_name,
feature_ids=_TEST_IMPORTING_FEATURE_IDS,
feature_time=_TEST_FEATURE_TIME_FIELD,
data_source=_TEST_BQ_SOURCE,
Expand All @@ -1586,6 +1588,7 @@ def test_validate_and_get_import_feature_values_request_without_source_fields(se
assert (
true_import_feature_values_request
== my_entity_type._validate_and_get_import_feature_values_request(
entity_type_name=my_entity_type.resource_name,
feature_ids=_TEST_IMPORTING_FEATURE_IDS,
feature_time=_TEST_FEATURE_TIME,
data_source=_TEST_CSV_SOURCE,
Expand Down

0 comments on commit 290b07d

Please sign in to comment.