#!/usr/bin/env python
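"""Benchmark harness for aider.

Copies the practice exercises from tmp.benchmark/practice into a dated run
directory, asks aider to solve each one, grades the edits with the exercise's
unit tests inside the aider-pytest Docker image, and writes per-exercise
results to .aider.results.json so completed runs can be summarized later
with --stats.
"""
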
import datetime
import json
import os
import random
import re
import shutil
import subprocess
import time
from collections import defaultdict
from json.decoder import JSONDecodeError
from pathlib import Path

import git
import lox
import typer
from rich.console import Console

from aider import models
from aider.coders import Coder
from aider.dump import dump  # noqa: F401
from aider.io import InputOutput

DOCKER_IMAGE = "aider-pytest"

BENCHMARK_DNAME = Path("tmp.benchmark/.")

ORIGINAL_DNAME = BENCHMARK_DNAME / "practice/."

app = typer.Typer(add_completion=False, pretty_exceptions_enable=False)


@app.command()
def main(
    dirname: str = typer.Argument(..., help="Directory name"),
    model: str = typer.Option("gpt-3.5-turbo", "--model", "-m", help="Model name"),
    edit_format: str = typer.Option(None, "--edit-format", "-e", help="Edit format"),
    keyword: str = typer.Option(
        None, "--keyword", "-k", help="Only run tests that contain keyword"
    ),
    clean: bool = typer.Option(
        False, "--clean", "-c", help="Discard the existing testdir and make a clean copy"
    ),
    make_new: bool = typer.Option(False, "--new", "-n", help="Make a new dated testdir"),
    no_unit_tests: bool = typer.Option(False, "--no-unit-tests", help="Do not run unit tests"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
    stats_only: bool = typer.Option(
        False, "--stats", "-s", help="Do not run tests, just collect stats on completed tests"
    ),
    tries: int = typer.Option(2, "--tries", "-r", help="Number of tries for running tests"),
    threads: int = typer.Option(1, "--threads", "-t", help="Number of threads to run in parallel"),
    num_tests: int = typer.Option(-1, "--num-tests", "-n", help="Number of tests to run"),
):
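    """Run the benchmark over the exercises in BENCHMARK_DNAME and summarize the results."""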
    assert BENCHMARK_DNAME.exists() and BENCHMARK_DNAME.is_dir()
    assert ORIGINAL_DNAME.exists() and ORIGINAL_DNAME.is_dir()

    repo = git.Repo(search_parent_directories=True)
    commit_hash = repo.head.object.hexsha[:7]
    if repo.is_dirty():
        commit_hash += "-dirty"

    dirname = Path(dirname)

    if len(dirname.parts) == 1:
        priors = list(BENCHMARK_DNAME.glob(f"*--{dirname}"))
        if len(priors) == 1 and stats_only:
            dirname = priors[0].name
            print(f"Using pre-existing {dirname}")
        elif len(priors):
            if not make_new:
                print(f"Prior runs of {dirname} exist, use --new or name one explicitly")
                print()
                for prior in priors:
                    print(prior)
                return

    # Prefix the run directory with a timestamp unless one is already present
    if not re.match(r"\d\d\d\d-\d\d-\d\d-", str(dirname)):
        now = datetime.datetime.now()
        now = now.strftime("%Y-%m-%d-%H-%M-%S--")
        dirname = now + dirname.name

    dirname = BENCHMARK_DNAME / dirname

    dump(dirname)

    if stats_only:
        summarize_results(dirname)
        return

    build_docker()

    if clean and dirname.exists():
        print("Cleaning up and replacing", dirname)
        dir_files = set(fn.name for fn in dirname.glob("*"))
        original_files = set(fn.name for fn in ORIGINAL_DNAME.glob("*"))
        if dir_files != original_files:
            print("ERROR: will not delete dir that does not look like original tests", dirname)
            return

        # Archive the old run under OLD/ rather than deleting it outright
        dest = dirname.parent / "OLD" / dirname.name
        if dest.exists():
            old_now = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
            dest = dirname.parent / "OLD" / (old_now + dirname.name)

        dirname.rename(dest)

    if not dirname.exists():
        shutil.copytree(ORIGINAL_DNAME, dirname)

    test_dnames = sorted(os.listdir(dirname))

    if keyword:
        test_dnames = [dn for dn in test_dnames if keyword in dn]

    random.shuffle(test_dnames)
    if num_tests > 0:
        test_dnames = test_dnames[:num_tests]

    # Run the test cases sequentially, or scatter them across worker threads with lox
    if threads == 1:
        all_results = []
        for testname in test_dnames:
            results = run_test(
                dirname / testname,
                model,
                edit_format,
                tries,
                no_unit_tests,
                verbose,
                commit_hash,
            )

            all_results.append(results)
            summarize_results(dirname)
    else:
        run_test_threaded = lox.thread(threads)(run_test)
        for testname in test_dnames:
            run_test_threaded.scatter(
                dirname / testname,
                model,
                edit_format,
                tries,
                no_unit_tests,
                verbose,
                commit_hash,
            )
        all_results = run_test_threaded.gather(tqdm=True)

    print()
    print()
    print()
    summarize_results(dirname)


def summarize_results(dirname):
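    """Aggregate all .aider.results.json files under dirname and print a summary report."""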
    dirname = Path(dirname)
    total_tests = len(list(dirname.glob("*")))
    all_results = [json.loads(fname.read_text()) for fname in dirname.glob("*/.aider.results.json")]

    completed_tests = 0
    try:
        tries = max(len(results["tests_outcomes"]) for results in all_results if results)
    except ValueError:
        tries = 0

    passed_tests = [0] * tries
    duration = 0
    total_cost = 0
    total_error_outputs = 0
    total_user_asks = 0

    variants = defaultdict(set)

    for results in all_results:
        if not results:
            continue

        completed_tests += 1
        passed = results["tests_outcomes"][-1]
        if passed:
            # A test that passes on try i counts as passed for every later try as well
            for i in range(len(results["tests_outcomes"]) - 1, tries):
                passed_tests[i] += 1

        total_cost += results["cost"]
        duration += results["duration"]

        total_error_outputs += results.get("num_error_outputs", 0)
        total_user_asks += results.get("num_user_asks", 0)

        for key in "model edit_format commit_hash".split():
            val = results.get(key)
            variants[key].add(val)

    if not completed_tests:
        return

    console = Console(highlight=False)
    console.rule(title=str(dirname))

    console.print(f"test-cases: {completed_tests}")
    for key, val in variants.items():
        # Flag in red any setting that varied across the test cases
        if len(val) > 1:
            style = "red"
        else:
            style = None
        val = ", ".join(map(str, val))
        console.print(f"{key}: {val}", style=style)
    print("num_error_outputs:", total_error_outputs)
    print("num_user_asks:", total_user_asks)

    console.print()
    for i in range(tries):
        pass_rate = 100 * passed_tests[i] / completed_tests
        console.print(f"{pass_rate:.1f}% correct after try {i}")

    console.print()
    avg_duration = duration / completed_tests
    remaining_seconds = (total_tests - completed_tests) * avg_duration
    remaining_minutes, remaining_seconds = divmod(remaining_seconds, 60)

    console.print(
        f"duration: {avg_duration:.1f} sec/test-case,"
        f" {remaining_minutes:2.0f}:{remaining_seconds:02.0f} remaining"
    )

    avg_cost = total_cost / completed_tests

    projected_cost = avg_cost * total_tests

    console.print(
        f"costs: ${avg_cost:.4f}/test-case, ${total_cost:.2f} total,"
        f" ${projected_cost:.2f} projected"
    )

    console.rule()


def run_test(testdir, model_name, edit_format, tries, no_unit_tests, verbose, commit_hash):
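    """Run aider against a single exercise directory, retrying up to `tries` times.

    Returns a results dict (also written to .aider.results.json), or None if the
    test case was skipped.
    """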
    if not os.path.isdir(testdir):
        print("Not a dir:", testdir)
        return

    testdir = Path(testdir)

    history_fname = testdir / ".aider.chat.history.md"

    results_fname = testdir / ".aider.results.json"
    if results_fname.exists():
        try:
            res = json.loads(results_fname.read_text())
            return res
        except JSONDecodeError:
            print(f"{results_fname} failed to parse, skipping")
            return

    fnames = []
    for fname in testdir.glob("*"):
        if "test" not in fname.name and fname.is_file() and fname.name[0] != ".":
            fnames.append(fname)

            # restore the original file, in case we interrupted a prev run
            # after it had saved changes
            original_fname = ORIGINAL_DNAME / testdir.name / fname.name
            shutil.copy(original_fname, fname)

    file_list = " ".join(fname.name for fname in fnames)
    intro = testdir / ".docs/introduction.md"
    if intro.exists():
        instructions = intro.read_text() + "\n\n"
    else:
        instructions = ""
    instructions += (testdir / ".docs/instructions.md").read_text()
    instructions += f"""
=====
Use the above instructions to modify the supplied files: {file_list}
Keep and implement the existing function or class stubs, they will be called from unit tests.
Only use standard python libraries, don't suggest installing any packages.
"""

    io = InputOutput(
        pretty=True,
        yes=False,
        chat_history_file=history_fname,
    )

    main_model = models.Model(model_name)
    edit_format = edit_format or main_model.edit_format

    dump(main_model)
    dump(edit_format)
    show_fnames = ",".join(map(str, fnames))
    print("fnames:", show_fnames)

    coder = Coder.create(
        main_model,
        edit_format,
        io,
        os.environ["OPENAI_API_KEY"],
        fnames=fnames,
        use_git=False,
        stream=False,
        pretty=False,
        verbose=verbose,
    )

    dur = 0
    test_outcomes = []
    for i in range(tries):
        start = time.time()
        coder.run(with_message=instructions)
        dur += time.time() - start

        if coder.num_control_c:
            raise KeyboardInterrupt

        if no_unit_tests:
            return

        errors = run_unit_tests(testdir, history_fname)

        if errors:
            test_outcomes.append(False)
        else:
            test_outcomes.append(True)
            break

        # Send the first 50 lines of test errors back to the model and try again
        errors = errors.splitlines()
        print(errors[-1])
        errors = errors[:50]
        errors = "\n".join(errors)
        instructions = errors
        instructions += f"""

####

See the testing errors above.
The tests are correct.
Fix the code in {file_list} to resolve the errors.
"""

    results = dict(
        testdir=str(testdir),
        testcase=testdir.name,
        model=main_model.name,
        edit_format=edit_format,
        tests_outcomes=test_outcomes,
        cost=coder.total_cost,
        duration=dur,
        commit_hash=commit_hash,
        num_error_outputs=io.num_error_outputs,
        num_user_asks=io.num_user_asks,
        chat_hashes=list(
            zip(
                coder.chat_completion_call_hashes,
                coder.chat_completion_response_hashes,
            )
        ),
    )
    dump(results)

    results_fname.write_text(json.dumps(results, indent=4))

    return results


def run_unit_tests(testdir, history_fname):
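    """Run each *_test.py file in testdir inside the Docker image.

    Appends each run's output to the chat history file. Returns the cleaned
    output of the last test file when any test failed; returns None when all pass.
    """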
    test_files = [file for file in testdir.glob("*") if file.name.endswith("_test.py")]
    assert len(test_files)

    all_tests_passed = True
    timeout = 60
    for test_file in test_files:
        dump(test_file)

        # Run the test file with unittest inside the Docker sandbox
        command = [
            "docker",
            "run",
            "-it",
            "--rm",
            "--interactive=false",
            "-v",
            f"{test_file.parent.absolute()}:/app",
            DOCKER_IMAGE,
            "bash",
            "-c",
            f"python -m unittest {test_file.name}",
        ]
        print(" ".join(command))

        try:
            result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                timeout=timeout,
            )
            if result.returncode != 0:
                all_tests_passed = False
                print(f"Test {test_file} failed")

            res = cleanup_test_output(result.stdout)

        except subprocess.TimeoutExpired:
            all_tests_passed = False
            res = f"Test {test_file} timed out after {timeout} seconds."

        with history_fname.open("a") as fh:
            fh.write(f"```\n{res}\n```")

    if not all_tests_passed:
        return res


def cleanup_test_output(output):
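    """Normalize unittest output so identical failures produce identical text."""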
    # remove timing info, to avoid randomizing the response to GPT
    res = re.sub(
        r"^Ran \d+ tests in \d+\.\d+s$",
        "",
        output,
        flags=re.MULTILINE,
    )
    res = re.sub(
        r"^====*$",
        "====",
        res,
        flags=re.MULTILINE,
    )
    res = re.sub(
        r"^----*$",
        "----",
        res,
        flags=re.MULTILINE,
    )
    return res


def build_docker():
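    """Build the aider-pytest Docker image from benchmark/Dockerfile if it does not already exist."""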
    check_command = ["docker", "images", "-q", DOCKER_IMAGE]
    check_result = subprocess.run(check_command, stdout=subprocess.PIPE, text=True)
    if check_result.stdout.strip():
        print(f"Docker image '{DOCKER_IMAGE}' already exists, skipping build.")
        return

    command = [
        "docker",
        "build",
        "--quiet",
        "-t",
        DOCKER_IMAGE,
        "-f",
        "benchmark/Dockerfile",
        "/dev/null",
    ]
    print(" ".join(command))

    try:
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        res = result.stdout
        print(res)

    except subprocess.CalledProcessError as e:
        res = f"Failed to build Docker image: {e.output}"
        raise e

    if result.returncode != 0:
        raise Exception("Unable to build docker image")

    return res


if __name__ == "__main__":
    app()