aider/scripts/benchmark.py
2023-06-25 17:54:41 -07:00

378 lines
10 KiB
Python

import argparse
import datetime
import json
import os
import random
import shutil
import subprocess
import time
from collections import defaultdict
from json.decoder import JSONDecodeError
from pathlib import Path
import git
import lox
from rich.console import Console
from aider import models
from aider.coders import Coder
from aider.dump import dump # noqa: F401
from aider.io import InputOutput
# Root of the benchmark working area, given relative to the current working
# directory. The assert runs at import time, so the module refuses to load
# when the directory is missing.
BENCHMARK_DNAME = Path("tmp.benchmark/.")
assert BENCHMARK_DNAME.exists() and BENCHMARK_DNAME.is_dir()
# Reference copy of the exercises: main() copies it to create a fresh test
# directory and run_test() restores source files from it before each run.
ORIGINAL_DNAME = BENCHMARK_DNAME / "practice/."
assert ORIGINAL_DNAME.exists() and ORIGINAL_DNAME.is_dir()
def main():
    """Command-line entry point for the benchmark.

    Parses the command-line options, prepares the test directory (optionally
    archiving an existing one under ``<parent>/OLD/`` when ``--clean`` is
    given), runs every selected test case either sequentially or through a
    ``lox`` thread pool, and prints a summary of the collected results.
    """
    # Record which revision of the code produced the results.
    repo = git.Repo(search_parent_directories=True)
    commit_hash = repo.head.object.hexsha[:7]
    if repo.is_dirty():
        commit_hash += "-dirty"

    parser = argparse.ArgumentParser(description="Aider Benchmark")
    parser.add_argument("dirname", type=str, help="Directory name")
    parser.add_argument("--model", "-m", type=str, help="Model name", default="gpt-3.5-turbo")
    parser.add_argument("--edit-format", "-e", type=str, help="Edit format")
    parser.add_argument("--keyword", "-k", type=str, help="Only run tests that contain keyword")
    parser.add_argument(
        "--clean",
        "-c",
        action="store_true",
        help="Discard the current testdir and make a clean copy",
    )
    parser.add_argument(
        "--no-test",
        action="store_true",
        help="Do not run tests",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Verbose output",
    )
    parser.add_argument(
        "--stats-only",
        "-s",
        action="store_true",
        help="Do not run tests, just collect stats on completed tests",
    )
    parser.add_argument(
        "--retries",
        "-r",
        type=int,
        help="Number of retries for running tests",
        default=2,
    )
    parser.add_argument(
        "--threads",
        "-t",
        type=int,
        help="Number of threads to run in parallel",
        default=1,
    )
    parser.add_argument(
        "--num-tests",
        "-n",
        type=int,
        help="Number of tests to run",
        default=-1,
    )

    args = parser.parse_args()

    dirname = Path(args.dirname)

    if args.clean and dirname.exists():
        print("Cleaning up and replacing", dirname)
        # Safety check: only move a directory aside when its top-level
        # entries match the reference exercises exactly.
        dir_files = {fn.name for fn in dirname.glob("*")}
        original_files = {fn.name for fn in ORIGINAL_DNAME.glob("*")}
        if dir_files != original_files:
            print("ERROR: will not delete dir that does not look like original tests", dirname)
            return

        timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S-")
        dest = dirname.parent / "OLD" / (timestamp + dirname.name)
        # Path.rename() fails if the destination's parent is missing, so make
        # sure the archive directory exists before moving the old run into it.
        dest.parent.mkdir(parents=True, exist_ok=True)
        dirname.rename(dest)

    if not dirname.exists():
        shutil.copytree(ORIGINAL_DNAME, dirname)

    test_dnames = sorted(os.listdir(dirname))

    # Counted before filtering; summarize_results() uses it to project cost.
    total_tests = len(test_dnames)

    if args.keyword:
        test_dnames = [dn for dn in test_dnames if args.keyword in dn]

    random.shuffle(test_dnames)
    if args.num_tests > 0:
        test_dnames = test_dnames[: args.num_tests]

    def run_test_args(testname):
        # Positional arguments for run_test(), shared by both execution modes.
        return (
            dirname / testname,
            args.model,
            args.edit_format,
            args.retries,
            args.no_test,
            args.verbose,
            args.stats_only,
            commit_hash,
        )

    if args.threads == 1:
        all_results = []
        for testname in test_dnames:
            results = run_test(*run_test_args(testname))

            all_results.append(results)
            # Show a running summary after each test case while benchmarking.
            if not args.stats_only:
                summarize_results(dirname, all_results, total_tests)
    else:
        run_test_threaded = lox.thread(args.threads)(run_test)
        for testname in test_dnames:
            run_test_threaded.scatter(*run_test_args(testname))
        all_results = run_test_threaded.gather(tqdm=True)

    if not args.stats_only:
        print()
        print()
        print()

    summarize_results(dirname, all_results, total_tests)
def summarize_results(dirname, all_results, total_tests=None):
    """Print aggregate statistics for a collection of test-case results.

    Args:
        dirname: Test directory; its string form is used as the report title.
        all_results: Sequence of result dicts as produced by ``run_test``.
            Falsy entries (``None`` for test cases without results) are
            ignored.
        total_tests: Number of test cases in the whole benchmark, used to
            project the total cost. Defaults to ``len(all_results)`` when
            falsy.

    Returns:
        None. Nothing is printed when no entry in ``all_results`` is truthy.
    """
    if not total_tests:
        total_tests = len(all_results)

    completed = [results for results in all_results if results]
    if not completed:
        return
    completed_tests = len(completed)

    # Largest number of attempts recorded by any test case.
    retries = max((len(results["tests_outcomes"]) for results in completed), default=0)

    # passed_tests[i] counts the test cases that had passed by attempt i.
    passed_tests = [0] * retries
    duration = 0
    total_cost = 0
    variants = defaultdict(set)

    for results in completed:
        outcomes = results["tests_outcomes"]
        # A test case with no recorded attempts (e.g. run with zero retries)
        # counts as completed but not passed, instead of raising IndexError.
        if outcomes and outcomes[-1]:
            # A pass on attempt k also counts as passed for every later attempt.
            for i in range(len(outcomes) - 1, retries):
                passed_tests[i] += 1

        total_cost += results["cost"]
        duration += results["duration"]

        for key in ("model", "edit_format", "commit_hash"):
            variants[key].add(results.get(key))

    console = Console(highlight=False)
    console.rule(title=str(dirname))

    console.print(f"test-cases: {completed_tests}")
    for key, values in variants.items():
        # Highlight settings that differ between the results being combined.
        style = "red" if len(values) > 1 else None
        joined = ", ".join(map(str, values))
        console.print(f"{key}: {joined}", style=style)
    console.print()

    for i in range(retries):
        pass_rate = 100 * passed_tests[i] / completed_tests
        console.print(f"{pass_rate:.1f}% correct after try {i}")

    console.print()
    avg_duration = duration / completed_tests
    console.print(f"duration: {avg_duration:.1f} sec/test-case")

    avg_cost = total_cost / completed_tests
    projected_cost = avg_cost * total_tests

    console.print(
        f"costs: ${avg_cost:.4f}/test-case, ${total_cost:.2f} total,"
        f" ${projected_cost:.2f} projected"
    )

    console.rule()
def run_test(testdir, model_name, edit_format, retries, no_test, verbose, stats_only, commit_hash):
    """Run one benchmark test case and return its results.

    Args:
        testdir: Directory of the test case.
        model_name: Name passed to ``models.Model``.
        edit_format: Edit format to use; falls back to the model's own
            ``edit_format`` when falsy.
        retries: Maximum number of attempts given to the coder.
        no_test: When true, return after the first coder run without running
            the tests or saving results.
        verbose: Forwarded to ``Coder.create``.
        stats_only: When true, only return previously saved results and never
            run the coder.
        commit_hash: Stored in the results.

    Returns:
        The results dict (also written to ``.aider.results.json`` inside
        ``testdir``), the previously saved results if that file already
        exists, or ``None`` when ``testdir`` is not a directory, the saved
        results cannot be parsed, ``stats_only`` is set without saved
        results, or ``no_test`` is set.

    Raises:
        KeyboardInterrupt: If the coder reports a control-C.
        KeyError: If ``OPENAI_API_KEY`` is not set in the environment.
    """
    if not stats_only:
        dump(testdir)

    if not os.path.isdir(testdir):
        print("Not a dir:", testdir)
        return

    testdir = Path(testdir)

    history_fname = testdir / ".aider.chat.history.md"

    # Results saved by an earlier run are reused instead of re-running.
    results_fname = testdir / ".aider.results.json"
    if results_fname.exists():
        try:
            return json.loads(results_fname.read_text())
        except JSONDecodeError:
            print(f"{results_fname} failed to parse, skipping")
            return

    if stats_only:
        return

    fnames = []
    for fname in testdir.glob("*"):
        if "test" not in fname.name and fname.is_file() and fname.name[0] != ".":
            fnames.append(fname)

            # restore the original file, in case we interrupted a prev run
            # after it had saved changes
            original_fname = ORIGINAL_DNAME / testdir.name / fname.name
            shutil.copy(original_fname, fname)

    file_list = " ".join(fname.name for fname in fnames)

    # The introduction is optional; the instructions file is required.
    intro = testdir / ".docs/introduction.md"
    if intro.exists():
        instructions = intro.read_text() + "\n\n"
    else:
        instructions = ""
    instructions += (testdir / ".docs/instructions.md").read_text()
    instructions += f"""
=====
Use the above instructions to modify the supplied files: {file_list}
Use the existing function or class stubs as the entrypoint.
Only use standard python libraries, don't suggest installing any packages.
"""

    io = InputOutput(
        pretty=True,
        yes=False,
        chat_history_file=history_fname,
    )

    main_model = models.Model(model_name)
    edit_format = edit_format or main_model.edit_format

    dump(main_model)
    dump(edit_format)
    show_fnames = ",".join(map(str, fnames))
    print("fnames:", show_fnames)

    coder = Coder.create(
        main_model,
        edit_format,
        io,
        os.environ["OPENAI_API_KEY"],
        fnames=fnames,
        use_git=False,
        stream=False,
        pretty=False,
        verbose=verbose,
    )

    dur = 0
    test_outcomes = []
    for _ in range(retries):
        # Only the time spent inside the coder counts towards the duration.
        start = time.time()
        coder.run(with_message=instructions)
        dur += time.time() - start

        if coder.num_control_c:
            raise KeyboardInterrupt

        if no_test:
            return

        errors = run_pytests(testdir, history_fname)

        if not errors:
            test_outcomes.append(True)
            break
        test_outcomes.append(False)

        # Feed the first 50 lines of test output back to the coder for the
        # next attempt.
        error_lines = errors.splitlines()
        print(error_lines[-1])
        instructions = "\n".join(error_lines[:50])
        # This must be an f-string so the actual file names are substituted.
        instructions += (
            f"\n\n####\n\nFix the code in {file_list} to resolve the test failures above."
        )

    results = dict(
        testdir=str(testdir),
        testcase=testdir.name,
        model=main_model.name,
        edit_format=edit_format,
        tests_outcomes=test_outcomes,
        cost=coder.total_cost,
        duration=dur,
        commit_hash=commit_hash,
    )
    dump(results)

    results_fname.write_text(json.dumps(results, indent=4))

    return results
def run_pytests(testdir, history_fname):
    """Run every ``*_test.py`` file in ``testdir`` with pytest.

    The output of each run (or a timeout notice) is appended to
    ``history_fname`` inside a fenced block.

    Args:
        testdir: Directory (a ``Path``) containing the test files.
        history_fname: ``Path`` of the chat history file to append output to.

    Returns:
        ``None`` when every test file passes; otherwise the output of the
        failing test files, joined with newlines when more than one failed.
    """
    test_files = [file for file in testdir.glob("*") if file.name.endswith("_test.py")]
    assert len(test_files)

    timeout = 60
    failures = []
    for test_file in test_files:
        dump(test_file)
        try:
            result = subprocess.run(
                ["pytest", test_file],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            failed = True
            res = f"Test {test_file} timed out after {timeout} seconds."
        else:
            failed = result.returncode != 0
            if failed:
                print(f"Test {test_file} failed")
            res = result.stdout

        with history_fname.open("a") as fh:
            fh.write(f"```\n{res}\n```")

        # Keep only the output of failing files, so a later passing file
        # cannot replace the failure report handed back to the caller.
        if failed:
            failures.append(res)

    if failures:
        return "\n".join(failures)
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()