feat: enable ingest from pd.DataFrame (googleapis#977)
* feat: enable ingest from pd.DataFrame

* fix: remove bq create_dataset, docstrings, mocks

* fix: e2e_base project

* fix: delete two optional args, add note for temp bq dataset, revert deleting bq dataset create, add featurestore_extra_require, update ic tests to use online read to validate feature value ingestion from df

* fix: add a comment of call complete upon ingestion, update unit tests
morgandu authored and ivanmkc committed Jan 28, 2022
1 parent 2cdc95b commit e6cca10
Showing 4 changed files with 471 additions and 58 deletions.
192 changes: 150 additions & 42 deletions google/cloud/aiplatform/featurestore/entity_type.py
@@ -17,6 +17,7 @@

import datetime
from typing import Dict, List, Optional, Sequence, Tuple, Union
import uuid

from google.auth import credentials as auth_credentials
from google.protobuf import field_mask_pb2
@@ -34,6 +35,7 @@
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.utils import featurestore_utils

from google.cloud import bigquery

_LOGGER = base.Logger(__name__)
_ALL_FEATURE_IDS = "*"
@@ -795,23 +797,16 @@ def _validate_and_get_import_feature_values_request(
If not provided, the source column needs to be the same as the Feature ID.
Example:
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
In case all features' source field and ID match:
feature_source_fields = None or {}
In case all features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
'my_feature_id_2': 'my_feature_id_2_source_field',
'my_feature_id_3': 'my_feature_id_3_source_field',
}
In case some features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
Note:
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field';
the source columns of 'my_feature_id_2' and 'my_feature_id_3' are the Feature IDs themselves.
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
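
To make the fallback behavior concrete, here is a minimal illustrative sketch (a hypothetical helper, not part of the SDK) of how each Feature ID resolves to a source column:

from typing import Dict, List, Optional

def resolve_source_columns(
    feature_ids: List[str],
    feature_source_fields: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    # A mapped feature reads from its mapped source column; an unmapped
    # feature falls back to a column named exactly like its Feature ID.
    feature_source_fields = feature_source_fields or {}
    return {fid: feature_source_fields.get(fid, fid) for fid in feature_ids}

# resolve_source_columns(
#     ['my_feature_id_1', 'my_feature_id_2'],
#     {'my_feature_id_1': 'my_feature_id_1_source_field'},
# ) -> {'my_feature_id_1': 'my_feature_id_1_source_field',
#       'my_feature_id_2': 'my_feature_id_2'}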
@@ -954,23 +949,16 @@ def ingest_from_bq(
If not provided, the source column needs to be the same as the Feature ID.
Example:
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
In case all features' source field and ID match:
feature_source_fields = None or {}
In case all features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
'my_feature_id_2': 'my_feature_id_2_source_field',
'my_feature_id_3': 'my_feature_id_3_source_field',
}
In case some features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
Note:
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field';
the source columns of 'my_feature_id_2' and 'my_feature_id_3' are the Feature IDs themselves.
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
@@ -1000,6 +988,7 @@ def ingest_from_bq(
EntityType - The entityType resource object with feature values imported.
"""

bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
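
For orientation, a hedged usage sketch of ingest_from_bq; the project, location, resource IDs, and BigQuery table URI below are illustrative placeholders, not values from this commit:

import datetime

from google.cloud import aiplatform

# Placeholder project/location/resource names for illustration only.
aiplatform.init(project="my-project", location="us-central1")

entity_type = aiplatform.featurestore.EntityType(
    entity_type_name="users", featurestore_id="my_featurestore",
)
entity_type.ingest_from_bq(
    feature_ids=["my_feature_id_1", "my_feature_id_2"],
    feature_time=datetime.datetime(2022, 1, 1, 11, 59, 59),
    bq_source_uri="bq://my-project.my_dataset.my_table",
    feature_source_fields={"my_feature_id_1": "my_feature_id_1_source_field"},
)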
@@ -1065,23 +1054,16 @@ def ingest_from_gcs(
If not provided, the source column needs to be the same as the Feature ID.
Example:
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
In case all features' source field and ID match:
feature_source_fields = None or {}
In case all features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
'my_feature_id_2': 'my_feature_id_2_source_field',
'my_feature_id_3': 'my_feature_id_3_source_field',
}
In case some features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
Note:
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field';
the source columns of 'my_feature_id_2' and 'my_feature_id_3' are the Feature IDs themselves.
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
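
A similar hedged sketch for ingest_from_gcs, reusing the entity_type handle from the previous sketch. The gcs_source_uris and gcs_source_type parameters are not shown in this excerpt and are assumed from the surrounding SDK; the bucket URI is a placeholder:

entity_type.ingest_from_gcs(
    feature_ids=["my_feature_id_1", "my_feature_id_2"],
    feature_time="update_time",  # assumed source column holding per-row timestamps
    gcs_source_uris="gs://my-bucket/feature_values.csv",
    gcs_source_type="csv",
)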
@@ -1146,6 +1128,132 @@ def ingest_from_gcs(
request_metadata=request_metadata,
)

def ingest_from_df(
self,
feature_ids: List[str],
feature_time: Union[str, datetime.datetime],
df_source: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd'
feature_source_fields: Optional[Dict[str, str]] = None,
entity_id_field: Optional[str] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
) -> "EntityType":
"""Ingest feature values from DataFrame.
Note:
Calling this method will automatically create and delete a temporary
bigquery dataset in the same GCP project, which will be used
as the intermediary storage for ingesting feature values
from dataframe to featurestore.
The call will return upon ingestion completes, where the
feature values will be ingested into the entity_type.
Args:
feature_ids (List[str]):
Required. IDs of the Features whose values are
imported. The Features must exist in the target
EntityType, or the request will fail.
feature_time (Union[str, datetime.datetime]):
Required. The feature_time can be one of:
- The source column that holds the Feature
timestamp for all Feature values in each entity.
Note:
The dtype of the source column should be `datetime64`.
- A single Feature timestamp for all entities
being imported. The timestamp must not have
higher than millisecond precision.
Example:
feature_time = datetime.datetime(year=2022, month=1, day=1, hour=11, minute=59, second=59)
or
feature_time_str = datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
feature_time = datetime.datetime.strptime(feature_time_str, "%Y-%m-%d %H:%M:%S.%f")
df_source (pd.DataFrame):
Required. Pandas DataFrame containing the source data for ingestion.
feature_source_fields (Dict[str, str]):
Optional. User-defined dictionary that maps the ID of each Feature
being imported to the source column its values are read from.
Specify only the features whose ID and source column are not the same.
If not provided, the source column needs to be the same as the Feature ID.
Example:
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
Note:
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field';
the source columns of 'my_feature_id_2' and 'my_feature_id_3' are the Feature IDs themselves.
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
request_metadata (Sequence[Tuple[str, str]]):
Optional. Strings which should be sent along with the request as metadata.
Returns:
EntityType - The entityType resource object with feature values imported.
"""
try:
import pyarrow # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery'
except ImportError:
raise ImportError(
f"Pyarrow is not installed. Please install pyarrow to use "
f"{self.ingest_from_df.__name__}"
)

bigquery_client = bigquery.Client(
project=self.project, credentials=self.credentials
)

entity_type_name_components = self._parse_resource_name(self.resource_name)
featurestore_id, entity_type_id = (
entity_type_name_components["featurestore"],
entity_type_name_components["entity_type"],
)
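
# Stage the DataFrame in a uniquely named temporary BigQuery dataset;
# BigQuery dataset IDs are limited to 1024 characters, hence the slice below.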

temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
"-", "_"
)
temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[
:1024
]
temp_bq_table_id = f"{temp_bq_dataset_id}.{entity_type_id}"

temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id)
temp_bq_dataset.location = self.location

temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset)

try:
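# Load the DataFrame into the temporary table, then reuse the existing
# BigQuery ingestion path to import the staged feature values.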
job = bigquery_client.load_table_from_dataframe(
dataframe=df_source, destination=temp_bq_table_id
)
job.result()

entity_type_obj = self.ingest_from_bq(
feature_ids=feature_ids,
feature_time=feature_time,
bq_source_uri=f"bq://{temp_bq_table_id}",
feature_source_fields=feature_source_fields,
entity_id_field=entity_id_field,
request_metadata=request_metadata,
)

finally:
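# Clean up the temporary dataset and its contents even if ingestion fails.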
bigquery_client.delete_dataset(
dataset=temp_bq_dataset.dataset_id, delete_contents=True,
)

return entity_type_obj
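
Putting the new method together, a hedged end-to-end sketch; the resource IDs are placeholders, and pandas plus pyarrow must be installed, as the import guard above enforces:

import datetime

import pandas as pd
from google.cloud import aiplatform

# Placeholder project/location/resource names for illustration only.
aiplatform.init(project="my-project", location="us-central1")

df_source = pd.DataFrame(
    {
        "entity_id": ["user_1", "user_2"],
        "my_feature_id_1_source_field": [0.1, 0.2],
        "my_feature_id_2": [True, False],
    }
)

entity_type = aiplatform.featurestore.EntityType(
    entity_type_name="users", featurestore_id="my_featurestore",
)
entity_type.ingest_from_df(
    feature_ids=["my_feature_id_1", "my_feature_id_2"],
    feature_time=datetime.datetime(2022, 1, 1, 11, 59, 59),
    df_source=df_source,
    feature_source_fields={"my_feature_id_1": "my_feature_id_1_source_field"},
)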

@staticmethod
def _instantiate_featurestore_online_client(
location: Optional[str] = None,
2 changes: 2 additions & 0 deletions setup.py
@@ -47,13 +47,15 @@
"werkzeug >= 2.0.0",
"tensorflow >=2.4.0",
]
featurestore_extra_require = ["pandas >= 1.0.0", "pyarrow >= 6.0.1"]

full_extra_require = list(
set(
tensorboard_extra_require
+ metadata_extra_require
+ xai_extra_require
+ lit_extra_require
+ featurestore_extra_require
)
)
testing_extra_require = (
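
Since this hunk only wires featurestore_extra_require into full_extra_require (tests pick it up via testing_extra_require, per the commit message), a hedged way to install the DataFrame-ingestion dependencies directly, using the pins from this diff, is:

pip install "pandas>=1.0.0" "pyarrow>=6.0.1"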
