Merge pull request #820 from Yelp/support_es5
Support es5
Qmando authored Feb 2, 2017
2 parents e529300 + 8043a9d commit 1b5f610
Showing 8 changed files with 124 additions and 51 deletions.
2 changes: 1 addition & 1 deletion docs/source/recipes/writing_filters.rst
@@ -39,7 +39,7 @@ See http://lucene.apache.org/core/2_9_4/queryparsersyntax.html for more informat
query: "field: value OR otherfield: othervalue"
- query:
query_string:
query: "this: that AND these: those"
query: "this: that AND these: those"

term
****
12 changes: 11 additions & 1 deletion docs/source/running_elastalert.rst
@@ -6,7 +6,7 @@ Running ElastAlert for the First Time
Requirements
------------

- Elasticsearch 1.* or 2.*
- Elasticsearch
- ISO8601 or Unix timestamped data
- Python 2.6 or 2.7
- pip, see requirements.txt
@@ -23,6 +23,16 @@ Install the module::
$ python setup.py install
$ pip install -r requirements.txt

Depending on the version of Elasticsearch, you may need to manually install the correct version of elasticsearch-py.

Elasticsearch 5.0+::

$ pip install "elasticsearch>=5.0.0"

Elasticsearch 2.X::

$ pip install "elasticsearch<3.0.0"
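
As a rough sanity check (an illustration, not part of this change), you can verify that the installed client matches the cluster before starting ElastAlert; the host below is an assumption::

    import elasticsearch

    es = elasticsearch.Elasticsearch(hosts=['localhost:9200'])
    server_major = int(es.info()['version']['number'].split('.')[0])
    client_major = elasticsearch.VERSION[0]  # elasticsearch-py's own version tuple
    if server_major != client_major:
        print('elasticsearch-py %d.x does not match Elasticsearch %d.x; '
              'reinstall the matching client' % (client_major, server_major))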

Next, open up config.yaml.example. In it, you will find several configuration options. ElastAlert may be run without changing any of these settings.

``rules_folder`` is where ElastAlert will load rule configuration files from. It will attempt to load every .yaml file in the folder. Without any valid rules, ElastAlert will not start. ElastAlert will also load new rules, stop running missing rules, and restart modified rules as the files in this folder change. For this tutorial, we will use the example_rules folder.
104 changes: 77 additions & 27 deletions elastalert/elastalert.py
@@ -115,13 +115,21 @@ def __init__(self, args):
self.replace_dots_in_field_names = self.conf.get('replace_dots_in_field_names', False)

self.writeback_es = elasticsearch_client(self.conf)
self.es_version = self.get_version()

for rule in self.rules:
self.init_rule(rule)

if self.args.silence:
self.silence()

def get_version(self):
info = self.writeback_es.info()
return info['version']['number']

def is_five(self):
return self.es_version.startswith('5')
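
# For reference, info() returns cluster metadata roughly shaped like this
# (illustrative values, not taken from this diff):
#   {'name': 'node-1', 'cluster_name': 'elasticsearch',
#    'version': {'number': '5.1.2', 'lucene_version': '6.3.0'},
#    'tagline': 'You Know, for Search'}
# so is_five() only inspects the major digit of version.number.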

@staticmethod
def get_index(rule, starttime=None, endtime=None):
""" Gets the index for a rule. If strftime is set and starttime and endtime
@@ -141,7 +149,7 @@ def get_index(rule, starttime=None, endtime=None):
return index

@staticmethod
def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False):
def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False, five=False):
""" Returns a query dict that will apply a list of filters, filter by
start and end time, and sort results by timestamp.
@@ -158,18 +166,25 @@ def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field=
if starttime and endtime:
es_filters['filter']['bool']['must'].insert(0, {'range': {timestamp_field: {'gt': starttime,
'lte': endtime}}})
query = {'query': {'filtered': es_filters}}
if five:
query = {'query': {'bool': es_filters}}
else:
query = {'query': {'filtered': es_filters}}
if sort:
query['sort'] = [{timestamp_field: {'order': 'desc' if desc else 'asc'}}]
return query
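
# Illustration of the two request bodies get_query() can now build, for one
# assumed term filter and made-up timestamps:
#   five=False (ES 2.x), using the deprecated 'filtered' query:
#     {'query': {'filtered': {'filter': {'bool': {'must': [
#          {'range': {'@timestamp': {'gt': '2017-01-01', 'lte': '2017-01-02'}}},
#          {'term': {'status': 'error'}}]}}}},
#      'sort': [{'@timestamp': {'order': 'asc'}}]}
#   five=True (ES 5.x), where 'filtered' was removed in favor of 'bool':
#     {'query': {'bool': {'filter': {'bool': {'must': [...]}}}},
#      'sort': [{'@timestamp': {'order': 'asc'}}]}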

def get_terms_query(self, query, size, field):
""" Takes a query generated by get_query and outputs a aggregation query """
query = query['query']
if 'sort' in query:
query.pop('sort')
query['filtered'].update({'aggs': {'counts': {'terms': {'field': field, 'size': size}}}})
aggs_query = {'aggs': query}
query_element = query['query']
if 'sort' in query_element:
query_element.pop('sort')
if not self.is_five():
query_element['filtered'].update({'aggs': {'counts': {'terms': {'field': field, 'size': size}}}})
aggs_query = {'aggs': query_element}
else:
aggs_query = query
aggs_query['aggs'] = {'counts': {'terms': {'field': field, 'size': size}}}
return aggs_query
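
# Illustration with an assumed field='username' and size=5:
#   ES 2.x wraps everything in a filter aggregation named 'filtered', so the
#   response buckets come back at aggregations.filtered.counts.buckets:
#     {'aggs': {'filtered': {'filter': {...},
#                            'aggs': {'counts': {'terms': {'field': 'username', 'size': 5}}}}}}
#   ES 5.x keeps the query at the top level and puts the terms agg beside it,
#   so buckets come back at aggregations.counts.buckets:
#     {'query': {'bool': {...}},
#      'aggs': {'counts': {'terms': {'field': 'username', 'size': 5}}}}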

def get_index_start(self, index, timestamp_field='@timestamp'):
@@ -210,6 +225,9 @@ def process_hits(rule, hits):

# Convert the timestamp to a datetime
ts = lookup_es_key(hit['_source'], rule['timestamp_field'])
if not ts and not rule["_source_enabled"]:
raise EAException("Error: No timestamp was found for hit. '_source_enabled' is set to false, check your mappings for stored fields")

set_es_key(hit['_source'], rule['timestamp_field'], rule['ts_to_dt'](ts))
set_es_key(hit, rule['timestamp_field'], lookup_es_key(hit['_source'], rule['timestamp_field']))

@@ -238,11 +256,14 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):
:param endtime: The latest time to query.
:return: A list of hits, bounded by rule['max_query_size'].
"""
query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], to_ts_func=rule['dt_to_ts'])
query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], to_ts_func=rule['dt_to_ts'], five=self.is_five())
extra_args = {'_source_include': rule['include']}
scroll_keepalive = rule.get('scroll_keepalive', self.scroll_keepalive)
if not rule.get('_source_enabled'):
query['fields'] = rule['include']
if self.is_five():
query['stored_fields'] = rule['include']
else:
query['fields'] = rule['include']
extra_args = {}
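
# Note: ES 5.x renamed the 'fields' search option to 'stored_fields', so when
# _source is disabled the request body differs by version; e.g. (field names
# assumed) a 5.x body carries {'stored_fields': ['@timestamp', 'message']}
# where a 2.x body would carry {'fields': ['@timestamp', 'message']}.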

try:
Expand Down Expand Up @@ -287,7 +308,7 @@ def get_hits_count(self, rule, starttime, endtime, index):
:param endtime: The latest time to query.
:return: A dictionary mapping timestamps to number of hits for that time period.
"""
query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'])
query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'], five=self.is_five())

try:
res = self.current_es.count(index=index, doc_type=rule['doc_type'], body=query, ignore_unavailable=True)
@@ -311,13 +332,16 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=Non
if rule.get('raw_count_keys', True) and not rule['query_key'].endswith('.raw'):
filter_key = add_raw_postfix(filter_key)
rule_filter.extend([{'term': {filter_key: qk}}])
base_query = self.get_query(rule_filter, starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'])
base_query = self.get_query(rule_filter, starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'], five=self.is_five())
if size is None:
size = rule.get('terms_size', 50)
query = self.get_terms_query(base_query, size, key)

try:
res = self.current_es.search(index=index, doc_type=rule['doc_type'], body=query, search_type='count', ignore_unavailable=True)
if not self.is_five():
res = self.current_es.search(index=index, doc_type=rule['doc_type'], body=query, search_type='count', ignore_unavailable=True)
else:
res = self.current_es.search(index=index, doc_type=rule['doc_type'], body=query, size=0, ignore_unavailable=True)
except ElasticsearchException as e:
# Elasticsearch sometimes gives us GIGANTIC error messages
# (so big that they will fill the entire terminal buffer)
@@ -328,7 +352,10 @@

if 'aggregations' not in res:
return {}
buckets = res['aggregations']['filtered']['counts']['buckets']
if not self.is_five():
buckets = res['aggregations']['filtered']['counts']['buckets']
else:
buckets = res['aggregations']['counts']['buckets']
self.num_hits += len(buckets)
lt = rule.get('use_local_time')
elastalert_logger.info('Queried rule %s from %s to %s: %s buckets' % (rule['name'], pretty_ts(starttime, lt), pretty_ts(endtime, lt), len(buckets)))
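
# A minimal ES 5.x sketch of the equivalent aggregation-only search (index,
# doc_type and field names assumed): search_type='count' is gone, so size=0
# suppresses hits instead, and the buckets move up one level:
#   res = es.search(index='logstash-*', doc_type='logs', body=query, size=0)
#   buckets = res['aggregations']['counts']['buckets']   # no 'filtered' level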
@@ -412,8 +439,12 @@ def get_starttime(self, rule):
:param rule: The rule configuration.
:return: A timestamp or None.
"""
query = {'filter': {'term': {'rule_name': '%s' % (rule['name'])}},
'sort': {'@timestamp': {'order': 'desc'}}}
sort = {'sort': {'@timestamp': {'order': 'desc'}}}
query = {'filter': {'term': {'rule_name': '%s' % (rule['name'])}}}
if self.is_five():
query = {'query': {'bool': query}}
query.update(sort)
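
# e.g. with five=True and an assumed rule name, the final body is
#   {'query': {'bool': {'filter': {'term': {'rule_name': 'example_frequency'}}}},
#    'sort': {'@timestamp': {'order': 'desc'}}}
# while 2.x keeps the bare top-level 'filter' form alongside 'sort'.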

try:
if self.writeback_es:
res = self.writeback_es.search(index=self.writeback_index, doc_type='elastalert_status',
@@ -628,6 +659,14 @@ def init_rule(self, new_rule, new=True):
continue
new_rule[prop] = rule[prop]

# In ES5, filters starting with 'query' should have the top wrapper removed
if self.is_five():
for es_filter in new_rule.get('filter', []):
if es_filter.get('query'):
new_filter = es_filter['query']
new_rule['filter'].append(new_filter)
new_rule['filter'].remove(es_filter)
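
# Illustration with a made-up filter: ES 5.x rejects the extra 'query'
# wrapper inside a filter context, so
#   {'query': {'query_string': {'query': 'status: error'}}}
# is rewritten to the bare clause
#   {'query_string': {'query': 'status: error'}}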

return new_rule

def load_rule_changes(self):
@@ -844,9 +883,9 @@ def upload_dashboard(self, db, rule, match):
# Upload
es = elasticsearch_client(rule)

res = es.create(index='kibana-int',
doc_type='temp',
body=db_body)
res = es.index(index='kibana-int',
doc_type='temp',
body=db_body)
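
# Likely motivation for index() over create(), stated as an assumption: against
# ES 5.x the client's create() call requires an explicit document id, while
# index() lets Elasticsearch auto-generate one:
#   res = es.index(index='kibana-int', doc_type='temp', body=db_body)
#   dashboard_id = res['_id']  # auto-generated id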

# Return dashboard URL
kibana_url = rule.get('kibana_url')
@@ -1030,8 +1069,8 @@ def writeback(self, doc_type, body):

if self.writeback_es:
try:
res = self.writeback_es.create(index=self.writeback_index,
doc_type=doc_type, body=writeback_body)
res = self.writeback_es.index(index=self.writeback_index,
doc_type=doc_type, body=body)
return res
except ElasticsearchException as e:
logging.exception("Error writing alert info to Elasticsearch: %s" % (e))
@@ -1045,10 +1084,15 @@ def find_recent_pending_alerts(self, time_limit):
# unless there is constantly more than 1000 alerts to send.

# Fetch recent, unsent alerts that aren't part of an aggregate, earlier alerts first.
query = {'query': {'query_string': {'query': '!_exists_:aggregate_id AND alert_sent:false'}},
'filter': {'range': {'alert_time': {'from': dt_to_ts(ts_now() - time_limit),
'to': dt_to_ts(ts_now())}}},
'sort': {'alert_time': {'order': 'asc'}}}
inner_query = {'query_string': {'query': '!_exists_:aggregate_id AND alert_sent:false'}}
time_filter = {'range': {'alert_time': {'from': dt_to_ts(ts_now() - time_limit),
'to': dt_to_ts(ts_now())}}}
sort = {'sort': {'alert_time': {'order': 'asc'}}}
if self.is_five():
query = {'query': {'bool': {'must': inner_query, 'filter': time_filter}}}
else:
query = {'query': inner_query, 'filter': time_filter}
query.update(sort)
if self.writeback_es:
try:
res = self.writeback_es.search(index=self.writeback_index,
@@ -1057,7 +1101,8 @@
size=1000)
if res['hits']['hits']:
return res['hits']['hits']
except: # TODO: Give this a more relevant exception, try:except: is evil.
except ElasticsearchException as e:
logging.exception("Error finding recent pending alerts: %s %s" % (e, query))
pass
return []
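
# Illustration (timestamps made up) of the two bodies assembled above:
#   ES 2.x: {'query': {'query_string': {...}},
#            'filter': {'range': {'alert_time': {'from': ..., 'to': ...}}},
#            'sort': {'alert_time': {'order': 'asc'}}}
#   ES 5.x: {'query': {'bool': {'must': {'query_string': {...}},
#                               'filter': {'range': {'alert_time': {...}}}}},
#            'sort': {'alert_time': {'order': 'asc'}}}
# ES 5.x dropped the top-level 'filter' element, so the time range moves into
# the bool query's 'filter' clause.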

@@ -1269,8 +1314,13 @@ def is_silenced(self, rule_name):
if self.debug:
return False

query = {'filter': {'term': {'rule_name': rule_name}},
'sort': {'until': {'order': 'desc'}}}
query = {'term': {'rule_name': rule_name}}
sort = {'sort': {'until': {'order': 'desc'}}}
if self.is_five():
query = {'query': query}
else:
query = {'filter': query}
query.update(sort)

if self.writeback_es:
try:
13 changes: 10 additions & 3 deletions elastalert/test_rule.py
@@ -51,10 +51,18 @@ def test_file(self, conf, args):

# Set up Elasticsearch client and query
es_client = elasticsearch_client(conf)

try:
is_five = es_client.info()['version']['number'].startswith('5')
except Exception as e:
print("Error connecting to ElasticSearch:", file=sys.stderr)
print(repr(e)[:2048], file=sys.stderr)
return None

start_time = ts_now() - datetime.timedelta(days=args.days)
end_time = ts_now()
ts = conf.get('timestamp_field', '@timestamp')
query = ElastAlerter.get_query(conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts)
query = ElastAlerter.get_query(conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts, five=is_five)
index = ElastAlerter.get_index(conf, start_time, end_time)

# Get one document for schema
@@ -72,8 +80,7 @@
doc_type = res['hits']['hits'][0]['_type']

# Get a count of all docs
count_query = ElastAlerter.get_query(conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts, sort=False)
count_query = {'query': {'filtered': count_query}}
count_query = ElastAlerter.get_query(conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts, sort=False, five=is_five)
try:
res = es_client.count(index, doc_type=doc_type, body=count_query, ignore_unavailable=True)
except Exception as e:
2 changes: 1 addition & 1 deletion requirements.txt
@@ -5,7 +5,7 @@ boto==2.34.0
botocore==1.4.5
configparser>=3.3.0r2
croniter==0.3.8
elasticsearch==1.3.0
elasticsearch
jira==0.32
jsonschema==2.2.0
mock==1.0.0
2 changes: 1 addition & 1 deletion tests/alerts_test.py
@@ -572,7 +572,7 @@ def test_kibana(ea):
with mock.patch("elastalert.elastalert.elasticsearch_client") as mock_es:
mock_create = mock.Mock(return_value={'_id': 'ABCDEFGH'})
mock_es_inst = mock.Mock()
mock_es_inst.create = mock_create
mock_es_inst.index = mock_create
mock_es.return_value = mock_es_inst
link = ea.generate_kibana_db(rule, match)
