feat: enable ingest from pd.DataFrame #977

Merged: 6 commits, Jan 27, 2022
145 changes: 145 additions & 0 deletions google/cloud/aiplatform/featurestore/entity_type.py
@@ -17,6 +17,7 @@

import datetime
from typing import Dict, List, Optional, Sequence, Tuple, Union
import uuid

from google.auth import credentials as auth_credentials
from google.protobuf import field_mask_pb2
@@ -34,6 +35,7 @@
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.utils import featurestore_utils

from google.cloud import bigquery

_LOGGER = base.Logger(__name__)
_ALL_FEATURE_IDS = "*"
@@ -1000,6 +1002,7 @@ def ingest_from_bq(
EntityType - The entityType resource object with feature values imported.

"""

bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
@@ -1146,6 +1149,148 @@ def ingest_from_gcs(
request_metadata=request_metadata,
)

def ingest_from_df(
self,
feature_ids: List[str],
feature_time: Union[str, datetime.datetime],
df_source: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd'
feature_source_fields: Optional[Dict[str, str]] = None,
entity_id_field: Optional[str] = None,
disable_online_serving: Optional[bool] = None,
worker_count: Optional[int] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
) -> "EntityType":
Reviewer:
Can you add a comment mentioning whether this function waits for ingestion to complete (in other words, can users expect the data to be available once the function returns)?

Contributor Author:
Added.

"""Ingest feature values from DataFrame.
Member:
Should this also state that a temporary BQ table is created?

Contributor Author (Jan 26, 2022):
Added a Note: section to state that a temporary BQ dataset is created.


Args:
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
feature_time (Union[str, datetime.datetime]):
Required. The feature_time can be one of:
- The source column that holds the Feature
timestamp for all Feature values in each entity.

Note:
The dtype of the source column should be `datetime64`.

- A single Feature timestamp for all entities
being imported. The timestamp must not have
higher than millisecond precision.

Example:
feature_time = datetime.datetime(year=2022, month=1, day=1, hour=11, minute=59, second=59)
or
feature_time_str = datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
feature_time = datetime.datetime.strptime(feature_time_str, "%Y-%m-%d %H:%M:%S.%f")

df_source (pd.DataFrame):
Required. Pandas DataFrame containing the source data for ingestion.
feature_source_fields (Dict[str, str]):
Optional. User defined dictionary to map ID of the Feature for importing values
of to the source column for getting the Feature values from.

Specify the features whose ID and source column are not the same.
If not provided, the source column needs to be the same as the Feature ID.

Example:

feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']

In case all features' source field and ID match:
feature_source_fields = None or {}

In case all features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
'my_feature_id_2': 'my_feature_id_2_source_field',
'my_feature_id_3': 'my_feature_id_3_source_field',
}

In case some features' source field and ID do not match:
feature_source_fields = {
'my_feature_id_1': 'my_feature_id_1_source_field',
}
entity_id_field (str):
Optional. Source column that holds entity IDs. If not provided, entity
IDs are extracted from the column named ``entity_id``.

Reviewer:
nit: Does it make sense to default this to entity_id and drop the Optional?

Contributor Author (Jan 26, 2022):
Changing it from Optional to Required would also lead to a breaking change for ingest_from_bq and ingest_from_gcs. Prefer to use the service default over a manually defined default.
disable_online_serving (bool):
Optional. If set, data will not be imported for online
serving. This is typically used for backfilling,
where Feature generation timestamps are not in
the timestamp range needed for online serving.
worker_count (int):
Optional. Specifies the number of workers that are used
to write data to the Featurestore. Consider the
online serving capacity that you require to
achieve the desired import throughput without
interfering with online serving. The value must
be positive, and less than or equal to 100. If
not set, defaults to using 1 worker. The low
count ensures minimal impact on online serving
performance.
request_metadata (Sequence[Tuple[str, str]]):
Optional. Strings which should be sent along with the request as metadata.

Returns:
EntityType - The entityType resource object with feature values imported.

"""
try:
import pyarrow # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery'
except ImportError:
raise ImportError(
f"Pyarrow is not installed. Please install pyarrow to use "
f"{self.ingest_from_df.__name__}"
)

bigquery_client = bigquery.Client(
project=self.project, credentials=self.credentials
)

entity_type_name_components = self._parse_resource_name(self.resource_name)
featurestore_id, entity_type_id = (
entity_type_name_components["featurestore"],
entity_type_name_components["entity_type"],
)

temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
"-", "_"
)
temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[
Member:
It seems like self.project is more consistent here.

Contributor Author:
self.project returns the project number, but a BQ dataset needs a project ID.

Member:
Got it. This is an issue moving forward because the project used to construct the EntityType might not match the project ID set in init or, if the project is not set with init, the project ID returned from google.auth.default.

Additionally, we don't stop the user from using a project number when setting init.

We shouldn't block this PR since there is a workaround, but we should capture this issue in a ticket.

Contributor Author:
Sounds good, will address this in a separate ticket/PR.

:1024
]
temp_bq_table_id = f"{temp_bq_dataset_id}.{entity_type_id}"

temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id)
temp_bq_dataset.location = self.location
temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset)

try:
job = bigquery_client.load_table_from_dataframe(
dataframe=df_source, destination=temp_bq_table_id
)
job.result()

entity_type_obj = self.ingest_from_bq(
feature_ids=feature_ids,
feature_time=feature_time,
bq_source_uri=f"bq://{temp_bq_table_id}",
feature_source_fields=feature_source_fields,
entity_id_field=entity_id_field,
disable_online_serving=disable_online_serving,
worker_count=worker_count,
request_metadata=request_metadata,
)

finally:
bigquery_client.delete_dataset(
dataset=temp_bq_dataset.dataset_id, delete_contents=True,
)

return entity_type_obj

@staticmethod
def _instantiate_featurestore_online_client(
location: Optional[str] = None,
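On the review question above about whether the call blocks: as the diff shows, ingest_from_df loads the DataFrame into a temporary BigQuery table with load_table_from_dataframe, waits on job.result(), and then delegates to ingest_from_bq, so it does not return before the load job finishes. A minimal usage sketch follows; the project, featurestore, entity type, and feature names are hypothetical and the resources are assumed to already exist.

    import datetime

    import pandas as pd
    from google.cloud import aiplatform

    # Hypothetical project and resource names, for illustration only.
    aiplatform.init(project="my-project", location="us-central1")

    users_entity_type = aiplatform.EntityType(
        entity_type_name="users", featurestore_id="my_featurestore"
    )

    users_df = pd.DataFrame(
        [
            {"user_id": "user_01", "age": 31},
            {"user_id": "user_02", "age": 27},
        ]
    )

    # Returns after the temporary BQ load job and the ingest_from_bq call complete.
    users_entity_type.ingest_from_df(
        feature_ids=["age"],
        feature_time=datetime.datetime(2022, 1, 1, 12, 0, 0),
        df_source=users_df,
        entity_id_field="user_id",
    )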
12 changes: 7 additions & 5 deletions google/cloud/aiplatform/utils/__init__.py
@@ -628,9 +628,11 @@ def get_timestamp_proto(
"""
if not time:
time = datetime.datetime.now()
t = time.timestamp()
seconds = int(t)
# must not have higher than millisecond precision.
nanos = int((t % 1 * 1e6) * 1e3)

return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
time_str = time.isoformat(sep=" ", timespec="milliseconds")
time = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f")

timestamp_proto = timestamp_pb2.Timestamp()
timestamp_proto.FromDatetime(time)

return timestamp_proto
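The revised helper avoids sub-millisecond precision by round-tripping the datetime through isoformat(timespec="milliseconds") before building the proto. A standalone sketch of the same truncation idea; the function name here is made up and is not the library's API.

    import datetime

    from google.protobuf import timestamp_pb2


    def timestamp_proto_at_millis(time: datetime.datetime = None) -> timestamp_pb2.Timestamp:
        """Build a Timestamp proto truncated to millisecond precision."""
        if not time:
            time = datetime.datetime.now()
        # Drop sub-millisecond digits by formatting to milliseconds and parsing back.
        time_str = time.isoformat(sep=" ", timespec="milliseconds")
        time = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f")
        ts = timestamp_pb2.Timestamp()
        ts.FromDatetime(time)
        return ts


    # nanos ends up as 123000000: the trailing 456 microsecond digits are dropped.
    print(timestamp_proto_at_millis(datetime.datetime(2022, 1, 1, 11, 59, 59, 123456)))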
1 change: 1 addition & 0 deletions tests/system/aiplatform/e2e_base.py
@@ -29,6 +29,7 @@

_PROJECT = os.getenv("BUILD_SPECIFIC_GCLOUD_PROJECT")
_LOCATION = "us-central1"
_PROJECT = "aiplatform-dev"


class TestEndToEnd(metaclass=abc.ABCMeta):
Expand Down
110 changes: 96 additions & 14 deletions tests/system/aiplatform/test_featurestore.py
@@ -15,6 +15,7 @@
# limitations under the License.
#

import datetime
import logging

from google.cloud import aiplatform
@@ -186,22 +187,17 @@ def test_ingest_feature_values(self, shared_state, caplog):
gcs_source_uris=_TEST_USERS_ENTITY_TYPE_GCS_SRC,
gcs_source_type="avro",
entity_id_field="user_id",
worker_count=2,
worker_count=1,
)

assert "EntityType feature values imported." in caplog.text

caplog.clear()

def test_batch_create_features_and_ingest_feature_values(
self, shared_state, caplog
):

def test_batch_create_features(self, shared_state):
assert shared_state["movie_entity_type"]
movie_entity_type = shared_state["movie_entity_type"]

caplog.set_level(logging.INFO)

aiplatform.init(
project=e2e_base._PROJECT, location=e2e_base._LOCATION,
)
@@ -217,21 +213,107 @@

movie_entity_type.batch_create_features(feature_configs=movie_feature_configs)

movie_entity_type.ingest_from_gcs(
list_movie_features = movie_entity_type.list_features()
assert len(list_movie_features) == 3

def test_ingest_feature_values_from_df_using_feature_time_column(
self, shared_state, caplog
):

assert shared_state["movie_entity_type"]
movie_entity_type = shared_state["movie_entity_type"]

caplog.set_level(logging.INFO)

aiplatform.init(
project=e2e_base._PROJECT, location=e2e_base._LOCATION,
)

movies_df = pd.DataFrame(
data=[
{
"movie_id": "movie_01",
"average_rating": 4.9,
"title": "The Shawshank Redemption",
"genres": "Drama",
"update_time": "2021-08-20 20:44:11.094375+00:00",
},
{
"movie_id": "movie_02",
"average_rating": 4.2,
"title": "The Shining",
"genres": "Horror",
"update_time": "2021-08-20 20:44:11.094375+00:00",
},
],
columns=["movie_id", "average_rating", "title", "genres", "update_time"],
)
movies_df = movies_df.astype({"update_time": "datetime64"})
feature_time_column = "update_time"

movie_entity_type.ingest_from_df(
feature_ids=[
_TEST_MOVIE_TITLE_FEATURE_ID,
_TEST_MOVIE_GENRES_FEATURE_ID,
_TEST_MOVIE_AVERAGE_RATING_FEATURE_ID,
],
feature_time="update_time",
gcs_source_uris=_TEST_MOVIES_ENTITY_TYPE_GCS_SRC,
gcs_source_type="avro",
feature_time=feature_time_column,
df_source=movies_df,
entity_id_field="movie_id",
worker_count=2,
worker_count=1,
)

list_movie_features = movie_entity_type.list_features()
assert len(list_movie_features) == 3
assert "EntityType feature values imported." in caplog.text
Member:
Is there any way we can assert that the resource has been updated based on the ingest from df?

Contributor Author:
Updated with the read method, which produces a pd.DataFrame from online reads, to validate the feature values before and after ingestion against a DataFrame built from the raw data. This in turn validates both the read and ingest_from_df methods.

caplog.clear()
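A rough sketch of the read-based check described in the reply above. It assumes EntityType.read accepts entity_ids and feature_ids and returns a pandas DataFrame with an entity_id column; the feature IDs reuse the movie example and are hypothetical stand-ins for the _TEST_MOVIE_* constants.

    # Read back the just-ingested rows online and compare to the source DataFrame.
    read_back_df = movie_entity_type.read(
        entity_ids=["movie_01", "movie_02"],
        feature_ids=["title", "genres", "average_rating"],
    )

    expected_titles = {
        "movie_01": "The Shawshank Redemption",
        "movie_02": "The Shining",
    }
    for _, row in read_back_df.iterrows():
        assert row["title"] == expected_titles[row["entity_id"]]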

def test_ingest_feature_values_from_df_using_feature_time_datetime(
self, shared_state, caplog
):
assert shared_state["movie_entity_type"]
movie_entity_type = shared_state["movie_entity_type"]

caplog.set_level(logging.INFO)

aiplatform.init(
project=e2e_base._PROJECT, location=e2e_base._LOCATION,
)

movies_df = pd.DataFrame(
data=[
{
"movie_id": "movie_03",
"average_rating": 4.5,
"title": "Cinema Paradiso",
"genres": "Romance",
},
{
"movie_id": "movie_04",
"average_rating": 4.6,
"title": "The Dark Knight",
"genres": "Action",
},
],
columns=["movie_id", "average_rating", "title", "genres"],
)

feature_time_datetime_str = datetime.datetime.now().isoformat(
sep=" ", timespec="milliseconds"
)
feature_time_datetime = datetime.datetime.strptime(
feature_time_datetime_str, "%Y-%m-%d %H:%M:%S.%f"
)

movie_entity_type.ingest_from_df(
feature_ids=[
_TEST_MOVIE_TITLE_FEATURE_ID,
_TEST_MOVIE_GENRES_FEATURE_ID,
_TEST_MOVIE_AVERAGE_RATING_FEATURE_ID,
],
feature_time=feature_time_datetime,
df_source=movies_df,
entity_id_field="movie_id",
worker_count=1,
)

assert "EntityType feature values imported." in caplog.text
