commit dbdf741264
yozerpp, 2025-04-25 18:40:16 +03:00
340 changed files with 22361 additions and 3227 deletions

tests/basic/__init__.py (new, empty file)

@@ -0,0 +1,169 @@
import os
from unittest.mock import patch
from aider.models import Model
class TestAWSCredentials:
"""Test AWS credential handling, especially AWS_PROFILE."""
def test_bedrock_model_with_aws_profile(self):
"""Test that Bedrock models accept AWS_PROFILE as valid authentication."""
# Save original environment
original_env = os.environ.copy()
try:
# Set up test environment
os.environ.clear()
os.environ["AWS_PROFILE"] = "test-profile"
# Create a model instance
with patch("aider.llm.litellm.validate_environment") as mock_validate:
# Mock the litellm validate_environment to return missing AWS keys
mock_validate.return_value = {
"missing_keys": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
"keys_in_environment": False,
}
# Test with a bedrock model
model = Model("bedrock/anthropic.claude-v2")
# Check that the AWS keys were removed from missing_keys
assert "AWS_ACCESS_KEY_ID" not in model.missing_keys
assert "AWS_SECRET_ACCESS_KEY" not in model.missing_keys
# With no missing keys, validation should pass
assert model.keys_in_environment
finally:
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
def test_us_anthropic_model_with_aws_profile(self):
"""Test that us.anthropic models accept AWS_PROFILE as valid authentication."""
# Save original environment
original_env = os.environ.copy()
try:
# Set up test environment
os.environ.clear()
os.environ["AWS_PROFILE"] = "test-profile"
# Create a model instance
with patch("aider.llm.litellm.validate_environment") as mock_validate:
# Mock the litellm validate_environment to return missing AWS keys
mock_validate.return_value = {
"missing_keys": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
"keys_in_environment": False,
}
# Test with a us.anthropic model
model = Model("us.anthropic.claude-3-7-sonnet-20250219-v1:0")
# Check that the AWS keys were removed from missing_keys
assert "AWS_ACCESS_KEY_ID" not in model.missing_keys
assert "AWS_SECRET_ACCESS_KEY" not in model.missing_keys
# With no missing keys, validation should pass
assert model.keys_in_environment
finally:
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
def test_non_bedrock_model_with_aws_profile(self):
"""Test that non-Bedrock models do not accept AWS_PROFILE for AWS credentials."""
# Save original environment
original_env = os.environ.copy()
try:
# Set up test environment
os.environ.clear()
os.environ["AWS_PROFILE"] = "test-profile"
# Create a model instance
with patch("aider.llm.litellm.validate_environment") as mock_validate:
# Mock the litellm validate_environment to return missing AWS keys
mock_validate.return_value = {
"missing_keys": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
"keys_in_environment": False,
}
# Test with a non-Bedrock model
model = Model("gpt-4")
# For non-Bedrock models, AWS credential keys should remain in missing_keys
assert "AWS_ACCESS_KEY_ID" in model.missing_keys
assert "AWS_SECRET_ACCESS_KEY" in model.missing_keys
# With missing keys, validation should fail
assert not model.keys_in_environment
finally:
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
def test_bedrock_model_without_aws_profile(self):
"""Test that Bedrock models require credentials when AWS_PROFILE is not set."""
# Save original environment
original_env = os.environ.copy()
try:
# Set up test environment
os.environ.clear()
# Create a model instance
with patch("aider.llm.litellm.validate_environment") as mock_validate:
# Mock the litellm validate_environment to return missing AWS keys
mock_validate.return_value = {
"missing_keys": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
"keys_in_environment": False,
}
# Test with a bedrock model without AWS_PROFILE
model = Model("bedrock/anthropic.claude-v2")
# Without AWS_PROFILE, AWS credential keys should remain in missing_keys
assert "AWS_ACCESS_KEY_ID" in model.missing_keys
assert "AWS_SECRET_ACCESS_KEY" in model.missing_keys
# With missing keys, validation should fail
assert not model.keys_in_environment
finally:
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
def test_mixed_missing_keys_with_aws_profile(self):
"""Test that only AWS credential keys are affected by AWS_PROFILE."""
# Save original environment
original_env = os.environ.copy()
try:
# Set up test environment
os.environ.clear()
os.environ["AWS_PROFILE"] = "test-profile"
# Create a model instance
with patch("aider.llm.litellm.validate_environment") as mock_validate:
# Mock the litellm validate_environment to return missing AWS keys and another key
mock_validate.return_value = {
"missing_keys": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "ANOTHER_KEY"],
"keys_in_environment": False,
}
# Test with a bedrock model
model = Model("bedrock/anthropic.claude-v2")
# AWS credential keys should be removed from missing_keys
assert "AWS_ACCESS_KEY_ID" not in model.missing_keys
assert "AWS_SECRET_ACCESS_KEY" not in model.missing_keys
# But other keys should remain
assert "ANOTHER_KEY" in model.missing_keys
# With other missing keys, validation should still fail
assert not model.keys_in_environment
finally:
# Restore original environment
os.environ.clear()
os.environ.update(original_env)
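# A standalone sketch of the rule these tests pin down: when litellm reports the AWS
# keys as missing but AWS_PROFILE is set and the model is Bedrock-hosted, the AWS key
# names are dropped from missing_keys and keys_in_environment is recomputed. This is
# only an illustration; the helper name and signature are assumptions, not aider's code.
import os

AWS_CRED_KEYS = {"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"}

def adjust_missing_keys_for_aws_profile(model_name, missing_keys):
    """Drop AWS credential keys when an AWS_PROFILE can stand in for them."""
    is_bedrock = model_name.startswith("bedrock/") or model_name.startswith("us.anthropic.")
    if is_bedrock and os.environ.get("AWS_PROFILE"):
        missing_keys = [key for key in missing_keys if key not in AWS_CRED_KEYS]
    # Validation passes only when nothing is left missing (so ANOTHER_KEY still fails).
    return missing_keys, not missing_keys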


@@ -37,7 +37,9 @@ class TestCoder(unittest.TestCase):
repo.git.commit("-m", "init")
# YES!
io = InputOutput(yes=True)
# Use a completely mocked IO object instead of a real one
io = MagicMock()
io.confirm_ask = MagicMock(return_value=True)
coder = Coder.create(self.GPT35, None, io, fnames=["added.txt"])
self.assertTrue(coder.allowed_to_edit("added.txt"))
@@ -192,8 +194,8 @@ class TestCoder(unittest.TestCase):
mock.return_value = set([str(fname1), str(fname2), str(fname3)])
coder.repo.get_tracked_files = mock
# Check that file mentions skip files with duplicate basenames
mentioned = coder.get_file_mentions(f"Check {fname2} and {fname3}")
# Check that a pure-basename mention is skipped when multiple files share that basename
mentioned = coder.get_file_mentions(f"Check {fname2.name} and {fname3}")
self.assertEqual(mentioned, {str(fname3)})
# Add a read-only file with same basename
@@ -283,6 +285,126 @@ class TestCoder(unittest.TestCase):
self.assertEqual(coder.abs_fnames, set([str(fname.resolve())]))
def test_get_file_mentions_various_formats(self):
with GitTemporaryDirectory():
io = InputOutput(pretty=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
# Create test files
test_files = [
"file1.txt",
"file2.py",
"dir/nested_file.js",
"dir/subdir/deep_file.html",
"file99.txt",
"special_chars!@#.md",
]
# Pre-format the Windows path to avoid backslash issues in f-string expressions
windows_path = test_files[2].replace("/", "\\")
win_path3 = test_files[3].replace("/", "\\")
for fname in test_files:
fpath = Path(fname)
fpath.parent.mkdir(parents=True, exist_ok=True)
fpath.touch()
# Mock get_addable_relative_files to return our test files
coder.get_addable_relative_files = MagicMock(return_value=set(test_files))
# Test different mention formats
test_cases = [
# Simple plain text mentions
(f"You should edit {test_files[0]} first", {test_files[0]}),
# Multiple files in plain text
(f"Edit both {test_files[0]} and {test_files[1]}", {test_files[0], test_files[1]}),
# Files in backticks
(f"Check the file `{test_files[2]}`", {test_files[2]}),
# Files in code blocks
(f"```\n{test_files[3]}\n```", {test_files[3]}),
# Files in code blocks with language specifier
# (
# f"```python\nwith open('{test_files[1]}', 'r') as f:\n"
# f" data = f.read()\n```",
# {test_files[1]},
# ),
# Files with Windows-style paths
(f"Edit the file {windows_path}", {test_files[2]}),
# Files with different quote styles
(f'Check "{test_files[5]}" now', {test_files[5]}),
# All files in one complex message
(
(
f"First, edit `{test_files[0]}`. Then modify {test_files[1]}.\n"
f"```js\n// Update this file\nconst file = '{test_files[2]}';\n```\n"
f"Finally check {win_path3}"
),
{test_files[0], test_files[1], test_files[2], test_files[3]},
),
# Files mentioned in markdown bold format
(f"You should check **{test_files[0]}** for issues", {test_files[0]}),
(
f"Look at both **{test_files[1]}** and **{test_files[2]}**",
{test_files[1], test_files[2]},
),
(
f"The file **{win_path3}** needs updating",
{test_files[3]},
),
(
f"Files to modify:\n- **{test_files[0]}**\n- **{test_files[4]}**",
{test_files[0], test_files[4]},
),
]
for content, expected_mentions in test_cases:
with self.subTest(content=content):
mentioned_files = coder.get_file_mentions(content)
self.assertEqual(
mentioned_files,
expected_mentions,
f"Failed to extract mentions from: {content}",
)
def test_get_file_mentions_multiline_backticks(self):
with GitTemporaryDirectory():
io = InputOutput(pretty=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
# Create test files
test_files = [
"swebench/harness/test_spec/python.py",
"swebench/harness/test_spec/javascript.py",
]
for fname in test_files:
fpath = Path(fname)
fpath.parent.mkdir(parents=True, exist_ok=True)
fpath.touch()
# Mock get_addable_relative_files to return our test files
coder.get_addable_relative_files = MagicMock(return_value=set(test_files))
# Input text with multiline backticked filenames
content = """
Could you please **add the following files to the chat**?
1. `swebench/harness/test_spec/python.py`
2. `swebench/harness/test_spec/javascript.py`
Once I have these, I can show you precisely how to do the thing.
"""
expected_mentions = {
"swebench/harness/test_spec/python.py",
"swebench/harness/test_spec/javascript.py",
}
mentioned_files = coder.get_file_mentions(content)
self.assertEqual(
mentioned_files,
expected_mentions,
f"Failed to extract mentions from multiline backticked content: {content}",
)
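# For orientation, a much-simplified sketch of the kind of mention matching these tests
# exercise: strip backticks, quotes, bold markers and trailing punctuation, normalize
# Windows separators, and match against the addable files. This is a toy illustration,
# not aider's get_file_mentions implementation.
import re

def extract_file_mentions_sketch(content, addable_files):
    """Toy mention extractor: normalize whitespace-separated tokens, match known files."""
    mentioned = set()
    for raw in re.split(r"\s+", content):
        token = raw.strip("`'\"*,.:;!?()")
        if not token:
            continue
        normalized = token.replace("\\", "/")  # treat Windows paths like POSIX paths
        if normalized in addable_files:
            mentioned.add(normalized)
    return mentioned

# extract_file_mentions_sketch("Edit `dir\\nested_file.js` and file1.txt.",
#                              {"file1.txt", "dir/nested_file.js"})
# -> {"dir/nested_file.js", "file1.txt"}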
def test_get_file_mentions_path_formats(self):
with GitTemporaryDirectory():
io = InputOutput(pretty=False, yes=True)
@@ -1059,6 +1181,112 @@ This command will print 'Hello, World!' to the console."""
sanity_check_messages(coder.cur_messages)
self.assertEqual(coder.cur_messages[-1]["role"], "assistant")
def test_architect_coder_auto_accept_true(self):
with GitTemporaryDirectory():
io = InputOutput(yes=True)
io.confirm_ask = MagicMock(return_value=True)
# Create an ArchitectCoder with auto_accept_architect=True
with patch("aider.coders.architect_coder.AskCoder.__init__", return_value=None):
from aider.coders.architect_coder import ArchitectCoder
coder = ArchitectCoder()
coder.io = io
coder.main_model = self.GPT35
coder.auto_accept_architect = True
coder.verbose = False
coder.total_cost = 0
coder.cur_messages = []
coder.done_messages = []
coder.summarizer = MagicMock()
coder.summarizer.too_big.return_value = False
# Mock editor_coder creation and execution
mock_editor = MagicMock()
with patch("aider.coders.architect_coder.Coder.create", return_value=mock_editor):
# Set partial response content
coder.partial_response_content = "Make these changes to the code"
# Call reply_completed
coder.reply_completed()
# Verify that confirm_ask was not called (auto-accepted)
io.confirm_ask.assert_not_called()
# Verify that editor coder was created and run
mock_editor.run.assert_called_once()
def test_architect_coder_auto_accept_false_confirmed(self):
with GitTemporaryDirectory():
io = InputOutput(yes=False)
io.confirm_ask = MagicMock(return_value=True)
# Create an ArchitectCoder with auto_accept_architect=False
with patch("aider.coders.architect_coder.AskCoder.__init__", return_value=None):
from aider.coders.architect_coder import ArchitectCoder
coder = ArchitectCoder()
coder.io = io
coder.main_model = self.GPT35
coder.auto_accept_architect = False
coder.verbose = False
coder.total_cost = 0
coder.cur_messages = []
coder.done_messages = []
coder.summarizer = MagicMock()
coder.summarizer.too_big.return_value = False
# Mock editor_coder creation and execution
mock_editor = MagicMock()
with patch("aider.coders.architect_coder.Coder.create", return_value=mock_editor):
# Set partial response content
coder.partial_response_content = "Make these changes to the code"
# Call reply_completed
coder.reply_completed()
# Verify that confirm_ask was called
io.confirm_ask.assert_called_once_with("Edit the files?")
# Verify that editor coder was created and run
mock_editor.run.assert_called_once()
def test_architect_coder_auto_accept_false_rejected(self):
with GitTemporaryDirectory():
io = InputOutput(yes=False)
io.confirm_ask = MagicMock(return_value=False)
# Create an ArchitectCoder with auto_accept_architect=False
with patch("aider.coders.architect_coder.AskCoder.__init__", return_value=None):
from aider.coders.architect_coder import ArchitectCoder
coder = ArchitectCoder()
coder.io = io
coder.main_model = self.GPT35
coder.auto_accept_architect = False
coder.verbose = False
coder.total_cost = 0
# Mock editor_coder creation and execution
mock_editor = MagicMock()
with patch("aider.coders.architect_coder.Coder.create", return_value=mock_editor):
# Set partial response content
coder.partial_response_content = "Make these changes to the code"
# Call reply_completed
coder.reply_completed()
# Verify that confirm_ask was called
io.confirm_ask.assert_called_once_with("Edit the files?")
# Verify that editor coder was NOT created or run
# (because user rejected the changes)
mock_editor.run.assert_not_called()
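# The three architect tests above pin down a small decision tree in reply_completed:
# prompt only when auto-accept is off, and run the editor coder only on acceptance.
# A hedged sketch of that control flow; make_editor_coder stands in for Coder.create(...)
# and the real ArchitectCoder takes more configuration than shown here.
def reply_completed_sketch(coder, make_editor_coder):
    """Simplified flow implied by the auto-accept tests."""
    if not coder.auto_accept_architect:
        # With auto-accept off, ask before handing the plan to an editor coder.
        if not coder.io.confirm_ask("Edit the files?"):
            return  # user rejected: no editor coder is created or run
    # Auto-accepted or confirmed: run the editor coder on the architect's proposal.
    editor = make_editor_coder()
    editor.run(coder.partial_response_content)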
if __name__ == "__main__":
unittest.main()


@@ -1124,6 +1124,29 @@ class TestCommands(TestCase):
# Check that the output was added to cur_messages
self.assertTrue(any("exit 1" in msg["content"] for msg in coder.cur_messages))
def test_cmd_test_returns_output_on_failure(self):
with ChdirTemporaryDirectory():
io = InputOutput(pretty=False, fancy_input=False, yes=False)
from aider.coders import Coder
coder = Coder.create(self.GPT35, None, io)
commands = Commands(io, coder)
# Define a command that prints to stderr and exits with non-zero status
test_cmd = "echo 'error output' >&2 && exit 1"
expected_output_fragment = "error output"
# Run cmd_test
result = commands.cmd_test(test_cmd)
# Assert that the result contains the expected output
self.assertIsNotNone(result)
self.assertIn(expected_output_fragment, result)
# Check that the output was also added to cur_messages
self.assertTrue(
any(expected_output_fragment in msg["content"] for msg in coder.cur_messages)
)
def test_cmd_add_drop_untracked_files(self):
with GitTemporaryDirectory():
repo = git.Repo()
@@ -1283,6 +1306,38 @@ class TestCommands(TestCase):
# Verify the file was not added
self.assertEqual(len(coder.abs_fnames), 0)
def test_cmd_think_tokens(self):
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
commands = Commands(io, coder)
# Test with various formats
test_values = {
"8k": 8192, # 8 * 1024
"10.5k": 10752, # 10.5 * 1024
"512k": 524288, # 0.5 * 1024 * 1024
}
for input_value, expected_tokens in test_values.items():
with mock.patch.object(io, "tool_output") as mock_tool_output:
commands.cmd_think_tokens(input_value)
# Check that the model's thinking tokens were updated
self.assertEqual(
coder.main_model.extra_params["thinking"]["budget_tokens"], expected_tokens
)
# Check that the tool output shows the correct value with format
# Use the actual input_value (not normalized) in the assertion
mock_tool_output.assert_any_call(
f"Set thinking token budget to {expected_tokens:,} tokens ({input_value})."
)
# Test with no value provided - should display current value
with mock.patch.object(io, "tool_output") as mock_tool_output:
commands.cmd_think_tokens("")
mock_tool_output.assert_any_call(mock.ANY) # Just verify it calls tool_output
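# The expected values above imply a simple parser: a bare number is taken as a token
# count and a k/K suffix multiplies by 1024. A minimal sketch under that assumption
# (illustrative only, not the actual Model.parse_token_value):
def parse_token_budget_sketch(value):
    """Parse a thinking-token budget like 8096, "8k" or "10.5k" into an int."""
    if isinstance(value, int):
        return value
    text = str(value).strip()
    if text.lower().endswith("k"):
        return int(float(text[:-1]) * 1024)
    return int(text)

assert parse_token_budget_sketch("8k") == 8192
assert parse_token_budget_sketch("10.5k") == 10752
assert parse_token_budget_sketch("512k") == 524288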
def test_cmd_add_aiderignored_file(self):
with GitTemporaryDirectory():
repo = git.Repo()
@@ -1632,6 +1687,98 @@ class TestCommands(TestCase):
self.assertIn("-Further modified content", diff_output)
self.assertIn("+Final modified content", diff_output)
def test_cmd_model(self):
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
commands = Commands(io, coder)
# Test switching the main model
with self.assertRaises(SwitchCoder) as context:
commands.cmd_model("gpt-4")
# Check that the SwitchCoder exception contains the correct model configuration
self.assertEqual(context.exception.kwargs.get("main_model").name, "gpt-4")
self.assertEqual(
context.exception.kwargs.get("main_model").editor_model.name,
self.GPT35.editor_model.name,
)
self.assertEqual(
context.exception.kwargs.get("main_model").weak_model.name, self.GPT35.weak_model.name
)
# Check that the edit format is updated to the new model's default
self.assertEqual(context.exception.kwargs.get("edit_format"), "diff")
def test_cmd_model_preserves_explicit_edit_format(self):
io = InputOutput(pretty=False, fancy_input=False, yes=True)
# Use gpt-3.5-turbo (default 'diff')
coder = Coder.create(self.GPT35, None, io)
# Explicitly set edit format to something else
coder.edit_format = "udiff"
commands = Commands(io, coder)
# Mock sanity check to avoid network calls
with mock.patch("aider.models.sanity_check_models"):
# Test switching the main model to gpt-4 (default 'whole')
with self.assertRaises(SwitchCoder) as context:
commands.cmd_model("gpt-4")
# Check that the SwitchCoder exception contains the correct model configuration
self.assertEqual(context.exception.kwargs.get("main_model").name, "gpt-4")
# Check that the edit format is preserved
self.assertEqual(context.exception.kwargs.get("edit_format"), "udiff")
def test_cmd_editor_model(self):
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
commands = Commands(io, coder)
# Test switching the editor model
with self.assertRaises(SwitchCoder) as context:
commands.cmd_editor_model("gpt-4")
# Check that the SwitchCoder exception contains the correct model configuration
self.assertEqual(context.exception.kwargs.get("main_model").name, self.GPT35.name)
self.assertEqual(context.exception.kwargs.get("main_model").editor_model.name, "gpt-4")
self.assertEqual(
context.exception.kwargs.get("main_model").weak_model.name, self.GPT35.weak_model.name
)
def test_cmd_weak_model(self):
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
commands = Commands(io, coder)
# Test switching the weak model
with self.assertRaises(SwitchCoder) as context:
commands.cmd_weak_model("gpt-4")
# Check that the SwitchCoder exception contains the correct model configuration
self.assertEqual(context.exception.kwargs.get("main_model").name, self.GPT35.name)
self.assertEqual(
context.exception.kwargs.get("main_model").editor_model.name,
self.GPT35.editor_model.name,
)
self.assertEqual(context.exception.kwargs.get("main_model").weak_model.name, "gpt-4")
def test_cmd_model_updates_default_edit_format(self):
io = InputOutput(pretty=False, fancy_input=False, yes=True)
# Use gpt-3.5-turbo (default 'diff')
coder = Coder.create(self.GPT35, None, io)
# Ensure current edit format is the default
self.assertEqual(coder.edit_format, self.GPT35.edit_format)
commands = Commands(io, coder)
# Mock sanity check to avoid network calls
with mock.patch("aider.models.sanity_check_models"):
# Test switching the main model to gpt-4 (default 'whole')
with self.assertRaises(SwitchCoder) as context:
commands.cmd_model("gpt-4")
# Check that the SwitchCoder exception contains the correct model configuration
self.assertEqual(context.exception.kwargs.get("main_model").name, "gpt-4")
# Check that the edit format is updated to the new model's default
self.assertEqual(context.exception.kwargs.get("edit_format"), "diff")
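# Taken together, the /model tests describe the switch as: raise SwitchCoder carrying the
# new main model, keeping the user's explicitly chosen edit format and otherwise falling
# back to the new model's default. A rough sketch of that rule; SwitchCoderSketch and
# switch_main_model are illustrative stand-ins, not aider's cmd_model implementation.
class SwitchCoderSketch(Exception):
    def __init__(self, **kwargs):
        self.kwargs = kwargs

def switch_main_model(coder, new_main_model):
    """Pick the edit format, then signal the switch via an exception."""
    if coder.edit_format == coder.main_model.edit_format:
        edit_format = new_main_model.edit_format  # still on the old default: follow the new one
    else:
        edit_format = coder.edit_format  # explicitly chosen: preserve it
    raise SwitchCoderSketch(main_model=new_main_model, edit_format=edit_format)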
def test_cmd_ask(self):
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
@@ -1722,6 +1869,213 @@ class TestCommands(TestCase):
del coder
del commands
def test_reset_with_original_read_only_files(self):
with GitTemporaryDirectory() as repo_dir:
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
# Create test files
orig_read_only = Path(repo_dir) / "orig_read_only.txt"
orig_read_only.write_text("Original read-only file")
added_file = Path(repo_dir) / "added_file.txt"
added_file.write_text("Added file")
added_read_only = Path(repo_dir) / "added_read_only.txt"
added_read_only.write_text("Added read-only file")
# Initialize commands with original read-only files
commands = Commands(io, coder, original_read_only_fnames=[str(orig_read_only)])
# Add files to the chat
coder.abs_read_only_fnames.add(str(orig_read_only))
coder.abs_fnames.add(str(added_file))
coder.abs_read_only_fnames.add(str(added_read_only))
# Add some messages to the chat history
coder.cur_messages = [{"role": "user", "content": "Test message"}]
coder.done_messages = [{"role": "assistant", "content": "Test response"}]
# Verify initial state
self.assertEqual(len(coder.abs_fnames), 1)
self.assertEqual(len(coder.abs_read_only_fnames), 2)
self.assertEqual(len(coder.cur_messages), 1)
self.assertEqual(len(coder.done_messages), 1)
# Test reset command
commands.cmd_reset("")
# Verify that original read-only file is preserved
# but other files and messages are cleared
self.assertEqual(len(coder.abs_fnames), 0)
self.assertEqual(len(coder.abs_read_only_fnames), 1)
self.assertIn(str(orig_read_only), coder.abs_read_only_fnames)
self.assertNotIn(str(added_read_only), coder.abs_read_only_fnames)
# Chat history should be cleared
self.assertEqual(len(coder.cur_messages), 0)
self.assertEqual(len(coder.done_messages), 0)
def test_reset_with_no_original_read_only_files(self):
with GitTemporaryDirectory() as repo_dir:
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
# Create test files
added_file = Path(repo_dir) / "added_file.txt"
added_file.write_text("Added file")
added_read_only = Path(repo_dir) / "added_read_only.txt"
added_read_only.write_text("Added read-only file")
# Initialize commands with no original read-only files
commands = Commands(io, coder)
# Add files to the chat
coder.abs_fnames.add(str(added_file))
coder.abs_read_only_fnames.add(str(added_read_only))
# Add some messages to the chat history
coder.cur_messages = [{"role": "user", "content": "Test message"}]
coder.done_messages = [{"role": "assistant", "content": "Test response"}]
# Verify initial state
self.assertEqual(len(coder.abs_fnames), 1)
self.assertEqual(len(coder.abs_read_only_fnames), 1)
self.assertEqual(len(coder.cur_messages), 1)
self.assertEqual(len(coder.done_messages), 1)
# Test reset command
commands.cmd_reset("")
# Verify that all files and messages are cleared
self.assertEqual(len(coder.abs_fnames), 0)
self.assertEqual(len(coder.abs_read_only_fnames), 0)
self.assertEqual(len(coder.cur_messages), 0)
self.assertEqual(len(coder.done_messages), 0)
def test_cmd_reasoning_effort(self):
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
commands = Commands(io, coder)
# Test with numeric values
with mock.patch.object(io, "tool_output") as mock_tool_output:
commands.cmd_reasoning_effort("0.8")
mock_tool_output.assert_any_call("Set reasoning effort to 0.8")
# Test with text values (low/medium/high)
for effort_level in ["low", "medium", "high"]:
with mock.patch.object(io, "tool_output") as mock_tool_output:
commands.cmd_reasoning_effort(effort_level)
mock_tool_output.assert_any_call(f"Set reasoning effort to {effort_level}")
# Check model's reasoning effort was updated
with mock.patch.object(coder.main_model, "set_reasoning_effort") as mock_set_effort:
commands.cmd_reasoning_effort("0.5")
mock_set_effort.assert_called_once_with("0.5")
# Test with no value provided - should display current value
with mock.patch.object(io, "tool_output") as mock_tool_output:
commands.cmd_reasoning_effort("")
mock_tool_output.assert_any_call("Current reasoning effort: high")
def test_drop_with_original_read_only_files(self):
with GitTemporaryDirectory() as repo_dir:
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
# Create test files
orig_read_only = Path(repo_dir) / "orig_read_only.txt"
orig_read_only.write_text("Original read-only file")
added_file = Path(repo_dir) / "added_file.txt"
added_file.write_text("Added file")
added_read_only = Path(repo_dir) / "added_read_only.txt"
added_read_only.write_text("Added read-only file")
# Initialize commands with original read-only files
commands = Commands(io, coder, original_read_only_fnames=[str(orig_read_only)])
# Add files to the chat
coder.abs_read_only_fnames.add(str(orig_read_only))
coder.abs_fnames.add(str(added_file))
coder.abs_read_only_fnames.add(str(added_read_only))
# Verify initial state
self.assertEqual(len(coder.abs_fnames), 1)
self.assertEqual(len(coder.abs_read_only_fnames), 2)
# Test bare drop command
with mock.patch.object(io, "tool_output") as mock_tool_output:
commands.cmd_drop("")
mock_tool_output.assert_called_with(
"Dropping all files from the chat session except originally read-only files."
)
# Verify that original read-only file is preserved, but other files are dropped
self.assertEqual(len(coder.abs_fnames), 0)
self.assertEqual(len(coder.abs_read_only_fnames), 1)
self.assertIn(str(orig_read_only), coder.abs_read_only_fnames)
self.assertNotIn(str(added_read_only), coder.abs_read_only_fnames)
def test_drop_specific_original_read_only_file(self):
with GitTemporaryDirectory() as repo_dir:
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
# Create test file
orig_read_only = Path(repo_dir) / "orig_read_only.txt"
orig_read_only.write_text("Original read-only file")
# Initialize commands with original read-only files
commands = Commands(io, coder, original_read_only_fnames=[str(orig_read_only)])
# Add file to the chat
coder.abs_read_only_fnames.add(str(orig_read_only))
# Verify initial state
self.assertEqual(len(coder.abs_read_only_fnames), 1)
# Test specific drop command
commands.cmd_drop("orig_read_only.txt")
# Verify that the original read-only file is dropped when specified explicitly
self.assertEqual(len(coder.abs_read_only_fnames), 0)
def test_drop_with_no_original_read_only_files(self):
with GitTemporaryDirectory() as repo_dir:
io = InputOutput(pretty=False, fancy_input=False, yes=True)
coder = Coder.create(self.GPT35, None, io)
# Create test files
added_file = Path(repo_dir) / "added_file.txt"
added_file.write_text("Added file")
added_read_only = Path(repo_dir) / "added_read_only.txt"
added_read_only.write_text("Added read-only file")
# Initialize commands with no original read-only files
commands = Commands(io, coder)
# Add files to the chat
coder.abs_fnames.add(str(added_file))
coder.abs_read_only_fnames.add(str(added_read_only))
# Verify initial state
self.assertEqual(len(coder.abs_fnames), 1)
self.assertEqual(len(coder.abs_read_only_fnames), 1)
# Test bare drop command
with mock.patch.object(io, "tool_output") as mock_tool_output:
commands.cmd_drop("")
mock_tool_output.assert_called_with("Dropping all files from the chat session.")
# Verify that all files are dropped
self.assertEqual(len(coder.abs_fnames), 0)
self.assertEqual(len(coder.abs_read_only_fnames), 0)
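# The reset/drop tests above share one rule: a bare /drop (or /reset) clears the chat,
# but files that were read-only from launch survive, and the tool message reflects which
# case applied. A compact sketch of that rule; the real command also resolves paths and
# handles more state than shown here.
def drop_all_files_sketch(coder, io, original_read_only_fnames):
    """Bare /drop: clear added files, keep launch-time read-only files."""
    if original_read_only_fnames:
        io.tool_output(
            "Dropping all files from the chat session except originally read-only files."
        )
        coder.abs_read_only_fnames = {
            fname for fname in coder.abs_read_only_fnames
            if fname in original_read_only_fnames
        }
    else:
        io.tool_output("Dropping all files from the chat session.")
        coder.abs_read_only_fnames = set()
    coder.abs_fnames = set()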
def test_cmd_load_with_switch_coder(self):
with GitTemporaryDirectory() as repo_dir:
io = InputOutput(pretty=False, fancy_input=False, yes=True)


@@ -0,0 +1,140 @@
import os
from unittest import TestCase
from unittest.mock import MagicMock, patch
from prompt_toolkit.input import DummyInput
from prompt_toolkit.output import DummyOutput
from aider.deprecated import handle_deprecated_model_args
from aider.dump import dump # noqa
from aider.main import main
class TestDeprecated(TestCase):
def setUp(self):
self.original_env = os.environ.copy()
os.environ["OPENAI_API_KEY"] = "deadbeef"
os.environ["AIDER_CHECK_UPDATE"] = "false"
os.environ["AIDER_ANALYTICS"] = "false"
def tearDown(self):
os.environ.clear()
os.environ.update(self.original_env)
@patch("aider.io.InputOutput.tool_warning")
@patch("aider.io.InputOutput.offer_url")
def test_deprecated_args_show_warnings(self, mock_offer_url, mock_tool_warning):
# Prevent URL launches during tests
mock_offer_url.return_value = False
# Test all deprecated flags to ensure they show warnings
deprecated_flags = [
"--opus",
"--sonnet",
"--haiku",
"--4",
"-4",
"--4o",
"--mini",
"--4-turbo",
"--35turbo",
"--35-turbo",
"--3",
"-3",
"--deepseek",
"--o1-mini",
"--o1-preview",
]
for flag in deprecated_flags:
mock_tool_warning.reset_mock()
with patch("aider.models.Model"), self.subTest(flag=flag):
main(
[flag, "--no-git", "--exit", "--yes"], input=DummyInput(), output=DummyOutput()
)
# Look for the deprecation warning in all calls
deprecation_warning = None
dump(flag)
dump(mock_tool_warning.call_args_list)
for call_args in mock_tool_warning.call_args_list:
dump(call_args)
if "deprecated" in call_args[0][0]:
deprecation_warning = call_args[0][0]
break
self.assertIsNotNone(
deprecation_warning, f"No deprecation warning found for {flag}"
)
warning_msg = deprecation_warning
self.assertIn("deprecated", warning_msg)
self.assertIn("use --model", warning_msg.lower())
@patch("aider.io.InputOutput.tool_warning")
@patch("aider.io.InputOutput.offer_url")
def test_model_alias_in_warning(self, mock_offer_url, mock_tool_warning):
# Prevent URL launches during tests
mock_offer_url.return_value = False
# Test that the warning uses the model alias if available
with patch("aider.models.MODEL_ALIASES", {"gpt4": "gpt-4-0613"}):
with patch("aider.models.Model"):
main(
["--4", "--no-git", "--exit", "--yes"], input=DummyInput(), output=DummyOutput()
)
# Look for the deprecation warning in all calls
deprecation_warning = None
for call_args in mock_tool_warning.call_args_list:
if "deprecated" in call_args[0][0] and "--model gpt4" in call_args[0][0]:
deprecation_warning = call_args[0][0]
break
self.assertIsNotNone(
deprecation_warning, "No deprecation warning with model alias found"
)
warning_msg = deprecation_warning
self.assertIn("--model gpt4", warning_msg)
self.assertNotIn("--model gpt-4-0613", warning_msg)
def test_model_is_set_correctly(self):
test_cases = [
("opus", "claude-3-opus-20240229"),
("sonnet", "anthropic/claude-3-7-sonnet-20250219"),
("haiku", "claude-3-5-haiku-20241022"),
("4", "gpt-4-0613"),
# Testing the dash variant with underscore in attribute name
("4o", "gpt-4o"),
("mini", "gpt-4o-mini"),
("4_turbo", "gpt-4-1106-preview"),
("35turbo", "gpt-3.5-turbo"),
("deepseek", "deepseek/deepseek-chat"),
("o1_mini", "o1-mini"),
("o1_preview", "o1-preview"),
]
for flag, expected_model in test_cases:
print(flag, expected_model)
with self.subTest(flag=flag):
# Create a mock IO instance
mock_io = MagicMock()
# Create args with ONLY the current flag set to True
args = MagicMock()
args.model = None
# Ensure all flags are False by default
for test_flag, _ in test_cases:
setattr(args, test_flag, False)
# Set only the current flag to True
setattr(args, flag, True)
dump(args)
# Call the handle_deprecated_model_args function
handle_deprecated_model_args(args, mock_io)
# Check that args.model was set to the expected model
self.assertEqual(args.model, expected_model)
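# test_model_is_set_correctly enumerates the flag-to-model mapping, and the warning tests
# require a message containing "deprecated" and "use --model ...", preferring a short
# alias when one exists. A hedged sketch along those lines; the real logic lives in
# aider/deprecated.py and may differ in detail.
DEPRECATED_FLAG_MODELS = {
    "opus": "claude-3-opus-20240229",
    "sonnet": "anthropic/claude-3-7-sonnet-20250219",
    "haiku": "claude-3-5-haiku-20241022",
    "4": "gpt-4-0613",
    "4o": "gpt-4o",
    "mini": "gpt-4o-mini",
    "4_turbo": "gpt-4-1106-preview",
    "35turbo": "gpt-3.5-turbo",
    "deepseek": "deepseek/deepseek-chat",
    "o1_mini": "o1-mini",
    "o1_preview": "o1-preview",
}

def handle_deprecated_flags_sketch(args, io, model_aliases=None):
    """Warn about deprecated model flags and translate them to --model."""
    model_aliases = model_aliases or {}
    for flag, model_name in DEPRECATED_FLAG_MODELS.items():
        if not getattr(args, flag, False):
            continue
        # Prefer a short alias (e.g. "gpt4") over the full model name in the hint.
        alias = next((a for a, m in model_aliases.items() if m == model_name), None)
        suggestion = alias or model_name
        io.tool_warning(
            f"The --{flag.replace('_', '-')} flag is deprecated; use --model {suggestion} instead."
        )
        if not args.model:
            args.model = model_name
        break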


@@ -108,29 +108,6 @@ Hope you like it!
edits = list(eb.find_original_update_blocks(edit))
self.assertEqual(edits, [("foo.txt", "Two\n", "Tooooo\n")])
def test_find_original_update_blocks_mangled_filename_w_source_tag(self):
source = "source"
edit = """
Here's the change:
<%s>foo.txt
<<<<<<< SEARCH
One
=======
Two
>>>>>>> REPLACE
</%s>
Hope you like it!
""" % (source, source)
fence = ("<%s>" % source, "</%s>" % source)
with self.assertRaises(ValueError) as cm:
_edits = list(eb.find_original_update_blocks(edit, fence))
self.assertIn("missing filename", str(cm.exception))
def test_find_original_update_blocks_quote_below_filename(self):
edit = """
Here's the change:
@@ -181,10 +158,11 @@ Tooooo
oops!
>>>>>>> REPLACE
"""
with self.assertRaises(ValueError) as cm:
list(eb.find_original_update_blocks(edit))
_blocks = list(eb.find_original_update_blocks(edit))
self.assertIn("filename", str(cm.exception))
def test_find_original_update_blocks_no_final_newline(self):
@@ -575,6 +553,66 @@ Hope you like it!
edits = list(eb.find_original_update_blocks(edit, fence=quad_backticks))
self.assertEqual(edits, [("foo.txt", "", "Tooooo\n")])
# Test for shell script blocks with sh language identifier (issue #3785)
def test_find_original_update_blocks_with_sh_language_identifier(self):
# https://github.com/Aider-AI/aider/issues/3785
edit = """
Here's a shell script:
```sh
test_hello.sh
<<<<<<< SEARCH
=======
#!/bin/bash
# Check if exactly one argument is provided
if [ "$#" -ne 1 ]; then
echo "Usage: $0 <argument>" >&2
exit 1
fi
# Echo the first argument
echo "$1"
exit 0
>>>>>>> REPLACE
```
"""
edits = list(eb.find_original_update_blocks(edit))
# Instead of comparing exact strings, check that we got the right file and structure
self.assertEqual(len(edits), 1)
self.assertEqual(edits[0][0], "test_hello.sh")
self.assertEqual(edits[0][1], "")
# Check that the content contains the expected shell script elements
result_content = edits[0][2]
self.assertIn("#!/bin/bash", result_content)
self.assertIn('if [ "$#" -ne 1 ];', result_content)
self.assertIn('echo "Usage: $0 <argument>"', result_content)
self.assertIn("exit 1", result_content)
self.assertIn('echo "$1"', result_content)
self.assertIn("exit 0", result_content)
# Test for C# code blocks with csharp language identifier
def test_find_original_update_blocks_with_csharp_language_identifier(self):
edit = """
Here's a C# code change:
```csharp
Program.cs
<<<<<<< SEARCH
Console.WriteLine("Hello World!");
=======
Console.WriteLine("Hello, C# World!");
>>>>>>> REPLACE
```
"""
edits = list(eb.find_original_update_blocks(edit))
search_text = 'Console.WriteLine("Hello World!");\n'
replace_text = 'Console.WriteLine("Hello, C# World!");\n'
self.assertEqual(edits, [("Program.cs", search_text, replace_text)])
if __name__ == "__main__":
unittest.main()


@@ -63,3 +63,22 @@ def test_context_window_error():
)
ex_info = ex.get_ex_info(ctx_error)
assert ex_info.retry is False
def test_openrouter_error():
"""Test specific handling of OpenRouter API errors"""
ex = LiteLLMExceptions()
from litellm import APIConnectionError
# Create an APIConnectionError with OpenrouterException message
openrouter_error = APIConnectionError(
message="APIConnectionError: OpenrouterException - 'choices'",
model="openrouter/model",
llm_provider="openrouter",
)
ex_info = ex.get_ex_info(openrouter_error)
assert ex_info.retry is True
assert "OpenRouter" in ex_info.description
assert "overloaded" in ex_info.description
assert "rate" in ex_info.description


@@ -34,6 +34,35 @@ class TestInputOutput(unittest.TestCase):
io = InputOutput(fancy_input=False)
self.assertFalse(io.pretty)
def test_color_initialization(self):
"""Test that color values are properly initialized with # prefix"""
# Test with hex colors without #
io = InputOutput(
user_input_color="00cc00",
tool_error_color="FF2222",
tool_warning_color="FFA500",
assistant_output_color="0088ff",
pretty=True,
)
# Check that # was added to hex colors
self.assertEqual(io.user_input_color, "#00cc00")
self.assertEqual(io.tool_error_color, "#FF2222")
self.assertEqual(io.tool_warning_color, "#FFA500") # Already had #
self.assertEqual(io.assistant_output_color, "#0088ff")
# Test with named colors (should be unchanged)
io = InputOutput(user_input_color="blue", tool_error_color="red", pretty=True)
self.assertEqual(io.user_input_color, "blue")
self.assertEqual(io.tool_error_color, "red")
# Test with pretty=False (colors should be disabled and left as None)
io = InputOutput(user_input_color="00cc00", tool_error_color="FF2222", pretty=False)
self.assertIsNone(io.user_input_color)
self.assertIsNone(io.tool_error_color)
def test_dumb_terminal(self):
with patch.dict(os.environ, {"TERM": "dumb"}):
io = InputOutput(fancy_input=True)
@@ -393,6 +422,59 @@ class TestInputOutputMultilineMode(unittest.TestCase):
io.prompt_ask("Test prompt?")
self.assertTrue(io.multiline_mode) # Should be restored
def test_ensure_hash_prefix(self):
"""Test that ensure_hash_prefix correctly adds # to valid hex colors"""
from aider.io import ensure_hash_prefix
# Test valid hex colors without #
self.assertEqual(ensure_hash_prefix("000"), "#000")
self.assertEqual(ensure_hash_prefix("fff"), "#fff")
self.assertEqual(ensure_hash_prefix("F00"), "#F00")
self.assertEqual(ensure_hash_prefix("123456"), "#123456")
self.assertEqual(ensure_hash_prefix("abcdef"), "#abcdef")
self.assertEqual(ensure_hash_prefix("ABCDEF"), "#ABCDEF")
# Test hex colors that already have #
self.assertEqual(ensure_hash_prefix("#000"), "#000")
self.assertEqual(ensure_hash_prefix("#123456"), "#123456")
# Test invalid inputs (should return unchanged)
self.assertEqual(ensure_hash_prefix(""), "")
self.assertEqual(ensure_hash_prefix(None), None)
self.assertEqual(ensure_hash_prefix("red"), "red") # Named color
self.assertEqual(ensure_hash_prefix("12345"), "12345") # Wrong length
self.assertEqual(ensure_hash_prefix("1234567"), "1234567") # Wrong length
self.assertEqual(ensure_hash_prefix("xyz"), "xyz") # Invalid hex chars
self.assertEqual(ensure_hash_prefix("12345g"), "12345g") # Invalid hex chars
def test_tool_output_color_handling(self):
"""Test that tool_output correctly handles hex colors without # prefix"""
from unittest.mock import patch
from rich.text import Text
# Create IO with hex color without # for tool_output_color
io = InputOutput(tool_output_color="FFA500", pretty=True)
# Patch console.print to avoid actual printing
with patch.object(io.console, "print") as mock_print:
# This would raise ColorParseError without the fix
io.tool_output("Test message")
# Verify the call was made without error
mock_print.assert_called_once()
# Verify the style was correctly created with # prefix
# The first argument is the message, second would be the style
kwargs = mock_print.call_args.kwargs
self.assertIn("style", kwargs)
# Test with other hex color
io = InputOutput(tool_output_color="00FF00", pretty=True)
with patch.object(io.console, "print") as mock_print:
io.tool_output("Test message")
mock_print.assert_called_once()
if __name__ == "__main__":
unittest.main()


@@ -14,7 +14,7 @@ from prompt_toolkit.output import DummyOutput
from aider.coders import Coder
from aider.dump import dump # noqa: F401
from aider.io import InputOutput
from aider.main import check_gitignore, main, setup_git
from aider.main import check_gitignore, load_dotenv_files, main, setup_git
from aider.utils import GitTemporaryDirectory, IgnorantTemporaryDirectory, make_repo
@@ -684,6 +684,116 @@ class TestMain(TestCase):
)
self.assertTrue(coder.detect_urls)
def test_accepts_settings_warnings(self):
# Test that appropriate warnings are shown based on accepts_settings configuration
with GitTemporaryDirectory():
# Test model that accepts the thinking_tokens setting
with (
patch("aider.io.InputOutput.tool_warning") as mock_warning,
patch("aider.models.Model.set_thinking_tokens") as mock_set_thinking,
):
main(
[
"--model",
"anthropic/claude-3-7-sonnet-20250219",
"--thinking-tokens",
"1000",
"--yes",
"--exit",
],
input=DummyInput(),
output=DummyOutput(),
)
# No warning should be shown as this model accepts thinking_tokens
for call in mock_warning.call_args_list:
self.assertNotIn("thinking_tokens", call[0][0])
# Method should be called
mock_set_thinking.assert_called_once_with("1000")
# Test model that doesn't have accepts_settings for thinking_tokens
with (
patch("aider.io.InputOutput.tool_warning") as mock_warning,
patch("aider.models.Model.set_thinking_tokens") as mock_set_thinking,
):
main(
[
"--model",
"gpt-4o",
"--thinking-tokens",
"1000",
"--check-model-accepts-settings",
"--yes",
"--exit",
],
input=DummyInput(),
output=DummyOutput(),
)
# Warning should be shown
warning_shown = False
for call in mock_warning.call_args_list:
if "thinking_tokens" in call[0][0]:
warning_shown = True
self.assertTrue(warning_shown)
# Method should NOT be called because model doesn't support it and check flag is on
mock_set_thinking.assert_not_called()
# Test model that accepts the reasoning_effort setting
with (
patch("aider.io.InputOutput.tool_warning") as mock_warning,
patch("aider.models.Model.set_reasoning_effort") as mock_set_reasoning,
):
main(
["--model", "o1", "--reasoning-effort", "3", "--yes", "--exit"],
input=DummyInput(),
output=DummyOutput(),
)
# No warning should be shown as this model accepts reasoning_effort
for call in mock_warning.call_args_list:
self.assertNotIn("reasoning_effort", call[0][0])
# Method should be called
mock_set_reasoning.assert_called_once_with("3")
# Test model that doesn't have accepts_settings for reasoning_effort
with (
patch("aider.io.InputOutput.tool_warning") as mock_warning,
patch("aider.models.Model.set_reasoning_effort") as mock_set_reasoning,
):
main(
["--model", "gpt-3.5-turbo", "--reasoning-effort", "3", "--yes", "--exit"],
input=DummyInput(),
output=DummyOutput(),
)
# Warning should be shown
warning_shown = False
for call in mock_warning.call_args_list:
if "reasoning_effort" in call[0][0]:
warning_shown = True
self.assertTrue(warning_shown)
# Method should NOT be called because the accepts-settings check is on by default
mock_set_reasoning.assert_not_called()
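# What these two blocks of assertions add up to is a per-setting gate in main(): if the
# chosen model does not list the setting in accepts_settings and the check flag is on,
# warn (naming the setting) and skip it; otherwise call the setter. A simplified sketch;
# names other than accepts_settings are illustrative.
def apply_model_setting_sketch(model, io, setting_name, value, apply_fn, check_accepts=True):
    """Apply a CLI model setting, honoring --check-model-accepts-settings."""
    supported = setting_name in getattr(model, "accepts_settings", [])
    if not supported and check_accepts:
        io.tool_warning(
            f"Warning: {model.name} does not support '{setting_name}', ignoring."
        )
        return  # setting is skipped entirely
    apply_fn(value)  # supported, or checking disabled: apply it anyway

# e.g. apply_model_setting_sketch(model, io, "thinking_tokens", "1000", model.set_thinking_tokens)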
@patch("aider.models.ModelInfoManager.set_verify_ssl")
def test_no_verify_ssl_sets_model_info_manager(self, mock_set_verify_ssl):
with GitTemporaryDirectory():
# Mock Model class to avoid actual model initialization
with patch("aider.models.Model") as mock_model:
# Configure the mock to avoid the TypeError
mock_model.return_value.info = {}
mock_model.return_value.name = "gpt-4" # Add a string name
mock_model.return_value.validate_environment.return_value = {
"missing_keys": [],
"keys_in_environment": [],
}
# Mock fuzzy_match_models to avoid string operations on MagicMock
with patch("aider.models.fuzzy_match_models", return_value=[]):
main(
["--no-verify-ssl", "--exit", "--yes"],
input=DummyInput(),
output=DummyOutput(),
)
mock_set_verify_ssl.assert_called_once_with(False)
def test_pytest_env_vars(self):
# Verify that environment variables from pytest.ini are properly set
self.assertEqual(os.environ.get("AIDER_ANALYTICS"), "false")
@@ -741,6 +851,102 @@ class TestMain(TestCase):
result = main(["--api-key", "INVALID_FORMAT", "--exit", "--yes"])
self.assertEqual(result, 1)
def test_git_config_include(self):
# Test that aider respects git config includes for user.name and user.email
with GitTemporaryDirectory() as git_dir:
git_dir = Path(git_dir)
# Create an includable config file with user settings
include_config = git_dir / "included.gitconfig"
include_config.write_text(
"[user]\n name = Included User\n email = included@example.com\n"
)
# Set up main git config to include the other file
repo = git.Repo(git_dir)
include_path = str(include_config).replace("\\", "/")
repo.git.config("--local", "include.path", str(include_path))
# Verify the config is set up correctly using git command
self.assertEqual(repo.git.config("user.name"), "Included User")
self.assertEqual(repo.git.config("user.email"), "included@example.com")
# Manually check the git config file to confirm include directive
git_config_path = git_dir / ".git" / "config"
git_config_content = git_config_path.read_text()
# Run aider and verify it doesn't change the git config
main(["--yes", "--exit"], input=DummyInput(), output=DummyOutput())
# Check that the user settings are still the same using git command
repo = git.Repo(git_dir) # Re-open repo to ensure we get fresh config
self.assertEqual(repo.git.config("user.name"), "Included User")
self.assertEqual(repo.git.config("user.email"), "included@example.com")
# Manually check the git config file again to ensure it wasn't modified
git_config_content_after = git_config_path.read_text()
self.assertEqual(git_config_content, git_config_content_after)
def test_git_config_include_directive(self):
# Test that aider respects the include directive in git config
with GitTemporaryDirectory() as git_dir:
git_dir = Path(git_dir)
# Create an includable config file with user settings
include_config = git_dir / "included.gitconfig"
include_config.write_text(
"[user]\n name = Directive User\n email = directive@example.com\n"
)
# Set up main git config with include directive
git_config = git_dir / ".git" / "config"
# Use normalized path with forward slashes for git config
include_path = str(include_config).replace("\\", "/")
with open(git_config, "a") as f:
f.write(f"\n[include]\n path = {include_path}\n")
# Read the modified config file
modified_config_content = git_config.read_text()
# Verify the include directive was added correctly
self.assertIn("[include]", modified_config_content)
# Verify the config is set up correctly using git command
repo = git.Repo(git_dir)
self.assertEqual(repo.git.config("user.name"), "Directive User")
self.assertEqual(repo.git.config("user.email"), "directive@example.com")
# Run aider and verify it doesn't change the git config
main(["--yes", "--exit"], input=DummyInput(), output=DummyOutput())
# Check that the git config file wasn't modified
config_after_aider = git_config.read_text()
self.assertEqual(modified_config_content, config_after_aider)
# Check that the user settings are still the same using git command
repo = git.Repo(git_dir) # Re-open repo to ensure we get fresh config
self.assertEqual(repo.git.config("user.name"), "Directive User")
self.assertEqual(repo.git.config("user.email"), "directive@example.com")
def test_resolve_aiderignore_path(self):
# Import the function directly to test it
from aider.args import resolve_aiderignore_path
# Test with absolute path
abs_path = os.path.abspath("/tmp/test/.aiderignore")
self.assertEqual(resolve_aiderignore_path(abs_path), abs_path)
# Test with relative path and git root
git_root = "/path/to/git/root"
rel_path = ".aiderignore"
self.assertEqual(
resolve_aiderignore_path(rel_path, git_root), str(Path(git_root) / rel_path)
)
# Test with relative path and no git root
rel_path = ".aiderignore"
self.assertEqual(resolve_aiderignore_path(rel_path), rel_path)
def test_invalid_edit_format(self):
with GitTemporaryDirectory():
with patch("aider.io.InputOutput.offer_url") as mock_offer_url:
@@ -777,7 +983,7 @@ class TestMain(TestCase):
coder = main(
["--exit", "--yes"], input=DummyInput(), output=DummyOutput(), return_coder=True
)
self.assertIn("openrouter/anthropic/claude", coder.main_model.name.lower())
self.assertIn("openrouter/", coder.main_model.name.lower())
del os.environ["OPENROUTER_API_KEY"]
# Test OpenAI API key
@@ -793,12 +999,15 @@ class TestMain(TestCase):
coder = main(
["--exit", "--yes"], input=DummyInput(), output=DummyOutput(), return_coder=True
)
self.assertIn("flash", coder.main_model.name.lower())
self.assertIn("gemini", coder.main_model.name.lower())
del os.environ["GEMINI_API_KEY"]
# Test no API keys
result = main(["--exit", "--yes"], input=DummyInput(), output=DummyOutput())
self.assertEqual(result, 1)
# Test no API keys - should offer OpenRouter OAuth
with patch("aider.onboarding.offer_openrouter_oauth") as mock_offer_oauth:
mock_offer_oauth.return_value = None # Simulate user declining or failure
result = main(["--exit", "--yes"], input=DummyInput(), output=DummyOutput())
self.assertEqual(result, 1) # Expect failure since no model could be selected
mock_offer_oauth.assert_called_once()
def test_model_precedence(self):
with GitTemporaryDirectory():
@@ -836,7 +1045,7 @@ class TestMain(TestCase):
def test_reasoning_effort_option(self):
coder = main(
["--reasoning-effort", "3", "--yes", "--exit"],
["--reasoning-effort", "3", "--no-check-model-accepts-settings", "--yes", "--exit"],
input=DummyInput(),
output=DummyOutput(),
return_coder=True,
@@ -844,3 +1053,295 @@ class TestMain(TestCase):
self.assertEqual(
coder.main_model.extra_params.get("extra_body", {}).get("reasoning_effort"), "3"
)
def test_thinking_tokens_option(self):
coder = main(
["--model", "sonnet", "--thinking-tokens", "1000", "--yes", "--exit"],
input=DummyInput(),
output=DummyOutput(),
return_coder=True,
)
self.assertEqual(
coder.main_model.extra_params.get("thinking", {}).get("budget_tokens"), 1000
)
def test_list_models_includes_metadata_models(self):
# Test that models from model-metadata.json appear in list-models output
with GitTemporaryDirectory():
# Create a temporary model-metadata.json with test models
metadata_file = Path(".aider.model.metadata.json")
test_models = {
"unique-model-name": {
"max_input_tokens": 8192,
"litellm_provider": "test-provider",
"mode": "chat", # Added mode attribute
},
"another-provider/another-unique-model": {
"max_input_tokens": 4096,
"litellm_provider": "another-provider",
"mode": "chat", # Added mode attribute
},
}
metadata_file.write_text(json.dumps(test_models))
# Capture stdout to check the output
with patch("sys.stdout", new_callable=StringIO) as mock_stdout:
main(
[
"--list-models",
"unique-model",
"--model-metadata-file",
str(metadata_file),
"--yes",
"--no-gitignore",
],
input=DummyInput(),
output=DummyOutput(),
)
output = mock_stdout.getvalue()
# Check that the unique model name from our metadata file is listed
self.assertIn("test-provider/unique-model-name", output)
def test_list_models_includes_all_model_sources(self):
# Test that models from both litellm.model_cost and model-metadata.json
# appear in list-models
with GitTemporaryDirectory():
# Create a temporary model-metadata.json with test models
metadata_file = Path(".aider.model.metadata.json")
test_models = {
"metadata-only-model": {
"max_input_tokens": 8192,
"litellm_provider": "test-provider",
"mode": "chat", # Added mode attribute
}
}
metadata_file.write_text(json.dumps(test_models))
# Capture stdout to check the output
with patch("sys.stdout", new_callable=StringIO) as mock_stdout:
main(
[
"--list-models",
"metadata-only-model",
"--model-metadata-file",
str(metadata_file),
"--yes",
"--no-gitignore",
],
input=DummyInput(),
output=DummyOutput(),
)
output = mock_stdout.getvalue()
dump(output)
# Check that both models appear in the output
self.assertIn("test-provider/metadata-only-model", output)
def test_check_model_accepts_settings_flag(self):
# Test that --check-model-accepts-settings affects whether settings are applied
with GitTemporaryDirectory():
# When flag is on, setting shouldn't be applied to non-supporting model
with patch("aider.models.Model.set_thinking_tokens") as mock_set_thinking:
main(
[
"--model",
"gpt-4o",
"--thinking-tokens",
"1000",
"--check-model-accepts-settings",
"--yes",
"--exit",
],
input=DummyInput(),
output=DummyOutput(),
)
# Method should not be called because model doesn't support it and flag is on
mock_set_thinking.assert_not_called()
def test_list_models_with_direct_resource_patch(self):
# Test that models from resources/model-metadata.json are included in list-models output
with GitTemporaryDirectory():
# Create a temporary file with test model metadata
test_file = Path(self.tempdir) / "test-model-metadata.json"
test_resource_models = {
"special-model": {
"max_input_tokens": 8192,
"litellm_provider": "resource-provider",
"mode": "chat",
}
}
test_file.write_text(json.dumps(test_resource_models))
# Create a mock for the resource file path
mock_resource_path = MagicMock()
mock_resource_path.__str__.return_value = str(test_file)
# Create a mock for the files function that returns an object with joinpath
mock_files = MagicMock()
mock_files.joinpath.return_value = mock_resource_path
with patch("aider.main.importlib_resources.files", return_value=mock_files):
# Capture stdout to check the output
with patch("sys.stdout", new_callable=StringIO) as mock_stdout:
main(
["--list-models", "special", "--yes", "--no-gitignore"],
input=DummyInput(),
output=DummyOutput(),
)
output = mock_stdout.getvalue()
# Check that the resource model appears in the output
self.assertIn("resource-provider/special-model", output)
# When flag is off, setting should be applied regardless of support
with patch("aider.models.Model.set_reasoning_effort") as mock_set_reasoning:
main(
[
"--model",
"gpt-3.5-turbo",
"--reasoning-effort",
"3",
"--no-check-model-accepts-settings",
"--yes",
"--exit",
],
input=DummyInput(),
output=DummyOutput(),
)
# Method should be called because flag is off
mock_set_reasoning.assert_called_once_with("3")
def test_model_accepts_settings_attribute(self):
with GitTemporaryDirectory():
# Test with a model where we override the accepts_settings attribute
with patch("aider.models.Model") as MockModel:
# Setup mock model instance to simulate accepts_settings attribute
mock_instance = MockModel.return_value
mock_instance.name = "test-model"
mock_instance.accepts_settings = ["reasoning_effort"]
mock_instance.validate_environment.return_value = {
"missing_keys": [],
"keys_in_environment": [],
}
mock_instance.info = {}
mock_instance.weak_model_name = None
mock_instance.get_weak_model.return_value = None
# Run with both settings, but model only accepts reasoning_effort
main(
[
"--model",
"test-model",
"--reasoning-effort",
"3",
"--thinking-tokens",
"1000",
"--check-model-accepts-settings",
"--yes",
"--exit",
],
input=DummyInput(),
output=DummyOutput(),
)
# Only set_reasoning_effort should be called, not set_thinking_tokens
mock_instance.set_reasoning_effort.assert_called_once_with("3")
mock_instance.set_thinking_tokens.assert_not_called()
@patch("aider.main.InputOutput")
def test_stream_and_cache_warning(self, MockInputOutput):
mock_io_instance = MockInputOutput.return_value
with GitTemporaryDirectory():
main(
["--stream", "--cache-prompts", "--exit", "--yes"],
input=DummyInput(),
output=DummyOutput(),
)
mock_io_instance.tool_warning.assert_called_with(
"Cost estimates may be inaccurate when using streaming and caching."
)
@patch("aider.main.InputOutput")
def test_stream_without_cache_no_warning(self, MockInputOutput):
mock_io_instance = MockInputOutput.return_value
with GitTemporaryDirectory():
main(
["--stream", "--exit", "--yes"],
input=DummyInput(),
output=DummyOutput(),
)
for call in mock_io_instance.tool_warning.call_args_list:
self.assertNotIn("Cost estimates may be inaccurate", call[0][0])
def test_load_dotenv_files_override(self):
with GitTemporaryDirectory() as git_dir:
git_dir = Path(git_dir)
# Create fake home and .aider directory
fake_home = git_dir / "fake_home"
fake_home.mkdir()
aider_dir = fake_home / ".aider"
aider_dir.mkdir()
# Create oauth keys file
oauth_keys_file = aider_dir / "oauth-keys.env"
oauth_keys_file.write_text("OAUTH_VAR=oauth_val\nSHARED_VAR=oauth_shared\n")
# Create git root .env file
git_root_env = git_dir / ".env"
git_root_env.write_text("GIT_VAR=git_val\nSHARED_VAR=git_shared\n")
# Create CWD .env file in a subdir
cwd_subdir = git_dir / "subdir"
cwd_subdir.mkdir()
cwd_env = cwd_subdir / ".env"
cwd_env.write_text("CWD_VAR=cwd_val\nSHARED_VAR=cwd_shared\n")
# Change to subdir
original_cwd = os.getcwd()
os.chdir(cwd_subdir)
# Clear relevant env vars before test
for var in ["OAUTH_VAR", "SHARED_VAR", "GIT_VAR", "CWD_VAR"]:
if var in os.environ:
del os.environ[var]
with patch("pathlib.Path.home", return_value=fake_home):
loaded_files = load_dotenv_files(str(git_dir), None)
# Assert files were loaded in expected order (oauth first)
self.assertIn(str(oauth_keys_file.resolve()), loaded_files)
self.assertIn(str(git_root_env.resolve()), loaded_files)
self.assertIn(str(cwd_env.resolve()), loaded_files)
self.assertLess(
loaded_files.index(str(oauth_keys_file.resolve())),
loaded_files.index(str(git_root_env.resolve())),
)
self.assertLess(
loaded_files.index(str(git_root_env.resolve())),
loaded_files.index(str(cwd_env.resolve())),
)
# Assert environment variables reflect the override order
self.assertEqual(os.environ.get("OAUTH_VAR"), "oauth_val")
self.assertEqual(os.environ.get("GIT_VAR"), "git_val")
self.assertEqual(os.environ.get("CWD_VAR"), "cwd_val")
# SHARED_VAR should be overridden by the last loaded file (cwd .env)
self.assertEqual(os.environ.get("SHARED_VAR"), "cwd_shared")
# Restore CWD
os.chdir(original_cwd)
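# The override test encodes the load order: the OAuth keys file under ~/.aider first,
# then the git root .env, then the current directory's .env, with later files overriding
# earlier values. A minimal sketch of that ordering using python-dotenv; the helper name
# matches the import at the top of this file, but the body here is an assumption.
from pathlib import Path
from dotenv import load_dotenv

def load_dotenv_files_sketch(git_root, env_file=None):
    """Load .env files from least to most specific so later files win."""
    candidates = [
        Path.home() / ".aider" / "oauth-keys.env",  # saved OAuth keys load first
        Path(git_root) / ".env" if git_root else None,  # repo-level settings
        Path.cwd() / ".env",  # current-directory settings override the rest
    ]
    if env_file:
        candidates.append(Path(env_file))
    loaded = []
    for path in candidates:
        if path and path.exists():
            load_dotenv(dotenv_path=path, override=True)
            loaded.append(str(path.resolve()))
    return loaded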
@patch("aider.main.InputOutput")
def test_cache_without_stream_no_warning(self, MockInputOutput):
mock_io_instance = MockInputOutput.return_value
with GitTemporaryDirectory():
main(
["--cache-prompts", "--exit", "--yes", "--no-stream"],
input=DummyInput(),
output=DummyOutput(),
)
for call in mock_io_instance.tool_warning.call_args_list:
self.assertNotIn("Cost estimates may be inaccurate", call[0][0])


@@ -0,0 +1,80 @@
import os
import tempfile
from pathlib import Path
from unittest import TestCase
from unittest.mock import MagicMock, patch
from aider.models import ModelInfoManager
class TestModelInfoManager(TestCase):
def setUp(self):
self.original_env = os.environ.copy()
self.manager = ModelInfoManager()
# Create a temporary directory for cache
self.temp_dir = tempfile.TemporaryDirectory()
self.manager.cache_dir = Path(self.temp_dir.name)
self.manager.cache_file = self.manager.cache_dir / "model_prices_and_context_window.json"
self.manager.cache_dir.mkdir(exist_ok=True)
def tearDown(self):
self.temp_dir.cleanup()
os.environ.clear()
os.environ.update(self.original_env)
@patch("requests.get")
def test_update_cache_respects_verify_ssl(self, mock_get):
# Setup mock response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"test_model": {"max_tokens": 4096}}
mock_get.return_value = mock_response
# Test with default verify_ssl=True
self.manager._update_cache()
mock_get.assert_called_with(self.manager.MODEL_INFO_URL, timeout=5, verify=True)
# Test with verify_ssl=False
mock_get.reset_mock()
self.manager.set_verify_ssl(False)
self.manager._update_cache()
mock_get.assert_called_with(self.manager.MODEL_INFO_URL, timeout=5, verify=False)
def test_lazy_loading_cache(self):
# Create a cache file
self.manager.cache_file.write_text('{"test_model": {"max_tokens": 4096}}')
# Verify cache is not loaded on initialization
self.assertFalse(self.manager._cache_loaded)
self.assertIsNone(self.manager.content)
# Access content through get_model_from_cached_json_db
with patch.object(self.manager, "_update_cache") as mock_update:
result = self.manager.get_model_from_cached_json_db("test_model")
# Verify cache was loaded
self.assertTrue(self.manager._cache_loaded)
self.assertIsNotNone(self.manager.content)
self.assertEqual(result, {"max_tokens": 4096})
# Verify _update_cache was not called since cache exists and is valid
mock_update.assert_not_called()
@patch("requests.get")
def test_verify_ssl_setting_before_cache_loading(self, mock_get):
# Setup mock response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"test_model": {"max_tokens": 4096}}
mock_get.return_value = mock_response
# Set verify_ssl to False before any cache operations
self.manager.set_verify_ssl(False)
# Force cache update by making it look expired
with patch("time.time", return_value=9999999999):
# This should trigger _update_cache
self.manager.get_model_from_cached_json_db("test_model")
# Verify _update_cache was called with verify=False
mock_get.assert_called_with(self.manager.MODEL_INFO_URL, timeout=5, verify=False)

View file

@@ -105,6 +105,21 @@ class TestModels(unittest.TestCase):
any("bogus-model" in msg for msg in warning_messages)
) # Check that one of the warnings mentions the bogus model
@patch("aider.models.check_for_dependencies")
def test_sanity_check_model_calls_check_dependencies(self, mock_check_deps):
"""Test that sanity_check_model calls check_for_dependencies"""
mock_io = MagicMock()
model = MagicMock()
model.name = "test-model"
model.missing_keys = []
model.keys_in_environment = True
model.info = {"some": "info"}
sanity_check_model(mock_io, model)
# Verify check_for_dependencies was called with the model name
mock_check_deps.assert_called_once_with(mock_io, "test-model")
def test_model_aliases(self):
# Test common aliases
model = Model("4")
@@ -145,6 +160,103 @@ class TestModels(unittest.TestCase):
self.assertEqual(model.name, "github/o1-preview")
self.assertEqual(model.use_temperature, False)
def test_parse_token_value(self):
# Create a model instance to test the parse_token_value method
model = Model("gpt-4")
# Test integer inputs
self.assertEqual(model.parse_token_value(8096), 8096)
self.assertEqual(model.parse_token_value(1000), 1000)
# Test string inputs
self.assertEqual(model.parse_token_value("8096"), 8096)
# Test k/K suffix (multiples of 1024 tokens)
self.assertEqual(model.parse_token_value("8k"), 8 * 1024)
self.assertEqual(model.parse_token_value("8K"), 8 * 1024)
self.assertEqual(model.parse_token_value("10.5k"), 10.5 * 1024)
self.assertEqual(model.parse_token_value("0.5K"), 0.5 * 1024)
# Test m/M suffix (multiples of 1024 * 1024 tokens)
self.assertEqual(model.parse_token_value("1m"), 1 * 1024 * 1024)
self.assertEqual(model.parse_token_value("1M"), 1 * 1024 * 1024)
self.assertEqual(model.parse_token_value("0.5M"), 0.5 * 1024 * 1024)
# Test with spaces
self.assertEqual(model.parse_token_value(" 8k "), 8 * 1024)
# Test conversion from other types
self.assertEqual(model.parse_token_value(8.0), 8)
def test_set_thinking_tokens(self):
# Test that set_thinking_tokens correctly sets the tokens with different formats
model = Model("gpt-4")
# Test with integer
model.set_thinking_tokens(8096)
self.assertEqual(model.extra_params["thinking"]["budget_tokens"], 8096)
self.assertFalse(model.use_temperature)
# Test with string
model.set_thinking_tokens("10k")
self.assertEqual(model.extra_params["thinking"]["budget_tokens"], 10 * 1024)
# Test with decimal value
model.set_thinking_tokens("0.5M")
self.assertEqual(model.extra_params["thinking"]["budget_tokens"], 0.5 * 1024 * 1024)
@patch("aider.models.check_pip_install_extra")
def test_check_for_dependencies_bedrock(self, mock_check_pip):
"""Test that check_for_dependencies calls check_pip_install_extra for Bedrock models"""
from aider.io import InputOutput
io = InputOutput()
# Test with a Bedrock model
from aider.models import check_for_dependencies
check_for_dependencies(io, "bedrock/anthropic.claude-3-sonnet-20240229-v1:0")
# Verify check_pip_install_extra was called with correct arguments
mock_check_pip.assert_called_once_with(
io, "boto3", "AWS Bedrock models require the boto3 package.", ["boto3"]
)
@patch("aider.models.check_pip_install_extra")
def test_check_for_dependencies_vertex_ai(self, mock_check_pip):
"""Test that check_for_dependencies calls check_pip_install_extra for Vertex AI models"""
from aider.io import InputOutput
io = InputOutput()
# Test with a Vertex AI model
from aider.models import check_for_dependencies
check_for_dependencies(io, "vertex_ai/gemini-1.5-pro")
# Verify check_pip_install_extra was called with correct arguments
mock_check_pip.assert_called_once_with(
io,
"google.cloud.aiplatform",
"Google Vertex AI models require the google-cloud-aiplatform package.",
["google-cloud-aiplatform"],
)
@patch("aider.models.check_pip_install_extra")
def test_check_for_dependencies_other_model(self, mock_check_pip):
"""Test that check_for_dependencies doesn't call check_pip_install_extra for other models"""
from aider.io import InputOutput
io = InputOutput()
# Test with a non-Bedrock, non-Vertex AI model
from aider.models import check_for_dependencies
check_for_dependencies(io, "gpt-4")
# Verify check_pip_install_extra was not called
mock_check_pip.assert_not_called()
def test_get_repo_map_tokens(self):
# Test default case (no max_input_tokens in info)
model = Model("gpt-4")
@@ -210,7 +322,7 @@ class TestModels(unittest.TestCase):
self.assertTrue(model.use_repo_map)
self.assertTrue(model.examples_as_sys_msg)
self.assertFalse(model.use_temperature)
self.assertEqual(model.remove_reasoning, "think")
self.assertEqual(model.reasoning_tag, "think")
# Test provider/deepseek-r1 case
model = Model("someprovider/deepseek-r1")
@@ -218,7 +330,7 @@ class TestModels(unittest.TestCase):
self.assertTrue(model.use_repo_map)
self.assertTrue(model.examples_as_sys_msg)
self.assertFalse(model.use_temperature)
self.assertEqual(model.remove_reasoning, "think")
self.assertEqual(model.reasoning_tag, "think")
# Test provider/deepseek-v3 case
model = Model("anotherprovider/deepseek-v3")
@@ -262,66 +374,6 @@ class TestModels(unittest.TestCase):
self.assertEqual(model.editor_edit_format, "editor-diff")
self.assertTrue(model.use_repo_map)
def test_remove_reasoning_content(self):
# Test with no removal configured
model = Model("gpt-4")
text = "Here is <think>some reasoning</think> and regular text"
self.assertEqual(model.remove_reasoning_content(text), text)
# Test with removal configured
model = Model("deepseek-r1") # This model has remove_reasoning="think"
text = """Here is some text
<think>
This is reasoning that should be removed
Over multiple lines
</think>
And more text here"""
expected = """Here is some text
And more text here"""
self.assertEqual(model.remove_reasoning_content(text), expected)
# Test with multiple reasoning blocks
text = """Start
<think>Block 1</think>
Middle
<think>Block 2</think>
End"""
expected = """Start
Middle
End"""
self.assertEqual(model.remove_reasoning_content(text), expected)
# Test with no reasoning blocks
text = "Just regular text"
self.assertEqual(model.remove_reasoning_content(text), text)
@patch("aider.models.litellm.completion")
def test_simple_send_with_retries_removes_reasoning(self, mock_completion):
model = Model("deepseek-r1") # This model has remove_reasoning="think"
# Mock the completion response
mock_response = MagicMock()
mock_response.choices = [MagicMock(message=MagicMock(content="""Here is some text
<think>
This reasoning should be removed
</think>
And this text should remain"""))]
mock_completion.return_value = mock_response
messages = [{"role": "user", "content": "test"}]
result = model.simple_send_with_retries(messages)
expected = """Here is some text
And this text should remain"""
self.assertEqual(result, expected)
# Verify the completion was called
mock_completion.assert_called_once()
def test_aider_extra_model_settings(self):
import tempfile

View file

@@ -0,0 +1,439 @@
import argparse
import base64
import hashlib
import os
import unittest
from unittest.mock import MagicMock, patch
import requests
# Import the functions to be tested
from aider.onboarding import (
check_openrouter_tier,
exchange_code_for_key,
find_available_port,
generate_pkce_codes,
offer_openrouter_oauth,
select_default_model,
try_to_select_default_model,
)
# Mock the Analytics class as it's used in some functions
class DummyAnalytics:
def event(self, *args, **kwargs):
pass
# Mock the InputOutput class
class DummyIO:
def tool_output(self, *args, **kwargs):
pass
def tool_warning(self, *args, **kwargs):
pass
def tool_error(self, *args, **kwargs):
pass
def confirm_ask(self, *args, **kwargs):
return False # Default to no confirmation
def offer_url(self, *args, **kwargs):
pass
class TestOnboarding(unittest.TestCase):
@patch("requests.get")
def test_check_openrouter_tier_free(self, mock_get):
"""Test check_openrouter_tier identifies free tier."""
mock_response = MagicMock()
mock_response.json.return_value = {"data": {"is_free_tier": True}}
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response
self.assertTrue(check_openrouter_tier("fake_key"))
mock_get.assert_called_once_with(
"https://openrouter.ai/api/v1/auth/key",
headers={"Authorization": "Bearer fake_key"},
timeout=5,
)
@patch("requests.get")
def test_check_openrouter_tier_paid(self, mock_get):
"""Test check_openrouter_tier identifies paid tier."""
mock_response = MagicMock()
mock_response.json.return_value = {"data": {"is_free_tier": False}}
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response
self.assertFalse(check_openrouter_tier("fake_key"))
@patch("requests.get")
def test_check_openrouter_tier_api_error(self, mock_get):
"""Test check_openrouter_tier defaults to free on API error."""
mock_get.side_effect = requests.exceptions.RequestException("API Error")
self.assertTrue(check_openrouter_tier("fake_key"))
@patch("requests.get")
def test_check_openrouter_tier_missing_key(self, mock_get):
"""Test check_openrouter_tier defaults to free if key is missing in response."""
mock_response = MagicMock()
mock_response.json.return_value = {"data": {}} # Missing 'is_free_tier'
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response
self.assertTrue(check_openrouter_tier("fake_key"))
@patch("aider.onboarding.check_openrouter_tier")
@patch.dict(os.environ, {}, clear=True)
def test_try_select_default_model_no_keys(self, mock_check_tier):
"""Test no model is selected when no keys are present."""
self.assertIsNone(try_to_select_default_model())
mock_check_tier.assert_not_called()
@patch("aider.onboarding.check_openrouter_tier", return_value=True) # Assume free tier
@patch.dict(os.environ, {"OPENROUTER_API_KEY": "or_key"}, clear=True)
def test_try_select_default_model_openrouter_free(self, mock_check_tier):
"""Test OpenRouter free model selection."""
self.assertEqual(
try_to_select_default_model(), "openrouter/google/gemini-2.5-pro-exp-03-25:free"
)
mock_check_tier.assert_called_once_with("or_key")
@patch("aider.onboarding.check_openrouter_tier", return_value=False) # Assume paid tier
@patch.dict(os.environ, {"OPENROUTER_API_KEY": "or_key"}, clear=True)
def test_try_select_default_model_openrouter_paid(self, mock_check_tier):
"""Test OpenRouter paid model selection."""
self.assertEqual(try_to_select_default_model(), "openrouter/anthropic/claude-3.7-sonnet")
mock_check_tier.assert_called_once_with("or_key")
@patch("aider.onboarding.check_openrouter_tier")
@patch.dict(os.environ, {"ANTHROPIC_API_KEY": "an_key"}, clear=True)
def test_try_select_default_model_anthropic(self, mock_check_tier):
"""Test Anthropic model selection."""
self.assertEqual(try_to_select_default_model(), "sonnet")
mock_check_tier.assert_not_called()
@patch("aider.onboarding.check_openrouter_tier")
@patch.dict(os.environ, {"DEEPSEEK_API_KEY": "ds_key"}, clear=True)
def test_try_select_default_model_deepseek(self, mock_check_tier):
"""Test Deepseek model selection."""
self.assertEqual(try_to_select_default_model(), "deepseek")
mock_check_tier.assert_not_called()
@patch("aider.onboarding.check_openrouter_tier")
@patch.dict(os.environ, {"OPENAI_API_KEY": "oa_key"}, clear=True)
def test_try_select_default_model_openai(self, mock_check_tier):
"""Test OpenAI model selection."""
self.assertEqual(try_to_select_default_model(), "gpt-4o")
mock_check_tier.assert_not_called()
@patch("aider.onboarding.check_openrouter_tier")
@patch.dict(os.environ, {"GEMINI_API_KEY": "gm_key"}, clear=True)
def test_try_select_default_model_gemini(self, mock_check_tier):
"""Test Gemini model selection."""
self.assertEqual(try_to_select_default_model(), "gemini/gemini-2.5-pro-exp-03-25")
mock_check_tier.assert_not_called()
@patch("aider.onboarding.check_openrouter_tier")
@patch.dict(os.environ, {"VERTEXAI_PROJECT": "vx_proj"}, clear=True)
def test_try_select_default_model_vertex(self, mock_check_tier):
"""Test Vertex AI model selection."""
self.assertEqual(try_to_select_default_model(), "vertex_ai/gemini-2.5-pro-exp-03-25")
mock_check_tier.assert_not_called()
@patch("aider.onboarding.check_openrouter_tier", return_value=False) # Paid
@patch.dict(
os.environ, {"OPENROUTER_API_KEY": "or_key", "OPENAI_API_KEY": "oa_key"}, clear=True
)
def test_try_select_default_model_priority_openrouter(self, mock_check_tier):
"""Test OpenRouter key takes priority."""
self.assertEqual(try_to_select_default_model(), "openrouter/anthropic/claude-3.7-sonnet")
mock_check_tier.assert_called_once_with("or_key")
@patch("aider.onboarding.check_openrouter_tier")
@patch.dict(os.environ, {"ANTHROPIC_API_KEY": "an_key", "OPENAI_API_KEY": "oa_key"}, clear=True)
def test_try_select_default_model_priority_anthropic(self, mock_check_tier):
"""Test Anthropic key takes priority over OpenAI."""
self.assertEqual(try_to_select_default_model(), "sonnet")
mock_check_tier.assert_not_called()
@patch("socketserver.TCPServer")
def test_find_available_port_success(self, mock_tcp_server):
"""Test finding an available port."""
# Simulate port 8484 being available
mock_tcp_server.return_value.__enter__.return_value = None # Allow context manager
port = find_available_port(start_port=8484, end_port=8484)
self.assertEqual(port, 8484)
mock_tcp_server.assert_called_once_with(("localhost", 8484), None)
@patch("socketserver.TCPServer")
def test_find_available_port_in_use(self, mock_tcp_server):
"""Test finding the next available port if the first is in use."""
# Simulate port 8484 raising OSError, 8485 being available
mock_tcp_server.side_effect = [OSError, MagicMock()]
mock_tcp_server.return_value.__enter__.return_value = None # Allow context manager
port = find_available_port(start_port=8484, end_port=8485)
self.assertEqual(port, 8485)
self.assertEqual(mock_tcp_server.call_count, 2)
mock_tcp_server.assert_any_call(("localhost", 8484), None)
mock_tcp_server.assert_any_call(("localhost", 8485), None)
@patch("socketserver.TCPServer", side_effect=OSError)
def test_find_available_port_none_available(self, mock_tcp_server):
"""Test returning None if no ports are available in the range."""
port = find_available_port(start_port=8484, end_port=8485)
self.assertIsNone(port)
self.assertEqual(mock_tcp_server.call_count, 2) # Tried 8484 and 8485
def test_generate_pkce_codes(self):
"""Test PKCE code generation."""
verifier, challenge = generate_pkce_codes()
self.assertIsInstance(verifier, str)
self.assertIsInstance(challenge, str)
self.assertGreater(len(verifier), 40) # Check reasonable length
self.assertGreater(len(challenge), 40)
# Verify the challenge is the SHA256 hash of the verifier, base64 encoded
hasher = hashlib.sha256()
hasher.update(verifier.encode("utf-8"))
expected_challenge = base64.urlsafe_b64encode(hasher.digest()).rstrip(b"=").decode("utf-8")
self.assertEqual(challenge, expected_challenge)
@patch("requests.post")
def test_exchange_code_for_key_success(self, mock_post):
"""Test successful code exchange for API key."""
mock_response = MagicMock()
mock_response.json.return_value = {"key": "test_api_key"}
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
io_mock = DummyIO()
api_key = exchange_code_for_key("auth_code", "verifier", io_mock)
self.assertEqual(api_key, "test_api_key")
mock_post.assert_called_once_with(
"https://openrouter.ai/api/v1/auth/keys",
headers={"Content-Type": "application/json"},
json={
"code": "auth_code",
"code_verifier": "verifier",
"code_challenge_method": "S256",
},
timeout=30,
)
@patch("requests.post")
def test_exchange_code_for_key_missing_key(self, mock_post):
"""Test code exchange when 'key' is missing in response."""
mock_response = MagicMock()
mock_response.json.return_value = {"other_data": "value"} # Missing 'key'
mock_response.raise_for_status.return_value = None
mock_response.text = '{"other_data": "value"}'
mock_post.return_value = mock_response
io_mock = DummyIO()
io_mock.tool_error = MagicMock() # Track error output
api_key = exchange_code_for_key("auth_code", "verifier", io_mock)
self.assertIsNone(api_key)
io_mock.tool_error.assert_any_call("Error: 'key' not found in OpenRouter response.")
io_mock.tool_error.assert_any_call('Response: {"other_data": "value"}')
@patch("requests.post")
def test_exchange_code_for_key_http_error(self, mock_post):
"""Test code exchange with HTTP error."""
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.reason = "Bad Request"
mock_response.text = '{"error": "invalid_code"}'
http_error = requests.exceptions.HTTPError(response=mock_response)
mock_post.side_effect = http_error
io_mock = DummyIO()
io_mock.tool_error = MagicMock()
api_key = exchange_code_for_key("auth_code", "verifier", io_mock)
self.assertIsNone(api_key)
io_mock.tool_error.assert_any_call(
"Error exchanging code for OpenRouter key: 400 Bad Request"
)
io_mock.tool_error.assert_any_call('Response: {"error": "invalid_code"}')
@patch("requests.post")
def test_exchange_code_for_key_timeout(self, mock_post):
"""Test code exchange with timeout."""
mock_post.side_effect = requests.exceptions.Timeout("Timeout")
io_mock = DummyIO()
io_mock.tool_error = MagicMock()
api_key = exchange_code_for_key("auth_code", "verifier", io_mock)
self.assertIsNone(api_key)
io_mock.tool_error.assert_called_once_with(
"Error: Request to OpenRouter timed out during code exchange."
)
@patch("requests.post")
def test_exchange_code_for_key_request_exception(self, mock_post):
"""Test code exchange with general request exception."""
req_exception = requests.exceptions.RequestException("Network Error")
mock_post.side_effect = req_exception
io_mock = DummyIO()
io_mock.tool_error = MagicMock()
api_key = exchange_code_for_key("auth_code", "verifier", io_mock)
self.assertIsNone(api_key)
io_mock.tool_error.assert_called_once_with(
f"Error exchanging code for OpenRouter key: {req_exception}"
)
# --- Tests for select_default_model ---
@patch("aider.onboarding.try_to_select_default_model", return_value="gpt-4o")
@patch("aider.onboarding.offer_openrouter_oauth")
def test_select_default_model_already_specified(self, mock_offer_oauth, mock_try_select):
"""Test select_default_model returns args.model if provided."""
args = argparse.Namespace(model="specific-model")
io_mock = DummyIO()
analytics_mock = DummyAnalytics()
selected_model = select_default_model(args, io_mock, analytics_mock)
self.assertEqual(selected_model, "specific-model")
mock_try_select.assert_not_called()
mock_offer_oauth.assert_not_called()
@patch("aider.onboarding.try_to_select_default_model", return_value="gpt-4o")
@patch("aider.onboarding.offer_openrouter_oauth")
def test_select_default_model_found_via_env(self, mock_offer_oauth, mock_try_select):
"""Test select_default_model returns model found by try_to_select."""
args = argparse.Namespace(model=None) # No model specified
io_mock = DummyIO()
io_mock.tool_warning = MagicMock() # Track warnings
analytics_mock = DummyAnalytics()
analytics_mock.event = MagicMock() # Track events
selected_model = select_default_model(args, io_mock, analytics_mock)
self.assertEqual(selected_model, "gpt-4o")
mock_try_select.assert_called_once()
io_mock.tool_warning.assert_called_once_with(
"Using gpt-4o model with API key from environment."
)
analytics_mock.event.assert_called_once_with("auto_model_selection", model="gpt-4o")
mock_offer_oauth.assert_not_called()
@patch(
"aider.onboarding.try_to_select_default_model", side_effect=[None, None]
) # Fails first, fails after oauth attempt
@patch(
"aider.onboarding.offer_openrouter_oauth", return_value=False
) # OAuth offered but fails/declined
def test_select_default_model_no_keys_oauth_fail(self, mock_offer_oauth, mock_try_select):
"""Test select_default_model offers OAuth when no keys, but OAuth fails."""
args = argparse.Namespace(model=None)
io_mock = DummyIO()
io_mock.tool_warning = MagicMock()
io_mock.offer_url = MagicMock()
analytics_mock = DummyAnalytics()
selected_model = select_default_model(args, io_mock, analytics_mock)
self.assertIsNone(selected_model)
self.assertEqual(mock_try_select.call_count, 2) # Called before and after oauth attempt
mock_offer_oauth.assert_called_once_with(io_mock, analytics_mock)
io_mock.tool_warning.assert_called_once_with(
"No LLM model was specified and no API keys were provided."
)
io_mock.offer_url.assert_called_once() # Should offer docs URL
@patch(
"aider.onboarding.try_to_select_default_model",
side_effect=[None, "openrouter/google/gemini-2.5-pro-exp-03-25:free"],
) # Fails first, succeeds after oauth
@patch(
"aider.onboarding.offer_openrouter_oauth", return_value=True
) # OAuth offered and succeeds
def test_select_default_model_no_keys_oauth_success(self, mock_offer_oauth, mock_try_select):
"""Test select_default_model offers OAuth, which succeeds."""
args = argparse.Namespace(model=None)
io_mock = DummyIO()
io_mock.tool_warning = MagicMock()
analytics_mock = DummyAnalytics()
selected_model = select_default_model(args, io_mock, analytics_mock)
self.assertEqual(selected_model, "openrouter/google/gemini-2.5-pro-exp-03-25:free")
self.assertEqual(mock_try_select.call_count, 2) # Called before and after oauth
mock_offer_oauth.assert_called_once_with(io_mock, analytics_mock)
# Only one warning is expected: "No LLM model..."
self.assertEqual(io_mock.tool_warning.call_count, 1)
io_mock.tool_warning.assert_called_once_with(
"No LLM model was specified and no API keys were provided."
)
# The second call to try_select finds the model, so the *outer* function logs the usage.
# Note: The warning comes from the second call within select_default_model,
# not try_select itself.
# We verify the final state and model returned.
# --- Tests for offer_openrouter_oauth ---
@patch("aider.onboarding.start_openrouter_oauth_flow", return_value="new_or_key")
@patch.dict(os.environ, {}, clear=True) # Ensure no key exists initially
def test_offer_openrouter_oauth_confirm_yes_success(self, mock_start_oauth):
"""Test offer_openrouter_oauth when user confirms and OAuth succeeds."""
io_mock = DummyIO()
io_mock.confirm_ask = MagicMock(return_value=True) # User says yes
analytics_mock = DummyAnalytics()
analytics_mock.event = MagicMock()
result = offer_openrouter_oauth(io_mock, analytics_mock)
self.assertTrue(result)
io_mock.confirm_ask.assert_called_once()
mock_start_oauth.assert_called_once_with(io_mock, analytics_mock)
self.assertEqual(os.environ.get("OPENROUTER_API_KEY"), "new_or_key")
analytics_mock.event.assert_any_call("oauth_flow_initiated", provider="openrouter")
analytics_mock.event.assert_any_call("oauth_flow_success")
# Clean up env var
del os.environ["OPENROUTER_API_KEY"]
@patch("aider.onboarding.start_openrouter_oauth_flow", return_value=None) # OAuth fails
@patch.dict(os.environ, {}, clear=True)
def test_offer_openrouter_oauth_confirm_yes_fail(self, mock_start_oauth):
"""Test offer_openrouter_oauth when user confirms but OAuth fails."""
io_mock = DummyIO()
io_mock.confirm_ask = MagicMock(return_value=True) # User says yes
io_mock.tool_error = MagicMock()
analytics_mock = DummyAnalytics()
analytics_mock.event = MagicMock()
result = offer_openrouter_oauth(io_mock, analytics_mock)
self.assertFalse(result)
io_mock.confirm_ask.assert_called_once()
mock_start_oauth.assert_called_once_with(io_mock, analytics_mock)
self.assertNotIn("OPENROUTER_API_KEY", os.environ)
io_mock.tool_error.assert_called_once_with(
"OpenRouter authentication did not complete successfully."
)
analytics_mock.event.assert_any_call("oauth_flow_initiated", provider="openrouter")
analytics_mock.event.assert_any_call("oauth_flow_failure")
@patch("aider.onboarding.start_openrouter_oauth_flow")
def test_offer_openrouter_oauth_confirm_no(self, mock_start_oauth):
"""Test offer_openrouter_oauth when user declines."""
io_mock = DummyIO()
io_mock.confirm_ask = MagicMock(return_value=False) # User says no
analytics_mock = DummyAnalytics()
analytics_mock.event = MagicMock()
result = offer_openrouter_oauth(io_mock, analytics_mock)
self.assertFalse(result)
io_mock.confirm_ask.assert_called_once()
mock_start_oauth.assert_not_called()
analytics_mock.event.assert_not_called() # No OAuth events if declined
# --- More complex test for start_openrouter_oauth_flow (simplified) ---
# This test focuses on the successful path, mocking heavily
if __name__ == "__main__":
unittest.main()

View file

@@ -0,0 +1,609 @@
import unittest
from unittest.mock import MagicMock, patch
from aider.coders.base_coder import Coder
from aider.dump import dump # noqa
from aider.io import InputOutput
from aider.models import Model
from aider.reasoning_tags import (
REASONING_END,
REASONING_START,
remove_reasoning_content,
)
class TestReasoning(unittest.TestCase):
def test_send_with_reasoning_content(self):
"""Test that reasoning content is properly formatted and output."""
# Setup IO with no pretty
io = InputOutput(pretty=False)
io.assistant_output = MagicMock()
# Setup model and coder
model = Model("gpt-3.5-turbo")
coder = Coder.create(model, None, io=io, stream=False)
# Test data
reasoning_content = "My step-by-step reasoning process"
main_content = "Final answer after reasoning"
# Mock completion response with reasoning content
class MockCompletion:
def __init__(self, content, reasoning_content):
self.content = content
# Add required attributes expected by show_send_output
self.choices = [MagicMock()]
self.choices[0].message.content = content
self.choices[0].message.reasoning_content = reasoning_content
self.finish_reason = "stop"
mock_completion = MockCompletion(main_content, reasoning_content)
# Create a mock hash object
mock_hash = MagicMock()
mock_hash.hexdigest.return_value = "mock_hash_digest"
# Mock the model's send_completion method to return the expected tuple format
with patch.object(model, "send_completion", return_value=(mock_hash, mock_completion)):
# Call send with a simple message
messages = [{"role": "user", "content": "test prompt"}]
list(coder.send(messages))
# Now verify ai_output was called with the right content
io.assistant_output.assert_called_once()
output = io.assistant_output.call_args[0][0]
dump(output)
# Output should contain formatted reasoning tags
self.assertIn(REASONING_START, output)
self.assertIn(REASONING_END, output)
# Output should include both reasoning and main content
self.assertIn(reasoning_content, output)
self.assertIn(main_content, output)
# Verify that partial_response_content only contains the main content
coder.remove_reasoning_content()
self.assertEqual(coder.partial_response_content.strip(), main_content.strip())
# Ensure proper order: reasoning first, then main content
reasoning_pos = output.find(reasoning_content)
main_pos = output.find(main_content)
self.assertLess(
reasoning_pos, main_pos, "Reasoning content should appear before main content"
)
def test_send_with_reasoning_content_stream(self):
"""Test that streaming reasoning content is properly formatted and output."""
# Setup IO with pretty output for streaming
io = InputOutput(pretty=True)
mock_mdstream = MagicMock()
io.get_assistant_mdstream = MagicMock(return_value=mock_mdstream)
# Setup model and coder
model = Model("gpt-3.5-turbo")
coder = Coder.create(model, None, io=io, stream=True)
# Ensure the coder shows pretty output
coder.show_pretty = MagicMock(return_value=True)
# Mock streaming response chunks
class MockStreamingChunk:
def __init__(
self, content=None, reasoning_content=None, reasoning=None, finish_reason=None
):
self.choices = [MagicMock()]
self.choices[0].delta = MagicMock()
self.choices[0].finish_reason = finish_reason
# Set content if provided
if content is not None:
self.choices[0].delta.content = content
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "content")
# Set reasoning_content if provided
if reasoning_content is not None:
self.choices[0].delta.reasoning_content = reasoning_content
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "reasoning_content")
# Set reasoning if provided
if reasoning is not None:
self.choices[0].delta.reasoning = reasoning
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "reasoning")
# Create chunks to simulate streaming
chunks = [
# First chunk with reasoning content starts the tag
MockStreamingChunk(reasoning_content="My step-by-step "),
# Additional reasoning content
MockStreamingChunk(reasoning_content="reasoning process"),
# Switch to main content - this will automatically end the reasoning tag
MockStreamingChunk(content="Final "),
# More main content
MockStreamingChunk(content="answer "),
MockStreamingChunk(content="after reasoning"),
# End the response
MockStreamingChunk(finish_reason="stop"),
]
# Create a mock hash object
mock_hash = MagicMock()
mock_hash.hexdigest.return_value = "mock_hash_digest"
# Mock the model's send_completion to return the hash and completion
with (
patch.object(model, "send_completion", return_value=(mock_hash, chunks)),
patch.object(model, "token_count", return_value=10),
): # Mock token count to avoid serialization issues
# Set mdstream directly on the coder object
coder.mdstream = mock_mdstream
# Call send with a simple message
messages = [{"role": "user", "content": "test prompt"}]
list(coder.send(messages))
# Verify mdstream.update was called multiple times
mock_mdstream.update.assert_called()
coder.live_incremental_response(True)
# Explicitly get all calls to update
update_calls = mock_mdstream.update.call_args_list
# There should be at least two calls - one for streaming and one final
self.assertGreaterEqual(
len(update_calls), 2, "Should have at least two calls to update (streaming + final)"
)
# Check that at least one call has final=True (should be the last one)
has_final_true = any(call[1].get("final", False) for call in update_calls)
self.assertTrue(has_final_true, "At least one update call should have final=True")
# Get the text from the last update call
final_text = update_calls[-1][0][0]
# The final text should include both reasoning and main content with proper formatting
self.assertIn(REASONING_START, final_text)
self.assertIn("My step-by-step reasoning process", final_text)
self.assertIn(REASONING_END, final_text)
self.assertIn("Final answer after reasoning", final_text)
# Ensure proper order: reasoning first, then main content
reasoning_pos = final_text.find("My step-by-step reasoning process")
main_pos = final_text.find("Final answer after reasoning")
self.assertLess(
reasoning_pos, main_pos, "Reasoning content should appear before main content"
)
# Verify that partial_response_content only contains the main content
coder.remove_reasoning_content()
expected_content = "Final answer after reasoning"
self.assertEqual(coder.partial_response_content.strip(), expected_content)
def test_send_with_think_tags(self):
"""Test that <think> tags are properly processed and formatted."""
# Setup IO with no pretty
io = InputOutput(pretty=False)
io.assistant_output = MagicMock()
# Setup model and coder
model = Model("gpt-3.5-turbo")
model.reasoning_tag = "think" # Set to remove <think> tags
coder = Coder.create(model, None, io=io, stream=False)
# Test data
reasoning_content = "My step-by-step reasoning process"
main_content = "Final answer after reasoning"
# Create content with think tags
combined_content = f"""<think>
{reasoning_content}
</think>
{main_content}"""
# Mock completion response with think tags in content
class MockCompletion:
def __init__(self, content):
self.content = content
# Add required attributes expected by show_send_output
self.choices = [MagicMock()]
self.choices[0].message.content = content
self.choices[0].message.reasoning_content = None # No separate reasoning_content
self.finish_reason = "stop"
mock_completion = MockCompletion(combined_content)
# Create a mock hash object
mock_hash = MagicMock()
mock_hash.hexdigest.return_value = "mock_hash_digest"
# Mock the model's send_completion method to return the expected tuple format
with patch.object(model, "send_completion", return_value=(mock_hash, mock_completion)):
# Call send with a simple message
messages = [{"role": "user", "content": "test prompt"}]
list(coder.send(messages))
# Now verify ai_output was called with the right content
io.assistant_output.assert_called_once()
output = io.assistant_output.call_args[0][0]
dump(output)
# Output should contain formatted reasoning tags
self.assertIn(REASONING_START, output)
self.assertIn(REASONING_END, output)
# Output should include both reasoning and main content
self.assertIn(reasoning_content, output)
self.assertIn(main_content, output)
# Ensure proper order: reasoning first, then main content
reasoning_pos = output.find(reasoning_content)
main_pos = output.find(main_content)
self.assertLess(
reasoning_pos, main_pos, "Reasoning content should appear before main content"
)
# Verify that partial_response_content only contains the main content
coder.remove_reasoning_content()
self.assertEqual(coder.partial_response_content.strip(), main_content.strip())
def test_send_with_think_tags_stream(self):
"""Test that streaming with <think> tags is properly processed and formatted."""
# Setup IO with pretty output for streaming
io = InputOutput(pretty=True)
mock_mdstream = MagicMock()
io.get_assistant_mdstream = MagicMock(return_value=mock_mdstream)
# Setup model and coder
model = Model("gpt-3.5-turbo")
model.reasoning_tag = "think" # Set to remove <think> tags
coder = Coder.create(model, None, io=io, stream=True)
# Ensure the coder shows pretty output
coder.show_pretty = MagicMock(return_value=True)
# Mock streaming response chunks
class MockStreamingChunk:
def __init__(
self, content=None, reasoning_content=None, reasoning=None, finish_reason=None
):
self.choices = [MagicMock()]
self.choices[0].delta = MagicMock()
self.choices[0].finish_reason = finish_reason
# Set content if provided
if content is not None:
self.choices[0].delta.content = content
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "content")
# Set reasoning_content if provided
if reasoning_content is not None:
self.choices[0].delta.reasoning_content = reasoning_content
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "reasoning_content")
# Set reasoning if provided
if reasoning is not None:
self.choices[0].delta.reasoning = reasoning
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "reasoning")
# Create chunks to simulate streaming with think tags
chunks = [
# Start with open think tag
MockStreamingChunk(content="<think>\n", reasoning_content=None),
# Reasoning content inside think tags
MockStreamingChunk(content="My step-by-step ", reasoning_content=None),
MockStreamingChunk(content="reasoning process\n", reasoning_content=None),
# Close think tag
MockStreamingChunk(content="</think>\n\n", reasoning_content=None),
# Main content
MockStreamingChunk(content="Final ", reasoning_content=None),
MockStreamingChunk(content="answer ", reasoning_content=None),
MockStreamingChunk(content="after reasoning", reasoning_content=None),
# End the response
MockStreamingChunk(finish_reason="stop"),
]
# Create a mock hash object
mock_hash = MagicMock()
mock_hash.hexdigest.return_value = "mock_hash_digest"
# Mock the model's send_completion to return the hash and completion
with patch.object(model, "send_completion", return_value=(mock_hash, chunks)):
# Set mdstream directly on the coder object
coder.mdstream = mock_mdstream
# Call send with a simple message
messages = [{"role": "user", "content": "test prompt"}]
list(coder.send(messages))
# Verify mdstream.update was called multiple times
mock_mdstream.update.assert_called()
coder.live_incremental_response(True)
# Explicitly get all calls to update
update_calls = mock_mdstream.update.call_args_list
# There should be at least two calls - one for streaming and one final
self.assertGreaterEqual(
len(update_calls), 2, "Should have at least two calls to update (streaming + final)"
)
# Check that at least one call has final=True (should be the last one)
has_final_true = any(call[1].get("final", False) for call in update_calls)
self.assertTrue(has_final_true, "At least one update call should have final=True")
# Get the text from the last update call
final_text = update_calls[-1][0][0]
# The final text should include both reasoning and main content with proper formatting
self.assertIn(REASONING_START, final_text)
self.assertIn("My step-by-step reasoning process", final_text)
self.assertIn(REASONING_END, final_text)
self.assertIn("Final answer after reasoning", final_text)
# Ensure proper order: reasoning first, then main content
reasoning_pos = final_text.find("My step-by-step reasoning process")
main_pos = final_text.find("Final answer after reasoning")
self.assertLess(
reasoning_pos, main_pos, "Reasoning content should appear before main content"
)
def test_remove_reasoning_content(self):
"""Test the remove_reasoning_content function from reasoning_tags module."""
# Test with no removal configured
text = "Here is <think>some reasoning</think> and regular text"
self.assertEqual(remove_reasoning_content(text, None), text)
# Test with removal configured
text = """Here is some text
<think>
This is reasoning that should be removed
Over multiple lines
</think>
And more text here"""
expected = """Here is some text
And more text here"""
self.assertEqual(remove_reasoning_content(text, "think"), expected)
# Test with multiple reasoning blocks
text = """Start
<think>Block 1</think>
Middle
<think>Block 2</think>
End"""
expected = """Start
Middle
End"""
self.assertEqual(remove_reasoning_content(text, "think"), expected)
# Test with no reasoning blocks
text = "Just regular text"
self.assertEqual(remove_reasoning_content(text, "think"), text)
def test_send_with_reasoning(self):
"""Test that reasoning content from the 'reasoning' attribute is properly formatted
and output."""
# Setup IO with no pretty
io = InputOutput(pretty=False)
io.assistant_output = MagicMock()
# Setup model and coder
model = Model("gpt-3.5-turbo")
coder = Coder.create(model, None, io=io, stream=False)
# Test data
reasoning_content = "My step-by-step reasoning process"
main_content = "Final answer after reasoning"
# Mock completion response with reasoning content
class MockCompletion:
def __init__(self, content, reasoning):
self.content = content
# Add required attributes expected by show_send_output
self.choices = [MagicMock()]
self.choices[0].message.content = content
self.choices[0].message.reasoning = (
reasoning # Using reasoning instead of reasoning_content
)
delattr(self.choices[0].message, "reasoning_content")
self.finish_reason = "stop"
mock_completion = MockCompletion(main_content, reasoning_content)
# Create a mock hash object
mock_hash = MagicMock()
mock_hash.hexdigest.return_value = "mock_hash_digest"
# Mock the model's send_completion method to return the expected tuple format
with patch.object(model, "send_completion", return_value=(mock_hash, mock_completion)):
# Call send with a simple message
messages = [{"role": "user", "content": "test prompt"}]
list(coder.send(messages))
# Now verify ai_output was called with the right content
io.assistant_output.assert_called_once()
output = io.assistant_output.call_args[0][0]
dump(output)
# Output should contain formatted reasoning tags
self.assertIn(REASONING_START, output)
self.assertIn(REASONING_END, output)
# Output should include both reasoning and main content
self.assertIn(reasoning_content, output)
self.assertIn(main_content, output)
# Verify that partial_response_content only contains the main content
coder.remove_reasoning_content()
self.assertEqual(coder.partial_response_content.strip(), main_content.strip())
# Ensure proper order: reasoning first, then main content
reasoning_pos = output.find(reasoning_content)
main_pos = output.find(main_content)
self.assertLess(
reasoning_pos, main_pos, "Reasoning content should appear before main content"
)
def test_send_with_reasoning_stream(self):
"""Test that streaming reasoning content from the 'reasoning' attribute is properly
formatted and output."""
# Setup IO with pretty output for streaming
io = InputOutput(pretty=True)
mock_mdstream = MagicMock()
io.get_assistant_mdstream = MagicMock(return_value=mock_mdstream)
# Setup model and coder
model = Model("gpt-3.5-turbo")
coder = Coder.create(model, None, io=io, stream=True)
# Ensure the coder shows pretty output
coder.show_pretty = MagicMock(return_value=True)
# Mock streaming response chunks
class MockStreamingChunk:
def __init__(
self, content=None, reasoning_content=None, reasoning=None, finish_reason=None
):
self.choices = [MagicMock()]
self.choices[0].delta = MagicMock()
self.choices[0].finish_reason = finish_reason
# Set content if provided
if content is not None:
self.choices[0].delta.content = content
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "content")
# Set reasoning_content if provided
if reasoning_content is not None:
self.choices[0].delta.reasoning_content = reasoning_content
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "reasoning_content")
# Set reasoning if provided
if reasoning is not None:
self.choices[0].delta.reasoning = reasoning
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "reasoning")
# Create chunks to simulate streaming - using reasoning attribute instead of
# reasoning_content
chunks = [
# First chunk with reasoning content starts the tag
MockStreamingChunk(reasoning="My step-by-step "),
# Additional reasoning content
MockStreamingChunk(reasoning="reasoning process"),
# Switch to main content - this will automatically end the reasoning tag
MockStreamingChunk(content="Final "),
# More main content
MockStreamingChunk(content="answer "),
MockStreamingChunk(content="after reasoning"),
# End the response
MockStreamingChunk(finish_reason="stop"),
]
# Create a mock hash object
mock_hash = MagicMock()
mock_hash.hexdigest.return_value = "mock_hash_digest"
# Mock the model's send_completion to return the hash and completion
with (
patch.object(model, "send_completion", return_value=(mock_hash, chunks)),
patch.object(model, "token_count", return_value=10),
): # Mock token count to avoid serialization issues
# Set mdstream directly on the coder object
coder.mdstream = mock_mdstream
# Call send with a simple message
messages = [{"role": "user", "content": "test prompt"}]
list(coder.send(messages))
# Verify mdstream.update was called multiple times
mock_mdstream.update.assert_called()
coder.live_incremental_response(True)
# Explicitly get all calls to update
update_calls = mock_mdstream.update.call_args_list
# There should be at least two calls - one for streaming and one final
self.assertGreaterEqual(
len(update_calls), 2, "Should have at least two calls to update (streaming + final)"
)
# Check that at least one call has final=True (should be the last one)
has_final_true = any(call[1].get("final", False) for call in update_calls)
self.assertTrue(has_final_true, "At least one update call should have final=True")
# Get the text from the last update call
final_text = update_calls[-1][0][0]
# The final text should include both reasoning and main content with proper formatting
self.assertIn(REASONING_START, final_text)
self.assertIn("My step-by-step reasoning process", final_text)
self.assertIn(REASONING_END, final_text)
self.assertIn("Final answer after reasoning", final_text)
# Ensure proper order: reasoning first, then main content
reasoning_pos = final_text.find("My step-by-step reasoning process")
main_pos = final_text.find("Final answer after reasoning")
self.assertLess(
reasoning_pos, main_pos, "Reasoning content should appear before main content"
)
# Verify that partial_response_content only contains the main content
coder.remove_reasoning_content()
expected_content = "Final answer after reasoning"
self.assertEqual(coder.partial_response_content.strip(), expected_content)
@patch("aider.models.litellm.completion")
def test_simple_send_with_retries_removes_reasoning(self, mock_completion):
"""Test that simple_send_with_retries correctly removes reasoning content."""
model = Model("deepseek-r1") # This model has reasoning_tag="think"
# Mock the completion response
mock_response = MagicMock()
mock_response.choices = [MagicMock(message=MagicMock(content="""Here is some text
<think>
This reasoning should be removed
</think>
And this text should remain"""))]
mock_completion.return_value = mock_response
messages = [{"role": "user", "content": "test"}]
result = model.simple_send_with_retries(messages)
expected = """Here is some text
And this text should remain"""
self.assertEqual(result, expected)
# Verify the completion was called
mock_completion.assert_called_once()
if __name__ == "__main__":
unittest.main()

View file

@@ -405,3 +405,51 @@ class TestRepo(unittest.TestCase):
git_repo = GitRepo(InputOutput(), None, None)
git_repo.commit(fnames=[str(fname)])
def test_git_commit_verify(self):
"""Test that git_commit_verify controls whether --no-verify is passed to git commit"""
# Skip on Windows as hook execution works differently
if platform.system() == "Windows":
return
with GitTemporaryDirectory():
# Create a new repo
raw_repo = git.Repo()
# Create a file to commit
fname = Path("test_file.txt")
fname.write_text("initial content")
raw_repo.git.add(str(fname))
# Do the initial commit
raw_repo.git.commit("-m", "Initial commit")
# Now create a pre-commit hook that always fails
hooks_dir = Path(raw_repo.git_dir) / "hooks"
hooks_dir.mkdir(exist_ok=True)
pre_commit_hook = hooks_dir / "pre-commit"
pre_commit_hook.write_text("#!/bin/sh\nexit 1\n") # Always fail
pre_commit_hook.chmod(0o755) # Make executable
# Modify the file
fname.write_text("modified content")
# Create GitRepo with verify=True (default)
io = InputOutput()
git_repo_verify = GitRepo(io, None, None, git_commit_verify=True)
# Attempt to commit - should fail due to pre-commit hook
commit_result = git_repo_verify.commit(fnames=[str(fname)], message="Should fail")
self.assertIsNone(commit_result)
# Create GitRepo with verify=False
git_repo_no_verify = GitRepo(io, None, None, git_commit_verify=False)
# Attempt to commit - should succeed by bypassing the hook
commit_result = git_repo_no_verify.commit(fnames=[str(fname)], message="Should succeed")
self.assertIsNotNone(commit_result)
# Verify the commit was actually made
latest_commit_msg = raw_repo.head.commit.message
self.assertEqual(latest_commit_msg.strip(), "Should succeed")

View file

@@ -282,74 +282,145 @@ class TestRepoMapTypescript(unittest.TestCase):
class TestRepoMapAllLanguages(unittest.TestCase):
    def setUp(self):
        self.GPT35 = Model("gpt-3.5-turbo")
        self.fixtures_dir = Path(__file__).parent.parent / "fixtures" / "languages"
    def test_get_repo_map_all_languages(self):
        language_files = {
            "c": ("c", "main"),
            "cpp": ("cpp", "main"),
            "elixir": ("ex", "Greeter"),
            "java": ("java", "Greeting"),
            "javascript": ("js", "Person"),
            "kotlin": ("kt", "Greeting"),
            "ocaml": ("ml", "Greeter"),
            "php": ("php", "greet"),
            "python": ("py", "Person"),
            "ql": ("ql", "greet"),
            "ruby": ("rb", "greet"),
            "rust": ("rs", "Person"),
            "typescript": ("ts", "greet"),
            "tsx": ("tsx", "UserProps"),
            "csharp": ("cs", "IGreeter"),
            "elisp": ("el", "greeter"),
            "elm": ("elm", "Person"),
            "go": ("go", "Greeter"),
            "hcl": ("tf", "aws_vpc"),
        }
        fixtures_dir = Path(__file__).parent.parent / "fixtures" / "languages"
        for lang, key_symbol in language_files.items():
            # Get the fixture file path and name based on language
            fixture_dir = fixtures_dir / lang
            ext, key_symbol = language_files[lang]
            filename = f"test.{ext}"
            fixture_path = fixture_dir / filename
            self.assertTrue(
                fixture_path.exists(), f"Fixture file missing for {lang}: {fixture_path}"
            )
            # Read the fixture content
            with open(fixture_path, "r", encoding="utf-8") as f:
                content = f.read()
            with GitTemporaryDirectory() as temp_dir:
                test_file = os.path.join(temp_dir, filename)
                with open(test_file, "w", encoding="utf-8") as f:
                    f.write(content)
                io = InputOutput()
                repo_map = RepoMap(main_model=self.GPT35, root=temp_dir, io=io)
                other_files = [filename]
                result = repo_map.get_repo_map([], other_files)
                dump(lang)
                dump(result)
                self.assertGreater(len(result.strip().splitlines()), 1)
                # Check if the result contains all the expected files and symbols
                self.assertIn(
                    filename, result, f"File for language {lang} not found in repo map: {result}"
                )
                self.assertIn(
                    key_symbol,
                    result,
                    (
                        f"Key symbol '{key_symbol}' for language {lang} not found in repo map:"
                        f" {result}"
                    ),
                )
                # close the open cache files, so Windows won't error
                del repo_map
    def test_language_c(self):
        self._test_language_repo_map("c", "c", "main")
    def test_language_cpp(self):
        self._test_language_repo_map("cpp", "cpp", "main")
    def test_language_d(self):
        self._test_language_repo_map("d", "d", "main")
    def test_language_dart(self):
        self._test_language_repo_map("dart", "dart", "Person")
    def test_language_elixir(self):
        self._test_language_repo_map("elixir", "ex", "Greeter")
    def test_language_gleam(self):
        self._test_language_repo_map("gleam", "gleam", "greet")
    def test_language_java(self):
        self._test_language_repo_map("java", "java", "Greeting")
    def test_language_javascript(self):
        self._test_language_repo_map("javascript", "js", "Person")
    def test_language_kotlin(self):
        self._test_language_repo_map("kotlin", "kt", "Greeting")
    def test_language_lua(self):
        self._test_language_repo_map("lua", "lua", "greet")
    # "ocaml": ("ml", "Greeter"), # not supported in tsl-pack (yet?)
    def test_language_php(self):
        self._test_language_repo_map("php", "php", "greet")
    def test_language_python(self):
        self._test_language_repo_map("python", "py", "Person")
    # "ql": ("ql", "greet"), # not supported in tsl-pack (yet?)
    def test_language_ruby(self):
        self._test_language_repo_map("ruby", "rb", "greet")
    def test_language_rust(self):
        self._test_language_repo_map("rust", "rs", "Person")
    def test_language_typescript(self):
        self._test_language_repo_map("typescript", "ts", "greet")
    def test_language_tsx(self):
        self._test_language_repo_map("tsx", "tsx", "UserProps")
    def test_language_csharp(self):
        self._test_language_repo_map("csharp", "cs", "IGreeter")
    def test_language_elisp(self):
        self._test_language_repo_map("elisp", "el", "greeter")
    def test_language_elm(self):
        self._test_language_repo_map("elm", "elm", "Person")
    def test_language_go(self):
        self._test_language_repo_map("go", "go", "Greeter")
    def test_language_hcl(self):
        self._test_language_repo_map("hcl", "tf", "aws_vpc")
    def test_language_arduino(self):
        self._test_language_repo_map("arduino", "ino", "setup")
    def test_language_chatito(self):
        self._test_language_repo_map("chatito", "chatito", "intent")
    def test_language_commonlisp(self):
        self._test_language_repo_map("commonlisp", "lisp", "greet")
    def test_language_pony(self):
        self._test_language_repo_map("pony", "pony", "Greeter")
    def test_language_properties(self):
        self._test_language_repo_map("properties", "properties", "database.url")
    def test_language_r(self):
        self._test_language_repo_map("r", "r", "calculate")
    def test_language_racket(self):
        self._test_language_repo_map("racket", "rkt", "greet")
    def test_language_solidity(self):
        self._test_language_repo_map("solidity", "sol", "SimpleStorage")
    def test_language_swift(self):
        self._test_language_repo_map("swift", "swift", "Greeter")
    def test_language_udev(self):
        self._test_language_repo_map("udev", "rules", "USB_DRIVER")
    def test_language_scala(self):
        self._test_language_repo_map("scala", "scala", "Greeter")
    def _test_language_repo_map(self, lang, key, symbol):
        """Helper method to test repo map generation for a specific language."""
        # Get the fixture file path and name based on language
        fixture_dir = self.fixtures_dir / lang
        filename = f"test.{key}"
        fixture_path = fixture_dir / filename
        self.assertTrue(fixture_path.exists(), f"Fixture file missing for {lang}: {fixture_path}")
        # Read the fixture content
        with open(fixture_path, "r", encoding="utf-8") as f:
            content = f.read()
        with GitTemporaryDirectory() as temp_dir:
            test_file = os.path.join(temp_dir, filename)
            with open(test_file, "w", encoding="utf-8") as f:
                f.write(content)
            io = InputOutput()
            repo_map = RepoMap(main_model=self.GPT35, root=temp_dir, io=io)
            other_files = [test_file]
            result = repo_map.get_repo_map([], other_files)
            dump(lang)
            dump(result)
            self.assertGreater(len(result.strip().splitlines()), 1)
            # Check if the result contains all the expected files and symbols
            self.assertIn(
                filename, result, f"File for language {lang} not found in repo map: {result}"
            )
            self.assertIn(
                symbol,
                result,
                f"Key symbol '{symbol}' for language {lang} not found in repo map: {result}",
            )
            # close the open cache files, so Windows won't error
            del repo_map
def test_repo_map_sample_code_base(self):
# Path to the sample code base

View file

@@ -8,8 +8,6 @@ from git import GitError, Repo
from aider import urls
from aider.main import sanity_check_repo
from aider.repo import GitRepo
from aider.io import InputOutput
@pytest.fixture
@@ -184,41 +182,3 @@ def test_sanity_check_repo_with_no_repo(mock_io):
# Assert that no errors or outputs were logged
mock_io.tool_error.assert_not_called()
mock_io.tool_output.assert_not_called()
def corrupt_git_index(repo_path):
index_path = os.path.join(repo_path, ".git", "index")
with open(index_path, "r+b") as f:
# Verify the file has the correct signature
signature = f.read(4)
if signature != b"DIRC":
raise ValueError("Invalid git index file signature.")
# Seek to the data section and inject invalid bytes to simulate encoding error
f.seek(77)
f.write(b"\xF5" * 5)
def test_sanity_check_repo_with_corrupt_index(create_repo, mock_io):
repo_path, repo = create_repo
# Corrupt the Git index file
corrupt_git_index(repo_path)
# Create GitRepo instance
git_repo = GitRepo(InputOutput(), None, repo_path)
# Call the function
result = sanity_check_repo(git_repo, mock_io)
# Assert that the function returns False
assert result is False
# Assert that the appropriate error messages were logged
mock_io.tool_error.assert_called_with("Unable to read git repository, it may be corrupt?")
mock_io.tool_output.assert_called_with(
(
"Failed to read the Git repository. This issue is likely caused by a path encoded "
"in a format different from the expected encoding \"utf-8\".\n"
"Internal error: 'utf-8' codec can't decode byte 0xf5 in position 3: invalid start byte"
)
)

View file

@@ -0,0 +1,84 @@
import os
from unittest import TestCase
from unittest.mock import MagicMock, patch
from prompt_toolkit.input import DummyInput
from prompt_toolkit.output import DummyOutput
from aider.main import main
class TestSSLVerification(TestCase):
def setUp(self):
self.original_env = os.environ.copy()
os.environ["OPENAI_API_KEY"] = "test-key"
os.environ["AIDER_CHECK_UPDATE"] = "false"
os.environ["AIDER_ANALYTICS"] = "false"
def tearDown(self):
os.environ.clear()
os.environ.update(self.original_env)
@patch("aider.io.InputOutput.offer_url")
@patch("aider.models.ModelInfoManager.set_verify_ssl")
@patch("aider.llm.litellm._load_litellm")
@patch("httpx.Client")
@patch("httpx.AsyncClient")
@patch("aider.models.fuzzy_match_models", return_value=[])
def test_no_verify_ssl_flag_sets_model_info_manager(
self,
mock_fuzzy_match,
mock_async_client,
mock_client,
mock_load_litellm,
mock_set_verify_ssl,
mock_offer_url,
):
# Prevent actual URL opening
mock_offer_url.return_value = False
# Mock the litellm._lazy_module to avoid AttributeError
mock_load_litellm.return_value = None
mock_module = MagicMock()
# Mock Model class to avoid actual model initialization
with patch("aider.models.Model") as mock_model:
# Configure the mock to avoid the TypeError
mock_model.return_value.info = {}
mock_model.return_value.validate_environment.return_value = {
"missing_keys": [],
"keys_in_environment": [],
}
with patch("aider.llm.litellm._lazy_module", mock_module):
# Run main with --no-verify-ssl flag
main(
["--no-verify-ssl", "--exit", "--yes"],
input=DummyInput(),
output=DummyOutput(),
)
# Verify model_info_manager.set_verify_ssl was called with False
mock_set_verify_ssl.assert_called_once_with(False)
# Verify httpx clients were created with verify=False
mock_client.assert_called_once_with(verify=False)
mock_async_client.assert_called_once_with(verify=False)
# Verify SSL_VERIFY environment variable was set to empty string
self.assertEqual(os.environ.get("SSL_VERIFY"), "")
@patch("aider.io.InputOutput.offer_url")
@patch("aider.models.model_info_manager.set_verify_ssl")
def test_default_ssl_verification(self, mock_set_verify_ssl, mock_offer_url):
# Prevent actual URL opening
mock_offer_url.return_value = False
# Run main without --no-verify-ssl flag
with patch("aider.main.InputOutput"):
with patch("aider.coders.Coder.create"):
main(["--exit", "--yes"], input=DummyInput(), output=DummyOutput())
# Verify model_info_manager.set_verify_ssl was not called
mock_set_verify_ssl.assert_not_called()
# Verify SSL_VERIFY environment variable was not set
self.assertNotIn("SSL_VERIFY", os.environ)

View file

@@ -155,3 +155,12 @@ def test_ai_comment_pattern():
assert (
question_js_has_bang == "?"
), "Expected at least one bang (!) comment in watch_question.js fixture"
# Test Lisp fixture
lisp_path = fixtures_dir / "watch.lisp"
lisp_lines, lisp_comments, lisp_has_bang = watcher.get_ai_comments(str(lisp_path))
lisp_expected = 7
assert (
len(lisp_lines) == lisp_expected
), f"Expected {lisp_expected} AI comments in Lisp fixture, found {len(lisp_lines)}"
assert lisp_has_bang == "!", "Expected at least one bang (!) comment in Lisp fixture"