Merge branch 'main' into watch

This commit is contained in:
Paul Gauthier 2024-11-26 10:30:25 -08:00
commit 139d89d817
78 changed files with 5012 additions and 2667 deletions

View file

@ -1,6 +1,7 @@
import base64
import os
import threading
import time
import webbrowser
from collections import defaultdict
from dataclasses import dataclass
@ -199,6 +200,7 @@ class InputOutput:
editingmode=EditingMode.EMACS,
fancy_input=True,
):
self.placeholder = None
self.never_prompts = set()
self.interrupted_partial_input = None
self.editingmode = editingmode
@ -336,14 +338,36 @@ class InputOutput:
self.tool_error("Use --encoding to set the unicode encoding.")
return
def write_text(self, filename, content):
def write_text(self, filename, content, max_retries=5, initial_delay=0.1):
"""
Writes content to a file, retrying with progressive backoff if the file is locked.
:param filename: Path to the file to write.
:param content: Content to write to the file.
:param max_retries: Maximum number of retries if a file lock is encountered.
:param initial_delay: Initial delay (in seconds) before the first retry.
"""
if self.dry_run:
return
try:
with open(str(filename), "w", encoding=self.encoding) as f:
f.write(content)
except OSError as err:
self.tool_error(f"Unable to write file {filename}: {err}")
delay = initial_delay
for attempt in range(max_retries):
try:
with open(str(filename), "w", encoding=self.encoding) as f:
f.write(content)
return # Successfully wrote the file
except PermissionError as err:
if attempt < max_retries - 1:
time.sleep(delay)
delay *= 2 # Exponential backoff
else:
self.tool_error(
f"Unable to write file {filename} after {max_retries} attempts: {err}"
)
raise
except OSError as err:
self.tool_error(f"Unable to write file {filename}: {err}")
raise
def rule(self):
if self.pretty:
@ -434,30 +458,33 @@ class InputOutput:
show = ". "
try:
try:
if self.prompt_session:
line = self.prompt_session.prompt(
show,
completer=completer_instance,
reserve_space_for_menu=4,
complete_style=CompleteStyle.MULTI_COLUMN,
style=style,
key_bindings=kb,
)
else:
line = input(show)
if self.prompt_session:
# Use placeholder if set, then clear it
default = self.placeholder or ""
self.placeholder = None
# Check if we were interrupted by a file change
if self.changed_files:
res = process_file_changes(self.changed_files)
self.changed_files = None
return res
line = self.prompt_session.prompt(
show,
default=default,
completer=completer_instance,
reserve_space_for_menu=4,
complete_style=CompleteStyle.MULTI_COLUMN,
style=style,
key_bindings=kb,
)
else:
line = input(show)
except EOFError:
return ""
except Exception as err:
dump(err)
# Check if we were interrupted by a file change
if self.changed_files:
res = process_file_changes(self.changed_files)
self.changed_files = None
return res
except EOFError:
return ""
except Exception as err:
dump(err)
except UnicodeEncodeError as err:
self.tool_error(str(err))
return ""
@ -466,13 +493,38 @@ class InputOutput:
stop_event.set()
watcher.join() # Thread should exit quickly due to stop_event
if line and line[0] == "{" and not multiline_input:
multiline_input = True
inp += line[1:] + "\n"
if line.strip("\r\n") and not multiline_input:
stripped = line.strip("\r\n")
if stripped == "{":
multiline_input = True
multiline_tag = None
inp += ""
elif stripped[0] == "{":
# Extract tag if it exists (only alphanumeric chars)
tag = "".join(c for c in stripped[1:] if c.isalnum())
if stripped == "{" + tag:
multiline_input = True
multiline_tag = tag
inp += ""
else:
inp = line
break
else:
inp = line
break
continue
elif line and line[-1] == "}" and multiline_input:
inp += line[:-1] + "\n"
break
elif multiline_input and line.strip():
if multiline_tag:
# Check if line is exactly "tag}"
if line.strip("\r\n") == f"{multiline_tag}}}":
break
else:
inp += line + "\n"
# Check if line is exactly "}"
elif line.strip("\r\n") == "}":
break
else:
inp += line + "\n"
elif multiline_input:
inp += line + "\n"
else:
@ -488,8 +540,8 @@ class InputOutput:
return
FileHistory(self.input_history_file).append_string(inp)
# Also add to the in-memory history if it exists
if hasattr(self, "session") and hasattr(self.session, "history"):
self.session.history.append_string(inp)
if self.prompt_session and self.prompt_session.history:
self.prompt_session.history.append_string(inp)
def get_input_history(self):
if not self.input_history_file:
@ -506,14 +558,17 @@ class InputOutput:
log_file.write(f"{role.upper()} {timestamp}\n")
log_file.write(content + "\n")
def display_user_input(self, inp):
if self.pretty and self.user_input_color:
style = dict(style=self.user_input_color)
else:
style = dict()
self.console.print(Text(inp), **style)
def user_input(self, inp, log_only=True):
if not log_only:
if self.pretty and self.user_input_color:
style = dict(style=self.user_input_color)
else:
style = dict()
self.console.print(Text(inp), **style)
self.display_user_input(inp)
prefix = "####"
if inp:
@ -533,11 +588,11 @@ class InputOutput:
hist = "\n" + content.strip() + "\n\n"
self.append_chat_history(hist)
def offer_url(self, url, prompt="Open URL for more info?"):
def offer_url(self, url, prompt="Open URL for more info?", allow_never=True):
"""Offer to open a URL in the browser, returns True if opened."""
if url in self.never_prompts:
return False
if self.confirm_ask(prompt, subject=url, allow_never=True):
if self.confirm_ask(prompt, subject=url, allow_never=allow_never):
webbrowser.open(url)
return True
return False
@ -736,6 +791,10 @@ class InputOutput:
self.console.print(show_resp)
def set_placeholder(self, placeholder):
"""Set a one-time placeholder text for the next input prompt."""
self.placeholder = placeholder
def print(self, message=""):
print(message)