diff --git a/lazyllm/engine/engine.py b/lazyllm/engine/engine.py
index 29bd2afa..5fbc7c58 100644
--- a/lazyllm/engine/engine.py
+++ b/lazyllm/engine/engine.py
@@ -12,6 +12,7 @@
 # Each session will have a separate engine
 class Engine(object):
     __default_engine__ = None
+    REPORT_URL = ""
 
     def __init__(self):
         self._nodes = {'__start__': Node(id='__start__', kind='__start__', name='__start__'),
@@ -55,7 +56,7 @@ def build_node(self, node) -> Node:
         return _constructor.build(node)
 
     def set_report_url(self, url) -> None:
-        NodeMetaHook.URL = url
+        Engine.REPORT_URL = url
 
     def reset(self):
         for node in self._nodes:
@@ -133,13 +134,8 @@ def get_args(cls, key, value, builder_key=None):
     def _process_hook(self, node, module):
         if not node.enable_data_reflow:
             return
-        if isinstance(module, lazyllm.ModuleBase):
-            NodeMetaHook.MODULEID_TO_WIDGETID[module._module_id] = node.id
-        elif isinstance(module, lazyllm.LazyLLMFlowsBase):
-            NodeMetaHook.MODULEID_TO_WIDGETID[module._flow_id] = node.id
-        else:
-            return
-        node.func.register_hook(NodeMetaHook)
+        if isinstance(module, (lazyllm.ModuleBase, lazyllm.LazyLLMFlowsBase)):
+            node.func.register_hook(NodeMetaHook(node.func, Engine.REPORT_URL, node.id))
 
 
 _constructor = NodeConstructor()
diff --git a/lazyllm/engine/node_meta_hook.py b/lazyllm/engine/node_meta_hook.py
index ae1165c2..dcab0632 100644
--- a/lazyllm/engine/node_meta_hook.py
+++ b/lazyllm/engine/node_meta_hook.py
@@ -16,10 +16,8 @@ class MetaKeys:
 
 
 class NodeMetaHook(LazyLLMHook):
-    URL = ""
-    MODULEID_TO_WIDGETID = {}
 
-    def __init__(self, obj):
+    def __init__(self, obj, url, front_id):
         if isinstance(obj, lazyllm.ModuleBase):
             self._uniqueid = obj._module_id
         elif isinstance(obj, lazyllm.FlowBase):
@@ -28,16 +26,18 @@ def __init__(self, obj):
             raise TypeError(f"Expected 'obj' to be type of ModuleBase or FlowBase, but got {type(obj)}")
         self._meta_info = {
             MetaKeys.ID: str(self._uniqueid),
-            MetaKeys.SESSIONID: lazyllm.globals._sid,
             MetaKeys.TIMECOST: 0.0,
             MetaKeys.PROMPT_TOKENS: 0,
             MetaKeys.COMPLETION_TOKENS: 0,
             MetaKeys.INPUT: "",
             MetaKeys.OUTPUT: "",
         }
+        self._front_id = front_id
+        self._url = url
 
     def pre_hook(self, *args, **kwargs):
         arguments = {}
+        self._meta_info[MetaKeys.SESSIONID] = lazyllm.globals._sid
         if len(args) == 1:
             if isinstance(args[0], lazyllm.package) and len(args[0]) == 1:
                 self._meta_info[MetaKeys.INPUT] = str(args[0][0])
@@ -54,16 +54,14 @@ def post_hook(self, output):
             self._meta_info[MetaKeys.OUTPUT] = str(output)
 
         if self._uniqueid in globals["usage"]:
-            self._meta_info.update(globals["usage"])
-        if self._meta_info[MetaKeys.ID] in self.MODULEID_TO_WIDGETID:
-            self._meta_info[MetaKeys.ID] = self.MODULEID_TO_WIDGETID[self._meta_info[MetaKeys.ID]]
+            self._meta_info.update(globals["usage"][self._uniqueid])
+        self._meta_info[MetaKeys.ID] = self._front_id
         self._meta_info[MetaKeys.TIMECOST] = time.time() - self._meta_info[MetaKeys.TIMECOST]
 
     def report(self):
         headers = {"Content-Type": "application/json; charset=utf-8"}
         json_data = json.dumps(self._meta_info, ensure_ascii=False)
         try:
-            lazyllm.LOG.info(f"meta_info: {self._meta_info}")
-            requests.post(self.URL, data=json_data, headers=headers)
+            requests.post(self._url, data=json_data, headers=headers)
         except Exception as e:
-            lazyllm.LOG.warning(f"Error sending collected data: {e}")
+            lazyllm.LOG.warning(f"Error sending collected data: {e}. URL: {self._url}")
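Taken together, the two files above replace NodeMetaHook's class-level URL and MODULEID_TO_WIDGETID globals with per-instance state: each hook is constructed with its report URL and frontend node id when Engine._process_hook registers it, and the session id is now captured per call in pre_hook rather than once at construction. A condensed sketch of how a caller drives this, adapted from the tests below (the endpoint URL is a placeholder for any JSON-accepting receiver, such as the mock server the tests spawn):

    from lazyllm.engine import LightEngine

    engine = LightEngine()
    # Stored on Engine.REPORT_URL; each hook captures it at registration time.
    engine.set_report_url("http://127.0.0.1:33733/mock_post")

    nodes = [dict(id='1', kind='Code', name='f1',
                  args={'code': 'def main(x): return int(x) + 1',
                        '_lazyllm_enable_report': True})]
    edges = [dict(iid='__start__', oid='1'), dict(iid='1', oid='__end__')]

    # start() runs _process_hook for every data-reflow node, registering
    # NodeMetaHook(node.func, Engine.REPORT_URL, node.id) on the callable.
    gid = engine.start(nodes, edges)
    # Each run snapshots the session id in pre_hook, records input/output and
    # this node's globals["usage"] slice in post_hook, then POSTs the record.
    assert engine.run(gid, 1) == 2

Because report() wraps its POST in try/except, a dead placeholder URL only logs a warning instead of failing the run.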
URL: {self._url}") diff --git a/tests/basic_tests/test_engine.py b/tests/basic_tests/test_engine.py index f4d91187..6a13893a 100644 --- a/tests/basic_tests/test_engine.py +++ b/tests/basic_tests/test_engine.py @@ -1,4 +1,4 @@ -from lazyllm.engine import LightEngine, NodeMetaHook +from lazyllm.engine import LightEngine import pytest import time from gradio_client import Client @@ -7,36 +7,80 @@ from lazyllm.common.common import TimeoutException import json import unittest +import subprocess +import socket +import threading +import requests + +HOOK_PORT = 33733 +HOOK_ROUTE = "mock_post" +fastapi_code = """ from fastapi import FastAPI from fastapi.responses import JSONResponse -from fastapi.testclient import TestClient +from collections import deque app = FastAPI() +received_datas = deque(maxlen=100) -@app.post("/mock_post") +@app.post("/{route}") async def receive_json(data: dict): + print("Received json data:", data) + received_datas.append(data) return JSONResponse(content=data) +@app.get("/get_last_report") +async def get_last_report(): + if len(received_datas) > 0: + return received_datas[-1] + else: + return {{}} + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port={port}) +""".format( + port=HOOK_PORT, route=HOOK_ROUTE +) + class TestEngine(unittest.TestCase): @classmethod def setUpClass(cls): - client = TestClient(app) - - def mock_report(self): - headers = {"Content-Type": "application/json; charset=utf-8"} - json_data = json.dumps(self._meta_info, ensure_ascii=False) - try: - lazyllm.LOG.info(f"meta_info: {self._meta_info}") - response = client.post(self.URL, data=json_data, headers=headers) - assert ( - response.json() == self._meta_info - ), "mock response should be same as input" - except Exception as e: - lazyllm.LOG.warning(f"Error sending collected data: {e}") - - NodeMetaHook.report = mock_report + cls.fastapi_process = subprocess.Popen( + ["python", "-c", fastapi_code], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + hostname = socket.gethostname() + ip_address = socket.gethostbyname(hostname) + cls.report_url = f"http://{ip_address}:{HOOK_PORT}/{HOOK_ROUTE}" + cls.get_url = f"http://{ip_address}:{HOOK_PORT}/get_last_report" + + def read_stdout(process): + for line in iter(process.stdout.readline, b''): + print("FastAPI Server Output: ", line.decode(), end='') + + cls.report_print_thread = threading.Thread( + target=read_stdout, args=(cls.fastapi_process,) + ) + cls.report_print_thread.daemon = True + cls.report_print_thread.start() + + @classmethod + def tearDownClass(cls): + time.sleep(3) + cls.fastapi_process.terminate() + cls.fastapi_process.wait() + + def get_last_report(self): + r = requests.get(self.get_url) + json_obj = {} + try: + json_obj = json.loads(r.content) + except Exception as e: + lazyllm.LOG.warning(str(e)) + return json_obj @pytest.fixture(autouse=True) def run_around_tests(self): @@ -86,7 +130,7 @@ def test_engine_switch(self): nodes = [switch] edges = [dict(iid='__start__', oid='4'), dict(iid='4', oid='__end__')] engine = LightEngine() - engine.set_report_url("mock_post") + engine.set_report_url(self.report_url) gid = engine.start(nodes, edges) assert engine.run(gid, 1) == 2 assert engine.run(gid, 2) == 6 @@ -111,6 +155,7 @@ def test_engine_switch(self): assert engine.run(gid, 'case1', 2) == 4 assert engine.run(gid, 'case2', 2) == 6 assert engine.run(gid, 'case3', 3) == 9 + assert "prompt_tokens" in self.get_last_report() def test_engine_ifs(self): plus1 = dict(id='1', kind='Code', name='m1', 
                      args=dict(code='def test(x: int):\n    return 1 + x\n'))
@@ -130,11 +175,74 @@ def test_engine_ifs(self):
         nodes = [ifs]
         edges = [dict(iid='__start__', oid='4'), dict(iid='4', oid='__end__')]
         engine = LightEngine()
-        engine.set_report_url("mock_post")
+        engine.set_report_url(self.report_url)
         gid = engine.start(nodes, edges)
         assert engine.run(gid, 1) == 4
         assert engine.run(gid, 5) == 12
         assert engine.run(gid, 10) == 100
+        assert "prompt_tokens" in self.get_last_report()
+
+    def test_data_reflow_in_server(self):
+        nodes = [
+            {
+                "id": "1",
+                "kind": "Code",
+                "name": "f1",
+                "args": {
+                    "code": "def main(x): return int(x) + 1",
+                    "_lazyllm_enable_report": True,
+                },
+            },
+            {
+                "id": "2",
+                "kind": "Code",
+                "name": "f2",
+                "args": {
+                    "code": "def main(x): return int(x) + 2",
+                    "_lazyllm_enable_report": True,
+                },
+            },
+            {
+                "id": "3",
+                "kind": "Code",
+                "name": "f3",
+                "args": {
+                    "code": "def main(x): return int(x) + 3",
+                    "_lazyllm_enable_report": True,
+                },
+            },
+        ]
+        edges = [
+            {
+                "iid": "__start__",
+                "oid": "1",
+            },
+            {
+                "iid": "1",
+                "oid": "2",
+            },
+            {
+                "iid": "2",
+                "oid": "3",
+            },
+            {
+                "iid": "3",
+                "oid": "__end__",
+            },
+        ]
+        resources = [
+            {
+                "id": "4",
+                "kind": "server",
+                "name": "s1",
+                "args": {},
+            }
+        ]
+
+        engine = LightEngine()
+        engine.set_report_url(self.report_url)
+        gid = engine.start(nodes, edges, resources)
+        assert engine.run(gid, 1) == 7
+        assert "prompt_tokens" in self.get_last_report()
 
     def test_engine_loop(self):
         nodes = [dict(id='1', kind='Code', name='code', args=dict(code='def square(x: int): return x * x'))]
@@ -156,9 +264,10 @@ def test_engine_loop(self):
 
         edges = [dict(iid='__start__', oid='2'), dict(iid='2', oid='__end__')]
         engine = LightEngine()
-        engine.set_report_url("mock_post")
+        engine.set_report_url(self.report_url)
         gid = engine.start(nodes, edges)
         assert engine.run(gid, 2) == 16
+        assert "prompt_tokens" in self.get_last_report()
 
     def test_engine_warp(self):
         nodes = [dict(id='1', kind='Code', name='code', args=dict(code='def square(x: int): return x * x'))]
@@ -179,9 +288,10 @@ def test_engine_warp(self):
 
         edges = [dict(iid='__start__', oid='2'), dict(iid='2', oid='__end__')]
         engine = LightEngine()
-        engine.set_report_url("mock_post")
+        engine.set_report_url(self.report_url)
         gid = engine.start(nodes, edges)
         assert engine.run(gid, 2, 3, 4, 5) == (4, 9, 16, 25)
+        assert "prompt_tokens" in self.get_last_report()
 
     def test_engine_formatter(self):
         nodes = [dict(id='1', kind='Formatter', name='f1', args=dict(ftype='python', rule='[:]'))]
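As a standalone sanity check of the mock receiver pattern used above, the two routes can also be exercised by hand after running the fastapi_code snippet as its own process (host and port assume the HOOK_PORT/HOOK_ROUTE defaults above; the record fields are illustrative, mirroring what NodeMetaHook.report() sends):

    import json
    import requests

    base = "http://127.0.0.1:33733"
    record = {"id": "1", "prompt_tokens": 0, "completion_tokens": 0}

    # POST the way report() does: a JSON body with an explicit utf-8 header.
    r = requests.post(f"{base}/mock_post",
                      data=json.dumps(record, ensure_ascii=False),
                      headers={"Content-Type": "application/json; charset=utf-8"})
    print(r.json())  # receive_json echoes the payload back via JSONResponse

    # get_last_report returns the most recent record ({} if none yet);
    # the tests poll it to assert "prompt_tokens" made it into the report.
    print(requests.get(f"{base}/get_last_report").json())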