Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enforce bq SchemaField field_type and mode using feature value_type #1019

Merged
merged 11 commits into from
Feb 24, 2022
8 changes: 4 additions & 4 deletions google/cloud/aiplatform/compat/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@
# v1
dataset_service_client_v1,
endpoint_service_client_v1,
featurestore_online_serving_service_client_v1beta1,
featurestore_service_client_v1beta1,
featurestore_online_serving_service_client_v1,
featurestore_service_client_v1,
job_service_client_v1,
metadata_service_client_v1,
model_service_client_v1,
Expand All @@ -99,8 +99,8 @@
# v1beta1
dataset_service_client_v1beta1,
endpoint_service_client_v1beta1,
featurestore_online_serving_service_client_v1,
featurestore_service_client_v1,
featurestore_online_serving_service_client_v1beta1,
featurestore_service_client_v1beta1,
job_service_client_v1beta1,
model_service_client_v1beta1,
pipeline_service_client_v1beta1,
Expand Down
51 changes: 50 additions & 1 deletion google/cloud/aiplatform/featurestore/entity_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,17 @@ def ingest_from_df(
)

self.wait()

feature_source_fields = feature_source_fields or {}
bq_schema = []
for feature_id in feature_ids:
feature_field_name = feature_source_fields.get(feature_id, feature_id)
feature_value_type = self.get_feature(feature_id).to_dict()["valueType"]
bq_schema_field = self._get_bq_schema_field(
feature_field_name, feature_value_type
)
bq_schema.append(bq_schema_field)

entity_type_name_components = self._parse_resource_name(self.resource_name)
featurestore_id, entity_type_id = (
entity_type_name_components["featurestore"],
Expand All @@ -1260,8 +1271,20 @@ def ingest_from_df(
temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset)

try:

parquet_options = bigquery.format_options.ParquetOptions()
parquet_options.enable_list_inference = True

job_config = bigquery.LoadJobConfig(
schema=bq_schema,
source_format=bigquery.SourceFormat.PARQUET,
parquet_options=parquet_options,
)

job = bigquery_client.load_table_from_dataframe(
Copy link
Contributor

@ivanmkc ivanmkc Feb 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like there's no assertion that the correct args are passed to bq_load_table_from_dataframe_mock. Perhaps add a bq_load_table_from_dataframe_mock.assert_called_once_with(...) call to test_ingest_from_df_using_column and test_ingest_from_df_using_datetime.

dataframe=df_source, destination=temp_bq_table_id
dataframe=df_source,
destination=temp_bq_table_id,
job_config=job_config,
)
job.result()

Expand All @@ -1281,6 +1304,32 @@ def ingest_from_df(

return entity_type_obj

@staticmethod
def _get_bq_schema_field(
    name: str, feature_value_type: str
) -> bigquery.SchemaField:
    """Helper method to get BigQuery Schema Field.

    Args:
        name (str):
            Required. The name of the schema field, which can be either the feature_id,
            or the field_name in BigQuery for the feature if different than the feature_id.
        feature_value_type (str):
            Required. The feature value_type.

    Returns:
        bigquery.SchemaField: Schema field whose field_type (and mode) are
        derived from the feature value_type; mode defaults to NULLABLE for
        scalar value types.

    Raises:
        KeyError: If ``feature_value_type`` is not in
            FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP.
    """
    bq_data_type = utils.featurestore_utils.FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP[
        feature_value_type
    ]
    return bigquery.SchemaField(
        name=name,
        field_type=bq_data_type["field_type"],
        # Scalar value types omit "mode" in the map; BigQuery's default mode
        # is NULLABLE, so supply it explicitly via dict.get's default.
        mode=bq_data_type.get("mode", "NULLABLE"),
    )

@staticmethod
def _instantiate_featurestore_online_client(
location: Optional[str] = None,
Expand Down
12 changes: 12 additions & 0 deletions google/cloud/aiplatform/utils/featurestore_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@

_FEATURE_VALUE_TYPE_UNSPECIFIED = "VALUE_TYPE_UNSPECIFIED"

# Maps a Feature Store feature value_type to the BigQuery schema-field
# settings used when loading feature values. "*_ARRAY" value types become
# REPEATED fields; scalar value types carry no "mode" key (callers default
# the mode to NULLABLE).
FEATURE_STORE_VALUE_TYPE_TO_BQ_DATA_TYPE_MAP = {
    value_type: (
        {"field_type": bq_field_type, "mode": "REPEATED"}
        if value_type.endswith("_ARRAY")
        else {"field_type": bq_field_type}
    )
    for value_type, bq_field_type in {
        "BOOL": "BOOL",
        "BOOL_ARRAY": "BOOL",
        "DOUBLE": "FLOAT64",
        "DOUBLE_ARRAY": "FLOAT64",
        "INT64": "INT64",
        "INT64_ARRAY": "INT64",
        "STRING": "STRING",
        "STRING_ARRAY": "STRING",
        "BYTES": "BYTES",
    }.items()
}


def validate_id(resource_id: str) -> None:
"""Validates feature store resource ID pattern.
Expand Down
14 changes: 7 additions & 7 deletions tests/system/aiplatform/test_featurestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def test_batch_create_features(self, shared_state):

movie_feature_configs = {
_TEST_MOVIE_TITLE_FEATURE_ID: {"value_type": "STRING"},
_TEST_MOVIE_GENRES_FEATURE_ID: {"value_type": "STRING"},
_TEST_MOVIE_GENRES_FEATURE_ID: {"value_type": "STRING_ARRAY"},
_TEST_MOVIE_AVERAGE_RATING_FEATURE_ID: {"value_type": "DOUBLE"},
}

Expand Down Expand Up @@ -277,14 +277,14 @@ def test_ingest_feature_values_from_df_using_feature_time_column_and_online_read
"movie_id": "movie_01",
"average_rating": 4.9,
"title": "The Shawshank Redemption",
"genres": "Drama",
"genres": ["Drama"],
"update_time": "2021-08-20 20:44:11.094375+00:00",
},
{
"movie_id": "movie_02",
"average_rating": 4.2,
"title": "The Shining",
"genres": "Horror",
"genres": ["Horror"],
"update_time": "2021-08-20 20:44:11.094375+00:00",
},
],
Expand Down Expand Up @@ -312,13 +312,13 @@ def test_ingest_feature_values_from_df_using_feature_time_column_and_online_read
"movie_id": "movie_01",
"average_rating": 4.9,
"title": "The Shawshank Redemption",
"genres": "Drama",
"genres": ["Drama"],
},
{
"movie_id": "movie_02",
"average_rating": 4.2,
"title": "The Shining",
"genres": "Horror",
"genres": ["Horror"],
},
]
expected_movie_entity_views_df_after_ingest = pd.DataFrame(
Expand Down Expand Up @@ -350,13 +350,13 @@ def test_ingest_feature_values_from_df_using_feature_time_datetime_and_online_re
"movie_id": "movie_03",
"average_rating": 4.5,
"title": "Cinema Paradiso",
"genres": "Romance",
"genres": ["Romance"],
},
{
"movie_id": "movie_04",
"average_rating": 4.6,
"title": "The Dark Knight",
"genres": "Action",
"genres": ["Action"],
},
],
columns=["movie_id", "average_rating", "title", "genres"],
Expand Down
98 changes: 82 additions & 16 deletions tests/unit/aiplatform/test_featurestores.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@
}

_TEST_FEATURE_VALUE_TYPE = _TEST_INT_TYPE
_TEST_FEATURE_VALUE_TYPE_BQ_FIELD_TYPE = "INT64"
_TEST_FEATURE_VALUE_TYPE_BQ_MODE = "NULLABLE"

_ARRAY_FEATURE_VALUE_TYPE_TO_GCA_TYPE_MAP = {
_TEST_BOOL_ARR_TYPE: gca_types.BoolArray,
Expand Down Expand Up @@ -211,6 +213,9 @@
"my_feature_id_1": {"value_type": _TEST_FEATURE_VALUE_TYPE_STR},
}

_TEST_IMPORTING_FEATURE_ID = "my_feature_id_1"
_TEST_IMPORTING_FEATURE_SOURCE_FIELD = "my_feature_id_1_source_field"

_TEST_IMPORTING_FEATURE_IDS = ["my_feature_id_1"]

_TEST_IMPORTING_FEATURE_SOURCE_FIELDS = {
Expand Down Expand Up @@ -363,22 +368,22 @@ def bq_init_dataset_mock(bq_dataset_mock):


@pytest.fixture
def bq_create_dataset_mock(bq_init_client_mock):
with patch.object(bigquery.Client, "create_dataset") as bq_create_dataset_mock:
def bq_create_dataset_mock(bq_client_mock):
with patch.object(bq_client_mock, "create_dataset") as bq_create_dataset_mock:
yield bq_create_dataset_mock


@pytest.fixture
def bq_load_table_from_dataframe_mock(bq_init_client_mock):
def bq_load_table_from_dataframe_mock(bq_client_mock):
with patch.object(
bigquery.Client, "load_table_from_dataframe"
bq_client_mock, "load_table_from_dataframe"
) as bq_load_table_from_dataframe_mock:
yield bq_load_table_from_dataframe_mock


@pytest.fixture
def bq_delete_dataset_mock(bq_init_client_mock):
with patch.object(bigquery.Client, "delete_dataset") as bq_delete_dataset_mock:
def bq_delete_dataset_mock(bq_client_mock):
with patch.object(bq_client_mock, "delete_dataset") as bq_delete_dataset_mock:
yield bq_delete_dataset_mock


Expand All @@ -396,16 +401,29 @@ def bqs_init_client_mock(bqs_client_mock):


@pytest.fixture
def bqs_create_read_session(bqs_init_client_mock):
def bqs_create_read_session(bqs_client_mock):
with patch.object(
bigquery_storage.BigQueryReadClient, "create_read_session"
bqs_client_mock, "create_read_session"
) as bqs_create_read_session:
read_session_proto = gcbqs_stream.ReadSession()
read_session_proto.streams = [gcbqs_stream.ReadStream()]
bqs_create_read_session.return_value = read_session_proto
yield bqs_create_read_session


@pytest.fixture
def bq_schema_field_mock():
    """Yields a MagicMock specced against bigquery.SchemaField."""
    schema_field = MagicMock(bigquery.SchemaField)
    yield schema_field


@pytest.fixture
def bq_init_schema_field_mock(bq_schema_field_mock):
    """Patches the bigquery.SchemaField constructor to return the shared mock.

    Yields the patched constructor so tests can assert on its call args.
    """
    with patch.object(bigquery, "SchemaField") as schema_field_constructor:
        schema_field_constructor.return_value = bq_schema_field_mock
        yield schema_field_constructor


# All Featurestore Mocks
@pytest.fixture
def get_featurestore_mock():
Expand Down Expand Up @@ -1672,14 +1690,19 @@ def test_ingest_from_gcs_with_invalid_gcs_source_type(self):

@pytest.mark.usefixtures(
"get_entity_type_mock",
"get_feature_mock",
"bq_init_client_mock",
"bq_init_dataset_mock",
"bq_create_dataset_mock",
"bq_load_table_from_dataframe_mock",
"bq_delete_dataset_mock",
)
@patch("uuid.uuid4", uuid_mock)
def test_ingest_from_df_using_column(self, import_feature_values_mock):
def test_ingest_from_df_using_column(
self,
import_feature_values_mock,
bq_load_table_from_dataframe_mock,
bq_init_schema_field_mock,
):

aiplatform.init(project=_TEST_PROJECT)

Expand All @@ -1701,7 +1724,7 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock):
f"{expecte_temp_bq_dataset_id}.{_TEST_ENTITY_TYPE_ID}"
)

true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest(
expected_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest(
entity_type=_TEST_ENTITY_TYPE_NAME,
feature_specs=[
gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec(
Expand All @@ -1714,20 +1737,32 @@ def test_ingest_from_df_using_column(self, import_feature_values_mock):
feature_time_field=_TEST_FEATURE_TIME_FIELD,
)

bq_init_schema_field_mock.assert_called_once_with(
name=_TEST_IMPORTING_FEATURE_SOURCE_FIELD,
field_type=_TEST_FEATURE_VALUE_TYPE_BQ_FIELD_TYPE,
mode=_TEST_FEATURE_VALUE_TYPE_BQ_MODE,
)

import_feature_values_mock.assert_called_once_with(
request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA,
request=expected_import_feature_values_request,
metadata=_TEST_REQUEST_METADATA,
)

@pytest.mark.usefixtures(
"get_entity_type_mock",
"get_feature_mock",
"bq_init_client_mock",
"bq_init_dataset_mock",
"bq_create_dataset_mock",
"bq_load_table_from_dataframe_mock",
"bq_delete_dataset_mock",
)
@patch("uuid.uuid4", uuid_mock)
def test_ingest_from_df_using_datetime(self, import_feature_values_mock):
def test_ingest_from_df_using_datetime(
self,
import_feature_values_mock,
bq_load_table_from_dataframe_mock,
bq_init_schema_field_mock,
):
aiplatform.init(project=_TEST_PROJECT)

my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME)
Expand All @@ -1752,7 +1787,7 @@ def test_ingest_from_df_using_datetime(self, import_feature_values_mock):
timestamp_proto = timestamp_pb2.Timestamp()
timestamp_proto.FromDatetime(_TEST_FEATURE_TIME_DATETIME)

true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest(
expected_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest(
entity_type=_TEST_ENTITY_TYPE_NAME,
feature_specs=[
gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec(
Expand All @@ -1765,8 +1800,39 @@ def test_ingest_from_df_using_datetime(self, import_feature_values_mock):
feature_time=timestamp_proto,
)

bq_init_schema_field_mock.assert_called_once_with(
name=_TEST_IMPORTING_FEATURE_SOURCE_FIELD,
field_type=_TEST_FEATURE_VALUE_TYPE_BQ_FIELD_TYPE,
mode=_TEST_FEATURE_VALUE_TYPE_BQ_MODE,
)

import_feature_values_mock.assert_called_once_with(
request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA,
request=expected_import_feature_values_request,
metadata=_TEST_REQUEST_METADATA,
)

@pytest.mark.parametrize(
    "feature_value_type, expected_field_type, expected_mode",
    [
        ("BOOL", "BOOL", "NULLABLE"),
        ("BOOL_ARRAY", "BOOL", "REPEATED"),
        ("DOUBLE", "FLOAT64", "NULLABLE"),
        ("DOUBLE_ARRAY", "FLOAT64", "REPEATED"),
        ("INT64", "INT64", "NULLABLE"),
        ("INT64_ARRAY", "INT64", "REPEATED"),
        ("STRING", "STRING", "NULLABLE"),
        ("STRING_ARRAY", "STRING", "REPEATED"),
        ("BYTES", "BYTES", "NULLABLE"),
    ],
)
def test_get_bq_schema_field(
    self, feature_value_type, expected_field_type, expected_mode
):
    """Each feature value_type maps to the expected BQ field_type and mode."""
    actual_bq_schema_field = aiplatform.EntityType._get_bq_schema_field(
        name=_TEST_FEATURE_ID, feature_value_type=feature_value_type
    )
    expected_bq_schema_field = bigquery.SchemaField(
        name=_TEST_FEATURE_ID,
        field_type=expected_field_type,
        mode=expected_mode,
    )
    assert actual_bq_schema_field == expected_bq_schema_field

@pytest.mark.usefixtures("get_entity_type_mock", "get_feature_mock")
Expand Down