Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable feature store batch serve to BigQuery and GCS for csv and tfrecord #919

Merged
merged 18 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
6db4abc
feat: add batch_serve_to_bq for bigquery table and batch_serve_to_gcs…
morgandu Dec 17, 2021
964fb1e
Merge branch 'main' into mor--feature-store-batch-serve
morgandu Dec 21, 2021
c6dd8f8
Merge branch 'main' into mor--feature-store-batch-serve
morgandu Dec 22, 2021
dc83aaa
Merge branch 'main' into mor--feature-store-batch-serve
morgandu Jan 4, 2022
e1ec440
Merge branch 'main' into mor--feature-store-batch-serve
morgandu Jan 14, 2022
5781c2b
Merge branch 'main' into mor--feature-store-batch-serve
morgandu Jan 15, 2022
fed974c
Merge branch 'main' into mor--feature-store-batch-serve
morgandu Jan 15, 2022
f4c9976
fix: change entity_type_ids and entity_type_destination_fields to ser…
morgandu Jan 15, 2022
8f85a88
fix: remove white space
morgandu Jan 15, 2022
758b0dd
Update google/cloud/aiplatform/featurestore/featurestore.py
morgandu Jan 25, 2022
79a340b
Update google/cloud/aiplatform/featurestore/featurestore.py
morgandu Jan 25, 2022
7e0a18b
Update google/cloud/aiplatform/featurestore/featurestore.py
morgandu Jan 25, 2022
a49d2b4
Update google/cloud/aiplatform/featurestore/featurestore.py
morgandu Jan 25, 2022
c191c48
Update google/cloud/aiplatform/featurestore/featurestore.py
morgandu Jan 25, 2022
c6bf16c
Merge branch 'main' into mor--feature-store-batch-serve
morgandu Jan 25, 2022
d949b10
fix: Featurestore create method example usage
morgandu Jan 25, 2022
90bbfc9
fix: get_timestamp_proto for millisecond precision cap
morgandu Jan 25, 2022
45f0a02
fix: unit tests for get_timestamp_proto
morgandu Jan 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
477 changes: 467 additions & 10 deletions google/cloud/aiplatform/featurestore/featurestore.py

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions google/cloud/aiplatform/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,9 +628,11 @@ def get_timestamp_proto(
"""
if not time:
time = datetime.datetime.now()
t = time.timestamp()
seconds = int(t)
# must not have higher than millisecond precision.
nanos = int((t % 1 * 1e6) * 1e3)

return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
time_str = time.isoformat(sep=" ", timespec="milliseconds")
time = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f")

timestamp_proto = timestamp_pb2.Timestamp()
timestamp_proto.FromDatetime(time)

return timestamp_proto
1 change: 1 addition & 0 deletions google/cloud/aiplatform/utils/featurestore_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

RESOURCE_ID_PATTERN_REGEX = r"[a-z_][a-z0-9_]{0,59}"
GCS_SOURCE_TYPE = {"csv", "avro"}
GCS_DESTINATION_TYPE = {"csv", "tfrecord"}

_FEATURE_VALUE_TYPE_UNSPECIFIED = "VALUE_TYPE_UNSPECIFIED"

Expand Down
33 changes: 33 additions & 0 deletions tests/system/aiplatform/e2e_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from google.api_core import exceptions
from google.cloud import aiplatform
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.aiplatform import initializer

Expand Down Expand Up @@ -90,6 +91,38 @@ def delete_staging_bucket(self, shared_state: Dict[str, Any]):
bucket = shared_state["bucket"]
bucket.delete(force=True)

@pytest.fixture(scope="class")
def prepare_bigquery_dataset(
    self, shared_state: Dict[str, Any]
) -> Generator[bigquery.dataset.Dataset, None, None]:
    """Create a BigQuery dataset for the test class and expose it via shared_state.

    Stores three entries in ``shared_state`` for the tests and the paired
    ``delete_bigquery_dataset`` fixture:
      - "bigquery_client": the client used to create (and later delete) the dataset
      - "bigquery_dataset_id": the fully-qualified "<project>.<dataset>" id
      - "bigquery_dataset": the created ``bigquery.dataset.Dataset`` object

    Yields:
        The created dataset. (Previously this fixture yielded ``None`` despite
        its return annotation advertising a ``Dataset``.)
    """

    bigquery_client = bigquery.Client(project=_PROJECT)
    shared_state["bigquery_client"] = bigquery_client

    # BigQuery dataset ids may not contain "-", so normalize the uuid suffix.
    dataset_name = f"{self._temp_prefix.lower()}_{uuid.uuid4()}".replace("-", "_")
    dataset_id = f"{_PROJECT}.{dataset_name}"
    shared_state["bigquery_dataset_id"] = dataset_id

    dataset = bigquery.Dataset(dataset_id)
    dataset.location = _LOCATION
    shared_state["bigquery_dataset"] = bigquery_client.create_dataset(dataset)

    # Yield the created dataset so the annotation holds; consumers that use
    # this fixture via @pytest.mark.usefixtures are unaffected.
    yield shared_state["bigquery_dataset"]

@pytest.fixture(scope="class")
def delete_bigquery_dataset(self, shared_state: Dict[str, Any]):
    """Tear down the BigQuery dataset created by ``prepare_bigquery_dataset``."""

    # Let every test in the class run first; cleanup happens afterwards.
    yield

    # Wipe the dataset together with any tables the tests created in it,
    # tolerating the case where it is already gone.
    client = shared_state["bigquery_client"]
    dataset = shared_state["bigquery_dataset"]
    client.delete_dataset(
        dataset.dataset_id, delete_contents=True, not_found_ok=True
    )  # Make an API request.

@pytest.fixture(scope="class", autouse=True)
def teardown(self, shared_state: Dict[str, Any]):
"""Delete every Vertex AI resource created during test"""
Expand Down
116 changes: 115 additions & 1 deletion tests/system/aiplatform/test_featurestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

import logging
import pytest

from google.cloud import aiplatform
from tests.system.aiplatform import e2e_base
Expand All @@ -29,6 +30,8 @@
"gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movies.avro"
)

_TEST_READ_INSTANCE_SRC = "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction.csv"

_TEST_FEATURESTORE_ID = "movie_prediction"
_TEST_USER_ENTITY_TYPE_ID = "users"
_TEST_MOVIE_ENTITY_TYPE_ID = "movies"
Expand All @@ -42,6 +45,12 @@
_TEST_MOVIE_AVERAGE_RATING_FEATURE_ID = "average_rating"


@pytest.mark.usefixtures(
"prepare_staging_bucket",
"delete_staging_bucket",
"prepare_bigquery_dataset",
"delete_bigquery_dataset",
)
class TestFeaturestore(e2e_base.TestEndToEnd):

_temp_prefix = "temp_vertex_sdk_e2e_featurestore_test"
Expand Down Expand Up @@ -131,7 +140,7 @@ def test_create_get_list_features(self, shared_state):
user_age_feature = user_entity_type.create_feature(
feature_id=_TEST_USER_AGE_FEATURE_ID, value_type="INT64"
)

shared_state["user_age_feature_resource_name"] = user_age_feature.resource_name
get_user_age_feature = user_entity_type.get_feature(
feature_id=_TEST_USER_AGE_FEATURE_ID
)
Expand All @@ -142,6 +151,9 @@ def test_create_get_list_features(self, shared_state):
value_type="STRING",
entity_type_name=user_entity_type_name,
)
shared_state[
"user_gender_feature_resource_name"
] = user_gender_feature.resource_name

get_user_gender_feature = aiplatform.Feature(
feature_name=user_gender_feature.resource_name
Expand All @@ -153,6 +165,9 @@ def test_create_get_list_features(self, shared_state):
user_liked_genres_feature = user_entity_type.create_feature(
feature_id=_TEST_USER_LIKED_GENRES_FEATURE_ID, value_type="STRING_ARRAY",
)
shared_state[
"user_liked_genres_feature_resource_name"
] = user_liked_genres_feature.resource_name

get_user_liked_genres_feature = aiplatform.Feature(
feature_name=user_liked_genres_feature.resource_name
Expand Down Expand Up @@ -250,6 +265,105 @@ def test_search_features(self, shared_state):
len(list_searched_features) - shared_state["base_list_searched_features"]
) == 6

def test_batch_serve_to_gcs(self, shared_state, caplog):
    """Batch-serve feature values to GCS as tfrecord and verify via the SDK log.

    Reads instances from a public CSV, serves user and movie features into
    the class staging bucket, and asserts the SDK's completion log message.
    """

    assert shared_state["featurestore"]
    assert shared_state["bucket"]
    # Also assert the key that is actually consumed below, not only the
    # bucket object itself.
    assert shared_state["staging_bucket_name"]
    assert shared_state["user_age_feature_resource_name"]
    assert shared_state["user_gender_feature_resource_name"]
    assert shared_state["user_liked_genres_feature_resource_name"]

    featurestore = shared_state["featurestore"]
    bucket_name = shared_state["staging_bucket_name"]
    user_age_feature_resource_name = shared_state["user_age_feature_resource_name"]
    user_gender_feature_resource_name = shared_state[
        "user_gender_feature_resource_name"
    ]
    user_liked_genres_feature_resource_name = shared_state[
        "user_liked_genres_feature_resource_name"
    ]

    aiplatform.init(
        project=e2e_base._PROJECT, location=e2e_base._LOCATION,
    )

    # Capture the SDK's INFO-level "served" message for the assertion below.
    caplog.set_level(logging.INFO)

    featurestore.batch_serve_to_gcs(
        serving_feature_ids={
            _TEST_USER_ENTITY_TYPE_ID: [
                _TEST_USER_AGE_FEATURE_ID,
                _TEST_USER_GENDER_FEATURE_ID,
                _TEST_USER_LIKED_GENRES_FEATURE_ID,
            ],
            _TEST_MOVIE_ENTITY_TYPE_ID: [
                _TEST_MOVIE_TITLE_FEATURE_ID,
                _TEST_MOVIE_GENRES_FEATURE_ID,
                _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID,
            ],
        },
        # Rename the user features in the serving output.
        feature_destination_fields={
            user_age_feature_resource_name: "user_age_dest",
            user_gender_feature_resource_name: "user_gender_dest",
            user_liked_genres_feature_resource_name: "user_liked_genres_dest",
        },
        read_instances=_TEST_READ_INSTANCE_SRC,
        gcs_destination_output_uri_prefix=f"gs://{bucket_name}/featurestore_test/tfrecord",
        gcs_destination_type="tfrecord",
    )
    assert "Featurestore feature values served." in caplog.text

    caplog.clear()

def test_batch_serve_to_bq(self, shared_state, caplog):
    """Batch-serve feature values into a BigQuery table and check the log output."""

    assert shared_state["featurestore"]
    assert shared_state["bigquery_dataset"]
    assert shared_state["user_age_feature_resource_name"]
    assert shared_state["user_gender_feature_resource_name"]
    assert shared_state["user_liked_genres_feature_resource_name"]

    featurestore = shared_state["featurestore"]
    bigquery_dataset_id = shared_state["bigquery_dataset_id"]
    user_age_feature_resource_name = shared_state["user_age_feature_resource_name"]
    user_gender_feature_resource_name = shared_state[
        "user_gender_feature_resource_name"
    ]
    user_liked_genres_feature_resource_name = shared_state[
        "user_liked_genres_feature_resource_name"
    ]

    aiplatform.init(
        project=e2e_base._PROJECT, location=e2e_base._LOCATION,
    )

    # Capture the SDK's INFO-level completion message for the assertion below.
    caplog.set_level(logging.INFO)

    # Features to serve, keyed by entity type id.
    serving_feature_ids = {
        _TEST_USER_ENTITY_TYPE_ID: [
            _TEST_USER_AGE_FEATURE_ID,
            _TEST_USER_GENDER_FEATURE_ID,
            _TEST_USER_LIKED_GENRES_FEATURE_ID,
        ],
        _TEST_MOVIE_ENTITY_TYPE_ID: [
            _TEST_MOVIE_TITLE_FEATURE_ID,
            _TEST_MOVIE_GENRES_FEATURE_ID,
            _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID,
        ],
    }
    # Rename the user features in the destination table's columns.
    feature_destination_fields = {
        user_age_feature_resource_name: "user_age_dest",
        user_gender_feature_resource_name: "user_gender_dest",
        user_liked_genres_feature_resource_name: "user_liked_genres_dest",
    }

    featurestore.batch_serve_to_bq(
        serving_feature_ids=serving_feature_ids,
        feature_destination_fields=feature_destination_fields,
        read_instances=_TEST_READ_INSTANCE_SRC,
        bq_destination_output_uri=f"bq://{bigquery_dataset_id}.test_table",
    )

    assert "Featurestore feature values served." in caplog.text
    caplog.clear()

def test_online_reads(self, shared_state):
assert shared_state["user_entity_type"]
assert shared_state["movie_entity_type"]
Expand Down
Loading