From 8dd3adc443b5887d043e0bd8a3a6e3af86d9e8eb Mon Sep 17 00:00:00 2001
From: Fachuan Bai
Date: Mon, 9 Sep 2024 09:41:14 +0800
Subject: [PATCH] Storage: Support S3 and Azure Blob as object storage backends
 for RAGFlow (#2278)

### What problem does this PR solve?

issue: https://github.com/infiniflow/ragflow/issues/2277

Adds AWS S3 and Azure Blob storage (with SAS-token and service-principal
authentication) as alternative object-storage backends. A new storage factory
instantiates the backend named by the `STORAGE_IMPL` environment variable;
MinIO remains the default, and all call sites now go through the factory's
`STORAGE_IMPL` singleton instead of importing the MinIO client directly.
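
A minimal, hypothetical usage sketch for reviewers (bucket and object names
are placeholders; credentials are assumed to be configured via the new
`azure:`/`s3:` sections of `conf/service_conf.yaml` or the corresponding
environment variables):

```python
# Hypothetical sketch: the backend is chosen once, at import time, from the
# STORAGE_IMPL environment variable (MINIO, AZURE_SPN, AZURE_SAS, or AWS_S3).
import os
os.environ.setdefault("STORAGE_IMPL", "AWS_S3")  # must be set before the import below

from rag.utils.storage_factory import STORAGE_IMPL

# Every backend exposes the same interface used throughout the code base:
STORAGE_IMPL.put("kb-bucket", "example.txt", b"hello")       # upload
exists = STORAGE_IMPL.obj_exist("kb-bucket", "example.txt")  # existence check
data = STORAGE_IMPL.get("kb-bucket", "example.txt")          # download
STORAGE_IMPL.rm("kb-bucket", "example.txt")                  # delete
```

Since all backends share the `put`/`get`/`rm`/`obj_exist`/`health` interface,
call sites depend only on the factory, not on any concrete client.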

### Type of change

- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):

Co-authored-by: Kevin Hu
---
 api/apps/api_app.py                 |  10 +--
 api/apps/dataset_api.py             |  12 +--
 api/apps/document_app.py            |  12 +--
 api/apps/file_app.py                |   8 +-
 api/apps/system_app.py              |   4 +-
 api/db/services/document_service.py |   4 +-
 api/db/services/file_service.py     |   6 +-
 api/db/services/task_service.py     |   6 +-
 conf/service_conf.yaml              |  16 ++++
 rag/settings.py                     |   4 +
 rag/svr/cache_file_svr.py           |   4 +-
 rag/svr/task_executor.py            |   6 +-
 rag/utils/azure_sas_conn.py         |  80 +++++++++++++++++
 rag/utils/azure_spn_conn.py         |  90 +++++++++++++++++++
 rag/utils/s3_conn.py                | 135 ++++++++++++++++++++++++++++
 rag/utils/storage_factory.py        |  30 +++++++
 requirements.txt                    |   6 +-
 17 files changed, 395 insertions(+), 38 deletions(-)
 create mode 100644 rag/utils/azure_sas_conn.py
 create mode 100644 rag/utils/azure_spn_conn.py
 create mode 100644 rag/utils/s3_conn.py
 create mode 100644 rag/utils/storage_factory.py

diff --git a/api/apps/api_app.py b/api/apps/api_app.py
index 601774a058..388f5dd3f9 100644
--- a/api/apps/api_app.py
+++ b/api/apps/api_app.py
@@ -39,7 +39,7 @@ from api.utils.file_utils import filename_type, thumbnail
 from rag.nlp import keyword_extraction
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 
 from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService
 from agent.canvas import Canvas
@@ -427,10 +427,10 @@ def upload():
                 retmsg="This type of file has not been supported yet!")
 
     location = filename
-    while MINIO.obj_exist(kb_id, location):
+    while STORAGE_IMPL.obj_exist(kb_id, location):
         location += "_"
     blob = request.files['file'].read()
-    MINIO.put(kb_id, location, blob)
+    STORAGE_IMPL.put(kb_id, location, blob)
     doc = {
         "id": get_uuid(),
         "kb_id": kb.id,
@@ -650,7 +650,7 @@ def document_rm():
             FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
             File2DocumentService.delete_by_document_id(doc_id)
 
-            MINIO.rm(b, n)
+            STORAGE_IMPL.rm(b, n)
         except Exception as e:
             errors += str(e)
@@ -723,7 +723,7 @@ def fillin_conv(ans):
                 if ans["reference"]["chunks"][chunk_idx]["img_id"]:
                     try:
                         bkt, nm = ans["reference"]["chunks"][chunk_idx]["img_id"].split("-")
-                        response = MINIO.get(bkt, nm)
+                        response = STORAGE_IMPL.get(bkt, nm)
                         data_type_picture["url"] = base64.b64encode(response).decode('utf-8')
                         data.append(data_type_picture)
                         break
diff --git a/api/apps/dataset_api.py b/api/apps/dataset_api.py
index 7b2074239a..a7bbde8b10 100644
--- a/api/apps/dataset_api.py
+++ b/api/apps/dataset_api.py
@@ -42,7 +42,7 @@ from rag.app import book, laws, manual, naive, one, paper, presentation, qa, resume, table, picture, audio, email
 from rag.nlp import search
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 
 MAXIMUM_OF_UPLOADING_FILES = 256
@@ -352,7 +352,7 @@ def upload_documents(dataset_id):
         # upload to the minio
         location = filename
-        while MINIO.obj_exist(dataset_id, location):
+        while STORAGE_IMPL.obj_exist(dataset_id, location):
             location += "_"
 
         blob = file.read()
@@ -361,7 +361,7 @@ def upload_documents(dataset_id):
         if blob == b'':
             warnings.warn(f"[WARNING]: The content of the file {filename} is empty.")
 
-        MINIO.put(dataset_id, location, blob)
+        STORAGE_IMPL.put(dataset_id, location, blob)
 
         doc = {
             "id": get_uuid(),
@@ -441,7 +441,7 @@ def delete_document(document_id, dataset_id):  # string
             File2DocumentService.delete_by_document_id(document_id)
 
             # delete it from minio
-            MINIO.rm(dataset_id, location)
+            STORAGE_IMPL.rm(dataset_id, location)
         except Exception as e:
             errors += str(e)
     if errors:
@@ -596,7 +596,7 @@ def download_document(dataset_id, document_id):  # The process of downloading
     doc_id, doc_location = File2DocumentService.get_minio_address(doc_id=document_id)  # minio address
-    file_stream = MINIO.get(doc_id, doc_location)
+    file_stream = STORAGE_IMPL.get(doc_id, doc_location)
     if not file_stream:
         return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR)
@@ -737,7 +737,7 @@ def parsing_document_internal(id):
         doc_id = doc_attributes["id"]
 
         bucket, doc_name = File2DocumentService.get_minio_address(doc_id=doc_id)
-        binary = MINIO.get(bucket, doc_name)
+        binary = STORAGE_IMPL.get(bucket, doc_name)
         parser_name = doc_attributes["parser_id"]
         if binary:
             res = doc_parse(binary, doc_name, parser_name, tenant_id, doc_id)
diff --git a/api/apps/document_app.py b/api/apps/document_app.py
index 04d4c4d0a6..9b0834dd52 100644
--- a/api/apps/document_app.py
+++ b/api/apps/document_app.py
@@ -48,7 +48,7 @@ from api.db.services.document_service import DocumentService, doc_upload_and_parse
 from api.settings import RetCode, stat_logger
 from api.utils.api_utils import get_json_result
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from api.utils.file_utils import filename_type, thumbnail, get_project_base_directory
 from api.utils.web_utils import html2pdf, is_valid_url
@@ -118,9 +118,9 @@ def web_crawl():
             raise RuntimeError("This type of file has not been supported yet!")
 
         location = filename
-        while MINIO.obj_exist(kb_id, location):
+        while STORAGE_IMPL.obj_exist(kb_id, location):
             location += "_"
-        MINIO.put(kb_id, location, blob)
+        STORAGE_IMPL.put(kb_id, location, blob)
         doc = {
             "id": get_uuid(),
             "kb_id": kb.id,
@@ -307,7 +307,7 @@ def rm():
             FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
             File2DocumentService.delete_by_document_id(doc_id)
 
-            MINIO.rm(b, n)
+            STORAGE_IMPL.rm(b, n)
         except Exception as e:
             errors += str(e)
@@ -394,7 +394,7 @@ def get(doc_id):
             return get_data_error_result(retmsg="Document not found!")
 
         b, n = File2DocumentService.get_minio_address(doc_id=doc_id)
-        response = flask.make_response(MINIO.get(b, n))
+        response = flask.make_response(STORAGE_IMPL.get(b, n))
 
         ext = re.search(r"\.([^.]+)$", doc.name)
         if ext:
@@ -458,7 +458,7 @@ def change_parser():
 def get_image(image_id):
     try:
         bkt, nm = image_id.split("-")
-        response = flask.make_response(MINIO.get(bkt, nm))
+        response = flask.make_response(STORAGE_IMPL.get(bkt, nm))
         response.headers.set('Content-Type', 'image/JPEG')
         return response
     except Exception as e:
diff --git a/api/apps/file_app.py b/api/apps/file_app.py
index 17f3c64265..555a4658e0 100644
--- a/api/apps/file_app.py
+++ b/api/apps/file_app.py
@@ -34,7 +34,7 @@ from api.utils.file_utils import filename_type
 from rag.nlp import search
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 
 
 @manager.route('/upload', methods=['POST'])
@@ -98,7 +98,7 @@ def upload():
         # file type
         filetype = filename_type(file_obj_names[file_len - 1])
         location = file_obj_names[file_len - 1]
-        while MINIO.obj_exist(last_folder.id, location):
+        while STORAGE_IMPL.obj_exist(last_folder.id, location):
             location += "_"
         blob = file_obj.read()
         filename = duplicate_name(
@@ -260,7 +260,7 @@ def rm():
                     e, file = FileService.get_by_id(inner_file_id)
                     if not e:
                         return get_data_error_result(retmsg="File not found!")
-                    MINIO.rm(file.parent_id, file.location)
+                    STORAGE_IMPL.rm(file.parent_id, file.location)
                 FileService.delete_folder_by_pf_id(current_user.id, file_id)
             else:
                 if not FileService.delete(file):
@@ -333,7 +333,7 @@ def get(file_id):
         if not e:
             return get_data_error_result(retmsg="Document not found!")
         b, n = File2DocumentService.get_minio_address(file_id=file_id)
-        response = flask.make_response(MINIO.get(b, n))
+        response = flask.make_response(STORAGE_IMPL.get(b, n))
         ext = re.search(r"\.([^.]+)$", file.name)
         if ext:
             if file.type == FileType.VISUAL.value:
diff --git a/api/apps/system_app.py b/api/apps/system_app.py
index d6e4a098f6..6f46d9104e 100644
--- a/api/apps/system_app.py
+++ b/api/apps/system_app.py
@@ -22,7 +22,7 @@ from api.versions import get_rag_version
 from rag.settings import SVR_QUEUE_NAME
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from timeit import default_timer as timer
 
 from rag.utils.redis_conn import REDIS_CONN
@@ -47,7 +47,7 @@ def status():
     st = timer()
     try:
-        MINIO.health()
+        STORAGE_IMPL.health()
         res["minio"] = {"status": "green", "elapsed": "{:.1f}".format((timer() - st)*1000.)}
     except Exception as e:
         res["minio"] = {"status": "red", "elapsed": "{:.1f}".format((timer() - st)*1000.), "error": str(e)}
diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py
index 83a38650a1..5d28ecd644 100644
--- a/api/db/services/document_service.py
+++ b/api/db/services/document_service.py
@@ -34,7 +34,7 @@ from graphrag.mind_map_extractor import MindMapExtractor
 from rag.settings import SVR_QUEUE_NAME
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from rag.nlp import search, rag_tokenizer
 
 from api.db import FileType, TaskStatus, ParserType, LLMType
@@ -473,7 +473,7 @@ def dummy(prog=None, msg=""):
             else:
                 d["image"].save(output_buffer, format='JPEG')
 
-            MINIO.put(kb.id, d["_id"], output_buffer.getvalue())
+            STORAGE_IMPL.put(kb.id, d["_id"], output_buffer.getvalue())
             d["img_id"] = "{}-{}".format(kb.id, d["_id"])
             del d["image"]
             docs.append(d)
diff --git a/api/db/services/file_service.py b/api/db/services/file_service.py
index 670c4c36a1..3e8a804209 100644
--- a/api/db/services/file_service.py
+++ b/api/db/services/file_service.py
@@ -27,7 +27,7 @@ from api.db.services.file2document_service import File2DocumentService
 from api.utils import get_uuid
 from api.utils.file_utils import filename_type, thumbnail
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 
 
 class FileService(CommonService):
@@ -350,10 +350,10 @@ def upload_document(self, kb, file_objs, user_id):
                 raise RuntimeError("This type of file has not been supported yet!")
 
             location = filename
-            while MINIO.obj_exist(kb.id, location):
+            while STORAGE_IMPL.obj_exist(kb.id, location):
                 location += "_"
             blob = file.read()
-            MINIO.put(kb.id, location, blob)
+            STORAGE_IMPL.put(kb.id, location, blob)
             doc = {
                 "id": get_uuid(),
                 "kb_id": kb.id,
diff --git a/api/db/services/task_service.py b/api/db/services/task_service.py
index a8dc09380b..2b251c374e 100644
--- a/api/db/services/task_service.py
+++ b/api/db/services/task_service.py
@@ -27,7 +27,7 @@ from api.utils import current_timestamp, get_uuid
 from deepdoc.parser.excel_parser import RAGFlowExcelParser
 from rag.settings import SVR_QUEUE_NAME
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from rag.utils.redis_conn import REDIS_CONN
 
@@ -143,7 +143,7 @@ def new_task():
     tsks = []
 
     if doc["type"] == FileType.PDF.value:
-        file_bin = MINIO.get(bucket, name)
+        file_bin = STORAGE_IMPL.get(bucket, name)
         do_layout = doc["parser_config"].get("layout_recognize", True)
         pages = PdfParser.total_page_number(doc["name"], file_bin)
         page_size = doc["parser_config"].get("task_page_size", 12)
@@ -169,7 +169,7 @@ def new_task():
             tsks.append(task)
 
     elif doc["parser_id"] == "table":
-        file_bin = MINIO.get(bucket, name)
+        file_bin = STORAGE_IMPL.get(bucket, name)
         rn = RAGFlowExcelParser.row_number(
             doc["name"], file_bin)
         for i in range(0, rn, 3000):
diff --git a/conf/service_conf.yaml b/conf/service_conf.yaml
index 880561d131..8e983374d0 100644
--- a/conf/service_conf.yaml
+++ b/conf/service_conf.yaml
@@ -13,6 +13,22 @@ minio:
   user: 'rag_flow'
   password: 'infini_rag_flow'
   host: 'minio:9000'
+azure:
+  auth_type: 'sas'
+  container_url: 'container_url'
+  sas_token: 'sas_token'
+#azure:
+#  auth_type: 'spn'
+#  account_url: 'account_url'
+#  client_id: 'client_id'
+#  secret: 'secret'
+#  tenant_id: 'tenant_id'
+#  container_name: 'container_name'
+s3:
+  endpoint: 'endpoint'
+  access_key: 'access_key'
+  secret_key: 'secret_key'
+  region: 'region'
 es:
   hosts: 'http://es01:9200'
   username: 'elastic'
diff --git a/rag/settings.py b/rag/settings.py
index 50053787bd..6e8bcc427b 100644
--- a/rag/settings.py
+++ b/rag/settings.py
@@ -24,6 +24,8 @@ SUBPROCESS_STD_LOG_NAME = "std.log"
 
 ES = get_base_config("es", {})
+AZURE = get_base_config("azure", {})
+S3 = get_base_config("s3", {})
 MINIO = decrypt_database_config(name="minio")
 try:
     REDIS = decrypt_database_config(name="redis")
@@ -43,6 +45,8 @@ es_logger = getLogger("es")
 minio_logger = getLogger("minio")
+s3_logger = getLogger("s3")
+azure_logger = getLogger("azure")
 cron_logger = getLogger("cron_logger")
 cron_logger.setLevel(20)
 chunk_logger = getLogger("chunk_logger")
diff --git a/rag/svr/cache_file_svr.py b/rag/svr/cache_file_svr.py
index caa9ce5303..e929f899c1 100644
--- a/rag/svr/cache_file_svr.py
+++ b/rag/svr/cache_file_svr.py
@@ -20,7 +20,7 @@ from api.db.db_models import close_connection
 from api.db.services.task_service import TaskService
 from rag.settings import cron_logger
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from rag.utils.redis_conn import REDIS_CONN
 
@@ -42,7 +42,7 @@ def main():
         try:
             key = "{}/{}".format(kb_id, loc)
             if REDIS_CONN.exist(key):continue
-            file_bin = MINIO.get(kb_id, loc)
+            file_bin = STORAGE_IMPL.get(kb_id, loc)
             REDIS_CONN.transaction(key, file_bin, 12 * 60)
             cron_logger.info("CACHE: {}".format(loc))
         except Exception as e:
diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index 3a9eb73908..401859fc6d 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -29,7 +29,7 @@ from api.db.services.file2document_service import File2DocumentService
 from api.settings import retrievaler
 from rag.raptor import RecursiveAbstractiveProcessing4TreeOrganizedRetrieval as Raptor
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from api.db.db_models import close_connection
 from rag.settings import database_logger, SVR_QUEUE_NAME
 from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
@@ -138,7 +138,7 @@ def collect():
 
 
 def get_minio_binary(bucket, name):
-    return MINIO.get(bucket, name)
+    return STORAGE_IMPL.get(bucket, name)
 
 
 def build(row):
@@ -214,7 +214,7 @@ def build(row):
                 d["image"].save(output_buffer, format='JPEG')
 
                 st = timer()
-                MINIO.put(row["kb_id"], d["_id"], output_buffer.getvalue())
+                STORAGE_IMPL.put(row["kb_id"], d["_id"], output_buffer.getvalue())
                 el += timer() - st
             except Exception as e:
                 cron_logger.error(str(e))
diff --git a/rag/utils/azure_sas_conn.py b/rag/utils/azure_sas_conn.py
new file mode 100644
index 0000000000..4c0b1eec00
--- /dev/null
+++ b/rag/utils/azure_sas_conn.py
@@ -0,0 +1,80 @@
+import os
+import time
+from io import BytesIO
+from rag import settings
+from rag.settings import azure_logger
+from rag.utils import singleton
+from azure.storage.blob import ContainerClient
+
+
+@singleton
+class RAGFlowAzureSasBlob(object):
+    def __init__(self):
+        self.conn = None
+        self.container_url = os.getenv('CONTAINER_URL', settings.AZURE["container_url"])
+        self.sas_token = os.getenv('SAS_TOKEN', settings.AZURE["sas_token"])
+        self.__open__()
+
+    def __open__(self):
+        try:
+            if self.conn:
+                self.__close__()
+        except Exception as e:
+            pass
+
+        try:
+            self.conn = ContainerClient.from_container_url(self.container_url + "?" + self.sas_token)
+        except Exception as e:
+            azure_logger.error(
+                "Fail to connect %s " % self.container_url + str(e))
+
+    def __close__(self):
+        del self.conn
+        self.conn = None
+
+    def health(self):
+        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
+        return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary))
+
+    def put(self, bucket, fnm, binary):
+        for _ in range(3):
+            try:
+                return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary))
+            except Exception as e:
+                azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+
+    def rm(self, bucket, fnm):
+        try:
+            self.conn.delete_blob(fnm)
+        except Exception as e:
+            azure_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))
+
+    def get(self, bucket, fnm):
+        for _ in range(1):
+            try:
+                r = self.conn.download_blob(fnm)
+                return r.read()
+            except Exception as e:
+                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
+
+    def obj_exist(self, bucket, fnm):
+        try:
+            return self.conn.get_blob_client(fnm).exists()
+        except Exception as e:
+            azure_logger.error(f"Fail obj_exist {bucket}/{fnm}: " + str(e))
+            return False
+
+    def get_presigned_url(self, bucket, fnm, expires):
+        for _ in range(10):
+            try:
+                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
+            except Exception as e:
+                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
\ No newline at end of file
diff --git a/rag/utils/azure_spn_conn.py b/rag/utils/azure_spn_conn.py
new file mode 100644
index 0000000000..d614fdfbfe
--- /dev/null
+++ b/rag/utils/azure_spn_conn.py
@@ -0,0 +1,90 @@
+import os
+import time
+from rag import settings
+from rag.settings import azure_logger
+from rag.utils import singleton
+from azure.identity import ClientSecretCredential, AzureAuthorityHosts
+from azure.storage.filedatalake import FileSystemClient
+
+
+@singleton
+class RAGFlowAzureSpnBlob(object):
+    def __init__(self):
+        self.conn = None
+        self.account_url = os.getenv('ACCOUNT_URL', settings.AZURE["account_url"])
+        self.client_id = os.getenv('CLIENT_ID', settings.AZURE["client_id"])
+        self.secret = os.getenv('SECRET', settings.AZURE["secret"])
+        self.tenant_id = os.getenv('TENANT_ID', settings.AZURE["tenant_id"])
+        self.container_name = os.getenv('CONTAINER_NAME', settings.AZURE["container_name"])
+        self.__open__()
+
+    def __open__(self):
+        try:
+            if self.conn:
+                self.__close__()
+        except Exception as e:
+            pass
+
+        try:
+            # Authenticates a service principal against the Azure China cloud authority.
+            credentials = ClientSecretCredential(tenant_id=self.tenant_id, client_id=self.client_id, client_secret=self.secret, authority=AzureAuthorityHosts.AZURE_CHINA)
+            self.conn = FileSystemClient(account_url=self.account_url, file_system_name=self.container_name, credential=credentials)
+        except Exception as e:
+            azure_logger.error(
+                "Fail to connect %s " % self.account_url + str(e))
+
+    def __close__(self):
+        del self.conn
+        self.conn = None
+
+    def health(self):
+        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
+        f = self.conn.create_file(fnm)
+        f.append_data(binary, offset=0, length=len(binary))
+        return f.flush_data(len(binary))
+
+    def put(self, bucket, fnm, binary):
+        for _ in range(3):
+            try:
+                f = self.conn.create_file(fnm)
+                f.append_data(binary, offset=0, length=len(binary))
+                return f.flush_data(len(binary))
+            except Exception as e:
+                azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+
+    def rm(self, bucket, fnm):
+        try:
+            self.conn.delete_file(fnm)
+        except Exception as e:
+            azure_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))
+
+    def get(self, bucket, fnm):
+        for _ in range(1):
+            try:
+                client = self.conn.get_file_client(fnm)
+                r = client.download_file()
+                return r.read()
+            except Exception as e:
+                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
+
+    def obj_exist(self, bucket, fnm):
+        try:
+            client = self.conn.get_file_client(fnm)
+            return client.exists()
+        except Exception as e:
+            azure_logger.error(f"Fail obj_exist {bucket}/{fnm}: " + str(e))
+            return False
+
+    def get_presigned_url(self, bucket, fnm, expires):
+        for _ in range(10):
+            try:
+                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
+            except Exception as e:
+                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
\ No newline at end of file
diff --git a/rag/utils/s3_conn.py b/rag/utils/s3_conn.py
new file mode 100644
index 0000000000..ab2d45a77e
--- /dev/null
+++ b/rag/utils/s3_conn.py
@@ -0,0 +1,135 @@
+import boto3
+import os
+from botocore.exceptions import ClientError
+from botocore.client import Config
+import time
+from io import BytesIO
+from rag import settings
+from rag.settings import s3_logger
+from rag.utils import singleton
+
+
+@singleton
+class RAGFlowS3(object):
+    def __init__(self):
+        self.conn = None
+        # Environment variables take precedence over the `s3:` section of service_conf.yaml.
+        self.endpoint = os.getenv('ENDPOINT', settings.S3.get("endpoint"))
+        self.access_key = os.getenv('ACCESS_KEY', settings.S3.get("access_key"))
+        self.secret_key = os.getenv('SECRET_KEY', settings.S3.get("secret_key"))
+        self.region = os.getenv('REGION', settings.S3.get("region"))
+        self.__open__()
+
+    def __open__(self):
+        try:
+            if self.conn:
+                self.__close__()
+        except Exception as e:
+            pass
+
+        try:
+            config = Config(
+                s3={
+                    'addressing_style': 'virtual'
+                }
+            )
+
+            self.conn = boto3.client(
+                's3',
+                endpoint_url=self.endpoint,
+                region_name=self.region,
+                aws_access_key_id=self.access_key,
+                aws_secret_access_key=self.secret_key,
+                config=config
+            )
+        except Exception as e:
+            s3_logger.error(
+                "Fail to connect %s " % self.endpoint + str(e))
+
+    def __close__(self):
+        del self.conn
+        self.conn = None
+
+    def bucket_exists(self, bucket):
+        try:
+            s3_logger.error(f"head_bucket bucketname {bucket}")
+            self.conn.head_bucket(Bucket=bucket)
+            exists = True
+        except ClientError as e:
+            s3_logger.error(f"head_bucket error {bucket}: " + str(e))
+            exists = False
+        return exists
+
+    def health(self):
+        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
+
+        if not self.bucket_exists(bucket):
+            self.conn.create_bucket(Bucket=bucket)
+            s3_logger.error(f"create bucket {bucket} ********")
+
+        r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
+        return r
+
+    def get_properties(self, bucket, key):
+        return {}
+
+    def list(self, bucket, dir, recursive=True):
+        return []
+
+    def put(self, bucket, fnm, binary):
+        s3_logger.error(f"bucket name {bucket}; filename :{fnm}:")
+        for _ in range(1):
+            try:
+                if not self.bucket_exists(bucket):
+                    self.conn.create_bucket(Bucket=bucket)
+                    s3_logger.error(f"create bucket {bucket} ********")
+                r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
+                return r
+            except Exception as e:
+                s3_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+
+    def rm(self, bucket, fnm):
+        try:
+            self.conn.delete_object(Bucket=bucket, Key=fnm)
+        except Exception as e:
+            s3_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))
+
+    def get(self, bucket, fnm):
+        for _ in range(1):
+            try:
+                r = self.conn.get_object(Bucket=bucket, Key=fnm)
+                object_data = r['Body'].read()
+                return object_data
+            except Exception as e:
+                s3_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
+
+    def obj_exist(self, bucket, fnm):
+        try:
+            if self.conn.head_object(Bucket=bucket, Key=fnm):
+                return True
+        except ClientError as e:
+            if e.response['Error']['Code'] == '404':
+                return False
+            else:
+                raise
+
+    def get_presigned_url(self, bucket, fnm, expires):
+        for _ in range(10):
+            try:
+                r = self.conn.generate_presigned_url('get_object',
+                                                     Params={'Bucket': bucket,
+                                                             'Key': fnm},
+                                                     ExpiresIn=expires)
+                return r
+            except Exception as e:
+                s3_logger.error(f"fail get url {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
diff --git a/rag/utils/storage_factory.py b/rag/utils/storage_factory.py
new file mode 100644
index 0000000000..0b50affbe9
--- /dev/null
+++ b/rag/utils/storage_factory.py
@@ -0,0 +1,30 @@
+import os
+from enum import Enum
+
+from rag.utils.azure_sas_conn import RAGFlowAzureSasBlob
+from rag.utils.azure_spn_conn import RAGFlowAzureSpnBlob
+from rag.utils.minio_conn import RAGFlowMinio
+from rag.utils.s3_conn import RAGFlowS3
+
+
+class Storage(Enum):
+    MINIO = 1
+    AZURE_SPN = 2
+    AZURE_SAS = 3
+    AWS_S3 = 4
+
+
+class StorageFactory:
+    storage_mapping = {
+        Storage.MINIO: RAGFlowMinio,
+        Storage.AZURE_SPN: RAGFlowAzureSpnBlob,
+        Storage.AZURE_SAS: RAGFlowAzureSasBlob,
+        Storage.AWS_S3: RAGFlowS3,
+    }
+
+    @classmethod
+    def create(cls, storage: Storage):
+        return cls.storage_mapping[storage]()
+
+
+STORAGE_IMPL = StorageFactory.create(Storage[os.getenv('STORAGE_IMPL', 'MINIO')])
diff --git a/requirements.txt b/requirements.txt
index 9f86aaeb13..7fceac016e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,7 @@
-akshare==1.14.72
-anthropic==0.34.1
+azure-storage-blob==12.22.0
+azure-identity==1.17.1
+azure-storage-file-datalake==12.16.0
+anthropic==0.34.1
 arxiv==2.1.3
 Aspose.Slides==24.2.0
 BCEmbedding==0.1.3