Skip to content

Commit

Permalink
Merge pull request #70 from sophiebits/rollup-fields
Browse files Browse the repository at this point in the history
Add `add_rollup_field` API
  • Loading branch information
tredman authored Jul 26, 2019
2 parents 9c20f68 + cfced99 commit b2d4a9d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 5 deletions.
20 changes: 18 additions & 2 deletions beeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def traced_thread(self, fn):
# copy as a new list - reference will be unavailable when we enter the new thread
stack = copy.copy(self.tracer_impl._state.stack)
trace_fields = copy.copy(self.tracer_impl._state.trace_fields)

@functools.wraps(fn)
def wrapped(*args, **kwargs):
self.tracer_impl._state.trace_id = trace_id
Expand Down Expand Up @@ -356,6 +356,22 @@ def remove_context_field(name):
if _GBL:
_GBL.tracer_impl.remove_context_field(name=name)

def add_rollup_field(name, value):
''' AddRollupField adds a key/value pair to the current span. If it is called repeatedly
on the same span, the values will be summed together. Additionally, this
field will be summed across all spans and added to the trace as a total. It
is especially useful for doing things like adding the duration spent talking
to a specific external service - eg database time. The root span will then
get a field that represents the total time spent talking to the database from
all of the spans that are part of the trace.
Args:
- `name`: Name of field to add
- `value`: Numeric (float) value of new field
'''
if _GBL:
_GBL.tracer_impl.add_rollup_field(name=name, value=value)

def add_trace_field(name, value):
''' Similar to `add_context_field` - adds a field to the current span, but
also to all other future spans in this trace. Trace context fields will be
Expand Down Expand Up @@ -635,7 +651,7 @@ def noop(*args, **kwargs):
# copy as a new list - reference will be unavailable when we enter the new thread
stack = copy.copy(bl.tracer_impl._state.stack)
trace_fields = copy.copy(bl.tracer_impl._state.trace_fields)

@functools.wraps(fn)
def wrapped(*args, **kwargs):
bl.tracer_impl._state.trace_id = trace_id
Expand Down
61 changes: 58 additions & 3 deletions beeline/test_trace.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from mock import Mock, call, patch
from collections import defaultdict
from mock import ANY, Mock, call, patch
import datetime
import unittest
import uuid
Expand Down Expand Up @@ -316,6 +317,59 @@ def test_add_trace_field_propagates(self):
'app.more': 'data!',
})

def test_add_rollup_field_propagates(self):
m_client = Mock()
tracer = SynchronousTracer(m_client)
tracer._run_hooks_and_send = Mock()

span1 = tracer.start_trace(context={'name': 'root'})
event1 = m_client.new_event.return_value

span2 = tracer.start_span(context={'name': 'middle'})
event2 = m_client.new_event.return_value

span3 = tracer.start_span(context={'name': 'inner1'})
event3 = m_client.new_event.return_value

tracer.add_rollup_field('database_ms', 17)
tracer.add_rollup_field('calories', 180)
tracer.add_rollup_field('database_ms', 23.1)

event3.add_field.reset_mock()
tracer.finish_span(span3)
event3.add_field.assert_has_calls([
call('database_ms', 17.0 + 23.1),
call('calories', 180.0),
call('duration_ms', ANY),
], any_order=True)

span4 = tracer.start_span(context={'name': 'inner2'})
event4 = m_client.new_event.return_value

tracer.add_rollup_field('calories', 120)

event4.add_field.reset_mock()
tracer.finish_span(span4)
event4.add_field.assert_has_calls([
call('calories', 120.0),
call('duration_ms', ANY),
], any_order=True)

event2.add_field.reset_mock()
tracer.finish_span(span2)
event2.add_field.assert_has_calls([
# This intermediate span doesn't get any rollup fields.
call('duration_ms', ANY),
], any_order=True)

event1.add_field.reset_mock()
tracer.finish_span(span1)
event1.add_field.assert_has_calls([
call('rollup.database_ms', 17.0 + 23.1),
call('rollup.calories', 180.0 + 120.0),
call('duration_ms', ANY),
], any_order=True)

def test_get_active_span(self):
m_client = Mock()
tracer = SynchronousTracer(m_client)
Expand Down Expand Up @@ -415,6 +469,7 @@ def test_run_hooks_and_send_adds_trace_fields(self):
m_span.event.start_time = datetime.datetime.now()
# set an existing trace field
m_span.event.add_field('app.a', 1)
m_span.rollup_fields = defaultdict(float)

with patch('beeline.trace._should_sample') as m_sample_fn:
m_sample_fn.return_value = True
Expand All @@ -423,7 +478,7 @@ def test_run_hooks_and_send_adds_trace_fields(self):
tracer.add_trace_field('b', 2)
tracer.add_trace_field('c', 3)
tracer.finish_span(m_span)

# ensure we only added fields b and c and did not try to overwrite a
self.assertDictContainsSubset({'app.a': 1, 'app.b': 2, 'app.c': 3}, m_span.event.fields())

Expand Down Expand Up @@ -466,4 +521,4 @@ def test_span_context(self):
span.remove_context_field("another")
self.assertDictEqual({
"some": "value",
}, ev.fields())
}, ev.fields())
22 changes: 22 additions & 0 deletions beeline/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import struct
import threading
import uuid
from collections import defaultdict
from functools import wraps

from contextlib import contextmanager
Expand All @@ -22,6 +23,7 @@ def d(self, *args, **kwargs):
self._state.trace_id = None
self._state.stack = []
self._state.trace_fields = {}
self._state.trace_rollup_fields = defaultdict(float)

return f(self, *args, **kwargs)
return d
Expand Down Expand Up @@ -85,6 +87,7 @@ def start_trace(self, context=None, trace_id=None, parent_span_id=None):
# reset our stack and context on new traces
self._state.stack = []
self._state.trace_fields = {}
self._state.trace_rollup_fields = defaultdict(float)

# start the root span
return self.start_span(context=context, parent_id=parent_span_id)
Expand Down Expand Up @@ -121,6 +124,14 @@ def finish_span(self, span):
# send the span's event. Even if the stack is in an unhealthy state,
# it's probably better to send event data than not
if span.event:
# add the trace's rollup fields to the root span
if span.is_root():
for k, v in self._state.trace_rollup_fields.items():
span.event.add_field(k, v)

for k, v in span.rollup_fields.items():
span.event.add_field(k, v)

# propagate trace fields that may have been added in later spans
for k, v in self._state.trace_fields.items():
# don't overwrite existing values because they may be different
Expand Down Expand Up @@ -179,6 +190,16 @@ def remove_context_field(self, name):
if span:
span.remove_context_field(name=name)

@init_state
def add_rollup_field(self, name, value):
value = float(value)

span = self.get_active_span()
if span:
span.rollup_fields[name] += value

self._state.trace_rollup_fields["rollup.%s" % name] += value

@init_state
def add_trace_field(self, name, value):
# prefix with app to avoid key conflicts
Expand Down Expand Up @@ -246,6 +267,7 @@ def __init__(self, trace_id, parent_id, id, event, is_root=False):
self.id = id
self.event = event
self.event.start_time = datetime.datetime.now()
self.rollup_fields = defaultdict(float)
self._is_root = is_root

def add_context_field(self, name, value):
Expand Down

0 comments on commit b2d4a9d

Please sign in to comment.