Added support for pyspark in dataprep microservice #180

Merged · 8 commits · Jun 26, 2024
2 changes: 2 additions & 0 deletions comps/dataprep/redis/README.md
@@ -11,6 +11,8 @@ We organized these two folders in the same way, so you can use either framework
 - option 1: Install Single-process version (for 1-10 files processing)

 ```bash
+apt update
+apt install default-jre
 # for langchain
 cd langchain
 # for llama_index
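Note on the `default-jre` lines: PySpark is only a Python front end to a JVM-based Spark runtime, so a Java runtime must be installed before a `SparkContext` can start; that is what this README step (and the matching Dockerfile change below) provides. A minimal pre-flight check, assuming only the Python standard library; the check itself is illustrative and not part of the PR:

```python
# Illustrative pre-flight check (not part of this PR): PySpark launches a JVM
# under the hood, so fail fast if no `java` executable is on the PATH.
import shutil

if shutil.which("java") is None:
    raise RuntimeError(
        "No Java runtime found. Install one first, e.g. `apt update && apt install default-jre`."
    )
```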
1 change: 1 addition & 0 deletions comps/dataprep/redis/langchain/docker/Dockerfile
@@ -12,6 +12,7 @@ RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missin
     build-essential \
     libgl1-mesa-glx \
     libjemalloc-dev \
+    default-jre \
     vim

 RUN useradd -m -s /bin/bash user && \
26 changes: 25 additions & 1 deletion comps/dataprep/redis/langchain/prepare_doc_redis.py
@@ -14,6 +14,7 @@
 from langchain_community.vectorstores import Redis
 from langchain_text_splitters import HTMLHeaderTextSplitter
 from langsmith import traceable
+from pyspark import SparkConf, SparkContext

 from comps import DocPath, opea_microservices, register_microservice
 from comps.dataprep.utils import document_loader, parse_html
@@ -128,11 +129,34 @@ async def ingest_documents(
         upload_folder = "./uploaded_files/"
         if not os.path.exists(upload_folder):
             Path(upload_folder).mkdir(parents=True, exist_ok=True)
+        uploaded_files = []
         for file in files:
             save_path = upload_folder + file.filename
             await save_file_to_local_disk(save_path, file)
-            ingest_data_to_redis(DocPath(path=save_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap))
+            uploaded_files.append(save_path)
             print(f"Successfully saved file {save_path}")
+
+        def process_files_wrapper(files):
+            if not isinstance(files, list):
+                files = [files]
+            for file in files:
+                ingest_data_to_redis(DocPath(path=file, chunk_size=chunk_size, chunk_overlap=chunk_overlap))
+
+        try:
+            # Create a SparkContext
+            conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]")
+            sc = SparkContext(conf=conf)
+            # Create an RDD with parallel processing
+            parallel_num = min(len(uploaded_files), os.cpu_count())
+            rdd = sc.parallelize(uploaded_files, parallel_num)
+            # Perform a parallel operation
+            rdd_trans = rdd.map(process_files_wrapper)
+            rdd_trans.collect()
+            # Stop the SparkContext
+            sc.stop()
+        except Exception:
+            # Stop the SparkContext
+            sc.stop()
         return {"status": 200, "message": "Data preparation succeeded"}

     if link_list:
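For context, the new block above follows a standard local-mode Spark fan-out: collect the saved paths into a list, `parallelize` them into an RDD with one partition per file (capped at the CPU count), `map` the per-file ingestion function over the RDD, and force execution with `collect()`. Below is a self-contained sketch of that pattern under stated assumptions: `process_one` is a hypothetical stand-in for `ingest_data_to_redis`, and `try/finally` is used so the context is always stopped while errors still surface (the PR itself uses `try/except`):

```python
# Standalone sketch of the fan-out pattern used above. `process_one` is a
# hypothetical placeholder for the real per-file work (ingest_data_to_redis).
import os

from pyspark import SparkConf, SparkContext


def process_one(path: str) -> str:
    # Stand-in for chunking/embedding/indexing a single file.
    print(f"processing {path}")
    return path


if __name__ == "__main__":
    uploaded_files = ["./uploaded_files/a.pdf", "./uploaded_files/b.pdf"]

    # local[*] runs Spark in-process with one worker thread per core.
    conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    try:
        # One partition per file, capped at the number of available cores.
        parallel_num = min(len(uploaded_files), os.cpu_count() or 1)
        rdd = sc.parallelize(uploaded_files, parallel_num)
        # map() is lazy; collect() triggers the actual parallel execution.
        rdd.map(process_one).collect()
    finally:
        # Always release the JVM-side resources, even if ingestion fails.
        sc.stop()
```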
3 changes: 3 additions & 0 deletions comps/dataprep/redis/langchain/requirements.txt
@@ -7,6 +7,7 @@ langchain
 langchain-community
 langchain-text-splitters
 langsmith
+markdown
 numpy
 opentelemetry-api
 opentelemetry-exporter-otlp
@@ -15,7 +16,9 @@ pandas
 Pillow
 prometheus-fastapi-instrumentator
 pymupdf
+pyspark
 python-docx
+redis
 sentence_transformers
 shortuuid
 unstructured