diff --git a/examples/rerun-viewer/README.md b/examples/rerun-viewer/README.md new file mode 100644 index 000000000..69074bd78 --- /dev/null +++ b/examples/rerun-viewer/README.md @@ -0,0 +1,25 @@ +# Python Dataflow Example + +This examples shows how to create and connect dora operators and custom nodes in Python. + +## Overview + +The [`dataflow.yml`](./dataflow.yml) defines a simple dataflow graph with the following three nodes: + +- a webcam node, that connects to your webcam and feed the dataflow with webcam frame as jpeg compressed bytearray. +- an object detection node, that apply Yolo v5 on the webcam image. The model is imported from Pytorch Hub. The output is the bouding box of each object detected, the confidence and the class. You can have more info here: https://pytorch.org/hub/ultralytics_yolov5/ +- a window plotting node, that will retrieve the webcam image and the Yolov5 bounding box and join the two together. + +## Getting started + +```bash +cargo run --example python-dataflow +``` + +## Run the dataflow as a standalone + +- Start the `dora-daemon`: + +``` +../../target/release/dora-daemon --run-dataflow dataflow.yml +``` diff --git a/examples/rerun-viewer/dataflow.yml b/examples/rerun-viewer/dataflow.yml new file mode 100644 index 000000000..1b9a1c40e --- /dev/null +++ b/examples/rerun-viewer/dataflow.yml @@ -0,0 +1,35 @@ +nodes: + - id: webcam + custom: + source: ./webcam.py + inputs: + tick: + source: dora/timer/millis/10 + queue_size: 1000 + outputs: + - image + - text + envs: + IMAGE_WIDTH: 960 + IMAGE_HEIGHT: 540 + + + - id: rerun + custom: + source: dora-rerun + inputs: + image: webcam/image + text: webcam/text + envs: + IMAGE_WIDTH: 540 + IMAGE_HEIGHT: 960 + IMAGE_DEPTH: 3 + + - id: matplotlib + custom: + source: ./plot.py + inputs: + image: webcam/image + envs: + IMAGE_WIDTH: 960 + IMAGE_HEIGHT: 540 \ No newline at end of file diff --git a/examples/rerun-viewer/plot.py b/examples/rerun-viewer/plot.py new file mode 100755 index 000000000..649fc82c9 --- /dev/null +++ b/examples/rerun-viewer/plot.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +from dora import Node +from dora import DoraStatus + +import cv2 +import numpy as np + +CI = os.environ.get("CI") + +font = cv2.FONT_HERSHEY_SIMPLEX + +IMAGE_WIDTH = int(os.getenv("IMAGE_WIDTH", 960)) +IMAGE_HEIGHT = int(os.getenv("IMAGE_HEIGHT", 540)) + + +class Plotter: + """ + Plot image and bounding box + """ + + def __init__(self): + self.image = [] + self.bboxs = [] + + def on_input( + self, + dora_input, + ) -> DoraStatus: + """ + Put image and bounding box on cv2 window. + + Args: + dora_input["id"] (str): Id of the dora_input declared in the yaml configuration + dora_input["value"] (arrow array): message of the dora_input + """ + if dora_input["id"] == "image": + image = ( + dora_input["value"].to_numpy().reshape((IMAGE_HEIGHT, IMAGE_WIDTH, 3)) + ) + + image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) + self.image = image + + elif dora_input["id"] == "bbox" and len(self.image) != 0: + bboxs = dora_input["value"].to_numpy() + self.bboxs = np.reshape(bboxs, (-1, 6)) + for bbox in self.bboxs: + [ + min_x, + min_y, + max_x, + max_y, + confidence, + label, + ] = bbox + cv2.rectangle( + self.image, + (int(min_x), int(min_y)), + (int(max_x), int(max_y)), + (0, 255, 0), + 2, + ) + + if CI != "true": + cv2.imshow("frame", self.image) + if cv2.waitKey(1) & 0xFF == ord("q"): + return DoraStatus.STOP + + return DoraStatus.CONTINUE + + +plotter = Plotter() +node = Node() + +for event in node: + event_type = event["type"] + if event_type == "INPUT": + status = plotter.on_input(event) + if status == DoraStatus.CONTINUE: + pass + elif status == DoraStatus.STOP: + print("plotter returned stop status") + break + elif event_type == "STOP": + print("received stop") + else: + print("received unexpected event:", event_type) diff --git a/examples/rerun-viewer/run.rs b/examples/rerun-viewer/run.rs new file mode 100644 index 000000000..a14b553f0 --- /dev/null +++ b/examples/rerun-viewer/run.rs @@ -0,0 +1,102 @@ +use dora_core::{get_pip_path, get_python_path, run}; +use dora_download::download_file; +use dora_tracing::set_up_tracing; +use eyre::{bail, ContextCompat, WrapErr}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + set_up_tracing("python-dataflow-runner")?; + + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + run( + get_python_path().context("Could not get python binary")?, + &["-m", "venv", "../.env"], + None, + ) + .await + .context("failed to create venv")?; + let venv = &root.join("examples").join(".env"); + std::env::set_var( + "VIRTUAL_ENV", + venv.to_str().context("venv path not valid unicode")?, + ); + let orig_path = std::env::var("PATH")?; + // bin folder is named Scripts on windows. + // 🤦‍♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 + let venv_bin = if cfg!(windows) { + venv.join("Scripts") + } else { + venv.join("bin") + }; + + if cfg!(windows) { + std::env::set_var( + "PATH", + format!( + "{};{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? + ), + ); + } else { + std::env::set_var( + "PATH", + format!( + "{}:{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? + ), + ); + } + + run( + get_python_path().context("Could not get pip binary")?, + &["-m", "pip", "install", "--upgrade", "pip"], + None, + ) + .await + .context("failed to install pip")?; + run( + get_pip_path().context("Could not get pip binary")?, + &["install", "-r", "requirements.txt"], + None, + ) + .await + .context("pip install failed")?; + + run( + "maturin", + &["develop"], + Some(&root.join("apis").join("python").join("node")), + ) + .await + .context("maturin develop failed")?; + download_file( + "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", + Path::new("yolov8n.pt"), + ) + .await + .context("Could not download weights.")?; + + let dataflow = Path::new("dataflow.yml"); + run_dataflow(dataflow).await?; + + Ok(()) +} + +async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--") + .arg("daemon") + .arg("--run-dataflow") + .arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + Ok(()) +} diff --git a/examples/rerun-viewer/webcam.py b/examples/rerun-viewer/webcam.py new file mode 100755 index 000000000..f85901503 --- /dev/null +++ b/examples/rerun-viewer/webcam.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import time +import numpy as np +import cv2 + +from dora import Node +import pyarrow as pa + +node = Node() + +IMAGE_INDEX = int(os.getenv("IMAGE_INDEX", 0)) +IMAGE_WIDTH = int(os.getenv("IMAGE_WIDTH", 960)) +IMAGE_HEIGHT = int(os.getenv("IMAGE_HEIGHT", 540)) +video_capture = cv2.VideoCapture(IMAGE_INDEX) +video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, IMAGE_WIDTH) +video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, IMAGE_HEIGHT) +font = cv2.FONT_HERSHEY_SIMPLEX + +start = time.time() + +# Run for 20 seconds +while time.time() - start < 60: + # Wait next dora_input + event = node.next() + event_type = event["type"] + if event_type == "INPUT": + ret, frame = video_capture.read() + if not ret: + frame = np.zeros((IMAGE_HEIGHT, IMAGE_WIDTH, 3), dtype=np.uint8) + cv2.putText( + frame, + "No Webcam was found at index %d" % (IMAGE_INDEX), + (int(30), int(30)), + font, + 0.75, + (255, 255, 255), + 2, + 1, + ) + if len(frame) != IMAGE_HEIGHT * IMAGE_WIDTH * 3: + print("frame size is not correct") + frame = cv2.resize(frame, (IMAGE_WIDTH, IMAGE_HEIGHT)) + + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + node.send_output( + "image", + pa.array(frame.ravel()), + event["metadata"], + ) + node.send_output("text", pa.array([f"send image at: {time.time()}"])) diff --git a/libraries/extensions/dora-rerun/Cargo.toml b/libraries/extensions/dora-rerun/Cargo.toml new file mode 100644 index 000000000..22aab73ad --- /dev/null +++ b/libraries/extensions/dora-rerun/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "dora-rerun" +version.workspace = true +edition = "2021" +documentation.workspace = true +description.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +tokio = { version = "1.36.0", features = ["rt"] } +rerun = { version = "0.15.1", features = ["web_viewer", "image"] } diff --git a/libraries/extensions/dora-rerun/README.md b/libraries/extensions/dora-rerun/README.md new file mode 100644 index 000000000..523757844 --- /dev/null +++ b/libraries/extensions/dora-rerun/README.md @@ -0,0 +1,13 @@ +# dora-rerun + +dora visualization using `rerun` + +## Getting Started + +```bash +cargo install --force rerun-cli@0.15.1 + +## To install this package +git clone git@github.com:dora-rs/dora.git +cargo install --git https://github.com/dora-rs/dora dora-rerun +``` diff --git a/libraries/extensions/dora-rerun/src/main.rs b/libraries/extensions/dora-rerun/src/main.rs new file mode 100644 index 000000000..b860d5e39 --- /dev/null +++ b/libraries/extensions/dora-rerun/src/main.rs @@ -0,0 +1,86 @@ +//! Demonstrates the most barebone usage of the Rerun SDK. + +use dora_node_api::{ + arrow::{ + array::{PrimitiveArray, StringArray, UInt8Array}, + datatypes::{UInt8Type, Utf8Type}, + ipc::Utf8, + }, + DoraNode, Event, +}; +use eyre::{Context, Result}; +use rerun::{ + external::{arrow2::array::Utf8Array, re_types::ArrowBuffer}, + TensorBuffer, TensorData, TensorDimension, +}; + +fn main() -> Result<()> { + // `serve()` requires to have a running Tokio runtime in the current context. + let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"); + let _guard = rt.enter(); + + let (_node, mut events) = + DoraNode::init_from_env().context("Could not initialize dora node")?; + + let rec = rerun::RecordingStreamBuilder::new("dora-rerun") + .spawn() + .context("Could not spawn rerun visualization")?; + + let shape = vec![ + TensorDimension { + name: Some("width".into()), + size: std::env::var("IMAGE_WIDTH") + .context("Could not read image width")? + .parse() + .context("Could not parse value of image width env variable")?, + }, + TensorDimension { + name: Some("height".into()), + size: std::env::var("IMAGE_HEIGHT") + .context("Could not read image height")? + .parse() + .context("Could not parse value of image height env variable")?, + }, + TensorDimension { + name: Some("depth".into()), + size: std::env::var("IMAGE_DEPTH") + .context("Could not read image depth")? + .parse() + .context("Could not parse value of image depth env variable")?, + }, + ]; + + while let Some(event) = events.recv() { + match event { + Event::Input { + id, + data, + metadata: _, + } => { + if id.as_str().contains("image") { + let buffer: UInt8Array = data.to_data().into(); + let buffer: &[u8] = buffer.values(); + let buffer = TensorBuffer::U8(ArrowBuffer::from(buffer)); + let tensordata = TensorData::new(shape.clone(), buffer.into()); + let image = rerun::Image::new(tensordata); + + rec.log(id.as_str(), &image) + .context("could not log image")?; + } else if id.as_str().contains("text") { + let buffer: StringArray = data.to_data().into(); + buffer.iter().try_for_each(|string| -> Result<()> { + if let Some(str) = string { + rec.log(id.as_str(), &rerun::TextLog::new(str)) + .wrap_err("Could not log text") + } else { + Ok(()) + } + })?; + } + } + _ => {} + } + } + + Ok(()) +}