From 2adcf8a8dc6b2e860e3ec91ad2a43555f860873c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= Date: Tue, 10 Aug 2021 22:21:16 +0200 Subject: [PATCH] mkosi: record package list in a json manifest The output is something like this { "packages": [ { "type": "rpm", "name": "acl", "version": "2.3.1-2.fc35.x86_64" }, { "type": "rpm", "name": "alternatives", "version": "1.19-1.fc35.x86_64" }, { "type": "rpm", "name": "audit-libs", "version": "3.0.3-2.fc35.x86_64" }, ... ] } This is a partial step towards #700, but has other uses too. --- mkosi/__init__.py | 45 ++++++++++++++++++++++++---- mkosi/manifest.py | 76 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 5 deletions(-) create mode 100644 mkosi/manifest.py diff --git a/mkosi/__init__.py b/mkosi/__init__.py index a962c844d3..c1be4661c0 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -62,6 +62,7 @@ cast, ) +from .manifest import Manifest from .backend import ( ARG_DEBUG, CommandLineArguments, @@ -4292,6 +4293,23 @@ def dir_size(path: str) -> int: return dir_sum +def save_manifest(args: CommandLineArguments, manifest: Manifest) -> None: + if manifest.has_data(): + with complete_step(f"Saving manifest {args.output}.manifest"): + f: TextIO = cast( + TextIO, + tempfile.NamedTemporaryFile( + mode="w+", + encoding="utf-8", + prefix=".mkosi-", + dir=os.path.dirname(args.output), + ), + ) + with f: + manifest.write_json(f) + _link_output(args, f.name, f"{args.output}.manifest") + + def print_output_size(args: CommandLineArguments) -> None: if args.output_format in (OutputFormat.directory, OutputFormat.subvolume): MkosiPrinter.print_step("Resulting image size is " + format_bytes(dir_size(args.output)) + ".") @@ -5286,6 +5304,7 @@ def unlink_output(args: CommandLineArguments) -> None: if not args.skip_final_phase: with complete_step("Removing output files…"): unlink_try_hard(args.output) + unlink_try_hard(f"{args.output}.manifest") if args.checksum: unlink_try_hard(args.output_checksum) @@ -6260,7 +6279,13 @@ def empty(cls) -> "BuildOutput": def build_image( - args: CommandLineArguments, root: str, *, do_run_build_script: bool, for_cache: bool = False, cleanup: bool = False + args: CommandLineArguments, + root: str, + *, + manifest: Optional[Manifest] = None, + do_run_build_script: bool, + for_cache: bool = False, + cleanup: bool = False, ) -> BuildOutput: # If there's no build script set, there's no point in executing # the build script iteration. Let's quit early. @@ -6335,6 +6360,10 @@ def build_image( setup_network_veth(args, root, do_run_build_script, cached_tree) run_postinst_script(args, root, loopdev, do_run_build_script, for_cache) + if manifest: + with complete_step("Recording packages in manifest…"): + manifest.record_packages(root) + if cleanup: clean_package_manager_metadata(root) remove_files(args, root) @@ -6509,12 +6538,13 @@ def remove_artifacts( unlink_try_hard(root_home(args, root)) -def build_stuff(args: CommandLineArguments) -> None: +def build_stuff(args: CommandLineArguments) -> Manifest: make_output_dir(args) setup_package_cache(args) workspace = setup_workspace(args) image = BuildOutput.empty() + manifest = Manifest(args) # Make sure tmpfiles' aging doesn't interfere with our workspace # while we are working on it. @@ -6553,7 +6583,7 @@ def build_stuff(args: CommandLineArguments) -> None: # Run the image builder for the second (final) stage if not args.skip_final_phase: with complete_step("Running second (final) stage…"): - image = build_image(args, root, do_run_build_script=False, cleanup=True) + image = build_image(args, root, manifest=manifest, do_run_build_script=False, cleanup=True) else: MkosiPrinter.print_step("Skipping (second) final image build phase.") @@ -6585,6 +6615,8 @@ def build_stuff(args: CommandLineArguments) -> None: if image.root_hash is not None: MkosiPrinter.print_step(f"Root hash is {image.root_hash}.") + return manifest + def check_root() -> None: if os.getuid() != 0: @@ -7120,12 +7152,15 @@ def run_verb(raw: argparse.Namespace) -> None: check_root() check_native(args) init_namespace(args) - build_stuff(args) - print_output_size(args) + manifest = build_stuff(args) if args.auto_bump: bump_image_version(args) + save_manifest(args, manifest) + + print_output_size(args) + if args.verb in ("shell", "boot"): run_shell(args) diff --git a/mkosi/manifest.py b/mkosi/manifest.py new file mode 100644 index 0000000000..deece216ea --- /dev/null +++ b/mkosi/manifest.py @@ -0,0 +1,76 @@ +# SPDX-License-Identifier: LGPL-2.1+ + +import dataclasses + +import json +from subprocess import DEVNULL, PIPE +from textwrap import dedent + +from typing import cast, Any, Dict, IO, List, Optional + +from .backend import run, CommandLineArguments, PackageType, Distribution + + +@dataclasses.dataclass +class PackageManifest: + """A description of a package + + The fields used here must match + https://systemd.io/COREDUMP_PACKAGE_METADATA/#well-known-keys. + """ + + type: str + name: str + version: str + size: int + + def json(self) -> Dict[str, str]: + return { + "type": self.type, + "name": self.name, + "version": self.version, + } + + +@dataclasses.dataclass +class Manifest: + args: CommandLineArguments + packages: List[PackageManifest] = dataclasses.field(default_factory=list) + + def record_packages(self, root: str) -> None: + if cast(Any, self.args.distribution).package_type == PackageType.rpm: + self.record_rpm_packages(root) + # TODO: add implementations for other package managers + + def record_rpm_packages(self, root: str) -> None: + c = run( + ["rpm", f"--root={root}", "-qa", "--qf", r"%{NEVRA}\t%{SOURCERPM}\t%{NAME}\t%{SIZE}\n"], + stdout=PIPE, + stderr=DEVNULL, + universal_newlines=True, + ) + + packages = sorted(c.stdout.splitlines()) + + for package in packages: + nevra, srpm, name, size = package.split("\t") + + assert nevra.startswith(f"{name}-") + evra = nevra[len(name) + 1 :] + + size = int(size) + + package = PackageManifest("rpm", name, evra, size) + self.packages.append(package) + + def has_data(self) -> bool: + # We might add more data in the future + return len(self.packages) > 0 + + def json(self) -> Dict[str, Any]: + return { + "packages": [package.json() for package in self.packages], + } + + def write_json(self, out: IO[str]) -> None: + json.dump(self.json(), out, indent=2)