Skip to content

Commit

Permalink
Storage: Support the s3, azure blob as the object storage of ragflow.
Browse files Browse the repository at this point in the history
Common: Create the s3 and azure logger.

Dev: Clean code.

Dev: Clean code.
  • Loading branch information
baifachuan committed Sep 5, 2024
1 parent fd3e55c commit 7fb19d1
Show file tree
Hide file tree
Showing 16 changed files with 375 additions and 37 deletions.
10 changes: 5 additions & 5 deletions api/apps/api_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

from api.utils.file_utils import filename_type, thumbnail
from rag.nlp import keyword_extraction
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL

from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService
from agent.canvas import Canvas
Expand Down Expand Up @@ -427,10 +427,10 @@ def upload():
retmsg="This type of file has not been supported yet!")

location = filename
while MINIO.obj_exist(kb_id, location):
while STORAGE_IMPL.obj_exist(kb_id, location):
location += "_"
blob = request.files['file'].read()
MINIO.put(kb_id, location, blob)
STORAGE_IMPL.put(kb_id, location, blob)
doc = {
"id": get_uuid(),
"kb_id": kb.id,
Expand Down Expand Up @@ -650,7 +650,7 @@ def document_rm():
FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
File2DocumentService.delete_by_document_id(doc_id)

MINIO.rm(b, n)
STORAGE_IMPL.rm(b, n)
except Exception as e:
errors += str(e)

Expand Down Expand Up @@ -723,7 +723,7 @@ def fillin_conv(ans):
if ans["reference"]["chunks"][chunk_idx]["img_id"]:
try:
bkt, nm = ans["reference"]["chunks"][chunk_idx]["img_id"].split("-")
response = MINIO.get(bkt, nm)
response = STORAGE_IMPL.get(bkt, nm)
data_type_picture["url"] = base64.b64encode(response).decode('utf-8')
data.append(data_type_picture)
break
Expand Down
12 changes: 6 additions & 6 deletions api/apps/dataset_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from rag.app import book, laws, manual, naive, one, paper, presentation, qa, resume, table, picture, audio, email
from rag.nlp import search
from rag.utils.es_conn import ELASTICSEARCH
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL

MAXIMUM_OF_UPLOADING_FILES = 256

Expand Down Expand Up @@ -352,7 +352,7 @@ def upload_documents(dataset_id):

# upload to the minio
location = filename
while MINIO.obj_exist(dataset_id, location):
while STORAGE_IMPL.obj_exist(dataset_id, location):
location += "_"

blob = file.read()
Expand All @@ -361,7 +361,7 @@ def upload_documents(dataset_id):
if blob == b'':
warnings.warn(f"[WARNING]: The content of the file {filename} is empty.")

MINIO.put(dataset_id, location, blob)
STORAGE_IMPL.put(dataset_id, location, blob)

doc = {
"id": get_uuid(),
Expand Down Expand Up @@ -441,7 +441,7 @@ def delete_document(document_id, dataset_id): # string
File2DocumentService.delete_by_document_id(document_id)

# delete it from minio
MINIO.rm(dataset_id, location)
STORAGE_IMPL.rm(dataset_id, location)
except Exception as e:
errors += str(e)
if errors:
Expand Down Expand Up @@ -596,7 +596,7 @@ def download_document(dataset_id, document_id):

# The process of downloading
doc_id, doc_location = File2DocumentService.get_minio_address(doc_id=document_id) # minio address
file_stream = MINIO.get(doc_id, doc_location)
file_stream = STORAGE_IMPL.get(doc_id, doc_location)
if not file_stream:
return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR)

Expand Down Expand Up @@ -737,7 +737,7 @@ def parsing_document_internal(id):
doc_id = doc_attributes["id"]

bucket, doc_name = File2DocumentService.get_minio_address(doc_id=doc_id)
binary = MINIO.get(bucket, doc_name)
binary = STORAGE_IMPL.get(bucket, doc_name)
parser_name = doc_attributes["parser_id"]
if binary:
res = doc_parse(binary, doc_name, parser_name, tenant_id, doc_id)
Expand Down
12 changes: 6 additions & 6 deletions api/apps/document_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from api.db.services.document_service import DocumentService, doc_upload_and_parse
from api.settings import RetCode, stat_logger
from api.utils.api_utils import get_json_result
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL
from api.utils.file_utils import filename_type, thumbnail, get_project_base_directory
from api.utils.web_utils import html2pdf, is_valid_url

Expand Down Expand Up @@ -118,9 +118,9 @@ def web_crawl():
raise RuntimeError("This type of file has not been supported yet!")

location = filename
while MINIO.obj_exist(kb_id, location):
while STORAGE_IMPL.obj_exist(kb_id, location):
location += "_"
MINIO.put(kb_id, location, blob)
STORAGE_IMPL.put(kb_id, location, blob)
doc = {
"id": get_uuid(),
"kb_id": kb.id,
Expand Down Expand Up @@ -307,7 +307,7 @@ def rm():
FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
File2DocumentService.delete_by_document_id(doc_id)

MINIO.rm(b, n)
STORAGE_IMPL.rm(b, n)
except Exception as e:
errors += str(e)

Expand Down Expand Up @@ -394,7 +394,7 @@ def get(doc_id):
return get_data_error_result(retmsg="Document not found!")

b, n = File2DocumentService.get_minio_address(doc_id=doc_id)
response = flask.make_response(MINIO.get(b, n))
response = flask.make_response(STORAGE_IMPL.get(b, n))

ext = re.search(r"\.([^.]+)$", doc.name)
if ext:
Expand Down Expand Up @@ -458,7 +458,7 @@ def change_parser():
def get_image(image_id):
try:
bkt, nm = image_id.split("-")
response = flask.make_response(MINIO.get(bkt, nm))
response = flask.make_response(STORAGE_IMPL.get(bkt, nm))
response.headers.set('Content-Type', 'image/JPEG')
return response
except Exception as e:
Expand Down
8 changes: 4 additions & 4 deletions api/apps/file_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from api.utils.file_utils import filename_type
from rag.nlp import search
from rag.utils.es_conn import ELASTICSEARCH
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL


@manager.route('/upload', methods=['POST'])
Expand Down Expand Up @@ -98,7 +98,7 @@ def upload():
# file type
filetype = filename_type(file_obj_names[file_len - 1])
location = file_obj_names[file_len - 1]
while MINIO.obj_exist(last_folder.id, location):
while STORAGE_IMPL.obj_exist(last_folder.id, location):
location += "_"
blob = file_obj.read()
filename = duplicate_name(
Expand Down Expand Up @@ -260,7 +260,7 @@ def rm():
e, file = FileService.get_by_id(inner_file_id)
if not e:
return get_data_error_result(retmsg="File not found!")
MINIO.rm(file.parent_id, file.location)
STORAGE_IMPL.rm(file.parent_id, file.location)
FileService.delete_folder_by_pf_id(current_user.id, file_id)
else:
if not FileService.delete(file):
Expand Down Expand Up @@ -333,7 +333,7 @@ def get(file_id):
if not e:
return get_data_error_result(retmsg="Document not found!")
b, n = File2DocumentService.get_minio_address(file_id=file_id)
response = flask.make_response(MINIO.get(b, n))
response = flask.make_response(STORAGE_IMPL.get(b, n))
ext = re.search(r"\.([^.]+)$", file.name)
if ext:
if file.type == FileType.VISUAL.value:
Expand Down
4 changes: 2 additions & 2 deletions api/apps/system_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from api.versions import get_rag_version
from rag.settings import SVR_QUEUE_NAME
from rag.utils.es_conn import ELASTICSEARCH
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL
from timeit import default_timer as timer

from rag.utils.redis_conn import REDIS_CONN
Expand All @@ -47,7 +47,7 @@ def status():

st = timer()
try:
MINIO.health()
STORAGE_IMPL.health()
res["minio"] = {"status": "green", "elapsed": "{:.1f}".format((timer() - st)*1000.)}
except Exception as e:
res["minio"] = {"status": "red", "elapsed": "{:.1f}".format((timer() - st)*1000.), "error": str(e)}
Expand Down
4 changes: 2 additions & 2 deletions api/db/services/document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from graphrag.mind_map_extractor import MindMapExtractor
from rag.settings import SVR_QUEUE_NAME
from rag.utils.es_conn import ELASTICSEARCH
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL
from rag.nlp import search, rag_tokenizer

from api.db import FileType, TaskStatus, ParserType, LLMType
Expand Down Expand Up @@ -473,7 +473,7 @@ def dummy(prog=None, msg=""):
else:
d["image"].save(output_buffer, format='JPEG')

MINIO.put(kb.id, d["_id"], output_buffer.getvalue())
STORAGE_IMPL.put(kb.id, d["_id"], output_buffer.getvalue())
d["img_id"] = "{}-{}".format(kb.id, d["_id"])
del d["image"]
docs.append(d)
Expand Down
6 changes: 3 additions & 3 deletions api/db/services/file_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from api.db.services.file2document_service import File2DocumentService
from api.utils import get_uuid
from api.utils.file_utils import filename_type, thumbnail
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL


class FileService(CommonService):
Expand Down Expand Up @@ -350,10 +350,10 @@ def upload_document(self, kb, file_objs, user_id):
raise RuntimeError("This type of file has not been supported yet!")

location = filename
while MINIO.obj_exist(kb.id, location):
while STORAGE_IMPL.obj_exist(kb.id, location):
location += "_"
blob = file.read()
MINIO.put(kb.id, location, blob)
STORAGE_IMPL.put(kb.id, location, blob)
doc = {
"id": get_uuid(),
"kb_id": kb.id,
Expand Down
6 changes: 3 additions & 3 deletions api/db/services/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from api.utils import current_timestamp, get_uuid
from deepdoc.parser.excel_parser import RAGFlowExcelParser
from rag.settings import SVR_QUEUE_NAME
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL
from rag.utils.redis_conn import REDIS_CONN


Expand Down Expand Up @@ -143,7 +143,7 @@ def new_task():
tsks = []

if doc["type"] == FileType.PDF.value:
file_bin = MINIO.get(bucket, name)
file_bin = STORAGE_IMPL.get(bucket, name)
do_layout = doc["parser_config"].get("layout_recognize", True)
pages = PdfParser.total_page_number(doc["name"], file_bin)
page_size = doc["parser_config"].get("task_page_size", 12)
Expand All @@ -169,7 +169,7 @@ def new_task():
tsks.append(task)

elif doc["parser_id"] == "table":
file_bin = MINIO.get(bucket, name)
file_bin = STORAGE_IMPL.get(bucket, name)
rn = RAGFlowExcelParser.row_number(
doc["name"], file_bin)
for i in range(0, rn, 3000):
Expand Down
2 changes: 2 additions & 0 deletions rag/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

es_logger = getLogger("es")
minio_logger = getLogger("minio")
s3_logger = getLogger("s3")
azure_logger = getLogger("azure")
cron_logger = getLogger("cron_logger")
cron_logger.setLevel(20)
chunk_logger = getLogger("chunk_logger")
Expand Down
4 changes: 2 additions & 2 deletions rag/svr/cache_file_svr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from api.db.db_models import close_connection
from api.db.services.task_service import TaskService
from rag.settings import cron_logger
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL
from rag.utils.redis_conn import REDIS_CONN


Expand All @@ -42,7 +42,7 @@ def main():
try:
key = "{}/{}".format(kb_id, loc)
if REDIS_CONN.exist(key):continue
file_bin = MINIO.get(kb_id, loc)
file_bin = STORAGE_IMPL.get(kb_id, loc)
REDIS_CONN.transaction(key, file_bin, 12 * 60)
cron_logger.info("CACHE: {}".format(loc))
except Exception as e:
Expand Down
6 changes: 3 additions & 3 deletions rag/svr/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from api.db.services.file2document_service import File2DocumentService
from api.settings import retrievaler
from rag.raptor import RecursiveAbstractiveProcessing4TreeOrganizedRetrieval as Raptor
from rag.utils.minio_conn import MINIO
from rag.utils.storage_factory import STORAGE_IMPL
from api.db.db_models import close_connection
from rag.settings import database_logger, SVR_QUEUE_NAME
from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
Expand Down Expand Up @@ -138,7 +138,7 @@ def collect():


def get_minio_binary(bucket, name):
return MINIO.get(bucket, name)
return STORAGE_IMPL.get(bucket, name)


def build(row):
Expand Down Expand Up @@ -214,7 +214,7 @@ def build(row):
d["image"].save(output_buffer, format='JPEG')

st = timer()
MINIO.put(row["kb_id"], d["_id"], output_buffer.getvalue())
STORAGE_IMPL.put(row["kb_id"], d["_id"], output_buffer.getvalue())
el += timer() - st
except Exception as e:
cron_logger.error(str(e))
Expand Down
79 changes: 79 additions & 0 deletions rag/utils/azure_sas_conn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
import time
from io import BytesIO
from rag.settings import azure_logger
from rag.utils import singleton
from azure.storage.blob import ContainerClient


@singleton
class RAGFlowAzureSasBlob(object):
def __init__(self):
self.conn = None
self.account_url = os.getenv('CONTAINER_URL', None)
self.sas_token = os.getenv('SAS_TOKEN', None)
self.__open__()

def __open__(self):
try:
if self.conn:
self.__close__()
except Exception as e:
pass

try:
self.conn = ContainerClient.from_container_url(self.account_url + "?" + self.sas_token)
except Exception as e:
azure_logger.error(
"Fail to connect %s " % self.account_url + str(e))

def __close__(self):
del self.conn
self.conn = None

def health(self):
bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary))

def put(self, bucket, fnm, binary):
for _ in range(3):
try:
return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary))
except Exception as e:
azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
self.__open__()
time.sleep(1)

def rm(self, bucket, fnm):
try:
self.conn.delete_blob(fnm)
except Exception as e:
azure_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))

def get(self, bucket, fnm):
for _ in range(1):
try:
r = self.conn.download_blob(fnm)
return r.read()
except Exception as e:
azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
self.__open__()
time.sleep(1)
return

def obj_exist(self, bucket, fnm):
try:
return self.conn.get_blob_client(fnm).exists()
except Exception as e:
azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
return False

def get_presigned_url(self, bucket, fnm, expires):
for _ in range(10):
try:
return self.conn.get_presigned_url("GET", bucket, fnm, expires)
except Exception as e:
azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
self.__open__()
time.sleep(1)
return
Loading

0 comments on commit 7fb19d1

Please sign in to comment.