diff --git a/comps/dataprep/redis/README.md b/comps/dataprep/redis/README.md
index b548af29b..31135219a 100644
--- a/comps/dataprep/redis/README.md
+++ b/comps/dataprep/redis/README.md
@@ -11,6 +11,8 @@ We organized these two folders in the same way, so you can use either framework
 - option 1: Install Single-process version (for 1-10 files processing)
 
 ```bash
+apt update
+apt install default-jre  # PySpark requires a Java runtime
 # for langchain
 cd langchain
 # for llama_index
diff --git a/comps/dataprep/redis/langchain/docker/Dockerfile b/comps/dataprep/redis/langchain/docker/Dockerfile
index 4c364c428..1d9f529b7 100644
--- a/comps/dataprep/redis/langchain/docker/Dockerfile
+++ b/comps/dataprep/redis/langchain/docker/Dockerfile
@@ -12,6 +12,7 @@ RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missin
     build-essential \
     libgl1-mesa-glx \
     libjemalloc-dev \
+    default-jre \
     vim
 
 RUN useradd -m -s /bin/bash user && \
diff --git a/comps/dataprep/redis/langchain/prepare_doc_redis.py b/comps/dataprep/redis/langchain/prepare_doc_redis.py
index 41a870878..cdcbdd93a 100644
--- a/comps/dataprep/redis/langchain/prepare_doc_redis.py
+++ b/comps/dataprep/redis/langchain/prepare_doc_redis.py
@@ -14,6 +14,7 @@
 from langchain_community.vectorstores import Redis
 from langchain_text_splitters import HTMLHeaderTextSplitter
 from langsmith import traceable
+from pyspark import SparkConf, SparkContext
 
 from comps import DocPath, opea_microservices, register_microservice
 from comps.dataprep.utils import document_loader, parse_html
@@ -128,11 +129,31 @@ async def ingest_documents(
         upload_folder = "./uploaded_files/"
         if not os.path.exists(upload_folder):
             Path(upload_folder).mkdir(parents=True, exist_ok=True)
+        uploaded_files = []
         for file in files:
             save_path = upload_folder + file.filename
             await save_file_to_local_disk(save_path, file)
-            ingest_data_to_redis(DocPath(path=save_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap))
+            uploaded_files.append(save_path)
             print(f"Successfully saved file {save_path}")
+
+        def process_files_wrapper(files):
+            if not isinstance(files, list):
+                files = [files]
+            for file in files:
+                ingest_data_to_redis(DocPath(path=file, chunk_size=chunk_size, chunk_overlap=chunk_overlap))
+
+        # Create a SparkContext that uses every local core
+        conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]")
+        sc = SparkContext(conf=conf)
+        try:
+            # Build an RDD with one partition per file, capped at the CPU count
+            parallel_num = min(len(uploaded_files), os.cpu_count())
+            rdd = sc.parallelize(uploaded_files, parallel_num)
+            # Ingest the files in parallel; collect() blocks until all partitions finish
+            rdd.map(process_files_wrapper).collect()
+        finally:
+            # Always stop the SparkContext, even if ingestion raised
+            sc.stop()
         return {"status": 200, "message": "Data preparation succeeded"}
 
     if link_list:
diff --git a/comps/dataprep/redis/langchain/requirements.txt b/comps/dataprep/redis/langchain/requirements.txt
index c4c559253..d974603b6 100644
--- a/comps/dataprep/redis/langchain/requirements.txt
+++ b/comps/dataprep/redis/langchain/requirements.txt
@@ -7,6 +7,7 @@ langchain
 langchain-community
 langchain-text-splitters
 langsmith
+markdown
 numpy
 opentelemetry-api
 opentelemetry-exporter-otlp
@@ -15,7 +16,9 @@ pandas
 Pillow
 prometheus-fastapi-instrumentator
 pymupdf
+pyspark
 python-docx
 redis
 sentence_transformers
 shortuuid
+unstructured
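
For context on what the Spark code in this patch does: `local[*]` starts an embedded Spark runtime on every available core (hence the new `default-jre` dependency, since PySpark drives a JVM), `parallelize` splits the uploaded file list into partitions, and `map(...).collect()` runs the per-file function on each partition in parallel, blocking until all of them finish. Below is a minimal sketch of that pattern, runnable on its own; `process_one_file` and the sample paths are illustrative placeholders, not names from this repository.

```python
import os

from pyspark import SparkConf, SparkContext


def process_one_file(path):
    # Stand-in for the real per-file work (ingest_data_to_redis in the patch).
    # Anything called here must be picklable, since Spark ships it to workers.
    return f"processed {path}"


if __name__ == "__main__":
    files = [f"./uploaded_files/doc_{i}.txt" for i in range(8)]  # hypothetical inputs

    # local[*] = one worker per local core; no cluster required.
    conf = SparkConf().setAppName("Parallel-dataprep").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    try:
        # One partition per file, capped at the CPU count, as in the patch.
        parallel_num = min(len(files), os.cpu_count() or 1)
        results = sc.parallelize(files, parallel_num).map(process_one_file).collect()
        print(results)
    finally:
        sc.stop()
```

The try/finally mirrors the patch: stopping the SparkContext in a finally block releases the JVM whether or not ingestion fails, and lets any ingestion error propagate to the caller instead of being silently swallowed.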