update document sdk #2445

Merged · 17 commits · Sep 18, 2024
Changes from 15 commits
587 changes: 556 additions & 31 deletions api/apps/sdk/doc.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion sdk/python/ragflow/__init__.py
@@ -6,4 +6,5 @@
from .modules.dataset import DataSet
from .modules.assistant import Assistant
from .modules.session import Session
-from .modules.document import Document
+from .modules.document import Document
+from .modules.chunk import Chunk
49 changes: 49 additions & 0 deletions sdk/python/ragflow/modules/chunk.py
@@ -0,0 +1,49 @@
from .base import Base


class Chunk(Base):
    def __init__(self, rag, res_dict):
        self.id = ""
        self.content = ""
        self.important_keywords = []
        self.create_time = ""
        self.create_timestamp_flt = 0.0
        self.knowledgebase_id = None
        self.document_name = ""
        self.document_id = ""
        self.status = "1"
        # Drop any response fields that are not declared attributes of this class
        for k in list(res_dict.keys()):
            if k not in self.__dict__:
                res_dict.pop(k)
        super().__init__(rag, res_dict)

    def delete(self) -> bool:
        """
        Delete the chunk from the document.
        """
        res = self.post('/doc/chunk/rm',
                        {"doc_id": self.document_id, 'chunk_ids': [self.id]})
        res = res.json()
        if res.get("retmsg") == "success":
            return True
        raise Exception(res["retmsg"])

    def save(self) -> bool:
        """
        Save the chunk details to the server.
        """
        res = self.post('/doc/chunk/set',
                        {"chunk_id": self.id,
                         "kb_id": self.knowledgebase_id,
                         "name": self.document_name,
                         "content_with_weight": self.content,
                         "important_kwd": self.important_keywords,
                         "create_time": self.create_time,
                         "create_timestamp_flt": self.create_timestamp_flt,
                         "doc_id": self.document_id,
                         "status": self.status,
                         })
        res = res.json()
        if res.get("retmsg") == "success":
            return True
        raise Exception(res["retmsg"])
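For context, a minimal usage sketch of the new `Chunk` methods. It assumes a running RAGFlow server; the key, URL, document name, and keyword are illustrative placeholders.

```python
from ragflow import RAGFlow

# Illustrative credentials and URL; substitute your own.
rag = RAGFlow(user_key="<API_KEY>", base_url="http://127.0.0.1:9380")

doc = rag.get_document(name="story.txt")     # added in this PR
chunks = doc.list_chunks(keywords="hero")    # also added below

for chunk in chunks:
    chunk.important_keywords.append("hero")  # edit in memory
    chunk.save()                             # push the edit to the server

if chunks:
    chunks[0].delete()                       # remove a chunk entirely
```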
139 changes: 132 additions & 7 deletions sdk/python/ragflow/modules/document.py
@@ -1,26 +1,29 @@
import time

from .base import Base

+from .chunk import Chunk


class Document(Base):
    def __init__(self, rag, res_dict):
        self.id = ""
        self.name = ""
        self.thumbnail = None
-        self.kb_id = None
+        self.knowledgebase_id = None
        self.parser_method = ""
        self.parser_config = {"pages": [[1, 1000000]]}
        self.source_type = "local"
        self.type = ""
        self.created_by = ""
        self.size = 0
-        self.token_num = 0
-        self.chunk_num = 0
+        self.token_count = 0
+        self.chunk_count = 0
        self.progress = 0.0
        self.progress_msg = ""
        self.process_begin_at = None
        self.process_duration = 0.0
        self.run = "0"
        self.status = "1"
        for k in list(res_dict.keys()):
            if k not in self.__dict__:
                res_dict.pop(k)
@@ -31,10 +34,10 @@ def save(self) -> bool:
        Save the document details to the server.
        """
        res = self.post('/doc/save',
-                        {"id": self.id, "name": self.name, "thumbnail": self.thumbnail, "kb_id": self.kb_id,
+                        {"id": self.id, "name": self.name, "thumbnail": self.thumbnail, "kb_id": self.knowledgebase_id,
                         "parser_id": self.parser_method, "parser_config": self.parser_config.to_json(),
                         "source_type": self.source_type, "type": self.type, "created_by": self.created_by,
-                         "size": self.size, "token_num": self.token_num, "chunk_num": self.chunk_num,
+                         "size": self.size, "token_num": self.token_count, "chunk_num": self.chunk_count,
                         "progress": self.progress, "progress_msg": self.progress_msg,
                         # "process_duation" (sic) preserves the existing server-side field spelling
                         "process_begin_at": self.process_begin_at, "process_duation": self.process_duration
                         })
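A quick sketch of how `save` would be used with the renamed attributes; `rag` is the client from the earlier sketch and the parser value is illustrative.

```python
doc = rag.get_document(name="story.txt")
doc.parser_method = "naive"  # illustrative parser id
doc.save()                   # maps token_count/chunk_count back to the server keys
```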
@@ -61,7 +64,7 @@ def download(self) -> bytes:
        :return: The downloaded document content in bytes.
        """
        # Construct the API request from the document ID (the knowledge base ID is no longer part of the URL)
-        res = self.get(f"/doc/{self.kb_id}/documents/{self.id}",
+        res = self.get(f"/doc/{self.id}",
                       {"headers": self.rag.authorization_header, "id": self.id, "name": self.name, "stream": True})

        # Check the response status code to ensure the request was successful
@@ -73,3 +76,125 @@ def download(self) -> bytes:
            raise Exception(
                f"Failed to download document. Server responded with: {res.status_code}, {res.text}"
            )
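The simplified URL makes downloads a one-liner; a sketch, reusing the `rag` client from above:

```python
doc = rag.get_document(name="story.txt")
blob = doc.download()                # raw bytes of the stored file
with open("story_copy.txt", "wb") as f:
    f.write(blob)
```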

    def async_parse(self):
        """
        Initiate document parsing asynchronously without waiting for completion.
        """
        try:
            # Construct request data including the document ID and run status (assuming 1 means start parsing)
            data = {"doc_ids": [self.id], "run": 1}

            # Send a POST request to the parsing endpoint to start parsing
            res = self.post('/doc/run', data)

            # Check the server response status code
            if res.status_code != 200:
                raise Exception(f"Failed to start async parsing: {res.text}")

            print("Async parsing started successfully.")

        except Exception as e:
            # Log and re-raise exceptions
            print(f"Error occurred during async parsing: {str(e)}")
            raise

    def join(self, interval=5, timeout=3600):
        """
        Wait for asynchronous parsing to complete, yielding parsing progress periodically.

        :param interval: The time interval (in seconds) between progress reports.
        :param timeout: The timeout (in seconds) for the parsing operation.
        :return: A generator yielding (progress, message) tuples.
        """
        start_time = time.time()
        while time.time() - start_time < timeout:
            # Check the parsing status
            res = self.get(f'/doc/{self.id}/status', {"doc_ids": [self.id]})
            res_data = res.json()
            data = res_data.get("data", {})  # default to a dict so the .get() calls below are safe

            # Retrieve progress and status message
            progress = data.get("progress", 0)
            progress_msg = data.get("status", "")

            yield progress, progress_msg  # yield progress and message

            if progress == 100:  # parsing completed
                break

            time.sleep(interval)
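`async_parse` and `join` are designed to be used together: start parsing, then iterate the generator for progress. A minimal polling sketch, assuming the `rag` client from above:

```python
doc = rag.get_document(name="story.txt")
doc.async_parse()  # kick off server-side parsing and return immediately

# Poll every 5 seconds, up to an hour, printing each progress report
for progress, msg in doc.join(interval=5, timeout=3600):
    print(f"progress: {progress} - {msg}")
```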

    def cancel(self):
        """
        Cancel the parsing task for the document.
        """
        try:
            # Construct request data, including the document ID and the cancel action (assuming 2 means cancel)
            data = {"doc_ids": [self.id], "run": 2}

            # Send a POST request to the parsing endpoint to cancel parsing
            res = self.post('/doc/run', data)

            # Check the server response status code
            if res.status_code != 200:
                print("Failed to cancel parsing. Server response:", res.text)
            else:
                print("Parsing cancelled successfully.")

        except Exception as e:
            print(f"Error occurred during async parsing cancellation: {str(e)}")
            raise
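And the corresponding cancellation sketch:

```python
doc.async_parse()  # start parsing
# ... decide to abort before it finishes ...
doc.cancel()       # sends run=2 to the same /doc/run endpoint
```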

    def list_chunks(self, page=1, offset=0, limit=12, size=30, keywords="", available_int=None):
        """
        List all chunks associated with this document by calling the external API.

        Args:
            page (int): The page number to retrieve (default 1).
            size (int): The number of chunks per page (default 30).
            keywords (str): Keywords for searching specific chunks (default "").
            offset (int): The number of records to skip (default 0).
            limit (int): The maximum number of records to return (default 12).
            available_int (int): Filter for available chunks (optional).

        Returns:
            list: A list of Chunk objects returned from the API.
        """
        data = {
            "doc_id": self.id,
            "page": page,
            "size": size,
            "keywords": keywords,
            "offset": offset,
            "limit": limit
        }

        if available_int is not None:
            data["available_int"] = available_int

        res = self.post('/doc/chunk/list', data)
        if res.status_code == 200:
            res_data = res.json()
            if res_data.get("retmsg") == "success":
                chunks = []
                for chunk_data in res_data["data"].get("chunks", []):
                    chunks.append(Chunk(self.rag, chunk_data))
                return chunks
            else:
                raise Exception(f"Error fetching chunks: {res_data.get('retmsg')}")
        else:
            raise Exception(f"API request failed with status code {res.status_code}")

    def add_chunk(self, content: str):
        """
        Create a new chunk with the given content and attach it to this document.
        """
        res = self.post('/doc/chunk/create', {"doc_id": self.id, "content_with_weight": content})

        if res.status_code == 200:
            res_data = res.json().get("data")
            chunk_data = res_data.get("chunk")
            return Chunk(self.rag, chunk_data)
        else:
            raise Exception(f"Failed to add chunk: {res.status_code} {res.text}")
107 changes: 106 additions & 1 deletion sdk/python/ragflow/ragflow.py
@@ -20,6 +20,8 @@
from .modules.assistant import Assistant
from .modules.dataset import DataSet
from .modules.document import Document
+from .modules.chunk import Chunk


class RAGFlow:
    def __init__(self, user_key, base_url, version='v1'):
@@ -143,7 +145,7 @@ def list_assistants(self) -> List[Assistant]:
            return result_list
        raise Exception(res["retmsg"])

-    def create_document(self, ds:DataSet, name: str, blob: bytes) -> bool:
+    def create_document(self, ds: DataSet, name: str, blob: bytes) -> bool:
url = f"/doc/dataset/{ds.id}/documents/upload"
files = {
'file': (name, blob)
Expand All @@ -164,10 +166,113 @@ def create_document(self, ds:DataSet, name: str, blob: bytes) -> bool:
raise Exception(f"Upload failed: {response.json().get('retmsg')}")

return False

    def get_document(self, id: str = None, name: str = None) -> Document:
        """
        Fetch a single document by ID or by name.
        """
        res = self.get("/doc/infos", {"id": id, "name": name})
        res = res.json()
        if res.get("retmsg") == "success":
            return Document(self, res['data'])
        raise Exception(res["retmsg"])
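Both lookup modes are supported; a small sketch (names illustrative):

```python
doc_by_name = rag.get_document(name="story.txt")
doc_by_id = rag.get_document(id=doc_by_name.id)  # the same document, fetched by ID
```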

    def async_parse_documents(self, doc_ids):
        """
        Asynchronously start parsing multiple documents without waiting for completion.

        :param doc_ids: A list containing multiple document IDs.
        """
        try:
            if not doc_ids or not isinstance(doc_ids, list):
                raise ValueError("doc_ids must be a non-empty list of document IDs")

            data = {"doc_ids": doc_ids, "run": 1}

            res = self.post('/doc/run', data)

            if res.status_code != 200:
                raise Exception(f"Failed to start async parsing for documents: {res.text}")

            print(f"Async parsing started successfully for documents: {doc_ids}")

        except Exception as e:
            print(f"Error occurred during async parsing for documents: {str(e)}")
            raise

    def async_cancel_parse_documents(self, doc_ids):
        """
        Cancel the asynchronous parsing of multiple documents.

        :param doc_ids: A list containing multiple document IDs.
        """
        try:
            if not doc_ids or not isinstance(doc_ids, list):
                raise ValueError("doc_ids must be a non-empty list of document IDs")
            data = {"doc_ids": doc_ids, "run": 2}

            res = self.post('/doc/run', data)

            if res.status_code != 200:
                raise Exception(f"Failed to cancel async parsing for documents: {res.text}")

            print(f"Async parsing canceled successfully for documents: {doc_ids}")

        except Exception as e:
            print(f"Error occurred during canceling parsing for documents: {str(e)}")
            raise
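A batch sketch combining the two methods (file names illustrative):

```python
ids = [rag.get_document(name=n).id for n in ("a.txt", "b.txt")]

rag.async_parse_documents(ids)           # start parsing both documents
# ... later, if needed ...
rag.async_cancel_parse_documents(ids)    # cancel the whole batch
```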

    def retrieval(self,
                  question,
                  datasets=None,
                  documents=None,
                  offset=0,
                  limit=6,
                  similarity_threshold=0.1,
                  vector_similarity_weight=0.3,
                  top_k=1024):
        """
        Perform document retrieval based on the given parameters.

        :param question: The query question.
        :param datasets: A list of datasets (optional, as documents may be provided directly).
        :param documents: A list of documents (used when specific documents are provided).
        :param offset: Offset for the retrieval results.
        :param limit: Maximum number of retrieval results.
        :param similarity_threshold: Similarity threshold.
        :param vector_similarity_weight: Weight of vector similarity.
        :param top_k: Number of top similar candidates to consider (for pre-filtering or ranking).

        Note: This is a hypothetical implementation and may need adjustments based on the actual backend service API.
        """
        try:
            data = {
                "question": question,
                "datasets": datasets if datasets is not None else [],
                "documents": [doc.id if hasattr(doc, 'id') else doc for doc in
                              documents] if documents is not None else [],
                "offset": offset,
                "limit": limit,
                "similarity_threshold": similarity_threshold,
                "vector_similarity_weight": vector_similarity_weight,
                "top_k": top_k,
                # Accept DataSet objects or raw IDs, mirroring the documents handling above
                "kb_id": [ds.id if hasattr(ds, 'id') else ds for ds in datasets] if datasets is not None else [],
            }

            # Send a POST request to the backend retrieval endpoint
            res = self.post('/doc/retrieval_test', data)

            # Check the response status code
            if res.status_code == 200:
                res_data = res.json()
                if res_data.get("retmsg") == "success":
                    chunks = []
                    for chunk_data in res_data["data"].get("chunks", []):
                        chunks.append(Chunk(self, chunk_data))
                    return chunks
                else:
                    raise Exception(f"Error fetching chunks: {res_data.get('retmsg')}")
            else:
                raise Exception(f"API request failed with status code {res.status_code}")

        except Exception as e:
            print(f"An error occurred during retrieval: {e}")
            raise
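Finally, a retrieval sketch. It relies only on code in this diff: `documents` accepts Document objects or raw IDs, and the result is a list of `Chunk` objects (the query text is illustrative):

```python
doc = rag.get_document(name="story.txt")
chunks = rag.retrieval(
    question="Who crossed the river at dawn?",
    documents=[doc],            # Document objects or raw IDs both work
    similarity_threshold=0.2,
)
for c in chunks:
    print(c.content[:80])
```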