refactor: Split watch_files() into testable components and add tests

This commit is contained in:
Paul Gauthier (aider) 2025-02-06 08:43:04 -08:00
parent 51938affc2
commit 5c9746e209
2 changed files with 84 additions and 33 deletions

View file

@ -110,43 +110,57 @@ class FileWatcher:
except Exception:
return
def get_roots_to_watch(self):
"""Determine which root paths to watch based on gitignore rules"""
if self.gitignore_spec:
roots = [
str(path)
for path in self.root.iterdir()
if not self.gitignore_spec.match_file(
path.relative_to(self.root).as_posix() + ("/" if path.is_dir() else "")
)
]
# Fallback to watching root if all top-level items are filtered out
return roots if roots else [str(self.root)]
return [str(self.root)]
def handle_changes(self, changes):
"""Process the detected changes and update state"""
if not changes:
return False
changed_files = {str(Path(change[1])) for change in changes}
self.changed_files.update(changed_files)
self.io.interrupt_input()
return True
def watch_files(self):
"""Watch for file changes and process them"""
try:
roots_to_watch = self.get_roots_to_watch()
for changes in watch(
*roots_to_watch,
watch_filter=self.filter_func,
stop_event=self.stop_event
):
if self.handle_changes(changes):
return
except Exception as e:
if self.verbose:
dump(f"File watcher error: {e}")
raise e
def start(self):
"""Start watching for file changes"""
self.stop_event = threading.Event()
self.changed_files = set()
def watch_files():
try:
# If a gitignore spec exists, filter out top-level entries that match it
if self.gitignore_spec:
roots_to_watch = [
str(path)
for path in self.root.iterdir()
if not self.gitignore_spec.match_file(
path.relative_to(self.root).as_posix() + ("/" if path.is_dir() else "")
)
]
# Fallback to watching root if all top-level items are filtered out
if not roots_to_watch:
roots_to_watch = [str(self.root)]
else:
roots_to_watch = [str(self.root)]
for changes in watch(
*roots_to_watch, watch_filter=self.filter_func, stop_event=self.stop_event
):
if not changes:
continue
changed_files = {str(Path(change[1])) for change in changes}
self.changed_files.update(changed_files)
self.io.interrupt_input()
return
except Exception as e:
if self.verbose:
dump(f"File watcher error: {e}")
raise e
self.watcher_thread = threading.Thread(target=watch_files, daemon=True)
self.watcher_thread = threading.Thread(
target=self.watch_files,
daemon=True
)
self.watcher_thread.start()
def stop(self):

View file

@ -61,6 +61,43 @@ def test_gitignore_patterns():
tmp_gitignore.unlink()
def test_get_roots_to_watch(tmp_path):
# Create a test directory structure
(tmp_path / "included").mkdir()
(tmp_path / "excluded").mkdir()
io = InputOutput(pretty=False, fancy_input=False, yes=False)
coder = MinimalCoder(io)
# Test with no gitignore
watcher = FileWatcher(coder, root=tmp_path)
roots = watcher.get_roots_to_watch()
assert len(roots) == 1
assert roots[0] == str(tmp_path)
# Test with gitignore
gitignore = tmp_path / ".gitignore"
gitignore.write_text("excluded/")
watcher = FileWatcher(coder, root=tmp_path, gitignores=[gitignore])
roots = watcher.get_roots_to_watch()
assert len(roots) == 1
assert Path(roots[0]).name == "included"
def test_handle_changes():
io = InputOutput(pretty=False, fancy_input=False, yes=False)
coder = MinimalCoder(io)
watcher = FileWatcher(coder)
# Test no changes
assert not watcher.handle_changes([])
assert len(watcher.changed_files) == 0
# Test with changes
changes = [('modified', '/path/to/file.py')]
assert watcher.handle_changes(changes)
assert len(watcher.changed_files) == 1
assert str(Path('/path/to/file.py')) in watcher.changed_files
def test_ai_comment_pattern():
# Create minimal IO and Coder instances for testing
class MinimalCoder: