From 5f226322c06b9ba3a45e58f8890573efa6ac0fdb Mon Sep 17 00:00:00 2001 From: koscinv <35606412+koscinv@users.noreply.github.com> Date: Thu, 9 Nov 2023 11:45:17 -0500 Subject: [PATCH 1/2] Add CWE-PoC list file (#376) * Add files via upload This CSV file contains a list of CWEs which may result in a PoC value for "state of exploitation" because "the vulnerability has a well-known method of exploitation." It contains links to potential exploit tools. It also contains CWEs which could not be PoCs as well as some reasoning behind this. * move cwe csv file to a folder * add csv as table in exploitation documentation --------- Co-authored-by: Allen D. Householder --- .../cwe/possible-cwe-with-poc-examples.csv | 157 ++++++++++++++++++ .../reference/decision_points/exploitation.md | 32 +++- 2 files changed, 180 insertions(+), 9 deletions(-) create mode 100644 data/csvs/cwe/possible-cwe-with-poc-examples.csv diff --git a/data/csvs/cwe/possible-cwe-with-poc-examples.csv b/data/csvs/cwe/possible-cwe-with-poc-examples.csv new file mode 100644 index 00000000..c8fdc97b --- /dev/null +++ b/data/csvs/cwe/possible-cwe-with-poc-examples.csv @@ -0,0 +1,157 @@ +CWE-ID,CWE name,In NVD's CWE Slice?,Possible PoC? ,How could vulnerabilities containing this CWE be exploited?,Tools,Links to tools +20,Improper Input Validation,yes,no,,, +22,Improper Limitation of a Pathname to a Restricted Directory ('Path Traversal'),yes,yes,"directory/path traversal ""../""",Panoptic; Burp Suite,https://github.com/lightos/Panoptic; https://portswigger.net/burp +59,Improper Link Resolution Before File Access ('Link Following'),yes,yes,symlink attack,No specialized resources are required to execute this type of attack. 
The only requirement is the ability to create the necessary symbolic link.,https://capec.mitre.org/data/definitions/132.html +73,External Control of File Name or Path,no,no,,, +74,Improper Neutralization of Special Elements in Output Used by a Downstream Component ('Injection'),yes,no,,, +77,Improper Neutralization of Special Elements used in a Command ('Command Injection'),yes,yes,command injection,Commix,https://github.com/commixproject/commix +78,Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection'),yes,yes,OS command injection,Commix; Burp Suite,https://github.com/commixproject/commix; https://portswigger.net/burp +79,Improper Neutralization of Input During Web Page Generation ('Cross-site Scripting'),yes,yes,cross-site scripting attack,XSSER; Pybelt; XSStrike,https://github.com/epsylon/xsser; https://github.com/Ekultek/Pybelt; https://github.com/s0md3v/XSStrike +88,Improper Neutralization of Argument Delimiters in a Command ('Argument Injection'),yes,yes,argument/parameter injection,Argument Injection Hammer,https://github.com/nccgroup/argumentinjectionhammer +89,Improper Neutralization of Special Elements used in an SQL Command ('SQL Injection'),yes,yes,malicious SQL command injection,SQLMap; BBQSQL; JSQL injection; NoSQLMap,https://github.com/sqlmapproject/sqlmap; https://github.com/CiscoCXSecurity/bbqsql; https://github.com/ron190/jsql-injection; https://github.com/codingo/NoSQLMap +91,XML Injection (aka Blind XPath Injection),yes,yes,"inject XML code into a web input, XML file or stream",XXExploiter,https://github.com/luisfontes19/xxexploiter +94,Improper Control of Generation of Code ('Code Injection'),yes,no,,, +115,Misinterpretation of Input,no,no,,, +116,Improper Encoding or Escaping of Output,yes,no,,, +119,Improper Restriction of Operations within the Bounds of a Memory Buffer,yes,no,,, +120,Buffer Copy without Checking Size of Input ('Classic Buffer Overflow'),yes,no,,, +122,Heap-based Buffer Overflow,no,no,,, 
+125,Out-of-bounds Read,yes,no,,, +129,Improper Validation of Array Index,yes,no,,, +131,Incorrect Calculation of Buffer Size,yes,no,,, +134,Use of Externally-Controlled Format String,yes,no,,, +178,Improper Handling of Case Sensitivity,yes,no,,, +190,Integer Overflow or Wraparound,yes,no,,, +191,Integer Underflow (Wrap or Wraparound),yes,no,,, +193,Off-by-one Error,yes,no,,, +194,Unexpected Sign Extension,no,no,,, +200,Exposure of Sensitive Information to an Unauthorized Actor,yes,no,,, +201,Insertion of Sensitive Information Into Sent Data,no,no,,, +203,Observable Discrepancy,yes,no,,, +209,Generation of Error Message Containing Sensitive Information,yes,yes,read/capture sensitive information contained in error message,OWASP ZAP; Burp Suite,https://www.zaproxy.org/; https://portswigger.net/burp +212,Improper Removal of Sensitive Information Before Storage or Transfer,yes,no,,, +252,Unchecked Return Value,yes,no,,, +257,Storing Passwords in a Recoverable Format,no,no,,, +264,"Permissions, Privileges, and Access Controls",no,no,,, +269,Improper Privilege Management,yes,no,,, +273,Improper Check for Dropped Privileges,yes,no,,, +275,Permission Issues,no,no,,, +276,Incorrect Default Permissions,yes,yes,try to access data or privileges you normally should not have access to,"No specialized resources are required to execute this type of attack. In order to discover unrestricted resources, the attacker does not need special tools or skills. 
They only have to observe the resources or access mechanisms invoked as each action is performed and then try and access those access mechanisms directly.",https://capec.mitre.org/data/definitions/1.html +280,Improper Handling of Insufficient Permissions or Privileges,no,no,,, +281,Improper Preservation of Permissions,yes,no,,, +284,Improper Access Control,no,no,,, +287,Improper Authentication,yes,no,,, +290,Authentication Bypass by Spoofing,yes,no,,, +294,Authentication Bypass by Capture-replay,yes,yes,capture-replay attack,Wireshark; smartsniff,https://www.wireshark.org/; https://www.nirsoft.net/utils/smsniff.html +295,Improper Certificate Validation,yes,no,,, +305,Authentication Bypass by Primary Weakness,no,no,,, +306,Missing Authentication for Critical Function,yes,no,,, +307,Improper Restriction of Excessive Authentication Attempts,yes,yes,brute force attack,THC Hydra; John the Ripper; L0phtCrack; Hashcat,https://github.com/vanhauser-thc/thc-hydra; https://github.com/openwall/john; https://gitlab.com/l0phtcrack/l0phtcrack; https://hashcat.net/hashcat/ +311,Missing Encryption of Sensitive Data,yes,no,,, +312,Cleartext Storage of Sensitive Information,yes,yes,find sensitive data stored in system,OWASP ZAP; Burp Suite,https://www.zaproxy.org/; https://portswigger.net/burp +319,Cleartext Transmission of Sensitive Information,yes,yes,capture traffic and extract sensitive information,Wireshark; Smartsniff,https://www.wireshark.org/; https://www.nirsoft.net/utils/smsniff.html +321,Use of Hard-coded Cryptographic Key,no,no,,, +326,Inadequate Encryption Strength,yes,no,,, +327,Use of a Broken or Risky Cryptographic Algorithm,yes,no,,, +330,Use of Insufficiently Random Values,yes,yes,brute force attack,THC Hydra; John the Ripper; L0phtCrack; Hashcat,https://github.com/vanhauser-thc/thc-hydra; https://github.com/openwall/john; https://gitlab.com/l0phtcrack/l0phtcrack; https://hashcat.net/hashcat/ +331,Insufficient Entropy,yes,yes,brute force attack/predictive 
programs,hashcat; php_mt_seed,https://hashcat.net/hashcat/; https://github.com/openwall/php_mt_seed +335,Incorrect Usage of Seeds in Pseudo-Random Number Generator (PRNG),yes,no,,, +337,Predictable Seed in Pseudo-Random Number Generator (PRNG),no,no,,, +338,Use of Cryptographically Weak Pseudo-Random Number Generator (PRNG),yes,no,,, +345,Insufficient Verification of Data Authenticity,yes,no,,, +346,Origin Validation Error,yes,no,,, +347,Improper Verification of Cryptographic Signature,yes,no,,, +352,Cross-Site Request Forgery (CSRF),yes,yes,CSRF,Burp Suite; XSRFProbe,https://portswigger.net/burp; https://github.com/0xInfection/XSRFProbe +354,Improper Validation of Integrity Check Value,yes,no,,, +362,Concurrent Execution using Shared Resource with Improper Synchronization ('Race Condition'),yes,no,,, +367,Time-of-check Time-of-use (TOCTOU) Race Condition,yes,no,,, +369,Divide By Zero,yes,no,,, +384,Session Fixation,yes,no,,, +388,7PK - Errors,no,no,,, +400,Uncontrolled Resource Consumption,yes,no,,, +401,Missing Release of Memory after Effective Lifetime,yes,no,,, +404,Improper Resource Shutdown or Release,yes,no,,, +405,Asymmetric Resource Consumption (Amplification),no,no,,, +407,Inefficient Algorithmic Complexity,yes,no,,, +415,Double Free,yes,no,,, +416,Use After Free,yes,no,,, +425,Direct Request ('Forced Browsing'),yes,yes,forcibly navigate to unintended (by the system) URLs,Dirbuster; Dirstalk,https://sourceforge.net/projects/dirbuster/; https://github.com/stefanoj3/dirstalk +426,Untrusted Search Path,yes,yes,malicious dll injection/loading,evildll; evilldll-gen,https://github.com/CrackerCat/evildll; https://gist.github.com/klezVirus/e24c94d7061f5736e2452eee022f4011 +427,Uncontrolled Search Path Element,yes,yes,malicious dll injection/loading,evildll; evilldll-gen,https://github.com/CrackerCat/evildll; https://gist.github.com/klezVirus/e24c94d7061f5736e2452eee022f4011 +428,Unquoted Search Path or Element,yes,yes,insert malicious input into unquoted search 
path,Metasploit,https://www.metasploit.com/ +434,Unrestricted Upload of File with Dangerous Type,yes,yes,uploading of malicious file (program lacks restrictions to prevent this from occurring),No specialized resources are required to execute this type of attack.,https://capec.mitre.org/data/definitions/1.html +436,Interpretation Conflict,yes,no,,, +441,Unintended Proxy or Intermediary ('Confused Deputy'),no,no,,, +444,Inconsistent Interpretation of HTTP Requests ('HTTP Request/Response Smuggling'),yes,yes,HTTP smuggling,Smuggler,https://github.com/defparam/smuggler +451,User Interface (UI) Misrepresentation of Critical Information,no,no,,, +459,Incomplete Cleanup,yes,no,,, +470,Use of Externally-Controlled Input to Select Classes or Code ('Unsafe Reflection'),yes,no,,, +476,NULL Pointer Dereference,yes,no,,, +494,Download of Code Without Integrity Check,yes,no,,, +502,Deserialization of Untrusted Data,yes,no,,, +521,Weak Password Requirements,yes,yes,brute force attack,THC Hydra; John the Ripper; L0phtCrack; Hashcat,https://github.com/vanhauser-thc/thc-hydra; https://github.com/openwall/john; https://gitlab.com/l0phtcrack/l0phtcrack; https://hashcat.net/hashcat/ +522,Insufficiently Protected Credentials,yes,yes,"search for exposed credentials, capture traffic, or brute force (context-dependent)","Context-dependent, may utilize traffic sniffing tools, tools for discovering sensitive information, or brute forcing tools",https://www.wireshark.org/; https://www.nirsoft.net/utils/smsniff.html; https://www.zaproxy.org/; https://portswigger.net/burp; https://github.com/vanhauser-thc/thc-hydra; https://github.com/openwall/john; https://gitlab.com/l0phtcrack/l0phtcrack; https://hashcat.net/hashcat/ +532,Insertion of Sensitive Information into Log File,yes,yes,access log files and search them for sensitive information,OWASP ZAP; Burp Suite - along with the ability to access log files,https://www.zaproxy.org/; https://portswigger.net/burp +552,Files or Directories Accessible
to External Parties,yes,no,,, +565,Reliance on Cookies without Validation and Integrity Checking,yes,no,,, +592,Authentication Bypass Issues,no,no,,, +601,URL Redirection to Untrusted Site ('Open Redirect'),yes,no,,, +602,Client-Side Enforcement of Server-Side Security,no,no,,, +610,Externally Controlled Reference to a Resource in Another Sphere,yes,no,,, +611,Improper Restriction of XML External Entity Reference,yes,yes,XML external entity injection,XXExploiter,https://github.com/luisfontes19/xxexploiter +613,Insufficient Session Expiration,yes,no,,, +617,Reachable Assertion,yes,no,,, +639,Authorization Bypass Through User-Controlled Key,yes,yes,"modify key values to change what data attacker has access to, insecure direct object vulnerability exploit",AuthZ for burpsuite,https://portswigger.net/bappstore/4316cc18ac5f434884b2089831c7d19e +640,Weak Password Recovery Mechanism for Forgotten Password,yes,no,,, +662,Improper Synchronization,yes,no,,, +665,Improper Initialization,yes,no,,, +667,Improper Locking,yes,no,,, +668,Exposure of Resource to Wrong Sphere,yes,no,,, +669,Incorrect Resource Transfer Between Spheres,yes,no,,, +670,Always-Incorrect Control Flow Implementation,yes,no,,, +672,Operation on a Resource after Expiration or Release,yes,no,,, +674,Uncontrolled Recursion,yes,no,,, +681,Incorrect Conversion between Numeric Types,yes,no,,, +682,Incorrect Calculation,yes,no,,, +697,Incorrect Comparison,yes,no,,, +703,Improper Check or Handling of Exceptional Conditions,no,no,,, +704,Incorrect Type Conversion or Cast,yes,no,,, +706,Use of Incorrectly-Resolved Name or Reference,yes,no,,, +732,Incorrect Permission Assignment for Critical Resource,yes,no,,, +749,Exposed Dangerous Method or Function,no,no,,, +754,Improper Check for Unusual or Exceptional Conditions,yes,no,,, +755,Improper Handling of Exceptional Conditions,yes,no,,, +759,Use of a One-Way Hash without a Salt,no,no,,, +763,Release of Invalid Pointer or Reference,yes,no,,, +770,Allocation of Resources 
Without Limits or Throttling,yes,no,,, +772,Missing Release of Resource after Effective Lifetime,yes,no,,, +776,Improper Restriction of Recursive Entity References in DTDs ('XML Entity Expansion'),yes,yes,XML entity expansion,XXExploiter,https://github.com/luisfontes19/xxexploiter +787,Out-of-bounds Write,yes,no,,, +789,Memory Allocation with Excessive Size Value,no,no,,, +798,Use of Hard-coded Credentials,yes,yes,discover and use hardcoded credentials,"Context-dependent, may use password cracking tools, binary analysis tools, or may not require any tools (just knowledge of the default hard-coded credentials)",https://github.com/vanhauser-thc/thc-hydra; https://github.com/openwall/john; https://gitlab.com/l0phtcrack/l0phtcrack; https://hashcat.net/hashcat/; https://www.powergrep.com/ +823,Use of Out-of-range Pointer Offset,no,no,,, +824,Access of Uninitialized Pointer,yes,no,,, +829,Inclusion of Functionality from Untrusted Control Sphere,yes,no,,, +834,Excessive Iteration,yes,no,,, +835,Loop with Unreachable Exit Condition ('Infinite Loop'),yes,no,,, +838,Inappropriate Encoding for Output Context,yes,no,,, +843,Access of Resource Using Incompatible Type ('Type Confusion'),yes,no,,, +862,Missing Authorization,yes,no,,, +863,Incorrect Authorization,yes,no,,, +908,Use of Uninitialized Resource,yes,no,,, +909,Missing Initialization of Resource,yes,no,,, +913,Improper Control of Dynamically-Managed Code Resources,yes,no,,, +916,Use of Password Hash With Insufficient Computational Effort,yes,yes,brute force,THC Hydra; John the Ripper; L0phtCrack; Hashcat,https://github.com/vanhauser-thc/thc-hydra; https://github.com/openwall/john; https://gitlab.com/l0phtcrack/l0phtcrack; https://hashcat.net/hashcat/ +917,Improper Neutralization of Special Elements used in an Expression Language Statement ('Expression Language Injection'),yes,no,,, +918,Server-Side Request Forgery (SSRF),yes,yes,SSRF,SSRFmap; Burp Suite,https://github.com/swisskyrepo/SSRFmap; https://portswigger.net/web-security/ssrf
+920,Improper Restriction of Power Consumption,yes,no,,, +922,Insecure Storage of Sensitive Information,yes,no,,, +924,Improper Enforcement of Message Integrity During Transmission in a Communication Channel,yes,no,,, +1021,Improper Restriction of Rendered UI Layers or Frames,yes,no,,, +1188,Insecure Default Initialization of Resource,yes,yes,use default credentials,"Context-dependent, but may not need any tools (for example, try to use default credentials or access resources that typically require permissions) - knowledge of the system (and its defaults) helps", +1236,Improper Neutralization of Formula Elements in a CSV File,yes,yes,CSV injection,"No specialized resources are required to execute this type of attack, it is more based on payloads.",https://gitlab.com/pentest-tools/PayloadsAllTheThings/-/tree/master/CSV%20Injection; https://owasp.org/www-community/attacks/CSV_Injection +1284,Improper Validation of Specified Quantity in Input,yes,no,,, +1321,Improperly Controlled Modification of Object Prototype Attributes ('Prototype Pollution'),yes,yes,prototype pollution,DOM Invader (Burp Suite),https://portswigger.net/burp/documentation/desktop/tools/dom-invader +1333,Inefficient Regular Expression Complexity,yes,yes,ReDoS or exponential backtracking,ReScue,https://2bdenny.github.io/ReScue/ +NVD-noinfo,There is insufficient information about the issue to classify it; details are unknown or unspecified.,yes,no,,, +NVD-Other,"NVD is only using a subset of CWE for mapping instead of the entire CWE, and the weakness type is not covered by that subset.",yes,no,,, diff --git a/docs/reference/decision_points/exploitation.md b/docs/reference/decision_points/exploitation.md index a25d2e6b..f20ba87a 100644 --- a/docs/reference/decision_points/exploitation.md +++ b/docs/reference/decision_points/exploitation.md @@ -1,3 +1,5 @@ +# Exploitation + !!!
note "Exploitation" Evidence of Active Exploitation of a Vulnerability @@ -16,15 +18,10 @@ The intent of this measure is the present state of exploitation of the vulnerabi [@householder2020historical] presents a method for searching the GitHub repositories of open-source exploit databases. This method could be employed to gather information about whether [PoC](#exploitation) is true. However, part (3) of [PoC](#exploitation) would not be represented in such a search, so more information gathering would be needed. - For part (3), perhaps we could construct a mapping of CWE-IDs which always represent vulnerabilities with well-known methods of exploitation. - - !!! example "CWE-IDs for PoC" - - For example, CWE-295, [Improper Certificate Validation - ](https://cwe.mitre.org/data/definitions/295.html), and its child CWEs, describe improper validation of TLS certificates. - These CWE-IDs could always be marked as [PoC](#exploitation) since that meets condition (3) in the definition. - A comprehensive set of suggested CWE-IDs for this purpose is future work. - + For part (3), one approach is to construct a mapping of CWE-IDs which + always represent vulnerabilities with well-known methods of exploitation. + We provide a list of possible CWE-IDs for this purpose below. + Gathering information for [active](#exploitation) is a bit harder. If the vulnerability has a name or public identifier (such as a CVE-ID), a search of news websites, Twitter, the vendor's vulnerability description, and public vulnerability databases for mentions of exploitation is generally adequate. However, if the organization has the ability to detect exploitation attempts—for instance, through reliable and precise IDS signatures based on a public PoC—then detection of exploitation attempts also signals that [active](#exploitation) is the right choice. 
@@ -38,3 +35,20 @@ The intent of this measure is the present state of exploitation of the vulnerabi This framing admits that an analyst may not be able to detect or know about every attack. An analyst should feel comfortable selecting [none](#exploitation) if they (or their search scripts) have performed searches in the appropriate places for public PoCs and active exploitation (as described above) and found none. Acknowledging that [*Exploitation*](#exploitation) values can change relatively quickly, we recommend conducting these searches frequently: if they can be automated to the organization's satisfaction, perhaps once a day (see also [Guidance on Communicating Results](#guidance-on-communicating-results)). + +## CWE-IDs for PoC + +The table below lists CWE-IDs that could be used to mark a vulnerability as [PoC](#exploitation) if the vulnerability is described by the CWE-ID. + +!!! example "CWE-295" + + For example, CWE-295, [Improper Certificate Validation + ](https://cwe.mitre.org/data/definitions/295.html), and its child CWEs, + describe improper validation of TLS certificates. These CWE-IDs could + always be marked as [PoC](#exploitation) since that meets condition (3) in + the definition. + + +{{ read_csv('../../../data/csvs/cwe/possible-cwe-with-poc-examples.csv') }} + +--- \ No newline at end of file From 621ce0fd4768c95fca5bb8b9b8c2c2e98913125a Mon Sep 17 00:00:00 2001 From: "Allen D. 
Householder" Date: Thu, 9 Nov 2023 11:46:05 -0500 Subject: [PATCH 2/2] Policy Generator tool, first pass (#365) * add policy generator * add unit tests for outcome values and outcome groups * update requirements.txt * add unit tests * add unit tests * add docs * add docs * add docs * rename DSIO->DSOI * fix type hints * add unit test for dp groups * integrate policy generator with csv_analyzer * rename nav items --- docs/reference/code/outcomes.md | 5 + docs/reference/code/policy_generator.md | 9 + mkdocs.yml | 4 +- requirements.txt | 7 +- src/ssvc/_mixins.py | 28 +- src/ssvc/csv_analyzer.py | 92 +++++- src/ssvc/decision_points/base.py | 115 ++++--- src/ssvc/dp_groups/base.py | 35 +- src/ssvc/outcomes/__init__.py | 1 + src/ssvc/outcomes/base.py | 55 ++++ src/ssvc/outcomes/groups.py | 138 ++++++++ src/ssvc/policy_generator.py | 303 ++++++++++++++++++ ...st_analyze_csv.py => test_csv_analyzer.py} | 34 ++ src/test/test_dp_base.py | 54 +++- src/test/test_dp_groups.py | 77 +++++ src/test/test_outcomes.py | 52 +++ src/test/test_policy_generator.py | 291 +++++++++++++++++ 17 files changed, 1214 insertions(+), 86 deletions(-) create mode 100644 docs/reference/code/outcomes.md create mode 100644 docs/reference/code/policy_generator.md create mode 100644 src/ssvc/outcomes/__init__.py create mode 100644 src/ssvc/outcomes/base.py create mode 100644 src/ssvc/outcomes/groups.py create mode 100644 src/ssvc/policy_generator.py rename src/test/{test_analyze_csv.py => test_csv_analyzer.py} (75%) create mode 100644 src/test/test_dp_groups.py create mode 100644 src/test/test_outcomes.py create mode 100644 src/test/test_policy_generator.py diff --git a/docs/reference/code/outcomes.md b/docs/reference/code/outcomes.md new file mode 100644 index 00000000..f1a2d15c --- /dev/null +++ b/docs/reference/code/outcomes.md @@ -0,0 +1,5 @@ +# Outcome Values and Outcome Groups + +::: ssvc.outcomes.base + +::: ssvc.outcomes.groups diff --git a/docs/reference/code/policy_generator.md 
b/docs/reference/code/policy_generator.md new file mode 100644 index 00000000..fa6e8477 --- /dev/null +++ b/docs/reference/code/policy_generator.md @@ -0,0 +1,9 @@ +# SSVC Policy Generator Tool + +The SSVC Policy Generator is a Python object that generates an SSVC decision +policy (a decision tree) from a set of input parameters. + +It is intended to be used as a library, for example within a Jupyter notebook. + + +::: ssvc.policy_generator \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index 7e3f6186..ef883658 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -70,7 +70,9 @@ nav: - Technical Impact: 'reference/decision_points/technical_impact.md' - Value Density: 'reference/decision_points/value_density.md' - Code: - analyze_csv: 'reference/code/analyze_csv.md' + CSV Analyzer: 'reference/code/analyze_csv.md' + Policy Generator: 'reference/code/policy_generator.md' + Outcomes: 'reference/code/outcomes.md' - Calculator: 'ssvc-calc/index.html' - About: - Intro: 'about/index.md' diff --git a/requirements.txt b/requirements.txt index 8cd33e14..7ec6baa1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ mkdocstrings mkdocstrings-python mkdocs-print-site-plugin dataclasses-json -pandas -scikit-learn -jsonschema +pandas~=2.1.2 +scikit-learn~=1.3.2 +jsonschema~=4.19.2 +networkx~=3.1 \ No newline at end of file diff --git a/src/ssvc/_mixins.py b/src/ssvc/_mixins.py index 48359e5d..c68db33e 100644 --- a/src/ssvc/_mixins.py +++ b/src/ssvc/_mixins.py @@ -4,6 +4,19 @@ author: adh created_at: 9/20/23 4:51 PM """ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. +# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. 
+# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. Patent and Trademark Office by Carnegie Mellon University + from dataclasses import dataclass, field from typing import Optional @@ -44,6 +57,18 @@ def exclude_if_none(value): return value is None +@dataclass_json +@dataclass(kw_only=True) +class _Commented: + """ + Mixin class for commented SSVC objects. + """ + + _comment: Optional[str] = field( + default=None, metadata=config(exclude=exclude_if_none) + ) + + @dataclass_json @dataclass(kw_only=True) class _Base: @@ -53,9 +78,6 @@ class _Base: name: str description: str - _comment: Optional[str] = field( - default=None, metadata=config(exclude=exclude_if_none) - ) def main(): diff --git a/src/ssvc/csv_analyzer.py b/src/ssvc/csv_analyzer.py index 0ddc9525..81807f2d 100644 --- a/src/ssvc/csv_analyzer.py +++ b/src/ssvc/csv_analyzer.py @@ -40,6 +40,19 @@ Higher values imply more important features. """ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. +# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. 
+# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. Patent and Trademark Office by Carnegie Mellon University + import argparse import re import sys @@ -97,6 +110,7 @@ def _drop_col_feat_imp( model_clone.random_state = random_state # training and scoring the benchmark model model_clone.fit(X_train, y_train) + benchmark_score = model_clone.score(X_train, y_train) # list for storing feature importances importances = [] @@ -191,20 +205,41 @@ def _parse_args(args) -> argparse.Namespace: def main(): args = _parse_args(sys.argv[1:]) + csvfile = args.csvfile # read csv - df = pd.read_csv(args.csvfile) - df = _clean_table(df) + df = pd.read_csv(csvfile) + + if args.permutation: + imp = permute_feature_importance(df, args.outcol) + print(f"Feature Permutation Importance for {df.columns}") + else: + imp = drop_col_feature_importance(df, args.outcol) + print(f"Drop Column Feature Importance for {df.columns}") + + print(imp) + + +def _create_dt_classifier( + df: pd.DataFrame, target: str, permute: bool = False +) -> (pd.DataFrame, pd.DataFrame): + """ + Compute feature importance two different ways for a dataframe + Args: + df: the dataframe to analyze + target: the name of the target column to analyze against + permute: use permutation importance instead of drop column importance + + Returns: + a tuple of (the cleaned dataframe, the feature importance dataframe) + """ + + df = _clean_table(df) # check for target column - target = args.outcol if target not in df.columns: - print( - f"Column '{target}' not found in {list(df.columns)}.\nPlease specify --outcol= and try again." 
- ) - exit(1) + raise KeyError(f"Column '{target}' not found in {list(df.columns)}") X, y = _split_data(df, target) - # turn features into ordinals # this assumes that every column is an ordinal label # and that the ordinals are sorted in ascending order @@ -216,19 +251,42 @@ def main(): mapper = {v: k for (k, v) in codes} X[newcol] = X[c].replace(mapper) X2 = X[cols] - # construct tree dt = DecisionTreeClassifier(random_state=99, criterion="entropy") - if args.permutation: - imp = _perm_feat_imp(dt, X2, y) - print(f"Feature Permutation Importance for {args.csvfile}") - else: - # drop columns and re-run - imp = _drop_col_feat_imp(dt, X2, y) - print(f"Drop Column Feature Importance for {args.csvfile}") + return dt, X2, y - print(imp) + +def drop_col_feature_importance(df: pd.DataFrame, target: str) -> pd.DataFrame: + """ + Compute feature importance using drop column feature importance + + Args: + df: the dataframe to analyze + target: the name of the target column to analyze against + + Returns: + a dataframe of feature importances + """ + dt, X2, y = _create_dt_classifier(df, target) + imp = _drop_col_feat_imp(dt, X2, y) + return imp + + +def permute_feature_importance(df: pd.DataFrame, target: str) -> pd.DataFrame: + """ + Compute feature importance using permutation feature importance + + Args: + df: the dataframe to analyze + target: the name of the target column to analyze against + + Returns: + a dataframe of feature importances + """ + dt, X2, y = _create_dt_classifier(df, target) + imp = _perm_feat_imp(dt, X2, y) + return imp if __name__ == "__main__": diff --git a/src/ssvc/decision_points/base.py b/src/ssvc/decision_points/base.py index c148f841..d99b545b 100644 --- a/src/ssvc/decision_points/base.py +++ b/src/ssvc/decision_points/base.py @@ -4,31 +4,31 @@ author: adh created_at: 9/20/23 10:07 AM """ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. 
+# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. Patent and Trademark Office by Carnegie Mellon University import logging -from dataclasses import dataclass, field -from typing import ClassVar, Dict, Tuple +from dataclasses import dataclass +from typing import Iterable -from dataclasses_json import config, dataclass_json +from dataclasses_json import dataclass_json -from ssvc._mixins import _Base, _Keyed, _Namespaced, _Versioned +from ssvc._mixins import _Base, _Commented, _Keyed, _Namespaced, _Versioned logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) -class _DecisionPoints: - """ - A collection of SSVC decision points. - """ - - registry: ClassVar[Dict[str, "SsvcDecisionPoint"]] = {} - - def __iter__(self): - return iter(self.registry.values()) - - -REGISTERED_DECISION_POINTS = _DecisionPoints() +REGISTERED_DECISION_POINTS = [] @dataclass_json @@ -43,63 +43,76 @@ class SsvcDecisionPointValue(_Base, _Keyed): @dataclass_json @dataclass(kw_only=True) -class SsvcDecisionPoint(_Base, _Keyed, _Versioned, _Namespaced): +class SsvcDecisionPoint( + _Base, + _Keyed, + _Versioned, + _Namespaced, + _Commented, +): """ Models a single decision point as a list of values. 
""" - values: Tuple[SsvcDecisionPointValue] + values: Iterable[SsvcDecisionPointValue] = () - # this is only for our own use in Python land, exclude it from serialization - _fullname: str = field( - init=False, repr=False, default=None, metadata=config(exclude=lambda x: True) - ) + def __iter__(self): + """ + Allow iteration over the decision points in the group. + """ + return iter(self.values) def __post_init__(self): - self._fullname = f"{self.namespace} {self.name} v{self.version}" - logging.debug(f"Add {self._fullname} to registry") - REGISTERED_DECISION_POINTS.registry[self._fullname] = self + global REGISTERED_DECISION_POINTS - def to_table(self): - rows = [] - rows.append(f"{self.description}") - rows.append("") + REGISTERED_DECISION_POINTS.append(self) - headings = ["Value", "Key", "Description"] - def make_row(items): - return "| " + " | ".join(items) + " |" +def dp_to_table(dp: SsvcDecisionPoint) -> str: + """ + Convert a decision point to a markdown table. + :param dp: The decision point to convert. + :return: a string containing the markdown table. 
+ """ + rows = [] + rows.append(f"{dp.description}") + rows.append("") + + headings = ["Value", "Key", "Description"] - rows.append(make_row(headings)) - rows.append(make_row(["---" for _ in headings])) + def make_row(items): + return "| " + " | ".join(items) + " |" - for value in self.values: - rows.append(make_row([value.name, value.key, value.description])) + rows.append(make_row(headings)) + rows.append(make_row(["---" for _ in headings])) - return "\n".join(rows) + for value in dp.values: + rows.append(make_row([value.name, value.key, value.description])) + + return "\n".join(rows) def main(): + opt_none = SsvcDecisionPointValue( + name="None", key="N", description="No exploit available" + ) + opt_poc = SsvcDecisionPointValue( + name="PoC", key="P", description="Proof of concept exploit available" + ) + opt_active = SsvcDecisionPointValue( + name="Active", key="A", description="Active exploitation observed" + ) + opts = [opt_none, opt_poc, opt_active] + dp = SsvcDecisionPoint( _comment="This is an optional comment that will be included in the object.", + values=opts, name="Exploitation", description="Is there an exploit available?", key="E", version="1.0.0", - values=( - SsvcDecisionPointValue( - name="None", key="N", description="No exploit available" - ), - SsvcDecisionPointValue( - name="PoC", - key="P", - description="Proof of concept exploit available", - ), - SsvcDecisionPointValue( - name="Active", key="A", description="Active exploitation observed" - ), - ), ) + print(dp.to_json(indent=2)) diff --git a/src/ssvc/dp_groups/base.py b/src/ssvc/dp_groups/base.py index 947a9cc4..6213d14b 100644 --- a/src/ssvc/dp_groups/base.py +++ b/src/ssvc/dp_groups/base.py @@ -4,8 +4,21 @@ author: adh created_at: 9/20/23 4:47 PM """ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. 
+# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. Patent and Trademark Office by Carnegie Mellon University + from dataclasses import dataclass -from typing import Tuple +from typing import Iterable from dataclasses_json import dataclass_json @@ -20,15 +33,24 @@ class SsvcDecisionPointGroup(_Base, _Versioned): Models a group of decision points. """ - decision_points: Tuple[SsvcDecisionPoint] + decision_points: Iterable[SsvcDecisionPoint] def __iter__(self): + """ + Allow iteration over the decision points in the group. + """ return iter(self.decision_points) + def __len__(self): + """ + Allow len() to be called on the group. + """ + return len(self.decision_points) + def get_all_decision_points_from( glist: list[SsvcDecisionPointGroup], -) -> Tuple[SsvcDecisionPoint]: +) -> Iterable[SsvcDecisionPoint]: """ Given a list of SsvcDecisionPointGroup objects, return a list of all the unique SsvcDecisionPoint objects contained in those groups. @@ -40,20 +62,13 @@ def get_all_decision_points_from( list: A list of SsvcDecisionPoint objects. 
""" dps = [] - seen = set() - for group in glist: for dp in group.decision_points: if dp in dps: # skip duplicates continue - key = (dp.name, dp.version) - if key in seen: - # skip duplicates - continue # keep non-duplicates dps.append(dp) - seen.add(key) return tuple(dps) diff --git a/src/ssvc/outcomes/__init__.py b/src/ssvc/outcomes/__init__.py new file mode 100644 index 00000000..063e8a7a --- /dev/null +++ b/src/ssvc/outcomes/__init__.py @@ -0,0 +1 @@ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. diff --git a/src/ssvc/outcomes/base.py b/src/ssvc/outcomes/base.py new file mode 100644 index 00000000..09235be2 --- /dev/null +++ b/src/ssvc/outcomes/base.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +""" +Provides outcome group and outcome value classes for SSVC. +""" +# Copyright (c) 2023 Carnegie Mellon University and Contributors. +# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. Patent and Trademark Office by Carnegie Mellon University + +from dataclasses import dataclass +from typing import Iterable + +from dataclasses_json import dataclass_json + +from ssvc._mixins import _Base, _Keyed + + +@dataclass_json +@dataclass(kw_only=True) +class OutcomeValue(_Base, _Keyed): + """ + Models a single value option for an SSVC outcome. 
+ """ + + +@dataclass_json +@dataclass(kw_only=True) +class OutcomeGroup(_Base): + """ + Models an outcome group. + """ + + outcomes: Iterable[OutcomeValue] + + def __iter__(self): + """ + Allow iteration over the outcomes in the group. + """ + return iter(self.outcomes) + + def __len__(self): + """ + Allow len() to be called on the group. + """ + return len(self.outcomes) + + # register all instances diff --git a/src/ssvc/outcomes/groups.py b/src/ssvc/outcomes/groups.py new file mode 100644 index 00000000..e44444d7 --- /dev/null +++ b/src/ssvc/outcomes/groups.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python +""" +Provides a set of outcome groups for use in SSVC. +""" +# Copyright (c) 2023 Carnegie Mellon University and Contributors. +# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. Patent and Trademark Office by Carnegie Mellon University + +from ssvc.outcomes.base import OutcomeGroup, OutcomeValue + +# Note: Outcome Groups must be defined in ascending order. 
+ + +DSOI = OutcomeGroup( + name="Defer, Scheduled, Out-of-Cycle, Immediate", + description="The original SSVC outcome group.", + outcomes=( + OutcomeValue(name="Defer", key="D", description="Defer"), + OutcomeValue(name="Scheduled", key="S", description="Scheduled"), + OutcomeValue(name="Out-of-Cycle", key="O", description="Out-of-Cycle"), + OutcomeValue(name="Immediate", key="I", description="Immediate"), + ), +) +""" +The original SSVC outcome group. +""" + +PUBLISH = OutcomeGroup( + name="Publish, Do Not Publish", + description="The publish outcome group.", + outcomes=( + OutcomeValue(name="Do Not Publish", key="N", description="Do Not Publish"), + OutcomeValue(name="Publish", key="P", description="Publish"), + ), +) +""" +The publish outcome group. +""" + +COORDINATE = OutcomeGroup( + name="Decline, Track, Coordinate", + description="The coordinate outcome group.", + outcomes=( + OutcomeValue(name="Decline", key="D", description="Decline"), + OutcomeValue(name="Track", key="T", description="Track"), + OutcomeValue(name="Coordinate", key="C", description="Coordinate"), + ), +) +""" +The coordinate outcome group. +""" + +MOSCOW = OutcomeGroup( + name="Must, Should, Could, Won't", + description="The Moscow outcome group.", + outcomes=( + OutcomeValue(name="Won't", key="W", description="Won't"), + OutcomeValue(name="Could", key="C", description="Could"), + OutcomeValue(name="Should", key="S", description="Should"), + OutcomeValue(name="Must", key="M", description="Must"), + ), +) +""" +The MoSCoW outcome group. +""" + +EISENHOWER = OutcomeGroup( + name="Do, Schedule, Delegate, Delete", + description="The Eisenhower outcome group.", + outcomes=( + OutcomeValue(name="Delete", key="D", description="Delete"), + OutcomeValue(name="Delegate", key="G", description="Delegate"), + OutcomeValue(name="Schedule", key="S", description="Schedule"), + OutcomeValue(name="Do", key="O", description="Do"), + ), +) +""" +The Eisenhower outcome group. 
+""" + + +CVSS = OutcomeGroup( + name="CVSS Levels", + description="The CVSS outcome group.", + outcomes=( + OutcomeValue(name="Low", key="L", description="Low"), + OutcomeValue(name="Medium", key="M", description="Medium"), + OutcomeValue(name="High", key="H", description="High"), + OutcomeValue(name="Critical", key="C", description="Critical"), + ), +) +""" +The CVSS outcome group. +""" + +YES_NO = OutcomeGroup( + name="Yes, No", + description="The Yes/No outcome group.", + outcomes=( + OutcomeValue(name="No", key="N", description="No"), + OutcomeValue(name="Yes", key="Y", description="Yes"), + ), +) +""" +The Yes/No outcome group. +""" + +VALUE_COMPLEXITY = OutcomeGroup( + name="Value, Complexity", + description="The Value/Complexity outcome group.", + outcomes=( + # drop, reconsider later, easy win, do first + OutcomeValue(name="Drop", key="D", description="Drop"), + OutcomeValue(name="Reconsider Later", key="R", description="Reconsider Later"), + OutcomeValue(name="Easy Win", key="E", description="Easy Win"), + OutcomeValue(name="Do First", key="F", description="Do First"), + ), +) +""" +The Value/Complexity outcome group. +""" + + +def main(): + pass + + +if __name__ == "__main__": + main() diff --git a/src/ssvc/policy_generator.py b/src/ssvc/policy_generator.py new file mode 100644 index 00000000..01a490aa --- /dev/null +++ b/src/ssvc/policy_generator.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python +""" +Provides a Policy Generator class for SSVC decision point groups. + +""" +# Copyright (c) 2023 Carnegie Mellon University and Contributors. +# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. 
+# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. Patent and Trademark Office by Carnegie Mellon University + +import itertools +import logging +from typing import List, Tuple + +import networkx as nx +import pandas as pd + +from ssvc import csv_analyzer +from ssvc.dp_groups.base import SsvcDecisionPointGroup +from ssvc.outcomes.base import OutcomeGroup + +logger = logging.getLogger(__name__) + + +class PolicyGenerator: + """ + Generates a policy for a given decision point group and outcome group. + + An SSVC policy is represented as a table of decision point values and outcomes. + Each row of the table represents a specific set of decision point values, and the outcome that results from those values. + + Internally, the PolicyGenerator represents a policy as a directed graph. + Each node in the graph corresponds to a specific set of decision point values. + Each edge in the graph indicates an ordering between two states. + Taken together, the graph represents a partial ordering of the decision point values mapped to outcomes. + """ + + def __init__( + self, + dp_group: SsvcDecisionPointGroup = None, + outcomes: OutcomeGroup = None, + outcome_weights: List[float] = None, + ): + """ + Create a policy generator. + + If outcome weights are unspecified, then the weights are evenly distributed across the outcomes. + + Args: + dp_group: The decision point group to generate a policy for. + outcomes: The outcome group to generate a policy for. + outcome_weights: The relative weights of the outcomes (optional) + + Raises: + ValueError: If dp_group or outcomes are None. 
+ """ + if dp_group is None: + raise ValueError("dp_group is required") + else: + self.dpg: SsvcDecisionPointGroup = dp_group + + if outcomes is None: + raise ValueError("outcomes is required") + else: + self.outcomes: OutcomeGroup = outcomes + + if outcome_weights is None: + weight = 1.0 / len(list(self.outcomes)) + self.outcome_weights = [weight for _ in self.outcomes] + else: + self.outcome_weights = outcome_weights + logger.debug(f"Outcome weights: {self.outcome_weights}") + + self.policy: pd.DataFrame = None + self.G: nx.DiGraph = nx.DiGraph() + self.top: Tuple[int] = None + self.bottom: Tuple[int] = None + + self._enumerated_vec = None + + def __enter__(self) -> "PolicyGenerator": + """ + Sets up a policy generator runtime context. + + The runtime context performs the following steps in order: + + 1. Converts the decision point group to a vector + representation. + 2. Adds nodes to the graph. A node is represented as a tuple of decision point values as + integers. E.g., `(0,1,0,2)`, `(1,2,1,3)` + 3. Adds edges to the graph where each edge $(u,v)$ indicates that $u < v$. + 4. Assigns outcomes to each node in the graph according to the outcome weights. + 5. Validates that the graph + meets the requirement that outcome ordering is consistent with node ordering. + 6. Converts the graph to a policy table. The policy table is a dataframe where each row represents a node in + the graph. + + !!! note "Node ordering" + + A node $u$ is considered less than another node $v$ if $u[i] <= v[i]$ for all $i$. + + + Example: + ```python + with PolicyGenerator(dp_group, outcomes) as pg: + pg.emit_policy() + ``` + + Returns: + The policy generator context. + """ + self._setup() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def _setup(self): + """ + Convert the decision point group to a vector representation. 
+ """ + + self._enumerate_dp_values() + self._add_nodes() + self._add_edges() + self._assign_outcomes() + self._validate_paths() + self._create_policy() + + def _validate_paths(self): + for path in nx.all_simple_paths(self.G, self.bottom, self.top): + for start, end in zip(path[:-1], path[1:]): + u = self.G.nodes[start]["outcome"] + v = self.G.nodes[end]["outcome"] + if u > v: + raise (ValueError(f"Invalid path: {u} !<= {v} in {path}")) + + def _create_policy(self): + rows = [] + for node in self.G.nodes: + row = {} + for i in range(len(node)): + # turn the numerical indexes back into decision point names + col1 = f"{self.dpg.decision_points[i].name}" + row[col1] = self.dpg.decision_points[i].values[node[i]].name + # numerical values + col2 = f"idx_{self.dpg.decision_points[i].name}" + row[col2] = node[i] + + oc_idx = self.G.nodes[node]["outcome"] + row["outcome"] = self.outcomes.outcomes[oc_idx].name + + row["idx_outcome"] = oc_idx + rows.append(row) + + self.policy = pd.DataFrame(rows) + + def clean_policy(self) -> pd.DataFrame: + df = self.policy.copy() + print_cols = [c for c in df.columns if not c.startswith("idx_")] + for c in print_cols: + df[c] = df[c].str.lower() + + return pd.DataFrame(df[print_cols]) + + def emit_policy(self) -> None: + """ + Prints the policy to stdout in CSV format. 
+ """ + df = self.clean_policy() + + print(df.to_csv(index=False)) + + def _assign_outcomes(self): + node_count = len(self.G.nodes) + outcomes = [outcome.name for outcome in self.outcomes.outcomes] + logger.debug(f"Outcomes: {outcomes}") + + layers = list(nx.topological_generations(self.G)) + logger.debug(f"Layer count: {len(layers)}") + logger.debug(f"Layer sizes: {[len(layer) for layer in layers]}") + + outcome_counts = [round(node_count * weight) for weight in self.outcome_weights] + + toposort = list(nx.topological_sort(self.G)) + logger.debug(f"Toposort: {toposort[:4]}...{toposort[-4:]}") + + outcome_idx = 0 + assigned_counts = [0 for _ in self.outcomes.outcomes] + for node in toposort: + # step through the nodes in topological order + # and assign outcomes to each node + self.G.nodes[node]["outcome"] = outcome_idx + assigned_counts[outcome_idx] += 1 + + # if we've assigned enough of this outcome, move on to the next outcome + if ( + outcome_idx < (len(self.outcomes.outcomes)) + and outcome_counts[outcome_idx] <= assigned_counts[outcome_idx] + ): + outcome_idx += 1 + + logger.debug(f"Expected counts: {dict(zip(outcomes,outcome_counts))}") + logger.debug(f"Assigned counts: {dict(zip(outcomes,assigned_counts))}") + + def _add_edges(self): + # for each node, create an edge to the next node if the next node is strictly greater than the current node + for u, v in itertools.product(self.G.nodes, self.G.nodes): + if u == v: + # don't create an edge from a node to itself + continue + + # if the next node has at least one value greater than the current node + if all(u[i] <= v[i] for i in range(len(u))): + # then create an edge from the current node to the next node + self.G.add_edge(u, v) + + # the previous loop creates a much larger graph than we need + # so replace it with the transitive reduction of the graph + logger.debug(f"Edge count (pre-reduction): {len(self.G.edges)}") + self.G = nx.transitive_reduction(self.G) + logger.info(f"Edge count: {len(self.G.edges)}") 
+ + def _add_nodes(self): + # then get the cartesian product of the values + # so [[0,1,2],[0,1],[0,1,2]] becomes + # [[0,0,0],[0,0,1],[0,0,2],[0,1,0],[0,1,1],[0,1,2]] + vec = self._enumerated_vec + + self.bottom = tuple([min(t) for t in vec]) + self.top = tuple([max(t) for t in vec]) + + logger.debug(f"Top node: {self.top}") + logger.debug(f"Bottom node: {self.bottom}") + + # add a node for each cartesian product of the elements of vec + for node in itertools.product(*vec): + node = tuple(node) + self.G.add_node(node) + + node_count = len(self.G.nodes) + logger.info(f"Node count: {node_count}") + return node_count + + def _enumerate_dp_values(self): + # for each decision point in the group, get an enumeration of the values + # so [[a,b,c],[d,e],[f,g,h]] becomes [[0,1,2],[0,1],[0,1,2]] + vec = [] + for dp in self.dpg.decision_points: + vec.append(tuple(range(len(dp.values)))) + + logger.debug(f"Enumerated vector: {vec}") + + self._enumerated_vec = vec + + +def main(): + from ssvc.decision_points.automatable import AUTOMATABLE_1 + from ssvc.decision_points.exploitation import EXPLOITATION_1 + from ssvc.decision_points.human_impact import HUMAN_IMPACT_1 + from ssvc.decision_points.system_exposure import SYSTEM_EXPOSURE_1_0_1 + from ssvc.outcomes.groups import DSOI + + # set up logging + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + hdlr = logging.StreamHandler() + logger.addHandler(hdlr) + + dpg = SsvcDecisionPointGroup( + name="Dummy Decision Point Group", + description="Dummy decision point group", + version="1.0.0", + decision_points=[ + EXPLOITATION_1, + SYSTEM_EXPOSURE_1_0_1, + AUTOMATABLE_1, + HUMAN_IMPACT_1, + ], + ) + + with PolicyGenerator( + dp_group=dpg, outcomes=DSOI, outcome_weights=[0.097, 0.583, 0.278, 0.042] + ) as pg: + pg.emit_policy() + + # check policy against csv_analyzer + df = pg.clean_policy() + imp = csv_analyzer.drop_col_feature_importance(df, "outcome") + + print(imp) + + +if __name__ == "__main__": + main() diff --git 
a/src/test/test_analyze_csv.py b/src/test/test_csv_analyzer.py similarity index 75% rename from src/test/test_analyze_csv.py rename to src/test/test_csv_analyzer.py index 742ee25f..7210132a 100644 --- a/src/test/test_analyze_csv.py +++ b/src/test/test_csv_analyzer.py @@ -1,3 +1,16 @@ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. +# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. 
Patent and Trademark Office by Carnegie Mellon University + import unittest import pandas as pd @@ -136,6 +149,27 @@ def test_parse_args(self): self.assertEqual(args.outcol, "priority") self.assertFalse(args.permutation) + def test_create_dt_classifier(self): + df = pd.DataFrame() + target = "outcome" + + # key error when target is not in df.columns + self.assertNotIn(target, df.columns) + self.assertRaises(KeyError, acsv._create_dt_classifier, df, target) + + df["color"] = [1, 1, 1, 1, 2, 2, 2, 2] + df["size"] = [1, 2, 3, 4, 1, 2, 3, 4] + df["outcome"] = [1, 1, 1, 1, 2, 2, 2, 2] + + # create_dt_classifier should return a DecisionTreeClassifier object + model, x, y = acsv._create_dt_classifier(df, target) + self.assertIsInstance(model, acsv.DecisionTreeClassifier) + self.assertIsInstance(x, pd.DataFrame) + self.assertIsInstance(y, pd.Series) + + self.assertIn("color_", x.columns) + self.assertIn("size_", x.columns) + if __name__ == "__main__": unittest.main() diff --git a/src/test/test_dp_base.py b/src/test/test_dp_base.py index cb1ecaac..969ef03c 100644 --- a/src/test/test_dp_base.py +++ b/src/test/test_dp_base.py @@ -1,9 +1,25 @@ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. +# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. 
Patent and Trademark Office by Carnegie Mellon University + import unittest + import ssvc.decision_points.base as base class MyTestCase(unittest.TestCase): def setUp(self) -> None: + self.original_registry = base.REGISTERED_DECISION_POINTS.copy() + self.value = base.SsvcDecisionPointValue( name="foo", key="bar", description="baz" ) @@ -17,6 +33,30 @@ def setUp(self) -> None: values=(self.value,), ) + def tearDown(self) -> None: + # restore the original registry + base.REGISTERED_DECISION_POINTS = self.original_registry + + def test_registry(self): + # just by creating the objects, they should be registered + self.assertIn(self.dp, base.REGISTERED_DECISION_POINTS) + + dp2 = base.SsvcDecisionPoint( + name="asdfad", + key="asdfasdf", + description="asdfasdf", + version="1.33.1", + namespace="asdfasdf", + values=( + self.value, + self.value, + ), + ) + + dp2._comment = "asdfasdfasdf" + + self.assertIn(dp2, base.REGISTERED_DECISION_POINTS) + def test_ssvc_value(self): obj = self.value # should have name, key, description @@ -56,7 +96,19 @@ def test_ssvc_decision_point_json_roundtrip(self): self.assertGreater(len(json), 0) obj2 = base.SsvcDecisionPoint.from_json(json) - self.assertEqual(obj, obj2) + self.assertEqual(obj.to_dict(), obj2.to_dict()) + + def test_dp_to_table(self): + obj = self.dp + + table = base.dp_to_table(obj) + + self.assertIn(obj.description, table) + self.assertIn("Value", table) + self.assertIn("Key", table) + self.assertIn("Description", table) + self.assertIn(obj.name, table) + self.assertIn(obj.key, table) if __name__ == "__main__": diff --git a/src/test/test_dp_groups.py b/src/test/test_dp_groups.py new file mode 100644 index 00000000..08c48b29 --- /dev/null +++ b/src/test/test_dp_groups.py @@ -0,0 +1,77 @@ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. 
+# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. Patent and Trademark Office by Carnegie Mellon University + +import unittest + +import ssvc.dp_groups.base as dpg +from ssvc.decision_points import SsvcDecisionPointValue + + +class MyTestCase(unittest.TestCase): + def setUp(self) -> None: + self.dps = [] + for i in range(10): + dp = dpg.SsvcDecisionPoint( + name=f"Decision Point {i}", + key=f"DP_{i}", + description=f"Description of Decision Point {i}", + version="1.0.0", + values=( + SsvcDecisionPointValue(name="foo", key="FOO", description="foo"), + SsvcDecisionPointValue(name="bar", key="BAR", description="bar"), + SsvcDecisionPointValue(name="baz", key="BAZ", description="baz"), + ), + ) + self.dps.append(dp) + + def tearDown(self) -> None: + pass + + def test_iter(self): + # add them to a decision point group + g = dpg.SsvcDecisionPointGroup( + name="Test Group", description="Test Group", decision_points=self.dps + ) + + self.assertTrue(hasattr(g, "__iter__")) + + # iterate over the group + for dp in g: + self.assertIn(dp, self.dps) + + def test_len(self): + # add them to a decision point group + g = dpg.SsvcDecisionPointGroup( + name="Test Group", description="Test Group", decision_points=self.dps + ) + + self.assertEqual(len(self.dps), len(g.decision_points)) + 
self.assertEqual(len(self.dps), len(g)) + + def test_json_roundtrip(self): + # add them to a decision point group + g = dpg.SsvcDecisionPointGroup( + name="Test Group", description="Test Group", decision_points=self.dps + ) + + # serialize the group to json + g_json = g.to_json() + + # deserialize the json to a new group + g2 = dpg.SsvcDecisionPointGroup.from_json(g_json) + # assert that the new group is the same as the old group + self.assertEqual(g.to_dict(), g2.to_dict()) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/test/test_outcomes.py b/src/test/test_outcomes.py new file mode 100644 index 00000000..3645c8b1 --- /dev/null +++ b/src/test/test_outcomes.py @@ -0,0 +1,52 @@ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. +# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. +# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the +# U.S. 
Patent and Trademark Office by Carnegie Mellon University + +import unittest + +from ssvc.outcomes.base import OutcomeGroup, OutcomeValue + +ALPHABET = "abcdefghijklmnopqrstuvwxyz" + + +class MyTestCase(unittest.TestCase): + def test_outcome_value(self): + for x in ALPHABET: + ov = OutcomeValue(key=x, name=x, description=x) + self.assertEqual(ov.key, x) + self.assertEqual(ov.name, x) + self.assertEqual(ov.description, x) + + def test_outcome_group(self): + ALPHABET + + values = [] + for x in ALPHABET: + values.append(OutcomeValue(key=x, name=x, description=x)) + + og = OutcomeGroup( + name="og", description="an outcome group", outcomes=tuple(values) + ) + + self.assertEqual(og.name, "og") + self.assertEqual(og.description, "an outcome group") + + self.assertEqual(len(og), len(ALPHABET)) + + for i, letter in enumerate(ALPHABET): + self.assertEqual(og.outcomes[i].key, letter) + self.assertEqual(og.outcomes[i].name, letter) + self.assertEqual(og.outcomes[i].description, letter) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/test/test_policy_generator.py b/src/test/test_policy_generator.py new file mode 100644 index 00000000..a396f5aa --- /dev/null +++ b/src/test/test_policy_generator.py @@ -0,0 +1,291 @@ +# Copyright (c) 2023 Carnegie Mellon University and Contributors. +# - see Contributors.md for a full list of Contributors +# - see ContributionInstructions.md for information on how you can Contribute to this project +# Stakeholder Specific Vulnerability Categorization (SSVC) is +# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed +# with this Software or contact permission@sei.cmu.edu for full terms. +# Created, in part, with funding and support from the United States Government +# (see Acknowledgments file). This program may include and/or can make use of +# certain third party source code, object code, documentation and other files +# (“Third Party Software”). See LICENSE.md for more details. 
# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the
# U.S. Patent and Trademark Office by Carnegie Mellon University

import contextlib
import io
import unittest
from collections import Counter
from itertools import product

import networkx as nx
import pandas as pd

from ssvc.decision_points import SsvcDecisionPoint, SsvcDecisionPointValue
from ssvc.dp_groups.base import SsvcDecisionPointGroup
from ssvc.outcomes.base import OutcomeGroup, OutcomeValue
from ssvc.policy_generator import PolicyGenerator


class MyTestCase(unittest.TestCase):
    """Unit tests for :class:`ssvc.policy_generator.PolicyGenerator`."""

    def setUp(self) -> None:
        """Build a four-outcome group and four two-valued decision points."""
        self.og_names = ["Never", "Someday", "Today", "Now"]
        self.dp_values = ["Yes", "No"]
        self.dp_names = ["Who", "What", "When", "Where"]

        self.og = OutcomeGroup(
            name="test",
            description="test",
            outcomes=[
                OutcomeValue(key=c, name=c, description=c) for c in self.og_names
            ],
        )
        self.dpg = SsvcDecisionPointGroup(
            name="test",
            description="test",
            decision_points=[
                SsvcDecisionPoint(
                    name=c,
                    description=c,
                    key=c,
                    values=[
                        SsvcDecisionPointValue(name=v, key=v, description=v)
                        for v in self.dp_values
                    ],
                )
                for c in self.dp_names
            ],
        )

    # ------------------------------------------------------------------
    # helpers shared by several tests
    # ------------------------------------------------------------------
    def _weighted_generator(self) -> PolicyGenerator:
        """Return a generator whose outcome weights are 1/2, 1/4, 1/8, 1/8."""
        return PolicyGenerator(
            dp_group=self.dpg,
            outcomes=self.og,
            outcome_weights=[0.5, 0.25, 0.125, 0.125],
        )

    @staticmethod
    def _build_graph(pg: PolicyGenerator) -> None:
        """Run the enumerate / add-nodes / add-edges steps on ``pg``."""
        pg._enumerate_dp_values()
        pg._add_nodes()
        pg._add_edges()

    def _assign_and_count_outcomes(self, pg: PolicyGenerator) -> Counter:
        """Build the graph, assign outcomes, and return the outcome tally.

        Also checks the graph size for the fixture (16 nodes, 32 edges), that
        no node carries an ``"outcome"`` before assignment, that every node
        carries one afterwards, and that every outcome in the group is used.
        """
        self._build_graph(pg)

        self.assertEqual(16, len(pg.G.nodes))
        self.assertEqual(32, len(pg.G.edges))

        for data in pg.G.nodes.values():
            self.assertNotIn("outcome", data)

        pg._assign_outcomes()

        outcomes = []
        for data in pg.G.nodes.values():
            self.assertIn("outcome", data)
            outcomes.append(data["outcome"])

        counts = Counter(outcomes)
        self.assertEqual(len(self.og), len(counts))
        return counts

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------
    def test_pg_init(self):
        """A new generator has uniform weights, a DiGraph, and no policy yet."""
        self.assertEqual(4, len(self.dpg.decision_points))
        self.assertEqual(4, len(self.og.outcomes))

        pg = PolicyGenerator(dp_group=self.dpg, outcomes=self.og)
        for w in pg.outcome_weights:
            self.assertEqual(0.25, w)

        self.assertIsInstance(pg.G, nx.DiGraph)
        self.assertIsNone(pg.policy)
        self.assertIsNone(pg.top)
        self.assertIsNone(pg.bottom)

    def test_pg_context(self):
        """Inside a ``with`` block, policy, top and bottom are populated."""
        with PolicyGenerator(dp_group=self.dpg, outcomes=self.og) as pg:
            self.assertIsInstance(pg.G, nx.DiGraph)
            self.assertIsNotNone(pg.policy)
            self.assertIsNotNone(pg.top)
            self.assertIsNotNone(pg.bottom)

    def test_enumerate_dp_values(self):
        """Each of the four decision points enumerates to ``(0, 1)``."""
        pg = PolicyGenerator(dp_group=self.dpg, outcomes=self.og)

        self.assertIsNone(pg._enumerated_vec)

        pg._enumerate_dp_values()
        self.assertEqual(4, len(pg._enumerated_vec))

        for t in pg._enumerated_vec:
            self.assertEqual(2, len(t))
            self.assertEqual((0, 1), t)

    def test_add_nodes(self):
        """``_add_nodes`` creates one node per combination and sets top/bottom."""
        pg = PolicyGenerator(dp_group=self.dpg, outcomes=self.og)
        # deliberately uneven value counts so an axis mix-up would be caught
        pg._enumerated_vec = [(0, 1), (0, 1, 2), (0, 1), (0, 1, 2, 3)]

        self.assertIsNone(pg.bottom)
        self.assertIsNone(pg.top)
        self.assertEqual(0, len(pg.G.nodes))

        pg._add_nodes()

        expected_node_count = 1
        for t in pg._enumerated_vec:
            expected_node_count *= len(t)

        self.assertEqual(expected_node_count, len(pg.G.nodes))

        self.assertEqual((0, 0, 0, 0), pg.bottom)
        self.assertEqual((1, 2, 1, 3), pg.top)

        self.assertIn(pg.bottom, pg.G.nodes)
        self.assertIn(pg.top, pg.G.nodes)

        # every in-range combination must be present ...
        for node in product(range(2), range(3), range(2), range(4)):
            self.assertIn(node, pg.G.nodes)

        # ... and nothing one step past the end of any axis
        self.assertNotIn((2, 0, 0, 0), pg.G.nodes)
        self.assertNotIn((0, 3, 0, 0), pg.G.nodes)
        self.assertNotIn((0, 0, 2, 0), pg.G.nodes)
        self.assertNotIn((0, 0, 0, 4), pg.G.nodes)

    def test_add_edges(self):
        """``_add_edges`` yields exactly the expected edges on a 2x3x2 grid.

        Every expected edge joins two nodes that differ by +1 in exactly one
        coordinate.
        """
        pg = PolicyGenerator(dp_group=self.dpg, outcomes=self.og)
        for i, j, k in product(range(2), range(3), range(2)):
            pg.G.add_node((i, j, k))

        self.assertEqual(0, len(pg.G.edges))

        pg._add_edges()

        expect_edges = [
            ((0, 0, 0), (1, 0, 0)),
            ((0, 0, 0), (0, 1, 0)),
            ((0, 0, 0), (0, 0, 1)),
            ((0, 0, 1), (1, 0, 1)),
            ((0, 0, 1), (0, 1, 1)),
            ((0, 1, 0), (1, 1, 0)),
            ((0, 1, 0), (0, 2, 0)),
            ((0, 1, 0), (0, 1, 1)),
            ((0, 1, 1), (0, 2, 1)),
            ((0, 1, 1), (1, 1, 1)),
            ((0, 2, 0), (0, 2, 1)),
            ((0, 2, 0), (1, 2, 0)),
            ((0, 2, 1), (1, 2, 1)),
            ((1, 0, 0), (1, 0, 1)),
            ((1, 0, 0), (1, 1, 0)),
            ((1, 0, 1), (1, 1, 1)),
            ((1, 1, 0), (1, 1, 1)),
            ((1, 1, 0), (1, 2, 0)),
            ((1, 1, 1), (1, 2, 1)),
            ((1, 2, 0), (1, 2, 1)),
        ]
        self.assertEqual(len(expect_edges), len(pg.G.edges))

        for u, v in expect_edges:
            self.assertIn(u, pg.G.nodes)
            self.assertIn(v, pg.G.nodes)
            self.assertIn((u, v), pg.G.edges)

    def test_assign_outcomes(self):
        """With default weights, the 16 nodes split evenly over 4 outcomes."""
        pg = PolicyGenerator(dp_group=self.dpg, outcomes=self.og)
        counts = self._assign_and_count_outcomes(pg)

        # they should be evenly distributed: 16 nodes / 4 outcomes
        for outcome, count in counts.items():
            self.assertEqual(4, count, f"outcome {outcome!r} count")

    def test_assign_weighted_outcomes(self):
        """With weights 1/2, 1/4, 1/8, 1/8 the 16 nodes split 8/4/2/2."""
        pg = self._weighted_generator()
        counts = self._assign_and_count_outcomes(pg)

        # they should follow the weights, not be evenly distributed
        self.assertEqual({0: 8, 1: 4, 2: 2, 3: 2}, counts)

    def test_emit_policy(self):
        """``emit_policy`` prints every decision point and outcome name."""
        with PolicyGenerator(dp_group=self.dpg, outcomes=self.og) as pg:
            # capture stdout
            captured = io.StringIO()
            with contextlib.redirect_stdout(captured):
                pg.emit_policy()

            stdout = captured.getvalue()

            for decision_point in pg.dpg.decision_points:
                self.assertIn(decision_point.name, stdout)
            # outcome names are expected in lower case in the output
            for outcome in pg.outcomes.outcomes:
                self.assertIn(outcome.name.lower(), stdout)

    def test_create_policy(self):
        """``_create_policy`` produces a 16-row DataFrame with all columns."""
        pg = self._weighted_generator()
        self._build_graph(pg)
        pg._assign_outcomes()
        pg._validate_paths()

        self.assertIsNone(pg.policy)

        pg._create_policy()

        self.assertIsNotNone(pg.policy)
        self.assertIsInstance(pg.policy, pd.DataFrame)
        self.assertEqual(16, len(pg.policy))

        for c in self.dp_names:
            self.assertIn(c, pg.policy.columns)
            self.assertIn(f"idx_{c}", pg.policy.columns)

        self.assertIn("outcome", pg.policy.columns)
        self.assertIn("idx_outcome", pg.policy.columns)

        for outcome in self.og_names:
            self.assertIn(outcome, pg.policy.outcome.values)

    def test_validate_paths(self):
        """``_validate_paths`` returns None, or raises on a bad outcome."""
        pg = self._weighted_generator()
        self._build_graph(pg)
        pg._assign_outcomes()

        # should work fine and return None
        self.assertIsNone(pg._validate_paths())

        # unless we add a bad outcome value
        pg.G.nodes[(0, 0, 0, 0)]["outcome"] = 5

        with self.assertRaises(ValueError):
            pg._validate_paths()


if __name__ == "__main__":
    unittest.main()