diff --git a/agent/canvas.py b/agent/canvas.py index 27774d60448..0b6281398f8 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -14,14 +14,12 @@ # limitations under the License. # import json -import traceback from abc import ABC from copy import deepcopy from functools import partial from agent.component import component_class from agent.component.base import ComponentBase -from agent.settings import flow_logger, DEBUG - +from api.utils.log_utils import logger class Canvas(ABC): """ @@ -189,7 +187,7 @@ def prepare2run(cpns): if cpn.component_name == "Answer": self.answer.append(c) else: - if DEBUG: print("RUN: ", c) + logger.debug(f"Canvas.prepare2run: {c}") cpids = cpn.get_dependent_components() if any([c not in self.path[-1] for c in cpids]): continue @@ -199,7 +197,7 @@ def prepare2run(cpns): prepare2run(self.components[self.path[-2][-1]]["downstream"]) while 0 <= ran < len(self.path[-1]): - if DEBUG: print(ran, self.path) + logger.debug(f"Canvas.run: {ran} {self.path}") cpn_id = self.path[-1][ran] cpn = self.get_component(cpn_id) if not cpn["downstream"]: break @@ -219,7 +217,7 @@ def prepare2run(cpns): self.get_component(p)["obj"].set_exception(e) prepare2run([p]) break - traceback.print_exc() + logger.exception("Canvas.run got exception") break continue @@ -231,7 +229,7 @@ def prepare2run(cpns): self.get_component(p)["obj"].set_exception(e) prepare2run([p]) break - traceback.print_exc() + logger.exception("Canvas.run got exception") break if self.answer: diff --git a/agent/component/arxiv.py b/agent/component/arxiv.py index 6b47ded9d15..c3f52d64f34 100644 --- a/agent/component/arxiv.py +++ b/agent/component/arxiv.py @@ -16,9 +16,8 @@ from abc import ABC import arxiv import pandas as pd -from agent.settings import DEBUG from agent.component.base import ComponentBase, ComponentParamBase - +from api.utils.log_utils import logger class ArXivParam(ComponentParamBase): """ @@ -65,5 +64,5 @@ def _run(self, history, **kwargs): return ArXiv.be_output("") df = pd.DataFrame(arxiv_res) - if DEBUG: print(df, ":::::::::::::::::::::::::::::::::") + logger.debug(f"df: {str(df)}") return df diff --git a/agent/component/baidu.py b/agent/component/baidu.py index cb2b66ed68a..cb1f5a2a52f 100644 --- a/agent/component/baidu.py +++ b/agent/component/baidu.py @@ -13,14 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import random from abc import ABC -from functools import partial import pandas as pd import requests import re -from agent.settings import DEBUG from agent.component.base import ComponentBase, ComponentParamBase +from api.utils.log_utils import logger class BaiduParam(ComponentParamBase): @@ -64,6 +62,6 @@ def _run(self, history, **kwargs): return Baidu.be_output("") df = pd.DataFrame(baidu_res) - if DEBUG: print(df, ":::::::::::::::::::::::::::::::::") + logger.debug(f"df: {str(df)}") return df diff --git a/agent/component/base.py b/agent/component/base.py index d105d43b8a1..2029e7a4d4a 100644 --- a/agent/component/base.py +++ b/agent/component/base.py @@ -17,14 +17,14 @@ import builtins import json import os -from copy import deepcopy from functools import partial -from typing import List, Dict, Tuple, Union +from typing import Tuple, Union import pandas as pd from agent import settings -from agent.settings import flow_logger, DEBUG +from api.utils.log_utils import logger + _FEEDED_DEPRECATED_PARAMS = "_feeded_deprecated_params" _DEPRECATED_PARAMS = "_deprecated_params" @@ -361,13 +361,13 @@ def _not_in(value, wrong_value_list): def _warn_deprecated_param(self, param_name, descr): if self._deprecated_params_set.get(param_name): - flow_logger.warning( + logger.warning( f"{descr} {param_name} is deprecated and ignored in this version." ) def _warn_to_deprecate_param(self, param_name, descr, new_param): if self._deprecated_params_set.get(param_name): - flow_logger.warning( + logger.warning( f"{descr} {param_name} will be deprecated in future release; " f"please use {new_param} instead." ) @@ -403,7 +403,7 @@ def get_dependent_components(self): return cpnts def run(self, history, **kwargs): - flow_logger.info("{}, history: {}, kwargs: {}".format(self, json.dumps(history, ensure_ascii=False), + logger.info("{}, history: {}, kwargs: {}".format(self, json.dumps(history, ensure_ascii=False), json.dumps(kwargs, ensure_ascii=False))) try: res = self._run(history, **kwargs) @@ -463,7 +463,7 @@ def get_input(self): reversed_cpnts.extend(self._canvas.path[-2]) reversed_cpnts.extend(self._canvas.path[-1]) - if DEBUG: print(self.component_name, reversed_cpnts[::-1]) + logger.debug(f"{self.component_name} {reversed_cpnts[::-1]}") for u in reversed_cpnts[::-1]: if self.get_component_name(u) in ["switch", "concentrator"]: continue if self.component_name.lower() == "generate" and self.get_component_name(u) == "retrieval": diff --git a/agent/component/bing.py b/agent/component/bing.py index dce3c7ea827..7497c4c9794 100644 --- a/agent/component/bing.py +++ b/agent/component/bing.py @@ -16,9 +16,8 @@ from abc import ABC import requests import pandas as pd -from agent.settings import DEBUG from agent.component.base import ComponentBase, ComponentParamBase - +from api.utils.log_utils import logger class BingParam(ComponentParamBase): """ @@ -81,5 +80,5 @@ def _run(self, history, **kwargs): return Bing.be_output("") df = pd.DataFrame(bing_res) - if DEBUG: print(df, ":::::::::::::::::::::::::::::::::") + logger.debug(f"df: {str(df)}") return df diff --git a/agent/component/categorize.py b/agent/component/categorize.py index 7ba001b8369..f1c8855ca15 100644 --- a/agent/component/categorize.py +++ b/agent/component/categorize.py @@ -17,7 +17,7 @@ from api.db import LLMType from api.db.services.llm_service import LLMBundle from agent.component import GenerateParam, Generate -from agent.settings import DEBUG +from api.utils.log_utils import logger class CategorizeParam(GenerateParam): @@ -34,7 +34,7 @@ def 
check(self): super().check() self.check_empty(self.category_description, "[Categorize] Category examples") for k, v in self.category_description.items(): - if not k: raise ValueError(f"[Categorize] Category name can not be empty!") + if not k: raise ValueError("[Categorize] Category name can not be empty!") if not v.get("to"): raise ValueError(f"[Categorize] 'To' of category {k} can not be empty!") def get_prompt(self): @@ -77,7 +77,7 @@ def _run(self, history, **kwargs): chat_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.CHAT, self._param.llm_id) ans = chat_mdl.chat(self._param.get_prompt(), [{"role": "user", "content": input}], self._param.gen_conf()) - if DEBUG: print(ans, ":::::::::::::::::::::::::::::::::", input) + logger.debug(f"input: {input}, answer: {str(ans)}") for c in self._param.category_description.keys(): if ans.lower().find(c.lower()) >= 0: return Categorize.be_output(self._param.category_description[c]["to"]) diff --git a/agent/component/duckduckgo.py b/agent/component/duckduckgo.py index 2ee011369db..b0ad40c2316 100644 --- a/agent/component/duckduckgo.py +++ b/agent/component/duckduckgo.py @@ -16,8 +16,8 @@ from abc import ABC from duckduckgo_search import DDGS import pandas as pd -from agent.settings import DEBUG from agent.component.base import ComponentBase, ComponentParamBase +from api.utils.log_utils import logger class DuckDuckGoParam(ComponentParamBase): @@ -62,5 +62,5 @@ def _run(self, history, **kwargs): return DuckDuckGo.be_output("") df = pd.DataFrame(duck_res) - if DEBUG: print(df, ":::::::::::::::::::::::::::::::::") + logger.debug(f"df: {df}") return df diff --git a/agent/component/github.py b/agent/component/github.py index 5e56340e6c4..20c9a0c3ff0 100644 --- a/agent/component/github.py +++ b/agent/component/github.py @@ -16,8 +16,8 @@ from abc import ABC import pandas as pd import requests -from agent.settings import DEBUG from agent.component.base import ComponentBase, ComponentParamBase +from api.utils.log_utils import logger class GitHubParam(ComponentParamBase): @@ -57,5 +57,5 @@ def _run(self, history, **kwargs): return GitHub.be_output("") df = pd.DataFrame(github_res) - if DEBUG: print(df, ":::::::::::::::::::::::::::::::::") + logger.debug(f"df: {df}") return df diff --git a/agent/component/google.py b/agent/component/google.py index a6ff7d281d9..0e0cec40ed1 100644 --- a/agent/component/google.py +++ b/agent/component/google.py @@ -16,8 +16,8 @@ from abc import ABC from serpapi import GoogleSearch import pandas as pd -from agent.settings import DEBUG from agent.component.base import ComponentBase, ComponentParamBase +from api.utils.log_utils import logger class GoogleParam(ComponentParamBase): @@ -85,12 +85,12 @@ def _run(self, history, **kwargs): "hl": self._param.language, "num": self._param.top_n}) google_res = [{"content": '' + i["title"] + ' ' + i["snippet"]} for i in client.get_dict()["organic_results"]] - except Exception as e: + except Exception: return Google.be_output("**ERROR**: Existing Unavailable Parameters!") if not google_res: return Google.be_output("") df = pd.DataFrame(google_res) - if DEBUG: print(df, ":::::::::::::::::::::::::::::::::") + logger.debug(f"df: {df}") return df diff --git a/agent/component/googlescholar.py b/agent/component/googlescholar.py index 19e1110e26b..d6dc146891d 100644 --- a/agent/component/googlescholar.py +++ b/agent/component/googlescholar.py @@ -15,9 +15,9 @@ # from abc import ABC import pandas as pd -from agent.settings import DEBUG from agent.component.base import ComponentBase,
ComponentParamBase from scholarly import scholarly +from api.utils.log_utils import logger class GoogleScholarParam(ComponentParamBase): @@ -58,13 +58,13 @@ def _run(self, history, **kwargs): 'pub_url'] + '"> ' + "\n author: " + ",".join(pub['bib']['author']) + '\n Abstract: ' + pub[ 'bib'].get('abstract', 'no abstract')}) - except StopIteration or Exception as e: - print("**ERROR** " + str(e)) + except StopIteration or Exception: + logger.exception("GoogleScholar") break if not scholar_res: return GoogleScholar.be_output("") df = pd.DataFrame(scholar_res) - if DEBUG: print(df, ":::::::::::::::::::::::::::::::::") + logger.debug(f"df: {df}") return df diff --git a/agent/component/keyword.py b/agent/component/keyword.py index 805dddf3ce1..abc0c40ccb7 100644 --- a/agent/component/keyword.py +++ b/agent/component/keyword.py @@ -18,7 +18,7 @@ from api.db import LLMType from api.db.services.llm_service import LLMBundle from agent.component import GenerateParam, Generate -from agent.settings import DEBUG +from api.utils.log_utils import logger class KeywordExtractParam(GenerateParam): @@ -58,5 +58,5 @@ def _run(self, history, **kwargs): self._param.gen_conf()) ans = re.sub(r".*keyword:", "", ans).strip() - if DEBUG: print(ans, ":::::::::::::::::::::::::::::::::") + logger.info(f"ans: {ans}") return KeywordExtract.be_output(ans) diff --git a/agent/component/pubmed.py b/agent/component/pubmed.py index ff97ec88b10..409fb05258d 100644 --- a/agent/component/pubmed.py +++ b/agent/component/pubmed.py @@ -18,8 +18,8 @@ import re import pandas as pd import xml.etree.ElementTree as ET -from agent.settings import DEBUG from agent.component.base import ComponentBase, ComponentParamBase +from api.utils.log_utils import logger class PubMedParam(ComponentParamBase): @@ -65,5 +65,5 @@ def _run(self, history, **kwargs): return PubMed.be_output("") df = pd.DataFrame(pubmed_res) - if DEBUG: print(df, ":::::::::::::::::::::::::::::::::") + logger.debug(f"df: {df}") return df diff --git a/agent/component/relevant.py b/agent/component/relevant.py index 8f246f3d27f..e20083bbdd6 100644 --- a/agent/component/relevant.py +++ b/agent/component/relevant.py @@ -18,6 +18,7 @@ from api.db.services.llm_service import LLMBundle from agent.component import GenerateParam, Generate from rag.utils import num_tokens_from_string, encoder +from api.utils.log_utils import logger class RelevantParam(GenerateParam): @@ -70,7 +71,7 @@ def _run(self, history, **kwargs): ans = chat_mdl.chat(self._param.get_prompt(), [{"role": "user", "content": ans}], self._param.gen_conf()) - print(ans, ":::::::::::::::::::::::::::::::::") + logger.info(ans) if ans.lower().find("yes") >= 0: return Relevant.be_output(self._param.yes) if ans.lower().find("no") >= 0: diff --git a/agent/component/retrieval.py b/agent/component/retrieval.py index e53d987d9e5..0fc0c714e8c 100644 --- a/agent/component/retrieval.py +++ b/agent/component/retrieval.py @@ -22,6 +22,7 @@ from api.db.services.llm_service import LLMBundle from api.settings import retrievaler from agent.component.base import ComponentBase, ComponentParamBase +from api.utils.log_utils import logger class RetrievalParam(ComponentParamBase): @@ -80,7 +81,7 @@ def _run(self, history, **kwargs): df = pd.DataFrame(kbinfos["chunks"]) df["content"] = df["content_with_weight"] del df["content_with_weight"] - print(">>>>>>>>>>>>>>>>>>>>>>>>>>\n", query, df) + logger.debug("{} {}".format(query, df)) return df diff --git a/agent/component/rewrite.py b/agent/component/rewrite.py index 63e473d8a4a..7cfc01a61e3 100644 
--- a/agent/component/rewrite.py +++ b/agent/component/rewrite.py @@ -17,6 +17,7 @@ from api.db import LLMType from api.db.services.llm_service import LLMBundle from agent.component import GenerateParam, Generate +from api.utils.log_utils import logger class RewriteQuestionParam(GenerateParam): @@ -104,7 +105,7 @@ def _run(self, history, **kwargs): self._canvas.history.pop() self._canvas.history.append(("user", ans)) - print(ans, ":::::::::::::::::::::::::::::::::") + logger.info(ans) return RewriteQuestion.be_output(ans) diff --git a/agent/component/wikipedia.py b/agent/component/wikipedia.py index 811ac844339..3d773a63761 100644 --- a/agent/component/wikipedia.py +++ b/agent/component/wikipedia.py @@ -13,13 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import random from abc import ABC -from functools import partial import wikipedia import pandas as pd -from agent.settings import DEBUG from agent.component.base import ComponentBase, ComponentParamBase +from api.utils.log_utils import logger class WikipediaParam(ComponentParamBase): @@ -65,5 +63,5 @@ def _run(self, history, **kwargs): return Wikipedia.be_output("") df = pd.DataFrame(wiki_res) - if DEBUG: print(df, ":::::::::::::::::::::::::::::::::") + logger.debug(f"df: {df}") return df diff --git a/agent/component/yahoofinance.py b/agent/component/yahoofinance.py index aa2cd25b4bc..139c72345c6 100644 --- a/agent/component/yahoofinance.py +++ b/agent/component/yahoofinance.py @@ -17,6 +17,7 @@ import pandas as pd from agent.component.base import ComponentBase, ComponentParamBase import yfinance as yf +from api.utils.log_utils import logger class YahooFinanceParam(ComponentParamBase): @@ -74,8 +75,8 @@ def _run(self, history, **kwargs): {"content": "quarterly cash flow statement:\n" + msft.quarterly_cashflow.to_markdown() + "\n"}) if self._param.news: yohoo_res.append({"content": "news:\n" + pd.DataFrame(msft.news).to_markdown() + "\n"}) - except Exception as e: - print("**ERROR** " + str(e)) + except Exception: + logger.exception("YahooFinance got exception") if not yohoo_res: return YahooFinance.be_output("") diff --git a/agent/settings.py b/agent/settings.py index f6fec1075f7..fbdb263dc74 100644 --- a/agent/settings.py +++ b/agent/settings.py @@ -13,22 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# Logger -import os -from api.utils.file_utils import get_project_base_directory -from api.utils.log_utils import LoggerFactory, getLogger - -DEBUG = 0 -LoggerFactory.set_directory( - os.path.join( - get_project_base_directory(), - "logs", - "flow")) -# {CRITICAL: 50, FATAL:50, ERROR:40, WARNING:30, WARN:30, INFO:20, DEBUG:10, NOTSET:0} -LoggerFactory.LEVEL = 30 - -flow_logger = getLogger("flow") -database_logger = getLogger("database") FLOAT_ZERO = 1e-8 PARAM_MAXDEPTH = 5 diff --git a/api/apps/__init__.py b/api/apps/__init__.py index b101edb172a..d1dc755cfcf 100644 --- a/api/apps/__init__.py +++ b/api/apps/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import logging import os import sys from importlib.util import module_from_spec, spec_from_file_location @@ -30,18 +29,14 @@ from flask_session import Session from flask_login import LoginManager -from api.settings import SECRET_KEY, stat_logger -from api.settings import API_VERSION, access_logger +from api.settings import SECRET_KEY +from api.settings import API_VERSION from api.utils.api_utils import server_error_response +from api.utils.log_utils import logger from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer __all__ = ["app"] - -logger = logging.getLogger("flask.app") -for h in access_logger.handlers: - logger.addHandler(h) - Request.json = property(lambda self: self.get_json(force=True, silent=True)) app = Flask(__name__) @@ -158,8 +153,8 @@ def load_user(web_request): return user[0] else: return None - except Exception as e: - stat_logger.exception(e) + except Exception: + logger.exception("load_user got exception") return None else: return None diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index dc4c37848ed..260017bc35f 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -23,6 +23,7 @@ from api.utils.api_utils import get_json_result, server_error_response, validate_request, get_data_error_result from agent.canvas import Canvas from peewee import MySQLDatabase, PostgresqlDatabase +from api.utils.log_utils import logger @manager.route('/templates', methods=['GET']) @@ -114,7 +115,7 @@ def run(): pass canvas.add_user_input(req["message"]) answer = canvas.run(stream=stream) - print(canvas) + logger.info(canvas) except Exception as e: return server_error_response(e) diff --git a/api/apps/llm_app.py b/api/apps/llm_app.py index 74b3e71383d..f035da82b5f 100644 --- a/api/apps/llm_app.py +++ b/api/apps/llm_app.py @@ -25,6 +25,7 @@ from api.utils.api_utils import get_json_result from rag.llm import EmbeddingModel, ChatModel, RerankModel, CvModel, TTSModel import requests +from api.utils.log_utils import logger @manager.route('/factories', methods=['GET']) @@ -89,7 +90,7 @@ def set_api_key(): if len(arr) == 0 or tc == 0: raise Exception("Fail") rerank_passed = True - print(f'passed model rerank{llm.llm_name}',flush=True) + logger.info(f'passed model rerank {llm.llm_name}') except Exception as e: msg += f"\nFail to access model({llm.llm_name}) using this api key." 
+ str( e) diff --git a/api/apps/sdk/dataset.py b/api/apps/sdk/dataset.py index 179d9c2bd9f..047ab82790c 100644 --- a/api/apps/sdk/dataset.py +++ b/api/apps/sdk/dataset.py @@ -526,4 +526,4 @@ def list(tenant_id): new_key = key_mapping.get(key, key) renamed_data[new_key] = value renamed_list.append(renamed_data) - return get_result(data=renamed_list) \ No newline at end of file + return get_result(data=renamed_list) diff --git a/api/apps/user_app.py b/api/apps/user_app.py index 04db59256e4..35a9a46c275 100644 --- a/api/apps/user_app.py +++ b/api/apps/user_app.py @@ -53,8 +53,8 @@ ) from api.db.services.user_service import UserService, TenantService, UserTenantService from api.db.services.file_service import FileService -from api.settings import stat_logger from api.utils.api_utils import get_json_result, construct_response +from api.utils.log_utils import logger @manager.route("/login", methods=["POST", "GET"]) @@ -177,7 +177,7 @@ def github_callback(): try: avatar = download_img(user_info["avatar_url"]) except Exception as e: - stat_logger.exception(e) + logger.exception(e) avatar = "" users = user_register( user_id, @@ -202,7 +202,7 @@ def github_callback(): return redirect("/?auth=%s" % user.get_id()) except Exception as e: rollback_user_registration(user_id) - stat_logger.exception(e) + logger.exception(e) return redirect("/?error=%s" % str(e)) # User has already registered, try to log in @@ -279,7 +279,7 @@ def feishu_callback(): try: avatar = download_img(user_info["avatar_url"]) except Exception as e: - stat_logger.exception(e) + logger.exception(e) avatar = "" users = user_register( user_id, @@ -304,7 +304,7 @@ def feishu_callback(): return redirect("/?auth=%s" % user.get_id()) except Exception as e: rollback_user_registration(user_id) - stat_logger.exception(e) + logger.exception(e) return redirect("/?error=%s" % str(e)) # User has already registered, try to log in @@ -436,7 +436,7 @@ def setting_user(): UserService.update_by_id(current_user.id, update_dict) return get_json_result(data=True) except Exception as e: - stat_logger.exception(e) + logger.exception(e) return get_json_result( data=False, message="Update failure!", code=RetCode.EXCEPTION_ERROR ) @@ -621,7 +621,7 @@ def user_add(): ) except Exception as e: rollback_user_registration(user_id) - stat_logger.exception(e) + logger.exception(e) return get_json_result( data=False, message=f"User registration failure, error: {str(e)}", diff --git a/api/db/db_models.py b/api/db/db_models.py index 3938cb3a306..29c3c2618bc 100644 --- a/api/db/db_models.py +++ b/api/db/db_models.py @@ -30,12 +30,9 @@ ) from playhouse.pool import PooledMySQLDatabase, PooledPostgresqlDatabase from api.db import SerializedType, ParserType -from api.settings import DATABASE, stat_logger, SECRET_KEY, DATABASE_TYPE -from api.utils.log_utils import getLogger +from api.settings import DATABASE, SECRET_KEY, DATABASE_TYPE from api import utils - -LOGGER = getLogger() - +from api.utils.log_utils import logger def singleton(cls, *args, **kw): instances = {} @@ -288,7 +285,7 @@ def __init__(self): database_config = DATABASE.copy() db_name = database_config.pop("name") self.database_connection = PooledDatabase[DATABASE_TYPE.upper()].value(db_name, **database_config) - stat_logger.info('init database on cluster mode successfully') + logger.info('init database on cluster mode successfully') class PostgresDatabaseLock: def __init__(self, lock_name, timeout=10, db=None): @@ -396,7 +393,7 @@ def close_connection(): if DB: DB.close_stale(age=30) except Exception as e: - 
LOGGER.exception(e) + logger.exception(e) class DataBaseModel(BaseModel): @@ -412,15 +409,15 @@ def init_database_tables(alter_fields=[]): for name, obj in members: if obj != DataBaseModel and issubclass(obj, DataBaseModel): table_objs.append(obj) - LOGGER.info(f"start create table {obj.__name__}") + logger.info(f"start create table {obj.__name__}") try: obj.create_table() - LOGGER.info(f"create table success: {obj.__name__}") + logger.info(f"create table success: {obj.__name__}") except Exception as e: - LOGGER.exception(e) + logger.exception(e) create_failed_list.append(obj.__name__) if create_failed_list: - LOGGER.info(f"create tables failed: {create_failed_list}") + logger.info(f"create tables failed: {create_failed_list}") raise Exception(f"create tables failed: {create_failed_list}") migrate_db() diff --git a/api/db/db_utils.py b/api/db/db_utils.py index 795fc7f7673..47449e79c2a 100644 --- a/api/db/db_utils.py +++ b/api/db/db_utils.py @@ -22,12 +22,6 @@ from api.utils import current_timestamp, timestamp_to_date from api.db.db_models import DB, DataBaseModel -from api.db.runtime_config import RuntimeConfig -from api.utils.log_utils import getLogger -from enum import Enum - - -LOGGER = getLogger() @DB.connection_context() diff --git a/api/db/init_data.py b/api/db/init_data.py index 3ac16e0358c..7ecc5438150 100644 --- a/api/db/init_data.py +++ b/api/db/init_data.py @@ -30,6 +30,7 @@ from api.db.services.user_service import TenantService, UserTenantService from api.settings import CHAT_MDL, EMBEDDING_MDL, ASR_MDL, IMAGE2TEXT_MDL, PARSERS, LLM_FACTORY, API_KEY, LLM_BASE_URL from api.utils.file_utils import get_project_base_directory +from api.utils.log_utils import logger def encode_to_base64(input_string): @@ -69,36 +70,34 @@ def init_superuser(): "api_key": API_KEY, "api_base": LLM_BASE_URL}) if not UserService.save(**user_info): - print("\033[93m【ERROR】\033[0mcan't init admin.") + logger.info("can't init admin.") return TenantService.insert(**tenant) UserTenantService.insert(**usr_tenant) TenantLLMService.insert_many(tenant_llm) - print( - "【INFO】Super user initialized. \033[93memail: admin@ragflow.io, password: admin\033[0m. Changing the password after logining is strongly recomanded.") + logger.info( + "Super user initialized. email: admin@ragflow.io, password: admin. Changing the password after logining is strongly recomanded.") chat_mdl = LLMBundle(tenant["id"], LLMType.CHAT, tenant["llm_id"]) msg = chat_mdl.chat(system="", history=[ {"role": "user", "content": "Hello!"}], gen_conf={}) if msg.find("ERROR: ") == 0: - print( - "\33[91m【ERROR】\33[0m: ", + logger.error( "'{}' dosen't work. 
{}".format( tenant["llm_id"], msg)) embd_mdl = LLMBundle(tenant["id"], LLMType.EMBEDDING, tenant["embd_id"]) v, c = embd_mdl.encode(["Hello!"]) if c == 0: - print( - "\33[91m【ERROR】\33[0m:", - " '{}' dosen't work!".format( + logger.error( + "'{}' dosen't work!".format( tenant["embd_id"])) def init_llm_factory(): try: LLMService.filter_delete([(LLM.fid == "MiniMax" or LLM.fid == "Minimax")]) - except Exception as e: + except Exception: pass factory_llm_infos = json.load( @@ -111,14 +110,14 @@ def init_llm_factory(): llm_infos = factory_llm_info.pop("llm") try: LLMFactoriesService.save(**factory_llm_info) - except Exception as e: + except Exception: pass LLMService.filter_delete([LLM.fid == factory_llm_info["name"]]) for llm_info in llm_infos: llm_info["fid"] = factory_llm_info["name"] try: LLMService.save(**llm_info) - except Exception as e: + except Exception: pass LLMFactoriesService.filter_delete([LLMFactories.name == "Local"]) @@ -145,7 +144,7 @@ def init_llm_factory(): row = deepcopy(row) row["llm_name"] = "text-embedding-3-large" TenantLLMService.save(**row) - except Exception as e: + except Exception: pass break for kb_id in KnowledgebaseService.get_all_ids(): @@ -169,9 +168,8 @@ def add_graph_templates(): CanvasTemplateService.save(**cnvs) except: CanvasTemplateService.update_by_id(cnvs["id"], cnvs) - except Exception as e: - print("Add graph templates error: ", e) - print("------------", flush=True) + except Exception: + logger.exception("Add graph templates error: ") def init_web_data(): @@ -182,7 +180,7 @@ def init_web_data(): # init_superuser() add_graph_templates() - print("init web data success:{}".format(time.time() - start_time)) + logger.info("init web data success:{}".format(time.time() - start_time)) if __name__ == '__main__': diff --git a/api/db/operatioins.py b/api/db/operatioins.py deleted file mode 100644 index 30220d8f89c..00000000000 --- a/api/db/operatioins.py +++ /dev/null @@ -1,21 +0,0 @@ -# -# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -import operator -import time -import typing -from api.utils.log_utils import sql_logger -import peewee diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index 96bd8c4e58a..1c82cab51a8 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -26,11 +26,12 @@ from api.db.services.common_service import CommonService from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.llm_service import LLMService, TenantLLMService, LLMBundle -from api.settings import chat_logger, retrievaler, kg_retrievaler +from api.settings import retrievaler, kg_retrievaler from rag.app.resume import forbidden_select_fields4resume from rag.nlp.search import index_name from rag.utils import rmSpace, num_tokens_from_string, encoder from api.utils.file_utils import get_project_base_directory +from api.utils.log_utils import logger class DialogService(CommonService): @@ -177,7 +178,7 @@ def chat(dialog, messages, stream=True, **kwargs): tts_mdl = LLMBundle(dialog.tenant_id, LLMType.TTS) # try to use sql if field mapping is good to go if field_map: - chat_logger.info("Use SQL to retrieval:{}".format(questions[-1])) + logger.info("Use SQL to retrieval:{}".format(questions[-1])) ans = use_sql(questions[-1], field_map, dialog.tenant_id, chat_mdl, prompt_config.get("quote", True)) if ans: yield ans @@ -219,7 +220,7 @@ def chat(dialog, messages, stream=True, **kwargs): doc_ids=attachments, top=dialog.top_k, aggs=False, rerank_mdl=rerank_mdl) knowledges = [ck["content_with_weight"] for ck in kbinfos["chunks"]] - chat_logger.info( + logger.info( "{}->{}".format(" ".join(questions), "\n->".join(knowledges))) retrieval_tm = timer() @@ -291,7 +292,7 @@ def decorate_answer(answer): yield decorate_answer(answer) else: answer = chat_mdl.chat(prompt, msg[1:], gen_conf) - chat_logger.info("User: {}|Assistant: {}".format( + logger.info("User: {}|Assistant: {}".format( msg[-1]["content"], answer)) res = decorate_answer(answer) res["audio_binary"] = tts(tts_mdl, answer) @@ -319,8 +320,7 @@ def get_table(): nonlocal sys_prompt, user_promt, question, tried_times sql = chat_mdl.chat(sys_prompt, [{"role": "user", "content": user_promt}], { "temperature": 0.06}) - print(user_promt, sql) - chat_logger.info(f"“{question}”==>{user_promt} get SQL: {sql}") + logger.info(f"{question} ==> {user_promt} get SQL: {sql}") sql = re.sub(r"[\r\n]+", " ", sql.lower()) sql = re.sub(r".*select ", "select ", sql.lower()) sql = re.sub(r" +", " ", sql) @@ -340,9 +340,7 @@ def get_table(): flds.append(k) sql = "select doc_id,docnm_kwd," + ",".join(flds) + sql[8:] - print(f"“{question}” get SQL(refined): {sql}") - - chat_logger.info(f"“{question}” get SQL(refined): {sql}") + logger.info(f"{question} get SQL(refined): {sql}") tried_times += 1 return retrievaler.sql_retrieval(sql, format="json"), sql @@ -371,10 +369,9 @@ def get_table(): question, sql, tbl["error"] ) tbl, sql = get_table() - chat_logger.info("TRY it again: {}".format(sql)) + logger.info("TRY it again: {}".format(sql)) - chat_logger.info("GET table: {}".format(tbl)) - print(tbl) + logger.info("GET table: {}".format(tbl)) if tbl.get("error") or len(tbl["rows"]) == 0: return None @@ -404,7 +401,7 @@ def get_table(): rows = re.sub(r"T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+Z)?\|", "|", rows) if not docid_idx or not docnm_idx: - chat_logger.warning("SQL missing field: " + sql) + logger.warning("SQL missing field: " + sql) return { "answer": "\n".join([clmns, line, rows]), "reference": {"chunks": [], 
"doc_aggs": []}, diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 51fd28c4ddb..428f1a4bc85 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -17,7 +17,6 @@ import json import random import re -import traceback from concurrent.futures import ThreadPoolExecutor from copy import deepcopy from datetime import datetime @@ -26,7 +25,7 @@ from peewee import fn from api.db.db_utils import bulk_insert_into_db -from api.settings import stat_logger, docStoreConn +from api.settings import docStoreConn from api.utils import current_timestamp, get_format_time, get_uuid from graphrag.mind_map_extractor import MindMapExtractor from rag.settings import SVR_QUEUE_NAME @@ -40,6 +39,7 @@ from api.db.services.knowledgebase_service import KnowledgebaseService from api.db import StatusEnum from rag.utils.redis_conn import REDIS_CONN +from api.utils.log_utils import logger class DocumentService(CommonService): @@ -387,7 +387,7 @@ def update_progress(cls): cls.update_by_id(d["id"], info) except Exception as e: if str(e).find("'0'") < 0: - stat_logger.error("fetch task exception:" + str(e)) + logger.exception("fetch task exception") @classmethod @DB.connection_context() @@ -544,7 +544,7 @@ def embedding(doc_id, cnts, batch_size=16): "knowledge_graph_kwd": "mind_map" }) except Exception as e: - stat_logger.error("Mind map generation error:", traceback.format_exc()) + logger.exception("Mind map generation error") vects = embedding(doc_id, [c["content_with_weight"] for c in cks]) assert len(cks) == len(vects) diff --git a/api/db/services/file_service.py b/api/db/services/file_service.py index 587e517bd9d..325fdc949e5 100644 --- a/api/db/services/file_service.py +++ b/api/db/services/file_service.py @@ -28,6 +28,7 @@ from api.utils import get_uuid from api.utils.file_utils import filename_type, thumbnail_img from rag.utils.storage_factory import STORAGE_IMPL +from api.utils.log_utils import logger class FileService(CommonService): @@ -272,8 +273,8 @@ def delete_folder_by_pf_id(cls, user_id, folder_id): cls.delete_folder_by_pf_id(user_id, file.id) return cls.model.delete().where((cls.model.tenant_id == user_id) & (cls.model.id == folder_id)).execute(), - except Exception as e: - print(e) + except Exception: + logger.exception("delete_folder_by_pf_id") raise RuntimeError("Database error (File retrieval)!") @classmethod @@ -321,8 +322,8 @@ def add_file_from_kb(cls, doc, kb_folder_id, tenant_id): def move_file(cls, file_ids, folder_id): try: cls.filter_update((cls.model.id << file_ids, ), { 'parent_id': folder_id }) - except Exception as e: - print(e) + except Exception: + logger.exception("move_file") raise RuntimeError("Database error (File move)!") @classmethod diff --git a/api/db/services/llm_service.py b/api/db/services/llm_service.py index 7677b700adc..6e94b15d0ee 100644 --- a/api/db/services/llm_service.py +++ b/api/db/services/llm_service.py @@ -14,12 +14,12 @@ # limitations under the License. 
# from api.db.services.user_service import TenantService -from api.settings import database_logger from rag.llm import EmbeddingModel, CvModel, ChatModel, RerankModel, Seq2txtModel, TTSModel from api.db import LLMType from api.db.db_models import DB from api.db.db_models import LLMFactories, LLM, TenantLLM from api.db.services.common_service import CommonService +from api.utils.log_utils import logger class LLMFactoriesService(CommonService): @@ -209,40 +209,40 @@ def encode(self, texts: list, batch_size=32): emd, used_tokens = self.mdl.encode(texts, batch_size) if not TenantLLMService.increase_usage( self.tenant_id, self.llm_type, used_tokens): - database_logger.error( - "Can't update token usage for {}/EMBEDDING used_tokens: {}".format(self.tenant_id, used_tokens)) + logger.error( + "LLMBundle.encode can't update token usage for {}/EMBEDDING used_tokens: {}".format(self.tenant_id, used_tokens)) return emd, used_tokens def encode_queries(self, query: str): emd, used_tokens = self.mdl.encode_queries(query) if not TenantLLMService.increase_usage( self.tenant_id, self.llm_type, used_tokens): - database_logger.error( - "Can't update token usage for {}/EMBEDDING used_tokens: {}".format(self.tenant_id, used_tokens)) + logger.error( + "LLMBundle.encode_queries can't update token usage for {}/EMBEDDING used_tokens: {}".format(self.tenant_id, used_tokens)) return emd, used_tokens def similarity(self, query: str, texts: list): sim, used_tokens = self.mdl.similarity(query, texts) if not TenantLLMService.increase_usage( self.tenant_id, self.llm_type, used_tokens): - database_logger.error( - "Can't update token usage for {}/RERANK used_tokens: {}".format(self.tenant_id, used_tokens)) + logger.error( + "LLMBundle.similarity can't update token usage for {}/RERANK used_tokens: {}".format(self.tenant_id, used_tokens)) return sim, used_tokens def describe(self, image, max_tokens=300): txt, used_tokens = self.mdl.describe(image, max_tokens) if not TenantLLMService.increase_usage( self.tenant_id, self.llm_type, used_tokens): - database_logger.error( - "Can't update token usage for {}/IMAGE2TEXT used_tokens: {}".format(self.tenant_id, used_tokens)) + logger.error( + "LLMBundle.describe can't update token usage for {}/IMAGE2TEXT used_tokens: {}".format(self.tenant_id, used_tokens)) return txt def transcription(self, audio): txt, used_tokens = self.mdl.transcription(audio) if not TenantLLMService.increase_usage( self.tenant_id, self.llm_type, used_tokens): - database_logger.error( - "Can't update token usage for {}/SEQUENCE2TXT used_tokens: {}".format(self.tenant_id, used_tokens)) + logger.error( + "LLMBundle.transcription can't update token usage for {}/SEQUENCE2TXT used_tokens: {}".format(self.tenant_id, used_tokens)) return txt def tts(self, text): @@ -250,8 +250,8 @@ def tts(self, text): if isinstance(chunk,int): if not TenantLLMService.increase_usage( self.tenant_id, self.llm_type, chunk, self.llm_name): - database_logger.error( - "Can't update token usage for {}/TTS".format(self.tenant_id)) + logger.error( + "LLMBundle.tts can't update token usage for {}/TTS".format(self.tenant_id)) return yield chunk @@ -259,8 +259,8 @@ def chat(self, system, history, gen_conf): txt, used_tokens = self.mdl.chat(system, history, gen_conf) if isinstance(txt, int) and not TenantLLMService.increase_usage( self.tenant_id, self.llm_type, used_tokens, self.llm_name): - database_logger.error( - "Can't update token usage for {}/CHAT llm_name: {}, used_tokens: {}".format(self.tenant_id, self.llm_name, used_tokens)) + logger.error( 
+ "LLMBundle.chat can't update token usage for {}/CHAT llm_name: {}, used_tokens: {}".format(self.tenant_id, self.llm_name, used_tokens)) return txt def chat_streamly(self, system, history, gen_conf): @@ -268,7 +268,7 @@ def chat_streamly(self, system, history, gen_conf): if isinstance(txt, int): if not TenantLLMService.increase_usage( self.tenant_id, self.llm_type, txt, self.llm_name): - database_logger.error( - "Can't update token usage for {}/CHAT llm_name: {}, content: {}".format(self.tenant_id, self.llm_name, txt)) + logger.error( + "LLMBundle.chat_streamly can't update token usage for {}/CHAT llm_name: {}, content: {}".format(self.tenant_id, self.llm_name, txt)) return yield txt diff --git a/api/ragflow_server.py b/api/ragflow_server.py index 186b00b1116..53208bb4585 100644 --- a/api/ragflow_server.py +++ b/api/ragflow_server.py @@ -27,13 +27,10 @@ from api.db.runtime_config import RuntimeConfig from api.db.services.document_service import DocumentService from api.settings import ( - HOST, - HTTP_PORT, - access_logger, - database_logger, - stat_logger, + HOST, HTTP_PORT ) from api import utils +from api.utils.log_utils import logger from api.db.db_models import init_database_tables as init_web_db from api.db.init_data import init_web_data @@ -45,23 +42,22 @@ def update_progress(): time.sleep(3) try: DocumentService.update_progress() - except Exception as e: - stat_logger.error("update_progress exception:" + str(e)) + except Exception: + logger.exception("update_progress exception") -if __name__ == "__main__": - print( - r""" +if __name__ == '__main__': + logger.info(r""" ____ ___ ______ ______ __ / __ \ / | / ____// ____// /____ _ __ / /_/ // /| | / / __ / /_ / // __ \| | /| / / / _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ / /_/ |_|/_/ |_|\____//_/ /_/ \____/ |__/|__/ - """, - flush=True, + """) + logger.info( + f'project base: {utils.file_utils.get_project_base_directory()}' ) - stat_logger.info(f"project base: {utils.file_utils.get_project_base_directory()}") # init db init_web_db() @@ -83,7 +79,7 @@ def update_progress(): RuntimeConfig.DEBUG = args.debug if RuntimeConfig.DEBUG: - stat_logger.info("run on debug mode") + logger.info("run on debug mode") RuntimeConfig.init_env() RuntimeConfig.init_config(JOB_SERVER_HOST=HOST, HTTP_PORT=HTTP_PORT) @@ -91,17 +87,17 @@ def update_progress(): peewee_logger = logging.getLogger("peewee") peewee_logger.propagate = False # rag_arch.common.log.ROpenHandler - peewee_logger.addHandler(database_logger.handlers[0]) - peewee_logger.setLevel(database_logger.level) + peewee_logger.addHandler(logger.handlers[0]) + peewee_logger.setLevel(logger.handlers[0].level) thr = ThreadPoolExecutor(max_workers=1) thr.submit(update_progress) # start http server try: - stat_logger.info("RAG Flow http server start...") + logger.info("RAG Flow http server start...") werkzeug_logger = logging.getLogger("werkzeug") - for h in access_logger.handlers: + for h in logger.handlers: werkzeug_logger.addHandler(h) run_simple( hostname=HOST, diff --git a/api/settings.py b/api/settings.py index 46fb612c448..d2cce38b4fb 100644 --- a/api/settings.py +++ b/api/settings.py @@ -17,24 +17,9 @@ from datetime import date from enum import IntEnum, Enum from api.utils.file_utils import get_project_base_directory -from api.utils.log_utils import LoggerFactory, getLogger import rag.utils.es_conn import rag.utils.infinity_conn -# Logger -LoggerFactory.set_directory( - os.path.join( - get_project_base_directory(), - "logs", - "api")) -# {CRITICAL: 50, FATAL:50, ERROR:40, WARNING:30, WARN:30, 
INFO:20, DEBUG:10, NOTSET:0} -LoggerFactory.LEVEL = 30 - -stat_logger = getLogger("stat") -access_logger = getLogger("access") -database_logger = getLogger("database") -chat_logger = getLogger("chat") - import rag.utils from rag.nlp import search from graphrag import search as kg_search @@ -47,8 +32,6 @@ RAG_FLOW_CONF_PATH = os.path.join(get_project_base_directory(), "conf") LIGHTEN = int(os.environ.get('LIGHTEN', "0")) -SUBPROCESS_STD_LOG_NAME = "std.log" - ERROR_REPORT = True ERROR_REPORT_WITH_PATH = False diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 815506a27ed..9c48ce17966 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -35,11 +35,12 @@ from api.db.db_models import APIToken from api.settings import ( REQUEST_MAX_WAIT_SEC, REQUEST_WAIT_SEC, - stat_logger, CLIENT_AUTHENTICATION, HTTP_APP_KEY, SECRET_KEY + CLIENT_AUTHENTICATION, HTTP_APP_KEY, SECRET_KEY ) from api.settings import RetCode from api.utils import CustomJSONEncoder, get_uuid from api.utils import json_dumps +from api.utils.log_utils import logger requests.models.complexjson.dumps = functools.partial( json.dumps, cls=CustomJSONEncoder) @@ -117,7 +118,7 @@ def get_data_error_result(code=RetCode.DATA_ERROR, def server_error_response(e): - stat_logger.exception(e) + logger.exception(e) try: if e.code == 401: return get_json_result(code=401, message=repr(e)) @@ -258,7 +259,7 @@ def construct_json_result(code=RetCode.SUCCESS, message='success', data=None): def construct_error_response(e): - stat_logger.exception(e) + logger.exception(e) try: if e.code == 401: return construct_json_result(code=RetCode.UNAUTHORIZED, message=repr(e)) diff --git a/api/utils/log_utils.py b/api/utils/log_utils.py index 5640ffe7128..ffe08097893 100644 --- a/api/utils/log_utils.py +++ b/api/utils/log_utils.py @@ -14,300 +14,38 @@ # limitations under the License. 
# import os -import typing -import traceback import logging -import inspect -from logging.handlers import TimedRotatingFileHandler -from threading import RLock +from logging.handlers import RotatingFileHandler -from api.utils import file_utils +from api.utils.file_utils import get_project_base_directory +LOG_LEVEL = logging.INFO +LOG_FILE = os.path.abspath(os.path.join(get_project_base_directory(), "logs", f"ragflow_{os.getpid()}.log")) +LOG_FORMAT = "%(asctime)-15s %(levelname)-8s %(process)d %(message)s" +logger = None -class LoggerFactory(object): - TYPE = "FILE" - LOG_FORMAT = "[%(levelname)s] [%(asctime)s] [%(module)s.%(funcName)s] [line:%(lineno)d]: %(message)s" - logging.basicConfig(format=LOG_FORMAT) - LEVEL = logging.DEBUG - logger_dict = {} - global_handler_dict = {} - - LOG_DIR = None - PARENT_LOG_DIR = None - log_share = True - - append_to_parent_log = None - - lock = RLock() - # CRITICAL = 50 - # FATAL = CRITICAL - # ERROR = 40 - # WARNING = 30 - # WARN = WARNING - # INFO = 20 - # DEBUG = 10 - # NOTSET = 0 - levels = (10, 20, 30, 40) - schedule_logger_dict = {} - - @staticmethod - def set_directory(directory=None, parent_log_dir=None, - append_to_parent_log=None, force=False): - if parent_log_dir: - LoggerFactory.PARENT_LOG_DIR = parent_log_dir - if append_to_parent_log: - LoggerFactory.append_to_parent_log = append_to_parent_log - with LoggerFactory.lock: - if not directory: - directory = file_utils.get_project_base_directory("logs") - if not LoggerFactory.LOG_DIR or force: - LoggerFactory.LOG_DIR = directory - if LoggerFactory.log_share: - oldmask = os.umask(000) - os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True) - os.umask(oldmask) - else: - os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True) - for loggerName, ghandler in LoggerFactory.global_handler_dict.items(): - for className, (logger, - handler) in LoggerFactory.logger_dict.items(): - logger.removeHandler(ghandler) - ghandler.close() - LoggerFactory.global_handler_dict = {} - for className, (logger, - handler) in LoggerFactory.logger_dict.items(): - logger.removeHandler(handler) - _handler = None - if handler: - handler.close() - if className != "default": - _handler = LoggerFactory.get_handler(className) - logger.addHandler(_handler) - LoggerFactory.assemble_global_handler(logger) - LoggerFactory.logger_dict[className] = logger, _handler - - @staticmethod - def new_logger(name): - logger = logging.getLogger(name) - logger.propagate = False - logger.setLevel(LoggerFactory.LEVEL) +def getLogger(): + global logger + if logger is not None: return logger - @staticmethod - def get_logger(class_name=None): - with LoggerFactory.lock: - if class_name in LoggerFactory.logger_dict.keys(): - logger, handler = LoggerFactory.logger_dict[class_name] - if not logger: - logger, handler = LoggerFactory.init_logger(class_name) - else: - logger, handler = LoggerFactory.init_logger(class_name) - return logger - - @staticmethod - def get_global_handler(logger_name, level=None, log_dir=None): - if not LoggerFactory.LOG_DIR: - return logging.StreamHandler() - if log_dir: - logger_name_key = logger_name + "_" + log_dir - else: - logger_name_key = logger_name + "_" + LoggerFactory.LOG_DIR - # if loggerName not in LoggerFactory.globalHandlerDict: - if logger_name_key not in LoggerFactory.global_handler_dict: - with LoggerFactory.lock: - if logger_name_key not in LoggerFactory.global_handler_dict: - handler = LoggerFactory.get_handler( - logger_name, level, log_dir) - LoggerFactory.global_handler_dict[logger_name_key] = handler - return 
LoggerFactory.global_handler_dict[logger_name_key] - - @staticmethod - def get_handler(class_name, level=None, log_dir=None, - log_type=None, job_id=None): - if not log_type: - if not LoggerFactory.LOG_DIR or not class_name: - return logging.StreamHandler() - # return Diy_StreamHandler() - - if not log_dir: - log_file = os.path.join( - LoggerFactory.LOG_DIR, - "{}.log".format(class_name)) - else: - log_file = os.path.join(log_dir, "{}.log".format(class_name)) - else: - log_file = os.path.join(log_dir, "rag_flow_{}.log".format( - log_type) if level == LoggerFactory.LEVEL else 'rag_flow_{}_error.log'.format(log_type)) - - os.makedirs(os.path.dirname(log_file), exist_ok=True) - if LoggerFactory.log_share: - handler = ROpenHandler(log_file, - when='D', - interval=1, - backupCount=14, - delay=True) - else: - handler = TimedRotatingFileHandler(log_file, - when='D', - interval=1, - backupCount=14, - delay=True) - if level: - handler.level = level - - return handler - - @staticmethod - def init_logger(class_name): - with LoggerFactory.lock: - logger = LoggerFactory.new_logger(class_name) - handler = None - if class_name: - handler = LoggerFactory.get_handler(class_name) - logger.addHandler(handler) - LoggerFactory.logger_dict[class_name] = logger, handler - - else: - LoggerFactory.logger_dict["default"] = logger, handler - - LoggerFactory.assemble_global_handler(logger) - return logger, handler - - @staticmethod - def assemble_global_handler(logger): - if LoggerFactory.LOG_DIR: - for level in LoggerFactory.levels: - if level >= LoggerFactory.LEVEL: - level_logger_name = logging._levelToName[level] - logger.addHandler( - LoggerFactory.get_global_handler( - level_logger_name, level)) - if LoggerFactory.append_to_parent_log and LoggerFactory.PARENT_LOG_DIR: - for level in LoggerFactory.levels: - if level >= LoggerFactory.LEVEL: - level_logger_name = logging._levelToName[level] - logger.addHandler( - LoggerFactory.get_global_handler(level_logger_name, level, LoggerFactory.PARENT_LOG_DIR)) - - -def setDirectory(directory=None): - LoggerFactory.set_directory(directory) - - -def setLevel(level): - LoggerFactory.LEVEL = level - - -def getLogger(className=None, useLevelFile=False): - if className is None: - frame = inspect.stack()[1] - module = inspect.getmodule(frame[0]) - className = 'stat' - return LoggerFactory.get_logger(className) - + print(f"log file path: {LOG_FILE}") + os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) + logger = logging.getLogger("ragflow") + logger.setLevel(LOG_LEVEL) -def exception_to_trace_string(ex): - return "".join(traceback.TracebackException.from_exception(ex).format()) + handler1 = RotatingFileHandler(LOG_FILE, maxBytes=10*1024*1024, backupCount=5) + handler1.setLevel(LOG_LEVEL) + formatter1 = logging.Formatter(LOG_FORMAT) + handler1.setFormatter(formatter1) + logger.addHandler(handler1) + handler2 = logging.StreamHandler() + handler2.setLevel(LOG_LEVEL) + formatter2 = logging.Formatter(LOG_FORMAT) + handler2.setFormatter(formatter2) + logger.addHandler(handler2) -class ROpenHandler(TimedRotatingFileHandler): - def _open(self): - prevumask = os.umask(000) - rtv = TimedRotatingFileHandler._open(self) - os.umask(prevumask) - return rtv - - -def sql_logger(job_id='', log_type='sql'): - key = job_id + log_type - if key in LoggerFactory.schedule_logger_dict.keys(): - return LoggerFactory.schedule_logger_dict[key] - return get_job_logger(job_id=job_id, log_type=log_type) - - -def ready_log(msg, job=None, task=None, role=None, party_id=None, detail=None): - prefix, suffix = 
base_msg(job, task, role, party_id, detail) - return f"{prefix}{msg} ready{suffix}" - - -def start_log(msg, job=None, task=None, role=None, party_id=None, detail=None): - prefix, suffix = base_msg(job, task, role, party_id, detail) - return f"{prefix}start to {msg}{suffix}" - - -def successful_log(msg, job=None, task=None, role=None, - party_id=None, detail=None): - prefix, suffix = base_msg(job, task, role, party_id, detail) - return f"{prefix}{msg} successfully{suffix}" - - -def warning_log(msg, job=None, task=None, role=None, - party_id=None, detail=None): - prefix, suffix = base_msg(job, task, role, party_id, detail) - return f"{prefix}{msg} is not effective{suffix}" - - -def failed_log(msg, job=None, task=None, role=None, - party_id=None, detail=None): - prefix, suffix = base_msg(job, task, role, party_id, detail) - return f"{prefix}failed to {msg}{suffix}" - - -def base_msg(job=None, task=None, role: str = None, - party_id: typing.Union[str, int] = None, detail=None): - if detail: - detail_msg = f" detail: \n{detail}" - else: - detail_msg = "" - if task is not None: - return f"task {task.f_task_id} {task.f_task_version} ", f" on {task.f_role} {task.f_party_id}{detail_msg}" - elif job is not None: - return "", f" on {job.f_role} {job.f_party_id}{detail_msg}" - elif role and party_id: - return "", f" on {role} {party_id}{detail_msg}" - else: - return "", f"{detail_msg}" - - -def exception_to_trace_string(ex): - return "".join(traceback.TracebackException.from_exception(ex).format()) - - -def get_logger_base_dir(): - job_log_dir = file_utils.get_rag_flow_directory('logs') - return job_log_dir - - -def get_job_logger(job_id, log_type): - rag_flow_log_dir = file_utils.get_rag_flow_directory('logs', 'rag_flow') - job_log_dir = file_utils.get_rag_flow_directory('logs', job_id) - if not job_id: - log_dirs = [rag_flow_log_dir] - else: - if log_type == 'audit': - log_dirs = [job_log_dir, rag_flow_log_dir] - else: - log_dirs = [job_log_dir] - if LoggerFactory.log_share: - oldmask = os.umask(000) - os.makedirs(job_log_dir, exist_ok=True) - os.makedirs(rag_flow_log_dir, exist_ok=True) - os.umask(oldmask) - else: - os.makedirs(job_log_dir, exist_ok=True) - os.makedirs(rag_flow_log_dir, exist_ok=True) - logger = LoggerFactory.new_logger(f"{job_id}_{log_type}") - for job_log_dir in log_dirs: - handler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL, - log_dir=job_log_dir, log_type=log_type, job_id=job_id) - error_handler = LoggerFactory.get_handler( - class_name=None, - level=logging.ERROR, - log_dir=job_log_dir, - log_type=log_type, - job_id=job_id) - logger.addHandler(handler) - logger.addHandler(error_handler) - with LoggerFactory.lock: - LoggerFactory.schedule_logger_dict[job_id + log_type] = logger return logger + +logger = getLogger() diff --git a/deepdoc/parser/pdf_parser.py b/deepdoc/parser/pdf_parser.py index a938cbb3e72..0d4572bbde9 100644 --- a/deepdoc/parser/pdf_parser.py +++ b/deepdoc/parser/pdf_parser.py @@ -19,13 +19,14 @@ import re import pdfplumber import logging -from PIL import Image, ImageDraw +from PIL import Image import numpy as np from timeit import default_timer as timer from pypdf import PdfReader as pdf2_read from api.settings import LIGHTEN from api.utils.file_utils import get_project_base_directory +from api.utils.log_utils import logger from deepdoc.vision import OCR, Recognizer, LayoutRecognizer, TableStructureRecognizer from rag.nlp import rag_tokenizer from copy import deepcopy @@ -49,15 +50,15 @@ def __init__(self): import torch if 
torch.cuda.is_available(): self.updown_cnt_mdl.set_param({"device": "cuda"}) - except Exception as e: - logging.error(str(e)) + except Exception: + logger.exception("RAGFlowPdfParser __init__") try: model_dir = os.path.join( get_project_base_directory(), "rag/res/deepdoc") self.updown_cnt_mdl.load_model(os.path.join( model_dir, "updown_concat_xgb.model")) - except Exception as e: + except Exception: model_dir = snapshot_download( repo_id="InfiniFlow/text_concat_xgb_v1.0", local_dir=os.path.join(get_project_base_directory(), "rag/res/deepdoc"), @@ -187,7 +188,7 @@ def _has_color(self, o): return True def _table_transformer_job(self, ZM): - logging.info("Table processing...") + logger.info("Table processing...") imgs, pos = [], [] tbcnt = [0] MARGIN = 10 @@ -425,12 +426,12 @@ def _naive_vertical_merge(self): detach_feats = [b["x1"] < b_["x0"], b["x0"] > b_["x1"]] if (any(feats) and not any(concatting_feats)) or any(detach_feats): - print( + logger.info("{} {} {} {}".format( b["text"], b_["text"], any(feats), any(concatting_feats), - any(detach_feats)) + )) i += 1 continue # merge up and down @@ -726,14 +727,14 @@ def nearest(tbls): # continue if tv < fv and tk: tables[tk].insert(0, c) - logging.debug( + logger.debug( "TABLE:" + self.boxes[i]["text"] + "; Cap: " + tk) elif fk: figures[fk].insert(0, c) - logging.debug( + logger.debug( "FIGURE:" + self.boxes[i]["text"] + "; Cap: " + @@ -760,7 +761,7 @@ def cropout(bxs, ltype, poss): if ii is not None: b = louts[ii] else: - logging.warn( + logger.warn( f"Missing layout match: {pn + 1},%s" % (bxs[0].get( "layoutno", ""))) @@ -918,8 +919,8 @@ def dfs(line, st): if usefull(boxes[0]): dfs(boxes[0], 0) else: - logging.debug("WASTE: " + boxes[0]["text"]) - except Exception as e: + logger.debug("WASTE: " + boxes[0]["text"]) + except Exception: pass boxes.pop(0) mw = np.mean(widths) @@ -927,7 +928,7 @@ def dfs(line, st): res.append( "\n".join([c["text"] + self._line_tag(c, ZM) for c in lines])) else: - logging.debug("REMOVED: " + + logger.debug("REMOVED: " + "<<".join([c["text"] for c in lines])) return "\n\n".join(res) @@ -938,8 +939,8 @@ def total_page_number(fnm, binary=None): pdf = pdfplumber.open( fnm) if not binary else pdfplumber.open(BytesIO(binary)) return len(pdf.pages) - except Exception as e: - logging.error(str(e)) + except Exception: + logger.exception("total_page_number") def __images__(self, fnm, zoomin=3, page_from=0, page_to=299, callback=None): @@ -962,8 +963,8 @@ def __images__(self, fnm, zoomin=3, page_from=0, self.page_chars = [[{**c, 'top': c['top'], 'bottom': c['bottom']} for c in page.dedupe_chars().chars if self._has_color(c)] for page in self.pdf.pages[page_from:page_to]] self.total_page = len(self.pdf.pages) - except Exception as e: - logging.error(str(e)) + except Exception: + logger.exception("RAGFlowPdfParser __images__") self.outlines = [] try: @@ -979,11 +980,11 @@ def dfs(arr, depth): dfs(outlines, 0) except Exception as e: - logging.warning(f"Outlines exception: {e}") + logger.warning(f"Outlines exception: {e}") if not self.outlines: - logging.warning(f"Miss outlines") + logger.warning("Miss outlines") - logging.info("Images converted.") + logger.info("Images converted.") self.is_english = [re.search(r"[a-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}", "".join( random.choices([c["text"] for c in self.page_chars[i]], k=min(100, len(self.page_chars[i]))))) for i in range(len(self.page_chars))] @@ -1023,7 +1024,7 @@ def dfs(arr, depth): self.is_english = re.search(r"[\na-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}", 
"".join([b["text"] for b in random.choices(bxes, k=min(30, len(bxes)))])) - logging.info("Is it English:", self.is_english) + logger.info("Is it English:", self.is_english) self.page_cum_height = np.cumsum(self.page_cum_height) assert len(self.page_cum_height) == len(self.page_images) + 1 @@ -1162,10 +1163,10 @@ def dfs(arr, depth): dfs(a, depth + 1) dfs(outlines, 0) - except Exception as e: - logging.warning(f"Outlines exception: {e}") + except Exception: + logger.exception("Outlines exception") if not self.outlines: - logging.warning(f"Miss outlines") + logger.warning("Miss outlines") return [(l, "") for l in lines], [] diff --git a/deepdoc/parser/resume/entities/corporations.py b/deepdoc/parser/resume/entities/corporations.py index d653b3e43b8..20f606e17ed 100644 --- a/deepdoc/parser/resume/entities/corporations.py +++ b/deepdoc/parser/resume/entities/corporations.py @@ -11,10 +11,15 @@ # limitations under the License. # -import re,json,os +import re +import json +import os import pandas as pd from rag.nlp import rag_tokenizer from . import regions +from api.utils.log_utils import logger + + current_file_path = os.path.dirname(os.path.abspath(__file__)) GOODS = pd.read_csv(os.path.join(current_file_path, "res/corp_baike_len.csv"), sep="\t", header=0).fillna(0) GOODS["cid"] = GOODS["cid"].astype(str) @@ -27,7 +32,7 @@ def baike(cid, default_v=0): global GOODS try: return GOODS.loc[str(cid), "len"] - except Exception as e: + except Exception: pass return default_v @@ -65,7 +70,8 @@ def rmNoise(n): GOOD_CORP = set([corpNorm(rmNoise(c), False) for c in GOOD_CORP]) for c,v in CORP_TAG.items(): cc = corpNorm(rmNoise(c), False) - if not cc: print (c) + if not cc: + logger.info(c) CORP_TAG = {corpNorm(rmNoise(c), False):v for c,v in CORP_TAG.items()} def is_good(nm): diff --git a/deepdoc/parser/resume/step_two.py b/deepdoc/parser/resume/step_two.py index 4f8b79d7401..d480b4520aa 100644 --- a/deepdoc/parser/resume/step_two.py +++ b/deepdoc/parser/resume/step_two.py @@ -11,13 +11,19 @@ # limitations under the License. 
# -import re, copy, time, datetime, demjson3, \ - traceback, signal +import re +import copy +import time +import datetime +import demjson3 +import traceback +import signal import numpy as np from deepdoc.parser.resume.entities import degrees, schools, corporations from rag.nlp import rag_tokenizer, surname from xpinyin import Pinyin from contextlib import contextmanager +from api.utils.log_utils import logger class TimeoutException(Exception): pass @@ -79,7 +85,7 @@ def forEdu(cv): y, m, d = getYMD(dt) st_dt.append(str(y)) e["start_dt_kwd"] = str(y) - except Exception as e: + except Exception: pass r = schools.select(n.get("school_name", "")) @@ -158,7 +164,7 @@ def forEdu(cv): y, m, d = getYMD(edu_end_dt) cv["work_exp_flt"] = min(int(str(datetime.date.today())[0:4]) - int(y), cv.get("work_exp_flt", 1000)) except Exception as e: - print("EXCEPTION: ", e, edu_end_dt, cv.get("work_exp_flt")) + logger.exception("forEdu {} {} {}".format(e, edu_end_dt, cv.get("work_exp_flt"))) if sch: cv["school_name_kwd"] = sch if (len(cv.get("degree_kwd", [])) >= 1 and "本科" in cv["degree_kwd"]) \ @@ -233,7 +239,7 @@ def forWork(cv): if type(n) == type(""): try: n = json_loads(n) - except Exception as e: + except Exception: continue if n.get("start_time") and (not work_st_tm or n["start_time"] < work_st_tm): work_st_tm = n["start_time"] @@ -269,8 +275,8 @@ def forWork(cv): try: duas.append((datetime.datetime.strptime(ed, "%Y-%m-%d") - datetime.datetime.strptime(st, "%Y-%m-%d")).days) - except Exception as e: - print("kkkkkkkkkkkkkkkkkkkk", n.get("start_time"), n.get("end_time")) + except Exception: + logger.exception("forWork {} {}".format(n.get("start_time"), n.get("end_time"))) if n.get("scale"): r = re.search(r"^([0-9]+)", str(n["scale"])) @@ -327,7 +333,7 @@ def forWork(cv): y, m, d = getYMD(work_st_tm) cv["work_exp_flt"] = min(int(str(datetime.date.today())[0:4]) - int(y), cv.get("work_exp_flt", 1000)) except Exception as e: - print("EXCEPTION: ", e, work_st_tm, cv.get("work_exp_flt")) + logger.exception("forWork {} {} {}".format(e, work_st_tm, cv.get("work_exp_flt"))) cv["job_num_int"] = 0 if duas: @@ -457,8 +463,8 @@ def hasValues(flds): t = k[:-4] cv[f"{t}_kwd"] = nms cv[f"{t}_tks"] = rag_tokenizer.tokenize(" ".join(nms)) - except Exception as e: - print("【EXCEPTION】:", str(traceback.format_exc()), cv[k]) + except Exception: + logger.exception("parse {} {}".format(str(traceback.format_exc()), cv[k])) cv[k] = [] # tokenize fields @@ -524,7 +530,7 @@ def hasValues(flds): if not y: y = "2012" if not m: m = "01" if not d: d = "01" - cv["updated_at_dt"] = f"%s-%02d-%02d 00:00:00" % (y, int(m), int(d)) + cv["updated_at_dt"] = "%s-%02d-%02d 00:00:00" % (y, int(m), int(d)) # long text tokenize if cv.get("responsibilities"): cv["responsibilities_ltks"] = rag_tokenizer.tokenize(rmHtmlTag(cv["responsibilities"])) @@ -556,10 +562,10 @@ def hasValues(flds): cv["work_exp_flt"] = (time.time() - int(int(cv["work_start_time"]) / 1000)) / 3600. / 24. / 365. 
elif re.match(r"[0-9]{4}[^0-9]", str(cv["work_start_time"])): y, m, d = getYMD(str(cv["work_start_time"])) - cv["work_start_dt"] = f"%s-%02d-%02d 00:00:00" % (y, int(m), int(d)) + cv["work_start_dt"] = "%s-%02d-%02d 00:00:00" % (y, int(m), int(d)) cv["work_exp_flt"] = int(str(datetime.date.today())[0:4]) - int(y) except Exception as e: - print("【EXCEPTION】", e, "==>", cv.get("work_start_time")) + logger.exception("parse {} ==> {}".format(e, cv.get("work_start_time"))) if "work_exp_flt" not in cv and cv.get("work_experience", 0): cv["work_exp_flt"] = int(cv["work_experience"]) / 12. keys = list(cv.keys()) @@ -574,7 +580,7 @@ def hasValues(flds): cv["tob_resume_id"] = str(cv["tob_resume_id"]) cv["id"] = cv["tob_resume_id"] - print("CCCCCCCCCCCCCCC") + logger.info("CCCCCCCCCCCCCCC") return dealWithInt64(cv) @@ -589,4 +595,3 @@ def dealWithInt64(d): if isinstance(d, np.integer): d = int(d) return d - diff --git a/deepdoc/vision/operators.py b/deepdoc/vision/operators.py index 305a028b03b..ff625906dbd 100644 --- a/deepdoc/vision/operators.py +++ b/deepdoc/vision/operators.py @@ -20,6 +20,7 @@ import numpy as np import math from PIL import Image +from api.utils.log_utils import logger class DecodeImage(object): @@ -402,7 +403,7 @@ def resize_image_type0(self, img): return None, (None, None) img = cv2.resize(img, (int(resize_w), int(resize_h))) except BaseException: - print(img.shape, resize_w, resize_h) + logger.exception("{} {} {}".format(img.shape, resize_w, resize_h)) sys.exit(0) ratio_h = resize_h / float(h) ratio_w = resize_w / float(w) @@ -452,7 +453,6 @@ def __call__(self, data): return data def resize_image_for_totaltext(self, im, max_side_len=512): - h, w, _ = im.shape resize_w = w resize_h = h diff --git a/deepdoc/vision/recognizer.py b/deepdoc/vision/recognizer.py index a7fe5047c53..e4d2107cf47 100644 --- a/deepdoc/vision/recognizer.py +++ b/deepdoc/vision/recognizer.py @@ -19,6 +19,7 @@ from api.utils.file_utils import get_project_base_directory from .operators import * +from api.utils.log_utils import logger class Recognizer(object): @@ -439,7 +440,7 @@ def __call__(self, image_list, thr=0.7, batch_size=16): end_index = min((i + 1) * batch_size, len(imgs)) batch_image_list = imgs[start_index:end_index] inputs = self.preprocess(batch_image_list) - print("preprocess") + logger.info("preprocess") for ins in inputs: bb = self.postprocess(self.ort_sess.run(None, {k:v for k,v in ins.items() if k in self.input_names})[0], ins, thr) res.append(bb) diff --git a/deepdoc/vision/seeit.py b/deepdoc/vision/seeit.py index 96046d76e5e..e7535984960 100644 --- a/deepdoc/vision/seeit.py +++ b/deepdoc/vision/seeit.py @@ -14,6 +14,7 @@ import os import PIL from PIL import ImageDraw +from api.utils.log_utils import logger def save_results(image_list, results, labels, output_dir='output/', threshold=0.5): @@ -24,7 +25,7 @@ def save_results(image_list, results, labels, output_dir='output/', threshold=0. out_path = os.path.join(output_dir, f"{idx}.jpg") im.save(out_path, quality=95) - print("save result to: " + out_path) + logger.info("save result to: " + out_path) def draw_box(im, result, lables, threshold=0.5): diff --git a/deepdoc/vision/t_recognizer.py b/deepdoc/vision/t_recognizer.py index 7f9ff8e031a..7a6f2e47259 100644 --- a/deepdoc/vision/t_recognizer.py +++ b/deepdoc/vision/t_recognizer.py @@ -10,7 +10,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import os, sys +import os +import sys +from api.utils.log_utils import logger + sys.path.insert( 0, os.path.abspath( @@ -56,7 +59,7 @@ def main(args): } for t in lyt] img = draw_box(images[i], lyt, labels, float(args.threshold)) img.save(outputs[i], quality=95) - print("save result to: " + outputs[i]) + logger.info("save result to: " + outputs[i]) def get_table_html(img, tb_cpns, ocr): diff --git a/graphrag/claim_extractor.py b/graphrag/claim_extractor.py index d986b24546a..edb59e87881 100644 --- a/graphrag/claim_extractor.py +++ b/graphrag/claim_extractor.py @@ -7,7 +7,6 @@ import argparse import json -import logging import re import traceback from dataclasses import dataclass @@ -18,12 +17,12 @@ from graphrag.claim_prompt import CLAIM_EXTRACTION_PROMPT, CONTINUE_PROMPT, LOOP_PROMPT from rag.llm.chat_model import Base as CompletionLLM from graphrag.utils import ErrorHandlerFn, perform_variable_replacements +from api.utils.log_utils import logger DEFAULT_TUPLE_DELIMITER = "<|>" DEFAULT_RECORD_DELIMITER = "##" DEFAULT_COMPLETION_DELIMITER = "<|COMPLETE|>" CLAIM_MAX_GLEANINGS = 1 -log = logging.getLogger(__name__) @dataclass @@ -127,7 +126,7 @@ def __call__( ] source_doc_map[document_id] = text except Exception as e: - log.exception("error extracting claim") + logger.exception("error extracting claim") self._on_error( e, traceback.format_exc(), @@ -266,4 +265,4 @@ def pull_field(index: int, fields: list[str]) -> str | None: "claim_description": "" } claim = ex(info) - print(json.dumps(claim.output, ensure_ascii=False, indent=2)) + logger.info(json.dumps(claim.output, ensure_ascii=False, indent=2)) diff --git a/graphrag/community_reports_extractor.py b/graphrag/community_reports_extractor.py index 5efc120afe3..468ee18c38a 100644 --- a/graphrag/community_reports_extractor.py +++ b/graphrag/community_reports_extractor.py @@ -6,11 +6,10 @@ """ import json -import logging import re import traceback from dataclasses import dataclass -from typing import Any, List, Callable +from typing import List, Callable import networkx as nx import pandas as pd from graphrag import leiden @@ -20,8 +19,7 @@ from graphrag.utils import ErrorHandlerFn, perform_variable_replacements, dict_has_keys_with_types from rag.utils import num_tokens_from_string from timeit import default_timer as timer - -log = logging.getLogger(__name__) +from api.utils.log_utils import logger @dataclass @@ -82,7 +80,7 @@ def __call__(self, graph: nx.Graph, callback: Callable | None = None): response = re.sub(r"[^\}]*$", "", response) response = re.sub(r"\{\{", "{", response) response = re.sub(r"\}\}", "}", response) - print(response) + logger.info(response) response = json.loads(response) if not dict_has_keys_with_types(response, [ ("title", str), @@ -94,7 +92,7 @@ def __call__(self, graph: nx.Graph, callback: Callable | None = None): response["weight"] = weight response["entities"] = ents except Exception as e: - print("ERROR: ", traceback.format_exc()) + logger.exception("CommunityReportsExtractor got exception") self._on_error(e, traceback.format_exc(), None) continue @@ -127,5 +125,4 @@ def finding_explanation(finding: dict): report_sections = "\n\n".join( f"## {finding_summary(f)}\n\n{finding_explanation(f)}" for f in findings ) - return f"# {title}\n\n{summary}\n\n{report_sections}" diff --git a/graphrag/index.py b/graphrag/index.py index 018b438b9a6..4b72333ccf0 100644 --- a/graphrag/index.py +++ b/graphrag/index.py @@ -28,6 +28,7 @@ from graphrag.mind_map_extractor import MindMapExtractor from rag.nlp import rag_tokenizer from 
rag.utils import num_tokens_from_string +from api.utils.log_utils import logger def graph_merge(g1, g2): @@ -94,7 +95,7 @@ def build_knowledge_graph_chunks(tenant_id: str, chunks: List[str], callback, en chunks = [] for n, attr in graph.nodes(data=True): if attr.get("rank", 0) == 0: - print(f"Ignore entity: {n}") + logger.info(f"Ignore entity: {n}") continue chunk = { "name_kwd": n, @@ -136,7 +137,7 @@ def build_knowledge_graph_chunks(tenant_id: str, chunks: List[str], callback, en mg = mindmap(_chunks).output if not len(mg.keys()): return chunks - print(json.dumps(mg, ensure_ascii=False, indent=2)) + logger.info(json.dumps(mg, ensure_ascii=False, indent=2)) chunks.append( { "content_with_weight": json.dumps(mg, ensure_ascii=False, indent=2), diff --git a/graphrag/mind_map_extractor.py b/graphrag/mind_map_extractor.py index 6705fa6e64d..8ceebe4e226 100644 --- a/graphrag/mind_map_extractor.py +++ b/graphrag/mind_map_extractor.py @@ -18,7 +18,6 @@ import logging import os import re -import logging import traceback from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass @@ -30,6 +29,7 @@ import markdown_to_json from functools import reduce from rag.utils import num_tokens_from_string +from api.utils.log_utils import logger @dataclass @@ -193,6 +193,6 @@ def _process_document( gen_conf = {"temperature": 0.5} response = self._llm.chat(text, [{"role": "user", "content": "Output:"}], gen_conf) response = re.sub(r"```[^\n]*", "", response) - print(response) - print("---------------------------------------------------\n", self._todict(markdown_to_json.dictify(response))) + logger.info(response) + logger.info(self._todict(markdown_to_json.dictify(response))) return self._todict(markdown_to_json.dictify(response)) diff --git a/intergrations/chatgpt-on-wechat/plugins/ragflow_chat.py b/intergrations/chatgpt-on-wechat/plugins/ragflow_chat.py index c280a93dbfb..7be661f8a23 100644 --- a/intergrations/chatgpt-on-wechat/plugins/ragflow_chat.py +++ b/intergrations/chatgpt-on-wechat/plugins/ragflow_chat.py @@ -2,7 +2,7 @@ from bridge.context import ContextType # Import Context, ContextType from bridge.reply import Reply, ReplyType # Import Reply, ReplyType from bridge import * -from common.log import logger +from api.utils.log_utils import logger from plugins import Plugin, register # Import Plugin and register from plugins.event import Event, EventContext, EventAction # Import event-related classes @@ -76,7 +76,7 @@ def get_ragflow_reply(self, user_input, session_id): logger.error(f"[RAGFlowChat] HTTP error when creating conversation: {response.status_code}") return f"Sorry, unable to connect to RAGFlow API (create conversation). HTTP status code: {response.status_code}" except Exception as e: - logger.exception(f"[RAGFlowChat] Exception when creating conversation: {e}") + logger.exception("[RAGFlowChat] Exception when creating conversation") return f"Sorry, an internal error occurred: {str(e)}" # Step 2: Send the message and get a reply @@ -108,5 +108,5 @@ def get_ragflow_reply(self, user_input, session_id): logger.error(f"[RAGFlowChat] HTTP error when getting answer: {response.status_code}") return f"Sorry, unable to connect to RAGFlow API (get reply). 
HTTP status code: {response.status_code}" except Exception as e: - logger.exception(f"[RAGFlowChat] Exception when getting answer: {e}") + logger.exception("[RAGFlowChat] Exception when getting answer") return f"Sorry, an internal error occurred: {str(e)}" diff --git a/rag/app/book.py b/rag/app/book.py index cf29bb11955..9aa88c3dfac 100644 --- a/rag/app/book.py +++ b/rag/app/book.py @@ -20,6 +20,7 @@ tokenize_chunks from rag.nlp import rag_tokenizer from deepdoc.parser import PdfParser, DocxParser, PlainParser, HtmlParser +from api.utils.log_utils import logger class Pdf(PdfParser): @@ -38,7 +39,7 @@ def __call__(self, filename, binary=None, from_page=0, start = timer() self._layouts_rec(zoomin) callback(0.67, "Layout analysis finished") - print("layouts:", timer() - start) + logger.info("layouts: {}".format(timer() - start)) self._table_transformer_job(zoomin) callback(0.68, "Table analysis finished") self._text_merge() diff --git a/rag/app/email.py b/rag/app/email.py index 9c843e6cfa9..1c3350222bf 100644 --- a/rag/app/email.py +++ b/rag/app/email.py @@ -18,7 +18,7 @@ from rag.nlp import rag_tokenizer, naive_merge, tokenize_chunks from deepdoc.parser import HtmlParser, TxtParser from timeit import default_timer as timer -from rag.settings import cron_logger +from api.utils.log_utils import logger import io @@ -86,7 +86,7 @@ def _add_content(msg, content_type): ) main_res.extend(tokenize_chunks(chunks, doc, eng, None)) - cron_logger.info("naive_merge({}): {}".format(filename, timer() - st)) + logger.info("naive_merge({}): {}".format(filename, timer() - st)) # get the attachment info for part in msg.iter_attachments(): content_disposition = part.get("Content-Disposition") diff --git a/rag/app/laws.py b/rag/app/laws.py index 74aac8a2929..38e7d106e70 100644 --- a/rag/app/laws.py +++ b/rag/app/laws.py @@ -21,7 +21,7 @@ make_colon_as_title, tokenize_chunks, docx_question_level from rag.nlp import rag_tokenizer from deepdoc.parser import PdfParser, DocxParser, PlainParser, HtmlParser -from rag.settings import cron_logger +from api.utils.log_utils import logger class Docx(DocxParser): @@ -122,8 +122,8 @@ def __call__(self, filename, binary=None, from_page=0, start = timer() self._layouts_rec(zoomin) callback(0.67, "Layout analysis finished") - cron_logger.info("layouts:".format( - (timer() - start) / (self.total_page + 0.1))) + logger.info("layouts: {}".format( + (timer() - start) / (self.total_page + 0.1))) self._naive_vertical_merge() callback(0.8, "Text extraction finished") diff --git a/rag/app/manual.py b/rag/app/manual.py index b9546083b54..a3cf100e71c 100644 --- a/rag/app/manual.py +++ b/rag/app/manual.py @@ -24,6 +24,7 @@ from deepdoc.parser import PdfParser, PlainParser, DocxParser from docx import Document from PIL import Image +from api.utils.log_utils import logger class Pdf(PdfParser): @@ -47,11 +48,11 @@ def __call__(self, filename, binary=None, from_page=0, # for bb in self.boxes: # for b in bb: # print(b) - print("OCR:", timer() - start) + logger.info("OCR: {}".format(timer() - start)) self._layouts_rec(zoomin) callback(0.65, "Layout analysis finished.") - print("layouts:", timer() - start) + logger.info("layouts: {}".format(timer() - start)) self._table_transformer_job(zoomin) callback(0.67, "Table analysis finished.") self._text_merge() diff --git a/rag/app/naive.py b/rag/app/naive.py index f86d9ed271a..3834c67b106 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -19,7 +19,7 @@ from rag.nlp import rag_tokenizer, naive_merge, tokenize_table, tokenize_chunks, find_codec, concat_img, \ naive_merge_docx, 
tokenize_chunks_docx from deepdoc.parser import PdfParser, ExcelParser, DocxParser, HtmlParser, JsonParser, MarkdownParser, TxtParser -from rag.settings import cron_logger +from api.utils.log_utils import logger from rag.utils import num_tokens_from_string from PIL import Image from functools import reduce @@ -41,18 +41,18 @@ def get_picture(self, document, paragraph): try: image_blob = related_part.image.blob except UnrecognizedImageError: - print("Unrecognized image format. Skipping image.") + logger.info("Unrecognized image format. Skipping image.") return None except UnexpectedEndOfFileError: - print("EOF was unexpectedly encountered while reading an image stream. Skipping image.") + logger.info("EOF was unexpectedly encountered while reading an image stream. Skipping image.") return None except InvalidImageStreamError: - print("The recognized image stream appears to be corrupted. Skipping image.") + logger.info("The recognized image stream appears to be corrupted. Skipping image.") return None try: image = Image.open(BytesIO(image_blob)).convert('RGB') return image - except Exception as e: + except Exception: return None def __clean(self, line): @@ -133,7 +133,7 @@ def __call__(self, filename, binary=None, from_page=0, callback ) callback(msg="OCR finished") - cron_logger.info("OCR({}~{}): {}".format(from_page, to_page, timer() - start)) + logger.info("OCR({}~{}): {}".format(from_page, to_page, timer() - start)) start = timer() self._layouts_rec(zoomin) @@ -147,7 +147,7 @@ def __call__(self, filename, binary=None, from_page=0, self._concat_downward() # self._filter_forpages() - cron_logger.info("layouts: {}".format(timer() - start)) + logger.info("layouts cost: {}s".format(timer() - start)) return [(b["text"], self._line_tag(b, zoomin)) for b in self.boxes], tbls @@ -216,7 +216,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, return chunks res.extend(tokenize_chunks_docx(chunks, doc, eng, images)) - cron_logger.info("naive_merge({}): {}".format(filename, timer() - st)) + logger.info("naive_merge({}): {}".format(filename, timer() - st)) return res elif re.search(r"\.pdf$", filename, re.IGNORECASE): @@ -280,7 +280,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, return chunks res.extend(tokenize_chunks(chunks, doc, eng, pdf_parser)) - cron_logger.info("naive_merge({}): {}".format(filename, timer() - st)) + logger.info("naive_merge({}): {}".format(filename, timer() - st)) return res diff --git a/rag/app/one.py b/rag/app/one.py index 6a20ef6a935..7335c74fa73 100644 --- a/rag/app/one.py +++ b/rag/app/one.py @@ -18,6 +18,7 @@ from rag.app import laws from rag.nlp import rag_tokenizer, tokenize from deepdoc.parser import PdfParser, ExcelParser, PlainParser, HtmlParser +from api.utils.log_utils import logger class Pdf(PdfParser): @@ -37,7 +38,7 @@ def __call__(self, filename, binary=None, from_page=0, start = timer() self._layouts_rec(zoomin, drop=False) callback(0.63, "Layout analysis finished.") - print("layouts:", timer() - start) + logger.info("layouts cost: {}s".format(timer() - start)) self._table_transformer_job(zoomin) callback(0.65, "Table analysis finished.") self._text_merge() diff --git a/rag/app/paper.py b/rag/app/paper.py index 2092581bb37..59c3ccd7560 100644 --- a/rag/app/paper.py +++ b/rag/app/paper.py @@ -17,6 +17,7 @@ from rag.nlp import rag_tokenizer, tokenize, tokenize_table, add_positions, bullets_category, title_frequency, tokenize_chunks from deepdoc.parser import PdfParser, PlainParser import numpy as np +from api.utils.log_utils 
import logger class Pdf(PdfParser): @@ -40,7 +41,7 @@ def __call__(self, filename, binary=None, from_page=0, start = timer() self._layouts_rec(zoomin) callback(0.63, "Layout analysis finished") - print("layouts:", timer() - start) + logger.info(f"layouts cost: {timer() - start}s") self._table_transformer_job(zoomin) callback(0.68, "Table analysis finished") self._text_merge() @@ -52,8 +53,8 @@ def __call__(self, filename, binary=None, from_page=0, # clean mess if column_width < self.page_images[0].size[0] / zoomin / 2: - print("two_column...................", column_width, - self.page_images[0].size[0] / zoomin / 2) + logger.info("two_column................... {} {}".format(column_width, + self.page_images[0].size[0] / zoomin / 2)) self.boxes = self.sort_X_by_page(self.boxes, column_width / 2) for b in self.boxes: b["text"] = re.sub(r"([\t  ]|\u3000){2,}", " ", b["text"].strip()) @@ -114,8 +115,8 @@ def _begin(txt): from_page, min( to_page, self.total_page))) for b in self.boxes: - print(b["text"], b.get("layoutno")) - print(tbls) + logger.info("{} {}".format(b["text"], b.get("layoutno"))) + logger.info("{}".format(tbls)) return { "title": title, @@ -156,7 +157,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, doc["authors_sm_tks"] = rag_tokenizer.fine_grained_tokenize(doc["authors_tks"]) # is it English eng = lang.lower() == "english" # pdf_parser.is_english - print("It's English.....", eng) + logger.info("It's English.....{}".format(eng)) res = tokenize_table(paper["tables"], doc, eng) @@ -183,7 +184,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, if lvl <= most_level and i > 0 and lvl != levels[i - 1]: sid += 1 sec_ids.append(sid) - print(lvl, sorted_sections[i][0], most_level, sid) + logger.info("{} {} {} {}".format(lvl, sorted_sections[i][0], most_level, sid)) chunks = [] last_sid = -2 diff --git a/rag/app/qa.py b/rag/app/qa.py index 1f2aa30560e..b6c756966cb 100644 --- a/rag/app/qa.py +++ b/rag/app/qa.py @@ -19,7 +19,7 @@ from deepdoc.parser.utils import get_text from rag.nlp import is_english, random_choices, qbullets_category, add_positions, has_qbullet, docx_question_level from rag.nlp import rag_tokenizer, tokenize_table, concat_img -from rag.settings import cron_logger +from api.utils.log_utils import logger from deepdoc.parser import PdfParser, ExcelParser, DocxParser from docx import Document from PIL import Image @@ -82,7 +82,7 @@ def __call__(self, filename, binary=None, from_page=0, callback ) callback(msg="OCR finished") - cron_logger.info("OCR({}~{}): {}".format(from_page, to_page, timer() - start)) + logger.info("OCR({}~{}): {}".format(from_page, to_page, timer() - start)) start = timer() self._layouts_rec(zoomin, drop=False) callback(0.63, "Layout analysis finished.") @@ -94,7 +94,7 @@ def __call__(self, filename, binary=None, from_page=0, #self._naive_vertical_merge() # self._concat_downward() #self._filter_forpages() - cron_logger.info("layouts: {}".format(timer() - start)) + logger.info("layouts: {}".format(timer() - start)) sections = [b["text"] for b in self.boxes] bull_x0_list = [] q_bull, reg = qbullets_category(sections) diff --git a/rag/app/resume.py b/rag/app/resume.py index 235c7793c82..ade0b12c93f 100644 --- a/rag/app/resume.py +++ b/rag/app/resume.py @@ -14,14 +14,13 @@ import datetime import json import re - import pandas as pd import requests from api.db.services.knowledgebase_service import KnowledgebaseService from rag.nlp import rag_tokenizer from deepdoc.parser.resume import refactor from deepdoc.parser.resume import 
step_one, step_two -from rag.settings import cron_logger +from api.utils.log_utils import logger from rag.utils import rmSpace forbidden_select_fields4resume = [ @@ -64,8 +63,8 @@ def remote_call(filename, binary): "updated_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}])) resume = step_two.parse(resume) return resume - except Exception as e: - cron_logger.error("Resume parser error: " + str(e)) + except Exception: + logger.exception("Resume parser error") return {} @@ -87,7 +86,7 @@ def chunk(filename, binary=None, callback=None, **kwargs): callback(-1, "Resume is not successfully parsed.") raise Exception("Resume parser remote call fail!") callback(0.6, "Done parsing. Chunking...") - print(json.dumps(resume, ensure_ascii=False, indent=2)) + logger.info("chunking resume: " + json.dumps(resume, ensure_ascii=False, indent=2)) field_map = { "name_kwd": "姓名/名字", @@ -159,7 +158,7 @@ def chunk(filename, binary=None, callback=None, **kwargs): resume[n] = rag_tokenizer.fine_grained_tokenize(resume[n]) doc[n] = resume[n] - print(doc) + logger.info("chunked resume to " + str(doc)) KnowledgebaseService.update_parser_config( kwargs["kb_id"], {"field_map": field_map}) return [doc] diff --git a/rag/llm/embedding_model.py b/rag/llm/embedding_model.py index fbd93ccee90..daddbbba822 100644 --- a/rag/llm/embedding_model.py +++ b/rag/llm/embedding_model.py @@ -32,6 +32,7 @@ from rag.utils import num_tokens_from_string, truncate import google.generativeai as genai import json +from api.utils.log_utils import logger class Base(ABC): def __init__(self, key, model_name): @@ -68,7 +69,7 @@ def __init__(self, key, model_name, **kwargs): DefaultEmbedding._model = FlagModel(os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z]+/", "", model_name)), query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:", use_fp16=torch.cuda.is_available()) - except Exception as e: + except Exception: model_dir = snapshot_download(repo_id="BAAI/bge-large-zh-v1.5", local_dir=os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z]+/", "", model_name)), local_dir_use_symlinks=False) @@ -189,7 +190,7 @@ def encode_queries(self, text): ) return np.array(resp["output"]["embeddings"][0] ["embedding"]), resp["usage"]["total_tokens"] - except Exception as e: + except Exception: raise Exception("Account abnormal. 
Please ensure it's on good standing to use QWen's "+self.model_name) return np.array([]), 0 @@ -296,11 +297,11 @@ def __init__(self, key=None, model_name="maidalun1020/bce-embedding-base_v1", ** if not LIGHTEN and not YoudaoEmbed._client: from BCEmbedding import EmbeddingModel as qanthing try: - print("LOADING BCE...") + logger.info("LOADING BCE...") YoudaoEmbed._client = qanthing(model_name_or_path=os.path.join( get_home_cache_dir(), "bce-embedding-base_v1")) - except Exception as e: + except Exception: YoudaoEmbed._client = qanthing( model_name_or_path=model_name.replace( "maidalun1020", "InfiniFlow")) diff --git a/rag/llm/rerank_model.py b/rag/llm/rerank_model.py index c322f257d41..29925505fa6 100644 --- a/rag/llm/rerank_model.py +++ b/rag/llm/rerank_model.py @@ -27,6 +27,7 @@ from api.utils.file_utils import get_home_cache_dir from rag.utils import num_tokens_from_string, truncate import json +from api.utils.log_utils import logger def sigmoid(x): @@ -66,7 +67,7 @@ def __init__(self, key, model_name, **kwargs): DefaultRerank._model = FlagReranker( os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z]+/", "", model_name)), use_fp16=torch.cuda.is_available()) - except Exception as e: + except Exception: model_dir = snapshot_download(repo_id=model_name, local_dir=os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z]+/", "", model_name)), @@ -126,11 +127,11 @@ def __init__(self, key=None, model_name="maidalun1020/bce-reranker-base_v1", **k with YoudaoRerank._model_lock: if not YoudaoRerank._model: try: - print("LOADING BCE...") + logger.info("LOADING BCE...") YoudaoRerank._model = RerankerModel(model_name_or_path=os.path.join( get_home_cache_dir(), re.sub(r"^[a-zA-Z]+/", "", model_name))) - except Exception as e: + except Exception: YoudaoRerank._model = RerankerModel( model_name_or_path=model_name.replace( "maidalun1020", "InfiniFlow")) diff --git a/rag/nlp/__init__.py b/rag/nlp/__init__.py index 03fca45f11a..8fd23973684 100644 --- a/rag/nlp/__init__.py +++ b/rag/nlp/__init__.py @@ -26,6 +26,7 @@ from cn2an import cn2an from PIL import Image import json +from api.utils.log_utils import logger all_codecs = [ 'utf-8', 'gb2312', 'gbk', 'utf_16', 'ascii', 'big5', 'big5hkscs', @@ -235,7 +236,7 @@ def tokenize_chunks(chunks, doc, eng, pdf_parser=None): # wrap up as es documents for ck in chunks: if len(ck.strip()) == 0:continue - print("--", ck) + logger.debug("-- {}".format(ck)) d = copy.deepcopy(doc) if pdf_parser: try: @@ -254,7 +255,7 @@ def tokenize_chunks_docx(chunks, doc, eng, images): # wrap up as es documents for ck, image in zip(chunks, images): if len(ck.strip()) == 0:continue - print("--", ck) + logger.debug("-- {}".format(ck)) d = copy.deepcopy(doc) d["image"] = image tokenize(d, ck, eng) @@ -457,7 +458,7 @@ def binary_search(arr, target): for i in range(len(cks)): cks[i] = [sections[j] for j in cks[i][::-1]] - print("--------------\n", "\n* ".join(cks[i])) + logger.info("\n* ".join(cks[i])) res = [[]] num = [0] diff --git a/rag/nlp/rag_tokenizer.py b/rag/nlp/rag_tokenizer.py index bfe4887a6bd..7d1eaf3a853 100644 --- a/rag/nlp/rag_tokenizer.py +++ b/rag/nlp/rag_tokenizer.py @@ -22,10 +22,10 @@ import string import sys from hanziconv import HanziConv -from huggingface_hub import snapshot_download from nltk import word_tokenize from nltk.stem import PorterStemmer, WordNetLemmatizer from api.utils.file_utils import get_project_base_directory +from api.utils.log_utils import logger class RagTokenizer: @@ -36,7 +36,7 @@ def rkey_(self, line): return str(("DD" + 
(line[::-1].lower())).encode("utf-8"))[2:-1] def loadDict_(self, fnm): - print("[HUQIE]:Build trie", fnm, file=sys.stderr) + logger.info(f"[HUQIE]:Build trie {fnm}") try: of = open(fnm, "r", encoding='utf-8') while True: @@ -52,8 +52,9 @@ def loadDict_(self, fnm): self.trie_[self.rkey_(line[0])] = 1 self.trie_.save(fnm + ".trie") of.close() - except Exception as e: - print("[HUQIE]:Faild to build trie, ", fnm, e, file=sys.stderr) + except Exception: + logger.exception(f"[HUQIE]:Build trie {fnm} failed") + def __init__(self, debug=False): self.DEBUG = debug @@ -68,8 +69,8 @@ def __init__(self, debug=False): try: self.trie_ = datrie.Trie.load(self.DIR_ + ".txt.trie") return - except Exception as e: - print("[HUQIE]:Build default trie", file=sys.stderr) + except Exception: + logger.exception("[HUQIE]:Build default trie") self.trie_ = datrie.Trie(string.printable) self.loadDict_(self.DIR_ + ".txt") @@ -78,7 +79,7 @@ def loadUserDict(self, fnm): try: self.trie_ = datrie.Trie.load(fnm + ".trie") return - except Exception as e: + except Exception: self.trie_ = datrie.Trie(string.printable) self.loadDict_(fnm) @@ -173,8 +174,7 @@ def score_(self, tfts): tks.append(tk) F /= len(tks) L /= len(tks) - if self.DEBUG: - print("[SC]", tks, len(tks), L, F, B / len(tks) + L + F) + logger.debug("[SC] {} {} {} {} {}".format(tks, len(tks), L, F, B / len(tks) + L + F)) return tks, B / len(tks) + L + F def sortTks_(self, tkslist): @@ -278,8 +278,8 @@ def tokenize(self, line): tks, s = self.maxForward_(L) tks1, s1 = self.maxBackward_(L) if self.DEBUG: - print("[FW]", tks, s) - print("[BW]", tks1, s1) + logger.debug("[FW] {} {}".format(tks, s)) + logger.debug("[BW] {} {}".format(tks1, s1)) i, j, _i, _j = 0, 0, 0, 0 same = 0 @@ -326,8 +326,7 @@ def tokenize(self, line): res.append(" ".join(self.sortTks_(tkslist)[0][0])) res = " ".join(self.english_normalize_(res)) - if self.DEBUG: - print("[TKS]", self.merge_(res)) + logger.debug("[TKS] {}".format(self.merge_(res))) return self.merge_(res) def fine_grained_tokenize(self, tks): @@ -418,30 +417,30 @@ def naiveQie(txt): # huqie.addUserDict("/tmp/tmp.new.tks.dict") tks = tknzr.tokenize( "哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈") - print(tknzr.fine_grained_tokenize(tks)) + logger.info(tknzr.fine_grained_tokenize(tks)) tks = tknzr.tokenize( "公开征求意见稿提出,境外投资者可使用自有人民币或外汇投资。使用外汇投资的,可通过债券持有人在香港人民币业务清算行及香港地区经批准可进入境内银行间外汇市场进行交易的境外人民币业务参加行(以下统称香港结算行)办理外汇资金兑换。香港结算行由此所产生的头寸可到境内银行间外汇市场平盘。使用外汇投资的,在其投资的债券到期或卖出后,原则上应兑换回外汇。") - print(tknzr.fine_grained_tokenize(tks)) + logger.info(tknzr.fine_grained_tokenize(tks)) tks = tknzr.tokenize( "多校划片就是一个小区对应多个小学初中,让买了学区房的家庭也不确定到底能上哪个学校。目的是通过这种方式为学区房降温,把就近入学落到实处。南京市长江大桥") - print(tknzr.fine_grained_tokenize(tks)) + logger.info(tknzr.fine_grained_tokenize(tks)) tks = tknzr.tokenize( "实际上当时他们已经将业务中心偏移到安全部门和针对政府企业的部门 Scripts are compiled and cached aaaaaaaaa") - print(tknzr.fine_grained_tokenize(tks)) + logger.info(tknzr.fine_grained_tokenize(tks)) tks = tknzr.tokenize("虽然我不怎么玩") - print(tknzr.fine_grained_tokenize(tks)) + logger.info(tknzr.fine_grained_tokenize(tks)) tks = tknzr.tokenize("蓝月亮如何在外资夹击中生存,那是全宇宙最有意思的") - print(tknzr.fine_grained_tokenize(tks)) + logger.info(tknzr.fine_grained_tokenize(tks)) tks = tknzr.tokenize( "涡轮增压发动机num最大功率,不像别的共享买车锁电子化的手段,我们接过来是否有意义,黄黄爱美食,不过,今天阿奇要讲到的这家农贸市场,说实话,还真蛮有特色的!不仅环境好,还打出了") - print(tknzr.fine_grained_tokenize(tks)) + logger.info(tknzr.fine_grained_tokenize(tks)) tks = tknzr.tokenize("这周日你去吗?这周日你有空吗?") - print(tknzr.fine_grained_tokenize(tks)) + 
logger.info(tknzr.fine_grained_tokenize(tks)) tks = tknzr.tokenize("Unity3D开发经验 测试开发工程师 c++双11双11 985 211 ") - print(tknzr.fine_grained_tokenize(tks)) + logger.info(tknzr.fine_grained_tokenize(tks)) tks = tknzr.tokenize( "数据分析项目经理|数据分析挖掘|数据分析方向|商品数据分析|搜索数据分析 sql python hive tableau Cocos2d-") - print(tknzr.fine_grained_tokenize(tks)) + logger.info(tknzr.fine_grained_tokenize(tks)) if len(sys.argv) < 2: sys.exit() tknzr.DEBUG = False @@ -451,5 +450,5 @@ def naiveQie(txt): line = of.readline() if not line: break - print(tknzr.tokenize(line)) + logger.info(tknzr.tokenize(line)) of.close() diff --git a/rag/nlp/search.py b/rag/nlp/search.py index 6737f22e295..8e288788fdd 100644 --- a/rag/nlp/search.py +++ b/rag/nlp/search.py @@ -19,7 +19,7 @@ from typing import List, Optional, Dict, Union from dataclasses import dataclass -from rag.settings import doc_store_logger +from api.utils.log_utils import logger from rag.utils import rmSpace from rag.nlp import rag_tokenizer, query import numpy as np @@ -83,7 +83,7 @@ def search(self, req, idx_names: list[str], kb_ids: list[str], emb_mdl=None, hig orderBy.desc("create_timestamp_flt") res = self.dataStore.search(src, [], filters, [], orderBy, offset, limit, idx_names, kb_ids) total=self.dataStore.getTotal(res) - doc_store_logger.info("Dealer.search TOTAL: {}".format(total)) + logger.info("Dealer.search TOTAL: {}".format(total)) else: highlightFields = ["content_ltks", "title_tks"] if highlight else [] matchText, keywords = self.qryr.question(qst, min_match=0.3) @@ -91,7 +91,7 @@ def search(self, req, idx_names: list[str], kb_ids: list[str], emb_mdl=None, hig matchExprs = [matchText] res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids) total=self.dataStore.getTotal(res) - doc_store_logger.info("Dealer.search TOTAL: {}".format(total)) + logger.info("Dealer.search TOTAL: {}".format(total)) else: matchDense = self.get_vector(qst, emb_mdl, topk, req.get("similarity", 0.1)) q_vec = matchDense.embedding_data @@ -102,7 +102,7 @@ def search(self, req, idx_names: list[str], kb_ids: list[str], emb_mdl=None, hig res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids) total=self.dataStore.getTotal(res) - doc_store_logger.info("Dealer.search TOTAL: {}".format(total)) + logger.info("Dealer.search TOTAL: {}".format(total)) # If result is empty, try again with lower min_match if total == 0: @@ -112,7 +112,7 @@ def search(self, req, idx_names: list[str], kb_ids: list[str], emb_mdl=None, hig matchDense.extra_options["similarity"] = 0.17 res = self.dataStore.search(src, highlightFields, filters, [matchText, matchDense, fusionExpr], orderBy, offset, limit, idx_names, kb_ids) total=self.dataStore.getTotal(res) - doc_store_logger.info("Dealer.search 2 TOTAL: {}".format(total)) + logger.info("Dealer.search 2 TOTAL: {}".format(total)) for k in keywords: kwds.add(k) @@ -123,7 +123,7 @@ def search(self, req, idx_names: list[str], kb_ids: list[str], emb_mdl=None, hig continue kwds.add(kk) - doc_store_logger.info(f"TOTAL: {total}") + logger.info(f"TOTAL: {total}") ids=self.dataStore.getChunkIds(res) keywords=list(kwds) highlight = self.dataStore.getHighlight(res, keywords, "content_with_weight") @@ -180,7 +180,7 @@ def insert_citations(self, answer, chunks, chunk_v, continue idx.append(i) pieces_.append(t) - doc_store_logger.info("{} => {}".format(answer, pieces_)) + logger.info("{} => {}".format(answer, pieces_)) if not pieces_: return answer, set([]) @@ -201,7 
+201,7 @@ def insert_citations(self, answer, chunks, chunk_v, chunks_tks, tkweight, vtweight) mx = np.max(sim) * 0.99 - doc_store_logger.info("{} SIM: {}".format(pieces_[i], mx)) + logger.info("{} SIM: {}".format(pieces_[i], mx)) if mx < thr: continue cites[idx[i]] = list( diff --git a/rag/nlp/synonym.py b/rag/nlp/synonym.py index 12ca1dd7999..5b0f4fad07d 100644 --- a/rag/nlp/synonym.py +++ b/rag/nlp/synonym.py @@ -17,10 +17,10 @@ import json import os import time -import logging import re from api.utils.file_utils import get_project_base_directory +from api.utils.log_utils import logger class Dealer: @@ -32,15 +32,15 @@ def __init__(self, redis=None): path = os.path.join(get_project_base_directory(), "rag/res", "synonym.json") try: self.dictionary = json.load(open(path, 'r')) - except Exception as e: - logging.warn("Missing synonym.json") + except Exception: + logger.warn("Missing synonym.json") self.dictionary = {} if not redis: - logging.warning( + logger.warning( "Realtime synonym is disabled, since no redis connection.") if not len(self.dictionary.keys()): - logging.warning(f"Fail to load synonym") + logger.warning("Fail to load synonym") self.redis = redis self.load() @@ -64,7 +64,7 @@ def load(self): d = json.loads(d) self.dictionary = d except Exception as e: - logging.error("Fail to load synonym!" + str(e)) + logger.error("Fail to load synonym!" + str(e)) def lookup(self, tk): self.lookup_num += 1 diff --git a/rag/nlp/term_weight.py b/rag/nlp/term_weight.py index f0be0527ffb..2a3533ca513 100644 --- a/rag/nlp/term_weight.py +++ b/rag/nlp/term_weight.py @@ -21,6 +21,7 @@ import numpy as np from rag.nlp import rag_tokenizer from api.utils.file_utils import get_project_base_directory +from api.utils.log_utils import logger class Dealer: @@ -81,12 +82,12 @@ def load_dict(fnm): self.ne, self.df = {}, {} try: self.ne = json.load(open(os.path.join(fnm, "ner.json"), "r")) - except Exception as e: - print("[WARNING] Load ner.json FAIL!") + except Exception: + logger.warning("Load ner.json FAIL!") try: self.df = load_dict(os.path.join(fnm, "term.freq")) - except Exception as e: - print("[WARNING] Load term.freq FAIL!") + except Exception: + logger.warning("Load term.freq FAIL!") def pretoken(self, txt, num=False, stpwd=True): patt = [ diff --git a/rag/raptor.py b/rag/raptor.py index c0254cc8996..9839c5009f5 100644 --- a/rag/raptor.py +++ b/rag/raptor.py @@ -14,7 +14,6 @@ # limitations under the License. 
# import re -import traceback from concurrent.futures import ThreadPoolExecutor, ALL_COMPLETED, wait from threading import Lock from typing import Tuple @@ -22,7 +21,8 @@ import numpy as np from sklearn.mixture import GaussianMixture -from rag.utils import num_tokens_from_string, truncate +from rag.utils import truncate +from api.utils.log_utils import logger class RecursiveAbstractiveProcessing4TreeOrganizedRetrieval: @@ -62,14 +62,13 @@ def summarize(ck_idx, lock): {"temperature": 0.3, "max_tokens": self._max_token} ) cnt = re.sub("(······\n由于长度的原因,回答被截断了,要继续吗?|For the content length reason, it stopped, continue?)", "", cnt) - print("SUM:", cnt) + logger.info(f"SUM: {cnt}") embds, _ = self._embd_model.encode([cnt]) with lock: if not len(embds[0]): return chunks.append((cnt, embds[0])) except Exception as e: - print(e, flush=True) - traceback.print_stack(e) + logger.exception("summarize got exception") return e labels = [] @@ -105,7 +104,7 @@ def summarize(ck_idx, lock): ck_idx = [i+start for i in range(len(lbls)) if lbls[i] == c] threads.append(executor.submit(summarize, ck_idx, lock)) wait(threads, return_when=ALL_COMPLETED) - print([t.result() for t in threads]) + logger.info(str([t.result() for t in threads])) assert len(chunks) - end == n_clusters, "{} vs. {}".format(len(chunks) - end, n_clusters) labels.extend(lbls) diff --git a/rag/settings.py b/rag/settings.py index 74165822fed..f31e00fdcc2 100644 --- a/rag/settings.py +++ b/rag/settings.py @@ -14,15 +14,11 @@ # limitations under the License. # import os -import logging from api.utils import get_base_config, decrypt_database_config from api.utils.file_utils import get_project_base_directory -from api.utils.log_utils import LoggerFactory, getLogger - # Server RAG_CONF_PATH = os.path.join(get_project_base_directory(), "conf") -SUBPROCESS_STD_LOG_NAME = "std.log" ES = get_base_config("es", {}) INFINITY = get_base_config("infinity", {"uri": "infinity:23817"}) @@ -36,29 +32,6 @@ pass DOC_MAXIMUM_SIZE = int(os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024)) -# Logger -LoggerFactory.set_directory( - os.path.join( - get_project_base_directory(), - "logs", - "rag")) -# {CRITICAL: 50, FATAL:50, ERROR:40, WARNING:30, WARN:30, INFO:20, DEBUG:10, NOTSET:0} -LoggerFactory.LEVEL = 30 - -doc_store_logger = getLogger("doc_store") -minio_logger = getLogger("minio") -s3_logger = getLogger("s3") -azure_logger = getLogger("azure") -cron_logger = getLogger("cron_logger") -chunk_logger = getLogger("chunk_logger") -database_logger = getLogger("database") - -formatter = logging.Formatter("%(asctime)-15s %(levelname)-8s (%(process)d) %(message)s") -for logger in [doc_store_logger, minio_logger, s3_logger, azure_logger, cron_logger, chunk_logger, database_logger]: - logger.setLevel(logging.INFO) - for handler in logger.handlers: - handler.setFormatter(fmt=formatter) - SVR_QUEUE_NAME = "rag_flow_svr_queue" SVR_QUEUE_RETENTION = 60*60 SVR_QUEUE_MAX_LEN = 1024 diff --git a/rag/svr/cache_file_svr.py b/rag/svr/cache_file_svr.py index e929f899c19..6b0581164c1 100644 --- a/rag/svr/cache_file_svr.py +++ b/rag/svr/cache_file_svr.py @@ -13,20 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import random import time import traceback from api.db.db_models import close_connection from api.db.services.task_service import TaskService -from rag.settings import cron_logger +from api.utils.log_utils import logger from rag.utils.storage_factory import STORAGE_IMPL from rag.utils.redis_conn import REDIS_CONN def collect(): doc_locations = TaskService.get_ongoing_doc_name() - print(doc_locations) + logger.info(doc_locations) if len(doc_locations) == 0: time.sleep(1) return @@ -35,7 +34,7 @@ def collect(): def main(): locations = collect() if not locations:return - print("TASKS:", len(locations)) + logger.info(f"TASKS: {len(locations)}") for kb_id, loc in locations: try: if REDIS_CONN.is_alive(): @@ -44,7 +43,7 @@ def main(): if REDIS_CONN.exist(key):continue file_bin = STORAGE_IMPL.get(kb_id, loc) REDIS_CONN.transaction(key, file_bin, 12 * 60) - cron_logger.info("CACHE: {}".format(loc)) + logger.info("CACHE: {}".format(loc)) except Exception as e: traceback.print_stack(e) except Exception as e: diff --git a/rag/svr/discord_svr.py b/rag/svr/discord_svr.py index 5426826c99a..313205dea8d 100644 --- a/rag/svr/discord_svr.py +++ b/rag/svr/discord_svr.py @@ -17,6 +17,7 @@ import requests import base64 import asyncio +from api.utils.log_utils import logger URL = '{YOUR_IP_ADDRESS:PORT}/v1/api/completion_aibotk' # Default: https://demo.ragflow.io/v1/api/completion_aibotk @@ -36,7 +37,7 @@ @client.event async def on_ready(): - print(f'We have logged in as {client.user}') + logger.info(f'We have logged in as {client.user}') @client.event diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 15b5aaf33b7..fb8207ae818 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -22,7 +22,6 @@ import re import sys import time -import traceback from concurrent.futures import ThreadPoolExecutor from functools import partial from io import BytesIO @@ -43,8 +42,8 @@ from rag.app import laws, paper, presentation, manual, qa, table, book, resume, picture, naive, one, audio, knowledge_graph, email from rag.nlp import search, rag_tokenizer from rag.raptor import RecursiveAbstractiveProcessing4TreeOrganizedRetrieval as Raptor -from rag.settings import database_logger, SVR_QUEUE_NAME -from rag.settings import cron_logger, DOC_MAXIMUM_SIZE +from api.utils.log_utils import logger, LOG_FILE +from rag.settings import DOC_MAXIMUM_SIZE, SVR_QUEUE_NAME from rag.utils import rmSpace, num_tokens_from_string from rag.utils.redis_conn import REDIS_CONN, Payload from rag.utils.storage_factory import STORAGE_IMPL @@ -90,8 +89,8 @@ def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing... 
d["progress"] = prog try: TaskService.update_progress(task_id, d) - except Exception as e: - cron_logger.error("set_progress:({}), {}".format(task_id, str(e))) + except Exception: + logger.exception(f"set_progress({task_id}) got exception") close_connection() if cancel: @@ -110,8 +109,8 @@ def collect(): if not PAYLOAD: time.sleep(1) return pd.DataFrame() - except Exception as e: - cron_logger.error("Get task event from queue exception:" + str(e)) + except Exception: + logger.exception("Get task event from queue exception") return pd.DataFrame() msg = PAYLOAD.get_message() @@ -119,11 +118,11 @@ def collect(): return pd.DataFrame() if TaskService.do_cancel(msg["id"]): - cron_logger.info("Task {} has been canceled.".format(msg["id"])) + logger.info("Task {} has been canceled.".format(msg["id"])) return pd.DataFrame() tasks = TaskService.get_tasks(msg["id"]) if not tasks: - cron_logger.warning("{} empty task!".format(msg["id"])) + logger.warning("{} empty task!".format(msg["id"])) return [] tasks = pd.DataFrame(tasks) @@ -152,33 +151,29 @@ def build(row): st = timer() bucket, name = File2DocumentService.get_storage_address(doc_id=row["doc_id"]) binary = get_storage_binary(bucket, name) - cron_logger.info( + logger.info( "From minio({}) {}/{}".format(timer() - st, row["location"], row["name"])) except TimeoutError: callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.") - cron_logger.error( - "Minio {}/{}: Fetch file from minio timeout.".format(row["location"], row["name"])) + logger.exception("Minio {}/{} got timeout: Fetch file from minio timeout.".format(row["location"], row["name"])) return except Exception as e: if re.search("(No such file|not found)", str(e)): callback(-1, "Can not find file <%s> from minio. Could you try it again?" 
% row["name"]) else: callback(-1, "Get file from minio: %s" % str(e).replace("'", "")) - traceback.print_exc() + logger.exception("Chunking {}/{} got exception".format(row["location"], row["name"])) return try: cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"], to_page=row["to_page"], lang=row["language"], callback=callback, kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"]) - cron_logger.info( - "Chunking({}) {}/{}".format(timer() - st, row["location"], row["name"])) + logger.info("Chunking({}) {}/{} done".format(timer() - st, row["location"], row["name"])) except Exception as e: callback(-1, "Internal server error while chunking: %s" % str(e).replace("'", "")) - cron_logger.error( - "Chunking {}/{}: {}".format(row["location"], row["name"], str(e))) - traceback.print_exc() + logger.exception("Chunking {}/{} got exception".format(row["location"], row["name"])) return docs = [] @@ -214,14 +209,13 @@ def build(row): st = timer() STORAGE_IMPL.put(row["kb_id"], d["id"], output_buffer.getvalue()) el += timer() - st - except Exception as e: - cron_logger.error(str(e)) - traceback.print_exc() + except Exception: + logger.exception("Saving image of chunk {}/{}/{} got exception".format(row["location"], row["name"], d["_id"])) d["img_id"] = "{}-{}".format(row["kb_id"], d["id"]) del d["image"] docs.append(d) - cron_logger.info("MINIO PUT({}):{}".format(row["name"], el)) + logger.info("MINIO PUT({}):{}".format(row["name"], el)) if row["parser_config"].get("auto_keywords", 0): callback(msg="Start to generate keywords for every chunk ...") @@ -347,7 +341,7 @@ def main(): embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING, llm_name=r["embd_id"], lang=r["language"]) except Exception as e: callback(-1, msg=str(e)) - cron_logger.error(str(e)) + logger.exception("LLMBundle got exception") continue if r.get("task_type", "") == "raptor": @@ -356,12 +350,12 @@ def main(): cks, tk_count, vector_size = run_raptor(r, chat_mdl, embd_mdl, callback) except Exception as e: callback(-1, msg=str(e)) - cron_logger.error(str(e)) + logger.exception("run_raptor got exception") continue else: st = timer() cks = build(r) - cron_logger.info("Build chunks({}): {}".format(r["name"], timer() - st)) + logger.info("Build chunks({}): {}".format(r["name"], timer() - st)) if cks is None: continue if not cks: @@ -377,12 +371,12 @@ def main(): tk_count, vector_size = embedding(cks, embd_mdl, r["parser_config"], callback) except Exception as e: callback(-1, "Embedding error:{}".format(str(e))) - cron_logger.error(str(e)) + logger.exception("run_rembedding got exception") tk_count = 0 - cron_logger.info("Embedding elapsed({}): {:.2f}".format(r["name"], timer() - st)) + logger.info("Embedding elapsed({}): {:.2f}".format(r["name"], timer() - st)) callback(msg="Finished embedding({:.2f})! 
Start to build index!".format(timer() - st)) - # cron_logger.info(f"task_executor init_kb index {search.index_name(r["tenant_id"])} embd_mdl {embd_mdl.llm_name} vector length {vector_size}") + # logger.info(f"task_executor init_kb index {search.index_name(r["tenant_id"])} embd_mdl {embd_mdl.llm_name} vector length {vector_size}") init_kb(r, vector_size) chunk_count = len(set([c["id"] for c in cks])) st = timer() @@ -393,11 +387,11 @@ def main(): if b % 128 == 0: callback(prog=0.8 + 0.1 * (b + 1) / len(cks), msg="") - cron_logger.info("Indexing elapsed({}): {:.2f}".format(r["name"], timer() - st)) + logger.info("Indexing elapsed({}): {:.2f}".format(r["name"], timer() - st)) if es_r: - callback(-1, "Insert chunk error, detail info please check ragflow-logs/api/cron_logger.log. Please also check ES status!") + callback(-1, f"Insert chunk error, detail info please check {LOG_FILE}. Please also check ES status!") docStoreConn.delete({"doc_id": r["doc_id"]}, search.index_name(r["tenant_id"]), r["kb_id"]) - cron_logger.error('Insert chunk error: ' + str(es_r)) + logger.error('Insert chunk error: ' + str(es_r)) else: if TaskService.do_cancel(r["id"]): docStoreConn.delete({"doc_id": r["doc_id"]}, search.index_name(r["tenant_id"]), r["kb_id"]) @@ -405,7 +399,7 @@ def main(): callback(1., "Done!") DocumentService.increment_chunk_num( r["doc_id"], r["kb_id"], tk_count, chunk_count, 0) - cron_logger.info( + logger.info( "Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format( r["id"], tk_count, len(cks), timer() - st)) @@ -421,16 +415,16 @@ def report_status(): obj[CONSUMER_NAME].append(timer()) obj[CONSUMER_NAME] = obj[CONSUMER_NAME][-60:] REDIS_CONN.set_obj("TASKEXE", obj, 60*2) - except Exception as e: - print("[Exception]:", str(e)) + except Exception: + logger.exception("report_status got exception") time.sleep(30) if __name__ == "__main__": peewee_logger = logging.getLogger('peewee') peewee_logger.propagate = False - peewee_logger.addHandler(database_logger.handlers[0]) - peewee_logger.setLevel(database_logger.level) + peewee_logger.addHandler(logger.handlers[0]) + peewee_logger.setLevel(logger.handlers[0].level) exe = ThreadPoolExecutor(max_workers=1) exe.submit(report_status) diff --git a/rag/utils/azure_sas_conn.py b/rag/utils/azure_sas_conn.py index 380cfc5b90c..87495e63318 100644 --- a/rag/utils/azure_sas_conn.py +++ b/rag/utils/azure_sas_conn.py @@ -2,7 +2,7 @@ import time from io import BytesIO from rag import settings -from rag.settings import azure_logger +from api.utils.log_utils import logger from rag.utils import singleton from azure.storage.blob import ContainerClient @@ -19,14 +18,13 @@ def __open__(self): try: if self.conn: self.__close__() - except Exception as e: + except Exception: pass try: self.conn = ContainerClient.from_container_url(self.container_url + "?" 
+ self.sas_token) - except Exception as e: - azure_logger.error( - "Fail to connect %s " % self.container_url + str(e)) + except Exception: + logger.exception("Fail to connect %s " % self.container_url) def __close__(self): del self.conn @@ -40,24 +38,24 @@ def put(self, bucket, fnm, binary): for _ in range(3): try: return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary)) - except Exception as e: - azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail put {bucket}/{fnm}") self.__open__() time.sleep(1) def rm(self, bucket, fnm): try: self.conn.delete_blob(fnm) - except Exception as e: - azure_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail rm {bucket}/{fnm}") def get(self, bucket, fnm): for _ in range(1): try: r = self.conn.download_blob(fnm) return r.read() - except Exception as e: - azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"fail get {bucket}/{fnm}") self.__open__() time.sleep(1) return @@ -65,16 +63,16 @@ def get(self, bucket, fnm): def obj_exist(self, bucket, fnm): try: return self.conn.get_blob_client(fnm).exists() - except Exception as e: - azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail put {bucket}/{fnm}") return False def get_presigned_url(self, bucket, fnm, expires): for _ in range(10): try: return self.conn.get_presigned_url("GET", bucket, fnm, expires) - except Exception as e: - azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"fail get {bucket}/{fnm}") self.__open__() time.sleep(1) return diff --git a/rag/utils/azure_spn_conn.py b/rag/utils/azure_spn_conn.py index d614fdfbfe0..cbaea213100 100644 --- a/rag/utils/azure_spn_conn.py +++ b/rag/utils/azure_spn_conn.py @@ -1,7 +1,7 @@ import os import time from rag import settings -from rag.settings import azure_logger +from api.utils.log_utils import logger from rag.utils import singleton from azure.identity import ClientSecretCredential, AzureAuthorityHosts from azure.storage.filedatalake import FileSystemClient @@ -22,15 +21,14 @@ def __open__(self): try: if self.conn: self.__close__() - except Exception as e: + except Exception: pass try: credentials = ClientSecretCredential(tenant_id=self.tenant_id, client_id=self.client_id, client_secret=self.secret, authority=AzureAuthorityHosts.AZURE_CHINA) self.conn = FileSystemClient(account_url=self.account_url, file_system_name=self.container_name, credential=credentials) - except Exception as e: - azure_logger.error( - "Fail to connect %s " % self.account_url + str(e)) + except Exception: + logger.exception("Fail to connect %s" % self.account_url) def __close__(self): del self.conn @@ -48,16 +46,16 @@ def put(self, bucket, fnm, binary): f = self.conn.create_file(fnm) f.append_data(binary, offset=0, length=len(binary)) return f.flush_data(len(binary)) - except Exception as e: - azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail put {bucket}/{fnm}") self.__open__() time.sleep(1) def rm(self, bucket, fnm): try: self.conn.delete_file(fnm) - except Exception as e: - azure_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail rm {bucket}/{fnm}") def get(self, bucket, fnm): for _ in range(1): @@ -65,8 +63,8 @@ def get(self, bucket, fnm): client = self.conn.get_file_client(fnm) r = client.download_file() return r.read() - except Exception as e: - 
azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"fail get {bucket}/{fnm}") self.__open__() time.sleep(1) return @@ -75,16 +73,16 @@ def obj_exist(self, bucket, fnm): try: client = self.conn.get_file_client(fnm) return client.exists() - except Exception as e: - azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail put {bucket}/{fnm}") return False def get_presigned_url(self, bucket, fnm, expires): for _ in range(10): try: return self.conn.get_presigned_url("GET", bucket, fnm, expires) - except Exception as e: - azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"fail get {bucket}/{fnm}") self.__open__() time.sleep(1) return \ No newline at end of file diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index 9b38d895e50..65546ffb25b 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -9,7 +9,7 @@ from elasticsearch import Elasticsearch from elasticsearch_dsl import UpdateByQuery, Q, Search, Index from elastic_transport import ConnectionTimeout -from rag.settings import doc_store_logger +from api.utils.log_utils import logger from rag import settings from rag.utils import singleton from api.utils.file_utils import get_project_base_directory @@ -17,7 +17,7 @@ from rag.utils.doc_store_conn import DocStoreConnection, MatchExpr, OrderByExpr, MatchTextExpr, MatchDenseExpr, FusionExpr from rag.nlp import is_english, rag_tokenizer -doc_store_logger.info("Elasticsearch sdk version: "+str(elasticsearch.__version__)) +logger.info("Elasticsearch sdk version: "+str(elasticsearch.__version__)) @singleton @@ -34,10 +34,10 @@ def __init__(self): ) if self.es: self.info = self.es.info() - doc_store_logger.info("Connect to es.") + logger.info("Connect to es.") break - except Exception as e: - doc_store_logger.error("Fail to connect to es: " + str(e)) + except Exception: + logger.exception("Fail to connect to es") time.sleep(1) if not self.es.ping(): raise Exception("Can't connect to ES cluster") @@ -70,14 +70,14 @@ def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int): return IndicesClient(self.es).create(index=indexName, settings=self.mapping["settings"], mappings=self.mapping["mappings"]) - except Exception as e: - doc_store_logger.error("ES create index error %s ----%s" % (indexName, str(e))) + except Exception: + logger.exception("ES create index error %s" % (indexName)) def deleteIdx(self, indexName: str, knowledgebaseId: str): try: return self.es.indices.delete(indexName, allow_no_indices=True) - except Exception as e: - doc_store_logger.error("ES delete index error %s ----%s" % (indexName, str(e))) + except Exception: + logger.exception("ES delete index error %s" % (indexName)) def indexExist(self, indexName: str, knowledgebaseId: str) -> bool: s = Index(indexName, self.es) @@ -85,7 +85,7 @@ def indexExist(self, indexName: str, knowledgebaseId: str) -> bool: try: return s.exists() except Exception as e: - doc_store_logger.error("ES indexExist: " + str(e)) + logger.exception("ES indexExist") if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0: continue return False @@ -159,7 +159,7 @@ def search(self, selectFields: list[str], highlightFields: list[str], condition: if limit > 0: s = s[offset:limit] q = s.to_dict() - doc_store_logger.info("ESConnection.search [Q]: " + json.dumps(q)) + # logger.info("ESConnection.search [Q]: " + json.dumps(q)) for i in range(3): try: @@ -171,18 +171,14 @@ def search(self, 
selectFields: list[str], highlightFields: list[str], condition: _source=True) if str(res.get("timed_out", "")).lower() == "true": raise Exception("Es Timeout.") - doc_store_logger.info("ESConnection.search res: " + str(res)) + logger.info("ESConnection.search res: " + str(res)) return res except Exception as e: - doc_store_logger.error( - "ES search exception: " + - str(e) + - "\n[Q]: " + - str(q)) + logger.exception("ES search [Q]: " + str(q)) if str(e).find("Timeout") > 0: continue raise e - doc_store_logger.error("ES search timeout for 3 times!") + logger.error("ES search timeout for 3 times!") raise Exception("ES search timeout.") def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None: @@ -198,15 +194,11 @@ def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict chunk["id"] = chunkId return chunk except Exception as e: - doc_store_logger.error( - "ES get exception: " + - str(e) + - "[Q]: " + - chunkId) + logger.exception(f"ES get({chunkId}) got exception") if str(e).find("Timeout") > 0: continue raise e - doc_store_logger.error("ES search timeout for 3 times!") + logger.error("ES search timeout for 3 times!") raise Exception("ES search timeout.") def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str) -> list[str]: @@ -236,7 +228,7 @@ def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str) -> res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"])) return res except Exception as e: - doc_store_logger.warning("Fail to bulk: " + str(e)) + logger.warning("Fail to bulk: " + str(e)) if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE): time.sleep(3) continue @@ -253,9 +245,7 @@ def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseI self.es.update(index=indexName, id=chunkId, doc=doc) return True except Exception as e: - doc_store_logger.error( - "ES update exception: " + str(e) + " id:" + str(id) + - json.dumps(newValue, ensure_ascii=False)) + logger.exception(f"ES failed to update(index={indexName}, id={id}, doc={json.dumps(condition, ensure_ascii=False)})") if str(e).find("Timeout") > 0: continue else: @@ -292,8 +282,7 @@ def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseI _ = ubq.execute() return True except Exception as e: - doc_store_logger.error("ES update exception: " + - str(e) + "[Q]:" + str(bqry.to_dict())) + logger.error("ES update exception: " + str(e) + "[Q]:" + str(bqry.to_dict())) if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0: continue return False @@ -315,7 +304,7 @@ def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int: qry.must.append(Q("term", **{k: v})) else: raise Exception("Condition value must be int, str or list.") - doc_store_logger.info("ESConnection.delete [Q]: " + json.dumps(qry.to_dict())) + logger.info("ESConnection.delete [Q]: " + json.dumps(qry.to_dict())) for _ in range(10): try: res = self.es.delete_by_query( @@ -324,7 +313,7 @@ def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int: refresh=True) return res["deleted"] except Exception as e: - doc_store_logger.warning("Fail to delete: " + str(filter) + str(e)) + logger.warning("Fail to delete: " + str(filter) + str(e)) if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE): time.sleep(3) continue @@ -407,7 +396,7 @@ def getAggregation(self, res, fieldnm: str): SQL """ def sql(self, sql: str, fetch_size: int, format: str): - doc_store_logger.info(f"ESConnection.sql 
get sql: {sql}") + logger.info(f"ESConnection.sql get sql: {sql}") sql = re.sub(r"[ `]+", " ", sql) sql = sql.replace("%", "") replaces = [] @@ -424,17 +413,17 @@ def sql(self, sql: str, fetch_size: int, format: str): for p, r in replaces: sql = sql.replace(p, r, 1) - doc_store_logger.info(f"ESConnection.sql to es: {sql}") + logger.info(f"ESConnection.sql to es: {sql}") for i in range(3): try: res = self.es.sql.query(body={"query": sql, "fetch_size": fetch_size}, format=format, request_timeout="2s") return res except ConnectionTimeout: - doc_store_logger.error("ESConnection.sql timeout [Q]: " + sql) + logger.exception("ESConnection.sql timeout [Q]: " + sql) continue - except Exception as e: - doc_store_logger.error(f"ESConnection.sql failure: {sql} => " + str(e)) + except Exception: + logger.exception("ESConnection.sql got exception [Q]: " + sql) return None - doc_store_logger.error("ESConnection.sql timeout for 3 times!") + logger.error("ESConnection.sql timeout for 3 times!") return None diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py index 5be47918639..f022ecdb904 100644 --- a/rag/utils/infinity_conn.py +++ b/rag/utils/infinity_conn.py @@ -7,7 +7,7 @@ from infinity.index import IndexInfo, IndexType from infinity.connection_pool import ConnectionPool from rag import settings -from rag.settings import doc_store_logger +from api.utils.log_utils import logger from rag.utils import singleton import polars as pl from polars.series.series import Series @@ -22,7 +22,6 @@ OrderByExpr, ) - def equivalent_condition_to_str(condition: dict) -> str: assert "_id" not in condition cond = list() @@ -56,7 +55,7 @@ def __init__(self): host, port = infinity_uri.split(":") infinity_uri = infinity.common.NetworkAddress(host, int(port)) self.connPool = ConnectionPool(infinity_uri) - doc_store_logger.info(f"Connected to infinity {infinity_uri}.") + logger.info(f"Connected to infinity {infinity_uri}.") """ Database operations @@ -71,7 +70,7 @@ def health(self) -> dict: TODO: Infinity-sdk provides health() to wrap `show global variables` and `show tables` """ inf_conn = self.connPool.get_conn() - res = infinity.show_current_node() + res = inf_conn.show_current_node() self.connPool.release_conn(inf_conn) color = "green" if res.error_code == 0 else "red" res2 = { @@ -132,7 +131,7 @@ def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int): ) break self.connPool.release_conn(inf_conn) - doc_store_logger.info( + logger.info( f"INFINITY created table {table_name}, vector size {vectorSize}" ) @@ -142,7 +141,7 @@ def deleteIdx(self, indexName: str, knowledgebaseId: str): db_instance = inf_conn.get_database(self.dbName) db_instance.drop_table(table_name, ConflictType.Ignore) self.connPool.release_conn(inf_conn) - doc_store_logger.info(f"INFINITY dropped table {table_name}") + logger.info(f"INFINITY dropped table {table_name}") def indexExist(self, indexName: str, knowledgebaseId: str) -> bool: table_name = f"{indexName}_{knowledgebaseId}" @@ -152,8 +151,8 @@ def indexExist(self, indexName: str, knowledgebaseId: str) -> bool: _ = db_instance.get_table(table_name) self.connPool.release_conn(inf_conn) return True - except Exception as e: - doc_store_logger.error("INFINITY indexExist: " + str(e)) + except Exception: + logger.exception("INFINITY indexExist") return False """ @@ -263,7 +262,7 @@ def search( df_list.append(kb_res) self.connPool.release_conn(inf_conn) res = pl.concat(df_list) - doc_store_logger.info("INFINITY search tables: " + str(table_list)) + logger.info("INFINITY 
search tables: " + str(table_list)) return res def get( @@ -318,8 +317,8 @@ def insert( str_filter = f"id IN ({str_ids})" table_instance.delete(str_filter) # for doc in documents: - # doc_store_logger.info(f"insert position_list: {doc['position_list']}") - # doc_store_logger.info(f"InfinityConnection.insert {json.dumps(documents)}") + # logger.info(f"insert position_list: {doc['position_list']}") + # logger.info(f"InfinityConnection.insert {json.dumps(documents)}") table_instance.insert(documents) self.connPool.release_conn(inf_conn) doc_store_logger.info(f"inserted into {table_name} {str_ids}.") @@ -329,7 +328,7 @@ def update( self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str ) -> bool: # if 'position_list' in newValue: - # doc_store_logger.info(f"update position_list: {newValue['position_list']}") + # logger.info(f"upsert position_list: {newValue['position_list']}") inf_conn = self.connPool.get_conn() db_instance = inf_conn.get_database(self.dbName) table_name = f"{indexName}_{knowledgebaseId}" @@ -350,7 +349,7 @@ def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int: try: table_instance = db_instance.get_table(table_name) except Exception: - doc_store_logger.warning( + logger.warning( f"Skipped deleting `{filter}` from table {table_name} since the table doesn't exist." ) return 0 diff --git a/rag/utils/minio_conn.py b/rag/utils/minio_conn.py index 5841dabc331..5053ef2d3f3 100644 --- a/rag/utils/minio_conn.py +++ b/rag/utils/minio_conn.py @@ -1,10 +1,9 @@ -import os import time from minio import Minio from io import BytesIO from rag import settings -from rag.settings import minio_logger from rag.utils import singleton +from api.utils.log_utils import logger @singleton @@ -17,7 +16,7 @@ def __open__(self): try: if self.conn: self.__close__() - except Exception as e: + except Exception: pass try: @@ -26,9 +25,9 @@ def __open__(self): secret_key=settings.MINIO["password"], secure=False ) - except Exception as e: - minio_logger.error( - "Fail to connect %s " % settings.MINIO["host"] + str(e)) + except Exception: + logger.exception( + "Fail to connect %s " % settings.MINIO["host"]) def __close__(self): del self.conn @@ -55,24 +54,24 @@ def put(self, bucket, fnm, binary): len(binary) ) return r - except Exception as e: - minio_logger.error(f"Fail put {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail put {bucket}/{fnm}:") self.__open__() time.sleep(1) def rm(self, bucket, fnm): try: self.conn.remove_object(bucket, fnm) - except Exception as e: - minio_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail put {bucket}/{fnm}:") def get(self, bucket, fnm): for _ in range(1): try: r = self.conn.get_object(bucket, fnm) return r.read() - except Exception as e: - minio_logger.error(f"fail get {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail put {bucket}/{fnm}:") self.__open__() time.sleep(1) return @@ -81,8 +80,8 @@ def obj_exist(self, bucket, fnm): try: if self.conn.stat_object(bucket, fnm):return True return False - except Exception as e: - minio_logger.error(f"Fail put {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail put {bucket}/{fnm}:") return False @@ -90,8 +89,8 @@ def get_presigned_url(self, bucket, fnm, expires): for _ in range(10): try: return self.conn.get_presigned_url("GET", bucket, fnm, expires) - except Exception as e: - minio_logger.error(f"fail get {bucket}/{fnm}: " + str(e)) + except Exception: + 
logger.exception(f"Fail put {bucket}/{fnm}:") self.__open__() time.sleep(1) return diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py index 09d37bf8339..b256b0e0cad 100644 --- a/rag/utils/redis_conn.py +++ b/rag/utils/redis_conn.py @@ -110,9 +110,8 @@ def queue_product(self, queue, message, exp=settings.SVR_QUEUE_RETENTION) -> boo #pipeline.expire(queue, exp) pipeline.execute() return True - except Exception as e: - print(e) - logging.warning("[EXCEPTION]producer" + str(queue) + "||" + str(e)) + except Exception: + logging.exception("producer" + str(queue) + " got exception") return False def queue_consumer(self, queue_name, group_name, consumer_name, msg_id=b">") -> Payload: @@ -143,7 +142,7 @@ def queue_consumer(self, queue_name, group_name, consumer_name, msg_id=b">") -> if 'key' in str(e): pass else: - logging.warning("[EXCEPTION]consumer: " + str(queue_name) + "||" + str(e)) + logging.exception("consumer: " + str(queue_name) + " got exception") return None def get_unacked_for(self, consumer_name, queue_name, group_name): @@ -160,7 +159,7 @@ def get_unacked_for(self, consumer_name, queue_name, group_name): except Exception as e: if 'key' in str(e): return - logging.warning("[EXCEPTION]xpending_range: " + consumer_name + "||" + str(e)) + logging.exception("xpending_range: " + consumer_name + " got exception") self.__open__() REDIS_CONN = RedisDB() diff --git a/rag/utils/s3_conn.py b/rag/utils/s3_conn.py index ab2d45a77ed..86397ce1ce3 100644 --- a/rag/utils/s3_conn.py +++ b/rag/utils/s3_conn.py @@ -4,7 +4,6 @@ from botocore.client import Config import time from io import BytesIO -from rag.settings import s3_logger from rag.utils import singleton @singleton @@ -21,7 +20,7 @@ def __open__(self): try: if self.conn: self.__close__() - except Exception as e: + except Exception: pass try: @@ -40,9 +39,9 @@ def __open__(self): aws_secret_access_key=self.secret_key, config=config ) - except Exception as e: - s3_logger.error( - "Fail to connect %s " % self.endpoint + str(e)) + except Exception: + logger.exception( + "Fail to connect %s" % self.endpoint) def __close__(self): del self.conn @@ -50,11 +49,11 @@ def __close__(self): def bucket_exists(self, bucket): try: - s3_logger.error(f"head_bucket bucketname {bucket}") + logger.debug(f"head_bucket bucketname {bucket}") self.conn.head_bucket(Bucket=bucket) exists = True - except ClientError as e: - s3_logger.error(f"head_bucket error {bucket}: " + str(e)) + except ClientError: + logger.exception(f"head_bucket error {bucket}") exists = False return exists @@ -63,7 +62,7 @@ def health(self): if not self.bucket_exists(bucket): self.conn.create_bucket(Bucket=bucket) - s3_logger.error(f"create bucket {bucket} ********") + logger.debug(f"create bucket {bucket} ********") r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm) return r @@ -75,25 +74,25 @@ def list(self, bucket, dir, recursive=True): return [] def put(self, bucket, fnm, binary): - s3_logger.error(f"bucket name {bucket}; filename :{fnm}:") + logger.debug(f"bucket name {bucket}; filename :{fnm}:") for _ in range(1): try: if not self.bucket_exists(bucket): self.conn.create_bucket(Bucket=bucket) - s3_logger.error(f"create bucket {bucket} ********") + logger.info(f"create bucket {bucket} ********") r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm) return r - except Exception as e: - s3_logger.error(f"Fail put {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail put {bucket}/{fnm}") self.__open__() time.sleep(1) def rm(self, bucket, fnm): try: 
self.conn.delete_object(Bucket=bucket, Key=fnm) - except Exception as e: - s3_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"Fail rm {bucket}/{fnm}") def get(self, bucket, fnm): for _ in range(1): @@ -101,8 +100,8 @@ def get(self, bucket, fnm): r = self.conn.get_object(Bucket=bucket, Key=fnm) object_data = r['Body'].read() return object_data - except Exception as e: - s3_logger.error(f"fail get {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"fail get {bucket}/{fnm}") self.__open__() time.sleep(1) return @@ -128,8 +127,8 @@ def get_presigned_url(self, bucket, fnm, expires): ExpiresIn=expires) return r - except Exception as e: - s3_logger.error(f"fail get url {bucket}/{fnm}: " + str(e)) + except Exception: + logger.exception(f"fail get url {bucket}/{fnm}") self.__open__() time.sleep(1) return
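
Note on the pattern applied throughout this diff: logger.error("..." + str(e)) and print(e) only record the exception text, while logger.exception("...") records the same message at ERROR level plus the full traceback, which is what the rewritten call sites rely on. Below is a minimal, standalone sketch of the difference, using only the standard-library logging module; the shared logger exported by api.utils.log_utils is assumed to behave like an ordinary logging.Logger.

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("sketch")

def fetch(key: str) -> bytes:
    # Stand-in for a storage call that can fail.
    raise ConnectionError(f"cannot fetch {key}")

try:
    fetch("bucket/object")
except Exception as e:
    # Old style: only the message text survives; the stack trace is lost.
    logger.error("fail get bucket/object: " + str(e))
    # New style: same ERROR level, but the traceback is appended automatically.
    # logger.exception() must be called from inside an except block.
    logger.exception("fail get bucket/object")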
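
The object-store connectors touched above (MinIO, Azure SAS/SPN, S3) all repeat the same recovery shape: try the operation, log the exception, re-open the client, back off, retry. If that loop is ever factored out, it could look roughly like the sketch below; retry_with_reopen is a hypothetical helper, not something introduced by this change.

import time
import logging
from typing import Callable, Optional, TypeVar

logger = logging.getLogger("sketch")
T = TypeVar("T")

def retry_with_reopen(op_name: str,
                      call: Callable[[], T],
                      reopen: Callable[[], None],
                      attempts: int = 3,
                      delay: float = 1.0) -> Optional[T]:
    # Mirrors the connectors' put/get loops: log the full traceback,
    # re-open the connection, back off, then retry; fall through to None
    # on final failure, matching the current behaviour.
    for _ in range(attempts):
        try:
            return call()
        except Exception:
            logger.exception("Fail %s" % op_name)
            reopen()
            time.sleep(delay)
    return None

A connector method such as put() would then reduce to something like: return retry_with_reopen(f"put {bucket}/{fnm}", lambda: self.conn.upload_fileobj(BytesIO(binary), bucket, fnm), self.__open__).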