Merge pull request #51 from cbtham/main
Update Feast to support 0.18.x and later.
resolve #40
samuel100 authored Mar 15, 2022
2 parents 887625e + 63a6731 commit 8e8657c
Showing 6 changed files with 174 additions and 35 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -357,3 +357,8 @@ config.json

# any data files
*.csv

# ignore python distribution build folders
provider/sdk/build/
provider/sdk/dist/
provider/sdk/feast_azure_provider.egg-info
12 changes: 12 additions & 0 deletions provider/README.md
@@ -35,6 +35,18 @@ The only 2 required parameters during the set-up are:
az ad signed-in-user show --query objectId -o tsv
```

## Note
If you would like to build a custom version of feast-azure-provider, go to provider/sdk (where setup.py is located) and run:
```bash
python setup.py bdist_wheel --universal
```
to generate an installable wheel (.whl) under provider/sdk/dist/.

To install or run a custom version of feast-azure-provider, go to provider/sdk and run:
```bash
python setup.py install
```


## Contributing

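The README note above builds and installs a wheel from provider/sdk. A quick way to confirm which build is active in an environment is to read the installed distribution's version; a minimal sketch, assuming Python 3.8+ for `importlib.metadata` (the distribution name comes from the `name` field in setup.py below):

```python
# Minimal check of which feast-azure-provider build is installed.
# Assumes Python 3.8+ (importlib.metadata is in the standard library there)
# and that the wheel built from provider/sdk was installed into this environment.
from importlib.metadata import version

print(version("feast-azure-provider"))
```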
2 changes: 1 addition & 1 deletion provider/cloud/fs_synapse_azuredeploy.json
@@ -406,4 +406,4 @@
"outputs": {

}
}
}
108 changes: 81 additions & 27 deletions provider/sdk/feast_azure_provider/azure_provider.py
@@ -5,16 +5,16 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
import pyarrow as pa
from tqdm import tqdm

from feast import FeatureTable
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import (
Provider,
RetrievalJob,
_convert_arrow_to_proto,
_get_column_names,
_run_field_mapping,
@@ -23,62 +23,88 @@
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDataset

DEFAULT_BATCH_SIZE = 10_000

class AzureProvider(Provider):
def __init__(self, config: RepoConfig):
self.repo_config = config
self.offline_store = get_offline_store_from_config(config.offline_store)
self.online_store = get_online_store_from_config(config.online_store)
self.online_store = (
get_online_store_from_config(config.online_store)
if config.online_store
else None
)

def update_infra(
self,
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)
# Call update only if there is an online store
if self.online_store:
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

def teardown_infra(
self,
project: str,
tables: Sequence[Union[FeatureTable, FeatureView]],
tables: Sequence[FeatureView],
entities: Sequence[Entity],
) -> None:
self.online_store.teardown(self.repo_config, tables, entities)
if self.online_store:
self.online_store.teardown(self.repo_config, tables, entities)

def online_write_batch(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
self.online_store.online_write_batch(config, table, data, progress)
if self.online_store:
self.online_store.online_write_batch(config, table, data, progress)

def online_read(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
result = self.online_store.online_read(config, table, entity_keys)

result = []
if self.online_store:
result = self.online_store.online_read(config, table, entity_keys, requested_features)
return result

def ingest_df(
self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame,
):
table = pa.Table.from_pandas(df)

if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

join_keys = {entity.join_key: entity.value_type for entity in entities}
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(
self.repo_config, feature_view, rows_to_write, progress=None
)

def materialize_single_feature_view(
self,
config: RepoConfig,
@@ -116,13 +142,17 @@ def materialize_single_feature_view(
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
)
join_keys = {entity.join_key: entity.value_type for entity in entities}

with tqdm_builder(table.num_rows) as pbar:
for batch in table.to_batches(DEFAULT_BATCH_SIZE):
rows_to_write = _convert_arrow_to_proto(batch, feature_view, join_keys)
self.online_write_batch(
self.repo_config,
feature_view,
rows_to_write,
lambda x: pbar.update(x),
)

def get_historical_features(
self,
Expand All @@ -144,3 +174,27 @@ def get_historical_features(
full_feature_names=full_feature_names,
)
return job

def retrieve_saved_dataset(
self,
config: RepoConfig,
dataset: SavedDataset
) -> RetrievalJob:
feature_name_columns = [
ref.replace(":", "__") if dataset.full_feature_names else ref.split(":")[1]
for ref in dataset.features
]

# ToDo: replace hardcoded value
event_ts_column = "event_timestamp"

return self.offline_store.pull_all_from_table_or_query(
config=config,
data_source=dataset.storage.to_data_source(),
join_key_columns=dataset.join_keys,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_ts_column,
start_date=make_tzaware(dataset.min_event_timestamp), # type: ignore
end_date=make_tzaware(dataset.max_event_timestamp + timedelta(seconds=1)), # type: ignore
)
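With this change, materialize_single_feature_view converts and writes rows in chunks of DEFAULT_BATCH_SIZE instead of building one row list for the whole table. A small standalone illustration of the pyarrow batching pattern it relies on (toy data, not tied to any feature repo):

```python
# Illustrates the chunked-conversion pattern used in materialization above:
# iterate the Arrow table in batches of at most DEFAULT_BATCH_SIZE rows.
import pyarrow as pa

DEFAULT_BATCH_SIZE = 10_000
table = pa.table({"driver_id": list(range(25_000))})

for batch in table.to_batches(DEFAULT_BATCH_SIZE):
    # Each batch is a pyarrow.RecordBatch holding at most DEFAULT_BATCH_SIZE rows,
    # which keeps memory bounded while converting rows to protos.
    print(batch.num_rows)  # prints 10000, 10000, 5000
```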

70 changes: 69 additions & 1 deletion provider/sdk/feast_azure_provider/mssqlserver.py
@@ -28,7 +28,11 @@
from feast.data_source import DataSource
from .mssqlserver_source import MsSqlServerSource
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.offline_stores.offline_store import (
OfflineStore,
RetrievalJob,
RetrievalMetadata,
)
from feast.infra.offline_stores.offline_utils import (
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
)
@@ -38,6 +42,8 @@
)
from feast.registry import Registry
from feast.repo_config import FeastBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage
from feast import FileSource

EntitySchema = Dict[str, np.dtype]

@@ -113,6 +119,43 @@ def pull_latest_from_table_or_query(
on_demand_feature_views=None,
)

def pull_all_from_table_or_query(
self,
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
assert type(data_source).__name__ == "MsSqlServerSource"
assert (
config.offline_store.type
== "feast_azure_provider.mssqlserver.MsSqlServerOfflineStore"
)
from_expression = data_source.get_table_query_string().replace("`", "")

field_string = ", ".join(join_key_columns + feature_name_columns + [event_timestamp_column])

query = f"""
SELECT {field_string}
FROM (
SELECT {field_string}
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN '{start_date}' AND '{end_date}'
) AS subquery
"""
self._make_engine(config.offline_store)

return MsSqlServerRetrievalJob(
query=query,
engine=self._engine,
config=config,
full_feature_names=False,
)


def get_historical_features(
self,
config: RepoConfig,
@@ -241,6 +284,7 @@ def __init__(
config: RepoConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]],
metadata: Optional[RetrievalMetadata] = None,
drop_columns: Optional[List[str]] = None,
):
self.query = query
@@ -249,6 +293,7 @@ def __init__(
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views
self._drop_columns = drop_columns
self._metadata = metadata

@property
def full_feature_names(self) -> bool:
@@ -264,6 +309,29 @@ def _to_df_internal(self) -> pandas.DataFrame:
def _to_arrow_internal(self) -> pyarrow.Table:
result = pandas.read_sql(self.query, con=self.engine).fillna(value=np.nan)
return pyarrow.Table.from_pandas(result)

## Implements persist, introduced in Feast 0.18 - currently persists to file storage
## ToDo: persist to Azure Storage instead
def persist(self, storage: SavedDatasetStorage):
assert isinstance(storage, SavedDatasetFileStorage)

filesystem, path = FileSource.create_filesystem_and_path(
storage.file_options.file_url, storage.file_options.s3_endpoint_override,
)

if path.endswith(".parquet"):
pyarrow.parquet.write_table(
self.to_arrow(), where=path, filesystem=filesystem
)
else:
# otherwise assume destination is directory
pyarrow.parquet.write_to_dataset(
self.to_arrow(), root_path=path, filesystem=filesystem
)

@property
def metadata(self) -> Optional[RetrievalMetadata]:
return self._metadata


@dataclass(frozen=True)
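The new persist() override picks between two pyarrow write paths depending on whether the destination looks like a single .parquet file or a dataset directory. A standalone sketch of those two calls on a local filesystem, with made-up table contents and output paths:

```python
# Illustrates the two write paths persist() chooses between.
# The table contents and output paths are hypothetical examples.
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"driver_id": [1001, 1002], "conv_rate": [0.11, 0.42]})

# Destination ends with ".parquet" -> write a single parquet file.
pq.write_table(table, where="snapshot.parquet")

# Otherwise the destination is treated as a dataset directory.
pq.write_to_dataset(table, root_path="snapshot_dataset")
```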
12 changes: 6 additions & 6 deletions provider/sdk/setup.py
@@ -15,29 +15,29 @@

# README file from Feast repo root directory
README_FILE = os.path.join(repo_root, "README.md")
with open(README_FILE, "r") as f:
with open(README_FILE, "r", encoding="utf-8") as f:
LONG_DESCRIPTION = f.read()

setup(
name="feast-azure-provider",
author="Microsoft",
version="0.2.2",
version="0.2.299",
description="A Feast Azure Provider",
URL="https://github.com/Azure/feast-azure",
URL="https://github.com/azure/feast-azure",
long_description=LONG_DESCRIPTION,
long_description_content_type="text/markdown",
python_requires=">=3.7.0",
packages=find_packages(exclude=("tests",)),
install_requires=[
"feast[redis]==0.15.1",
"feast[redis]==0.18.1",
"azure-storage-blob>=0.37.0",
"azure-identity>=1.6.1",
"SQLAlchemy>=1.4.19",
"dill==0.3.4",
"pyodbc>=4.0.30",
"sqlalchemy>=1.4",
],
extras_require={"dev": ["pytest", "mypy", "assertpy"]},
extras_require={"dev": ["pytest", "mypy", "assertpy"],
"snowflake": ["snowflake-connector-python[pandas]>=2.7.3"]},
# https://stackoverflow.com/questions/28509965/setuptools-development-requirements
# Install dev requirements with: pip install -e .[dev]
include_package_data=True,
