From 060091e057a0ad49270363ee86ede148b8f2ed6e Mon Sep 17 00:00:00 2001 From: wangzhihong Date: Mon, 9 Sep 2024 15:56:36 +0800 Subject: [PATCH] Add web and server resource for engine (#231) --- lazyllm/engine/engine.py | 68 +++++++++++++++++++++++++---- lazyllm/engine/lightengine.py | 7 ++- lazyllm/tools/webpages/webmodule.py | 4 +- tests/basic_tests/test_engine.py | 22 ++++++++++ 4 files changed, 89 insertions(+), 12 deletions(-) diff --git a/lazyllm/engine/engine.py b/lazyllm/engine/engine.py index ec45f0ab..5f3a687b 100644 --- a/lazyllm/engine/engine.py +++ b/lazyllm/engine/engine.py @@ -100,11 +100,63 @@ def get_args(cls, key, value, builder_key=None): _constructor = NodeConstructor() +class ServerGraph(lazyllm.ModuleBase): + def __init__(self, g: lazyllm.graph, server: Node, web: Node): + super().__init__() + self._g = lazyllm.ActionModule(g) + if server: + if server.args.get('port'): raise NotImplementedError('Port is not supported now') + self._g = lazyllm.ServerModule(g) + if web: + port = self._get_port(web.args['port']) + self._web = lazyllm.WebModule(g, port=port, title=web.args['title'], audio=web.args['audio'], + history=[Engine().build_node(h).func for h in web.args['history']]) + + def forward(self, *args, **kw): + return self._g(*args, **kw) + + # TODO(wangzhihong) + def _update(self, *, mode=None, recursive=True): + super(__class__, self)._update(mode=mode, recursive=recursive) + if hasattr(self, '_web'): self._web.start() + return self + + def _get_port(self, port): + if not port: return None + elif ',' in port: + return list(int(p.strip()) for p in port.split(',')) + elif '-' in port: + left, right = tuple(int(p.strip()) for p in port.split('-')) + assert left < right + return range(left, right) + return int(port) + + @property + def api_url(self): + if isinstance(self._g, lazyllm.ServerModule): + return self._g._url + return None + + @property + def web_url(self): + if hasattr(self, '_web'): + return self._web.url + return None + + @NodeConstructor.register('Graph') @NodeConstructor.register('SubGraph') -def make_graph(nodes: List[dict], edges: List[dict], resources: List[dict] = []): +def make_graph(nodes: List[dict], edges: List[dict], resources: List[dict] = [], enable_server=True): engine = Engine() - resources = [engine.build_node(resource) for resource in resources] + server_resources = dict(server=None, web=None) + for resource in resources: + if resource['kind'] in server_resources: + assert enable_server, 'Web and Api server are not allowed outside graph and subgraph' + assert server_resources[resource['kind']] is None, f'Duplicated {resource["kind"]} resource' + server_resources[resource['kind']] = Node(id=resource['id'], kind=resource['kind'], + name=resource['name'], args=resource['args']) + + resources = [engine.build_node(resource) for resource in resources if resource['kind'] not in server_resources] nodes = [engine.build_node(node) for node in nodes] with graph() as g: @@ -117,12 +169,12 @@ def make_graph(nodes: List[dict], edges: List[dict], resources: List[dict] = []) formatter = lazyllm.formatter.JsonLike(formatter) g.add_edge(engine._nodes[edge['iid']].name, engine._nodes[edge['oid']].name, formatter) - return g + return ServerGraph(g, server_resources['server'], server_resources['web']) @NodeConstructor.register('App') -def make_subapp(nodes: List[dict], edges: List[dict]): - return make_graph(nodes, edges) +def make_subapp(nodes: List[dict], edges: List[dict], resources: List[dict] = []): + return make_graph(nodes, edges, resources) # Note: It will be very dangerous if provided to C-end users as a SAAS service @@ -153,15 +205,15 @@ def make_switch(judge_on_full_input: bool, nodes: Dict[str, List[dict]]): @NodeConstructor.register('Warp') def make_warp(nodes: List[dict], edges: List[dict], resources: List[dict] = []): - return lazyllm.warp(make_graph(nodes, edges, resources)) + return lazyllm.warp(make_graph(nodes, edges, resources, enable_server=False)) @NodeConstructor.register('Loop') def make_loop(stop_condition: str, nodes: List[dict], edges: List[dict], resources: List[dict] = [], judge_on_full_input: bool = True): stop_condition = make_code(stop_condition) - return lazyllm.loop(make_graph(nodes, edges, resources), stop_condition=stop_condition, - judge_on_full_input=judge_on_full_input) + return lazyllm.loop(make_graph(nodes, edges, resources, enable_server=False), + stop_condition=stop_condition, judge_on_full_input=judge_on_full_input) @NodeConstructor.register('Ifs') diff --git a/lazyllm/engine/lightengine.py b/lazyllm/engine/lightengine.py index 98bced5f..cfacab54 100644 --- a/lazyllm/engine/lightengine.py +++ b/lazyllm/engine/lightengine.py @@ -37,12 +37,15 @@ def start(self, nodes: List[Dict] = [], edges: List[Dict] = [], resources: List[ node = Node(id=gid or str(uuid.uuid4().hex), kind='Graph', name=name or str(uuid.uuid4().hex), args=dict(nodes=nodes, edges=edges, resources=resources)) self.graph = self.build_node(node).func - ActionModule(self.graph).start() + self.graph.start() def update(self, nodes: List[Dict] = [], changed_nodes: List[Dict] = [], edges: List[Dict] = [], changed_resources: List[Dict] = [], gid: Optional[str] = None, name: Optional[str] = None): - for r in changed_resources: self.update_node(r) + for r in changed_resources: + if r['kind'] in ('server', 'web'): + raise NotImplementedError('Web and Api server are not allowed now') + self.update_node(r) for n in changed_nodes: self.update_node(n) node = Node(id=gid or str(uuid.uuid4().hex), kind='Graph', name=name or str(uuid.uuid4().hex), args=dict(nodes=nodes, edges=edges)) diff --git a/lazyllm/tools/webpages/webmodule.py b/lazyllm/tools/webpages/webmodule.py index 442ab10c..dcc494ff 100644 --- a/lazyllm/tools/webpages/webmodule.py +++ b/lazyllm/tools/webpages/webmodule.py @@ -36,13 +36,13 @@ class Mode: Refresh = 1 Appendix = 2 - def __init__(self, m, *, components=dict(), title='对话演示终端', port=range(20500, 20799), + def __init__(self, m, *, components=dict(), title='对话演示终端', port=None, history=[], text_mode=None, trace_mode=None, audio=False) -> None: super().__init__() self.m = lazyllm.ActionModule(m) if isinstance(m, lazyllm.FlowBase) else m self.pool = lazyllm.ThreadPoolExecutor(max_workers=50) self.title = title - self.port = port + self.port = port or range(20500, 20799) components = sum([[([k._module_id, k._module_name] + list(v)) for v in vs] for k, vs in components.items()], []) self.ckeys = [[c[0], c[2]] for c in components] diff --git a/tests/basic_tests/test_engine.py b/tests/basic_tests/test_engine.py index d70dce9a..4fe9f065 100644 --- a/tests/basic_tests/test_engine.py +++ b/tests/basic_tests/test_engine.py @@ -1,5 +1,8 @@ from lazyllm import LightEngine import pytest +import time +from gradio_client import Client +import lazyllm class TestEngine(object): @@ -98,6 +101,25 @@ def test_engine_formatter(self): assert engine.run(1) == '1[2, 4]1' assert engine.run(2) == '2[4, 8]4' + def test_engine_server(self): + nodes = [dict(id='1', kind='Code', name='m1', args='def test(x: int):\n return 2 * x\n')] + edges = [dict(iid='__start__', oid='1'), dict(iid='1', oid='__end__')] + resources = [dict(id='2', kind='server', name='s1', args=dict(port=None)), + dict(id='3', kind='web', name='w1', args=dict(port=None, title='网页', history=[], audio=False)) + ] + engine = LightEngine() + engine.start(nodes, edges, resources, gid='graph-1') + assert engine.run(1) == 2 + time.sleep(3) + web = engine.build_node('graph-1').func._web + client = Client(web.url, download_files=web.cach_path) + chat_history = [['123', None]] + ans = client.predict(False, chat_history, False, False, api_name="/_respond_stream") + assert ans[0][-1][-1] == '123123' + client.close() + lazyllm.launcher.cleanup() + web.stop() + class TestEngineRAG(object):