Skip to content

Commit

Permalink
[core] Added dynamic_outputs to Rule (#1095)
Browse files Browse the repository at this point in the history
* Now possible to pass dynamic_outputs to the @rule decorator and have
outputs be dynamically configured based on information in the record.
For example, you could use lookup_tables to map an account_id to an
owner which maps to an output

[testing] Updated unit tests and added additional tests for new
dynamic_outputs

[docs] Added dynamic_outputs documentation

Signed-off-by: jack1902 <[email protected]>
  • Loading branch information
jack1902 authored and ryandeivert committed Feb 19, 2020
1 parent bd914de commit d94e13d
Show file tree
Hide file tree
Showing 7 changed files with 609 additions and 8 deletions.
129 changes: 129 additions & 0 deletions docs/source/dynamic-outputs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
Dynamic Outputs
===============

Prerequisites
-------------

* Any output assigned must be added with ``python manage.py output``
* ``functions`` must return ``None``, ``str`` or ``List[str]`` which maps to an output configured with the above.
* Only pass ``context`` if the ``rule`` sets context.

Overview
--------

Adds the ability to have custom logic run to define an ``output`` or ``outputs`` based on information within the ``record``.
For information on supported outputs and how to add support for additional outputs, see `outputs <outputs.html>`_

As can be seen by the examples below, they are easy to configure, but add a very useful feature to StreamAlert.

- StreamAlert sends to all outputs defined within a rules ``outputs=[]`` and ``dynamic_outputs=[]`` when sending ``Alerts``.
- It is also possible to pass ``context`` to the ``dynamic_function`` if the ``rule`` sets it.

.. note::
Any ``output`` passed must be configured with ``./manage.py output -h``


Dynamic Outputs, Simple
-----------------------

The below code block is considered a simple ``dynamic_output`` function, because the outputs are dynamically configured, but the information used still lives within the code. It also:

- allows you to maintain a static list of information inside your code
- will return the outputs relevant to the team who "own" the account
- ``Alerts`` are sent to the ``aws-sns:security`` output aswell as those returned by the function

.. code-block:: python
from streamalert.shared.rule import rule
def map_account_to_team(record):
teams = {
"team_a": {"accounts": ["123", "456", ...], "outputs": ["aws-sns:team_a"]},
"team_b": {"accounts": ["789", ...], "outputs": ["aws-sns:team_b", "slack:team_b"]},
}
account_id = record.get('recipientaccountid')
for team in teams:
if account_id in team["accounts"]:
return team["outputs"]
# None is guarded against by StreamAlert
@rule(
logs=['cloudwatch:events'],
req_subkeys={
'detail': ['userIdentity', 'eventType']
},
outputs=["aws-sns:security"],
dynamic_outputs=[map_account_to_team]
)
def cloudtrail_root_account_usage(rec):
# Rule logic
Dynamic Outputs, With LookupTables
----------------------------------

With the simple addition of a `lookup-table <lookup-tables.html>`_ you can take a rule like ``cloudtrail_root_account_usage`` and configure it as such:

.. code-block:: python
from streamalert.shared.rule import rule
from streamalert.shared.lookup_tables.core import LookupTables
def dynamic_output_with_context(record, context): # pass context only if the rule added context
account_id = context["account_id"]
return LookupTables.get(
'my_lookup_table',
'aws-account-owner:{}'.format(account_id),
None
) # potentially returns [aws-sns:team_a]
@rule(
logs=['cloudwatch:events'],
outputs=["aws-sns:security],
dynamic_outputs=[dynamic_output_with_context],
context={"account_id": "valid_account_id"},
)
def cloudtrail_root_account_usage(rec):
context["account_id"] = record.get('recipientaccountid')
# Rule logic
The above has the benefit of using information that lives outside of StreamAlert, which means teams can acquire new accounts and get ``Alerts``
without having to alter StreamAlert code.
Dynamic Outputs, With Other Data Source
---------------------------------------
.. code-block:: python
from streamalert.shared.rule import rule
import requests
def dynamic_output(record):
account_id = record.get('recipientaccountid')
# invoke an external API to get data back
response = requests.get("API/team_map")
for team in response.json():
if account_id in team["accounts"]:
return team["outputs"] # potentially "aws-lambda:team_a"
@rule(
logs=['cloudwatch:events'],
outputs=["aws-sns:security],
dynamic_outputs=[dynamic_output],
)
def cloudtrail_root_account_usage(rec):
# Rule logic
The above example uses an external API to get the output map, which is to be queried with the ``account_id`` on the record.
This is just an example, but hopefully highlights many ways in which ``dynamic_outputs`` can be used.
.. warning::
The above example could result in many queries to the API in use and could potentially slow down StreamAlert
Lambdas when processing ``Alerts``.
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Table of Contents
rules
testing
outputs
dynamic-outputs
publishers
lookup-tables
apps
Expand Down
6 changes: 6 additions & 0 deletions docs/source/rules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ The following table provides an overview of each rule option, with more details
``merge_by_keys`` ``List[str]`` List of key names that must match in value before merging alerts
``merge_window_mins`` ``int`` Merge related alerts at this interval rather than sending immediately
``outputs`` ``List[str]`` List of alert outputs
``dynamic_outputs`` ``List[function]`` List of functions which return valid outputs
``req_subkeys`` ``Dict[str, List[str]]`` Subkeys which must be present in the record
===================== ======================== ===============

Expand Down Expand Up @@ -255,6 +256,11 @@ The following table provides an overview of each rule option, with more details

The original (unmerged) alert will always be sent to `Athena <historical-search.html#athena-user-guide>`_.

:dynamic_outputs:

The ``dynamic_outputs`` keyword argument defines additional `outputs <outputs.html>`_ to an Alert which are dynamically generated.
See `dynamic_outputs <dynamic-outputs.html>`_ for more info

:outputs:

Defines the alert destination if the return value of a rule is ``True``.
Expand Down
124 changes: 118 additions & 6 deletions streamalert/rules_engine/rules_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def _rule_analysis(self, payload, rule):
return

alert = Alert(
rule.name, payload['record'], self._configure_outputs(rule),
rule.name, payload['record'], self._configure_outputs(payload['record'], rule),
cluster=payload['cluster'],
context=rule.context,
log_source=payload['log_schema_type'],
Expand All @@ -213,14 +213,126 @@ def _rule_analysis(self, payload, rule):

return alert

def _configure_outputs(self, rule):
def _configure_outputs(self, record, rule):
"""Configure the outputs for the rule
Args:
record (dict): Record to pass through to dynamic_outputs
rule (rule.Rule): Attributes for the rule which triggered the alert
Returns:
set: unique set of outputs, only required outputs if the rule is staged
"""
# Check if the rule is staged and, if so, only use the required alert outputs
if rule.is_staged(self._rule_table):
all_outputs = self._required_outputs_set
else: # Otherwise, combine the required alert outputs with the ones for this rule
all_outputs = self._required_outputs_set.union(rule.outputs_set)
output_sources = [self._required_outputs_set]
else: # Otherwise, combine all outputs into one
output_sources = [self._required_outputs_set, rule.outputs_set]
if rule.dynamic_outputs:
# append dynamic_outputs to output sources if they exist
dynamic_outputs = self._configure_dynamic_outputs(record, rule)
output_sources.append(dynamic_outputs)

return {
output
for output_source in output_sources
for output in output_source
if self._check_valid_output(output)
}

@classmethod
def _configure_dynamic_outputs(cls, record, rule):
"""Generate list of outputs from dynamic_outputs
Args:
record (dict): Record to pass through to the dynamic_output function
rule (rule.Rule): Attributes for the rule which triggered the alert
Returns:
list: list of additional outputs to append to the current set
"""
args_list = [record]
if rule.context:
# Pass context to dynamic_output function if context exists
args_list.append(rule.context)

return [
output
for dynamic_output_function in rule.dynamic_outputs_set
for output in cls._call_dynamic_output_function(
dynamic_output_function, rule.name, args_list
)
]

@staticmethod
def _call_dynamic_output_function(function, rule_name, args_list):
"""Call the dynamic_output function
Args:
dynamic_output (func): Callable function which returns None, str or List[str]
rule_name (str): The name of the rule the functions belong to
args_list (list): list of args to be passed to the dynamic function
should be (record or record and context)
Returns:
list: list of additional outputs
"""
LOGGER.debug("invoking function %s", function.__name__)

outputs = []

try:
outputs = function(*args_list)
except Exception: # pylint: disable=broad-except
# Logger error and return []
LOGGER.error(
"Exception when calling dynamic_output %s for rule %s",
function.__name__, rule_name
)
else:
LOGGER.debug("function %s returned: %s", function.__name__, outputs)

if isinstance(outputs, str):
# Case 1: outputs is a string
# return outputs wrapped in a list
outputs = [outputs]
elif isinstance(outputs, list):
# Case 2: outputs is a list
# return outputs
pass
else:
# Case 3: outputs is neither a string or a list
# return an empty list
outputs = []

return outputs

@staticmethod
def _check_valid_output(output):
"""Verify output is valid
Args:
output (str): The output to check if its valid
Returns:
True (bool): Output is valid
False (bool): Output is invalid
"""
valid = False

if not isinstance(output, str):
# Case 1: output is not a string
# return False
LOGGER.warning("Output (%s) is not a string", output)
valid = False
elif isinstance(output, str) and ":" not in output:
# Case 2: output is a string but missing ":"
# Log warning and return False
LOGGER.warning("Output (%s) is missing ':'", output)

valid = False
else:
# Case 3: output is a string and contains ":"
# return True
valid = True

return all_outputs
return valid

@classmethod
def _configure_publishers(cls, rule):
Expand Down
6 changes: 6 additions & 0 deletions streamalert/shared/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(self, func, **kwargs):
self.merge_by_keys = kwargs.get('merge_by_keys')
self.merge_window_mins = kwargs.get('merge_window_mins') or 0
self.outputs = kwargs.get('outputs')
self.dynamic_outputs = kwargs.get('dynamic_outputs')
self.publishers = kwargs.get('publishers')
self.req_subkeys = kwargs.get('req_subkeys')
self.initial_context = kwargs.get('context')
Expand Down Expand Up @@ -199,6 +200,11 @@ def description(self, description):
def outputs_set(self):
return set(self.outputs or [])


@property
def dynamic_outputs_set(self):
return set(self.dynamic_outputs or [])

@classmethod
def disabled_rules(cls):
return {
Expand Down
Loading

0 comments on commit d94e13d

Please sign in to comment.