
Commit

Allows the lower-level pipeline to obtain the input and intermediate results of the upper-level pipeline (LazyAGI#268)
wzh1994 authored Sep 23, 2024
1 parent affe00f commit 5f86306
Showing 18 changed files with 323 additions and 180 deletions.
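The change in a nutshell: a stage inside a pipeline can now bind the pipeline's own input and the intermediate result of any named stage through `ppl.input` and `ppl.output('<stage>')`, instead of referencing the stage module attribute directly. A minimal sketch of the new style (hypothetical stages, not taken from this diff):

```python
from lazyllm import pipeline, bind

with pipeline() as ppl:
    ppl.normalize = lambda x: x.strip().lower()
    ppl.enrich = lambda x: f'[{x}]'
    # `raw` is bound to the pipeline's original input, `norm` to the
    # intermediate result of the `normalize` stage; `enriched` still receives
    # the previous stage's output positionally.
    ppl.report = (lambda enriched, raw, norm: f'{raw!r} -> {norm!r} -> {enriched!r}') \
        | bind(raw=ppl.input, norm=ppl.output('normalize'))

print(ppl('  Hello '))  # expected: something like "'  Hello ' -> 'hello' -> '[hello]'"
```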
4 changes: 2 additions & 2 deletions README.CN.md
@@ -198,7 +198,7 @@ writer_prompt = {"system": completion_prompt, "user": '{"title": {title}, "descr
with pipeline() as ppl:
ppl.outline_writer = lazyllm.OnlineChatModule(stream=False).formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(lazyllm.OnlineChatModule(stream=False).prompt(writer_prompt))
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
lazyllm.WebModule(ppl, port=23466).start().wait()
```

@@ -208,7 +208,7 @@ lazyllm.WebModule(ppl, port=23466).start().wait()
with pipeline() as ppl:
ppl.outline_writer = lazyllm.TrainableModule('internlm2-chat-7b').formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(ppl.outline_writer.share(prompt=writer_prompt).formatter())
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
lazyllm.WebModule(ppl, port=23466).start().wait()
```

4 changes: 2 additions & 2 deletions README.md
@@ -202,7 +202,7 @@ Here is an online deployment example:
with pipeline() as ppl:
ppl.outline_writer = lazyllm.OnlineChatModule(stream=False).formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(lazyllm.OnlineChatModule(stream=False).prompt(writer_prompt))
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
lazyllm.WebModule(ppl, port=23466).start().wait()
```

@@ -212,7 +212,7 @@ Here is an example of a local deployment:
with pipeline() as ppl:
ppl.outline_writer = lazyllm.TrainableModule('internlm2-chat-7b').formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(ppl.outline_writer.share(prompt=writer_prompt).formatter())
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
lazyllm.WebModule(ppl, port=23466).start().wait()
```

6 changes: 3 additions & 3 deletions docs/en/Cookbook/great_writer.md
@@ -105,7 +105,7 @@ Let's assemble the above modules with control flow.
with pipeline() as ppl:
ppl.outline_writer = lazyllm.TrainableModule('internlm2-chat-7b').formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(ppl.outline_writer.share(prompt=writer_prompt).formatter())
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
```

In the above code, in addition to the commonly used [Pipeline][lazyllm.flow.Pipeline] control flow (Similar applications can be found at: [Master Painter](painting_master.md#use-pipeline)),
@@ -119,7 +119,7 @@ It accepts any number of inputs and then sends them in parallel to the same branch.
These are generally several different outputs, and each of them needs to be handled by the next-level content generation robot, so [Warp][lazyllm.flow.Warp] is exactly the right tool.
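As a standalone illustration of that behavior (a toy branch, not part of the cookbook), `warp` applies one branch to every element of a packed input:

```python
from lazyllm import warp

# One branch, run over each input in parallel.
double = warp(lambda x: x * 2)
print(double(1, 2, 3))  # expected to return the per-element results, e.g. (2, 4, 6)
```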

```python
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
```

[](){#use-bind}
@@ -191,6 +191,6 @@ writer_prompt = {"system": completion_prompt, "user": '{"title": {title}, "descr
with pipeline() as ppl:
ppl.outline_writer = lazyllm.TrainableModule('internlm2-chat-7b').formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(ppl.outline_writer.share(prompt=writer_prompt).formatter())
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
lazyllm.WebModule(ppl, port=23466).start().wait()
```
2 changes: 1 addition & 1 deletion docs/zh/Best Practice/flow.md
@@ -116,7 +116,7 @@ with pipeline() as p:
p.f1 = f1
p.f2 = f2
p.f3 = f3
p.f4 = f4 | bind(p.input, _0, p.f2)
p.f4 = f4 | bind(p.input, _0, p.output("f2"))
assert p(1) == 'get [1], [f3-5], [5]'
```
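For reference, one possible set of definitions under which the assertion above holds (hypothetical; the real `f1`-`f4` are defined outside this hunk):

```python
from lazyllm import pipeline
from lazyllm.common import bind, _0

def f1(x): return x * 2              # 1 -> 2
def f2(x): return x + 3              # 2 -> 5
def f3(x): return f'f3-{x}'          # 5 -> 'f3-5'
def f4(a, b, c): return f'get [{a}], [{b}], [{c}]'

with pipeline() as p:
    p.f1 = f1
    p.f2 = f2
    p.f3 = f3
    # a <- pipeline input, b <- previous stage (f3) output, c <- f2's intermediate result
    p.f4 = f4 | bind(p.input, _0, p.output("f2"))

assert p(1) == 'get [1], [f3-5], [5]'
```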

6 changes: 3 additions & 3 deletions docs/zh/Cookbook/great_writer.md
@@ -103,7 +103,7 @@ story_generater = warp(ppl.outline_writer.share(prompt=writer_prompt).formatter(
with pipeline() as ppl:
ppl.outline_writer = lazyllm.TrainableModule('internlm2-chat-7b').formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(ppl.outline_writer.share(prompt=writer_prompt).formatter())
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
```

In the code above, in addition to the commonly used [Pipeline][lazyllm.flow.Pipeline] control flow (a similar application can be found in [Master Painter](painting_master.md#use-pipeline)),
@@ -117,7 +117,7 @@ warp(ppl.outline_writer.share(prompt=writer_prompt).formatter())
and these are generally several different outputs. Each of them needs to be handled by the next-level content generation robot, so [Warp][lazyllm.flow.Warp] is exactly the right tool.

```python
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
```

[](){#use-bind}
@@ -187,6 +187,6 @@ writer_prompt = {"system": completion_prompt, "user": '{"title": {title}, "descr
with pipeline() as ppl:
ppl.outline_writer = lazyllm.TrainableModule('internlm2-chat-7b').formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(ppl.outline_writer.share(prompt=writer_prompt).formatter())
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer'))
lazyllm.WebModule(ppl, port=23466).start().wait()
```
7 changes: 3 additions & 4 deletions examples/story.py
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
# flake8: noqa: F501

import lazyllm
from lazyllm import pipeline, warp, bind
@@ -35,7 +34,7 @@
}
]
User input is as follows:
"""
""" # noqa: E501

completion_prompt = """
You are now an intelligent assistant. Your task is to receive a dictionary containing `title` and `describe`, and expand the writing according to the guidance in `describe`.
@@ -50,14 +49,14 @@
This is the expanded content for writing.
Receive as follows:
"""
""" # noqa: E501

writer_prompt = {"system": completion_prompt, "user": '{"title": {title}, "describe": {describe}}'}

with pipeline() as ppl:
ppl.outline_writer = lazyllm.TrainableModule('internlm2-chat-7b').formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(ppl.outline_writer.share(prompt=writer_prompt).formatter())
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output("outline_writer")) # noqa: E501

if __name__ == '__main__':
lazyllm.WebModule(ppl, port=range(23467, 24000)).start().wait()
9 changes: 4 additions & 5 deletions examples/story_online.py
@@ -1,13 +1,12 @@
# -*- coding: utf-8 -*-
# flake8: noqa: F501

import lazyllm
from lazyllm import pipeline, warp, bind
from lazyllm.components.formatter import JsonFormatter

# Before running, set the environment variable:
#
# 1. `export LAZYLLM_GLM_API_KEY=xxxx`: the API key of Zhipu AI, default model "glm-4", `source="glm"`.
# 1. `export LAZYLLM_GLM_API_KEY=xxxx`: the API key of Zhipu AI, default model "glm-4", `source="glm"`.
# You can apply for the API key at https://open.bigmodel.cn/
# Also supports other API keys:
# - LAZYLLM_OPENAI_API_KEY: the API key of OpenAI, default model "gpt-3.5-turbo", `source="openai"`.
@@ -42,7 +41,7 @@
}
]
User input is as follows:
"""
""" # noqa: E50E

completion_prompt = """
You are now an intelligent assistant. Your task is to receive a dictionary containing `title` and `describe`, and expand the writing according to the guidance in `describe`.
@@ -57,14 +56,14 @@
This is the expanded content for writing.
Receive as follows:
"""
""" # noqa: E50E

writer_prompt = {"system": completion_prompt, "user": '{"title": {title}, "describe": {describe}}'}

with pipeline() as ppl:
ppl.outline_writer = lazyllm.OnlineChatModule(stream=False).formatter(JsonFormatter()).prompt(toc_prompt)
ppl.story_generater = warp(lazyllm.OnlineChatModule(stream=False).prompt(writer_prompt))
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.outline_writer)
ppl.synthesizer = (lambda *storys, outlines: "\n".join([f"{o['title']}\n{s}" for s, o in zip(storys, outlines)])) | bind(outlines=ppl.output('outline_writer')) # noqa: E50E

if __name__ == '__main__':
lazyllm.WebModule(ppl, port=range(23467, 24000)).start().wait()
2 changes: 1 addition & 1 deletion lazyllm/common/__init__.py
@@ -1,6 +1,5 @@
from .registry import LazyLLMRegisterMetaClass, _get_base_cls_from_registry, Register
from .common import package, kwargs, arguments, LazyLLMCMD, timeout, final, ReadOnlyWrapper, DynamicDescriptor
from .common import root, Bind as bind, _0, _1, _2, _3, _4, _5, _6, _7, _8, _9
from .common import FlatList, Identity, ResultCollector, ArgsDict, CaseInsensitiveDict
from .common import ReprRule, make_repr, modify_repr
from .common import once_flag, call_once, once_wrapper
@@ -10,6 +9,7 @@
from .logger import LOG
from .deprecated import deprecated
from .globals import globals, LazyLlmResponse, LazyLlmRequest, encode_request, decode_request
from .bind import root, Bind as bind, _0, _1, _2, _3, _4, _5, _6, _7, _8, _9
from .queue import FileSystemQueue
from .utils import compile_func

143 changes: 143 additions & 0 deletions lazyllm/common/bind.py
@@ -0,0 +1,143 @@
import copy
import builtins
from typing import Callable, Any
from .globals import globals


class AttrTree(object):
def __init__(self, name=None, pres=[]):
self._path = copy.deepcopy(pres)
if name is not None:
self._path.append(name)

def __str__(self):
return '.'.join(self._path)

def __getattr__(self, name):
v = __class__(name, pres=self._path)
setattr(self, name, v)
return v

def get_from(self, obj):
v = obj
for name in self._path:
v = getattr(v, name)
return v

def __deepcopy__(self, memo):
return self

root = AttrTree()


class Placeholder(object):
_pool = dict()

def __new__(cls, idx):
if idx not in Placeholder._pool:
Placeholder._pool[idx] = super().__new__(cls)
return Placeholder._pool[idx]

def __init__(self, idx):
assert isinstance(idx, int)
self.idx = idx

def __deepcopy__(self, memo=None):
return self

def __repr__(self):
return f'placeholder._{self.idx}'

for i in range(10):
vars()[f'_{i}'] = Placeholder(i)

def _setattr(self, key, v):
raise RuntimeError('Cannot set attr for Placeholder')
setattr(Placeholder, '__setattr__', _setattr)


class _MetaBind(type):
def __instancecheck__(self, __instance):
if isinstance(__instance, Bind) and isinstance(__instance._f, self):
return True
return super(__class__, self).__instancecheck__(__instance)


class Bind(object):
class _None: pass

class Args(object):
class _None: pass

def __init__(self, source_id, target_id='input'):
self._item_key, self._attr_key = Bind.Args._None, Bind.Args._None
self._source_id, self._target_id = source_id, target_id

def __getitem__(self, key):
self._item_key = key
return self

def __getattr__(self, key):
self._attr_key = key
return self

def __getstate__(self):
return self._item_key, self._attr_key, self._source_id, self._target_id

def __setstate__(self, state):
self._item_key, self._attr_key, self._source_id, self._target_id = state

def get_arg(self, source):
if self._source_id in globals['bind_args']: source = globals['bind_args'][self._source_id]
if not source or source['source'] != self._source_id:
raise RuntimeError('Unable to find the bound parameter, possibly due to pipeline.input/output can only '
'be bind in direct member of pipeline! You may solve this by defining the pipeline '
'in a `with lazyllm.save_pipeline_result():` block.')
source, input = source['source'], source[self._target_id]
if self._item_key is not Bind.Args._None: return input[self._item_key]
elif self._attr_key is not Bind.Args._None: return getattr(input, self._attr_key)
return input

def __init__(self, __bind_func=_None, *args, **kw):
self._f = __bind_func() if isinstance(__bind_func, type) and __bind_func is not Bind._None else __bind_func
self._args = args
self._kw = kw
self._has_root = any([isinstance(a, AttrTree) for a in args])
self._has_root = self._has_root or any([isinstance(v, AttrTree) for k, v in kw.items()])

def __ror__(self, __value: Callable):
if self._f is not Bind._None: self._args = (self._f,) + self._args
self._f = __value
return self

# _bind_args_source: dict(input=input, args=dict(key=value))
def __call__(self, *args, _bind_args_source=None, **kw):
if self._f is None: return None
keys = set(kw.keys()).intersection(set(self._kw.keys()))
assert len(keys) == 0, f'Keys `{keys}` are already bind!'
bind_args = args if len(self._args) == 0 else (
[args[a.idx] if isinstance(a, Placeholder) else a for a in self._args])
kwargs = self._kw

bind_args = [a.get_arg(_bind_args_source) if isinstance(a, Bind.Args) else a for a in bind_args]
kwargs = {k: v.get_arg(_bind_args_source) if isinstance(v, Bind.Args) else v for k, v in kwargs.items()}
return self._f(*bind_args, **kwargs, **kw)

# TODO: modify it
def __repr__(self) -> str:
return self._f.__repr__() + '(bind args:{})'.format(
', '.join([repr(a) if a is not self else 'self' for a in self._args]))

def __getattr__(self, name):
# name will be '_f' in copy.deepcopy
if name != '_f':
return getattr(self._f, name)
return super(__class__, self).__getattr__(name)

def __setattr__(self, __name: str, __value: Any) -> None:
if __name not in ('_f', '_args', '_kw', '_has_root'):
return setattr(self._f, __name, __value)
return super(__class__, self).__setattr__(__name, __value)


setattr(builtins, 'bind', Bind)
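For orientation, a small usage sketch (not part of this commit) of how the placeholders and the `|` binding defined above compose, using a hypothetical `add` function and the exports from the updated `lazyllm/common/__init__.py`:

```python
from lazyllm.common import bind, _0, _1

def add(a, b, c):
    return a + b + c

# `|` routes through Bind.__ror__: `add` becomes the bound callable, and the
# placeholders are filled by position from the call-time arguments.
f = add | bind(_1, _0, 10)
print(f(1, 2))  # add(2, 1, 10) -> 13
```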
