From 8aaadd87f657e871b413dfae69a54129d4a5b901 Mon Sep 17 00:00:00 2001 From: Gaetan Lepage Date: Sun, 17 Nov 2024 22:56:57 +0100 Subject: [PATCH] chunked eval --- nixpkgs_review/nix/parallel-eval.nix | 62 +++++++++++ nixpkgs_review/review.py | 157 ++++++++++++++++++--------- 2 files changed, 170 insertions(+), 49 deletions(-) create mode 100644 nixpkgs_review/nix/parallel-eval.nix diff --git a/nixpkgs_review/nix/parallel-eval.nix b/nixpkgs_review/nix/parallel-eval.nix new file mode 100644 index 0000000..78e8479 --- /dev/null +++ b/nixpkgs_review/nix/parallel-eval.nix @@ -0,0 +1,62 @@ +/* +Invocation: +Invocation; note that the number of processes spawned is four times +the number of cores -- this helps in two ways: +1. Keeping cores busy while I/O operations are in flight +2. Since the amount of time needed for the jobs is *not* balanced + this minimizes the "tail latency" for the very last job to finish + (on one core) by making the job size smaller. +*/ +# see pkgs/top-level/nohydra +{ + checkMeta, + includeBroken ? true, + path, + systems, + localSystem, + myChunk, + numChunks, + attrPathFile, +}: let + pkgs = import { + system = localSystem; + }; + inherit (pkgs) lib; + + attrPaths = builtins.fromJSON (builtins.readFile attrPathFile); + chunkSize = (lib.length attrPaths) / numChunks; + myPaths = let + dropped = lib.drop (chunkSize * myChunk) attrPaths; + in + if myChunk == numChunks - 1 + then dropped + else lib.take chunkSize dropped; + + unfiltered = import (path + "/pkgs/top-level/release-outpaths.nix") { + inherit + checkMeta + path + includeBroken + systems + ; + }; + + f = i: m: a: + lib.mapAttrs ( + name: values: + if a ? ${name} + then + if lib.any (value: lib.length value <= i + 1) values + then a.${name} + else f (i + 1) values a.${name} + else null + ) (lib.groupBy (a: lib.elemAt a i) m); + + filtered = f 0 myPaths unfiltered; + + recurseEverywhere = val: + if lib.isDerivation val || !(lib.isAttrs val) + then val + else (builtins.mapAttrs (_: v: recurseEverywhere v) val) // {recurseForDerivations = true;}; +in + recurseEverywhere filtered diff --git a/nixpkgs_review/review.py b/nixpkgs_review/review.py index f0a68bb..3c03c11 100644 --- a/nixpkgs_review/review.py +++ b/nixpkgs_review/review.py @@ -17,7 +17,7 @@ from .github import GithubClient from .nix import Attr, nix_build, nix_eval, nix_shell from .report import Report -from .utils import System, current_system, info, sh, system_order_key, warn +from .utils import ROOT, System, current_system, info, sh, system_order_key, warn # keep up to date with `supportedPlatforms` # https://github.com/NixOS/ofborg/blob/cf2c6712bd7342406e799110e7cd465aa250cdca/ofborg/src/outpaths.nix#L12 @@ -201,8 +201,10 @@ def build_commit( # TODO: nix-eval-jobs ? base_packages: dict[System, list[Package]] = list_packages( self.builddir.nix_path, - self.systems, - self.allow, + systems=self.systems, + local_system=self.local_system, + allow=self.allow, + nixpkgs_path=str(self.builddir.worktree_dir), n_threads=self.num_parallel_evals, ) @@ -216,8 +218,10 @@ def build_commit( # TODO: nix-eval-jobs ? merged_packages: dict[System, list[Package]] = list_packages( self.builddir.nix_path, - self.systems, - self.allow, + systems=self.systems, + local_system=self.local_system, + allow=self.allow, + nixpkgs_path=str(self.builddir.worktree_dir), n_threads=self.num_parallel_evals, check_meta=True, ) @@ -427,68 +431,123 @@ def parse_packages_xml(stdout: IO[str]) -> list[Package]: def _list_packages_system( - system: System, - nix_path: str, + chunk_id: int, + num_chunks: int, + systems: set[System], + local_system: System, + nixpkgs_path: str, + paths_json_filename: str, + paths_filename: str, allow: AllowedFeatures, check_meta: bool = False, -) -> list[Package]: - cmd = [ +) -> list[str]: + cmd: list[str] = [ "nix-env", - "--extra-experimental-features", - "" if allow.url_literals else "no-url-literals", - "--option", - "system", - system, - "-f", - "", - "--nix-path", - nix_path, "-qaP", - "--xml", + "--no-name", "--out-path", "--show-trace", - "--allow-import-from-derivation" - if allow.ifd - else "--no-allow-import-from-derivation", ] - if check_meta: - cmd.append("--meta") + cmd.extend(["-f", str(ROOT.joinpath("nix/parallel-eval.nix"))]) + + cmd.extend( + ["--arg", "systems", f"[{", ".join([f'"{system}"' for system in systems])}]"] + ) + cmd.extend(["--arg", "checkMeta", "true"]) + cmd.extend(["--arg", "includeBroken", "true"]) + cmd.extend(["--argstr", "localSystem", local_system]) + cmd.extend(["--arg", "attrPathFile", paths_json_filename]) + cmd.extend(["--arg", "path", nixpkgs_path]) + cmd.extend(["--arg", "numChunks", str(num_chunks)]) + cmd.extend(["--arg", "myChunk", str(chunk_id)]) + + # cmd.extend([">", paths_filename]) + info("$ " + " ".join(cmd)) - with tempfile.NamedTemporaryFile(mode="w") as tmp: - res = subprocess.run(cmd, stdout=tmp) - if res.returncode != 0: - raise NixpkgsReviewError( - f"Failed to list packages: nix-env failed with exit code {res.returncode}" - ) - tmp.flush() - with open(tmp.name, encoding="utf-8") as f: - return parse_packages_xml(f) + res = subprocess.run( + cmd, + check=True, + stdout=subprocess.PIPE, + text=True, + ) + if res.returncode != 0: + raise NixpkgsReviewError( + f"Failed to list packages: nix-env failed with exit code {res.returncode}" + ) + results: list[str] = [] + for line in res.stdout.split("\n"): + # . (python312Packages.numpy.x86_64-linux) + results.append(line.split()[0].strip()) + + return results def list_packages( nix_path: str, systems: set[System], + local_system: System, allow: AllowedFeatures, + nixpkgs_path: str, n_threads: int, check_meta: bool = False, ) -> dict[System, list[Package]]: - results: dict[System, list[Package]] = {} - with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor: - future_to_system = { - executor.submit( - _list_packages_system, - system=system, - nix_path=nix_path, - allow=allow, - check_meta=check_meta, - ): system - for system in systems - } - for future in concurrent.futures.as_completed(future_to_system): - system = future_to_system[future] - results[system] = future.result() + with tempfile.TemporaryDirectory() as temp_dir: + paths_json_filename: str = os.path.join(temp_dir, "paths.json") + with open(paths_json_filename, mode="w") as paths_json: + subprocess.run( + args=[ + "nix-instantiate", + "--eval", + "--strict", + "--json", + "--arg", + "enableWarnings", + "false", + f"{nixpkgs_path}/pkgs/top-level/release-attrpaths-superset.nix", + "-A", + "paths", + ], + stdout=paths_json, + stderr=subprocess.DEVNULL, + check=True, + ) - return results + paths_filename: str = os.path.join(temp_dir, "paths") + num_chunks: int = 4 * n_threads + results: dict[System, list[Package]] = {system: [] for system in systems} + with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor: + futures = [ + executor.submit( + _list_packages_system, + chunk_id=chunk_id, + systems=systems, + # nix_path=nix_path, + local_system=local_system, + nixpkgs_path=nixpkgs_path, + paths_json_filename=paths_json_filename, + paths_filename=paths_filename, + num_chunks=num_chunks, + allow=allow, + check_meta=check_meta, + ) + for chunk_id in range(num_chunks) + ] + for future in concurrent.futures.as_completed(futures): + for result in future.result(): + # result = "python312Packages.numpy.x86_64-linux" + + # ["python312Packages", "numpy", "x86_64-linux"] + splitted_result: list[str] = result.split(".") + + # "x86_64-linux" + system = splitted_result.pop() + + path: str = ".".join(splitted_result) + + # TODO: create a Package object + results[system].append(path) + + return results def package_attrs(