feat: Add streaming test for <think> tags in reasoning process

This commit is contained in:
Paul Gauthier (aider) 2025-03-07 17:16:50 -08:00
parent 80de3335b7
commit 883bf74bad

View file

@ -225,6 +225,98 @@ class TestReasoning(unittest.TestCase):
self.assertLess(
reasoning_pos, main_pos, "Reasoning content should appear before main content"
)
def test_send_with_think_tags_stream(self):
"""Test that streaming with <think> tags is properly processed and formatted."""
# Setup IO with pretty output for streaming
io = InputOutput(pretty=True)
mock_mdstream = MagicMock()
io.get_assistant_mdstream = MagicMock(return_value=mock_mdstream)
# Setup model and coder
model = Model("gpt-3.5-turbo")
model.remove_reasoning = "think" # Set to remove <think> tags
coder = Coder.create(model, None, io=io, stream=True)
# Ensure the coder shows pretty output
coder.show_pretty = MagicMock(return_value=True)
# Mock streaming response chunks
class MockStreamingChunk:
def __init__(self, content=None, finish_reason=None):
self.choices = [MagicMock()]
self.choices[0].delta = MagicMock()
self.choices[0].finish_reason = finish_reason
# Set content if provided
if content is not None:
self.choices[0].delta.content = content
else:
# Need to handle attribute access that would raise AttributeError
delattr(self.choices[0].delta, "content")
# Create chunks to simulate streaming with think tags
chunks = [
# Start with open think tag
MockStreamingChunk(content="<think>\n"),
# Reasoning content inside think tags
MockStreamingChunk(content="My step-by-step "),
MockStreamingChunk(content="reasoning process\n"),
# Close think tag
MockStreamingChunk(content="</think>\n\n"),
# Main content
MockStreamingChunk(content="Final "),
MockStreamingChunk(content="answer "),
MockStreamingChunk(content="after reasoning"),
# End the response
MockStreamingChunk(finish_reason="stop"),
]
# Create a mock hash object
mock_hash = MagicMock()
mock_hash.hexdigest.return_value = "mock_hash_digest"
# Mock the model's send_completion to return the hash and completion
with patch.object(model, "send_completion", return_value=(mock_hash, chunks)):
# Set mdstream directly on the coder object
coder.mdstream = mock_mdstream
# Call send with a simple message
messages = [{"role": "user", "content": "test prompt"}]
list(coder.send(messages))
# Verify mdstream.update was called multiple times
mock_mdstream.update.assert_called()
coder.live_incremental_response(True)
# Explicitly get all calls to update
update_calls = mock_mdstream.update.call_args_list
# There should be at least two calls - one for streaming and one final
self.assertGreaterEqual(
len(update_calls), 2, "Should have at least two calls to update (streaming + final)"
)
# Check that at least one call has final=True (should be the last one)
has_final_true = any(call[1].get("final", False) for call in update_calls)
self.assertTrue(has_final_true, "At least one update call should have final=True")
# Get the text from the last update call
final_text = update_calls[-1][0][0]
# The final text should include both reasoning and main content with proper formatting
self.assertIn("> Thinking ...", final_text)
self.assertIn("My step-by-step reasoning process", final_text)
self.assertIn("> ... done thinking", final_text)
self.assertIn("Final answer after reasoning", final_text)
# Ensure proper order: reasoning first, then main content
reasoning_pos = final_text.find("My step-by-step reasoning process")
main_pos = final_text.find("Final answer after reasoning")
self.assertLess(
reasoning_pos, main_pos, "Reasoning content should appear before main content"
)
if __name__ == "__main__":