Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable ingest from pd.DataFrame #977

Merged
merged 6 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 150 additions & 42 deletions google/cloud/aiplatform/featurestore/entity_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import datetime
from typing import Dict, List, Optional, Sequence, Tuple, Union
import uuid

from google.auth import credentials as auth_credentials
from google.protobuf import field_mask_pb2
Expand All @@ -34,6 +35,7 @@
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.utils import featurestore_utils

from google.cloud import bigquery

_LOGGER = base.Logger(__name__)
_ALL_FEATURE_IDS = "*"
Expand Down Expand Up @@ -795,23 +797,16 @@ def _validate_and_get_import_feature_values_request(
If not provided, the source column need to be the same as the Feature ID.

Example:
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']

feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']

In case all features' source field and ID match:
feature_source_fields = None or {}

In case all features' source field and ID do not match:
feature_source_fields = {
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
'my_feature_id_2': 'my_feature_id_2_source_field',
'my_feature_id_3': 'my_feature_id_3_source_field',
}
}

Note:
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field',
The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'.

In case some features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
Expand Down Expand Up @@ -954,23 +949,16 @@ def ingest_from_bq(
If not provided, the source column need to be the same as the Feature ID.

Example:
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']

feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']

In case all features' source field and ID match:
feature_source_fields = None or {}

In case all features' source field and ID do not match:
feature_source_fields = {
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
'my_feature_id_2': 'my_feature_id_2_source_field',
'my_feature_id_3': 'my_feature_id_3_source_field',
}
}

Note:
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field',
The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'.

In case some features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
Expand Down Expand Up @@ -1000,6 +988,7 @@ def ingest_from_bq(
EntityType - The entityType resource object with feature values imported.

"""

bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
Expand Down Expand Up @@ -1065,23 +1054,16 @@ def ingest_from_gcs(
If not provided, the source column need to be the same as the Feature ID.

Example:
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']

feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']

In case all features' source field and ID match:
feature_source_fields = None or {}

In case all features' source field and ID do not match:
feature_source_fields = {
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
'my_feature_id_2': 'my_feature_id_2_source_field',
'my_feature_id_3': 'my_feature_id_3_source_field',
}
}

Note:
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field',
The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'.

In case some features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.
Expand Down Expand Up @@ -1146,6 +1128,132 @@ def ingest_from_gcs(
request_metadata=request_metadata,
)

def ingest_from_df(
    self,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    df_source: "pd.DataFrame",  # noqa: F821 - skip check for undefined name 'pd'
    feature_source_fields: Optional[Dict[str, str]] = None,
    entity_id_field: Optional[str] = None,
    request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
) -> "EntityType":
    """Ingest feature values from DataFrame.

    Note:
        Calling this method will automatically create and delete a temporary
        BigQuery dataset in the same GCP project, which will be used
        as the intermediary storage for ingesting feature values
        from the dataframe to the featurestore.

        The call returns upon ingestion completion, at which point the
        feature values will have been ingested into the entity_type.

    Args:
        feature_ids (List[str]):
            Required. IDs of the Features to import values
            of. The Features must exist in the target
            EntityType, or the request will fail.
        feature_time (Union[str, datetime.datetime]):
            Required. The feature_time can be one of:
                - The source column that holds the Feature
                  timestamp for all Feature values in each entity.
                  The dtype of the source column should be `datetime64`.
                - A single Feature timestamp for all entities
                  being imported. The timestamp must not have
                  higher than millisecond precision.
        df_source (pd.DataFrame):
            Required. Pandas DataFrame containing the source data for ingestion.
        feature_source_fields (Dict[str, str]):
            Optional. User defined dictionary mapping Feature IDs to the
            source columns their values are read from. Specify only the
            features whose ID and source column differ; unmapped features
            use their Feature ID as the source column name.
        entity_id_field (str):
            Optional. Source column that holds entity IDs. If not provided,
            entity IDs are extracted from the column named ``entity_id``.
        request_metadata (Sequence[Tuple[str, str]]):
            Optional. Strings which should be sent along with the request as metadata.

    Returns:
        EntityType - The entityType resource object with feature values imported.

    Raises:
        ImportError: If pyarrow (required by google.cloud.bigquery for
            DataFrame loads) is not installed.
    """
    try:
        import pyarrow  # noqa: F401 - required by 'google.cloud.bigquery' for DataFrame loads
    except ImportError:
        raise ImportError(
            f"Pyarrow is not installed. Please install pyarrow to use "
            f"{self.ingest_from_df.__name__}"
        )

    bigquery_client = bigquery.Client(
        project=self.project, credentials=self.credentials
    )

    entity_type_name_components = self._parse_resource_name(self.resource_name)
    featurestore_id, entity_type_id = (
        entity_type_name_components["featurestore"],
        entity_type_name_components["entity_type"],
    )

    # uuid makes the temporary dataset name collision-free across concurrent
    # ingestions; BigQuery dataset names cannot contain '-'.
    temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
        "-", "_"
    )
    # NOTE: global_config.project is used (instead of self.project) because
    # self.project may be a project number, while BigQuery dataset IDs need a
    # project ID. BigQuery dataset IDs are capped at 1024 characters.
    temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[
        :1024
    ]
    temp_bq_table_id = f"{temp_bq_dataset_id}.{entity_type_id}"

    temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id)
    temp_bq_dataset.location = self.location

    temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset)

    try:
        # Stage the DataFrame in the temporary BigQuery table, then reuse the
        # existing BigQuery ingestion path.
        job = bigquery_client.load_table_from_dataframe(
            dataframe=df_source, destination=temp_bq_table_id
        )
        job.result()

        entity_type_obj = self.ingest_from_bq(
            feature_ids=feature_ids,
            feature_time=feature_time,
            bq_source_uri=f"bq://{temp_bq_table_id}",
            feature_source_fields=feature_source_fields,
            entity_id_field=entity_id_field,
            request_metadata=request_metadata,
        )

    finally:
        # Pass the Dataset object (full project-qualified reference) rather
        # than the bare dataset_id string: the bare name would resolve against
        # the client's default project (self.project, possibly a project
        # number), which may not match the project the dataset was created in,
        # leaking the temporary dataset.
        bigquery_client.delete_dataset(
            dataset=temp_bq_dataset,
            delete_contents=True,
            not_found_ok=True,
        )

    return entity_type_obj

@staticmethod
def _instantiate_featurestore_online_client(
location: Optional[str] = None,
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
"werkzeug >= 2.0.0",
"tensorflow >=2.4.0",
]
featurestore_extra_require = ["pandas >= 1.0.0", "pyarrow >= 6.0.1"]

full_extra_require = list(
set(
tensorboard_extra_require
+ metadata_extra_require
+ xai_extra_require
+ lit_extra_require
+ featurestore_extra_require
)
)
testing_extra_require = (
Expand Down
Loading