Skip to content

Commit

Permalink
Introduce new MemoryRecording for use with Jupyter notebooks (#1834)
Browse files Browse the repository at this point in the history
* Introduce new MemoryRecording for use with Jupyter notebooks
  • Loading branch information
jleibs authored Apr 13, 2023
1 parent 4e3d975 commit 19182a7
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 103 deletions.
4 changes: 3 additions & 1 deletion crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ pub mod demo_util;
/// This is how you select whether the log stream ends up
/// sent over TCP, written to file, etc.
pub mod sink {
pub use crate::log_sink::{disabled, BufferedSink, LogSink, TcpSink};
pub use crate::log_sink::{
disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink,
};

#[cfg(not(target_arch = "wasm32"))]
pub use re_log_encoding::{FileSink, FileSinkError};
Expand Down
47 changes: 47 additions & 0 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,53 @@ impl LogSink for BufferedSink {
}
}

/// Store log messages directly in memory
///
/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating
/// a new sink the logged messages stay with the `MemorySink` (`drain_backlog` does nothing).
///
/// Additionally the raw storage can be accessed and used to create an in-memory RRD.
/// This is useful for things like the inline rrd-viewer in Jupyter notebooks.
#[derive(Default)]
pub struct MemorySink(MemorySinkStorage);

impl MemorySink {
/// Access the raw `MemorySinkStorage`
pub fn buffer(&self) -> MemorySinkStorage {
self.0.clone()
}
}

impl LogSink for MemorySink {
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
}

fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
}
}

/// The storage used by [`MemorySink`]
#[derive(Default, Clone)]
pub struct MemorySinkStorage(std::sync::Arc<parking_lot::Mutex<Vec<LogMsg>>>);

///
impl MemorySinkStorage {
/// Lock the contained buffer
fn lock(&self) -> parking_lot::MutexGuard<'_, Vec<LogMsg>> {
self.0.lock()
}

/// Convert the stored messages into an in-memory Rerun log file
pub fn rrd_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
let messages = self.lock();
let mut buffer = std::io::Cursor::new(Vec::new());
re_log_encoding::encoder::encode(messages.iter(), &mut buffer)?;
Ok(buffer.into_inner())
}
}

// ----------------------------------------------------------------------------

/// Stream log messages to a Rerun TCP server.
Expand Down
25 changes: 18 additions & 7 deletions examples/python/notebook/cube.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
"\n",
"import numpy as np\n",
"\n",
"import rerun as rr"
"import rerun as rr\n",
"\n",
"rr.init(\"cube\")\n",
"\n",
"# Uncomment to use locally-hosted assets. Necessary for local-builds or offline setups.\n",
"# Won't work for remotely hosted notebook environments.\n",
"\n",
"# rr.self_host_assets()"
]
},
{
Expand Down Expand Up @@ -93,25 +100,29 @@
"metadata": {},
"outputs": [],
"source": [
"rr.init(\"cube\")\n",
"rec = rr.memory_recording()\n",
"\n",
"STEPS = 100\n",
"twists = math.pi * np.sin(np.linspace(0, math.tau, STEPS)) / 4\n",
"for t in range(STEPS):\n",
" rr.set_time_sequence(\"step\", t)\n",
" cube = build_color_grid(10, 10, 10, twist=twists[t])\n",
" rr.log_points(\"cube\", positions=cube.positions, colors=cube.colors, radii=0.5)\n",
" \n",
"rr.inline_show()"
"\n",
"rec"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "51f695a1",
"metadata": {},
"id": "1a1b0f66-4287-4705-8be5-ae837ffe3f90",
"metadata": {
"tags": []
},
"outputs": [],
"source": []
"source": [
"rec.show(width=400, height=400)"
]
}
],
"metadata": {
Expand Down
17 changes: 15 additions & 2 deletions rerun_py/rerun_sdk/rerun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from rerun.log.tensor import log_tensor
from rerun.log.text import LoggingHandler, LogLevel, log_text_entry
from rerun.log.transform import log_rigid3, log_unknown_transform, log_view_coordinates
from rerun.notebook import inline_show
from rerun.recording import MemoryRecording
from rerun.script_helpers import script_add_args, script_setup, script_teardown

__all__ = [
Expand Down Expand Up @@ -402,12 +402,25 @@ def save(path: str) -> None:
"""

if not bindings.is_enabled():
print("Rerun is disabled - serve() call ignored")
print("Rerun is disabled - save() call ignored")
return

bindings.save(path)


def memory_recording() -> MemoryRecording:
"""
Streams all log-data to a memory buffer.
Returns
-------
MemoryRecording
A memory recording object that can be used to read the data.
"""

return MemoryRecording(bindings.memory_recording())


def set_time_sequence(timeline: str, sequence: Optional[int]) -> None:
"""
Set the current time for this thread as an integer sequence.
Expand Down
77 changes: 0 additions & 77 deletions rerun_py/rerun_sdk/rerun/notebook.py

This file was deleted.

90 changes: 90 additions & 0 deletions rerun_py/rerun_sdk/rerun/recording.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Helper functions for displaying Rerun in a Jupyter notebook."""

import base64
import logging
import random
import string
from typing import Any, Optional

from rerun import bindings


class MemoryRecording:
def __init__(self, storage: bindings.PyMemorySinkStorage) -> None:
self.storage = storage

def as_html(
self, width: int = 950, height: int = 712, app_location: Optional[str] = None, timeout_ms: int = 2000
) -> str:
"""
Show the Rerun viewer in a Jupyter notebook.
Parameters
----------
width : int
The width of the viewer in pixels.
height : int
The height of the viewer in pixels.
app_location : str
The location of the Rerun web viewer.
timeout_ms : int
The number of milliseconds to wait for the Rerun web viewer to load.
"""

if app_location is None:
app_location = bindings.get_app_url()

random_string = "".join(random.choice(string.ascii_letters) for i in range(6))

base64_data = base64.b64encode(self.storage.get_rrd_as_bytes()).decode("utf-8")

html_template = f"""
<div id="{random_string}_rrd" style="display: none;">{base64_data}</div>
<div id="{random_string}_error" style="display: none;"><p>Timed out waiting for {app_location} to load.</p>
<p>Consider using <code>rr.self_host_assets()</code></p></div>
<script>
timeout_{random_string} = setTimeout(() => {{
document.getElementById("{random_string}_error").style.display = 'block';
}}, {timeout_ms});
window.addEventListener("message", function(rrd) {{
return async function onIframeReady_{random_string}(event) {{
var iframe = document.getElementById("{random_string}");
if (event.source === iframe.contentWindow) {{
clearTimeout(timeout_{random_string});
document.getElementById("{random_string}_error").style.display = 'none';
iframe.style.display = 'inline';
window.removeEventListener("message", onIframeReady_{random_string});
iframe.contentWindow.postMessage((await rrd), "*");
}}
}}
}}(async function() {{
await new Promise(r => setTimeout(r, 0));
var div = document.getElementById("{random_string}_rrd");
var base64Data = div.textContent;
var intermediate = atob(base64Data);
var buff = new Uint8Array(intermediate.length);
for (var i = 0; i < intermediate.length; i++) {{
buff[i] = intermediate.charCodeAt(i);
}}
return buff;
}}()));
</script>
<iframe id="{random_string}" width="{width}" height="{height}" src="{app_location}?url=web_event://&persist=0"
frameborder="0" style="display: none;" allowfullscreen=""></iframe>
"""

return html_template

def show(self, **kwargs: Any) -> Any:
html = self.as_html(**kwargs)
try:
from IPython.core.display import HTML

return HTML(html) # type: ignore[no-untyped-call]
except ImportError:
logging.warning("Could not import IPython.core.display. Returning raw HTML string instead.")
return html

def _repr_html_(self) -> Any:
return self.as_html()
26 changes: 19 additions & 7 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use pyo3::{
use re_log_types::{ArrowMsg, DataRow, DataTableError};
use rerun::{
log::{PathOp, RowId},
sink::MemorySinkStorage,
time::{Time, TimeInt, TimePoint, TimeType, Timeline},
ApplicationId, EntityPath, RecordingId,
};
Expand Down Expand Up @@ -114,6 +115,7 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> {
// TODO(jleibs): Refactor import logic so all we need is main
m.add_function(wrap_pyfunction!(get_registered_component_names, m)?)?;
m.add_class::<TensorDataMeaning>()?;
m.add_class::<PyMemorySinkStorage>()?;

// If this is a special RERUN_APP_ONLY context (launched via .spawn), we
// can bypass everything else, which keeps us from preparing an SDK session
Expand All @@ -134,10 +136,10 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> {

m.add_function(wrap_pyfunction!(connect, m)?)?;
m.add_function(wrap_pyfunction!(disconnect, m)?)?;
m.add_function(wrap_pyfunction!(dump_rrd_as_bytes, m)?)?;
m.add_function(wrap_pyfunction!(get_app_url, m)?)?;
m.add_function(wrap_pyfunction!(init, m)?)?;
m.add_function(wrap_pyfunction!(is_enabled, m)?)?;
m.add_function(wrap_pyfunction!(memory_recording, m)?)?;
m.add_function(wrap_pyfunction!(save, m)?)?;
m.add_function(wrap_pyfunction!(self_host_assets, m)?)?;
m.add_function(wrap_pyfunction!(serve, m)?)?;
Expand Down Expand Up @@ -394,14 +396,24 @@ fn save(path: &str) -> PyResult<()> {
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
}

/// Drain all pending messages and return them as an in-memory rrd
#[pyclass]
struct PyMemorySinkStorage(MemorySinkStorage);

#[pymethods]
impl PyMemorySinkStorage {
fn get_rrd_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> {
self.0
.rrd_as_bytes()
.map(|bytes| PyBytes::new(py, bytes.as_slice()))
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
}
}

/// Create an in-memory rrd file
#[pyfunction]
fn dump_rrd_as_bytes(py: Python<'_>) -> PyResult<&PyBytes> {
fn memory_recording() -> PyMemorySinkStorage {
let mut session = python_session();
session
.dump_rrd_as_bytes()
.map(|bytes| PyBytes::new(py, bytes.as_slice()))
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
PyMemorySinkStorage(session.memory_recording())
}

// ----------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 19182a7

Please sign in to comment.