From ea4d7958d76bf0515e515e6d761410601e9b8fea Mon Sep 17 00:00:00 2001 From: "Allen D. Householder" Date: Tue, 10 Oct 2023 15:01:29 -0400 Subject: [PATCH] add drop column importance (#327) * add drop column importance * add docstrings * add docstrings * more docstrings * newline * refactor and add unit tests * add test runner workflow * disable black check for now --- .github/workflows/python-app.yml | 46 ++++++ docs/reference/code/analyze_csv.md | 2 + mkdocs.yml | 7 + requirements.txt | 5 + src/analyze_csv.py | 237 ++++++++++++++++++++++++----- src/test/__init__.py | 0 src/test/test_analyze_csv.py | 143 +++++++++++++++++ 7 files changed, 399 insertions(+), 41 deletions(-) create mode 100644 .github/workflows/python-app.yml create mode 100644 docs/reference/code/analyze_csv.md create mode 100644 src/test/__init__.py create mode 100644 src/test/test_analyze_csv.py diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml new file mode 100644 index 00000000..31627fab --- /dev/null +++ b/.github/workflows/python-app.yml @@ -0,0 +1,46 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python + +name: Python application + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +permissions: + contents: read + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + with: + fetch-tags: true + - name: Set up Python 3.10 + uses: actions/setup-python@v3 + with: + python-version: "3.10" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pytest build + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi +# - uses: psf/black@stable + - name: Test with pytest + run: | + pytest +# stop here for now because we don't have a build step yet +# - name: Build +# run: | +# python -m build +# - 
name: Upload Artifacts +# uses: actions/upload-artifact@v3 +# with: +# name: ssvc +# path: dist/ssvc-*.tar.gz +# retention-days: 14 diff --git a/docs/reference/code/analyze_csv.md b/docs/reference/code/analyze_csv.md new file mode 100644 index 00000000..bb886969 --- /dev/null +++ b/docs/reference/code/analyze_csv.md @@ -0,0 +1,2 @@ +::: analyze_csv + diff --git a/mkdocs.yml b/mkdocs.yml index ee1534a3..cbf81193 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -69,6 +69,8 @@ nav: - System Exposure: 'reference/decision_points/system_exposure.md' - Technical Impact: 'reference/decision_points/technical_impact.md' - Value Density: 'reference/decision_points/value_density.md' + - Code: + analyze_csv: 'reference/code/analyze_csv.md' - Calculator: 'ssvc-calc/index.html' - About: - Intro: 'about/index.md' @@ -105,6 +107,10 @@ plugins: - table-reader - bibtex: bib_file: 'doc/md_src_files/sources_ssvc.bib' + - mkdocstrings: + handlers: + python: + paths: [ 'src' ] repo_url: 'https://github.com/CERTCC/SSVC' repo_name: 'CERTCC/SSVC' markdown_extensions: @@ -158,3 +164,4 @@ extra_javascript: - javascripts/tablesort.js extra_css: - stylesheets/extra.css +dev_addr: 127.0.0.1:8001 diff --git a/requirements.txt b/requirements.txt index c8939a0b..fbfe5ef8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,8 @@ mkdocs-include-markdown-plugin mkdocs-table-reader-plugin mkdocs-material mkdocs-material-extensions +mkdocstrings +mkdocstrings-python + +pandas~=2.1.1 +scikit-learn~=1.3.1 \ No newline at end of file diff --git a/src/analyze_csv.py b/src/analyze_csv.py index 27fa797a..21c22a5a 100644 --- a/src/analyze_csv.py +++ b/src/analyze_csv.py @@ -1,80 +1,235 @@ #!/usr/bin/env python -''' -file: analyze_csv -author: adh -created_at: 3/18/21 2:30 PM -''' +""" +This module provides a script for analyzing an SSVC tree csv file. 
+ +```shell +usage: analyze_csv.py [-h] [--outcol OUTCOL] [--permutation] csvfile + +Analyze an SSVC tree csv file + +positional arguments: + csvfile the csv file to analyze + +options: + -h, --help show this help message and exit + --outcol OUTCOL the name of the outcome column + --permutation use permutation importance instead of drop column importance +``` + +Example: + Given a `test.csv` file like this: + ```csv + row,Exploitation,Exposure,Automatable,Human Impact,Priority + 1,none,small,no,low,defer + 2,none,small,no,medium,defer + 3,none,small,no,high,scheduled + ... + ``` + Analyze the csv file: + ```shell + $ python analyze_csv.py test.csv + + Drop Column Feature Importance for test.csv + feature feature_importance + 0 exploitation_ 0.347222 + 1 human_impact_ 0.291667 + 2 automatable_ 0.180556 + 3 exposure_ 0.166667 + ``` + + Higher values imply more important features. + """ + import argparse +import sys + import pandas as pd import re from sklearn.tree import DecisionTreeClassifier import sklearn.inspection +from sklearn.base import clone + +def _col_norm(c: str) -> str: + """ + Normalize a column name -# normalize column names -def col_norm(c): - new_col = re.sub('[^0-9a-zA-Z]+',"_",c) + Args: + c: the column name to normalize + + Returns: + the normalized column name + """ + new_col = re.sub("[^0-9a-zA-Z]+", "_", c) new_col = new_col.lower() return new_col -def main(): + +def _imp_df(column_names: list, importances: list) -> pd.DataFrame: + """ + Create a dataframe of feature importances + + Args: + column_names: the names of the columns + importances: the feature importances + + Returns: + a dataframe of feature importances + """ + df = ( + pd.DataFrame({"feature": column_names, "feature_importance": importances}) + .sort_values("feature_importance", ascending=False) + .reset_index(drop=True) + ) + return df + + +def _drop_col_feat_imp( + model: DecisionTreeClassifier, + X_train: pd.DataFrame, + y_train: pd.DataFrame, + random_state: int
= 99, +) -> pd.DataFrame: + # based on https://gist.github.com/erykml/6854134220276b1a50862aa486a44192#file-drop_col_feat_imp-py + # clone the model to have the exact same specification as the one initially trained + model_clone = clone(model) + # set random_state for comparability + model_clone.random_state = random_state + # training and scoring the benchmark model + model_clone.fit(X_train, y_train) + benchmark_score = model_clone.score(X_train, y_train) + # list for storing feature importances + importances = [] + + # iterating over all columns and storing feature importance (difference between benchmark and new model) + for col in X_train.columns: + model_clone = clone(model) + model_clone.random_state = random_state + model_clone.fit(X_train.drop(col, axis=1), y_train) + drop_col_score = model_clone.score(X_train.drop(col, axis=1), y_train) + importances.append(benchmark_score - drop_col_score) + + importances_df = _imp_df(X_train.columns, importances) + return importances_df + + +def _split_data(df: pd.DataFrame, target: str) -> (pd.DataFrame, pd.DataFrame): + """ + Split a dataframe into features and target + + Args: + df: the dataframe to split + target: the name of the target column + + Returns: + a tuple of (features, target) + """ + + # construct feature list + features = [c for c in df.columns if c != target] + y = df[target] + X = df[features] + return X, y + + +def _clean_table(df: pd.DataFrame) -> pd.DataFrame: + """ + Clean up a dataframe, normalizing column names and dropping columns we don't need + + Args: + df: the dataframe to clean + + Returns: + the cleaned dataframe + """ + # normalize data + df = df.rename(columns=_col_norm) + # drop columns we don't need + drop_cols = [ + "row", + ] + df = df.drop(columns=drop_cols, errors="ignore") + return df + + +def _perm_feat_imp(model, x, y): + model.random_state = 99 + model.fit(x, y) + # analyze tree + results = sklearn.inspection.permutation_importance(model, x, y) + imp = 
results["importances_mean"] + + imp = _imp_df(x.columns, imp) + return imp + + +def _parse_args(args) -> argparse.Namespace: # parse command line parser = argparse.ArgumentParser(description="Analyze an SSVC tree csv file") - parser.add_argument('csvfile',metavar="csvfile",type=str,help="the csv file to analyze") - parser.add_argument('--outcol',dest="outcol",type=str,help="the name of the outcome column",default="priority") - args = parser.parse_args() + parser.add_argument( + "csvfile", metavar="csvfile", type=str, help="the csv file to analyze" + ) + parser.add_argument( + "--outcol", + dest="outcol", + type=str, + help="the name of the outcome column", + default="priority", + ) + # use permutation or drop column importance? + # default is drop column + parser.add_argument( + "--permutation", + dest="permutation", + action="store_true", + help="use permutation importance instead of drop column importance", + default=False, + ) + return parser.parse_args(args) + + +def main(): + args = _parse_args(sys.argv[1:]) # read csv df = pd.read_csv(args.csvfile) + df = _clean_table(df) - # normalize data - df = df.rename(columns=col_norm) - + # check for target column target = args.outcol if target not in df.columns: - print(f"Column \'{target}\' not found in {list(df.columns)}.\nPlease specify --outcol= and try again.") + print( + f"Column '{target}' not found in {list(df.columns)}.\nPlease specify --outcol= and try again." 
+ ) exit(1) - # drop columns we don't need - drop_cols = ['row',] - df = df.drop(columns=drop_cols,errors="ignore") - - # construct feature list - features = [c for c in df.columns if c != target] - y = df[target] - X = df[features] + X, y = _split_data(df, target) # turn features into ordinals # this assumes that every column is an ordinal label # and that the ordinals are sorted in ascending order - encoded = {c: list(enumerate(X[c].unique())) for c in X.columns} cols = [] for c in X.columns: newcol = f"{c}_" cols.append(newcol) codes = list(enumerate(X[c].unique())) - mapper = {v:k for (k,v) in codes} + mapper = {v: k for (k, v) in codes} X[newcol] = X[c].replace(mapper) X2 = X[cols] # construct tree - dt = DecisionTreeClassifier(random_state=99,criterion="entropy") - dt.fit(X2,y) - - # analyze tree - results = sklearn.inspection.permutation_importance(dt,X2,y) - - imp = results['importances_mean'] - labels = [c.replace("_","") for c in cols] + dt = DecisionTreeClassifier(random_state=99, criterion="entropy") - pairs = zip(labels,imp) - pairs = sorted(pairs,key=lambda x: x[1],reverse=True) + if args.permutation: + imp = _perm_feat_imp(dt, X2, y) + print(f"Feature Permutation Importance for {args.csvfile}") + else: + # drop columns and re-run + imp = _drop_col_feat_imp(dt, X2, y) + print(f"Drop Column Feature Importance for {args.csvfile}") - # print results - print(f"Feature Permutation Importance for {args.csvfile}") + print(imp) - for label,importance in pairs: - print(f"{label:>25}: {importance:0.4f}") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/test/__init__.py b/src/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/test/test_analyze_csv.py b/src/test/test_analyze_csv.py new file mode 100644 index 00000000..115c460d --- /dev/null +++ b/src/test/test_analyze_csv.py @@ -0,0 +1,143 @@ +import contextlib +import io +import unittest + +import pandas as pd + +import analyze_csv as acsv + + +class 
MyTestCase(unittest.TestCase): + def test_col_norm(self): + # col_norm should remove any non-alphanumeric characters and replace them with underscores + # col_norm should convert all characters to lowercase + + # fold to lowercase + self.assertEqual(acsv._col_norm("Exploitation"), "exploitation") + self.assertEqual( + acsv._col_norm("AbcdEfghIjklmnOpqrstUvwxYz"), "abcdefghijklmnopqrstuvwxyz" + ) + + # replace strings of non-alphanumeric characters with underscores + self.assertEqual(acsv._col_norm("War!"), "war_") + self.assertEqual( + acsv._col_norm("Foo!@#$%^&*()- .,/<>;:'\"[]{}+=BAR"), "foo_bar" + ) + + def test_imp_df(self): + # given a list of column names and a list of feature importances, + # imp_df should return a dataframe with the column names and feature importances + # sorted in descending order by feature importance + + column_names = ["exploitation", "human_impact", "automatable", "exposure"] + importances = [0.347222, 0.291667, 0.180556, 0.166667] + df = acsv._imp_df(column_names, importances) + self.assertEqual(df["feature"][0], "exploitation") + self.assertEqual(df["feature_importance"][0], 0.347222) + self.assertEqual(df["feature"][1], "human_impact") + self.assertEqual(df["feature_importance"][1], 0.291667) + self.assertEqual(df["feature"][2], "automatable") + self.assertEqual(df["feature_importance"][2], 0.180556) + self.assertEqual(df["feature"][3], "exposure") + self.assertEqual(df["feature_importance"][3], 0.166667) + + def test_drop_col_feat_imp(self): + # given a model, two dataframes representing the input and output data, return a dataframe + + # create a model + model = acsv.DecisionTreeClassifier() + # create a dataframe representing the input data + df = pd.DataFrame( + { + "color": [1, 1, 1, 1, 2, 2, 2, 2], + "size": [1, 2, 3, 4, 1, 2, 3, 4], + "priority": [1, 1, 2, 2, 2, 3, 3, 3], + } + ) + x = df.drop("priority", axis=1) + y = df["priority"] + + # call drop_col_feat_imp + df = acsv._drop_col_feat_imp(model, x, y) + # assert that the 
dataframe returned by drop_col_feat_imp lists the features in the expected order (color, then size) + + self.assertEqual(df["feature"][0], "color") + self.assertEqual(df["feature"][1], "size") + # I don't really know how to test a model fit, so let's just make sure + # that the column is ordered in descending order + self.assertGreaterEqual( + df["feature_importance"][0], df["feature_importance"][1] + ) + + def test_split_data(self): + df = pd.DataFrame({"A": [1, 2, 3, 4], "B": [5, 6, 7, 8], "C": [9, 10, 11, 12]}) + x, y = acsv._split_data(df, "C") + + self.assertTrue(x.equals(df.drop("C", axis=1))) + self.assertTrue(y.equals(df["C"])) + + def test_clean_table(self): + # columns get renamed + # column named "row" is dropped + + df = pd.DataFrame( + { + "row": [1, 2, 3, 4], + "A!": [1, 2, 3, 4], + "?B?": [5, 6, 7, 8], + "C with spaces": [9, 10, 11, 12], + } + ) + df = acsv._clean_table(df) + self.assertNotIn("row", df.columns) + self.assertEqual(df.columns[0], "a_") + self.assertEqual(df.columns[1], "_b_") + self.assertEqual(df.columns[2], "c_with_spaces") + + def test_perm_feat_imp(self): + model = acsv.DecisionTreeClassifier() + df = pd.DataFrame( + { + "color": [1, 1, 1, 1, 2, 2, 2, 2], + "size": [1, 2, 3, 4, 1, 2, 3, 4], + "priority": [1, 1, 2, 2, 2, 3, 3, 3], + } + ) + x = df.drop("priority", axis=1) + y = df["priority"] + + df = acsv._perm_feat_imp(model, x, y) + + self.assertIn("color", df["feature"].values) + self.assertIn("size", df["feature"].values) + # I don't really know how to test a model fit, so let's just make sure + # that the column is ordered in descending order + self.assertGreaterEqual( + df["feature_importance"][0], df["feature_importance"][1] + ) + + def test_parse_args(self): + # given a list of arguments, parse_args should return an argparse.Namespace object + # with the arguments as attributes + args = [ + "foo.csv", + "--outcol", + "bar", + "--permutation", + ] + args = acsv._parse_args(args) + self.assertEqual(args.csvfile, "foo.csv") +
self.assertEqual(args.outcol, "bar") + self.assertTrue(args.permutation) + + args = [ + "foo.csv", + ] + args = acsv._parse_args(args) + self.assertEqual(args.csvfile, "foo.csv") + self.assertEqual(args.outcol, "priority") + self.assertFalse(args.permutation) + + +if __name__ == "__main__": + unittest.main()