feat: Create PatchFlexCoder using flexible search and replace for updates

This commit is contained in:
Paul Gauthier (aider) 2025-04-14 15:29:10 -07:00
parent 14028d3758
commit 5573cdfba1

View file

@ -0,0 +1,565 @@
# At the top of the file, add necessary imports
import itertools
import pathlib
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple
# Keep existing imports like dump, Coder, PatchPrompts, DiffError, ActionType
from ..dump import dump # noqa: F401
from .base_coder import Coder
from .patch_prompts import PatchPrompts
# Import search_replace utilities
from .search_replace import editblock_strategies, flexible_search_and_replace
# Remove original Chunk, PatchAction, EditResult, Patch dataclasses if they exist at the top
# Remove helper functions: _norm, find_context_core, find_context, peek_next_section, identify_files_needed
# We will redefine or replace these as needed.
# --------------------------------------------------------------------------- #
# Domain objects & Exceptions (Adapted for Flex Coder)
# --------------------------------------------------------------------------- #
class DiffError(ValueError):
"""Any problem detected while parsing or applying a patch."""
class ActionType(str, Enum):
ADD = "Add"
DELETE = "Delete"
UPDATE = "Update"
@dataclass
class ParsedEdit:
"""Represents a single parsed action or change hunk."""
path: str
type: ActionType
# For UPDATE hunks:
search_text: Optional[str] = None
replace_text: Optional[str] = None
# For ADD:
new_content: Optional[str] = None
# For UPDATE (last hunk wins if multiple):
move_path: Optional[str] = None
# Original line number in the patch file where this hunk started (for error reporting)
patch_line_num: int = 0
# --------------------------------------------------------------------------- #
# Helper functions (Adapted for Flex Coder)
# --------------------------------------------------------------------------- #
def _norm(line: str) -> str:
"""Strip CR so comparisons work for both LF and CRLF input."""
return line.rstrip("\r")
def identify_files_needed(text: str) -> List[str]:
"""Extracts file paths from Update and Delete actions."""
lines = text.splitlines()
paths = set()
for line in lines:
norm_line = _norm(line)
if norm_line.startswith("*** Update File: "):
paths.add(norm_line[len("*** Update File: ") :].strip())
elif norm_line.startswith("*** Delete File: "):
paths.add(norm_line[len("*** Delete File: ") :].strip())
return list(paths)
def _peek_change_hunk(
lines: List[str], index: int
) -> Tuple[List[str], List[str], List[str], List[str], int, bool]:
"""
Parses one change hunk (context-before, deleted, inserted, context-after)
from an Update block.
Returns: (context_before, del_lines, ins_lines, context_after, next_index, is_eof)
"""
context_before: List[str] = []
del_lines: List[str] = []
ins_lines: List[str] = []
context_after: List[str] = []
mode = "context_before" # States: context_before, delete, insert, context_after
start_index = index
while index < len(lines):
line = lines[index]
norm_line = _norm(line)
# Check for section terminators
if norm_line.startswith(
(
"@@", # Start of a new scope/hunk marker
"*** End Patch",
"*** Update File:",
"*** Delete File:",
"*** Add File:",
"*** End of File",
)
):
break
if norm_line == "***":
break
if norm_line.startswith("***"):
raise DiffError(f"Invalid patch line found in update section: {line}")
current_line_index = index
index += 1
# Determine line type and content
line_type = "unknown"
line_content = ""
if line.startswith("+"):
line_type = "insert"
line_content = line[1:]
elif line.startswith("-"):
line_type = "delete"
line_content = line[1:]
elif line.startswith(" "):
line_type = "context"
line_content = line[1:]
elif line.strip() == "":
line_type = "context"
line_content = ""
else:
raise DiffError(f"Invalid line prefix in update section: {line}")
# State machine logic
if mode == "context_before":
if line_type == "context":
context_before.append(line_content)
elif line_type == "delete":
del_lines.append(line_content)
mode = "delete"
elif line_type == "insert":
# Change starts with insertion (no deletion)
ins_lines.append(line_content)
mode = "insert"
else:
# Should not happen based on checks above
raise DiffError(f"Unexpected line type '{line_type}' in mode '{mode}': {line}")
elif mode == "delete":
if line_type == "delete":
del_lines.append(line_content)
elif line_type == "insert":
ins_lines.append(line_content)
mode = "insert"
elif line_type == "context":
# Deletes finished, context after starts
context_after.append(line_content)
mode = "context_after"
else:
raise DiffError(f"Unexpected line type '{line_type}' in mode '{mode}': {line}")
elif mode == "insert":
if line_type == "insert":
ins_lines.append(line_content)
elif line_type == "context":
# Inserts finished, context after starts
context_after.append(line_content)
mode = "context_after"
elif line_type == "delete":
# This implies interleaved +/- lines which this simplified parser doesn't handle well.
# Treat as end of hunk? Or raise error? Let's treat as end for now.
index = current_line_index # Put the delete line back for the next hunk
break
else:
raise DiffError(f"Unexpected line type '{line_type}' in mode '{mode}': {line}")
elif mode == "context_after":
if line_type == "context":
context_after.append(line_content)
else:
# Any non-context line means this hunk's context_after is finished.
# Put the line back for the next hunk/parser step.
index = current_line_index
break
# Check for EOF marker
is_eof = False
if index < len(lines) and _norm(lines[index]) == "*** End of File":
index += 1
is_eof = True
if index == start_index and not is_eof:
raise DiffError("Empty patch section found.")
# If the hunk ended immediately with context_after, the last context line
# might belong to the *next* hunk's context_before. This is tricky.
# For simplicity, we'll keep it here. flexible_search_replace might handle overlap.
return context_before, del_lines, ins_lines, context_after, index, is_eof
# --------------------------------------------------------------------------- #
# PatchFlexCoder Class Implementation
# --------------------------------------------------------------------------- #
class PatchFlexCoder(Coder): # Rename class
"""
A coder that uses the patch format for LLM output, but applies changes
using flexible search-and-replace logic for UPDATE actions, ignoring @@ hints
and precise line numbers during application.
"""
edit_format = "patch-flex" # Give it a distinct name
gpt_prompts = PatchPrompts() # Use the same prompts as PatchCoder
def get_edits(self) -> List[ParsedEdit]: # Return type changed
"""
Parses the LLM response content (containing the patch) into a list of
ParsedEdit objects, extracting search/replace blocks for UPDATEs.
"""
content = self.partial_response_content
if not content or not content.strip():
return []
lines = content.splitlines()
start_index = 0
if (
len(lines) >= 2
and _norm(lines[0]).startswith("*** Begin Patch")
):
start_index = 1
else:
# Tolerate missing sentinels if content looks like a patch action
is_patch_like = any(
_norm(line).startswith(
("@@", "*** Update File:", "*** Add File:", "*** Delete File:")
)
for line in lines
)
if not is_patch_like:
self.io.tool_warning("Response does not appear to be in patch format.")
return []
self.io.tool_warning(
"Patch format warning: Missing '*** Begin Patch' sentinel."
)
# Identify files needed for context lookups (only for DELETE check)
needed_paths = identify_files_needed(content)
# Unlike PatchCoder, we don't strictly need file content during parsing,
# but it's useful to check if DELETE targets exist.
# We read content dynamically in apply_edits.
known_files = set(self.get_inchat_relative_files()) | set(needed_paths)
try:
# Parse the patch text into ParsedEdit objects
parsed_edits = self._parse_patch_text(lines, start_index, known_files)
return parsed_edits
except DiffError as e:
raise ValueError(f"Error parsing patch content: {e}")
except Exception as e:
raise ValueError(f"Unexpected error parsing patch: {e}")
def _parse_patch_text(
self, lines: List[str], start_index: int, known_files: set[str]
) -> List[ParsedEdit]:
"""
Parses patch content lines into a list of ParsedEdit objects.
"""
parsed_edits: List[ParsedEdit] = []
index = start_index
current_file_path = None
current_move_path = None
while index < len(lines):
line = lines[index]
norm_line = _norm(line)
line_num = index + 1 # 1-based for reporting
if norm_line == "*** End Patch":
index += 1
break
# ---------- UPDATE ---------- #
if norm_line.startswith("*** Update File: "):
path = norm_line[len("*** Update File: ") :].strip()
index += 1
if not path:
raise DiffError(f"Update File action missing path (line {line_num}).")
# We don't check for duplicates here, multiple UPDATEs for the same file are handled sequentially.
# if path not in known_files: # Check if file is known (in chat or mentioned)
# self.io.tool_warning(f"Update File target '{path}' not found in chat context.")
current_file_path = path
current_move_path = None # Reset move path for new file
# Check for optional Move to immediately after
if index < len(lines) and _norm(lines[index]).startswith("*** Move to: "):
move_to = _norm(lines[index])[len("*** Move to: ") :].strip()
index += 1
if not move_to:
raise DiffError(f"Move to action missing path (line {index}).")
current_move_path = move_to
continue # Continue to parse hunks for this file
# ---------- DELETE ---------- #
elif norm_line.startswith("*** Delete File: "):
path = norm_line[len("*** Delete File: ") :].strip()
index += 1
if not path:
raise DiffError(f"Delete File action missing path (line {line_num}).")
if path not in known_files:
# Check against known files before adding delete action
self.io.tool_warning(f"Delete File target '{path}' not found in chat context.")
parsed_edits.append(ParsedEdit(path=path, type=ActionType.DELETE, patch_line_num=line_num))
current_file_path = None # Reset current file context
current_move_path = None
continue
# ---------- ADD ---------- #
elif norm_line.startswith("*** Add File: "):
path = norm_line[len("*** Add File: ") :].strip()
index += 1
if not path:
raise DiffError(f"Add File action missing path (line {line_num}).")
# if path in known_files: # Check if file might already exist
# self.io.tool_warning(f"Add File target '{path}' may already exist.")
action, index = self._parse_add_file_content(lines, index)
action.path = path
action.patch_line_num = line_num
parsed_edits.append(action)
current_file_path = None # Reset current file context
current_move_path = None
continue
# ---------- Hunks within UPDATE ---------- #
elif current_file_path:
# Skip @@ lines, they are ignored by this coder's application logic
if norm_line.startswith("@@"):
index += 1
continue
# Parse the next change hunk for the current file
hunk_start_index = index
try:
(
context_before,
del_lines,
ins_lines,
context_after,
next_index,
_is_eof, # EOF marker not strictly needed for search/replace logic
) = _peek_change_hunk(lines, index)
except DiffError as e:
raise DiffError(f"{e} (near line {line_num} in patch)")
if not del_lines and not ins_lines:
# Skip hunks that contain only context - they don't represent a change
index = next_index
continue
# Construct search and replace text based on user request
# Search = context_before + deleted_lines
# Replace = inserted_lines + context_after
search_text = "\n".join(context_before + del_lines)
replace_text = "\n".join(ins_lines + context_after)
# Add trailing newline if original content likely had one
# (This helps match blocks ending at EOF)
# Heuristic: if context_after is empty AND there were deleted lines,
# the original block likely ended with the last deleted line.
# Or if context_before/del/ins are all empty, it's just context.
if not context_after and (del_lines or ins_lines):
search_text += "\n"
# Replace text already includes context_after, so only add if that was empty too
if not ins_lines:
replace_text += "\n"
elif context_after or context_before or del_lines or ins_lines:
# If there's any content, ensure trailing newline for consistency
search_text += "\n"
replace_text += "\n"
parsed_edits.append(
ParsedEdit(
path=current_file_path,
type=ActionType.UPDATE,
search_text=search_text,
replace_text=replace_text,
move_path=current_move_path, # Carry over move path for this hunk
patch_line_num=hunk_start_index + 1,
)
)
index = next_index
continue
# If we are here, the line is unexpected or misplaced
if not norm_line.strip(): # Allow blank lines between actions/files
index += 1
continue
raise DiffError(f"Unknown or misplaced line while parsing patch (line {line_num}): {line}")
return parsed_edits
def _parse_add_file_content(self, lines: List[str], index: int) -> Tuple[ParsedEdit, int]:
"""Parses the content (+) lines for an Add File action."""
added_lines: List[str] = []
start_line_num = index + 1
while index < len(lines):
line = lines[index]
norm_line = _norm(line)
# Stop if we hit another action or end marker
if norm_line.startswith(
(
"*** End Patch",
"*** Update File:",
"*** Delete File:",
"*** Add File:",
)
):
break
if not line.startswith("+"):
if norm_line.strip() == "":
added_lines.append("") # Treat blank line as adding a blank line
else:
raise DiffError(f"Invalid Add File line (missing '+') (line {index+1}): {line}")
else:
added_lines.append(line[1:])
index += 1
action = ParsedEdit(
path="", # Path set by caller
type=ActionType.ADD,
new_content="\n".join(added_lines),
patch_line_num=start_line_num
)
return action, index
def apply_edits(self, edits: List[ParsedEdit]): # Argument type changed
"""
Applies the parsed edits. Uses flexible search-and-replace for UPDATEs.
"""
if not edits:
self.io.tool_output("No edits to apply.")
return
# Group edits by file path to process them sequentially
edits_by_path = itertools.groupby(edits, key=lambda edit: edit.path)
for path, path_edits_iter in edits_by_path:
path_edits = list(path_edits_iter)
full_path = self.abs_root_path(path)
path_obj = pathlib.Path(full_path)
current_content = None
edit_failed = False
final_move_path = None # Track the last move destination for this file
# Check for simple ADD/DELETE first (should ideally be only one per file)
if len(path_edits) == 1 and path_edits[0].type in [ActionType.ADD, ActionType.DELETE]:
edit = path_edits[0]
try:
if edit.type == ActionType.ADD:
if path_obj.exists():
# Allow overwrite on ADD? Or error? Let's warn and overwrite.
self.io.tool_warning(f"ADD Warning: File '{path}' already exists, overwriting.")
# raise DiffError(f"ADD Error: File already exists: {path}")
if edit.new_content is None:
raise DiffError(f"ADD change for {path} has no content")
self.io.tool_output(f"Adding {path}")
path_obj.parent.mkdir(parents=True, exist_ok=True)
content_to_write = edit.new_content
if not content_to_write.endswith("\n"):
content_to_write += "\n"
self.io.write_text(full_path, content_to_write)
elif edit.type == ActionType.DELETE:
self.io.tool_output(f"Deleting {path}")
if not path_obj.exists():
self.io.tool_warning(f"DELETE Warning: File not found, skipping: {path}")
else:
path_obj.unlink()
except (DiffError, FileNotFoundError, IOError, OSError) as e:
raise ValueError(f"Error applying action '{edit.type}' to {path}: {e}")
except Exception as e:
raise ValueError(f"Unexpected error applying action '{edit.type}' to {path}: {e}")
continue # Move to the next file path
# --- Handle UPDATE actions sequentially ---
self.io.tool_output(f"Updating {path}...")
try:
if not path_obj.exists():
raise DiffError(f"UPDATE Error: File does not exist: {path}")
current_content = self.io.read_text(full_path)
if current_content is None:
raise DiffError(f"Could not read file for UPDATE: {path}")
for i, edit in enumerate(path_edits):
if edit.type != ActionType.UPDATE:
raise DiffError(f"Unexpected action type '{edit.type}' mixed with UPDATE for {path}")
if edit.search_text is None or edit.replace_text is None:
raise DiffError(f"UPDATE action for {path} is missing search/replace text")
final_move_path = edit.move_path # Last move path specified wins
self.io.tool_output(f" Applying hunk {i+1} (from patch line {edit.patch_line_num})...")
texts = (edit.search_text, edit.replace_text, current_content)
new_content = flexible_search_and_replace(texts, editblock_strategies)
if new_content is None:
edit_failed = True
# Provide more context on failure
err_msg = (
f"Failed to apply update hunk {i+1} (from patch line {edit.patch_line_num})"
f" for file {path}. The search block may not have been found"
" or the change conflicted.\n"
f"Search block:\n```\n{edit.search_text}```\n"
f"Replace block:\n```\n{edit.replace_text}```"
)
# Raise immediately to stop processing this file
raise ValueError(err_msg)
# Update content for the next iteration
current_content = new_content
# After processing all hunks for this file:
if not edit_failed and current_content is not None:
target_full_path = (
self.abs_root_path(final_move_path) if final_move_path else full_path
)
target_path_obj = pathlib.Path(target_full_path)
if final_move_path:
self.io.tool_output(f"Moving updated file to {final_move_path}")
if target_path_obj.exists() and full_path != target_full_path:
self.io.tool_warning(
"UPDATE Warning: Target file for move already exists, overwriting:"
f" {final_move_path}"
)
# Ensure parent directory exists for target
target_path_obj.parent.mkdir(parents=True, exist_ok=True)
# Ensure trailing newline
if not current_content.endswith("\n") and current_content != "":
current_content += "\n"
self.io.write_text(target_full_path, current_content)
# Remove original file *after* successful write if moved
if final_move_path and full_path != target_full_path:
path_obj.unlink()
except (DiffError, FileNotFoundError, IOError, OSError) as e:
# Raise a ValueError to signal failure
raise ValueError(f"Error applying UPDATE to {path}: {e}")
except Exception as e:
# Catch unexpected errors during application
raise ValueError(f"Unexpected error applying UPDATE to {path}: {e}")
# Remove the _apply_update method as it's replaced by flexible_search_and_replace logic
# def _apply_update(self, text: str, action: PatchAction, path: str) -> str:
# ...