feat: Adding Milvus demo to examples (#4910)
* checking in progress, but this PR is not ready yet

Signed-off-by: Francisco Javier Arceo <[email protected]>

* feat: Adding new method to FeatureStore to allow more flexible retrieval of features via vector similarity search

Signed-off-by: Francisco Javier Arceo <[email protected]>

* Adding requested_features back into online_store

Signed-off-by: Francisco Javier Arceo <[email protected]>

* feat: Adding RAG demo showcasing Milvus usage

Signed-off-by: Francisco Javier Arceo <[email protected]>

* uploading sample data and updating yaml

Signed-off-by: Francisco Javier Arceo <[email protected]>

* updating workflow

Signed-off-by: Francisco Javier Arceo <[email protected]>

* updated example

Signed-off-by: Francisco Javier Arceo <[email protected]>

* removing modified files

Signed-off-by: Francisco Javier Arceo <[email protected]>

* reverting postgres change

Signed-off-by: Francisco Javier Arceo <[email protected]>

* updating test_workflow

Signed-off-by: Francisco Javier Arceo <[email protected]>

* updated and fixed bug

Signed-off-by: Francisco Javier Arceo <[email protected]>

* fixing a bad merge/rebase

Signed-off-by: Francisco Javier Arceo <[email protected]>

* updated linter because of latest install

Signed-off-by: Francisco Javier Arceo <[email protected]>

* reverting feature store change

Signed-off-by: Francisco Javier Arceo <[email protected]>

* adding logging of local milvus back

Signed-off-by: Francisco Javier Arceo <[email protected]>

* Updating readme and adding notebook

Signed-off-by: Francisco Javier Arceo <[email protected]>

* updated readme

Signed-off-by: Francisco Javier Arceo <[email protected]>

---------

Signed-off-by: Francisco Javier Arceo <[email protected]>
franciscojavierarceo authored Jan 30, 2025
1 parent aaa915a commit 2daf852
Showing 10 changed files with 1,281 additions and 33 deletions.
36 changes: 18 additions & 18 deletions docs/reference/online-stores/overview.md
@@ -34,21 +34,21 @@ Details for each specific online store, such as how to configure it in a `featur

Below is a matrix indicating which online stores support what functionality.

| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | [IKV](https://inlined.io) |
| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |
| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| teardown infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no |
| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Java | no | yes | no | no | no | no | no | no | no |
| readable by Go | yes | yes | no | no | no | no | no | no | no |
| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes |
| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no |
| support for deleting expired data | no | yes | no | no | no | no | no | no | no |
| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no |
| collocated by feature service | no | no | no | no | no | no | no | no | no |
| collocated by entity key | no | yes | no | no | no | no | no | no | yes |
| | Sqlite | Redis | DynamoDB | Snowflake | Datastore | Postgres | Hbase | [[Cassandra](https://cassandra.apache.org/_/index.html) / [Astra DB](https://www.datastax.com/products/datastax-astra?utm_source=feast)] | [IKV](https://inlined.io) | Milvus |
| :-------------------------------------------------------- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- | :-- |:-------|
| write feature values to the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| read feature values from the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| update infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| teardown infrastructure (e.g. tables) in the online store | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| generate a plan of infrastructure changes | yes | no | no | no | no | no | no | yes | no | no |
| support for on-demand transforms | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Python SDK | yes | yes | yes | yes | yes | yes | yes | yes | yes | yes |
| readable by Java | no | yes | no | no | no | no | no | no | no | no |
| readable by Go | yes | yes | no | no | no | no | no | no | no | no |
| support for entityless feature views | yes | yes | yes | yes | yes | yes | yes | yes | yes | no |
| support for concurrent writing to the same key | no | yes | no | no | no | no | no | no | yes | no |
| support for ttl (time to live) at retrieval | no | yes | no | no | no | no | no | no | no | no |
| support for deleting expired data | no | yes | no | no | no | no | no | no | no | no |
| collocated by feature view | yes | no | yes | yes | yes | yes | yes | yes | no | no |
| collocated by feature service | no | no | no | no | no | no | no | no | no | no |
| collocated by entity key | no | yes | no | no | no | no | no | no | yes | no |
88 changes: 88 additions & 0 deletions examples/rag/README.md
@@ -0,0 +1,88 @@
# 🚀 Quickstart: Retrieval-Augmented Generation (RAG) using Feast and Large Language Models (LLMs)

This project demonstrates how to use **Feast** to power a **Retrieval-Augmented Generation (RAG)** application.
The RAG architecture combines document retrieval (via vector search) with in-context learning (ICL): retrieved context is injected into the prompt of a
**Large Language Model (LLM)** so it can answer user questions accurately using both structured and unstructured data.

## 💡 Why Use Feast for RAG?

- **Online retrieval of features:** Ensure real-time access to precomputed document embeddings and other structured data.
- **Declarative feature definitions:** Define feature views and entities in a Python file, empowering data scientists to easily ship scalable RAG applications with all of the existing benefits of Feast.
- **Vector search:** Leverage Feast’s integration with vector databases like **Milvus** to find relevant documents based on a similarity metric (e.g., cosine).
- **Structured and unstructured context:** Retrieve both embeddings and traditional features, injecting richer context into LLM prompts.
- **Versioning and reusability:** Collaborate across teams with discoverable, versioned data pipelines.

---

## 📂 Project Structure

- **`data/`**: Contains the demo data, including Wikipedia summaries of cities with sentence embeddings stored in a Parquet file.
- **`example_repo.py`**: Defines the feature views and entity configurations for Feast.
- **`feature_store.yaml`**: Configures the offline and online stores (using local files and Milvus Lite in this demo).
- **`test_workflow.py`**: Demonstrates key Feast commands to define, retrieve, and push features.

---

## 🛠️ Setup

1. **Install the necessary packages**:
```bash
pip install feast torch transformers openai
```
2. **Register the feature definitions**:

```bash
feast apply
```

3. **Materialize features into the online store** (the two original one-liners ran in separate processes, so `store` was undefined in the second; combine them into one command):

```bash
python -c "from datetime import datetime; from feast import FeatureStore; store = FeatureStore(repo_path='.'); store.materialize_incremental(datetime.utcnow())"
```
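Equivalently, assuming the standard Feast CLI is on your path, the same step can be run with `feast materialize-incremental` (the end timestamp shown is illustrative):

```bash
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
```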
4. **Run a query** (a runnable sketch follows this list):

- Prepare your question:
  `question = "Which city has the largest population in New York?"`
- Embed the question using `sentence-transformers/all-MiniLM-L6-v2`.
- Retrieve the top K most relevant documents using Milvus vector search.
- Pass the retrieved context to an OpenAI model for conversational output.
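Putting these steps together, here is a minimal end-to-end sketch. It mirrors the mean-pooling embedding logic from `test_workflow.py`; the OpenAI model name (`gpt-4o-mini`), the prompt format, and the `wiki_summary` result column are illustrative assumptions, not fixed parts of this example.

```python
import torch
import torch.nn.functional as F
from feast import FeatureStore
from openai import OpenAI
from transformers import AutoModel, AutoTokenizer

EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"


def embed(text: str) -> list:
    # Tokenize, run the transformer, mean-pool token embeddings, L2-normalize.
    tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL)
    model = AutoModel.from_pretrained(EMBED_MODEL)
    inputs = tokenizer([text], padding=True, truncation=True, return_tensors="pt")
    with torch.no_grad():
        token_embeddings = model(**inputs)[0]
    mask = inputs["attention_mask"].unsqueeze(-1).expand(token_embeddings.size()).float()
    pooled = (token_embeddings * mask).sum(1) / mask.sum(1).clamp(min=1e-9)
    return F.normalize(pooled, p=2, dim=1)[0].tolist()


store = FeatureStore(repo_path=".")
question = "Which city has the largest population in New York?"

# Retrieve the top 3 most similar documents from Milvus through Feast.
context_data = store.retrieve_online_documents_v2(
    features=[
        "city_embeddings:vector",
        "city_embeddings:item_id",
        "city_embeddings:state",
        "city_embeddings:sentence_chunks",
        "city_embeddings:wiki_summary",
    ],
    query=embed(question),
    top_k=3,
    distance_metric="COSINE",
).to_df()

# Inject the retrieved summaries into the LLM prompt ("wiki_summary" column
# name assumed from the requested features above).
context = "\n".join(context_data["wiki_summary"].astype(str))
client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment
response = client.chat.completions.create(
    model="gpt-4o-mini",  # illustrative model choice
    messages=[
        {"role": "system", "content": f"Answer using only this context:\n{context}"},
        {"role": "user", "content": question},
    ],
)
print(response.choices[0].message.content)
```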

## 🛠️ Key Commands for Data Scientists
- Apply feature definitions:

```bash
feast apply
```

- Write features to the online store:
```python
store.write_to_online_store(feature_view_name='city_embeddings', df=df)
```
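Here `df` is a pandas DataFrame matching the feature view's schema; in this demo it can be loaded from the bundled Parquet file, as `test_workflow.py` does:

```python
import pandas as pd

df = pd.read_parquet("./data/city_wikipedia_summaries_with_embeddings.parquet")
store.write_to_online_store(feature_view_name="city_embeddings", df=df)
```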

- Inspect retrieved features using Python:
```python
context_data = store.retrieve_online_documents_v2(
    features=[
        "city_embeddings:vector",
        "city_embeddings:item_id",
        "city_embeddings:state",
        "city_embeddings:sentence_chunks",
        "city_embeddings:wiki_summary",
    ],
    query=query,
    top_k=3,
    distance_metric='COSINE',
).to_df()
display(context_data)
```

## 📊 Example Output

When querying *"Which city has the largest population in New York?"*, the model provides:

```
The largest city in New York is New York City, often referred to as NYC. It is the most populous city in the United States, with an estimated population of 8,335,897 in 2022.
```
Empty file added examples/rag/__init__.py
Empty file.
Empty file.
Binary file not shown.
42 changes: 42 additions & 0 deletions examples/rag/feature_repo/example_repo.py
@@ -0,0 +1,42 @@
from datetime import timedelta

from feast import (
    FeatureView,
    Field,
    FileSource,
)
from feast.data_format import ParquetFormat
from feast.types import Float32, Array, String, ValueType
from feast import Entity

item = Entity(
    name="item_id",
    description="Item ID",
    value_type=ValueType.INT64,
)

parquet_file_path = "./data/city_wikipedia_summaries_with_embeddings.parquet"

source = FileSource(
    file_format=ParquetFormat(),
    path=parquet_file_path,
    timestamp_field="event_timestamp",
)

city_embeddings_feature_view = FeatureView(
    name="city_embeddings",
    entities=[item],
    schema=[
        Field(
            name="vector",
            dtype=Array(Float32),
            vector_index=True,
            vector_search_metric="COSINE",
        ),
        Field(name="state", dtype=String),
        Field(name="sentence_chunks", dtype=String),
        Field(name="wiki_summary", dtype=String),
    ],
    source=source,
    ttl=timedelta(hours=2),
)
17 changes: 17 additions & 0 deletions examples/rag/feature_repo/feature_store.yaml
@@ -0,0 +1,17 @@
project: rag
provider: local
registry: data/registry.db
online_store:
  type: milvus
  path: data/online_store.db
  vector_enabled: true
  embedding_dim: 384
  index_type: "IVF_FLAT"


offline_store:
  type: file
entity_key_serialization_version: 3
# By default, `no_auth` is used for authentication and authorization; other possible values are `kubernetes` and `oidc`. Refer to the documentation for more details.
auth:
  type: no_auth
74 changes: 74 additions & 0 deletions examples/rag/feature_repo/test_workflow.py
@@ -0,0 +1,74 @@
import pandas as pd
import torch
import torch.nn.functional as F
from feast import FeatureStore
from transformers import AutoTokenizer, AutoModel
from example_repo import city_embeddings_feature_view, item

TOKENIZER = "sentence-transformers/all-MiniLM-L6-v2"
MODEL = "sentence-transformers/all-MiniLM-L6-v2"


def mean_pooling(model_output, attention_mask):
    # First element of model_output contains all token embeddings
    token_embeddings = model_output[0]
    input_mask_expanded = (
        attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    )
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
        input_mask_expanded.sum(1), min=1e-9
    )


def run_model(sentences, tokenizer, model):
    encoded_input = tokenizer(
        sentences, padding=True, truncation=True, return_tensors="pt"
    )
    # Compute token embeddings
    with torch.no_grad():
        model_output = model(**encoded_input)

    sentence_embeddings = mean_pooling(model_output, encoded_input["attention_mask"])
    sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
    return sentence_embeddings


def run_demo():
    store = FeatureStore(repo_path=".")
    df = pd.read_parquet("./data/city_wikipedia_summaries_with_embeddings.parquet")
    embedding_length = len(df["vector"][0])
    print(f"embedding length = {embedding_length}")

    store.apply([city_embeddings_feature_view, item])
    fields = (
        [f.name for f in city_embeddings_feature_view.features]
        + city_embeddings_feature_view.entities
        + [city_embeddings_feature_view.batch_source.timestamp_field]
    )
    print("\ndata=")
    print(df[fields].head().T)
    store.write_to_online_store("city_embeddings", df[fields][0:3])

    question = "the most populous city in the state of New York is New York"
    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER)
    model = AutoModel.from_pretrained(MODEL)
    query_embedding = run_model(question, tokenizer, model)
    query = query_embedding.detach().cpu().numpy().tolist()[0]

    # Retrieve top k documents
    features = store.retrieve_online_documents_v2(
        features=[
            "city_embeddings:vector",
            "city_embeddings:item_id",
            "city_embeddings:state",
            "city_embeddings:sentence_chunks",
            "city_embeddings:wiki_summary",
        ],
        query=query,
        top_k=3,
    )
    print("features =")
    print(features.to_df())
    store.teardown()


if __name__ == "__main__":
    run_demo()