From dded11825ffff2f0e2673dbd69796aa1ae7d4358 Mon Sep 17 00:00:00 2001 From: BaoHuiling Date: Mon, 2 Sep 2024 14:40:51 +0800 Subject: [PATCH 1/8] add videoragqna gateway Signed-off-by: BaoHuiling --- comps/__init__.py | 1 + comps/cores/mega/constants.py | 1 + comps/cores/mega/gateway.py | 42 +++++++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/comps/__init__.py b/comps/__init__.py index c58ae42fe..324ba6dc8 100644 --- a/comps/__init__.py +++ b/comps/__init__.py @@ -44,6 +44,7 @@ AudioQnAGateway, RetrievalToolGateway, FaqGenGateway, + VideoRAGQnAGateway, VisualQnAGateway, ) diff --git a/comps/cores/mega/constants.py b/comps/cores/mega/constants.py index 10863c149..0b25b4cb6 100644 --- a/comps/cores/mega/constants.py +++ b/comps/cores/mega/constants.py @@ -38,6 +38,7 @@ class MegaServiceEndpoint(Enum): CHAT_QNA = "/v1/chatqna" AUDIO_QNA = "/v1/audioqna" VISUAL_QNA = "/v1/visualqna" + VIDEO_RAG_QNA = "/v1/videoragqna" CODE_GEN = "/v1/codegen" CODE_TRANS = "/v1/codetrans" DOC_SUMMARY = "/v1/docsum" diff --git a/comps/cores/mega/gateway.py b/comps/cores/mega/gateway.py index cc8eaf5d2..744012ab8 100644 --- a/comps/cores/mega/gateway.py +++ b/comps/cores/mega/gateway.py @@ -532,6 +532,48 @@ async def handle_request(self, request: Request): ) return ChatCompletionResponse(model="visualqna", choices=choices, usage=usage) +class VideoRAGQnAGateway(Gateway): + def __init__(self, megaservice, host="0.0.0.0", port=8888): + super().__init__( + megaservice, host, port, str(MegaServiceEndpoint.VIDEO_RAG_QNA), ChatCompletionRequest, ChatCompletionResponse + ) + + async def handle_request(self, request: Request): + data = await request.json() + stream_opt = data.get("stream", False) + chat_request = ChatCompletionRequest.parse_obj(data) + prompt = self._handle_message(chat_request.messages) + parameters = LLMParams( + max_new_tokens=chat_request.max_tokens if chat_request.max_tokens else 1024, + top_k=chat_request.top_k if chat_request.top_k else 10, + 
+ top_p=chat_request.top_p if chat_request.top_p else 0.95, + temperature=chat_request.temperature if chat_request.temperature else 0.01, + repetition_penalty=chat_request.presence_penalty if chat_request.presence_penalty else 1.03, + streaming=stream_opt, + ) + result_dict, runtime_graph = await self.megaservice.schedule( + initial_inputs={"text": prompt}, llm_parameters=parameters + ) + for node, response in result_dict.items(): + # This assumes the last microservice in the megaservice is the LVM. + if ( + isinstance(response, StreamingResponse) + and node == list(self.megaservice.services.keys())[-1] + and self.megaservice.services[node].service_type == ServiceType.LVM + ): + return response + last_node = runtime_graph.all_leaves()[-1] + response = result_dict[last_node]["text"] + choices = [] + usage = UsageInfo() + choices.append( + ChatCompletionResponseChoice( + index=0, + message=ChatMessage(role="assistant", content=response), + finish_reason="stop", + ) + ) + return ChatCompletionResponse(model="videoragqna", choices=choices, usage=usage) class RetrievalToolGateway(Gateway): """embed+retrieve+rerank.""" From b99f0d42df9d9ede2957e7bb0f5aa5dfa93fcd37 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 3 Sep 2024 08:34:54 +0000 Subject: [PATCH 2/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- comps/cores/mega/gateway.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/comps/cores/mega/gateway.py b/comps/cores/mega/gateway.py index 744012ab8..c3492333c 100644 --- a/comps/cores/mega/gateway.py +++ b/comps/cores/mega/gateway.py @@ -532,10 +532,16 @@ async def handle_request(self, request: Request): ) return ChatCompletionResponse(model="visualqna", choices=choices, usage=usage) + class VideoRAGQnAGateway(Gateway): def __init__(self, megaservice, host="0.0.0.0", port=8888): super().__init__( - megaservice, host, 
port, str(MegaServiceEndpoint.VIDEO_RAG_QNA), ChatCompletionRequest, ChatCompletionResponse + megaservice, + host, + port, + str(MegaServiceEndpoint.VIDEO_RAG_QNA), + ChatCompletionRequest, + ChatCompletionResponse, ) async def handle_request(self, request: Request): @@ -575,6 +581,7 @@ async def handle_request(self, request: Request): ) return ChatCompletionResponse(model="videoragqna", choices=choices, usage=usage) + class RetrievalToolGateway(Gateway): """embed+retrieve+rerank.""" From e03a6e37e66995fbc94b3ffc61c1775680b083b8 Mon Sep 17 00:00:00 2001 From: BaoHuiling Date: Fri, 6 Sep 2024 09:22:27 +0800 Subject: [PATCH 3/8] add test script for gateway Signed-off-by: BaoHuiling --- ...ce_orchestrator_with_videoragqnagateway.py | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py diff --git a/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py b/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py new file mode 100644 index 000000000..24cef8ad1 --- /dev/null +++ b/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py @@ -0,0 +1,70 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import json +from time import sleep +import unittest + +from fastapi.responses import StreamingResponse + +from comps import ServiceType, VideoRAGQnAGateway, ServiceOrchestrator, TextDoc, opea_microservices, register_microservice +from comps.cores.proto.docarray import LLMParams + +@register_microservice(name="s1", host="172.16.186.61", port=8083, endpoint="/v1/add") +async def s1_add(request: TextDoc) -> TextDoc: + req = request.model_dump_json() + req_dict = json.loads(req) + text = req_dict["text"] + text += "opea " + return {"text": text} + + +@register_microservice(name="s2", host="172.16.186.61", port=8084, endpoint="/v1/add", service_type=ServiceType.LVM) +async def s2_add(request: TextDoc) -> TextDoc: + 
req = request.model_dump_json() + req_dict = json.loads(req) + text = req_dict["text"] + def streamer(text): + yield f"{text}".encode("utf-8") + for i in range(3): + yield "project!".encode("utf-8") + + return StreamingResponse(streamer(text), media_type="text/event-stream") + + +class TestServiceOrchestrator(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.s1 = opea_microservices["s1"] + self.s2 = opea_microservices["s2"] + self.s1.start() + self.s2.start() + + self.service_builder = ServiceOrchestrator() + + self.service_builder.add(opea_microservices["s1"]).add(opea_microservices["s2"]) + self.service_builder.flow_to(self.s1, self.s2) + self.gateway = VideoRAGQnAGateway(self.service_builder, port=9898) + + def tearDown(self): + self.s1.stop() + self.s2.stop() + self.gateway.stop() + + async def test_schedule(self): + result_dict, _ = await self.service_builder.schedule(initial_inputs={"text": "hello, "}, llm_parameters=LLMParams(streaming=True)) + streaming_response = result_dict[self.s2.name] + + if isinstance(streaming_response, StreamingResponse): + content = b"" + async for chunk in streaming_response.body_iterator: + content += chunk + final_text = content.decode("utf-8") + + print("Streamed content from s2: ", final_text) + + expected_result = "hello, opea project!project!project!" 
+ self.assertEqual(final_text, expected_result) + + +if __name__ == "__main__": + unittest.main() From 0187a4cbb4f97991769897e67fd29748e29c6f06 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 6 Sep 2024 01:23:03 +0000 Subject: [PATCH 4/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ...ce_orchestrator_with_videoragqnagateway.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py b/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py index 24cef8ad1..c2510d441 100644 --- a/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py +++ b/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py @@ -2,14 +2,22 @@ # SPDX-License-Identifier: Apache-2.0 import json -from time import sleep import unittest +from time import sleep from fastapi.responses import StreamingResponse -from comps import ServiceType, VideoRAGQnAGateway, ServiceOrchestrator, TextDoc, opea_microservices, register_microservice +from comps import ( + ServiceOrchestrator, + ServiceType, + TextDoc, + VideoRAGQnAGateway, + opea_microservices, + register_microservice, +) from comps.cores.proto.docarray import LLMParams + @register_microservice(name="s1", host="172.16.186.61", port=8083, endpoint="/v1/add") async def s1_add(request: TextDoc) -> TextDoc: req = request.model_dump_json() @@ -24,13 +32,14 @@ async def s2_add(request: TextDoc) -> TextDoc: req = request.model_dump_json() req_dict = json.loads(req) text = req_dict["text"] + def streamer(text): yield f"{text}".encode("utf-8") for i in range(3): yield "project!".encode("utf-8") return StreamingResponse(streamer(text), media_type="text/event-stream") - + class TestServiceOrchestrator(unittest.IsolatedAsyncioTestCase): def setUp(self): @@ -51,9 +60,11 @@ def tearDown(self): 
self.gateway.stop() async def test_schedule(self): - result_dict, _ = await self.service_builder.schedule(initial_inputs={"text": "hello, "}, llm_parameters=LLMParams(streaming=True)) + result_dict, _ = await self.service_builder.schedule( + initial_inputs={"text": "hello, "}, llm_parameters=LLMParams(streaming=True) + ) streaming_response = result_dict[self.s2.name] - + if isinstance(streaming_response, StreamingResponse): content = b"" async for chunk in streaming_response.body_iterator: @@ -61,7 +72,7 @@ async def test_schedule(self): final_text = content.decode("utf-8") print("Streamed content from s2: ", final_text) - + expected_result = "hello, opea project!project!project!" self.assertEqual(final_text, expected_result) From f5b7ae079de7d6f7fab435c5bc11a28d06e4ff04 Mon Sep 17 00:00:00 2001 From: BaoHuiling Date: Fri, 6 Sep 2024 13:25:02 +0800 Subject: [PATCH 5/8] rm ip Signed-off-by: BaoHuiling --- .../test_service_orchestrator_with_videoragqnagateway.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py b/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py index c2510d441..a10aa940f 100644 --- a/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py +++ b/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py @@ -3,7 +3,6 @@ import json import unittest -from time import sleep from fastapi.responses import StreamingResponse @@ -17,8 +16,7 @@ ) from comps.cores.proto.docarray import LLMParams - -@register_microservice(name="s1", host="172.16.186.61", port=8083, endpoint="/v1/add") +@register_microservice(name="s1", host="0.0.0.0", port=8083, endpoint="/v1/add") async def s1_add(request: TextDoc) -> TextDoc: req = request.model_dump_json() req_dict = json.loads(req) @@ -27,7 +25,7 @@ async def s1_add(request: TextDoc) -> TextDoc: return {"text": text} -@register_microservice(name="s2", host="172.16.186.61", port=8084, 
endpoint="/v1/add", service_type=ServiceType.LVM) +@register_microservice(name="s2", host="0.0.0.0", port=8084, endpoint="/v1/add", service_type=ServiceType.LVM) async def s2_add(request: TextDoc) -> TextDoc: req = request.model_dump_json() req_dict = json.loads(req) From 398a498dbd2a7ac6a5df7c80f5229cb0445bb628 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 6 Sep 2024 05:25:52 +0000 Subject: [PATCH 6/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../mega/test_service_orchestrator_with_videoragqnagateway.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py b/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py index a10aa940f..a9bdcdb33 100644 --- a/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py +++ b/tests/cores/mega/test_service_orchestrator_with_videoragqnagateway.py @@ -16,6 +16,7 @@ ) from comps.cores.proto.docarray import LLMParams + @register_microservice(name="s1", host="0.0.0.0", port=8083, endpoint="/v1/add") async def s1_add(request: TextDoc) -> TextDoc: req = request.model_dump_json() From a663a145ae82c98d80ad2c1fc7b63db4df95d800 Mon Sep 17 00:00:00 2001 From: BaoHuiling Date: Fri, 6 Sep 2024 22:18:21 +0800 Subject: [PATCH 7/8] fix exist bug Signed-off-by: BaoHuiling --- comps/embeddings/multimodal_clip/embeddings_clip.py | 4 +++- comps/retrievers/langchain/vdms/retriever_vdms.py | 2 +- comps/retrievers/langchain/vdms/vdms_config.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/comps/embeddings/multimodal_clip/embeddings_clip.py b/comps/embeddings/multimodal_clip/embeddings_clip.py index 39db85b6e..4b14b0c42 100644 --- a/comps/embeddings/multimodal_clip/embeddings_clip.py +++ b/comps/embeddings/multimodal_clip/embeddings_clip.py @@ -27,7 +27,8 @@ def embed_query(self, texts): return 
text_features def get_embedding_length(self): - return len(self.embed_query("sample_text")) + text_features=self.embed_query("sample_text") + return text_features.shape[1] def get_image_embeddings(self, images): """Input is list of images.""" @@ -48,3 +49,4 @@ def get_video_embeddings(self, frames_batch): video_embeddings = video_embeddings / video_embeddings.norm(dim=-1, keepdim=True) vid_embs.append(video_embeddings) return torch.cat(vid_embs, dim=0) + diff --git a/comps/retrievers/langchain/vdms/retriever_vdms.py b/comps/retrievers/langchain/vdms/retriever_vdms.py index 8dae6f8f7..5eaa29ad6 100644 --- a/comps/retrievers/langchain/vdms/retriever_vdms.py +++ b/comps/retrievers/langchain/vdms/retriever_vdms.py @@ -89,7 +89,7 @@ def retrieve(input: EmbedDoc) -> SearchedMultimodalDoc: # Create vectorstore if use_clip: - embeddings = vCLIP({"model_name": "openai/clip-vit-base-patch32", "num_frm": 4}) + embeddings = vCLIP({"model_name": "openai/clip-vit-base-patch32", "num_frm": 64}) dimensions = embeddings.get_embedding_length() elif tei_embedding_endpoint: embeddings = HuggingFaceEndpointEmbeddings(model=tei_embedding_endpoint, huggingfacehub_api_token=hf_token) diff --git a/comps/retrievers/langchain/vdms/vdms_config.py b/comps/retrievers/langchain/vdms/vdms_config.py index d388add9a..5b6a85213 100644 --- a/comps/retrievers/langchain/vdms/vdms_config.py +++ b/comps/retrievers/langchain/vdms/vdms_config.py @@ -77,4 +77,4 @@ def get_boolean_env_var(var_name, default_value=False): # VDMS_SCHEMA = os.getenv("VDMS_SCHEMA", "vdms_schema.yml") # INDEX_SCHEMA = os.path.join(parent_dir, VDMS_SCHEMA) SEARCH_ENGINE = "FaissFlat" -DISTANCE_STRATEGY = "L2" +DISTANCE_STRATEGY = "IP" From 7ed5ad611d347f8d319031d5869793780d1d4f85 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 6 Sep 2024 14:19:01 +0000 Subject: [PATCH 8/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see 
https://pre-commit.ci --- comps/embeddings/multimodal_clip/embeddings_clip.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/comps/embeddings/multimodal_clip/embeddings_clip.py b/comps/embeddings/multimodal_clip/embeddings_clip.py index 4b14b0c42..f010245dd 100644 --- a/comps/embeddings/multimodal_clip/embeddings_clip.py +++ b/comps/embeddings/multimodal_clip/embeddings_clip.py @@ -27,7 +27,7 @@ def embed_query(self, texts): return text_features def get_embedding_length(self): - text_features=self.embed_query("sample_text") + text_features = self.embed_query("sample_text") return text_features.shape[1] def get_image_embeddings(self, images): @@ -49,4 +49,3 @@ def get_video_embeddings(self, frames_batch): video_embeddings = video_embeddings / video_embeddings.norm(dim=-1, keepdim=True) vid_embs.append(video_embeddings) return torch.cat(vid_embs, dim=0) -