API: Stop parsing #1556

Merged: 6 commits, Jul 17, 2024
Changes from all commits
176 changes: 129 additions & 47 deletions api/apps/dataset_api.py
@@ -16,6 +16,7 @@
 import pathlib
 import re
 import warnings
+from functools import partial
 from io import BytesIO

 from elasticsearch_dsl import Q
@@ -26,13 +27,12 @@
 from api.contants import NAME_LENGTH_LIMIT
 from api.db import FileType, ParserType, FileSource, TaskStatus
 from api.db import StatusEnum
-from api.db.db_models import File, Task
+from api.db.db_models import File
 from api.db.services import duplicate_name
 from api.db.services.document_service import DocumentService
 from api.db.services.file2document_service import File2DocumentService
 from api.db.services.file_service import FileService
 from api.db.services.knowledgebase_service import KnowledgebaseService
-from api.db.services.task_service import TaskService
 from api.db.services.user_service import TenantService
 from api.settings import RetCode
 from api.utils import get_uuid
@@ -233,9 +233,10 @@ def update_dataset(dataset_id):
             if chunk_num == 0:
                 dataset_updating_data["embd_id"] = req["embedding_model_id"]
             else:
-                return construct_json_result(code=RetCode.DATA_ERROR, message="You have already parsed the document in this "
-                                                                              "dataset, so you cannot change the embedding "
-                                                                              "model.")
+                return construct_json_result(code=RetCode.DATA_ERROR,
+                                             message="You have already parsed the document in this "
+                                                     "dataset, so you cannot change the embedding "
+                                                     "model.")
         # only if chunk_num is 0, the user can update the chunk_method
         if "chunk_method" in req:
             type_value = req["chunk_method"]
@@ -614,35 +615,39 @@ def download_document(dataset_id, document_id):

 # ----------------------------start parsing a document-----------------------------------------------------
-# helper method for parsing
-def dummy(prog=None, msg=""):
-    pass
+# callback method
+def doc_parse_callback(doc_id, prog=None, msg=""):
+    cancel = DocumentService.do_cancel(doc_id)
+    if cancel:
+        raise Exception("The parsing process has been cancelled!")


-def doc_parse(binary, doc_name, parser_name, tenant_id):
+def doc_parse(binary, doc_name, parser_name, tenant_id, doc_id):
     match parser_name:
         case "book":
-            book.chunk(doc_name, binary=binary, callback=dummy)
+            book.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "laws":
-            laws.chunk(doc_name, binary=binary, callback=dummy)
+            laws.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "manual":
-            manual.chunk(doc_name, binary=binary, callback=dummy)
+            manual.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "naive":
             # It's the mode by default, which is general in the front-end
-            naive.chunk(doc_name, binary=binary, callback=dummy)
+            naive.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "one":
-            one.chunk(doc_name, binary=binary, callback=dummy)
+            one.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "paper":
-            paper.chunk(doc_name, binary=binary, callback=dummy)
+            paper.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "picture":
-            picture.chunk(doc_name, binary=binary, tenant_id=tenant_id, lang="Chinese", callback=dummy)
+            picture.chunk(doc_name, binary=binary, tenant_id=tenant_id, lang="Chinese",
+                          callback=partial(doc_parse_callback, doc_id))
         case "presentation":
-            presentation.chunk(doc_name, binary=binary, callback=dummy)
+            presentation.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "qa":
-            qa.chunk(doc_name, binary=binary, callback=dummy)
+            qa.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "resume":
-            resume.chunk(doc_name, binary=binary, callback=dummy)
+            resume.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "table":
-            table.chunk(doc_name, binary=binary, callback=dummy)
+            table.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case _:
            return False

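The key idea in this hunk is replacing the no-op `dummy` callback with a cancellation-aware one. The chunkers only invoke `callback(prog, msg)` and know nothing about document ids, so `functools.partial` pre-binds `doc_id`. A minimal, self-contained sketch of the pattern (`CANCELLED` and `fake_chunker` are hypothetical stand-ins for `DocumentService.do_cancel` and the real chunkers):

```python
from functools import partial

CANCELLED = {"doc-42"}  # hypothetical stand-in for DocumentService.do_cancel()

def doc_parse_callback(doc_id, prog=None, msg=""):
    # Raising here aborts the chunker mid-run; the real callback asks
    # DocumentService.do_cancel(doc_id) instead of checking a set.
    if doc_id in CANCELLED:
        raise Exception("The parsing process has been cancelled!")

def fake_chunker(doc_name, callback):
    # Chunkers only know the (prog, msg) callback signature, so doc_id
    # has to be bound in ahead of time with partial().
    for step in range(3):
        callback(prog=step / 3, msg=f"{doc_name}: step {step}")

fake_chunker("a.pdf", partial(doc_parse_callback, "doc-1"))   # runs to completion
fake_chunker("b.pdf", partial(doc_parse_callback, "doc-42"))  # raises: cancelled
```

Each progress report re-checks the cancel flag, so a DELETE on the status route takes effect at the next callback rather than only at parser boundaries.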
@@ -658,13 +663,8 @@ def parse_document(dataset_id, document_id):
         if not exist:
             return construct_json_result(code=RetCode.DATA_ERROR,
                                          message=f"This dataset '{dataset_id}' cannot be found!")
-        message = ""
-        res = get_message_during_parsing_document(document_id, message)
-        if isinstance(res, str):
-            message += res
-            return construct_json_result(code=RetCode.SUCCESS, message=message)
-        else:
-            return res
+
+        return parsing_document_internal(document_id)

     except Exception as e:
         return construct_error_response(e)
@@ -680,34 +680,31 @@ def parse_documents(dataset_id):
         if not exist:
             return construct_json_result(code=RetCode.DATA_ERROR,
                                          message=f"This dataset '{dataset_id}' cannot be found!")
-
-        def process(doc_ids):
-            message = ""
-            # for loop
-            for id in doc_ids:
-                res = get_message_during_parsing_document(id, message)
-                if isinstance(res, str):
-                    message += res
-                else:
-                    return res
-            return construct_json_result(data=True, code=RetCode.SUCCESS, message=message)
-
-        # two conditions
-        if doc_ids:
-            return process(doc_ids)
-        else:
+        if not doc_ids:
             # documents inside the dataset
             docs, total = DocumentService.list_documents_in_dataset(dataset_id, 0, -1, "create_time",
                                                                     True, "")
             doc_ids = [doc["id"] for doc in docs]
-            return process(doc_ids)
+
+        message = ""
+        # for loop
+        for id in doc_ids:
+            res = parsing_document_internal(id)
+            res_body = res.json
+            if res_body["code"] == RetCode.SUCCESS:
+                message += res_body["message"]
+            else:
+                return res
+        return construct_json_result(data=True, code=RetCode.SUCCESS, message=message)

     except Exception as e:
         return construct_error_response(e)


-# helper method for getting message or response when parsing the document
-def get_message_during_parsing_document(id, message):
+# helper method for parsing the document
+def parsing_document_internal(id):
+    message = ""
     try:
         # Check whether there is this document
         exist, document = DocumentService.get_by_id(id)
@@ -736,18 +733,102 @@ def get_message_during_parsing_document(id, message):
         binary = MINIO.get(bucket, doc_name)
         parser_name = doc_attributes["parser_id"]
         if binary:
-            res = doc_parse(binary, doc_name, parser_name, tenant_id)
+            res = doc_parse(binary, doc_name, parser_name, tenant_id, doc_id)
             if res is False:
                 message += f"The parser id: {parser_name} of the document {doc_id} is not supported; "
         else:
             message += f"Empty data in the document: {doc_name}; "
         # failed in parsing
         if doc_attributes["status"] == TaskStatus.FAIL.value:
             message += f"Failed in parsing the document: {doc_id}; "
-        return message
+        return construct_json_result(code=RetCode.SUCCESS, message=message)
     except Exception as e:
         return construct_error_response(e)
+
+
+# ----------------------------stop parsing a doc-----------------------------------------------------
+@manager.route("<dataset_id>/documents/<document_id>/status", methods=["DELETE"])
+@login_required
+def stop_parsing_document(dataset_id, document_id):
+    try:
+        # valid dataset
+        exist, _ = KnowledgebaseService.get_by_id(dataset_id)
+        if not exist:
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message=f"This dataset '{dataset_id}' cannot be found!")
+
+        return stop_parsing_document_internal(document_id)
+
+    except Exception as e:
+        return construct_error_response(e)
+
+
+# ----------------------------stop parsing docs-----------------------------------------------------
+@manager.route("<dataset_id>/documents/status", methods=["DELETE"])
+@login_required
+def stop_parsing_documents(dataset_id):
+    doc_ids = request.json["doc_ids"]
+    try:
+        # valid dataset?
+        exist, _ = KnowledgebaseService.get_by_id(dataset_id)
+        if not exist:
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message=f"This dataset '{dataset_id}' cannot be found!")
+        if not doc_ids:
+            # documents inside the dataset
+            docs, total = DocumentService.list_documents_in_dataset(dataset_id, 0, -1, "create_time",
+                                                                    True, "")
+            doc_ids = [doc["id"] for doc in docs]
+
+        message = ""
+        # for loop
+        for id in doc_ids:
+            res = stop_parsing_document_internal(id)
+            res_body = res.json
+            if res_body["code"] == RetCode.SUCCESS:
+                message += res_body["message"]
+            else:
+                return res
+        return construct_json_result(data=True, code=RetCode.SUCCESS, message=message)
+
+    except Exception as e:
+        return construct_error_response(e)
+
+
+# Helper method
+def stop_parsing_document_internal(document_id):
+    try:
+        # valid doc?
+        exist, doc = DocumentService.get_by_id(document_id)
+        if not exist:
+            return construct_json_result(message=f"This document '{document_id}' cannot be found!",
+                                         code=RetCode.ARGUMENT_ERROR)
+        doc_attributes = doc.to_dict()
+
+        # only when the status is parsing, we need to stop it
+        if doc_attributes["status"] == TaskStatus.RUNNING.value:
+            tenant_id = DocumentService.get_tenant_id(document_id)
+            if not tenant_id:
+                return construct_json_result(message="Tenant not found!", code=RetCode.AUTHENTICATION_ERROR)
+
+            # update successfully?
+            if not DocumentService.update_by_id(document_id, {"status": "2"}):  # cancel
+                return construct_json_result(
+                    code=RetCode.OPERATING_ERROR,
+                    message="There was an error while stopping the parsing of the document. "
+                            "Please check the status of the RAGFlow server and try again."
+                )
+
+            _, doc_attributes = DocumentService.get_by_id(document_id)
+            doc_attributes = doc_attributes.to_dict()
+
+            # failed to stop parsing
+            if doc_attributes["status"] == TaskStatus.RUNNING.value:
+                return construct_json_result(message=f"Failed to stop parsing the document: {document_id}; ",
+                                             code=RetCode.SUCCESS)
+        return construct_json_result(code=RetCode.SUCCESS, message="")
+    except Exception as e:
+        return construct_error_response(e)
+# ----------------------------stop parsing-----------------------------------------------------


 # ----------------------------show the status of the file-----------------------------------------------------
 @manager.route("/<dataset_id>/documents/<document_id>/status", methods=["GET"])
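For reference, a hedged sketch of how a client would exercise the two new DELETE routes directly (the base URL and auth header are assumptions; adjust them to your RAGFlow deployment):

```python
import requests

BASE = "http://127.0.0.1:9380/api/v1/dataset"  # assumed server address
HEADERS = {"Authorization": "YOUR_API_KEY"}    # assumed auth header

dataset_id, document_id = "my-dataset-id", "my-doc-id"

# Stop parsing one document.
res = requests.delete(f"{BASE}/{dataset_id}/documents/{document_id}/status",
                      headers=HEADERS)
print(res.json())

# Stop parsing several documents; an empty doc_ids list falls through to
# "all documents in the dataset", mirroring parse_documents() above.
res = requests.delete(f"{BASE}/{dataset_id}/documents/status",
                      headers=HEADERS, json={"doc_ids": []})
print(res.json())
```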
@@ -774,6 +855,7 @@ def show_parsing_status(dataset_id, document_id):
         )
     except Exception as e:
         return construct_error_response(e)
+
 # ----------------------------list the chunks of the file-----------------------------------------------------

 # -- --------------------------delete the chunk-----------------------------------------------------
13 changes: 12 additions & 1 deletion api/db/services/document_service.py
@@ -333,6 +333,17 @@ def get_kb_doc_count(cls, kb_id):
                                              cls.model.kb_id == kb_id).dicts())


+    @classmethod
+    @DB.connection_context()
+    def do_cancel(cls, doc_id):
+        try:
+            _, doc = DocumentService.get_by_id(doc_id)
+            return doc.run == TaskStatus.CANCEL.value or doc.progress < 0
+        except Exception:
+            pass
+        return False
+
+
 def queue_raptor_tasks(doc):
     def new_task():
         nonlocal doc
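`do_cancel` treats two document states as cancelled: an explicit cancel status, or a negative progress value, which the pipeline uses to flag a failed task. A small sketch with a hypothetical document object (assuming `TaskStatus.CANCEL.value == "2"`, consistent with the `{"status": "2"}` update in the endpoint above):

```python
class FakeDoc:
    def __init__(self, run, progress):
        self.run = run            # task-status field on the Document model
        self.progress = progress  # negative progress marks a failed task

def is_cancelled(doc):
    # Mirrors do_cancel(); TaskStatus.CANCEL.value is assumed to be "2".
    return doc.run == "2" or doc.progress < 0

print(is_cancelled(FakeDoc("1", 0.4)))   # False: still running
print(is_cancelled(FakeDoc("2", 0.4)))   # True: user cancelled
print(is_cancelled(FakeDoc("1", -1.0)))  # True: the task failed
```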
@@ -347,4 +358,4 @@ def new_task():
     task = new_task()
     bulk_insert_into_db(Task, [task], True)
     task["type"] = "raptor"
-    assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=task), "Can't access Redis. Please check the Redis' status."
\ No newline at end of file
+    assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=task), "Can't access Redis. Please check the Redis' status."
2 changes: 1 addition & 1 deletion docs/references/ragflow_api.md
@@ -758,7 +758,7 @@ This method enables a specific document to start parsing for a specific user.
 ```json
 {
     "code": 102,
-    "message": "This dataset 'imagination.txt' cannot be found!"
+    "message": "This dataset 'imagination' cannot be found!"
 }
 ```

11 changes: 11 additions & 0 deletions sdk/python/ragflow/ragflow.py
@@ -157,6 +157,17 @@ def start_parsing_documents(self, dataset_id, doc_ids=None):
         return res.json()

     # ----------------------------stop parsing-----------------------------------------------------
+    def stop_parsing_document(self, dataset_id, document_id):
+        endpoint = f"{self.dataset_url}/{dataset_id}/documents/{document_id}/status"
+        res = requests.delete(endpoint, headers=self.authorization_header)
+
+        return res.json()
+
+    def stop_parsing_documents(self, dataset_id, doc_ids=None):
+        endpoint = f"{self.dataset_url}/{dataset_id}/documents/status"
+        res = requests.delete(endpoint, headers=self.authorization_header, json={"doc_ids": doc_ids})
+
+        return res.json()

     # ----------------------------show the status of the file-----------------------------------------------------
     def show_parsing_status(self, dataset_id, document_id):
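To close the loop, a quick sketch of the new SDK methods in use (the import path and constructor arguments are assumptions; check the SDK's tests for the exact signature):

```python
from ragflow.ragflow import RAGFlow

client = RAGFlow("YOUR_API_KEY", "http://127.0.0.1:9380")

# Stop parsing a single document.
print(client.stop_parsing_document("dataset_id", "document_id"))

# Stop parsing every document in the dataset (doc_ids=None, just like
# start_parsing_documents).
print(client.stop_parsing_documents("dataset_id"))
```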