Skip to content

Commit

Permalink
chunked eval
Browse files Browse the repository at this point in the history
  • Loading branch information
GaetanLepage committed Nov 17, 2024
1 parent fbacee3 commit 8aaadd8
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 49 deletions.
62 changes: 62 additions & 0 deletions nixpkgs_review/nix/parallel-eval.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Evaluate one chunk (myChunk out of numChunks) of the attribute paths listed
in attrPathFile, against the nixpkgs checkout at `path`.

Note that the caller spawns four times as many chunks as worker threads --
this helps in two ways:
1. Keeping cores busy while I/O operations are in flight
2. Since the amount of time needed for the jobs is *not* balanced
this minimizes the "tail latency" for the very last job to finish
(on one core) by making the job size smaller.
*/
# see pkgs/top-level/nohydra
{
checkMeta,
includeBroken ? true,
path,
systems,
localSystem,
myChunk,
numChunks,
attrPathFile,
}: let
# Evaluate the wrapper itself on the local system; target systems are
# handled by release-outpaths.nix below.
pkgs = import <nixpkgs> {
system = localSystem;
};
inherit (pkgs) lib;

# Superset of candidate attribute paths; each entry is a list of path
# components, e.g. [ "python312Packages" "numpy" "x86_64-linux" ].
attrPaths = builtins.fromJSON (builtins.readFile attrPathFile);
# Integer division: every chunk gets exactly chunkSize paths, except the
# last one, which also picks up the remainder (see `myPaths`).
chunkSize = (lib.length attrPaths) / numChunks;
myPaths = let
dropped = lib.drop (chunkSize * myChunk) attrPaths;
in
if myChunk == numChunks - 1
then dropped
else lib.take chunkSize dropped;

# The full package set for the requested systems, before pruning it down
# to this chunk's attribute paths.
unfiltered = import (path + "/pkgs/top-level/release-outpaths.nix") {
inherit
checkMeta
path
includeBroken
systems
;
};

# Prune attrset `a` down to the paths in `m`: group paths by their i-th
# component; if some path in a group terminates at this depth, keep that
# subtree wholesale, otherwise recurse one level deeper. Names missing
# from `a` map to null.
f = i: m: a:
lib.mapAttrs (
name: values:
if a ? ${name}
then
if lib.any (value: lib.length value <= i + 1) values
then a.${name}
else f (i + 1) values a.${name}
else null
) (lib.groupBy (a: lib.elemAt a i) m);

filtered = f 0 myPaths unfiltered;

# Set recurseForDerivations = true on every plain attrset so that
# `nix-env -qaP` descends into the whole filtered tree.
recurseEverywhere = val:
if lib.isDerivation val || !(lib.isAttrs val)
then val
else (builtins.mapAttrs (_: v: recurseEverywhere v) val) // {recurseForDerivations = true;};
in
recurseEverywhere filtered
157 changes: 108 additions & 49 deletions nixpkgs_review/review.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .github import GithubClient
from .nix import Attr, nix_build, nix_eval, nix_shell
from .report import Report
from .utils import System, current_system, info, sh, system_order_key, warn
from .utils import ROOT, System, current_system, info, sh, system_order_key, warn

# keep up to date with `supportedPlatforms`
# https://github.com/NixOS/ofborg/blob/cf2c6712bd7342406e799110e7cd465aa250cdca/ofborg/src/outpaths.nix#L12
Expand Down Expand Up @@ -201,8 +201,10 @@ def build_commit(
# TODO: nix-eval-jobs ?
base_packages: dict[System, list[Package]] = list_packages(
self.builddir.nix_path,
self.systems,
self.allow,
systems=self.systems,
local_system=self.local_system,
allow=self.allow,
nixpkgs_path=str(self.builddir.worktree_dir),
n_threads=self.num_parallel_evals,
)

Expand All @@ -216,8 +218,10 @@ def build_commit(
# TODO: nix-eval-jobs ?
merged_packages: dict[System, list[Package]] = list_packages(
self.builddir.nix_path,
self.systems,
self.allow,
systems=self.systems,
local_system=self.local_system,
allow=self.allow,
nixpkgs_path=str(self.builddir.worktree_dir),
n_threads=self.num_parallel_evals,
check_meta=True,
)
Expand Down Expand Up @@ -427,68 +431,123 @@ def parse_packages_xml(stdout: IO[str]) -> list[Package]:


def _list_packages_system(
    chunk_id: int,
    num_chunks: int,
    systems: set[System],
    local_system: System,
    nixpkgs_path: str,
    paths_json_filename: str,
    paths_filename: str,
    allow: AllowedFeatures,
    check_meta: bool = False,
) -> list[str]:
    """Evaluate one chunk of the attribute-path superset with nix-env.

    Runs ``nix-env -qaP`` over ``nix/parallel-eval.nix``, which evaluates
    chunk ``chunk_id`` (of ``num_chunks``) of the paths listed in
    ``paths_json_filename`` for the given ``systems``.

    Args:
        chunk_id: 0-based index of the chunk evaluated by this call.
        num_chunks: total number of chunks the path list is split into.
        systems: platforms whose packages should be listed.
        local_system: system used to evaluate the wrapper expression itself.
        nixpkgs_path: path to the nixpkgs checkout to evaluate.
        paths_json_filename: JSON file with the superset of attribute paths.
        paths_filename: reserved for dumping raw output to disk (currently
            unused; kept for interface stability).
        allow: feature toggles (import-from-derivation, URL literals, ...).
        check_meta: whether nix-env should also evaluate meta attributes.

    Returns:
        Attribute paths such as ``"python312Packages.numpy.x86_64-linux"``.

    Raises:
        NixpkgsReviewError: if nix-env exits with a non-zero status.
    """
    cmd: list[str] = [
        "nix-env",
        "--extra-experimental-features",
        "" if allow.url_literals else "no-url-literals",
        "-qaP",
        "--no-name",
        "--out-path",
        "--show-trace",
        "--allow-import-from-derivation"
        if allow.ifd
        else "--no-allow-import-from-derivation",
    ]
    if check_meta:
        cmd.append("--meta")
    cmd.extend(["-f", str(ROOT.joinpath("nix/parallel-eval.nix"))])

    # Arguments consumed by nix/parallel-eval.nix. Nix lists are separated
    # by whitespace, not commas, so join the systems with a single space.
    systems_expr = " ".join(f'"{system}"' for system in systems)
    cmd.extend(["--arg", "systems", f"[{systems_expr}]"])
    cmd.extend(["--arg", "checkMeta", "true"])
    cmd.extend(["--arg", "includeBroken", "true"])
    cmd.extend(["--argstr", "localSystem", local_system])
    # NOTE(review): these two are passed with --arg, i.e. parsed as Nix
    # expressions; they only work because absolute filenames are valid Nix
    # path literals. Consider --argstr + type coercion in the Nix code.
    cmd.extend(["--arg", "attrPathFile", paths_json_filename])
    cmd.extend(["--arg", "path", nixpkgs_path])
    cmd.extend(["--arg", "numChunks", str(num_chunks)])
    cmd.extend(["--arg", "myChunk", str(chunk_id)])

    info("$ " + " ".join(cmd))
    # No check=True here: a failure must surface as NixpkgsReviewError (the
    # error type callers of this module handle), not CalledProcessError.
    res = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        text=True,
    )
    if res.returncode != 0:
        raise NixpkgsReviewError(
            f"Failed to list packages: nix-env failed with exit code {res.returncode}"
        )

    # One entry per line: "<attr_path>.<system> <out_path>", e.g.
    # "python312Packages.numpy.x86_64-linux /nix/store/...". Keep only the
    # attribute path and skip blank lines (the output ends with a newline).
    return [line.split()[0] for line in res.stdout.splitlines() if line.strip()]


def list_packages(
    nix_path: str,
    systems: set[System],
    local_system: System,
    allow: AllowedFeatures,
    nixpkgs_path: str,
    n_threads: int,
    check_meta: bool = False,
) -> dict[System, list[Package]]:
    """List all evaluable package attribute paths, grouped by system.

    First dumps the superset of known attribute paths with nix-instantiate,
    then evaluates it in parallel chunks via ``_list_packages_system``.

    Args:
        nix_path: value for NIX_PATH (currently unused by the chunked
            evaluator; kept for interface stability).
        systems: platforms whose packages should be listed.
        local_system: system used to evaluate the wrapper expression itself.
        allow: feature toggles (import-from-derivation, URL literals, ...).
        nixpkgs_path: path to the nixpkgs checkout to evaluate.
        n_threads: number of worker threads for the parallel evaluation.
        check_meta: whether nix-env should also evaluate meta attributes.

    Returns:
        Mapping from system to the attribute paths available on it.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        paths_json_filename: str = os.path.join(temp_dir, "paths.json")
        # Dump the superset of all attribute paths once; every chunked
        # nix-env run below reads it back through parallel-eval.nix.
        with open(paths_json_filename, mode="w") as paths_json:
            subprocess.run(
                args=[
                    "nix-instantiate",
                    "--eval",
                    "--strict",
                    "--json",
                    "--arg",
                    "enableWarnings",
                    "false",
                    f"{nixpkgs_path}/pkgs/top-level/release-attrpaths-superset.nix",
                    "-A",
                    "paths",
                ],
                stdout=paths_json,
                stderr=subprocess.DEVNULL,
                check=True,
            )

        paths_filename: str = os.path.join(temp_dir, "paths")
        # Over-subscribe by 4x: keeps cores busy during I/O and shrinks the
        # tail latency of the last (unbalanced) chunk.
        num_chunks: int = 4 * n_threads
        results: dict[System, list[Package]] = {system: [] for system in systems}
        with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
            futures = [
                executor.submit(
                    _list_packages_system,
                    chunk_id=chunk_id,
                    num_chunks=num_chunks,
                    systems=systems,
                    local_system=local_system,
                    nixpkgs_path=nixpkgs_path,
                    paths_json_filename=paths_json_filename,
                    paths_filename=paths_filename,
                    allow=allow,
                    check_meta=check_meta,
                )
                for chunk_id in range(num_chunks)
            ]
            for future in concurrent.futures.as_completed(futures):
                for result in future.result():
                    # result == "python312Packages.numpy.x86_64-linux":
                    # the last dot-component is the system, the rest is
                    # the attribute path.
                    attr_path, _, system = result.rpartition(".")
                    # TODO: create a Package object
                    results[system].append(attr_path)

    return results


def package_attrs(
Expand Down

0 comments on commit 8aaadd8

Please sign in to comment.