import os import re import subprocess import sys from collections import OrderedDict from pathlib import Path import git from aider import models, prompts, voice from aider.help import Help, install_help_extra from aider.llm import litellm from aider.scrape import Scraper, install_playwright from aider.utils import is_image_file from .dump import dump # noqa: F401 class SwitchCoder(Exception): def __init__(self, **kwargs): self.kwargs = kwargs class Commands: voice = None scraper = None def __init__(self, io, coder, voice_language=None, verify_ssl=True): self.io = io self.coder = coder self.verify_ssl = verify_ssl if voice_language == "auto": voice_language = None self.voice_language = voice_language self.help = None def cmd_model(self, args): "Switch to a new LLM" model_name = args.strip() model = models.Model(model_name) models.sanity_check_models(self.io, model) raise SwitchCoder(main_model=model) def cmd_chat_mode(self, args): "Switch to a new chat mode" from aider import coders ef = args.strip() valid_formats = OrderedDict( sorted( ( coder.edit_format, coder.__doc__.strip().split("\n")[0] if coder.__doc__ else "No description", ) for coder in coders.__all__ if getattr(coder, "edit_format", None) ) ) show_formats = OrderedDict( [ ("help", "Get help about using aider (usage, config, troubleshoot)."), ("ask", "Ask questions about your code without making any changes."), ("code", "Ask for changes to your code (using the best edit format)."), ] ) if ef not in valid_formats and ef not in show_formats: if ef: self.io.tool_error(f'Chat mode "{ef}" should be one of these:\n') else: self.io.tool_output("Chat mode should be one of these:\n") max_format_length = max(len(format) for format in valid_formats.keys()) for format, description in show_formats.items(): self.io.tool_output(f"- {format:<{max_format_length}} : {description}") self.io.tool_output("\nOr a valid edit format:\n") for format, description in valid_formats.items(): if format not in show_formats: self.io.tool_output(f"- {format:<{max_format_length}} : {description}") return summarize_from_coder = True edit_format = ef if ef == "code": edit_format = self.coder.main_model.edit_format summarize_from_coder = False elif ef == "ask": summarize_from_coder = False raise SwitchCoder( edit_format=edit_format, summarize_from_coder=summarize_from_coder, ) def completions_model(self): models = litellm.model_cost.keys() return models def cmd_models(self, args): "Search the list of available models" args = args.strip() if args: models.print_matching_models(self.io, args) else: self.io.tool_output("Please provide a partial model name to search for.") def cmd_web(self, args): "Use headless selenium to scrape a webpage and add the content to the chat" url = args.strip() if not url: self.io.tool_error("Please provide a URL to scrape.") return if not self.scraper: res = install_playwright(self.io) if not res: self.io.tool_error("Unable to initialize playwright.") self.scraper = Scraper( print_error=self.io.tool_error, playwright_available=res, verify_ssl=self.verify_ssl ) content = self.scraper.scrape(url) or "" # if content: # self.io.tool_output(content) content = f"{url}:\n\n" + content return content def is_command(self, inp): return inp[0] in "/!" def get_completions(self, cmd): assert cmd.startswith("/") cmd = cmd[1:] fun = getattr(self, f"completions_{cmd}", None) if not fun: return [] return sorted(fun()) def get_commands(self): commands = [] for attr in dir(self): if not attr.startswith("cmd_"): continue cmd = attr[4:] cmd = cmd.replace("_", "-") commands.append("/" + cmd) return commands def do_run(self, cmd_name, args): cmd_name = cmd_name.replace("-", "_") cmd_method_name = f"cmd_{cmd_name}" cmd_method = getattr(self, cmd_method_name, None) if cmd_method: return cmd_method(args) else: self.io.tool_output(f"Error: Command {cmd_name} not found.") def matching_commands(self, inp): words = inp.strip().split() if not words: return first_word = words[0] rest_inp = inp[len(words[0]) :] all_commands = self.get_commands() matching_commands = [cmd for cmd in all_commands if cmd.startswith(first_word)] return matching_commands, first_word, rest_inp def run(self, inp): if inp.startswith("!"): return self.do_run("run", inp[1:]) res = self.matching_commands(inp) if res is None: return matching_commands, first_word, rest_inp = res if len(matching_commands) == 1: return self.do_run(matching_commands[0][1:], rest_inp) elif first_word in matching_commands: return self.do_run(first_word[1:], rest_inp) elif len(matching_commands) > 1: self.io.tool_error(f"Ambiguous command: {', '.join(matching_commands)}") else: self.io.tool_error(f"Invalid command: {first_word}") # any method called cmd_xxx becomes a command automatically. # each one must take an args param. def cmd_commit(self, args=None): "Commit edits to the repo made outside the chat (commit message optional)" if not self.coder.repo: self.io.tool_error("No git repository found.") return if not self.coder.repo.is_dirty(): self.io.tool_error("No more changes to commit.") return commit_message = args.strip() if args else None self.coder.repo.commit(message=commit_message) def cmd_lint(self, args="", fnames=None): "Lint and fix provided files or in-chat files if none provided" if not self.coder.repo: self.io.tool_error("No git repository found.") return if not fnames: fnames = self.coder.get_inchat_relative_files() dump(fnames) # If still no files, get all dirty files in the repo if not fnames and self.coder.repo: fnames = [item.a_path for item in self.coder.repo.repo.index.diff(None)] dump(fnames) if not fnames: self.io.tool_error("No dirty files to lint.") return lint_coder = None for fname in fnames: try: errors = self.coder.linter.lint(fname) except FileNotFoundError as err: self.io.tool_error(f"Unable to lint {fname}") self.io.tool_error(str(err)) continue if not errors: continue self.io.tool_error(errors) if not self.io.confirm_ask(f"Fix lint errors in {fname}?", default="y"): continue # Commit everything before we start fixing lint errors if self.coder.repo.is_dirty(): self.cmd_commit("") if not lint_coder: lint_coder = self.coder.clone( # Clear the chat history, fnames cur_messages=[], done_messages=[], fnames=None, ) lint_coder.add_rel_fname(fname) lint_coder.run(errors) lint_coder.abs_fnames = set() if lint_coder and self.coder.repo.is_dirty(): self.cmd_commit("") def cmd_clear(self, args): "Clear the chat history" self.coder.done_messages = [] self.coder.cur_messages = [] def cmd_tokens(self, args): "Report on the number of tokens used by the current chat context" res = [] self.coder.choose_fence() # system messages main_sys = self.coder.fmt_system_prompt(self.coder.gpt_prompts.main_system) main_sys += "\n" + self.coder.fmt_system_prompt(self.coder.gpt_prompts.system_reminder) msgs = [ dict(role="system", content=main_sys), dict( role="system", content=self.coder.fmt_system_prompt(self.coder.gpt_prompts.system_reminder), ), ] tokens = self.coder.main_model.token_count(msgs) res.append((tokens, "system messages", "")) # chat history msgs = self.coder.done_messages + self.coder.cur_messages if msgs: msgs = [dict(role="dummy", content=msg) for msg in msgs] tokens = self.coder.main_model.token_count(msgs) res.append((tokens, "chat history", "use /clear to clear")) # repo map other_files = set(self.coder.get_all_abs_files()) - set(self.coder.abs_fnames) if self.coder.repo_map: repo_content = self.coder.repo_map.get_repo_map(self.coder.abs_fnames, other_files) if repo_content: tokens = self.coder.main_model.token_count(repo_content) res.append((tokens, "repository map", "use --map-tokens to resize")) # files for fname in self.coder.abs_fnames: relative_fname = self.coder.get_rel_fname(fname) content = self.io.read_text(fname) if is_image_file(relative_fname): tokens = self.coder.main_model.token_count_for_image(fname) else: # approximate content = f"{relative_fname}\n```\n" + content + "```\n" tokens = self.coder.main_model.token_count(content) res.append((tokens, f"{relative_fname}", "use /drop to drop from chat")) self.io.tool_output("Approximate context window usage, in tokens:") self.io.tool_output() width = 8 cost_width = 9 def fmt(v): return format(int(v), ",").rjust(width) col_width = max(len(row[1]) for row in res) cost_pad = " " * cost_width total = 0 total_cost = 0.0 for tk, msg, tip in res: total += tk cost = tk * self.coder.main_model.info.get("input_cost_per_token", 0) total_cost += cost msg = msg.ljust(col_width) self.io.tool_output(f"${cost:7.4f} {fmt(tk)} {msg} {tip}") # noqa: E231 self.io.tool_output("=" * (width + cost_width + 1)) self.io.tool_output(f"${total_cost:7.4f} {fmt(total)} tokens total") # noqa: E231 limit = self.coder.main_model.info.get("max_input_tokens", 0) if not limit: return remaining = limit - total if remaining > 1024: self.io.tool_output(f"{cost_pad}{fmt(remaining)} tokens remaining in context window") elif remaining > 0: self.io.tool_error( f"{cost_pad}{fmt(remaining)} tokens remaining in context window (use /drop or" " /clear to make space)" ) else: self.io.tool_error( f"{cost_pad}{fmt(remaining)} tokens remaining, window exhausted (use /drop or" " /clear to make space)" ) self.io.tool_output(f"{cost_pad}{fmt(limit)} tokens max context window size") def cmd_undo(self, args): "Undo the last git commit if it was done by aider" if not self.coder.repo: self.io.tool_error("No git repository found.") return last_commit = self.coder.repo.repo.head.commit if not last_commit.parents: self.io.tool_error("This is the first commit in the repository. Cannot undo.") return prev_commit = last_commit.parents[0] changed_files_last_commit = [item.a_path for item in last_commit.diff(prev_commit)] for fname in changed_files_last_commit: if self.coder.repo.repo.is_dirty(path=fname): self.io.tool_error( f"The file {fname} has uncommitted changes. Please stash them before undoing." ) return # Check if the file was in the repo in the previous commit try: prev_commit.tree[fname] except KeyError: self.io.tool_error( f"The file {fname} was not in the repository in the previous commit. Cannot" " undo safely." ) return local_head = self.coder.repo.repo.git.rev_parse("HEAD") current_branch = self.coder.repo.repo.active_branch.name try: remote_head = self.coder.repo.repo.git.rev_parse(f"origin/{current_branch}") has_origin = True except git.exc.GitCommandError: has_origin = False if has_origin: if local_head == remote_head: self.io.tool_error( "The last commit has already been pushed to the origin. Undoing is not" " possible." ) return last_commit_hash = self.coder.repo.repo.head.commit.hexsha[:7] last_commit_message = self.coder.repo.repo.head.commit.message.strip() if last_commit_hash not in self.coder.aider_commit_hashes: self.io.tool_error("The last commit was not made by aider in this chat session.") self.io.tool_error( "You could try `/git reset --hard HEAD^` but be aware that this is a destructive" " command!" ) return # Reset only the files which are part of `last_commit` for file_path in changed_files_last_commit: self.coder.repo.repo.git.checkout("HEAD~1", file_path) # Move the HEAD back before the latest commit self.coder.repo.repo.git.reset("--soft", "HEAD~1") self.io.tool_output(f"Removed: {last_commit_hash} {last_commit_message}") # Get the current HEAD after undo current_head_hash = self.coder.repo.repo.head.commit.hexsha[:7] current_head_message = self.coder.repo.repo.head.commit.message.strip() self.io.tool_output(f"HEAD is: {current_head_hash} {current_head_message}") if self.coder.main_model.send_undo_reply: return prompts.undo_command_reply def cmd_diff(self, args=""): "Display the diff of the last aider commit" if not self.coder.repo: self.io.tool_error("No git repository found.") return last_commit_hash = self.coder.repo.repo.head.commit.hexsha[:7] if last_commit_hash not in self.coder.aider_commit_hashes: self.io.tool_error(f"Last commit {last_commit_hash} was not an aider commit.") self.io.tool_error("You could try `/git diff` or `/git diff HEAD^`.") return diff = self.coder.repo.diff_commits( self.coder.pretty, "HEAD^", "HEAD", ) # don't use io.tool_output() because we don't want to log or further colorize print(diff) def quote_fname(self, fname): if " " in fname and '"' not in fname: fname = f'"{fname}"' return fname def completions_add(self): files = set(self.coder.get_all_relative_files()) files = files - set(self.coder.get_inchat_relative_files()) files = [self.quote_fname(fn) for fn in files] return files def glob_filtered_to_repo(self, pattern): try: if os.path.isabs(pattern): # Handle absolute paths raw_matched_files = [Path(pattern)] else: raw_matched_files = list(Path(self.coder.root).glob(pattern)) except ValueError as err: self.io.tool_error(f"Error matching {pattern}: {err}") raw_matched_files = [] matched_files = [] for fn in raw_matched_files: matched_files += expand_subdir(fn) matched_files = [ str(Path(fn).relative_to(self.coder.root)) for fn in matched_files if Path(fn).is_relative_to(self.coder.root) ] # if repo, filter against it if self.coder.repo: git_files = self.coder.repo.get_tracked_files() matched_files = [fn for fn in matched_files if str(fn) in git_files] res = list(map(str, matched_files)) return res def cmd_add(self, args): "Add files to the chat so GPT can edit them or review them in detail" added_fnames = [] all_matched_files = set() filenames = parse_quoted_filenames(args) for word in filenames: if Path(word).is_absolute(): fname = Path(word) else: fname = Path(self.coder.root) / word if self.coder.repo and self.coder.repo.ignored_file(fname): self.io.tool_error(f"Skipping {fname} that matches aiderignore spec.") continue if fname.exists(): if fname.is_file(): all_matched_files.add(str(fname)) continue # an existing dir, escape any special chars so they won't be globs word = re.sub(r"([\*\?\[\]])", r"[\1]", word) matched_files = self.glob_filtered_to_repo(word) if matched_files: all_matched_files.update(matched_files) continue if self.io.confirm_ask(f"No files matched '{word}'. Do you want to create {fname}?"): if "*" in str(fname) or "?" in str(fname): self.io.tool_error(f"Cannot create file with wildcard characters: {fname}") else: try: fname.touch() all_matched_files.add(str(fname)) except OSError as e: self.io.tool_error(f"Error creating file {fname}: {e}") for matched_file in all_matched_files: abs_file_path = self.coder.abs_root_path(matched_file) if not abs_file_path.startswith(self.coder.root) and not is_image_file(matched_file): self.io.tool_error( f"Can not add {abs_file_path}, which is not within {self.coder.root}" ) continue if abs_file_path in self.coder.abs_fnames: self.io.tool_error(f"{matched_file} is already in the chat") else: if is_image_file(matched_file) and not self.coder.main_model.accepts_images: self.io.tool_error( f"Cannot add image file {matched_file} as the" f" {self.coder.main_model.name} does not support image.\nYou can run `aider" " --4-turbo-vision` to use GPT-4 Turbo with Vision." ) continue content = self.io.read_text(abs_file_path) if content is None: self.io.tool_error(f"Unable to read {matched_file}") else: self.coder.abs_fnames.add(abs_file_path) self.io.tool_output(f"Added {matched_file} to the chat") self.coder.check_added_files() added_fnames.append(matched_file) if not added_fnames: return # only reply if there's been some chatting since the last edit if not self.coder.cur_messages: return reply = prompts.added_files.format(fnames=", ".join(added_fnames)) return reply def completions_drop(self): files = self.coder.get_inchat_relative_files() files = [self.quote_fname(fn) for fn in files] return files def cmd_drop(self, args=""): "Remove files from the chat session to free up context space" if not args.strip(): self.io.tool_output("Dropping all files from the chat session.") self.coder.abs_fnames = set() filenames = parse_quoted_filenames(args) for word in filenames: matched_files = self.glob_filtered_to_repo(word) if not matched_files: matched_files.append(word) for matched_file in matched_files: abs_fname = self.coder.abs_root_path(matched_file) if abs_fname in self.coder.abs_fnames: self.coder.abs_fnames.remove(abs_fname) self.io.tool_output(f"Removed {matched_file} from the chat") def cmd_git(self, args): "Run a git command" combined_output = None try: args = "git " + args env = dict(subprocess.os.environ) env["GIT_EDITOR"] = "true" result = subprocess.run( args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env, shell=True, ) combined_output = result.stdout except Exception as e: self.io.tool_error(f"Error running /git command: {e}") if combined_output is None: return self.io.tool_output(combined_output) def cmd_test(self, args): "Run a shell command and add the output to the chat on non-zero exit code" if not args and self.coder.test_cmd: args = self.coder.test_cmd if not callable(args): return self.cmd_run(args, True) errors = args() if not errors: return self.io.tool_error(errors, strip=False) return errors def cmd_run(self, args, add_on_nonzero_exit=False): "Run a shell command and optionally add the output to the chat (alias: !)" combined_output = None instructions = None try: result = subprocess.run( args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, shell=True, encoding=self.io.encoding, errors="replace", ) combined_output = result.stdout except Exception as e: self.io.tool_error(f"Error running command: {e}") if combined_output is None: return self.io.tool_output(combined_output) if add_on_nonzero_exit: add = result.returncode != 0 else: response = self.io.prompt_ask( "Add the output to the chat? (y/n/instructions): ", default="y" ).strip() if response.lower() in ["yes", "y"]: add = True elif response.lower() in ["no", "n"]: add = False else: add = True instructions = response if add: for line in combined_output.splitlines(): self.io.tool_output(line, log_only=True) msg = prompts.run_output.format( command=args, output=combined_output, ) if instructions: msg = instructions + "\n\n" + msg return msg def cmd_exit(self, args): "Exit the application" sys.exit() def cmd_quit(self, args): "Exit the application" sys.exit() def cmd_ls(self, args): "List all known files and indicate which are included in the chat session" files = self.coder.get_all_relative_files() other_files = [] chat_files = [] for file in files: abs_file_path = self.coder.abs_root_path(file) if abs_file_path in self.coder.abs_fnames: chat_files.append(file) else: other_files.append(file) if not chat_files and not other_files: self.io.tool_output("\nNo files in chat or git repo.") return if other_files: self.io.tool_output("Repo files not in the chat:\n") for file in other_files: self.io.tool_output(f" {file}") if chat_files: self.io.tool_output("\nFiles in chat:\n") for file in chat_files: self.io.tool_output(f" {file}") def basic_help(self): commands = sorted(self.get_commands()) pad = max(len(cmd) for cmd in commands) pad = "{cmd:" + str(pad) + "}" for cmd in commands: cmd_method_name = f"cmd_{cmd[1:]}".replace("-", "_") cmd_method = getattr(self, cmd_method_name, None) cmd = pad.format(cmd=cmd) if cmd_method: description = cmd_method.__doc__ self.io.tool_output(f"{cmd} {description}") else: self.io.tool_output(f"{cmd} No description available.") self.io.tool_output() self.io.tool_output("Use `/help ` to ask questions about how to use aider.") def cmd_help(self, args): "Ask questions about aider" if not args.strip(): self.basic_help() return from aider.coders import Coder if not self.help: res = install_help_extra(self.io) if not res: self.io.tool_error("Unable to initialize interactive help.") return self.help = Help() coder = Coder.create( io=self.io, from_coder=self.coder, edit_format="help", summarize_from_coder=False, map_tokens=512, map_mul_no_files=1, ) user_msg = self.help.ask(args) user_msg += """ # Announcement lines from when this session of aider was launched: """ user_msg += "\n".join(self.coder.get_announcements()) + "\n" assistant_msg = coder.run(user_msg) self.coder.cur_messages += [ dict(role="user", content=user_msg), dict(role="assistant", content=assistant_msg), ] def cmd_ask(self, args): "Ask questions about the code base without editing any files" if not args.strip(): self.io.tool_error("Please provide a question or topic for the chat.") return from aider.coders import Coder chat_coder = Coder.create( io=self.io, from_coder=self.coder, edit_format="ask", summarize_from_coder=False, ) user_msg = args assistant_msg = chat_coder.run(user_msg) self.coder.cur_messages += [ dict(role="user", content=user_msg), dict(role="assistant", content=assistant_msg), ] def get_help_md(self): "Show help about all commands in markdown" res = """ |Command|Description| |:------|:----------| """ commands = sorted(self.get_commands()) for cmd in commands: cmd_method_name = f"cmd_{cmd[1:]}".replace("-", "_") cmd_method = getattr(self, cmd_method_name, None) if cmd_method: description = cmd_method.__doc__ res += f"| **{cmd}** | {description} |\n" else: res += f"| **{cmd}** | |\n" res += "\n" return res def cmd_voice(self, args): "Record and transcribe voice input" if not self.voice: if "OPENAI_API_KEY" not in os.environ: self.io.tool_error("To use /voice you must provide an OpenAI API key.") return try: self.voice = voice.Voice() except voice.SoundDeviceError: self.io.tool_error( "Unable to import `sounddevice` and/or `soundfile`, is portaudio installed?" ) return history_iter = self.io.get_input_history() history = [] size = 0 for line in history_iter: if line.startswith("/"): continue if line in history: continue if size + len(line) > 1024: break size += len(line) history.append(line) history.reverse() history = "\n".join(history) try: text = self.voice.record_and_transcribe(history, language=self.voice_language) except litellm.OpenAIError as err: self.io.tool_error(f"Unable to use OpenAI whisper model: {err}") return if text: self.io.add_to_input_history(text) print() self.io.user_input(text, log_only=False) print() return text def expand_subdir(file_path): file_path = Path(file_path) if file_path.is_file(): yield file_path return if file_path.is_dir(): for file in file_path.rglob("*"): if file.is_file(): yield str(file) def parse_quoted_filenames(args): filenames = re.findall(r"\"(.+?)\"|(\S+)", args) filenames = [name for sublist in filenames for name in sublist if name] return filenames def get_help_md(): from aider.coders import Coder from aider.models import Model coder = Coder(Model("gpt-3.5-turbo"), None) md = coder.commands.get_help_md() return md def main(): md = get_help_md() print(md) if __name__ == "__main__": status = main() sys.exit(status)