#!/usr/bin/env python3
#
# tool to run shellcheck on script-blocks in
# bitbucket-pipelines.yml or .gitlab-ci.yml config files
#
# Copyright (c) 2021, Martin Schütte <[email protected]>

import argparse
import logging
import shutil
import subprocess
import tempfile
from pathlib import Path
import re
import sys

from ruamel.yaml import YAML
from ruamel.yaml.nodes import ScalarNode

global logger


def setup():
    global logger
    parser = argparse.ArgumentParser(
        description="run shellcheck on script blocks from .gitlab-ci.yml or bitbucket-pipelines.yml",
    )
    parser.add_argument("files", nargs="+", help="YAML files to read")
    parser.add_argument(
        "-o",
        "--outdir",
        help="output directory (default: create temporary directory)",
        type=str,
    )
    parser.add_argument(
        "-k",
        "--keep",
        help="keep (do not delete) output directory",
        action="store_true",
    )
    parser.add_argument("-d", "--debug", help="debug output", action="store_true")
    parser.add_argument(
        "-s",
        "--shell",
        help="default shebang line to add to shell script snippets (default: '#!/bin/sh -e')",
        default="#!/bin/sh -e",
        type=str,
    )
    parser.add_argument(
        "-c",
        "--command",
        help="shellcheck command to run (default: shellcheck)",
        default="shellcheck",
        type=str,
    )
    args = parser.parse_args()

    # Enable logging
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=logging.DEBUG if args.debug else logging.INFO,
        handlers=[console_handler],
    )
    logger = logging.getLogger(__name__)

    if not args.outdir:
        args.outdir = tempfile.mkdtemp(prefix="py_yaml_shellcheck_")
        logger.debug("created working dir: %s", args.outdir)
    return args
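
# Typical invocations (hypothetical file names, shown for illustration only):
#
#   python3 yaml_shellcheck.py .gitlab-ci.yml bitbucket-pipelines.yml
#   python3 yaml_shellcheck.py -s '#!/bin/bash' -c 'shellcheck -x' ci.yml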


def get_bitbucket_scripts(data):
    """Bitbucket pipeline files are deeply nested, and they do not
    publish a schema; as a result we simply search all script elements,
    something like `pipelines.**.script`
    """
    logging.debug("get_bitbucket_scripts()")

    def get_scripts(data, path):
        results = {}
        if isinstance(data, dict):
            if "script" in data:
                script = data["script"]
                if isinstance(script, str):
                    results[f"{path}/script"] = script
                elif isinstance(script, list):
                    results[f"{path}/script"] = "\n".join(script)
            for key in data:
                results.update(get_scripts(data[key], f"{path}/{key}"))
        elif isinstance(data, list):
            for i, item in enumerate(data):
                results.update(get_scripts(item, f"{path}/{i}"))
        elif (
            isinstance(data, str)
            or isinstance(data, int)
            or isinstance(data, float)
            or data is None
        ):
            pass
        return results

    result = {}
    if "pipelines" not in data:
        return result
    result = get_scripts(data["pipelines"], "pipelines")
    logging.debug("got scripts: %s", result)
    for key in result:
        logging.debug("%s: %s", key, result[key])
    return result


def get_github_scripts(data):
    """GitHub Workflows: from the docs the search pattern should be
    `jobs.<job_id>.steps[*].run`,
    https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions
    As a simple first step we match on `jobs.**.run`, excluding `jobs.**.defaults.run`.
    GitHub Actions: match on `runs.steps[*].run`.
    """

    def get_runs(data, path):
        results = {}
        if isinstance(data, dict):
            if "run" in data:
                script = data["run"]
                if not isinstance(script, str):
                    raise ValueError(
                        "unexpected format of 'run' element, expected string and found "
                        + str(type(script))
                    )
                # GitHub Actions uses '${{ foo }}' for context expressions,
                # we try to be useful and replace these with a simple shell variable
                script = re.sub(r"\${{.*}}", "$ACTION_EXPRESSION", script)
                results[f"{path}/run"] = script
            for key in data:
                if key == "defaults":
                    # GitHub Actions has jobs.<job_id>.defaults.run which we don't want to match on.
                    continue
                results.update(get_runs(data[key], f"{path}/{key}"))
        elif isinstance(data, list):
            for i, item in enumerate(data):
                results.update(get_runs(item, f"{path}/{i}"))
        elif (
            isinstance(data, str)
            or isinstance(data, int)
            or isinstance(data, float)
            or data is None
        ):
            pass
        return results

    result = {}
    if "jobs" in data:  # workflow
        result = get_runs(data["jobs"], "jobs")
    elif "runs" in data:  # actions
        result = get_runs(data["runs"], "runs")
    else:  # neither
        return result
    logging.debug("got scripts: %s", result)
    for key in result:
        logging.debug("%s: %s", key, result[key])
    return result


def get_circleci_scripts(data):
    """CircleCI: match on `jobs.*.steps.run`,
    https://circleci.com/docs/2.0/configuration-reference/
    """
    result = {}
    if "jobs" not in data:
        return result
    for jobkey, job in data["jobs"].items():
        steps = job.get("steps", [])
        logging.debug("job %s: %s", jobkey, steps)
        for step_num, step in enumerate(steps):
            if not (isinstance(step, dict) and "run" in step):
                logging.debug("job %s, step %d: no run declaration", jobkey, step_num)
                continue
            run = step["run"]
            shell = None
            logging.debug(
                "job %s, step %d: found %s %s", jobkey, step_num, type(run), run
            )
            # challenge: the run element can have different data types
            if isinstance(run, dict):
                if "command" in run:
                    script = run["command"]
                    if "shell" in run:
                        shell = run["shell"]
                else:
                    # this step could be a directive like `save_cache`
                    logging.info(
                        "job %s, step %d: no 'command' attribute", jobkey, step_num
                    )
                    script = ""
            elif isinstance(run, str):
                script = run
            elif isinstance(run, list):
                script = "\n".join(run)
            else:
                raise ValueError(
                    f"unexpected data type {type(run)} in job {jobkey} step {step_num}"
                )
            # CircleCI uses '<< foo >>' for context parameters,
            # we try to be useful and replace these with a simple shell variable
            script = re.sub(r"<<\s*([^\s>]*)\s*>>", r'"$PARAMETER"', script)
            # add shebang line if we saw a 'shell' attribute
            # TODO: we do not check for a supported shell like we do in get_ansible_scripts
            # TODO: not sure what is the best handling of bash vs. sh as default here
            if not shell:
                # CircleCI default shell, see doc "Default shell options"
                shell = "/bin/bash"
            script = f"#!{shell}\n" + script
            result[f"{jobkey}/{step_num}"] = script
    logging.debug("got scripts: %s", result)
    for key in result:
        logging.debug("%s: %s", key, result[key])
    return result


def get_drone_scripts(data):
    """Drone CI has a simple file format, with all scripts in
    `steps[].commands[]` lists, see https://docs.drone.io/yaml/exec/
    """
    result = {}
    if "steps" not in data:
        return result
    jobkey = data.get("name", "unknown")
    for item in data["steps"]:
        section = item.get("name")
        result[f"{jobkey}/{section}"] = "\n".join(item.get("commands", []))
    logging.debug("got scripts: %s", result)
    for key in result:
        logging.debug("%s: %s", key, result[key])
    return result


def get_gitlab_scripts(data):
    """GitLab is nice: as far as I can tell its files have a
    flat hierarchy with many small job entities"""
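    # A minimal document this function matches could look like
    # (hypothetical example):
    #
    #   build:
    #     before_script:
    #       - . ./env.sh
    #     script:
    #       - make all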

    def flatten_nested_string_lists(data):
        """helper function"""
        if isinstance(data, str):
            return data
        elif isinstance(data, list):
            return "\n".join([flatten_nested_string_lists(item) for item in data])
        else:
            raise ValueError(
                f"unexpected data type {type(data)} in script section: {data}"
            )

    result = {}
    for jobkey in data:
        if not isinstance(data[jobkey], dict):
            continue
        for section in ["script", "before_script", "after_script"]:
            if section in data[jobkey]:
                script = data[jobkey][section]
                script = flatten_nested_string_lists(script)
                # replace inputs interpolation with a dummy variable
                script = re.sub(r"\$\[\[\s*(inputs\.[^]]*)\s*]]", r"$INPUT_PARAMETER", script)
                result[f"{jobkey}/{section}"] = script
    return result


def get_ansible_scripts(data):
    """Ansible: read all `shell` tasks,
    https://docs.ansible.com/ansible/2.9/modules/shell_module.html
    """

    def get_shell_tasks(data, path):
        results = {}
        for i, task in enumerate(data):
            # look for simple and fully qualified collection names:
            for key in ["shell", "ansible.builtin.shell"]:
                if key in task:
                    # may be a string or a dict
                    if isinstance(task[key], str):
                        script = task[key]
                    elif isinstance(task[key], dict) and "cmd" in task[key]:
                        script = task[key]["cmd"]
                    else:
                        raise ValueError(f"unexpected data in element {path}/{i}/{key}")
                    # we cannot evaluate Jinja templates,
                    # so at least try to be useful and replace every expression with a variable;
                    # we do not handle Jinja statements like loops or if/then/else
                    script = re.sub(r"{{.*?}}", "$JINJA_EXPRESSION", script)
                    # try to add a shebang line from 'executable' if it looks like a shell
                    executable = task.get("args", {}).get("executable", None)
                    if executable and "sh" not in executable:
                        logging.debug(
                            "unsupported shell %s, in %d/%s", executable, i, key
                        )
                        # ignore this task
                        continue
                    elif executable:
                        script = f"#!{executable}\n" + script
                    results[f"{path}/{i}/{key}"] = script
            if "tasks" in task:
                results.update(get_shell_tasks(task["tasks"], f"{path}/{i}"))
            if "block" in task:
                results.update(get_shell_tasks(task["block"], f"{path}/block-{i}"))
        return results

    result = {}
    if isinstance(data, list):
        result = get_shell_tasks(data, "root")
    else:
        return result
    logging.debug("got scripts: %s", result)
    for key in result:
        logging.debug("%s: %s", key, result[key])
    return result


def select_yaml_schema(documents, filename):
    # try to determine the CI system and file format;
    # returns the right get function, and the document index to read
    if len(documents) < 1:
        raise ValueError(
            f"read {filename}, no valid YAML document, this should never happen"
        )
    # special case first: GitLab 17 adds an optional spec document before the main content document,
    # https://docs.gitlab.com/ee/ci/yaml/inputs.html
    if len(documents) == 2 and "spec" in documents[0]:
        logging.info(f"read {filename} as GitLab CI config with spec header section ...")
        return get_gitlab_scripts, 1
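    # For reference, such a spec header could look like (hypothetical example):
    #
    #   spec:
    #     inputs:
    #       environment:
    #         default: test
    #   ---
    #   deploy:
    #     script: deploy.sh "$[[ inputs.environment ]]"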
    # in previous versions we ignored additional documents in YAML files
    if len(documents) > 1:
        logging.warning(
            f"{filename} contains multiple YAML documents, only the first will be checked"
        )
    # else: len(documents) == 1; all other tools and cases only read a single YAML document
    data = documents[0]
    if isinstance(data, dict) and "pipelines" in data:
        logging.info(f"read {filename} as Bitbucket Pipelines config...")
        return get_bitbucket_scripts, 0
    elif isinstance(data, dict) and "on" in data and "jobs" in data:
        logging.info(f"read {filename} as GitHub Workflows config...")
        return get_github_scripts, 0
    elif isinstance(data, dict) and "inputs" in data and "runs" in data:
        logging.info(f"read {filename} as GitHub Actions config...")
        return get_github_scripts, 0
    elif isinstance(data, dict) and "version" in data and "jobs" in data:
        logging.info(f"read {filename} as CircleCI config...")
        return get_circleci_scripts, 0
    elif (
        isinstance(data, dict) and "steps" in data and "kind" in data and "type" in data
    ):
        logging.info(f"read {filename} as Drone CI config...")
        return get_drone_scripts, 0
    elif isinstance(data, list):
        logging.info(f"read {filename} as Ansible file...")
        return get_ansible_scripts, 0
    elif isinstance(data, dict):
        # TODO: GitLab is the de facto default value, we should add more checks here
        logging.info(f"read {filename} as GitLab CI config...")
        return get_gitlab_scripts, 0
    else:
        raise ValueError(f"read {filename}, cannot determine CI tool from YAML structure")


def read_yaml_file(filename):
    """read YAML and return dict with job name and shell scripts"""
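
    # GitLab's `!reference` tag lets jobs reuse fragments of other jobs,
    # e.g. (hypothetical example):
    #
    #   test:
    #     script:
    #       - !reference [.setup, script]
    #       - make test
    #
    # The class below keeps such tags parseable and renders them as
    # inert comment lines in the extracted scripts.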
    class GitLabReference(object):
        yaml_tag = "!reference"

        def __init__(self, elements: list[str]):
            self.elements = elements

        def __str__(self):
            return f"# {self.yaml_tag}[{', '.join(self.elements)}]"

        @classmethod
        def to_yaml(cls, representer, node):
            return representer.represent_scalar(
                cls.yaml_tag, f"[{', '.join(node.value)}]"
            )

        @classmethod
        def from_yaml(cls, constructor, node):
            if not all(isinstance(element, ScalarNode) for element in node.value):
                raise ValueError(
                    f"Tag {cls.yaml_tag} only supports a sequence of ScalarNode "
                    f"(should all be strings), but found "
                    f"{[type(element) for element in node.value]}"
                )
            # we instantiate a GitLabReference with cls, but return its string representation
            return str(cls([element.value for element in node.value]))

    yaml = YAML(typ="safe")
    yaml.register_class(GitLabReference)
    with open(filename, "r") as f:
        yaml_documents = list(yaml.load_all(f))
    get_script_snippets, document_index = select_yaml_schema(yaml_documents, filename)
    return get_script_snippets(yaml_documents[document_index])


def write_tmp_files(args, data):
    filelist = []
    outdir = Path(args.outdir)
    outdir.mkdir(exist_ok=True, parents=True)
    for filename in data:
        # workaround for absolute paths in filename: insert a path component to avoid collisions
        if filename[0] == "/":
            subdir = outdir / "__root__" / filename[1:]
        else:
            subdir = outdir / filename
        # remove all '..' elements from the tmp file paths
        if ".." in subdir.parts:
            parts = filter(lambda a: a != "..", list(subdir.parts))
            subdir = Path(*parts)
        subdir.mkdir(exist_ok=True, parents=True)
        for jobkey in data[filename]:
            scriptfilename = subdir / jobkey
            scriptfilename.parent.mkdir(exist_ok=True, parents=True)
            with open(scriptfilename, "w") as f:
                if not data[filename][jobkey].startswith("#!"):
                    f.write(f"{args.shell}\n")
                f.write(data[filename][jobkey])
            rel_filename = str(scriptfilename.relative_to(outdir))
            filelist.append(rel_filename)
            logger.debug("wrote file %s", rel_filename)
    return filelist


def run_shellcheck(args, filenames):
    if not filenames:
        return
    shellcheck_command = args.command.split() + filenames
    logger.debug("Starting subprocess: %s", shellcheck_command)
    proc = subprocess.run(
        shellcheck_command,
        shell=False,
        stdout=sys.stdout,
        stderr=sys.stderr,
        cwd=args.outdir,
    )
    logger.debug("subprocess result: %s", proc)
    return proc


def cleanup_files(args):
    if args.keep:
        return
    else:
        shutil.rmtree(args.outdir)
        logger.debug("removed working dir %s", args.outdir)


def main():
    args = setup()
    filenames = []
    for filename in args.files:
        try:
            result = {filename: read_yaml_file(filename)}
            logger.debug("%s", result)
            filenames.extend(write_tmp_files(args, result))
        except ValueError as e:
            # only log, then ignore the error
            logger.error("%s", e)
    check_proc_result = run_shellcheck(args, filenames)
    cleanup_files(args)
    # exit with shellcheck's exit code
    sys.exit(check_proc_result.returncode if check_proc_result else 0)


if __name__ == "__main__":
    main()