From 2a7943d2e64814b45f97c9d1bfffc0e6aaea4a9d Mon Sep 17 00:00:00 2001 From: Tiep Le <97980157+tileintel@users.noreply.github.com> Date: Thu, 29 Aug 2024 05:52:17 -0700 Subject: [PATCH] Multimodal dataprep (#575) * multimodal embedding for MM RAG for videos Signed-off-by: Tiep Le * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * develop data prep first commit Signed-off-by: Tiep Le * develop dataprep microservice for multimodal data Signed-off-by: Tiep Le * multimodal langchain for dataprep Signed-off-by: Tiep Le * update README Signed-off-by: Tiep Le * update README Signed-off-by: Tiep Le * update README Signed-off-by: Tiep Le * update README Signed-off-by: Tiep Le * cosmetic Signed-off-by: Tiep Le * test for multimodal dataprep Signed-off-by: Tiep Le * update test Signed-off-by: Tiep Le * update test Signed-off-by: Tiep Le * update test Signed-off-by: Tiep Le * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * cosmetic update Signed-off-by: Tiep Le * remove langsmith Signed-off-by: Tiep Le * update API to remove /dataprep from API names and remove langsmith Signed-off-by: Tiep Le * update test Signed-off-by: Tiep Le * update the error message per PR reviewer Signed-off-by: Tiep Le --------- Signed-off-by: Tiep Le Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- comps/__init__.py | 4 + comps/cores/proto/docarray.py | 39 +- comps/dataprep/multimodal_utils.py | 258 +++++++++ comps/dataprep/redis/README.md | 4 +- .../redis/multimodal_langchain/README.md | 213 +++++++ .../redis/multimodal_langchain/__init__.py | 2 + .../redis/multimodal_langchain/config.py | 70 +++ .../multimodal_langchain/docker/Dockerfile | 37 ++ .../docker/docker-compose-dataprep-redis.yaml | 29 + .../prepare_videodoc_redis.py | 527 ++++++++++++++++++ .../multimodal_langchain/requirements.txt | 19 + .../redis/multimodal_langchain/schema.yml | 19 + .../multimodal_embeddings/README.md | 185 ++++++ .../multimodal_embeddings/__init__.py | 2 + .../bridgetower/__init__.py | 5 + .../bridgetower/bridgetower_custom.py | 243 ++++++++ .../bridgetower/bridgetower_embedding.py | 122 ++++ .../bridgetower/bridgetower_server.py | 153 +++++ .../bridgetower/docker/Dockerfile | 25 + .../bridgetower/docker/Dockerfile_hpu | 29 + ...ompose_bridgetower_embedding_endpoint.yaml | 19 + .../bridgetower/utils.py | 90 +++ .../multimodal_langchain/__init__.py | 2 + .../multimodal_langchain/docker/Dockerfile | 29 + .../docker_compose_multimodal_embedding.yaml | 21 + .../local_mm_embedding.py | 58 ++ .../multimodal_langchain/mm_embedding_mmei.py | 84 +++ .../multimodal_langchain/requirements.txt | 14 + ...est_dataprep_redis_multimodal_langchain.sh | 278 +++++++++ ...est_multimodal_embeddings_langchain_cpu.sh | 111 ++++ ...est_multimodal_embeddings_langchain_hpu.sh | 111 ++++ 31 files changed, 2799 insertions(+), 3 deletions(-) create mode 100644 comps/dataprep/multimodal_utils.py create mode 100644 comps/dataprep/redis/multimodal_langchain/README.md create mode 100644 comps/dataprep/redis/multimodal_langchain/__init__.py create mode 100644 comps/dataprep/redis/multimodal_langchain/config.py create mode 100644 comps/dataprep/redis/multimodal_langchain/docker/Dockerfile create mode 100644 comps/dataprep/redis/multimodal_langchain/docker/docker-compose-dataprep-redis.yaml create mode 100644 comps/dataprep/redis/multimodal_langchain/prepare_videodoc_redis.py create mode 100644 
comps/dataprep/redis/multimodal_langchain/requirements.txt create mode 100644 comps/dataprep/redis/multimodal_langchain/schema.yml create mode 100644 comps/embeddings/multimodal_embeddings/README.md create mode 100644 comps/embeddings/multimodal_embeddings/__init__.py create mode 100644 comps/embeddings/multimodal_embeddings/bridgetower/__init__.py create mode 100644 comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_custom.py create mode 100644 comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_embedding.py create mode 100644 comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_server.py create mode 100644 comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile create mode 100644 comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile_hpu create mode 100644 comps/embeddings/multimodal_embeddings/bridgetower/docker/docker_compose_bridgetower_embedding_endpoint.yaml create mode 100644 comps/embeddings/multimodal_embeddings/bridgetower/utils.py create mode 100644 comps/embeddings/multimodal_embeddings/multimodal_langchain/__init__.py create mode 100644 comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/Dockerfile create mode 100644 comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/docker_compose_multimodal_embedding.yaml create mode 100644 comps/embeddings/multimodal_embeddings/multimodal_langchain/local_mm_embedding.py create mode 100644 comps/embeddings/multimodal_embeddings/multimodal_langchain/mm_embedding_mmei.py create mode 100644 comps/embeddings/multimodal_embeddings/multimodal_langchain/requirements.txt create mode 100644 tests/test_dataprep_redis_multimodal_langchain.sh create mode 100644 tests/test_multimodal_embeddings_langchain_cpu.sh create mode 100644 tests/test_multimodal_embeddings_langchain_hpu.sh diff --git a/comps/__init__.py b/comps/__init__.py index a5d00f9e07..10c5835fcd 100644 --- a/comps/__init__.py +++ b/comps/__init__.py @@ -19,6 +19,10 @@ GraphDoc, LVMDoc, LVMVideoDoc, + ImageDoc, + TextImageDoc, + MultimodalDoc, + EmbedMultimodalDoc, ) # Constants diff --git a/comps/cores/proto/docarray.py b/comps/cores/proto/docarray.py index 0d397dcb7b..aa4caf1797 100644 --- a/comps/cores/proto/docarray.py +++ b/comps/cores/proto/docarray.py @@ -6,7 +6,7 @@ import numpy as np from docarray import BaseDoc, DocList from docarray.documents import AudioDoc, VideoDoc -from docarray.typing import AudioUrl +from docarray.typing import AudioUrl, ImageUrl from pydantic import Field, conint, conlist, field_validator @@ -17,7 +17,30 @@ class TopologyInfo: class TextDoc(BaseDoc, TopologyInfo): - text: str + text: str = None + + +class ImageDoc(BaseDoc): + url: Optional[ImageUrl] = Field( + description="The path to the image. It can be remote (Web) URL, or a local file path", + default=None, + ) + base64_image: Optional[str] = Field( + description="The base64-based encoding of the image", + default=None, + ) + + +class TextImageDoc(BaseDoc): + image: ImageDoc = None + text: TextDoc = None + + +MultimodalDoc = Union[ + TextDoc, + ImageDoc, + TextImageDoc, +] class Base64ByteStrDoc(BaseDoc): @@ -43,6 +66,18 @@ class EmbedDoc(BaseDoc): score_threshold: float = 0.2 +class EmbedMultimodalDoc(EmbedDoc): + # extend EmbedDoc with these attributes + url: Optional[ImageUrl] = Field( + description="The path to the image. 
It can be remote (Web) URL, or a local file path.", + default=None, + ) + base64_image: Optional[str] = Field( + description="The base64-based encoding of the image.", + default=None, + ) + + class Audio2TextDoc(AudioDoc): url: Optional[AudioUrl] = Field( description="The path to the audio.", diff --git a/comps/dataprep/multimodal_utils.py b/comps/dataprep/multimodal_utils.py new file mode 100644 index 0000000000..cd71c5fc31 --- /dev/null +++ b/comps/dataprep/multimodal_utils.py @@ -0,0 +1,258 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import base64 +import json +import os +import uuid +from pathlib import Path +from typing import Iterator + +import cv2 +import requests +import webvtt +import whisper +from moviepy.editor import VideoFileClip + + +def create_upload_folder(upload_path): + """Create a directory to store uploaded video data.""" + if not os.path.exists(upload_path): + Path(upload_path).mkdir(parents=True, exist_ok=True) + + +def load_json_file(file_path): + """Read contents of json file.""" + with open(file_path, "r") as file: + data = json.load(file) + return data + + +def clear_upload_folder(upload_path): + """Clear the upload directory.""" + for root, dirs, files in os.walk(upload_path, topdown=False): + for file in files: + file_path = os.path.join(root, file) + os.remove(file_path) + for dir in dirs: + dir_path = os.path.join(root, dir) + os.rmdir(dir_path) + + +def generate_video_id(): + """Generates a unique identifier for a video file.""" + return str(uuid.uuid4()) + + +def convert_video_to_audio(video_path: str, output_audio_path: str): + """Converts video to audio using MoviePy library that uses `ffmpeg` under the hood. + + :param video_path: file path of video file (.mp4) + :param output_audio_path: file path of audio file (.wav) to be created + """ + video_clip = VideoFileClip(video_path) + audio_clip = video_clip.audio + audio_clip.write_audiofile(output_audio_path) + video_clip.close() + audio_clip.close() + + +def load_whisper_model(model_name: str = "base"): + """Load a whisper model for generating video transcripts.""" + return whisper.load_model(model_name) + + +def extract_transcript_from_audio(whisper_model, audio_path: str): + """Generate transcript from audio file. 
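Note: the options below use Whisper's "translate" task, so the returned transcript is produced in English.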
+ + :param whisper_model: a pre-loaded whisper model object + :param audio_path: file path of audio file (.wav) + """ + options = dict(task="translate", best_of=5, language="en") + return whisper_model.transcribe(audio_path, **options) + + +def format_timestamp_for_transcript(seconds: float, always_include_hours: bool = True, fractionalSeperator: str = "."): + """Format timestamp for video transcripts.""" + milliseconds = round(seconds * 1000.0) + + hours = milliseconds // 3_600_000 + milliseconds -= hours * 3_600_000 + + minutes = milliseconds // 60_000 + milliseconds -= minutes * 60_000 + + seconds = milliseconds // 1_000 + milliseconds -= seconds * 1_000 + + hours_marker = f"{hours:02d}:" if always_include_hours or hours > 0 else "" + return f"{hours_marker}{minutes:02d}:{seconds:02d}{fractionalSeperator}{milliseconds:03d}" + + +def write_vtt(transcript: Iterator[dict], vtt_path: str): + """Write transcripts to a .vtt file.""" + with open(vtt_path, "a") as file: + file.write("WEBVTT\n\n") + for segment in transcript["segments"]: + text = (segment["text"]).replace("-->", "->") + file.write( + f"{format_timestamp_for_transcript(segment['start'])} --> {format_timestamp_for_transcript(segment['end'])}\n" + ) + file.write(f"{text.strip()}\n\n") + + +def delete_audio_file(audio_path: str): + """Delete audio file after extracting transcript.""" + os.remove(audio_path) + + +def time_to_frame(time: float, fps: float): + """Convert time in seconds into frame number.""" + return int(time * fps - 1) + + +def str2time(strtime: str): + """Get time in seconds from string.""" + strtime = strtime.strip('"') + hrs, mins, seconds = [float(c) for c in strtime.split(":")] + + total_seconds = hrs * 60**2 + mins * 60 + seconds + + return total_seconds + + +def convert_img_to_base64(image): + "Convert image to base64 string" + _, buffer = cv2.imencode(".jpg", image) + encoded_string = base64.b64encode(buffer) + return encoded_string.decode() + + +def extract_frames_and_annotations_from_transcripts(video_id: str, video_path: str, vtt_path: str, output_dir: str): + """Extract frames (.jpg) and annotations (.json) from video file (.mp4) and captions file (.vtt)""" + # Set up location to store frames and annotations + os.makedirs(output_dir, exist_ok=True) + os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True) + + # Load video and get fps + vidcap = cv2.VideoCapture(video_path) + fps = vidcap.get(cv2.CAP_PROP_FPS) + + # read captions file + captions = webvtt.read(vtt_path) + + annotations = [] + for idx, caption in enumerate(captions): + start_time = str2time(caption.start) + end_time = str2time(caption.end) + + mid_time = (end_time + start_time) / 2 + text = caption.text.replace("\n", " ") + + frame_no = time_to_frame(mid_time, fps) + mid_time_ms = mid_time * 1000 + vidcap.set(cv2.CAP_PROP_POS_MSEC, mid_time_ms) + success, frame = vidcap.read() + + if success: + # Save frame for further processing + img_fname = f"frame_{idx}" + img_fpath = os.path.join(output_dir, "frames", img_fname + ".jpg") + cv2.imwrite(img_fpath, frame) + + # Convert image to base64 encoded string + b64_img_str = convert_img_to_base64(frame) + + # Create annotations for frame from transcripts + annotations.append( + { + "video_id": video_id, + "video_name": os.path.basename(video_path), + "b64_img_str": b64_img_str, + "caption": text, + "time": mid_time_ms, + "frame_no": frame_no, + "sub_video_id": idx, + } + ) + + # Save transcript annotations as json file for further processing + with open(os.path.join(output_dir, 
"annotations.json"), "w") as f: + json.dump(annotations, f) + + vidcap.release() + return annotations + + +def use_lvm(endpoint: str, img_b64_string: str, prompt: str = "Provide a short description for this scene."): + """Generate image captions/descriptions using LVM microservice.""" + inputs = {"image": img_b64_string, "prompt": prompt, "max_new_tokens": 32} + response = requests.post(url=endpoint, data=json.dumps(inputs)) + print(response) + return response.json()["text"] + + +def extract_frames_and_generate_captions( + video_id: str, video_path: str, lvm_endpoint: str, output_dir: str, key_frame_per_second: int = 1 +): + """Extract frames (.jpg) and annotations (.json) from video file (.mp4) by generating captions using LVM microservice.""" + # Set up location to store frames and annotations + os.makedirs(output_dir, exist_ok=True) + os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True) + + # Load video and get fps + vidcap = cv2.VideoCapture(video_path) + fps = vidcap.get(cv2.CAP_PROP_FPS) + + annotations = [] + hop = round(fps / key_frame_per_second) + curr_frame = 0 + idx = -1 + + while True: + ret, frame = vidcap.read() + if not ret: + break + + if curr_frame % hop == 0: + idx += 1 + + mid_time = vidcap.get(cv2.CAP_PROP_POS_MSEC) + mid_time_ms = mid_time * 1000 + + frame_no = curr_frame + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + # Save frame for further processing + img_fname = f"frame_{idx}" + img_fpath = os.path.join(output_dir, "frames", img_fname + ".jpg") + cv2.imwrite(img_fpath, frame) + + # Convert image to base64 encoded string + b64_img_str = convert_img_to_base64(frame) + + # Caption generation using LVM microservice + caption = use_lvm(lvm_endpoint, b64_img_str) + caption = caption.strip() + text = caption.replace("\n", " ") + + # Create annotations for frame from transcripts + annotations.append( + { + "video_id": video_id, + "video_name": os.path.basename(video_path), + "b64_img_str": b64_img_str, + "caption": text, + "time": mid_time_ms, + "frame_no": frame_no, + "sub_video_id": idx, + } + ) + + curr_frame += 1 + + # Save caption annotations as json file for further processing + with open(os.path.join(output_dir, "annotations.json"), "w") as f: + json.dump(annotations, f) + + vidcap.release() diff --git a/comps/dataprep/redis/README.md b/comps/dataprep/redis/README.md index 4617dfa25c..76361a236f 100644 --- a/comps/dataprep/redis/README.md +++ b/comps/dataprep/redis/README.md @@ -1,6 +1,8 @@ # Dataprep Microservice with Redis -For dataprep microservice, we provide two frameworks: `Langchain` and `LlamaIndex`. We also provide `Langchain_ray` which uses ray to parallel the data prep for multi-file performance improvement(observed 5x - 15x speedup by processing 1000 files/links.). +We have provided dataprep microservice for multimodal data input (e.g., text and image) [here](multimodal_langchain/README.md). + +For dataprep microservice for text input, we provide here two frameworks: `Langchain` and `LlamaIndex`. We also provide `Langchain_ray` which uses ray to parallel the data prep for multi-file performance improvement(observed 5x - 15x speedup by processing 1000 files/links.). We organized these two folders in the same way, so you can use either framework for dataprep microservice with the following constructions. 
diff --git a/comps/dataprep/redis/multimodal_langchain/README.md b/comps/dataprep/redis/multimodal_langchain/README.md new file mode 100644 index 0000000000..19042c6ae4 --- /dev/null +++ b/comps/dataprep/redis/multimodal_langchain/README.md @@ -0,0 +1,213 @@ +# Dataprep Microservice for Multimodal Data with Redis + +This `dataprep` microservice accepts videos (mp4 files) and their transcripts (optional) from the user and ingests them into Redis vectorstore. + +# ๐Ÿš€1. Start Microservice with Python๏ผˆOption 1๏ผ‰ + +## 1.1 Install Requirements + +```bash +# Install ffmpeg static build +wget https://johnvansickle.com/ffmpeg/builds/ffmpeg-git-amd64-static.tar.xz +mkdir ffmpeg-git-amd64-static +tar -xvf ffmpeg-git-amd64-static.tar.xz -C ffmpeg-git-amd64-static --strip-components 1 +export PATH=$(pwd)/ffmpeg-git-amd64-static:$PATH +cp $(pwd)/ffmpeg-git-amd64-static/ffmpeg /usr/local/bin/ + +pip install -r requirements.txt +``` + +## 1.2 Start Redis Stack Server + +Please refer to this [readme](../../../vectorstores/langchain/redis/README.md). + +## 1.3 Setup Environment Variables + +```bash +export your_ip=$(hostname -I | awk '{print $1}') +export REDIS_URL="redis://${your_ip}:6379" +export INDEX_NAME=${your_redis_index_name} +export PYTHONPATH=${path_to_comps} +``` + +## 1.4 Start LVM Microservice (Optional) + +This is required only if you are going to consume the _generate_captions_ API of this microservice as in [Section 4.3](#43-consume-generate_captions-api). + +Please refer to this [readme](../../../lvms/README.md) to start the LVM microservice. +After LVM is up, set up environment variables. + +```bash +export your_ip=$(hostname -I | awk '{print $1}') +export LVM_ENDPOINT="http://${your_ip}:9399/v1/lvm" +``` + +## 1.5 Start Data Preparation Microservice for Redis with Python Script + +Start document preparation microservice for Redis with below command. + +```bash +python prepare_videodoc_redis.py +``` + +# ๐Ÿš€2. Start Microservice with Docker (Option 2) + +## 2.1 Start Redis Stack Server + +Please refer to this [readme](../../../vectorstores/langchain/redis/README.md). + +## 2.2 Start LVM Microservice (Optional) + +This is required only if you are going to consume the _generate_captions_ API of this microservice as described [here](#43-consume-generate_captions-api). + +Please refer to this [readme](../../../lvms/README.md) to start the LVM microservice. +After LVM is up, set up environment variables. + +```bash +export your_ip=$(hostname -I | awk '{print $1}') +export LVM_ENDPOINT="http://${your_ip}:9399/v1/lvm" +``` + +## 2.3 Setup Environment Variables + +```bash +export your_ip=$(hostname -I | awk '{print $1}') +export EMBEDDING_MODEL_ID="BridgeTower/bridgetower-large-itm-mlm-itc" +export REDIS_URL="redis://${your_ip}:6379" +export WHISPER_MODEL="base" +export INDEX_NAME=${your_redis_index_name} +export HUGGINGFACEHUB_API_TOKEN=${your_hf_api_token} +``` + +## 2.4 Build Docker Image + +```bash +cd ../../../../ +docker build -t opea/dataprep-redis:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/dataprep/redis/multimodal_langchain/docker/Dockerfile . 
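+# Note: the trailing "." makes the repository root the build context, so run this from the
+# GenAIComps root (after the `cd ../../../../` above) so that the comps/ path resolves.
+# Optional sanity check once the build finishes:
+# docker images | grep "opea/dataprep-redis"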
+``` + +## 2.5 Run Docker with CLI (Option A) + +```bash +docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e LVM_ENDPOINT=$LVM_ENDPOINT -e HUGGINGFACEHUB_API_TOKEN=$HUGGINGFACEHUB_API_TOKEN opea/dataprep-redis:latest +``` + +## 2.6 Run with Docker Compose (Option B - deprecated, will move to genAIExample in future) + +```bash +cd comps/dataprep/redis/multimodal_langchain/docker +docker compose -f docker-compose-dataprep-redis.yaml up -d +``` + +# ๐Ÿš€3. Status Microservice + +```bash +docker container logs -f dataprep-redis-server +``` + +# ๐Ÿš€4. Consume Microservice + +Once this dataprep microservice is started, user can use the below commands to invoke the microservice to convert videos and their transcripts (optional) to embeddings and save to the Redis vector store. + +This mircroservice has provided 3 different ways for users to ingest videos into Redis vector store corresponding to the 3 use cases. + +## 4.1 Consume _videos_with_transcripts_ API + +**Use case:** This API is used when a transcript file (under `.vtt` format) is available for each video. + +**Important notes:** + +- Make sure the file paths after `files=@` are correct. +- Every transcript file's name must be identical with its corresponding video file's name (except their extension .vtt and .mp4). For example, `video1.mp4` and `video1.vtt`. Otherwise, if `video1.vtt` is not included correctly in this API call, this microservice will return error `No captions file video1.vtt found for video1.mp4`. + +### Single video-transcript pair upload + +```bash +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@./video1.mp4" \ + -F "files=@./video1.vtt" \ + http://localhost:6007/v1/videos_with_transcripts +``` + +### Multiple video-transcript pair upload + +```bash +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@./video1.mp4" \ + -F "files=@./video1.vtt" \ + -F "files=@./video2.mp4" \ + -F "files=@./video2.vtt" \ + http://localhost:6007/v1/videos_with_transcripts +``` + +## 4.2 Consume _generate_transcripts_ API + +**Use case:** This API should be used when a video has meaningful audio or recognizable speech but its transcript file is not available. + +In this use case, this microservice will use [`whisper`](https://openai.com/index/whisper/) model to generate the `.vtt` transcript for the video. + +### Single video upload + +```bash +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@./video1.mp4" \ + http://localhost:6007/v1/generate_transcripts +``` + +### Multiple video upload + +```bash +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@./video1.mp4" \ + -F "files=@./video2.mp4" \ + http://localhost:6007/v1/generate_transcripts +``` + +## 4.3 Consume _generate_captions_ API + +**Use case:** This API should be used when a video does not have meaningful audio or does not have audio. + +In this use case, transcript either does not provide any meaningful information or does not exist. Thus, it is preferred to leverage a LVM microservice to summarize the video frames. 
+ +- Single video upload + +```bash +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@./video1.mp4" \ + http://localhost:6007/v1/generate_captions +``` + +- Multiple video upload + +```bash +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@./video1.mp4" \ + -F "files=@./video2.mp4" \ + http://localhost:6007/v1/generate_captions +``` + +## 4.4 Consume get_videos API + +To get names of uploaded videos, use the following command. + +```bash +curl -X POST \ + -H "Content-Type: application/json" \ + http://localhost:6007/v1/dataprep/get_videos +``` + +## 4.5 Consume delete_videos API + +To delete uploaded videos and clear the database, use the following command. + +```bash +curl -X POST \ + -H "Content-Type: application/json" \ + http://localhost:6007/v1/dataprep/delete_videos +``` diff --git a/comps/dataprep/redis/multimodal_langchain/__init__.py b/comps/dataprep/redis/multimodal_langchain/__init__.py new file mode 100644 index 0000000000..916f3a44b2 --- /dev/null +++ b/comps/dataprep/redis/multimodal_langchain/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/dataprep/redis/multimodal_langchain/config.py b/comps/dataprep/redis/multimodal_langchain/config.py new file mode 100644 index 0000000000..0cae533788 --- /dev/null +++ b/comps/dataprep/redis/multimodal_langchain/config.py @@ -0,0 +1,70 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os + +# Models +EMBED_MODEL = os.getenv("EMBED_MODEL", "BridgeTower/bridgetower-large-itm-mlm-itc") +WHISPER_MODEL = os.getenv("WHISPER_MODEL", "small") + +# Redis Connection Information +REDIS_HOST = os.getenv("REDIS_HOST", "localhost") +REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) + +# Lvm Microservice Information +LVM_ENDPOINT = os.getenv("LVM_ENDPOINT", "http://localhost:9399/v1/lvm") + + +def get_boolean_env_var(var_name, default_value=False): + """Retrieve the boolean value of an environment variable. + + Args: + var_name (str): The name of the environment variable to retrieve. + default_value (bool): The default value to return if the variable + is not found. + Returns: + bool: The value of the environment variable, interpreted as a boolean. 
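Example:
With REDIS_SSL="true" set in the environment, get_boolean_env_var("REDIS_SSL") returns True;
if the variable is unset or holds an unrecognized value, the provided default_value is returned.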
+ """ + true_values = {"true", "1", "t", "y", "yes"} + false_values = {"false", "0", "f", "n", "no"} + + # Retrieve the environment variable's value + value = os.getenv(var_name, "").lower() + + # Decide the boolean value based on the content of the string + if value in true_values: + return True + elif value in false_values: + return False + else: + return default_value + + +def format_redis_conn_from_env(): + redis_url = os.getenv("REDIS_URL", None) + if redis_url: + return redis_url + else: + using_ssl = get_boolean_env_var("REDIS_SSL", False) + start = "rediss://" if using_ssl else "redis://" + + # if using RBAC + password = os.getenv("REDIS_PASSWORD", None) + username = os.getenv("REDIS_USERNAME", "default") + if password is not None: + start += f"{username}:{password}@" + + return start + f"{REDIS_HOST}:{REDIS_PORT}" + + +REDIS_URL = format_redis_conn_from_env() + +# Vector Index Configuration +INDEX_NAME = os.getenv("INDEX_NAME", "mm-rag-redis") + +current_file_path = os.path.abspath(__file__) +parent_dir = os.path.dirname(current_file_path) +REDIS_SCHEMA = os.getenv("REDIS_SCHEMA", "schema.yml") +TIMEOUT_SECONDS = int(os.getenv("TIMEOUT_SECONDS", 600)) +schema_path = os.path.join(parent_dir, REDIS_SCHEMA) +INDEX_SCHEMA = schema_path diff --git a/comps/dataprep/redis/multimodal_langchain/docker/Dockerfile b/comps/dataprep/redis/multimodal_langchain/docker/Dockerfile new file mode 100644 index 0000000000..a6c2be7e3b --- /dev/null +++ b/comps/dataprep/redis/multimodal_langchain/docker/Dockerfile @@ -0,0 +1,37 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +FROM python:3.11-slim + +ENV LANG=C.UTF-8 + +ARG ARCH="cpu" + +RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \ + build-essential \ + libgl1-mesa-glx \ + libjemalloc-dev \ + default-jre \ + wget \ + vim + +# Install ffmpeg static build +RUN cd /root && wget https://johnvansickle.com/ffmpeg/builds/ffmpeg-git-amd64-static.tar.xz && \ + mkdir ffmpeg-git-amd64-static && tar -xvf ffmpeg-git-amd64-static.tar.xz -C ffmpeg-git-amd64-static --strip-components 1 && \ + export PATH=/root/ffmpeg-git-amd64-static:$PATH && \ + cp /root/ffmpeg-git-amd64-static/ffmpeg /usr/local/bin/ + +RUN mkdir -p /home/user + +COPY comps /home/user/comps + +RUN pip install --no-cache-dir --upgrade pip setuptools && \ + if [ ${ARCH} = "cpu" ]; then pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \ + pip install --no-cache-dir -r /home/user/comps/dataprep/redis/multimodal_langchain/requirements.txt + +ENV PYTHONPATH=$PYTHONPATH:/home/user + +WORKDIR /home/user/comps/dataprep/redis/multimodal_langchain + +ENTRYPOINT ["python", "prepare_videodoc_redis.py"] + diff --git a/comps/dataprep/redis/multimodal_langchain/docker/docker-compose-dataprep-redis.yaml b/comps/dataprep/redis/multimodal_langchain/docker/docker-compose-dataprep-redis.yaml new file mode 100644 index 0000000000..d98ddbd878 --- /dev/null +++ b/comps/dataprep/redis/multimodal_langchain/docker/docker-compose-dataprep-redis.yaml @@ -0,0 +1,29 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +version: "3" +services: + redis-vector-db: + image: redis/redis-stack:7.2.0-v9 + container_name: redis-vector-db + ports: + - "6379:6379" + - "8001:8001" + dataprep-redis: + image: opea/dataprep-redis:latest + container_name: dataprep-redis-server + ports: + - "6007:6007" + ipc: host + environment: + no_proxy: ${no_proxy} + http_proxy: ${http_proxy} + https_proxy: 
${https_proxy} + REDIS_URL: ${REDIS_URL} + INDEX_NAME: ${INDEX_NAME} + LVM_ENDPOINT: ${LVM_ENDPOINT} + restart: unless-stopped + +networks: + default: + driver: bridge diff --git a/comps/dataprep/redis/multimodal_langchain/prepare_videodoc_redis.py b/comps/dataprep/redis/multimodal_langchain/prepare_videodoc_redis.py new file mode 100644 index 0000000000..d658c58b0c --- /dev/null +++ b/comps/dataprep/redis/multimodal_langchain/prepare_videodoc_redis.py @@ -0,0 +1,527 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +import shutil +import subprocess +import time +import uuid +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Type, Union + +from config import EMBED_MODEL, INDEX_NAME, INDEX_SCHEMA, LVM_ENDPOINT, REDIS_URL, WHISPER_MODEL +from fastapi import File, HTTPException, UploadFile +from langchain_community.utilities.redis import _array_to_buffer +from langchain_community.vectorstores import Redis +from langchain_community.vectorstores.redis.base import _generate_field_schema, _prepare_metadata +from langchain_community.vectorstores.redis.schema import read_schema +from langchain_core.embeddings import Embeddings +from langchain_core.utils import get_from_dict_or_env +from PIL import Image + +from comps import opea_microservices, register_microservice +from comps.dataprep.multimodal_utils import ( + clear_upload_folder, + convert_video_to_audio, + create_upload_folder, + delete_audio_file, + extract_frames_and_annotations_from_transcripts, + extract_frames_and_generate_captions, + extract_transcript_from_audio, + generate_video_id, + load_json_file, + load_whisper_model, + write_vtt, +) +from comps.embeddings.multimodal_embeddings.bridgetower.bridgetower_embedding import BridgeTowerEmbedding + +device = "cpu" +upload_folder = "./uploaded_files/" + + +class MultimodalRedis(Redis): + """Redis vector database to process multimodal data.""" + + @classmethod + def from_text_image_pairs_return_keys( + cls: Type[Redis], + texts: List[str], + images: List[str], + embedding: Embeddings = BridgeTowerEmbedding, + metadatas: Optional[List[dict]] = None, + index_name: Optional[str] = None, + index_schema: Optional[Union[Dict[str, str], str, os.PathLike]] = None, + vector_schema: Optional[Dict[str, Union[str, int]]] = None, + **kwargs: Any, + ): + """ + Args: + texts (List[str]): List of texts to add to the vectorstore. + images (List[str]): List of path-to-images to add to the vectorstore. + embedding (Embeddings): Embeddings to use for the vectorstore. + metadatas (Optional[List[dict]], optional): Optional list of metadata + dicts to add to the vectorstore. Defaults to None. + index_name (Optional[str], optional): Optional name of the index to + create or add to. Defaults to None. + index_schema (Optional[Union[Dict[str, str], str, os.PathLike]], optional): + Optional fields to index within the metadata. Overrides generated + schema. Defaults to None. + vector_schema (Optional[Dict[str, Union[str, int]]], optional): Optional + vector schema to use. Defaults to None. + **kwargs (Any): Additional keyword arguments to pass to the Redis client. + Returns: + Tuple[Redis, List[str]]: Tuple of the Redis instance and the keys of + the newly created documents. + Raises: + ValueError: If the number of texts does not equal the number of images. + ValueError: If the number of metadatas does not match the number of texts. 
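Example (hypothetical caption, frame path, and index name):
instance, keys = MultimodalRedis.from_text_image_pairs_return_keys(
texts=["From video1. a person slicing an apple"],
images=["frames/frame_0.jpg"],
embedding=embedder,  # a BridgeTowerEmbedding instance
index_name="mm-rag-redis",
redis_url="redis://localhost:6379",
)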
+ """ + # the length of texts must be equal to the length of images + if len(texts) != len(images): + raise ValueError(f"the len of captions {len(texts)} does not equal the len of images {len(images)}") + + redis_url = get_from_dict_or_env(kwargs, "redis_url", "REDIS_URL") + + if "redis_url" in kwargs: + kwargs.pop("redis_url") + + # flag to use generated schema + if "generate" in kwargs: + kwargs.pop("generate") + + # see if the user specified keys + keys = None + if "keys" in kwargs: + keys = kwargs.pop("keys") + + # Name of the search index if not given + if not index_name: + index_name = uuid.uuid4().hex + + # type check for metadata + if metadatas: + if isinstance(metadatas, list) and len(metadatas) != len(texts): # type: ignore # noqa: E501 + raise ValueError("Number of metadatas must match number of texts") + if not (isinstance(metadatas, list) and isinstance(metadatas[0], dict)): + raise ValueError("Metadatas must be a list of dicts") + generated_schema = _generate_field_schema(metadatas[0]) + + if not index_schema: + index_schema = generated_schema + + # Create instance + instance = cls( + redis_url, + index_name, + embedding, + index_schema=index_schema, + vector_schema=vector_schema, + **kwargs, + ) + # Add data to Redis + keys = instance.add_text_image_pairs(texts, images, metadatas, keys=keys) + return instance, keys + + def add_text_image_pairs( + self, + texts: Iterable[str], + images: Iterable[str], + metadatas: Optional[List[dict]] = None, + embeddings: Optional[List[List[float]]] = None, + batch_size: int = 2, + clean_metadata: bool = True, + **kwargs: Any, + ) -> List[str]: + """Add more embeddings of text-image pairs to the vectorstore. + + Args: + texts (Iterable[str]): Iterable of strings/text to add to the vectorstore. + images: Iterable[str]: Iterable of strings/text of path-to-image to add to the vectorstore. + metadatas (Optional[List[dict]], optional): Optional list of metadatas. + Defaults to None. + embeddings (Optional[List[List[float]]], optional): Optional pre-generated + embeddings. Defaults to None. + keys (List[str]) or ids (List[str]): Identifiers of entries. + Defaults to None. + batch_size (int, optional): Batch size to use for writes. Defaults to 1000. 
+ Returns: + List[str]: List of ids added to the vectorstore + """ + ids = [] + # Get keys or ids from kwargs + # Other vectorstores use ids + keys_or_ids = kwargs.get("keys", kwargs.get("ids")) + + # type check for metadata + if metadatas: + if isinstance(metadatas, list) and len(metadatas) != len(texts): # type: ignore # noqa: E501 + raise ValueError("Number of metadatas must match number of texts") + if not (isinstance(metadatas, list) and isinstance(metadatas[0], dict)): + raise ValueError("Metadatas must be a list of dicts") + pil_imgs = [Image.open(img) for img in images] + if not embeddings: + embeddings = self._embeddings.embed_image_text_pairs(list(texts), pil_imgs, batch_size=batch_size) + self._create_index_if_not_exist(dim=len(embeddings[0])) + + # Write data to redis + pipeline = self.client.pipeline(transaction=False) + for i, text in enumerate(texts): + # Use provided values by default or fallback + key = keys_or_ids[i] if keys_or_ids else str(uuid.uuid4().hex) + if not key.startswith(self.key_prefix + ":"): + key = self.key_prefix + ":" + key + metadata = metadatas[i] if metadatas else {} + metadata = _prepare_metadata(metadata) if clean_metadata else metadata + pipeline.hset( + key, + mapping={ + self._schema.content_key: text, + self._schema.content_vector_key: _array_to_buffer(embeddings[i], self._schema.vector_dtype), + **metadata, + }, + ) + ids.append(key) + + # Write batch + if i % batch_size == 0: + pipeline.execute() + + # Cleanup final batch + pipeline.execute() + return ids + + +def prepare_data_and_metadata_from_annotation( + annotation, path_to_frames, title, num_transcript_concat_for_ingesting=2, num_transcript_concat_for_inference=7 +): + text_list = [] + image_list = [] + metadatas = [] + for i, frame in enumerate(annotation): + frame_index = frame["sub_video_id"] + path_to_frame = os.path.join(path_to_frames, f"frame_{frame_index}.jpg") + # augment this frame's transcript with a reasonable number of neighboring frames' transcripts helps semantic retrieval + lb_ingesting = max(0, i - num_transcript_concat_for_ingesting) + ub_ingesting = min(len(annotation), i + num_transcript_concat_for_ingesting + 1) + caption_for_ingesting = " ".join([annotation[j]["caption"] for j in range(lb_ingesting, ub_ingesting)]) + + # augment this frame's transcript with more neighboring frames' transcript to provide more context to LVM for question answering + lb_inference = max(0, i - num_transcript_concat_for_inference) + ub_inference = min(len(annotation), i + num_transcript_concat_for_inference + 1) + caption_for_inference = " ".join([annotation[j]["caption"] for j in range(lb_inference, ub_inference)]) + + video_id = frame["video_id"] + b64_img_str = frame["b64_img_str"] + time_of_frame = frame["time"] + embedding_type = "pair" + source_video = frame["video_name"] + + text_list.append(caption_for_ingesting) + image_list.append(path_to_frame) + metadatas.append( + { + "content": caption_for_ingesting, + "b64_img_str": b64_img_str, + "video_id": video_id, + "source_video": source_video, + "time_of_frame_ms": float(time_of_frame), + "embedding_type": embedding_type, + "title": title, + "transcript_for_inference": caption_for_inference, + } + ) + + return text_list, image_list, metadatas + + +def ingest_multimodal(videoname, data_folder, embeddings): + """Ingest text image pairs to Redis from the data/ directory that consists of frames and annotations.""" + data_folder = os.path.abspath(data_folder) + annotation_file_path = os.path.join(data_folder, "annotations.json") + 
path_to_frames = os.path.join(data_folder, "frames") + + annotation = load_json_file(annotation_file_path) + + # prepare data to ingest + text_list, image_list, metadatas = prepare_data_and_metadata_from_annotation(annotation, path_to_frames, videoname) + + MultimodalRedis.from_text_image_pairs_return_keys( + texts=[f"From {videoname}. " + text for text in text_list], + images=image_list, + embedding=embeddings, + metadatas=metadatas, + index_name=INDEX_NAME, + index_schema=INDEX_SCHEMA, + redis_url=REDIS_URL, + ) + + +def drop_index(index_name, redis_url=REDIS_URL): + print(f"dropping index {index_name}") + try: + assert Redis.drop_index(index_name=index_name, delete_documents=True, redis_url=redis_url) + print(f"index {index_name} deleted") + except Exception as e: + print(f"index {index_name} delete failed: {e}") + return False + return True + + +@register_microservice( + name="opea_service@prepare_videodoc_redis", endpoint="/v1/generate_transcripts", host="0.0.0.0", port=6007 +) +async def ingest_videos_generate_transcripts(files: List[UploadFile] = File(None)): + """Upload videos with speech, generate transcripts using whisper and ingest into redis.""" + + if files: + video_files = [] + for file in files: + if os.path.splitext(file.filename)[1] == ".mp4": + video_files.append(file) + else: + raise HTTPException( + status_code=400, detail=f"File {file.filename} is not an mp4 file. Please upload mp4 files only." + ) + + for video_file in video_files: + st = time.time() + print(f"Processing video {video_file.filename}") + + # Assign unique identifier to video + video_id = generate_video_id() + + # Create video file name by appending identifier + video_name = os.path.splitext(video_file.filename)[0] + video_file_name = f"{video_name}_{video_id}.mp4" + video_dir_name = os.path.splitext(video_file_name)[0] + + # Save video file in upload_directory + with open(os.path.join(upload_folder, video_file_name), "wb") as f: + shutil.copyfileobj(video_file.file, f) + + # Extract temporary audio wav file from video mp4 + audio_file = video_dir_name + ".wav" + print(f"Extracting {audio_file}") + convert_video_to_audio( + os.path.join(upload_folder, video_file_name), os.path.join(upload_folder, audio_file) + ) + print(f"Done extracting {audio_file}") + + # Load whisper model + print("Loading whisper model....") + whisper_model = load_whisper_model(model_name=WHISPER_MODEL) + print("Done loading whisper!") + + # Extract transcript from audio + print("Extracting transcript from audio") + transcripts = extract_transcript_from_audio(whisper_model, os.path.join(upload_folder, audio_file)) + + # Save transcript as vtt file and delete audio file + vtt_file = video_dir_name + ".vtt" + write_vtt(transcripts, os.path.join(upload_folder, vtt_file)) + delete_audio_file(os.path.join(upload_folder, audio_file)) + print("Done extracting transcript.") + + # Store frames and caption annotations in a new directory + print("Extracting frames and generating annotation") + extract_frames_and_annotations_from_transcripts( + video_id, + os.path.join(upload_folder, video_file_name), + os.path.join(upload_folder, vtt_file), + os.path.join(upload_folder, video_dir_name), + ) + print("Done extracting frames and generating annotation") + # Delete temporary vtt file + os.remove(os.path.join(upload_folder, vtt_file)) + + # Ingest multimodal data into redis + print("Ingesting data to redis vector store") + ingest_multimodal(video_name, os.path.join(upload_folder, video_dir_name), embeddings) + + # Delete temporary video directory 
containing frames and annotations + shutil.rmtree(os.path.join(upload_folder, video_dir_name)) + + print(f"Processed video {video_file.filename}") + end = time.time() + print(str(end - st)) + + return {"status": 200, "message": "Data preparation succeeded"} + + raise HTTPException(status_code=400, detail="Must provide at least one video (.mp4) file.") + + +@register_microservice( + name="opea_service@prepare_videodoc_redis", endpoint="/v1/generate_captions", host="0.0.0.0", port=6007 +) +async def ingest_videos_generate_caption(files: List[UploadFile] = File(None)): + """Upload videos without speech (only background music or no audio), generate captions using lvm microservice and ingest into redis.""" + + if files: + video_files = [] + for file in files: + if os.path.splitext(file.filename)[1] == ".mp4": + video_files.append(file) + else: + raise HTTPException( + status_code=400, detail=f"File {file.filename} is not an mp4 file. Please upload mp4 files only." + ) + + for video_file in video_files: + print(f"Processing video {video_file.filename}") + + # Assign unique identifier to video + video_id = generate_video_id() + + # Create video file name by appending identifier + video_name = os.path.splitext(video_file.filename)[0] + video_file_name = f"{video_name}_{video_id}.mp4" + video_dir_name = os.path.splitext(video_file_name)[0] + + # Save video file in upload_directory + with open(os.path.join(upload_folder, video_file_name), "wb") as f: + shutil.copyfileobj(video_file.file, f) + + # Store frames and caption annotations in a new directory + extract_frames_and_generate_captions( + video_id, + os.path.join(upload_folder, video_file_name), + LVM_ENDPOINT, + os.path.join(upload_folder, video_dir_name), + ) + + # Ingest multimodal data into redis + ingest_multimodal(video_name, os.path.join(upload_folder, video_dir_name), embeddings) + + # Delete temporary video directory containing frames and annotations + # shutil.rmtree(os.path.join(upload_folder, video_dir_name)) + + print(f"Processed video {video_file.filename}") + + return {"status": 200, "message": "Data preparation succeeded"} + + raise HTTPException(status_code=400, detail="Must provide at least one video (.mp4) file.") + + +@register_microservice( + name="opea_service@prepare_videodoc_redis", + endpoint="/v1/videos_with_transcripts", + host="0.0.0.0", + port=6007, +) +async def ingest_videos_with_transcripts(files: List[UploadFile] = File(None)): + + if files: + video_files, video_file_names = [], [] + captions_files, captions_file_names = [], [] + for file in files: + if os.path.splitext(file.filename)[1] == ".mp4": + video_files.append(file) + video_file_names.append(file.filename) + elif os.path.splitext(file.filename)[1] == ".vtt": + captions_files.append(file) + captions_file_names.append(file.filename) + else: + print(f"Skipping file {file.filename} because of unsupported format.") + + # Check if every video file has a captions file + for video_file_name in video_file_names: + file_prefix = os.path.splitext(video_file_name)[0] + if (file_prefix + ".vtt") not in captions_file_names: + raise HTTPException( + status_code=400, detail=f"No captions file {file_prefix}.vtt found for {video_file_name}" + ) + + if len(video_files) == 0: + return HTTPException( + status_code=400, + detail="The uploaded files have unsupported formats. 
Please upload at least one video file (.mp4) with captions (.vtt)", + ) + + for video_file in video_files: + print(f"Processing video {video_file.filename}") + + # Assign unique identifier to video + video_id = generate_video_id() + + # Create video file name by appending identifier + video_name = os.path.splitext(video_file.filename)[0] + video_file_name = f"{video_name}_{video_id}.mp4" + video_dir_name = os.path.splitext(video_file_name)[0] + + # Save video file in upload_directory + with open(os.path.join(upload_folder, video_file_name), "wb") as f: + shutil.copyfileobj(video_file.file, f) + + # Save captions file in upload directory + vtt_file_name = os.path.splitext(video_file.filename)[0] + ".vtt" + vtt_idx = None + for idx, caption_file in enumerate(captions_files): + if caption_file.filename == vtt_file_name: + vtt_idx = idx + break + vtt_file = video_dir_name + ".vtt" + with open(os.path.join(upload_folder, vtt_file), "wb") as f: + shutil.copyfileobj(captions_files[vtt_idx].file, f) + + # Store frames and caption annotations in a new directory + extract_frames_and_annotations_from_transcripts( + video_id, + os.path.join(upload_folder, video_file_name), + os.path.join(upload_folder, vtt_file), + os.path.join(upload_folder, video_dir_name), + ) + + # Delete temporary vtt file + os.remove(os.path.join(upload_folder, vtt_file)) + + # Ingest multimodal data into redis + ingest_multimodal(video_name, os.path.join(upload_folder, video_dir_name), embeddings) + + # Delete temporary video directory containing frames and annotations + shutil.rmtree(os.path.join(upload_folder, video_dir_name)) + + print(f"Processed video {video_file.filename}") + + return {"status": 200, "message": "Data preparation succeeded"} + + raise HTTPException( + status_code=400, detail="Must provide at least one pair consisting of video (.mp4) and captions (.vtt)" + ) + + +@register_microservice( + name="opea_service@prepare_videodoc_redis", endpoint="/v1/dataprep/get_videos", host="0.0.0.0", port=6007 +) +async def rag_get_file_structure(): + """Returns list of names of uploaded videos saved on the server.""" + + if not Path(upload_folder).exists(): + print("No file uploaded, return empty list.") + return [] + + uploaded_videos = os.listdir(upload_folder) + return uploaded_videos + + +@register_microservice( + name="opea_service@prepare_videodoc_redis", endpoint="/v1/dataprep/delete_videos", host="0.0.0.0", port=6007 +) +async def delete_videos(): + """Delete all uploaded videos along with redis index.""" + index_deleted = drop_index(index_name=INDEX_NAME) + + if not index_deleted: + raise HTTPException(status_code=409, detail="Uploaded videos could not be deleted. 
Index does not exist") + + clear_upload_folder(upload_folder) + print("Successfully deleted all uploaded videos.") + return {"status": True} + + +if __name__ == "__main__": + create_upload_folder(upload_folder) + # Load embeddings model + print("Initializing BridgeTower model as embedder...") + embeddings = BridgeTowerEmbedding(model_name=EMBED_MODEL, device=device) + print("Done initialization of embedder!") + opea_microservices["opea_service@prepare_videodoc_redis"].start() diff --git a/comps/dataprep/redis/multimodal_langchain/requirements.txt b/comps/dataprep/redis/multimodal_langchain/requirements.txt new file mode 100644 index 0000000000..574d2952aa --- /dev/null +++ b/comps/dataprep/redis/multimodal_langchain/requirements.txt @@ -0,0 +1,19 @@ +docarray[full] +fastapi +langchain==0.1.12 +langchain_benchmarks +moviepy +openai-whisper +opencv-python +opentelemetry-api +opentelemetry-exporter-otlp +opentelemetry-sdk +Pillow +prometheus-fastapi-instrumentator +pydantic==2.8.2 +python-multipart +redis +shortuuid +transformers +uvicorn +webvtt-py diff --git a/comps/dataprep/redis/multimodal_langchain/schema.yml b/comps/dataprep/redis/multimodal_langchain/schema.yml new file mode 100644 index 0000000000..32f4a79ae4 --- /dev/null +++ b/comps/dataprep/redis/multimodal_langchain/schema.yml @@ -0,0 +1,19 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +text: + - name: content + - name: b64_img_str + - name: video_id + - name: source_video + - name: embedding_type + - name: title + - name: transcript_for_inference +numeric: + - name: time_of_frame_ms +vector: + - name: content_vector + algorithm: HNSW + datatype: FLOAT32 + dims: 512 + distance_metric: COSINE diff --git a/comps/embeddings/multimodal_embeddings/README.md b/comps/embeddings/multimodal_embeddings/README.md new file mode 100644 index 0000000000..c2cf2b875c --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/README.md @@ -0,0 +1,185 @@ +# Multimodal Embeddings Microservice + +The Multimodal Embedding Microservice is designed to efficiently convert pairs of textual string and image into vectorized embeddings, facilitating seamless integration into various machine learning and data processing workflows. This service utilizes advanced algorithms to generate high-quality embeddings that capture the joint semantic essence of the input text-and-image pairs, making it ideal for applications in multi-modal data processing, information retrieval, and similar fields. + +Key Features: + +**High Performance**: Optimized for quick and reliable conversion of textual data and image inputs into vector embeddings. + +**Scalability**: Built to handle high volumes of requests simultaneously, ensuring robust performance even under heavy loads. + +**Ease of Integration**: Provides a simple and intuitive API, allowing for straightforward integration into existing systems and workflows. + +**Customizable**: Supports configuration and customization to meet specific use case requirements, including different embedding models and preprocessing techniques. + +Users are albe to configure and build embedding-related services according to their actual needs. + +## ๐Ÿš€1. Start Microservice with Python (Option 1) + +Currently, we provide two ways to implement the multimodal embedding service: + +1. Build the multimodal embedding model **locally** from the server, which is faster, but takes up memory on the local server. +2. 
Build it based on the multimodal embedding inference endpoint (**MMEI endpoint**), which provides more flexibility, but may bring some network latency. + +For both of the implementations, you need to install requirements first. + +### 1.1 Install Requirements + +```bash +# run with langchain +pip install -r multimodal_langchain/requirements.txt +``` + +### 1.2 Start Embedding Service + +You can select one of the following to start the multimodal embedding service: + +**Start Multimodal Embedding Service with MMEI** + +First, you need to start a MMEI service. + +```bash +export your_mmei_port=8080 +export EMBEDDER_PORT=$your_mmei_port +``` + +Currently, we employ [**BridgeTower**](https://huggingface.co/BridgeTower/bridgetower-large-itm-mlm-gaudi) model for MMEI and provide two ways to start MMEI: + +1. Start MMEI on Gaudi2 HPU +2. Start MMEI on Xeon CPU (if Gaudi2 HPU is not available) + +- Gaudi2 HPU + +```bash +cd ../../.. +docker build -t opea/bridgetower-embedder:latest --build-arg EMBEDDER_PORT=$EMBEDDER_PORT --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile_hpu . +cd comps/embeddings/multimodal_embeddings/bridgetower/docker/ +docker compose -f docker_compose_bridgetower_embedding_endpoint.yaml up -d +``` + +- Xeon CPU + +```bash +cd ../../.. +docker build -t opea/bridgetower-embedder:latest --build-arg EMBEDDER_PORT=$EMBEDDER_PORT --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile . +cd comps/embeddings/multimodal_embeddings/bridgetower/docker/ +docker compose -f docker_compose_bridgetower_embedding_endpoint.yaml up -d +``` + +Then you need to test your MMEI service using the following commands: + +```bash +curl http://localhost:$your_mmei_port/v1/encode \ + -X POST \ + -H "Content-Type:application/json" \ + -d '{"text":"This is example"}' +``` + +Start the embedding service with MMEI_EMBEDDING_ENDPOINT. + +```bash +# run with langchain +cd multimodal_langchain +export MMEI_EMBEDDING_ENDPOINT="http://localhost:$your_mmei_port/v1/encode" +export your_embedding_port_microservice=6600 +export MM_EMBEDDING_PORT_MICROSERVICE=$your_embedding_port_microservice +python mm_embedding_mmei.py +``` + +**Start Embedding Service with Local Model** + +```bash +# run with langchain +cd multimodal_langchain +export your_embedding_port_microservice=6600 +export MM_EMBEDDING_PORT_MICROSERVICE=$your_embedding_port_microservice +python local_mm_embedding.py +``` + +## ๐Ÿš€2. Start Microservice with Docker (Option 2) + +### 2.1 Start Multimodal Embedding Inference (MMEI) Service + +First, you need to start a MMEI service. + +```bash +export your_mmei_port=8080 +export EMBEDDER_PORT=$your_mmei_port +``` + +Currently, we employ [**BridgeTower**](https://huggingface.co/BridgeTower/bridgetower-large-itm-mlm-gaudi) model for MMEI and provide two ways to start MMEI: + +1. Start MMEI on Gaudi2 HPU +2. Start MMEI on Xeon CPU (if Gaudi2 HPU is not available) + +- Gaudi2 HPU + +```bash +cd ../../.. +docker build -t opea/bridgetower-embedder:latest --build-arg EMBEDDER_PORT=$EMBEDDER_PORT --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile_hpu . +cd comps/embeddings/multimodal_embeddings/bridgetower/docker/ +docker compose -f docker_compose_bridgetower_embedding_endpoint.yaml up -d +``` + +- Xeon CPU + +```bash +cd ../../.. 
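+# The repository root becomes the build context (trailing "." below) so the comps/embeddings/... Dockerfile
+# path resolves; EMBEDDER_PORT must already be exported as shown above since it is passed as a build arg.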
+docker build -t opea/bridgetower-embedder:latest --build-arg EMBEDDER_PORT=$EMBEDDER_PORT --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile . +cd comps/embeddings/multimodal_embeddings/bridgetower/docker/ +docker compose -f docker_compose_bridgetower_embedding_endpoint.yaml up -d +``` + +Then you need to test your MMEI service using the following commands: + +```bash +curl http://localhost:$your_mmei_port/v1/encode \ + -X POST \ + -H "Content-Type:application/json" \ + -d '{"text":"This is example"}' +``` + +Export the `MMEI_EMBEDDING_ENDPOINT` for later usage: + +```bash +export ip_address=$(hostname -I | awk '{print $1}') +export MMEI_EMBEDDING_ENDPOINT="http://$ip_address:$your_mmei_port/v1/encode" +``` + +### 2.2 Build Docker Image + +#### Build Langchain Docker + +```bash +cd ../../.. +docker build -t opea/embedding-multimodal:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/Dockerfile . +``` + +### 2.3 Run Docker with Docker Compose + +```bash +cd multimodal_langchain/docker +export your_embedding_port_microservice=6600 +export MM_EMBEDDING_PORT_MICROSERVICE=$your_embedding_port_microservice +docker compose -f docker_compose_multimodal_embedding.yaml up -d +``` + +## ๐Ÿš€3. Consume Embedding Service + +### 2.2 Consume Embedding Service + +**Compute a joint embedding of an image-text pair** + +```bash +curl -X POST http://0.0.0.0:6600/v1/embeddings \ + -H "Content-Type: application/json" \ + -d '{"text": {"text" : "This is some sample text."}, "image" : {"url": "https://github.com/docarray/docarray/blob/main/tests/toydata/image-data/apple.png?raw=true"}}' +``` + +**Compute an embedding of a text** + +```bash +curl -X POST http://0.0.0.0:6600/v1/embeddings \ + -H "Content-Type: application/json" \ + -d '{"text" : "This is some sample text."}' +``` diff --git a/comps/embeddings/multimodal_embeddings/__init__.py b/comps/embeddings/multimodal_embeddings/__init__.py new file mode 100644 index 0000000000..916f3a44b2 --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/embeddings/multimodal_embeddings/bridgetower/__init__.py b/comps/embeddings/multimodal_embeddings/bridgetower/__init__.py new file mode 100644 index 0000000000..e64366189a --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/bridgetower/__init__.py @@ -0,0 +1,5 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from .bridgetower_embedding import BridgeTowerEmbedding +from .bridgetower_custom import BridgeTowerTextFeatureExtractor, BridgeTowerForITC diff --git a/comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_custom.py b/comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_custom.py new file mode 100644 index 0000000000..0a89c3fa9a --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_custom.py @@ -0,0 +1,243 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from collections import OrderedDict +from typing import List, Optional, Tuple, Union + +import torch +import torch.nn.functional as F +from torch import nn +from torchvision import transforms +from torchvision.transforms import CenterCrop, Compose, Normalize, Resize, ToTensor +from transformers import BridgeTowerModel, 
BridgeTowerPreTrainedModel +from transformers.modeling_outputs import SequenceClassifierOutput +from transformers.models.bridgetower.modeling_bridgetower import ( + BridgeTowerContrastiveHead, + BridgeTowerTextModel, + BridgeTowerVisionModel, +) + + +class LayerNorm(nn.LayerNorm): + """Subclass torch's LayerNorm to handle fp16.""" + + def forward(self, x: torch.Tensor): + orig_type = x.dtype + ret = super().forward(x.type(torch.float32)) + return ret.type(orig_type) + + +class BridgeTowerImageFeatureExtractor(nn.Module): + def __init__( + self, + patch_size=14, + width=1024, + resolution_after=294, + ckpt_path=None, + ): + super().__init__() + + self.conv1 = nn.Conv2d(in_channels=3, out_channels=width, kernel_size=patch_size, stride=patch_size, bias=False) + + scale = width**-0.5 + self.class_embedding = nn.Parameter(scale * torch.randn(width)) + self.positional_embedding = nn.Parameter(scale * torch.randn((resolution_after // patch_size) ** 2 + 1, width)) + self.ln_pre = LayerNorm(width) + + if ckpt_path is not None: + sd = torch.load(ckpt_path) + if "state_dict" in sd: + sd = sd["state_dict"] + print(f"Loading feature extractor checkpoint from {ckpt_path}") + self.load_state_dict(sd) + + def forward(self, x: torch.Tensor): + x = self.conv1(x) # shape = [*, width, grid, grid] + x = x.reshape(x.shape[0], x.shape[1], -1) # shape = [*, width, grid ** 2] + x = x.permute(0, 2, 1) # shape = [*, grid ** 2, width] + t = self.class_embedding.to(x.dtype) + torch.zeros(x.shape[0], 1, x.shape[-1], dtype=x.dtype, device=x.device) + x = torch.cat([t, x], dim=1) # shape = [*, grid ** 2 + 1, width] + x = x + self.positional_embedding.to(x.dtype) + x = self.ln_pre(x) + x = x.permute(1, 0, 2) # NLD -> LND + return x + + +class BridgeTowerITCHead(nn.Module): + def __init__(self, hidden_size, embed_size): + super().__init__() + self.fc = nn.Linear(hidden_size, embed_size) + + def forward(self, x): + x = self.fc(x) + return x + + +class _BridgeTowerTextModelWrapper(nn.Module): + def __init__(self, config): + super().__init__() + self.text_model = BridgeTowerTextModel(config) + + def forward(self, **kwargs): + return self.text_model(**kwargs) + + +class _BridgeTowerVisionModelWrapper(nn.Module): + def __init__(self, config): + super().__init__() + self.vision_model = BridgeTowerVisionModel(config.vision_config) + + if config.share_cross_modal_transformer_layers: + self.cross_modal_image_transform = nn.Linear(config.vision_config.hidden_size, config.hidden_size) + else: + self.cross_modal_image_transform = nn.ModuleList( + [ + nn.Linear(config.vision_config.hidden_size, config.hidden_size) + for _ in range(config.num_hidden_layers) + ] + ) + self.token_type_embeddings = nn.Embedding(2, config.hidden_size) + + def forward(self, **kwargs): + return self.vision_model(**kwargs) + + +class BridgeTowerVisionFeatureExtractor(BridgeTowerPreTrainedModel): + def __init__(self, config): + super().__init__(config) + + self.bridgetower = _BridgeTowerVisionModelWrapper(config) + self.itc_image_head = BridgeTowerContrastiveHead(config.hidden_size, config.contrastive_hidden_size) + + def forward( + self, + input_ids: Optional[torch.LongTensor] = None, + attention_mask: Optional[torch.FloatTensor] = None, + token_type_ids: Optional[torch.LongTensor] = None, + head_mask: Optional[torch.FloatTensor] = None, + inputs_embeds: Optional[torch.FloatTensor] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + labels: Optional[torch.LongTensor] = None, 
+ ): + + outputs = self.bridgetower(input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True) + final_hidden_cls = outputs.hidden_states[-1][:, 0, :] + + image_embeds_with_ln = self.bridgetower.vision_model.visual.forward_post(final_hidden_cls) + image_token_type_embeddings = self.bridgetower.token_type_embeddings( + torch.full((1,), 1, dtype=torch.long, device=self.bridgetower.token_type_embeddings.weight.device) + ).expand_as(image_embeds_with_ln) + + image_embeds = self.bridgetower.cross_modal_image_transform(image_embeds_with_ln) + image_token_type_embeddings + + final_hidden_cls = F.normalize(self.itc_image_head(image_embeds), dim=-1, p=2) + + return final_hidden_cls + + +class BridgeTowerTextFeatureExtractor(BridgeTowerPreTrainedModel): + def __init__(self, config): + super().__init__(config) + + self.bridgetower = _BridgeTowerTextModelWrapper(config.text_config) + self.itc_text_head = BridgeTowerITCHead(config.hidden_size, config.contrastive_hidden_size) + + def forward( + self, + input_ids: Optional[torch.LongTensor] = None, + attention_mask: Optional[torch.FloatTensor] = None, + token_type_ids: Optional[torch.LongTensor] = None, + head_mask: Optional[torch.FloatTensor] = None, + inputs_embeds: Optional[torch.FloatTensor] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + labels: Optional[torch.LongTensor] = None, + ): + + outputs = self.bridgetower(input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True) + final_hidden_cls = outputs.hidden_states[-1][:, 0, :] + final_hidden_cls = F.normalize(self.itc_text_head(final_hidden_cls), dim=-1, p=2) + + return final_hidden_cls + + +class BridgeTowerForITC(BridgeTowerPreTrainedModel): + def __init__(self, config): + super().__init__(config) + + self.bridgetower = BridgeTowerModel(config) + + self.itc_text_head = BridgeTowerITCHead(config.hidden_size, config.contrastive_hidden_size) + self.itc_image_head = BridgeTowerITCHead(config.hidden_size, config.contrastive_hidden_size) + self.itc_cross_modal_head = BridgeTowerITCHead(config.hidden_size * 2, config.contrastive_hidden_size) + + # Initialize weights and apply final processing + self.post_init() + + def forward( + self, + input_ids: Optional[torch.LongTensor] = None, + attention_mask: Optional[torch.FloatTensor] = None, + token_type_ids: Optional[torch.LongTensor] = None, + pixel_values: Optional[torch.FloatTensor] = None, + pixel_mask: Optional[torch.LongTensor] = None, + head_mask: Optional[torch.FloatTensor] = None, + inputs_embeds: Optional[torch.FloatTensor] = None, + image_embeds: Optional[torch.FloatTensor] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + labels: Optional[torch.LongTensor] = None, + ) -> Union[SequenceClassifierOutput, Tuple[torch.FloatTensor]]: + + assert output_hidden_states, "output_hidden_states should be set to True for BridgeTowerForITC" + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + + outputs = self.bridgetower( + input_ids, + attention_mask=attention_mask, + token_type_ids=token_type_ids, + pixel_values=pixel_values, + pixel_mask=pixel_mask, + head_mask=head_mask, + inputs_embeds=inputs_embeds, + image_embeds=image_embeds, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + ) + + pooler_output = outputs.pooler_output if return_dict else 
outputs[2] + + hidden_states_txt, hidden_states_img, hidden_states_cross_modal = outputs.hidden_states + + final_hidden_txt = hidden_states_txt[-1] + final_hidden_img = hidden_states_img[-1] + + image_embeds_with_ln = self.bridgetower.vision_model.visual.forward_post(final_hidden_img) + image_token_type_embeddings = self.bridgetower.token_type_embeddings( + torch.full((1,), 1, dtype=torch.long, device=self.bridgetower.token_type_embeddings.weight.device) + ).expand_as(image_embeds_with_ln) + + final_hidden_img = ( + self.bridgetower.cross_modal_image_transform(image_embeds_with_ln) + image_token_type_embeddings + ) + + final_hidden_txt = F.normalize(self.itc_text_head(final_hidden_txt[:, 0, :]), dim=-1, p=2) + final_hidden_img = F.normalize(self.itc_image_head(final_hidden_img[:, 0, :]), dim=-1, p=2) + final_hidden_cross = F.normalize(self.itc_cross_modal_head(pooler_output), dim=-1, p=2) + + logits = torch.stack([final_hidden_txt, final_hidden_img, final_hidden_cross], dim=-2) + + if not return_dict: + return tuple(logits) + + return SequenceClassifierOutput( + loss=None, + logits=logits, + hidden_states=outputs.hidden_states, + attentions=outputs.attentions, + ) diff --git a/comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_embedding.py b/comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_embedding.py new file mode 100644 index 0000000000..f61d8e1c33 --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_embedding.py @@ -0,0 +1,122 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any, List + +import torch +from langchain_core.embeddings import Embeddings +from langchain_core.pydantic_v1 import BaseModel, Extra +from PIL import Image +from transformers import BridgeTowerProcessor + +from .bridgetower_custom import BridgeTowerForITC, BridgeTowerTextFeatureExtractor + + +class BridgeTowerEmbedding(BaseModel, Embeddings): + """BridgeTower embedding model.""" + + model_name: str = "BridgeTower/bridgetower-large-itm-mlm-itc" + device: str = "cpu" + TEXT_MODEL: Any + PROCESSOR: Any + MODEL: Any + + def __init__(self, **kwargs: Any): + """Initialize the BridgeTowerEmbedding class.""" + super().__init__(**kwargs) + + if "device" in kwargs: + if kwargs["device"] == "hpu": + try: + import habana_frameworks.torch.core as htcore + + self.device = torch.device("hpu") + except ImportError: + self.device = "cpu" + elif kwargs["device"] == "gpu": + if torch.cuda.is_available(): + self.device = "cuda" + else: + self.device = "cpu" + + self.TEXT_MODEL = BridgeTowerTextFeatureExtractor.from_pretrained(self.model_name).to(self.device) + self.PROCESSOR = BridgeTowerProcessor.from_pretrained(self.model_name) + self.MODEL = BridgeTowerForITC.from_pretrained(self.model_name).to(self.device) + + class Config: + """Configuration for this pydantic object.""" + + extra = Extra.forbid + + def embed_documents(self, texts: List[str]) -> List[List[float]]: + """Embed a list of documents using BridgeTower. + + Args: + texts: The list of texts to embed. + Returns: + List of embeddings, one for each text. + """ + encodings = self.PROCESSOR.tokenizer(texts, return_tensors="pt").to(self.device) + with torch.no_grad(): + outputs = self.TEXT_MODEL(**encodings) + embeddings = outputs.cpu().numpy().tolist() + return embeddings + + def embed_query(self, text: str) -> List[float]: + """Embed a query using BridgeTower. + + Args: + text: The text to embed. + Returns: + Embeddings for the text. 
+ """ + return self.embed_documents([text])[0] + + def embed_image_text_pairs(self, texts: List[str], images: list[Image], batch_size=2) -> List[List[float]]: # type: ignore + """Embed a list of image-text pairs using BridgeTower. + + Args: + texts: The list of texts to embed. + images: The list of path-to-images to embed + batch_size: the batch size to process, default to 2 + Returns: + List of embeddings, one for each image-text pairs. + """ + + # the length of texts must be equal to the length of images + assert len(texts) == len(images), "the number of captions should be equal to the number of images" + + image_list = [] + text_list = [] + embeddings = [] + for pil_img, text in zip(images, texts): + # print(path_to_img) + # img = read_image(path_to_img, mode=ImageReadMode.RGB) + # img = transform.to_pil_image(img) + + img = pil_img.convert("RGB") + image_list.append(img) + text_list.append(text) + if len(text_list) == batch_size: + batch = self.PROCESSOR( + image_list, text_list, return_tensors="pt", max_length=200, padding="max_length", truncation=True + ).to(self.device) + with torch.no_grad(): + batch_embeddings = self.MODEL(**batch, output_hidden_states=True) + + for i in range(len(text_list)): + embeddings.append(batch_embeddings.logits[i, 2, :].detach().cpu().numpy().tolist()) + image_list = [] + text_list = [] + # embedding the remaining + if len(text_list) > 0: + batch = self.PROCESSOR( + image_list, text_list, return_tensors="pt", max_length=100, padding="max_length", truncation=True + ).to(self.device) + with torch.no_grad(): + batch_embeddings = self.MODEL(**batch, output_hidden_states=True) + for i in range(len(text_list)): + embeddings.append(batch_embeddings.logits[i, 2, :].detach().cpu().numpy().tolist()) + image_list = [] + text_list = [] + return embeddings diff --git a/comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_server.py b/comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_server.py new file mode 100644 index 0000000000..62e70c74fe --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/bridgetower/bridgetower_server.py @@ -0,0 +1,153 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import asyncio +import base64 +import os +import uuid +from functools import partial +from io import BytesIO +from typing import List + +import PIL +import PIL.Image +import requests +import uvicorn +from fastapi import BackgroundTasks, FastAPI, Request +from fastapi.responses import JSONResponse, Response +from utils import build_logger + +from comps.embeddings.multimodal_embeddings.bridgetower import BridgeTowerEmbedding + +worker_id = str(uuid.uuid4())[:6] +print(f"worker_id: {worker_id}") +logger = build_logger("embedding_worker", f"bridgetower_embedding_worker_{worker_id}.log") +model_semaphore = None +global_counter = 0 + +model_name_or_path = None +model_dtype = None +use_hpu_graphs = True + + +app = FastAPI() + + +def release_model_semaphore(fn=None): + model_semaphore.release() + if fn is not None: + fn() + + +def get_queue_length(): + if model_semaphore is None: + return 0 + else: + return ( + args.limit_model_concurrency + - model_semaphore._value + + (len(model_semaphore._waiters) if model_semaphore._waiters is not None else 0) + ) + + +def get_status(): + return { + "model_names": [model_name_or_path], + "speed": 1, + "queue_length": get_queue_length(), + "global_counter": global_counter, + } + + +@app.get("/v1/health_check") +async def health() -> Response: + """Health check.""" + return 
Response(status_code=200, content=b'{"message" : "BridgeTower server is running..."}') + + +@app.post("/v1/encode") +async def encode(request: Request) -> Response: + global model_semaphore, global_counter + global_counter += 1 + + request_dict = await request.json() + if model_semaphore is None: + model_semaphore = asyncio.Semaphore(args.limit_model_concurrency) + await model_semaphore.acquire() + + text = request_dict.pop("text") + image = None + if "img_b64_str" in request_dict.keys(): + img_b64_str = request_dict.pop("img_b64_str") + image = PIL.Image.open(BytesIO(base64.b64decode(img_b64_str))) + if image is None: + # embed text only + embeddings = embedder.embed_documents([text])[0] + else: + # embed image and text pair + embeddings = embedder.embed_image_text_pairs([text], [image], batch_size=1)[0] + + background_tasks = BackgroundTasks() + background_tasks.add_task(partial(release_model_semaphore)) + return JSONResponse( + status_code=200, + content={ + "embedding": embeddings, + }, + background=background_tasks, + ) + + +@app.post("/v1/worker_get_status") +async def get_woker_status(): + return get_status() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--host", type=str, default="0.0.0.0") + parser.add_argument("--model_name_or_path", type=str, default="BridgeTower/bridgetower-large-itm-mlm-itc") + parser.add_argument("--warmup", type=int, default=1, help="Number of warmup iterations for benchmarking.") + parser.add_argument("--device", type=str, default="cpu") + parser.add_argument("--limit-model-concurrency", type=int, default=5) + + args = parser.parse_args() + # get port from env variable if exist + args.port = int(os.getenv("PORT", 8080)) + + print(f"device: {args.device}") + logger.info(f"args: {args}") + + if args.device == "hpu": + try: + import habana_frameworks.torch.core as htcore + except ImportError: + print("device: hpu is not available. 
Using cpu instead!") + args.device = "cpu" + + model_name_or_path = args.model_name_or_path + + embedder = BridgeTowerEmbedding(device=args.device) + + # warmup + print("Warmup...") + image_paths = ["https://llava-vl.github.io/static/images/view.jpg"] + example_prompts = ["This is test image!"] + images = [] + for image_path in image_paths: + images.append(PIL.Image.open(requests.get(image_path, stream=True, timeout=3000).raw)) + for i in range(args.warmup): + embedder.embed_image_text_pairs( + example_prompts, + images, + batch_size=1, + ) + print("Done warmup...") + + uvicorn.run( + app, + host=args.host, + port=args.port, + log_level="debug", + ) diff --git a/comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile b/comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile new file mode 100644 index 0000000000..83cd41ae18 --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile @@ -0,0 +1,25 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +FROM python:3.10-slim +RUN useradd -m -s /bin/bash user && \ + mkdir -p /home/user && \ + chown -R user /home/user/ +USER user +# Set environment variables +ENV LANG=en_US.UTF-8 +ENV PYTHONPATH=/home/user:/usr/lib/habanalabs/:/optimum-habana + +COPY --chown=user comps /home/user/comps + +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r /home/user/comps/embeddings/multimodal_embeddings/multimodal_langchain/requirements.txt + +ENV PYTHONPATH=$PYTHONPATH:/home/user + +ARG EMBEDDER_PORT=8080 +ENV PORT=$EMBEDDER_PORT + +WORKDIR /home/user/comps/embeddings/multimodal_embeddings/bridgetower + +ENTRYPOINT ["python", "bridgetower_server.py", "--device", "cpu"] \ No newline at end of file diff --git a/comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile_hpu b/comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile_hpu new file mode 100644 index 0000000000..e571ab2538 --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile_hpu @@ -0,0 +1,29 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +# HABANA environment +FROM vault.habana.ai/gaudi-docker/1.16.1/ubuntu22.04/habanalabs/pytorch-installer-2.2.2:latest AS hpu +RUN useradd -m -s /bin/bash user && \ + mkdir -p /home/user && \ + chown -R user /home/user/ + +RUN rm -rf /etc/ssh/ssh_host* +USER user +# Set environment variables +ENV LANG=en_US.UTF-8 +ENV PYTHONPATH=/home/user:/usr/lib/habanalabs/:/optimum-habana + +COPY --chown=user comps /home/user/comps + +# Install requirements and optimum habana +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r /home/user/comps/embeddings/multimodal_embeddings/multimodal_langchain/requirements.txt && \ + pip install optimum[habana] + +ENV PYTHONPATH=$PYTHONPATH:/home/user + +ARG EMBEDDER_PORT=8080 +ENV PORT=$EMBEDDER_PORT + +WORKDIR /home/user/comps/embeddings/multimodal_embeddings/bridgetower +ENTRYPOINT ["python", "bridgetower_server.py", "--device", "hpu"] \ No newline at end of file diff --git a/comps/embeddings/multimodal_embeddings/bridgetower/docker/docker_compose_bridgetower_embedding_endpoint.yaml b/comps/embeddings/multimodal_embeddings/bridgetower/docker/docker_compose_bridgetower_embedding_endpoint.yaml new file mode 100644 index 0000000000..9767490d0a --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/bridgetower/docker/docker_compose_bridgetower_embedding_endpoint.yaml @@ -0,0 +1,19 @@ +# Copyright (C) 
2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +services: + bridgetower: + image: opea/bridgetower-embedder:latest + container_name: bridgetower-embedding-server + ports: + - ${EMBEDDER_PORT}:${EMBEDDER_PORT} + ipc: host + environment: + no_proxy: ${no_proxy} + http_proxy: ${http_proxy} + https_proxy: ${https_proxy} + restart: unless-stopped + +networks: + default: + driver: bridge diff --git a/comps/embeddings/multimodal_embeddings/bridgetower/utils.py b/comps/embeddings/multimodal_embeddings/bridgetower/utils.py new file mode 100644 index 0000000000..673d54dbcc --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/bridgetower/utils.py @@ -0,0 +1,90 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import logging +import os +import sys + +handler = None +save_log = True +LOGDIR = "." + + +def build_logger(logger_name, logger_filename): + global handler + + formatter = logging.Formatter( + fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + # Set the format of root handlers + if not logging.getLogger().handlers: + logging.basicConfig(level=logging.INFO) + logging.getLogger().handlers[0].setFormatter(formatter) + + # Redirect stdout and stderr to loggers + stdout_logger = logging.getLogger("stdout") + stdout_logger.setLevel(logging.INFO) + sl = StreamToLogger(stdout_logger, logging.INFO) + sys.stdout = sl + + stderr_logger = logging.getLogger("stderr") + stderr_logger.setLevel(logging.ERROR) + sl = StreamToLogger(stderr_logger, logging.ERROR) + sys.stderr = sl + + # Get logger + logger = logging.getLogger(logger_name) + logger.setLevel(logging.INFO) + + # Add a file handler for all loggers + if handler is None and save_log: + os.makedirs(LOGDIR, exist_ok=True) + filename = os.path.join(LOGDIR, logger_filename) + handler = logging.handlers.TimedRotatingFileHandler(filename, when="D", utc=True, encoding="UTF-8") + handler.setFormatter(formatter) + + for name, item in logging.root.manager.loggerDict.items(): + if isinstance(item, logging.Logger): + item.addHandler(handler) + + return logger + + +class StreamToLogger(object): + """Fake file-like stream object that redirects writes to a logger instance.""" + + def __init__(self, logger, log_level=logging.INFO): + self.terminal = sys.stdout + self.logger = logger + self.log_level = log_level + self.linebuf = "" + + def __getattr__(self, attr): + return getattr(self.terminal, attr) + + def write(self, buf): + temp_linebuf = self.linebuf + buf + self.linebuf = "" + for line in temp_linebuf.splitlines(True): + # From the io.TextIOWrapper docs: + # On output, if newline is None, any '\n' characters written + # are translated to the system default line separator. + # By default sys.stdout.write() expects '\n' newlines and then + # translates them so this is still cross platform. 
+ if line[-1] == "\n": + self.logger.log(self.log_level, line.rstrip()) + else: + self.linebuf += line + + def flush(self): + if self.linebuf != "": + self.logger.log(self.log_level, self.linebuf.rstrip()) + self.linebuf = "" + + +def pretty_print_semaphore(semaphore): + if semaphore is None: + return "None" + return f"Semaphore(value={semaphore._value}, locked={semaphore.locked()})" diff --git a/comps/embeddings/multimodal_embeddings/multimodal_langchain/__init__.py b/comps/embeddings/multimodal_embeddings/multimodal_langchain/__init__.py new file mode 100644 index 0000000000..916f3a44b2 --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/multimodal_langchain/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/Dockerfile b/comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/Dockerfile new file mode 100644 index 0000000000..97d5906ecb --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/Dockerfile @@ -0,0 +1,29 @@ + +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +FROM langchain/langchain:latest + +RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \ + libgl1-mesa-glx \ + libjemalloc-dev \ + vim + +RUN useradd -m -s /bin/bash user && \ + mkdir -p /home/user && \ + chown -R user /home/user/ + +USER user + +COPY comps /home/user/comps + +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r /home/user/comps/embeddings/multimodal_embeddings/multimodal_langchain/requirements.txt + +# RUN pip install --upgrade pydantic + +ENV PYTHONPATH=$PYTHONPATH:/home/user + +WORKDIR /home/user/comps/embeddings/multimodal_embeddings/multimodal_langchain + +ENTRYPOINT ["python", "mm_embedding_mmei.py"] diff --git a/comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/docker_compose_multimodal_embedding.yaml b/comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/docker_compose_multimodal_embedding.yaml new file mode 100644 index 0000000000..314233f931 --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/docker_compose_multimodal_embedding.yaml @@ -0,0 +1,21 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +services: + embedding: + image: opea/embedding-multimodal:latest + container_name: embedding-multimodal-server + ports: + - ${MM_EMBEDDING_PORT_MICROSERVICE}:${MM_EMBEDDING_PORT_MICROSERVICE} + ipc: host + environment: + no_proxy: ${no_proxy} + http_proxy: ${http_proxy} + https_proxy: ${https_proxy} + MMEI_EMBEDDING_ENDPOINT: ${MMEI_EMBEDDING_ENDPOINT} + MM_EMBEDDING_PORT_MICROSERVICE: ${MM_EMBEDDING_PORT_MICROSERVICE} + restart: unless-stopped + +networks: + default: + driver: bridge diff --git a/comps/embeddings/multimodal_embeddings/multimodal_langchain/local_mm_embedding.py b/comps/embeddings/multimodal_embeddings/multimodal_langchain/local_mm_embedding.py new file mode 100644 index 0000000000..7327284a8f --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/multimodal_langchain/local_mm_embedding.py @@ -0,0 +1,58 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os + +from comps import ( + CustomLogger, + EmbedDoc, + EmbedMultimodalDoc, + MultimodalDoc, + ServiceType, + TextDoc, + TextImageDoc, + opea_microservices, + register_microservice, +) +from 
comps.embeddings.multimodal_embeddings.bridgetower import BridgeTowerEmbedding + +logger = CustomLogger("local_multimodal_embedding") +logflag = os.getenv("LOGFLAG", False) + +port = int(os.getenv("MM_EMBEDDING_PORT_MICROSERVICE", 6600)) + + +@register_microservice( + name="opea_service@local_multimodal_embedding", + service_type=ServiceType.EMBEDDING, + endpoint="/v1/embeddings", + host="0.0.0.0", + port=port, + input_datatype=MultimodalDoc, + output_datatype=EmbedMultimodalDoc, +) +def embedding(input: MultimodalDoc) -> EmbedDoc: + if logflag: + logger.info(input) + + if isinstance(input, TextDoc): + # Handle text input + embed_vector = embeddings.embed_query(input.text) + res = EmbedDoc(text=input.text, embedding=embed_vector) + + elif isinstance(input, TextImageDoc): + # Handle text + image input + pil_image = input.image.url.load_pil() + embed_vector = embeddings.embed_image_text_pairs([input.text.text], [pil_image], batch_size=1)[0] + res = EmbedMultimodalDoc(text=input.text.text, url=input.image.url, embedding=embed_vector) + else: + raise ValueError("Invalid input type") + + if logflag: + logger.info(res) + return res + + +if __name__ == "__main__": + embeddings = BridgeTowerEmbedding() + opea_microservices["opea_service@local_multimodal_embedding"].start() diff --git a/comps/embeddings/multimodal_embeddings/multimodal_langchain/mm_embedding_mmei.py b/comps/embeddings/multimodal_embeddings/multimodal_langchain/mm_embedding_mmei.py new file mode 100644 index 0000000000..fbd972a202 --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/multimodal_langchain/mm_embedding_mmei.py @@ -0,0 +1,84 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import base64 +import os +import time + +import requests +from fastapi.responses import JSONResponse + +from comps import ( + CustomLogger, + EmbedDoc, + EmbedMultimodalDoc, + MultimodalDoc, + ServiceType, + TextDoc, + TextImageDoc, + opea_microservices, + register_microservice, + register_statistics, + statistics_dict, +) + +logger = CustomLogger("multimodal_embedding_mmei_langchain") +logflag = os.getenv("LOGFLAG", False) +port = int(os.getenv("MM_EMBEDDING_PORT_MICROSERVICE", 6600)) +headers = {"Content-Type": "application/json"} + + +@register_microservice( + name="opea_service@multimodal_embedding_mmei_langchain", + service_type=ServiceType.EMBEDDING, + endpoint="/v1/embeddings", + host="0.0.0.0", + port=port, + input_datatype=MultimodalDoc, + output_datatype=EmbedMultimodalDoc, +) +@register_statistics(names=["opea_service@multimodal_embedding_mmei_langchain"]) +def embedding(input: MultimodalDoc) -> EmbedDoc: + start = time.time() + if logflag: + logger.info(input) + + json = {} + if isinstance(input, TextDoc): + json["text"] = input.text + elif isinstance(input, TextImageDoc): + json["text"] = input.text.text + img_bytes = input.image.url.load_bytes() + base64_img = base64.b64encode(img_bytes).decode("utf-8") + json["img_b64_str"] = base64_img + else: + return JSONResponse(status_code=400, content={"message": "Bad request!"}) + + # call multimodal embedding endpoint + try: + response = requests.post(mmei_embedding_endpoint, headers=headers, json=json) + if response.status_code != 200: + return JSONResponse(status_code=503, content={"message": "Multimodal embedding endpoint failed!"}) + + response_json = response.json() + embed_vector = response_json["embedding"] + if isinstance(input, TextDoc): + res = EmbedDoc(text=input.text, embedding=embed_vector) + elif isinstance(input, TextImageDoc): + res = 
EmbedMultimodalDoc(text=input.text.text, url=input.image.url, embedding=embed_vector) + except requests.exceptions.ConnectionError: + res = JSONResponse(status_code=503, content={"message": "Multimodal embedding endpoint not started!"}) + statistics_dict["opea_service@multimodal_embedding_mmei_langchain"].append_latency(time.time() - start, None) + if logflag: + logger.info(res) + return res + + +if __name__ == "__main__": + url_endpoint = os.getenv("MMEI_EMBEDDING_HOST_ENDPOINT", "http://0.0.0.0") + port_endpoint = os.getenv("MMEI_EMBEDDING_PORT_ENDPOINT", "8080") + path_endpoint = os.getenv("MMEI_EMBEDDING_PATH_ENDPOINT", "/v1/encode") + + mmei_embedding_endpoint = os.getenv("MMEI_EMBEDDING_ENDPOINT", f"{url_endpoint}:{port_endpoint}{path_endpoint}") + logger.info(f"MMEI Gaudi Embedding initialized at {mmei_embedding_endpoint}") + opea_microservices["opea_service@multimodal_embedding_mmei_langchain"].start() diff --git a/comps/embeddings/multimodal_embeddings/multimodal_langchain/requirements.txt b/comps/embeddings/multimodal_embeddings/multimodal_langchain/requirements.txt new file mode 100644 index 0000000000..cc9d77a432 --- /dev/null +++ b/comps/embeddings/multimodal_embeddings/multimodal_langchain/requirements.txt @@ -0,0 +1,14 @@ +docarray[full] +fastapi +huggingface_hub +langchain +opentelemetry-api +opentelemetry-exporter-otlp +opentelemetry-sdk +prometheus-fastapi-instrumentator +pydantic==2.8.2 +shortuuid +torch +torchvision +transformers +uvicorn diff --git a/tests/test_dataprep_redis_multimodal_langchain.sh b/tests/test_dataprep_redis_multimodal_langchain.sh new file mode 100644 index 0000000000..e5a75f8604 --- /dev/null +++ b/tests/test_dataprep_redis_multimodal_langchain.sh @@ -0,0 +1,278 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -x + +WORKPATH=$(dirname "$PWD") +LOG_PATH="$WORKPATH/tests" +ip_address=$(hostname -I | awk '{print $1}') +LVM_PORT=5028 +LVM_ENDPOINT="http://${ip_address}:${LVM_PORT}/v1/lvm" +WHISPER_MODEL="base" +INDEX_NAME="dataprep" +video_name="WeAreGoingOnBullrun" +transcript_fn="${video_name}.vtt" +video_fn="${video_name}.mp4" + +function build_docker_images() { + cd $WORKPATH + echo $(pwd) + docker build --no-cache -t opea/dataprep-redis:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/dataprep/redis/multimodal_langchain/docker/Dockerfile . + + if [ $? -ne 0 ]; then + echo "opea/dataprep-redis built fail" + exit 1 + else + echo "opea/dataprep-redis built successful" + fi +} + +function build_lvm_docker_images() { + cd $WORKPATH + echo $(pwd) + docker build --no-cache -t opea/llava:comps -f comps/lvms/llava/Dockerfile . + if [ $? -ne 0 ]; then + echo "opea/llava built fail" + exit 1 + else + echo "opea/llava built successful" + fi + docker build --no-cache -t opea/lvm:comps -f comps/lvms/Dockerfile . + if [ $? 
-ne 0 ]; then + echo "opea/lvm built fail" + exit 1 + else + echo "opea/lvm built successful" + fi +} + +function start_lvm_service() { + unset http_proxy + docker run -d --name="test-comps-lvm-llava" -e http_proxy=$http_proxy -e https_proxy=$https_proxy -p 5029:8399 --ipc=host opea/llava:comps + docker run -d --name="test-comps-lvm" -e LVM_ENDPOINT=http://$ip_address:5029 -e http_proxy=$http_proxy -e https_proxy=$https_proxy -p ${LVM_PORT}:9399 --ipc=host opea/lvm:comps + sleep 5m +} + +function start_lvm() { + cd $WORKPATH + echo $(pwd) + echo "Building LVM Docker Images" + build_lvm_docker_images + echo "Starting LVM Services" + start_lvm_service + +} + +function start_service() { + # start redis + echo "Starting Redis server" + REDIS_PORT=6380 + docker run -d --name="test-comps-dataprep-redis-multimodal-langchain" -e http_proxy=$http_proxy -e https_proxy=$https_proxy -p $REDIS_PORT:6379 -p 8002:8001 --ipc=host redis/redis-stack:7.2.0-v9 + + # start dataprep microservice + echo "Starting dataprep microservice" + dataprep_service_port=5013 + REDIS_URL="redis://${ip_address}:${REDIS_PORT}" + docker run -d --name="test-comps-dataprep-redis-multimodal-langchain-server" -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e LVM_ENDPOINT=$LVM_ENDPOINT -p ${dataprep_service_port}:6007 --runtime=runc --ipc=host opea/dataprep-redis:comps + + sleep 1m +} + +function prepare_data() { + echo "Prepare Transcript .vtt" + cd ${LOG_PATH} + echo $(pwd) + echo """WEBVTT + +00:00:00.000 --> 00:00:03.400 +Last year the smoking tire went on the bull run live rally in the + +00:00:03.400 --> 00:00:09.760 +2010 Ford SBT Raptor. I liked it so much. I bought one. Here it is. We're going back + +00:00:09.760 --> 00:00:12.920 +to bull run this year of course we'll help from our friends at Black Magic and + +00:00:12.920 --> 00:00:19.560 +we're so serious about it. We got two Valentine one radar detectors. Oh yeah. + +00:00:19.560 --> 00:00:23.760 +So we're all set up and the reason we got two is because we're going to be going + +00:00:23.760 --> 00:00:29.920 +a little bit faster. We got a 2011 Shelby GT500. The 550 horsepower + +00:00:29.920 --> 00:00:34.560 +all-luminum V8. We are going to be right in the action bringing you guys a video + +00:00:34.560 --> 00:00:40.120 +every single day live from the bull run rally July 9th to 16th and the only + +00:00:40.120 --> 00:00:45.240 +place to watch it is on BlackmagicShine.com. We're right here on the smoking + +00:00:45.240 --> 00:00:47.440 +tire.""" > ${transcript_fn} + + echo "Downloading Video" + wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/WeAreGoingOnBullrun.mp4 -O ${video_fn} + +} + +function validate_microservice() { + cd $LOG_PATH + + # test v1/generate_transcripts upload file + echo "Testing generate_transcripts API" + URL="http://${ip_address}:$dataprep_service_port/v1/generate_transcripts" + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@./$video_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. 
Received status was $HTTP_STATUS" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test v1/videos_with_transcripts upload file + echo "Testing videos_with_transcripts API" + URL="http://${ip_address}:$dataprep_service_port/v1/videos_with_transcripts" + + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@./$video_fn" -F "files=@./$transcript_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test v1/generate_captions upload file + echo "Testing generate_captions API" + URL="http://${ip_address}:$dataprep_service_port/v1/generate_captions" + + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@./$video_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + + + # test /v1/dataprep/get_videos + echo "Testing get_videos API" + URL="http://${ip_address}:$dataprep_service_port/v1/dataprep/get_videos" + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - get" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. 
Received status was $HTTP_STATUS" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *${video_name}* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test /v1/dataprep/delete_videos + echo "Testing delete_videos API" + URL="http://${ip_address}:$dataprep_service_port/v1/dataprep/delete_videos" + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -d '{"file_path": "dataprep_file.txt"}' -H 'Content-Type: application/json' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - del" + + # check response status + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_del.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + # check response body + if [[ "$RESPONSE_BODY" != *'{"status":true}'* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs test-comps-dataprep-redis-multimodal-langchain-server >> ${LOG_PATH}/dataprep_del.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi +} + +function stop_docker() { + cid=$(docker ps -aq --filter "name=test-comps-dataprep-redis-multimodal-langchain*") + if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi + cid=$(docker ps -aq --filter "name=test-comps-lvm*") + if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi + +} + +function delete_data() { + cd ${LOG_PATH} + rm -rf WeAreGoingOnBullrun.vtt + rm -rf WeAreGoingOnBullrun.mp4 + sleep 1s +} + +function main() { + + stop_docker + start_lvm + build_docker_images + start_service + prepare_data + + validate_microservice + delete_data + stop_docker + # echo y | docker system prune + +} + +main diff --git a/tests/test_multimodal_embeddings_langchain_cpu.sh b/tests/test_multimodal_embeddings_langchain_cpu.sh new file mode 100644 index 0000000000..77a7b6d993 --- /dev/null +++ b/tests/test_multimodal_embeddings_langchain_cpu.sh @@ -0,0 +1,111 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -x + +WORKPATH=$(dirname "$PWD") +ip_address=$(hostname -I | awk '{print $1}') +export your_mmei_port=8089 +export EMBEDDER_PORT=$your_mmei_port +export MMEI_EMBEDDING_ENDPOINT="http://$ip_address:$your_mmei_port/v1/encode" +export your_embedding_port_microservice=6609 +export MM_EMBEDDING_PORT_MICROSERVICE=$your_embedding_port_microservice +unset http_proxy + +function build_mmei_docker_images() { + cd $WORKPATH + echo $(pwd) + docker build --no-cache -t opea/bridgetower-embedder:latest --build-arg EMBEDDER_PORT=$EMBEDDER_PORT --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile . + + if [ $? 
-ne 0 ]; then + echo "opea/bridgetower-embedder built fail" + exit 1 + else + echo "opea/bridgetower-embedder built successful" + fi +} + +function build_embedding_service_images() { + cd $WORKPATH + echo $(pwd) + docker build --no-cache -t opea/embedding-multimodal:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/Dockerfile . + + if [ $? -ne 0 ]; then + echo "opea/embedding-multimodal built fail" + exit 1 + else + echo "opea/embedding-multimodal built successful" + fi +} + +function build_docker_images() { + build_mmei_docker_images + build_embedding_service_images +} + +function start_service() { + cd $WORKPATH + cd comps/embeddings/multimodal_embeddings/bridgetower/docker/ + docker compose -f docker_compose_bridgetower_embedding_endpoint.yaml up -d + cd $WORKPATH + cd comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/ + docker compose -f docker_compose_multimodal_embedding.yaml up -d + sleep 2m +} +function validate_microservice_text_embedding() { + result=$(http_proxy="" curl http://${ip_address}:$MM_EMBEDDING_PORT_MICROSERVICE/v1/embeddings \ + -X POST \ + -H "Content-Type: application/json" \ + -d '{"text" : "This is some sample text."}') + + if [[ $result == *"embedding"* ]]; then + echo "Result correct." + else + echo "Result wrong. Received was $result" + docker logs bridgetower-embedding-server + docker logs embedding-multimodal-server + exit 1 + fi +} + +function validate_microservice_image_text_pair_embedding() { + result=$(http_proxy="" curl http://${ip_address}:$MM_EMBEDDING_PORT_MICROSERVICE/v1/embeddings \ + -X POST \ + -H "Content-Type: application/json" \ + -d '{"text": {"text" : "This is some sample text."}, "image" : {"url": "https://github.com/docarray/docarray/blob/main/tests/toydata/image-data/apple.png?raw=true"}}') + + if [[ $result == *"embedding"* ]]; then + echo "Result correct." + else + echo "Result wrong. Received was $result" + docker logs bridgetower-embedding-server + docker logs embedding-multimodal-server + exit 1 + fi +} + +function validate_microservice() { + validate_microservice_text_embedding + validate_microservice_image_text_pair_embedding +} + +function stop_docker() { + cid=$(docker ps -aq --filter "name=bridgetower-embedding-server" --filter "name=embedding-multimodal-server") + if [[ ! 
-z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi +} + +function main() { + + stop_docker + + build_docker_images + start_service + + validate_microservice + + stop_docker + echo y | docker system prune +} + +main diff --git a/tests/test_multimodal_embeddings_langchain_hpu.sh b/tests/test_multimodal_embeddings_langchain_hpu.sh new file mode 100644 index 0000000000..50c789c7d2 --- /dev/null +++ b/tests/test_multimodal_embeddings_langchain_hpu.sh @@ -0,0 +1,111 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -x + +WORKPATH=$(dirname "$PWD") +ip_address=$(hostname -I | awk '{print $1}') +export your_mmei_port=8089 +export EMBEDDER_PORT=$your_mmei_port +export MMEI_EMBEDDING_ENDPOINT="http://$ip_address:$your_mmei_port/v1/encode" +export your_embedding_port_microservice=6609 +export MM_EMBEDDING_PORT_MICROSERVICE=$your_embedding_port_microservice +unset http_proxy + +function build_mmei_docker_images() { + cd $WORKPATH + echo $(pwd) + docker build --no-cache -t opea/bridgetower-embedder:latest --build-arg EMBEDDER_PORT=$EMBEDDER_PORT --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/embeddings/multimodal_embeddings/bridgetower/docker/Dockerfile_hpu . + + if [ $? -ne 0 ]; then + echo "opea/bridgetower-embedder built fail" + exit 1 + else + echo "opea/bridgetower-embedder built successful" + fi +} + +function build_embedding_service_images() { + cd $WORKPATH + echo $(pwd) + docker build --no-cache -t opea/embedding-multimodal:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/Dockerfile . + + if [ $? -ne 0 ]; then + echo "opea/embedding-multimodal built fail" + exit 1 + else + echo "opea/embedding-multimodal built successful" + fi +} + +function build_docker_images() { + build_mmei_docker_images + build_embedding_service_images +} + +function start_service() { + cd $WORKPATH + cd comps/embeddings/multimodal_embeddings/bridgetower/docker/ + docker compose -f docker_compose_bridgetower_embedding_endpoint.yaml up -d + cd $WORKPATH + cd comps/embeddings/multimodal_embeddings/multimodal_langchain/docker/ + docker compose -f docker_compose_multimodal_embedding.yaml up -d + sleep 2m +} +function validate_microservice_text_embedding() { + result=$(http_proxy="" curl http://${ip_address}:$MM_EMBEDDING_PORT_MICROSERVICE/v1/embeddings \ + -X POST \ + -H "Content-Type: application/json" \ + -d '{"text" : "This is some sample text."}') + + if [[ $result == *"embedding"* ]]; then + echo "Result correct." + else + echo "Result wrong. Received was $result" + docker logs bridgetower-embedding-server + docker logs embedding-multimodal-server + exit 1 + fi +} + +function validate_microservice_image_text_pair_embedding() { + result=$(http_proxy="" curl http://${ip_address}:$MM_EMBEDDING_PORT_MICROSERVICE/v1/embeddings \ + -X POST \ + -H "Content-Type: application/json" \ + -d '{"text": {"text" : "This is some sample text."}, "image" : {"url": "https://github.com/docarray/docarray/blob/main/tests/toydata/image-data/apple.png?raw=true"}}') + + if [[ $result == *"embedding"* ]]; then + echo "Result correct." + else + echo "Result wrong. 
Received was $result" + docker logs bridgetower-embedding-server + docker logs embedding-multimodal-server + exit 1 + fi +} + +function validate_microservice() { + validate_microservice_text_embedding + validate_microservice_image_text_pair_embedding +} + +function stop_docker() { + cid=$(docker ps -aq --filter "name=bridgetower-embedding-server" --filter "name=embedding-multimodal-server") + if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi +} + +function main() { + + stop_docker + + build_docker_images + start_service + + validate_microservice + + stop_docker + echo y | docker system prune +} + +main
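
A note on exercising the endpoints added by this patch: the sketch below is a hypothetical Python equivalent of the README's curl test for the BridgeTower `/v1/encode` endpoint, extended with the optional `img_b64_str` field that `bridgetower_server.py` accepts. The host, the port (8080, the Dockerfile's `EMBEDDER_PORT` default), and the local image filename are assumptions, not part of the patch; adjust them to your deployment.

```python
# Hedged sketch: call the BridgeTower embedding server directly.
# Assumptions: the server listens on http://0.0.0.0:8080 (EMBEDDER_PORT default) and
# "apple.png" is a local image file; both are placeholders, not defined by this patch.
import base64
import requests

ENCODE_URL = "http://0.0.0.0:8080/v1/encode"

# Text-only request, matching the README's curl example.
print(requests.post(ENCODE_URL, json={"text": "This is example"}, timeout=60).json())

# Image-text pair: bridgetower_server.py decodes an optional base64 "img_b64_str" field.
with open("apple.png", "rb") as f:
    img_b64 = base64.b64encode(f.read()).decode("utf-8")
resp = requests.post(ENCODE_URL, json={"text": "This is example", "img_b64_str": img_b64}, timeout=60)
resp.raise_for_status()
print(len(resp.json()["embedding"]))  # server responds with {"embedding": [...]}
```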
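
Similarly, a minimal Python client for the `/v1/embeddings` microservice, mirroring the two curl payloads shown in the README's "Consume Embedding Service" section. It assumes the microservice is reachable at port 6600 (the compose default in `docker_compose_multimodal_embedding.yaml`) and that the `requests` package is installed; it is a sketch, not the canonical client.

```python
# Hedged sketch: consume the multimodal embedding microservice.
# Assumption: the service runs at http://0.0.0.0:6600 as configured in the compose file.
import requests

EMBEDDINGS_URL = "http://0.0.0.0:6600/v1/embeddings"

# TextDoc-style payload (text only).
text_doc = {"text": "This is some sample text."}

# TextImageDoc-style payload (joint image-text embedding).
pair_doc = {
    "text": {"text": "This is some sample text."},
    "image": {
        "url": "https://github.com/docarray/docarray/blob/main/tests/toydata/image-data/apple.png?raw=true"
    },
}

for payload in (text_doc, pair_doc):
    resp = requests.post(EMBEDDINGS_URL, json=payload, timeout=60)
    resp.raise_for_status()
    # EmbedDoc / EmbedMultimodalDoc responses include an "embedding" vector.
    print(len(resp.json()["embedding"]))
```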