diff --git a/gptme/eval/main.py b/gptme/eval/main.py
index 39322752..f76a4c55 100644
--- a/gptme/eval/main.py
+++ b/gptme/eval/main.py
@@ -7,6 +7,7 @@
 import csv
 import logging
 import multiprocessing
+import multiprocessing.resource_tracker
 import subprocess
 import sys
 from collections import defaultdict
@@ -17,6 +18,7 @@
 import multiprocessing_logging
 from tabulate import tabulate
 
+from ..message import len_tokens
 from .run import run_evals
 from .suites import suites, tests_default, tests_map
 from .types import ExecResult, ExecTest
@@ -29,41 +31,61 @@
 #     message=r"resource_tracker:.*",
 # )
 
-
 # Configure logging, including fully-qualified module names
 logging.basicConfig(
     level=logging.INFO,
     # helpful in debugging: %(processName)s
     format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
 )
-multiprocessing_logging.install_mp_handler()
 logging.getLogger("httpx").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
 
 project_dir = Path(__file__).parent.parent.parent
 
-
 def print_model_results(model_results: dict[str, list[ExecResult]]):
+    total_tests = 0
+    total_duration = 0.0
+    total_tokens = 0
+
     for model, results in model_results.items():
         print(f"\nResults for model: {model}")
         duration_total = sum(
             result.timings["gen"] + result.timings["run"] + result.timings["eval"]
             for result in results
         )
-        print(f"Completed {len(results)} tests in {duration_total:.2f}s:")
+        model_total_tokens = sum(
+            len_tokens(result.gen_stdout) + len_tokens(result.run_stdout)
+            for result in results
+        )
+        print(
+            f"Completed {len(results)} tests in {duration_total:.2f}s/{model_total_tokens}tok:"
+        )
         for result in results:
             cases = result.results
             checkmark = "✅" if cases and all(case.passed for case in cases) else "❌"
             duration_result = (
                 result.timings["gen"] + result.timings["run"] + result.timings["eval"]
             )
+            gen_tokens = len_tokens(result.gen_stdout)
+            run_tokens = len_tokens(result.run_stdout)
+            result_total_tokens = gen_tokens + run_tokens
             print(
-                f"{checkmark} {result.name} in {duration_result:.2f}s (gen: {result.timings['gen']:.2f}s, run: {result.timings['run']:.2f}s, eval: {result.timings['eval']:.2f}s)"
+                f"{checkmark} {result.name}: {duration_result:.2f}s/{result_total_tokens}tok "
+                f"(gen: {result.timings['gen']:.2f}s/{gen_tokens}tok, "
+                f"run: {result.timings['run']:.2f}s/{run_tokens}tok, "
+                f"eval: {result.timings['eval']:.2f}s)"
             )
             for case in cases:
                 checkmark = "✅" if case.passed else "❌"
                 print(f"  {checkmark} {case.name}")
 
+        total_tests += len(results)
+        total_duration += duration_total
+        total_tokens += model_total_tokens
+
+    print("\nTotal across all models:")
+    print(f"Completed {total_tests} tests in {total_duration:.2f}s/{total_tokens}tok")
+
 
 def print_model_results_table(model_results: dict[str, list[ExecResult]]):
     test_names = {
@@ -80,11 +102,15 @@ def print_model_results_table(model_results: dict[str, list[ExecResult]]):
                 passed = all(case.passed for case in result.results)
                 checkmark = "✅" if result.status == "success" and passed else "❌"
                 duration = sum(result.timings.values())
+                gen_tokens = len_tokens(result.gen_stdout)
+                run_tokens = len_tokens(result.run_stdout)
                 reason = "timeout" if result.status == "timeout" else ""
                 if reason:
                     row.append(f"{checkmark} {reason}")
                 else:
-                    row.append(f"{checkmark} {duration:.2f}s")
+                    row.append(
+                        f"{checkmark} {duration:.2f}s/{gen_tokens+run_tokens}tok"
+                    )
             except StopIteration:
                 row.append("❌ N/A")
         table_data.append(row)
@@ -114,6 +140,9 @@ def main(
 
     Pass test names to run, or result files to print.
""" + # init + multiprocessing_logging.install_mp_handler() + models = _model or [ "openai/gpt-4o", "anthropic/claude-3-5-sonnet-20240620", @@ -121,24 +150,24 @@ def main( ] results_files = [f for f in eval_names_or_result_files if f.endswith(".csv")] - for results_file in results_files: - p = Path(results_file) - if p.exists(): - results = read_results_from_csv(str(p)) - print(f"\n{results_file}") - print(f"{'=' * len(results_file)}") - print_model_results_table(results) - else: - print(f"Error: File {results_file} not found") - sys.exit(1) if results_files: + for results_file in results_files: + p = Path(results_file) + if p.exists(): + results = read_results_from_csv(str(p)) + print(f"\n{results_file}") + print(f"{'=' * len(results_file)}") + print_model_results(results) + print("\n=== Model Comparison ===") + print_model_results_table(results) + else: + print(f"Error: File {results_file} not found") + sys.exit(1) sys.exit(0) tests_to_run: list[ExecTest] = [] for test_name in eval_names_or_result_files: - if test_name in results_files: - continue - elif test_name in tests_map: + if test_name in tests_map: tests_to_run.append(tests_map[test_name]) elif test_name in suites: tests_to_run.extend(suites[test_name]) @@ -164,12 +193,22 @@ def main( sys.exit(0) +def read_log_file(file_path: Path) -> str: + if file_path.exists(): + with open(file_path) as f: + return f.read() + return "" + + def read_results_from_csv(filename: str) -> dict[str, list[ExecResult]]: model_results = defaultdict(list) + results_dir = Path(filename).parent with open(filename, newline="") as csvfile: reader = csv.DictReader(csvfile) for row in reader: model = row["Model"] + test_dir = results_dir / model / row["Test"] + result = ExecResult( name=row["Test"], status="success" if row["Passed"] == "true" else "error", @@ -179,10 +218,10 @@ def read_results_from_csv(filename: str) -> dict[str, list[ExecResult]]: "run": float(row["Run Time"]), "eval": float(row["Eval Time"]), }, - stdout="", # We don't have stdout in the CSV - stderr="", # We don't have stderr in the CSV - run_stdout="", # We don't have run_stdout in the CSV - run_stderr="", # We don't have run_stderr in the CSV + gen_stdout=read_log_file(test_dir / "gen_stdout.txt"), + gen_stderr=read_log_file(test_dir / "gen_stderr.txt"), + run_stdout=read_log_file(test_dir / "run_stdout.txt"), + run_stderr=read_log_file(test_dir / "run_stderr.txt"), ) model_results[model].append(result) return dict(model_results) @@ -240,9 +279,9 @@ def write_results_to_csv(model_results: dict[str, list[ExecResult]]): run_stderr_file = test_dir / "run_stderr.txt" with open(gen_stdout_file, "w") as f: - f.write(result.stdout) + f.write(result.gen_stdout) with open(gen_stderr_file, "w") as f: - f.write(result.stderr) + f.write(result.gen_stderr) with open(run_stdout_file, "w") as f: f.write(result.run_stdout) with open(run_stderr_file, "w") as f: @@ -290,7 +329,7 @@ def write_results_to_csv(model_results: dict[str, list[ExecResult]]): writer.writeheader() for model, results in model_results.items(): for result in results: - # Needs to pass all checks, and needs to have results (not empty, as in case of timeout) + # Needs to pass all checks, and needs to have results (not empty, as in case of error/timeout) passed = ( all(case.passed for case in result.results) if result.results @@ -302,19 +341,11 @@ def write_results_to_csv(model_results: dict[str, list[ExecResult]]): test_dir.mkdir(parents=True, exist_ok=True) # Save each stream to a separate file - gen_stdout_file = test_dir / 
"gen_stdout.txt" - gen_stderr_file = test_dir / "gen_stderr.txt" - run_stdout_file = test_dir / "run_stdout.txt" - run_stderr_file = test_dir / "run_stderr.txt" - - with open(gen_stdout_file, "w") as f: - f.write(result.stdout) - with open(gen_stderr_file, "w") as f: - f.write(result.stderr) - with open(run_stdout_file, "w") as f: - f.write(result.run_stdout) - with open(run_stderr_file, "w") as f: - f.write(result.run_stderr) + streams = ["gen_stdout", "gen_stderr", "run_stdout", "run_stderr"] + for stream in streams: + stream_file = test_dir / f"{stream}.txt" + with open(stream_file, "w") as f: + f.write(getattr(result, stream)) writer.writerow( { @@ -326,18 +357,8 @@ def write_results_to_csv(model_results: dict[str, list[ExecResult]]): "Run Time": result.timings["run"], "Eval Time": result.timings["eval"], "Commit Hash": commit_hash, - "Gen Stdout File": gen_stdout_file.relative_to(results_dir), - "Gen Stderr File": gen_stderr_file.relative_to(results_dir), - "Run Stdout File": run_stdout_file.relative_to(results_dir), - "Run Stderr File": run_stderr_file.relative_to(results_dir), } ) print(f"\nResults saved to {csv_filename.resolve()}") print(f"Output files saved in {results_dir.resolve()}") - - -if __name__ == "__main__": - # This ensures compatibility across platforms - multiprocessing.set_start_method("spawn") - main() diff --git a/gptme/eval/run.py b/gptme/eval/run.py index ff0d6738..d007d2f8 100644 --- a/gptme/eval/run.py +++ b/gptme/eval/run.py @@ -3,6 +3,7 @@ import inspect import io import logging +import multiprocessing import os import signal import sys @@ -10,8 +11,7 @@ from collections import defaultdict from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass -from multiprocessing import Manager, Process, Queue -from queue import Empty +from multiprocessing import Manager, Process from typing import Union from .agents import Agent, GPTMe @@ -96,8 +96,8 @@ def _handle_future(future): status="timeout", results=[], timings={"gen": timeout, "run": 0, "eval": 0}, - stdout="", - stderr="", + gen_stdout="", + gen_stderr="", run_stdout="", run_stderr="", ) @@ -119,6 +119,11 @@ def _handle_future(future): _handle_future(future) future.cancel() + # Ensure all processes are terminated + for process in multiprocessing.active_children(): + process.terminate() + process.join() + # sort model_results by test order for model in model_results: model_results[model] = sorted( @@ -137,43 +142,107 @@ def execute(test: ExecTest, agent: Agent, timeout: int, parallel: bool) -> ExecR logger.info(f'Running "{test["name"]}" for {agent.model}') with Manager() as manager: - queue = manager.Queue() + result_dict = manager.dict() p = Process( target=act_process, - args=(agent, test["files"], test["prompt"], queue, test["name"], parallel), + args=( + agent, + test["files"], + test["prompt"], + result_dict, + test["name"], + parallel, + ), ) p.start() - p.join(timeout) + try: + p.join(timeout) + + time_gen = 0.0 + time_run = 0.0 + time_eval = 0.0 + + status: Status = "success" + if p.is_alive(): + logger.info("Timeout reached, terminating process") + p.terminate() + p.join(timeout=1) + status = "timeout" + time_gen = timeout + + if "result" in result_dict: + result = result_dict["result"] + time_gen = result.get("duration", 0.0) + status = result.get("status", "success") + files = result.get("files", {}) + gen_stdout = result.get("stdout", "") + gen_stderr = result.get("stderr", "") + else: + logger.error("No result in shared dictionary") + return ExecResult( + 
name=test["name"], + status="error", + results=[], + timings={"gen": time_gen, "run": time_run, "eval": time_eval}, + gen_stdout="", + gen_stderr="", + run_stdout="", + run_stderr="", + ) - time_gen = 0.0 - time_run = 0.0 - time_eval = 0.0 + logger.debug("Got result") + + if status != "timeout" and status != "error": + # check and collect results + run_start = time.time() + env = SimpleExecutionEnv() + env.upload(files) + logger.debug(f"Running check: {test['run']}") + stdout_run, stderr_run, exit_code = env.run(test["run"]) + time_run = time.time() - run_start + + files = env.download() + + ctx = ResultContext(files, stdout_run, stderr_run, exit_code) + results: list[CaseResult] = [] + print(f"\n--- Results for '{test['name']}' with {agent.model} ---") + for name, case in test["expect"].items(): + code = inspect.getsource(case).strip() + eval_start = time.time() + try: + passed = case(ctx) + except Exception as e: + print(f"Error while checking {name}: {e}") + passed = False + eval_duration = time.time() - eval_start + checkmark = "✅" if passed else "❌" + print(f"{checkmark} {name:20s}") + results.append( + CaseResult( + name=name, passed=passed, code=code, duration=eval_duration + ) + ) + print("--- End of results ---\n") - status: Status = "success" - if p.is_alive(): - logger.info("Timeout reached, terminating process") - p.terminate() - p.join(timeout=1) - status = "timeout" - time_gen = timeout + time_eval = sum(r.duration for r in results) + else: + results = [] + stdout_run, stderr_run = "", "" - logger.debug("Getting result from queue") - try: - result = queue.get(timeout=1) - except Empty: - logger.error("Queue is empty, expected a result") return ExecResult( name=test["name"], - status="error", - results=[], + status=status, + results=results, timings={"gen": time_gen, "run": time_run, "eval": time_eval}, - stdout="", - stderr="", - run_stdout="", - run_stderr="", + gen_stdout=gen_stdout, + gen_stderr=gen_stderr, + run_stdout=stdout_run, + run_stderr=stderr_run, ) - - logger.debug("Got result") + finally: + if p.is_alive(): + p.terminate() + p.join(timeout=1) if status != "timeout": time_gen = result.duration stdout, stderr = result.stdout, result.stderr @@ -184,8 +253,8 @@ def execute(test: ExecTest, agent: Agent, timeout: int, parallel: bool) -> ExecR status="timeout" if status == "timeout" else "error", results=[], timings={"gen": time_gen, "run": time_run, "eval": time_eval}, - stdout=stdout, - stderr=stderr, + gen_stdout=stdout, + gen_stderr=stderr, run_stdout="", run_stderr="", ) @@ -232,8 +301,8 @@ def execute(test: ExecTest, agent: Agent, timeout: int, parallel: bool) -> ExecR "run": time_run, "eval": time_eval, }, - stdout=stdout, - stderr=stderr, + gen_stdout=stdout, + gen_stderr=stderr, run_stdout=stdout_run, run_stderr=stderr_run, ) @@ -261,7 +330,7 @@ def act_process( agent: Agent, files, prompt, - queue: "Queue[ProcessResult]", + result_dict: dict, test_name: str, parallel: bool, ): @@ -285,7 +354,13 @@ def error_handler(e): duration = time.time() - start if not isinstance(e, KeyboardInterrupt): subprocess_logger.error(f"Error: {e}") - queue.put(ProcessError(str(e), stdout.getvalue(), stderr.getvalue(), duration)) + result_dict["result"] = { + "status": "error", + "message": str(e), + "stdout": stdout.getvalue(), + "stderr": stderr.getvalue(), + "duration": duration, + } # kill child processes os.killpg(pgrp, signal.SIGKILL) @@ -304,7 +379,13 @@ def sigterm_handler(*_): return duration = time.time() - start - queue.put(ProcessSuccess(files, stdout.getvalue(), 
+    result_dict["result"] = {
+        "status": "success",
+        "files": files,
+        "stdout": stdout.getvalue(),
+        "stderr": stderr.getvalue(),
+        "duration": duration,
+    }
     subprocess_logger.info("Success")
 
     # kill child processes
diff --git a/gptme/eval/types.py b/gptme/eval/types.py
index 00d2058d..0ced4800 100644
--- a/gptme/eval/types.py
+++ b/gptme/eval/types.py
@@ -40,8 +40,8 @@ class ExecResult:
     status: Status
     results: list[CaseResult]
     timings: dict[str, float]
-    stdout: str
-    stderr: str
+    gen_stdout: str
+    gen_stderr: str
     run_stdout: str
     run_stderr: str