From f1593926fdedffdd286f31813b6aa82f3113bd14 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Fri, 21 Jan 2022 22:32:20 -0800 Subject: [PATCH 1/5] feat: enable ingest from pd.DataFrame --- .../aiplatform/featurestore/entity_type.py | 145 ++++++++++++++++++ google/cloud/aiplatform/utils/__init__.py | 12 +- tests/system/aiplatform/e2e_base.py | 1 + tests/system/aiplatform/test_featurestore.py | 110 +++++++++++-- tests/unit/aiplatform/test_featurestores.py | 137 ++++++++++++++++- 5 files changed, 385 insertions(+), 20 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index 6e993f26b5..6d5a42e596 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -17,6 +17,7 @@ import datetime from typing import Dict, List, Optional, Sequence, Tuple, Union +import uuid from google.auth import credentials as auth_credentials from google.protobuf import field_mask_pb2 @@ -34,6 +35,7 @@ from google.cloud.aiplatform import utils from google.cloud.aiplatform.utils import featurestore_utils +from google.cloud import bigquery _LOGGER = base.Logger(__name__) _ALL_FEATURE_IDS = "*" @@ -1000,6 +1002,7 @@ def ingest_from_bq( EntityType - The entityType resource object with feature values imported. """ + bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri) import_feature_values_request = self._validate_and_get_import_feature_values_request( @@ -1146,6 +1149,148 @@ def ingest_from_gcs( request_metadata=request_metadata, ) + def ingest_from_df( + self, + feature_ids: List[str], + feature_time: Union[str, datetime.datetime], + df_source: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd' + feature_source_fields: Optional[Dict[str, str]] = None, + entity_id_field: Optional[str] = None, + disable_online_serving: Optional[bool] = None, + worker_count: Optional[int] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + ) -> "EntityType": + """Ingest feature values from DataFrame. + + Args: + feature_ids (List[str]): + Required. IDs of the Feature to import values + of. The Features must exist in the target + EntityType, or the request will fail. + feature_time (Union[str, datetime.datetime]): + Required. The feature_time can be one of: + - The source column that holds the Feature + timestamp for all Feature values in each entity. + + Note: + The dtype of the source column should be `datetime64`. + + - A single Feature timestamp for all entities + being imported. The timestamp must not have + higher than millisecond precision. + + Example: + feature_time = datetime.datetime(year=2022, month=1, day=1, hour=11, minute=59, second=59) + or + feature_time_str = datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds") + feature_time = datetime.datetime.strptime(feature_time_str, "%Y-%m-%d %H:%M:%S.%f") + + df_source (pd.DataFrame): + Required. Pandas DataFrame containing the source data for ingestion. + feature_source_fields (Dict[str, str]): + Optional. User defined dictionary to map ID of the Feature for importing values + of to the source column for getting the Feature values from. + + Specify the features whose ID and source column are not the same. + If not provided, the source column need to be the same as the Feature ID. 
+ + Example: + + feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] + + In case all features' source field and ID match: + feature_source_fields = None or {} + + In case all features' source field and ID do not match: + feature_source_fields = { + 'my_feature_id_1': 'my_feature_id_1_source_field', + 'my_feature_id_2': 'my_feature_id_2_source_field', + 'my_feature_id_3': 'my_feature_id_3_source_field', + } + + In case some features' source field and ID do not match: + feature_source_fields = { + 'my_feature_id_1': 'my_feature_id_1_source_field', + } + entity_id_field (str): + Optional. Source column that holds entity IDs. If not provided, entity + IDs are extracted from the column named ``entity_id``. + disable_online_serving (bool): + Optional. If set, data will not be imported for online + serving. This is typically used for backfilling, + where Feature generation timestamps are not in + the timestamp range needed for online serving. + worker_count (int): + Optional. Specifies the number of workers that are used + to write data to the Featurestore. Consider the + online serving capacity that you require to + achieve the desired import throughput without + interfering with online serving. The value must + be positive, and less than or equal to 100. If + not set, defaults to using 1 worker. The low + count ensures minimal impact on online serving + performance. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + + Returns: + EntityType - The entityType resource object with feature values imported. + + """ + try: + import pyarrow # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery' + except ImportError: + raise ImportError( + f"Pyarrow is not installed. 
Please install pyarrow to use " + f"{self.ingest_from_df.__name__}" + ) + + bigquery_client = bigquery.Client( + project=self.project, credentials=self.credentials + ) + + entity_type_name_components = self._parse_resource_name(self.resource_name) + featurestore_id, entity_type_id = ( + entity_type_name_components["featurestore"], + entity_type_name_components["entity_type"], + ) + + temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace( + "-", "_" + ) + temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[ + :1024 + ] + temp_bq_table_id = f"{temp_bq_dataset_id}.{entity_type_id}" + + temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id) + temp_bq_dataset.location = self.location + temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset) + + try: + job = bigquery_client.load_table_from_dataframe( + dataframe=df_source, destination=temp_bq_table_id + ) + job.result() + + entity_type_obj = self.ingest_from_bq( + feature_ids=feature_ids, + feature_time=feature_time, + bq_source_uri=f"bq://{temp_bq_table_id}", + feature_source_fields=feature_source_fields, + entity_id_field=entity_id_field, + disable_online_serving=disable_online_serving, + worker_count=worker_count, + request_metadata=request_metadata, + ) + + finally: + bigquery_client.delete_dataset( + dataset=temp_bq_dataset.dataset_id, delete_contents=True, + ) + + return entity_type_obj + @staticmethod def _instantiate_featurestore_online_client( location: Optional[str] = None, diff --git a/google/cloud/aiplatform/utils/__init__.py b/google/cloud/aiplatform/utils/__init__.py index 26b28dcdd7..c5c21a2a0b 100644 --- a/google/cloud/aiplatform/utils/__init__.py +++ b/google/cloud/aiplatform/utils/__init__.py @@ -628,9 +628,11 @@ def get_timestamp_proto( """ if not time: time = datetime.datetime.now() - t = time.timestamp() - seconds = int(t) - # must not have higher than millisecond precision. - nanos = int((t % 1 * 1e6) * 1e3) - return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) + time_str = time.isoformat(sep=" ", timespec="milliseconds") + time = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f") + + timestamp_proto = timestamp_pb2.Timestamp() + timestamp_proto.FromDatetime(time) + + return timestamp_proto diff --git a/tests/system/aiplatform/e2e_base.py b/tests/system/aiplatform/e2e_base.py index 61b9e7f36c..3a35a12707 100644 --- a/tests/system/aiplatform/e2e_base.py +++ b/tests/system/aiplatform/e2e_base.py @@ -29,6 +29,7 @@ _PROJECT = os.getenv("BUILD_SPECIFIC_GCLOUD_PROJECT") _LOCATION = "us-central1" +_PROJECT = "aiplatform-dev" class TestEndToEnd(metaclass=abc.ABCMeta): diff --git a/tests/system/aiplatform/test_featurestore.py b/tests/system/aiplatform/test_featurestore.py index d22119ea22..8e47c9f18d 100644 --- a/tests/system/aiplatform/test_featurestore.py +++ b/tests/system/aiplatform/test_featurestore.py @@ -15,6 +15,7 @@ # limitations under the License. # +import datetime import logging from google.cloud import aiplatform @@ -186,22 +187,17 @@ def test_ingest_feature_values(self, shared_state, caplog): gcs_source_uris=_TEST_USERS_ENTITY_TYPE_GCS_SRC, gcs_source_type="avro", entity_id_field="user_id", - worker_count=2, + worker_count=1, ) assert "EntityType feature values imported." 
in caplog.text caplog.clear() - def test_batch_create_features_and_ingest_feature_values( - self, shared_state, caplog - ): - + def test_batch_create_features(self, shared_state): assert shared_state["movie_entity_type"] movie_entity_type = shared_state["movie_entity_type"] - caplog.set_level(logging.INFO) - aiplatform.init( project=e2e_base._PROJECT, location=e2e_base._LOCATION, ) @@ -217,21 +213,107 @@ def test_batch_create_features_and_ingest_feature_values( movie_entity_type.batch_create_features(feature_configs=movie_feature_configs) - movie_entity_type.ingest_from_gcs( + list_movie_features = movie_entity_type.list_features() + assert len(list_movie_features) == 3 + + def test_ingest_feature_values_from_df_using_feature_time_column( + self, shared_state, caplog + ): + + assert shared_state["movie_entity_type"] + movie_entity_type = shared_state["movie_entity_type"] + + caplog.set_level(logging.INFO) + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + movies_df = pd.DataFrame( + data=[ + { + "movie_id": "movie_01", + "average_rating": 4.9, + "title": "The Shawshank Redemption", + "genres": "Drama", + "update_time": "2021-08-20 20:44:11.094375+00:00", + }, + { + "movie_id": "movie_02", + "average_rating": 4.2, + "title": "The Shining", + "genres": "Horror", + "update_time": "2021-08-20 20:44:11.094375+00:00", + }, + ], + columns=["movie_id", "average_rating", "title", "genres", "update_time"], + ) + movies_df = movies_df.astype({"update_time": "datetime64"}) + feature_time_column = "update_time" + + movie_entity_type.ingest_from_df( feature_ids=[ _TEST_MOVIE_TITLE_FEATURE_ID, _TEST_MOVIE_GENRES_FEATURE_ID, _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, ], - feature_time="update_time", - gcs_source_uris=_TEST_MOVIES_ENTITY_TYPE_GCS_SRC, - gcs_source_type="avro", + feature_time=feature_time_column, + df_source=movies_df, entity_id_field="movie_id", - worker_count=2, + worker_count=1, ) - list_movie_features = movie_entity_type.list_features() - assert len(list_movie_features) == 3 + assert "EntityType feature values imported." in caplog.text + caplog.clear() + + def test_ingest_feature_values_from_df_using_feature_time_datetime( + self, shared_state, caplog + ): + assert shared_state["movie_entity_type"] + movie_entity_type = shared_state["movie_entity_type"] + + caplog.set_level(logging.INFO) + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + movies_df = pd.DataFrame( + data=[ + { + "movie_id": "movie_03", + "average_rating": 4.5, + "title": "Cinema Paradiso", + "genres": "Romance", + }, + { + "movie_id": "movie_04", + "average_rating": 4.6, + "title": "The Dark Knight", + "genres": "Action", + }, + ], + columns=["movie_id", "average_rating", "title", "genres"], + ) + + feature_time_datetime_str = datetime.datetime.now().isoformat( + sep=" ", timespec="milliseconds" + ) + feature_time_datetime = datetime.datetime.strptime( + feature_time_datetime_str, "%Y-%m-%d %H:%M:%S.%f" + ) + + movie_entity_type.ingest_from_df( + feature_ids=[ + _TEST_MOVIE_TITLE_FEATURE_ID, + _TEST_MOVIE_GENRES_FEATURE_ID, + _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, + ], + feature_time=feature_time_datetime, + df_source=movies_df, + entity_id_field="movie_id", + worker_count=1, + ) assert "EntityType feature values imported." 
in caplog.text diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index a92043969e..5056c88ed1 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -19,13 +19,14 @@ import pytest import datetime import pandas as pd +import uuid from unittest import mock from importlib import reload from unittest.mock import patch from google.api_core import operation -from google.protobuf import field_mask_pb2 +from google.protobuf import field_mask_pb2, timestamp_pb2 from google.cloud import aiplatform from google.cloud.aiplatform import base @@ -51,11 +52,17 @@ types as gca_types, ) +from google.cloud import bigquery + # project _TEST_PROJECT = "test-project" _TEST_LOCATION = "us-central1" _TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}" +_TEST_FEATURE_TIME_DATETIME = datetime.datetime( + year=2022, month=1, day=1, hour=11, minute=59, second=59 +) + # featurestore _TEST_FEATURESTORE_ID = "featurestore_id" _TEST_FEATURESTORE_NAME = f"{_TEST_PARENT}/featurestores/{_TEST_FEATURESTORE_ID}" @@ -280,6 +287,43 @@ def _get_entity_view_proto(entity_id, feature_value_types, feature_values): return entity_view_proto +def mock_uuid(): + return uuid.UUID(int=1) + + +@pytest.fixture +def mock_init_bq_client(): + with patch.object(bigquery, "Client") as mock_init_bq_client: + yield mock_init_bq_client + + +@pytest.fixture +def mock_bq_dataset(): + with patch.object(bigquery, "Dataset") as mock_bq_dataset: + yield mock_bq_dataset + + +@pytest.fixture +def mock_bq_create_dataset(mock_bq_dataset): + with patch.object(bigquery.Client, "create_dataset") as mock_bq_create_dataset: + mock_bq_create_dataset.return_value = mock_bq_dataset + yield mock_bq_create_dataset + + +@pytest.fixture +def mock_bq_load_table_from_dataframe(mock_init_bq_client): + with patch.object( + bigquery.Client, "load_table_from_dataframe" + ) as mock_bq_load_table_from_dataframe: + yield mock_bq_load_table_from_dataframe + + +@pytest.fixture +def mock_bq_delete_dataset(mock_init_bq_client): + with patch.object(bigquery.Client, "delete_dataset") as mock_bq_delete_dataset: + yield mock_bq_delete_dataset + + # All Featurestore Mocks @pytest.fixture def get_featurestore_mock(): @@ -1218,6 +1262,97 @@ def test_ingest_from_gcs_with_invalid_gcs_source_type(self): gcs_source_type=_TEST_GCS_SOURCE_TYPE_INVALID, ) + @pytest.mark.usefixtures( + "get_entity_type_mock", + "mock_init_bq_client", + "mock_bq_dataset", + "mock_bq_create_dataset", + "mock_bq_load_table_from_dataframe", + "mock_bq_delete_dataset", + ) + @patch("uuid.uuid4", mock_uuid) + def test_ingest_from_df_using_column(self, import_feature_values_mock): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + df_source = pd.DataFrame() + my_entity_type.ingest_from_df( + feature_ids=_TEST_IMPORTING_FEATURE_IDS, + feature_time=_TEST_FEATURE_TIME_FIELD, + df_source=df_source, + feature_source_fields=_TEST_IMPORTING_FEATURE_SOURCE_FIELDS, + ) + mock_bq_create_dataset.location = my_entity_type.location + + temp_bq_dataset_name = f"temp_{_TEST_FEATURESTORE_ID}_{uuid.uuid4()}".replace( + "-", "_" + ) + temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[ + :1024 + ] + temp_bq_table_id = f"{temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}" + + true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + entity_type=_TEST_ENTITY_TYPE_NAME, + 
feature_specs=[ + gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( + id="my_feature_id_1", source_field="my_feature_id_1_source_field" + ), + ], + bigquery_source=gca_io.BigQuerySource(input_uri=f"bq://{temp_bq_table_id}"), + feature_time_field=_TEST_FEATURE_TIME_FIELD, + ) + + import_feature_values_mock.assert_called_once_with( + request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures( + "get_entity_type_mock", + "mock_init_bq_client", + "mock_bq_dataset", + "mock_bq_create_dataset", + "mock_bq_load_table_from_dataframe", + "mock_bq_delete_dataset", + ) + @patch("uuid.uuid4", mock_uuid) + def test_ingest_from_df_using_datetime(self, import_feature_values_mock): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + df_source = pd.DataFrame() + my_entity_type.ingest_from_df( + feature_ids=_TEST_IMPORTING_FEATURE_IDS, + feature_time=_TEST_FEATURE_TIME_DATETIME, + df_source=df_source, + feature_source_fields=_TEST_IMPORTING_FEATURE_SOURCE_FIELDS, + ) + mock_bq_create_dataset.location = my_entity_type.location + + temp_bq_dataset_name = f"temp_{_TEST_FEATURESTORE_ID}_{uuid.uuid4()}".replace( + "-", "_" + ) + temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[ + :1024 + ] + temp_bq_table_id = f"{temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}" + timestamp_proto = timestamp_pb2.Timestamp() + timestamp_proto.FromDatetime(_TEST_FEATURE_TIME_DATETIME) + true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + entity_type=_TEST_ENTITY_TYPE_NAME, + feature_specs=[ + gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( + id="my_feature_id_1", source_field="my_feature_id_1_source_field" + ), + ], + bigquery_source=gca_io.BigQuerySource(input_uri=f"bq://{temp_bq_table_id}"), + feature_time=timestamp_proto, + ) + + import_feature_values_mock.assert_called_once_with( + request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA, + ) + @pytest.mark.usefixtures("get_entity_type_mock", "get_feature_mock") def test_read_single_entity(self, read_feature_values_mock): aiplatform.init(project=_TEST_PROJECT) From f2f2f1ec9891598c0e92d87f64bcb7c5523aff72 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 16:25:07 -0800 Subject: [PATCH 2/5] fix: remove bq create_dataset, docstrings, mocks --- .../aiplatform/featurestore/entity_type.py | 85 ++++++------------- tests/unit/aiplatform/test_featurestores.py | 17 +--- tests/unit/aiplatform/test_utils.py | 26 +----- 3 files changed, 34 insertions(+), 94 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index 6d5a42e596..db3d150c3f 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -797,23 +797,16 @@ def _validate_and_get_import_feature_values_request( If not provided, the source column need to be the same as the Feature ID. 
Example: + feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] - feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] - - In case all features' source field and ID match: - feature_source_fields = None or {} - - In case all features' source field and ID do not match: - feature_source_fields = { + feature_source_fields = { 'my_feature_id_1': 'my_feature_id_1_source_field', - 'my_feature_id_2': 'my_feature_id_2_source_field', - 'my_feature_id_3': 'my_feature_id_3_source_field', - } + } + + Note: + The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field', + The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'. - In case some features' source field and ID do not match: - feature_source_fields = { - 'my_feature_id_1': 'my_feature_id_1_source_field', - } entity_id_field (str): Optional. Source column that holds entity IDs. If not provided, entity IDs are extracted from the column named ``entity_id``. @@ -956,23 +949,16 @@ def ingest_from_bq( If not provided, the source column need to be the same as the Feature ID. Example: + feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] - feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] - - In case all features' source field and ID match: - feature_source_fields = None or {} - - In case all features' source field and ID do not match: - feature_source_fields = { + feature_source_fields = { 'my_feature_id_1': 'my_feature_id_1_source_field', - 'my_feature_id_2': 'my_feature_id_2_source_field', - 'my_feature_id_3': 'my_feature_id_3_source_field', - } + } + + Note: + The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field', + The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'. - In case some features' source field and ID do not match: - feature_source_fields = { - 'my_feature_id_1': 'my_feature_id_1_source_field', - } entity_id_field (str): Optional. Source column that holds entity IDs. If not provided, entity IDs are extracted from the column named ``entity_id``. @@ -1068,23 +1054,16 @@ def ingest_from_gcs( If not provided, the source column need to be the same as the Feature ID. Example: + feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] - feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] - - In case all features' source field and ID match: - feature_source_fields = None or {} - - In case all features' source field and ID do not match: - feature_source_fields = { + feature_source_fields = { 'my_feature_id_1': 'my_feature_id_1_source_field', - 'my_feature_id_2': 'my_feature_id_2_source_field', - 'my_feature_id_3': 'my_feature_id_3_source_field', - } + } + + Note: + The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field', + The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'. - In case some features' source field and ID do not match: - feature_source_fields = { - 'my_feature_id_1': 'my_feature_id_1_source_field', - } entity_id_field (str): Optional. Source column that holds entity IDs. If not provided, entity IDs are extracted from the column named ``entity_id``. @@ -1195,23 +1174,16 @@ def ingest_from_df( If not provided, the source column need to be the same as the Feature ID. 
Example: + feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] - feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] - - In case all features' source field and ID match: - feature_source_fields = None or {} - - In case all features' source field and ID do not match: - feature_source_fields = { + feature_source_fields = { 'my_feature_id_1': 'my_feature_id_1_source_field', - 'my_feature_id_2': 'my_feature_id_2_source_field', - 'my_feature_id_3': 'my_feature_id_3_source_field', - } + } + + Note: + The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field', + The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'. - In case some features' source field and ID do not match: - feature_source_fields = { - 'my_feature_id_1': 'my_feature_id_1_source_field', - } entity_id_field (str): Optional. Source column that holds entity IDs. If not provided, entity IDs are extracted from the column named ``entity_id``. @@ -1265,7 +1237,6 @@ def ingest_from_df( temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id) temp_bq_dataset.location = self.location - temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset) try: job = bigquery_client.load_table_from_dataframe( diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index 5056c88ed1..2246da3415 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -304,14 +304,7 @@ def mock_bq_dataset(): @pytest.fixture -def mock_bq_create_dataset(mock_bq_dataset): - with patch.object(bigquery.Client, "create_dataset") as mock_bq_create_dataset: - mock_bq_create_dataset.return_value = mock_bq_dataset - yield mock_bq_create_dataset - - -@pytest.fixture -def mock_bq_load_table_from_dataframe(mock_init_bq_client): +def mock_bq_load_table_from_dataframe(): with patch.object( bigquery.Client, "load_table_from_dataframe" ) as mock_bq_load_table_from_dataframe: @@ -319,7 +312,7 @@ def mock_bq_load_table_from_dataframe(mock_init_bq_client): @pytest.fixture -def mock_bq_delete_dataset(mock_init_bq_client): +def mock_bq_delete_dataset(): with patch.object(bigquery.Client, "delete_dataset") as mock_bq_delete_dataset: yield mock_bq_delete_dataset @@ -1266,7 +1259,6 @@ def test_ingest_from_gcs_with_invalid_gcs_source_type(self): "get_entity_type_mock", "mock_init_bq_client", "mock_bq_dataset", - "mock_bq_create_dataset", "mock_bq_load_table_from_dataframe", "mock_bq_delete_dataset", ) @@ -1282,7 +1274,7 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock): df_source=df_source, feature_source_fields=_TEST_IMPORTING_FEATURE_SOURCE_FIELDS, ) - mock_bq_create_dataset.location = my_entity_type.location + mock_bq_dataset.location = my_entity_type.location temp_bq_dataset_name = f"temp_{_TEST_FEATURESTORE_ID}_{uuid.uuid4()}".replace( "-", "_" @@ -1311,7 +1303,6 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock): "get_entity_type_mock", "mock_init_bq_client", "mock_bq_dataset", - "mock_bq_create_dataset", "mock_bq_load_table_from_dataframe", "mock_bq_delete_dataset", ) @@ -1327,7 +1318,7 @@ def test_ingest_from_df_using_datetime(self, import_feature_values_mock): df_source=df_source, feature_source_fields=_TEST_IMPORTING_FEATURE_SOURCE_FIELDS, ) - mock_bq_create_dataset.location = my_entity_type.location + mock_bq_dataset.location = my_entity_type.location temp_bq_dataset_name = f"temp_{_TEST_FEATURESTORE_ID}_{uuid.uuid4()}".replace( "-", 
"_" diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index d4840609b1..b47eb684d8 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -19,7 +19,6 @@ import pytest from typing import Callable, Dict, Optional import datetime -from decimal import Decimal from google.protobuf import timestamp_pb2 @@ -326,28 +325,8 @@ def test_client_w_override_select_version(): @pytest.mark.parametrize( "year,month,day,hour,minute,second,microsecond,expected_seconds,expected_nanos", [ - ( - 2021, - 12, - 23, - 23, - 59, - 59, - 999999, - 1640303999, - int(str(Decimal(1640303999.999999)).split(".")[1][:9]), - ), - ( - 2013, - 1, - 1, - 1, - 1, - 1, - 199999, - 1357002061, - int(str(Decimal(1357002061.199999)).split(".")[1][:9]), - ), + (2021, 12, 23, 23, 59, 59, 999999, 1640303999, 999000000,), + (2013, 1, 1, 1, 1, 1, 199999, 1357002061, 199000000,), ], ) def test_get_timestamp_proto( @@ -369,7 +348,6 @@ def test_get_timestamp_proto( minute=minute, second=second, microsecond=microsecond, - tzinfo=datetime.timezone.utc, ) true_timestamp_proto = timestamp_pb2.Timestamp( seconds=expected_seconds, nanos=expected_nanos From ca613d74a62b3e25af863021cc826f26cec5fd2a Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 25 Jan 2022 16:29:17 -0800 Subject: [PATCH 3/5] fix: e2e_base project --- tests/system/aiplatform/e2e_base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/system/aiplatform/e2e_base.py b/tests/system/aiplatform/e2e_base.py index 3a35a12707..61b9e7f36c 100644 --- a/tests/system/aiplatform/e2e_base.py +++ b/tests/system/aiplatform/e2e_base.py @@ -29,7 +29,6 @@ _PROJECT = os.getenv("BUILD_SPECIFIC_GCLOUD_PROJECT") _LOCATION = "us-central1" -_PROJECT = "aiplatform-dev" class TestEndToEnd(metaclass=abc.ABCMeta): From de3206c482854b6438713805f980b09c313382cc Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 26 Jan 2022 15:41:58 -0800 Subject: [PATCH 4/5] fix: delete two optional args, add note for temp bq dataset, revert deleting bq dataset create, add featurestore_extra_require, update ic tests to use online read to validate feature value ingestionfrom df --- .../aiplatform/featurestore/entity_type.py | 27 ++---- setup.py | 2 + tests/system/aiplatform/test_featurestore.py | 88 ++++++++++++++----- 3 files changed, 78 insertions(+), 39 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index db3d150c3f..60e8ee84de 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -1135,12 +1135,16 @@ def ingest_from_df( df_source: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd' feature_source_fields: Optional[Dict[str, str]] = None, entity_id_field: Optional[str] = None, - disable_online_serving: Optional[bool] = None, - worker_count: Optional[int] = None, request_metadata: Optional[Sequence[Tuple[str, str]]] = (), ) -> "EntityType": """Ingest feature values from DataFrame. + Note: + Calling this method will automatically create a temporary + bigquery dataset in the same GCP project, which will be used + as the intermediary storage for ingesting feature values + from dataframe to featurestore. + Args: feature_ids (List[str]): Required. IDs of the Feature to import values @@ -1187,21 +1191,6 @@ def ingest_from_df( entity_id_field (str): Optional. Source column that holds entity IDs. If not provided, entity IDs are extracted from the column named ``entity_id``. 
- disable_online_serving (bool): - Optional. If set, data will not be imported for online - serving. This is typically used for backfilling, - where Feature generation timestamps are not in - the timestamp range needed for online serving. - worker_count (int): - Optional. Specifies the number of workers that are used - to write data to the Featurestore. Consider the - online serving capacity that you require to - achieve the desired import throughput without - interfering with online serving. The value must - be positive, and less than or equal to 100. If - not set, defaults to using 1 worker. The low - count ensures minimal impact on online serving - performance. request_metadata (Sequence[Tuple[str, str]]): Optional. Strings which should be sent along with the request as metadata. @@ -1238,6 +1227,8 @@ def ingest_from_df( temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id) temp_bq_dataset.location = self.location + temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset) + try: job = bigquery_client.load_table_from_dataframe( dataframe=df_source, destination=temp_bq_table_id @@ -1250,8 +1241,6 @@ def ingest_from_df( bq_source_uri=f"bq://{temp_bq_table_id}", feature_source_fields=feature_source_fields, entity_id_field=entity_id_field, - disable_online_serving=disable_online_serving, - worker_count=worker_count, request_metadata=request_metadata, ) diff --git a/setup.py b/setup.py index 2b8c58f033..011da8aea8 100644 --- a/setup.py +++ b/setup.py @@ -47,6 +47,7 @@ "werkzeug >= 2.0.0", "tensorflow >=2.4.0", ] +featurestore_extra_require = ["pandas >= 1.0.0", "pyarrow >= 6.0.1"] full_extra_require = list( set( @@ -54,6 +55,7 @@ + metadata_extra_require + xai_extra_require + lit_extra_require + + featurestore_extra_require ) ) testing_extra_require = ( diff --git a/tests/system/aiplatform/test_featurestore.py b/tests/system/aiplatform/test_featurestore.py index 8e47c9f18d..73685ee0a9 100644 --- a/tests/system/aiplatform/test_featurestore.py +++ b/tests/system/aiplatform/test_featurestore.py @@ -216,7 +216,7 @@ def test_batch_create_features(self, shared_state): list_movie_features = movie_entity_type.list_features() assert len(list_movie_features) == 3 - def test_ingest_feature_values_from_df_using_feature_time_column( + def test_ingest_feature_values_from_df_using_feature_time_column_and_online_read_multiple_entities( self, shared_state, caplog ): @@ -229,6 +229,33 @@ def test_ingest_feature_values_from_df_using_feature_time_column( project=e2e_base._PROJECT, location=e2e_base._LOCATION, ) + read_feature_ids = ["average_rating", "title", "genres"] + + movie_entity_views_df_before_ingest = movie_entity_type.read( + entity_ids=["movie_01", "movie_02"], feature_ids=read_feature_ids, + ) + expected_data_before_ingest = [ + { + "entity_id": "movie_01", + "average_rating": None, + "title": None, + "genres": None, + }, + { + "entity_id": "movie_02", + "average_rating": None, + "title": None, + "genres": None, + }, + ] + expected_movie_entity_views_df_before_ingest = pd.DataFrame( + data=expected_data_before_ingest, columns=read_feature_ids + ) + + movie_entity_views_df_before_ingest.equals( + expected_movie_entity_views_df_before_ingest + ) + movies_df = pd.DataFrame( data=[ { @@ -260,13 +287,37 @@ def test_ingest_feature_values_from_df_using_feature_time_column( feature_time=feature_time_column, df_source=movies_df, entity_id_field="movie_id", - worker_count=1, + ) + + movie_entity_views_df_after_ingest = movie_entity_type.read( + entity_ids=["movie_01", "movie_02"], 
feature_ids=read_feature_ids,
+        )
+        expected_data_after_ingest = [
+            {
+                "movie_id": "movie_01",
+                "average_rating": 4.9,
+                "title": "The Shawshank Redemption",
+                "genres": "Drama",
+            },
+            {
+                "movie_id": "movie_02",
+                "average_rating": 4.2,
+                "title": "The Shining",
+                "genres": "Horror",
+            },
+        ]
+        expected_movie_entity_views_df_after_ingest = pd.DataFrame(
+            data=expected_data_after_ingest, columns=read_feature_ids
+        )
+
+        movie_entity_views_df_after_ingest.equals(
+            expected_movie_entity_views_df_after_ingest
+        )
 
         assert "EntityType feature values imported." in caplog.text
         caplog.clear()
 
-    def test_ingest_feature_values_from_df_using_feature_time_datetime(
+    def test_ingest_feature_values_from_df_using_feature_time_datetime_and_online_read_single_entity(
         self, shared_state, caplog
     ):
             feature_time=feature_time_datetime,
             df_source=movies_df,
             entity_id_field="movie_id",
-            worker_count=1,
         )
+
+        movie_entity_views_df_avg_rating = movie_entity_type.read(
+            entity_ids="movie_04", feature_ids="average_rating",
+        )
+        expected_data_avg_rating = [
+            {"movie_id": "movie_04", "average_rating": 4.6},
+        ]
+        expected_movie_entity_views_df_avg_rating = pd.DataFrame(
+            data=expected_data_avg_rating, columns=["average_rating"]
+        )
+
+        movie_entity_views_df_avg_rating.equals(
+            expected_movie_entity_views_df_avg_rating
+        )
 
         assert "EntityType feature values imported." in caplog.text
@@ -331,19 +395,3 @@ def test_search_features(self, shared_state):
         assert (
             len(list_searched_features) - shared_state["base_list_searched_features"]
         ) == 6
-
-    def test_online_reads(self, shared_state):
-        assert shared_state["user_entity_type"]
-        assert shared_state["movie_entity_type"]
-
-        user_entity_type = shared_state["user_entity_type"]
-        movie_entity_type = shared_state["movie_entity_type"]
-
-        user_entity_views = user_entity_type.read(entity_ids="alice")
-        assert type(user_entity_views) == pd.DataFrame
-
-        movie_entity_views = movie_entity_type.read(
-            entity_ids=["movie_01", "movie_04"],
-            feature_ids=[_TEST_MOVIE_TITLE_FEATURE_ID, _TEST_MOVIE_GENRES_FEATURE_ID],
-        )
-        assert type(movie_entity_views) == pd.DataFrame
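A note on the timestamp handling this series exercises: the reworked
get_timestamp_proto (PATCH 1/5) and the updated test_get_timestamp_proto
expectations (PATCH 2/5) both hinge on truncating feature times to millisecond
precision via an ISO-string round trip. A small illustrative snippet, not part
of the patches themselves:

    import datetime

    dt = datetime.datetime(2021, 12, 23, 23, 59, 59, 999999)
    # timespec="milliseconds" truncates the microseconds: '2021-12-23 23:59:59.999'
    dt_str = dt.isoformat(sep=" ", timespec="milliseconds")
    truncated = datetime.datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S.%f")
    assert truncated.microsecond == 999000
    # timestamp_pb2.Timestamp().FromDatetime(truncated) then carries
    # nanos=999000000, the expected_nanos value asserted in PATCH 2/5.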
From a6702e81adba00680cd29badcdab880cf1c087a8 Mon Sep 17 00:00:00 2001
From: Morgan Du
Date: Wed, 26 Jan 2022 21:12:27 -0800
Subject: [PATCH 5/5] fix: add a note that the call returns upon ingestion completion, update unit tests

---
 .../aiplatform/featurestore/entity_type.py   |   5 +-
 tests/unit/aiplatform/test_featurestores.py  | 101 ++++++++++++------
 2 files changed, 70 insertions(+), 36 deletions(-)

diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py
index 60e8ee84de..8a85b1aa7a 100644
--- a/google/cloud/aiplatform/featurestore/entity_type.py
+++ b/google/cloud/aiplatform/featurestore/entity_type.py
@@ -1140,11 +1140,14 @@ def ingest_from_df(
         """Ingest feature values from DataFrame.
 
         Note:
-            Calling this method will automatically create a temporary
+            Calling this method will automatically create and delete a temporary
             bigquery dataset in the same GCP project, which will be used
             as the intermediary storage for ingesting feature values
             from dataframe to featurestore.
 
+            The call returns once the ingestion completes, at which point the
+            feature values have been ingested into the entity_type.
+
         Args:
             feature_ids (List[str]):
                 Required. IDs of the Feature to import values
diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py
index b736467f79..449d0348c3 100644
--- a/tests/unit/aiplatform/test_featurestores.py
+++ b/tests/unit/aiplatform/test_featurestores.py
@@ -23,7 +23,7 @@
 
 from unittest import mock
 from importlib import reload
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 from google.api_core import operation
 from google.protobuf import field_mask_pb2, timestamp_pb2
@@ -328,34 +328,55 @@ def _get_entity_view_proto(entity_id, feature_value_types, feature_values):
     return entity_view_proto
 
 
-def mock_uuid():
+def uuid_mock():
     return uuid.UUID(int=1)
 
 
+# All BigQuery Mocks
 @pytest.fixture
-def mock_init_bq_client():
-    with patch.object(bigquery, "Client") as mock_init_bq_client:
-        yield mock_init_bq_client
+def bq_client_mock():
+    mock = MagicMock(bigquery.client.Client)
+    yield mock
 
 
 @pytest.fixture
-def mock_bq_dataset():
-    with patch.object(bigquery, "Dataset") as mock_bq_dataset:
-        yield mock_bq_dataset
+def bq_dataset_mock():
+    mock = MagicMock(bigquery.dataset.Dataset)
+    yield mock
 
 
 @pytest.fixture
-def mock_bq_load_table_from_dataframe():
+def bq_init_client_mock(bq_client_mock):
+    with patch.object(bigquery, "Client") as bq_init_client_mock:
+        bq_init_client_mock.return_value = bq_client_mock
+        yield bq_init_client_mock
+
+
+@pytest.fixture
+def bq_init_dataset_mock(bq_dataset_mock):
+    with patch.object(bigquery, "Dataset") as bq_init_dataset_mock:
+        bq_init_dataset_mock.return_value = bq_dataset_mock
+        yield bq_init_dataset_mock
+
+
+@pytest.fixture
+def bq_create_dataset_mock(bq_init_client_mock):
+    with patch.object(bigquery.Client, "create_dataset") as bq_create_dataset_mock:
+        yield bq_create_dataset_mock
+
+
+@pytest.fixture
+def bq_load_table_from_dataframe_mock(bq_init_client_mock):
     with patch.object(
         bigquery.Client, "load_table_from_dataframe"
-    ) as mock_bq_load_table_from_dataframe:
-        yield mock_bq_load_table_from_dataframe
+    ) as bq_load_table_from_dataframe_mock:
+        yield bq_load_table_from_dataframe_mock
 
 
 @pytest.fixture
-def mock_bq_delete_dataset():
-    with patch.object(bigquery.Client, "delete_dataset") as mock_bq_delete_dataset:
-        yield mock_bq_delete_dataset
+def bq_delete_dataset_mock(bq_init_client_mock):
+    with patch.object(bigquery.Client, "delete_dataset") as bq_delete_dataset_mock:
+        yield bq_delete_dataset_mock
 
 
 # All Featurestore Mocks
 @pytest.fixture
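# Orientation note, not part of the patch: the BigQuery fixtures above stand in
# for the client flow that ingest_from_df drives in entity_type.py earlier in
# this series, roughly:
#
#     bigquery_client = bigquery.Client(project=..., credentials=...)
#     temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id)
#     temp_bq_dataset.location = location
#     temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset)
#     try:
#         bigquery_client.load_table_from_dataframe(
#             dataframe=df_source, destination=temp_bq_table_id
#         ).result()
#         ...  # then ingest_from_bq(bq_source_uri=f"bq://{temp_bq_table_id}", ...)
#     finally:
#         bigquery_client.delete_dataset(
#             dataset=temp_bq_dataset.dataset_id, delete_contents=True,
#         )
#
# uuid.uuid4 is patched in the tests below so the temporary dataset name is
# deterministic.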
f"{initializer.global_config.project}.{temp_bq_dataset_name}"[ + expecte_temp_bq_dataset_id = f"{initializer.global_config.project}.{expected_temp_bq_dataset_name}"[ :1024 ] - temp_bq_table_id = f"{temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}" + expected_temp_bq_table_id = ( + f"{expecte_temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}" + ) true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( entity_type=_TEST_ENTITY_TYPE_NAME, @@ -1625,7 +1648,9 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock): id="my_feature_id_1", source_field="my_feature_id_1_source_field" ), ], - bigquery_source=gca_io.BigQuerySource(input_uri=f"bq://{temp_bq_table_id}"), + bigquery_source=gca_io.BigQuerySource( + input_uri=f"bq://{expected_temp_bq_table_id}" + ), feature_time_field=_TEST_FEATURE_TIME_FIELD, ) @@ -1635,12 +1660,13 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock): @pytest.mark.usefixtures( "get_entity_type_mock", - "mock_init_bq_client", - "mock_bq_dataset", - "mock_bq_load_table_from_dataframe", - "mock_bq_delete_dataset", + "bq_init_client_mock", + "bq_init_dataset_mock", + "bq_create_dataset_mock", + "bq_load_table_from_dataframe_mock", + "bq_delete_dataset_mock", ) - @patch("uuid.uuid4", mock_uuid) + @patch("uuid.uuid4", uuid_mock) def test_ingest_from_df_using_datetime(self, import_feature_values_mock): aiplatform.init(project=_TEST_PROJECT) @@ -1652,17 +1678,20 @@ def test_ingest_from_df_using_datetime(self, import_feature_values_mock): df_source=df_source, feature_source_fields=_TEST_IMPORTING_FEATURE_SOURCE_FIELDS, ) - mock_bq_dataset.location = my_entity_type.location - temp_bq_dataset_name = f"temp_{_TEST_FEATURESTORE_ID}_{uuid.uuid4()}".replace( + expected_temp_bq_dataset_name = f"temp_{_TEST_FEATURESTORE_ID}_{uuid.uuid4()}".replace( "-", "_" ) - temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[ + expecte_temp_bq_dataset_id = f"{initializer.global_config.project}.{expected_temp_bq_dataset_name}"[ :1024 ] - temp_bq_table_id = f"{temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}" + expected_temp_bq_table_id = ( + f"{expecte_temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}" + ) + timestamp_proto = timestamp_pb2.Timestamp() timestamp_proto.FromDatetime(_TEST_FEATURE_TIME_DATETIME) + true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( entity_type=_TEST_ENTITY_TYPE_NAME, feature_specs=[ @@ -1670,7 +1699,9 @@ def test_ingest_from_df_using_datetime(self, import_feature_values_mock): id="my_feature_id_1", source_field="my_feature_id_1_source_field" ), ], - bigquery_source=gca_io.BigQuerySource(input_uri=f"bq://{temp_bq_table_id}"), + bigquery_source=gca_io.BigQuerySource( + input_uri=f"bq://{expected_temp_bq_table_id}" + ), feature_time=timestamp_proto, )