
feat: Add the non-threaded and threaded streaming support for pipeline #4889

Closed
wants to merge 6 commits

Conversation

yuanwu2017
Contributor

@yuanwu2017 yuanwu2017 commented May 11, 2023

  • Add a prompt-dropping feature to HFTokenStreamingHandler

  • Return a streaming generator when a pipeline that includes a streaming-mode PromptNode runs.

  • Add a streaming interface for rest_api

Related Issues

Proposed Changes:

Return a streaming generator when a pipeline that includes a streaming-mode PromptNode runs.

How did you test it?

pytest prompt
===================================================================== 138 passed, 48 skipped, 5 warnings in 103.81s (0:01:43) =====================================================================

Notes for the reviewer

Checklist

  • I have read the contributors guidelines and the code of conduct
  • I have updated the related issue with new insights and changes
  • I added tests that demonstrate the correct behavior of the change
  • I've used one of the conventional commit types for my PR title: fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test:.
  • I documented my code
  • I ran pre-commit hooks and fixed any issue

@yuanwu2017 yuanwu2017 requested a review from a team as a code owner May 11, 2023 17:04
@yuanwu2017 yuanwu2017 requested review from julian-risch and removed request for a team May 11, 2023 17:04
@CLAassistant

CLAassistant commented May 11, 2023

CLA assistant check
All committers have signed the CLA.

@coveralls
Collaborator

coveralls commented May 11, 2023

Pull Request Test Coverage Report for Build 5086898268

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • 86 unchanged lines in 2 files lost coverage.
  • Overall coverage increased (+0.09%) to 39.71%

Files with Coverage Reduction | New Missed Lines | %
nodes/prompt/invocation_layer/hugging_face.py | 27 | 71.09%
nodes/prompt/prompt_node.py | 59 | 61.08%
Totals Coverage Status
Change from base Build 5081399763: 0.09%
Covered Lines: 8909
Relevant Lines: 22435

💛 - Coveralls

@yuanwu2017
Contributor Author

I didn't modify feedback.py, but this error is reported:

rest_api/rest_api/controller/feedback.py:116: error: Item "TableCell" of "Union[Span, TableCell]" has no attribute "start" [union-attr]

@yuanwu2017 yuanwu2017 force-pushed the upstream branch 2 times, most recently from f44ec6d to d46d854 on May 12, 2023 02:49
@yuanwu2017
Contributor Author

@vblagoje, please also help to review.

@julian-risch
Member

Hi @yuanwu2017 thank you for opening this PR. There is no issue linked but I assume you had a conversation with somebody already, probably @vblagoje ? 🙂 @vblagoje Could you please add some context? Thank you!

@yuanwu2017
Contributor Author

yuanwu2017 commented May 12, 2023

Hi @yuanwu2017 thank you for opening this PR. There is no issue linked but I assume you had a conversation with somebody already, probably @vblagoje ? 🙂 @vblagoje Could you please add some context? Thank you!

I have not discussed it with anybody. I created issue #4890 to clarify it.

@vblagoje
Member

@yuanwu2017 it would be even better to go with both non-threaded and threaded streaming handler options. We'll look into this one a bit closer, and we'll get back to you early next week with ideas on how to proceed forward.

@vblagoje
Member

@yuanwu2017 I had another quick look, and it seems that HF already has a threaded version of the TextStreamer interface here

For our users and us, having non-threaded and threaded streaming would make the most sense. Can we work towards that solution? We can follow currently established design patterns to accommodate both approaches without any potential major issues. Let me know your thoughts.

@yuanwu2017
Contributor Author

yuanwu2017 commented May 16, 2023

Thanks for your quick review. You are right. I will integrate TextIteratorStreamer into HFLocalInvocationLayer for threaded streaming. HFLocalInvocationLayer will start a new thread for the pipe() function, and users can get the TextIteratorStreamer generator from the output of pipeline.run or prompt_node.run. For non-threaded streaming, users need to run the prompt node or pipeline in a new thread themselves to avoid blocking when they develop applications. Am I right?
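
For reference, a minimal sketch of how transformers' TextIteratorStreamer is typically driven from a separate thread (this is a generic transformers example, independent of the Haystack integration proposed here; the model name is only illustrative):

from threading import Thread
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, TextIteratorStreamer

tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base")
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-base")

inputs = tokenizer("Translate to German: How are you?", return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

# generate() blocks until completion, so it runs on a worker thread while the
# caller consumes decoded text chunks from the streamer as they are produced.
thread = Thread(target=model.generate, kwargs={**inputs, "streamer": streamer, "max_new_tokens": 100})
thread.start()
for token in streamer:
    print(token, end="", flush=True)
thread.join()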

@yuanwu2017 yuanwu2017 force-pushed the upstream branch 2 times, most recently from a59d18b to 7f6034b on May 26, 2023 03:26
@yuanwu2017 yuanwu2017 changed the title from "feat: Add the streaming generator for streaming pipeline" to "feat: Add the non-threaded and threaded streaming support for pipeline" on May 26, 2023
@yuanwu2017
Contributor Author

yuanwu2017 commented May 26, 2023

@vblagoje I updated the PR. Please help to review. Currently transformers has a bug when using TextIteratorStreamer with the text-generation pipeline. I submitted a PR (huggingface/transformers#23641) for it. But for text2text-generation (google/flan-t5-base), it is OK.

@vblagoje
Member

@yuanwu2017 this looks much better now! Note that I'll be on PTO this week. Let's wait for the HF fix to be accepted and we'll move forward with this one. Heads up - we also have a pending PR for the Hugging Face layer, so rebase on top of it once it gets integrated!

@yuanwu2017
Contributor Author

@vblagoje Thanks.

@dfokina
Contributor

dfokina commented May 31, 2023

Hi, @yuanwu2017! I am the technical writer for the team and I noticed some docstrings that could be updated in accordance with our guidelines – would it be okay if I update those and commit directly? But I can also leave comments if you prefer.

@yuanwu2017
Contributor Author

Hi, @yuanwu2017! I am the technical writer for the team and I noticed some docstrings that could be updated in accordance with our guidelines – would it be okay if I update those and commit directly? But I can also leave comments if you prefer.

It would be appreciated if you could update those and commit directly.

@dfokina
Contributor

dfokina commented Jun 1, 2023

Thanks @yuanwu2017 , done :)

@julian-risch julian-risch removed their request for review June 7, 2023 07:51
@vblagoje
Member

vblagoje commented Jun 7, 2023

@yuanwu2017 I'm back; do we need to wait for the HF fix? Anything we can do in the meantime?

@yuanwu2017
Contributor Author

@yuanwu2017 I'm back; do we need to wait for the HF fix? Anything we can do in the meantime?

The PR (huggingface/transformers#23641) has been merged. I will rebase this PR.

@vblagoje
Member

@yuanwu2017 congrats on the HF PR integration 🚀 We can't integrate this one fully until the HF transformers patch is released, but we can use the transformers main branch to test it out and play with the branch. We seem to have some merge conflicts; LMK if you need assistance.

yuanwu2017 and others added 4 commits June 15, 2023 07:03
* Add the parameters in PromptNode.run() to support running the pipeline
  in streaming mode.

* PromptNode.run() returns an iterator of TextIteratorStreamer when the prompt node runs
  with stream=True and return_iterator=True.

* Add the streaming interface for rest_api

Signed-off-by: yuanwu <[email protected]>
* When running the pipeline with default parameters, the prompt template is
  None and the prompt should be the query.

* Add PromptNode streaming iterator test.

* Add the unit test for pipeline streaming mode.

Signed-off-by: yuanwu <[email protected]>
@yuanwu2017
Contributor Author

@vblagoje I only added the unit tests for the text2text-generation pipeline, so it should pass the test. The HF PR is for the text-generation pipeline. Do you think I need to refine the unit tests?

@yuanwu2017
Contributor Author

@vblagoje The get_task exception happened. The default model cannot work now. Maybe it is a new bug in HF? I will try to find out.

FAILED test/prompt/test_prompt_node.py::test_prompt_node_hf_model_pipeline_with_streaming_mode - ValueError: Model google/flan-t5-base is not supported - no matching invocation layer found. Currently supported invocation layers are: [<class 'haystack.nodes.prompt.invocation_layer.open_ai.OpenAIInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.azure_open_ai.AzureOpenAIInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.chatgpt.ChatGPTInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.azure_chatgpt.AzureChatGPTInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.hugging_face.HFLocalInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.hugging_face_inference.HFInferenceEndpointInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.anthropic_claude.AnthropicClaudeInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.cohere.CohereInvocationLayer'>] You can implement and provide custom invocation layer for google/flan-t5-base by subclassing PromptModelInvocationLayer.

@vblagoje
Member

vblagoje commented Jun 15, 2023

@vblagoje The get_task exception happened. The default model cannot work now. Maybe it is a new bug in HF? I will try to find out.

FAILED test/prompt/test_prompt_node.py::test_prompt_node_hf_model_pipeline_with_streaming_mode - ValueError: Model google/flan-t5-base is not supported - no matching invocation layer found. Currently supported invocation layers are: [<class 'haystack.nodes.prompt.invocation_layer.open_ai.OpenAIInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.azure_open_ai.AzureOpenAIInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.chatgpt.ChatGPTInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.azure_chatgpt.AzureChatGPTInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.hugging_face.HFLocalInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.hugging_face_inference.HFInferenceEndpointInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.anthropic_claude.AnthropicClaudeInvocationLayer'>, <class 'haystack.nodes.prompt.invocation_layer.cohere.CohereInvocationLayer'>] You can implement and provide custom invocation layer for google/flan-t5-base by subclassing PromptModelInvocationLayer.

This usually happens when HF has a temporary outage. You can bypass it by providing task_name in model_kwargs; it should be text2text-generation. See the init and supports methods of HFLocalInvocationLayer for more details.
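
A minimal sketch of this workaround, assuming a Haystack 1.x PromptNode (the exact kwargs are an assumption based on this comment, not code from the PR):

from haystack.nodes import PromptNode

# Pass task_name explicitly so HFLocalInvocationLayer does not need to call
# get_task against the Hugging Face Hub to infer it (assumed workaround).
prompt_node = PromptNode(
    model_name_or_path="google/flan-t5-base",
    model_kwargs={"task_name": "text2text-generation"},
)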

@yuanwu2017
Contributor Author

Cannot load the configuration from HF. Maybe we need to wait for HF recovery.

E OSError: Can't load the configuration of 'google/flan-t5-base'. If you were trying to load it from 'https://huggingface.co/models', make sure you don't have a local directory with the same name. Otherwise, make sure 'google/flan-t5-base' is the correct path to a directory containing a config.json file

@yuanwu2017
Contributor Author

yuanwu2017 commented Jun 25, 2023

@vblagoje I found that it can work if I run the test source code without pytest. It seems that pytest disconnects the network during the test. Are there any configurable parameters that allow pytest to pass these tests?

@vblagoje
Member

Hey @yuanwu2017, thanks for your updates. The test fails to run because we don't allow unit tests to open any HTTP connections. Because Hugging Face models are checked for the task they support (we allow only text-generation and text2text-generation) in the supports method of HFLocalInvocationLayer, the unit test fails to load the HFLocalInvocationLayer for the PromptNode. Please continue to experiment with your tests by using the integration marker. Eventually, it would be awesome to add unit tests in the test_hugging_face.py module. I can help out with mocking if you need me.

I'll now experiment with this branch and provide more detailed feedback.
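
For illustration, a minimal sketch of what an integration-marked test could look like (the test name, model, and assertion are illustrative and not taken from the PR):

import pytest
from haystack.nodes import PromptNode

@pytest.mark.integration
def test_prompt_node_local_hf_model():
    # Marked as integration because it downloads google/flan-t5-base and needs
    # network access, which unit tests are not allowed to open.
    node = PromptNode("google/flan-t5-base", max_length=64)
    result = node("What is the capital of Germany?")
    assert len(result) > 0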

@yuanwu2017
Contributor Author

yuanwu2017 commented Jun 28, 2023

@vblagoje Yes, I need your help. I have no idea how to test the threaded streaming pipeline with mocking.

@vblagoje
Member

vblagoje commented Jul 7, 2023

@yuanwu2017, we appreciate your input and recognize the importance of enabling this particular use case. We are actively investigating solutions to enable streaming for all LLMs via REST. This would not only apply to local Hugging Face models but all others as well. We appreciate your patience and understanding :-)

@vblagoje
Member

vblagoje commented Jul 7, 2023

Hey @yuanwu2017

A more straightforward approach might be the following crude example:

import threading
import queue

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from haystack.nodes import PromptNode
from haystack.nodes.prompt.invocation_layer import TokenStreamingHandler
from pydantic import BaseModel

app = FastAPI()

pn = PromptNode("gpt-3.5-turbo",
                api_key="key_here",
                max_length=512)


class ThreadedGenerator:
    # Bridges tokens produced on the PromptNode worker thread to the HTTP response:
    # the stream handler puts tokens into the queue, the response iterates over them.
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration: raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)


class FastAPITokenStreamingHandler(TokenStreamingHandler):
    def __init__(self, generator: ThreadedGenerator):
        self.generator = generator

    def __call__(self, token_received, **kwargs):
        self.generator.send(token_received)
        return token_received


def prompt_node_invocation_thread(prompt_node, g, prompt):
    # Run the PromptNode on a worker thread; closing the generator signals the end
    # of the token stream to the HTTP response, even if the invocation raises.
    try:
        prompt_node(prompt, stream_handler=FastAPITokenStreamingHandler(g))
    finally:
        g.close()


class PromptPayload(BaseModel):
    prompt: str


def token_generator(text: str):
    g = ThreadedGenerator()
    threading.Thread(target=prompt_node_invocation_thread, args=(pn, g, text)).start()
    return g


@app.get("/question")
async def stream(payload: PromptPayload):
    return StreamingResponse(token_generator(payload.prompt), media_type='text/event-stream')

Start uvicorn, and then hit the server with the following curl command, for example:

curl -w "\n" -N -X GET -H "Content-Type: application/json" -d '{"prompt": "Tell me about Berlin"}' http://localhost:8000/question

And this will work for all supported types of PromptNode rather than HF only. Please let us know your thoughts about this approach. Note, of course, that this is not a production example. You'll need to handle client disconnects before the generator is exhausted, for example, and exception handling can be improved as well, but the gist of the idea is here.

@yuanwu2017
Contributor Author

Yes, implementing a streaming handler in the REST API layer is a good choice. I also tried it before, but it encountered a Python exception when integrating the prompt node with a streaming handler into a pipeline, because there is a deepcopy in the pipeline implementation. Hope this is a useful hint. If you already have an implemented patch, I can close this one, or I can continue to finish this implementation according to your suggestion. Please let me know. Thank you very much for your patient review and reply.

@vblagoje
Member

@yuanwu2017 I tried the above approach with several other models, including a local flan t5. It all worked as expected. I think we should close this PR and the relevant issue at this time with the above-specified workaround. In the next iteration of Haystack, we'll include this use case as a built-in option. Let us know your thoughts.

@yuanwu2017
Contributor Author

Ok. I will close it.

@david-hung

Hello @yuanwu2017,

I'm a user of Haystack, and I've been searching for a way to create a chatbot that streams tokens one by one. I'm curious to know if this feature is currently in development and when we can expect it to be released.

Thank you!

@yuanwu2017
Contributor Author

@david-hung Please refer to the example above.

@dgisser

dgisser commented Jul 27, 2024

@yuanwu2017 I am still on an old version of haystack, so I too faced this issue of the deepcopy in the pipeline implementation. You can get around it by adding

def __deepcopy__(self, memo):
    memo[id(self)] = self
    return self

to ThreadedGenerator.
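
Registering the object in memo and returning self makes deepcopy treat the generator as already copied, so the pipeline keeps sharing the same queue instead of attempting to copy it (a copy would likely fail anyway, because the queue's internal locks cannot be deep-copied).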


Successfully merging this pull request may close these issues.

feat: add generator of StreamingHandler into pipeline output.