From e6cca106e8288ac9e76e0938ad0ec4e828db3028 Mon Sep 17 00:00:00 2001
From: Morgan Du
Date: Thu, 27 Jan 2022 07:56:05 -0800
Subject: [PATCH] feat: enable ingest from pd.DataFrame (#977)

* feat: enable ingest from pd.DataFrame

* fix: remove bq create_dataset, docstrings, mocks

* fix: e2e_base project

* fix: delete two optional args, add note for temp bq dataset, revert
  deleting bq dataset create, add featurestore_extra_require, update
  integration tests to use online read to validate feature value
  ingestion from df

* fix: add a comment of call complete upon ingestion, update unit tests
---
 .../aiplatform/featurestore/entity_type.py    | 192 ++++++++++++++----
 setup.py                                      |   2 +
 tests/system/aiplatform/test_featurestore.py  | 174 ++++++++++++++--
 tests/unit/aiplatform/test_featurestores.py   | 161 ++++++++++++++-
 4 files changed, 471 insertions(+), 58 deletions(-)
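For orientation before the diffs, here is a minimal usage sketch of the API this patch introduces. The project, location, entity type resource name, and DataFrame contents are hypothetical placeholders, not values taken from this change:

    import datetime
    import pandas as pd
    from google.cloud import aiplatform

    # Hypothetical project and featurestore; substitute your own resources.
    aiplatform.init(project="my-project", location="us-central1")

    entity_type = aiplatform.EntityType(
        entity_type_name=(
            "projects/my-project/locations/us-central1/"
            "featurestores/my_featurestore/entityTypes/users"
        )
    )

    df = pd.DataFrame(
        {
            "user_id": ["u1", "u2"],
            "age": [31, 42],
            # Feature-time column must be datetime64, per the new docstring.
            "update_time": pd.to_datetime(["2022-01-01", "2022-01-01"]),
        }
    )

    entity_type.ingest_from_df(
        feature_ids=["age"],
        feature_time="update_time",  # or a single datetime.datetime for all rows
        df_source=df,
        entity_id_field="user_id",
    )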
diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py
index 6e993f26b5..8a85b1aa7a 100644
--- a/google/cloud/aiplatform/featurestore/entity_type.py
+++ b/google/cloud/aiplatform/featurestore/entity_type.py
@@ -17,6 +17,7 @@
 import datetime
 from typing import Dict, List, Optional, Sequence, Tuple, Union
+import uuid
 
 from google.auth import credentials as auth_credentials
 from google.protobuf import field_mask_pb2
@@ -34,6 +35,7 @@
 from google.cloud.aiplatform import utils
 from google.cloud.aiplatform.utils import featurestore_utils
 
+from google.cloud import bigquery
 
 _LOGGER = base.Logger(__name__)
 _ALL_FEATURE_IDS = "*"
@@ -795,23 +797,16 @@ def _validate_and_get_import_feature_values_request(
                 If not provided, the source column need to be the same as the Feature ID.
 
                 Example:
+                    feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
 
-                    feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
-
-                    In case all features' source field and ID match:
-                    feature_source_fields = None or {}
-
-                    In case all features' source field and ID do not match:
-                    feature_source_fields = {
+                    feature_source_fields = {
                         'my_feature_id_1': 'my_feature_id_1_source_field',
-                        'my_feature_id_2': 'my_feature_id_2_source_field',
-                        'my_feature_id_3': 'my_feature_id_3_source_field',
-                    }
+                    }
+
+                Note:
+                    The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field';
+                    the source columns of 'my_feature_id_2' and 'my_feature_id_3' are their feature IDs.
 
-                    In case some features' source field and ID do not match:
-                    feature_source_fields = {
-                        'my_feature_id_1': 'my_feature_id_1_source_field',
-                    }
             entity_id_field (str):
                 Optional. Source column that holds entity IDs. If not provided, entity
                 IDs are extracted from the column named ``entity_id``.
@@ -954,23 +949,16 @@ def ingest_from_bq(
                 If not provided, the source column need to be the same as the Feature ID.
 
                 Example:
+                    feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
 
-                    feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
-
-                    In case all features' source field and ID match:
-                    feature_source_fields = None or {}
-
-                    In case all features' source field and ID do not match:
-                    feature_source_fields = {
+                    feature_source_fields = {
                         'my_feature_id_1': 'my_feature_id_1_source_field',
-                        'my_feature_id_2': 'my_feature_id_2_source_field',
-                        'my_feature_id_3': 'my_feature_id_3_source_field',
-                    }
+                    }
+
+                Note:
+                    The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field';
+                    the source columns of 'my_feature_id_2' and 'my_feature_id_3' are their feature IDs.
 
-                    In case some features' source field and ID do not match:
-                    feature_source_fields = {
-                        'my_feature_id_1': 'my_feature_id_1_source_field',
-                    }
             entity_id_field (str):
                 Optional. Source column that holds entity IDs. If not provided, entity
                 IDs are extracted from the column named ``entity_id``.
@@ -1000,6 +988,7 @@ def ingest_from_bq(
 
             EntityType - The entityType resource object with feature values imported.
 
         """
+        bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)
 
         import_feature_values_request = self._validate_and_get_import_feature_values_request(
@@ -1065,23 +1054,16 @@ def ingest_from_gcs(
                 If not provided, the source column need to be the same as the Feature ID.
 
                 Example:
+                    feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
 
-                    feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
-
-                    In case all features' source field and ID match:
-                    feature_source_fields = None or {}
-
-                    In case all features' source field and ID do not match:
-                    feature_source_fields = {
+                    feature_source_fields = {
                         'my_feature_id_1': 'my_feature_id_1_source_field',
-                        'my_feature_id_2': 'my_feature_id_2_source_field',
-                        'my_feature_id_3': 'my_feature_id_3_source_field',
-                    }
+                    }
+
+                Note:
+                    The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field';
+                    the source columns of 'my_feature_id_2' and 'my_feature_id_3' are their feature IDs.
 
-                    In case some features' source field and ID do not match:
-                    feature_source_fields = {
-                        'my_feature_id_1': 'my_feature_id_1_source_field',
-                    }
             entity_id_field (str):
                 Optional. Source column that holds entity IDs. If not provided, entity
                 IDs are extracted from the column named ``entity_id``.
@@ -1146,6 +1128,132 @@ def ingest_from_gcs(
             request_metadata=request_metadata,
         )
 
+    def ingest_from_df(
+        self,
+        feature_ids: List[str],
+        feature_time: Union[str, datetime.datetime],
+        df_source: "pd.DataFrame",  # noqa: F821 - skip check for undefined name 'pd'
+        feature_source_fields: Optional[Dict[str, str]] = None,
+        entity_id_field: Optional[str] = None,
+        request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
+    ) -> "EntityType":
+        """Ingest feature values from DataFrame.
+
+        Note:
+            Calling this method will automatically create and delete a temporary
+            BigQuery dataset in the same GCP project, which will be used
+            as the intermediary storage for ingesting feature values
+            from dataframe to featurestore.
+
+            The call returns once ingestion completes, at which point the
+            feature values have been ingested into the entity_type.
+
+        Args:
+            feature_ids (List[str]):
+                Required. IDs of the Feature to import values
+                of. The Features must exist in the target
+                EntityType, or the request will fail.
+            feature_time (Union[str, datetime.datetime]):
+                Required. The feature_time can be one of:
+                    - The source column that holds the Feature
+                    timestamp for all Feature values in each entity.
+
+                    Note:
+                        The dtype of the source column should be `datetime64`.
+
+                    - A single Feature timestamp for all entities
+                    being imported. The timestamp must not have
+                    higher than millisecond precision.
+
+                    Example:
+                        feature_time = datetime.datetime(year=2022, month=1, day=1, hour=11, minute=59, second=59)
+                        or
+                        feature_time_str = datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
+                        feature_time = datetime.datetime.strptime(feature_time_str, "%Y-%m-%d %H:%M:%S.%f")
+
+            df_source (pd.DataFrame):
+                Required. Pandas DataFrame containing the source data for ingestion.
+            feature_source_fields (Dict[str, str]):
+                Optional. User-defined dictionary that maps the ID of each Feature
+                to import to the source column that holds its values.
+
+                Specify the features whose ID and source column are not the same.
+                If not provided, the source column needs to be the same as the Feature ID.
+
+                Example:
+                    feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
+
+                    feature_source_fields = {
+                        'my_feature_id_1': 'my_feature_id_1_source_field',
+                    }
+
+                Note:
+                    The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field';
+                    the source columns of 'my_feature_id_2' and 'my_feature_id_3' are their feature IDs.
+
+            entity_id_field (str):
+                Optional. Source column that holds entity IDs. If not provided, entity
+                IDs are extracted from the column named ``entity_id``.
+            request_metadata (Sequence[Tuple[str, str]]):
+                Optional. Strings which should be sent along with the request as metadata.
+
+        Returns:
+            EntityType - The entityType resource object with feature values imported.
+
+        """
+        try:
+            import pyarrow  # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery'
+        except ImportError:
+            raise ImportError(
+                f"Pyarrow is not installed. Please install pyarrow to use "
+                f"{self.ingest_from_df.__name__}"
+            )
+
+        bigquery_client = bigquery.Client(
+            project=self.project, credentials=self.credentials
+        )
+
+        entity_type_name_components = self._parse_resource_name(self.resource_name)
+        featurestore_id, entity_type_id = (
+            entity_type_name_components["featurestore"],
+            entity_type_name_components["entity_type"],
+        )
+
+        temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
+            "-", "_"
+        )
+        temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[
+            :1024
+        ]
+        temp_bq_table_id = f"{temp_bq_dataset_id}.{entity_type_id}"
+
+        temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id)
+        temp_bq_dataset.location = self.location
+
+        temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset)
+
+        try:
+            job = bigquery_client.load_table_from_dataframe(
+                dataframe=df_source, destination=temp_bq_table_id
+            )
+            job.result()
+
+            entity_type_obj = self.ingest_from_bq(
+                feature_ids=feature_ids,
+                feature_time=feature_time,
+                bq_source_uri=f"bq://{temp_bq_table_id}",
+                feature_source_fields=feature_source_fields,
+                entity_id_field=entity_id_field,
+                request_metadata=request_metadata,
+            )
+
+        finally:
+            bigquery_client.delete_dataset(
+                dataset=temp_bq_dataset.dataset_id, delete_contents=True,
+            )
+
+        return entity_type_obj
+
     @staticmethod
     def _instantiate_featurestore_online_client(
         location: Optional[str] = None,
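The staging lifecycle implemented above (create a temporary dataset, load the DataFrame, always delete the dataset) can be exercised with google-cloud-bigquery alone. A minimal sketch, assuming a hypothetical project "my-project", location "us-central1", and placeholder table names; the client calls (create_dataset, load_table_from_dataframe, delete_dataset) are the same ones the method relies on:

    import uuid
    import pandas as pd
    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")  # hypothetical project

    # Same naming scheme as ingest_from_df: temp_<featurestore>_<uuid4>.
    dataset_name = f"temp_my_featurestore_{uuid.uuid4()}".replace("-", "_")
    dataset_id = f"my-project.{dataset_name}"
    table_id = f"{dataset_id}.my_entity_type"

    dataset = bigquery.Dataset(dataset_ref=dataset_id)
    dataset.location = "us-central1"
    dataset = client.create_dataset(dataset)

    try:
        # Loading a DataFrame requires pyarrow, hence the import guard above.
        job = client.load_table_from_dataframe(
            dataframe=pd.DataFrame({"my_entity_id": ["a"], "value": [1.0]}),
            destination=table_id,
        )
        job.result()  # block until the load job completes
    finally:
        # Always clean up the intermediary dataset, even if the load fails.
        client.delete_dataset(dataset=dataset.dataset_id, delete_contents=True)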
diff --git a/setup.py b/setup.py
index 2b8c58f033..011da8aea8 100644
--- a/setup.py
+++ b/setup.py
@@ -47,6 +47,7 @@
     "werkzeug >= 2.0.0",
     "tensorflow >=2.4.0",
 ]
+featurestore_extra_require = ["pandas >= 1.0.0", "pyarrow >= 6.0.1"]
 
 full_extra_require = list(
     set(
@@ -54,6 +55,7 @@
         + metadata_extra_require
         + xai_extra_require
         + lit_extra_require
+        + featurestore_extra_require
     )
 )
 testing_extra_require = (
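The new extra above only declares pandas and pyarrow as optional dependencies; the SDK still guards the import at call time, as seen in ingest_from_df. A condensed sketch of that guard pattern (the helper name is illustrative, not part of the library):

    def _require_pyarrow(caller_name: str) -> None:
        # pyarrow is needed by google.cloud.bigquery to serialize DataFrames;
        # fail fast with an actionable message when it is missing.
        try:
            import pyarrow  # noqa: F401
        except ImportError:
            raise ImportError(
                f"Pyarrow is not installed. Please install pyarrow to use {caller_name}"
            )

    _require_pyarrow("ingest_from_df")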
diff --git a/tests/system/aiplatform/test_featurestore.py b/tests/system/aiplatform/test_featurestore.py
index b67dec6883..03070eee3c 100644
--- a/tests/system/aiplatform/test_featurestore.py
+++ b/tests/system/aiplatform/test_featurestore.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import datetime
 import logging
 
 import pytest
@@ -201,22 +202,17 @@ def test_ingest_feature_values(self, shared_state, caplog):
             gcs_source_uris=_TEST_USERS_ENTITY_TYPE_GCS_SRC,
             gcs_source_type="avro",
             entity_id_field="user_id",
-            worker_count=2,
+            worker_count=1,
         )
 
         assert "EntityType feature values imported." in caplog.text
 
         caplog.clear()
 
-    def test_batch_create_features_and_ingest_feature_values(
-        self, shared_state, caplog
-    ):
-
+    def test_batch_create_features(self, shared_state):
         assert shared_state["movie_entity_type"]
         movie_entity_type = shared_state["movie_entity_type"]
 
-        caplog.set_level(logging.INFO)
-
         aiplatform.init(
             project=e2e_base._PROJECT, location=e2e_base._LOCATION,
         )
@@ -232,21 +228,171 @@ def test_batch_create_features_and_ingest_feature_values(
 
         movie_entity_type.batch_create_features(feature_configs=movie_feature_configs)
 
-        movie_entity_type.ingest_from_gcs(
+        list_movie_features = movie_entity_type.list_features()
+        assert len(list_movie_features) == 3
+
+    def test_ingest_feature_values_from_df_using_feature_time_column_and_online_read_multiple_entities(
+        self, shared_state, caplog
+    ):
+
+        assert shared_state["movie_entity_type"]
+        movie_entity_type = shared_state["movie_entity_type"]
+
+        caplog.set_level(logging.INFO)
+
+        aiplatform.init(
+            project=e2e_base._PROJECT, location=e2e_base._LOCATION,
+        )
+
+        read_feature_ids = ["average_rating", "title", "genres"]
+
+        movie_entity_views_df_before_ingest = movie_entity_type.read(
+            entity_ids=["movie_01", "movie_02"], feature_ids=read_feature_ids,
+        )
+        expected_data_before_ingest = [
+            {
+                "entity_id": "movie_01",
+                "average_rating": None,
+                "title": None,
+                "genres": None,
+            },
+            {
+                "entity_id": "movie_02",
+                "average_rating": None,
+                "title": None,
+                "genres": None,
+            },
+        ]
+        expected_movie_entity_views_df_before_ingest = pd.DataFrame(
+            data=expected_data_before_ingest, columns=read_feature_ids
+        )
+
+        movie_entity_views_df_before_ingest.equals(
+            expected_movie_entity_views_df_before_ingest
+        )
+
+        movies_df = pd.DataFrame(
+            data=[
+                {
+                    "movie_id": "movie_01",
+                    "average_rating": 4.9,
+                    "title": "The Shawshank Redemption",
+                    "genres": "Drama",
+                    "update_time": "2021-08-20 20:44:11.094375+00:00",
+                },
+                {
+                    "movie_id": "movie_02",
+                    "average_rating": 4.2,
+                    "title": "The Shining",
+                    "genres": "Horror",
+                    "update_time": "2021-08-20 20:44:11.094375+00:00",
+                },
+            ],
+            columns=["movie_id", "average_rating", "title", "genres", "update_time"],
+        )
+        movies_df = movies_df.astype({"update_time": "datetime64"})
+        feature_time_column = "update_time"
+
+        movie_entity_type.ingest_from_df(
             feature_ids=[
                 _TEST_MOVIE_TITLE_FEATURE_ID,
                 _TEST_MOVIE_GENRES_FEATURE_ID,
                 _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID,
             ],
-            feature_time="update_time",
-            gcs_source_uris=_TEST_MOVIES_ENTITY_TYPE_GCS_SRC,
-            gcs_source_type="avro",
+            feature_time=feature_time_column,
+            df_source=movies_df,
             entity_id_field="movie_id",
-            worker_count=2,
         )
 
-        list_movie_features = movie_entity_type.list_features()
-        assert len(list_movie_features) == 3
+        movie_entity_views_df_after_ingest = movie_entity_type.read(
+            entity_ids=["movie_01", "movie_02"], feature_ids=read_feature_ids,
+        )
+        expected_data_after_ingest = [
+            {
+                "movie_id": "movie_01",
+                "average_rating": 4.9,
+                "title": "The Shawshank Redemption",
+                "genres": "Drama",
+            },
+            {
+                "movie_id": "movie_02",
+                "average_rating": 4.2,
+                "title": "The Shining",
+                "genres": "Horror",
+            },
+        ]
+        expected_movie_entity_views_df_after_ingest = pd.DataFrame(
+            data=expected_data_after_ingest, columns=read_feature_ids
+        )
+
+        movie_entity_views_df_after_ingest.equals(
+            expected_movie_entity_views_df_after_ingest
+        )
+
+        assert "EntityType feature values imported." in caplog.text
+
+        caplog.clear()
+
+    def test_ingest_feature_values_from_df_using_feature_time_datetime_and_online_read_single_entity(
+        self, shared_state, caplog
+    ):
+        assert shared_state["movie_entity_type"]
+        movie_entity_type = shared_state["movie_entity_type"]
+
+        caplog.set_level(logging.INFO)
+
+        aiplatform.init(
+            project=e2e_base._PROJECT, location=e2e_base._LOCATION,
+        )
+
+        movies_df = pd.DataFrame(
+            data=[
+                {
+                    "movie_id": "movie_03",
+                    "average_rating": 4.5,
+                    "title": "Cinema Paradiso",
+                    "genres": "Romance",
+                },
+                {
+                    "movie_id": "movie_04",
+                    "average_rating": 4.6,
+                    "title": "The Dark Knight",
+                    "genres": "Action",
+                },
+            ],
+            columns=["movie_id", "average_rating", "title", "genres"],
+        )
+
+        feature_time_datetime_str = datetime.datetime.now().isoformat(
+            sep=" ", timespec="milliseconds"
+        )
+        feature_time_datetime = datetime.datetime.strptime(
+            feature_time_datetime_str, "%Y-%m-%d %H:%M:%S.%f"
+        )
+
+        movie_entity_type.ingest_from_df(
+            feature_ids=[
+                _TEST_MOVIE_TITLE_FEATURE_ID,
+                _TEST_MOVIE_GENRES_FEATURE_ID,
+                _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID,
+            ],
+            feature_time=feature_time_datetime,
+            df_source=movies_df,
+            entity_id_field="movie_id",
+        )
+
+        movie_entity_views_df_avg_rating = movie_entity_type.read(
+            entity_ids="movie_04", feature_ids="average_rating",
+        )
+        expected_data_avg_rating = [
+            {"movie_id": "movie_04", "average_rating": 4.6},
+        ]
+        expected_movie_entity_views_df_avg_rating = pd.DataFrame(
+            data=expected_data_avg_rating, columns=["average_rating"]
+        )
+
+        movie_entity_views_df_avg_rating.equals(
+            expected_movie_entity_views_df_avg_rating
+        )
 
         assert "EntityType feature values imported." in caplog.text
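The unit tests in the next file patch uuid.uuid4 so the temporary BigQuery dataset name becomes deterministic, letting the assertions rebuild the expected request exactly. A self-contained sketch of that pinning pattern (the name prefix is a placeholder):

    import uuid
    from unittest.mock import patch

    def uuid_mock():
        # Fixed UUID so any name derived from uuid4() is reproducible.
        return uuid.UUID(int=1)

    with patch("uuid.uuid4", uuid_mock):
        name = f"temp_featurestore_id_{uuid.uuid4()}".replace("-", "_")
        print(name)  # temp_featurestore_id_00000000_0000_0000_0000_000000000001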
diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py
index 97cec0056f..449d0348c3 100644
--- a/tests/unit/aiplatform/test_featurestores.py
+++ b/tests/unit/aiplatform/test_featurestores.py
@@ -19,13 +19,14 @@
 import pytest
 import datetime
 import pandas as pd
+import uuid
 
 from unittest import mock
 from importlib import reload
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 from google.api_core import operation
-from google.protobuf import field_mask_pb2
+from google.protobuf import field_mask_pb2, timestamp_pb2
 
 from google.cloud import aiplatform
 from google.cloud.aiplatform import base
@@ -51,11 +52,17 @@
     types as gca_types,
 )
 
+from google.cloud import bigquery
+
 # project
 _TEST_PROJECT = "test-project"
 _TEST_LOCATION = "us-central1"
 _TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}"
 
+_TEST_FEATURE_TIME_DATETIME = datetime.datetime(
+    year=2022, month=1, day=1, hour=11, minute=59, second=59
+)
+
 # featurestore
 _TEST_FEATURESTORE_ID = "featurestore_id"
 _TEST_FEATURESTORE_NAME = f"{_TEST_PARENT}/featurestores/{_TEST_FEATURESTORE_ID}"
@@ -321,6 +328,57 @@ def _get_entity_view_proto(entity_id, feature_value_types, feature_values):
     return entity_view_proto
 
 
+def uuid_mock():
+    return uuid.UUID(int=1)
+
+
+# All BigQuery Mocks
+@pytest.fixture
+def bq_client_mock():
+    mock = MagicMock(bigquery.client.Client)
+    yield mock
+
+
+@pytest.fixture
+def bq_dataset_mock():
+    mock = MagicMock(bigquery.dataset.Dataset)
+    yield mock
+
+
+@pytest.fixture
+def bq_init_client_mock(bq_client_mock):
+    with patch.object(bigquery, "Client") as bq_init_client_mock:
+        bq_init_client_mock.return_value = bq_client_mock
+        yield bq_init_client_mock
+
+
+@pytest.fixture
+def bq_init_dataset_mock(bq_dataset_mock):
+    with patch.object(bigquery, "Dataset") as bq_init_dataset_mock:
+        bq_init_dataset_mock.return_value = bq_dataset_mock
+        yield bq_init_dataset_mock
+
+
+@pytest.fixture
+def bq_create_dataset_mock(bq_init_client_mock):
+    with patch.object(bigquery.Client, "create_dataset") as bq_create_dataset_mock:
+        yield bq_create_dataset_mock
+
+
+@pytest.fixture
+def bq_load_table_from_dataframe_mock(bq_init_client_mock):
+    with patch.object(
+        bigquery.Client, "load_table_from_dataframe"
+    ) as bq_load_table_from_dataframe_mock:
+        yield bq_load_table_from_dataframe_mock
+
+
+@pytest.fixture
+def bq_delete_dataset_mock(bq_init_client_mock):
+    with patch.object(bigquery.Client, "delete_dataset") as bq_delete_dataset_mock:
+        yield bq_delete_dataset_mock
+
+
 # All Featurestore Mocks
 @pytest.fixture
 def get_featurestore_mock():
@@ -1552,6 +1610,105 @@ def test_ingest_from_gcs_with_invalid_gcs_source_type(self):
             gcs_source_type=_TEST_GCS_SOURCE_TYPE_INVALID,
         )
 
+    @pytest.mark.usefixtures(
+        "get_entity_type_mock",
+        "bq_init_client_mock",
+        "bq_init_dataset_mock",
+        "bq_create_dataset_mock",
+        "bq_load_table_from_dataframe_mock",
+        "bq_delete_dataset_mock",
+    )
+    @patch("uuid.uuid4", uuid_mock)
+    def test_ingest_from_df_using_column(self, import_feature_values_mock):
+
+        aiplatform.init(project=_TEST_PROJECT)
+
+        my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME)
+        df_source = pd.DataFrame()
+        my_entity_type.ingest_from_df(
+            feature_ids=_TEST_IMPORTING_FEATURE_IDS,
+            feature_time=_TEST_FEATURE_TIME_FIELD,
+            df_source=df_source,
+            feature_source_fields=_TEST_IMPORTING_FEATURE_SOURCE_FIELDS,
+        )
+        expected_temp_bq_dataset_name = f"temp_{_TEST_FEATURESTORE_ID}_{uuid.uuid4()}".replace(
+            "-", "_"
"_" + ) + expecte_temp_bq_dataset_id = f"{initializer.global_config.project}.{expected_temp_bq_dataset_name}"[ + :1024 + ] + expected_temp_bq_table_id = ( + f"{expecte_temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}" + ) + + true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + entity_type=_TEST_ENTITY_TYPE_NAME, + feature_specs=[ + gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( + id="my_feature_id_1", source_field="my_feature_id_1_source_field" + ), + ], + bigquery_source=gca_io.BigQuerySource( + input_uri=f"bq://{expected_temp_bq_table_id}" + ), + feature_time_field=_TEST_FEATURE_TIME_FIELD, + ) + + import_feature_values_mock.assert_called_once_with( + request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures( + "get_entity_type_mock", + "bq_init_client_mock", + "bq_init_dataset_mock", + "bq_create_dataset_mock", + "bq_load_table_from_dataframe_mock", + "bq_delete_dataset_mock", + ) + @patch("uuid.uuid4", uuid_mock) + def test_ingest_from_df_using_datetime(self, import_feature_values_mock): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + df_source = pd.DataFrame() + my_entity_type.ingest_from_df( + feature_ids=_TEST_IMPORTING_FEATURE_IDS, + feature_time=_TEST_FEATURE_TIME_DATETIME, + df_source=df_source, + feature_source_fields=_TEST_IMPORTING_FEATURE_SOURCE_FIELDS, + ) + + expected_temp_bq_dataset_name = f"temp_{_TEST_FEATURESTORE_ID}_{uuid.uuid4()}".replace( + "-", "_" + ) + expecte_temp_bq_dataset_id = f"{initializer.global_config.project}.{expected_temp_bq_dataset_name}"[ + :1024 + ] + expected_temp_bq_table_id = ( + f"{expecte_temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}" + ) + + timestamp_proto = timestamp_pb2.Timestamp() + timestamp_proto.FromDatetime(_TEST_FEATURE_TIME_DATETIME) + + true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + entity_type=_TEST_ENTITY_TYPE_NAME, + feature_specs=[ + gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( + id="my_feature_id_1", source_field="my_feature_id_1_source_field" + ), + ], + bigquery_source=gca_io.BigQuerySource( + input_uri=f"bq://{expected_temp_bq_table_id}" + ), + feature_time=timestamp_proto, + ) + + import_feature_values_mock.assert_called_once_with( + request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA, + ) + @pytest.mark.usefixtures("get_entity_type_mock", "get_feature_mock") def test_read_single_entity(self, read_feature_values_mock): aiplatform.init(project=_TEST_PROJECT)