refactor: create FileWatcher class to encapsulate file watching logic

This commit is contained in:
Paul Gauthier (aider) 2024-11-27 16:13:00 -08:00
parent af195a610c
commit 094d2e12a4
2 changed files with 96 additions and 99 deletions

View file

@ -392,28 +392,10 @@ class InputOutput:
):
self.rule()
# ai refactor this chunk ...
self.changed_files = None
stop_event = threading.Event()
def watch_files():
try:
gitignore = [str(Path(root) / ".gitignore")]
for changed in watch_source_files(
root, stop_event=stop_event, gitignores=gitignore, encoding=self.encoding
):
if changed:
self.changed_files = changed
self.interrupt_input()
break
except Exception as e:
self.tool_error(f"File watcher error: {e}")
raise e
# Start the watcher thread
watcher = threading.Thread(target=watch_files, daemon=True)
watcher.start()
# ... to here
# Initialize and start the file watcher
self.file_watcher = FileWatcher(root, encoding=self.encoding)
gitignore = [str(Path(root) / ".gitignore")]
self.file_watcher.start(gitignores=gitignore)
rel_fnames = list(rel_fnames)
show = ""
@ -476,9 +458,9 @@ class InputOutput:
line = input(show)
# Check if we were interrupted by a file change
if self.changed_files:
res = process_file_changes(self.changed_files)
self.changed_files = None
if changes := self.file_watcher.get_changes():
res = process_file_changes(changes)
self.file_watcher.changed_files = None
return res
except EOFError:
@ -489,9 +471,7 @@ class InputOutput:
self.tool_error(str(err))
return ""
finally:
# ai: we'll need to adjust this too
stop_event.set()
watcher.join() # Thread should exit quickly due to stop_event
self.file_watcher.stop()
if line.strip("\r\n") and not multiline_input:
stripped = line.strip("\r\n")

View file

@ -61,83 +61,100 @@ def load_gitignores(gitignore_paths: list[Path]) -> Optional[PathSpec]:
return PathSpec.from_lines(GitWildMatchPattern, patterns) if patterns else None
# ai: make a class for this that includes the code from io!
def watch_source_files(
directory: str,
stop_event=None,
gitignores: list[str] = None,
ignore_func=None,
encoding="utf-8",
) -> Set[str]:
"""
Watch for changes to source files in the given directory and its subdirectories.
Returns a set of changed file paths whenever changes are detected.
class FileWatcher:
"""Watches source files for changes and AI comments"""
def __init__(self, directory: str, encoding="utf-8"):
self.directory = directory
self.encoding = encoding
self.root = Path(directory)
self.root_abs = self.root.absolute()
self.stop_event = None
self.watcher_thread = None
self.changed_files = None
Args:
directory: Root directory to watch
stop_event: Threading event to signal when to stop watching
gitignores: List of paths to .gitignore files (optional)
ignore_func: Optional function that takes a path (relative to watched directory)
and returns True if it should be ignored
"""
root = Path(directory)
def create_filter_func(self, gitignore_spec, ignore_func):
"""Creates a filter function for the file watcher"""
def filter_func(change_type, path):
path_obj = Path(path)
path_abs = path_obj.absolute()
if VERBOSE:
dump(root)
if not path_abs.is_relative_to(self.root_abs):
return False
gitignore_paths = [Path(g) for g in gitignores] if gitignores else []
gitignore_spec = load_gitignores(gitignore_paths)
root_abs = root.absolute()
# Create a filter function that only accepts source files and respects gitignore
def filter_func(change_type, path):
path_obj = Path(path)
path_abs = path_obj.absolute()
if not path_abs.is_relative_to(root_abs):
return False
rel_path = path_abs.relative_to(root_abs)
if VERBOSE:
dump(rel_path)
if gitignore_spec and gitignore_spec.match_file(str(rel_path)):
return False
if ignore_func and ignore_func(rel_path):
return False
if not is_source_file(path_obj):
return False
if VERBOSE:
dump("ok", rel_path)
# Check if file contains AI markers
try:
with open(path_abs, encoding=encoding, errors="ignore") as f:
content = f.read()
res = bool(re.search(r"(?:#|//) *ai\b", content, re.IGNORECASE))
if VERBOSE:
dump(res)
return res
except (IOError, UnicodeDecodeError) as err:
rel_path = path_abs.relative_to(self.root_abs)
if VERBOSE:
dump(err)
return False
dump(rel_path)
# Watch the directory for changes
for changes in watch(root, watch_filter=filter_func, stop_event=stop_event):
# Convert the changes to a set of unique file paths
changed_files = {str(Path(change[1])) for change in changes}
result = {}
for file in changed_files:
if comments := get_ai_comment(file, encoding=encoding):
result[file] = comments
if gitignore_spec and gitignore_spec.match_file(str(rel_path)):
return False
if ignore_func and ignore_func(rel_path):
return False
if VERBOSE:
dump(result)
yield result
if not is_source_file(path_obj):
return False
if VERBOSE:
dump("ok", rel_path)
# Check if file contains AI markers
try:
with open(path_abs, encoding=self.encoding, errors="ignore") as f:
content = f.read()
res = bool(re.search(r"(?:#|//) *ai\b", content, re.IGNORECASE))
if VERBOSE:
dump(res)
return res
except (IOError, UnicodeDecodeError) as err:
if VERBOSE:
dump(err)
return False
return filter_func
def start(self, gitignores: list[str] = None, ignore_func=None):
"""Start watching for file changes"""
self.stop_event = threading.Event()
gitignore_paths = [Path(g) for g in gitignores] if gitignores else []
gitignore_spec = load_gitignores(gitignore_paths)
filter_func = self.create_filter_func(gitignore_spec, ignore_func)
def watch_files():
try:
for changes in watch(self.root, watch_filter=filter_func, stop_event=self.stop_event):
changed_files = {str(Path(change[1])) for change in changes}
result = {}
for file in changed_files:
if comments := get_ai_comment(file, encoding=self.encoding):
result[file] = comments
if VERBOSE:
dump(result)
if result:
self.changed_files = result
return
except Exception as e:
if VERBOSE:
dump(f"File watcher error: {e}")
raise e
self.watcher_thread = threading.Thread(target=watch_files, daemon=True)
self.watcher_thread.start()
def stop(self):
"""Stop watching for file changes"""
if self.stop_event:
self.stop_event.set()
if self.watcher_thread:
self.watcher_thread.join()
self.watcher_thread = None
self.stop_event = None
def get_changes(self):
"""Get any detected file changes"""
return self.changed_files
def get_ai_comment(filepath, encoding="utf-8"):