Skip to content

Commit

Permalink
Internal:Add span events native serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
marcotc committed Dec 11, 2024
1 parent 8ebc8a1 commit 4287f6b
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 51 deletions.
118 changes: 115 additions & 3 deletions lib/datadog/tracing/span_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,135 @@ class SpanEvent
# @return [Integer]
attr_reader :time_unix_nano

# TODO: Accept {Time} as the time_unix_nano parameter, possibly in addition to the current nano integer.
def initialize(
name,
attributes: nil,
time_unix_nano: nil
)
@name = name
@attributes = attributes || {}

@attributes = attributes.dup || {}
validate_attributes!(@attributes)
@attributes.transform_keys!(&:to_s)

# OpenTelemetry SDK stores span event timestamps in nanoseconds (not seconds).
# We will do the same here to avoid unnecessary conversions and inconsistencies.
@time_unix_nano = time_unix_nano || (Time.now.to_r * 1_000_000_000).to_i
end

def to_hash
h = { name: @name, time_unix_nano: @time_unix_nano }
h[:attributes] = attributes unless @attributes.empty?
h = { 'name' => @name, 'time_unix_nano' => @time_unix_nano }
h['attributes'] = @attributes unless @attributes.empty?
h
end

def to_native_format
h = { 'name' => @name, 'time_unix_nano' => @time_unix_nano }

attr = {}
@attributes.each do |key, value|
attr[key] = if value.is_a?(Array)
{ type: ARRAY_TYPE, array_value: value.map { |v| serialize_native_attribute(v) } }
else
serialize_native_attribute(value)
end
end

h['attributes'] = attr unless @attributes.empty?

h
end

private

MIN_INT64_SIGNED = -2**63
MAX_INT64_SIGNED = 2 << 63 - 1

# Checks the attributes hash to ensure it only contains serializable values.
# Invalid values are removed from the hash.
def validate_attributes!(attributes)
attributes.select! do |key, value|
case value
when Array
next true if value.empty?

first = value.first
case first
when String, Integer, Float
first_type = first.class
if value.all? { |v| v.is_a?(first_type) }
value.all? { |v| validate_scalar_attribute!(key, v) }
else
Datadog.logger.warn("Attribute #{key} array must be homogenous: #{value}.")
false
end
when TrueClass, FalseClass
if value.all? { |v| v.is_a?(TrueClass) || v.is_a?(FalseClass) }
value.all? { |v| validate_scalar_attribute!(key, v) }
else
Datadog.logger.warn("Attribute #{key} array must be homogenous: #{value}.")
false
end
else
Datadog.logger.warn("Attribute #{key} must be a string, number, or boolean: #{value}.")
false
end
when String, Numeric, TrueClass, FalseClass
validate_scalar_attribute!(key, value)
else
Datadog.logger.warn("Attribute #{key} must be a string, number, boolean, or array: #{value}.")
false
end
end
end

def validate_scalar_attribute!(key, value)
case value
when String, TrueClass, FalseClass
true
when Integer
# Cannot be larger than signed 64-bit integer
if value < MIN_INT64_SIGNED || value > MAX_INT64_SIGNED
Datadog.logger.warn("Attribute #{key} must be within the range of a signed 64-bit integer: #{value}.")
false
else
true
end
when Float
# Has to be finite
return true if value.finite?

Datadog.logger.warn("Attribute #{key} must be a finite number: #{value}.")
false
else
Datadog.logger.warn("Attribute #{key} must be a string, number, or boolean: #{value}.")
false
end
end

STRING_TYPE = 0
BOOLEAN_TYPE = 1
INTEGER_TYPE = 2
DOUBLE_TYPE = 3
ARRAY_TYPE = 4

# Serializes individual scalar attributes into the native format.
def serialize_native_attribute(value)
case value
when String
{ type: STRING_TYPE, string_value: value }
when TrueClass, FalseClass
{ type: BOOLEAN_TYPE, bool_value: value }
when Integer
{ type: INTEGER_TYPE, int_value: value }
when Float
{ type: DOUBLE_TYPE, double_value: value }
else
# This is technically unreachable due to the validation in #initialize.
raise ArgumentError, "Attribute must be a string, number, or boolean: #{value}."
end
end
end
end
end
2 changes: 2 additions & 0 deletions lib/datadog/tracing/span_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
require_relative 'metadata'
require_relative 'metadata/ext'
require_relative 'span'
require_relative 'span_event'
require_relative 'span_link'
require_relative 'utils'

module Datadog
Expand Down
26 changes: 20 additions & 6 deletions lib/datadog/tracing/transport/serializable_trace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ class SerializableTrace
attr_reader \
:trace

def initialize(trace)
def initialize(trace, native_events_supported = false)
@trace = trace
@native_events_supported = native_events_supported
end

# MessagePack serializer interface. Making this object
Expand All @@ -26,13 +27,13 @@ def initialize(trace)
# @param packer [MessagePack::Packer] serialization buffer, can be +nil+ with JRuby
def to_msgpack(packer = nil)
# As of 1.3.3, JRuby implementation doesn't pass an existing packer
trace.spans.map { |s| SerializableSpan.new(s) }.to_msgpack(packer)
trace.spans.map { |s| SerializableSpan.new(s, @native_events_supported) }.to_msgpack(packer)
end

# JSON serializer interface.
# Used by older version of the transport.
def to_json(*args)
trace.spans.map { |s| SerializableSpan.new(s).to_hash }.to_json(*args)
trace.spans.map { |s| SerializableSpan.new(s, @native_events_supported).to_hash }.to_json(*args)
end
end

Expand All @@ -41,9 +42,10 @@ class SerializableSpan
attr_reader \
:span

def initialize(span)
def initialize(span, native_events_supported)
@span = span
@trace_id = Tracing::Utils::TraceId.to_low_order(span.trace_id)
@native_events_supported = native_events_supported
end

# MessagePack serializer interface. Making this object
Expand All @@ -55,11 +57,14 @@ def initialize(span)
#
# @param packer [MessagePack::Packer] serialization buffer, can be +nil+ with JRuby
# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/MethodLength
def to_msgpack(packer = nil)
packer ||= MessagePack::Packer.new

number_of_elements_to_write = 11

number_of_elements_to_write += 1 if span.events.any? && @native_events_supported

if span.stopped?
packer.write_map_header(number_of_elements_to_write + 2) # Set header with how many elements in the map

Expand All @@ -72,8 +77,16 @@ def to_msgpack(packer = nil)
packer.write_map_header(number_of_elements_to_write) # Set header with how many elements in the map
end

# serialize span events as meta tags
span.set_tag('events', span.events.map(&:to_hash).to_json) if span.events.any?
if span.events.any?
if @native_events_supported
# Use top-level field for native events
packer.write('span_events')
packer.write(span.events.map(&:to_native_format))
else
# Serialize span events as meta tags
span.set_tag('events', span.events.map(&:to_hash).to_json)
end
end

# DEV: We use strings as keys here, instead of symbols, as
# DEV: MessagePack will ultimately convert them to strings.
Expand Down Expand Up @@ -103,6 +116,7 @@ def to_msgpack(packer = nil)
packer
end
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength

# JSON serializer interface.
# Used by older version of the transport.
Expand Down
50 changes: 35 additions & 15 deletions sig/datadog/tracing/span_event.rbs
Original file line number Diff line number Diff line change
@@ -1,27 +1,47 @@
module Datadog
module Tracing
# SpanEvent represents an annotation on a span.
# @public_api
class SpanEvent
@name: untyped
type attributes = Hash[String,attributeValue]
type attributeValue = String | Integer | Float | bool | Array[String] | Array[Integer] | Array[Float] | Array[bool]

@attributes: untyped
MIN_INT64_SIGNED: Integer
MAX_INT64_SIGNED: Integer
STRING_TYPE: Integer
BOOLEAN_TYPE: Integer
INTEGER_TYPE: Integer
DOUBLE_TYPE: Integer
ARRAY_TYPE: Integer

@time_unix_nano: untyped
attr_reader name: untyped # TODO: Typing this makes to_hash internal typecheck fail
attr_reader attributes: attributes
attr_reader time_unix_nano: untyped # TODO: Typing this also makes to_hash internal typecheck fail

# @!attribute [r] name
# @return [String]
attr_reader name: untyped
def initialize: (String name, ?attributes: attributes, ?time_unix_nano: Integer) -> void

# @!attribute [r] attributes
# @return [Hash<String,String>]
attr_reader attributes: untyped
def to_hash: -> Hash[String, untyped]
# TODO: Steep does not track Hash keys when they are added with `hash[:key] = val`.
# {
# name: String,
# time_unix_nano: Integer,
# ?attributes: attributes,
# }

# @!attribute [r] time_unix_nano
# @return [Integer]
attr_reader time_unix_nano: untyped
def to_native_format: -> Hash[String, untyped]
# TODO: Steep does not track Hash keys when they are added with `hash[:key] = val`.
# {
# name: String,
# time_unix_nano: Integer,
# ?attributes: Hash[String, nativeAttributeValue],
# }
# type nativeAttributeValue = { type: Integer, string_value: String } | { type: Integer, int_value: Integer } | { type: Integer, double_value: Float } | { type: Integer, bool_value: bool } | { type: Integer, string_array_value: Array[String] } | { type: Integer, int_array_value: Array[Integer] } | { type: Integer, double_array_value: Array[Float] } | { type: Integer, bool_array_value: Array[bool] }

def initialize: (untyped name, ?attributes: untyped?, ?time_unix_nano: untyped?) -> void
private

def serialize_native_attribute: (attributeValue value)-> ({ type: Integer, string_value: String } | { type: Integer, int_value: Integer } | { type: Integer, double_value: Float } | { type: Integer, bool_value: bool })

def validate_attributes!: (attributes attributes)-> void

def validate_scalar_attribute!: (String key, attributeValue value)-> bool
end
end
end
13 changes: 9 additions & 4 deletions sig/datadog/tracing/transport/serializable_trace.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ module Datadog
module Tracing
module Transport
class SerializableTrace
attr_reader trace: untyped
@native_events_supported: bool

def initialize: (untyped trace) -> void
attr_reader trace: Span

def initialize: (untyped trace, bool native_events_supported) -> void

def to_msgpack: (?untyped? packer) -> untyped

def to_json: (*untyped args) -> untyped
end

class SerializableSpan
attr_reader span: untyped
@native_events_supported: bool
@trace_id: Integer

attr_reader span: Span

def initialize: (untyped span) -> void
def initialize: (untyped span, bool native_events_supported) -> void

def to_msgpack: (?untyped? packer) -> untyped

Expand Down
Loading

0 comments on commit 4287f6b

Please sign in to comment.