diff --git a/aider/coders/patch_flex_coder.py b/aider/coders/patch_flex_coder.py new file mode 100644 index 000000000..d68d65590 --- /dev/null +++ b/aider/coders/patch_flex_coder.py @@ -0,0 +1,565 @@ +# At the top of the file, add necessary imports +import itertools +import pathlib +from dataclasses import dataclass, field +from enum import Enum +from typing import Dict, List, Optional, Tuple + +# Keep existing imports like dump, Coder, PatchPrompts, DiffError, ActionType +from ..dump import dump # noqa: F401 +from .base_coder import Coder +from .patch_prompts import PatchPrompts + + +# Import search_replace utilities +from .search_replace import editblock_strategies, flexible_search_and_replace + +# Remove original Chunk, PatchAction, EditResult, Patch dataclasses if they exist at the top +# Remove helper functions: _norm, find_context_core, find_context, peek_next_section, identify_files_needed +# We will redefine or replace these as needed. + + +# --------------------------------------------------------------------------- # +# Domain objects & Exceptions (Adapted for Flex Coder) +# --------------------------------------------------------------------------- # +class DiffError(ValueError): + """Any problem detected while parsing or applying a patch.""" + + +class ActionType(str, Enum): + ADD = "Add" + DELETE = "Delete" + UPDATE = "Update" + + +@dataclass +class ParsedEdit: + """Represents a single parsed action or change hunk.""" + + path: str + type: ActionType + # For UPDATE hunks: + search_text: Optional[str] = None + replace_text: Optional[str] = None + # For ADD: + new_content: Optional[str] = None + # For UPDATE (last hunk wins if multiple): + move_path: Optional[str] = None + # Original line number in the patch file where this hunk started (for error reporting) + patch_line_num: int = 0 + + +# --------------------------------------------------------------------------- # +# Helper functions (Adapted for Flex Coder) +# --------------------------------------------------------------------------- # +def _norm(line: str) -> str: + """Strip CR so comparisons work for both LF and CRLF input.""" + return line.rstrip("\r") + + +def identify_files_needed(text: str) -> List[str]: + """Extracts file paths from Update and Delete actions.""" + lines = text.splitlines() + paths = set() + for line in lines: + norm_line = _norm(line) + if norm_line.startswith("*** Update File: "): + paths.add(norm_line[len("*** Update File: ") :].strip()) + elif norm_line.startswith("*** Delete File: "): + paths.add(norm_line[len("*** Delete File: ") :].strip()) + return list(paths) + + +def _peek_change_hunk( + lines: List[str], index: int +) -> Tuple[List[str], List[str], List[str], List[str], int, bool]: + """ + Parses one change hunk (context-before, deleted, inserted, context-after) + from an Update block. + + Returns: (context_before, del_lines, ins_lines, context_after, next_index, is_eof) + """ + context_before: List[str] = [] + del_lines: List[str] = [] + ins_lines: List[str] = [] + context_after: List[str] = [] + + mode = "context_before" # States: context_before, delete, insert, context_after + start_index = index + + while index < len(lines): + line = lines[index] + norm_line = _norm(line) + + # Check for section terminators + if norm_line.startswith( + ( + "@@", # Start of a new scope/hunk marker + "*** End Patch", + "*** Update File:", + "*** Delete File:", + "*** Add File:", + "*** End of File", + ) + ): + break + if norm_line == "***": + break + if norm_line.startswith("***"): + raise DiffError(f"Invalid patch line found in update section: {line}") + + current_line_index = index + index += 1 + + # Determine line type and content + line_type = "unknown" + line_content = "" + if line.startswith("+"): + line_type = "insert" + line_content = line[1:] + elif line.startswith("-"): + line_type = "delete" + line_content = line[1:] + elif line.startswith(" "): + line_type = "context" + line_content = line[1:] + elif line.strip() == "": + line_type = "context" + line_content = "" + else: + raise DiffError(f"Invalid line prefix in update section: {line}") + + # State machine logic + if mode == "context_before": + if line_type == "context": + context_before.append(line_content) + elif line_type == "delete": + del_lines.append(line_content) + mode = "delete" + elif line_type == "insert": + # Change starts with insertion (no deletion) + ins_lines.append(line_content) + mode = "insert" + else: + # Should not happen based on checks above + raise DiffError(f"Unexpected line type '{line_type}' in mode '{mode}': {line}") + + elif mode == "delete": + if line_type == "delete": + del_lines.append(line_content) + elif line_type == "insert": + ins_lines.append(line_content) + mode = "insert" + elif line_type == "context": + # Deletes finished, context after starts + context_after.append(line_content) + mode = "context_after" + else: + raise DiffError(f"Unexpected line type '{line_type}' in mode '{mode}': {line}") + + elif mode == "insert": + if line_type == "insert": + ins_lines.append(line_content) + elif line_type == "context": + # Inserts finished, context after starts + context_after.append(line_content) + mode = "context_after" + elif line_type == "delete": + # This implies interleaved +/- lines which this simplified parser doesn't handle well. + # Treat as end of hunk? Or raise error? Let's treat as end for now. + index = current_line_index # Put the delete line back for the next hunk + break + else: + raise DiffError(f"Unexpected line type '{line_type}' in mode '{mode}': {line}") + + elif mode == "context_after": + if line_type == "context": + context_after.append(line_content) + else: + # Any non-context line means this hunk's context_after is finished. + # Put the line back for the next hunk/parser step. + index = current_line_index + break + + # Check for EOF marker + is_eof = False + if index < len(lines) and _norm(lines[index]) == "*** End of File": + index += 1 + is_eof = True + + if index == start_index and not is_eof: + raise DiffError("Empty patch section found.") + + # If the hunk ended immediately with context_after, the last context line + # might belong to the *next* hunk's context_before. This is tricky. + # For simplicity, we'll keep it here. flexible_search_replace might handle overlap. + + return context_before, del_lines, ins_lines, context_after, index, is_eof + + +# --------------------------------------------------------------------------- # +# PatchFlexCoder Class Implementation +# --------------------------------------------------------------------------- # +class PatchFlexCoder(Coder): # Rename class + """ + A coder that uses the patch format for LLM output, but applies changes + using flexible search-and-replace logic for UPDATE actions, ignoring @@ hints + and precise line numbers during application. + """ + + edit_format = "patch-flex" # Give it a distinct name + gpt_prompts = PatchPrompts() # Use the same prompts as PatchCoder + + def get_edits(self) -> List[ParsedEdit]: # Return type changed + """ + Parses the LLM response content (containing the patch) into a list of + ParsedEdit objects, extracting search/replace blocks for UPDATEs. + """ + content = self.partial_response_content + if not content or not content.strip(): + return [] + + lines = content.splitlines() + start_index = 0 + if ( + len(lines) >= 2 + and _norm(lines[0]).startswith("*** Begin Patch") + ): + start_index = 1 + else: + # Tolerate missing sentinels if content looks like a patch action + is_patch_like = any( + _norm(line).startswith( + ("@@", "*** Update File:", "*** Add File:", "*** Delete File:") + ) + for line in lines + ) + if not is_patch_like: + self.io.tool_warning("Response does not appear to be in patch format.") + return [] + self.io.tool_warning( + "Patch format warning: Missing '*** Begin Patch' sentinel." + ) + + # Identify files needed for context lookups (only for DELETE check) + needed_paths = identify_files_needed(content) + # Unlike PatchCoder, we don't strictly need file content during parsing, + # but it's useful to check if DELETE targets exist. + # We read content dynamically in apply_edits. + known_files = set(self.get_inchat_relative_files()) | set(needed_paths) + + + try: + # Parse the patch text into ParsedEdit objects + parsed_edits = self._parse_patch_text(lines, start_index, known_files) + return parsed_edits + except DiffError as e: + raise ValueError(f"Error parsing patch content: {e}") + except Exception as e: + raise ValueError(f"Unexpected error parsing patch: {e}") + + def _parse_patch_text( + self, lines: List[str], start_index: int, known_files: set[str] + ) -> List[ParsedEdit]: + """ + Parses patch content lines into a list of ParsedEdit objects. + """ + parsed_edits: List[ParsedEdit] = [] + index = start_index + current_file_path = None + current_move_path = None + + while index < len(lines): + line = lines[index] + norm_line = _norm(line) + line_num = index + 1 # 1-based for reporting + + if norm_line == "*** End Patch": + index += 1 + break + + # ---------- UPDATE ---------- # + if norm_line.startswith("*** Update File: "): + path = norm_line[len("*** Update File: ") :].strip() + index += 1 + if not path: + raise DiffError(f"Update File action missing path (line {line_num}).") + # We don't check for duplicates here, multiple UPDATEs for the same file are handled sequentially. + # if path not in known_files: # Check if file is known (in chat or mentioned) + # self.io.tool_warning(f"Update File target '{path}' not found in chat context.") + + current_file_path = path + current_move_path = None # Reset move path for new file + + # Check for optional Move to immediately after + if index < len(lines) and _norm(lines[index]).startswith("*** Move to: "): + move_to = _norm(lines[index])[len("*** Move to: ") :].strip() + index += 1 + if not move_to: + raise DiffError(f"Move to action missing path (line {index}).") + current_move_path = move_to + continue # Continue to parse hunks for this file + + # ---------- DELETE ---------- # + elif norm_line.startswith("*** Delete File: "): + path = norm_line[len("*** Delete File: ") :].strip() + index += 1 + if not path: + raise DiffError(f"Delete File action missing path (line {line_num}).") + if path not in known_files: + # Check against known files before adding delete action + self.io.tool_warning(f"Delete File target '{path}' not found in chat context.") + + parsed_edits.append(ParsedEdit(path=path, type=ActionType.DELETE, patch_line_num=line_num)) + current_file_path = None # Reset current file context + current_move_path = None + continue + + # ---------- ADD ---------- # + elif norm_line.startswith("*** Add File: "): + path = norm_line[len("*** Add File: ") :].strip() + index += 1 + if not path: + raise DiffError(f"Add File action missing path (line {line_num}).") + # if path in known_files: # Check if file might already exist + # self.io.tool_warning(f"Add File target '{path}' may already exist.") + + action, index = self._parse_add_file_content(lines, index) + action.path = path + action.patch_line_num = line_num + parsed_edits.append(action) + current_file_path = None # Reset current file context + current_move_path = None + continue + + # ---------- Hunks within UPDATE ---------- # + elif current_file_path: + # Skip @@ lines, they are ignored by this coder's application logic + if norm_line.startswith("@@"): + index += 1 + continue + + # Parse the next change hunk for the current file + hunk_start_index = index + try: + ( + context_before, + del_lines, + ins_lines, + context_after, + next_index, + _is_eof, # EOF marker not strictly needed for search/replace logic + ) = _peek_change_hunk(lines, index) + except DiffError as e: + raise DiffError(f"{e} (near line {line_num} in patch)") + + + if not del_lines and not ins_lines: + # Skip hunks that contain only context - they don't represent a change + index = next_index + continue + + # Construct search and replace text based on user request + # Search = context_before + deleted_lines + # Replace = inserted_lines + context_after + search_text = "\n".join(context_before + del_lines) + replace_text = "\n".join(ins_lines + context_after) + + # Add trailing newline if original content likely had one + # (This helps match blocks ending at EOF) + # Heuristic: if context_after is empty AND there were deleted lines, + # the original block likely ended with the last deleted line. + # Or if context_before/del/ins are all empty, it's just context. + if not context_after and (del_lines or ins_lines): + search_text += "\n" + # Replace text already includes context_after, so only add if that was empty too + if not ins_lines: + replace_text += "\n" + elif context_after or context_before or del_lines or ins_lines: + # If there's any content, ensure trailing newline for consistency + search_text += "\n" + replace_text += "\n" + + + parsed_edits.append( + ParsedEdit( + path=current_file_path, + type=ActionType.UPDATE, + search_text=search_text, + replace_text=replace_text, + move_path=current_move_path, # Carry over move path for this hunk + patch_line_num=hunk_start_index + 1, + ) + ) + index = next_index + continue + + # If we are here, the line is unexpected or misplaced + if not norm_line.strip(): # Allow blank lines between actions/files + index += 1 + continue + + raise DiffError(f"Unknown or misplaced line while parsing patch (line {line_num}): {line}") + + return parsed_edits + + def _parse_add_file_content(self, lines: List[str], index: int) -> Tuple[ParsedEdit, int]: + """Parses the content (+) lines for an Add File action.""" + added_lines: List[str] = [] + start_line_num = index + 1 + while index < len(lines): + line = lines[index] + norm_line = _norm(line) + # Stop if we hit another action or end marker + if norm_line.startswith( + ( + "*** End Patch", + "*** Update File:", + "*** Delete File:", + "*** Add File:", + ) + ): + break + + if not line.startswith("+"): + if norm_line.strip() == "": + added_lines.append("") # Treat blank line as adding a blank line + else: + raise DiffError(f"Invalid Add File line (missing '+') (line {index+1}): {line}") + else: + added_lines.append(line[1:]) + + index += 1 + + action = ParsedEdit( + path="", # Path set by caller + type=ActionType.ADD, + new_content="\n".join(added_lines), + patch_line_num=start_line_num + ) + return action, index + + def apply_edits(self, edits: List[ParsedEdit]): # Argument type changed + """ + Applies the parsed edits. Uses flexible search-and-replace for UPDATEs. + """ + if not edits: + self.io.tool_output("No edits to apply.") + return + + # Group edits by file path to process them sequentially + edits_by_path = itertools.groupby(edits, key=lambda edit: edit.path) + + for path, path_edits_iter in edits_by_path: + path_edits = list(path_edits_iter) + full_path = self.abs_root_path(path) + path_obj = pathlib.Path(full_path) + current_content = None + edit_failed = False + final_move_path = None # Track the last move destination for this file + + # Check for simple ADD/DELETE first (should ideally be only one per file) + if len(path_edits) == 1 and path_edits[0].type in [ActionType.ADD, ActionType.DELETE]: + edit = path_edits[0] + try: + if edit.type == ActionType.ADD: + if path_obj.exists(): + # Allow overwrite on ADD? Or error? Let's warn and overwrite. + self.io.tool_warning(f"ADD Warning: File '{path}' already exists, overwriting.") + # raise DiffError(f"ADD Error: File already exists: {path}") + if edit.new_content is None: + raise DiffError(f"ADD change for {path} has no content") + + self.io.tool_output(f"Adding {path}") + path_obj.parent.mkdir(parents=True, exist_ok=True) + content_to_write = edit.new_content + if not content_to_write.endswith("\n"): + content_to_write += "\n" + self.io.write_text(full_path, content_to_write) + + elif edit.type == ActionType.DELETE: + self.io.tool_output(f"Deleting {path}") + if not path_obj.exists(): + self.io.tool_warning(f"DELETE Warning: File not found, skipping: {path}") + else: + path_obj.unlink() + except (DiffError, FileNotFoundError, IOError, OSError) as e: + raise ValueError(f"Error applying action '{edit.type}' to {path}: {e}") + except Exception as e: + raise ValueError(f"Unexpected error applying action '{edit.type}' to {path}: {e}") + continue # Move to the next file path + + # --- Handle UPDATE actions sequentially --- + self.io.tool_output(f"Updating {path}...") + try: + if not path_obj.exists(): + raise DiffError(f"UPDATE Error: File does not exist: {path}") + current_content = self.io.read_text(full_path) + if current_content is None: + raise DiffError(f"Could not read file for UPDATE: {path}") + + for i, edit in enumerate(path_edits): + if edit.type != ActionType.UPDATE: + raise DiffError(f"Unexpected action type '{edit.type}' mixed with UPDATE for {path}") + if edit.search_text is None or edit.replace_text is None: + raise DiffError(f"UPDATE action for {path} is missing search/replace text") + + final_move_path = edit.move_path # Last move path specified wins + + self.io.tool_output(f" Applying hunk {i+1} (from patch line {edit.patch_line_num})...") + + texts = (edit.search_text, edit.replace_text, current_content) + new_content = flexible_search_and_replace(texts, editblock_strategies) + + if new_content is None: + edit_failed = True + # Provide more context on failure + err_msg = ( + f"Failed to apply update hunk {i+1} (from patch line {edit.patch_line_num})" + f" for file {path}. The search block may not have been found" + " or the change conflicted.\n" + f"Search block:\n```\n{edit.search_text}```\n" + f"Replace block:\n```\n{edit.replace_text}```" + ) + # Raise immediately to stop processing this file + raise ValueError(err_msg) + + # Update content for the next iteration + current_content = new_content + + # After processing all hunks for this file: + if not edit_failed and current_content is not None: + target_full_path = ( + self.abs_root_path(final_move_path) if final_move_path else full_path + ) + target_path_obj = pathlib.Path(target_full_path) + + if final_move_path: + self.io.tool_output(f"Moving updated file to {final_move_path}") + if target_path_obj.exists() and full_path != target_full_path: + self.io.tool_warning( + "UPDATE Warning: Target file for move already exists, overwriting:" + f" {final_move_path}" + ) + + # Ensure parent directory exists for target + target_path_obj.parent.mkdir(parents=True, exist_ok=True) + # Ensure trailing newline + if not current_content.endswith("\n") and current_content != "": + current_content += "\n" + self.io.write_text(target_full_path, current_content) + + # Remove original file *after* successful write if moved + if final_move_path and full_path != target_full_path: + path_obj.unlink() + + except (DiffError, FileNotFoundError, IOError, OSError) as e: + # Raise a ValueError to signal failure + raise ValueError(f"Error applying UPDATE to {path}: {e}") + except Exception as e: + # Catch unexpected errors during application + raise ValueError(f"Unexpected error applying UPDATE to {path}: {e}") + + # Remove the _apply_update method as it's replaced by flexible_search_and_replace logic + # def _apply_update(self, text: str, action: PatchAction, path: str) -> str: + # ...