Skip to content

Commit

Permalink
Merge pull request #384 from dora-rs/tracing-send-output
Browse files Browse the repository at this point in the history
Trace send_output as it can be a big source of overhead for large messages
  • Loading branch information
haixuanTao authored Nov 22, 2023
2 parents c1993a9 + 135d643 commit 798734f
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions binaries/runtime/src/operator/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,16 @@ mod callback_impl {
ZERO_COPY_THRESHOLD,
};
use dora_operator_api_python::pydict_to_metadata;
use dora_tracing::telemetry::deserialize_context;
use eyre::{eyre, Context, Result};
use pyo3::{
pymethods,
types::{PyBytes, PyDict},
PyObject, Python,
};
use tokio::sync::oneshot;
use tracing::{field, span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Send an output from the operator:
/// - the first argument is the `output_id` as defined in your dataflow.
Expand All @@ -315,6 +318,20 @@ mod callback_impl {
metadata: Option<&PyDict>,
py: Python,
) -> Result<()> {
let parameters = pydict_to_metadata(metadata)
.wrap_err("failed to parse metadata")?
.into_owned();
let span = span!(
tracing::Level::TRACE,
"send_output",
output_id = field::Empty
);
span.record("output_id", output);

let cx = deserialize_context(&parameters.open_telemetry_context);
span.set_parent(cx);
let _ = span.enter();

let allocate_sample = |data_len| {
if data_len > ZERO_COPY_THRESHOLD {
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -350,10 +367,6 @@ mod callback_impl {
eyre::bail!("invalid `data` type, must by `PyBytes` or arrow array")
};

let parameters = pydict_to_metadata(metadata)
.wrap_err("failed to parse metadata")?
.into_owned();

py.allow_threads(|| {
let event = OperatorEvent::Output {
output_id: output.to_owned().into(),
Expand Down

0 comments on commit 798734f

Please sign in to comment.