From 7628b61d22f2c546229ed608bd03022ea2a182b6 Mon Sep 17 00:00:00 2001
From: haixuanTao
Date: Mon, 25 Mar 2024 17:08:21 +0100
Subject: [PATCH] Adding LLM example

---
 .../python-operator-dataflow/dataflow.yml     |   3 -
 .../python-operator-dataflow/dataflow_llm.yml |  69 ++++
 .../dataflow_record.yml                       |  87 +++++
 .../python-operator-dataflow/file_saver_op.py |  44 +++
 .../python-operator-dataflow/keyboard_op.py   |  94 ++++++
 examples/python-operator-dataflow/llm_op.py   | 319 ++++++++++++++++++
 examples/python-operator-dataflow/merge.py    |  12 +
 .../python-operator-dataflow/microphone_op.py |  36 ++
 .../object_detection.py                       |  53 +--
 examples/python-operator-dataflow/plot.py     | 199 +++++------
 .../sentence_transformers_op.py               | 100 ++++++
 examples/python-operator-dataflow/utils.py    |  25 +-
 examples/python-operator-dataflow/webcam.py   |  33 +-
 .../python-operator-dataflow/whisper_op.py    |  25 ++
 14 files changed, 924 insertions(+), 175 deletions(-)
 create mode 100644 examples/python-operator-dataflow/dataflow_llm.yml
 create mode 100644 examples/python-operator-dataflow/dataflow_record.yml
 create mode 100644 examples/python-operator-dataflow/file_saver_op.py
 create mode 100644 examples/python-operator-dataflow/keyboard_op.py
 create mode 100644 examples/python-operator-dataflow/llm_op.py
 create mode 100644 examples/python-operator-dataflow/merge.py
 create mode 100644 examples/python-operator-dataflow/microphone_op.py
 create mode 100644 examples/python-operator-dataflow/sentence_transformers_op.py
 create mode 100644 examples/python-operator-dataflow/whisper_op.py

diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml
index ca069f7d3..92bf5f2b3 100644
--- a/examples/python-operator-dataflow/dataflow.yml
+++ b/examples/python-operator-dataflow/dataflow.yml
@@ -10,12 +10,10 @@ nodes:
   - id: object_detection
     operator:
       python: object_detection.py
-      send_stdout_as: stdout
       inputs:
         image: webcam/image
       outputs:
         - bbox
-        - stdout
 
   - id: plot
     operator:
@@ -23,4 +21,3 @@ nodes:
       inputs:
         image: webcam/image
         bbox: object_detection/bbox
-        object_detection_stdout: object_detection/stdout
diff --git a/examples/python-operator-dataflow/dataflow_llm.yml b/examples/python-operator-dataflow/dataflow_llm.yml
new file mode 100644
index 000000000..6d3be9a58
--- /dev/null
+++ b/examples/python-operator-dataflow/dataflow_llm.yml
@@ -0,0 +1,69 @@
+nodes:
+  - id: webcam
+    operator:
+      python: webcam.py
+      inputs:
+        tick: dora/timer/millis/50
+      outputs:
+        - image
+
+  - id: object_detection
+    operator:
+      python: object_detection.py
+      inputs:
+        image: webcam/image
+      outputs:
+        - bbox
+
+  - id: plot
+    operator:
+      python: plot.py
+      inputs:
+        image: webcam/image
+        bbox: object_detection/bbox
+        line: llm/line
+        keyboard_buffer: keyboard/buffer
+        user_message: keyboard/submitted
+        assistant_message: llm/assistant_message
+
+  ## Keyboard input
+  - id: keyboard
+    custom:
+      source: keyboard_op.py
+      outputs:
+        - buffer
+        - submitted
+        - record
+        - ask
+        - send
+        - change
+
+  ## Code Modifier
+  - id: vectordb
+    operator:
+      python: sentence_transformers_op.py
+      inputs:
+        query: keyboard/change
+        saved_file: file_saver/saved_file
+      outputs:
+        - raw_file
+
+  - id: llm
+    operator:
+      python: llm_op.py
+      inputs:
+        code_modifier: vectordb/raw_file
+        assistant: keyboard/ask
+        message_sender: keyboard/send
+      outputs:
+        - modified_file
+        - line
+        - assistant_message
+
+  - id: file_saver
+    operator:
+      python: file_saver_op.py
+      inputs:
+        file: llm/modified_file
+      outputs:
+        - saved_file
\ No newline at end of file
diff --git a/examples/python-operator-dataflow/dataflow_record.yml b/examples/python-operator-dataflow/dataflow_record.yml
new file mode 100644
index 000000000..faf978af4
--- /dev/null
+++ b/examples/python-operator-dataflow/dataflow_record.yml
@@ -0,0 +1,87 @@
+nodes:
+  - id: webcam
+    operator:
+      python: webcam.py
+      inputs:
+        tick: dora/timer/millis/50
+      outputs:
+        - image
+
+  - id: object_detection
+    operator:
+      python: object_detection.py
+      inputs:
+        image: webcam/image
+      outputs:
+        - bbox
+
+  - id: plot
+    operator:
+      python: plot.py
+      inputs:
+        image: webcam/image
+        bbox: object_detection/bbox
+        line: llm/line
+        keyboard_buffer: keyboard/buffer
+        user_message: keyboard/submitted
+        assistant_message: llm/assistant_message
+
+  ## Speech to text
+  - id: keyboard
+    custom:
+      source: keyboard_op.py
+      outputs:
+        - buffer
+        - submitted
+        - record
+        - ask
+        - send
+        - change
+      inputs:
+        recording: whisper/text
+
+  - id: microphone
+    operator:
+      python: microphone_op.py
+      inputs:
+        record: keyboard/record
+      outputs:
+        - audio
+
+  - id: whisper
+    operator:
+      python: whisper_op.py
+      inputs:
+        audio: microphone/audio
+      outputs:
+        - text
+
+  ## Code Modifier
+  - id: vectordb
+    operator:
+      python: sentence_transformers_op.py
+      inputs:
+        query: keyboard/change
+        saved_file: file_saver/saved_file
+      outputs:
+        - raw_file
+
+  - id: llm
+    operator:
+      python: llm_op.py
+      inputs:
+        code_modifier: vectordb/raw_file
+        assistant: keyboard/ask
+        message_sender: keyboard/send
+      outputs:
+        - modified_file
+        - line
+        - assistant_message
+
+  - id: file_saver
+    operator:
+      python: file_saver_op.py
+      inputs:
+        file: llm/modified_file
+      outputs:
+        - saved_file
\ No newline at end of file
diff --git a/examples/python-operator-dataflow/file_saver_op.py b/examples/python-operator-dataflow/file_saver_op.py
new file mode 100644
index 000000000..df1128f82
--- /dev/null
+++ b/examples/python-operator-dataflow/file_saver_op.py
@@ -0,0 +1,44 @@
+import pyarrow as pa
+
+from dora import DoraStatus
+
+
+class Operator:
+    """
+    Saving the incoming modified file to disk
+    """
+
+    def __init__(self):
+        self.last_file = ""
+        self.last_path = ""
+        self.last_metadata = None
+
+    def on_event(
+        self,
+        dora_event,
+        send_output,
+    ) -> DoraStatus:
+        if dora_event["type"] == "INPUT" and dora_event["id"] == "file":
+            input = dora_event["value"][0].as_py()
+
+            with open(input["path"], "r") as file:
+                self.last_file = file.read()
+            self.last_path = input["path"]
+            self.last_metadata = dora_event["metadata"]
+            with open(input["path"], "w") as file:
+                file.write(input["raw"])
+
+            send_output(
+                "saved_file",
+                pa.array(
+                    [
+                        {
+                            "raw": input["raw"],
+                            "path": input["path"],
+                            "origin": dora_event["id"],
+                        }
+                    ]
+                ),
+                dora_event["metadata"],
+            )
+        return DoraStatus.CONTINUE
diff --git a/examples/python-operator-dataflow/keyboard_op.py b/examples/python-operator-dataflow/keyboard_op.py
new file mode 100644
index 000000000..79a1cd4c3
--- /dev/null
+++ b/examples/python-operator-dataflow/keyboard_op.py
@@ -0,0 +1,94 @@
+from pynput import keyboard
+from pynput.keyboard import Key, Events
+import pyarrow as pa
+from dora import Node
+from tkinter import Tk
+import tkinter as tk
+
+
+node = Node()
+buffer_text = ""
+ctrl = False
+submitted_text = []
+cursor = 0
+
+NODE_TOPIC = ["record", "send", "ask", "change"]
+
+with keyboard.Events() as events:
+    while True:
+        dora_event = node.next(0.01)
+        if (
+            dora_event is not None
+            and dora_event["type"] == "INPUT"
+            and dora_event["id"] == "recording"
+        ):
+            buffer_text += dora_event["value"][0].as_py()
+            node.send_output("buffer", pa.array([buffer_text]))
+            continue
+
+        event = events.get(1.0)
+        if event is not None and isinstance(event, Events.Press):
+            if hasattr(event.key, "char"):
+                cursor = 0
+                if ctrl and event.key.char == "v":
+                    r = Tk()
+                    r.update()
+                    try:
+                        selection = r.clipboard_get()
+                        r.withdraw()
+                        r.update()
+                    except tk.TclError:
+                        selection = ""
+                    r.destroy()
+                    buffer_text += selection
+                    node.send_output("buffer", pa.array([buffer_text]))
+                elif ctrl and event.key.char == "c":
+                    r = Tk()
+                    r.clipboard_clear()
+                    r.clipboard_append(buffer_text)
+                    r.update()
+                    r.destroy()
+                elif ctrl and event.key.char == "x":
+                    r = Tk()
+                    r.clipboard_clear()
+                    r.clipboard_append(buffer_text)
+                    r.update()
+                    r.destroy()
+                    buffer_text = ""
+                    node.send_output("buffer", pa.array([buffer_text]))
+                else:
+                    buffer_text += event.key.char
+                    node.send_output("buffer", pa.array([buffer_text]))
+            else:
+                if event.key == Key.backspace:
+                    buffer_text = buffer_text[:-1]
+                    node.send_output("buffer", pa.array([buffer_text]))
+                elif event.key == Key.esc:
+                    buffer_text = ""
+                    node.send_output("buffer", pa.array([buffer_text]))
+                elif event.key == Key.enter:
+                    node.send_output("submitted", pa.array([buffer_text]))
+                    first_word = buffer_text.split(" ")[0]
+                    if first_word in NODE_TOPIC:
+                        node.send_output(first_word, pa.array([buffer_text]))
+                    submitted_text.append(buffer_text)
+                    buffer_text = ""
+                    node.send_output("buffer", pa.array([buffer_text]))
+                elif event.key == Key.ctrl:
+                    ctrl = True
+                elif event.key == Key.space:
+                    buffer_text += " "
+                    node.send_output("buffer", pa.array([buffer_text]))
+                elif event.key == Key.up:
+                    if len(submitted_text) > 0:
+                        cursor = max(cursor - 1, -len(submitted_text))
+                        buffer_text = submitted_text[cursor]
+                        node.send_output("buffer", pa.array([buffer_text]))
+                elif event.key == Key.down:
+                    if len(submitted_text) > 0:
+                        cursor = min(cursor + 1, 0)
+                        buffer_text = submitted_text[cursor]
+                        node.send_output("buffer", pa.array([buffer_text]))
+        elif event is not None and isinstance(event, Events.Release):
+            if event.key == Key.ctrl:
+                ctrl = False
diff --git a/examples/python-operator-dataflow/llm_op.py b/examples/python-operator-dataflow/llm_op.py
new file mode 100644
index 000000000..2d0510962
--- /dev/null
+++ b/examples/python-operator-dataflow/llm_op.py
@@ -0,0 +1,319 @@
+from dora import DoraStatus
+import pylcs
+import os
+import pyarrow as pa
+from transformers import AutoModelForCausalLM, AutoTokenizer
+import json
+
+import re
+import time
+
+MODEL_NAME_OR_PATH = "TheBloke/deepseek-coder-6.7B-instruct-GPTQ"
+# MODEL_NAME_OR_PATH = "hanspeterlyngsoeraaschoujensen/deepseek-math-7b-instruct-GPTQ"
+
+CODE_MODIFIER_TEMPLATE = """
+### Instruction
+Respond with the small modified code only. No explanation.
+
+```python
+{code}
+```
+
+{user_message}
+
+### Response:
+"""
+
+
+MESSAGE_SENDER_TEMPLATE = """
+### Instruction
+You're a json expert. Format your response as a json with a topic and a data field in a ```json block. No explanation needed. No code needed.
+The schema for these json messages is:
+- line: Int[4]
+
+The response should look like this:
+```json
+
+[
+  {{ "topic": "line", "data": [10, 10, 90, 10] }}
+]
+```
+
+{user_message}
+
+### Response:
+"""
+
+ASSISTANT_TEMPLATE = """
+### Instruction
+You're a helpful assistant named dora.
+Reply with a short message. No code needed.
+
+User {user_message}
+
+### Response:
+"""
+
+
+model = AutoModelForCausalLM.from_pretrained(
+    MODEL_NAME_OR_PATH,
+    device_map="auto",
+    trust_remote_code=True,
+    revision="main",
+)
+
+
+tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME_OR_PATH, use_fast=True)
+
+
+def extract_python_code_blocks(text):
+    """
+    Extracts Python code blocks from the given text that are enclosed in triple backticks with a python language identifier.
+
+    Parameters:
+    - text: A string that may contain one or more Python code blocks.
+
+    Returns:
+    - A list of strings, where each string is a block of Python code extracted from the text.
+    """
+    pattern = r"```python\n(.*?)\n```"
+    matches = re.findall(pattern, text, re.DOTALL)
+    if len(matches) == 0:
+        pattern = r"```python\n(.*?)(?:\n```|$)"
+        matches = re.findall(pattern, text, re.DOTALL)
+        if len(matches) == 0:
+            return [text]
+
+    return matches
+
+
+def extract_json_code_blocks(text):
+    """
+    Extracts json code blocks from the given text that are enclosed in triple backticks with a json language identifier.
+
+    Parameters:
+    - text: A string that may contain one or more json code blocks.
+
+    Returns:
+    - A list of strings, where each string is a block of json code extracted from the text.
+    """
+    pattern = r"```json\n(.*?)\n```"
+    matches = re.findall(pattern, text, re.DOTALL)
+    if len(matches) == 0:
+        pattern = r"```json\n(.*?)(?:\n```|$)"
+        matches = re.findall(pattern, text, re.DOTALL)
+        if len(matches) == 0:
+            return [text]
+
+    return matches
+
+
+def remove_last_line(python_code):
+    """
+    Removes the last line from a given string of Python code.
+
+    Parameters:
+    - python_code: A string representing Python source code.
+
+    Returns:
+    - A string with the last line removed.
+    """
+    lines = python_code.split("\n")  # Split the string into lines
+    if lines:  # Check if there are any lines to remove
+        lines.pop()  # Remove the last line
+    return "\n".join(lines)  # Join the remaining lines back into a string
+
+
+def calculate_similarity(source, target):
+    """
+    Calculate a similarity score between the source and target strings.
+    This uses the edit distance relative to the length of the strings.
+    """
+    edit_distance = pylcs.edit_distance(source, target)
+    max_length = max(len(source), len(target))
+    # Normalize the score by the maximum possible edit distance (the length of the longer string)
+    similarity = 1 - (edit_distance / max_length)
+    return similarity
+
+
+def find_best_match_location(source_code, target_block):
+    """
+    Find the best match for the target_block within the source_code by searching line by line,
+    considering blocks of varying lengths.
+    """
+    source_lines = source_code.split("\n")
+    target_lines = target_block.split("\n")
+
+    best_similarity = 0
+    best_start_index = 0
+    best_end_index = -1
+
+    # Iterate over the source lines to find the best matching range for all lines in target_block
+    for start_index in range(len(source_lines) - len(target_lines) + 1):
+        for end_index in range(start_index + len(target_lines), len(source_lines) + 1):
+            current_window = "\n".join(source_lines[start_index:end_index])
+            current_similarity = calculate_similarity(current_window, target_block)
+            if current_similarity > best_similarity:
+                best_similarity = current_similarity
+                best_start_index = start_index
+                best_end_index = end_index
+
+    # Convert line indices back to character indices for replacement
+    char_start_index = len("\n".join(source_lines[:best_start_index])) + (
+        1 if best_start_index > 0 else 0
+    )
+    char_end_index = len("\n".join(source_lines[:best_end_index]))
+
+    return char_start_index, char_end_index
+
+
+def replace_code_in_source(source_code, replacement_block: str):
+    """
+    Replace the best matching block in the source_code with the replacement_block, considering variable block lengths.
+    """
+    replacement_block = extract_python_code_blocks(replacement_block)[0]
+    replacement_block = remove_last_line(replacement_block)
+    start_index, end_index = find_best_match_location(source_code, replacement_block)
+    if start_index != -1 and end_index != -1:
+        # Replace the best matching part with the replacement block
+        new_source = (
+            source_code[:start_index] + replacement_block + source_code[end_index:]
+        )
+        return new_source
+    else:
+        return source_code
+
+
+class Operator:
+
+    def on_event(
+        self,
+        dora_event,
+        send_output,
+    ) -> DoraStatus:
+        if dora_event["type"] == "INPUT" and dora_event["id"] == "code_modifier":
+            input = dora_event["value"][0].as_py()
+
+            with open(input["path"], "r", encoding="utf8") as f:
+                code = f.read()
+
+            user_message = input["user_message"]
+            start_llm = time.time()
+            output = self.ask_llm(
+                CODE_MODIFIER_TEMPLATE.format(code=code, user_message=user_message)
+            )
+
+            source_code = replace_code_in_source(code, output)
+            print("response time:", time.time() - start_llm, flush=True)
+            send_output(
+                "modified_file",
+                pa.array(
+                    [
+                        {
+                            "raw": source_code,
+                            "path": input["path"],
+                            "response": output,
+                            "prompt": input["user_message"],
+                        }
+                    ]
+                ),
+                dora_event["metadata"],
+            )
+            print("response: ", output, flush=True)
+            send_output(
+                "assistant_message",
+                pa.array([output]),
+                dora_event["metadata"],
+            )
+        elif dora_event["type"] == "INPUT" and dora_event["id"] == "message_sender":
+            user_message = dora_event["value"][0].as_py()
+            output = self.ask_llm(
+                MESSAGE_SENDER_TEMPLATE.format(user_message=user_message)
+            )
+            outputs = extract_json_code_blocks(output)[0]
+            try:
+                outputs = json.loads(outputs)
+                if not isinstance(outputs, list):
+                    outputs = [outputs]
+                for output in outputs:
+                    # if data is not iterable, put it in a list
+                    if not isinstance(output["data"], list):
+                        output["data"] = [output["data"]]
+
+                    if output["topic"] in [
+                        "line",
+                    ]:
+                        send_output(
+                            output["topic"],
+                            pa.array(output["data"]),
+                            dora_event["metadata"],
+                        )
+                    else:
+                        print("Could not find the topic: {}".format(output["topic"]))
+            except Exception as e:
+                print("Could not parse json:", e)
+        elif dora_event["type"] == "INPUT" and dora_event["id"] == "assistant":
+            user_message = dora_event["value"][0].as_py()
+            output = self.ask_llm(ASSISTANT_TEMPLATE.format(user_message=user_message))
+            send_output(
+                "assistant_message",
+                pa.array([output]),
+                dora_event["metadata"],
+            )
+        return DoraStatus.CONTINUE
+
+    def ask_llm(self, prompt):
+
+        # Generate output
+        # prompt = PROMPT_TEMPLATE.format(system_message=system_message, prompt=prompt))
+        input = tokenizer(prompt, return_tensors="pt")
+        input_ids = input.input_ids.cuda()
+
+        # add attention mask here
+        attention_mask = input["attention_mask"]
+
+        output = model.generate(
+            inputs=input_ids,
+            temperature=0.7,
+            do_sample=True,
+            top_p=0.95,
+            top_k=40,
+            max_new_tokens=512,
+            attention_mask=attention_mask,
+            eos_token_id=tokenizer.eos_token_id,
+        )
+        # Get the tokens from the output, decode them, print them
+
+        # Get text between im_start and im_end
+        return tokenizer.decode(output[0], skip_special_tokens=True)[len(prompt) :]
+
+
+if __name__ == "__main__":
+    op = Operator()
+
+    # Path to the current file
+    current_file_path = __file__
+
+    # Directory of the current file
+    current_directory = os.path.dirname(current_file_path)
+
+    path = os.path.join(current_directory, "object_detection.py")
+    with open(path, "r", encoding="utf8") as f:
+        raw = f.read()
+
+    op.on_event(
+        {
+            "type": "INPUT",
+            "id": "message_sender",
+            "value": pa.array(["send a star"]),
+            "metadata": [],
+        },
+        print,
+    )
diff --git a/examples/python-operator-dataflow/merge.py b/examples/python-operator-dataflow/merge.py
new file mode 100644
index 000000000..cd60745d5
--- /dev/null
+++ b/examples/python-operator-dataflow/merge.py
@@ -0,0 +1,12 @@
+import pyarrow as pa
+
+with pa.memory_map("image.arrow", "r") as source:
+    df_i = pa.ipc.open_file(source).read_all()
+
+with pa.memory_map("bbox.arrow", "r") as source:
+    df_b = pa.ipc.open_file(source).read_all()
+
+df_i = df_i.to_pandas()
+df_b = df_b.to_pandas()
+
+df = df_i.merge(df_b, on="trace_id")
diff --git a/examples/python-operator-dataflow/microphone_op.py b/examples/python-operator-dataflow/microphone_op.py
new file mode 100644
index 000000000..6c3b37ad2
--- /dev/null
+++ b/examples/python-operator-dataflow/microphone_op.py
@@ -0,0 +1,36 @@
+import numpy as np
+import pyarrow as pa
+import sounddevice as sd
+
+from dora import DoraStatus
+
+# Set the parameters for recording
+SAMPLE_RATE = 16000
+MAX_DURATION = 5
+
+
+class Operator:
+    """
+    Microphone operator that records the audio
+    """
+
+    def on_event(
+        self,
+        dora_event,
+        send_output,
+    ) -> DoraStatus:
+        if dora_event["type"] == "INPUT":
+            audio_data = sd.rec(
+                int(SAMPLE_RATE * MAX_DURATION),
+                samplerate=SAMPLE_RATE,
+                channels=1,
+                dtype=np.int16,
+                blocking=True,
+            )
+
+            audio_data = audio_data.ravel().astype(np.float32) / 32768.0
+            if len(audio_data) > 0:
+                send_output("audio", pa.array(audio_data), dora_event["metadata"])
+        elif dora_event["type"] == "ERROR":
+            print("Microphone is not recording", dora_event["value"][0].as_py())
+        return DoraStatus.CONTINUE
diff --git a/examples/python-operator-dataflow/object_detection.py b/examples/python-operator-dataflow/object_detection.py
index a6bd9cafb..087dfc27b 100755
--- a/examples/python-operator-dataflow/object_detection.py
+++ b/examples/python-operator-dataflow/object_detection.py
@@ -1,61 +1,40 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-
 import numpy as np
 import pyarrow as pa
 from dora import DoraStatus
 from ultralytics import YOLO
 
-pa.array([])
 
 CAMERA_WIDTH = 640
 CAMERA_HEIGHT = 480
 
+model = YOLO("yolov8n.pt")
+
+
 class Operator:
     """
     Infering object from images
     """
 
-    def __init__(self):
-        self.model = YOLO("yolov8n.pt")
-
     def on_event(
         self,
         dora_event,
         send_output,
     ) -> DoraStatus:
         if dora_event["type"] == "INPUT":
-            return self.on_input(dora_event, send_output)
-        return DoraStatus.CONTINUE
+            frame = (
+                dora_event["value"].to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
+            )
+            frame = frame[:, :, ::-1]  # OpenCV image (BGR to RGB)
+            results = model(frame, verbose=False)  # includes NMS
+            # Process results
+            boxes = np.array(results[0].boxes.xyxy.cpu())
+            conf = np.array(results[0].boxes.conf.cpu())
+            label = np.array(results[0].boxes.cls.cpu())
+            # concatenate them together
+            arrays = np.concatenate((boxes, conf[:, None], label[:, None]), axis=1)
 
-    def on_input(
-        self,
-        dora_input,
-        send_output,
-    ) -> DoraStatus:
-        """Handle image
-        Args:
-            dora_input (dict) containing the "id", value, and "metadata"
-            send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
-                Function for sending output to the dataflow:
-                - First argument is the `output_id`
-                - Second argument is the data as either bytes or `pa.Array`
-                - Third argument is dora metadata dict
-                e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
-        """
-
-        frame = dora_input["value"].to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
-        frame = frame[:, :, ::-1]  # OpenCV image (BGR to RGB)
-        results = self.model(frame)  # includes NMS
-        # Process results
-        boxes = np.array(results[0].boxes.xyxy.cpu())
-        conf = np.array(results[0].boxes.conf.cpu())
-        label = np.array(results[0].boxes.cls.cpu())
-        # concatenate them together
-        arrays = np.concatenate((boxes, conf[:, None], label[:, None]), axis=1)
+            send_output("bbox", pa.array(arrays.ravel()), dora_event["metadata"])
 
-        send_output("bbox", pa.array(arrays.ravel()), dora_input["metadata"])
         return DoraStatus.CONTINUE
diff --git a/examples/python-operator-dataflow/plot.py b/examples/python-operator-dataflow/plot.py
index f7f2ddd02..180af6fa4 100755
--- a/examples/python-operator-dataflow/plot.py
+++ b/examples/python-operator-dataflow/plot.py
@@ -1,22 +1,8 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-import os
-
 import cv2
-import numpy as np
-import pyarrow as pa
-
-from dora import DoraStatus
-from utils import LABELS
-
-pa.array([])
-
-CI = os.environ.get("CI")
-
-CAMERA_WIDTH = 640
-CAMERA_HEIGHT = 480
-
-font = cv2.FONT_HERSHEY_SIMPLEX
 
+from dora import DoraStatus
+from utils import LABELS, put_text, CAMERA_HEIGHT, CAMERA_WIDTH, FONT, CI
 
 class Operator:
@@ -25,114 +11,91 @@ class Operator:
     """
 
     def __init__(self):
-        self.image = []
         self.bboxs = []
-        self.bounding_box_messages = 0
-        self.image_messages = 0
-        self.object_detection_stdout = []
+        self.buffer = ""
+        self.submitted = []
+        self.lines = []
 
     def on_event(
         self,
         dora_event,
         send_output,
-    ) -> DoraStatus:
+    ):
         if dora_event["type"] == "INPUT":
-            return self.on_input(dora_event, send_output)
-        return DoraStatus.CONTINUE
-
-    def on_input(
-        self,
-        dora_input,
-        send_output,
-    ) -> DoraStatus:
-        """
-        Put image and bounding box on cv2 window.
-
-        Args:
-            dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
-            dora_input["value"] (arrow array): message of the dora_input
-            send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
-                Function for sending output to the dataflow:
-                - First argument is the `output_id`
-                - Second argument is the data as either bytes or `pa.Array`
-                - Third argument is dora metadata dict
-                e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
-        """
-        if dora_input["id"] == "image":
-            frame = (
-                dora_input["value"]
-                .to_numpy()
-                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
-                .copy()  # copy the image because we want to modify it below
-            )
-            self.image = frame
-
-            self.image_messages += 1
-            print("received " + str(self.image_messages) + " images")
-
-        elif dora_input["id"] == "object_detection_stdout":
-            stdout = dora_input["value"][0].as_py()
-            self.object_detection_stdout += [stdout]
-            ## Only keep last 10 stdout
-            self.object_detection_stdout = self.object_detection_stdout[-10:]
-            return DoraStatus.CONTINUE
-
-        elif dora_input["id"] == "bbox" and len(self.image) != 0:
-            bboxs = dora_input["value"].to_numpy()
-            self.bboxs = np.reshape(bboxs, (-1, 6))
-
-            self.bounding_box_messages += 1
-            print("received " + str(self.bounding_box_messages) + " bounding boxes")
-            return DoraStatus.CONTINUE
-        else:
-            return DoraStatus.CONTINUE
-
-        for bbox in self.bboxs:
-            [
-                min_x,
-                min_y,
-                max_x,
-                max_y,
-                confidence,
-                label,
-            ] = bbox
-            cv2.rectangle(
-                self.image,
-                (int(min_x), int(min_y)),
-                (int(max_x), int(max_y)),
-                (0, 255, 0),
-                2,
-            )
-
-            cv2.putText(
-                self.image,
-                LABELS[int(label)] + f", {confidence:0.2f}",
-                (int(max_x), int(max_y)),
-                font,
-                0.75,
-                (0, 255, 0),
-                2,
-                1,
-            )
-
-        for i, log in enumerate(self.object_detection_stdout):
-            cv2.putText(
-                self.image,
-                log,
-                (10, 10 + 20 * i),
-                font,
-                0.5,
-                (0, 255, 0),
-                2,
-                1,
-            )
-
-        if CI != "true":
-            cv2.imshow("frame", self.image)
-            if cv2.waitKey(1) & 0xFF == ord("q"):
-                return DoraStatus.STOP
+            id = dora_event["id"]
+            value = dora_event["value"]
+            if id == "image":
+
+                image = (
+                    value.to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3)).copy()
+                )
+
+                for bbox in self.bboxs:
+                    [
+                        min_x,
+                        min_y,
+                        max_x,
+                        max_y,
+                        confidence,
+                        label,
+                    ] = bbox
+                    cv2.rectangle(
+                        image,
+                        (int(min_x), int(min_y)),
+                        (int(max_x), int(max_y)),
+                        (0, 255, 0),
+                    )
+                    cv2.putText(
+                        image,
+                        f"{LABELS[int(label)]}, {confidence:0.2f}",
+                        (int(max_x), int(max_y)),
+                        FONT,
+                        0.45,
+                        (0, 255, 0),
+                        2,
+                        1,
+                    )
+
+                put_text(
+                    image,
+                    self.buffer,
+                    (20, 12 * 25),
+                    (190, 250, 0),
+                )
+
+                for i, text in enumerate(self.submitted[::-1]):
+                    put_text(
+                        image,
+                        text["content"],
+                        (20, 25 + (10 - i) * 25),
+                        (0, 255, 190),
+                    )
+
+                for line in self.lines:
+                    cv2.line(
+                        image,
+                        (int(line[0]), int(line[1])),
+                        (int(line[2]), int(line[3])),
+                        (0, 0, 255),
+                        2,
+                    )
+
+                if CI != "true":
+                    cv2.imshow("frame", image)
+                    if cv2.waitKey(1) & 0xFF == ord("q"):
+                        return DoraStatus.STOP
+            elif id == "bbox":
+                self.bboxs = value.to_numpy().reshape((-1, 6))
+            elif id == "keyboard_buffer":
+                self.buffer = value[0].as_py()
+            elif id == "line":
+                self.lines += [value.to_pylist()]
+            elif "message" in id:
+                self.submitted += [
+                    {
+                        "role": id,
+                        "content": value[0].as_py(),
+                    }
+                ]
 
         return DoraStatus.CONTINUE
-
-    def __del__(self):
-        cv2.destroyAllWindows()
diff --git a/examples/python-operator-dataflow/sentence_transformers_op.py b/examples/python-operator-dataflow/sentence_transformers_op.py
new file mode 100644
index 000000000..cd045211d
--- /dev/null
+++ b/examples/python-operator-dataflow/sentence_transformers_op.py
@@ -0,0 +1,100 @@
+from sentence_transformers import SentenceTransformer
+from sentence_transformers import util
+
+from dora import DoraStatus
+import os
+import sys
+import inspect
+import torch
+import pyarrow as pa
+
+SHOULD_NOT_BE_INCLUDED = [
+    "utils.py",
+    "sentence_transformers_op.py",
+    "chatgpt_op.py",
+    "llm_op.py",
+]
+
+SHOULD_BE_INCLUDED = [
+    "webcam.py",
+    "object_detection.py",
+    "plot.py",
+]
+
+
+## Get all python file paths in the given directory
+def get_all_functions(path):
+    raw = []
+    paths = []
+    for root, dirs, files in os.walk(path):
+        for file in files:
+            if file.endswith(".py"):
+                if file not in SHOULD_BE_INCLUDED:
+                    continue
+                path = os.path.join(root, file)
+                with open(path, "r", encoding="utf8") as f:
+                    ## add file folder to system path
+                    sys.path.append(root)
+                    ## import module from path
+                    raw.append(f.read())
+                    paths.append(path)
+
+    return raw, paths
+
+
+def search(query_embedding, corpus_embeddings, paths, raw, k=5, file_extension=None):
+    cos_scores = util.cos_sim(query_embedding, corpus_embeddings)[0]
+    top_results = torch.topk(cos_scores, k=min(k, len(cos_scores)), sorted=True)
+    out = []
+    for score, idx in zip(top_results[0], top_results[1]):
+        out.extend([raw[idx], paths[idx], score])
+    return out
+
+
+class Operator:
+    """Retrieves the source file that best matches a query, using sentence embeddings"""
+
+    def __init__(self):
+        ## TODO: Add an initialisation step
+        self.model = SentenceTransformer("BAAI/bge-large-en-v1.5")
+        self.encoding = []
+        # file directory
+        path = os.path.dirname(os.path.abspath(__file__))
+
+        self.raw, self.path = get_all_functions(path)
+        # Encode all files
+        self.encoding = self.model.encode(self.raw)
+
+    def on_event(
+        self,
+        dora_event,
+        send_output,
+    ) -> DoraStatus:
+        if dora_event["type"] == "INPUT":
+            if dora_event["id"] == "query":
+                values = dora_event["value"].to_pylist()
+
+                query_embeddings = self.model.encode(values)
+                output = search(
+                    query_embeddings,
+                    self.encoding,
+                    self.path,
+                    self.raw,
+                )
+                [raw, path, score] = output[0:3]
+                send_output(
+                    "raw_file",
+                    pa.array([{"raw": raw, "path": path, "user_message": values[0]}]),
+                    dora_event["metadata"],
+                )
+            else:
+                input = dora_event["value"][0].as_py()
+                index = self.path.index(input["path"])
+                self.raw[index] = input["raw"]
+                self.encoding[index] = self.model.encode([input["raw"]])[0]
+
+        return DoraStatus.CONTINUE
+
+
+if __name__ == "__main__":
+    operator = Operator()
diff --git a/examples/python-operator-dataflow/utils.py b/examples/python-operator-dataflow/utils.py
index dabc915e1..4ec04ed09 100644
--- a/examples/python-operator-dataflow/utils.py
+++ b/examples/python-operator-dataflow/utils.py
@@ -1,5 +1,28 @@
+import os
+import cv2
+
+
+def put_text(image, text, position, color):
+    cv2.putText(
+        image,
+        text,
+        position,
+        cv2.FONT_HERSHEY_SIMPLEX,
+        0.45,
+        color,
+        2,
+        1,
+    )
+
+
+CI = os.environ.get("CI")
+
+CAMERA_WIDTH = 640
+CAMERA_HEIGHT = 480
+
+FONT = cv2.FONT_HERSHEY_SIMPLEX
 LABELS = [
-    "ABC",
+    "person",
     "bicycle",
     "car",
     "motorcycle",
diff --git a/examples/python-operator-dataflow/webcam.py b/examples/python-operator-dataflow/webcam.py
index 8b55db404..9867ad7c4 100755
--- a/examples/python-operator-dataflow/webcam.py
+++ b/examples/python-operator-dataflow/webcam.py
@@ -1,6 +1,3 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
 import os
 import time
 
@@ -13,6 +10,7 @@
 CAMERA_WIDTH = 640
 CAMERA_HEIGHT = 480
 CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
+CI = os.environ.get("CI")
 
 font = cv2.FONT_HERSHEY_SIMPLEX
 
@@ -27,6 +25,7 @@ def __init__(self):
         self.start_time = time.time()
         self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
         self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
+        self.failure_count = 0
 
     def on_event(
         self,
@@ -38,20 +37,22 @@ def on_event(
             ret, frame = self.video_capture.read()
             if ret:
                 frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))
-
+                self.failure_count = 0
             ## Push an error image in case the camera is not available.
             else:
-                frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
-                cv2.putText(
-                    frame,
-                    "No Webcam was found at index %d" % (CAMERA_INDEX),
-                    (int(30), int(30)),
-                    font,
-                    0.75,
-                    (255, 255, 255),
-                    2,
-                    1,
-                )
+                frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
+                if self.failure_count > 10:
+                    cv2.putText(
+                        frame,
+                        "No Webcam was found at index %d" % (CAMERA_INDEX),
+                        (int(30), int(30)),
+                        font,
+                        0.75,
+                        (255, 255, 255),
+                        2,
+                        1,
+                    )
+                self.failure_count += 1
 
             send_output(
                 "image",
@@ -63,7 +64,7 @@ def on_event(
         else:
             print("received unexpected event:", event_type)
 
-        if time.time() - self.start_time < 20:
+        if time.time() - self.start_time < 200 or CI != "true":
             return DoraStatus.CONTINUE
         else:
             return DoraStatus.STOP
diff --git a/examples/python-operator-dataflow/whisper_op.py b/examples/python-operator-dataflow/whisper_op.py
new file mode 100644
index 000000000..feab8b92e
--- /dev/null
+++ b/examples/python-operator-dataflow/whisper_op.py
@@ -0,0 +1,25 @@
+import pyarrow as pa
+import whisper
+
+from dora import DoraStatus
+
+
+model = whisper.load_model("base")
+
+
+class Operator:
+    """
+    Transforming Speech to Text using OpenAI Whisper model
+    """
+
+    def on_event(
+        self,
+        dora_event,
+        send_output,
+    ) -> DoraStatus:
+        if dora_event["type"] == "INPUT":
+            audio = dora_event["value"].to_numpy()
+            audio = whisper.pad_or_trim(audio)
+            result = model.transcribe(audio, language="en")
+            send_output("text", pa.array([result["text"]]), dora_event["metadata"])
+        return DoraStatus.CONTINUE
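
Every operator introduced in this patch follows the same dora Python operator contract already used by object_detection.py and plot.py: a class named Operator whose on_event method receives event dicts and forwards results through send_output, then returns a DoraStatus. A minimal sketch of that contract, for orientation only (the "echo" output id and the pass-through logic are illustrative placeholders, not part of this patch):

import pyarrow as pa

from dora import DoraStatus


class Operator:
    """Minimal pass-through operator illustrating the on_event contract."""

    def on_event(self, dora_event, send_output) -> DoraStatus:
        # Events arrive as dicts with "type", "id", "value" (a pyarrow array)
        # and "metadata"; INPUT events carry data from upstream nodes.
        if dora_event["type"] == "INPUT":
            value = dora_event["value"][0].as_py()
            # Forward the value on a hypothetical "echo" output, which would
            # have to be declared under this node's outputs in the dataflow YAML.
            send_output("echo", pa.array([value]), dora_event["metadata"])
        # CONTINUE keeps the operator alive; STOP ends the node, as plot.py
        # does when "q" is pressed in the display window.
        return DoraStatus.CONTINUE

The operators above are specializations of this shape: microphone_op.py records audio on INPUT, whisper_op.py transcribes it, llm_op.py branches on the event id to pick a prompt template, and file_saver_op.py writes the modified source back to disk.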