From 290b07d70c0a01300e79507746a06a9c0dcb8d10 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Thu, 3 Feb 2022 21:17:27 -0800 Subject: [PATCH] fix: optional_sync --- .../aiplatform/featurestore/entity_type.py | 46 ++++++++++++++----- .../cloud/aiplatform/featurestore/feature.py | 27 +++++++---- .../aiplatform/featurestore/featurestore.py | 31 ++++++++++--- tests/unit/aiplatform/test_featurestores.py | 5 +- 4 files changed, 79 insertions(+), 30 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index 67b5b9d4ab..274f89d2aa 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -123,10 +123,8 @@ def __init__( location=self.location, credentials=credentials, ) - @property - def featurestore_name(self) -> str: - """Full qualified resource name of the managed featurestore in which this EntityType is.""" - self.wait() + def _get_featurestore_name(self) -> str: + """Gets full qualified resource name of the managed featurestore in which this EntityType is.""" entity_type_name_components = self._parse_resource_name(self.resource_name) return featurestore.Featurestore._format_resource_name( project=entity_type_name_components["project"], @@ -134,6 +132,12 @@ def featurestore_name(self) -> str: featurestore=entity_type_name_components["featurestore"], ) + @property + def featurestore_name(self) -> str: + """Full qualified resource name of the managed featurestore in which this EntityType is.""" + self.wait() + return self._get_featurestore_name() + def get_featurestore(self) -> "featurestore.Featurestore": """Retrieves the managed featurestore in which this EntityType is. @@ -142,7 +146,7 @@ def get_featurestore(self) -> "featurestore.Featurestore": """ return featurestore.Featurestore(self.featurestore_name) - def get_feature(self, feature_id: str) -> "featurestore.Feature": + def _get_feature(self, feature_id: str) -> "featurestore.Feature": """Retrieves an existing managed feature in this EntityType. Args: @@ -151,9 +155,7 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature": Returns: featurestore.Feature - The managed feature resource object. """ - self.wait() entity_type_name_components = self._parse_resource_name(self.resource_name) - return featurestore.Feature( feature_name=featurestore.Feature._format_resource_name( project=entity_type_name_components["project"], @@ -164,6 +166,18 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature": ) ) + def get_feature(self, feature_id: str) -> "featurestore.Feature": + """Retrieves an existing managed feature in this EntityType. + + Args: + feature_id (str): + Required. The managed feature resource ID in this EntityType. + Returns: + featurestore.Feature - The managed feature resource object. + """ + self.wait() + return self._get_feature(feature_id=feature_id) + def update( self, description: Optional[str] = None, @@ -204,6 +218,7 @@ def update( Returns: EntityType - The updated entityType resource object. """ + self.wait() update_mask = list() if description: @@ -382,6 +397,7 @@ def list_features( Returns: List[featurestore.Feature] - A list of managed feature resource objects. """ + self.wait() return featurestore.Feature.list( entity_type_name=self.resource_name, filter=filter, order_by=order_by, ) @@ -401,7 +417,7 @@ def delete_features(self, feature_ids: List[str], sync: bool = True,) -> None: """ features = [] for feature_id in feature_ids: - feature = self.get_feature(feature_id=feature_id) + feature = self._get_feature(feature_id=feature_id) feature.delete(sync=False) features.append(feature) @@ -628,6 +644,7 @@ def create_feature( featurestore.Feature - feature resource object """ + self.wait() return featurestore.Feature.create( feature_id=feature_id, value_type=value_type, @@ -763,8 +780,9 @@ def batch_create_features( return self + @staticmethod def _validate_and_get_import_feature_values_request( - self, + entity_type_name: str, feature_ids: List[str], feature_time: Union[str, datetime.datetime], data_source: Union[gca_io.AvroSource, gca_io.BigQuerySource, gca_io.CsvSource], @@ -775,6 +793,8 @@ def _validate_and_get_import_feature_values_request( ) -> gca_featurestore_service.ImportFeatureValuesRequest: """Validates and get import feature values request. Args: + entity_type_name (str): + Required. A fully-qualified entityType resource name. feature_ids (List[str]): Required. IDs of the Feature to import values of. The Features must exist in the target @@ -842,7 +862,7 @@ def _validate_and_get_import_feature_values_request( ] import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( - entity_type=self.resource_name, + entity_type=entity_type_name, feature_specs=feature_specs, entity_id_field=entity_id_field, disable_online_serving=disable_online_serving, @@ -994,6 +1014,7 @@ def ingest_from_bq( bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri) import_feature_values_request = self._validate_and_get_import_feature_values_request( + entity_type_name=self.resource_name, feature_ids=feature_ids, feature_time=feature_time, data_source=bigquery_source, @@ -1116,6 +1137,7 @@ def ingest_from_gcs( data_source = gca_io.AvroSource(gcs_source=gcs_source) import_feature_values_request = self._validate_and_get_import_feature_values_request( + entity_type_name=self.resource_name, feature_ids=feature_ids, feature_time=feature_time, data_source=data_source, @@ -1302,7 +1324,7 @@ def read( Returns: pd.DataFrame: entities' feature values in DataFrame """ - + self.wait() if isinstance(feature_ids, str): feature_ids = [feature_ids] @@ -1344,7 +1366,7 @@ def read( feature_descriptor.id for feature_descriptor in header.feature_descriptors ] - return EntityType._construct_dataframe( + return self._construct_dataframe( feature_ids=feature_ids, entity_views=entity_views, ) diff --git a/google/cloud/aiplatform/featurestore/feature.py b/google/cloud/aiplatform/featurestore/feature.py index 3c623d6b9e..1564abfd62 100644 --- a/google/cloud/aiplatform/featurestore/feature.py +++ b/google/cloud/aiplatform/featurestore/feature.py @@ -122,18 +122,21 @@ def __init__( else featurestore_id, ) - @property - def featurestore_name(self) -> str: - """Full qualified resource name of the managed featurestore in which this Feature is.""" - self.wait() + def _get_featurestore_name(self) -> str: + """Gets full qualified resource name of the managed featurestore in which this Feature is.""" feature_path_components = self._parse_resource_name(self.resource_name) - return featurestore.Featurestore._format_resource_name( project=feature_path_components["project"], location=feature_path_components["location"], featurestore=feature_path_components["featurestore"], ) + @property + def featurestore_name(self) -> str: + """Full qualified resource name of the managed featurestore in which this Feature is.""" + self.wait() + return self._get_featurestore_name() + def get_featurestore(self) -> "featurestore.Featurestore": """Retrieves the managed featurestore in which this Feature is. @@ -142,12 +145,9 @@ def get_featurestore(self) -> "featurestore.Featurestore": """ return featurestore.Featurestore(featurestore_name=self.featurestore_name) - @property - def entity_type_name(self) -> str: - """Full qualified resource name of the managed entityType in which this Feature is.""" - self.wait() + def _get_entity_type_name(self) -> str: + """Gets full qualified resource name of the managed entityType in which this Feature is.""" feature_path_components = self._parse_resource_name(self.resource_name) - return featurestore.EntityType._format_resource_name( project=feature_path_components["project"], location=feature_path_components["location"], @@ -155,6 +155,12 @@ def entity_type_name(self) -> str: entity_type=feature_path_components["entity_type"], ) + @property + def entity_type_name(self) -> str: + """Full qualified resource name of the managed entityType in which this Feature is.""" + self.wait() + return self._get_entity_type_name() + def get_entity_type(self) -> "featurestore.EntityType": """Retrieves the managed entityType in which this Feature is. @@ -205,6 +211,7 @@ def update( Returns: Feature - The updated feature resource object. """ + self.wait() update_mask = list() if description: diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index a9504ee498..648dd7ad2b 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -113,8 +113,18 @@ def get_entity_type(self, entity_type_id: str) -> "featurestore.EntityType": featurestore.EntityType - The managed entityType resource object. """ self.wait() - featurestore_name_components = self._parse_resource_name(self.resource_name) + return self._get_entity_type(entity_type_id=entity_type_id) + + def _get_entity_type(self, entity_type_id: str) -> "featurestore.EntityType": + """Retrieves an existing managed entityType in this Featurestore. + Args: + entity_type_id (str): + Required. The managed entityType resource ID in this Featurestore. + Returns: + featurestore.EntityType - The managed entityType resource object. + """ + featurestore_name_components = self._parse_resource_name(self.resource_name) return featurestore.EntityType( entity_type_name=featurestore.EntityType._format_resource_name( project=featurestore_name_components["project"], @@ -225,6 +235,7 @@ def _update( Returns: Featurestore - The updated featurestore resource object. """ + self.wait() update_mask = list() if labels: @@ -314,6 +325,7 @@ def list_entity_types( Returns: List[featurestore.EntityType] - A list of managed entityType resource objects. """ + self.wait() return featurestore.EntityType.list( featurestore_name=self.resource_name, filter=filter, order_by=order_by, ) @@ -338,7 +350,7 @@ def delete_entity_types( """ entity_types = [] for entity_type_id in entity_type_ids: - entity_type = self.get_entity_type(entity_type_id=entity_type_id) + entity_type = self._get_entity_type(entity_type_id=entity_type_id) entity_type.delete(force=force, sync=False) entity_types.append(entity_type) @@ -551,6 +563,7 @@ def create_entity_type( featurestore.EntityType - EntityType resource object """ + self.wait() return featurestore.EntityType.create( entity_type_id=entity_type_id, featurestore_name=self.resource_name, @@ -595,8 +608,9 @@ def _batch_read_feature_values( return self + @staticmethod def _validate_and_get_read_instances( - self, read_instances_uri: str, + read_instances_uri: str, ) -> Union[gca_io.BigQuerySource, gca_io.CsvSource]: """Gets read_instances @@ -629,6 +643,7 @@ def _validate_and_get_read_instances( def _validate_and_get_batch_read_feature_values_request( self, + featurestore_name: str, serving_feature_ids: Dict[str, List[str]], destination: Union[ gca_io.BigQueryDestination, @@ -642,6 +657,8 @@ def _validate_and_get_batch_read_feature_values_request( """Validates and gets batch_read_feature_values_request Args: + featurestore_name (str): + Required. A fully-qualified featurestore resource name. serving_feature_ids (Dict[str, List[str]]): Required. A user defined dictionary to define the entity_types and their features for batch serve/read. The keys of the dictionary are the serving entity_type ids and @@ -680,9 +697,7 @@ def _validate_and_get_batch_read_feature_values_request( Returns: gca_featurestore_service.BatchReadFeatureValuesRequest: batch read feature values request """ - - self.wait() - featurestore_name_components = self._parse_resource_name(self.resource_name) + featurestore_name_components = self._parse_resource_name(featurestore_name) feature_destination_fields = feature_destination_fields or {} @@ -720,7 +735,7 @@ def _validate_and_get_batch_read_feature_values_request( entity_type_specs.append(entity_type_spec) batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( - featurestore=self.resource_name, entity_type_specs=entity_type_specs, + featurestore=featurestore_name, entity_type_specs=entity_type_specs, ) if isinstance(destination, gca_io.BigQueryDestination): @@ -843,6 +858,7 @@ def batch_serve_to_bq( read_instances = self._validate_and_get_read_instances(read_instances_uri) batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request( + featurestore_name=self.resource_name, serving_feature_ids=serving_feature_ids, destination=gca_io.BigQueryDestination( output_uri=bq_destination_output_uri @@ -989,6 +1005,7 @@ def batch_serve_to_gcs( read_instances = self._validate_and_get_read_instances(read_instances_uri) batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request( + featurestore_name=self.resource_name, serving_feature_ids=serving_feature_ids, destination=destination, feature_destination_fields=feature_destination_fields, diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index 5da494c443..df7d544d95 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -1097,7 +1097,7 @@ def test_validate_and_get_batch_read_feature_values_request( featurestore_name=_TEST_FEATURESTORE_NAME ) expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( - featurestore=my_featurestore.resource_name, + featurestore=_TEST_FEATURESTORE_NAME, destination=gca_featurestore_service.FeatureValueDestination( bigquery_destination=_TEST_BQ_DESTINATION, ), @@ -1107,6 +1107,7 @@ def test_validate_and_get_batch_read_feature_values_request( assert ( expected_batch_read_feature_values_request == my_featurestore._validate_and_get_batch_read_feature_values_request( + featurestore_name=my_featurestore.resource_name, serving_feature_ids=serving_feature_ids, destination=_TEST_BQ_DESTINATION, read_instances=_TEST_BQ_SOURCE, @@ -1560,6 +1561,7 @@ def test_validate_and_get_import_feature_values_request_with_source_fields(self) assert ( true_import_feature_values_request == my_entity_type._validate_and_get_import_feature_values_request( + entity_type_name=my_entity_type.resource_name, feature_ids=_TEST_IMPORTING_FEATURE_IDS, feature_time=_TEST_FEATURE_TIME_FIELD, data_source=_TEST_BQ_SOURCE, @@ -1586,6 +1588,7 @@ def test_validate_and_get_import_feature_values_request_without_source_fields(se assert ( true_import_feature_values_request == my_entity_type._validate_and_get_import_feature_values_request( + entity_type_name=my_entity_type.resource_name, feature_ids=_TEST_IMPORTING_FEATURE_IDS, feature_time=_TEST_FEATURE_TIME, data_source=_TEST_CSV_SOURCE,