aider/aider/coders/patch_coder.py

706 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pathlib
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple
from .base_coder import Coder
from .patch_prompts import PatchPrompts
# --------------------------------------------------------------------------- #
# Domain objects & Exceptions (Adapted from apply_patch.py)
# --------------------------------------------------------------------------- #
class DiffError(ValueError):
"""Any problem detected while parsing or applying a patch."""
class ActionType(str, Enum):
ADD = "Add"
DELETE = "Delete"
UPDATE = "Update"
@dataclass
class Chunk:
orig_index: int = -1 # Line number in the *original* file block where the change starts
del_lines: List[str] = field(default_factory=list)
ins_lines: List[str] = field(default_factory=list)
@dataclass
class PatchAction:
type: ActionType
path: str
# For ADD:
new_content: Optional[str] = None
# For UPDATE:
chunks: List[Chunk] = field(default_factory=list)
move_path: Optional[str] = None
# Type alias for the return type of get_edits
EditResult = Tuple[str, PatchAction]
@dataclass
class Patch:
actions: Dict[str, PatchAction] = field(default_factory=dict)
fuzz: int = 0 # Track fuzziness used during parsing
# --------------------------------------------------------------------------- #
# Helper functions (Adapted from apply_patch.py)
# --------------------------------------------------------------------------- #
def _norm(line: str) -> str:
"""Strip CR so comparisons work for both LF and CRLF input."""
return line.rstrip("\r")
def find_context_core(lines: List[str], context: List[str], start: int) -> Tuple[int, int]:
"""Finds context block, returns start index and fuzz level."""
if not context:
return start, 0
# Exact match
for i in range(start, len(lines) - len(context) + 1):
if lines[i : i + len(context)] == context:
return i, 0
# Rstrip match
norm_context = [s.rstrip() for s in context]
for i in range(start, len(lines) - len(context) + 1):
if [s.rstrip() for s in lines[i : i + len(context)]] == norm_context:
return i, 1 # Fuzz level 1
# Strip match
norm_context_strip = [s.strip() for s in context]
for i in range(start, len(lines) - len(context) + 1):
if [s.strip() for s in lines[i : i + len(context)]] == norm_context_strip:
return i, 100 # Fuzz level 100
return -1, 0
def find_context(lines: List[str], context: List[str], start: int, eof: bool) -> Tuple[int, int]:
"""Finds context, handling EOF marker."""
if eof:
# If EOF marker, first try matching at the very end
if len(lines) >= len(context):
new_index, fuzz = find_context_core(lines, context, len(lines) - len(context))
if new_index != -1:
return new_index, fuzz
# If not found at end, search from `start` as fallback
new_index, fuzz = find_context_core(lines, context, start)
return new_index, fuzz + 10_000 # Add large fuzz penalty if EOF wasn't at end
# Normal case: search from `start`
return find_context_core(lines, context, start)
def peek_next_section(lines: List[str], index: int) -> Tuple[List[str], List[Chunk], int, bool]:
"""
Parses one section (context, -, + lines) of an Update block.
Returns: (context_lines, chunks_in_section, next_index, is_eof)
"""
context_lines: List[str] = []
del_lines: List[str] = []
ins_lines: List[str] = []
chunks: List[Chunk] = []
mode = "keep" # Start by expecting context lines
start_index = index
while index < len(lines):
line = lines[index]
norm_line = _norm(line)
# Check for section terminators
if norm_line.startswith(
(
"@@",
"*** End Patch",
"*** Update File:",
"*** Delete File:",
"*** Add File:",
"*** End of File", # Special terminator
)
):
break
if norm_line == "***": # Legacy/alternative terminator? Handle just in case.
break
if norm_line.startswith("***"): # Invalid line
raise DiffError(f"Invalid patch line found in update section: {line}")
index += 1
last_mode = mode
# Determine line type and strip prefix
if line.startswith("+"):
mode = "add"
line_content = line[1:]
elif line.startswith("-"):
mode = "delete"
line_content = line[1:]
elif line.startswith(" "):
mode = "keep"
line_content = line[1:]
elif line.strip() == "": # Treat blank lines in patch as context ' '
mode = "keep"
line_content = "" # Keep it as a blank line
else:
# Assume lines without prefix are context if format is loose,
# but strict format requires ' '. Raise error for strictness.
raise DiffError(f"Invalid line prefix in update section: {line}")
# If mode changes from add/delete back to keep, finalize the previous chunk
if mode == "keep" and last_mode != "keep":
if del_lines or ins_lines:
chunks.append(
Chunk(
# orig_index is relative to the start of the *context* block found
orig_index=len(context_lines) - len(del_lines),
del_lines=del_lines,
ins_lines=ins_lines,
)
)
del_lines, ins_lines = [], []
# Collect lines based on mode
if mode == "delete":
del_lines.append(line_content)
context_lines.append(line_content) # Deleted lines are part of the original context
elif mode == "add":
ins_lines.append(line_content)
elif mode == "keep":
context_lines.append(line_content)
# Finalize any pending chunk at the end of the section
if del_lines or ins_lines:
chunks.append(
Chunk(
orig_index=len(context_lines) - len(del_lines),
del_lines=del_lines,
ins_lines=ins_lines,
)
)
# Check for EOF marker
is_eof = False
if index < len(lines) and _norm(lines[index]) == "*** End of File":
index += 1
is_eof = True
if index == start_index and not is_eof: # Should not happen if patch is well-formed
raise DiffError("Empty patch section found.")
return context_lines, chunks, index, is_eof
def identify_files_needed(text: str) -> List[str]:
"""Extracts file paths from Update and Delete actions."""
lines = text.splitlines()
paths = set()
for line in lines:
norm_line = _norm(line)
if norm_line.startswith("*** Update File: "):
paths.add(norm_line[len("*** Update File: ") :].strip())
elif norm_line.startswith("*** Delete File: "):
paths.add(norm_line[len("*** Delete File: ") :].strip())
return list(paths)
# --------------------------------------------------------------------------- #
# PatchCoder Class Implementation
# --------------------------------------------------------------------------- #
class PatchCoder(Coder):
"""
A coder that uses a custom patch format for code modifications,
inspired by the format described in tmp.gpt41edits.txt.
Applies patches using logic adapted from the reference apply_patch.py script.
"""
edit_format = "patch"
gpt_prompts = PatchPrompts()
def get_edits(self) -> List[EditResult]:
"""
Parses the LLM response content (containing the patch) into a list of
tuples, where each tuple contains the file path and the PatchAction object.
"""
content = self.partial_response_content
if not content or not content.strip():
return []
# Check for patch sentinels
lines = content.splitlines()
if (
len(lines) < 2
or not _norm(lines[0]).startswith("*** Begin Patch")
# Allow flexible end, might be EOF or just end of stream
# or _norm(lines[-1]) != "*** End Patch"
):
# Tolerate missing sentinels if content looks like a patch action
is_patch_like = any(
_norm(line).startswith(
("@@", "*** Update File:", "*** Add File:", "*** Delete File:")
)
for line in lines
)
if not is_patch_like:
# If it doesn't even look like a patch, return empty
self.io.tool_warning("Response does not appear to be in patch format.")
return []
# If it looks like a patch but lacks sentinels, try parsing anyway but warn.
self.io.tool_warning(
"Patch format warning: Missing '*** Begin Patch'/'*** End Patch' sentinels."
)
start_index = 0
else:
start_index = 1 # Skip "*** Begin Patch"
# Identify files needed for context lookups during parsing
needed_paths = identify_files_needed(content)
current_files: Dict[str, str] = {}
for rel_path in needed_paths:
abs_path = self.abs_root_path(rel_path)
try:
# Use io.read_text to handle potential errors/encodings
file_content = self.io.read_text(abs_path)
if file_content is None:
raise DiffError(
f"File referenced in patch not found or could not be read: {rel_path}"
)
current_files[rel_path] = file_content
except FileNotFoundError:
raise DiffError(f"File referenced in patch not found: {rel_path}")
except IOError as e:
raise DiffError(f"Error reading file {rel_path}: {e}")
try:
# Parse the patch text using adapted logic
patch_obj = self._parse_patch_text(lines, start_index, current_files)
# Convert Patch object actions dict to a list of tuples (path, action)
# for compatibility with the base Coder's prepare_to_edit method.
results = []
for path, action in patch_obj.actions.items():
results.append((path, action))
return results
except DiffError as e:
# Raise as ValueError for consistency with other coders' error handling
raise ValueError(f"Error parsing patch content: {e}")
except Exception as e:
# Catch unexpected errors during parsing
raise ValueError(f"Unexpected error parsing patch: {e}")
def _parse_patch_text(
self, lines: List[str], start_index: int, current_files: Dict[str, str]
) -> Patch:
"""
Parses patch content lines into a Patch object.
Adapted from the Parser class in apply_patch.py.
"""
patch = Patch()
index = start_index
fuzz_accumulator = 0
while index < len(lines):
line = lines[index]
norm_line = _norm(line)
if norm_line == "*** End Patch":
index += 1
break # Successfully reached end
# ---------- UPDATE ---------- #
if norm_line.startswith("*** Update File: "):
path = norm_line[len("*** Update File: ") :].strip()
index += 1
if not path:
raise DiffError("Update File action missing path.")
# Optional move target
move_to = None
if index < len(lines) and _norm(lines[index]).startswith("*** Move to: "):
move_to = _norm(lines[index])[len("*** Move to: ") :].strip()
index += 1
if not move_to:
raise DiffError("Move to action missing path.")
if path not in current_files:
raise DiffError(f"Update File Error - missing file content for: {path}")
file_content = current_files[path]
existing_action = patch.actions.get(path)
if existing_action is not None:
# Merge additional UPDATE block into the existing one
if existing_action.type != ActionType.UPDATE:
raise DiffError(f"Conflicting actions for file: {path}")
new_action, index, fuzz = self._parse_update_file_sections(
lines, index, file_content
)
existing_action.chunks.extend(new_action.chunks)
if move_to:
if existing_action.move_path and existing_action.move_path != move_to:
raise DiffError(f"Conflicting move targets for file: {path}")
existing_action.move_path = move_to
fuzz_accumulator += fuzz
else:
# First UPDATE block for this file
action, index, fuzz = self._parse_update_file_sections(
lines, index, file_content
)
action.path = path
action.move_path = move_to
patch.actions[path] = action
fuzz_accumulator += fuzz
continue
# ---------- DELETE ---------- #
elif norm_line.startswith("*** Delete File: "):
path = norm_line[len("*** Delete File: ") :].strip()
index += 1
if not path:
raise DiffError("Delete File action missing path.")
existing_action = patch.actions.get(path)
if existing_action:
if existing_action.type == ActionType.DELETE:
# Duplicate delete ignore the extra block
self.io.tool_warning(f"Duplicate delete action for file: {path} ignored.")
continue
else:
raise DiffError(f"Conflicting actions for file: {path}")
if path not in current_files:
raise DiffError(
f"Delete File Error - file not found: {path}"
) # Check against known files
patch.actions[path] = PatchAction(type=ActionType.DELETE, path=path)
continue
# ---------- ADD ---------- #
elif norm_line.startswith("*** Add File: "):
path = norm_line[len("*** Add File: ") :].strip()
index += 1
if not path:
raise DiffError("Add File action missing path.")
if path in patch.actions:
raise DiffError(f"Duplicate action for file: {path}")
# Check if file exists in the context provided (should not for Add).
# Note: We only have needed files, a full check requires FS access.
# if path in current_files:
# raise DiffError(f"Add File Error - file already exists: {path}")
action, index = self._parse_add_file_content(lines, index)
action.path = path # Ensure path is set
patch.actions[path] = action
continue
# If we are here, the line is unexpected
# Allow blank lines between actions
if not norm_line.strip():
index += 1
continue
raise DiffError(f"Unknown or misplaced line while parsing patch: {line}")
# Check if we consumed the whole input or stopped early
# Tolerate missing "*** End Patch" if we processed actions
# if index < len(lines) and _norm(lines[index-1]) != "*** End Patch":
# raise DiffError("Patch parsing finished unexpectedly before end of input.")
patch.fuzz = fuzz_accumulator
return patch
def _parse_update_file_sections(
self, lines: List[str], index: int, file_content: str
) -> Tuple[PatchAction, int, int]:
"""Parses all sections (@@, context, -, +) for a single Update File action."""
action = PatchAction(type=ActionType.UPDATE, path="") # Path set by caller
orig_lines = file_content.splitlines() # Use splitlines for consistency
current_file_index = 0 # Track position in original file content
total_fuzz = 0
while index < len(lines):
norm_line = _norm(lines[index])
# Check for terminators for *this* file update
if norm_line.startswith(
(
"*** End Patch",
"*** Update File:",
"*** Delete File:",
"*** Add File:",
)
):
break # End of this file's update section
# Handle @@ scope lines (optional)
scope_lines = []
while index < len(lines) and _norm(lines[index]).startswith("@@"):
scope_line_content = lines[index][len("@@") :].strip()
if scope_line_content: # Ignore empty @@ lines?
scope_lines.append(scope_line_content)
index += 1
# Find the scope in the original file if specified
if scope_lines:
# Simple scope finding: search from current position
# A more robust finder could handle nested scopes like the reference @@ @@
found_scope = False
temp_index = current_file_index
while temp_index < len(orig_lines):
# Check if all scope lines match sequentially from temp_index
match = True
for i, scope in enumerate(scope_lines):
if (
temp_index + i >= len(orig_lines)
or _norm(orig_lines[temp_index + i]).strip() != scope
):
match = False
break
if match:
current_file_index = temp_index + len(scope_lines)
found_scope = True
break
temp_index += 1
if not found_scope:
# Try fuzzy scope matching (strip whitespace)
temp_index = current_file_index
while temp_index < len(orig_lines):
match = True
for i, scope in enumerate(scope_lines):
if (
temp_index + i >= len(orig_lines)
or _norm(orig_lines[temp_index + i]).strip() != scope.strip()
):
match = False
break
if match:
current_file_index = temp_index + len(scope_lines)
found_scope = True
total_fuzz += 1 # Add fuzz for scope match difference
break
temp_index += 1
if not found_scope:
scope_txt = "\n".join(scope_lines)
raise DiffError(f"Could not find scope context:\n{scope_txt}")
# Peek and parse the next context/change section
context_block, chunks_in_section, next_index, is_eof = peek_next_section(lines, index)
# Find where this context block appears in the original file
found_index, fuzz = find_context(orig_lines, context_block, current_file_index, is_eof)
total_fuzz += fuzz
if found_index == -1:
ctx_txt = "\n".join(context_block)
marker = "*** End of File" if is_eof else ""
raise DiffError(
f"Could not find patch context {marker} starting near line"
f" {current_file_index}:\n{ctx_txt}"
)
# Adjust chunk original indices to be absolute within the file
for chunk in chunks_in_section:
# chunk.orig_index from peek is relative to context_block start
# We need it relative to the file start
chunk.orig_index += found_index
action.chunks.append(chunk)
# Advance file index past the matched context block
current_file_index = found_index + len(context_block)
# Advance line index past the processed section in the patch
index = next_index
return action, index, total_fuzz
def _parse_add_file_content(self, lines: List[str], index: int) -> Tuple[PatchAction, int]:
"""Parses the content (+) lines for an Add File action."""
added_lines: List[str] = []
while index < len(lines):
line = lines[index]
norm_line = _norm(line)
# Stop if we hit another action or end marker
if norm_line.startswith(
(
"*** End Patch",
"*** Update File:",
"*** Delete File:",
"*** Add File:",
)
):
break
# Expect lines to start with '+'
if not line.startswith("+"):
# Tolerate blank lines? Or require '+'? Reference implies '+' required.
if norm_line.strip() == "":
# Treat blank line as adding a blank line
added_lines.append("")
else:
raise DiffError(f"Invalid Add File line (missing '+'): {line}")
else:
added_lines.append(line[1:]) # Strip leading '+'
index += 1
action = PatchAction(type=ActionType.ADD, path="", new_content="\n".join(added_lines))
return action, index
def apply_edits(self, edits: List[PatchAction]):
"""
Applies the parsed PatchActions to the corresponding files.
"""
if not edits:
return
# Group edits by original path? Not strictly needed if processed sequentially.
# Edits are now List[Tuple[str, PatchAction]]
for _path_tuple_element, action in edits:
# action is the PatchAction object
# action.path is the canonical path within the action logic
full_path = self.abs_root_path(action.path)
path_obj = pathlib.Path(full_path)
try:
if action.type == ActionType.ADD:
# Check existence *before* writing
if path_obj.exists():
raise DiffError(f"ADD Error: File already exists: {action.path}")
if action.new_content is None:
# Parser should ensure this doesn't happen
raise DiffError(f"ADD change for {action.path} has no content")
self.io.tool_output(f"Adding {action.path}")
path_obj.parent.mkdir(parents=True, exist_ok=True)
# Ensure single trailing newline, matching reference behavior
content_to_write = action.new_content
if not content_to_write.endswith("\n"):
content_to_write += "\n"
self.io.write_text(full_path, content_to_write)
elif action.type == ActionType.DELETE:
self.io.tool_output(f"Deleting {action.path}")
if not path_obj.exists():
self.io.tool_warning(
f"DELETE Warning: File not found, skipping: {action.path}"
)
else:
path_obj.unlink()
elif action.type == ActionType.UPDATE:
if not path_obj.exists():
raise DiffError(f"UPDATE Error: File does not exist: {action.path}")
current_content = self.io.read_text(full_path)
if current_content is None:
# Should have been caught during parsing if file was needed
raise DiffError(f"Could not read file for UPDATE: {action.path}")
# Apply the update logic using the parsed chunks
new_content = self._apply_update(current_content, action, action.path)
target_full_path = (
self.abs_root_path(action.move_path) if action.move_path else full_path
)
target_path_obj = pathlib.Path(target_full_path)
if action.move_path:
self.io.tool_output(
f"Updating and moving {action.path} to {action.move_path}"
)
# Check if target exists before overwriting/moving
if target_path_obj.exists() and full_path != target_full_path:
self.io.tool_warning(
"UPDATE Warning: Target file for move already exists, overwriting:"
f" {action.move_path}"
)
else:
self.io.tool_output(f"Updating {action.path}")
# Ensure parent directory exists for target
target_path_obj.parent.mkdir(parents=True, exist_ok=True)
self.io.write_text(target_full_path, new_content)
# Remove original file *after* successful write to new location if moved
if action.move_path and full_path != target_full_path:
path_obj.unlink()
else:
# Should not happen
raise DiffError(f"Unknown action type encountered: {action.type}")
except (DiffError, FileNotFoundError, IOError, OSError) as e:
# Raise a ValueError to signal failure, consistent with other coders.
raise ValueError(f"Error applying action '{action.type}' to {action.path}: {e}")
except Exception as e:
# Catch unexpected errors during application
raise ValueError(
f"Unexpected error applying action '{action.type}' to {action.path}: {e}"
)
def _apply_update(self, text: str, action: PatchAction, path: str) -> str:
"""
Applies UPDATE chunks to the given text content.
Adapted from _get_updated_file in apply_patch.py.
"""
if action.type is not ActionType.UPDATE:
# Should not be called otherwise, but check for safety
raise DiffError("_apply_update called with non-update action")
orig_lines = text.splitlines() # Use splitlines to handle endings consistently
dest_lines: List[str] = []
current_orig_line_idx = 0 # Tracks index in orig_lines processed so far
# Sort chunks by their original index to apply them sequentially
sorted_chunks = sorted(action.chunks, key=lambda c: c.orig_index)
for chunk in sorted_chunks:
# chunk.orig_index is the absolute line number where the change starts
# (where the first deleted line was, or where inserted lines go if no deletes)
chunk_start_index = chunk.orig_index
if chunk_start_index < current_orig_line_idx:
# This indicates overlapping chunks or incorrect indices from parsing
raise DiffError(
f"{path}: Overlapping or out-of-order chunk detected."
f" Current index {current_orig_line_idx}, chunk starts at {chunk_start_index}."
)
# Add lines from original file between the last chunk and this one
dest_lines.extend(orig_lines[current_orig_line_idx:chunk_start_index])
# Verify that the lines to be deleted actually match the original file content
# (The parser should have used find_context, but double-check here)
num_del = len(chunk.del_lines)
actual_deleted_lines = orig_lines[chunk_start_index : chunk_start_index + num_del]
# Use the same normalization as find_context_core for comparison robustness
norm_chunk_del = [_norm(s).strip() for s in chunk.del_lines]
norm_actual_del = [_norm(s).strip() for s in actual_deleted_lines]
if norm_chunk_del != norm_actual_del:
# This indicates the context matching failed or the file changed since parsing
# Provide detailed error message
expected_str = "\n".join(f"- {s}" for s in chunk.del_lines)
actual_str = "\n".join(f" {s}" for s in actual_deleted_lines)
raise DiffError(
f"{path}: Mismatch applying patch near line {chunk_start_index + 1}.\n"
f"Expected lines to remove:\n{expected_str}\n"
f"Found lines in file:\n{actual_str}"
)
# Add the inserted lines from the chunk
dest_lines.extend(chunk.ins_lines)
# Advance the original line index past the lines processed (deleted lines)
current_orig_line_idx = chunk_start_index + num_del
# Add any remaining lines from the original file after the last chunk
dest_lines.extend(orig_lines[current_orig_line_idx:])
# Join lines and ensure a single trailing newline
result = "\n".join(dest_lines)
if result or orig_lines: # Add newline unless result is empty and original was empty
result += "\n"
return result