Skip to content

Commit

Permalink
Refine Dataprep Code & UT (#404)
Browse files (browse the repository at this point in the history)
* refine dataprep upload_link & ut

Signed-off-by: letonghan <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add requirement

Signed-off-by: letonghan <[email protected]>

* modify container name

Signed-off-by: letonghan <[email protected]>

---------

Signed-off-by: letonghan <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
letonghan and pre-commit-ci[bot] authored Aug 5, 2024
1 parent de27e6b commit 867e9d7
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 101 deletions.
109 changes: 45 additions & 64 deletions comps/dataprep/redis/langchain/prepare_doc_redis.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -104,34 +104,8 @@ def delete_by_id(client, id):
return True


def ingest_data_to_redis(doc_path: DocPath):
"""Ingest document to Redis."""
path = doc_path.path
print(f"Parsing document {path}.")

if path.endswith(".html"):
headers_to_split_on = [
("h1", "Header 1"),
("h2", "Header 2"),
("h3", "Header 3"),
]
text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
else:
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=doc_path.chunk_size,
chunk_overlap=doc_path.chunk_overlap,
add_start_index=True,
separators=get_separators(),
)

content = document_loader(path)

chunks = text_splitter.split_text(content)
if doc_path.process_table and path.endswith(".pdf"):
table_chunks = get_tables_result(path, doc_path.table_strategy)
chunks = chunks + table_chunks
print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf")

def ingest_chunks_to_redis(file_name: str, chunks: List):
print(f"[ ingest chunks ] file name: {file_name}")
# Create vectorstore
if tei_embedding_endpoint:
# create embeddings using TEI endpoint service
Expand All @@ -146,7 +120,7 @@ def ingest_data_to_redis(doc_path: DocPath):

file_ids = []
for i in range(0, num_chunks, batch_size):
print(f"Current batch: {i}")
print(f"[ ingest chunks ] Current batch: {i}")
batch_chunks = chunks[i : i + batch_size]
batch_texts = batch_chunks

Expand All @@ -156,56 +130,50 @@ def ingest_data_to_redis(doc_path: DocPath):
index_name=INDEX_NAME,
redis_url=REDIS_URL,
)
print(f"keys: {keys}")
print(f"[ ingest chunks ] keys: {keys}")
file_ids.extend(keys)
print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}")
print(f"[ ingest chunks ] Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}")

# store file_ids into index file-keys
r = redis.Redis(connection_pool=redis_pool)
client = r.ft(KEY_INDEX_NAME)
if not check_index_existance(client):
assert create_index(client)
file_name = doc_path.path.split("/")[-1]
assert store_by_id(client, key=file_name, value="#".join(file_ids))

return True


async def ingest_link_to_redis(link_list: List[str]):
# Create embedding obj
if tei_embedding_endpoint:
# create embeddings using TEI endpoint service
embedder = HuggingFaceHubEmbeddings(model=tei_embedding_endpoint)
else:
# create embeddings using local embedding model
embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
def ingest_data_to_redis(doc_path: DocPath):
"""Ingest document to Redis."""
path = doc_path.path
print(f"Parsing document {path}.")

# Create redis connection obj
r = redis.Redis(connection_pool=redis_pool)
client = r.ft(KEY_INDEX_NAME)
if path.endswith(".html"):
headers_to_split_on = [
("h1", "Header 1"),
("h2", "Header 2"),
("h3", "Header 3"),
]
text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
else:
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=doc_path.chunk_size,
chunk_overlap=doc_path.chunk_overlap,
add_start_index=True,
separators=get_separators(),
)

# save link contents and doc_ids one by one
for link in link_list:
content = parse_html([link])[0][0]
print(f"[ ingest link ] link: {link} content: {content}")
encoded_link = encode_filename(link)
save_path = upload_folder + encoded_link + ".txt"
print(f"[ ingest link ] save_path: {save_path}")
await save_content_to_local_disk(save_path, content)
content = document_loader(path)

_, keys = Redis.from_texts_return_keys(
texts=content,
embedding=embedder,
index_name=INDEX_NAME,
redis_url=REDIS_URL,
)
print(f"keys: {keys}")
if not check_index_existance(client):
assert create_index(client)
file_name = encoded_link + ".txt"
assert store_by_id(client, key=file_name, value="#".join(keys))
chunks = text_splitter.split_text(content)
if doc_path.process_table and path.endswith(".pdf"):
table_chunks = get_tables_result(path, doc_path.table_strategy)
chunks = chunks + table_chunks
print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf")

return True
file_name = doc_path.path.split("/")[-1]
return ingest_chunks_to_redis(file_name, chunks)


@register_microservice(name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep", host="0.0.0.0", port=6007)
Expand Down Expand Up @@ -270,7 +238,20 @@ async def ingest_documents(
link_list = json.loads(link_list) # Parse JSON string to list
if not isinstance(link_list, list):
raise HTTPException(status_code=400, detail="link_list should be a list.")
await ingest_link_to_redis(link_list)
for link in link_list:
encoded_link = encode_filename(link)
save_path = upload_folder + encoded_link + ".txt"
content = parse_html([link])[0][0]
await save_content_to_local_disk(save_path, content)
ingest_data_to_redis(
DocPath(
path=save_path,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
process_table=process_table,
table_strategy=table_strategy,
)
)
print(f"Successfully saved link list {link_list}")
return {"status": 200, "message": "Data preparation succeeded"}
except json.JSONDecodeError:
Expand Down
1 change: 1 addition & 0 deletions comps/dataprep/redis/langchain/requirements.txt
Original file line number | Diff line number | Diff line change
Expand Up @@ -26,3 +26,4 @@ redis
sentence_transformers
shortuuid
unstructured[all-docs]==0.11.5
uvicorn
106 changes: 69 additions & 37 deletions tests/test_dataprep_redis_langchain.sh
Original file line number | Diff line number | Diff line change
Expand Up @@ -28,61 +28,93 @@ function start_service() {
function validate_microservice() {
cd $LOG_PATH

# test /v1/dataprep
dataprep_service_port=5013
# test /v1/dataprep upload file
URL="http://${ip_address}:$dataprep_service_port/v1/dataprep"
echo "Deep learning is a subset of machine learning that utilizes neural networks with multiple layers to analyze various levels of abstract data representations. It enables computers to identify patterns and make decisions with minimal human intervention by learning from large amounts of data." > $LOG_PATH/dataprep_file.txt
HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -F 'files=@./dataprep_file.txt' -H 'Content-Type: multipart/form-data' "$URL")
if [ "$HTTP_STATUS" -eq 200 ]; then
echo "[ dataprep ] HTTP status is 200. Checking content..."
cp ./dataprep_file.txt ./dataprep_file2.txt
local CONTENT=$(curl -s -X POST -F 'files=@./dataprep_file2.txt' -H 'Content-Type: multipart/form-data' "$URL" | tee ${LOG_PATH}/dataprep.log)

if echo "$CONTENT" | grep -q "Data preparation succeeded"; then
echo "[ dataprep ] Content is as expected."
else
echo "[ dataprep ] Content does not match the expected result: $CONTENT"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep.log
exit 1
fi
HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F 'files=@./dataprep_file.txt' -H 'Content-Type: multipart/form-data' "$URL")
HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g')
SERVICE_NAME="dataprep - upload - file"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_upload_file.log

if [ "$HTTP_STATUS" -ne "200" ]; then
echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS"
exit 1
else
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
exit 1
else
echo "[ $SERVICE_NAME ] Content is as expected."
fi

# test /v1/dataprep upload link
URL="http://${ip_address}:$dataprep_service_port/v1/dataprep"
HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F 'link_list=["https://www.ces.tech/"]' "$URL")
HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g')
SERVICE_NAME="dataprep - upload - link"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_upload_link.log

if [ "$HTTP_STATUS" -ne "200" ]; then
echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS"
exit 1
else
echo "[ dataprep ] HTTP status is not 200. Received status was $HTTP_STATUS"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep.log
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
exit 1
else
echo "[ $SERVICE_NAME ] Content is as expected."
fi

# test /v1/dataprep/get_file
dataprep_file_service_port=5016
URL="http://${ip_address}:$dataprep_file_service_port/v1/dataprep/get_file"
HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -H 'Content-Type: application/json' "$URL")
if [ "$HTTP_STATUS" -eq 200 ]; then
echo "[ dataprep - file ] HTTP status is 200. Checking content..."
local CONTENT=$(curl -s -X POST -H 'Content-Type: application/json' "$URL" | tee ${LOG_PATH}/dataprep_file.log)

if echo "$CONTENT" | grep -q '{"name":'; then
echo "[ dataprep - file ] Content is as expected."
else
echo "[ dataprep - file ] Content does not match the expected result: $CONTENT"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_file.log
exit 1
fi
HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST "$URL")
HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g')
SERVICE_NAME="dataprep - get"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_file.log

if [ "$HTTP_STATUS" -ne "200" ]; then
echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS"
exit 1
else
echo "[ dataprep - file ] HTTP status is not 200. Received status was $HTTP_STATUS"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_file.log
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
if [[ "$RESPONSE_BODY" != *'{"name":'* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
exit 1
else
echo "[ $SERVICE_NAME ] Content is as expected."
fi

# test /v1/dataprep/delete_file
dataprep_del_service_port=5020
URL="http://${ip_address}:$dataprep_del_service_port/v1/dataprep/delete_file"
HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -d '{"file_path": "dataprep_file.txt"}' -H 'Content-Type: application/json' "$URL")
if [ "$HTTP_STATUS" -eq 200 ]; then
echo "[ dataprep - del ] HTTP status is 200."
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_del.log
HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -d '{"file_path": "dataprep_file.txt"}' -H 'Content-Type: application/json' "$URL")
HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g')
SERVICE_NAME="dataprep - del"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_del.log

# check response status
if [ "$HTTP_STATUS" -ne "200" ]; then
echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS"
exit 1
else
echo "[ dataprep - del ] HTTP status is not 200. Received status was $HTTP_STATUS"
docker logs test-comps-dataprep-redis-langchain-server >> ${LOG_PATH}/dataprep_del.log
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
# check response body
if [[ "$RESPONSE_BODY" != *'{"status":true}'* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
exit 1
else
echo "[ $SERVICE_NAME ] Content is as expected."
fi
}

Expand Down

0 comments on commit 867e9d7

Please sign in to comment.