diff --git a/beeline/__init__.py b/beeline/__init__.py index c47a2f7..41efbdb 100644 --- a/beeline/__init__.py +++ b/beeline/__init__.py @@ -184,7 +184,7 @@ def traced_thread(self, fn): # copy as a new list - reference will be unavailable when we enter the new thread stack = copy.copy(self.tracer_impl._state.stack) trace_fields = copy.copy(self.tracer_impl._state.trace_fields) - + @functools.wraps(fn) def wrapped(*args, **kwargs): self.tracer_impl._state.trace_id = trace_id @@ -356,6 +356,22 @@ def remove_context_field(name): if _GBL: _GBL.tracer_impl.remove_context_field(name=name) +def add_rollup_field(name, value): + ''' AddRollupField adds a key/value pair to the current span. If it is called repeatedly + on the same span, the values will be summed together. Additionally, this + field will be summed across all spans and added to the trace as a total. It + is especially useful for doing things like adding the duration spent talking + to a specific external service - eg database time. The root span will then + get a field that represents the total time spent talking to the database from + all of the spans that are part of the trace. + + Args: + - `name`: Name of field to add + - `value`: Numeric (float) value of new field + ''' + if _GBL: + _GBL.tracer_impl.add_rollup_field(name=name, value=value) + def add_trace_field(name, value): ''' Similar to `add_context_field` - adds a field to the current span, but also to all other future spans in this trace. Trace context fields will be @@ -635,7 +651,7 @@ def noop(*args, **kwargs): # copy as a new list - reference will be unavailable when we enter the new thread stack = copy.copy(bl.tracer_impl._state.stack) trace_fields = copy.copy(bl.tracer_impl._state.trace_fields) - + @functools.wraps(fn) def wrapped(*args, **kwargs): bl.tracer_impl._state.trace_id = trace_id diff --git a/beeline/test_trace.py b/beeline/test_trace.py index 826e1d0..dd9be5b 100644 --- a/beeline/test_trace.py +++ b/beeline/test_trace.py @@ -1,4 +1,5 @@ -from mock import Mock, call, patch +from collections import defaultdict +from mock import ANY, Mock, call, patch import datetime import unittest import uuid @@ -316,6 +317,59 @@ def test_add_trace_field_propagates(self): 'app.more': 'data!', }) + def test_add_rollup_field_propagates(self): + m_client = Mock() + tracer = SynchronousTracer(m_client) + tracer._run_hooks_and_send = Mock() + + span1 = tracer.start_trace(context={'name': 'root'}) + event1 = m_client.new_event.return_value + + span2 = tracer.start_span(context={'name': 'middle'}) + event2 = m_client.new_event.return_value + + span3 = tracer.start_span(context={'name': 'inner1'}) + event3 = m_client.new_event.return_value + + tracer.add_rollup_field('database_ms', 17) + tracer.add_rollup_field('calories', 180) + tracer.add_rollup_field('database_ms', 23.1) + + event3.add_field.reset_mock() + tracer.finish_span(span3) + event3.add_field.assert_has_calls([ + call('database_ms', 17.0 + 23.1), + call('calories', 180.0), + call('duration_ms', ANY), + ], any_order=True) + + span4 = tracer.start_span(context={'name': 'inner2'}) + event4 = m_client.new_event.return_value + + tracer.add_rollup_field('calories', 120) + + event4.add_field.reset_mock() + tracer.finish_span(span4) + event4.add_field.assert_has_calls([ + call('calories', 120.0), + call('duration_ms', ANY), + ], any_order=True) + + event2.add_field.reset_mock() + tracer.finish_span(span2) + event2.add_field.assert_has_calls([ + # This intermediate span doesn't get any rollup fields. + call('duration_ms', ANY), + ], any_order=True) + + event1.add_field.reset_mock() + tracer.finish_span(span1) + event1.add_field.assert_has_calls([ + call('rollup.database_ms', 17.0 + 23.1), + call('rollup.calories', 180.0 + 120.0), + call('duration_ms', ANY), + ], any_order=True) + def test_get_active_span(self): m_client = Mock() tracer = SynchronousTracer(m_client) @@ -415,6 +469,7 @@ def test_run_hooks_and_send_adds_trace_fields(self): m_span.event.start_time = datetime.datetime.now() # set an existing trace field m_span.event.add_field('app.a', 1) + m_span.rollup_fields = defaultdict(float) with patch('beeline.trace._should_sample') as m_sample_fn: m_sample_fn.return_value = True @@ -423,7 +478,7 @@ def test_run_hooks_and_send_adds_trace_fields(self): tracer.add_trace_field('b', 2) tracer.add_trace_field('c', 3) tracer.finish_span(m_span) - + # ensure we only added fields b and c and did not try to overwrite a self.assertDictContainsSubset({'app.a': 1, 'app.b': 2, 'app.c': 3}, m_span.event.fields()) @@ -466,4 +521,4 @@ def test_span_context(self): span.remove_context_field("another") self.assertDictEqual({ "some": "value", - }, ev.fields()) \ No newline at end of file + }, ev.fields()) diff --git a/beeline/trace.py b/beeline/trace.py index 529d5db..15686e7 100644 --- a/beeline/trace.py +++ b/beeline/trace.py @@ -6,6 +6,7 @@ import struct import threading import uuid +from collections import defaultdict from functools import wraps from contextlib import contextmanager @@ -22,6 +23,7 @@ def d(self, *args, **kwargs): self._state.trace_id = None self._state.stack = [] self._state.trace_fields = {} + self._state.trace_rollup_fields = defaultdict(float) return f(self, *args, **kwargs) return d @@ -85,6 +87,7 @@ def start_trace(self, context=None, trace_id=None, parent_span_id=None): # reset our stack and context on new traces self._state.stack = [] self._state.trace_fields = {} + self._state.trace_rollup_fields = defaultdict(float) # start the root span return self.start_span(context=context, parent_id=parent_span_id) @@ -121,6 +124,14 @@ def finish_span(self, span): # send the span's event. Even if the stack is in an unhealthy state, # it's probably better to send event data than not if span.event: + # add the trace's rollup fields to the root span + if span.is_root(): + for k, v in self._state.trace_rollup_fields.items(): + span.event.add_field(k, v) + + for k, v in span.rollup_fields.items(): + span.event.add_field(k, v) + # propagate trace fields that may have been added in later spans for k, v in self._state.trace_fields.items(): # don't overwrite existing values because they may be different @@ -179,6 +190,16 @@ def remove_context_field(self, name): if span: span.remove_context_field(name=name) + @init_state + def add_rollup_field(self, name, value): + value = float(value) + + span = self.get_active_span() + if span: + span.rollup_fields[name] += value + + self._state.trace_rollup_fields["rollup.%s" % name] += value + @init_state def add_trace_field(self, name, value): # prefix with app to avoid key conflicts @@ -246,6 +267,7 @@ def __init__(self, trace_id, parent_id, id, event, is_root=False): self.id = id self.event = event self.event.start_time = datetime.datetime.now() + self.rollup_fields = defaultdict(float) self._is_root = is_root def add_context_field(self, name, value):