From 6db4abca004035c769cc0d9bd19da4a6b4027a9b Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Thu, 16 Dec 2021 20:46:23 -0800 Subject: [PATCH 01/11] feat: add batch_serve_to_bq for bigquery table and batch_serve_to_gcs for csv and tfrecord files in Featurestore class --- .../aiplatform/featurestore/featurestore.py | 603 +++++++++++++++++- .../aiplatform/utils/featurestore_utils.py | 1 + tests/system/aiplatform/e2e_base.py | 33 + tests/system/aiplatform/test_featurestore.py | 54 ++ tests/unit/aiplatform/test_featurestores.py | 302 +++++++++ 5 files changed, 991 insertions(+), 2 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index d799e22963..ad90cdc14b 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -15,13 +15,18 @@ # limitations under the License. # -from typing import Dict, List, Optional, Sequence, Tuple +from typing import Dict, List, Optional, Set, Sequence, Tuple, Union from google.auth import credentials as auth_credentials from google.protobuf import field_mask_pb2 from google.cloud.aiplatform import base -from google.cloud.aiplatform.compat.types import featurestore as gca_featurestore +from google.cloud.aiplatform.compat.types import ( + feature_selector as gca_feature_selector, + featurestore as gca_featurestore, + featurestore_service as gca_featurestore_service, + io as gca_io, +) from google.cloud.aiplatform import featurestore from google.cloud.aiplatform import initializer from google.cloud.aiplatform import utils @@ -557,3 +562,597 @@ def create_entity_type( request_metadata=request_metadata, sync=sync, ) + + def _batch_read_feature_values( + self, + batch_read_feature_values_request: gca_featurestore_service.BatchReadFeatureValuesRequest, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + ) -> "Featurestore": + """Batch read Feature values from the Featurestore to a destination storage. + + Args: + batch_read_feature_values_request (gca_featurestore_service.BatchReadFeatureValuesRequest): + Required. Request of batch read feature values. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + + Returns: + Featurestore - The featurestore resource object batch read feature values from. + """ + + _LOGGER.log_action_start_against_resource( + "Serving", "feature values", self, + ) + + batch_read_lro = self.api_client.batch_read_feature_values( + request=batch_read_feature_values_request, metadata=request_metadata, + ) + + _LOGGER.log_action_started_against_resource_with_lro( + "Serve", "feature values", self.__class__, batch_read_lro + ) + + batch_read_lro.result() + + _LOGGER.log_action_completed_against_resource("feature values", "served", self) + + return self + + def _validate_and_get_feature_id_and_destination_feature_setting( + self, + feature_destination_fields: Optional[ + Union[Dict[str, str], List[str], Set[str]] + ] = None, + ) -> Tuple[List[str], List[gca_featurestore_service.DestinationFeatureSetting]]: + """Validates and gets feature_ids and destination_feature_settings from feature_destination_fields config. + + Args: + feature_destination_fields (Union[Dict[str, str], List[str], Set[str]]): + Optional. User defined feature_destination_fields config. 
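+
+                A sketch of the accepted shapes (the feature IDs here are
+                illustrative, not taken from the original patch):
+
+                    feature_destination_fields = {'age': 'user_age'}  # dict: feature ID -> destination field
+                    feature_destination_fields = ['age', 'gender']    # list of feature IDs
+                    feature_destination_fields = {'age', 'gender'}    # set of feature IDs (deduplicated)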
+ + Returns: + Tuple[List[str], List[gca_featurestore_service.DestinationFeatureSetting]] - A list of feature_id and a list of DestinationFeatureSetting list + + Raises: + TypeError - if the type of feature_destination_fields is not supported. + """ + feature_ids = [] + destination_feature_settings = [] + + if not feature_destination_fields: + return feature_ids, destination_feature_settings + + if isinstance(feature_destination_fields, dict): + for ( + feature_id, + feature_detination_field, + ) in feature_destination_fields.items(): + + destination_feature_setting = gca_featurestore_service.DestinationFeatureSetting( + feature_id=feature_id, destination_field=feature_detination_field, + ) + + feature_ids.append(feature_id) + destination_feature_settings.append(destination_feature_setting) + + elif isinstance(feature_destination_fields, (set, list)): + for feature_id in set(feature_destination_fields): + + destination_feature_setting = gca_featurestore_service.DestinationFeatureSetting( + feature_id=feature_id + ) + + feature_ids.append(feature_id) + destination_feature_settings.append(destination_feature_setting) + + else: + raise TypeError( + f"The 'feature_destination_fields' for each entity_type should be a dict, list or set, " + f"instead, got {type(feature_destination_fields)}." + ) + + return feature_ids, destination_feature_settings + + def _validate_and_get_batch_read_feature_values_request( + self, + entity_type_ids: List[str], + destination: Union[ + gca_io.BigQueryDestination, + gca_io.CsvDestination, + gca_io.TFRecordDestination, + ], + entity_type_destination_fields: Optional[ + Dict[str, Union[Dict[str, str], List[str], Set[str]]] + ] = None, + read_instances: Optional[Union[gca_io.BigQuerySource, gca_io.CsvSource]] = None, + pass_through_fields: Optional[List[str]] = None, + ) -> gca_featurestore_service.BatchReadFeatureValuesRequest: + """Validates and gets batch_read_feature_values_request + + Args: + entity_type_ids (List[str]): + Required. ID of the EntityType to select batch serving Features. The + EntityType id is the specified during EntityType creation. + destination (Union[gca_io.BigQueryDestination, gca_io.CsvDestination, gca_io.TFRecordDestination]): + Required. BigQuery destination, Csv destination or TFRecord destination. + entity_type_destination_fields (Dict[str, Union[Dict[str, str], List[str], Set[str]]]): + Optional. User defined dictionary to map ID of the EntityType's Features + to the batch serving destination field name. + + Specify the features to be batch served in each entityType, and their destination field name. + If the features are not specified, all features will be batch served. + If the destination field name is not specified, Feature ID will be used as destination field name. 
+ + Example: + + - In case all features will be batch served and using Feature ID as destination field name: + + entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] + + entity_type_destination_fields = {} + or + entity_type_destination_fields = { + 'my_entity_type_id_1': {}, + 'my_entity_type_id_2': [], + 'my_entity_type_id_3': None, + } + + - In case selected features will be batch served and using Feature ID as destination field name: + + entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] + + feature_source_fields = { + 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], + 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], + 'my_entity_type_id_3': ['feature_id_3_1', 'feature_id_3_2'], + } + + - In case selected features will be batch served with specified destination field name + + feature_source_fields = { + 'my_entity_type_id_1': { + 'feature_id_1_1': 'feature_id_1_1_destination_field', + 'feature_id_1_2': 'feature_id_1_2_destination_field', + }, + 'my_entity_type_id_2': { + 'feature_id_2_1': 'feature_id_2_1_destination_field', + 'feature_id_2_2': 'feature_id_2_2_destination_field', + }, + 'my_entity_type_id_3': { + 'feature_id_3_1': 'feature_id_3_1_destination_field', + 'feature_id_3_2': 'feature_id_3_2_destination_field', + }, + } + Note: the above three cases can be mixed in use. + + read_instances (Union[gca_io.BigQuerySource, gca_io.CsvSource]): + Optional. BigQuery source or Csv source for read instances. + pass_through_fields (List[str]): + Optional. When not empty, the specified fields in the + read_instances source will be joined as-is in the output, + in addition to those fields from the Featurestore Entity. + + For BigQuery source, the type of the pass-through values + will be automatically inferred. For CSV source, the + pass-through values will be passed as opaque bytes. 
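+
+                Example (illustrative field names, not from the original patch):
+
+                    pass_through_fields = ['user_segment', 'campaign_id']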
+
+        Returns:
+            gca_featurestore_service.BatchReadFeatureValuesRequest - The batch read feature values request.
+        """
+        entity_type_destination_fields = entity_type_destination_fields or {}
+        entity_type_specs = []
+        for entity_type_id in set(entity_type_ids):
+            (
+                feature_ids,
+                destination_feature_settings,
+            ) = self._validate_and_get_feature_id_and_destination_feature_setting(
+                feature_destination_fields=entity_type_destination_fields.get(
+                    entity_type_id
+                )
+            )
+
+            entity_type_spec = gca_featurestore_service.BatchReadFeatureValuesRequest.EntityTypeSpec(
+                entity_type_id=entity_type_id,
+                feature_selector=gca_feature_selector.FeatureSelector(
+                    id_matcher=gca_feature_selector.IdMatcher(ids=feature_ids or ["*"])
+                ),
+                settings=destination_feature_settings or None,
+            )
+            entity_type_specs.append(entity_type_spec)
+
+        batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest(
+            featurestore=self.resource_name, entity_type_specs=entity_type_specs,
+        )
+
+        if isinstance(destination, gca_io.BigQueryDestination):
+            batch_read_feature_values_request.destination = gca_featurestore_service.FeatureValueDestination(
+                bigquery_destination=destination
+            )
+        elif isinstance(destination, gca_io.CsvDestination):
+            batch_read_feature_values_request.destination = gca_featurestore_service.FeatureValueDestination(
+                csv_destination=destination
+            )
+        elif isinstance(destination, gca_io.TFRecordDestination):
+            batch_read_feature_values_request.destination = gca_featurestore_service.FeatureValueDestination(
+                tfrecord_destination=destination
+            )
+
+        if isinstance(read_instances, gca_io.BigQuerySource):
+            batch_read_feature_values_request.bigquery_read_instances = read_instances
+        elif isinstance(read_instances, gca_io.CsvSource):
+            batch_read_feature_values_request.csv_read_instances = read_instances
+
+        if pass_through_fields is not None:
+            batch_read_feature_values_request.pass_through_fields = [
+                gca_featurestore_service.BatchReadFeatureValuesRequest.PassThroughField(
+                    field_name=pass_through_field
+                )
+                for pass_through_field in pass_through_fields
+            ]
+
+        return batch_read_feature_values_request
+
+    def _get_read_instances(
+        self, read_instances: Union[str, List[str]],
+    ) -> Union[gca_io.BigQuerySource, gca_io.CsvSource]:
+        """Gets read_instances as a BigQuery source or a Csv source.
+
+        Args:
+            read_instances (Union[str, List[str]]):
+                Required. Read_instances can be either a BigQuery URI to the input table,
+                or Google Cloud Storage URI(s) to the csv file(s).
+
+        Returns:
+            Union[gca_io.BigQuerySource, gca_io.CsvSource] - BigQuery source or Csv source for read instances.
+
+        Raises:
+            TypeError if read_instances is not a string or a list of strings.
+            ValueError if the read_instances URI does not start with 'bq://' or 'gs://'.
+            ValueError if any URI in read_instances does not start with 'gs://'.
+        """
+        if isinstance(read_instances, str):
+            if not (
+                read_instances.startswith("bq://") or read_instances.startswith("gs://")
+            ):
+                raise ValueError(
+                    "The read_instances accepts a single URI starting with 'bq://' or 'gs://'."
+                )
+        elif isinstance(read_instances, list) and all(
+            [isinstance(e, str) for e in read_instances]
+        ):
+            if not all([e.startswith("gs://") for e in read_instances]):
+                raise ValueError(
+                    "The read_instances accepts a list of URIs starting with 'gs://' only."
+                )
+        else:
+            raise TypeError(
+                "The read_instances type should be either a str or a List[str]."
+ ) + + if isinstance(read_instances, str): + if read_instances.startswith("bq://"): + return gca_io.BigQuerySource(input_uri=read_instances) + else: + read_instances = [read_instances] + + return gca_io.CsvSource(gcs_source=gca_io.GcsSource(uris=read_instances)) + + @base.optional_sync(return_input_arg="self") + def batch_serve_to_bq( + self, + bq_destination_output_uri: str, + entity_type_ids: List[str], + entity_type_destination_fields: Optional[ + Dict[str, Union[Dict[str, str], List[str], Set[str]]] + ] = None, + read_instances: Optional[Union[str, List[str]]] = None, + pass_through_fields: Optional[List[str]] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync: bool = True, + ) -> "Featurestore": + """ Batch serves feature values to BigQuery destination + + Args: + bq_destination_output_uri (str): + Required. BigQuery URI to the detination table. + + Example: + 'bq://project.dataset.table_name' + + It requires an existing BigQuery destination Dataset, under the same project as the Featurestore. + + entity_type_ids (List[str]): + Required. ID of the EntityType to select batch serving Features. The + EntityType id is the specified during EntityType creation. + entity_type_destination_fields (Dict[str, Union[Dict[str, str], List[str], Set[str]]]): + Optional. User defined dictionary to map ID of the EntityType's Features + to the batch serving destination field name. + + Specify the features to be batch served in each entityType, and their destination field name. + If the features are not specified, all features will be batch served. + If the destination field name is not specified, Feature ID will be used as destination field name. + + Example: + + - In case all features will be batch served and using Feature ID as destination field name: + + entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] + + entity_type_destination_fields = {} + or + entity_type_destination_fields = { + 'my_entity_type_id_1': {}, + 'my_entity_type_id_2': [], + 'my_entity_type_id_3': None, + } + + - In case selected features will be batch served and using Feature ID as destination field name: + + entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] + + feature_source_fields = { + 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], + 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], + 'my_entity_type_id_3': ['feature_id_3_1', 'feature_id_3_2'], + } + + - In case selected features will be batch served with specified destination field name + + feature_source_fields = { + 'my_entity_type_id_1': { + 'feature_id_1_1': 'feature_id_1_1_destination_field', + 'feature_id_1_2': 'feature_id_1_2_destination_field', + }, + 'my_entity_type_id_2': { + 'feature_id_2_1': 'feature_id_2_1_destination_field', + 'feature_id_2_2': 'feature_id_2_2_destination_field', + }, + 'my_entity_type_id_3': { + 'feature_id_3_1': 'feature_id_3_1_destination_field', + 'feature_id_3_2': 'feature_id_3_2_destination_field', + }, + } + Note: the above three cases can be mixed in use. + + read_instances (Union[str, List[str]]): + Optional. Read_instances can be either BigQuery URI to the input table, + or Google Cloud Storage URI(-s) to the + csv file(s). May contain wildcards. For more + information on wildcards, see + https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames. 
+ Example: + 'bq://project.dataset.table_name' + or + ["gs://my_bucket/my_file_1.csv", "gs://my_bucket/my_file_2.csv"] + + Each read instance consists of exactly one read timestamp + and one or more entity IDs identifying entities of the + corresponding EntityTypes whose Features are requested. + + Each output instance contains Feature values of requested + entities concatenated together as of the read time. + + An example read instance may be + ``foo_entity_id, bar_entity_id, 2020-01-01T10:00:00.123Z``. + + An example output instance may be + ``foo_entity_id, bar_entity_id, 2020-01-01T10:00:00.123Z, foo_entity_feature1_value, bar_entity_feature2_value``. + + Timestamp in each read instance must be millisecond-aligned. + + The columns can be in any order. + + Values in the timestamp column must use the RFC 3339 format, + e.g. ``2012-07-30T10:43:17.123Z``. + + pass_through_fields (List[str]): + Optional. When not empty, the specified fields in the + read_instances source will be joined as-is in the output, + in addition to those fields from the Featurestore Entity. + + For BigQuery source, the type of the pass-through values + will be automatically inferred. For CSV source, the + pass-through values will be passed as opaque bytes. + + Returns: + Featurestore - The featurestore resource object batch read feature values from. + + Raises: + NotFound: if the BigQuery destination Dataset does not exist. + FailedPrecondition: if the BigQuery destination Dataset/Table is in a different project. + """ + batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request( + entity_type_ids=entity_type_ids, + destination=gca_io.BigQueryDestination( + output_uri=bq_destination_output_uri + ), + entity_type_destination_fields=entity_type_destination_fields, + read_instances=read_instances + if read_instances is None + else self._get_read_instances(read_instances), + pass_through_fields=pass_through_fields, + ) + + return self._batch_read_feature_values( + batch_read_feature_values_request=batch_read_feature_values_request, + request_metadata=request_metadata, + ) + + @base.optional_sync(return_input_arg="self") + def batch_serve_to_gcs( + self, + gcs_destination_output_uri_prefix: str, + gcs_destination_type: str, + entity_type_ids: List[str], + entity_type_destination_fields: Optional[ + Dict[str, Union[Dict[str, str], List[str], Set[str]]] + ] = None, + read_instances: Optional[Union[str, List[str]]] = None, + pass_through_fields: Optional[List[str]] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync: bool = True, + ) -> "Featurestore": + """ Batch serves feature values to GCS destination + + Args: + gcs_destination_output_uri_prefix (str): + Required. Google Cloud Storage URI to output + directory. If the uri doesn't end with '/', a + '/' will be automatically appended. The + directory is created if it doesn't exist. + + Example: + "gs://bucket/path/to/prefix" + + gcs_destination_type (str): + Required. The type of the destination files(s), + the value of gcs_destination_type can only be either `csv`, or `tfrecord`. + + For CSV format. Array Feature value types are not allowed in CSV format. + + For TFRecord format. 
+ + Below are the mapping from Feature value type in + Featurestore to Feature value type in TFRecord: + + :: + + Value type in Featurestore | Value type in TFRecord + DOUBLE, DOUBLE_ARRAY | FLOAT_LIST + INT64, INT64_ARRAY | INT64_LIST + STRING, STRING_ARRAY, BYTES | BYTES_LIST + true -> byte_string("true"), false -> byte_string("false") + BOOL, BOOL_ARRAY (true, false) | BYTES_LIST + + entity_type_ids (List[str]): + Required. ID of the EntityType to select batch serving Features. The + EntityType id is the specified during EntityType creation. + entity_type_destination_fields (Dict[str, Union[Dict[str, str], List[str], Set[str]]]): + Optional. User defined dictionary to map ID of the EntityType's Features + to the batch serving destination field name. + + Specify the features to be batch served in each entityType, and their destination field name. + If the features are not specified, all features will be batch served. + If the destination field name is not specified, Feature ID will be used as destination field name. + + Example: + + - In case all features will be batch served and using Feature ID as destination field name: + + entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] + + entity_type_destination_fields = {} + or + entity_type_destination_fields = { + 'my_entity_type_id_1': {}, + 'my_entity_type_id_2': [], + 'my_entity_type_id_3': None, + } + + - In case selected features will be batch served and using Feature ID as destination field name: + + entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] + + feature_source_fields = { + 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], + 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], + 'my_entity_type_id_3': ['feature_id_3_1', 'feature_id_3_2'], + } + + - In case selected features will be batch served with specified destination field name + + feature_source_fields = { + 'my_entity_type_id_1': { + 'feature_id_1_1': 'feature_id_1_1_destination_field', + 'feature_id_1_2': 'feature_id_1_2_destination_field', + }, + 'my_entity_type_id_2': { + 'feature_id_2_1': 'feature_id_2_1_destination_field', + 'feature_id_2_2': 'feature_id_2_2_destination_field', + }, + 'my_entity_type_id_3': { + 'feature_id_3_1': 'feature_id_3_1_destination_field', + 'feature_id_3_2': 'feature_id_3_2_destination_field', + }, + } + Note: the above three cases can be mixed in use. + + read_instances (Union[str, List[str]]): + Optional. Read_instances can be either BigQuery URI to the input table, + or Google Cloud Storage URI(-s) to the + csv file(s). May contain wildcards. For more + information on wildcards, see + https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames. + Example: + 'bq://project.dataset.table_name' + or + ["gs://my_bucket/my_file_1.csv", "gs://my_bucket/my_file_2.csv"] + + Each read instance consists of exactly one read timestamp + and one or more entity IDs identifying entities of the + corresponding EntityTypes whose Features are requested. + + Each output instance contains Feature values of requested + entities concatenated together as of the read time. + + An example read instance may be + ``foo_entity_id, bar_entity_id, 2020-01-01T10:00:00.123Z``. + + An example output instance may be + ``foo_entity_id, bar_entity_id, 2020-01-01T10:00:00.123Z, foo_entity_feature1_value, bar_entity_feature2_value``. + + Timestamp in each read instance must be millisecond-aligned. + + The columns can be in any order. 
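+
+                An illustrative CSV read-instances file, assuming entity types
+                named 'users' and 'movies' (header and IDs are assumptions):
+
+                    users,movies,timestamp
+                    alice,movie_01,2021-09-15T08:28:14Z
+                    bob,movie_02,2021-09-15T08:28:14Z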
+ + Values in the timestamp column must use the RFC 3339 format, + e.g. ``2012-07-30T10:43:17.123Z``. + + pass_through_fields (List[str]): + Optional. When not empty, the specified fields in the + read_instances source will be joined as-is in the output, + in addition to those fields from the Featurestore Entity. + + For BigQuery source, the type of the pass-through values + will be automatically inferred. For CSV source, the + pass-through values will be passed as opaque bytes. + + Returns: + Featurestore - The featurestore resource object batch read feature values from. + + Raises: + ValueError if gcs_destination_type is not supported. + + """ + destination = None + if gcs_destination_type not in featurestore_utils.GCS_DESTINATION_TYPE: + raise ValueError( + "Only %s are supported gcs_destination_type, not `%s`. " + % ( + "`" + "`, `".join(featurestore_utils.GCS_DESTINATION_TYPE) + "`", + gcs_destination_type, + ) + ) + + gcs_destination = gca_io.GcsDestination( + output_uri_prefix=gcs_destination_output_uri_prefix + ) + if gcs_destination_type == "csv": + destination = gca_io.CsvDestination(gcs_destination=gcs_destination) + if gcs_destination_type == "tfrecord": + destination = gca_io.TFRecordDestination(gcs_destination=gcs_destination) + + batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request( + entity_type_ids=entity_type_ids, + destination=destination, + entity_type_destination_fields=entity_type_destination_fields, + read_instances=read_instances + if read_instances is None + else self._get_read_instances(read_instances), + pass_through_fields=pass_through_fields, + ) + + return self._batch_read_feature_values( + batch_read_feature_values_request=batch_read_feature_values_request, + request_metadata=request_metadata, + ) diff --git a/google/cloud/aiplatform/utils/featurestore_utils.py b/google/cloud/aiplatform/utils/featurestore_utils.py index e9d26b62be..45dbbbf44f 100644 --- a/google/cloud/aiplatform/utils/featurestore_utils.py +++ b/google/cloud/aiplatform/utils/featurestore_utils.py @@ -29,6 +29,7 @@ RESOURCE_ID_PATTERN_REGEX = r"[a-z_][a-z0-9_]{0,59}" GCS_SOURCE_TYPE = {"csv", "avro"} +GCS_DESTINATION_TYPE = {"csv", "tfrecord"} _FEATURE_VALUE_TYPE_UNSPECIFIED = "VALUE_TYPE_UNSPECIFIED" diff --git a/tests/system/aiplatform/e2e_base.py b/tests/system/aiplatform/e2e_base.py index 61b9e7f36c..3a9f87e8ae 100644 --- a/tests/system/aiplatform/e2e_base.py +++ b/tests/system/aiplatform/e2e_base.py @@ -24,6 +24,7 @@ from google.api_core import exceptions from google.cloud import aiplatform +from google.cloud import bigquery from google.cloud import storage from google.cloud.aiplatform import initializer @@ -90,6 +91,38 @@ def delete_staging_bucket(self, shared_state: Dict[str, Any]): bucket = shared_state["bucket"] bucket.delete(force=True) + @pytest.fixture(scope="class") + def prepare_bigquery_dataset( + self, shared_state: Dict[str, Any] + ) -> Generator[bigquery.dataset.Dataset, None, None]: + """Create a bigquery dataset and store bigquery resource object in shared state.""" + + bigquery_client = bigquery.Client(project=_PROJECT) + shared_state["bigquery_client"] = bigquery_client + + dataset_name = f"{self._temp_prefix.lower()}_{uuid.uuid4()}".replace("-", "_") + dataset_id = f"{_PROJECT}.{dataset_name}" + shared_state["bigquery_dataset_id"] = dataset_id + + dataset = bigquery.Dataset(dataset_id) + dataset.location = _LOCATION + shared_state["bigquery_dataset"] = bigquery_client.create_dataset(dataset) + + yield + + 
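+    # Hypothetical opt-in sketch; the real usage is in test_featurestore.py:
+    #
+    #     @pytest.mark.usefixtures(
+    #         "prepare_bigquery_dataset", "delete_bigquery_dataset"
+    #     )
+    #     class TestMyFeaturestoreFlow(e2e_base.TestEndToEnd):
+    #         _temp_prefix = "temp_my_flow_test"
+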
@pytest.fixture(scope="class") + def delete_bigquery_dataset(self, shared_state: Dict[str, Any]): + """Delete the bigquery dataset""" + + yield + + # Get the bigquery dataset id used for testing and wipe it + bigquery_dataset = shared_state["bigquery_dataset"] + bigquery_client = shared_state["bigquery_client"] + bigquery_client.delete_dataset( + bigquery_dataset.dataset_id, delete_contents=True, not_found_ok=True + ) # Make an API request. + @pytest.fixture(scope="class", autouse=True) def teardown(self, shared_state: Dict[str, Any]): """Delete every Vertex AI resource created during test""" diff --git a/tests/system/aiplatform/test_featurestore.py b/tests/system/aiplatform/test_featurestore.py index 65850f7d67..bb16eb0e2d 100644 --- a/tests/system/aiplatform/test_featurestore.py +++ b/tests/system/aiplatform/test_featurestore.py @@ -16,6 +16,7 @@ # import logging +import pytest from google.cloud import aiplatform from tests.system.aiplatform import e2e_base @@ -27,6 +28,8 @@ "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movies.avro" ) +_TEST_READ_INSTANCE_SRC = "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction.csv" + _TEST_FEATURESTORE_ID = "movie_prediction" _TEST_USER_ENTITY_TYPE_ID = "users" _TEST_MOVIE_ENTITY_TYPE_ID = "movies" @@ -40,6 +43,12 @@ _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID = "average_rating" +@pytest.mark.usefixtures( + "prepare_staging_bucket", + "delete_staging_bucket", + "prepare_bigquery_dataset", + "delete_bigquery_dataset", +) class TestFeaturestore(e2e_base.TestEndToEnd): _temp_prefix = "temp_vertex_sdk_e2e_featurestore_test" @@ -247,3 +256,48 @@ def test_search_features(self, shared_state): assert ( len(list_searched_features) - shared_state["base_list_searched_features"] ) == 6 + + def test_batch_serve_to_gcs(self, shared_state, caplog): + + assert shared_state["featurestore"] + assert shared_state["bucket"] + featurestore = shared_state["featurestore"] + bucket_name = shared_state["staging_bucket_name"] + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + caplog.set_level(logging.INFO) + + featurestore.batch_serve_to_gcs( + entity_type_ids=[_TEST_USER_ENTITY_TYPE_ID, _TEST_MOVIE_ENTITY_TYPE_ID], + read_instances=_TEST_READ_INSTANCE_SRC, + gcs_destination_output_uri_prefix=f"gs://{bucket_name}/featurestore_test/tfrecord", + gcs_destination_type="tfrecord", + ) + assert "Featurestore feature values served." in caplog.text + + caplog.clear() + + def test_batch_serve_to_bq(self, shared_state, caplog): + + assert shared_state["featurestore"] + assert shared_state["bigquery_dataset"] + featurestore = shared_state["featurestore"] + bigquery_dataset_id = shared_state["bigquery_dataset_id"] + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + caplog.set_level(logging.INFO) + + featurestore.batch_serve_to_bq( + entity_type_ids=[_TEST_USER_ENTITY_TYPE_ID, _TEST_MOVIE_ENTITY_TYPE_ID], + read_instances=_TEST_READ_INSTANCE_SRC, + bq_destination_output_uri=f"bq://{bigquery_dataset_id}.test_table", + ) + + assert "Featurestore feature values served." 
in caplog.text + caplog.clear() diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index f76e6ecf22..3516f3d770 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -38,6 +38,7 @@ encryption_spec as gca_encryption_spec, entity_type as gca_entity_type, feature as gca_feature, + feature_selector as gca_feature_selector, featurestore as gca_featurestore, featurestore_service as gca_featurestore_service, io as gca_io, @@ -133,6 +134,8 @@ "my_feature_id_1": "my_feature_id_1_source_field", } +_TEST_BATCH_READING_ENTITY_TYPE_IDS = ["my_entity_type_id_1"] + _TEST_FEATURE_TIME_FIELD = "feature_time_field" _TEST_FEATURE_TIME = datetime.datetime.now() @@ -140,6 +143,7 @@ _TEST_GCS_AVRO_SOURCE_URIS = [ "gs://my_bucket/my_file_1.avro", ] +_TEST_GCS_CSV_SOURCE_URI = "gs://my_bucket/my_file_1.csv" _TEST_GCS_CSV_SOURCE_URIS = [ "gs://my_bucket/my_file_1.csv", ] @@ -147,6 +151,13 @@ _TEST_GCS_SOURCE_TYPE_AVRO = "avro" _TEST_GCS_SOURCE_TYPE_INVALID = "json" +_TEST_BQ_DESTINATION_URI = "bq://project.dataset.table_name" +_TEST_GCS_OUTPUT_URI_PREFIX = "gs://my_bucket/path/to_prefix" + +_TEST_GCS_DESTINATION_TYPE_CSV = "csv" +_TEST_GCS_DESTINATION_TYPE_TFRECORD = "tfrecord" +_TEST_GCS_DESTINATION_TYPE_INVALID = "json" + _TEST_BQ_SOURCE = gca_io.BigQuerySource(input_uri=_TEST_BQ_SOURCE_URI) _TEST_AVRO_SOURCE = gca_io.AvroSource( gcs_source=gca_io.GcsSource(uris=_TEST_GCS_AVRO_SOURCE_URIS) @@ -155,6 +166,33 @@ gcs_source=gca_io.GcsSource(uris=_TEST_GCS_CSV_SOURCE_URIS) ) +_TEST_BQ_DESTINATION = gca_io.BigQueryDestination(output_uri=_TEST_BQ_DESTINATION_URI) +_TEST_CSV_DESTINATION = gca_io.CsvDestination( + gcs_destination=gca_io.GcsDestination(output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX) +) +_TEST_TFRECORD_DESTINATION = gca_io.TFRecordDestination( + gcs_destination=gca_io.GcsDestination(output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX) +) + + +def _get_feature_destination_setting_proto(feature_id, destination_field=None): + destination_feature_setting_proto = gca_featurestore_service.DestinationFeatureSetting( + feature_id=feature_id + ) + if destination_field: + destination_feature_setting_proto.destination_field = destination_field + return destination_feature_setting_proto + + +def _get_entity_type_spec_proto_with_all_features(entity_type_id): + entity_type_spec_proto = gca_featurestore_service.BatchReadFeatureValuesRequest.EntityTypeSpec( + entity_type_id=entity_type_id, + feature_selector=gca_feature_selector.FeatureSelector( + id_matcher=gca_feature_selector.IdMatcher(ids=["*"]) + ), + ) + return entity_type_spec_proto + # All Featurestore Mocks @pytest.fixture @@ -227,6 +265,17 @@ def create_featurestore_mock(): yield create_featurestore_mock +@pytest.fixture +def batch_read_feature_values_mock(): + with patch.object( + featurestore_service_client.FeaturestoreServiceClient, + "batch_read_feature_values", + ) as batch_read_feature_values_mock: + batch_read_feature_values_lro_mock = mock.Mock(operation.Operation) + batch_read_feature_values_mock.return_value = batch_read_feature_values_lro_mock + yield batch_read_feature_values_mock + + # ALL EntityType Mocks @pytest.fixture def get_entity_type_mock(): @@ -711,6 +760,259 @@ def test_create_featurestore(self, create_featurestore_mock, sync): metadata=_TEST_REQUEST_METADATA, ) + @pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize( + "feature_destination_fields, expected", + [ + ({}, ([], [])), + ([], ([], [])), + (None, ([], 
[])), + ( + {"feature_id_1": "feature_id_1_destination_field"}, + ( + ["feature_id_1"], + [ + _get_feature_destination_setting_proto( + "feature_id_1", "feature_id_1_destination_field" + ) + ], + ), + ), + ( + ["feature_id_1"], + ( + ["feature_id_1"], + [_get_feature_destination_setting_proto("feature_id_1")], + ), + ), + ( + {"feature_id_1"}, + ( + ["feature_id_1"], + [_get_feature_destination_setting_proto("feature_id_1")], + ), + ), + ( + {"feature_id_1": None}, + ( + ["feature_id_1"], + [_get_feature_destination_setting_proto("feature_id_1")], + ), + ), + ], + ) + def test_validate_and_get_feature_id_and_destination_feature_setting( + self, feature_destination_fields, expected + ): + aiplatform.init(project=_TEST_PROJECT) + + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + assert ( + expected + == my_featurestore._validate_and_get_feature_id_and_destination_feature_setting( + feature_destination_fields=feature_destination_fields + ) + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize( + "feature_destination_fields", ["feature_id_1", {"feature_id_1": 1}, 1], + ) + def test_validate_and_get_feature_id_and_destination_feature_setting_with_raise( + self, feature_destination_fields + ): + aiplatform.init(project=_TEST_PROJECT) + + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + with pytest.raises(TypeError): + my_featurestore._validate_and_get_feature_id_and_destination_feature_setting( + feature_destination_fields=feature_destination_fields + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + def test_validate_and_get_batch_read_feature_values_request(self,): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( + featurestore=my_featurestore.resource_name, + destination=gca_featurestore_service.FeatureValueDestination( + bigquery_destination=_TEST_BQ_DESTINATION, + ), + entity_type_specs=[ + _get_entity_type_spec_proto_with_all_features("my_entity_type_id_1") + ], + ) + assert ( + expected_batch_read_feature_values_request + == my_featurestore._validate_and_get_batch_read_feature_values_request( + entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + destination=_TEST_BQ_DESTINATION, + ) + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + def test_validate_and_get_batch_read_feature_values_request_with_read_instances( + self, + ): + + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( + featurestore=my_featurestore.resource_name, + destination=gca_featurestore_service.FeatureValueDestination( + bigquery_destination=_TEST_BQ_DESTINATION, + ), + entity_type_specs=[ + _get_entity_type_spec_proto_with_all_features("my_entity_type_id_1") + ], + bigquery_read_instances=_TEST_BQ_SOURCE, + ) + assert ( + expected_batch_read_feature_values_request + == my_featurestore._validate_and_get_batch_read_feature_values_request( + entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + destination=_TEST_BQ_DESTINATION, + read_instances=_TEST_BQ_SOURCE, + ) + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize( + "read_instances, expected", + [ + (_TEST_BQ_SOURCE_URI, 
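+            # Mapping asserted below: a single 'bq://' URI yields a BigQuerySource,
+            # while 'gs://' URIs (single or list) yield a CsvSource.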
_TEST_BQ_SOURCE), + (_TEST_GCS_CSV_SOURCE_URIS, _TEST_CSV_SOURCE), + (_TEST_GCS_CSV_SOURCE_URI, _TEST_CSV_SOURCE), + ], + ) + def test_get_read_instances(self, read_instances, expected): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + assert expected == my_featurestore._get_read_instances( + read_instances=read_instances + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize( + "read_instances", + [[1, 2, 3, 4, 5], 1, (_TEST_GCS_CSV_SOURCE_URI, _TEST_GCS_CSV_SOURCE_URI)], + ) + def test_get_read_instances_with_raise_typeerror(self, read_instances): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + with pytest.raises(TypeError): + my_featurestore._get_read_instances(read_instances=read_instances) + + @pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize( + "read_instances", + [ + "gcs://my_bucket/my_file_1.csv", + "bigquery://my_bucket/my_file_1.csv", + "my_bucket/my_file_1.csv", + [_TEST_BQ_SOURCE_URI], + ], + ) + def test_get_read_instances_with_raise_valueerror(self, read_instances): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + with pytest.raises(ValueError): + my_featurestore._get_read_instances(read_instances=read_instances) + + @pytest.mark.parametrize("sync", [True, False]) + @pytest.mark.usefixtures("get_featurestore_mock") + def test_batch_serve_to_bq(self, batch_read_feature_values_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( + featurestore=my_featurestore.resource_name, + destination=gca_featurestore_service.FeatureValueDestination( + bigquery_destination=_TEST_BQ_DESTINATION, + ), + entity_type_specs=[ + _get_entity_type_spec_proto_with_all_features("my_entity_type_id_1") + ], + ) + + my_featurestore.batch_serve_to_bq( + bq_destination_output_uri=_TEST_BQ_DESTINATION_URI, + entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + sync=sync, + ) + + if not sync: + my_featurestore.wait() + + batch_read_feature_values_mock.assert_called_once_with( + request=expected_batch_read_feature_values_request, + metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.parametrize("sync", [True, False]) + @pytest.mark.usefixtures("get_featurestore_mock") + def test_batch_serve_to_gcs(self, batch_read_feature_values_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( + featurestore=my_featurestore.resource_name, + destination=gca_featurestore_service.FeatureValueDestination( + tfrecord_destination=_TEST_TFRECORD_DESTINATION, + ), + entity_type_specs=[ + _get_entity_type_spec_proto_with_all_features("my_entity_type_id_1") + ], + ) + + my_featurestore.batch_serve_to_gcs( + gcs_destination_output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX, + gcs_destination_type=_TEST_GCS_DESTINATION_TYPE_TFRECORD, + entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + sync=sync, + ) + + if not sync: + my_featurestore.wait() + + batch_read_feature_values_mock.assert_called_once_with( + 
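+            # batch_serve_to_gcs must hand the GAPIC client exactly the request
+            # proto assembled above, with the TFRecord destination set.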
request=expected_batch_read_feature_values_request, + metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + def test_batch_serve_to_gcs_with_invalid_gcs_destination_type(self): + + aiplatform.init(project=_TEST_PROJECT) + + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + with pytest.raises(ValueError): + my_featurestore.batch_serve_to_gcs( + gcs_destination_output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX, + gcs_destination_type=_TEST_GCS_DESTINATION_TYPE_INVALID, + entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + ) + class TestEntityType: def setup_method(self): From f4c9976a34498bd671c8226198152ede36ea9100 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Fri, 14 Jan 2022 17:20:18 -0800 Subject: [PATCH 02/11] fix: change entity_type_ids and entity_type_destination_fields to serving_feature_ids and feature_destination_fields --- .../aiplatform/featurestore/featurestore.py | 316 +++++------------- tests/system/aiplatform/test_featurestore.py | 66 +++- tests/unit/aiplatform/test_featurestores.py | 217 +++++++----- 3 files changed, 278 insertions(+), 321 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index f527c89709..5e466eb208 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -15,7 +15,7 @@ # limitations under the License. # -from typing import Dict, List, Optional, Set, Sequence, Tuple, Union +from typing import Dict, List, Optional, Sequence, Tuple, Union from google.auth import credentials as auth_credentials from google.protobuf import field_mask_pb2 @@ -597,132 +597,45 @@ def _batch_read_feature_values( return self - def _validate_and_get_feature_id_and_destination_feature_setting( - self, - feature_destination_fields: Optional[ - Union[Dict[str, str], List[str], Set[str]] - ] = None, - ) -> Tuple[List[str], List[gca_featurestore_service.DestinationFeatureSetting]]: - """Validates and gets feature_ids and destination_feature_settings from feature_destination_fields config. - - Args: - feature_destination_fields (Union[Dict[str, str], List[str], Set[str]]): - Optional. User defined feature_destination_fields config. - - Returns: - Tuple[List[str], List[gca_featurestore_service.DestinationFeatureSetting]] - A list of feature_id and a list of DestinationFeatureSetting list - - Raises: - TypeError - if the type of feature_destination_fields is not supported. 
- """ - feature_ids = [] - destination_feature_settings = [] - - if not feature_destination_fields: - return feature_ids, destination_feature_settings - - if isinstance(feature_destination_fields, dict): - for ( - feature_id, - feature_detination_field, - ) in feature_destination_fields.items(): - - destination_feature_setting = gca_featurestore_service.DestinationFeatureSetting( - feature_id=feature_id, destination_field=feature_detination_field, - ) - - feature_ids.append(feature_id) - destination_feature_settings.append(destination_feature_setting) - - elif isinstance(feature_destination_fields, (set, list)): - for feature_id in set(feature_destination_fields): - - destination_feature_setting = gca_featurestore_service.DestinationFeatureSetting( - feature_id=feature_id - ) - - feature_ids.append(feature_id) - destination_feature_settings.append(destination_feature_setting) - - else: - raise TypeError( - f"The 'feature_destination_fields' for each entity_type should be a dict, list or set, " - f"instead, got {type(feature_destination_fields)}." - ) - - return feature_ids, destination_feature_settings - def _validate_and_get_batch_read_feature_values_request( self, - entity_type_ids: List[str], + serving_feature_ids: Dict[str, List[str]], destination: Union[ gca_io.BigQueryDestination, gca_io.CsvDestination, gca_io.TFRecordDestination, ], - entity_type_destination_fields: Optional[ - Dict[str, Union[Dict[str, str], List[str], Set[str]]] - ] = None, + feature_destination_fields: Optional[Dict[str, str]] = None, read_instances: Optional[Union[gca_io.BigQuerySource, gca_io.CsvSource]] = None, pass_through_fields: Optional[List[str]] = None, ) -> gca_featurestore_service.BatchReadFeatureValuesRequest: """Validates and gets batch_read_feature_values_request Args: - entity_type_ids (List[str]): - Required. ID of the EntityType to select batch serving Features. The - EntityType id is the specified during EntityType creation. + serving_feature_ids (Dict[str, List[str]]): + Required. A user defined dictionary to define the entity_types and their features for batch serve/read. + The keys of the dictionary are the serving entity_type ids and + the values are lists of serving feature ids in each entity_type. + + Example: + serving_feature_ids = { + 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], + 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], + } + destination (Union[gca_io.BigQueryDestination, gca_io.CsvDestination, gca_io.TFRecordDestination]): Required. BigQuery destination, Csv destination or TFRecord destination. - entity_type_destination_fields (Dict[str, Union[Dict[str, str], List[str], Set[str]]]): - Optional. User defined dictionary to map ID of the EntityType's Features - to the batch serving destination field name. - Specify the features to be batch served in each entityType, and their destination field name. - If the features are not specified, all features will be batch served. - If the destination field name is not specified, Feature ID will be used as destination field name. + feature_destination_fields (Dict[str, str]): + Optional. A user defined dictionary to map a feature's fully qualified resource name to + its destination field name. If the destination field name is not defined, + the feature ID will be used as its destination field name. 
Example: - - - In case all features will be batch served and using Feature ID as destination field name: - - entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] - - entity_type_destination_fields = {} - or - entity_type_destination_fields = { - 'my_entity_type_id_1': {}, - 'my_entity_type_id_2': [], - 'my_entity_type_id_3': None, - } - - - In case selected features will be batch served and using Feature ID as destination field name: - - entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] - - feature_source_fields = { - 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], - 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], - 'my_entity_type_id_3': ['feature_id_3_1', 'feature_id_3_2'], - } - - - In case selected features will be batch served with specified destination field name - - feature_source_fields = { - 'my_entity_type_id_1': { - 'feature_id_1_1': 'feature_id_1_1_destination_field', - 'feature_id_1_2': 'feature_id_1_2_destination_field', - }, - 'my_entity_type_id_2': { - 'feature_id_2_1': 'feature_id_2_1_destination_field', - 'feature_id_2_2': 'feature_id_2_2_destination_field', - }, - 'my_entity_type_id_3': { - 'feature_id_3_1': 'feature_id_3_1_destination_field', - 'feature_id_3_2': 'feature_id_3_2_destination_field', - }, + feature_destination_fields = { + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo', + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar', } - Note: the above three cases can be mixed in use. read_instances (Union[gca_io.BigQuerySource, gca_io.CsvSource]): Optional. BigQuery source or Csv source for read instances. @@ -738,22 +651,39 @@ def _validate_and_get_batch_read_feature_values_request( Returns: gca_featurestore_service.BatchReadFeatureValuesRequest - batch read feature values request """ - entity_type_destination_fields = entity_type_destination_fields or {} + + featurestore_name_components = self._parse_resource_name(self.resource_name) + + feature_destination_fields = feature_destination_fields or {} + entity_type_specs = [] - for entity_type_id in set(entity_type_ids): - ( - feature_ids, - destination_feature_settings, - ) = self._validate_and_get_feature_id_and_destination_feature_setting( - feature_destination_fields=entity_type_destination_fields.get( - entity_type_id + for entity_type_id, feature_ids in serving_feature_ids.items(): + destination_feature_settings = [] + for feature_id in feature_ids: + feature_resource_name = featurestore.Feature._format_resource_name( + project=featurestore_name_components["project"], + location=featurestore_name_components["location"], + featurestore=featurestore_name_components["featurestore"], + entity_type=entity_type_id, + feature=feature_id, ) - ) + + feature_destination_field = feature_destination_fields.get( + feature_resource_name + ) + if feature_destination_field: + destination_feature_setting_proto = gca_featurestore_service.DestinationFeatureSetting( + feature_id=feature_id, + destination_field=feature_destination_field, + ) + destination_feature_settings.append( + destination_feature_setting_proto + ) entity_type_spec = gca_featurestore_service.BatchReadFeatureValuesRequest.EntityTypeSpec( entity_type_id=entity_type_id, feature_selector=gca_feature_selector.FeatureSelector( - id_matcher=gca_feature_selector.IdMatcher(ids=feature_ids or ["*"]) + id_matcher=gca_feature_selector.IdMatcher(ids=feature_ids) ), 
settings=destination_feature_settings or None, ) @@ -840,10 +770,8 @@ def _get_read_instances( def batch_serve_to_bq( self, bq_destination_output_uri: str, - entity_type_ids: List[str], - entity_type_destination_fields: Optional[ - Dict[str, Union[Dict[str, str], List[str], Set[str]]] - ] = None, + serving_feature_ids: Dict[str, List[str]], + feature_destination_fields: Optional[Dict[str, str]] = None, read_instances: Optional[Union[str, List[str]]] = None, pass_through_fields: Optional[List[str]] = None, request_metadata: Optional[Sequence[Tuple[str, str]]] = (), @@ -860,58 +788,27 @@ def batch_serve_to_bq( It requires an existing BigQuery destination Dataset, under the same project as the Featurestore. - entity_type_ids (List[str]): - Required. ID of the EntityType to select batch serving Features. The - EntityType id is the specified during EntityType creation. - entity_type_destination_fields (Dict[str, Union[Dict[str, str], List[str], Set[str]]]): - Optional. User defined dictionary to map ID of the EntityType's Features - to the batch serving destination field name. - - Specify the features to be batch served in each entityType, and their destination field name. - If the features are not specified, all features will be batch served. - If the destination field name is not specified, Feature ID will be used as destination field name. + serving_feature_ids (Dict[str, List[str]]): + Required. A user defined dictionary to define the entity_types and their features for batch serve/read. + The keys of the dictionary are the serving entity_type ids and + the values are lists of serving feature ids in each entity_type. Example: + serving_feature_ids = { + 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], + 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], + } + + feature_destination_fields (Dict[str, str]): + Optional. A user defined dictionary to map a feature's fully qualified resource name to + its destination field name. If the destination field name is not defined, + the feature ID will be used as its destination field name. 
- - In case all features will be batch served and using Feature ID as destination field name: - - entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] - - entity_type_destination_fields = {} - or - entity_type_destination_fields = { - 'my_entity_type_id_1': {}, - 'my_entity_type_id_2': [], - 'my_entity_type_id_3': None, - } - - - In case selected features will be batch served and using Feature ID as destination field name: - - entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] - - feature_source_fields = { - 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], - 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], - 'my_entity_type_id_3': ['feature_id_3_1', 'feature_id_3_2'], - } - - - In case selected features will be batch served with specified destination field name - - feature_source_fields = { - 'my_entity_type_id_1': { - 'feature_id_1_1': 'feature_id_1_1_destination_field', - 'feature_id_1_2': 'feature_id_1_2_destination_field', - }, - 'my_entity_type_id_2': { - 'feature_id_2_1': 'feature_id_2_1_destination_field', - 'feature_id_2_2': 'feature_id_2_2_destination_field', - }, - 'my_entity_type_id_3': { - 'feature_id_3_1': 'feature_id_3_1_destination_field', - 'feature_id_3_2': 'feature_id_3_2_destination_field', - }, + Example: + feature_destination_fields = { + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo', + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar', } - Note: the above three cases can be mixed in use. read_instances (Union[str, List[str]]): Optional. Read_instances can be either BigQuery URI to the input table, @@ -961,11 +858,11 @@ def batch_serve_to_bq( FailedPrecondition: if the BigQuery destination Dataset/Table is in a different project. """ batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request( - entity_type_ids=entity_type_ids, + serving_feature_ids=serving_feature_ids, destination=gca_io.BigQueryDestination( output_uri=bq_destination_output_uri ), - entity_type_destination_fields=entity_type_destination_fields, + feature_destination_fields=feature_destination_fields, read_instances=read_instances if read_instances is None else self._get_read_instances(read_instances), @@ -982,10 +879,8 @@ def batch_serve_to_gcs( self, gcs_destination_output_uri_prefix: str, gcs_destination_type: str, - entity_type_ids: List[str], - entity_type_destination_fields: Optional[ - Dict[str, Union[Dict[str, str], List[str], Set[str]]] - ] = None, + serving_feature_ids: Dict[str, List[str]], + feature_destination_fields: Optional[Dict[str, str]] = None, read_instances: Optional[Union[str, List[str]]] = None, pass_through_fields: Optional[List[str]] = None, request_metadata: Optional[Sequence[Tuple[str, str]]] = (), @@ -1023,58 +918,27 @@ def batch_serve_to_gcs( true -> byte_string("true"), false -> byte_string("false") BOOL, BOOL_ARRAY (true, false) | BYTES_LIST - entity_type_ids (List[str]): - Required. ID of the EntityType to select batch serving Features. The - EntityType id is the specified during EntityType creation. - entity_type_destination_fields (Dict[str, Union[Dict[str, str], List[str], Set[str]]]): - Optional. User defined dictionary to map ID of the EntityType's Features - to the batch serving destination field name. - - Specify the features to be batch served in each entityType, and their destination field name. 
- If the features are not specified, all features will be batch served. - If the destination field name is not specified, Feature ID will be used as destination field name. + serving_feature_ids (Dict[str, List[str]]): + Required. A user defined dictionary to define the entity_types and their features for batch serve/read. + The keys of the dictionary are the serving entity_type ids and + the values are lists of serving feature ids in each entity_type. Example: + serving_feature_ids = { + 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], + 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], + } - - In case all features will be batch served and using Feature ID as destination field name: - - entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] - - entity_type_destination_fields = {} - or - entity_type_destination_fields = { - 'my_entity_type_id_1': {}, - 'my_entity_type_id_2': [], - 'my_entity_type_id_3': None, - } - - - In case selected features will be batch served and using Feature ID as destination field name: - - entity_type_ids = ['my_entity_type_id_1', 'my_entity_type_id_2', 'my_entity_type_id_3'] - - feature_source_fields = { - 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], - 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], - 'my_entity_type_id_3': ['feature_id_3_1', 'feature_id_3_2'], - } - - - In case selected features will be batch served with specified destination field name - - feature_source_fields = { - 'my_entity_type_id_1': { - 'feature_id_1_1': 'feature_id_1_1_destination_field', - 'feature_id_1_2': 'feature_id_1_2_destination_field', - }, - 'my_entity_type_id_2': { - 'feature_id_2_1': 'feature_id_2_1_destination_field', - 'feature_id_2_2': 'feature_id_2_2_destination_field', - }, - 'my_entity_type_id_3': { - 'feature_id_3_1': 'feature_id_3_1_destination_field', - 'feature_id_3_2': 'feature_id_3_2_destination_field', - }, + feature_destination_fields (Dict[str, str]): + Optional. A user defined dictionary to map a feature's fully qualified resource name to + its destination field name. If the destination field name is not defined, + the feature ID will be used as its destination field name. + + Example: + feature_destination_fields = { + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo', + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar', } - Note: the above three cases can be mixed in use. read_instances (Union[str, List[str]]): Optional. 
Read_instances can be either BigQuery URI to the input table, @@ -1142,9 +1006,9 @@ def batch_serve_to_gcs( destination = gca_io.TFRecordDestination(gcs_destination=gcs_destination) batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request( - entity_type_ids=entity_type_ids, + serving_feature_ids=serving_feature_ids, destination=destination, - entity_type_destination_fields=entity_type_destination_fields, + feature_destination_fields=feature_destination_fields, read_instances=read_instances if read_instances is None else self._get_read_instances(read_instances), diff --git a/tests/system/aiplatform/test_featurestore.py b/tests/system/aiplatform/test_featurestore.py index 8159f5faf5..b67dec6883 100644 --- a/tests/system/aiplatform/test_featurestore.py +++ b/tests/system/aiplatform/test_featurestore.py @@ -140,7 +140,7 @@ def test_create_get_list_features(self, shared_state): user_age_feature = user_entity_type.create_feature( feature_id=_TEST_USER_AGE_FEATURE_ID, value_type="INT64" ) - + shared_state["user_age_feature_resource_name"] = user_age_feature.resource_name get_user_age_feature = user_entity_type.get_feature( feature_id=_TEST_USER_AGE_FEATURE_ID ) @@ -151,6 +151,9 @@ def test_create_get_list_features(self, shared_state): value_type="STRING", entity_type_name=user_entity_type_name, ) + shared_state[ + "user_gender_feature_resource_name" + ] = user_gender_feature.resource_name get_user_gender_feature = aiplatform.Feature( feature_name=user_gender_feature.resource_name @@ -162,6 +165,9 @@ def test_create_get_list_features(self, shared_state): user_liked_genres_feature = user_entity_type.create_feature( feature_id=_TEST_USER_LIKED_GENRES_FEATURE_ID, value_type="STRING_ARRAY", ) + shared_state[ + "user_liked_genres_feature_resource_name" + ] = user_liked_genres_feature.resource_name get_user_liked_genres_feature = aiplatform.Feature( feature_name=user_liked_genres_feature.resource_name @@ -263,8 +269,19 @@ def test_batch_serve_to_gcs(self, shared_state, caplog): assert shared_state["featurestore"] assert shared_state["bucket"] + assert shared_state["user_age_feature_resource_name"] + assert shared_state["user_gender_feature_resource_name"] + assert shared_state["user_liked_genres_feature_resource_name"] + featurestore = shared_state["featurestore"] bucket_name = shared_state["staging_bucket_name"] + user_age_feature_resource_name = shared_state["user_age_feature_resource_name"] + user_gender_feature_resource_name = shared_state[ + "user_gender_feature_resource_name" + ] + user_liked_genres_feature_resource_name = shared_state[ + "user_liked_genres_feature_resource_name" + ] aiplatform.init( project=e2e_base._PROJECT, location=e2e_base._LOCATION, @@ -273,7 +290,23 @@ def test_batch_serve_to_gcs(self, shared_state, caplog): caplog.set_level(logging.INFO) featurestore.batch_serve_to_gcs( - entity_type_ids=[_TEST_USER_ENTITY_TYPE_ID, _TEST_MOVIE_ENTITY_TYPE_ID], + serving_feature_ids={ + _TEST_USER_ENTITY_TYPE_ID: [ + _TEST_USER_AGE_FEATURE_ID, + _TEST_USER_GENDER_FEATURE_ID, + _TEST_USER_LIKED_GENRES_FEATURE_ID, + ], + _TEST_MOVIE_ENTITY_TYPE_ID: [ + _TEST_MOVIE_TITLE_FEATURE_ID, + _TEST_MOVIE_GENRES_FEATURE_ID, + _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, + ], + }, + feature_destination_fields={ + user_age_feature_resource_name: "user_age_dest", + user_gender_feature_resource_name: "user_gender_dest", + user_liked_genres_feature_resource_name: "user_liked_genres_dest", + }, read_instances=_TEST_READ_INSTANCE_SRC, 
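+            # Six features across two entity types are served; only the three
+            # user features above are renamed via feature_destination_fields.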
gcs_destination_output_uri_prefix=f"gs://{bucket_name}/featurestore_test/tfrecord", gcs_destination_type="tfrecord", @@ -286,8 +319,19 @@ def test_batch_serve_to_bq(self, shared_state, caplog): assert shared_state["featurestore"] assert shared_state["bigquery_dataset"] + assert shared_state["user_age_feature_resource_name"] + assert shared_state["user_gender_feature_resource_name"] + assert shared_state["user_liked_genres_feature_resource_name"] + featurestore = shared_state["featurestore"] bigquery_dataset_id = shared_state["bigquery_dataset_id"] + user_age_feature_resource_name = shared_state["user_age_feature_resource_name"] + user_gender_feature_resource_name = shared_state[ + "user_gender_feature_resource_name" + ] + user_liked_genres_feature_resource_name = shared_state[ + "user_liked_genres_feature_resource_name" + ] aiplatform.init( project=e2e_base._PROJECT, location=e2e_base._LOCATION, @@ -296,7 +340,23 @@ def test_batch_serve_to_bq(self, shared_state, caplog): caplog.set_level(logging.INFO) featurestore.batch_serve_to_bq( - entity_type_ids=[_TEST_USER_ENTITY_TYPE_ID, _TEST_MOVIE_ENTITY_TYPE_ID], + serving_feature_ids={ + _TEST_USER_ENTITY_TYPE_ID: [ + _TEST_USER_AGE_FEATURE_ID, + _TEST_USER_GENDER_FEATURE_ID, + _TEST_USER_LIKED_GENRES_FEATURE_ID, + ], + _TEST_MOVIE_ENTITY_TYPE_ID: [ + _TEST_MOVIE_TITLE_FEATURE_ID, + _TEST_MOVIE_GENRES_FEATURE_ID, + _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, + ], + }, + feature_destination_fields={ + user_age_feature_resource_name: "user_age_dest", + user_gender_feature_resource_name: "user_gender_dest", + user_liked_genres_feature_resource_name: "user_liked_genres_dest", + }, read_instances=_TEST_READ_INSTANCE_SRC, bq_destination_output_uri=f"bq://{bigquery_dataset_id}.test_table", ) diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index 7bce8ce45d..ddfaf13da7 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -207,7 +207,10 @@ "my_feature_id_1": "my_feature_id_1_source_field", } -_TEST_BATCH_READING_ENTITY_TYPE_IDS = ["my_entity_type_id_1"] +_TEST_SERVING_FEATURE_IDS = { + "my_entity_type_id_1": ["my_feature_id_1_1", "my_feature_id_1_2"], + "my_entity_type_id_2": ["my_feature_id_2_1", "my_feature_id_2_2"], +} _TEST_FEATURE_TIME_FIELD = "feature_time_field" _TEST_FEATURE_TIME = datetime.datetime.now() @@ -261,21 +264,22 @@ ) -def _get_feature_destination_setting_proto(feature_id, destination_field=None): - destination_feature_setting_proto = gca_featurestore_service.DestinationFeatureSetting( - feature_id=feature_id - ) - if destination_field: - destination_feature_setting_proto.destination_field = destination_field - return destination_feature_setting_proto - - -def _get_entity_type_spec_proto_with_all_features(entity_type_id): +def _get_entity_type_spec_proto_with_feature_ids( + entity_type_id, feature_ids, feature_destination_fields=None +): + feature_destination_fields = feature_destination_fields or {} entity_type_spec_proto = gca_featurestore_service.BatchReadFeatureValuesRequest.EntityTypeSpec( entity_type_id=entity_type_id, feature_selector=gca_feature_selector.FeatureSelector( - id_matcher=gca_feature_selector.IdMatcher(ids=["*"]) + id_matcher=gca_feature_selector.IdMatcher(ids=feature_ids) ), + settings=[ + gca_featurestore_service.DestinationFeatureSetting( + feature_id=feature_id, destination_field=feature_destination_field + ) + for feature_id, feature_destination_field in feature_destination_fields.items() + ] + or None, ) 
return entity_type_spec_proto @@ -925,79 +929,84 @@ def test_create_featurestore(self, create_featurestore_mock, sync): @pytest.mark.usefixtures("get_featurestore_mock") @pytest.mark.parametrize( - "feature_destination_fields, expected", + "serving_feature_ids, feature_destination_fields, expected_entity_type_specs", [ - ({}, ([], [])), - ([], ([], [])), - (None, ([], [])), ( - {"feature_id_1": "feature_id_1_destination_field"}, - ( - ["feature_id_1"], - [ - _get_feature_destination_setting_proto( - "feature_id_1", "feature_id_1_destination_field" - ) - ], - ), - ), - ( - ["feature_id_1"], - ( - ["feature_id_1"], - [_get_feature_destination_setting_proto("feature_id_1")], - ), + { + "my_entity_type_id_1": ["my_feature_id_1_1", "my_feature_id_1_2"], + "my_entity_type_id_2": ["my_feature_id_2_1", "my_feature_id_2_2"], + }, + None, + [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ], ), ( - {"feature_id_1"}, - ( - ["feature_id_1"], - [_get_feature_destination_setting_proto("feature_id_1")], - ), + { + "my_entity_type_id_1": ["my_feature_id_1_1", "my_feature_id_1_2"], + "my_entity_type_id_2": ["my_feature_id_2_1", "my_feature_id_2_2"], + }, + { + f"{_TEST_FEATURESTORE_NAME}/entityTypes/my_entity_type_id_1/features/my_feature_id_1_1": "my_feature_id_1_1_dest", + f"{_TEST_FEATURESTORE_NAME}/entityTypes/my_entity_type_id_1/features/my_feature_id_1_2": "my_feature_id_1_2_dest", + }, + [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + feature_destination_fields={ + "my_feature_id_1_1": "my_feature_id_1_1_dest", + "my_feature_id_1_2": "my_feature_id_1_2_dest", + }, + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ], ), ( - {"feature_id_1": None}, - ( - ["feature_id_1"], - [_get_feature_destination_setting_proto("feature_id_1")], - ), + { + "my_entity_type_id_1": ["my_feature_id_1_1", "my_feature_id_1_2"], + "my_entity_type_id_2": ["my_feature_id_2_1", "my_feature_id_2_2"], + }, + { + f"{_TEST_FEATURESTORE_NAME}/entityTypes/my_entity_type_id_1/features/my_feature_id_1_1": "my_feature_id_1_1_dest", + f"{_TEST_FEATURESTORE_NAME}/entityTypes/my_entity_type_id_2/features/my_feature_id_2_1": "my_feature_id_2_1_dest", + }, + [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + feature_destination_fields={ + "my_feature_id_1_1": "my_feature_id_1_1_dest" + }, + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + feature_destination_fields={ + "my_feature_id_2_1": "my_feature_id_2_1_dest" + }, + ), + ], ), ], ) - def test_validate_and_get_feature_id_and_destination_feature_setting( - self, feature_destination_fields, expected - ): - aiplatform.init(project=_TEST_PROJECT) - - my_featurestore = aiplatform.Featurestore( - featurestore_name=_TEST_FEATURESTORE_NAME - ) - assert ( - expected - == my_featurestore._validate_and_get_feature_id_and_destination_feature_setting( - feature_destination_fields=feature_destination_fields - ) - ) - - 
@pytest.mark.usefixtures("get_featurestore_mock") - @pytest.mark.parametrize( - "feature_destination_fields", ["feature_id_1", {"feature_id_1": 1}, 1], - ) - def test_validate_and_get_feature_id_and_destination_feature_setting_with_raise( - self, feature_destination_fields + def test_validate_and_get_batch_read_feature_values_request( + self, + serving_feature_ids, + feature_destination_fields, + expected_entity_type_specs, ): - aiplatform.init(project=_TEST_PROJECT) - my_featurestore = aiplatform.Featurestore( - featurestore_name=_TEST_FEATURESTORE_NAME - ) - with pytest.raises(TypeError): - my_featurestore._validate_and_get_feature_id_and_destination_feature_setting( - feature_destination_fields=feature_destination_fields - ) - - @pytest.mark.usefixtures("get_featurestore_mock") - def test_validate_and_get_batch_read_feature_values_request(self,): aiplatform.init(project=_TEST_PROJECT) my_featurestore = aiplatform.Featurestore( featurestore_name=_TEST_FEATURESTORE_NAME @@ -1007,15 +1016,14 @@ def test_validate_and_get_batch_read_feature_values_request(self,): destination=gca_featurestore_service.FeatureValueDestination( bigquery_destination=_TEST_BQ_DESTINATION, ), - entity_type_specs=[ - _get_entity_type_spec_proto_with_all_features("my_entity_type_id_1") - ], + entity_type_specs=expected_entity_type_specs, ) assert ( expected_batch_read_feature_values_request == my_featurestore._validate_and_get_batch_read_feature_values_request( - entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + serving_feature_ids=serving_feature_ids, destination=_TEST_BQ_DESTINATION, + feature_destination_fields=feature_destination_fields, ) ) @@ -1023,25 +1031,32 @@ def test_validate_and_get_batch_read_feature_values_request(self,): def test_validate_and_get_batch_read_feature_values_request_with_read_instances( self, ): - aiplatform.init(project=_TEST_PROJECT) my_featurestore = aiplatform.Featurestore( featurestore_name=_TEST_FEATURESTORE_NAME ) + expected_entity_type_specs = [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ] expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( featurestore=my_featurestore.resource_name, destination=gca_featurestore_service.FeatureValueDestination( bigquery_destination=_TEST_BQ_DESTINATION, ), - entity_type_specs=[ - _get_entity_type_spec_proto_with_all_features("my_entity_type_id_1") - ], + entity_type_specs=expected_entity_type_specs, bigquery_read_instances=_TEST_BQ_SOURCE, ) assert ( expected_batch_read_feature_values_request == my_featurestore._validate_and_get_batch_read_feature_values_request( - entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + serving_feature_ids=_TEST_SERVING_FEATURE_IDS, destination=_TEST_BQ_DESTINATION, read_instances=_TEST_BQ_SOURCE, ) @@ -1104,19 +1119,28 @@ def test_batch_serve_to_bq(self, batch_read_feature_values_mock, sync): featurestore_name=_TEST_FEATURESTORE_NAME ) + expected_entity_type_specs = [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ] + 
expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( featurestore=my_featurestore.resource_name, destination=gca_featurestore_service.FeatureValueDestination( bigquery_destination=_TEST_BQ_DESTINATION, ), - entity_type_specs=[ - _get_entity_type_spec_proto_with_all_features("my_entity_type_id_1") - ], + entity_type_specs=expected_entity_type_specs, ) my_featurestore.batch_serve_to_bq( bq_destination_output_uri=_TEST_BQ_DESTINATION_URI, - entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + serving_feature_ids=_TEST_SERVING_FEATURE_IDS, sync=sync, ) @@ -1136,20 +1160,29 @@ def test_batch_serve_to_gcs(self, batch_read_feature_values_mock, sync): featurestore_name=_TEST_FEATURESTORE_NAME ) + expected_entity_type_specs = [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ] + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( featurestore=my_featurestore.resource_name, destination=gca_featurestore_service.FeatureValueDestination( tfrecord_destination=_TEST_TFRECORD_DESTINATION, ), - entity_type_specs=[ - _get_entity_type_spec_proto_with_all_features("my_entity_type_id_1") - ], + entity_type_specs=expected_entity_type_specs, ) my_featurestore.batch_serve_to_gcs( gcs_destination_output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX, gcs_destination_type=_TEST_GCS_DESTINATION_TYPE_TFRECORD, - entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + serving_feature_ids=_TEST_SERVING_FEATURE_IDS, sync=sync, ) @@ -1173,7 +1206,7 @@ def test_batch_serve_to_gcs_with_invalid_gcs_destination_type(self): my_featurestore.batch_serve_to_gcs( gcs_destination_output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX, gcs_destination_type=_TEST_GCS_DESTINATION_TYPE_INVALID, - entity_type_ids=_TEST_BATCH_READING_ENTITY_TYPE_IDS, + serving_feature_ids=_TEST_SERVING_FEATURE_IDS, ) From 8f85a88c8a515624223a38eed22b82768b853307 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Fri, 14 Jan 2022 21:18:47 -0800 Subject: [PATCH 03/11] fix: remove white space --- tests/unit/aiplatform/test_featurestores.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index ddfaf13da7..97cec0056f 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -282,8 +282,8 @@ def _get_entity_type_spec_proto_with_feature_ids( or None, ) return entity_type_spec_proto - - + + def _get_header_proto(feature_ids): header_proto = copy.deepcopy(_TEST_BASE_HEADER_PROTO) header_proto.feature_descriptors = [ From 758b0dd1abacdc247b5b714b1635efe19310f44d Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 09:13:17 -0800 Subject: [PATCH 04/11] Update google/cloud/aiplatform/featurestore/featurestore.py Co-authored-by: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com> --- google/cloud/aiplatform/featurestore/featurestore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index 5e466eb208..33852334c9 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ 
-576,7 +576,7 @@ def _batch_read_feature_values( Optional. Strings which should be sent along with the request as metadata. Returns: - Featurestore - The featurestore resource object batch read feature values from. + Featurestore: The featurestore resource object batch read feature values from. """ _LOGGER.log_action_start_against_resource( From 79a340bc242330b128de659306ba9dd5115db333 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 09:13:25 -0800 Subject: [PATCH 05/11] Update google/cloud/aiplatform/featurestore/featurestore.py Co-authored-by: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com> --- google/cloud/aiplatform/featurestore/featurestore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index 33852334c9..9e68e08bdc 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -649,7 +649,7 @@ def _validate_and_get_batch_read_feature_values_request( pass-through values will be passed as opaque bytes. Returns: - gca_featurestore_service.BatchReadFeatureValuesRequest - batch read feature values request + gca_featurestore_service.BatchReadFeatureValuesRequest: batch read feature values request """ featurestore_name_components = self._parse_resource_name(self.resource_name) From 7e0a18bb7908ad1d521e8bacaf2950902bd840ae Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 09:13:37 -0800 Subject: [PATCH 06/11] Update google/cloud/aiplatform/featurestore/featurestore.py Co-authored-by: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com> --- google/cloud/aiplatform/featurestore/featurestore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index 9e68e08bdc..bee3d8b25b 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -851,7 +851,7 @@ def batch_serve_to_bq( pass-through values will be passed as opaque bytes. Returns: - Featurestore - The featurestore resource object batch read feature values from. + Featurestore: The featurestore resource object batch read feature values from. Raises: NotFound: if the BigQuery destination Dataset does not exist. From a49d2b4577a48b616adc88d794024f10ac547cd8 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 09:13:51 -0800 Subject: [PATCH 07/11] Update google/cloud/aiplatform/featurestore/featurestore.py Co-authored-by: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com> --- google/cloud/aiplatform/featurestore/featurestore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index bee3d8b25b..12a2237fee 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -981,7 +981,7 @@ def batch_serve_to_gcs( pass-through values will be passed as opaque bytes. Returns: - Featurestore - The featurestore resource object batch read feature values from. + Featurestore: The featurestore resource object batch read feature values from. Raises: ValueError if gcs_destination_type is not supported. 
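Patches 04 through 07 only normalize docstring return annotations; the serving surface itself is fixed by PATCH 01 above. For orientation, a minimal usage sketch of that surface, assuming a featurestore with user and movie entity types already exists. The project, resource IDs, and URIs below are illustrative placeholders, not values from this series:

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", location="us-central1")

    featurestore = aiplatform.Featurestore(
        featurestore_name="my_featurestore_id"  # hypothetical featurestore ID
    )

    # serving_feature_ids selects the entity types and the feature IDs to
    # batch serve from each of them.
    featurestore.batch_serve_to_bq(
        serving_feature_ids={
            "users": ["age", "gender"],
            "movies": ["title", "genres"],
        },
        # Optionally rename output columns, keyed by the feature's fully
        # qualified resource name; features without a mapping keep their
        # feature ID as the destination field name.
        feature_destination_fields={
            "projects/my-project/locations/us-central1/featurestores"
            "/my_featurestore_id/entityTypes/users/features/age": "user_age",
        },
        read_instances="bq://my-project.my_dataset.read_instances",
        bq_destination_output_uri="bq://my-project.my_dataset.served_features",
    )

batch_serve_to_gcs takes the same serving arguments, but writes under a gs:// prefix via gcs_destination_output_uri_prefix, with gcs_destination_type selecting csv or tfrecord output.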
From c191c48a2820c8059c7805b155fc01241f0baea7 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 09:13:58 -0800 Subject: [PATCH 08/11] Update google/cloud/aiplatform/featurestore/featurestore.py Co-authored-by: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com> --- google/cloud/aiplatform/featurestore/featurestore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index 12a2237fee..5e54f658c1 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -732,7 +732,7 @@ def _get_read_instances( or Google Cloud Storage URI(-s) to the csv file(s). Returns: - Union[gca_io.BigQuerySource, gca_io.CsvSource] - BigQuery source or Csv source for read instances. + Union[gca_io.BigQuerySource, gca_io.CsvSource]: BigQuery source or Csv source for read instances. Raises: TypeError if read_instances is not a string or a list of strings. From d949b10b15adc312645a44f87072ef7b01d60a57 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 09:22:17 -0800 Subject: [PATCH 09/11] fix: Featurestore create method example usage --- google/cloud/aiplatform/featurestore/featurestore.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index 5e54f658c1..6d02bb9f76 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -389,14 +389,8 @@ def create( Example Usage: - my_entity_type = aiplatform.EntityType.create( - entity_type_id='my_entity_type_id', - featurestore_name='projects/123/locations/us-central1/featurestores/my_featurestore_id' - ) - or - my_entity_type = aiplatform.EntityType.create( - entity_type_id='my_entity_type_id', - featurestore_name='my_featurestore_id', + my_featurestore = aiplatform.Featurestore.create( + featurestore_id='my_featurestore_id', ) Args: From 90bbfc9a185fbde4fd46c3935e8e1a0f77db612d Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 11:53:34 -0800 Subject: [PATCH 10/11] fix: get_timestamp_proto for millisecond precision cap --- google/cloud/aiplatform/utils/__init__.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/google/cloud/aiplatform/utils/__init__.py b/google/cloud/aiplatform/utils/__init__.py index 26b28dcdd7..c5c21a2a0b 100644 --- a/google/cloud/aiplatform/utils/__init__.py +++ b/google/cloud/aiplatform/utils/__init__.py @@ -628,9 +628,11 @@ def get_timestamp_proto( """ if not time: time = datetime.datetime.now() - t = time.timestamp() - seconds = int(t) - # must not have higher than millisecond precision. 
- nanos = int((t % 1 * 1e6) * 1e3) - return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) + time_str = time.isoformat(sep=" ", timespec="milliseconds") + time = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f") + + timestamp_proto = timestamp_pb2.Timestamp() + timestamp_proto.FromDatetime(time) + + return timestamp_proto From 45f0a02886540f510220ee2a5d21c3a67fa55fbf Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 14:18:07 -0800 Subject: [PATCH 11/11] fix: unit tests for get_timestamp_proto --- tests/unit/aiplatform/test_utils.py | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index d4840609b1..b47eb684d8 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -19,7 +19,6 @@ import pytest from typing import Callable, Dict, Optional import datetime -from decimal import Decimal from google.protobuf import timestamp_pb2 @@ -326,28 +325,8 @@ def test_client_w_override_select_version(): @pytest.mark.parametrize( "year,month,day,hour,minute,second,microsecond,expected_seconds,expected_nanos", [ - ( - 2021, - 12, - 23, - 23, - 59, - 59, - 999999, - 1640303999, - int(str(Decimal(1640303999.999999)).split(".")[1][:9]), - ), - ( - 2013, - 1, - 1, - 1, - 1, - 1, - 199999, - 1357002061, - int(str(Decimal(1357002061.199999)).split(".")[1][:9]), - ), + (2021, 12, 23, 23, 59, 59, 999999, 1640303999, 999000000,), + (2013, 1, 1, 1, 1, 1, 199999, 1357002061, 199000000,), ], ) def test_get_timestamp_proto( @@ -369,7 +348,6 @@ def test_get_timestamp_proto( minute=minute, second=second, microsecond=microsecond, - tzinfo=datetime.timezone.utc, ) true_timestamp_proto = timestamp_pb2.Timestamp( seconds=expected_seconds, nanos=expected_nanos
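PATCH 10 and PATCH 11 together cap get_timestamp_proto at millisecond precision by round tripping through isoformat(timespec="milliseconds"), which truncates rather than rounds, and the reworked test values encode exactly that. A short sketch of an equivalent truncation, assuming protobuf's stock Timestamp helpers; the datetime below is the first parametrized case, and since the patched strptime format only parses naive datetimes, that is presumably why the test drops tzinfo:

    import datetime

    from google.protobuf import timestamp_pb2

    time = datetime.datetime(2021, 12, 23, 23, 59, 59, 999999)

    # Dropping sub-millisecond microseconds mirrors what the
    # isoformat/strptime round trip in the patched helper does.
    truncated = time.replace(microsecond=time.microsecond // 1000 * 1000)

    timestamp_proto = timestamp_pb2.Timestamp()
    timestamp_proto.FromDatetime(truncated)  # naive datetime is treated as UTC

    assert timestamp_proto.seconds == 1640303999  # 2021-12-23 23:59:59 UTC
    assert timestamp_proto.nanos == 999000000  # 999999 us capped to 999 ms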