feat: enable feature store batch serve to BigQuery and GCS for csv and tfrecord (googleapis#919)

* feat: add batch_serve_to_bq for bigquery table and batch_serve_to_gcs for csv and tfrecord files in Featurestore class

* fix: change entity_type_ids and entity_type_destination_fields to serving_feature_ids and feature_destination_fields

* fix: remove whitespace

* Update google/cloud/aiplatform/featurestore/featurestore.py

Co-authored-by: sasha-gitg <[email protected]>

* Update google/cloud/aiplatform/featurestore/featurestore.py

Co-authored-by: sasha-gitg <[email protected]>

* Update google/cloud/aiplatform/featurestore/featurestore.py

Co-authored-by: sasha-gitg <[email protected]>

* Update google/cloud/aiplatform/featurestore/featurestore.py

Co-authored-by: sasha-gitg <[email protected]>

* Update google/cloud/aiplatform/featurestore/featurestore.py

Co-authored-by: sasha-gitg <[email protected]>

* fix: Featurestore create method example usage

* fix: get_timestamp_proto for millisecond precision cap

* fix: unit tests for get_timestamp_proto

Co-authored-by: sasha-gitg <[email protected]>
2 people authored and ivanmkc committed Jan 28, 2022
1 parent 11493b5 commit 2cdc95b
Showing 7 changed files with 959 additions and 40 deletions.
477 changes: 467 additions & 10 deletions google/cloud/aiplatform/featurestore/featurestore.py

Large diffs are not rendered by default.
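Since this diff is not rendered, the shape of the two new methods can only be inferred here. Below is a rough sketch based on the commit message and the system tests further down; parameter order, types, and defaults are assumptions, not the committed signatures.

# Sketch only -- inferred from the commit message and the tests below;
# parameter order, types, and defaults are assumptions.
from typing import Dict, List, Optional, Union


class Featurestore:
    def batch_serve_to_bq(
        self,
        bq_destination_output_uri: str,  # e.g. "bq://project.dataset.table"
        serving_feature_ids: Dict[str, List[str]],  # entity type ID -> feature IDs
        read_instances: Union[str, List[str]],  # read-instance source URI(s)
        feature_destination_fields: Optional[Dict[str, str]] = None,  # feature resource name -> output field
    ):
        """Batch serves feature values to a BigQuery table."""
        ...

    def batch_serve_to_gcs(
        self,
        gcs_destination_output_uri_prefix: str,  # e.g. "gs://bucket/prefix"
        gcs_destination_type: str,  # "csv" or "tfrecord"
        serving_feature_ids: Dict[str, List[str]],
        read_instances: Union[str, List[str]],
        feature_destination_fields: Optional[Dict[str, str]] = None,
    ):
        """Batch serves feature values to CSV or TFRecord files on GCS."""
        ...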

12 changes: 7 additions & 5 deletions google/cloud/aiplatform/utils/__init__.py
@@ -628,9 +628,11 @@ def get_timestamp_proto(
     """
     if not time:
         time = datetime.datetime.now()
-    t = time.timestamp()
-    seconds = int(t)
-    # must not have higher than millisecond precision.
-    nanos = int((t % 1 * 1e6) * 1e3)
+    time_str = time.isoformat(sep=" ", timespec="milliseconds")
+    time = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f")
 
-    return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
+    timestamp_proto = timestamp_pb2.Timestamp()
+    timestamp_proto.FromDatetime(time)
+
+    return timestamp_proto
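The replaced code computed nanos straight from the float timestamp, so sub-millisecond digits leaked through despite the comment's intent; the new version round-trips through an ISO string capped at milliseconds. A minimal runnable illustration of the patched behavior (the function body mirrors the diff above; the sample datetime is arbitrary):

import datetime

from google.protobuf import timestamp_pb2


def get_timestamp_proto(time=None):
    # Mirrors the patched helper above: cap the datetime at millisecond
    # precision via an ISO-string round trip, then convert to a proto.
    if not time:
        time = datetime.datetime.now()
    time_str = time.isoformat(sep=" ", timespec="milliseconds")
    time = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f")
    timestamp_proto = timestamp_pb2.Timestamp()
    timestamp_proto.FromDatetime(time)
    return timestamp_proto


# Arbitrary sample: 123456 microseconds are truncated to 123 milliseconds.
ts = get_timestamp_proto(datetime.datetime(2022, 1, 28, 0, 0, 0, 123456))
assert ts.nanos == 123_000_000  # no sub-millisecond digits survive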
1 change: 1 addition & 0 deletions google/cloud/aiplatform/utils/featurestore_utils.py
@@ -29,6 +29,7 @@
 
 RESOURCE_ID_PATTERN_REGEX = r"[a-z_][a-z0-9_]{0,59}"
 GCS_SOURCE_TYPE = {"csv", "avro"}
+GCS_DESTINATION_TYPE = {"csv", "tfrecord"}
 
 _FEATURE_VALUE_TYPE_UNSPECIFIED = "VALUE_TYPE_UNSPECIFIED"
 
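The new GCS_DESTINATION_TYPE set parallels the existing GCS_SOURCE_TYPE. A hypothetical guard of the kind batch_serve_to_gcs needs (the actual validation lives in featurestore.py, whose diff is not rendered above):

GCS_DESTINATION_TYPE = {"csv", "tfrecord"}  # constant added by this commit


def validate_gcs_destination_type(gcs_destination_type: str) -> None:
    # Hypothetical helper, not taken from the commit: reject unsupported
    # output formats before launching a batch-serve job.
    if gcs_destination_type not in GCS_DESTINATION_TYPE:
        raise ValueError(
            f"Only {sorted(GCS_DESTINATION_TYPE)} are supported as "
            f"gcs_destination_type, got {gcs_destination_type!r}."
        )


validate_gcs_destination_type("tfrecord")  # passes silently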
33 changes: 33 additions & 0 deletions tests/system/aiplatform/e2e_base.py
@@ -24,6 +24,7 @@
 
 from google.api_core import exceptions
 from google.cloud import aiplatform
+from google.cloud import bigquery
 from google.cloud import storage
 from google.cloud.aiplatform import initializer
 
@@ -90,6 +91,38 @@ def delete_staging_bucket(self, shared_state: Dict[str, Any]):
         bucket = shared_state["bucket"]
         bucket.delete(force=True)
 
+    @pytest.fixture(scope="class")
+    def prepare_bigquery_dataset(
+        self, shared_state: Dict[str, Any]
+    ) -> Generator[None, None, None]:
+        """Create a BigQuery dataset and store BigQuery resource objects in shared state."""
+
+        bigquery_client = bigquery.Client(project=_PROJECT)
+        shared_state["bigquery_client"] = bigquery_client
+
+        dataset_name = f"{self._temp_prefix.lower()}_{uuid.uuid4()}".replace("-", "_")
+        dataset_id = f"{_PROJECT}.{dataset_name}"
+        shared_state["bigquery_dataset_id"] = dataset_id
+
+        dataset = bigquery.Dataset(dataset_id)
+        dataset.location = _LOCATION
+        shared_state["bigquery_dataset"] = bigquery_client.create_dataset(dataset)
+
+        yield
+
+    @pytest.fixture(scope="class")
+    def delete_bigquery_dataset(self, shared_state: Dict[str, Any]):
+        """Delete the BigQuery dataset."""
+
+        yield
+
+        # Get the BigQuery dataset id used for testing and wipe it.
+        bigquery_dataset = shared_state["bigquery_dataset"]
+        bigquery_client = shared_state["bigquery_client"]
+        bigquery_client.delete_dataset(
+            bigquery_dataset.dataset_id, delete_contents=True, not_found_ok=True
+        )  # Make an API request.
+
     @pytest.fixture(scope="class", autouse=True)
     def teardown(self, shared_state: Dict[str, Any]):
         """Delete every Vertex AI resource created during test"""
Expand Down
116 changes: 115 additions & 1 deletion tests/system/aiplatform/test_featurestore.py
@@ -16,6 +16,7 @@
 #
 
 import logging
+import pytest
 
 from google.cloud import aiplatform
 from tests.system.aiplatform import e2e_base
@@ -29,6 +30,8 @@
     "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movies.avro"
 )
 
+_TEST_READ_INSTANCE_SRC = "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction.csv"
+
 _TEST_FEATURESTORE_ID = "movie_prediction"
 _TEST_USER_ENTITY_TYPE_ID = "users"
 _TEST_MOVIE_ENTITY_TYPE_ID = "movies"
@@ -42,6 +45,12 @@
 _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID = "average_rating"
 
 
+@pytest.mark.usefixtures(
+    "prepare_staging_bucket",
+    "delete_staging_bucket",
+    "prepare_bigquery_dataset",
+    "delete_bigquery_dataset",
+)
 class TestFeaturestore(e2e_base.TestEndToEnd):
 
     _temp_prefix = "temp_vertex_sdk_e2e_featurestore_test"
@@ -131,7 +140,7 @@ def test_create_get_list_features(self, shared_state):
         user_age_feature = user_entity_type.create_feature(
             feature_id=_TEST_USER_AGE_FEATURE_ID, value_type="INT64"
         )
-
+        shared_state["user_age_feature_resource_name"] = user_age_feature.resource_name
         get_user_age_feature = user_entity_type.get_feature(
             feature_id=_TEST_USER_AGE_FEATURE_ID
         )
@@ -142,6 +151,9 @@
             value_type="STRING",
             entity_type_name=user_entity_type_name,
         )
+        shared_state[
+            "user_gender_feature_resource_name"
+        ] = user_gender_feature.resource_name
 
         get_user_gender_feature = aiplatform.Feature(
             feature_name=user_gender_feature.resource_name
@@ -153,6 +165,9 @@
         user_liked_genres_feature = user_entity_type.create_feature(
             feature_id=_TEST_USER_LIKED_GENRES_FEATURE_ID, value_type="STRING_ARRAY",
         )
+        shared_state[
+            "user_liked_genres_feature_resource_name"
+        ] = user_liked_genres_feature.resource_name
 
         get_user_liked_genres_feature = aiplatform.Feature(
             feature_name=user_liked_genres_feature.resource_name
@@ -250,6 +265,105 @@ def test_search_features(self, shared_state):
             len(list_searched_features) - shared_state["base_list_searched_features"]
         ) == 6
 
+    def test_batch_serve_to_gcs(self, shared_state, caplog):
+
+        assert shared_state["featurestore"]
+        assert shared_state["bucket"]
+        assert shared_state["user_age_feature_resource_name"]
+        assert shared_state["user_gender_feature_resource_name"]
+        assert shared_state["user_liked_genres_feature_resource_name"]
+
+        featurestore = shared_state["featurestore"]
+        bucket_name = shared_state["staging_bucket_name"]
+        user_age_feature_resource_name = shared_state["user_age_feature_resource_name"]
+        user_gender_feature_resource_name = shared_state[
+            "user_gender_feature_resource_name"
+        ]
+        user_liked_genres_feature_resource_name = shared_state[
+            "user_liked_genres_feature_resource_name"
+        ]
+
+        aiplatform.init(
+            project=e2e_base._PROJECT, location=e2e_base._LOCATION,
+        )
+
+        caplog.set_level(logging.INFO)
+
+        featurestore.batch_serve_to_gcs(
+            serving_feature_ids={
+                _TEST_USER_ENTITY_TYPE_ID: [
+                    _TEST_USER_AGE_FEATURE_ID,
+                    _TEST_USER_GENDER_FEATURE_ID,
+                    _TEST_USER_LIKED_GENRES_FEATURE_ID,
+                ],
+                _TEST_MOVIE_ENTITY_TYPE_ID: [
+                    _TEST_MOVIE_TITLE_FEATURE_ID,
+                    _TEST_MOVIE_GENRES_FEATURE_ID,
+                    _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID,
+                ],
+            },
+            feature_destination_fields={
+                user_age_feature_resource_name: "user_age_dest",
+                user_gender_feature_resource_name: "user_gender_dest",
+                user_liked_genres_feature_resource_name: "user_liked_genres_dest",
+            },
+            read_instances=_TEST_READ_INSTANCE_SRC,
+            gcs_destination_output_uri_prefix=f"gs://{bucket_name}/featurestore_test/tfrecord",
+            gcs_destination_type="tfrecord",
+        )
+        assert "Featurestore feature values served." in caplog.text
+
+        caplog.clear()
+
+    def test_batch_serve_to_bq(self, shared_state, caplog):
+
+        assert shared_state["featurestore"]
+        assert shared_state["bigquery_dataset"]
+        assert shared_state["user_age_feature_resource_name"]
+        assert shared_state["user_gender_feature_resource_name"]
+        assert shared_state["user_liked_genres_feature_resource_name"]
+
+        featurestore = shared_state["featurestore"]
+        bigquery_dataset_id = shared_state["bigquery_dataset_id"]
+        user_age_feature_resource_name = shared_state["user_age_feature_resource_name"]
+        user_gender_feature_resource_name = shared_state[
+            "user_gender_feature_resource_name"
+        ]
+        user_liked_genres_feature_resource_name = shared_state[
+            "user_liked_genres_feature_resource_name"
+        ]
+
+        aiplatform.init(
+            project=e2e_base._PROJECT, location=e2e_base._LOCATION,
+        )
+
+        caplog.set_level(logging.INFO)
+
+        featurestore.batch_serve_to_bq(
+            serving_feature_ids={
+                _TEST_USER_ENTITY_TYPE_ID: [
+                    _TEST_USER_AGE_FEATURE_ID,
+                    _TEST_USER_GENDER_FEATURE_ID,
+                    _TEST_USER_LIKED_GENRES_FEATURE_ID,
+                ],
+                _TEST_MOVIE_ENTITY_TYPE_ID: [
+                    _TEST_MOVIE_TITLE_FEATURE_ID,
+                    _TEST_MOVIE_GENRES_FEATURE_ID,
+                    _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID,
+                ],
+            },
+            feature_destination_fields={
+                user_age_feature_resource_name: "user_age_dest",
+                user_gender_feature_resource_name: "user_gender_dest",
+                user_liked_genres_feature_resource_name: "user_liked_genres_dest",
+            },
+            read_instances=_TEST_READ_INSTANCE_SRC,
+            bq_destination_output_uri=f"bq://{bigquery_dataset_id}.test_table",
+        )
+
+        assert "Featurestore feature values served." in caplog.text
+        caplog.clear()
+
     def test_online_reads(self, shared_state):
         assert shared_state["user_entity_type"]
         assert shared_state["movie_entity_type"]
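Putting the pieces together, an end-user call might look like the following sketch, assuming the Featurestore constructor accepts a featurestore_name as elsewhere in the SDK; all project, bucket, and table names are placeholders:

from google.cloud import aiplatform

# Placeholders throughout -- substitute your own project and resources.
aiplatform.init(project="my-project", location="us-central1")

fs = aiplatform.Featurestore(featurestore_name="movie_prediction")
fs.batch_serve_to_bq(
    bq_destination_output_uri="bq://my-project.my_dataset.served_features",
    serving_feature_ids={"users": ["age", "gender", "liked_genres"]},
    read_instances="gs://my-bucket/movie_prediction.csv",
)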
