-
Notifications
You must be signed in to change notification settings - Fork 482
/
Copy pathmain.py
154 lines (130 loc) · 5.06 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import argparse
import json
import sys
from typing import List
from typing import Optional
from . import audit
from .core import baseline
from .core import plugins
from .core.log import log
from .core.scan import get_files_to_scan
from .core.scan import scan_for_allowlisted_secrets_in_file
from .core.scan import scan_line
from .core.secrets_collection import SecretsCollection
from .core.usage import ParserBuilder
from .exceptions import InvalidBaselineError
from .settings import get_plugins
from .settings import get_settings
def main(argv: Optional[List[str]] = None) -> int:
if not argv and len(sys.argv) == 1: # pragma: no cover
argv = ['--help']
args = parse_args(argv)
if args.verbose: # pragma: no cover
log.set_debug_level(args.verbose)
if args.action == 'scan':
handle_scan_action(args)
elif args.action == 'audit':
handle_audit_action(args)
return 0
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
return ParserBuilder().add_console_use_arguments().parse_args(argv)
def handle_scan_action(args: argparse.Namespace) -> None:
if args.list_all_plugins:
# NOTE: If there was a baseline provided, it would already have been parsed and
# settings populated by the time it reaches here.
print('\n'.join(get_settings().plugins))
return
if args.string:
line = args.string
if isinstance(args.string, bool):
# Support stdin usage, rather than specifying on CLI.
line = sys.stdin.read().splitlines()[0]
print(scan_adhoc_string(line))
return
if args.only_allowlisted:
secrets = SecretsCollection(root=args.custom_root)
for filename in get_files_to_scan(
*args.path,
should_scan_all_files=args.all_files,
root=args.custom_root,
):
for secret in scan_for_allowlisted_secrets_in_file(filename):
secrets[secret.filename].add(secret)
print(json.dumps(baseline.format_for_output(secrets), indent=2))
return
secrets = baseline.create(
*args.path,
should_scan_all_files=args.all_files,
root=args.custom_root,
num_processors=args.num_cores,
)
if args.baseline is not None:
# The pre-commit hook's baseline upgrade is to trim the supplied baseline for non-existent
# secrets, and to upgrade the format to the latest version. This is because the pre-commit
# hook is not supposed to allow any new secrets to enter commit history.
#
# Unlike that, this scan's intention is to re-catalog the secrets in the repository. This
# means that we should favor (and allow) the newly found secrets, and create a baseline
# with them. It should also upgrade the format to the latest version, which is done by
# default.
secrets.merge(args.baseline)
baseline.save_to_file(secrets, args.baseline_filename)
else:
print(json.dumps(baseline.format_for_output(secrets, is_slim_mode=args.slim), indent=2))
def scan_adhoc_string(line: str) -> str:
registered_plugins = get_plugins()
results = {
plugin.secret_type: 'False'
for plugin in registered_plugins
}
for secret in scan_line(line):
results[secret.type] = (
plugins.initialize.from_secret_type(secret.type) # type: ignore
.format_scan_result(secret)
)
# Pretty formatting
longest_plugin_name_length = max([
len(plugin.__class__.__name__)
for plugin in registered_plugins
])
return '\n'.join([
('{:%d}: {}' % longest_plugin_name_length).format(
plugin.__class__.__name__,
results[plugin.secret_type],
)
for plugin in sorted(registered_plugins, key=lambda x: str(x.__class__.__name__))
])
def handle_audit_action(args: argparse.Namespace) -> None:
try:
if args.stats:
stats = audit.analytics.calculate_statistics_for_baseline(args.filename[0])
if args.diff:
# TODO
raise NotImplementedError
if args.json:
print(json.dumps(stats.json(), indent=2))
else:
print(str(stats))
elif args.report:
class_to_print = None
if args.only_real:
class_to_print = audit.report.SecretClassToPrint.REAL_SECRET
elif args.only_false:
class_to_print = audit.report.SecretClassToPrint.FALSE_POSITIVE
print(
json.dumps(
audit.report.generate_report(args.filename[0], class_to_print),
indent=4,
sort_keys=True,
),
)
else:
# Starts interactive session.
if args.diff:
# Show changes
audit.compare_baselines(args.filename[0], args.filename[1])
else:
# Label secrets
audit.audit_baseline(args.filename[0])
except InvalidBaselineError:
pass