[ASM] EXPANDR-4374 #30601

@@ -28,31 +28,39 @@ def _canonicalize_string(in_str: str) -> str:
return in_str.lower().strip(' \t"\'')


def get_indicators_from_key_value_pairs(observed_key_value_pairs: list, is_indicator_match: Callable) -> list:
def get_indicators_from_list(observed_list: list, is_indicator_match: Callable, comparison_type: str) -> list:
"""
Returns list of matches based on criteria.

Args:
observed_key_value_pairs (List[str]): list of tags to process.
observed_list (List[str]): list of tags to process.
is_indicator_match (callable): what function to call depending on dev or prod checking.
comparison_type (str): if comparing list of dictionaries or a list of strings.

Returns:
list: list of matches based on exact/partial dev criteria.
"""
indicators = []
for kv_pair in observed_key_value_pairs:
if not isinstance(kv_pair, Mapping):
demisto.info(f"Ignoring item because it is not a mapping: {kv_pair}")
else:
if "key" not in kv_pair or "value" not in kv_pair:
demisto.info(f"Ignoring item because it lacks the keys 'key' and/or 'value': {sorted(kv_pair.keys())}")
for list_entry in observed_list:
if comparison_type == "dictionary":
if not isinstance(list_entry, Mapping):
Contributor

Suggested change
if not isinstance(list_entry, Mapping):
if not isinstance(list_entry, dict):

Does this work instead?

Contributor Author

from what I have seen, it works exactly the same. I didn't write that line, but can change if you want.
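
For context on the suggestion above, here is a minimal sketch of where the two `isinstance` checks diverge. It assumes nothing about the script beyond the two checks quoted in this thread; `read_only_view` and the helper names are illustrative and do not appear in the PR.

```python
from collections.abc import Mapping
from types import MappingProxyType

plain_dict = {"key": "env", "value": "dev"}
# A read-only view: it satisfies Mapping but is not a dict subclass.
read_only_view = MappingProxyType(plain_dict)


def passes_mapping_check(item) -> bool:
    """The check currently in the diff."""
    return isinstance(item, Mapping)


def passes_dict_check(item) -> bool:
    """The check proposed in the suggested change."""
    return isinstance(item, dict)


# Both checks accept an ordinary dict...
print(passes_mapping_check(plain_dict), passes_dict_check(plain_dict))
# ...but only the Mapping check accepts other mapping types.
print(passes_mapping_check(read_only_view), passes_dict_check(read_only_view))
```

If the tags always arrive as plain `dict` objects, as JSON-decoded input would, the two checks behave the same, which is consistent with the author's observation.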

demisto.info(f"Ignoring item because it is not a mapping: {list_entry}")
else:
key = _canonicalize_string(kv_pair.get("key", ""))
value = _canonicalize_string(kv_pair.get("value", ""))

if (("env" in key) or (key in ("stage", "function", "lifecycle", "usage", "tier"))) and is_indicator_match(value):
indicators.append(kv_pair)

if "key" not in list_entry or "value" not in list_entry:
demisto.info(f"Ignoring item because it lacks the keys 'key' and/or 'value': {sorted(list_entry.keys())}")
else:
key = _canonicalize_string(list_entry.get("key", ""))
value = _canonicalize_string(list_entry.get("value", ""))

if (("env" in key) or (key in ("stage", "function", "lifecycle", "usage", "tier"))) and \
is_indicator_match(value):
indicators.append(list_entry)
elif comparison_type == "string":
Contributor

i might pull this logic out into a separate function to handle list[str] inputs, while preserving this one for list[dict] inputs, but that may just be a matter of preference since functionally they're equivalent
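
A rough sketch of what that split could look like, under stated assumptions: `get_indicators_from_strings` and `looks_dev` are hypothetical names, the `demisto.info` logging from the PR is omitted so the block runs on its own, and the string variant adds an `isinstance(entry, str)` guard that the PR's branch does not have.

```python
from collections.abc import Mapping
from typing import Callable

ENV_KEYS = ("stage", "function", "lifecycle", "usage", "tier")


def _canonicalize_string(in_str: str) -> str:
    return in_str.lower().strip(' \t"\'')


def get_indicators_from_key_value_pairs(pairs: list, is_indicator_match: Callable) -> list:
    """list[dict] inputs: keep pairs with an environment-like key and a matching value."""
    indicators = []
    for pair in pairs:
        if not isinstance(pair, Mapping) or "key" not in pair or "value" not in pair:
            continue  # the PR logs these cases via demisto.info; omitted here
        key = _canonicalize_string(pair["key"])
        value = _canonicalize_string(pair["value"])
        if ("env" in key or key in ENV_KEYS) and is_indicator_match(value):
            indicators.append(pair)
    return indicators


def get_indicators_from_strings(entries: list, is_indicator_match: Callable) -> list:
    """list[str] inputs (hypothetical helper): keep entries whose canonical form matches."""
    return [
        entry for entry in entries
        if isinstance(entry, str) and is_indicator_match(_canonicalize_string(entry))
    ]


def looks_dev(value: str) -> bool:
    # Stand-in matcher for illustration only; not the script's is_dev_indicator.
    return "dev" in value
```

With two functions the `comparison_type` argument and the `else: break` fallthrough are no longer needed, at the cost of one more name for callers to choose between.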

value = _canonicalize_string(list_entry)
if is_indicator_match(value):
indicators.append(list_entry)
else:
break
return indicators


@@ -112,23 +120,30 @@ def get_indicators_from_external_classification(classifications: list[str]) -> l
return ext_classification_match


def determine_reason(external_indicators: list, matches: list) -> str:
def determine_reason(external_indicators: list, tags: list, hierarchy: list, provider: str) -> str:
"""
Craft the 'reason' for the final verdict of "development" server or not.

Args:
external_indicators (list): to determine there is an external service classification match.
Empty list means no matches.
matches (list): list of matches of tags with DEV or PROD characteristics.
tags (list): list of matches of tags with DEV or PROD characteristics.
hierarchy (list): list of matches of hierarchy information with DEV or PROD characteristics.
provider (str): provider of the asset as returned by Xpanse.

Returns:
str: complete `reason` string to be added to the gridfield.
"""
reason_parts = []
if len(external_indicators) == 1:
reason_parts.append("external classification of " + DEV_ENV_CLASSIFICATION)
for match in matches:
reason_parts.append("tag {" + f"{match.get('key')}: {match.get('value')}" + "} from " + match.get('source'))
for tag in tags:
reason_parts.append("tag {" + f"{tag.get('key')}: {tag.get('value')}" + "} from " + tag.get('source'))
for match in hierarchy:
if provider:
reason_parts.append("infrastructure hierarchy information `" + f"{match}" + "` from " + provider)
else:
reason_parts.append("infrastructure hierarchy information `" + f"{match}" + "`")
reason_final = "match on "
for reason in reason_parts:
reason_final += reason + ", "
@@ -139,33 +154,37 @@ def determine_reason(external_indicators: list, matches: list) -> str:
return reason_final


def final_decision(external_indicators: list, dev_matches: list, prod_matches: list) -> dict:
def final_decision(external_indicators: list, dev_tags: list, prod_tags: list, dev_hierarchy: list,
prod_hierarchy: list, provider: str) -> dict:
"""
Final decision to be set in gridfield.

Args:
external_indicators (list): list of matches of external service classification match.
dev_matches (list): list of matches of tags with DEV characteristics.
prod_matches (list): list of matches of tags with PROD characteristics.
dev_tags (list): list of matches of tags with DEV characteristics.
prod_tags (list): list of matches of tags with PROD characteristics.
dev_hierarchy (list): list of matches of hierarchy information with DEV characteristics.
prod_hierarchy (list): list of matches of hierarchy information with PROD characteristics.
provider (str): provider of the asset as returned by Xpanse.

Returns:
dict: dictionary to be added to gridfield.
"""
final_dict: dict[str, Any] = {}
if (len(external_indicators) == 1 or len(dev_matches) > 0) and len(prod_matches) == 0:
if (len(external_indicators) == 1 or len(dev_tags + dev_hierarchy) > 0) and len(prod_tags + prod_hierarchy) == 0:
final_dict["result"] = True
final_dict["confidence"] = "Likely Development"
reason_final = determine_reason(external_indicators, dev_matches)
reason_final = determine_reason(external_indicators, dev_tags, dev_hierarchy, provider)
final_dict["reason"] = reason_final
elif (len(external_indicators) == 1 or len(dev_matches) > 0) and len(prod_matches) > 0:
elif (len(external_indicators) == 1 or len(dev_tags + dev_hierarchy) > 0) and len(prod_tags + prod_hierarchy) > 0:
final_dict["result"] = False
final_dict["confidence"] = "Conflicting Information"
reason_final = determine_reason(external_indicators, dev_matches + prod_matches)
reason_final = determine_reason(external_indicators, dev_tags + prod_tags, dev_hierarchy + prod_hierarchy, provider)
final_dict["reason"] = reason_final
elif (len(external_indicators) == 0 and len(dev_matches) == 0) and len(prod_matches) > 0:
elif (len(external_indicators) == 0 and len(dev_tags + dev_hierarchy) == 0) and len(prod_tags + prod_hierarchy) > 0:
final_dict["result"] = False
final_dict["confidence"] = "Likely Production"
reason_final = determine_reason(external_indicators, prod_matches)
reason_final = determine_reason(external_indicators, prod_tags, prod_hierarchy, provider)
final_dict["reason"] = reason_final
else:
final_dict["result"] = False
@@ -217,13 +236,19 @@ def main():
args = demisto.args()
Contributor

can you add the new hierarchy and provider args to the docstring for the main function? The reference to parameter observed_key_value_pairs might also need updating

Contributor

also this might be the more appropriate place to elaborate on what hierarchy_info is (as opposed to where i commented on the yml)

internal_tags: list[dict[str, Any]] = argToList(args.get("asm_tags", [{}]))
dev_kv_indicators = get_indicators_from_key_value_pairs(internal_tags, is_dev_indicator)
prod_kv_indicators = get_indicators_from_key_value_pairs(internal_tags, is_prod_indicator)
dev_kv_indicators = get_indicators_from_list(internal_tags, is_dev_indicator, "dictionary")
prod_kv_indicators = get_indicators_from_list(internal_tags, is_prod_indicator, "dictionary")

hierarchy_info = argToList(args.get("hierarchy_info", []))
dev_hierarchy_indicators = get_indicators_from_list(hierarchy_info, is_dev_indicator, "string")
prod_hierarchy_indicators = get_indicators_from_list(hierarchy_info, is_prod_indicator, "string")

external_active_classifications: list[str] = argToList(args.get("active_classifications", []))
external_indicators = get_indicators_from_external_classification(external_active_classifications)

decision_dict = final_decision(external_indicators, dev_kv_indicators, prod_kv_indicators)
provider: str = args.get("provider", None)
decision_dict = final_decision(external_indicators, dev_kv_indicators, prod_kv_indicators,
dev_hierarchy_indicators, prod_hierarchy_indicators, provider)
demisto.executeCommand("setAlert", {"asmdevcheckdetails": [decision_dict]})

output = tableToMarkdown("Dev Check Results", decision_dict, ['result_readable', 'confidence', 'reason'])
@@ -1,15 +1,19 @@
args:
- description: 'Array of key-value objects. Each object within the array must contain the keys "Key" and "Value" to be considered. The values associated with those keys can be arbitrary. Example: [{"Key": "env", "Value": "dev"}, {"Key": "Name", "Value": "ssh-ec2-machine-name"}]'
- description: 'Array of key-value objects. Each object within the array must contain the keys "Key" and "Value" to be considered. The values associated with those keys can be arbitrary. Example: [{"Key": "env", "Value": "dev"}, {"Key": "Name", "Value": "ssh-ec2-machine-name"}].'
isArray: true
name: asm_tags
- description: 'Array of strings representing the Xpanse ASM "active classifications" for the service. Example: ["RdpServer", "SelfSignedCertificate"]'
- description: 'Array of strings representing the Xpanse ASM "active classifications" for the service. Example: ["RdpServer", "SelfSignedCertificate"].'
isArray: true
name: active_classifications
- description: infrastructure hierarchy information to include CSPs.
Contributor

i'm not sure i understand what this means, can you elaborate slightly?

name: hierarchy_info
- description: Provider of the asset as returned by Xpanse.
name: provider
comment: Identify whether the service is a "development" server. Development servers have no external users and run no production workflows. These servers might be named "dev", but they might also be named "qa", "pre-production", "user acceptance testing", or use other non-production terms. This automation uses both public data visible to anyone (`active_classifications` as derived by Xpanse ASM) as well as checking internal data for AI-learned indicators of development systems (`asm_tags` as derived from integrations with non-public systems).
commonfields:
id: InferWhetherServiceIsDev
version: -1
dockerimage: demisto/python3:3.10.12.68714
dockerimage: demisto/python3:3.10.13.78960
enabled: true
name: InferWhetherServiceIsDev
runas: DBotWeakRole
@@ -21,3 +25,5 @@ type: python
fromversion: 6.5.0
tests:
- No tests (auto formatted)
engineinfo: {}
runonce: false
@@ -16,11 +16,11 @@ def test_canonicalize():
([{"key": "ENV", "value": "dv"}, {"key": "stage", "value": "sbx"}],
[{"key": "ENV", "value": "dv"}, {"key": "stage", "value": "sbx"}])
])
def test_get_indicators_from_key_value_pairs(tags_raw, matches):
from InferWhetherServiceIsDev import get_indicators_from_key_value_pairs
def test_get_indicators_from_list(tags_raw, matches):
from InferWhetherServiceIsDev import get_indicators_from_list
from InferWhetherServiceIsDev import is_dev_indicator

assert get_indicators_from_key_value_pairs(tags_raw, is_dev_indicator) == matches
assert get_indicators_from_list(tags_raw, is_dev_indicator, "dictionary") == matches


def test_is_dev_indicator():
@@ -72,7 +72,7 @@ def test_get_indicators_from_external_classification(classifications, matches):
def test_determine_reason(external, internal, reason):
from InferWhetherServiceIsDev import determine_reason

assert determine_reason(external, internal) == reason
assert determine_reason(external, internal, [], "") == reason
Contributor

Do we want another test for the `if provider:` branch in determine_reason, or is provider not functionally relevant?

Contributor Author

I am not doing full testing on determine_reason(). I can, but the next person will have to maintain 93%+ unit test coverage.
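
If a test for the provider branch is wanted, it could look roughly like the sketch below. To stay self-contained it copies only the hierarchy loop of `determine_reason` from this diff into a hypothetical helper, `hierarchy_reason_parts`; the provider value `"AWS"` is an assumed example, not something taken from the PR.

```python
def hierarchy_reason_parts(hierarchy: list, provider: str) -> list:
    """Mirrors only the hierarchy loop of determine_reason shown in this diff."""
    parts = []
    for match in hierarchy:
        if provider:
            parts.append("infrastructure hierarchy information `" + f"{match}" + "` from " + provider)
        else:
            parts.append("infrastructure hierarchy information `" + f"{match}" + "`")
    return parts


def test_hierarchy_reason_with_provider():
    # "AWS" is an assumed provider value used for illustration.
    assert hierarchy_reason_parts(["ENG-DEV"], "AWS") == [
        "infrastructure hierarchy information `ENG-DEV` from AWS"
    ]


def test_hierarchy_reason_without_provider():
    # main() passes None when the argument is absent; the tests in the PR pass "".
    for empty in ("", None):
        assert hierarchy_reason_parts(["ENG-DEV"], empty) == [
            "infrastructure hierarchy information `ENG-DEV`"
        ]
```

Both falsy values take the `else` branch, so the `None` default used in `main()` does not reach the string concatenation.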



def test_full_truth_table():
@@ -81,34 +81,82 @@ def test_full_truth_table():
# Blank list means no external classification or tag matches.
sample_no_match = []
sample_dev_classification = ["DevelopmentEnvironment"]
sample_dev_hierarchy = ["ENG-DEV"]
sample_prod_hierarchy = ["ENG-PROD"]

from InferWhetherServiceIsDev import final_decision

# dev == True, all else is False

# kv pair contains no indicators
# DevEnv is set (--> dev)
assert final_decision(sample_dev_classification, sample_no_match, sample_no_match)["result"]
assert final_decision(sample_dev_classification, sample_no_match, sample_no_match,
sample_no_match, sample_no_match, "")["result"]
# DevEnv is not set (--> can't tell)
assert not final_decision(sample_no_match, sample_no_match, sample_no_match)["result"]
assert not final_decision(sample_no_match, sample_no_match, sample_no_match, sample_no_match, sample_no_match, "")["result"]

# kv pair contains dev indicators only
# DevEnv is set (--> dev)
assert final_decision(sample_dev_classification, sample_dev_tag, sample_no_match)["result"]
# Dev Tags only
assert final_decision(sample_dev_classification, sample_dev_tag, sample_no_match,
sample_no_match, sample_no_match, "")["result"]
# Dev Hierarchy only
assert final_decision(sample_dev_classification, sample_no_match, sample_no_match,
sample_dev_hierarchy, sample_no_match, "")["result"]
# Both Dev Tags and Hierarchy
assert final_decision(sample_dev_classification, sample_dev_tag, sample_no_match,
sample_dev_hierarchy, sample_no_match, "")["result"]
#
# DevEnv is not set (--> dev)
assert final_decision(sample_no_match, sample_dev_tag, sample_no_match)["result"]
# Dev Tag only
assert final_decision(sample_no_match, sample_dev_tag, sample_no_match, sample_no_match, sample_no_match, "")["result"]
# Dev Hierarchy only
assert final_decision(sample_no_match, sample_no_match, sample_no_match, sample_dev_hierarchy, sample_no_match, "")["result"]
# Both Dev Tags and Hierarchy
assert final_decision(sample_no_match, sample_no_match, sample_no_match, sample_dev_hierarchy, sample_no_match, "")["result"]
Contributor

i think this was intended to have sample_dev_tag as the second arg

Contributor Author

fixed in PR feedback
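
A sketch of the corrected case, for reference. `is_likely_dev` is a hypothetical stand-in that reproduces only the `result` condition of `final_decision` as shown in this diff, and the shape of `sample_dev_tag` is assumed because its definition is outside the visible hunk.

```python
def is_likely_dev(external_indicators: list, dev_tags: list, prod_tags: list,
                  dev_hierarchy: list, prod_hierarchy: list) -> bool:
    """Stand-in for final_decision(...)["result"], based on the first branch in the diff."""
    has_dev = len(external_indicators) == 1 or len(dev_tags + dev_hierarchy) > 0
    has_prod = len(prod_tags + prod_hierarchy) > 0
    return has_dev and not has_prod


sample_no_match = []
sample_dev_tag = [{"key": "env", "value": "dev"}]  # assumed shape, not from the diff
sample_dev_hierarchy = ["ENG-DEV"]

# "Dev Hierarchy only": no tags, hierarchy match present.
hierarchy_only = is_likely_dev(sample_no_match, sample_no_match, sample_no_match,
                               sample_dev_hierarchy, sample_no_match)

# "Both Dev Tags and Hierarchy": sample_dev_tag as the second argument,
# so this case no longer duplicates the hierarchy-only one.
tags_and_hierarchy = is_likely_dev(sample_no_match, sample_dev_tag, sample_no_match,
                                   sample_dev_hierarchy, sample_no_match)
```

Both cases evaluate to a dev verdict, so the original assertion passed either way; the correction only changes which inputs are exercised.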


# kv pair contains prod indicators only
# DevEnv is set (--> conflicting)
assert not final_decision(sample_dev_classification, sample_no_match, sample_prod_tag)["result"]
# PROD Tag only
assert not final_decision(sample_dev_classification, sample_no_match, sample_prod_tag,
sample_no_match, sample_no_match, "")["result"]
# PROD Hierarchy only
assert not final_decision(sample_dev_classification, sample_no_match, sample_no_match,
sample_no_match, sample_prod_hierarchy, "")["result"]
# Both PROD Tags and Hierarchy
assert not final_decision(sample_dev_classification, sample_no_match, sample_prod_tag,
sample_no_match, sample_prod_hierarchy, "")["result"]
#
# DevEnv is not set (--> prod)
assert not final_decision(sample_no_match, sample_no_match, sample_prod_tag)["result"]
# PROD Tag only
assert not final_decision(sample_no_match, sample_no_match, sample_prod_tag, sample_no_match, sample_no_match, "")["result"]
# PROD Hierarchy only
assert not final_decision(sample_no_match, sample_no_match, sample_no_match,
sample_no_match, sample_prod_hierarchy, "")["result"]
# Both PROD Tags and Hierarchy
assert not final_decision(sample_no_match, sample_no_match, sample_prod_tag,
sample_no_match, sample_prod_hierarchy, "")["result"]

# kv pair contains conflicting indicators
# DevEnv is set (--> conflicting)
assert not final_decision(sample_dev_classification, sample_dev_tag, sample_prod_tag)["result"]
# Conflicting tags only
assert not final_decision(sample_dev_classification, sample_dev_tag, sample_prod_tag,
sample_no_match, sample_no_match, "")["result"]
# Conflicting hierarchy only
assert not final_decision(sample_dev_classification, sample_no_match, sample_no_match,
sample_dev_hierarchy, sample_prod_hierarchy, "")["result"]
# Conflicting hierarchy and tags (would need other combinations to do full truth table)
assert not final_decision(sample_dev_classification, sample_dev_tag, sample_prod_tag,
sample_dev_hierarchy, sample_prod_hierarchy, "")["result"]
#
# DevEnv is not set (--> conflicting)
assert not final_decision(sample_no_match, sample_dev_tag, sample_prod_tag)["result"]
assert not final_decision(sample_no_match, sample_dev_tag, sample_prod_tag, sample_no_match, sample_no_match, "")["result"]
# Conflicting hierarchy only
assert not final_decision(sample_no_match, sample_no_match, sample_no_match,
sample_dev_hierarchy, sample_prod_hierarchy, "")["result"]
# Conflicting hierarchy and tags (would need other combinations to do full truth table)
assert not final_decision(sample_no_match, sample_dev_tag, sample_prod_tag,
sample_dev_hierarchy, sample_prod_hierarchy, "")["result"]


@pytest.mark.parametrize('in_classifications,in_tags,expected_out_boolean',
@@ -1,21 +1,33 @@
Identify whether the service is a "development" server. Development servers have no external users and run no production workflows. These servers might be named "dev", but they might also be named "qa", "pre-production", "user acceptance testing", or use other non-production terms. This automation uses both public data visible to anyone (`active_classifications` as derived by Xpanse ASM) as well as checking internal data for AI-learned indicators of development systems (`asm_tags` as derived from integrations with non-public systems).

## Script Data

---

| **Name** | **Description** |
| --- | --- |
| Script Type | python3 |
| Cortex XSOAR Version | 6.5.0 |

## Used In

---
This script is used in the following playbooks and scripts.

* Cortex ASM - Enrichment

## Inputs

---

| **Argument Name** | **Description** |
| --- | --- |
| asm_tags | Array of key-value objects. Each object within the array must contain the keys "Key" and "Value" to be considered. The values associated with those keys can be arbitrary. Example: \[\{"Key": "env", "Value": "dev"\}, \{"Key": "Name", "Value": "ssh-ec2-machine-name"\}\] |
| active_classifications | Array of strings representing the Xpanse ASM "active classifications" for the service. Example: \["RdpServer", "SelfSignedCertificate"\] |
| asm_tags | Array of key-value objects. Each object within the array must contain the keys "Key" and "Value" to be considered. The values associated with those keys can be arbitrary. Example: \[\{"Key": "env", "Value": "dev"\}, \{"Key": "Name", "Value": "ssh-ec2-machine-name"\}\]. |
| active_classifications | Array of strings representing the Xpanse ASM "active classifications" for the service. Example: \["RdpServer", "SelfSignedCertificate"\]. |
| hierarchy_info | infrastructure hierarchy information to include CSPs. |
| provider | Provider of the asset as returned by Xpanse. |

## Outputs

---
There are no outputs for this script.