refactor: Integrate core logic from apply_patch.py into PatchCoder

This commit is contained in:
Paul Gauthier (aider) 2025-04-14 14:45:09 -07:00
parent 78e76648d0
commit f565f72679

View file

@ -2,13 +2,19 @@ import pathlib
import re import re
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum from enum import Enum
from typing import List, Optional from typing import Dict, List, Optional, Tuple
from ..dump import dump # noqa: F401 from ..dump import dump # noqa: F401
from .base_coder import Coder from .base_coder import Coder
# Adapted structures and types from apply_patch.py for parsing and applying # --------------------------------------------------------------------------- #
# Domain objects & Exceptions (Adapted from apply_patch.py)
# --------------------------------------------------------------------------- #
class DiffError(ValueError):
    """Raised for any problem found while parsing or applying a patch."""
class ActionType(str, Enum): class ActionType(str, Enum):
ADD = "Add" ADD = "Add"
DELETE = "Delete" DELETE = "Delete"
@ -17,27 +23,198 @@ class ActionType(str, Enum):
@dataclass
class Chunk:
    # Line number in the *original* file block where the change starts.
    orig_index: int = -1
    # Lines taken out of the original at that position.
    del_lines: List[str] = field(default_factory=list)
    # Lines put in at that position.
    ins_lines: List[str] = field(default_factory=list)
    # Context stored for validation/application.
    context_before: List[str] = field(default_factory=list)
@dataclass
class PatchAction:
    # Kind of operation and the path it targets.
    type: ActionType
    path: str
    # ADD only: full text of the file to create.
    new_content: Optional[str] = None
    # UPDATE only: the edits to apply, plus an optional destination path.
    chunks: List[Chunk] = field(default_factory=list)
    move_path: Optional[str] = None
@dataclass
class Patch:
    # Parsed actions, keyed by the path each action targets.
    actions: Dict[str, PatchAction] = field(default_factory=dict)
    # Track fuzziness used during parsing.
    fuzz: int = 0
# --------------------------------------------------------------------------- #
# Helper functions (Adapted from apply_patch.py)
# --------------------------------------------------------------------------- #
def _norm(line: str) -> str:
"""Strip CR so comparisons work for both LF and CRLF input."""
return line.rstrip("\r")
def find_context_core(
    lines: List[str], context: List[str], start: int
) -> Tuple[int, int]:
    """
    Locate ``context`` as a contiguous run inside ``lines``.

    Three passes are tried, each scanning window starts from ``start``
    upwards, in order of decreasing strictness:

    * exact equality                       -> fuzz 0
    * equality after ``rstrip()``          -> fuzz 1
    * equality after ``strip()``           -> fuzz 100

    Returns ``(index, fuzz)`` for the first window that matches, or
    ``(-1, 0)`` when no pass finds a match. An empty ``context`` matches
    trivially at ``start`` with fuzz 0.
    """
    if not context:
        return start, 0

    width = len(context)
    window_starts = range(start, len(lines) - width + 1)

    # (normaliser, fuzz level); None means compare the text untouched.
    passes = (
        (None, 0),
        (str.rstrip, 1),
        (str.strip, 100),
    )
    for normalise, fuzz in passes:
        if normalise is None:
            haystack, needle = lines, context
        else:
            # Normalise every line once per pass instead of once per window.
            haystack = [normalise(s) for s in lines]
            needle = [normalise(s) for s in context]
        for i in window_starts:
            if haystack[i : i + width] == needle:
                return i, fuzz

    return -1, 0
def find_context(
    lines: List[str], context: List[str], start: int, eof: bool
) -> Tuple[int, int]:
    """
    Locate ``context`` in ``lines``, honouring an end-of-file marker.

    Without ``eof`` this is a plain search from ``start``. With ``eof`` the
    context is first tried at the very end of ``lines``; if it is not there,
    a search from ``start`` is used as a fallback and 10_000 is added to the
    reported fuzz.

    Returns ``(index, fuzz)``; ``index`` is -1 when nothing matched.
    """
    if not eof:
        return find_context_core(lines, context, start)

    tail_start = len(lines) - len(context)
    if tail_start >= 0:
        position, fuzz = find_context_core(lines, context, tail_start)
        if position != -1:
            return position, fuzz

    # Not at the end of the file: fall back to the normal search, penalised.
    position, fuzz = find_context_core(lines, context, start)
    return position, fuzz + 10_000
def peek_next_section(
    lines: List[str], index: int
) -> Tuple[List[str], List[Chunk], int, bool]:
    """
    Read one section (context, '-' and '+' lines) of an Update block.

    Reading starts at ``lines[index]`` and stops at the next ``@@`` line,
    file-action header, ``*** End Patch``, ``*** End of File`` or bare
    ``***`` line.

    Returns ``(original_lines, chunks, next_index, is_eof)``:

    * ``original_lines`` - context and deleted lines in order, prefixes
      removed (inserted lines are not part of it);
    * ``chunks`` - one Chunk per run of changes, with ``orig_index`` given
      as an offset into ``original_lines``;
    * ``next_index`` - index of the first patch line not consumed;
    * ``is_eof`` - True when the section was closed by ``*** End of File``
      (that marker line is consumed).

    Raises DiffError for an unrecognised ``***`` line, a line with an
    invalid prefix, or a section that consumed nothing.
    """
    section_enders = (
        "@@",
        "*** End Patch",
        "*** Update File:",
        "*** Delete File:",
        "*** Add File:",
        "*** End of File",  # Special terminator
    )
    prefix_modes = {"+": "add", "-": "delete", " ": "keep"}

    original_lines: List[str] = []
    removed: List[str] = []
    added: List[str] = []
    chunks: List[Chunk] = []
    mode = "keep"  # A section is expected to open with context
    first_index = index

    while index < len(lines):
        line = lines[index]
        norm_line = _norm(line)

        # A bare "***" is accepted as a terminator as well.
        if norm_line.startswith(section_enders) or norm_line == "***":
            break
        if norm_line.startswith("***"):
            raise DiffError(f"Invalid patch line found in update section: {line}")
        index += 1

        previous_mode = mode
        lead = line[:1]
        if lead in prefix_modes:
            mode = prefix_modes[lead]
            line_content = line[1:]
        elif line.strip() == "":
            # A blank patch line counts as a blank context line.
            mode = "keep"
            line_content = ""
        else:
            raise DiffError(f"Invalid line prefix in update section: {line}")

        # Returning to context closes the run of changes collected so far.
        if mode == "keep" and previous_mode != "keep" and (removed or added):
            chunks.append(
                Chunk(
                    orig_index=len(original_lines) - len(removed),
                    del_lines=removed,
                    ins_lines=added,
                )
            )
            removed, added = [], []

        if mode == "add":
            added.append(line_content)
        else:
            # Context and deleted lines both exist in the original text.
            original_lines.append(line_content)
            if mode == "delete":
                removed.append(line_content)

    # A section may end while a run of changes is still open.
    if removed or added:
        chunks.append(
            Chunk(
                orig_index=len(original_lines) - len(removed),
                del_lines=removed,
                ins_lines=added,
            )
        )

    is_eof = index < len(lines) and _norm(lines[index]) == "*** End of File"
    if is_eof:
        index += 1

    if index == first_index and not is_eof:
        raise DiffError("Empty patch section found.")

    return original_lines, chunks, index, is_eof
def identify_files_needed(text: str) -> List[str]:
    """
    Extract the file paths named by Update and Delete actions in ``text``.

    Each path is returned once, in the order it first appears, so callers
    that read the files (and report the first failure) behave the same way
    on every run. Headers with nothing after the colon are skipped; an empty
    string is not a path.
    """
    prefixes = ("*** Update File: ", "*** Delete File: ")

    # A dict keeps insertion order while still de-duplicating.
    found = {}
    # splitlines() already treats "\r" and "\r\n" as line boundaries, so no
    # carriage return can remain on a line here.
    for line in text.splitlines():
        for prefix in prefixes:
            if line.startswith(prefix):
                path = line[len(prefix) :].strip()
                if path:
                    found.setdefault(path, None)
                break
    return list(found)
# --------------------------------------------------------------------------- #
# PatchCoder Class Implementation
# --------------------------------------------------------------------------- #
class PatchCoder(Coder): class PatchCoder(Coder):
""" """
A coder that uses a custom patch format for code modifications, A coder that uses a custom patch format for code modifications,
inspired by the format described in tmp.gpt41edits.txt. inspired by the format described in tmp.gpt41edits.txt.
Applies patches using logic adapted from the reference apply_patch.py script.
""" """
edit_format = "patch" edit_format = "patch"
@ -51,115 +228,277 @@ class PatchCoder(Coder):
if not content or not content.strip(): if not content or not content.strip():
return [] return []
try: # Check for patch sentinels
parsed_edits = self._parse_patch_content(content)
return parsed_edits
except Exception as e:
raise ValueError(f"Error parsing patch content: {e}")
def _parse_patch_content(self, content: str) -> List[PatchAction]:
"""
Parses the patch content string into a list of PatchAction objects.
This is a simplified parser based on the expected format. A more robust
implementation would adapt the full parser logic from apply_patch.py,
including context finding and validation against current file content.
"""
edits = []
lines = content.splitlines() lines = content.splitlines()
i = 0 if (
in_patch = False len(lines) < 2
current_action = None or not _norm(lines[0]).startswith("*** Begin Patch")
# Allow flexible end, might be EOF or just end of stream
# or _norm(lines[-1]) != "*** End Patch"
):
# Tolerate missing sentinels if content looks like a patch action
is_patch_like = any(_norm(line).startswith(
("@@", "*** Update File:", "*** Add File:", "*** Delete File:")
) for line in lines)
if not is_patch_like:
# If it doesn't even look like a patch, return empty
self.io.tool_warning("Response does not appear to be in patch format.")
return []
# If it looks like a patch but lacks sentinels, try parsing anyway but warn.
self.io.tool_warning("Patch format warning: Missing '*** Begin Patch'/'*** End Patch' sentinels.")
start_index = 0
else:
start_index = 1 # Skip "*** Begin Patch"
while i < len(lines): # Identify files needed for context lookups during parsing
line = lines[i] needed_paths = identify_files_needed(content)
i += 1 current_files: Dict[str, str] = {}
for rel_path in needed_paths:
abs_path = self.abs_root_path(rel_path)
try:
# Use io.read_text to handle potential errors/encodings
file_content = self.io.read_text(abs_path)
if file_content is None:
raise DiffError(f"File referenced in patch not found or could not be read: {rel_path}")
current_files[rel_path] = file_content
except FileNotFoundError:
raise DiffError(f"File referenced in patch not found: {rel_path}")
except IOError as e:
raise DiffError(f"Error reading file {rel_path}: {e}")
if line.strip() == "*** Begin Patch":
in_patch = True
continue
if not in_patch:
continue
if line.strip() == "*** End Patch":
if current_action:
edits.append(current_action)
in_patch = False
break # End of patch found
# Match Action lines (Update, Add, Delete) try:
match = re.match(r"\*\*\* (Update|Add|Delete) File: (.*)", line) # Parse the patch text using adapted logic
if match: patch_obj = self._parse_patch_text(lines, start_index, current_files)
if current_action: # Convert Patch object actions dict to a list
edits.append(current_action) # Save previous action return list(patch_obj.actions.values())
except DiffError as e:
# Raise as ValueError for consistency with other coders' error handling
raise ValueError(f"Error parsing patch content: {e}")
except Exception as e:
# Catch unexpected errors during parsing
raise ValueError(f"Unexpected error parsing patch: {e}")
action_type_str, path = match.groups()
action_type = ActionType(action_type_str)
path = path.strip()
current_action = PatchAction(type=action_type, path=path)
def _parse_patch_text(
    self, lines: List[str], start_index: int, current_files: Dict[str, str]
) -> Patch:
    """
    Parse patch lines into a Patch object.

    Parsing starts at ``lines[start_index]`` and runs until ``*** End Patch``
    or the end of input. ``current_files`` maps each path named by an Update
    or Delete action to that file's current text.

    Raises DiffError for a missing path, a second action on the same path,
    an Update/Delete of a file absent from ``current_files``, or any line
    that is neither an action header nor blank.
    """
    update_prefix = "*** Update File: "
    delete_prefix = "*** Delete File: "
    add_prefix = "*** Add File: "
    move_prefix = "*** Move to: "

    patch = Patch()
    total_fuzz = 0
    index = start_index

    while index < len(lines):
        line = lines[index]
        norm_line = _norm(line)

        if norm_line == "*** End Patch":
            index += 1
            break

        if norm_line.startswith(update_prefix):
            path = norm_line[len(update_prefix) :].strip()
            index += 1
            if not path:
                raise DiffError("Update File action missing path.")
            if path in patch.actions:
                raise DiffError(f"Duplicate action for file: {path}")
            if path not in current_files:
                raise DiffError(f"Update File Error - missing file content for: {path}")

            # An optional rename may directly follow the Update header.
            move_to = None
            if index < len(lines):
                candidate = _norm(lines[index])
                if candidate.startswith(move_prefix):
                    move_to = candidate[len(move_prefix) :].strip()
                    index += 1
                    if not move_to:
                        raise DiffError("Move to action missing path.")

            action, index, fuzz = self._parse_update_file_sections(
                lines, index, current_files[path]
            )
            action.path = path
            action.move_path = move_to
            patch.actions[path] = action
            total_fuzz += fuzz

        elif norm_line.startswith(delete_prefix):
            path = norm_line[len(delete_prefix) :].strip()
            index += 1
            if not path:
                raise DiffError("Delete File action missing path.")
            if path in patch.actions:
                raise DiffError(f"Duplicate action for file: {path}")
            if path not in current_files:
                raise DiffError(f"Delete File Error - file not found: {path}")
            patch.actions[path] = PatchAction(type=ActionType.DELETE, path=path)

        elif norm_line.startswith(add_prefix):
            path = norm_line[len(add_prefix) :].strip()
            index += 1
            if not path:
                raise DiffError("Add File action missing path.")
            if path in patch.actions:
                raise DiffError(f"Duplicate action for file: {path}")
            # Existence of the target is not checked here: current_files only
            # holds files needed by Update/Delete actions.
            action, index = self._parse_add_file_content(lines, index)
            action.path = path
            patch.actions[path] = action

        elif not norm_line.strip():
            # Blank lines between actions are allowed.
            index += 1

        else:
            raise DiffError(f"Unknown or misplaced line while parsing patch: {line}")

    # Reaching the end of input without "*** End Patch" is tolerated.
    patch.fuzz = total_fuzz
    return patch
def _parse_update_file_sections(
    self, lines: List[str], index: int, file_content: str
) -> Tuple[PatchAction, int, int]:
    """
    Parse every section (@@, context, -, +) of a single Update File action.

    ``lines`` is the whole patch, ``index`` the first line after the Update
    (and optional Move) header, ``file_content`` the current text of the
    file being updated.

    Returns ``(action, next_index, fuzz)``: an UPDATE PatchAction whose
    chunks carry ``orig_index`` values absolute within the file (its ``path``
    is left empty for the caller to fill in), the index of the first patch
    line not consumed, and the fuzz accumulated while locating context.

    Raises DiffError when a scope or a context block cannot be found in the
    file, or when a section is malformed.
    """
    stop_prefixes = (
        "*** End Patch",
        "*** Update File:",
        "*** Delete File:",
        "*** Add File:",
    )

    action = PatchAction(type=ActionType.UPDATE, path="")  # Path set by caller
    orig_lines = file_content.splitlines()
    # Scope matching compares stripped text; strip the file once up front.
    stripped_orig = [s.strip() for s in orig_lines]
    current_file_index = 0  # Position reached in the original file
    total_fuzz = 0

    while index < len(lines):
        if _norm(lines[index]).startswith(stop_prefixes):
            break  # End of this file's update section

        # Collect the (optional) run of @@ scope lines; empty ones are ignored.
        scope_lines = []
        while index < len(lines) and _norm(lines[index]).startswith("@@"):
            scope_text = lines[index][len("@@") :].strip()
            if scope_text:
                scope_lines.append(scope_text)
            index += 1

        if scope_lines:
            # The scope lines must match consecutive file lines at or after
            # the current position. Both sides are stripped, so the match is
            # whitespace-insensitive and no fuzz is charged for it.
            span = len(scope_lines)
            for position in range(current_file_index, len(orig_lines)):
                if stripped_orig[position : position + span] == scope_lines:
                    current_file_index = position + span
                    break
            else:
                scope_txt = "\n".join(scope_lines)
                raise DiffError(f"Could not find scope context:\n{scope_txt}")

        # Parse the next context/change section of the patch.
        context_block, chunks_in_section, next_index, is_eof = peek_next_section(lines, index)

        # Locate that section's original text in the file.
        found_index, fuzz = find_context(orig_lines, context_block, current_file_index, is_eof)
        total_fuzz += fuzz

        if found_index == -1:
            ctx_txt = "\n".join(context_block)
            marker = "*** End of File" if is_eof else ""
            raise DiffError(
                f"Could not find patch context {marker} starting near line"
                f" {current_file_index}:\n{ctx_txt}"
            )

        # Chunk indices arrive relative to the context block; make them
        # absolute within the file.
        for chunk in chunks_in_section:
            chunk.orig_index += found_index
            action.chunks.append(chunk)

        # Continue after the matched block in the file and after the
        # processed section in the patch.
        current_file_index = found_index + len(context_block)
        index = next_index

    return action, index, total_fuzz
def _parse_add_file_content(
    self, lines: List[str], index: int
) -> Tuple[PatchAction, int]:
    """
    Parse the '+' content lines that follow an Add File header.

    Reading stops at the next action header, at ``*** End Patch`` or at the
    end of input. Each line must start with '+', which is removed; a blank
    line is accepted and contributes an empty line.

    Returns ``(action, next_index)``: an ADD PatchAction whose
    ``new_content`` is the collected lines joined with newlines (its ``path``
    is left empty for the caller to fill in) and the index of the first
    patch line not consumed.

    Raises DiffError for a non-blank line without a leading '+'.
    """
    stop_prefixes = (
        "*** End Patch",
        "*** Update File:",
        "*** Delete File:",
        "*** Add File:",
    )

    collected: List[str] = []
    while index < len(lines):
        line = lines[index]
        norm_line = _norm(line)

        if norm_line.startswith(stop_prefixes):
            break

        if line.startswith("+"):
            collected.append(line[1:])  # Strip leading '+'
        elif norm_line.strip() == "":
            collected.append("")
        else:
            raise DiffError(f"Invalid Add File line (missing '+'): {line}")

        index += 1

    action = PatchAction(type=ActionType.ADD, path="", new_content="\n".join(collected))
    return action, index
if line.startswith("-"):
chunk.del_lines.append(line[1:])
elif line.startswith("+"):
chunk.ins_lines.append(line[1:])
elif line.startswith("@@"):
# Context line - ignored by this simplified parser
pass
elif line.strip() == "*** End of File":
# EOF marker - ignored by this simplified parser
pass
else:
# Assume it's context line if not +/-/@@
# This simplified parser doesn't store context properly.
pass
continue
if in_patch and not current_action:
# Started patch but no actions found before end?
pass # Or raise error?
if in_patch and current_action:
# Reached end of content without *** End Patch
edits.append(current_action) # Append the last action
# Consider raising a warning or error about missing End Patch sentinel
return edits
def apply_edits(self, edits: List[PatchAction]): def apply_edits(self, edits: List[PatchAction]):
""" """
@ -168,124 +507,143 @@ class PatchCoder(Coder):
if not edits: if not edits:
return return
# Group edits by original path for easier processing, especially moves
# We don't strictly need this grouping if we process sequentially and handle moves correctly.
for action in edits: for action in edits:
full_path = self.abs_root_path(action.path) full_path = self.abs_root_path(action.path)
path_obj = pathlib.Path(full_path) path_obj = pathlib.Path(full_path)
try: try:
if action.type == ActionType.ADD: if action.type == ActionType.ADD:
# Check existence *before* writing
if path_obj.exists(): if path_obj.exists():
# According to apply_patch.py, Add should fail if file exists. raise DiffError(f"ADD Error: File already exists: {action.path}")
# This check should ideally happen during parsing with file content access.
raise ValueError(f"ADD Error: File already exists: {action.path}")
if action.new_content is None: if action.new_content is None:
raise ValueError(f"ADD change for {action.path} has no content") # Parser should ensure this doesn't happen
# Ensure parent directory exists raise DiffError(f"ADD change for {action.path} has no content")
self.io.tool_output(f"Adding {action.path}")
path_obj.parent.mkdir(parents=True, exist_ok=True) path_obj.parent.mkdir(parents=True, exist_ok=True)
self.io.write_text( # Ensure single trailing newline, matching reference behavior
full_path, action.new_content.rstrip("\n") + "\n" content_to_write = action.new_content
) # Ensure single trailing newline if not content_to_write.endswith('\n'):
content_to_write += '\n'
self.io.write_text(full_path, content_to_write)
elif action.type == ActionType.DELETE: elif action.type == ActionType.DELETE:
self.io.tool_output(f"Deleting {action.path}")
if not path_obj.exists(): if not path_obj.exists():
# Allow deleting non-existent files (idempotent) self.io.tool_warning(f"DELETE Warning: File not found, skipping: {action.path}")
pass
else: else:
path_obj.unlink() path_obj.unlink()
elif action.type == ActionType.UPDATE: elif action.type == ActionType.UPDATE:
if not path_obj.exists(): if not path_obj.exists():
# Update should fail if file doesn't exist raise DiffError(f"UPDATE Error: File does not exist: {action.path}")
# (checked in apply_patch.py parser).
raise ValueError(f"UPDATE Error: File does not exist: {action.path}")
current_content = self.io.read_text(full_path) current_content = self.io.read_text(full_path)
if current_content is None: if current_content is None:
raise ValueError(f"Could not read file for UPDATE: {action.path}") # Should have been caught during parsing if file was needed
raise DiffError(f"Could not read file for UPDATE: {action.path}")
# Apply the update logic using the parsed chunks # Apply the update logic using the parsed chunks
new_content = self._apply_update(current_content, action.chunks, action.path) new_content = self._apply_update(current_content, action, action.path)
target_full_path = ( target_full_path = self.abs_root_path(action.move_path) if action.move_path else full_path
self.abs_root_path(action.move_path) if action.move_path else full_path
)
target_path_obj = pathlib.Path(target_full_path) target_path_obj = pathlib.Path(target_full_path)
if action.move_path:
self.io.tool_output(f"Updating and moving {action.path} to {action.move_path}")
# Check if target exists before overwriting/moving
if target_path_obj.exists() and full_path != target_full_path:
self.io.tool_warning(f"UPDATE Warning: Target file for move already exists, overwriting: {action.move_path}")
else:
self.io.tool_output(f"Updating {action.path}")
# Ensure parent directory exists for target # Ensure parent directory exists for target
target_path_obj.parent.mkdir(parents=True, exist_ok=True) target_path_obj.parent.mkdir(parents=True, exist_ok=True)
self.io.write_text(target_full_path, new_content) self.io.write_text(target_full_path, new_content)
# Remove original file *after* successful write to new location if moved
if action.move_path and full_path != target_full_path: if action.move_path and full_path != target_full_path:
# Remove original file after successful write to new location
path_obj.unlink() path_obj.unlink()
else: else:
raise ValueError(f"Unknown action type encountered: {action.type}") # Should not happen
raise DiffError(f"Unknown action type encountered: {action.type}")
except Exception as e: except (DiffError, FileNotFoundError, IOError, OSError) as e:
# Raise a ValueError to signal failure, consistent with other coders. # Raise a ValueError to signal failure, consistent with other coders.
raise ValueError(f"Error applying action '{action.type}' to {action.path}: {e}") raise ValueError(f"Error applying action '{action.type}' to {action.path}: {e}")
except Exception as e:
# Catch unexpected errors during application
raise ValueError(f"Unexpected error applying action '{action.type}' to {action.path}: {e}")
def _apply_update(self, text: str, action: PatchAction, path: str) -> str:
    """
    Apply the chunks of an UPDATE action to ``text`` and return the result.

    ``text`` is the current content of the file, ``action`` the parsed
    UPDATE action whose chunks carry absolute ``orig_index`` values, and
    ``path`` is used only in error messages. Chunks are applied in order of
    ``orig_index``.

    The result ends with a single newline, unless it has no lines at all and
    the original was empty too.

    Raises DiffError when ``action`` is not an UPDATE, when a chunk overlaps
    the previous one or starts past the end of the file, or when the lines a
    chunk wants to delete do not match the file.
    """
    if action.type is not ActionType.UPDATE:
        raise DiffError("_apply_update called with non-update action")

    orig_lines = text.splitlines()
    dest_lines: List[str] = []
    current_orig_line_idx = 0  # First original line not yet copied or deleted

    for chunk in sorted(action.chunks, key=lambda c: c.orig_index):
        # Absolute index of the first deleted line, or of the insertion
        # point when the chunk deletes nothing.
        chunk_start_index = chunk.orig_index

        if chunk_start_index < current_orig_line_idx:
            raise DiffError(
                f"{path}: Overlapping or out-of-order chunk detected."
                f" Current index {current_orig_line_idx}, chunk starts at {chunk_start_index}."
            )
        # Slicing would silently clamp an out-of-range start to the end of
        # the file; reject it instead of inserting at the wrong place.
        if chunk_start_index > len(orig_lines):
            raise DiffError(
                f"{path}: Chunk start index {chunk_start_index} is beyond the end"
                f" of the file ({len(orig_lines)} lines)."
            )

        # Copy the untouched lines between the previous chunk and this one.
        dest_lines.extend(orig_lines[current_orig_line_idx:chunk_start_index])

        # Re-check the deleted lines against the file, ignoring surrounding
        # whitespace, in case the file differs from what was parsed against.
        num_del = len(chunk.del_lines)
        actual_deleted_lines = orig_lines[chunk_start_index : chunk_start_index + num_del]
        norm_chunk_del = [_norm(s).strip() for s in chunk.del_lines]
        norm_actual_del = [_norm(s).strip() for s in actual_deleted_lines]

        if norm_chunk_del != norm_actual_del:
            expected_str = "\n".join(f"- {s}" for s in chunk.del_lines)
            actual_str = "\n".join(f"  {s}" for s in actual_deleted_lines)
            raise DiffError(
                f"{path}: Mismatch applying patch near line {chunk_start_index + 1}.\n"
                f"Expected lines to remove:\n{expected_str}\n"
                f"Found lines in file:\n{actual_str}"
            )

        dest_lines.extend(chunk.ins_lines)

        # Skip the deleted lines in the original.
        current_orig_line_idx = chunk_start_index + num_del

    # Copy whatever follows the last chunk.
    dest_lines.extend(orig_lines[current_orig_line_idx:])

    result = "\n".join(dest_lines)
    # Test the line list, not the joined text: a result made of one blank
    # line joins to "" but still needs its newline.
    if dest_lines or orig_lines:
        result += "\n"
    return result