Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] adding support for dynamic_outputs in rules #1095

Merged
merged 1 commit into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions docs/source/dynamic-outputs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
Dynamic Outputs
===============

Prerequisites
-------------

* Any output assigned must be added with ``python manage.py output``
* ``functions`` must return ``None``, ``str`` or ``List[str]`` which maps to an output configured with the above.
* Only pass ``context`` if the ``rule`` sets context.

Overview
--------

Adds the ability to have custom logic run to define an ``output`` or ``outputs`` based on information within the ``record``.
For information on supported outputs and how to add support for additional outputs, see `outputs <outputs.html>`_

As can be seen by the examples below, they are easy to configure, but add a very useful feature to StreamAlert.

- StreamAlert sends to all outputs defined within a rules ``outputs=[]`` and ``dynamic_outputs=[]`` when sending ``Alerts``.
- It is also possible to pass ``context`` to the ``dynamic_function`` if the ``rule`` sets it.

.. note::
Any ``output`` passed must be configured with ``./manage.py output -h``


Dynamic Outputs, Simple
-----------------------

The below code block is considered a simple ``dynamic_output`` function, because the outputs are dynamically configured, but the information used still lives within the code. It also:

- allows you to maintain a static list of information inside your code
- will return the outputs relevant to the team who "own" the account
- ``Alerts`` are sent to the ``aws-sns:security`` output aswell as those returned by the function

.. code-block:: python
from streamalert.shared.rule import rule
def map_account_to_team(record):
teams = {
"team_a": {"accounts": ["123", "456", ...], "outputs": ["aws-sns:team_a"]},
"team_b": {"accounts": ["789", ...], "outputs": ["aws-sns:team_b", "slack:team_b"]},
}
account_id = record.get('recipientaccountid')
for team in teams:
if account_id in team["accounts"]:
return team["outputs"]
# None is guarded against by StreamAlert
@rule(
logs=['cloudwatch:events'],
req_subkeys={
'detail': ['userIdentity', 'eventType']
},
outputs=["aws-sns:security"],
dynamic_outputs=[map_account_to_team]
)
def cloudtrail_root_account_usage(rec):
# Rule logic
Dynamic Outputs, With LookupTables
----------------------------------

With the simple addition of a `lookup-table <lookup-tables.html>`_ you can take a rule like ``cloudtrail_root_account_usage`` and configure it as such:

.. code-block:: python
from streamalert.shared.rule import rule
from streamalert.shared.lookup_tables.core import LookupTables
def dynamic_output_with_context(record, context): # pass context only if the rule added context
account_id = context["account_id"]
return LookupTables.get(
'my_lookup_table',
'aws-account-owner:{}'.format(account_id),
None
) # potentially returns [aws-sns:team_a]
@rule(
logs=['cloudwatch:events'],
outputs=["aws-sns:security],
dynamic_outputs=[dynamic_output_with_context],
context={"account_id": "valid_account_id"},
)
def cloudtrail_root_account_usage(rec):
context["account_id"] = record.get('recipientaccountid')
# Rule logic
The above has the benefit of using information that lives outside of StreamAlert, which means teams can acquire new accounts and get ``Alerts``
without having to alter StreamAlert code.
Dynamic Outputs, With Other Data Source
---------------------------------------
.. code-block:: python
from streamalert.shared.rule import rule
import requests
def dynamic_output(record):
account_id = record.get('recipientaccountid')
# invoke an external API to get data back
response = requests.get("API/team_map")
for team in response.json():
if account_id in team["accounts"]:
return team["outputs"] # potentially "aws-lambda:team_a"
@rule(
logs=['cloudwatch:events'],
outputs=["aws-sns:security],
dynamic_outputs=[dynamic_output],
)
def cloudtrail_root_account_usage(rec):
# Rule logic
The above example uses an external API to get the output map, which is to be queried with the ``account_id`` on the record.
This is just an example, but hopefully highlights many ways in which ``dynamic_outputs`` can be used.
.. warning::
The above example could result in many queries to the API in use and could potentially slow down StreamAlert
Lambdas when processing ``Alerts``.
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Table of Contents
rules
testing
outputs
dynamic-outputs
publishers
lookup-tables
metrics
Expand Down
9 changes: 8 additions & 1 deletion docs/source/rules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ The following table provides an overview of each rule option, with more details
``merge_by_keys`` ``List[str]`` List of key names that must match in value before merging alerts
``merge_window_mins`` ``int`` Merge related alerts at this interval rather than sending immediately
``outputs`` ``List[str]`` List of alert outputs
``dynamic_outputs`` ``List[function]`` List of functions which return valid outputs
``req_subkeys`` ``Dict[str, List[str]]`` Subkeys which must be present in the record
===================== ======================== ===============

Expand Down Expand Up @@ -252,10 +253,16 @@ but those keys can be nested anywhere in the record structure.
outputs
~~~~~~~

Defines the alert destination if the return value of a rule is ``True``.
The ``outputs`` keyword argument defines the alert destination if the return value of a rule is ``True``.
Alerts are always sent to an :ref:`Athena table <athena_user_guide>` which is easy to query.
Any number of additional `outputs <outputs.html>`_ can be specified.

dynamic_outputs
~~~~~~~~~~~~~~~

The ``dynamic_outputs`` keyword argument defines additional `outputs <outputs.html>`_ to an Alert which are dynamically generated.
See `dynamic_outputs <dynamic_outputs.html>`_ for more info

req_subkeys
~~~~~~~~~~~

Expand Down
124 changes: 118 additions & 6 deletions streamalert/rules_engine/rules_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def _rule_analysis(self, payload, rule):
return

alert = Alert(
rule.name, payload['record'], self._configure_outputs(rule),
rule.name, payload['record'], self._configure_outputs(payload['record'], rule),
cluster=payload['cluster'],
context=rule.context,
log_source=payload['log_schema_type'],
Expand All @@ -213,14 +213,126 @@ def _rule_analysis(self, payload, rule):

return alert

def _configure_outputs(self, rule):
def _configure_outputs(self, record, rule):
"""Configure the outputs for the rule
Args:
record (dict): Record to pass through to dynamic_outputs
rule (rule.Rule): Attributes for the rule which triggered the alert
Returns:
set: unique set of outputs, only required outputs if the rule is staged
"""
# Check if the rule is staged and, if so, only use the required alert outputs
if rule.is_staged(self._rule_table):
all_outputs = self._required_outputs_set
else: # Otherwise, combine the required alert outputs with the ones for this rule
all_outputs = self._required_outputs_set.union(rule.outputs_set)
output_sources = [self._required_outputs_set]
else: # Otherwise, combine all outputs into one
output_sources = [self._required_outputs_set, rule.outputs_set]
if rule.dynamic_outputs:
# append dynamic_outputs to output sources if they exist
dynamic_outputs = self._configure_dynamic_outputs(record, rule)
output_sources.append(dynamic_outputs)

return {
output
for output_source in output_sources
for output in output_source
if self._check_valid_output(output)
}

@classmethod
def _configure_dynamic_outputs(cls, record, rule):
"""Generate list of outputs from dynamic_outputs
Args:
record (dict): Record to pass through to the dynamic_output function
rule (rule.Rule): Attributes for the rule which triggered the alert
Returns:
list: list of additional outputs to append to the current set
"""
args_list = [record]
if rule.context:
# Pass context to dynamic_output function if context exists
args_list.append(rule.context)

return [
output
for dynamic_output_function in rule.dynamic_outputs_set
for output in cls._call_dynamic_output_function(
dynamic_output_function, rule.name, args_list
)
]

@staticmethod
def _call_dynamic_output_function(function, rule_name, args_list):
"""Call the dynamic_output function
Args:
dynamic_output (func): Callable function which returns None, str or List[str]
rule_name (str): The name of the rule the functions belong to
args_list (list): list of args to be passed to the dynamic function
should be (record or record and context)
Returns:
list: list of additional outputs
"""
LOGGER.debug("invoking function %s", function.__name__)

outputs = []

try:
outputs = function(*args_list)
except Exception: # pylint: disable=broad-except
# Logger error and return []
LOGGER.error(
"Exception when calling dynamic_output %s for rule %s",
function.__name__, rule_name
)
else:
LOGGER.debug("function %s returned: %s", function.__name__, outputs)

if isinstance(outputs, str):
# Case 1: outputs is a string
# return outputs wrapped in a list
outputs = [outputs]
elif isinstance(outputs, list):
# Case 2: outputs is a list
# return outputs
pass
else:
# Case 3: outputs is neither a string or a list
# return an empty list
outputs = []

return outputs

@staticmethod
def _check_valid_output(output):
"""Verify output is valid
Args:
output (str): The output to check if its valid
Returns:
True (bool): Output is valid
False (bool): Output is invalid
"""
valid = False

if not isinstance(output, str):
# Case 1: output is not a string
# return False
LOGGER.warning("Output (%s) is not a string", output)
valid = False
elif isinstance(output, str) and ":" not in output:
# Case 2: output is a string but missing ":"
# Log warning and return False
LOGGER.warning("Output (%s) is missing ':'", output)

valid = False
else:
# Case 3: output is a string and contains ":"
# return True
valid = True

return all_outputs
return valid

@classmethod
def _configure_publishers(cls, rule):
Expand Down
6 changes: 6 additions & 0 deletions streamalert/shared/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(self, func, **kwargs):
self.merge_by_keys = kwargs.get('merge_by_keys')
self.merge_window_mins = kwargs.get('merge_window_mins') or 0
self.outputs = kwargs.get('outputs')
self.dynamic_outputs = kwargs.get('dynamic_outputs')
self.publishers = kwargs.get('publishers')
self.req_subkeys = kwargs.get('req_subkeys')
self.initial_context = kwargs.get('context')
Expand Down Expand Up @@ -199,6 +200,11 @@ def description(self, description):
def outputs_set(self):
return set(self.outputs or [])


@property
def dynamic_outputs_set(self):
return set(self.dynamic_outputs or [])

@classmethod
def disabled_rules(cls):
return {
Expand Down
Loading