-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path__init__.py
134 lines (108 loc) · 3.78 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
Log events to Elastic Search via bulk upload
"""
import datetime as _datetime
import json as _json
import logging as _logging
import socket as _socket
__version__ = '0.3'

# Module-level logger shared by everything in this package.
LOG = _logging.getLogger(__name__)

# Fallback settings, consulted whenever a caller leaves the matching
# argument as None.
DEFAULT = {
    # Where Connection sends its datagrams.
    'host': 'localhost',
    'port': 9700,
    # Only 'UDP' is implemented by Connection.
    'protocol': 'UDP',
    # Base index name used by emit().
    'index': 'log',
    # When true, emit() appends '-YYYY.MM.DD' to the index name.
    'datestamp_index': True,
    # No default document type: emit() logs an error and sends nothing
    # unless a type is supplied here or per call.
    'type': None,
}
class Connection(object):
    """A socket connecting to Elastic Search

    The socket only exists inside a PEP 343 'with' block:

    >>> with Connection(host='localhost', port=1234) as c:
    ...     c.send(message='hello!')

    Arguments left as None fall back to the matching DEFAULT entry.
    Only the 'UDP' protocol is implemented; any other value raises
    NotImplementedError.
    """

    def __init__(self, host=None, port=None, protocol=None):
        if host is None:
            host = DEFAULT['host']
        if port is None:
            port = DEFAULT['port']
        if protocol is None:
            protocol = DEFAULT['protocol']
        if protocol != 'UDP':
            raise NotImplementedError(protocol)
        self.host = host
        self.port = port
        self.socket_type = _socket.SOCK_DGRAM
        # Created in __enter__, released in __exit__.
        self._sock = None

    def __enter__(self):
        """Open the socket and return this connection."""
        self._sock = _socket.socket(_socket.AF_INET, self.socket_type)
        return self

    def __exit__(self, *exc_info):
        """Close the socket, if any; exceptions from the block propagate."""
        # Drop our reference first so it is cleared even if close() raises.
        sock, self._sock = self._sock, None
        if sock is not None:
            sock.close()

    def send(self, message):
        """Send one message to (host, port).

        Text is encoded as UTF-8 before sending, because socket.sendto()
        only accepts bytes on Python 3.  Bytes (and other objects without
        an 'encode' method, such as bytearray) are passed through as-is.

        Raises RuntimeError when called outside a 'with' block.
        """
        if self._sock is None:
            raise RuntimeError(
                'Connection is not open; use it as a context manager')
        LOG.debug(message)
        # Python 2 'str' is already bytes, so it is excluded by the
        # isinstance check even though it has an 'encode' method.
        if hasattr(message, 'encode') and not isinstance(message, bytes):
            message = message.encode('utf-8')
        self._sock.sendto(message, (self.host, self.port))
def log(index=None, type=None, sort_keys=False, **kwargs):
    """Stamp the keyword arguments and ship them as one document

    Every extra keyword argument becomes a key of the payload.  Two
    keys are always set by this function: '@timestamp' (the current
    UTC time in ISO format) and '@version' (1).

    The connection is built with its default configuration.  To
    override any of it, assemble the payload dict yourself and call
    emit() directly.

    `index` and `type` select where the document is filed in Elastic
    Search, so they cannot be used as payload keys.

    Returns whatever emit() returns.
    """
    document = dict(kwargs)
    document.update({
        '@timestamp': _datetime.datetime.utcnow().isoformat(),
        '@version': 1,
    })
    return emit(document, index=index, type=type, sort_keys=sort_keys)
def emit(payload, index=None, datestamp_index=None, type=None,
         sort_keys=False, connection_class=Connection, **kwargs):
    """Send bulk-upload data to Elastic Search

    Uses the 'index' action to add or replace a document as necessary.
    The message is two JSON lines (the action, then the document)
    followed by a trailing newline.

    http://www.elasticsearch.org/guide/reference/api/bulk/
    http://www.elasticsearch.org/guide/reference/api/bulk-udp/

    Arguments:

    * payload: the document to index.  If it cannot be serialized to
      JSON, an error is logged and a placeholder document
      ({"error": "unable to serialize"}) is sent in its place.
    * index: index name; defaults to DEFAULT['index'].
    * datestamp_index: when true, '-YYYY.MM.DD' (today's date) is
      appended to the index name; defaults to
      DEFAULT['datestamp_index'].
    * type: document type; defaults to DEFAULT['type'].
    * sort_keys: passed through to json.dumps().
    * connection_class: called with **kwargs to build a context
      manager whose value provides send(message).
    * kwargs: forwarded to connection_class.

    Returns the encoded message that was sent, or None when no type is
    available (an error is logged and nothing is sent).
    """
    if index is None:
        index = DEFAULT['index']
    if type is None:
        type = DEFAULT['type']
    if datestamp_index is None:
        datestamp_index = DEFAULT['datestamp_index']
    if datestamp_index:
        # NOTE(review): date.today() is the local date, whereas log()
        # stamps '@timestamp' in UTC -- confirm which one the index
        # suffix is meant to follow.
        index = '-'.join([
            index,
            _datetime.date.today().strftime('%Y.%m.%d'),
        ])
    if type is None:
        LOG.error('You must set a type for {!r}'.format(payload))
        return None
    action_line = _json.dumps(
        {'index': {'_index': index, '_type': type}},
        sort_keys=sort_keys,
    )
    # Not everything is JSON serializable, and logging should not blow
    # up.  json.dumps() raises TypeError for unsupported types and
    # ValueError for circular references, so both are handled here.
    try:
        document_line = _json.dumps(payload, sort_keys=sort_keys)
    except (TypeError, ValueError):
        # Message text kept exactly as before so existing log searches
        # still match.
        LOG.error('Unable to serlialize {!r} to json'.format(payload))
        document_line = _json.dumps(
            {"error": "unable to serialize"}, sort_keys=sort_keys)
    message = '\n'.join([action_line, document_line, ''])
    if hasattr(message, 'encode'):
        message = message.encode('utf-8')  # convert str to bytes for Python 3
    with connection_class(**kwargs) as connection:
        connection.send(message)
    return message