diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py
index 4b98ccfd7d..6d02bb9f76 100644
--- a/google/cloud/aiplatform/featurestore/featurestore.py
+++ b/google/cloud/aiplatform/featurestore/featurestore.py
@@ -15,13 +15,18 @@
 # limitations under the License.
 #
 
-from typing import Dict, List, Optional, Sequence, Tuple
+from typing import Dict, List, Optional, Sequence, Tuple, Union
 
 from google.auth import credentials as auth_credentials
 from google.protobuf import field_mask_pb2
 
 from google.cloud.aiplatform import base
-from google.cloud.aiplatform.compat.types import featurestore as gca_featurestore
+from google.cloud.aiplatform.compat.types import (
+    feature_selector as gca_feature_selector,
+    featurestore as gca_featurestore,
+    featurestore_service as gca_featurestore_service,
+    io as gca_io,
+)
 from google.cloud.aiplatform import featurestore
 from google.cloud.aiplatform import initializer
 from google.cloud.aiplatform import utils
@@ -384,14 +389,8 @@ def create(
 
         Example Usage:
 
-            my_entity_type = aiplatform.EntityType.create(
-                entity_type_id='my_entity_type_id',
-                featurestore_name='projects/123/locations/us-central1/featurestores/my_featurestore_id'
-            )
-            or
-            my_entity_type = aiplatform.EntityType.create(
-                entity_type_id='my_entity_type_id',
-                featurestore_name='my_featurestore_id',
+            my_featurestore = aiplatform.Featurestore.create(
+                featurestore_id='my_featurestore_id',
             )
 
         Args:
@@ -556,3 +555,461 @@ def create_entity_type(
             request_metadata=request_metadata,
             sync=sync,
         )
+
+    def _batch_read_feature_values(
+        self,
+        batch_read_feature_values_request: gca_featurestore_service.BatchReadFeatureValuesRequest,
+        request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
+    ) -> "Featurestore":
+        """Batch reads Feature values from the Featurestore to a destination storage.
+
+        Args:
+            batch_read_feature_values_request (gca_featurestore_service.BatchReadFeatureValuesRequest):
+                Required. Request for batch reading feature values.
+            request_metadata (Sequence[Tuple[str, str]]):
+                Optional. Strings which should be sent along with the request as metadata.
+
+        Returns:
+            Featurestore: The Featurestore resource object that feature values were batch read from.
+        """
+
+        _LOGGER.log_action_start_against_resource(
+            "Serving", "feature values", self,
+        )
+
+        batch_read_lro = self.api_client.batch_read_feature_values(
+            request=batch_read_feature_values_request, metadata=request_metadata,
+        )
+
+        _LOGGER.log_action_started_against_resource_with_lro(
+            "Serve", "feature values", self.__class__, batch_read_lro
+        )
+
+        batch_read_lro.result()
+
+        _LOGGER.log_action_completed_against_resource("feature values", "served", self)
+
+        return self
+
+    def _validate_and_get_batch_read_feature_values_request(
+        self,
+        serving_feature_ids: Dict[str, List[str]],
+        destination: Union[
+            gca_io.BigQueryDestination,
+            gca_io.CsvDestination,
+            gca_io.TFRecordDestination,
+        ],
+        feature_destination_fields: Optional[Dict[str, str]] = None,
+        read_instances: Optional[Union[gca_io.BigQuerySource, gca_io.CsvSource]] = None,
+        pass_through_fields: Optional[List[str]] = None,
+    ) -> gca_featurestore_service.BatchReadFeatureValuesRequest:
+        """Validates inputs and builds a batch_read_feature_values_request.
+
+        Args:
+            serving_feature_ids (Dict[str, List[str]]):
+                Required. A user-defined dictionary defining the entity_types and their features for batch serve/read.
+                The keys of the dictionary are the serving entity_type IDs and
+                the values are lists of serving feature IDs in each entity_type.
+
+                Example:
+                    serving_feature_ids = {
+                        'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'],
+                        'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'],
+                    }
+
+            destination (Union[gca_io.BigQueryDestination, gca_io.CsvDestination, gca_io.TFRecordDestination]):
+                Required. BigQuery destination, CSV destination, or TFRecord destination.
+
+            feature_destination_fields (Dict[str, str]):
+                Optional. A user-defined dictionary to map a feature's fully qualified resource name to
+                its destination field name. If the destination field name is not defined,
+                the feature ID will be used as its destination field name.
+
+                Example:
+                    feature_destination_fields = {
+                        'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo',
+                        'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar',
+                    }
+
+            read_instances (Union[gca_io.BigQuerySource, gca_io.CsvSource]):
+                Optional. BigQuery source or CSV source for read instances.
+            pass_through_fields (List[str]):
+                Optional. When not empty, the specified fields in the
+                read_instances source will be joined as-is in the output,
+                in addition to those fields from the Featurestore Entity.
+
+                For BigQuery source, the type of the pass-through values
+                will be automatically inferred. For CSV source, the
+                pass-through values will be passed as opaque bytes.
+
+        Returns:
+            gca_featurestore_service.BatchReadFeatureValuesRequest: The batch read feature values request.
+        """
+
+        featurestore_name_components = self._parse_resource_name(self.resource_name)
+
+        feature_destination_fields = feature_destination_fields or {}
+
+        entity_type_specs = []
+        for entity_type_id, feature_ids in serving_feature_ids.items():
+            destination_feature_settings = []
+            for feature_id in feature_ids:
+                feature_resource_name = featurestore.Feature._format_resource_name(
+                    project=featurestore_name_components["project"],
+                    location=featurestore_name_components["location"],
+                    featurestore=featurestore_name_components["featurestore"],
+                    entity_type=entity_type_id,
+                    feature=feature_id,
+                )
+
+                feature_destination_field = feature_destination_fields.get(
+                    feature_resource_name
+                )
+                if feature_destination_field:
+                    destination_feature_setting_proto = gca_featurestore_service.DestinationFeatureSetting(
+                        feature_id=feature_id,
+                        destination_field=feature_destination_field,
+                    )
+                    destination_feature_settings.append(
+                        destination_feature_setting_proto
+                    )
+
+            entity_type_spec = gca_featurestore_service.BatchReadFeatureValuesRequest.EntityTypeSpec(
+                entity_type_id=entity_type_id,
+                feature_selector=gca_feature_selector.FeatureSelector(
+                    id_matcher=gca_feature_selector.IdMatcher(ids=feature_ids)
+                ),
+                settings=destination_feature_settings or None,
+            )
+            entity_type_specs.append(entity_type_spec)
+
+        batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest(
+            featurestore=self.resource_name, entity_type_specs=entity_type_specs,
+        )
+
+        if isinstance(destination, gca_io.BigQueryDestination):
+            batch_read_feature_values_request.destination = gca_featurestore_service.FeatureValueDestination(
+                bigquery_destination=destination
+            )
+        elif isinstance(destination, gca_io.CsvDestination):
+            batch_read_feature_values_request.destination = gca_featurestore_service.FeatureValueDestination(
+                csv_destination=destination
+            )
+        elif isinstance(destination, gca_io.TFRecordDestination):
+            batch_read_feature_values_request.destination = gca_featurestore_service.FeatureValueDestination(
+                tfrecord_destination=destination
+            )
+
+        if isinstance(read_instances, gca_io.BigQuerySource):
+            batch_read_feature_values_request.bigquery_read_instances = read_instances
+        elif isinstance(read_instances, gca_io.CsvSource):
+            batch_read_feature_values_request.csv_read_instances = read_instances
+
+        if pass_through_fields is not None:
+            batch_read_feature_values_request.pass_through_fields = [
+                gca_featurestore_service.BatchReadFeatureValuesRequest.PassThroughField(
+                    field_name=pass_through_field
+                )
+                for pass_through_field in pass_through_fields
+            ]
+
+        return batch_read_feature_values_request
+
+    def _get_read_instances(
+        self, read_instances: Union[str, List[str]],
+    ) -> Union[gca_io.BigQuerySource, gca_io.CsvSource]:
+        """Gets the read_instances source.
+
+        Args:
+            read_instances (Union[str, List[str]]):
+                Required. read_instances can be either a BigQuery URI to the input table,
+                or Google Cloud Storage URI(s) to the CSV file(s).
+
+        Returns:
+            Union[gca_io.BigQuerySource, gca_io.CsvSource]: BigQuery source or CSV source for read instances.
+
+        Raises:
+            TypeError if read_instances is not a string or a list of strings.
+            ValueError if the read_instances URI does not start with 'bq://' or 'gs://'.
+            ValueError if URIs in the read_instances list do not start with 'gs://'.
+        """
+        if isinstance(read_instances, str):
+            if not (
+                read_instances.startswith("bq://") or read_instances.startswith("gs://")
+            ):
+                raise ValueError(
+                    "read_instances accepts a single URI starting with 'bq://' or 'gs://'."
+                )
+        elif isinstance(read_instances, list) and all(
+            [isinstance(e, str) for e in read_instances]
+        ):
+            if not all([e.startswith("gs://") for e in read_instances]):
+                raise ValueError(
+                    "read_instances accepts a list of URIs starting with 'gs://' only."
+                )
+        else:
+            raise TypeError(
+                "read_instances should be either a str or a List[str]."
+            )
+
+        if isinstance(read_instances, str):
+            if read_instances.startswith("bq://"):
+                return gca_io.BigQuerySource(input_uri=read_instances)
+            else:
+                read_instances = [read_instances]
+
+        return gca_io.CsvSource(gcs_source=gca_io.GcsSource(uris=read_instances))
+
+    @base.optional_sync(return_input_arg="self")
+    def batch_serve_to_bq(
+        self,
+        bq_destination_output_uri: str,
+        serving_feature_ids: Dict[str, List[str]],
+        feature_destination_fields: Optional[Dict[str, str]] = None,
+        read_instances: Optional[Union[str, List[str]]] = None,
+        pass_through_fields: Optional[List[str]] = None,
+        request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        sync: bool = True,
+    ) -> "Featurestore":
+        """Batch serves feature values to a BigQuery destination.
+
+        Args:
+            bq_destination_output_uri (str):
+                Required. BigQuery URI to the destination table.
+
+                Example:
+                    'bq://project.dataset.table_name'
+
+                It requires an existing BigQuery destination dataset, under the same project as the Featurestore.
+
+            serving_feature_ids (Dict[str, List[str]]):
+                Required. A user-defined dictionary defining the entity_types and their features for batch serve/read.
+                The keys of the dictionary are the serving entity_type IDs and
+                the values are lists of serving feature IDs in each entity_type.
+
+                Example:
+                    serving_feature_ids = {
+                        'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'],
+                        'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'],
+                    }
+
+            feature_destination_fields (Dict[str, str]):
+                Optional. A user-defined dictionary to map a feature's fully qualified resource name to
+                its destination field name. If the destination field name is not defined,
+                the feature ID will be used as its destination field name.
+
+                Example:
+                    feature_destination_fields = {
+                        'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo',
+                        'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar',
+                    }
+
+            read_instances (Union[str, List[str]]):
+                Optional. read_instances can be either a BigQuery URI to the input table,
+                or Google Cloud Storage URI(s) to the
+                CSV file(s). May contain wildcards. For more
+                information on wildcards, see
+                https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames.
+
+                Example:
+                    'bq://project.dataset.table_name'
+                    or
+                    ["gs://my_bucket/my_file_1.csv", "gs://my_bucket/my_file_2.csv"]
+
+                Each read instance consists of exactly one read timestamp
+                and one or more entity IDs identifying entities of the
+                corresponding EntityTypes whose Features are requested.
+
+                Each output instance contains Feature values of requested
+                entities concatenated together as of the read time.
+
+                An example read instance may be
+                ``foo_entity_id, bar_entity_id, 2020-01-01T10:00:00.123Z``.
+
+                An example output instance may be
+                ``foo_entity_id, bar_entity_id, 2020-01-01T10:00:00.123Z, foo_entity_feature1_value, bar_entity_feature2_value``.
+
+                The timestamp in each read instance must be millisecond-aligned.
+
+                The columns can be in any order.
+
+                Values in the timestamp column must use the RFC 3339 format,
+                e.g. ``2012-07-30T10:43:17.123Z``.
+
+            pass_through_fields (List[str]):
+                Optional. When not empty, the specified fields in the
+                read_instances source will be joined as-is in the output,
+                in addition to those fields from the Featurestore Entity.
+
+                For BigQuery source, the type of the pass-through values
+                will be automatically inferred. For CSV source, the
+                pass-through values will be passed as opaque bytes.
+
+        Returns:
+            Featurestore: The Featurestore resource object that feature values were batch read from.
+
+        Raises:
+            NotFound: if the BigQuery destination dataset does not exist.
+            FailedPrecondition: if the BigQuery destination dataset/table is in a different project.
+        """
+        batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request(
+            serving_feature_ids=serving_feature_ids,
+            destination=gca_io.BigQueryDestination(
+                output_uri=bq_destination_output_uri
+            ),
+            feature_destination_fields=feature_destination_fields,
+            read_instances=read_instances
+            if read_instances is None
+            else self._get_read_instances(read_instances),
+            pass_through_fields=pass_through_fields,
+        )
+
+        return self._batch_read_feature_values(
+            batch_read_feature_values_request=batch_read_feature_values_request,
+            request_metadata=request_metadata,
+        )
+
+    @base.optional_sync(return_input_arg="self")
+    def batch_serve_to_gcs(
+        self,
+        gcs_destination_output_uri_prefix: str,
+        gcs_destination_type: str,
+        serving_feature_ids: Dict[str, List[str]],
+        feature_destination_fields: Optional[Dict[str, str]] = None,
+        read_instances: Optional[Union[str, List[str]]] = None,
+        pass_through_fields: Optional[List[str]] = None,
+        request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        sync: bool = True,
+    ) -> "Featurestore":
+        """Batch serves feature values to a GCS destination.
+
+        Args:
+            gcs_destination_output_uri_prefix (str):
+                Required. Google Cloud Storage URI to the output
+                directory. If the URI doesn't end with '/', a
+                '/' will be automatically appended. The
+                directory is created if it doesn't exist.
+
+                Example:
+                    "gs://bucket/path/to/prefix"
+
+            gcs_destination_type (str):
+                Required. The type of the destination file(s);
+                the value of gcs_destination_type can only be either `csv` or `tfrecord`.
+
+                For the CSV format, Array Feature value types are not allowed.
+
+                For the TFRecord format, below is the mapping from
+                Feature value type in the Featurestore to Feature
+                value type in TFRecord:
+
+                ::
+
+                    Value type in Featurestore      | Value type in TFRecord
+                    DOUBLE, DOUBLE_ARRAY            | FLOAT_LIST
+                    INT64, INT64_ARRAY              | INT64_LIST
+                    STRING, STRING_ARRAY, BYTES     | BYTES_LIST
+                    BOOL, BOOL_ARRAY (true, false)  | BYTES_LIST
+                                                      (true -> byte_string("true"),
+                                                       false -> byte_string("false"))
+
+            serving_feature_ids (Dict[str, List[str]]):
+                Required. A user-defined dictionary defining the entity_types and their features for batch serve/read.
+                The keys of the dictionary are the serving entity_type IDs and
+                the values are lists of serving feature IDs in each entity_type.
+
+                Example:
+                    serving_feature_ids = {
+                        'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'],
+                        'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'],
+                    }
+
+            feature_destination_fields (Dict[str, str]):
+                Optional. A user-defined dictionary to map a feature's fully qualified resource name to
+                its destination field name. If the destination field name is not defined,
+                the feature ID will be used as its destination field name.
+
+                Example:
+                    feature_destination_fields = {
+                        'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo',
+                        'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar',
+                    }
+
+            read_instances (Union[str, List[str]]):
+                Optional. read_instances can be either a BigQuery URI to the input table,
+                or Google Cloud Storage URI(s) to the
+                CSV file(s). May contain wildcards. For more
+                information on wildcards, see
+                https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames.
+
+                Example:
+                    'bq://project.dataset.table_name'
+                    or
+                    ["gs://my_bucket/my_file_1.csv", "gs://my_bucket/my_file_2.csv"]
+
+                Each read instance consists of exactly one read timestamp
+                and one or more entity IDs identifying entities of the
+                corresponding EntityTypes whose Features are requested.
+
+                Each output instance contains Feature values of requested
+                entities concatenated together as of the read time.
+
+                An example read instance may be
+                ``foo_entity_id, bar_entity_id, 2020-01-01T10:00:00.123Z``.
+
+                An example output instance may be
+                ``foo_entity_id, bar_entity_id, 2020-01-01T10:00:00.123Z, foo_entity_feature1_value, bar_entity_feature2_value``.
+
+                The timestamp in each read instance must be millisecond-aligned.
+
+                The columns can be in any order.
+
+                Values in the timestamp column must use the RFC 3339 format,
+                e.g. ``2012-07-30T10:43:17.123Z``.
+
+            pass_through_fields (List[str]):
+                Optional. When not empty, the specified fields in the
+                read_instances source will be joined as-is in the output,
+                in addition to those fields from the Featurestore Entity.
+
+                For BigQuery source, the type of the pass-through values
+                will be automatically inferred. For CSV source, the
+                pass-through values will be passed as opaque bytes.
+
+        Returns:
+            Featurestore: The Featurestore resource object that feature values were batch read from.
+
+        Raises:
+            ValueError: if gcs_destination_type is not supported.
+        """
+        destination = None
+        if gcs_destination_type not in featurestore_utils.GCS_DESTINATION_TYPE:
+            raise ValueError(
+                "Only %s are supported as gcs_destination_type, not `%s`."
+                % (
+                    "`" + "`, `".join(featurestore_utils.GCS_DESTINATION_TYPE) + "`",
+                    gcs_destination_type,
+                )
+            )
+
+        gcs_destination = gca_io.GcsDestination(
+            output_uri_prefix=gcs_destination_output_uri_prefix
+        )
+        if gcs_destination_type == "csv":
+            destination = gca_io.CsvDestination(gcs_destination=gcs_destination)
+        elif gcs_destination_type == "tfrecord":
+            destination = gca_io.TFRecordDestination(gcs_destination=gcs_destination)
+
+        batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request(
+            serving_feature_ids=serving_feature_ids,
+            destination=destination,
+            feature_destination_fields=feature_destination_fields,
+            read_instances=read_instances
+            if read_instances is None
+            else self._get_read_instances(read_instances),
+            pass_through_fields=pass_through_fields,
+        )
+
+        return self._batch_read_feature_values(
+            batch_read_feature_values_request=batch_read_feature_values_request,
+            request_metadata=request_metadata,
+        )
diff --git a/google/cloud/aiplatform/utils/__init__.py b/google/cloud/aiplatform/utils/__init__.py
index 26b28dcdd7..c5c21a2a0b 100644
--- a/google/cloud/aiplatform/utils/__init__.py
+++ b/google/cloud/aiplatform/utils/__init__.py
@@ -628,9 +628,11 @@ def get_timestamp_proto(
     """
     if not time:
         time = datetime.datetime.now()
-    t = time.timestamp()
-    seconds = int(t)
-    # must not have higher than millisecond precision.
-    nanos = int((t % 1 * 1e6) * 1e3)
-
-    return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
+    # The timestamp must not have higher than millisecond precision; round-trip
+    # through an isoformat string to truncate microseconds to milliseconds.
+    time_str = time.isoformat(sep=" ", timespec="milliseconds")
+    time = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f")
+
+    timestamp_proto = timestamp_pb2.Timestamp()
+    timestamp_proto.FromDatetime(time)
+
+    return timestamp_proto
diff --git a/google/cloud/aiplatform/utils/featurestore_utils.py b/google/cloud/aiplatform/utils/featurestore_utils.py
index e9d26b62be..45dbbbf44f 100644
--- a/google/cloud/aiplatform/utils/featurestore_utils.py
+++ b/google/cloud/aiplatform/utils/featurestore_utils.py
@@ -29,6 +29,7 @@
 RESOURCE_ID_PATTERN_REGEX = r"[a-z_][a-z0-9_]{0,59}"
 
 GCS_SOURCE_TYPE = {"csv", "avro"}
+GCS_DESTINATION_TYPE = {"csv", "tfrecord"}
 
 _FEATURE_VALUE_TYPE_UNSPECIFIED = "VALUE_TYPE_UNSPECIFIED"
diff --git a/tests/system/aiplatform/e2e_base.py b/tests/system/aiplatform/e2e_base.py
index 61b9e7f36c..3a9f87e8ae 100644
--- a/tests/system/aiplatform/e2e_base.py
+++ b/tests/system/aiplatform/e2e_base.py
@@ -24,6 +24,7 @@
 
 from google.api_core import exceptions
 from google.cloud import aiplatform
+from google.cloud import bigquery
 from google.cloud import storage
 
 from google.cloud.aiplatform import initializer
@@ -90,6 +91,38 @@ def delete_staging_bucket(self, shared_state: Dict[str, Any]):
         bucket = shared_state["bucket"]
         bucket.delete(force=True)
 
+    @pytest.fixture(scope="class")
+    def prepare_bigquery_dataset(
+        self, shared_state: Dict[str, Any]
+    ) -> Generator[None, None, None]:
+        """Create a BigQuery dataset and store the BigQuery resource objects in shared state."""
+
+        bigquery_client = bigquery.Client(project=_PROJECT)
+        shared_state["bigquery_client"] = bigquery_client
+
+        dataset_name = f"{self._temp_prefix.lower()}_{uuid.uuid4()}".replace("-", "_")
+        dataset_id = f"{_PROJECT}.{dataset_name}"
+        shared_state["bigquery_dataset_id"] = dataset_id
+
+        dataset = bigquery.Dataset(dataset_id)
+        dataset.location = _LOCATION
shared_state["bigquery_dataset"] = bigquery_client.create_dataset(dataset) + + yield + + @pytest.fixture(scope="class") + def delete_bigquery_dataset(self, shared_state: Dict[str, Any]): + """Delete the bigquery dataset""" + + yield + + # Get the bigquery dataset id used for testing and wipe it + bigquery_dataset = shared_state["bigquery_dataset"] + bigquery_client = shared_state["bigquery_client"] + bigquery_client.delete_dataset( + bigquery_dataset.dataset_id, delete_contents=True, not_found_ok=True + ) # Make an API request. + @pytest.fixture(scope="class", autouse=True) def teardown(self, shared_state: Dict[str, Any]): """Delete every Vertex AI resource created during test""" diff --git a/tests/system/aiplatform/test_featurestore.py b/tests/system/aiplatform/test_featurestore.py index d22119ea22..b67dec6883 100644 --- a/tests/system/aiplatform/test_featurestore.py +++ b/tests/system/aiplatform/test_featurestore.py @@ -16,6 +16,7 @@ # import logging +import pytest from google.cloud import aiplatform from tests.system.aiplatform import e2e_base @@ -29,6 +30,8 @@ "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movies.avro" ) +_TEST_READ_INSTANCE_SRC = "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction.csv" + _TEST_FEATURESTORE_ID = "movie_prediction" _TEST_USER_ENTITY_TYPE_ID = "users" _TEST_MOVIE_ENTITY_TYPE_ID = "movies" @@ -42,6 +45,12 @@ _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID = "average_rating" +@pytest.mark.usefixtures( + "prepare_staging_bucket", + "delete_staging_bucket", + "prepare_bigquery_dataset", + "delete_bigquery_dataset", +) class TestFeaturestore(e2e_base.TestEndToEnd): _temp_prefix = "temp_vertex_sdk_e2e_featurestore_test" @@ -131,7 +140,7 @@ def test_create_get_list_features(self, shared_state): user_age_feature = user_entity_type.create_feature( feature_id=_TEST_USER_AGE_FEATURE_ID, value_type="INT64" ) - + shared_state["user_age_feature_resource_name"] = user_age_feature.resource_name get_user_age_feature = user_entity_type.get_feature( feature_id=_TEST_USER_AGE_FEATURE_ID ) @@ -142,6 +151,9 @@ def test_create_get_list_features(self, shared_state): value_type="STRING", entity_type_name=user_entity_type_name, ) + shared_state[ + "user_gender_feature_resource_name" + ] = user_gender_feature.resource_name get_user_gender_feature = aiplatform.Feature( feature_name=user_gender_feature.resource_name @@ -153,6 +165,9 @@ def test_create_get_list_features(self, shared_state): user_liked_genres_feature = user_entity_type.create_feature( feature_id=_TEST_USER_LIKED_GENRES_FEATURE_ID, value_type="STRING_ARRAY", ) + shared_state[ + "user_liked_genres_feature_resource_name" + ] = user_liked_genres_feature.resource_name get_user_liked_genres_feature = aiplatform.Feature( feature_name=user_liked_genres_feature.resource_name @@ -250,6 +265,105 @@ def test_search_features(self, shared_state): len(list_searched_features) - shared_state["base_list_searched_features"] ) == 6 + def test_batch_serve_to_gcs(self, shared_state, caplog): + + assert shared_state["featurestore"] + assert shared_state["bucket"] + assert shared_state["user_age_feature_resource_name"] + assert shared_state["user_gender_feature_resource_name"] + assert shared_state["user_liked_genres_feature_resource_name"] + + featurestore = shared_state["featurestore"] + bucket_name = shared_state["staging_bucket_name"] + user_age_feature_resource_name = shared_state["user_age_feature_resource_name"] + user_gender_feature_resource_name = shared_state[ + 
"user_gender_feature_resource_name" + ] + user_liked_genres_feature_resource_name = shared_state[ + "user_liked_genres_feature_resource_name" + ] + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + caplog.set_level(logging.INFO) + + featurestore.batch_serve_to_gcs( + serving_feature_ids={ + _TEST_USER_ENTITY_TYPE_ID: [ + _TEST_USER_AGE_FEATURE_ID, + _TEST_USER_GENDER_FEATURE_ID, + _TEST_USER_LIKED_GENRES_FEATURE_ID, + ], + _TEST_MOVIE_ENTITY_TYPE_ID: [ + _TEST_MOVIE_TITLE_FEATURE_ID, + _TEST_MOVIE_GENRES_FEATURE_ID, + _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, + ], + }, + feature_destination_fields={ + user_age_feature_resource_name: "user_age_dest", + user_gender_feature_resource_name: "user_gender_dest", + user_liked_genres_feature_resource_name: "user_liked_genres_dest", + }, + read_instances=_TEST_READ_INSTANCE_SRC, + gcs_destination_output_uri_prefix=f"gs://{bucket_name}/featurestore_test/tfrecord", + gcs_destination_type="tfrecord", + ) + assert "Featurestore feature values served." in caplog.text + + caplog.clear() + + def test_batch_serve_to_bq(self, shared_state, caplog): + + assert shared_state["featurestore"] + assert shared_state["bigquery_dataset"] + assert shared_state["user_age_feature_resource_name"] + assert shared_state["user_gender_feature_resource_name"] + assert shared_state["user_liked_genres_feature_resource_name"] + + featurestore = shared_state["featurestore"] + bigquery_dataset_id = shared_state["bigquery_dataset_id"] + user_age_feature_resource_name = shared_state["user_age_feature_resource_name"] + user_gender_feature_resource_name = shared_state[ + "user_gender_feature_resource_name" + ] + user_liked_genres_feature_resource_name = shared_state[ + "user_liked_genres_feature_resource_name" + ] + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + caplog.set_level(logging.INFO) + + featurestore.batch_serve_to_bq( + serving_feature_ids={ + _TEST_USER_ENTITY_TYPE_ID: [ + _TEST_USER_AGE_FEATURE_ID, + _TEST_USER_GENDER_FEATURE_ID, + _TEST_USER_LIKED_GENRES_FEATURE_ID, + ], + _TEST_MOVIE_ENTITY_TYPE_ID: [ + _TEST_MOVIE_TITLE_FEATURE_ID, + _TEST_MOVIE_GENRES_FEATURE_ID, + _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, + ], + }, + feature_destination_fields={ + user_age_feature_resource_name: "user_age_dest", + user_gender_feature_resource_name: "user_gender_dest", + user_liked_genres_feature_resource_name: "user_liked_genres_dest", + }, + read_instances=_TEST_READ_INSTANCE_SRC, + bq_destination_output_uri=f"bq://{bigquery_dataset_id}.test_table", + ) + + assert "Featurestore feature values served." 
in caplog.text + caplog.clear() + def test_online_reads(self, shared_state): assert shared_state["user_entity_type"] assert shared_state["movie_entity_type"] diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index a92043969e..97cec0056f 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -207,6 +207,11 @@ "my_feature_id_1": "my_feature_id_1_source_field", } +_TEST_SERVING_FEATURE_IDS = { + "my_entity_type_id_1": ["my_feature_id_1_1", "my_feature_id_1_2"], + "my_entity_type_id_2": ["my_feature_id_2_1", "my_feature_id_2_2"], +} + _TEST_FEATURE_TIME_FIELD = "feature_time_field" _TEST_FEATURE_TIME = datetime.datetime.now() @@ -214,6 +219,7 @@ _TEST_GCS_AVRO_SOURCE_URIS = [ "gs://my_bucket/my_file_1.avro", ] +_TEST_GCS_CSV_SOURCE_URI = "gs://my_bucket/my_file_1.csv" _TEST_GCS_CSV_SOURCE_URIS = [ "gs://my_bucket/my_file_1.csv", ] @@ -221,6 +227,13 @@ _TEST_GCS_SOURCE_TYPE_AVRO = "avro" _TEST_GCS_SOURCE_TYPE_INVALID = "json" +_TEST_BQ_DESTINATION_URI = "bq://project.dataset.table_name" +_TEST_GCS_OUTPUT_URI_PREFIX = "gs://my_bucket/path/to_prefix" + +_TEST_GCS_DESTINATION_TYPE_CSV = "csv" +_TEST_GCS_DESTINATION_TYPE_TFRECORD = "tfrecord" +_TEST_GCS_DESTINATION_TYPE_INVALID = "json" + _TEST_BQ_SOURCE = gca_io.BigQuerySource(input_uri=_TEST_BQ_SOURCE_URI) _TEST_AVRO_SOURCE = gca_io.AvroSource( gcs_source=gca_io.GcsSource(uris=_TEST_GCS_AVRO_SOURCE_URIS) @@ -229,6 +242,14 @@ gcs_source=gca_io.GcsSource(uris=_TEST_GCS_CSV_SOURCE_URIS) ) +_TEST_BQ_DESTINATION = gca_io.BigQueryDestination(output_uri=_TEST_BQ_DESTINATION_URI) +_TEST_CSV_DESTINATION = gca_io.CsvDestination( + gcs_destination=gca_io.GcsDestination(output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX) +) +_TEST_TFRECORD_DESTINATION = gca_io.TFRecordDestination( + gcs_destination=gca_io.GcsDestination(output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX) +) + _TEST_READ_ENTITY_ID = "entity_id_1" _TEST_READ_ENTITY_IDS = ["entity_id_1"] @@ -243,6 +264,26 @@ ) +def _get_entity_type_spec_proto_with_feature_ids( + entity_type_id, feature_ids, feature_destination_fields=None +): + feature_destination_fields = feature_destination_fields or {} + entity_type_spec_proto = gca_featurestore_service.BatchReadFeatureValuesRequest.EntityTypeSpec( + entity_type_id=entity_type_id, + feature_selector=gca_feature_selector.FeatureSelector( + id_matcher=gca_feature_selector.IdMatcher(ids=feature_ids) + ), + settings=[ + gca_featurestore_service.DestinationFeatureSetting( + feature_id=feature_id, destination_field=feature_destination_field + ) + for feature_id, feature_destination_field in feature_destination_fields.items() + ] + or None, + ) + return entity_type_spec_proto + + def _get_header_proto(feature_ids): header_proto = copy.deepcopy(_TEST_BASE_HEADER_PROTO) header_proto.feature_descriptors = [ @@ -351,6 +392,17 @@ def create_featurestore_mock(): yield create_featurestore_mock +@pytest.fixture +def batch_read_feature_values_mock(): + with patch.object( + featurestore_service_client.FeaturestoreServiceClient, + "batch_read_feature_values", + ) as batch_read_feature_values_mock: + batch_read_feature_values_lro_mock = mock.Mock(operation.Operation) + batch_read_feature_values_mock.return_value = batch_read_feature_values_lro_mock + yield batch_read_feature_values_mock + + # ALL EntityType Mocks @pytest.fixture def get_entity_type_mock(): @@ -875,6 +927,288 @@ def test_create_featurestore(self, create_featurestore_mock, sync): metadata=_TEST_REQUEST_METADATA, ) + 
@pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize( + "serving_feature_ids, feature_destination_fields, expected_entity_type_specs", + [ + ( + { + "my_entity_type_id_1": ["my_feature_id_1_1", "my_feature_id_1_2"], + "my_entity_type_id_2": ["my_feature_id_2_1", "my_feature_id_2_2"], + }, + None, + [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ], + ), + ( + { + "my_entity_type_id_1": ["my_feature_id_1_1", "my_feature_id_1_2"], + "my_entity_type_id_2": ["my_feature_id_2_1", "my_feature_id_2_2"], + }, + { + f"{_TEST_FEATURESTORE_NAME}/entityTypes/my_entity_type_id_1/features/my_feature_id_1_1": "my_feature_id_1_1_dest", + f"{_TEST_FEATURESTORE_NAME}/entityTypes/my_entity_type_id_1/features/my_feature_id_1_2": "my_feature_id_1_2_dest", + }, + [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + feature_destination_fields={ + "my_feature_id_1_1": "my_feature_id_1_1_dest", + "my_feature_id_1_2": "my_feature_id_1_2_dest", + }, + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ], + ), + ( + { + "my_entity_type_id_1": ["my_feature_id_1_1", "my_feature_id_1_2"], + "my_entity_type_id_2": ["my_feature_id_2_1", "my_feature_id_2_2"], + }, + { + f"{_TEST_FEATURESTORE_NAME}/entityTypes/my_entity_type_id_1/features/my_feature_id_1_1": "my_feature_id_1_1_dest", + f"{_TEST_FEATURESTORE_NAME}/entityTypes/my_entity_type_id_2/features/my_feature_id_2_1": "my_feature_id_2_1_dest", + }, + [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + feature_destination_fields={ + "my_feature_id_1_1": "my_feature_id_1_1_dest" + }, + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + feature_destination_fields={ + "my_feature_id_2_1": "my_feature_id_2_1_dest" + }, + ), + ], + ), + ], + ) + def test_validate_and_get_batch_read_feature_values_request( + self, + serving_feature_ids, + feature_destination_fields, + expected_entity_type_specs, + ): + + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( + featurestore=my_featurestore.resource_name, + destination=gca_featurestore_service.FeatureValueDestination( + bigquery_destination=_TEST_BQ_DESTINATION, + ), + entity_type_specs=expected_entity_type_specs, + ) + assert ( + expected_batch_read_feature_values_request + == my_featurestore._validate_and_get_batch_read_feature_values_request( + serving_feature_ids=serving_feature_ids, + destination=_TEST_BQ_DESTINATION, + feature_destination_fields=feature_destination_fields, + ) + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + def test_validate_and_get_batch_read_feature_values_request_with_read_instances( + self, + ): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + 
expected_entity_type_specs = [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ] + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( + featurestore=my_featurestore.resource_name, + destination=gca_featurestore_service.FeatureValueDestination( + bigquery_destination=_TEST_BQ_DESTINATION, + ), + entity_type_specs=expected_entity_type_specs, + bigquery_read_instances=_TEST_BQ_SOURCE, + ) + assert ( + expected_batch_read_feature_values_request + == my_featurestore._validate_and_get_batch_read_feature_values_request( + serving_feature_ids=_TEST_SERVING_FEATURE_IDS, + destination=_TEST_BQ_DESTINATION, + read_instances=_TEST_BQ_SOURCE, + ) + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize( + "read_instances, expected", + [ + (_TEST_BQ_SOURCE_URI, _TEST_BQ_SOURCE), + (_TEST_GCS_CSV_SOURCE_URIS, _TEST_CSV_SOURCE), + (_TEST_GCS_CSV_SOURCE_URI, _TEST_CSV_SOURCE), + ], + ) + def test_get_read_instances(self, read_instances, expected): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + assert expected == my_featurestore._get_read_instances( + read_instances=read_instances + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize( + "read_instances", + [[1, 2, 3, 4, 5], 1, (_TEST_GCS_CSV_SOURCE_URI, _TEST_GCS_CSV_SOURCE_URI)], + ) + def test_get_read_instances_with_raise_typeerror(self, read_instances): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + with pytest.raises(TypeError): + my_featurestore._get_read_instances(read_instances=read_instances) + + @pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize( + "read_instances", + [ + "gcs://my_bucket/my_file_1.csv", + "bigquery://my_bucket/my_file_1.csv", + "my_bucket/my_file_1.csv", + [_TEST_BQ_SOURCE_URI], + ], + ) + def test_get_read_instances_with_raise_valueerror(self, read_instances): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + with pytest.raises(ValueError): + my_featurestore._get_read_instances(read_instances=read_instances) + + @pytest.mark.parametrize("sync", [True, False]) + @pytest.mark.usefixtures("get_featurestore_mock") + def test_batch_serve_to_bq(self, batch_read_feature_values_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + + expected_entity_type_specs = [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ] + + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( + featurestore=my_featurestore.resource_name, + destination=gca_featurestore_service.FeatureValueDestination( + bigquery_destination=_TEST_BQ_DESTINATION, + ), + entity_type_specs=expected_entity_type_specs, + ) + + 
my_featurestore.batch_serve_to_bq( + bq_destination_output_uri=_TEST_BQ_DESTINATION_URI, + serving_feature_ids=_TEST_SERVING_FEATURE_IDS, + sync=sync, + ) + + if not sync: + my_featurestore.wait() + + batch_read_feature_values_mock.assert_called_once_with( + request=expected_batch_read_feature_values_request, + metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.parametrize("sync", [True, False]) + @pytest.mark.usefixtures("get_featurestore_mock") + def test_batch_serve_to_gcs(self, batch_read_feature_values_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + + expected_entity_type_specs = [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ] + + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( + featurestore=my_featurestore.resource_name, + destination=gca_featurestore_service.FeatureValueDestination( + tfrecord_destination=_TEST_TFRECORD_DESTINATION, + ), + entity_type_specs=expected_entity_type_specs, + ) + + my_featurestore.batch_serve_to_gcs( + gcs_destination_output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX, + gcs_destination_type=_TEST_GCS_DESTINATION_TYPE_TFRECORD, + serving_feature_ids=_TEST_SERVING_FEATURE_IDS, + sync=sync, + ) + + if not sync: + my_featurestore.wait() + + batch_read_feature_values_mock.assert_called_once_with( + request=expected_batch_read_feature_values_request, + metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + def test_batch_serve_to_gcs_with_invalid_gcs_destination_type(self): + + aiplatform.init(project=_TEST_PROJECT) + + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + with pytest.raises(ValueError): + my_featurestore.batch_serve_to_gcs( + gcs_destination_output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX, + gcs_destination_type=_TEST_GCS_DESTINATION_TYPE_INVALID, + serving_feature_ids=_TEST_SERVING_FEATURE_IDS, + ) + class TestEntityType: def setup_method(self): diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index d4840609b1..b47eb684d8 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -19,7 +19,6 @@ import pytest from typing import Callable, Dict, Optional import datetime -from decimal import Decimal from google.protobuf import timestamp_pb2 @@ -326,28 +325,8 @@ def test_client_w_override_select_version(): @pytest.mark.parametrize( "year,month,day,hour,minute,second,microsecond,expected_seconds,expected_nanos", [ - ( - 2021, - 12, - 23, - 23, - 59, - 59, - 999999, - 1640303999, - int(str(Decimal(1640303999.999999)).split(".")[1][:9]), - ), - ( - 2013, - 1, - 1, - 1, - 1, - 1, - 199999, - 1357002061, - int(str(Decimal(1357002061.199999)).split(".")[1][:9]), - ), + (2021, 12, 23, 23, 59, 59, 999999, 1640303999, 999000000,), + (2013, 1, 1, 1, 1, 1, 199999, 1357002061, 199000000,), ], ) def test_get_timestamp_proto( @@ -369,7 +348,6 @@ def test_get_timestamp_proto( minute=minute, second=second, microsecond=microsecond, - tzinfo=datetime.timezone.utc, ) true_timestamp_proto = timestamp_pb2.Timestamp( seconds=expected_seconds, nanos=expected_nanos
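For review context, here is a minimal usage sketch of the two public methods this change introduces, `batch_serve_to_bq` and `batch_serve_to_gcs`. The project, dataset, bucket, entity type, and feature names below are hypothetical placeholders, not values from this diff:

```python
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

# Hypothetical featurestore with `users` and `movies` entity types.
my_featurestore = aiplatform.Featurestore(featurestore_name="movie_prediction")

# Batch serve to BigQuery. The destination dataset must already exist
# in the same project as the featurestore.
my_featurestore.batch_serve_to_bq(
    bq_destination_output_uri="bq://my-project.my_dataset.served_features",
    serving_feature_ids={
        "users": ["age", "gender", "liked_genres"],
        "movies": ["title", "genres", "average_rating"],
    },
    read_instances="bq://my-project.my_dataset.read_instances",
)

# Batch serve to GCS as TFRecord; `csv` is the only other supported type.
my_featurestore.batch_serve_to_gcs(
    gcs_destination_output_uri_prefix="gs://my-bucket/featurestore/tfrecord",
    gcs_destination_type="tfrecord",
    serving_feature_ids={"users": ["age", "gender", "liked_genres"]},
    read_instances="gs://my-bucket/read_instances.csv",
)
```

Both methods build a `BatchReadFeatureValuesRequest` via `_validate_and_get_batch_read_feature_values_request` and block on the resulting LRO unless `sync=False` is passed.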