feat: Local Analytics Dashboard for Aider

feat: Initialize LocalAnalyticsCollector in main.py

feat: Display session data in local analytics dashboard

fix: Use cumulative data from last interaction for dashboard stats

fix: Extract initial query from diffs in local analytics collector.
flanker 2025-05-13 18:48:24 +05:30
parent 4e0964046a
commit e8bee42d76
12 changed files with 1732 additions and 12 deletions
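For reference, each interaction recorded by the new collector is appended as one JSON line to local_analytics/session.jsonl; the dashboard generator and helper scripts below all read that file. A minimal, illustrative record (field names match start_interaction in the collector; every value here is invented) looks like:

{"session_id": "a1b2c3d4-5678-90ab-cdef-1234567890ab", "project_name": "aider", "interaction_timestamp": "2025-05-13T18:48:24", "interaction_duration_seconds": 42.7, "query": "add a health-check endpoint", "aider_version": "0.0.0", "platform_info": "Linux-6.8.0-x86_64", "python_version": "3.11.9", "token_summary": {"prompt_tokens": 1850, "completion_tokens": 420, "total_tokens": 2270, "estimated_cost": 0.0123}, "models_used_summary": [{"name": "gpt-4o", "calls": 2, "cost": 0.0123, "prompt_tokens": 1850, "completion_tokens": 420}], "llm_calls_details": [], "modified_files_in_chat": ["app.py"], "commits_made_this_interaction": []}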

.gitignore
View file

@@ -16,4 +16,3 @@ aider/_version.py
.#*
.gitattributes
tmp.benchmarks/
local_analytics/

View file

@@ -24,7 +24,7 @@ except ImportError: # Babel not installed we will fall back to a small mapp
Locale = None
from json.decoder import JSONDecodeError
from pathlib import Path
from typing import List
from typing import List, Optional
from rich.console import Console
@@ -36,6 +36,8 @@ from aider.history import ChatSummary
from aider.io import ConfirmGroup, InputOutput
from aider.linter import Linter
from aider.llm import litellm
from local_analytics.local_analytics_collector import LocalAnalyticsCollector
from aider.models import RETRY_TIMEOUT
from aider.reasoning_tags import (
REASONING_TAG,
@@ -119,6 +121,7 @@ class Coder:
ignore_mentions = None
chat_language = None
file_watcher = None
# analytics_store is defined in __init__
@classmethod
def create(
@@ -179,6 +182,7 @@ class Coder:
total_tokens_sent=from_coder.total_tokens_sent,
total_tokens_received=from_coder.total_tokens_received,
file_watcher=from_coder.file_watcher,
analytics_store=from_coder.analytics_store, # Pass along analytics_store
)
use_kwargs.update(update) # override to complete the switch
use_kwargs.update(kwargs) # override passed kwargs
@@ -335,9 +339,11 @@ class Coder:
file_watcher=None,
auto_copy_context=False,
auto_accept_architect=True,
        analytics_store=None,  # LocalAnalyticsCollector instance, passed in from main() and Coder.create()
):
# Fill in a dummy Analytics if needed, but it is never .enable()'d
self.analytics = analytics if analytics is not None else Analytics()
self.analytics_store = analytics_store
self.event = self.analytics.event
self.chat_language = chat_language
@@ -924,19 +930,31 @@ class Coder:
else:
message = user_message
while message:
self.reflected_message = None
list(self.send_message(message))
interaction_started_for_analytics = False
if self.analytics_store and self.analytics_store.enabled and message:
files_in_chat_for_interaction = self.get_inchat_relative_files()
# Start tracking a new user interaction for local analytics.
self.analytics_store.start_interaction(query=message, modified_files_in_chat=files_in_chat_for_interaction)
interaction_started_for_analytics = True
if not self.reflected_message:
break
try:
while message:
self.reflected_message = None
list(self.send_message(message)) # This is where LLM calls happen
if self.num_reflections >= self.max_reflections:
self.io.tool_warning(f"Only {self.max_reflections} reflections allowed, stopping.")
return
if not self.reflected_message:
break
if self.num_reflections >= self.max_reflections:
self.io.tool_warning(f"Only {self.max_reflections} reflections allowed, stopping.")
return
self.num_reflections += 1
message = self.reflected_message
finally:
if interaction_started_for_analytics and self.analytics_store and self.analytics_store.enabled:
self.analytics_store.end_interaction()
self.num_reflections += 1
message = self.reflected_message
def check_and_open_urls(self, exc, friendly_msg=None):
"""Check exception for URLs, offer to open in a browser, with user-friendly error msgs."""
@@ -2379,6 +2397,8 @@ class Coder:
if res:
self.show_auto_commit_outcome(res)
commit_hash, commit_message = res
if self.analytics_store and self.analytics_store.enabled:
self.analytics_store.log_commit(commit_hash, commit_message)
return self.gpt_prompts.files_content_gpt_edits.format(
hash=commit_hash,
message=commit_message,

View file

@@ -36,6 +36,7 @@ from aider.repo import ANY_GIT_ERROR, GitRepo
from aider.report import report_uncaught_exceptions
from aider.versioncheck import check_version, install_from_main_branch, install_upgrade
from aider.watch import FileWatcher
from local_analytics.local_analytics_collector import LocalAnalyticsCollector
from .dump import dump # noqa: F401
@@ -658,6 +659,14 @@ def main(argv=None, input=None, output=None, force_git_root=None, return_coder=F
analytics.event("launched")
# Initialize LocalAnalyticsCollector
# It will register an atexit handler to save data and update the dashboard.
local_analytics_collector = None
if hasattr(args, 'dry_run'): # Check if dry_run attribute exists
local_analytics_collector = LocalAnalyticsCollector(io=io, git_root=git_root, enabled=not args.dry_run)
else:
local_analytics_collector = LocalAnalyticsCollector(io=io, git_root=git_root, enabled=True)
if args.gui and not return_coder:
if not check_streamlit_install(io):
analytics.event("exit", reason="Streamlit not installed")
@@ -996,6 +1005,7 @@ def main(argv=None, input=None, output=None, force_git_root=None, return_coder=F
detect_urls=args.detect_urls,
auto_copy_context=args.copy_paste,
auto_accept_architect=args.auto_accept_architect,
analytics_store=local_analytics_collector, # Pass the collector instance
)
except UnknownEditFormat as err:
io.tool_error(str(err))
@@ -1166,6 +1176,9 @@ def main(argv=None, input=None, output=None, force_git_root=None, return_coder=F
if "show_announcements" in kwargs:
del kwargs["show_announcements"]
# Ensure the new Coder uses the same LocalAnalyticsCollector instance
kwargs['analytics_store'] = local_analytics_collector
coder = Coder.create(**kwargs)
if switch.kwargs.get("show_announcements") is not False:

View file

@@ -0,0 +1,2 @@
# This file can be empty.
# It makes the 'local_analytics' directory a Python package.

View file

@@ -0,0 +1,63 @@
import json
from pathlib import Path
from collections import defaultdict
def calculate_cost_by_model(filepath):
"""
Reads session data from a JSONL file and calculates the total estimated cost per model.
"""
cost_by_model = defaultdict(float)
if not filepath.exists():
print(f"Error: Session data file not found at {filepath}")
return dict(cost_by_model) # Return empty dict if file not found
with open(filepath, "r", encoding="utf-8") as f:
for line in f:
try:
data = json.loads(line)
# Iterate through the models used summary for this interaction
models_summary = data.get("models_used_summary", [])
if not isinstance(models_summary, list):
print(f"Warning: 'models_used_summary' is not a list in line: {line.strip()}")
continue
for model_info in models_summary:
if not isinstance(model_info, dict):
print(f"Warning: Item in 'models_used_summary' is not a dict in line: {line.strip()}")
continue
model_name = model_info.get("name", "Unknown Model")
cost = model_info.get("cost", 0.0)
# Ensure cost is a number before adding
if isinstance(cost, (int, float)):
cost_by_model[model_name] += cost
else:
print(f"Warning: Found non-numeric cost value for model '{model_name}': {cost} in line: {line.strip()}")
except json.JSONDecodeError as e:
print(f"Error decoding JSON from line: {line.strip()} - {e}")
except Exception as e:
print(f"An unexpected error occurred processing line: {line.strip()} - {e}")
return dict(cost_by_model) # Convert defaultdict to dict for final return
if __name__ == "__main__":
# Define the path to the session data file
BASE_DIR = Path(__file__).resolve().parent
SESSION_DATA_FILE = BASE_DIR / "session.jsonl"
cost_by_model = calculate_cost_by_model(SESSION_DATA_FILE)
print("Total Estimated Cost by Model:")
if cost_by_model:
# Sort models by cost descending
sorted_models = sorted(cost_by_model.items(), key=lambda item: item[1], reverse=True)
for model, cost in sorted_models:
print(f" {model}: ${cost:.4f}")
total_overall_cost = sum(cost_by_model.values())
print("-" * 30)
print(f"Total Estimated Cost (Overall): ${total_overall_cost:.4f}")
else:
print(" No cost data found.")

View file

@@ -0,0 +1,97 @@
import json
from pathlib import Path
import re
# Define file paths (assuming they are in the same directory as the script)
BASE_DIR = Path(__file__).resolve().parent
SESSION_DATA_FILE = BASE_DIR / "session.jsonl"
# Regex to identify common code/diff starting lines.
# This regex checks if the *first line* of a query starts with one of these patterns.
CODE_DIFF_MARKERS_REGEX = re.compile(
r"^(```|diff --git|--- |\+\+\+ |@@ )"
)
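# For illustration (these examples are not in the original script), first lines such as
#   "```", "```python", "diff --git a/x b/x", "--- a/x", "+++ b/x", "@@ -1,3 +1,4 @@"
# would all be treated as code/diff starts by this regex.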
def clean_query(query_text):
"""
Cleans the query text.
The cleaned query should be only the first line of the original query,
and should not be a code/diff line itself.
"""
if not isinstance(query_text, str) or not query_text.strip():
# Return as is if not a string, or if it's an empty/whitespace-only string
return query_text
# First, get the part of the query before any "```diff" block
query_before_diff = re.split(r"```diff", query_text, 1)[0]
# If the part before "```diff" is empty or just whitespace, return empty string
if not query_before_diff.strip():
return ""
# Now, take the first line of this potentially multi-line pre-diff query
lines_before_diff = query_before_diff.splitlines()
if not lines_before_diff: # Should be caught by query_before_diff.strip() check, but for safety
return ""
first_line = lines_before_diff[0]
# Check if this first line itself is a code/diff marker
if CODE_DIFF_MARKERS_REGEX.match(first_line):
# If the first line itself is identified as a code/diff marker,
# this implies the query might predominantly be code or a diff.
# In this case, we set the query to an empty string.
return ""
else:
# Otherwise, the first line is considered the cleaned query.
return first_line
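# Illustrative behaviour of clean_query (examples added for clarity, not in the original script):
#   clean_query("Fix the failing test\n```diff\n--- a/foo.py\n+++ b/foo.py\n```")  -> "Fix the failing test"
#   clean_query("```\nprint('hello')\n```")                                        -> ""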
def main():
"""Main function to clean the query field in session.jsonl."""
if not SESSION_DATA_FILE.exists():
print(f"Error: Session data file not found at {SESSION_DATA_FILE}")
return
updated_lines = []
modified_count = 0
processed_lines = 0
print(f"Starting cleaning process for {SESSION_DATA_FILE}...")
with open(SESSION_DATA_FILE, "r", encoding="utf-8") as f:
for line_num, line_content in enumerate(f, 1):
processed_lines += 1
try:
data = json.loads(line_content)
original_query = data.get("query") # Use .get() for safety
if "query" in data and isinstance(original_query, str):
cleaned_query = clean_query(original_query)
if cleaned_query != original_query:
data["query"] = cleaned_query
modified_count += 1
updated_lines.append(json.dumps(data) + "\n")
except json.JSONDecodeError as e:
print(f"Warning: Error decoding JSON from line {line_num}: {e}. Keeping original line.")
updated_lines.append(line_content) # Keep original line if JSON error
except Exception as e:
print(f"Warning: Error processing line {line_num}: {e}. Keeping original line.")
updated_lines.append(line_content) # Keep original line if other error
# Write back to the original file
try:
with open(SESSION_DATA_FILE, "w", encoding="utf-8") as f:
for updated_line in updated_lines:
f.write(updated_line)
print(f"\nProcessing complete.")
print(f"Processed {processed_lines} lines.")
print(f"{modified_count} queries were cleaned.")
print(f"Cleaned data saved to {SESSION_DATA_FILE.resolve()}")
except IOError as e:
print(f"Error writing cleaned data to {SESSION_DATA_FILE}: {e}")
if __name__ == "__main__":
main()

View file

@@ -0,0 +1,168 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<title>development_aider - Aider Analytics Dashboard</title>
<style>
:root {
--bg-page: #f8f0e0; --bg-widget: #ede9dd; --text-main: #5c5951;
--text-stat-number: #3c3a35; --bar-green: #7fb069; --bar-red: #d26a5d;
--bar-yellow: #e7b468; --bar-teal: #77a099; --tag-bg: #fdfbf5;
--tag-border: #dcd8cf; --font-family-main: 'Consolas', 'Courier New', monospace;
}
body {
font-family: var(--font-family-main); background-color: var(--bg-page);
color: var(--text-main); margin: 0; padding: 25px; font-size: 14px;
}
.dashboard-container { max-width: 1000px; margin: 0 auto; }
header h1 { font-size: 1.4em; font-weight: bold; margin-bottom: 25px; }
.sticky-header-content {
position: sticky;
top: 0; /* Stick to the top of the viewport */
z-index: 10; /* Ensure it stays above scrolling content */
background-color: var(--bg-page); /* Match body background */
padding-bottom: 20px; /* Add some space below the sticky content */
}
.stats-overview { display: flex; gap: 20px; margin-bottom: 0; align-items: flex-start; } /* Remove bottom margin */
.main-stat-item { flex: 1; min-width: 200px; padding: 20px; }
/* Decreased font size for main stat number */
.main-stat-item .stat-number-main { font-size: 4em; font-weight: bold; color: var(--text-stat-number); line-height: 1; margin-bottom: 8px; }
.main-stat-item .stat-label { font-size: 0.85em; text-transform: uppercase; margin-bottom: 15px; }
.main-stat-item .last-entry { font-size: 0.9em; }
.main-stat-item .last-entry strong { color: var(--text-stat-number); }
/* New style for the model cost summary box */
.model-cost-summary-box {
flex: 1; /* Take up remaining space */
min-width: 250px; /* Ensure minimum width */
background-color: var(--bg-widget);
padding: 20px;
display: flex;
flex-direction: column;
}
.model-cost-summary-box h3 {
font-size: 1.1em;
font-weight: bold;
margin: 0 0 10px 0;
color: var(--text-stat-number);
}
.model-cost-summary-box ul {
list-style: none;
padding: 0;
margin: 0;
font-size: 0.9em;
overflow-y: auto; /* Add scroll if list is long */
max-height: 150px; /* Limit height */
}
.model-cost-summary-box li {
margin-bottom: 5px;
padding-bottom: 3px;
border-bottom: 1px dashed #e7e3da;
}
.model-cost-summary-box li:last-child {
border-bottom: none;
margin-bottom: 0;
}
.model-cost-summary-box .model-name {
font-weight: bold;
}
.model-cost-summary-box .model-cost {
float: right; /* Align cost to the right */
}
/* Secondary stats section below the main overview */
.secondary-stats-section { margin-top: 20px; }
.secondary-stats-section h2 { font-size: 1.15em; font-weight: bold; margin-bottom: 15px; }
/* Modified right-stats-group to be full width and smaller */
.right-stats-group {
width: 100%; /* Full width */
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); /* Responsive grid */
gap: 15px; /* Smaller gap */
}
.right-stats-group .stat-box { background-color: var(--bg-widget); padding: 15px; font-size: 0.9em; } /* Smaller padding and font */
.right-stats-group .stat-box .stat-number-medium { font-size: 1.8em; font-weight: bold; color: var(--text-stat-number); line-height: 1; margin-bottom: 5px; } /* Smaller number font */
.right-stats-group .stat-box .stat-label { font-size: 0.7em; text-transform: uppercase; } /* Smaller label font */
.text-entry-history-section { margin-top: 35px; }
.text-entry-history-section h2 { font-size: 1.15em; font-weight: bold; margin-bottom: 15px; }
.history-list-container { background-color: var(--bg-widget); padding: 5px; }
.session-group { margin-bottom: 10px; border: 1px solid #dcd8cf; border-radius: 4px; background-color: #fff; /* White background for the group content */}
.session-group details[open] .session-summary { border-bottom: 1px solid #dcd8cf; }
.session-summary { padding: 10px 15px; background-color: #f5f2eb; cursor: pointer; font-weight: bold; list-style: none; /* Remove default marker */ }
.session-summary::-webkit-details-marker { display: none; /* Chrome/Safari */ }
.session-summary::marker { display: none; /* Firefox */ }
.session-summary:hover { background-color: #e9e5dc; }
.history-item { display: flex; padding: 15px 20px 15px 15px; border-bottom: 1px solid #e7e3da; background-color: #fdfbf5; /* Slightly off-white for items */ }
.history-item:last-child { border-bottom: none; }
/* .session-group > .history-item:first-of-type { border-top: 1px solid #dcd8cf; } */ /* Removed as details summary acts as separator */
.color-bar { width: 4px; margin-right: 15px; flex-shrink: 0; }
.color-bar.green { background-color: var(--bar-green); } .color-bar.red { background-color: var(--bar-red); }
.color-bar.yellow { background-color: var(--bar-yellow); } .color-bar.teal { background-color: var(--bar-teal); }
.color-bar.blue { background-color: #5D9CEC; } /* Added blue color for latest interaction */
.item-content { flex-grow: 1; }
.item-header { display: flex; justify-content: space-between; align-items: flex-start; margin-bottom: 8px; }
.item-header h3 { font-size: 1.05em; font-weight: bold; margin: 0; color: var(--text-stat-number); }
.item-header .timestamp { font-size: 0.8em; white-space: nowrap; margin-left: 10px; padding-top: 2px; }
.entry-text { font-size: 0.9em; line-height: 1.6; margin-bottom: 12px; word-break: break-word; }
.details dl { margin: 0; padding: 0; }
.details dt { font-weight: bold; margin-top: 8px; font-size: 0.9em; color: #4c4a44; }
.details dd { margin-left: 0; margin-bottom: 8px; font-size: 0.85em; }
.details p { font-size: 0.9em; margin: 5px 0; }
.details h4 { font-size: 0.95em; margin: 10px 0 5px 0; }
.details ul { list-style-type: disc; margin: 0 0 5px 20px; padding: 0; }
.details ul li { font-size: 0.85em; margin-bottom: 3px; }
.token-stats, .litellm-call { font-size: 0.85em; }
.litellm-call { padding: 5px; border: 1px dashed #ccc; margin-top: 5px; background-color: #f9f9f9; }
/* Raw Data Section Styles */
.raw-data-details { margin-top: 15px; border-top: 1px dashed #ccc; padding-top: 10px; }
.raw-data-details summary { font-weight: bold; cursor: pointer; font-size: 0.9em; color: #4c4a44; }
.raw-data-details summary:hover { text-decoration: underline; }
.raw-data-json {
background-color: #f0f0f0; padding: 10px; border-radius: 4px;
overflow-x: auto; white-space: pre-wrap; word-wrap: break-word;
font-size: 0.8em; line-height: 1.4; color: #333;
}
footer { text-align: center; margin-top: 40px; padding-top: 20px; font-size: 0.8em; }
</style>
</head>
<body>
<div class="dashboard-container">
<div class="sticky-header-content">
<header>
<h1><!-- AIDER_ANALYTICS_PROJECT_NAME --> - AIDER ANALYTICS</h1>
</header>
<section class="stats-overview">
<!-- AIDER_ANALYTICS_STATS_OVERVIEW_CONTENT -->
</section>
<section class="secondary-stats-section">
<h2>SECONDARY STATS</h2>
<!-- AIDER_ANALYTICS_SECONDARY_STATS_CONTENT -->
</section>
<section class="latest-interaction-display">
<h2>LATEST INTERACTION</h2>
<!-- AIDER_ANALYTICS_LATEST_INTERACTION_CONTENT -->
<!-- This will be populated with a single history-item styled block -->
</section>
</div>
<section class="text-entry-history-section">
<h2>TEXT ENTRY HISTORY</h2>
<div class="history-list-container">
<!-- AIDER_ANALYTICS_HISTORY_ENTRIES_CONTENT -->
</div>
</section>
<footer>
<p>AIDER ANALYTICS SYSTEM v0.1.0</p>
</footer>
</div>
</body>
</html>

View file

@@ -0,0 +1,481 @@
import json
import os
from datetime import datetime
from pathlib import Path
import html
from collections import defaultdict # Import defaultdict
import webbrowser
# Define file paths (assuming they are in the same directory as the script)
BASE_DIR = Path(__file__).resolve().parent
SESSION_DATA_FILE = BASE_DIR / "session.jsonl"
COLOR_CLASSES = ["teal", "green", "yellow", "red"] # For dynamic history item colors
DASHBOARD_TEMPLATE_FILE = BASE_DIR / "dashboard.html"
DASHBOARD_OUTPUT_FILE = BASE_DIR / "dashboard_generated.html"
def format_timestamp(ts_str):
"""Formats an ISO timestamp string into a more readable format."""
if not ts_str:
return "N/A"
try:
# Handle potential 'Z' for UTC
if ts_str.endswith('Z'):
ts_str = ts_str[:-1] + '+00:00'
dt_obj = datetime.fromisoformat(ts_str)
return dt_obj.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
return ts_str # Return original if parsing fails
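# Example (added for illustration): format_timestamp("2025-05-13T18:48:24Z") -> "2025-05-13 18:48:24"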
def format_duration(seconds):
"""Formats a duration in seconds into a human-readable string (e.g., 1m 38s)."""
if seconds is None:
return "N/A"
try:
s = int(seconds)
if s < 0:
return "N/A"
m, s = divmod(s, 60)
h, m = divmod(m, 60)
if h > 0:
return f"{h}h {m}m {s}s"
elif m > 0:
return f"{m}m {s}s"
else:
return f"{s}s"
except (ValueError, TypeError):
return "N/A"
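# Examples (added for illustration, not in the original script):
#   format_duration(98)   -> "1m 38s"
#   format_duration(3723) -> "1h 2m 3s"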
def escape_html(text):
"""Escapes HTML special characters in a string."""
if text is None:
return ""
return html.escape(str(text))
def read_session_data(filepath):
"""Reads session data from a JSONL file."""
data = []
if not filepath.exists():
print(f"Error: Session data file not found at {filepath}")
return data
with open(filepath, "r", encoding="utf-8") as f:
for line in f:
try:
data.append(json.loads(line))
except json.JSONDecodeError as e:
print(f"Error decoding JSON from line: {line.strip()} - {e}")
return data
def calculate_cost_by_model(all_data):
"""
Calculates the total estimated cost per model from all session data.
"""
cost_by_model = defaultdict(float)
if not all_data:
return dict(cost_by_model)
for data in all_data:
# Iterate through the models used summary for this interaction
models_summary = data.get("models_used_summary", [])
if not isinstance(models_summary, list):
# print(f"Warning: 'models_used_summary' is not a list in data: {data}") # Optional debug
continue
for model_info in models_summary:
if not isinstance(model_info, dict):
# print(f"Warning: Item in 'models_used_summary' is not a dict in data: {data}") # Optional debug
continue
model_name = model_info.get("name", "Unknown Model")
cost = model_info.get("cost", 0.0)
# Ensure cost is a number before adding
if isinstance(cost, (int, float)):
cost_by_model[model_name] += cost
else:
print(f"Warning: Found non-numeric cost value for model '{model_name}': {cost} in data: {data}")
return dict(cost_by_model) # Convert defaultdict to dict for final return
def format_cost_by_model_html(cost_by_model):
"""Generates HTML list for cost breakdown by model."""
if not cost_by_model:
return "<ul><li>No model cost data available.</li></ul>"
# Sort models by cost descending
sorted_models = sorted(cost_by_model.items(), key=lambda item: item[1], reverse=True)
list_items_html = ""
for model, cost in sorted_models:
list_items_html += f"""
<li>
<span class="model-name">{escape_html(model)}:</span>
<span class="model-cost">${cost:.4f}</span>
</li>
"""
return f"<ul>{list_items_html}</ul>"
def generate_stats_overview_html(all_data, cost_by_model):
"""Generates HTML for the main stats overview section (Total Cost + Cost by Model)."""
total_estimated_cost = sum(item.get("token_summary", {}).get("estimated_cost", 0.0) or 0.0 for item in all_data)
last_entry_timestamp_str = "N/A"
if all_data:
# Assuming all_data is sorted with newest entry last after reading
last_interaction_data = all_data[-1] # Newest interaction
last_entry_timestamp_str = format_timestamp(last_interaction_data.get("interaction_timestamp"))
model_cost_list_html = format_cost_by_model_html(cost_by_model)
return f"""
<div class="main-stat-item">
<div class="stat-number-main">${total_estimated_cost:.4f}</div>
<div class="stat-label">TOTAL ESTIMATED COST</div>
<div class="last-entry">
<span class="data-label">Last Entry:</span>
<span class="data-value">{escape_html(last_entry_timestamp_str)}</span>
</div>
</div>
<div class="model-cost-summary-box">
<h3>COST BY MODEL</h3>
{model_cost_list_html}
</div>
"""
def generate_secondary_stats_html(all_data):
"""Generates HTML for the secondary stats section (Tokens, Duration, Sessions)."""
if not all_data:
# Return the structure with N/A values if no data, matching dashboard.html's expectation
return """
<div class="right-stats-group">
<div class="stat-box">
<div class="stat-number-medium">0</div>
<div class="stat-label">TOTAL PROMPT TOKENS</div>
</div>
<div class="stat-box">
<div class="stat-number-medium">0s</div>
<div class="stat-label">TOTAL INTERACTION DURATION</div>
</div>
<div class="stat-box">
<div class="stat-number-medium">0</div>
<div class="stat-label">TOTAL COMPLETION TOKENS</div>
</div>
<div class="stat-box">
<div class="stat-number-medium">0</div>
<div class="stat-label">TOTAL SESSIONS</div>
</div>
</div>"""
total_duration_seconds = sum(item.get("interaction_duration_seconds", 0) or 0 for item in all_data)
total_prompt_tokens = sum(item.get("token_summary", {}).get("prompt_tokens", 0) or 0 for item in all_data)
total_completion_tokens = sum(item.get("token_summary", {}).get("completion_tokens", 0) or 0 for item in all_data)
total_sessions = 0
if all_data:
session_ids = set()
for item in all_data:
if item.get("session_id"):
session_ids.add(item.get("session_id"))
total_sessions = len(session_ids)
formatted_duration = format_duration(total_duration_seconds)
formatted_prompt_tokens = f"{total_prompt_tokens / 1_000_000:.2f}M" if total_prompt_tokens >= 1_000_000 else str(total_prompt_tokens)
formatted_completion_tokens = f"{total_completion_tokens / 1_000_000:.2f}M" if total_completion_tokens >= 1_000_000 else str(total_completion_tokens)
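    # e.g. (illustration) 1_234_567 prompt tokens would be rendered as "1.23M"; smaller counts are shown verbatim.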
return f"""
<div class="right-stats-group">
<div class="stat-box">
<div class="stat-number-medium">{formatted_prompt_tokens}</div>
<div class="stat-label">TOTAL PROMPT TOKENS</div>
</div>
<div class="stat-box">
<div class="stat-number-medium">{formatted_duration}</div>
<div class="stat-label">TOTAL INTERACTION DURATION</div>
</div>
<div class="stat-box">
<div class="stat-number-medium">{formatted_completion_tokens}</div>
<div class="stat-label">TOTAL COMPLETION TOKENS</div>
</div>
<div class="stat-box">
<div class="stat-number-medium">{total_sessions}</div>
<div class="stat-label">TOTAL SESSIONS</div>
</div>
</div>"""
def generate_collapsible_list_html(title, items_list):
items_list = items_list or [] # Ensure items_list is not None
if not items_list:
return f"<p><strong>{escape_html(title)}:</strong> None</p>"
list_items_html = "".join(f"<li>{escape_html(item)}</li>" for item in items_list)
return f"""
<details class="collapsible-section">
<summary class="collapsible-summary">{escape_html(title)} ({len(items_list)})</summary>
<div class="collapsible-content">
<ul>{list_items_html}</ul>
</div>
</details>
"""
def generate_token_summary_html(token_summary):
token_summary = token_summary or {} # Ensure token_summary is not None
if not token_summary:
return "<p>No token summary available.</p>"
return f"""
<details class="collapsible-section">
<summary class="collapsible-summary">Token Summary</summary>
<div class="collapsible-content">
<p><strong>Prompt Tokens:</strong> {token_summary.get("prompt_tokens", "N/A")}</p>
<p><strong>Completion Tokens:</strong> {token_summary.get("completion_tokens", "N/A")}</p>
<p><strong>Total Tokens:</strong> {token_summary.get("total_tokens", "N/A")}</p>
<p><strong>Estimated Cost:</strong> ${token_summary.get("estimated_cost", 0.0):.6f}</p>
</div>
</details>
"""
def generate_models_used_summary_html(models_summary):
models_summary = models_summary or [] # Ensure models_summary is not None
if not models_summary:
return "<p>No models used summary available.</p>"
rows_html = ""
for model_info in models_summary:
model_info = model_info or {} # Ensure model_info is not None
rows_html += f"""
<tr>
<td>{escape_html(model_info.get("name"))}</td>
<td>{model_info.get("calls", "N/A")}</td>
<td>${model_info.get("cost", 0.0):.6f}</td>
<td>{model_info.get("prompt_tokens", "N/A")}</td>
<td>{model_info.get("completion_tokens", "N/A")}</td>
</tr>
"""
return f"""
<details class="collapsible-section">
<summary class="collapsible-summary">Models Used Summary ({len(models_summary)})</summary>
<div class="collapsible-content">
<table>
<thead>
<tr>
<th>Name</th>
<th>Calls</th>
<th>Cost</th>
<th>Prompt Tokens</th>
<th>Completion Tokens</th>
</tr>
</thead>
<tbody>
{rows_html}
</tbody>
</table>
</div>
</details>
"""
def generate_llm_calls_details_html(llm_calls):
llm_calls = llm_calls or [] # Ensure llm_calls is not None
if not llm_calls:
return "<p>No LLM call details available.</p>"
rows_html = ""
for call in llm_calls:
call = call or {} # Ensure call is not None
rows_html += f"""
<tr>
<td>{escape_html(call.get("model"))}</td>
<td>{escape_html(call.get("id"))}</td>
<td>{escape_html(call.get("finish_reason", "N/A"))}</td>
<td>{call.get("prompt_tokens", "N/A")}</td>
<td>{call.get("completion_tokens", "N/A")}</td>
<td>${call.get("cost", 0.0):.6f}</td>
<td>{format_timestamp(call.get("timestamp"))}</td>
</tr>
"""
return f"""
<details class="collapsible-section">
<summary class="collapsible-summary">LLM Calls Details ({len(llm_calls)})</summary>
<div class="collapsible-content">
<table>
<thead>
<tr>
<th>Model</th>
<th>ID</th>
<th>Finish Reason</th>
<th>Prompt Tokens</th>
<th>Completion Tokens</th>
<th>Cost</th>
<th>Timestamp</th>
</tr>
</thead>
<tbody>
{rows_html}
</tbody>
</table>
</div>
</details>
"""
def generate_interaction_html(interaction_data, index, use_special_color_bar=False, special_color_class="blue"):
"""Generates HTML for a single interaction entry."""
interaction_data = interaction_data or {}
session_id = escape_html(interaction_data.get("session_id", f"interaction-{index}"))
project_name = escape_html(interaction_data.get("project_name", "N/A"))
timestamp_str = format_timestamp(interaction_data.get("interaction_timestamp"))
duration_str = format_duration(interaction_data.get("interaction_duration_seconds"))
query_text = escape_html(interaction_data.get("query", "No query provided."))
aider_version = escape_html(interaction_data.get("aider_version", "N/A"))
platform_info = escape_html(interaction_data.get("platform_info", "N/A"))
python_version = escape_html(interaction_data.get("python_version", "N/A"))
if use_special_color_bar:
color_bar_class = special_color_class
else:
if COLOR_CLASSES: # Ensure COLOR_CLASSES is not empty
color_bar_class = COLOR_CLASSES[index % len(COLOR_CLASSES)]
else:
color_bar_class = "teal" # Fallback if COLOR_CLASSES is somehow empty
return f"""
<div class="history-item" id="interaction-{session_id}-{index}">
<div class="color-bar {color_bar_class}"></div>
<div class="item-content">
<div class="item-header">
<h3>{project_name}</h3>
<span class="timestamp">{timestamp_str} (Duration: {duration_str})</span>
</div>
<p class="entry-text">
<span class="data-label">Query:</span>
<span class="data-value">{query_text}</span>
</p>
<div class="details">
<dl>
<dt class="data-label">Session ID:</dt>
<dd class="data-value">{session_id}</dd>
<dt class="data-label">Aider Version:</dt>
<dd class="data-value">
{aider_version}
<span class="data-label">Platform:</span>
<span class="data-value">{platform_info}</span>,
<span class="data-label">Python:</span>
<span class="data-value">{python_version}</span>
</dd>
<dt class="data-label">Token Usage:</dt>
<dd class="data-value">{generate_token_summary_html(interaction_data.get("token_summary"))}</dd>
<dt class="data-label">Models Used:</dt>
<dd class="data-value">{generate_models_used_summary_html(interaction_data.get("models_used_summary"))}</dd>
<dt class="data-label">LLM Call Details:</dt>
<dd class="data-value">{generate_llm_calls_details_html(interaction_data.get("llm_calls_details"))}</dd>
<dt class="data-label">Modified Files (in chat context):</dt>
<dd class="data-value">{generate_collapsible_list_html("Modified Files in Chat", interaction_data.get("modified_files_in_chat"))}</dd>
<dt class="data-label">Commits Made This Interaction:</dt>
<dd class="data-value">{generate_collapsible_list_html("Commits Made This Interaction", interaction_data.get("commits_made_this_interaction"))}</dd>
</dl>
</div>
</div>
</div>
"""
def main():
"""Main function to generate the dashboard."""
all_session_data = read_session_data(SESSION_DATA_FILE)
# Calculate cost by model once
cost_by_model = calculate_cost_by_model(all_session_data)
# Generate HTML for the different sections
stats_overview_html = generate_stats_overview_html(all_session_data, cost_by_model)
secondary_stats_html = generate_secondary_stats_html(all_session_data)
latest_interaction_display_html = ""
history_entries_html = ""
project_name_header = "AIDER ANALYTICS" # Default if no data
if not all_session_data:
latest_interaction_display_html = '<p class="empty-state">No latest interaction data to display.</p>'
history_entries_html = '<p class="empty-state">No interaction history to display.</p>'
else:
# Data is assumed to be oldest to newest from read_session_data
data_for_processing = list(all_session_data) # Make a copy
latest_interaction_data = data_for_processing.pop() # Removes and returns the last item (newest)
project_name_header = escape_html(latest_interaction_data.get("project_name", "AIDER ANALYTICS")) # Get project name from latest interaction
# Index 0 for latest, but color is overridden by use_special_color_bar
latest_interaction_display_html = generate_interaction_html(latest_interaction_data, 0, use_special_color_bar=True, special_color_class="blue")
history_entries_html_parts = []
if not data_for_processing:
history_entries_html = '<p class="empty-state">No further interaction history to display.</p>'
else:
# Iterate from newest to oldest for display for the rest of the history
for i, interaction_data in enumerate(reversed(data_for_processing)):
# i will be 0 for the newest in remaining, 1 for next, etc.
history_entries_html_parts.append(generate_interaction_html(interaction_data, i))
history_entries_html = "\n".join(history_entries_html_parts)
if not history_entries_html_parts: # Should not happen if data_for_processing was not empty
history_entries_html = '<p class="empty-state">No further interaction history to display.</p>'
if not DASHBOARD_TEMPLATE_FILE.exists():
print(f"Error: Dashboard template file not found at {DASHBOARD_TEMPLATE_FILE}")
# Create a basic HTML structure if template is missing, to show some output
output_content = f"""
<html>
<head><title>Aider Analytics Dashboard</title></head>
<body>
<h1>{project_name_header} - Aider Analytics Dashboard</h1>
<h2>Stats Overview</h2>
<section class="stats-overview">{stats_overview_html}</section>
<h2>Secondary Stats</h2>
<section class="secondary-stats-section">{secondary_stats_html}</section>
<h2>Latest Interaction</h2>
<section class="latest-interaction-display">{latest_interaction_display_html}</section>
<h2>Interaction History</h2>
<section class="text-entry-history-section">{history_entries_html}</section>
<p><small>Note: dashboard.html template was not found. This is a fallback display.</small></p>
</body>
</html>
"""
else:
with open(DASHBOARD_TEMPLATE_FILE, "r", encoding="utf-8") as f:
template_content = f.read()
output_content = template_content.replace("<!-- AIDER_ANALYTICS_PROJECT_NAME -->", project_name_header)
output_content = output_content.replace("<!-- AIDER_ANALYTICS_STATS_OVERVIEW_CONTENT -->", stats_overview_html)
output_content = output_content.replace("<!-- AIDER_ANALYTICS_SECONDARY_STATS_CONTENT -->", secondary_stats_html)
output_content = output_content.replace("<!-- AIDER_ANALYTICS_LATEST_INTERACTION_CONTENT -->", latest_interaction_display_html)
output_content = output_content.replace("<!-- AIDER_ANALYTICS_HISTORY_ENTRIES_CONTENT -->", history_entries_html)
# Check if placeholders were correctly replaced (optional, for debugging)
# if "<!-- AIDER_ANALYTICS_STATS_OVERVIEW_CONTENT -->" in output_content and "<!-- AIDER_ANALYTICS_STATS_OVERVIEW_CONTENT -->" not in stats_overview_html:
# print("Warning: Stats overview placeholder was not replaced.")
# if "<!-- AIDER_ANALYTICS_SECONDARY_STATS_CONTENT -->" in output_content and "<!-- AIDER_ANALYTICS_SECONDARY_STATS_CONTENT -->" not in secondary_stats_html:
# print("Warning: Secondary stats placeholder was not replaced.")
# if "<!-- AIDER_ANALYTICS_LATEST_INTERACTION_CONTENT -->" in output_content and "<!-- AIDER_ANALYTICS_LATEST_INTERACTION_CONTENT -->" not in latest_interaction_display_html:
# print("Warning: Latest interaction placeholder was not replaced.")
# if "<!-- AIDER_ANALYTICS_HISTORY_ENTRIES_CONTENT -->" in output_content and "<!-- AIDER_ANALYTICS_HISTORY_ENTRIES_CONTENT -->" not in history_entries_html:
# print("Warning: History entries placeholder was not replaced.")
with open(DASHBOARD_OUTPUT_FILE, "w", encoding="utf-8") as f:
f.write(output_content)
print(f"Dashboard generated: {DASHBOARD_OUTPUT_FILE.resolve().as_uri()}")
webbrowser.open(DASHBOARD_OUTPUT_FILE.resolve().as_uri())
if __name__ == "__main__":
main()

View file

@@ -0,0 +1,351 @@
# local_analytics/local_analytics_collector.py
import atexit
import datetime
import logging
import os
import platform
import shelve
import sys
import time
import uuid
import json # Import json module
import re # Import re module
import litellm
# Import from the local_analytics package (assuming project_root/local_analytics/dashboard_generator.py)
from local_analytics.dashboard_generator import main
try:
from aider import __version__ as aider_version_val
except ImportError:
aider_version_val = "unknown"
# Path constants relative to the project root where Aider is run
DATA_SHELVE_FILE = "local_analytics/aider_analytics_data.shelve"
# Constant for the dashboard HTML file
# REMOVED: DASHBOARD_HTML_FILE = "local_analytics/dashboard.html"
LOG_FILE = "local_analytics/local_analytics_collector.logs"
SESSION_JSONL_FILE = "local_analytics/session.jsonl" # Define the new JSONL file path
class LocalAnalyticsCollector:
"""
Collects local analytics data for Aider sessions and interactions.
This class tracks various metrics related to LLM calls, token usage,
code modifications, and session timings. Data is stored locally using
the `shelve` module.
"""
def __init__(self, io, git_root=None, enabled=True):
"""
Initializes the LocalAnalyticsCollector.
Args:
io: An InputOutput object for user interaction (currently unused beyond holding a reference).
git_root (str, optional): The root directory of the git project.
Defaults to None, in which case the current working directory is used.
enabled (bool, optional): Whether analytics collection is enabled. Defaults to True.
"""
self.io = io # Retain for the final user-facing message
self.enabled = enabled
if not self.enabled:
return
if git_root:
self.project_name = os.path.basename(os.path.abspath(git_root))
base_path = git_root
else:
self.project_name = os.path.basename(os.getcwd())
base_path = os.getcwd()
self.data_file = os.path.join(base_path, DATA_SHELVE_FILE)
self.log_file = os.path.join(base_path, LOG_FILE)
# Store the dashboard output file path
# REMOVED: self.dashboard_output_file = os.path.join(base_path, DASHBOARD_HTML_FILE)
# Store the session JSONL file path
self.session_jsonl_file = os.path.join(base_path, SESSION_JSONL_FILE)
self.session_id = str(uuid.uuid4())
self.aider_version = aider_version_val
self.platform_info = platform.platform()
self.python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
self._current_interaction_data = None
self._interaction_start_time_monotonic = None
# <<< START LOGGER SETUP
log_dir = os.path.dirname(self.log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
self.logger = logging.getLogger(__name__ + ".LocalAnalyticsCollector") # Or just __name__
self.logger.setLevel(logging.DEBUG)
self.logger.propagate = False # Prevent logs from reaching root logger / console
# Remove existing handlers to prevent duplication if __init__ is called multiple times
for handler in self.logger.handlers[:]:
self.logger.removeHandler(handler)
handler.close()
fh = logging.FileHandler(self.log_file, encoding='utf-8')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(session_id)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
# Make session_id available to logger formatter
self._log_adapter = logging.LoggerAdapter(self.logger, {'session_id': self.session_id})
self._log_adapter.debug(f"--- LocalAnalyticsCollector Initialized ---")
self._log_adapter.debug(f"Project: {self.project_name}")
self._log_adapter.debug(f"Data file: {self.data_file}")
self._log_adapter.debug(f"Log file: {self.log_file}")
self._log_adapter.debug(f"Session JSONL file: {self.session_jsonl_file}")
# <<< END LOGGER SETUP
data_dir = os.path.dirname(self.data_file)
if data_dir and not os.path.exists(data_dir):
os.makedirs(data_dir, exist_ok=True)
# Ensure directory for dashboard.html and session.jsonl also exists
# REMOVED: output_dir = os.path.dirname(self.dashboard_output_file) # Assuming dashboard and jsonl are in the same dir
output_dir = os.path.dirname(self.session_jsonl_file) # Use session_jsonl_file path
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
atexit.register(self.end_session)
self._original_success_callbacks = litellm.success_callback[:]
self._original_failure_callbacks = litellm.failure_callback[:]
if self._litellm_success_callback not in litellm.success_callback:
litellm.success_callback.append(self._litellm_success_callback)
def start_interaction(self, query, modified_files_in_chat=None):
"""
Starts tracking a new interaction.
If a previous interaction was in progress, it will be ended first.
Args:
query (str): The user's query for this interaction.
modified_files_in_chat (list, optional): A list of files modified in the chat context.
Defaults to None.
"""
if not self.enabled:
return
if self._current_interaction_data:
self.end_interaction() # End previous interaction if any
self._interaction_start_time_monotonic = time.monotonic()
self._current_interaction_data = {
"session_id": self.session_id,
"project_name": self.project_name,
"interaction_timestamp": datetime.datetime.now().isoformat(),
"interaction_duration_seconds": 0,
"query": re.split(r"```diff", query, 1)[0].strip(),
"aider_version": self.aider_version,
"platform_info": self.platform_info,
"python_version": self.python_version,
"token_summary": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, "estimated_cost": 0.0},
"models_used_summary": [],
"llm_calls_details": [],
"modified_files_in_chat": modified_files_in_chat or [],
"commits_made_this_interaction": []
}
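        # e.g. (illustration) a query of "Refactor the parser\n```diff\n--- a/parser.py ..." is stored as just "Refactor the parser"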
def end_interaction(self):
"""
Ends the current interaction and saves its data.
Calculates interaction duration, summarizes model usage, and persists
the interaction data to the shelve database.
"""
if not self.enabled or not self._current_interaction_data:
return
if self._interaction_start_time_monotonic:
duration = time.monotonic() - self._interaction_start_time_monotonic
self._current_interaction_data["interaction_duration_seconds"] = duration
# Summarize model usage from detailed calls
model_summary_map = {}
for call in self._current_interaction_data.get("llm_calls_details", []):
model_name = call.get("model", "unknown_model")
entry = model_summary_map.setdefault(
model_name,
{
"name": model_name,
"calls": 0,
"cost": 0.0,
"prompt_tokens": 0,
"completion_tokens": 0,
},
)
entry["calls"] += 1
entry["cost"] += call.get("cost", 0.0)
entry["prompt_tokens"] += call.get("prompt_tokens", 0)
entry["completion_tokens"] += call.get("completion_tokens", 0)
self._current_interaction_data["models_used_summary"] = list(model_summary_map.values())
try:
with shelve.open(self.data_file) as db:
interactions = db.get("interactions", [])
interactions.append(self._current_interaction_data)
db["interactions"] = interactions
except Exception as e:
self._log_adapter.error(f"Error saving interaction to shelve: {e}")
self._current_interaction_data = None
self._interaction_start_time_monotonic = None
def _litellm_success_callback(self, kwargs, completion_response, start_time, end_time):
"""
Callback for successful LiteLLM calls.
This method is registered with LiteLLM to capture details of each
successful LLM API call, including token usage and cost.
Args:
kwargs: Keyword arguments passed to the LiteLLM completion call.
completion_response: The response object from LiteLLM.
start_time: Timestamp when the LLM call started.
end_time: Timestamp when the LLM call ended.
"""
if not self.enabled or not self._current_interaction_data:
return
model_name = kwargs.get("model", "unknown_model")
usage = getattr(completion_response, "usage", None)
prompt_tokens = getattr(usage, 'prompt_tokens', 0) if usage else 0
completion_tokens = getattr(usage, 'completion_tokens', 0) if usage else 0
cost = 0.0
try:
# Ensure cost is float, handle potential errors from litellm.completion_cost
calculated_cost = litellm.completion_cost(completion_response=completion_response)
cost = float(calculated_cost) if calculated_cost is not None else 0.0
except Exception as e: # Broad exception catch if litellm.completion_cost fails
self._log_adapter.warning(
f"Analytics: Could not calculate cost for LLM call. Error: {e}"
)
cost = 0.0 # Ensure cost is always a float, defaulting to 0.0 on error
call_detail = {
"model": model_name,
"id": getattr(completion_response, "id", None),
"finish_reason": (
getattr(completion_response.choices[0], "finish_reason", None)
if hasattr(completion_response, "choices") and completion_response.choices
else None
),
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"cost": cost,
"timestamp": start_time.isoformat(),
}
self._current_interaction_data["llm_calls_details"].append(call_detail)
ts = self._current_interaction_data["token_summary"]
ts["prompt_tokens"] += prompt_tokens
ts["completion_tokens"] += completion_tokens
ts["total_tokens"] += prompt_tokens + completion_tokens
ts["estimated_cost"] += cost
def log_commit(self, commit_hash, commit_message):
"""
Logs a git commit made during the current interaction.
Args:
commit_hash (str): The hash of the commit.
commit_message (str): The commit message.
"""
if not self.enabled or not self._current_interaction_data:
return
commit_info = {"hash": commit_hash, "message": commit_message}
self._current_interaction_data["commits_made_this_interaction"].append(commit_info)
def end_session(self):
"""
Ends the analytics collection session.
Ensures any ongoing interaction is ended, generates the HTML dashboard,
unregisters the atexit handler, and restores original LiteLLM callbacks.
"""
if not self.enabled: # If analytics was never enabled or session already ended.
# Unregister atexit handler early if it was somehow registered without enabling
# This path should ideally not be hit if __init__ logic is correct.
try:
atexit.unregister(self.end_session)
except TypeError: # pragma: no cover
pass # Handler was not registered or other issue
return
# End any ongoing interaction first
if self._current_interaction_data:
self.end_interaction()
# Write all the `shelve` data to session.jsonl
if hasattr(self, 'data_file') and hasattr(self, 'session_jsonl_file'):
try:
with shelve.open(self.data_file, 'r') as db:
interactions = db.get("interactions", [])
with open(self.session_jsonl_file, 'w', encoding='utf-8') as f:
for interaction in interactions:
# Ensure data is JSON serializable (e.g., handle datetime objects if any slipped through)
# Although datetime is converted to isoformat already, this is a good practice.
# Simple approach: convert to string if not serializable, or use a custom encoder.
# For now, assuming isoformat is sufficient based on start_interaction.
json_line = json.dumps(interaction)
f.write(json_line + '\n')
# generate dashboard
main()
if hasattr(self, '_log_adapter'):
self._log_adapter.info(f"Shelve data written to {self.session_jsonl_file}")
except Exception as e:
if hasattr(self, '_log_adapter'):
self._log_adapter.error(f"Error writing shelve data to JSONL: {e}")
else: # pragma: no cover
print(f"Error writing shelve data to JSONL: {e}") # Fallback if logger not set
# Cleanup atexit handler
try:
atexit.unregister(self.end_session)
except TypeError: # pragma: no cover
pass # Handler was not registered or other issue
# Restore LiteLLM callbacks
# Check if _original_success_callbacks exists before assigning
if hasattr(self, '_original_success_callbacks'):
litellm.success_callback = self._original_success_callbacks
# if hasattr(self, '_original_failure_callbacks'): # If failure callbacks were also stored
# litellm.failure_callback = self._original_failure_callbacks
if hasattr(self, '_log_adapter'):
self._log_adapter.info("LocalAnalyticsCollector session ended.")
# Ensure logger handlers are closed to release file locks, especially on Windows
if hasattr(self, 'logger'): # Check if logger was initialized
for handler in self.logger.handlers[:]:
handler.close()
self.logger.removeHandler(handler)
# Set self.enabled to False after cleanup to prevent re-entry or further use
self.enabled = False
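# Illustrative end-to-end usage sketch (not part of the module; it mirrors how main.py and
# Coder wire the collector up in the diffs above):
#
#     collector = LocalAnalyticsCollector(io=io, git_root="/path/to/repo", enabled=True)
#     collector.start_interaction(query="add a health-check endpoint",
#                                 modified_files_in_chat=["app.py"])
#     ...                          # LLM calls are captured via the litellm success callback
#     collector.log_commit("abc1234", "feat: add health-check endpoint")
#     collector.end_interaction()  # summarizes per-model usage and persists to shelve
#     collector.end_session()      # writes session.jsonl and regenerates the dashboard
#                                  # (also runs automatically via the atexit hook)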

View file

@@ -0,0 +1,112 @@
#!/usr/bin/env python
import pexpect
import sys
import os
import time
# Define the command to run Aider
# Ensure the path to the .env file is correct for your environment
# This script assumes it's run from a location where 'python -m aider' works
# and the path '~/Dev/aider/.env' is valid.
aider_command = "python -m aider --env-file ~/Dev/aider/.env"
# Define the expected prompts using regex
# r'> ' matches the main aider prompt
# r'Apply edits\? \[y/n/a/e\] ' matches the edit confirmation prompt
main_prompt = r'> '
edit_prompt = r'Apply edits\? \[y/n/a/e\] '
# Set a timeout for pexpect operations (in seconds)
# Adjust this if your LLM responses are very long or system is slow
timeout_seconds = 300 # 5 minutes
print(f"Running command: {aider_command}")
child = None
try:
# Spawn the aider process
# encoding='utf-8' ensures consistent text handling
# timeout sets a default timeout for expect operations
child = pexpect.spawn(aider_command, encoding='utf-8', timeout=timeout_seconds)
# Optional: Uncomment the line below to see the raw output from the child process
# child.logfile_read = sys.stdout
# Wait for the initial Aider prompt
print("Waiting for initial prompt...")
child.expect(main_prompt)
print("Initial prompt received.")
# Change mode to /ask
print("Sending /ask command...")
child.sendline("/ask")
# Wait for the prompt to confirm mode change
child.expect(main_prompt)
print("Mode changed to /ask.")
# Send the query
query = "what is the reflection error"
print(f"Sending query: '{query}'...")
child.sendline(query)
# Wait for the LLM response to finish and the prompt to reappear.
# This loop also handles potential edit prompts that might appear
# during or after the LLM's response.
print("Waiting for LLM response and handling potential edit prompts...")
while True:
# Wait for either the edit prompt, the main prompt, EOF, or timeout
index = child.expect([edit_prompt, main_prompt, pexpect.EOF, pexpect.TIMEOUT])
if index == 0:
# Matched the edit prompt: 'Apply edits? [y/n/a/e] '
print("Edit prompt received. Sending 'n' to decline...")
child.sendline("n")
# Continue the loop to wait for the next prompt (could be another edit or the main prompt)
elif index == 1:
# Matched the main prompt: '> '
# This indicates the LLM response is likely finished and no more edit prompts are pending
print("Main prompt received. LLM response finished.")
break # Exit the loop
elif index == 2:
# Matched EOF - the process exited unexpectedly before we sent /exit
print("ERROR: Process exited unexpectedly (EOF).")
print("Output before EOF:")
print(child.before)
break # Exit the loop
elif index == 3:
# Matched TIMEOUT
print(f"ERROR: Timeout occurred ({timeout_seconds} seconds) while waiting for prompt.")
print("Output before timeout:")
print(child.before)
break # Exit the loop
# Send the /exit command to quit Aider
print("Sending /exit command...")
child.sendline("/exit")
# Wait for the process to terminate gracefully
print("Waiting for process to exit...")
child.expect(pexpect.EOF)
print("Process exited.")
except pexpect.exceptions.TIMEOUT as e:
print(f"ERROR: Timeout exception: {e}")
if child:
print("Output before timeout:")
print(child.before)
except pexpect.exceptions.EOF as e:
print(f"ERROR: EOF exception: {e}")
if child:
print("Output before EOF:")
print(child.before)
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
# Ensure the child process is terminated if it's still running
if child and child.isalive():
print("Terminating child process...")
child.close()
print("Child process terminated.")
print("Script finished.")

View file

@@ -0,0 +1,184 @@
import subprocess
import time
import os
import sys
import threading
import queue
# Define the aider command
# Use the full path to the .env file
# Assumes the script is run from the project root or a location where
# ~/Dev/aider/.env is the correct path.
# Using sys.executable ensures the script runs aider with the same python env.
aider_command = [
sys.executable,
"-m", "aider",
"--env-file", os.path.expanduser("~/Dev/aider/.env")
]
# Inputs to send to aider
inputs = [
"/ask",
"what is the reflection error",
"/exit"
]
# Expected prompts (as bytes, since we read bytes)
# Use strip() because rich might add spaces or other control characters
MAIN_PROMPT = b"> "
EDIT_PROMPT = b"Apply edits? (y/n/commit/diff/quit) "
def enqueue_output(out, queue):
"""Helper function to read output from a stream and put it in a queue."""
# Read line by line
for line in iter(out.readline, b''):
queue.put(line)
out.close()
def run_aider_session():
print(f"[SCRIPT] Starting aider with command: {' '.join(aider_command)}")
# Start the subprocess
# Use bufsize=1 for line buffering
# universal_newlines=False to read bytes and reliably detect byte prompts
# stderr is also piped as rich often prints to stderr
process = subprocess.Popen(
aider_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines=False
)
# Queues for stdout and stderr
q_stdout = queue.Queue()
q_stderr = queue.Queue()
# Start threads to read stdout and stderr asynchronously
t_stdout = threading.Thread(target=enqueue_output, args=(process.stdout, q_stdout))
t_stderr = threading.Thread(target=enqueue_output, args=(process.stderr, q_stderr))
t_stdout.daemon = True # Thread dies with the main program
t_stderr.daemon = True
t_stdout.start()
t_stderr.start()
# Give aider a moment to start and print initial messages
time.sleep(3) # Increased initial sleep slightly
current_input_index = 0
# State machine: WAITING_FOR_MAIN_PROMPT, WAITING_FOR_RESPONSE
state = "WAITING_FOR_MAIN_PROMPT"
print(f"[SCRIPT] Initial state: {state}")
try:
# Continue as long as the process is running OR there is output in the queues
while process.poll() is None or not q_stdout.empty() or not q_stderr.empty():
try:
# Get a line from stdout queue with a timeout
# A small timeout allows the loop to check process.poll() and stderr queue
line = q_stdout.get(timeout=0.05)
sys.stdout.buffer.write(line)
sys.stdout.buffer.flush()
# Check for prompts based on the state
if state == "WAITING_FOR_MAIN_PROMPT":
# Check if the line ends with the main prompt bytes (after stripping)
if line.strip().endswith(MAIN_PROMPT.strip()):
print("\n[SCRIPT] Detected main prompt.")
if current_input_index < len(inputs):
command = inputs[current_input_index]
print(f"[SCRIPT] Sending: {command}")
process.stdin.write((command + "\n").encode()) # Encode string to bytes
process.stdin.flush()
current_input_index += 1
state = "WAITING_FOR_RESPONSE" # After sending input, wait for response/next prompt
print(f"[SCRIPT] State transition: {state}")
else:
# Should not happen if /exit is the last input, but as a safeguard
print("[SCRIPT] No more inputs defined, waiting for process exit.")
state = "SESSION_COMPLETE"
print(f"[SCRIPT] State transition: {state}")
elif state == "WAITING_FOR_RESPONSE":
# While waiting for response, we might see an edit prompt or the main prompt
if line.strip().endswith(EDIT_PROMPT.strip()):
print("\n[SCRIPT] Detected edit prompt.")
print("[SCRIPT] Sending: n")
process.stdin.write(b"n\n") # Send 'n' to decline edits
process.stdin.flush()
# Stay in WAITING_FOR_RESPONSE state, as declining might lead to another prompt
print(f"[SCRIPT] State remains: {state}")
elif line.strip().endswith(MAIN_PROMPT.strip()):
print("\n[SCRIPT] Detected main prompt (while waiting for response).")
# Response finished, now ready for next main input
state = "WAITING_FOR_MAIN_PROMPT"
print(f"[SCRIPT] State transition: {state}")
except queue.Empty:
# No output from stdout, check stderr queue
try:
err_line = q_stderr.get(timeout=0.01)
sys.stderr.buffer.write(err_line)
sys.stderr.buffer.flush()
except queue.Empty:
# No output from either queue, check if process is still running
if process.poll() is not None:
# Process exited and queues are empty, we are done
print("[SCRIPT] Process exited and queues are empty.")
break
# If process is still running but no output, just continue loop and wait
# Add a small sleep to prevent tight loop if process is slow to produce output
time.sleep(0.01)
# End of while loop: process exited and queues are drained
except Exception as e:
print(f"[SCRIPT] An error occurred: {e}")
# Attempt to read remaining output before terminating
try:
# Give threads a moment to finish putting data in queues
t_stdout.join(timeout=1)
t_stderr.join(timeout=1)
# Drain queues
while not q_stdout.empty():
sys.stdout.buffer.write(q_stdout.get_nowait())
sys.stdout.buffer.flush()
while not q_stderr.empty():
sys.stderr.buffer.write(q_stderr.get_nowait())
                sys.stderr.buffer.flush()
except Exception as e_drain:
print(f"[SCRIPT] Error draining queues: {e_drain}")
if process.poll() is None:
print("[SCRIPT] Terminating process...")
process.terminate() # Ensure process is terminated on error
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
print("[SCRIPT] Process did not terminate, killing...")
process.kill()
process.wait()
finally:
# Ensure process is waited upon if not already
if process.poll() is None:
print("[SCRIPT] Waiting for process to finish...")
process.wait()
# Final drain of queues just in case
while not q_stdout.empty():
sys.stdout.buffer.write(q_stdout.get_nowait())
sys.stdout.buffer.flush()
while not q_stderr.empty():
sys.stderr.buffer.write(q_stderr.get_nowait())
sys.stderr.buffer.flush()
print(f"[SCRIPT] Aider process finished with return code {process.returncode}")
if __name__ == "__main__":
run_aider_session()
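The state machine above depends on two background reader threads (t_stdout, t_stderr) that push raw bytes from the aider subprocess onto q_stdout and q_stderr; their setup lives earlier in the script. As a minimal sketch of that non-blocking reader pattern (the helper name enqueue_output is assumed for illustration, not taken from the script):
import queue
import threading

def enqueue_output(stream, out_queue):
    # Hypothetical helper (name assumed): read raw lines from a subprocess pipe
    # and queue them so the main loop can poll without blocking on readline().
    for line in iter(stream.readline, b""):
        out_queue.put(line)
    stream.close()

# Assumed wiring, mirroring the queue/thread names used in the driver script:
# q_stdout, q_stderr = queue.Queue(), queue.Queue()
# t_stdout = threading.Thread(target=enqueue_output, args=(process.stdout, q_stdout), daemon=True)
# t_stderr = threading.Thread(target=enqueue_output, args=(process.stderr, q_stderr), daemon=True)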

View file

@ -0,0 +1,230 @@
import os
import shelve
import json
import tempfile
import shutil
import unittest
from unittest.mock import patch, MagicMock
import datetime
import time
import logging # Import logging for logger checks
# Assuming the script is run from the project root or PYTHONPATH is set
# This import path assumes the test script is in tests/local_analytics/
try:
from local_analytics.local_analytics_collector import LocalAnalyticsCollector
except ImportError:
# Fallback import path if the script is run from a different location
# This might require adjusting PYTHONPATH or running from the project root
print("Could not import LocalAnalyticsCollector directly. Ensure PYTHONPATH is set or run from project root.")
print("Attempting import assuming script is in tests/local_analytics/")
try:
# Adjust path for potential different execution contexts
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from local_analytics.local_analytics_collector import LocalAnalyticsCollector
sys.path.pop(0) # Clean up sys.path
except ImportError as e:
print(f"Failed to import LocalAnalyticsCollector even with path adjustment: {e}")
# Exit or raise error if import fails
raise
# Dummy IO class to satisfy the collector's __init__
class DummyIO:
def tool_output(self, *args, **kwargs):
pass
def tool_warning(self, *args, **kwargs):
pass
def tool_error(self, *args, **kwargs):
pass
def confirm_ask(self, *args, **kwargs):
return 'y' # Default to yes for confirmations
def print(self, *args, **kwargs):
pass
def append_chat_history(self, *args, **kwargs):
pass
class TestLocalAnalyticsCollectorStandalone(unittest.TestCase):
def setUp(self):
# Create a temporary directory for test files
self.temp_dir = tempfile.mkdtemp()
self.project_name = os.path.basename(self.temp_dir)
# Define file paths relative to the temporary project root
self.analytics_dir = os.path.join(self.temp_dir, "local_analytics")
self.data_file = os.path.join(self.analytics_dir, "aider_analytics_data.shelve")
self.session_jsonl_file = os.path.join(self.analytics_dir, "session.jsonl")
self.dashboard_output_file = os.path.join(self.analytics_dir, "dashboard.html")
self.log_file = os.path.join(self.analytics_dir, "local_analytics_collector.logs")
# Ensure the local_analytics directory exists within the temp dir
os.makedirs(self.analytics_dir, exist_ok=True)
# Mock the generate_dashboard function
# Patch the function where it's *used* in local_analytics_collector.py
self.patcher_generate_dashboard = patch('local_analytics.local_analytics_collector.generate_dashboard')
self.mock_generate_dashboard = self.patcher_generate_dashboard.start()
# Mock litellm.completion_cost as it might be called internally
self.patcher_litellm_cost = patch('litellm.completion_cost')
self.mock_litellm_cost = self.patcher_litellm_cost.start()
self.mock_litellm_cost.return_value = 0.03 # Return a fixed cost for testing
# Mock litellm.success_callback list to control it during the test
# The collector appends its callback to this list in __init__
self.patcher_litellm_success_callback_list = patch('litellm.success_callback', new_callable=list)
self.mock_litellm_success_callback_list = self.patcher_litellm_success_callback_list.start()
# Create a dummy IO object
self.dummy_io = DummyIO()
def tearDown(self):
# Stop all patches
self.patcher_generate_dashboard.stop()
self.patcher_litellm_cost.stop()
self.patcher_litellm_success_callback_list.stop()
# Clean up the temporary directory
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def test_analytics_collection_and_output(self):
"""
Tests that analytics data is collected, saved to shelve,
written to session.jsonl, and the dashboard generator is called
with the correct shelve file path.
"""
# 1. Initialize collector
# Pass the temporary directory as the git_root
collector = LocalAnalyticsCollector(self.dummy_io, git_root=self.temp_dir, enabled=True)
# Verify the collector's callback was added to the litellm list
self.assertIn(collector._litellm_success_callback, self.mock_litellm_success_callback_list)
# 2. Simulate an interaction
query = "Test query for analytics collection"
modified_files = ["test_file1.py", "docs/test_doc.md"]
collector.start_interaction(query, modified_files_in_chat=modified_files)
# Simulate an LLM call within the interaction
mock_completion_response = MagicMock()
mock_completion_response.usage.prompt_tokens = 100
mock_completion_response.usage.completion_tokens = 200
mock_completion_response.id = "chatcmpl-test-id-12345"
mock_completion_response.choices = [MagicMock()]
mock_completion_response.choices[0].finish_reason = "stop"
llm_call_kwargs = {"model": "gpt-4o", "messages": [{"role": "user", "content": "..."}]}
start_time = datetime.datetime.now()
# Simulate some duration
time.sleep(0.01)
end_time = datetime.datetime.now()
# Manually call the internal success callback to simulate a completed LLM call
collector._litellm_success_callback(llm_call_kwargs, mock_completion_response, start_time, end_time)
# Simulate a commit
commit_hash = "abcdef1234567890"
commit_message = "feat: added test analytics data"
collector.log_commit(commit_hash, commit_message)
# End the interaction
collector.end_interaction()
# 3. End the session (triggers saving to shelve and writing to jsonl)
collector.end_session()
# 4. Assertions
# Check if shelve file exists and contains data
# Shelve may create one or more files depending on the dbm backend, so check for the base name
self.assertTrue(any(f.startswith(os.path.basename(self.data_file)) for f in os.listdir(self.analytics_dir)),
"Shelve data files should exist")
try:
# Use the base path for shelve.open
with shelve.open(self.data_file, 'r') as db:
self.assertIn("interactions", db, "Shelve should contain 'interactions' key")
interactions = db["interactions"]
self.assertIsInstance(interactions, list, "'interactions' in shelve should be a list")
self.assertEqual(len(interactions), 1, "Shelve should contain exactly one interaction")
interaction_data = interactions[0]
self.assertEqual(interaction_data.get("query"), query)
self.assertEqual(interaction_data.get("modified_files_in_chat"), modified_files)
self.assertGreater(interaction_data.get("interaction_duration_seconds", 0), 0)
self.assertIn("llm_calls_details", interaction_data)
self.assertEqual(len(interaction_data["llm_calls_details"]), 1)
llm_call_detail = interaction_data["llm_calls_details"][0]
self.assertEqual(llm_call_detail.get("model"), "gpt-4o")
self.assertEqual(llm_call_detail.get("prompt_tokens"), 100)
self.assertEqual(llm_call_detail.get("completion_tokens"), 200)
self.assertEqual(llm_call_detail.get("cost"), 0.03)
# Check timestamp format (isoformat)
self.assertIsInstance(llm_call_detail.get("timestamp"), str)
try:
datetime.datetime.fromisoformat(llm_call_detail["timestamp"])
except ValueError:
self.fail("LLM call timestamp is not in ISO format")
self.assertIn("commits_made_this_interaction", interaction_data)
self.assertEqual(len(interaction_data["commits_made_this_interaction"]), 1)
self.assertEqual(interaction_data["commits_made_this_interaction"][0].get("hash"), commit_hash)
self.assertEqual(interaction_data["commits_made_this_interaction"][0].get("message"), commit_message)
# Check token summary
token_summary = interaction_data.get("token_summary", {})
self.assertEqual(token_summary.get("prompt_tokens"), 100)
self.assertEqual(token_summary.get("completion_tokens"), 200)
self.assertEqual(token_summary.get("total_tokens"), 300)
self.assertEqual(token_summary.get("estimated_cost"), 0.03)
except Exception as e:
self.fail(f"Error reading shelve file: {e}")
# Check if session.jsonl file exists and contains data
self.assertTrue(os.path.exists(self.session_jsonl_file), "session.jsonl file should exist")
try:
with open(self.session_jsonl_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
self.assertEqual(len(lines), 1, "session.jsonl should contain exactly one line")
json_data = json.loads(lines[0])
# Verify content matches the interaction data saved in shelve
# Note: JSON serialization/deserialization might change types slightly (e.g., datetime becomes string)
# We already verified the shelve data structure above, just check some key values
self.assertIsInstance(json_data, dict)
self.assertEqual(json_data.get("query"), query)
self.assertEqual(json_data.get("modified_files_in_chat"), modified_files)
self.assertIn("llm_calls_details", json_data)
self.assertEqual(len(json_data["llm_calls_details"]), 1)
self.assertIn("commits_made_this_interaction", json_data)
self.assertEqual(len(json_data["commits_made_this_interaction"]), 1)
self.assertEqual(json_data.get("token_summary", {}).get("total_tokens"), 300)
except Exception as e:
self.fail(f"Error reading or parsing session.jsonl: {e}")
# Check if generate_dashboard was called with correct arguments
self.mock_generate_dashboard.assert_called_once()
# Check arguments: project_name, shelve_file_path, dashboard_output_path, logger
called_args, called_kwargs = self.mock_generate_dashboard.call_args
self.assertEqual(called_args[0], self.project_name)
self.assertEqual(called_args[1], self.data_file) # Verify shelve file path is passed
self.assertEqual(called_args[2], self.dashboard_output_file)
# Optionally check the logger argument type
self.assertIsInstance(called_args[3], logging.LoggerAdapter)
# This allows running the test directly from the command line
if __name__ == '__main__':
# Configure basic logging so collector log output is visible when run directly
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
unittest.main(argv=['first-arg-is-ignored'], exit=False)
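For reference, the __main__ guard above lets this standalone test exercise the collector end to end without a real aider session. Assuming the file lives at tests/local_analytics/test_local_analytics_collector.py (the location the import fallback assumes, not one stated elsewhere in the commit), it can be run from the project root with either of:
    python tests/local_analytics/test_local_analytics_collector.py
    python -m unittest discover -s tests -p "test_local_analytics_collector.py"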