Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine Dataprep Code & UT #404

Merged
merged 5 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 45 additions & 64 deletions comps/dataprep/redis/langchain/prepare_doc_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,34 +104,8 @@ def delete_by_id(client, id):
return True


def ingest_data_to_redis(doc_path: DocPath):
"""Ingest document to Redis."""
path = doc_path.path
print(f"Parsing document {path}.")

if path.endswith(".html"):
headers_to_split_on = [
("h1", "Header 1"),
("h2", "Header 2"),
("h3", "Header 3"),
]
text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
else:
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=doc_path.chunk_size,
chunk_overlap=doc_path.chunk_overlap,
add_start_index=True,
separators=get_separators(),
)

content = document_loader(path)

chunks = text_splitter.split_text(content)
if doc_path.process_table and path.endswith(".pdf"):
table_chunks = get_tables_result(path, doc_path.table_strategy)
chunks = chunks + table_chunks
print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf")

def ingest_chunks_to_redis(file_name: str, chunks: List):
print(f"[ ingest chunks ] file name: {file_name}")
# Create vectorstore
if tei_embedding_endpoint:
# create embeddings using TEI endpoint service
Expand All @@ -146,7 +120,7 @@ def ingest_data_to_redis(doc_path: DocPath):

file_ids = []
for i in range(0, num_chunks, batch_size):
print(f"Current batch: {i}")
print(f"[ ingest chunks ] Current batch: {i}")
batch_chunks = chunks[i : i + batch_size]
batch_texts = batch_chunks

Expand All @@ -156,56 +130,50 @@ def ingest_data_to_redis(doc_path: DocPath):
index_name=INDEX_NAME,
redis_url=REDIS_URL,
)
print(f"keys: {keys}")
print(f"[ ingest chunks ] keys: {keys}")
file_ids.extend(keys)
print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}")
print(f"[ ingest chunks ] Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}")

# store file_ids into index file-keys
r = redis.Redis(connection_pool=redis_pool)
client = r.ft(KEY_INDEX_NAME)
if not check_index_existance(client):
assert create_index(client)
file_name = doc_path.path.split("/")[-1]
assert store_by_id(client, key=file_name, value="#".join(file_ids))

return True


async def ingest_link_to_redis(link_list: List[str]):
# Create embedding obj
if tei_embedding_endpoint:
# create embeddings using TEI endpoint service
embedder = HuggingFaceHubEmbeddings(model=tei_embedding_endpoint)
else:
# create embeddings using local embedding model
embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
def ingest_data_to_redis(doc_path: DocPath):
"""Ingest document to Redis."""
path = doc_path.path
print(f"Parsing document {path}.")

# Create redis connection obj
r = redis.Redis(connection_pool=redis_pool)
client = r.ft(KEY_INDEX_NAME)
if path.endswith(".html"):
headers_to_split_on = [
("h1", "Header 1"),
("h2", "Header 2"),
("h3", "Header 3"),
]
text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
else:
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=doc_path.chunk_size,
chunk_overlap=doc_path.chunk_overlap,
add_start_index=True,
separators=get_separators(),
)

# save link contents and doc_ids one by one
for link in link_list:
content = parse_html([link])[0][0]
print(f"[ ingest link ] link: {link} content: {content}")
encoded_link = encode_filename(link)
save_path = upload_folder + encoded_link + ".txt"
print(f"[ ingest link ] save_path: {save_path}")
await save_content_to_local_disk(save_path, content)
content = document_loader(path)

_, keys = Redis.from_texts_return_keys(
texts=content,
embedding=embedder,
index_name=INDEX_NAME,
redis_url=REDIS_URL,
)
print(f"keys: {keys}")
if not check_index_existance(client):
assert create_index(client)
file_name = encoded_link + ".txt"
assert store_by_id(client, key=file_name, value="#".join(keys))
chunks = text_splitter.split_text(content)
if doc_path.process_table and path.endswith(".pdf"):
table_chunks = get_tables_result(path, doc_path.table_strategy)
chunks = chunks + table_chunks
print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf")

return True
file_name = doc_path.path.split("/")[-1]
return ingest_chunks_to_redis(file_name, chunks)


@register_microservice(name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep", host="0.0.0.0", port=6007)
Expand Down Expand Up @@ -270,7 +238,20 @@ async def ingest_documents(
link_list = json.loads(link_list) # Parse JSON string to list
if not isinstance(link_list, list):
raise HTTPException(status_code=400, detail="link_list should be a list.")
await ingest_link_to_redis(link_list)
for link in link_list:
encoded_link = encode_filename(link)
save_path = upload_folder + encoded_link + ".txt"
content = parse_html([link])[0][0]
await save_content_to_local_disk(save_path, content)
ingest_data_to_redis(
DocPath(
path=save_path,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
process_table=process_table,
table_strategy=table_strategy,
)
)
print(f"Successfully saved link list {link_list}")
return {"status": 200, "message": "Data preparation succeeded"}
except json.JSONDecodeError:
Expand Down
1 change: 1 addition & 0 deletions comps/dataprep/redis/langchain/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ redis
sentence_transformers
shortuuid
unstructured[all-docs]==0.11.5
uvicorn
106 changes: 69 additions & 37 deletions tests/test_dataprep_redis_langchain.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,61 +28,93 @@ function start_service() {
function validate_microservice() {
cd $LOG_PATH

# test /v1/dataprep
dataprep_service_port=5013
# test /v1/dataprep upload file
URL="http://${ip_address}:$dataprep_service_port/v1/dataprep"
echo "Deep learning is a subset of machine learning that utilizes neural networks with multiple layers to analyze various levels of abstract data representations. It enables computers to identify patterns and make decisions with minimal human intervention by learning from large amounts of data." > $LOG_PATH/dataprep_file.txt
HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -F 'files=@./dataprep_file.txt' -H 'Content-Type: multipart/form-data' "$URL")
if [ "$HTTP_STATUS" -eq 200 ]; then
echo "[ dataprep ] HTTP status is 200. Checking content..."
cp ./dataprep_file.txt ./dataprep_file2.txt
local CONTENT=$(curl -s -X POST -F 'files=@./dataprep_file2.txt' -H 'Content-Type: multipart/form-data' "$URL" | tee ${LOG_PATH}/dataprep.log)

if echo "$CONTENT" | grep -q "Data preparation succeeded"; then
echo "[ dataprep ] Content is as expected."
else
echo "[ dataprep ] Content does not match the expected result: $CONTENT"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep.log
exit 1
fi
HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F 'files=@./dataprep_file.txt' -H 'Content-Type: multipart/form-data' "$URL")
HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g')
SERVICE_NAME="dataprep - upload - file"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_upload_file.log

if [ "$HTTP_STATUS" -ne "200" ]; then
echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS"
exit 1
else
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
exit 1
else
echo "[ $SERVICE_NAME ] Content is as expected."
fi

# test /v1/dataprep upload link
URL="http://${ip_address}:$dataprep_service_port/v1/dataprep"
HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F 'link_list=["https://www.ces.tech/"]' "$URL")
HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g')
SERVICE_NAME="dataprep - upload - link"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_upload_link.log

if [ "$HTTP_STATUS" -ne "200" ]; then
echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS"
exit 1
else
echo "[ dataprep ] HTTP status is not 200. Received status was $HTTP_STATUS"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep.log
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
exit 1
else
echo "[ $SERVICE_NAME ] Content is as expected."
fi

# test /v1/dataprep/get_file
dataprep_file_service_port=5016
URL="http://${ip_address}:$dataprep_file_service_port/v1/dataprep/get_file"
HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -H 'Content-Type: application/json' "$URL")
if [ "$HTTP_STATUS" -eq 200 ]; then
echo "[ dataprep - file ] HTTP status is 200. Checking content..."
local CONTENT=$(curl -s -X POST -H 'Content-Type: application/json' "$URL" | tee ${LOG_PATH}/dataprep_file.log)

if echo "$CONTENT" | grep -q '{"name":'; then
echo "[ dataprep - file ] Content is as expected."
else
echo "[ dataprep - file ] Content does not match the expected result: $CONTENT"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_file.log
exit 1
fi
HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST "$URL")
HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g')
SERVICE_NAME="dataprep - get"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_file.log

if [ "$HTTP_STATUS" -ne "200" ]; then
echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS"
exit 1
else
echo "[ dataprep - file ] HTTP status is not 200. Received status was $HTTP_STATUS"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_file.log
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
if [[ "$RESPONSE_BODY" != *'{"name":'* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
exit 1
else
echo "[ $SERVICE_NAME ] Content is as expected."
fi

# test /v1/dataprep/delete_file
dataprep_del_service_port=5020
URL="http://${ip_address}:$dataprep_del_service_port/v1/dataprep/delete_file"
HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -d '{"file_path": "dataprep_file.txt"}' -H 'Content-Type: application/json' "$URL")
if [ "$HTTP_STATUS" -eq 200 ]; then
echo "[ dataprep - del ] HTTP status is 200."
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_del.log
HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -d '{"file_path": "dataprep_file.txt"}' -H 'Content-Type: application/json' "$URL")
HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g')
SERVICE_NAME="dataprep - del"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_del.log

# check response status
if [ "$HTTP_STATUS" -ne "200" ]; then
echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS"
exit 1
else
echo "[ dataprep - del ] HTTP status is not 200. Received status was $HTTP_STATUS"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_del.log
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
# check response body
if [[ "$RESPONSE_BODY" != *'{"status":true}'* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
exit 1
else
echo "[ $SERVICE_NAME ] Content is as expected."
fi
}

Expand Down