refactor: Reorganize redact script and improve code formatting

Paul Gauthier 2025-03-11 19:30:46 -07:00 committed by Paul Gauthier (aider)
parent 533e5ec03f
commit 9513d307a1


@@ -1,41 +1,45 @@
#!/usr/bin/env python3
import json
import os
import re
import sys

# Speed up factor for the recording
SPEEDUP = 1.25
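# (e.g. 10 seconds of recorded time plays back in 10 / 1.25 = 8 seconds)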


def process_file(input_path, output_path):
    """
    Process an asciinema cast v2 file to filter out certain sections based on ANSI cursor commands.

    Format: First line is a JSON header. Subsequent lines are JSON arrays: [timestamp, "o", "text"]

    If a text field contains "\u001b[ROW;COLH" followed by "Atuin", skip it and all subsequent
    records until finding a text with "\u001b[ROW;(COL-1)H".

    Maintains consistent timestamps by:
    1. Not advancing time during skip sections
    2. Compressing any long gaps to 0.5 seconds maximum
    """
    skip_mode = False
    target_pattern = None
    ansi_pattern = re.compile(r"\u001b\[(\d+);(\d+)H")
    is_first_line = True
    last_timestamp = 0.0
    time_offset = 0.0  # Accumulator for time to subtract
    max_gap = 0.5  # Maximum allowed time gap between events
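    # Example: a record like [12.3, "o", "\u001b[5;20HAtuin ..."] enters skip mode;
    # records are then dropped until one contains "\u001b[5;19H" (i.e. COL-1)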

    with (
        open(input_path, "r", encoding="utf-8") as infile,
        open(output_path, "w", encoding="utf-8") as outfile,
    ):
        for line in infile:
            # Always include the header (first line)
            if is_first_line:
                outfile.write(line)
                is_first_line = False
                continue

            # Parse the JSON record
            try:
                record = json.loads(line)
@@ -43,28 +47,28 @@ def process_file(input_path, output_path):
                    # If not a valid record, just write it out
                    outfile.write(line)
                    continue

                current_timestamp = float(record[0])
                text = record[2]  # The text content

                # If we're not in skip mode, check if we need to enter it
                if not skip_mode:
                    if "\u001b[" in text and "Atuin" in text:
                        match = ansi_pattern.search(text)
                        if match:
                            row = match.group(1)
                            col = int(match.group(2))
                            # Create pattern for the ending sequence
                            target_pattern = f"\u001b[{row};{col-1}H"
                            skip_mode = True
                            # Start tracking time to subtract
                            skip_start_time = current_timestamp
                            continue  # Skip this record

                    # If we're not skipping, write the record with adjusted timestamp
                    # First, adjust for skipped sections
                    adjusted_timestamp = current_timestamp - time_offset

                    # Then, check if there's a long gap to compress
                    if last_timestamp > 0:
                        time_gap = adjusted_timestamp - last_timestamp
@@ -73,27 +77,27 @@ def process_file(input_path, output_path):
                            excess_time = time_gap - max_gap
                            time_offset += excess_time
                            adjusted_timestamp -= excess_time
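                            # e.g. a 3.0s gap leaves 2.5s of excess, so this event
                            # lands max_gap (0.5s) after the previous one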

                    # Ensure timestamps never go backward
                    adjusted_timestamp = max(adjusted_timestamp, last_timestamp)
                    last_timestamp = adjusted_timestamp

                    # Apply speedup factor to the timestamp
                    record[0] = adjusted_timestamp / SPEEDUP
                    outfile.write(json.dumps(record) + "\n")

                # If we're in skip mode, check if we should exit it
                else:
                    if target_pattern in text:
                        skip_mode = False
                        # Calculate how much time to subtract from future timestamps
                        time_offset += current_timestamp - skip_start_time
                        # Add a 0.5 second pause after each skip section
                        last_timestamp += 0.5

                        # Write this record with adjusted timestamp
                        adjusted_timestamp = current_timestamp - time_offset

                        # Check if there's a long gap to compress
                        if last_timestamp > 0:
                            time_gap = adjusted_timestamp - last_timestamp
@@ -102,19 +106,20 @@ def process_file(input_path, output_path):
                                excess_time = time_gap - max_gap
                                time_offset += excess_time
                                adjusted_timestamp -= excess_time

                        # Ensure timestamps never go backward
                        adjusted_timestamp = max(adjusted_timestamp, last_timestamp)
                        last_timestamp = adjusted_timestamp

                        # Apply speedup factor to the timestamp
                        record[0] = adjusted_timestamp / SPEEDUP
                        outfile.write(json.dumps(record) + "\n")
                    # Otherwise we're still in skip mode, don't write anything

            except json.JSONDecodeError:
                # If we can't parse the line as JSON, include it anyway
                outfile.write(line)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(f"Usage: {os.path.basename(sys.argv[0])} input_file output_file")
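
A quick way to check the behavior (the script and file names here are illustrative):

    $ python redact.py raw.cast clean.cast

For an input whose second event arrives 4.9 seconds after the first, the gap is
compressed to 0.5 seconds and both timestamps are divided by SPEEDUP, so
[0.1, "o", "hello"] and [5.0, "o", "world"] come out as [0.08, "o", "hello"]
and [0.48, "o", "world"].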