Problem integrating jina with async system #4761
Comments
Hi, thanks for reporting this issue. The exception message is on point, yet it does not say how to solve the problem. @jina-ai/team-core need better docs to explain the following:
If you are building a Jina app, which involves calling another Jina server via
The solution is simple, use
In this example, DALLE Flow's executor calls CLIP-as-service for ranking. But as CLIP-as-service is a Jina-based service, directly using the sync version of the Jina client will give the error you described. The solution is simple. Just use
In GLID3-XL there is another example: I need CLIP encoding there, and the problem shows up again. The solution is again the same, use the async version of
https://github.com/hanxiao/glid-3-xl/blob/master/dalle_flow_glid3/sample.py#L116-L120
Note that CLIP-as-service is just an example. So if we put it back into context:
Then the two examples above map to the following:
In the case you do not have well-wrapped
https://github.com/jina-ai/clip-as-service/blob/main/client/clip_client/client.py#L401
Finally, the difference between AsyncClient and Client is not as big as you might imagine; they are both efficient:
https://docs.jina.ai/fundamentals/flow/client/#async-python-client |
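For readers wondering why a synchronous client fails inside an async application: a blocking wrapper that starts its own event loop cannot run while another loop is already running in the same thread. The sketch below reproduces that situation with the standard library only. `fetch_answer`, `sync_fetch`, and the two handlers are hypothetical stand-ins rather than Jina APIs, and Jina's internals may well differ in detail.

```python
import asyncio


async def fetch_answer(query: str) -> str:
    """Hypothetical stand-in for an async request to a remote service."""
    await asyncio.sleep(0)  # yield control, as real network I/O would
    return f"answer to {query!r}"


def sync_fetch(query: str) -> str:
    """Blocking wrapper that starts its own event loop (assumed behaviour)."""
    coro = fetch_answer(query)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def handler_blocking(query: str) -> str:
    """Calls the blocking wrapper from inside a running loop: this fails."""
    try:
        return sync_fetch(query)
    except RuntimeError as exc:
        return f"failed: {exc}"


async def handler_async(query: str) -> str:
    """Awaits the coroutine directly: this works inside a running loop."""
    return await fetch_answer(query)


if __name__ == "__main__":
    print(asyncio.run(handler_blocking("opening time")))
    print(asyncio.run(handler_async("opening time")))
```

The blocking wrapper works fine when no loop is running, which matches the observation that the code runs locally but breaks once it is called from the async chat bot.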
Thanks for your reply! I'm not sure where I should replace this client with AsyncClient (I tried to replace Flow with AsyncFlow), so I'm attaching the code that works when running locally, but not when it is integrated with the chat bot.
import os
import functools
from typing import Iterator, Tuple

from jina import AsyncFlow, Document, DocumentArray, Flow
import gamla

_DIR_NAME = os.path.dirname(__file__)

def _input_generator(answers: Tuple[str, ...]) -> Iterator[Document]:
    for answer in answers:
        yield Document(text=answer)

@functools.cache
def index(answers: Tuple[str, ...]) -> None:
    flow = Flow(asyncio=True).load_config(os.path.join(_DIR_NAME, 'flows/flow-index.yaml'))
    data_path = os.path.join(_DIR_NAME, "sexual_health_faq.csv")
    with flow:
        flow.post(on="/index", inputs=_input_generator(answers), show_progress=True)

def query(user_utterance: str) -> Tuple[str, float]:
    flow = Flow().load_config('flows/flow-query.yaml')
    with flow:
        doc = Document(content=user_utterance)
        result = flow.post(on='/search', inputs=DocumentArray([doc]),
                           parameters={'top_k': 1},
                           line_format='text',
                           return_results=True,
                           )
        return gamla.pipe(result, gamla.head, gamla.attrgetter("_data"), gamla.attrgetter("matches"), gamla.attrgetter("_data"), gamla.head, gamla.juxt(gamla.attrgetter("text"), gamla.attrgetter("scores")))

if __name__ == "__main__":
    index(("we are open everyday from 8am to 4pm.",))
    query("what is your opening time?")
|
Hi @LawlAoux, let me see if I can help! I see that you create an async Flow using
or Python API (
jtype: Flow
with:
  asyncio: True
Then instantiate your Flow:
flow = Flow().load_config(os.path.join(_DIR_NAME, 'flows/flow-index.yaml'))  # asyncio=True now unnecessary
But I agree that this is unexpected behaviour from the user's perspective; we will look into how to improve this experience.
Secondly, you only seem to make one of your Flows async, but you will have to apply the same treatment to both of them.
from jina import Flow, Client
from docarray import Document, DocumentArray
flow = Flow().load_config(os.path.join(_DIR_NAME, 'flows/flow-index.yaml'))
port = flow.port
...  # do your other stuff
c = Client(port=port, asyncio=True)
async def search():  # `async for` is only valid inside a coroutine
    async for results in c.post(on='/search', inputs=DocumentArray()):  # plus any other arguments
        ...  # process your results
Lastly, I like your idea for a functional API, but unfortunately I don't think that such an API can handle the complexity that a Jina Flow offers. Consider, as just two examples, Flows that have complex, branching topologies, and Executors which take
I hope that helps! |
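The `async for` pattern in the reply above can be tried without a running Flow. This is a minimal sketch in which `fake_post` is a hypothetical async generator standing in for the async client's `post` call; the batching and the upper-casing are illustrative assumptions, not Jina behaviour.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_post(queries: List[str], batch_size: int = 2) -> AsyncIterator[List[str]]:
    """Hypothetical stand-in for a streaming async `post` call."""
    for start in range(0, len(queries), batch_size):
        await asyncio.sleep(0)  # yield control, as real network I/O would
        yield [q.upper() for q in queries[start:start + batch_size]]


async def search_all(queries: List[str]) -> List[str]:
    """Collect every streamed batch; `async for` must live in a coroutine."""
    results: List[str] = []
    async for batch in fake_post(queries):
        results.extend(batch)
    return results


if __name__ == "__main__":
    print(asyncio.run(search_all(["a", "b", "c"])))
```

Inside an application that already runs an event loop, such as the chat bot, `search_all` would be awaited directly instead of being passed to `asyncio.run`.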
Thank you very much guys! I have managed to overcome these errors with your help. I would like to ask you about optimization though, because the index and query Flows run quite slowly in my case (the index is a torch encoder + simple indexer from Jina Hub, the query is the index + simple ranker from Jina Hub, and they take around 3 and 7 seconds, respectively). I understand that the cause may be that I'm running on CPU (I have a MacBook Pro), but do you have any tips on how to reduce the running time? |
Glad we were able to help!
If you want to go into more detail regarding performance optimization and/or your specific performance bottlenecks, feel free to open a separate ticket on the issue; it really helps the rest of the community to find this information! |
Thanks for the reply! I tried a single doc (just tested it for a single FAQ pair). I will definitely try the indexer that you suggested and will let you know whether it performs better or not (I will also now try it on a set of 10 FAQs). |
Hey @LawlAoux, thanks for reporting the issue. As the original issue is solved, I will proceed to close it. Feel free to open new issues with any concerns or questions. |
Describe the bug
I have an async system for a chat bot, and we would like to integrate jina for matching faqs. When trying to run it (the jina code for faq works perfectly well alone) I'm getting
RuntimeError: you have an eventloop running but not using Jupyter/ipython, this may mean you are using Jina with other integration? if so, then you may want to use Client/Flow(asyncio=True). If not, then please report this issue here: https://github.com/jina-ai/jina
We tried to use Flow(asyncio=True) or AsyncFlow but still got the same error... Would appreciate it if you could help me with it.
Describe how you solve it
I think in general it would be easier to use jina with function composition rather than the Flow object (maybe it is possible, but we are not aware of how to do it). For example, instead of using TransformerTorchEncoder and SimpleIndexer in the flow, it would be convenient to use these as functions and not classes (e.g. indexer = SimpleIndexer(TransformerTorchEncoder), and then we give the input to this function). We think this API would be much more convenient for developers, who could then build the indexer and query much more easily.