diff --git a/docs/user_guide/trace.md b/docs/user_guide/trace.md index 1188a10edf..473ce8075c 100644 --- a/docs/user_guide/trace.md +++ b/docs/user_guide/trace.md @@ -430,6 +430,7 @@ The meaning of the trace timestamps is: ## OpenTelemetry trace support Triton provides an option to generate and export traces +for standalone and ensemble models using [OpenTelemetry APIs and SDKs](https://opentelemetry.io/). To specify OpenTelemetry mode for tracing, specify the `--trace-config` diff --git a/qa/L0_trace/opentelemetry_unittest.py b/qa/L0_trace/opentelemetry_unittest.py new file mode 100644 index 0000000000..1aef6aefea --- /dev/null +++ b/qa/L0_trace/opentelemetry_unittest.py @@ -0,0 +1,224 @@ +# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
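+"""
+Verifies the OpenTelemetry spans Triton exports for a standalone model
+("simple") and an ensemble model: the number of collected spans, the events
+recorded on each span, and the parent/child relationships between spans.
+"""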
+
+import sys
+
+sys.path.append("../common")
+import json
+import unittest
+import tritonclient.http as httpclient
+import tritonclient.grpc as grpcclient
+import numpy as np
+import test_util as tu
+import time
+
+EXPECTED_NUM_SPANS = 10
+
+class OpenTelemetryTest(tu.TestResultCollector):
+
+    def setUp(self):
+        while True:
+            with open('trace_collector.log', 'rt') as f:
+                data = f.read()
+                if data.count("resource_spans") != EXPECTED_NUM_SPANS:
+                    time.sleep(5)
+                    continue
+                else:
+                    break
+
+        data = data.split('\n')
+        full_spans = [entry.split('POST')[0] for entry in data if "resource_spans" in entry]
+        self.spans = []
+        for span in full_spans:
+            span = json.loads(span)
+            self.spans.append(
+                span["resource_spans"][0]['scope_spans'][0]['spans'][0])
+
+        self.simple_model_name = "simple"
+        self.ensemble_model_name = "ensemble_add_sub_int32_int32_int32"
+        self.root_span = "InferRequest"
+
+    def _check_events(self, span_name, events):
+        root_events_http =\
+            ["HTTP_RECV_START",
+             "HTTP_RECV_END",
+             "INFER_RESPONSE_COMPLETE",
+             "HTTP_SEND_START",
+             "HTTP_SEND_END"]
+        root_events_grpc =\
+            ["GRPC_WAITREAD_START",
+             "GRPC_WAITREAD_END",
+             "INFER_RESPONSE_COMPLETE",
+             "GRPC_SEND_START",
+             "GRPC_SEND_END"]
+        request_events =\
+            ["REQUEST_START",
+             "QUEUE_START",
+             "REQUEST_END"]
+        compute_events =\
+            ["COMPUTE_START",
+             "COMPUTE_INPUT_END",
+             "COMPUTE_OUTPUT_START",
+             "COMPUTE_END"]
+
+        if span_name == "compute":
+            # Check that all compute related events (and only them)
+            # are recorded in the compute span
+            self.assertTrue(all(entry in events for entry in compute_events))
+            self.assertFalse(all(entry in events for entry in request_events))
+            self.assertFalse(
+                all(entry in events
+                    for entry in root_events_http + root_events_grpc))
+
+        elif span_name == self.root_span:
+            # Check that the root span has INFER_RESPONSE_COMPLETE,
+            # _RECV/_WAITREAD and _SEND events (and only them)
+            if "HTTP" in events:
+                self.assertTrue(
+                    all(entry in events for entry in root_events_http))
+                self.assertFalse(
+                    all(entry in events for entry in root_events_grpc))
+
+            elif "GRPC" in events:
+                self.assertTrue(
+                    all(entry in events for entry in root_events_grpc))
+                self.assertFalse(
+                    all(entry in events for entry in root_events_http))
+            self.assertFalse(
+                all(entry in events for entry in request_events))
+            self.assertFalse(
+                all(entry in events for entry in compute_events))
+
+        elif span_name == self.simple_model_name:
+            # Check that all request related events (and only them)
+            # are recorded in the request span
+            self.assertTrue(all(entry in events for entry in request_events))
+            self.assertFalse(
+                all(entry in events
+                    for entry in root_events_http + root_events_grpc))
+            self.assertFalse(all(entry in events for entry in compute_events))
+
+    def _check_parent(self, child_span, parent_span):
+        # Check that child and parent spans have the same trace_id
+        # and that the child's `parent_span_id` matches the parent's `span_id`
+        self.assertEqual(child_span['trace_id'], parent_span['trace_id'])
+        self.assertIn(
+            'parent_span_id',
+            child_span,
+            "child span does not have parent span id specified")
+        self.assertEqual(child_span['parent_span_id'], parent_span['span_id'])
+
+    def test_spans(self):
+        parsed_spans = []
+
+        # Check that collected spans have the expected events recorded
+        for span in self.spans:
+            span_name = span['name']
+            self._check_events(span_name, json.dumps(span['events']))
+            parsed_spans.append(span_name)
+
+        # There should be 10 spans in total:
+        # 3 for the http request, 3 for the grpc request, 4 for the ensemble
+        self.assertEqual(len(self.spans), 10)
+        # We should have 3 compute spans
+        self.assertEqual(parsed_spans.count("compute"), 3)
+        # 4 request spans (3 named simple - same as our model name, 1 ensemble)
+        self.assertEqual(parsed_spans.count(self.simple_model_name), 3)
+        self.assertEqual(parsed_spans.count(self.ensemble_model_name), 1)
+        # 3 root spans
+        self.assertEqual(parsed_spans.count(self.root_span), 3)
+
+    def test_nested_spans(self):
+
+        # First 3 spans in `self.spans` belong to the HTTP request.
+        # They are recorded in the following order:
+        # compute_span [idx 0], request_span [idx 1], root_span [idx 2].
+        # compute_span should be a child of request_span,
+        # request_span should be a child of root_span.
+        for child, parent in zip(self.spans[:3], self.spans[1:3]):
+            self._check_parent(child, parent)
+
+        # root_span should not have a `parent_span_id` field
+        self.assertNotIn(
+            'parent_span_id',
+            self.spans[2],
+            "root span has a parent_span_id specified")
+
+        # Next 3 spans in `self.spans` belong to the GRPC request.
+        # The order of spans and their relationship is described above.
+        for child, parent in zip(self.spans[3:6], self.spans[4:6]):
+            self._check_parent(child, parent)
+
+        # root_span should not have a `parent_span_id` field
+        self.assertNotIn(
+            'parent_span_id',
+            self.spans[5],
+            "root span has a parent_span_id specified")
+
+        # Final 4 spans in `self.spans` belong to the ensemble request.
+        # Order of spans: compute span - request span - request span - root span
+        for child, parent in zip(self.spans[6:10], self.spans[7:10]):
+            self._check_parent(child, parent)
+
+        # root_span should not have a `parent_span_id` field
+        self.assertNotIn(
+            'parent_span_id',
+            self.spans[9],
+            "root span has a parent_span_id specified")
+
+def prepare_data(client):
+
+    inputs = []
+    input0_data = np.full(shape=(1, 16), fill_value=-1, dtype=np.int32)
+    input1_data = np.full(shape=(1, 16), fill_value=-1, dtype=np.int32)
+
+    inputs.append(client.InferInput('INPUT0', [1, 16], "INT32"))
+    inputs.append(client.InferInput('INPUT1', [1, 16], "INT32"))
+
+    # Initialize the data
+    inputs[0].set_data_from_numpy(input0_data)
+    inputs[1].set_data_from_numpy(input1_data)
+
+    return inputs
+
+def prepare_traces():
+
+    triton_client_http = httpclient.InferenceServerClient("localhost:8000",
+                                                          verbose=True)
+    triton_client_grpc = grpcclient.InferenceServerClient("localhost:8001",
+                                                          verbose=True)
+    inputs = prepare_data(httpclient)
+    triton_client_http.infer("simple", inputs)
+
+    inputs = prepare_data(grpcclient)
+    triton_client_grpc.infer("simple", inputs)
+
+    inputs = prepare_data(httpclient)
+    triton_client_http.infer("ensemble_add_sub_int32_int32_int32", inputs)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/qa/L0_trace/test.sh b/qa/L0_trace/test.sh
index 1cef8eb288..cf81a1a1ec 100755
--- a/qa/L0_trace/test.sh
+++ b/qa/L0_trace/test.sh
@@ -66,14 +66,19 @@ rm -f *.log
 rm -fr $MODELSDIR && mkdir -p $MODELSDIR
 
 # set up simple and global_simple model using MODELBASE
-rm -fr $MODELSDIR && mkdir -p $MODELSDIR && \
-    cp -r $DATADIR/$MODELBASE $MODELSDIR/simple && \
+cp -r $DATADIR/$MODELBASE $MODELSDIR/simple && \
    rm -r $MODELSDIR/simple/2 && rm -r $MODELSDIR/simple/3 && \
    (cd $MODELSDIR/simple && \
            sed -i "s/^name:.*/name: \"simple\"/" config.pbtxt) && \
    cp -r $MODELSDIR/simple $MODELSDIR/global_simple && \
    (cd $MODELSDIR/global_simple && \
            sed -i "s/^name:.*/name: \"global_simple\"/" config.pbtxt) && \
+   cp -r $ENSEMBLEDIR/simple_onnx_int32_int32_int32 $MODELSDIR/ensemble_add_sub_int32_int32_int32 && \
+   rm -r $MODELSDIR/ensemble_add_sub_int32_int32_int32/2 && \
+   rm -r $MODELSDIR/ensemble_add_sub_int32_int32_int32/3 && \
+   (cd $MODELSDIR/ensemble_add_sub_int32_int32_int32 && \
+           sed -i "s/^name:.*/name: \"ensemble_add_sub_int32_int32_int32\"/" config.pbtxt && \
+           sed -i "s/model_name:.*/model_name: \"simple\"/" config.pbtxt)
 
 RET=0
 
@@ -653,14 +658,29 @@ set -e
 kill $SERVER_PID
 wait $SERVER_PID
 
+set +e
+
 # Check opentelemetry trace exporter sends proper info.
 # A helper python script starts listenning on $OTLP_PORT, where
-# OTLP exporter sends traces. It then check that received data contains
-# expected entries
+# OTLP exporter sends traces.
+# Unittests then check that produced spans have the expected format and events
 # FIXME: Redesign this test to remove time sensitivity
-SERVER_ARGS="--trace-config=triton,file=some_file.log --trace-config=level=TIMESTAMPS \
-            --trace-config=rate=1 --trace-config=count=6 --trace-config=mode=opentelemetry --trace-config=opentelemetry,url=localhost:$OTLP_PORT --model-repository=$MODELSDIR"
+
+OPENTELEMETRY_TEST=opentelemetry_unittest.py
+OPENTELEMETRY_LOG="opentelemetry_unittest.log"
+EXPECTED_NUM_TESTS="2"
+
+SERVER_ARGS="--trace-config=level=TIMESTAMPS --trace-config=rate=1 \
+            --trace-config=count=100 --trace-config=mode=opentelemetry \
+            --trace-config=opentelemetry,url=localhost:$OTLP_PORT \
+            --model-repository=$MODELSDIR"
 SERVER_LOG="./inference_server_trace_config.log"
+
+# Increase the OTLP timeout, since we don't use a valid OTLP collector
+# and don't send a proper response back.
+export OTEL_EXPORTER_OTLP_TIMEOUT=50000
+export OTEL_EXPORTER_OTLP_TRACES_TIMEOUT=50000
+
 run_server
 if [ "$SERVER_PID" == "0" ]; then
     echo -e "\n***\n*** Failed to start $SERVER\n***"
@@ -668,67 +688,38 @@ if [ "$SERVER_PID" == "0" ]; then
    exit 1
 fi
 
-# This is a simple python code that opens port
-python $TRACE_COLLECTOR $OTLP_PORT $TRACE_COLLECTOR_LOG &
-COLLECTOR_PID=$!
+# Using netcat as the trace collector
+apt-get update && apt-get install -y netcat
+nc -l -k 127.0.0.1 $OTLP_PORT >> $TRACE_COLLECTOR_LOG 2>&1 & COLLECTOR_PID=$!
 
 set +e
-
-# To make sure receiver is ready log gets all data
-sleep 3
-
-# Send http request and collect trace
-$SIMPLE_HTTP_CLIENT >> client_update.log 2>&1
-if [ $? -ne 0 ]; then
-    cat client_update.log
-    RET=1
-fi
-
-# Send grpc request and collect trace
-$SIMPLE_GRPC_CLIENT >> client_update.log 2>&1
+# Prepare traces for the unittest.
+# Note: this needs to run separately, to speed up trace collection.
+# Otherwise the internal (opentelemetry_unittest.OpenTelemetryTest.setUp) check
+# will slow down collection.
+python -c 'import opentelemetry_unittest; \
+        opentelemetry_unittest.prepare_traces()' >>$CLIENT_LOG 2>&1
+
+# The unittest will not start until the expected number of spans is collected.
+python $OPENTELEMETRY_TEST >>$OPENTELEMETRY_LOG 2>&1
 if [ $? -ne 0 ]; then
-    cat client_update.log
+    cat $OPENTELEMETRY_LOG
     RET=1
+else
+    check_test_results $TEST_RESULT_FILE $EXPECTED_NUM_TESTS
+    if [ $? -ne 0 ]; then
+        cat $OPENTELEMETRY_LOG
+        echo -e "\n***\n*** Test Result Verification Failed\n***"
+        RET=1
+    fi
 fi
 
-# To make sure log gets all data
-sleep 3
 kill $COLLECTOR_PID
 wait $COLLECTOR_PID
 
-EXPECTED_ENTRIES=${EXPECTED_ENTRIES:="REQUEST_START QUEUE_START INFER_RESPONSE_COMPLETE COMPUTE_START COMPUTE_INPUT_END COMPUTE_OUTPUT_START COMPUTE_END REQUEST_END"}
-HTTP_ENTRIES=${HTTP_ENTRIES:="HTTP_RECV_START HTTP_RECV_END HTTP_SEND_START HTTP_SEND_END"}
-GRPC_ENTRIES=${GRPC_ENTRIES:="GRPC_WAITREAD_START GRPC_WAITREAD_END GRPC_SEND_START GRPC_SEND_END"}
-
-for ENTRY in $EXPECTED_ENTRIES; do
-    if [ `grep -c $ENTRY $TRACE_COLLECTOR_LOG` != "2" ]; then
-        RET=1
-    fi
-done
-
-for ENTRY in $HTTP_ENTRIES; do
-    if [ `grep -c $ENTRY $TRACE_COLLECTOR_LOG` != "1" ]; then
-        RET=1
-    fi
-done
-
-for ENTRY in $GRPC_ENTRIES; do
-    if [ `grep -c $ENTRY $TRACE_COLLECTOR_LOG` != "1" ]; then
-        RET=1
-    fi
-done
-
-if [ $RET -eq 0 ]; then
-    echo -e "\n***\n*** Test Passed\n***"
-else
-    echo -e "\n***\n*** Test FAILED\n***"
-fi
-
 set -e
 kill $SERVER_PID
 wait $SERVER_PID
 
-set +e
-
 exit $RET
diff --git a/qa/L0_trace/trace_collector.py b/qa/L0_trace/trace_collector.py
deleted file mode 100644
index ddfdfa7853..0000000000
--- a/qa/L0_trace/trace_collector.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#  * Redistributions of source code must retain the above copyright
-#    notice, this list of conditions and the following disclaimer.
-#  * Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions and the following disclaimer in the
-#    documentation and/or other materials provided with the distribution.
-#  * Neither the name of NVIDIA CORPORATION nor the names of its
-#    contributors may be used to endorse or promote products derived
-#    from this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
-# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
-# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-import socket
-import sys
-
-if __name__ == "__main__":
-    """
-    This script is intended to be a mock opentelemetry trace collector.
-    It sets up a “listening” socket on provided port and receives data.
-    It is intended to be used with small traces (under 4096 bytes).
-    After trace is received, it is printed into the log file.
-
-    Port and log file path can be provided with command line arguments:
-
-    python trace_collector.py 10000 my.log
-
-    By default, port is set to 10000 and file_path to "trace_collector.log"
-
-    NOTE: It does not support OpenTelemetry protocol and is not intended to
-    support OTLP, use for validating exported tests only.
-    """
-
-    port = 1000 if sys.argv[1] is None else int(sys.argv[1])
-    file_path = "trace_collector.log" if sys.argv[2] is None else sys.argv[2]
-
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    server_address = ('localhost', port)
-    sock.bind(server_address)
-    sock.listen(1)
-
-    while True:
-        trace = ''
-        connection, client_address = sock.accept()
-        with connection:
-            with open(file_path, "a") as sys.stdout:
-                chunk = connection.recv(4096)
-                if not chunk:
-                    break
-                connection.sendall(chunk)
-                trace = chunk.decode()
-                print(trace)
diff --git a/src/tracer.cc b/src/tracer.cc
index 26750fc3b5..c33aeb546e 100644
--- a/src/tracer.cc
+++ b/src/tracer.cc
@@ -307,7 +307,7 @@ TraceManager::Trace::~Trace()
     setting_->WriteTrace(streams_);
   } else if (setting_->mode_ == TRACE_MODE_OPENTELEMETRY) {
 #ifndef _WIN32
-    EndSpan();
+    EndSpan(kRootSpan);
 #else
     LOG_ERROR << "Unsupported trace mode: "
               << TraceManager::InferenceTraceModeString(setting_->mode_);
@@ -339,12 +339,7 @@ TraceManager::Trace::CaptureTimestamp(
         << "{\"name\":\"" << name << "\",\"ns\":" << timestamp_ns << "}]}";
   } else if (setting_->mode_ == TRACE_MODE_OPENTELEMETRY) {
 #ifndef _WIN32
-    otel_common::SystemTimestamp otel_timestamp{
-        (time_offset_ + std::chrono::nanoseconds{timestamp_ns})};
-    if (trace_span_ == nullptr) {
-      InitSpan(otel_timestamp);
-    }
-    trace_span_->AddEvent(name, otel_timestamp);
+    AddEvent(kRootSpan, name, timestamp_ns);
 #else
     LOG_ERROR << "Unsupported trace mode: "
               << TraceManager::InferenceTraceModeString(setting_->mode_);
@@ -374,24 +369,205 @@ TraceManager::Trace::InitTracer(
       otel_trace_sdk::SimpleSpanProcessorFactory::Create(std::move(exporter_));
   provider_ =
       otel_trace_sdk::TracerProviderFactory::Create(std::move(processor_));
+  auto steady_timestamp_ns =
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          std::chrono::steady_clock::now().time_since_epoch())
+          .count();
+  auto root_span = StartSpan("InferRequest", steady_timestamp_ns);
+  // Initializing OTel context and storing "InferRequest" span as a root span
+  // to keep it alive for the duration of the request.
+  otel_context_ = opentelemetry::context::Context({kRootSpan, root_span});
 }
 
 void
-TraceManager::Trace::InitSpan(const otel_common::SystemTimestamp& timestamp_ns)
+TraceManager::Trace::StartSpan(
+    std::string span_key, TRITONSERVER_InferenceTrace* trace,
+    TRITONSERVER_InferenceTraceActivity activity, uint64_t timestamp_ns,
+    uint64_t trace_id)
+{
+  uint64_t parent_id;
+  LOG_TRITONSERVER_ERROR(
+      TRITONSERVER_InferenceTraceParentId(trace, &parent_id),
+      "getting trace parent id");
+  std::string parent_span_key = "";
+
+  // Currently, only 2 types of sub-spans are supported:
+  // request span and compute span. A compute span is a leaf span
+  // and cannot be a parent of any sub-span. If parent_id == 0,
+  // then the current model is either a standalone model or an ensemble model.
+  // In both cases, the parent of the new request sub-span is the kRootSpan.
+  // A request span with trace id = `trace_id` is a parent of a compute span,
+  // started in the same trace.
+  // If parent_id > 0, then this is a child trace, spawned from
+  // the ensemble's main request. In this case, the parent
+  // span is the ensemble's request span.
+ if (parent_id == 0 && activity == TRITONSERVER_TRACE_REQUEST_START) { + parent_span_key = kRootSpan; + } else if (activity == TRITONSERVER_TRACE_REQUEST_START) { + parent_span_key = kRequestSpan + std::to_string(parent_id); + } else if (activity == TRITONSERVER_TRACE_COMPUTE_START) { + parent_span_key = kRequestSpan + std::to_string(trace_id); + } + + std::string display_name = "compute"; + const char* model_name; + if (activity == TRITONSERVER_TRACE_REQUEST_START) { + LOG_TRITONSERVER_ERROR( + TRITONSERVER_InferenceTraceModelName(trace, &model_name), + "getting model name"); + display_name = model_name; + } + + auto span = StartSpan(display_name, timestamp_ns, parent_span_key); + + if (activity == TRITONSERVER_TRACE_REQUEST_START) { + int64_t model_version; + LOG_TRITONSERVER_ERROR( + TRITONSERVER_InferenceTraceModelVersion(trace, &model_version), + "getting model version"); + span->SetAttribute("triton.model_name", model_name); + span->SetAttribute("triton.model_version", model_version); + span->SetAttribute("triton.trace_id", trace_id); + span->SetAttribute("triton.trace_parent_id", parent_id); + } + + otel_context_ = otel_context_.SetValue(span_key, span); +} + +opentelemetry::nostd::shared_ptr<otel_trace_api::Span> +TraceManager::Trace::StartSpan( + std::string display_name, const uint64_t& raw_timestamp_ns, + std::string parent_span_key) { otel_trace_api::StartSpanOptions options; - options.kind = otel_trace_api::SpanKind::kServer; // server - options.start_system_time = timestamp_ns; - // [FIXME] think about names - trace_span_ = - provider_->GetTracer("triton-server")->StartSpan("InferRequest", options); + options.kind = otel_trace_api::SpanKind::kServer; + options.start_system_time = + time_offset_ + std::chrono::nanoseconds{raw_timestamp_ns}; + options.start_steady_time = + otel_common::SteadyTimestamp{std::chrono::nanoseconds{raw_timestamp_ns}}; + + // If the new span is a child span, we need to retrieve its parent from + // the context and provide it through StartSpanOptions to the child span + if (!parent_span_key.empty() && otel_context_.HasKey(parent_span_key)) { + auto parent_span = opentelemetry::nostd::get< + opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>( + otel_context_.GetValue(parent_span_key)); + options.parent = parent_span->GetContext(); + } + return provider_->GetTracer(kTritonTracer)->StartSpan(display_name, options); } void -TraceManager::Trace::EndSpan() +TraceManager::Trace::EndSpan(std::string span_key) { - if (trace_span_ != nullptr) { - trace_span_->End(); + auto timestamp_ns = std::chrono::duration_cast<std::chrono::nanoseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + EndSpan(span_key, timestamp_ns); +} + + +void +TraceManager::Trace::EndSpan( + std::string span_key, const uint64_t& raw_timestamp_ns) +{ + if (otel_context_.HasKey(span_key)) { + auto span = opentelemetry::nostd::get< + opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>( + otel_context_.GetValue(span_key)); + + if (span == nullptr) { + return; + } + + otel_trace_api::EndSpanOptions end_options; + end_options.end_steady_time = otel_common::SteadyTimestamp{ + std::chrono::nanoseconds{raw_timestamp_ns}}; + span->End(end_options); + } +} + +void +TraceManager::Trace::ReportToOpenTelemetry( + TRITONSERVER_InferenceTrace* trace, + TRITONSERVER_InferenceTraceActivity activity, uint64_t timestamp_ns) +{ + uint64_t id; + LOG_TRITONSERVER_ERROR( + TRITONSERVER_InferenceTraceId(trace, &id), "getting trace id"); + + auto current_span_key = 
GetSpanKeyForActivity(activity, id); + if (current_span_key.empty()) { + return; + } + + AddEvent(current_span_key, trace, activity, timestamp_ns, id); +} + +std::string +TraceManager::Trace::GetSpanKeyForActivity( + TRITONSERVER_InferenceTraceActivity activity, uint64_t trace_id) +{ + std::string span_name; + switch (activity) { + case TRITONSERVER_TRACE_REQUEST_START: + case TRITONSERVER_TRACE_QUEUE_START: + case TRITONSERVER_TRACE_REQUEST_END: { + span_name = kRequestSpan + std::to_string(trace_id); + break; + } + + case TRITONSERVER_TRACE_COMPUTE_START: + case TRITONSERVER_TRACE_COMPUTE_INPUT_END: + case TRITONSERVER_TRACE_COMPUTE_OUTPUT_START: + case TRITONSERVER_TRACE_COMPUTE_END: { + span_name = kComputeSpan + std::to_string(trace_id); + break; + } + case TRITONSERVER_TRACE_TENSOR_QUEUE_INPUT: + case TRITONSERVER_TRACE_TENSOR_BACKEND_INPUT: + case TRITONSERVER_TRACE_TENSOR_BACKEND_OUTPUT: + default: { + LOG_ERROR << "Unsupported activity: " + << TRITONSERVER_InferenceTraceActivityString(activity); + span_name = ""; + break; + } + } + + return span_name; +} + +void +TraceManager::Trace::AddEvent( + std::string span_key, TRITONSERVER_InferenceTrace* trace, + TRITONSERVER_InferenceTraceActivity activity, uint64_t timestamp_ns, + uint64_t id) +{ + if (activity == TRITONSERVER_TRACE_REQUEST_START || + activity == TRITONSERVER_TRACE_COMPUTE_START) { + StartSpan(span_key, trace, activity, timestamp_ns, id); + } + + AddEvent( + span_key, TRITONSERVER_InferenceTraceActivityString(activity), + timestamp_ns); + + if (activity == TRITONSERVER_TRACE_REQUEST_END || + activity == TRITONSERVER_TRACE_COMPUTE_END) { + EndSpan(span_key, timestamp_ns); + } +} + +void +TraceManager::Trace::AddEvent( + std::string span_key, std::string event, uint64_t timestamp) +{ + if (otel_context_.HasKey(span_key)) { + auto span = opentelemetry::nostd::get< + opentelemetry::nostd::shared_ptr<otel_trace_api::Span>>( + otel_context_.GetValue(span_key)); + span->AddEvent(event, time_offset_ + std::chrono::nanoseconds{timestamp}); } } #endif @@ -441,22 +617,30 @@ TraceManager::TraceActivity( reinterpret_cast<std::shared_ptr<TraceManager::Trace>*>(userp)->get(); std::lock_guard<std::mutex> lk(ts->mtx_); + + if (ts->setting_->mode_ == TRACE_MODE_OPENTELEMETRY) { +#ifndef _WIN32 + ts->ReportToOpenTelemetry(trace, activity, timestamp_ns); +#else + LOG_ERROR << "Unsupported trace mode: " + << TraceManager::InferenceTraceModeString(ts->setting_->mode_); +#endif + return; + } + std::stringstream* ss = nullptr; { - if (ts->setting_->mode_ == TRACE_MODE_TRITON) { - if (ts->streams_.find(id) == ts->streams_.end()) { - std::unique_ptr<std::stringstream> stream(new std::stringstream()); - ss = stream.get(); - ts->streams_.emplace(id, std::move(stream)); - } else { - ss = ts->streams_[id].get(); - // If the string stream is not newly created, add "," as there is - // already content in the string stream - *ss << ","; - } + if (ts->streams_.find(id) == ts->streams_.end()) { + std::unique_ptr<std::stringstream> stream(new std::stringstream()); + ss = stream.get(); + ts->streams_.emplace(id, std::move(stream)); + } else { + ss = ts->streams_[id].get(); + // If the string stream is not newly created, add "," as there is + // already content in the string stream + *ss << ","; } } - // If 'activity' is TRITONSERVER_TRACE_REQUEST_START then collect // and serialize trace details. 
if (activity == TRITONSERVER_TRACE_REQUEST_START) { @@ -478,52 +662,22 @@ TraceManager::TraceActivity( TRITONSERVER_InferenceTraceRequestId(trace, &request_id), "getting request id"); - if (ts->setting_->mode_ == TRACE_MODE_TRITON) { - *ss << "{\"id\":" << id << ",\"model_name\":\"" << model_name - << "\",\"model_version\":" << model_version; - - if (std::string(request_id) != "") { - *ss << ",\"request_id\":\"" << request_id << "\""; - } + *ss << "{\"id\":" << id << ",\"model_name\":\"" << model_name + << "\",\"model_version\":" << model_version; - if (parent_id != 0) { - *ss << ",\"parent_id\":" << parent_id; - } - *ss << "},"; - } else if (ts->setting_->mode_ == TRACE_MODE_OPENTELEMETRY) { -#ifndef _WIN32 - if (ts->trace_span_ == nullptr) { - ts->InitSpan(ts->time_offset_ + std::chrono::nanoseconds{timestamp_ns}); - } - ts->trace_span_->SetAttribute("triton.model_name", model_name); - ts->trace_span_->SetAttribute("triton.model_version", model_version); - ts->trace_span_->SetAttribute("triton.trace_parent_id", parent_id); - ts->trace_span_->SetAttribute("triton.trace_request_id", request_id); -#else - LOG_ERROR << "Unsupported trace mode: " - << TraceManager::InferenceTraceModeString(ts->setting_->mode_); -#endif + if (std::string(request_id) != "") { + *ss << ",\"request_id\":\"" << request_id << "\""; } - } - if (ts->setting_->mode_ == TRACE_MODE_TRITON) { - *ss << "{\"id\":" << id << ",\"timestamps\":[" - << "{\"name\":\"" << TRITONSERVER_InferenceTraceActivityString(activity) - << "\",\"ns\":" << timestamp_ns << "}]}"; - } else if (ts->setting_->mode_ == TRACE_MODE_OPENTELEMETRY) { -#ifndef _WIN32 - otel_common::SystemTimestamp otel_timestamp{ - (ts->time_offset_ + std::chrono::nanoseconds{timestamp_ns})}; - if (ts->trace_span_ == nullptr) { - ts->InitSpan(otel_timestamp); + if (parent_id != 0) { + *ss << ",\"parent_id\":" << parent_id; } - ts->trace_span_->AddEvent( - TRITONSERVER_InferenceTraceActivityString(activity), otel_timestamp); -#else - LOG_ERROR << "Unsupported trace mode: " - << TraceManager::InferenceTraceModeString(ts->setting_->mode_); -#endif + *ss << "},"; } + + *ss << "{\"id\":" << id << ",\"timestamps\":[" + << "{\"name\":\"" << TRITONSERVER_InferenceTraceActivityString(activity) + << "\",\"ns\":" << timestamp_ns << "}]}"; } void diff --git a/src/tracer.h b/src/tracer.h index c310921f14..55bf2b9800 100644 --- a/src/tracer.h +++ b/src/tracer.h @@ -41,6 +41,7 @@ #include "opentelemetry/sdk/trace/processor.h" #include "opentelemetry/sdk/trace/simple_processor_factory.h" #include "opentelemetry/sdk/trace/tracer_provider_factory.h" +#include "opentelemetry/trace/context.h" #include "opentelemetry/trace/provider.h" namespace otlp = opentelemetry::exporter::otlp; namespace otel_trace_sdk = opentelemetry::sdk::trace; @@ -54,6 +55,15 @@ namespace triton { namespace server { using TraceConfig = std::vector<std::pair<std::string, std::string>>; using TraceConfigMap = std::unordered_map<std::string, TraceConfig>; +// Common OTel span keys to store in OTel context +// with the corresponding trace id. +constexpr char kRootSpan[] = "root_span"; +constexpr char kRequestSpan[] = "request_span"; +constexpr char kComputeSpan[] = "compute_span"; + +// OTel tracer name +constexpr char kTritonTracer[] = "triton-server"; + /// Trace modes. typedef enum tracemode_enum { /// Default is Triton tracing API @@ -157,7 +167,38 @@ class TraceManager { uint64_t trace_id_; + // Capture a timestamp generated outside of triton and associate it + // with this trace. 
+    void CaptureTimestamp(const std::string& name, uint64_t timestamp_ns);
+
 #if !defined(_WIN32) && defined(TRITON_ENABLE_TRACING)
+    /// Initializes the OpenTelemetry exporter, processor, provider and
+    /// context.
+    ///
+    /// \param config_map A config map, which stores all parameters
+    /// specified by the user.
+    void InitTracer(const TraceConfigMap& config_map);
+
+    /// Reports a TRITONSERVER_InferenceTraceActivity as an event on
+    /// the currently active span. If the activity is
+    /// `TRITONSERVER_TRACE_REQUEST_START` or
+    /// `TRITONSERVER_TRACE_COMPUTE_START`,
+    /// it starts a new request or compute span. For a request span it
+    /// adds Triton-related attributes and adds the span to
+    /// `otel_context_`. Alternatively, if the activity is
+    /// `TRITONSERVER_TRACE_REQUEST_END` or `TRITONSERVER_TRACE_COMPUTE_END`,
+    /// it ends the corresponding span.
+    ///
+    /// \param trace TRITONSERVER_InferenceTrace instance.
+    /// \param activity Trace activity.
+    /// \param timestamp_ns Steady timestamp, which is used to calculate the
+    /// OpenTelemetry SystemTimestamp to display the span on a timeline, and
+    /// the OpenTelemetry SteadyTimestamp to calculate the duration of the
+    /// span with better precision.
+    void ReportToOpenTelemetry(
+        TRITONSERVER_InferenceTrace* trace,
+        TRITONSERVER_InferenceTraceActivity activity, uint64_t timestamp_ns);
+
+   private:
     // OpenTelemetry SDK relies on system's clock for event timestamps.
     // Triton Tracing records timestamps using steady_clock. This is a
     // monotonic clock, i.e. time is always moving forward. It is not related
@@ -174,26 +215,98 @@ class TraceManager {
         std::chrono::duration_cast<std::chrono::nanoseconds>(
             std::chrono::steady_clock::now().time_since_epoch());
 
-    opentelemetry::nostd::shared_ptr<otel_trace_api::Span> trace_span_{nullptr};
-
     std::unique_ptr<otel_trace_sdk::SpanExporter> exporter_;
     std::unique_ptr<otel_trace_sdk::SpanProcessor> processor_;
     std::shared_ptr<otel_trace_api::TracerProvider> provider_;
 
-    // Initializes Opentelemetry exporter, processor and provider
-    void InitTracer(const TraceConfigMap& config_map);
-
-    // Starts a span with the provided timestamp
-    void InitSpan(const opentelemetry::common::SystemTimestamp& timestamp_ns);
-
-    // Ends the started span and cleans up tracer
-    void EndSpan();
+    // OTel context to store spans created in the current trace
+    opentelemetry::context::Context otel_context_;
+
+    /// Starts a compute or request span based on `activity`.
+    /// For request spans, it will add the following attributes to the span:
+    /// `model_name`, `model_version`, `trace_id`, `parent_id`.
+    ///
+    /// \param span_key Span's key to retrieve the corresponding span from the
+    /// OpenTelemetry context.
+    /// \param trace TRITONSERVER_InferenceTrace, used to request the model's
+    /// name, version, and trace parent_id from the backend.
+    /// \param activity Trace activity.
+    /// \param timestamp_ns Steady timestamp, which is used to calculate the
+    /// OpenTelemetry SystemTimestamp to display the span on a timeline, and
+    /// the OpenTelemetry SteadyTimestamp to calculate the duration of the
+    /// span with better precision.
+    /// \param trace_id Trace id.
+    void StartSpan(
+        std::string span_key, TRITONSERVER_InferenceTrace* trace,
+        TRITONSERVER_InferenceTraceActivity activity, uint64_t timestamp_ns,
+        uint64_t trace_id);
+
+    /// Starts a span with the provided timestamp and name.
+    ///
+    /// \param display_name Span's name, which will be shown in the trace.
+    /// \param raw_timestamp_ns Steady timestamp, which is used to calculate
+    /// the OpenTelemetry SystemTimestamp to display the span on a timeline,
+    /// and the OpenTelemetry SteadyTimestamp to calculate the duration of
+    /// the span with better precision.
+    /// \param parent_span_key A span key used to find the parent span in the
+    /// OpenTelemetry context. If empty, a root span will be started,
+    /// i.e. with no parent span specified.
+    /// \return A shared pointer to a newly created OpenTelemetry span.
+    opentelemetry::nostd::shared_ptr<otel_trace_api::Span> StartSpan(
+        std::string display_name, const uint64_t& raw_timestamp_ns,
+        std::string parent_span_key = "");
+
+    /// Ends the provided span.
+    ///
+    /// \param span_key Span's key to retrieve the corresponding span from the
+    /// OpenTelemetry context.
+    void EndSpan(std::string span_key);
+
+    /// Ends the provided span at the specified steady timestamp.
+    ///
+    /// \param span_key Span's key to retrieve the corresponding span from the
+    /// OpenTelemetry context.
+    /// \param raw_timestamp_ns Steady timestamp to use as
+    /// `EndSpanOptions::end_steady_time`.
+    void EndSpan(std::string span_key, const uint64_t& raw_timestamp_ns);
+
+    /// Returns the span key to which the activity belongs.
+    ///
+    /// \param activity Reported activity.
+    /// \param trace_id Trace id.
+    /// \return A key identifying the span stored in the OpenTelemetry
+    /// context.
+    std::string GetSpanKeyForActivity(
+        TRITONSERVER_InferenceTraceActivity activity, uint64_t trace_id);
+
+    /// Adds an event to the span retrieved from the OpenTelemetry context
+    /// with the provided `span_key`. If the activity is
+    /// TRITONSERVER_TRACE_REQUEST_START or TRITONSERVER_TRACE_COMPUTE_START,
+    /// it starts a new span and adds it to `otel_context_`.
+    ///
+    /// \param span_key Span's key to retrieve the corresponding span from the
+    /// OpenTelemetry context.
+    /// \param trace TRITONSERVER_InferenceTrace, used to request the model's
+    /// name, version, and trace parent_id from the backend.
+    /// \param activity Trace activity.
+    /// \param timestamp_ns Timestamp of the provided event.
+    /// \param id Trace id.
+    void AddEvent(
+        std::string span_key, TRITONSERVER_InferenceTrace* trace,
+        TRITONSERVER_InferenceTraceActivity activity, uint64_t timestamp_ns,
+        uint64_t id);
+
+    /// Adds an event to the OpenTelemetry span retrieved from the
+    /// OpenTelemetry context with the provided `span_key`.
+    ///
+    /// \param span_key Span's key to retrieve the corresponding span from the
+    /// OpenTelemetry context.
+    /// \param event An event to add to the span.
+    /// \param timestamp_ns Timestamp of the provided event.
+    void AddEvent(
+        std::string span_key, std::string event, uint64_t timestamp_ns);
 #endif
-
-    // Capture a timestamp generated outside of triton and associate it
-    // with this trace.
-    void CaptureTimestamp(const std::string& name, uint64_t timestamp_ns);
   };
 
  private: