-
Notifications
You must be signed in to change notification settings - Fork 34
/
tracker.py
143 lines (116 loc) · 4.55 KB
/
tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
Track application events. Supports persisting events to multiple backends.
Best Practices:
* It is recommended that event types are namespaced using dot notation to
avoid naming collisions, similar to DNS names. For example:
org.edx.video.stop, edu.mit.audio.stop
* Avoid using event type names that may cause collisions. The burden is
on the analyst to decide whether your event is equivalent to another
and should be grouped accordingly etc.
* Do not emit events that you don't own. This could negatively impact
the analysis of the event stream. If you suspect your event is
equivalent to another, say so in your documentation, and the analyst
can decide whether or not to group them.
"""
from contextlib import contextmanager
from datetime import datetime
import logging
from pytz import UTC
from eventtracking.locator import DefaultContextLocator
from eventtracking.backends.routing import RoutingBackend
# Event name recorded by `Tracker.emit` when the caller supplies no name.
UNKNOWN_EVENT_TYPE = 'unknown'
# Registry key used by `register_tracker`/`get_tracker` when no name is given.
DEFAULT_TRACKER_NAME = 'default'
# Module-level registry mapping tracker names to registered tracker objects.
TRACKERS = {}
# Logger for this module.
LOG = logging.getLogger(__name__)
class Tracker:
    """
    Emit application events and hand them to the configured backends.

    Backends and processors are owned by an internal `RoutingBackend`; the
    mapping of currently active named contexts is obtained from a context
    locator.
    """

    def __init__(self, backends=None, context_locator=None, processors=None):
        """
        Build the routing backend from `backends` and `processors`, and
        remember `context_locator`. When no (truthy) locator is supplied a
        `DefaultContextLocator` is created instead.
        """
        self.routing_backend = RoutingBackend(backends=backends, processors=processors)
        if not context_locator:
            context_locator = DefaultContextLocator()
        self.context_locator = context_locator

    @property
    def located_context(self):
        """
        The mapping of named contexts returned by this tracker's context
        locator.
        """
        locator = self.context_locator
        return locator.get()

    def get_backend(self, name):
        """
        Return the backend registered under `name`. The lookup is a plain
        subscript on `self.backends`, so a missing name raises whatever that
        mapping raises (`KeyError` for a dict).
        """
        registered = self.backends
        return registered[name]

    @property
    def processors(self):
        """The processors held by the routing backend"""
        router = self.routing_backend
        return router.processors

    @property
    def backends(self):
        """The backends held by the routing backend"""
        router = self.routing_backend
        return router.backends

    def emit(self, name=None, data=None):
        """
        Build an event and send it through the routing backend.

        `name` is the event type string; a missing or empty name is replaced
        by `UNKNOWN_EVENT_TYPE`.

        `data` is a dictionary mapping field names to the values to include
        in the event; a missing or empty value becomes a new empty dict.
        Note that all values provided must be serializable.

        The event is stamped with the current UTC time and carries the
        merged view of every active context.
        """
        # Evaluate the fields in this order so the timestamp is taken before
        # the contexts are merged.
        event_name = name if name else UNKNOWN_EVENT_TYPE
        emitted_at = datetime.now(UTC)
        payload = data if data else {}
        active_context = self.resolve_context()

        event = {
            'name': event_name,
            'timestamp': emitted_at,
            'data': payload,
            'context': active_context,
        }
        self.routing_backend.send(event)

    def resolve_context(self):
        """
        Return a fresh dictionary holding the union of every context that is
        currently entered. On key collisions, contexts later in the located
        context's iteration order win.
        """
        combined = {}
        for entered in self.located_context.values():
            combined.update(entered)
        return combined

    def enter_context(self, name, ctx):
        """
        Activate `ctx` under `name`. Events emitted from now on include the
        key-value pairs of `ctx`, unless a context entered later overrides
        them. Entering an already used `name` replaces the previous entry.
        """
        active = self.located_context
        active[name] = ctx

    def exit_context(self, name):
        """
        Deactivate the context stored under `name`, so that its key-value
        pairs no longer appear in events emitted afterwards. The entry is
        removed with `del`, so an unknown `name` raises whatever the located
        context raises (`KeyError` for a dict).
        """
        active = self.located_context
        del active[name]

    @contextmanager
    def context(self, name, ctx):
        """
        Context manager that applies `ctx` under `name` for the duration of
        the `with` block. The context is exited again even when the block
        raises an exception.
        """
        self.enter_context(name, ctx)
        try:
            yield
        finally:
            # Always undo the enter above, whether or not the body failed.
            self.exit_context(name)
def register_tracker(tracker, name=DEFAULT_TRACKER_NAME):
    """
    Store `tracker` in the module-level registry under `name`, replacing
    any tracker previously registered with that name. When `name` is
    omitted the tracker becomes the global default, i.e. the one used by
    later calls to the module-level `emit`.
    """
    TRACKERS.update({name: tracker})
def get_tracker(name=DEFAULT_TRACKER_NAME):
    """
    Look up a registered tracker by `name`; without a name the global
    default tracker is returned. A `KeyError` is raised when nothing has
    been registered under that name via `register_tracker`.
    """
    tracker = TRACKERS[name]
    return tracker
def emit(name=None, data=None):
    """Forward `name` and `data` to `Tracker.emit` on the default global tracker"""
    default_tracker = get_tracker()
    return default_tracker.emit(name=name, data=data)