diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 5a1bbbe68..cc555aa5e 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -293,6 +293,7 @@ mod callback_impl { ZERO_COPY_THRESHOLD, }; use dora_operator_api_python::pydict_to_metadata; + use dora_tracing::telemetry::deserialize_context; use eyre::{eyre, Context, Result}; use pyo3::{ pymethods, @@ -300,6 +301,8 @@ mod callback_impl { PyObject, Python, }; use tokio::sync::oneshot; + use tracing::{field, span}; + use tracing_opentelemetry::OpenTelemetrySpanExt; /// Send an output from the operator: /// - the first argument is the `output_id` as defined in your dataflow. @@ -315,6 +318,20 @@ mod callback_impl { metadata: Option<&PyDict>, py: Python, ) -> Result<()> { + let parameters = pydict_to_metadata(metadata) + .wrap_err("failed to parse metadata")? + .into_owned(); + let span = span!( + tracing::Level::TRACE, + "send_output", + output_id = field::Empty + ); + span.record("output_id", output); + + let cx = deserialize_context(¶meters.open_telemetry_context); + span.set_parent(cx); + let _ = span.enter(); + let allocate_sample = |data_len| { if data_len > ZERO_COPY_THRESHOLD { let (tx, rx) = oneshot::channel(); @@ -350,10 +367,6 @@ mod callback_impl { eyre::bail!("invalid `data` type, must by `PyBytes` or arrow array") }; - let parameters = pydict_to_metadata(metadata) - .wrap_err("failed to parse metadata")? - .into_owned(); - py.allow_threads(|| { let event = OperatorEvent::Output { output_id: output.to_owned().into(),