Rework logging (infiniflow#3358)
Unified all log files into one.

- [x] Refactoring
yuzhichang authored and jhaiq committed Nov 30, 2024
1 parent a55c73b commit b776310
Showing 3 changed files with 151 additions and 28 deletions.
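
The unified setup routes everything through initRootLogger, which is defined in api/utils/log_utils.py and is not part of this diff. A minimal sketch of what such a root-logger initializer might look like — the file location, rotation settings, and format below are assumptions, not the committed implementation:

# Hypothetical sketch only: the real initRootLogger lives in
# api/utils/log_utils.py and is not shown in this diff.
import logging
import os
from logging.handlers import RotatingFileHandler

def initRootLogger(logfile_basename: str, log_level: int = logging.INFO):
    # One rotating file plus stderr on the root logger, so every module
    # that logs via the stdlib ends up in the same place.
    log_path = os.path.abspath(logfile_basename + ".log")  # assumed location
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")

    file_handler = RotatingFileHandler(log_path, maxBytes=10 * 1024 * 1024, backupCount=5)
    file_handler.setFormatter(formatter)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    root = logging.getLogger()
    root.setLevel(log_level)
    root.addHandler(file_handler)
    root.addHandler(stream_handler)
    logging.info("logging to %s", log_path)
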
10 changes: 10 additions & 0 deletions api/ragflow_server.py
@@ -15,6 +15,16 @@
#

import logging
from api.utils.log_utils import initRootLogger
initRootLogger("ragflow_server")
for module in ["pdfminer"]:
module_logger = logging.getLogger(module)
module_logger.setLevel(logging.WARNING)
for module in ["peewee"]:
module_logger = logging.getLogger(module)
module_logger.handlers.clear()
module_logger.propagate = True

import os
import signal
import sys
88 changes: 60 additions & 28 deletions rag/svr/task_executor.py
@@ -13,7 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import datetime
+import logging
+import sys
+from api.utils.log_utils import initRootLogger
+CONSUMER_NO = "0" if len(sys.argv) < 2 else sys.argv[1]
+initRootLogger(f"task_executor_{CONSUMER_NO}")
+for module in ["pdfminer"]:
+    module_logger = logging.getLogger(module)
+    module_logger.setLevel(logging.WARNING)
+for module in ["peewee"]:
+    module_logger = logging.getLogger(module)
+    module_logger.handlers.clear()
+    module_logger.propagate = True
+
+from datetime import datetime
import json
import logging
import os
@@ -22,8 +35,7 @@
import re
import sys
import time
-import traceback
-from concurrent.futures import ThreadPoolExecutor
+import threading
from functools import partial
from io import BytesIO
from multiprocessing.context import TimeoutError
@@ -72,9 +84,14 @@
ParserType.KG.value: knowledge_graph
}

-CONSUMER_NAME = "task_consumer_" + ("0" if len(sys.argv) < 2 else sys.argv[1])
+CONSUMER_NAME = "task_consumer_" + CONSUMER_NO
PAYLOAD: Payload | None = None

BOOT_AT = datetime.now().isoformat()
DONE_TASKS = 0
RETRY_TASKS = 0
PENDING_TASKS = 0
HEAD_CREATED_AT = ""
HEAD_DETAIL = ""

def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."):
global PAYLOAD
@@ -196,9 +213,9 @@ def build(row):
md5 = hashlib.md5()
md5.update((ck["content_with_weight"] +
str(d["doc_id"])).encode("utf-8"))
d["_id"] = md5.hexdigest()
d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19]
d["create_timestamp_flt"] = datetime.datetime.now().timestamp()
d["id"] = md5.hexdigest()
d["create_time"] = str(datetime.now()).replace("T", " ")[:19]
d["create_timestamp_flt"] = datetime.now().timestamp()
if not d.get("image"):
docs.append(d)
continue
@@ -323,9 +340,9 @@ def run_raptor(row, chat_mdl, embd_mdl, callback=None):
d = copy.deepcopy(doc)
md5 = hashlib.md5()
md5.update((content + str(d["doc_id"])).encode("utf-8"))
d["_id"] = md5.hexdigest()
d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19]
d["create_timestamp_flt"] = datetime.datetime.now().timestamp()
d["id"] = md5.hexdigest()
d["create_time"] = str(datetime.now()).replace("T", " ")[:19]
d["create_timestamp_flt"] = datetime.now().timestamp()
d[vctr_nm] = vctr.tolist()
d["content_with_weight"] = content
d["content_ltks"] = rag_tokenizer.tokenize(content)
@@ -411,29 +428,44 @@ def main():


def report_status():
-    global CONSUMER_NAME
+    global CONSUMER_NAME, BOOT_AT, DONE_TASKS, RETRY_TASKS, PENDING_TASKS, HEAD_CREATED_AT, HEAD_DETAIL
+    REDIS_CONN.sadd("TASKEXE", CONSUMER_NAME)
while True:
try:
-            obj = REDIS_CONN.get("TASKEXE")
-            if not obj: obj = {}
-            else: obj = json.loads(obj)
-            if CONSUMER_NAME not in obj: obj[CONSUMER_NAME] = []
-            obj[CONSUMER_NAME].append(timer())
-            obj[CONSUMER_NAME] = obj[CONSUMER_NAME][-60:]
-            REDIS_CONN.set_obj("TASKEXE", obj, 60*2)
-        except Exception as e:
-            print("[Exception]:", str(e))
+            now = datetime.now()
+            PENDING_TASKS = REDIS_CONN.queue_length(SVR_QUEUE_NAME)
+            if PENDING_TASKS > 0:
+                head_info = REDIS_CONN.queue_head(SVR_QUEUE_NAME)
+                if head_info is not None:
+                    seconds = int(head_info[0].split("-")[0])/1000
+                    HEAD_CREATED_AT = datetime.fromtimestamp(seconds).isoformat()
+                    HEAD_DETAIL = head_info[1]
+
+            heartbeat = json.dumps({
+                "name": CONSUMER_NAME,
+                "now": now.isoformat(),
+                "boot_at": BOOT_AT,
+                "done": DONE_TASKS,
+                "retry": RETRY_TASKS,
+                "pending": PENDING_TASKS,
+                "head_created_at": HEAD_CREATED_AT,
+                "head_detail": HEAD_DETAIL,
+            })
+            REDIS_CONN.zadd(CONSUMER_NAME, heartbeat, now.timestamp())
+            logging.info(f"{CONSUMER_NAME} reported heartbeat: {heartbeat}")
+
+            expired = REDIS_CONN.zcount(CONSUMER_NAME, 0, now.timestamp() - 60*30)
+            if expired > 0:
+                REDIS_CONN.zpopmin(CONSUMER_NAME, expired)
+        except Exception:
+            logging.exception("report_status got exception")
time.sleep(30)


if __name__ == "__main__":
-    peewee_logger = logging.getLogger('peewee')
-    peewee_logger.propagate = False
-    peewee_logger.addHandler(database_logger.handlers[0])
-    peewee_logger.setLevel(database_logger.level)
-
-    exe = ThreadPoolExecutor(max_workers=1)
-    exe.submit(report_status)
+    background_thread = threading.Thread(target=report_status)
+    background_thread.daemon = True
+    background_thread.start()

while True:
main()
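
Each executor now publishes a JSON heartbeat into a per-consumer sorted set keyed by CONSUMER_NAME and scored by timestamp, trimming entries older than 30 minutes. A monitoring script could read the latest heartbeats back roughly like this — a sketch assuming a plain redis-py client; the key names follow the code above:

# Sketch: read recent executor heartbeats back out of Redis.
# Assumes redis-py and the key layout used by report_status() above.
import json
import time

import redis

r = redis.Redis(decode_responses=True)
for name in r.smembers("TASKEXE"):  # consumers registered via sadd()
    beats = r.zrangebyscore(name, time.time() - 60 * 30, "+inf")  # last 30 min
    if beats:
        latest = json.loads(beats[-1])
        print(name, "alive at", latest["now"], "pending:", latest["pending"])
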
81 changes: 81 additions & 0 deletions rag/utils/redis_conn.py
@@ -90,6 +90,69 @@ def set(self, k, v, exp=3600):
self.__open__()
return False

def sadd(self, key: str, member: str):
try:
self.REDIS.sadd(key, member)
return True
except Exception as e:
logging.warning("[EXCEPTION]sadd" + str(key) + "||" + str(e))
self.__open__()
return False

def srem(self, key: str, member: str):
try:
self.REDIS.srem(key, member)
return True
except Exception as e:
logging.warning("[EXCEPTION]srem" + str(key) + "||" + str(e))
self.__open__()
return False

def smembers(self, key: str):
try:
res = self.REDIS.smembers(key)
return res
except Exception as e:
logging.warning("[EXCEPTION]smembers" + str(key) + "||" + str(e))
self.__open__()
return None

def zadd(self, key: str, member: str, score: float):
try:
self.REDIS.zadd(key, {member: score})
return True
except Exception as e:
logging.warning("[EXCEPTION]zadd" + str(key) + "||" + str(e))
self.__open__()
return False

def zcount(self, key: str, min: float, max: float):
try:
res = self.REDIS.zcount(key, min, max)
return res
except Exception as e:
logging.warning("[EXCEPTION]spopmin" + str(key) + "||" + str(e))
self.__open__()
return 0

def zpopmin(self, key: str, count: int):
try:
res = self.REDIS.zpopmin(key, count)
return res
except Exception as e:
logging.warning("[EXCEPTION]spopmin" + str(key) + "||" + str(e))
self.__open__()
return None

def zrangebyscore(self, key: str, min: float, max: float):
try:
res = self.REDIS.zrangebyscore(key, min, max)
return res
except Exception as e:
logging.warning("[EXCEPTION]srangebyscore" + str(key) + "||" + str(e))
self.__open__()
return None

def transaction(self, key, value, exp=3600):
try:
pipeline = self.REDIS.pipeline(transaction=True)
@@ -163,4 +226,22 @@ def get_unacked_for(self, consumer_name, queue_name, group_name):
logging.warning("[EXCEPTION]xpending_range: " + consumer_name + "||" + str(e))
self.__open__()

def queue_length(self, queue) -> int:
for _ in range(3):
try:
num = self.REDIS.xlen(queue)
return num
except Exception:
logging.exception("queue_length" + str(queue) + " got exception")
return 0

    def queue_head(self, queue):
        for _ in range(3):
            try:
                ent = self.REDIS.xrange(queue, count=1)
                if ent:
                    return ent[0]  # (stream_id, fields) of the oldest entry
                return None        # empty stream
            except Exception:
                logging.exception("queue_head " + str(queue) + " got exception")
        return None  # callers check `is not None`, so don't return 0 here

REDIS_CONN = RedisDB()
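
For reference, a rough usage sketch of the new stream helpers — the queue name below is illustrative, not a name from this diff; the task executor passes SVR_QUEUE_NAME:

# Sketch: inspecting a task stream with the new helpers.
# "my_task_queue" is illustrative only.
from rag.utils.redis_conn import REDIS_CONN

pending = REDIS_CONN.queue_length("my_task_queue")
if pending > 0:
    head = REDIS_CONN.queue_head("my_task_queue")
    if head is not None:
        msg_id, fields = head  # XRANGE entries are (stream_id, {field: value}) pairs
        created_ms = int(msg_id.split("-")[0])  # stream IDs embed a millisecond timestamp
        print(pending, "tasks pending; oldest enqueued at", created_ms, "ms since epoch")
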
