From 24ece4d4991e0fb5bc0380f552209f902714609a Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 16 Feb 2022 11:10:11 -0800 Subject: [PATCH 01/10] samples: add feature store samples --- samples/model-builder/conftest.py | 45 ++++++++++++++ .../entity_type_ingest_from_df_sample.py | 58 ++++++++++++++++++ .../entity_type_ingest_from_df_sample_test.py | 33 ++++++++++ ...batch_serve_feature_values_to_df_sample.py | 60 +++++++++++++++++++ 4 files changed, 196 insertions(+) create mode 100644 samples/model-builder/entity_type_ingest_from_df_sample.py create mode 100644 samples/model-builder/entity_type_ingest_from_df_sample_test.py create mode 100644 samples/model-builder/featurestore_batch_serve_feature_values_to_df_sample.py diff --git a/samples/model-builder/conftest.py b/samples/model-builder/conftest.py index c6bbd30fc0..a0200e87a1 100644 --- a/samples/model-builder/conftest.py +++ b/samples/model-builder/conftest.py @@ -364,3 +364,48 @@ def mock_endpoint_explain(mock_endpoint): with patch.object(mock_endpoint, "explain") as mock_endpoint_explain: mock_get_endpoint.return_value = mock_endpoint yield mock_endpoint_explain + + +""" +---------------------------------------------------------------------------- +Featurestore Fixtures +---------------------------------------------------------------------------- +""" + + +@pytest.fixture +def mock_featurestore(): + mock = MagicMock(aiplatform.featurestore.Featurestore) + yield mock + + +@pytest.fixture +def mock_entity_type(): + mock = MagicMock(aiplatform.featurestore.EntityType) + yield mock + + +@pytest.fixture +def mock_get_featurestore(mock_featurestore): + with patch.object(aiplatform, "Featurestore") as mock_get_featurestore: + mock_get_featurestore.return_value = mock_featurestore + yield mock_get_featurestore + + +@pytest.fixture +def mock_batch_serve_to_df(mock_featurestore): + with patch.object(mock_featurestore, "batch_serve_to_df") as mock_batch_serve_to_df: + yield mock_batch_serve_to_df + + +@pytest.fixture +def mock_get_entity_type(mock_entity_type): + with patch.object(aiplatform, "EntityType") as mock_get_entity_type: + mock_get_entity_type.return_value = mock_entity_type + yield mock_get_entity_type + + +@pytest.fixture +def mock_ingest_from_df(mock_entity_type): + with patch.object(mock_entity_type, "ingest_from_df") as mock_ingest_from_df: + yield mock_ingest_from_df diff --git a/samples/model-builder/entity_type_ingest_from_df_sample.py b/samples/model-builder/entity_type_ingest_from_df_sample.py new file mode 100644 index 0000000000..b93d7d60b2 --- /dev/null +++ b/samples/model-builder/entity_type_ingest_from_df_sample.py @@ -0,0 +1,58 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pandas as pd + +from google.cloud import aiplatform + +# [START aiplatform_sdk_entity_type_ingest_feature_values_from_df_sample] +def entity_type_ingest_feature_values_from_df_with_feature_time_field_sample( + project: str, location: str, entity_type_name: str, +): + aiplatform.init(project=project, location=location) + + et = aiplatform.EntityType(entity_type_name=entity_type_name) + + df_source = pd.DataFrame( + data=[ + { + "movie_id": "movie_01", + "average_rating": 4.9, + "title": "The Shawshank Redemption", + "genres": "Drama", + "update_time": "2021-08-20 20:44:11.094375+00:00", + }, + { + "movie_id": "movie_02", + "average_rating": 4.2, + "title": "The Shining", + "genres": "Horror", + "update_time": "2021-08-20 20:44:11.094375+00:00", + }, + ], + columns=["movie_id", "average_rating", "title", "genres", "update_time"], + ) + + et.ingest_from_df( + feature_ids=["movid_id", "average_rating", "title", "genres"], + feature_time="update_time", + df_source=df_source, + entity_id_field="movie_id", + ) + + return et + + +# [END aiplatform_sdk_entity_type_ingest_feature_values_from_df_sample] diff --git a/samples/model-builder/entity_type_ingest_from_df_sample_test.py b/samples/model-builder/entity_type_ingest_from_df_sample_test.py new file mode 100644 index 0000000000..66e7f628f9 --- /dev/null +++ b/samples/model-builder/entity_type_ingest_from_df_sample_test.py @@ -0,0 +1,33 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import entity_type_ingest_from_df_sample + +import test_constants as constants + + +def test_image_dataset_create_classification_sample( + mock_sdk_init, mock_get_entity_type, mock_ingest_from_df +): + entity_type_ingest_from_df_sample.entity_type_ingest_feature_values_from_df_with_feature_time_field_sample( + project=constants.PROJECT, + location=constants.LOCATION, + ) + + mock_sdk_init.assert_called_once_with( + project=constants.PROJECT, location=constants.LOCATION + ) + mock_ingest_from_df.assert_called_once_with( + ) diff --git a/samples/model-builder/featurestore_batch_serve_feature_values_to_df_sample.py b/samples/model-builder/featurestore_batch_serve_feature_values_to_df_sample.py new file mode 100644 index 0000000000..557bc6f9d1 --- /dev/null +++ b/samples/model-builder/featurestore_batch_serve_feature_values_to_df_sample.py @@ -0,0 +1,60 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pandas as pd + +from google.cloud import aiplatform + +# [START aiplatform_sdk_featurestore_batch_serve_feature_values_to_df_sample] +def featurestore_batch_serve_feature_values_to_df_sample( + project: str, location: str, featurestore_name: str, +): + aiplatform.init(project=project, location=location) + + fs = aiplatform.Featurestore(featurestore_name=featurestore_name) + + read_instances_df = pd.DataFrame( + data=[ + ["alice", "movie_01", "2021-09-15T08:28:14Z"], + ["bob", "movie_02", "2021-09-15T08:28:14Z"], + ["dav", "movie_03", "2021-09-15T08:28:14Z"], + ["eve", "movie_04", "2021-09-15T08:28:14Z"], + ["alice", "movie_03", "2021-09-14T09:35:15Z"], + ["bob", "movie_04", "2020-02-14T09:35:15Z"], + ], + columns=["users_entity_type_id", "movies_entity_type_id", "timestamp"], + ) + read_instances_df = read_instances_df.astype({"timestamp": "datetime64"}) + + df = fs.batch_serve_to_df( + serving_feature_ids={ + "users_entity_type_id": [ + "user_age_feature_id", + "user_gender_feature_id", + "user_liked_genres_feature_id", + ], + "movies_entity_type_id": [ + "movie_title_feature_id", + "movie_genres_feature_id", + "movie_average_rating_feature_id", + ], + }, + read_instances_df=read_instances_df, + ) + + return df + + +# [END aiplatform_sdk_featurestore_batch_serve_feature_values_to_df_sample] From ee55d1efa1bfcf31a9bc098d18247d5d58cb833b Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 16 Feb 2022 20:03:00 -0800 Subject: [PATCH 02/10] fix: force bq has a data type for temp table before ingestion --- .../aiplatform/featurestore/entity_type.py | 22 ++++++++++++++++++- .../aiplatform/utils/featurestore_utils.py | 12 ++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index 274f89d2aa..1f49a66391 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -1259,9 +1259,29 @@ def ingest_from_df( temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset) + feature_source_fields = feature_source_fields or {} + try: + + schema = [] + for feature_id in feature_ids: + feature_value_type = self.get_feature(feature_id).to_dict()["valueType"] + bq_data_type = utils.featurestore_utils.FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP[ + feature_value_type + ] + bq_schema_field = bigquery.SchemaField( + name=feature_source_fields.get(feature_id, feature_id), + field_type=bq_data_type["field_type"], + mode=bq_data_type.get("mode") or "NULLABLE", + ) + schema.append(bq_schema_field) + + job_config = bigquery.LoadJobConfig(schema=schema) + job = bigquery_client.load_table_from_dataframe( - dataframe=df_source, destination=temp_bq_table_id + dataframe=df_source, + destination=temp_bq_table_id, + job_config=job_config, ) job.result() diff --git a/google/cloud/aiplatform/utils/featurestore_utils.py b/google/cloud/aiplatform/utils/featurestore_utils.py index 45dbbbf44f..195a12a733 100644 --- a/google/cloud/aiplatform/utils/featurestore_utils.py +++ b/google/cloud/aiplatform/utils/featurestore_utils.py @@ -33,6 +33,18 @@ _FEATURE_VALUE_TYPE_UNSPECIFIED = "VALUE_TYPE_UNSPECIFIED" +FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP = { + "BOOL": {"field_type": "BOOL"}, + "BOOL_ARRAY": {"field_type": "BOOL", "mode": "REPEATED"}, + "DOUBLE": {"field_type": "DOUBLE"}, + "DOUBLE_ARRAY": {"field_type": "DOUBLE", "mode": "REPEATED"}, + "INT64": {"field_type": "INT64"}, + "INT64_ARRAY": {"field_type": "INT64", "mode": "REPEATED"}, + "STRING": {"field_type": "STRING"}, + "STRING_ARRAY": {"field_type": "STRING", "mode": "REPEATED"}, + "BYTES": {"field_type": "BYTES"}, +} + def validate_id(resource_id: str) -> None: """Validates feature store resource ID pattern. From 9c95e644e5bd9630c2104e8e4353e85eaf168843 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 16 Feb 2022 20:15:31 -0800 Subject: [PATCH 03/10] Revert "samples: add feature store samples" This reverts commit 24ece4d4991e0fb5bc0380f552209f902714609a. --- samples/model-builder/conftest.py | 45 -------------- .../entity_type_ingest_from_df_sample.py | 58 ------------------ .../entity_type_ingest_from_df_sample_test.py | 33 ---------- ...batch_serve_feature_values_to_df_sample.py | 60 ------------------- 4 files changed, 196 deletions(-) delete mode 100644 samples/model-builder/entity_type_ingest_from_df_sample.py delete mode 100644 samples/model-builder/entity_type_ingest_from_df_sample_test.py delete mode 100644 samples/model-builder/featurestore_batch_serve_feature_values_to_df_sample.py diff --git a/samples/model-builder/conftest.py b/samples/model-builder/conftest.py index a0200e87a1..c6bbd30fc0 100644 --- a/samples/model-builder/conftest.py +++ b/samples/model-builder/conftest.py @@ -364,48 +364,3 @@ def mock_endpoint_explain(mock_endpoint): with patch.object(mock_endpoint, "explain") as mock_endpoint_explain: mock_get_endpoint.return_value = mock_endpoint yield mock_endpoint_explain - - -""" ----------------------------------------------------------------------------- -Featurestore Fixtures ----------------------------------------------------------------------------- -""" - - -@pytest.fixture -def mock_featurestore(): - mock = MagicMock(aiplatform.featurestore.Featurestore) - yield mock - - -@pytest.fixture -def mock_entity_type(): - mock = MagicMock(aiplatform.featurestore.EntityType) - yield mock - - -@pytest.fixture -def mock_get_featurestore(mock_featurestore): - with patch.object(aiplatform, "Featurestore") as mock_get_featurestore: - mock_get_featurestore.return_value = mock_featurestore - yield mock_get_featurestore - - -@pytest.fixture -def mock_batch_serve_to_df(mock_featurestore): - with patch.object(mock_featurestore, "batch_serve_to_df") as mock_batch_serve_to_df: - yield mock_batch_serve_to_df - - -@pytest.fixture -def mock_get_entity_type(mock_entity_type): - with patch.object(aiplatform, "EntityType") as mock_get_entity_type: - mock_get_entity_type.return_value = mock_entity_type - yield mock_get_entity_type - - -@pytest.fixture -def mock_ingest_from_df(mock_entity_type): - with patch.object(mock_entity_type, "ingest_from_df") as mock_ingest_from_df: - yield mock_ingest_from_df diff --git a/samples/model-builder/entity_type_ingest_from_df_sample.py b/samples/model-builder/entity_type_ingest_from_df_sample.py deleted file mode 100644 index b93d7d60b2..0000000000 --- a/samples/model-builder/entity_type_ingest_from_df_sample.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pandas as pd - -from google.cloud import aiplatform - -# [START aiplatform_sdk_entity_type_ingest_feature_values_from_df_sample] -def entity_type_ingest_feature_values_from_df_with_feature_time_field_sample( - project: str, location: str, entity_type_name: str, -): - aiplatform.init(project=project, location=location) - - et = aiplatform.EntityType(entity_type_name=entity_type_name) - - df_source = pd.DataFrame( - data=[ - { - "movie_id": "movie_01", - "average_rating": 4.9, - "title": "The Shawshank Redemption", - "genres": "Drama", - "update_time": "2021-08-20 20:44:11.094375+00:00", - }, - { - "movie_id": "movie_02", - "average_rating": 4.2, - "title": "The Shining", - "genres": "Horror", - "update_time": "2021-08-20 20:44:11.094375+00:00", - }, - ], - columns=["movie_id", "average_rating", "title", "genres", "update_time"], - ) - - et.ingest_from_df( - feature_ids=["movid_id", "average_rating", "title", "genres"], - feature_time="update_time", - df_source=df_source, - entity_id_field="movie_id", - ) - - return et - - -# [END aiplatform_sdk_entity_type_ingest_feature_values_from_df_sample] diff --git a/samples/model-builder/entity_type_ingest_from_df_sample_test.py b/samples/model-builder/entity_type_ingest_from_df_sample_test.py deleted file mode 100644 index 66e7f628f9..0000000000 --- a/samples/model-builder/entity_type_ingest_from_df_sample_test.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import entity_type_ingest_from_df_sample - -import test_constants as constants - - -def test_image_dataset_create_classification_sample( - mock_sdk_init, mock_get_entity_type, mock_ingest_from_df -): - entity_type_ingest_from_df_sample.entity_type_ingest_feature_values_from_df_with_feature_time_field_sample( - project=constants.PROJECT, - location=constants.LOCATION, - ) - - mock_sdk_init.assert_called_once_with( - project=constants.PROJECT, location=constants.LOCATION - ) - mock_ingest_from_df.assert_called_once_with( - ) diff --git a/samples/model-builder/featurestore_batch_serve_feature_values_to_df_sample.py b/samples/model-builder/featurestore_batch_serve_feature_values_to_df_sample.py deleted file mode 100644 index 557bc6f9d1..0000000000 --- a/samples/model-builder/featurestore_batch_serve_feature_values_to_df_sample.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pandas as pd - -from google.cloud import aiplatform - -# [START aiplatform_sdk_featurestore_batch_serve_feature_values_to_df_sample] -def featurestore_batch_serve_feature_values_to_df_sample( - project: str, location: str, featurestore_name: str, -): - aiplatform.init(project=project, location=location) - - fs = aiplatform.Featurestore(featurestore_name=featurestore_name) - - read_instances_df = pd.DataFrame( - data=[ - ["alice", "movie_01", "2021-09-15T08:28:14Z"], - ["bob", "movie_02", "2021-09-15T08:28:14Z"], - ["dav", "movie_03", "2021-09-15T08:28:14Z"], - ["eve", "movie_04", "2021-09-15T08:28:14Z"], - ["alice", "movie_03", "2021-09-14T09:35:15Z"], - ["bob", "movie_04", "2020-02-14T09:35:15Z"], - ], - columns=["users_entity_type_id", "movies_entity_type_id", "timestamp"], - ) - read_instances_df = read_instances_df.astype({"timestamp": "datetime64"}) - - df = fs.batch_serve_to_df( - serving_feature_ids={ - "users_entity_type_id": [ - "user_age_feature_id", - "user_gender_feature_id", - "user_liked_genres_feature_id", - ], - "movies_entity_type_id": [ - "movie_title_feature_id", - "movie_genres_feature_id", - "movie_average_rating_feature_id", - ], - }, - read_instances_df=read_instances_df, - ) - - return df - - -# [END aiplatform_sdk_featurestore_batch_serve_feature_values_to_df_sample] From ecf8aad98cdd5eb72bc54db321f781b5e6f389f7 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 16 Feb 2022 21:02:55 -0800 Subject: [PATCH 04/10] fix: double to float64 --- google/cloud/aiplatform/utils/featurestore_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/aiplatform/utils/featurestore_utils.py b/google/cloud/aiplatform/utils/featurestore_utils.py index 195a12a733..392773661e 100644 --- a/google/cloud/aiplatform/utils/featurestore_utils.py +++ b/google/cloud/aiplatform/utils/featurestore_utils.py @@ -36,8 +36,8 @@ FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP = { "BOOL": {"field_type": "BOOL"}, "BOOL_ARRAY": {"field_type": "BOOL", "mode": "REPEATED"}, - "DOUBLE": {"field_type": "DOUBLE"}, - "DOUBLE_ARRAY": {"field_type": "DOUBLE", "mode": "REPEATED"}, + "DOUBLE": {"field_type": "FLOAT64"}, + "DOUBLE_ARRAY": {"field_type": "FLOAT64", "mode": "REPEATED"}, "INT64": {"field_type": "INT64"}, "INT64_ARRAY": {"field_type": "INT64", "mode": "REPEATED"}, "STRING": {"field_type": "STRING"}, From 6f9314dc7730d391a071cd235742a552ba34b41d Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 16 Feb 2022 22:37:21 -0800 Subject: [PATCH 05/10] fix: add job_config for repeated data type --- google/cloud/aiplatform/featurestore/entity_type.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index 1f49a66391..25997ef6f2 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -1276,7 +1276,14 @@ def ingest_from_df( ) schema.append(bq_schema_field) - job_config = bigquery.LoadJobConfig(schema=schema) + parquet_options = bigquery.format_options.ParquetOptions() + parquet_options.enable_list_inference = True + + job_config = bigquery.LoadJobConfig( + schema=schema, + source_format=bigquery.SourceFormat.PARQUET, + parquet_options=parquet_options, + ) job = bigquery_client.load_table_from_dataframe( dataframe=df_source, @@ -1385,7 +1392,7 @@ def read( feature_ids = [ feature_descriptor.id for feature_descriptor in header.feature_descriptors ] - + print(entity_views) return self._construct_dataframe( feature_ids=feature_ids, entity_views=entity_views, ) From b4ffd5994ea64eac9c023c72eb38fcb884425f00 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 16 Feb 2022 22:57:38 -0800 Subject: [PATCH 06/10] fix: remove print --- google/cloud/aiplatform/featurestore/entity_type.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index 25997ef6f2..a216eb5b36 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -1392,7 +1392,7 @@ def read( feature_ids = [ feature_descriptor.id for feature_descriptor in header.feature_descriptors ] - print(entity_views) + return self._construct_dataframe( feature_ids=feature_ids, entity_views=entity_views, ) From 80c97aaaf6de34b746a18c57017e2e448cc839fe Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 22 Feb 2022 15:11:28 -0800 Subject: [PATCH 07/10] fix: bq_schema and tests --- .../aiplatform/featurestore/entity_type.py | 32 +++++++++---------- tests/unit/aiplatform/test_featurestores.py | 2 ++ 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index a216eb5b36..5cb32f473c 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -1238,6 +1238,21 @@ def ingest_from_df( ) self.wait() + + feature_source_fields = feature_source_fields or {} + bq_schema = [] + for feature_id in feature_ids: + feature_field_name = feature_source_fields.get(feature_id, feature_id) + bq_data_type = utils.featurestore_utils.FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP[ + self.get_feature(feature_id).to_dict()["valueType"] + ] + bq_schema_field = bigquery.SchemaField( + name=feature_field_name, + field_type=bq_data_type["field_type"], + mode=bq_data_type.get("mode") or "NULLABLE", + ) + bq_schema.append(bq_schema_field) + entity_type_name_components = self._parse_resource_name(self.resource_name) featurestore_id, entity_type_id = ( entity_type_name_components["featurestore"], @@ -1259,28 +1274,13 @@ def ingest_from_df( temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset) - feature_source_fields = feature_source_fields or {} - try: - schema = [] - for feature_id in feature_ids: - feature_value_type = self.get_feature(feature_id).to_dict()["valueType"] - bq_data_type = utils.featurestore_utils.FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP[ - feature_value_type - ] - bq_schema_field = bigquery.SchemaField( - name=feature_source_fields.get(feature_id, feature_id), - field_type=bq_data_type["field_type"], - mode=bq_data_type.get("mode") or "NULLABLE", - ) - schema.append(bq_schema_field) - parquet_options = bigquery.format_options.ParquetOptions() parquet_options.enable_list_inference = True job_config = bigquery.LoadJobConfig( - schema=schema, + schema=bq_schema, source_format=bigquery.SourceFormat.PARQUET, parquet_options=parquet_options, ) diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index df7d544d95..eb6163d1ea 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -1672,6 +1672,7 @@ def test_ingest_from_gcs_with_invalid_gcs_source_type(self): @pytest.mark.usefixtures( "get_entity_type_mock", + "get_feature_mock", "bq_init_client_mock", "bq_init_dataset_mock", "bq_create_dataset_mock", @@ -1720,6 +1721,7 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock): @pytest.mark.usefixtures( "get_entity_type_mock", + "get_feature_mock", "bq_init_client_mock", "bq_init_dataset_mock", "bq_create_dataset_mock", From 1536b3ac6dbc683a08e76cff0dd746fbcd637042 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Tue, 22 Feb 2022 16:21:54 -0800 Subject: [PATCH 08/10] fix: add unit tests for get_bq_schema and ic tests for string array ingestion validation --- .../aiplatform/featurestore/entity_type.py | 36 +++++++++++++++---- tests/system/aiplatform/test_featurestore.py | 14 ++++---- tests/unit/aiplatform/test_featurestores.py | 24 +++++++++++++ 3 files changed, 60 insertions(+), 14 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index 5cb32f473c..7fc0a13965 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -1243,13 +1243,9 @@ def ingest_from_df( bq_schema = [] for feature_id in feature_ids: feature_field_name = feature_source_fields.get(feature_id, feature_id) - bq_data_type = utils.featurestore_utils.FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP[ - self.get_feature(feature_id).to_dict()["valueType"] - ] - bq_schema_field = bigquery.SchemaField( - name=feature_field_name, - field_type=bq_data_type["field_type"], - mode=bq_data_type.get("mode") or "NULLABLE", + feature_value_type = self.get_feature(feature_id).to_dict()["valueType"] + bq_schema_field = self._get_bq_schema_field( + feature_field_name, feature_value_type ) bq_schema.append(bq_schema_field) @@ -1308,6 +1304,32 @@ def ingest_from_df( return entity_type_obj + @staticmethod + def _get_bq_schema_field( + name: str, feature_value_type: str + ) -> bigquery.SchemaField: + """Helper method to get BigQuery Schema Field. + + Args: + name (str): + Required. The name of the schema field, which can be either the feature_id, + or the field_name in BigQuery for the feature if different than the feature_id. + feature_value_type (str): + Required. The feature value_type. + + Returns: + bigquery.SchemaField: bigquery.SchemaField + """ + bq_data_type = utils.featurestore_utils.FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP[ + feature_value_type + ] + bq_schema_field = bigquery.SchemaField( + name=name, + field_type=bq_data_type["field_type"], + mode=bq_data_type.get("mode") or "NULLABLE", + ) + return bq_schema_field + @staticmethod def _instantiate_featurestore_online_client( location: Optional[str] = None, diff --git a/tests/system/aiplatform/test_featurestore.py b/tests/system/aiplatform/test_featurestore.py index cbbfd82efb..9adabcaf3b 100644 --- a/tests/system/aiplatform/test_featurestore.py +++ b/tests/system/aiplatform/test_featurestore.py @@ -219,7 +219,7 @@ def test_batch_create_features(self, shared_state): movie_feature_configs = { _TEST_MOVIE_TITLE_FEATURE_ID: {"value_type": "STRING"}, - _TEST_MOVIE_GENRES_FEATURE_ID: {"value_type": "STRING"}, + _TEST_MOVIE_GENRES_FEATURE_ID: {"value_type": "STRING_ARRAY"}, _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID: {"value_type": "DOUBLE"}, } @@ -277,14 +277,14 @@ def test_ingest_feature_values_from_df_using_feature_time_column_and_online_read "movie_id": "movie_01", "average_rating": 4.9, "title": "The Shawshank Redemption", - "genres": "Drama", + "genres": ["Drama"], "update_time": "2021-08-20 20:44:11.094375+00:00", }, { "movie_id": "movie_02", "average_rating": 4.2, "title": "The Shining", - "genres": "Horror", + "genres": ["Horror"], "update_time": "2021-08-20 20:44:11.094375+00:00", }, ], @@ -312,13 +312,13 @@ def test_ingest_feature_values_from_df_using_feature_time_column_and_online_read "movie_id": "movie_01", "average_rating": 4.9, "title": "The Shawshank Redemption", - "genres": "Drama", + "genres": ["Drama"], }, { "movie_id": "movie_02", "average_rating": 4.2, "title": "The Shining", - "genres": "Horror", + "genres": ["Horror"], }, ] expected_movie_entity_views_df_after_ingest = pd.DataFrame( @@ -350,13 +350,13 @@ def test_ingest_feature_values_from_df_using_feature_time_datetime_and_online_re "movie_id": "movie_03", "average_rating": 4.5, "title": "Cinema Paradiso", - "genres": "Romance", + "genres": ["Romance"], }, { "movie_id": "movie_04", "average_rating": 4.6, "title": "The Dark Knight", - "genres": "Action", + "genres": ["Action"], }, ], columns=["movie_id", "average_rating", "title", "genres"], diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index eb6163d1ea..569d9fe7f0 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -1771,6 +1771,30 @@ def test_ingest_from_df_using_datetime(self, import_feature_values_mock): request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA, ) + @pytest.mark.parametrize( + "feature_value_type, expected_field_type, expected_mode", + [ + ("BOOL", "BOOL", "NULLABLE"), + ("BOOL_ARRAY", "BOOL", "REPEATED"), + ("DOUBLE", "FLOAT64", "NULLABLE"), + ("DOUBLE_ARRAY", "FLOAT64", "REPEATED"), + ("INT64", "INT64", "NULLABLE"), + ("INT64_ARRAY", "INT64", "REPEATED"), + ("STRING", "STRING", "NULLABLE"), + ("STRING_ARRAY", "STRING", "REPEATED"), + ("BYTES", "BYTES", "NULLABLE"), + ], + ) + def test_get_bq_schema_field( + self, feature_value_type, expected_field_type, expected_mode + ): + expected_bq_schema_field = bigquery.SchemaField( + name=_TEST_FEATURE_ID, field_type=expected_field_type, mode=expected_mode, + ) + assert expected_bq_schema_field == aiplatform.EntityType._get_bq_schema_field( + name=_TEST_FEATURE_ID, feature_value_type=feature_value_type + ) + @pytest.mark.usefixtures("get_entity_type_mock", "get_feature_mock") def test_read_single_entity(self, read_feature_values_mock): aiplatform.init(project=_TEST_PROJECT) From a28e7655df420549974e179f6082c5c3ff83f5dc Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 23 Feb 2022 16:57:02 -0800 Subject: [PATCH 09/10] fix compat service init misplace fs version --- google/cloud/aiplatform/compat/services/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/aiplatform/compat/services/__init__.py b/google/cloud/aiplatform/compat/services/__init__.py index f8545a688c..10b02c483c 100644 --- a/google/cloud/aiplatform/compat/services/__init__.py +++ b/google/cloud/aiplatform/compat/services/__init__.py @@ -87,8 +87,8 @@ # v1 dataset_service_client_v1, endpoint_service_client_v1, - featurestore_online_serving_service_client_v1beta1, - featurestore_service_client_v1beta1, + featurestore_online_serving_service_client_v1, + featurestore_service_client_v1, job_service_client_v1, metadata_service_client_v1, model_service_client_v1, @@ -99,8 +99,8 @@ # v1beta1 dataset_service_client_v1beta1, endpoint_service_client_v1beta1, - featurestore_online_serving_service_client_v1, - featurestore_service_client_v1, + featurestore_online_serving_service_client_v1beta1, + featurestore_service_client_v1beta1, job_service_client_v1beta1, model_service_client_v1beta1, pipeline_service_client_v1beta1, From aa3feb35a2a29f2b82a4168d27853c3de99aade1 Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Wed, 23 Feb 2022 17:22:42 -0800 Subject: [PATCH 10/10] fix: unit tests by adding assert for bq schema field mock --- tests/unit/aiplatform/test_featurestores.py | 72 ++++++++++++++++----- 1 file changed, 56 insertions(+), 16 deletions(-) diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index 569d9fe7f0..92c5be0f81 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -114,6 +114,8 @@ } _TEST_FEATURE_VALUE_TYPE = _TEST_INT_TYPE +_TEST_FEATURE_VALUE_TYPE_BQ_FIELD_TYPE = "INT64" +_TEST_FEATURE_VALUE_TYPE_BQ_MODE = "NULLABLE" _ARRAY_FEATURE_VALUE_TYPE_TO_GCA_TYPE_MAP = { _TEST_BOOL_ARR_TYPE: gca_types.BoolArray, @@ -211,6 +213,9 @@ "my_feature_id_1": {"value_type": _TEST_FEATURE_VALUE_TYPE_STR}, } +_TEST_IMPORTING_FEATURE_ID = "my_feature_id_1" +_TEST_IMPORTING_FEATURE_SOURCE_FIELD = "my_feature_id_1_source_field" + _TEST_IMPORTING_FEATURE_IDS = ["my_feature_id_1"] _TEST_IMPORTING_FEATURE_SOURCE_FIELDS = { @@ -363,22 +368,22 @@ def bq_init_dataset_mock(bq_dataset_mock): @pytest.fixture -def bq_create_dataset_mock(bq_init_client_mock): - with patch.object(bigquery.Client, "create_dataset") as bq_create_dataset_mock: +def bq_create_dataset_mock(bq_client_mock): + with patch.object(bq_client_mock, "create_dataset") as bq_create_dataset_mock: yield bq_create_dataset_mock @pytest.fixture -def bq_load_table_from_dataframe_mock(bq_init_client_mock): +def bq_load_table_from_dataframe_mock(bq_client_mock): with patch.object( - bigquery.Client, "load_table_from_dataframe" + bq_client_mock, "load_table_from_dataframe" ) as bq_load_table_from_dataframe_mock: yield bq_load_table_from_dataframe_mock @pytest.fixture -def bq_delete_dataset_mock(bq_init_client_mock): - with patch.object(bigquery.Client, "delete_dataset") as bq_delete_dataset_mock: +def bq_delete_dataset_mock(bq_client_mock): + with patch.object(bq_client_mock, "delete_dataset") as bq_delete_dataset_mock: yield bq_delete_dataset_mock @@ -396,9 +401,9 @@ def bqs_init_client_mock(bqs_client_mock): @pytest.fixture -def bqs_create_read_session(bqs_init_client_mock): +def bqs_create_read_session(bqs_client_mock): with patch.object( - bigquery_storage.BigQueryReadClient, "create_read_session" + bqs_client_mock, "create_read_session" ) as bqs_create_read_session: read_session_proto = gcbqs_stream.ReadSession() read_session_proto.streams = [gcbqs_stream.ReadStream()] @@ -406,6 +411,19 @@ def bqs_create_read_session(bqs_init_client_mock): yield bqs_create_read_session +@pytest.fixture +def bq_schema_field_mock(): + mock = MagicMock(bigquery.SchemaField) + yield mock + + +@pytest.fixture +def bq_init_schema_field_mock(bq_schema_field_mock): + with patch.object(bigquery, "SchemaField") as bq_init_schema_field_mock: + bq_init_schema_field_mock.return_value = bq_schema_field_mock + yield bq_init_schema_field_mock + + # All Featurestore Mocks @pytest.fixture def get_featurestore_mock(): @@ -1676,11 +1694,15 @@ def test_ingest_from_gcs_with_invalid_gcs_source_type(self): "bq_init_client_mock", "bq_init_dataset_mock", "bq_create_dataset_mock", - "bq_load_table_from_dataframe_mock", "bq_delete_dataset_mock", ) @patch("uuid.uuid4", uuid_mock) - def test_ingest_from_df_using_column(self, import_feature_values_mock): + def test_ingest_from_df_using_column( + self, + import_feature_values_mock, + bq_load_table_from_dataframe_mock, + bq_init_schema_field_mock, + ): aiplatform.init(project=_TEST_PROJECT) @@ -1702,7 +1724,7 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock): f"{expecte_temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}" ) - true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + expected_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( entity_type=_TEST_ENTITY_TYPE_NAME, feature_specs=[ gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( @@ -1715,8 +1737,15 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock): feature_time_field=_TEST_FEATURE_TIME_FIELD, ) + bq_init_schema_field_mock.assert_called_once_with( + name=_TEST_IMPORTING_FEATURE_SOURCE_FIELD, + field_type=_TEST_FEATURE_VALUE_TYPE_BQ_FIELD_TYPE, + mode=_TEST_FEATURE_VALUE_TYPE_BQ_MODE, + ) + import_feature_values_mock.assert_called_once_with( - request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA, + request=expected_import_feature_values_request, + metadata=_TEST_REQUEST_METADATA, ) @pytest.mark.usefixtures( @@ -1725,11 +1754,15 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock): "bq_init_client_mock", "bq_init_dataset_mock", "bq_create_dataset_mock", - "bq_load_table_from_dataframe_mock", "bq_delete_dataset_mock", ) @patch("uuid.uuid4", uuid_mock) - def test_ingest_from_df_using_datetime(self, import_feature_values_mock): + def test_ingest_from_df_using_datetime( + self, + import_feature_values_mock, + bq_load_table_from_dataframe_mock, + bq_init_schema_field_mock, + ): aiplatform.init(project=_TEST_PROJECT) my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) @@ -1754,7 +1787,7 @@ def test_ingest_from_df_using_datetime(self, import_feature_values_mock): timestamp_proto = timestamp_pb2.Timestamp() timestamp_proto.FromDatetime(_TEST_FEATURE_TIME_DATETIME) - true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + expected_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( entity_type=_TEST_ENTITY_TYPE_NAME, feature_specs=[ gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( @@ -1767,8 +1800,15 @@ def test_ingest_from_df_using_datetime(self, import_feature_values_mock): feature_time=timestamp_proto, ) + bq_init_schema_field_mock.assert_called_once_with( + name=_TEST_IMPORTING_FEATURE_SOURCE_FIELD, + field_type=_TEST_FEATURE_VALUE_TYPE_BQ_FIELD_TYPE, + mode=_TEST_FEATURE_VALUE_TYPE_BQ_MODE, + ) + import_feature_values_mock.assert_called_once_with( - request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA, + request=expected_import_feature_values_request, + metadata=_TEST_REQUEST_METADATA, ) @pytest.mark.parametrize(