From eecabac82712982b105216821d5932f2dd56e5aa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?=
Date: Mon, 16 Sep 2024 19:08:42 +0200
Subject: [PATCH] fix: improved eval stream capturing logic

---
 gptme/eval/main.py | 51 +++++++++++++++++++++++-----------------------
 tests/test_eval.py |  2 +-
 2 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/gptme/eval/main.py b/gptme/eval/main.py
index 05c2fbdd..a7ec1228 100644
--- a/gptme/eval/main.py
+++ b/gptme/eval/main.py
@@ -74,13 +74,10 @@ class ProcessError:
 class StreamTee(io.TextIOBase):
     """Capture stdout or stderr to a stream and optionally keep original streams intact."""
 
-    # NOTE: toggling keep_stream can be useful for debugging
-    def __init__(self, stream, keep=None):
+    def __init__(self, stream, keep: bool = False):
         self.stream = stream
         self.captured = io.StringIO()
-        self.keep_stream = keep if keep is not None else StreamTee.default_keep
-
-    default_keep = True  # This will be set in main() based on number of tests
+        self.keep_stream = keep
 
     def write(self, message) -> int:
         self.captured.write(message)
@@ -93,7 +90,12 @@ def getvalue(self):
 
 
 def act_process(
-    agent: Agent, files, prompt, queue: "Queue[ProcessResult]", test_name: str
+    agent: Agent,
+    files,
+    prompt,
+    queue: "Queue[ProcessResult]",
+    test_name: str,
+    parallel: bool,
 ):
     # Configure logging for this subprocess
     subprocess_logger = logging.getLogger(f"eval:{agent.model}@{test_name}")
@@ -105,8 +107,8 @@ def act_process(
     pgrp = os.getpgrp()
 
     # redirect stdout and stderr to streams
-    stdout = StreamTee(sys.stdout)
-    stderr = StreamTee(sys.stderr)
+    stdout = StreamTee(sys.stdout, keep=not parallel)
+    stderr = StreamTee(sys.stderr, keep=not parallel)
     sys.stdout, sys.stderr = stdout, stderr  # type: ignore
 
     def error_handler(e):
@@ -138,7 +140,7 @@ def sigterm_handler(*_):
 
 
 # TODO: rewrite to run in Docker? Would help with capturing output + process management.
-def execute(test: ExecTest, agent: Agent, timeout: int) -> ExecResult:
+def execute(test: ExecTest, agent: Agent, timeout: int, parallel: bool) -> ExecResult:
     """
     Executes the code for a specific model with a timeout.
     """
@@ -150,7 +152,7 @@ def execute(test: ExecTest, agent: Agent, timeout: int) -> ExecResult:
     queue = manager.Queue()
     p = Process(
         target=act_process,
-        args=(agent, test["files"], test["prompt"], queue, test["name"]),
+        args=(agent, test["files"], test["prompt"], queue, test["name"], parallel),
     )
     p.start()
     p.join(timeout)
@@ -265,28 +267,36 @@ def run_evals(
         cleanup_on_sigterm()
 
     model_results = defaultdict(list)
+    parallel = min(len(tests), parallel)
     with ProcessPoolExecutor(parallel) as executor:
         futures = []
         future_to_model_test = {}
         for model in models:
             for test in tests:
-                future = executor.submit(execute, test, GPTMe(model=model), timeout)
+                future = executor.submit(
+                    execute,
+                    test,
+                    GPTMe(model=model),
+                    timeout,
+                    parallel > 1,
+                )
                 futures.append(future)
                 future_to_model_test[future] = (model, test)
 
         for future in as_completed(futures, timeout=timeout + 10):
             model, test = future_to_model_test[future]
+            test_name = test["name"]
             try:
                 result = future.result(
                     timeout=1
                 )  # Short timeout to quickly move to next future
                 model_results[model].append(result)
-                print(f"=== Completed test {test.name} for model {model} ===")
+                print(f"=== Completed test {test_name} for model {model} ===")
             except concurrent.futures.TimeoutError:
-                logger.warning(f"Test {test.name} for model {model} timed out")
+                logger.warning(f"Test {test_name} for model {model} timed out")
                 model_results[model].append(
                     ExecResult(
-                        name=test["name"],
+                        name=test_name,
                         status="timeout",
                         results=[],
                         timings={"gen": timeout, "run": 0, "eval": 0},
@@ -370,17 +380,11 @@ def print_model_results_table(model_results: dict[str, list[ExecResult]]):
 )
 @click.option("--timeout", "-t", default=30, help="Timeout for code generation")
 @click.option("--parallel", "-p", default=10, help="Number of parallel evals to run")
-@click.option(
-    "--keep-output",
-    is_flag=True,
-    help="Always keep output in console, regardless of test count",
-)
 def main(
     eval_names_or_result_files: list[str],
     _model: list[str],
     timeout: int,
     parallel: int,
-    keep_output: bool,
 ):
     """
     Run evals for gptme.
@@ -416,17 +420,14 @@ def main(
     if not tests_to_run:
         tests_to_run = tests_default
 
-    # Set the default_keep value based on the number of tests and the keep_output flag
-    StreamTee.default_keep = keep_output or len(tests_to_run) == 1
-
     print("=== Running evals ===")
     model_results = run_evals(tests_to_run, models, timeout, parallel)
     print("\n=== Finished ===\n")
 
-    print("\n\n=== Model Results ===")
+    print("\n=== Model Results ===")
     print_model_results(model_results)
 
-    print("\n\n=== Model Comparison ===")
+    print("\n=== Model Comparison ===")
     print_model_results_table(model_results)
 
     # Write results to CSV
diff --git a/tests/test_eval.py b/tests/test_eval.py
index 7cacd4af..38330521 100644
--- a/tests/test_eval.py
+++ b/tests/test_eval.py
@@ -29,7 +29,7 @@ def test_eval(test):
     See pytest_generate_tests() below.
     """
     agent = GPTMe("openai/gpt-4o")
-    result = execute(test, agent, timeout=30)
+    result = execute(test, agent, timeout=30, parallel=False)
     assert all(case.passed for case in result.results)
 
 