
fix: improved eval stream capturing logic
ErikBjare committed Sep 16, 2024
1 parent 25de7f7 commit eecabac
Showing 2 changed files with 27 additions and 26 deletions.
gptme/eval/main.py: 26 additions & 25 deletions
@@ -74,13 +74,10 @@ class ProcessError:
 class StreamTee(io.TextIOBase):
     """Capture stdout or stderr to a stream and optionally keep original streams intact."""
 
-    # NOTE: toggling keep_stream can be useful for debugging
-    def __init__(self, stream, keep=None):
+    def __init__(self, stream, keep: bool = False):
         self.stream = stream
         self.captured = io.StringIO()
-        self.keep_stream = keep if keep is not None else StreamTee.default_keep
-
-    default_keep = True  # This will be set in main() based on number of tests
+        self.keep_stream = keep
 
     def write(self, message) -> int:
         self.captured.write(message)
@@ -93,7 +90,12 @@ def getvalue(self):
 
 
 def act_process(
-    agent: Agent, files, prompt, queue: "Queue[ProcessResult]", test_name: str
+    agent: Agent,
+    files,
+    prompt,
+    queue: "Queue[ProcessResult]",
+    test_name: str,
+    parallel: bool,
 ):
     # Configure logging for this subprocess
     subprocess_logger = logging.getLogger(f"eval:{agent.model}@{test_name}")
@@ -105,8 +107,8 @@
     pgrp = os.getpgrp()
 
     # redirect stdout and stderr to streams
-    stdout = StreamTee(sys.stdout)
-    stderr = StreamTee(sys.stderr)
+    stdout = StreamTee(sys.stdout, keep=not parallel)
+    stderr = StreamTee(sys.stderr, keep=not parallel)
     sys.stdout, sys.stderr = stdout, stderr  # type: ignore
 
     def error_handler(e):
@@ -138,7 +140,7 @@ def sigterm_handler(*_):
 
 
 # TODO: rewrite to run in Docker? Would help with capturing output + process management.
-def execute(test: ExecTest, agent: Agent, timeout: int) -> ExecResult:
+def execute(test: ExecTest, agent: Agent, timeout: int, parallel: bool) -> ExecResult:
     """
     Executes the code for a specific model with a timeout.
     """
@@ -150,7 +152,7 @@ def execute(test: ExecTest, agent: Agent, timeout: int) -> ExecResult:
     queue = manager.Queue()
     p = Process(
         target=act_process,
-        args=(agent, test["files"], test["prompt"], queue, test["name"]),
+        args=(agent, test["files"], test["prompt"], queue, test["name"], parallel),
     )
     p.start()
     p.join(timeout)
@@ -265,28 +267,36 @@ def run_evals(
     cleanup_on_sigterm()
 
     model_results = defaultdict(list)
+    parallel = min(len(tests), parallel)
     with ProcessPoolExecutor(parallel) as executor:
         futures = []
         future_to_model_test = {}
         for model in models:
             for test in tests:
-                future = executor.submit(execute, test, GPTMe(model=model), timeout)
+                future = executor.submit(
+                    execute,
+                    test,
+                    GPTMe(model=model),
+                    timeout,
+                    parallel > 1,
+                )
                 futures.append(future)
                 future_to_model_test[future] = (model, test)
 
         for future in as_completed(futures, timeout=timeout + 10):
             model, test = future_to_model_test[future]
+            test_name = test["name"]
             try:
                 result = future.result(
                     timeout=1
                 )  # Short timeout to quickly move to next future
                 model_results[model].append(result)
-                print(f"=== Completed test {test.name} for model {model} ===")
+                print(f"=== Completed test {test_name} for model {model} ===")
             except concurrent.futures.TimeoutError:
-                logger.warning(f"Test {test.name} for model {model} timed out")
+                logger.warning(f"Test {test_name} for model {model} timed out")
                 model_results[model].append(
                     ExecResult(
-                        name=test["name"],
+                        name=test_name,
                         status="timeout",
                         results=[],
                         timings={"gen": timeout, "run": 0, "eval": 0},
@@ -370,17 +380,11 @@ def print_model_results_table(model_results: dict[str, list[ExecResult]]):
 )
 @click.option("--timeout", "-t", default=30, help="Timeout for code generation")
 @click.option("--parallel", "-p", default=10, help="Number of parallel evals to run")
-@click.option(
-    "--keep-output",
-    is_flag=True,
-    help="Always keep output in console, regardless of test count",
-)
 def main(
     eval_names_or_result_files: list[str],
     _model: list[str],
     timeout: int,
     parallel: int,
-    keep_output: bool,
 ):
     """
     Run evals for gptme.
@@ -416,17 +420,14 @@ def main(
     if not tests_to_run:
         tests_to_run = tests_default
 
-    # Set the default_keep value based on the number of tests and the keep_output flag
-    StreamTee.default_keep = keep_output or len(tests_to_run) == 1
-
     print("=== Running evals ===")
     model_results = run_evals(tests_to_run, models, timeout, parallel)
     print("\n=== Finished ===\n")
 
-    print("\n\n=== Model Results ===")
+    print("\n=== Model Results ===")
     print_model_results(model_results)
 
-    print("\n\n=== Model Comparison ===")
+    print("\n=== Model Comparison ===")
     print_model_results_table(model_results)
 
     # Write results to CSV
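In short, the main.py changes drop StreamTee's mutable class-level default_keep in favor of an explicit keep argument, with each eval subprocess deciding output visibility from whether the run is parallel. A minimal, self-contained sketch of the tee pattern after this change; the write() and getvalue() bodies are collapsed in the diff above, so the forwarding logic shown here is an assumption:

import io
import sys


class StreamTee(io.TextIOBase):
    """Capture writes to a stream; optionally echo them to the original."""

    def __init__(self, stream, keep: bool = False):
        self.stream = stream
        self.captured = io.StringIO()
        self.keep_stream = keep

    def write(self, message) -> int:
        # always record, so the parent process can collect the output
        self.captured.write(message)
        if self.keep_stream:
            # serial runs (keep=True) still echo to the real console
            self.stream.write(message)
        return len(message)

    def getvalue(self) -> str:
        return self.captured.getvalue()


# usage as in act_process(): quiet when parallel, visible when serial
parallel = False
tee = StreamTee(sys.stdout, keep=not parallel)
sys.stdout = tee  # type: ignore
print("hello from an eval subprocess")
sys.stdout = tee.stream  # restore the original stream
assert "hello" in tee.getvalue()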
tests/test_eval.py: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ def test_eval(test):
     See pytest_generate_tests() below.
     """
     agent = GPTMe("openai/gpt-4o")
-    result = execute(test, agent, timeout=30)
+    result = execute(test, agent, timeout=30, parallel=False)
     assert all(case.passed for case in result.results)
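Taken together, the visibility rule that replaces the removed --keep-output flag condenses to a single expression. The helper below is hypothetical (not part of the codebase) and only illustrates the logic:

def echoes_to_console(n_tests: int, requested_parallel: int) -> bool:
    """Mirror the logic in run_evals(): the worker count is clamped to the
    number of tests, and subprocesses keep console output only when the
    effective parallelism is 1."""
    parallel = min(n_tests, requested_parallel)
    return not (parallel > 1)


assert echoes_to_console(1, 10)      # a single test: output stays visible
assert not echoes_to_console(5, 10)  # parallel run: output is captured only
assert echoes_to_console(3, 1)       # -p 1 forces serial: output visible

This is also why the test now passes parallel=False: evals run under pytest keep their console output.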


Expand Down
