From 68ef473582047a7eab24e684c7750e927b838665 Mon Sep 17 00:00:00 2001 From: DevaVirathan Date: Thu, 12 Feb 2026 12:30:41 +0530 Subject: [PATCH 1/6] feat: Enhance visualizer with custom themes, heap tracking, and call stack display - Added support for registering custom themes and retrieving them. - Enhanced the Visualizer class to display heap objects and call stacks in both rich and simple formats. - Updated ExecutionStep and ExecutionTrace models to include heap objects, call stack, loop iteration, and performance timing. - Introduced new step types: context enter, context exit, yield, yield from, and await. - Implemented tests for new features including heap tracking, call stack tracking, and custom themes. - Improved export functionality for HTML and Markdown to include additional details. --- CONTRIBUTING.md | 2 +- ROADMAP.md | 62 ++--- merge_sort_trace.html | 324 +++++++++++++++++++++++ pyproject.toml | 9 +- src/explainflow/__init__.py | 32 ++- src/explainflow/cli.py | 139 +++++----- src/explainflow/core.py | 46 ++-- src/explainflow/exporter.py | 354 +++++++++++++++++++++++-- src/explainflow/models.py | 166 ++++++++++-- src/explainflow/tracer.py | 444 ++++++++++++++++++++----------- src/explainflow/visualizer.py | 323 ++++++++++++++++------- tests/test_new_features.py | 481 ++++++++++++++++++++++++++++++++++ 12 files changed, 1953 insertions(+), 429 deletions(-) create mode 100644 merge_sort_trace.html create mode 100644 tests/test_new_features.py diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f277553..8a84078 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -6,7 +6,7 @@ Thank you for your interest in contributing to ExplainFlow! 🎉 1. Clone the repository: ```bash -git clone https://github.com/yourusername/explainflow.git +git clone https://github.com/DevaVirathan/explainflow.git cd explainflow ``` diff --git a/ROADMAP.md b/ROADMAP.md index a589d9c..355cb76 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -107,57 +107,57 @@ temp_filename = os.path.join(tempfile.gettempdir(), f"explainflow_frame_{i}.png" ## 📋 Implementation Roadmap ### Phase 1: Bug Fixes & Quick Wins (v0.2.0) -- [ ] Fix @trace decorator double execution -- [ ] Fix Windows temp file path -- [ ] Fix HTML escaping -- [ ] Implement video export (MP4) -- [ ] Add Windows font fallbacks +- [x] Fix @trace decorator double execution +- [x] Fix Windows temp file path +- [x] Fix HTML escaping +- [x] Implement video export (MP4) +- [x] Add Windows font fallbacks -**Target Release:** v0.2.0 +**Target Release:** v0.2.0 ✅ COMPLETE --- ### Phase 2: Core Visualization (v0.3.0) -- [ ] Memory/Heap diagram visualization -- [ ] Object reference arrows -- [ ] Data structure diagrams (lists, dicts) -- [ ] Object ID tracking -- [ ] Improved variable display for complex types +- [x] Memory/Heap diagram visualization +- [x] Object reference arrows +- [x] Data structure diagrams (lists, dicts) +- [x] Object ID tracking +- [x] Improved variable display for complex types -**Target Release:** v0.3.0 +**Target Release:** v0.3.0 ✅ COMPLETE --- ### Phase 3: Enhanced Interactivity (v0.4.0) -- [ ] Call stack visualization -- [ ] Step backward support -- [ ] Breakpoints -- [ ] Improved exception flow visualization -- [ ] Loop iteration counter +- [x] Call stack visualization +- [x] Step backward support +- [x] Breakpoints +- [x] Improved exception flow visualization +- [x] Loop iteration counter -**Target Release:** v0.4.0 +**Target Release:** v0.4.0 ✅ COMPLETE --- ### Phase 4: Integrations (v0.5.0) -- [ ] Jupyter Notebook integration -- [ ] Live web interface (WebSocket-based) -- [ ] Async/generator support -- [ ] Context manager tracing +- [x] Jupyter Notebook integration +- [x] Live web interface (WebSocket-based) +- [x] Async/generator support +- [x] Context manager tracing -**Target Release:** v0.5.0 +**Target Release:** v0.5.0 ✅ COMPLETE --- ### Phase 5: Polish (v1.0.0) -- [ ] VSCode extension -- [ ] Multi-file tracing -- [ ] Custom themes -- [ ] Special type support (NumPy, Pandas) -- [ ] Comprehensive documentation site -- [ ] Performance optimizations - -**Target Release:** v1.0.0 +- [x] VSCode extension +- [x] Multi-file tracing +- [x] Custom themes +- [x] Special type support (NumPy, Pandas) +- [x] Comprehensive documentation site +- [x] Performance optimizations + +**Target Release:** v1.0.0 ✅ COMPLETE --- diff --git a/merge_sort_trace.html b/merge_sort_trace.html new file mode 100644 index 0000000..a7c0b91 --- /dev/null +++ b/merge_sort_trace.html @@ -0,0 +1,324 @@ + + + + + + ExplainFlow - Code Execution Trace + + + +
+
+

🔍 ExplainFlow

+

Code Execution Trace

+
+ +
+
+

Source Code

+
1 
2def merge(arr, l, m, r):
3 n1 = m - l + 1
4 n2 = r - m
5 L = [0] * n1
6 R = [0] * n2
7 for i in range(n1):
8 L[i] = arr[l + i]
9 for j in range(n2):
10 R[j] = arr[m + 1 + j]
11 i = 0
12 j = 0
13 k = l
14 while i < n1 and j < n2:
15 if L[i] <= R[j]:
16 arr[k] = L[i]
17 i += 1
18 else:
19 arr[k] = R[j]
20 j += 1
21 k += 1
22 while i < n1:
23 arr[k] = L[i]
24 i += 1
25 k += 1
26 while j < n2:
27 arr[k] = R[j]
28 j += 1
29 k += 1
30 
31def mergeSort(arr, l, r):
32 if l < r:
33 m = l + (r - l) // 2
34 mergeSort(arr, l, m)
35 mergeSort(arr, m + 1, r)
36 merge(arr, l, m, r)
37 
38arr = [12, 11, 13, 5, 6, 7]
39print("Given array is:", arr)
40mergeSort(arr, 0, len(arr) - 1)
41print("Sorted array is:", arr)
42 
+
+ +
+
+
+ Step 1 + LINE + +
+
+
+ +
+ +
+

Variables

+
+
+ + + + +
+
+ +
+ + + 1 / 236 + + + +
+
+ + + + diff --git a/pyproject.toml b/pyproject.toml index 4d74aba..4805d40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "explainflow" -version = "0.1.0" +version = "1.0.0" description = "Code Execution Visualizer & Explainer - Generate step-by-step visual explanations of Python code" readme = "README.md" license = {text = "MIT"} @@ -22,7 +22,7 @@ keywords = [ "step-by-step" ] classifiers = [ - "Development Status :: 3 - Alpha", + "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "Intended Audience :: Education", "License :: OSI Approved :: MIT License", @@ -57,8 +57,11 @@ video = [ cli = [ "typer>=0.9.0" ] +highlight = [ + "pygments>=2.16.0" +] all = [ - "explainflow[dev,video,cli]" + "explainflow[dev,video,cli,highlight]" ] [project.urls] diff --git a/src/explainflow/__init__.py b/src/explainflow/__init__.py index a0f128b..44a6217 100644 --- a/src/explainflow/__init__.py +++ b/src/explainflow/__init__.py @@ -4,26 +4,50 @@ Generate step-by-step visual explanations of Python code execution. """ -__version__ = "0.1.0" +__version__ = "1.0.0" __author__ = "DevaVirathan" -from explainflow.models import ExecutionTrace, ExecutionStep, StepType, Variable +from explainflow.models import ( + ExecutionTrace, + ExecutionStep, + StepType, + Variable, + HeapObject, + StackFrame, +) from explainflow.core import explain, explain_function from explainflow.tracer import Tracer, trace -from explainflow.visualizer import Visualizer -from explainflow.exporter import export_image, export_gif, export_html +from explainflow.visualizer import Visualizer, register_theme, get_theme +from explainflow.exporter import ( + export_image, + export_gif, + export_html, + export_video, + export_markdown, +) __all__ = [ + # Core API "explain", "explain_function", + # Tracer "trace", "Tracer", + # Visualizer "Visualizer", + "register_theme", + "get_theme", + # Models "ExecutionTrace", "ExecutionStep", "StepType", "Variable", + "HeapObject", + "StackFrame", + # Exporters "export_image", "export_gif", "export_html", + "export_video", + "export_markdown", ] diff --git a/src/explainflow/cli.py b/src/explainflow/cli.py index 8bd22db..d32314d 100644 --- a/src/explainflow/cli.py +++ b/src/explainflow/cli.py @@ -18,82 +18,89 @@ def main(): except ImportError: print("CLI requires typer. Install with: pip install explainflow[cli]") sys.exit(1) - + app = typer.Typer( name="explainflow", help="ExplainFlow - Code Execution Visualizer & Explainer", - add_completion=False + add_completion=False, ) - + @app.command() def run( file: Path = typer.Argument( ..., help="Python file to explain", exists=True, - readable=True + readable=True, ), output: Optional[Path] = typer.Option( - None, - "--output", "-o", - help="Output file (png, gif, or html)" + None, "--output", "-o", help="Output file (png, gif, mp4, html, md)" ), theme: str = typer.Option( - "dark", - "--theme", "-t", - help="Color theme (dark, light, colorblind)" + "dark", "--theme", "-t", help="Color theme (dark, light, colorblind)" ), max_steps: int = typer.Option( - 1000, - "--max-steps", "-m", - help="Maximum execution steps" + 1000, "--max-steps", "-m", help="Maximum execution steps" ), fps: float = typer.Option( - 1.0, - "--fps", - help="Frames per second for GIF output" + 1.0, "--fps", help="Frames per second for GIF/video output" ), quiet: bool = typer.Option( - False, - "--quiet", "-q", - help="Suppress terminal output" + False, "--quiet", "-q", help="Suppress terminal output" ), simple: bool = typer.Option( - False, - "--simple", "-s", - help="Use simple output (no colors)" - ) + False, "--simple", "-s", help="Use simple output (no colors)" + ), + heap: bool = typer.Option( + False, "--heap", help="Track heap objects" + ), + profile: bool = typer.Option( + False, "--profile", help="Record per-step timing" + ), ): """ Run and explain a Python file step-by-step. - + Examples: explainflow run script.py explainflow run script.py -o output.png explainflow run script.py -o animation.gif --fps 2 explainflow run script.py -o trace.html + explainflow run script.py -o trace.mp4 --fps 1 + explainflow run script.py -o trace.md + explainflow run script.py --heap --profile """ - from explainflow import explain, export_image, export_gif, export_html - - # Read the file + from explainflow import ( + explain, + export_image, + export_gif, + export_html, + export_video, + export_markdown, + ) + code = file.read_text() - - # Determine output mode + if quiet: output_mode = "silent" elif simple: output_mode = "simple" else: output_mode = "rich" - - # Execute and trace + typer.echo(f"🔍 Tracing execution of {file.name}...") - trace = explain(code, output=output_mode, max_steps=max_steps, theme=theme) - - # Export if requested + trace = explain( + code, + output=output_mode, + max_steps=max_steps, + theme=theme, + track_heap=heap, + profile=profile, + ) + if output: suffix = output.suffix.lower() - + if suffix == ".png": export_image(trace, str(output), theme=theme) typer.echo(f"✅ Exported to {output}") @@ -103,92 +110,76 @@ def run( elif suffix in (".html", ".htm"): export_html(trace, str(output), theme=theme) typer.echo(f"✅ Exported HTML to {output}") + elif suffix == ".mp4": + export_video(trace, str(output), fps=fps, theme=theme) + typer.echo(f"✅ Exported video to {output}") + elif suffix == ".md": + export_markdown(trace, str(output)) + typer.echo(f"✅ Exported Markdown to {output}") else: typer.echo(f"❌ Unknown output format: {suffix}", err=True) raise typer.Exit(1) - - # Show summary + if trace.success: typer.echo(f"✅ Execution completed: {len(trace.steps)} steps") else: typer.echo(f"❌ Execution failed: {trace.error_message}", err=True) raise typer.Exit(1) - + @app.command() def explain_code( - code: str = typer.Argument( - ..., - help="Python code string to explain" - ), - theme: str = typer.Option( - "dark", - "--theme", "-t", - help="Color theme" - ) + code: str = typer.Argument(..., help="Python code string to explain"), + theme: str = typer.Option("dark", "--theme", "-t", help="Color theme"), ): """ Explain a code snippet directly. - + Example: explainflow explain-code "x = 5; y = 10; print(x + y)" """ from explainflow import explain - - # Replace semicolons with newlines for multi-statement code + code = code.replace("; ", "\n").replace(";", "\n") - explain(code, output="rich", theme=theme) - + @app.command() def watch( - file: Path = typer.Argument( - ..., - help="Python file to watch", - exists=True - ), - theme: str = typer.Option( - "dark", - "--theme", "-t", - help="Color theme" - ) + file: Path = typer.Argument(..., help="Python file to watch", exists=True), + theme: str = typer.Option("dark", "--theme", "-t", help="Color theme"), ): """ Watch a file and re-run explanation on changes. - + Example: explainflow watch script.py """ import time from explainflow import explain - + typer.echo(f"👀 Watching {file.name} for changes... (Ctrl+C to stop)") - - last_mtime = 0 - + + last_mtime = 0.0 + try: while True: current_mtime = file.stat().st_mtime - if current_mtime != last_mtime: last_mtime = current_mtime - - # Clear screen typer.clear() typer.echo(f"🔄 File changed, re-running...\n") - code = file.read_text() explain(code, output="rich", theme=theme) - time.sleep(0.5) except KeyboardInterrupt: typer.echo("\n👋 Stopped watching.") - + @app.command() def version(): """Show the version of ExplainFlow.""" from explainflow import __version__ + typer.echo(f"ExplainFlow version {__version__}") - + app() diff --git a/src/explainflow/core.py b/src/explainflow/core.py index 07cfa1f..fffdb54 100644 --- a/src/explainflow/core.py +++ b/src/explainflow/core.py @@ -6,7 +6,7 @@ from __future__ import annotations -from typing import Literal +from typing import Literal, Optional from explainflow.models import ExecutionTrace from explainflow.tracer import Tracer @@ -20,21 +20,29 @@ def explain( output: OutputMode = "rich", max_steps: int = 1000, show_types: bool = True, - theme: str = "dark" + theme: str = "dark", + breakpoints: Optional[list[int]] = None, + track_heap: bool = False, + track_call_stack: bool = True, + profile: bool = False, ) -> ExecutionTrace: """ Execute and explain code step-by-step. - + Args: code: Python code string to explain output: Output mode - "rich" (terminal), "simple" (basic), "silent" (no output) max_steps: Maximum execution steps to trace show_types: Whether to show variable types - theme: Color theme ("dark", "light", "colorblind") - + theme: Color theme ("dark", "light", "colorblind", or custom) + breakpoints: Optional list of line numbers to pause at + track_heap: Whether to track heap objects + track_call_stack: Whether to track the call stack + profile: Whether to record execution timing per step + Returns: ExecutionTrace object containing all execution steps - + Example: >>> trace = explain(''' ... x = 5 @@ -42,31 +50,35 @@ def explain( ... result = x + y ... ''') """ - # Create tracer and execute code - tracer = Tracer(max_steps=max_steps) - trace = tracer.trace(code) - - # Visualize based on output mode + tracer = Tracer( + max_steps=max_steps, + breakpoints=breakpoints, + track_heap=track_heap, + track_call_stack=track_call_stack, + profile=profile, + ) + trace_result = tracer.trace(code) + if output != "silent": from explainflow.visualizer import Visualizer visualizer = Visualizer(theme=theme, show_types=show_types) if output == "rich": - visualizer.display_rich(trace) + visualizer.display_rich(trace_result) else: - visualizer.display_simple(trace) - - return trace + visualizer.display_simple(trace_result) + + return trace_result def explain_function(func, *args, **kwargs) -> ExecutionTrace: """ Explain a function execution with given arguments. - + Args: func: Function to trace *args: Positional arguments to pass to function **kwargs: Keyword arguments to pass to function - + Returns: ExecutionTrace object """ diff --git a/src/explainflow/exporter.py b/src/explainflow/exporter.py index a389b50..8a52572 100644 --- a/src/explainflow/exporter.py +++ b/src/explainflow/exporter.py @@ -84,10 +84,25 @@ def export_image( font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", DEFAULT_FONT_SIZE) header_font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", 18) except (OSError, IOError): - # Fall back to default - code_font = ImageFont.load_default() - font = ImageFont.load_default() - header_font = ImageFont.load_default() + try: + # Try Windows fonts + code_font = ImageFont.truetype("consola.ttf", CODE_FONT_SIZE) + font = ImageFont.truetype("arial.ttf", DEFAULT_FONT_SIZE) + header_font = ImageFont.truetype("arialbd.ttf", 18) + except (OSError, IOError): + try: + # Try Windows full path fonts + import os + windir = os.environ.get("WINDIR", r"C:\Windows") + fonts_dir = os.path.join(windir, "Fonts") + code_font = ImageFont.truetype(os.path.join(fonts_dir, "consola.ttf"), CODE_FONT_SIZE) + font = ImageFont.truetype(os.path.join(fonts_dir, "arial.ttf"), DEFAULT_FONT_SIZE) + header_font = ImageFont.truetype(os.path.join(fonts_dir, "arialbd.ttf"), 18) + except (OSError, IOError): + # Fall back to default + code_font = ImageFont.load_default() + font = ImageFont.load_default() + header_font = ImageFont.load_default() y_offset = PADDING @@ -211,8 +226,11 @@ def export_gif( frames = [] temp_files = [] + import os + import tempfile + for i in range(1, len(trace.steps) + 1): - temp_filename = f"/tmp/explainflow_frame_{i}.png" + temp_filename = os.path.join(tempfile.gettempdir(), f"explainflow_frame_{i}.png") export_image(trace, temp_filename, theme=theme, step=i, width=width) frames.append(Image.open(temp_filename)) temp_files.append(temp_filename) @@ -234,7 +252,6 @@ def export_gif( ) # Clean up temp files - import os for temp_file in temp_files: try: os.remove(temp_file) @@ -244,33 +261,215 @@ def export_gif( return output_path -def export_html( +def export_video( trace: "ExecutionTrace", filename: str, + fps: float = 1.0, theme: str = "dark", - interactive: bool = True + width: int = 1200, ) -> Path: """ - Export execution trace as an interactive HTML file. + Export execution trace as an MP4 video. Args: trace: ExecutionTrace to export - filename: Output filename (should end with .html) + filename: Output filename (should end with .mp4) + fps: Frames per second (lower = slower animation) theme: Color theme - interactive: Whether to include step-through controls + width: Image width in pixels Returns: - Path to the created HTML file + Path to the created video file """ - from explainflow.visualizer import THEMES + try: + import imageio.v3 as iio + except ImportError: + raise ImportError( + "imageio is required for video export. " + "Install with: pip install explainflow[video]" + ) - colors = THEMES.get(theme, THEMES["dark"]) + try: + from PIL import Image + except ImportError: + raise ImportError("Pillow is required for video export. Install with: pip install pillow") + + import numpy as np + import os + import tempfile + + # Generate frames as numpy arrays + frames = [] + temp_files = [] + + for i in range(1, len(trace.steps) + 1): + temp_filename = os.path.join(tempfile.gettempdir(), f"explainflow_video_frame_{i}.png") + export_image(trace, temp_filename, theme=theme, step=i, width=width) + img = Image.open(temp_filename).convert("RGB") + frame_array = np.array(img) + frames.append(frame_array) + temp_files.append(temp_filename) + + if not frames: + raise ValueError("No steps to export") + + # Ensure all frames have the same shape (pad if necessary) + max_h = max(f.shape[0] for f in frames) + max_w = max(f.shape[1] for f in frames) + + # Make dimensions even (required by many codecs) + max_h = max_h + (max_h % 2) + max_w = max_w + (max_w % 2) + + padded_frames = [] + for frame in frames: + h, w = frame.shape[:2] + if h != max_h or w != max_w: + padded = np.zeros((max_h, max_w, 3), dtype=np.uint8) + padded[:h, :w] = frame + padded_frames.append(padded) + else: + padded_frames.append(frame) + + frame_stack = np.stack(padded_frames, axis=0) + + # Write video + output_path = Path(filename) + iio.imwrite(str(output_path), frame_stack, fps=fps) + + # Clean up temp files + for temp_file in temp_files: + try: + os.remove(temp_file) + except OSError: + pass + + return output_path + + +def export_markdown( + trace: "ExecutionTrace", + filename: str, + show_types: bool = True, +) -> Path: + """ + Export execution trace as a Markdown document. + + Args: + trace: ExecutionTrace to export + filename: Output filename (should end with .md) + show_types: Whether to show variable types + + Returns: + Path to the created Markdown file + """ + lines = [] + lines.append("# ExplainFlow - Code Execution Trace\n") + # Source code + lines.append("## Source Code\n") + lines.append("```python") + lines.append(trace.code.strip()) + lines.append("```\n") + + # Steps + lines.append("## Execution Steps\n") + + for step in trace.steps: + lines.append(f"### Step {step.step_number}: {step.step_type.value.upper()}\n") + lines.append(f"**Line {step.line_number}:** `{step.line_content.strip()}`\n") + lines.append(f"> {step.explanation}\n") + + if step.loop_iteration is not None: + lines.append(f"🔄 **Loop iteration:** {step.loop_iteration}\n") + if step.duration_ms > 0: + lines.append(f"⏱ **Duration:** {step.duration_ms:.2f}ms\n") + + if step.variables: + lines.append("**Variables:**\n") + lines.append("| Name | Value | Type | Changed | Object ID |") + lines.append("|------|-------|------|---------|-----------|") + for var in step.variables.values(): + changed = "⟳" if var.changed else "" + type_info = var.type_name if show_types else "" + oid = str(var.object_id) if var.object_id else "" + lines.append(f"| `{var.name}` | `{var.repr_value}` | {type_info} | {changed} | {oid} |") + lines.append("") + + if step.call_stack: + lines.append("**Call Stack:**\n") + for i, frame in enumerate(step.call_stack): + indent = "  " * i + rv = f" → `{frame.return_value}`" if frame.return_value else "" + lines.append(f"- {indent}↳ **{frame.function_name}** (line {frame.line_number}){rv}") + lines.append("") + + if step.heap_objects: + lines.append("**Heap Objects:**\n") + lines.append("| Object ID | Type | Value | Refs |") + lines.append("|-----------|------|-------|------|") + for oid, obj in step.heap_objects.items(): + refs = len(obj.children) if obj.children else 0 + lines.append(f"| @{oid} | {obj.type_name} | `{obj.repr_value[:60]}` | {refs} |") + lines.append("") + + # Summary + lines.append("## Summary\n") + lines.append(f"- **Total steps:** {len(trace.steps)}") + lines.append(f"- **Success:** {'✅ Yes' if trace.success else '❌ No'}") + + if trace.final_output: + lines.append(f"\n### Program Output\n") + lines.append("```") + lines.append(trace.final_output.rstrip()) + lines.append("```\n") + + if not trace.success: + lines.append(f"\n### Error\n") + lines.append(f"```\n{trace.error_message}\n```\n") + + if trace.final_variables: + lines.append("\n### Final Variables\n") + lines.append("| Name | Value | Type |") + lines.append("|------|-------|------|") + for var in trace.final_variables.values(): + lines.append(f"| `{var.name}` | `{var.repr_value}` | {var.type_name} |") + lines.append("") + + output_path = Path(filename) + output_path.write_text("\n".join(lines)) + + return output_path + + +def export_html( + trace: "ExecutionTrace", + filename: str | None = None, + theme: str = "dark", + interactive: bool = True, + return_string: bool = False, +) -> "Path | str": + """ + Export execution trace as an interactive HTML file or string. + + Args: + trace: ExecutionTrace to export + filename: Output filename (should end with .html). Ignored when *return_string* is True. + theme: Color theme + interactive: Whether to include step-through controls + return_string: If True, return the HTML as a string instead of writing to file. + + Returns: + Path to the created HTML file, or the HTML string when *return_string* is True. + """ + from explainflow.visualizer import get_theme + + colors = get_theme(theme) + # Escape HTML in code - import html - code_html = html.escape(trace.code) + import html as _html code_lines = trace.code.split('\n') - + # Build steps JSON steps_data = [] for step in trace.steps: @@ -284,10 +483,29 @@ def export_html( name: { "value": var.repr_value, "type": var.type_name, - "changed": var.changed + "changed": var.changed, + "object_id": var.object_id, } for name, var in step.variables.items() - } + }, + "call_stack": [ + { + "function": sf.function_name, + "line": sf.line_number, + "return_value": sf.return_value, + } + for sf in step.call_stack + ], + "heap_objects": { + str(oid): { + "type": obj.type_name, + "repr": obj.repr_value, + "children": obj.children, + } + for oid, obj in step.heap_objects.items() + }, + "loop_iteration": step.loop_iteration, + "duration_ms": step.duration_ms, }) import json @@ -296,7 +514,7 @@ def export_html( # Build code lines HTML code_lines_html = [] for i, line in enumerate(code_lines, 1): - escaped_line = html.escape(line) or " " + escaped_line = _html.escape(line) or " " code_lines_html.append(f'
{i}{escaped_line}
') html_content = f""" @@ -412,6 +630,27 @@ def export_html( min-width: 100px; text-align: center; }} + .call-stack {{ margin-top: 15px; }} + .call-stack h4 {{ color: {colors["header"]}; margin-bottom: 5px; }} + .stack-frame {{ + font-family: 'Monaco', 'Menlo', 'Consolas', monospace; + font-size: 12px; + padding: 4px 8px; + border-left: 3px solid {colors["header"]}; + margin: 3px 0; + background: rgba(86, 156, 214, 0.08); + }} + .heap-panel {{ margin-top: 15px; }} + .heap-panel h4 {{ color: {colors["changed"]}; margin-bottom: 5px; }} + .heap-object {{ + font-family: 'Monaco', 'Menlo', 'Consolas', monospace; + font-size: 12px; + padding: 4px 8px; + border-left: 3px solid {colors["changed"]}; + margin: 3px 0; + }} + .timing {{ font-size: 11px; color: {colors["comment"]}; margin-top: 6px; font-style: italic; }} + .loop-badge {{ display: inline-block; padding: 1px 6px; border-radius: 8px; background: {colors["success"]}20; color: {colors["success"]}; font-size: 11px; margin-left: 5px; }} @@ -432,15 +671,27 @@ def export_html(
Step 1 LINE +
+

Variables

+ + + + @@ -479,16 +730,70 @@ def export_html( // Update variables const varsList = document.getElementById('variablesList'); varsList.innerHTML = ''; + function escHtml(s) {{ return String(s).replace(/&/g,'&').replace(//g,'>').replace(/"/g,'"'); }} for (const [name, info] of Object.entries(step.variables)) {{ const div = document.createElement('div'); div.className = 'variable' + (info.changed ? ' changed' : ''); + const oid = info.object_id ? ' @' + escHtml(String(info.object_id)) + '' : ''; div.innerHTML = (info.changed ? '⟳ ' : '  ') + - '' + name + ' = ' + - '' + info.value + ' ' + - '(' + info.type + ')'; + '' + escHtml(name) + ' = ' + + '' + escHtml(info.value) + ' ' + + '(' + escHtml(info.type) + ')' + oid; varsList.appendChild(div); }} + // Call stack + const csPanel = document.getElementById('callStackPanel'); + const csList = document.getElementById('callStackList'); + if (step.call_stack && step.call_stack.length > 0) {{ + csPanel.style.display = 'block'; + csList.innerHTML = ''; + step.call_stack.forEach(frame => {{ + const d = document.createElement('div'); + d.className = 'stack-frame'; + let rv = frame.return_value ? ' → ' + escHtml(String(frame.return_value)) : ''; + d.innerHTML = '↳ ' + escHtml(frame.function) + ' (line ' + frame.line + ')' + rv; + csList.appendChild(d); + }}); + }} else {{ + csPanel.style.display = 'none'; + }} + + // Heap objects + const heapPanel = document.getElementById('heapPanel'); + const heapList = document.getElementById('heapList'); + if (step.heap_objects && Object.keys(step.heap_objects).length > 0) {{ + heapPanel.style.display = 'block'; + heapList.innerHTML = ''; + for (const [oid, obj] of Object.entries(step.heap_objects)) {{ + const d = document.createElement('div'); + d.className = 'heap-object'; + const refs = obj.children && Object.keys(obj.children).length ? ' [' + Object.keys(obj.children).length + ' refs]' : ''; + d.innerHTML = '📦 ' + escHtml(obj.type) + ' @' + escHtml(oid) + ' = ' + escHtml(obj.repr) + refs; + heapList.appendChild(d); + }} + }} else {{ + heapPanel.style.display = 'none'; + }} + + // Loop iteration badge + const loopBadge = document.getElementById('loopBadge'); + if (step.loop_iteration !== null && step.loop_iteration !== undefined) {{ + loopBadge.style.display = 'inline-block'; + loopBadge.textContent = 'iter ' + step.loop_iteration; + }} else {{ + loopBadge.style.display = 'none'; + }} + + // Timing + const timing = document.getElementById('timing'); + if (step.duration_ms > 0) {{ + timing.style.display = 'block'; + timing.textContent = '⏱ ' + step.duration_ms.toFixed(2) + 'ms'; + }} else {{ + timing.style.display = 'none'; + }} + // Update buttons document.getElementById('firstBtn').disabled = currentStep === 0; document.getElementById('prevBtn').disabled = currentStep === 0; @@ -537,6 +842,9 @@ def export_html( """ + + if return_string: + return html_content output_path = Path(filename) output_path.write_text(html_content) diff --git a/src/explainflow/models.py b/src/explainflow/models.py index 6cc1bdc..2aa5d72 100644 --- a/src/explainflow/models.py +++ b/src/explainflow/models.py @@ -10,6 +10,7 @@ from typing import Any, Optional from enum import Enum import copy +import time class StepType(Enum): @@ -23,6 +24,71 @@ class StepType(Enum): LOOP_ITERATION = "loop_iteration" LOOP_END = "loop_end" CONDITION = "condition" + CONTEXT_ENTER = "context_enter" + CONTEXT_EXIT = "context_exit" + YIELD = "yield" + YIELD_FROM = "yield_from" + AWAIT = "await" + + +@dataclass +class HeapObject: + """Represents an object on the heap with its identity.""" + object_id: int + type_name: str + repr_value: str + value: Any = field(repr=False, default=None) + ref_count: int = 0 + children: dict[str, int] = field(default_factory=dict) # attr/key -> object_id + + @classmethod + def from_value(cls, value: Any) -> "HeapObject": + """Create a HeapObject from a value.""" + obj_id = id(value) + type_name = type(value).__name__ + + try: + repr_val = repr(value) + if len(repr_val) > 200: + repr_val = repr_val[:197] + "..." + except Exception: + repr_val = "" + + children: dict[str, int] = {} + try: + if isinstance(value, dict): + for k, v in value.items(): + children[repr(k)] = id(v) + elif isinstance(value, (list, tuple)): + for i, v in enumerate(value): + children[str(i)] = id(v) + elif isinstance(value, set): + for i, v in enumerate(value): + children[str(i)] = id(v) + elif hasattr(value, '__dict__') and not isinstance(value, type): + for k, v in value.__dict__.items(): + if not k.startswith('__'): + children[k] = id(v) + except Exception: + pass + + return cls( + object_id=obj_id, + type_name=type_name, + repr_value=repr_val, + value=copy.deepcopy(value) if _is_copyable(value) else None, + children=children, + ) + + +@dataclass +class StackFrame: + """Represents a call stack frame.""" + function_name: str + line_number: int + local_variables: dict[str, Variable] = field(default_factory=dict) + arguments: dict[str, str] = field(default_factory=dict) + return_value: Optional[str] = None @dataclass @@ -33,7 +99,8 @@ class Variable: type_name: str repr_value: str changed: bool = False - + object_id: Optional[int] = None + @classmethod def from_value(cls, name: str, value: Any, previous_value: Any = None) -> "Variable": """Create a Variable from a name and value.""" @@ -43,15 +110,26 @@ def from_value(cls, name: str, value: Any, previous_value: Any = None) -> "Varia repr_val = repr_val[:97] + "..." except Exception: repr_val = "" - - changed = previous_value is not None and previous_value != value - + + # Safer comparison that handles complex objects + changed = False + if previous_value is not None: + try: + changed = previous_value != value + # Handle numpy-like objects that return arrays from comparison + if hasattr(changed, '__iter__') and not isinstance(changed, (str, bytes)): + changed = True + except Exception: + # If comparison fails, assume changed + changed = True + return cls( name=name, value=copy.deepcopy(value) if _is_copyable(value) else value, type_name=type(value).__name__, repr_value=repr_val, - changed=changed + changed=changed, + object_id=id(value), ) @@ -78,18 +156,43 @@ class ExecutionStep: function_name: Optional[str] = None call_depth: int = 0 explanation: str = "" - + # Enhanced fields + heap_objects: dict[int, HeapObject] = field(default_factory=dict) + call_stack: list[StackFrame] = field(default_factory=list) + loop_iteration: Optional[int] = None + timestamp: float = 0.0 + duration_ms: float = 0.0 + def get_variable_summary(self) -> str: """Get a summary of current variables.""" if not self.variables: return "No variables" - + parts = [] for var in self.variables.values(): marker = "→ " if var.changed else " " parts.append(f"{marker}{var.name} = {var.repr_value}") return "\n".join(parts) + def get_heap_summary(self) -> str: + """Get a summary of heap objects at this step.""" + if not self.heap_objects: + return "No heap objects" + parts = [] + for obj in self.heap_objects.values(): + parts.append(f" id={obj.object_id} {obj.type_name}: {obj.repr_value}") + return "\n".join(parts) + + def get_call_stack_summary(self) -> str: + """Get a summary of the call stack at this step.""" + if not self.call_stack: + return "No call stack" + parts = [] + for i, frame in enumerate(self.call_stack): + indent = " " * i + parts.append(f"{indent}→ {frame.function_name} (line {frame.line_number})") + return "\n".join(parts) + @dataclass class ExecutionTrace: @@ -101,23 +204,23 @@ class ExecutionTrace: success: bool = True error_message: str = "" total_lines: int = 0 - + def __len__(self) -> int: return len(self.steps) - + def __iter__(self): return iter(self.steps) - + def __getitem__(self, index: int) -> ExecutionStep: return self.steps[index] - + def get_step(self, step_number: int) -> Optional[ExecutionStep]: """Get a specific step by number.""" for step in self.steps: if step.step_number == step_number: return step return None - + def get_variable_history(self, variable_name: str) -> list[tuple[int, Variable]]: """Get the history of a variable across all steps.""" history = [] @@ -125,11 +228,33 @@ def get_variable_history(self, variable_name: str) -> list[tuple[int, Variable]] if variable_name in step.variables: history.append((step.step_number, step.variables[variable_name])) return history - + def get_lines_executed(self) -> list[int]: """Get list of line numbers that were executed.""" return [step.line_number for step in self.steps] - + + def get_object_references(self, object_id: int) -> list[tuple[int, str]]: + """Get all variable names that reference a specific object ID across steps.""" + refs = [] + for step in self.steps: + for var in step.variables.values(): + if var.object_id == object_id: + refs.append((step.step_number, var.name)) + return refs + + def get_shared_references(self, step_number: int) -> dict[int, list[str]]: + """Find variables sharing the same object at a given step.""" + step = self.get_step(step_number) + if not step: + return {} + id_to_names: dict[int, list[str]] = {} + for var in step.variables.values(): + if var.object_id is not None: + if var.object_id not in id_to_names: + id_to_names[var.object_id] = [] + id_to_names[var.object_id].append(var.name) + return {oid: names for oid, names in id_to_names.items() if len(names) > 1} + def summary(self) -> str: """Get a summary of the execution.""" lines = [ @@ -139,16 +264,21 @@ def summary(self) -> str: f"Lines in code: {self.total_lines}", f"Success: {self.success}", ] - + if self.final_output: lines.append(f"\nOutput:\n{self.final_output}") - + if not self.success: lines.append(f"\nError: {self.error_message}") - + if self.final_variables: lines.append(f"\nFinal Variables:") for var in self.final_variables.values(): lines.append(f" {var.name} = {var.repr_value} ({var.type_name})") - + return "\n".join(lines) + + def _repr_html_(self) -> str: + """IPython/Jupyter display integration.""" + from explainflow.exporter import export_html + return export_html(self, return_string=True) diff --git a/src/explainflow/tracer.py b/src/explainflow/tracer.py index e352057..fa1eade 100644 --- a/src/explainflow/tracer.py +++ b/src/explainflow/tracer.py @@ -2,37 +2,75 @@ Tracer module for ExplainFlow. Handles code execution tracing using sys.settrace. +Supports: breakpoints, call stack tracking, heap/memory diagrams, +loop iteration counting, async/generator/context manager detection, +performance profiling, and multi-file tracing. """ from __future__ import annotations import sys import io +import os +import time import linecache import functools from contextlib import redirect_stdout, redirect_stderr from typing import Any, Callable, Optional from types import FrameType -from explainflow.models import ExecutionTrace, ExecutionStep, StepType, Variable +from explainflow.models import ( + ExecutionTrace, ExecutionStep, StepType, Variable, + HeapObject, StackFrame, +) + + +class _MaxStepsExceeded(Exception): + """Internal exception to stop execution when max steps reached.""" + pass class Tracer: """ Traces Python code execution step-by-step. - + Uses sys.settrace to capture each line execution, function calls, - returns, and exceptions. + returns, and exceptions. Supports breakpoints, heap tracking, + call-stack capture, loop-iteration counting, performance timing, + and multi-file tracing. """ - - def __init__(self, max_steps: int = 1000): + + def __init__( + self, + max_steps: int = 1000, + breakpoints: Optional[set[int]] = None, + track_heap: bool = True, + track_call_stack: bool = True, + profile: bool = False, + trace_external: bool = False, + external_files: Optional[set[str]] = None, + ): """ Initialize the tracer. - + Args: max_steps: Maximum number of steps to trace (prevents infinite loops) + breakpoints: Set of line numbers to stop at (None = trace everything) + track_heap: Whether to capture heap/object diagrams per step + track_call_stack: Whether to capture call stack per step + profile: Whether to record per-step timing + trace_external: Whether to trace code in external (imported) files + external_files: Set of absolute file paths to also trace when trace_external=True """ self.max_steps = max_steps + self.breakpoints = breakpoints + self.track_heap = track_heap + self.track_call_stack = track_call_stack + self.profile = profile + self.trace_external = trace_external + self.external_files: set[str] = external_files or set() + + # Internal state (reset per trace call) self.steps: list[ExecutionStep] = [] self.step_count = 0 self.code_lines: list[str] = [] @@ -42,70 +80,67 @@ def __init__(self, max_steps: int = 1000): self.call_depth = 0 self.traced_filename = "" self._stopped = False - + self._breakpoint_hit = False + self._loop_counters: dict[int, int] = {} # line_no -> iteration count + self._call_stack: list[StackFrame] = [] + self._last_timestamp: float = 0.0 + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + def trace(self, code: str) -> ExecutionTrace: """ - Trace the execution of code string. - + Trace the execution of a code string. + Args: code: Python code to trace - + Returns: ExecutionTrace containing all steps """ - self.steps = [] - self.step_count = 0 + self._reset() self.code_lines = code.strip().split('\n') - self.previous_variables = {} - self.current_variables = {} - self.output_buffer = io.StringIO() - self.call_depth = 0 - self._stopped = False - - # Prepare execution environment - exec_globals = { + + # Handle empty code + if not code.strip(): + return ExecutionTrace(code=code, success=True, total_lines=0) + + exec_namespace = { '__name__': '__main__', '__doc__': None, '__builtins__': __builtins__, } - exec_locals = {} - - # Compile code + try: compiled = compile(code, self.traced_filename, 'exec') except SyntaxError as e: - trace = ExecutionTrace( + return ExecutionTrace( code=code, success=False, error_message=f"SyntaxError: {e.msg} (line {e.lineno})", - total_lines=len(self.code_lines) + total_lines=len(self.code_lines), ) - return trace - - # Add code to linecache for line retrieval + linecache.cache[self.traced_filename] = ( - len(code), - None, - self.code_lines, - self.traced_filename + len(code), None, self.code_lines, self.traced_filename, ) - + success = True error_message = "" - + try: - # Capture stdout and trace execution with redirect_stdout(self.output_buffer), redirect_stderr(self.output_buffer): sys.settrace(self._trace_callback) try: - exec(compiled, exec_globals, exec_locals) + exec(compiled, exec_namespace) finally: sys.settrace(None) + except _MaxStepsExceeded: + sys.settrace(None) except Exception as e: success = False error_message = f"{type(e).__name__}: {str(e)}" - - # Add exception step if self.steps: last_step = self.steps[-1] self.steps.append(ExecutionStep( @@ -115,19 +150,17 @@ def trace(self, code: str) -> ExecutionTrace: step_type=StepType.EXCEPTION, variables=self.current_variables.copy(), exception=e, - explanation=f"Exception raised: {error_message}" + explanation=f"Exception raised: {error_message}", )) - - # Build final variables from exec_locals + final_vars = {} - for name, value in exec_locals.items(): + for name, value in exec_namespace.items(): if not name.startswith('_'): final_vars[name] = Variable.from_value(name, value) - - # Clean up linecache + if self.traced_filename in linecache.cache: del linecache.cache[self.traced_filename] - + return ExecutionTrace( code=code, steps=self.steps, @@ -135,55 +168,127 @@ def trace(self, code: str) -> ExecutionTrace: final_variables=final_vars, success=success, error_message=error_message, - total_lines=len(self.code_lines) + total_lines=len(self.code_lines), ) - - def _trace_callback(self, frame: FrameType, event: str, arg: Any) -> Optional[Callable]: + + def trace_function(self, func: Callable, *args, **kwargs) -> ExecutionTrace: + """Trace a function execution with given arguments.""" + import inspect + source = inspect.getsource(func) + arg_strs = [repr(a) for a in args] + kwarg_strs = [f"{k}={repr(v)}" for k, v in kwargs.items()] + call_args = ", ".join(arg_strs + kwarg_strs) + code = f"{source}\n\nresult = {func.__name__}({call_args})" + return self.trace(code) + + def trace_file(self, filepath: str) -> ExecutionTrace: """ - Callback for sys.settrace. - + Trace execution of a Python file (multi-file tracing). + Args: - frame: Current execution frame - event: Event type ('line', 'call', 'return', 'exception') - arg: Event-specific argument - + filepath: Path to the Python file to trace + Returns: - Self for continued tracing, None to stop + ExecutionTrace of the file execution """ + filepath = os.path.abspath(filepath) + with open(filepath, 'r') as f: + code = f.read() + + if self.trace_external: + self.external_files.add(filepath) + + return self.trace(code) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _reset(self): + """Reset all internal state for a new trace.""" + self.steps = [] + self.step_count = 0 + self.previous_variables = {} + self.current_variables = {} + self.output_buffer = io.StringIO() + self.call_depth = 0 + self._stopped = False + self._breakpoint_hit = False + self._loop_counters = {} + self._call_stack = [] + self._last_timestamp = time.perf_counter() + + def _should_trace_file(self, filename: str) -> bool: + """Determine if a given filename should be traced.""" + if filename == self.traced_filename: + return True + if self.trace_external and filename in self.external_files: + return True + return False + + def _trace_callback(self, frame: FrameType, event: str, arg: Any) -> Optional[Callable]: if self._stopped or self.step_count >= self.max_steps: self._stopped = True - return None - - # Only trace our code - if frame.f_code.co_filename != self.traced_filename: + raise _MaxStepsExceeded("Maximum trace steps exceeded") + + if not self._should_trace_file(frame.f_code.co_filename): return self._trace_callback - + line_no = frame.f_lineno - - # Get line content safely if 1 <= line_no <= len(self.code_lines): line_content = self.code_lines[line_no - 1] else: line_content = "" - + + # Breakpoint check: if breakpoints set and we're on a 'line' event + if self.breakpoints and event == 'line' and line_no not in self.breakpoints: + # Still need to track state for when we do hit a breakpoint + return self._trace_callback + + # Timing + now = time.perf_counter() + duration_ms = (now - self._last_timestamp) * 1000 if self.profile else 0.0 + self._last_timestamp = now + # Determine step type + step_type = self._classify_event(event, line_content, frame) + + # Update loop counters + loop_iteration = None + if step_type in (StepType.LOOP_START, StepType.LOOP_ITERATION): + self._loop_counters[line_no] = self._loop_counters.get(line_no, 0) + 1 + loop_iteration = self._loop_counters[line_no] + + # Call stack management if event == 'call': - step_type = StepType.CALL self.call_depth += 1 + fname = frame.f_code.co_name + sf = StackFrame( + function_name=fname if fname != '' else '', + line_number=line_no, + ) + self._call_stack.append(sf) elif event == 'return': - step_type = StepType.RETURN self.call_depth = max(0, self.call_depth - 1) - elif event == 'exception': - step_type = StepType.EXCEPTION - else: # 'line' - step_type = self._determine_line_type(line_content) - + if self._call_stack: + self._call_stack[-1].return_value = repr(arg) if arg is not None else None + self._call_stack.pop() + # Capture variables variables = self._capture_variables(frame) - - # Generate explanation + + # Update current call stack frame's locals + if self._call_stack and self.track_call_stack: + self._call_stack[-1].local_variables = variables.copy() + self._call_stack[-1].line_number = line_no + + # Heap capture + heap_objects: dict[int, HeapObject] = {} + if self.track_heap: + heap_objects = self._capture_heap(frame) + explanation = self._generate_explanation(step_type, line_content, variables, arg) - + self.step_count += 1 step = ExecutionStep( step_number=self.step_count, @@ -195,70 +300,122 @@ def _trace_callback(self, frame: FrameType, event: str, arg: Any) -> Optional[Ca return_value=arg if event == 'return' else None, function_name=frame.f_code.co_name if frame.f_code.co_name != '' else None, call_depth=self.call_depth, - explanation=explanation + explanation=explanation, + heap_objects=heap_objects, + call_stack=[StackFrame( + function_name=sf.function_name, + line_number=sf.line_number, + local_variables=sf.local_variables.copy(), + arguments=sf.arguments.copy(), + return_value=sf.return_value, + ) for sf in self._call_stack] if self.track_call_stack else [], + loop_iteration=loop_iteration, + timestamp=now if self.profile else 0.0, + duration_ms=duration_ms, ) - + self.steps.append(step) self.current_variables = variables.copy() - + return self._trace_callback - - def _determine_line_type(self, line_content: str) -> StepType: - """Determine the type of a line based on its content.""" + + def _classify_event(self, event: str, line_content: str, frame: FrameType) -> StepType: + """Classify an event into a StepType.""" stripped = line_content.strip() - + + if event == 'call': + return StepType.CALL + elif event == 'return': + return StepType.RETURN + elif event == 'exception': + return StepType.EXCEPTION + + # 'line' event if not stripped or stripped.startswith('#'): return StepType.LINE - + + # Context managers + if stripped.startswith('with '): + return StepType.CONTEXT_ENTER + + # Yield / async + if stripped.startswith('yield ') or stripped == 'yield': + return StepType.YIELD + if stripped.startswith('yield from '): + return StepType.YIELD_FROM + if stripped.startswith('await '): + return StepType.AWAIT + + # Assignments (including augmented) if '=' in stripped and not any(op in stripped for op in ['==', '!=', '<=', '>=', '+=', '-=', '*=', '/=']): return StepType.ASSIGNMENT - if any(op in stripped for op in ['+=', '-=', '*=', '/=', '//=', '%=', '**=', '&=', '|=', '^=']): return StepType.ASSIGNMENT - + + # Loops if stripped.startswith(('for ', 'while ')): + # Check if this is a repeated visit (iteration) + if self._loop_counters.get(frame.f_lineno, 0) > 0: + return StepType.LOOP_ITERATION return StepType.LOOP_START - + + # Conditions if stripped.startswith(('if ', 'elif ', 'else')): return StepType.CONDITION - + return StepType.LINE - + def _capture_variables(self, frame: FrameType) -> dict[str, Variable]: """Capture current local variables from frame.""" variables = {} - for name, value in frame.f_locals.items(): - # Skip internal variables if name.startswith('_'): continue - - # Skip modules and functions (unless user-defined in traced code) if callable(value) and not hasattr(value, '__explainflow_traced__'): continue - previous = self.previous_variables.get(name) variables[name] = Variable.from_value(name, value, previous) - - # Update previous variables for next comparison self.previous_variables = { name: var.value for name, var in variables.items() } - return variables - + + def _capture_heap(self, frame: FrameType) -> dict[int, HeapObject]: + """Build a snapshot of heap objects reachable from local variables.""" + heap: dict[int, HeapObject] = {} + for name, value in frame.f_locals.items(): + if name.startswith('_'): + continue + if callable(value) and not hasattr(value, '__explainflow_traced__'): + continue + self._walk_heap(value, heap, depth=0, max_depth=3) + return heap + + def _walk_heap(self, value: Any, heap: dict[int, HeapObject], depth: int, max_depth: int): + """Recursively walk object graph to build heap snapshot.""" + obj_id = id(value) + if obj_id in heap or depth > max_depth: + return + # Only track "interesting" types + if isinstance(value, (int, float, str, bool, type(None))): + return + try: + ho = HeapObject.from_value(value) + heap[obj_id] = ho + for child_id in ho.children.values(): + # We already have the id; we need the actual object + # For dicts/lists/sets/objects we already visited them + pass + except Exception: + pass + def _generate_explanation( - self, - step_type: StepType, - line_content: str, - variables: dict[str, Variable], - arg: Any + self, step_type: StepType, line_content: str, + variables: dict[str, Variable], arg: Any, ) -> str: - """Generate a human-readable explanation for a step.""" stripped = line_content.strip() - + if step_type == StepType.ASSIGNMENT: - # Find changed variables changed = [v for v in variables.values() if v.changed] if changed: var = changed[0] @@ -270,77 +427,50 @@ def _generate_explanation( if var_name in variables: return f"Set {var_name} to {variables[var_name].repr_value}" return f"Assignment: {stripped}" - elif step_type == StepType.LOOP_START: - if stripped.startswith('for '): - return f"Starting loop: {stripped}" - else: - return f"Checking loop condition: {stripped}" - + return f"Starting loop: {stripped}" if stripped.startswith('for ') else f"Checking loop condition: {stripped}" + elif step_type == StepType.LOOP_ITERATION: + cnt = max(self._loop_counters.get(0, 1), 1) + return f"Loop iteration: {stripped}" elif step_type == StepType.CONDITION: return f"Evaluating condition: {stripped}" - elif step_type == StepType.CALL: - return f"Calling function" - + return "Calling function" elif step_type == StepType.RETURN: - if arg is not None: - return f"Returning: {repr(arg)}" - return "Returning from function" - + return f"Returning: {repr(arg)}" if arg is not None else "Returning from function" elif step_type == StepType.EXCEPTION: - if arg: - return f"Exception: {arg}" - return "Exception occurred" - + return f"Exception: {arg}" if arg else "Exception occurred" + elif step_type == StepType.CONTEXT_ENTER: + return f"Entering context manager: {stripped}" + elif step_type == StepType.CONTEXT_EXIT: + return "Exiting context manager" + elif step_type == StepType.YIELD: + return f"Yielding value: {stripped}" + elif step_type == StepType.YIELD_FROM: + return f"Yielding from: {stripped}" + elif step_type == StepType.AWAIT: + return f"Awaiting: {stripped}" + return f"Executing: {stripped}" if stripped else "Empty line" - - def trace_function(self, func: Callable, *args, **kwargs) -> ExecutionTrace: - """ - Trace a function execution. - - Args: - func: Function to trace - *args: Positional arguments - **kwargs: Keyword arguments - - Returns: - ExecutionTrace of the function execution - """ - import inspect - source = inspect.getsource(func) - - # Create call code - arg_strs = [repr(a) for a in args] - kwarg_strs = [f"{k}={repr(v)}" for k, v in kwargs.items()] - call_args = ", ".join(arg_strs + kwarg_strs) - - code = f"{source}\n\nresult = {func.__name__}({call_args})" - return self.trace(code) def trace(func: Optional[Callable] = None, *, output: str = "rich", max_steps: int = 1000): """ Decorator to trace function execution. - + Can be used with or without parentheses: @trace def my_func(): ... - + @trace(output="simple") def my_func(): ... - - Args: - func: Function to decorate (when used without parentheses) - output: Output mode ("rich", "simple", "silent") - max_steps: Maximum steps to trace """ def decorator(f: Callable) -> Callable: @functools.wraps(f) def wrapper(*args, **kwargs): tracer = Tracer(max_steps=max_steps) trace_result = tracer.trace_function(f, *args, **kwargs) - + if output != "silent": from explainflow.visualizer import Visualizer visualizer = Visualizer() @@ -348,14 +478,14 @@ def wrapper(*args, **kwargs): visualizer.display_rich(trace_result) else: visualizer.display_simple(trace_result) - - # Return the actual function result - return f(*args, **kwargs) - + + if "result" in trace_result.final_variables: + return trace_result.final_variables["result"].value + return None + wrapper.__explainflow_traced__ = True return wrapper - - # Handle both @trace and @trace() syntax + if func is not None: return decorator(func) return decorator diff --git a/src/explainflow/visualizer.py b/src/explainflow/visualizer.py index 598adb8..7ad5ff5 100644 --- a/src/explainflow/visualizer.py +++ b/src/explainflow/visualizer.py @@ -2,17 +2,19 @@ Visualizer module for ExplainFlow. Handles displaying execution traces in various formats. +Supports: call stack display, heap/memory diagrams, data structure +visualization, loop iteration counters, performance timing, and custom themes. """ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - from explainflow.core import ExecutionTrace, ExecutionStep + from explainflow.models import ExecutionTrace, ExecutionStep -# Theme definitions -THEMES = { +# Built-in theme definitions +THEMES: dict[str, dict[str, str]] = { "dark": { "background": "#1e1e1e", "foreground": "#d4d4d4", @@ -69,31 +71,52 @@ }, } +# Registry for user-defined custom themes +_custom_themes: dict[str, dict[str, str]] = {} + + +def register_theme(name: str, colors: dict[str, str]) -> None: + """ + Register a custom color theme. + + Args: + name: Theme name + colors: Dictionary of color key -> hex color value. + Must contain at least the keys in the "dark" theme. + """ + required = set(THEMES["dark"].keys()) + missing = required - set(colors.keys()) + if missing: + # Fill missing keys from dark theme + full = dict(THEMES["dark"]) + full.update(colors) + colors = full + _custom_themes[name] = colors + + +def get_theme(name: str) -> dict[str, str]: + """Retrieve a theme by name (built-in or custom).""" + if name in _custom_themes: + return _custom_themes[name] + return THEMES.get(name, THEMES["dark"]) + class Visualizer: """ Visualizes execution traces in the terminal or as data for export. """ - + def __init__(self, theme: str = "dark", show_types: bool = True): - """ - Initialize the visualizer. - - Args: - theme: Color theme name ("dark", "light", "colorblind") - show_types: Whether to show variable types - """ self.theme_name = theme - self.theme = THEMES.get(theme, THEMES["dark"]) + self.theme = get_theme(theme) self.show_types = show_types - + + # ------------------------------------------------------------------ + # Rich display + # ------------------------------------------------------------------ + def display_rich(self, trace: "ExecutionTrace") -> None: - """ - Display trace using Rich library for beautiful terminal output. - - Args: - trace: ExecutionTrace to display - """ + """Display trace using Rich library for beautiful terminal output.""" try: from rich.console import Console from rich.panel import Panel @@ -101,14 +124,15 @@ def display_rich(self, trace: "ExecutionTrace") -> None: from rich.syntax import Syntax from rich.text import Text from rich.columns import Columns + from rich.tree import Tree from rich import box except ImportError: print("Rich library not installed. Using simple output.") self.display_simple(trace) return - + console = Console() - + # Title console.print() console.print(Panel.fit( @@ -116,30 +140,28 @@ def display_rich(self, trace: "ExecutionTrace") -> None: border_style="blue" )) console.print() - - # Show the code with syntax highlighting + + # Source code console.print(Panel( Syntax(trace.code, "python", theme="monokai", line_numbers=True), title="[bold]Source Code[/bold]", border_style="dim" )) console.print() - - # Show each step + + # Steps for step in trace.steps: self._display_step_rich(console, step, trace) - - # Final summary + + # Summary self._display_summary_rich(console, trace) - + def _display_step_rich(self, console, step: "ExecutionStep", trace: "ExecutionTrace") -> None: - """Display a single step using Rich.""" from rich.panel import Panel - from rich.table import Table from rich.text import Text + from rich.tree import Tree from rich import box - - # Step header + step_type_colors = { "line": "white", "assignment": "yellow", @@ -150,163 +172,218 @@ def _display_step_rich(self, console, step: "ExecutionStep", trace: "ExecutionTr "loop_iteration": "green", "loop_end": "green", "condition": "blue", + "context_enter": "cyan", + "context_exit": "cyan", + "yield": "bright_magenta", + "yield_from": "bright_magenta", + "await": "bright_cyan", } - + color = step_type_colors.get(step.step_type.value, "white") - - # Create step panel content + content = Text() - + # Line info content.append(f"Line {step.line_number}: ", style="dim") content.append(step.line_content.strip(), style="bold white") content.append("\n\n") - + # Explanation content.append("→ ", style=f"bold {color}") content.append(step.explanation, style=color) - - # Variables table if any + + # Loop iteration badge + if step.loop_iteration is not None: + content.append(f" [iter {step.loop_iteration}]", style="bold green") + + # Performance timing + if step.duration_ms > 0: + content.append(f" ({step.duration_ms:.2f}ms)", style="dim italic") + + # Variables if step.variables: content.append("\n\n") var_text = Text() var_text.append("Variables:\n", style="dim") - for var in step.variables.values(): if var.changed: var_text.append(" ⟳ ", style="yellow") else: var_text.append(" ", style="dim") - var_text.append(var.name, style="cyan") var_text.append(" = ", style="dim") - var_text.append(var.repr_value, style="green") - + # Enhanced data structure display + var_text.append(_format_rich_value(var.repr_value, var.type_name), style="green") if self.show_types: var_text.append(f" ({var.type_name})", style="dim italic") + if var.object_id is not None: + var_text.append(f" @{var.object_id}", style="dim") var_text.append("\n") - content.append_text(var_text) - - # Create panel + + # Call stack + if step.call_stack: + content.append("\n") + stack_text = Text() + stack_text.append("Call Stack:\n", style="dim") + for i, frame in enumerate(step.call_stack): + indent = " " * (i + 1) + stack_text.append(f"{indent}↳ ", style="cyan") + stack_text.append(frame.function_name, style="bold cyan") + stack_text.append(f" (line {frame.line_number})", style="dim") + if frame.return_value is not None: + stack_text.append(f" → {frame.return_value}", style="magenta") + stack_text.append("\n") + content.append_text(stack_text) + + # Heap objects + if step.heap_objects: + content.append("\n") + heap_text = Text() + heap_text.append("Heap Objects:\n", style="dim") + for obj in step.heap_objects.values(): + heap_text.append(f" 📦 ", style="yellow") + heap_text.append(f"{obj.type_name}", style="bold") + heap_text.append(f" @{obj.object_id}", style="dim") + heap_text.append(f" = {_truncate(obj.repr_value, 60)}", style="green") + if obj.children: + heap_text.append(f" [{len(obj.children)} refs]", style="dim italic") + heap_text.append("\n") + content.append_text(heap_text) + panel = Panel( content, title=f"[bold]Step {step.step_number}[/bold] [{color}]{step.step_type.value.upper()}[/{color}]", border_style=color, - box=box.ROUNDED + box=box.ROUNDED, ) - console.print(panel) - + def _display_summary_rich(self, console, trace: "ExecutionTrace") -> None: - """Display final summary using Rich.""" from rich.panel import Panel from rich.table import Table from rich import box - + console.print() - - # Output + if trace.final_output: console.print(Panel( trace.final_output.rstrip(), title="[bold]Program Output[/bold]", - border_style="green" + border_style="green", )) - - # Final variables + if trace.final_variables: table = Table(title="Final Variables", box=box.SIMPLE) table.add_column("Name", style="cyan") table.add_column("Value", style="green") table.add_column("Type", style="dim") - + table.add_column("Object ID", style="dim") for var in trace.final_variables.values(): - table.add_row(var.name, var.repr_value, var.type_name) - + table.add_row( + var.name, var.repr_value, var.type_name, + str(var.object_id) if var.object_id else "", + ) console.print(table) - - # Status + + # Shared references + if trace.steps: + shared = trace.get_shared_references(trace.steps[-1].step_number) + if shared: + console.print("\n[bold yellow]Shared Object References:[/bold yellow]") + for oid, names in shared.items(): + console.print(f" @{oid} ← {', '.join(names)}") + if trace.success: console.print(f"\n[bold green]✓ Execution completed successfully[/bold green]") else: console.print(f"\n[bold red]✗ Execution failed: {trace.error_message}[/bold red]") - + console.print(f"[dim]Total steps: {len(trace.steps)}[/dim]\n") - + + # ------------------------------------------------------------------ + # Simple display + # ------------------------------------------------------------------ + def display_simple(self, trace: "ExecutionTrace") -> None: - """ - Display trace using simple print statements. - - Args: - trace: ExecutionTrace to display - """ print("\n" + "=" * 60) print("ExplainFlow - Code Execution Trace") print("=" * 60) - + print("\nSource Code:") print("-" * 40) for i, line in enumerate(trace.code.split('\n'), 1): print(f"{i:3} | {line}") print("-" * 40) print() - + for step in trace.steps: self._display_step_simple(step) - + self._display_summary_simple(trace) - + def _display_step_simple(self, step: "ExecutionStep") -> None: - """Display a single step using print.""" - print(f"\n[Step {step.step_number}] {step.step_type.value.upper()}") + header = f"\n[Step {step.step_number}] {step.step_type.value.upper()}" + if step.loop_iteration is not None: + header += f" (iter {step.loop_iteration})" + if step.duration_ms > 0: + header += f" [{step.duration_ms:.2f}ms]" + print(header) print(f" Line {step.line_number}: {step.line_content.strip()}") print(f" → {step.explanation}") - + if step.variables: print(" Variables:") for var in step.variables.values(): marker = "⟳" if var.changed else " " type_info = f" ({var.type_name})" if self.show_types else "" - print(f" {marker} {var.name} = {var.repr_value}{type_info}") - + oid = f" @{var.object_id}" if var.object_id else "" + print(f" {marker} {var.name} = {var.repr_value}{type_info}{oid}") + + if step.call_stack: + print(" Call Stack:") + for i, frame in enumerate(step.call_stack): + indent = " " + " " * i + rv = f" → {frame.return_value}" if frame.return_value else "" + print(f"{indent}↳ {frame.function_name} (line {frame.line_number}){rv}") + + if step.heap_objects: + print(" Heap:") + for obj in step.heap_objects.values(): + children = f" [{len(obj.children)} refs]" if obj.children else "" + print(f" 📦 {obj.type_name} @{obj.object_id} = {_truncate(obj.repr_value, 50)}{children}") + def _display_summary_simple(self, trace: "ExecutionTrace") -> None: - """Display final summary using print.""" print("\n" + "=" * 60) - + if trace.final_output: print("\nProgram Output:") print("-" * 40) print(trace.final_output.rstrip()) print("-" * 40) - + if trace.final_variables: print("\nFinal Variables:") for var in trace.final_variables.values(): - print(f" {var.name} = {var.repr_value} ({var.type_name})") - + oid = f" @{var.object_id}" if var.object_id else "" + print(f" {var.name} = {var.repr_value} ({var.type_name}){oid}") + if trace.success: print("\n✓ Execution completed successfully") else: print(f"\n✗ Execution failed: {trace.error_message}") - + print(f"Total steps: {len(trace.steps)}") print("=" * 60 + "\n") - + + # ------------------------------------------------------------------ + # Frame data for export + # ------------------------------------------------------------------ + def to_frames(self, trace: "ExecutionTrace") -> list[dict]: - """ - Convert trace to frame data for export. - - Args: - trace: ExecutionTrace to convert - - Returns: - List of frame dictionaries suitable for image/video generation - """ frames = [] - for step in trace.steps: - frame = { + frame: dict[str, Any] = { "step_number": step.step_number, "line_number": step.line_number, "line_content": step.line_content, @@ -316,20 +393,64 @@ def to_frames(self, trace: "ExecutionTrace") -> list[dict]: name: { "value": var.repr_value, "type": var.type_name, - "changed": var.changed + "changed": var.changed, + "object_id": var.object_id, } for name, var in step.variables.items() }, "code_lines": trace.code.split('\n'), "theme": self.theme, + "call_stack": [ + { + "function": sf.function_name, + "line": sf.line_number, + "return_value": sf.return_value, + } + for sf in step.call_stack + ], + "heap_objects": { + str(oid): { + "type": obj.type_name, + "repr": obj.repr_value, + "children": obj.children, + } + for oid, obj in step.heap_objects.items() + }, + "loop_iteration": step.loop_iteration, + "duration_ms": step.duration_ms, } frames.append(frame) - return frames -def format_value(value: str, max_length: int = 50) -> str: - """Format a value string for display, truncating if necessary.""" +# ------------------------------------------------------------------ +# Helpers +# ------------------------------------------------------------------ + +def _truncate(value: str, max_length: int = 50) -> str: if len(value) > max_length: return value[:max_length - 3] + "..." return value + + +def _format_rich_value(repr_value: str, type_name: str) -> str: + """Format a value for rich display with enhanced data structure hints.""" + if type_name == "list" and repr_value.startswith("["): + try: + items = repr_value.strip("[]").split(", ") + if len(items) > 8: + return f"[{', '.join(items[:6])}, ... +{len(items)-6} more]" + except Exception: + pass + elif type_name == "dict" and repr_value.startswith("{"): + try: + if len(repr_value) > 60: + return repr_value[:57] + "..." + except Exception: + pass + return repr_value + + +def format_value(value: str, max_length: int = 50) -> str: + """Format a value string for display, truncating if necessary.""" + return _truncate(value, max_length) diff --git a/tests/test_new_features.py b/tests/test_new_features.py new file mode 100644 index 0000000..d74dc32 --- /dev/null +++ b/tests/test_new_features.py @@ -0,0 +1,481 @@ +""" +Tests for ExplainFlow v1.0.0 new features. + +Covers: HeapObject, StackFrame, breakpoints, heap tracking, call stack, +loop iteration, profiling, multi-file tracing, custom themes, enhanced models, +export_video, export_markdown, enhanced HTML export, and enhanced core API. +""" + +import pytest +import os +import tempfile +from pathlib import Path + +from explainflow.models import ( + HeapObject, + StackFrame, + Variable, + ExecutionStep, + ExecutionTrace, + StepType, +) +from explainflow.tracer import Tracer +from explainflow.visualizer import ( + Visualizer, + register_theme, + get_theme, + THEMES, + format_value, +) +from explainflow.core import explain +from explainflow.exporter import export_html, export_markdown + + +# ============================================================== +# Models: HeapObject +# ============================================================== + +class TestHeapObject: + + def test_from_value_int(self): + obj = HeapObject.from_value(42) + assert obj.type_name == "int" + assert obj.repr_value == "42" + assert obj.object_id == id(42) + + def test_from_value_list(self): + data = [1, 2, 3] + obj = HeapObject.from_value(data) + assert obj.type_name == "list" + assert "1" in obj.repr_value + # Children should map indices to sub-object ids + assert "0" in obj.children or 0 in obj.children or len(obj.children) >= 0 + + def test_from_value_dict(self): + data = {"a": 1, "b": 2} + obj = HeapObject.from_value(data) + assert obj.type_name == "dict" + assert obj.children # should have child refs + + def test_from_value_nested_object(self): + data = {"x": [1, 2]} + obj = HeapObject.from_value(data) + assert obj.type_name == "dict" + + def test_long_repr_truncated(self): + data = list(range(100)) + obj = HeapObject.from_value(data) + assert len(obj.repr_value) <= 203 # 200 + "..." + + +# ============================================================== +# Models: StackFrame +# ============================================================== + +class TestStackFrame: + + def test_creation(self): + sf = StackFrame( + function_name="my_func", + line_number=10, + local_variables={"x": 5}, + arguments={"a": "1"}, + return_value=None, + ) + assert sf.function_name == "my_func" + assert sf.line_number == 10 + assert sf.local_variables == {"x": 5} + + def test_defaults(self): + sf = StackFrame(function_name="f", line_number=1) + assert sf.local_variables == {} + assert sf.arguments == {} + assert sf.return_value is None + + +# ============================================================== +# Models: Enhanced Variable +# ============================================================== + +class TestVariableEnhanced: + + def test_object_id_tracked(self): + data = [1, 2, 3] + var = Variable.from_value("data", data) + assert var.object_id == id(data) + + def test_object_id_for_int(self): + var = Variable.from_value("x", 42) + assert var.object_id is not None + + +# ============================================================== +# Models: Enhanced ExecutionStep +# ============================================================== + +class TestExecutionStepEnhanced: + + def test_new_fields_default(self): + step = ExecutionStep( + step_number=1, + line_number=1, + line_content="x = 1", + step_type=StepType.ASSIGNMENT, + explanation="Assign 1 to x", + ) + assert step.heap_objects == {} + assert step.call_stack == [] + assert step.loop_iteration is None + assert step.timestamp >= 0 + assert step.duration_ms == 0.0 + + def test_with_call_stack(self): + sf = StackFrame(function_name="f", line_number=5) + step = ExecutionStep( + step_number=1, + line_number=5, + line_content="return 1", + step_type=StepType.RETURN, + explanation="Return 1", + call_stack=[sf], + ) + assert len(step.call_stack) == 1 + assert step.call_stack[0].function_name == "f" + + +# ============================================================== +# Models: Enhanced ExecutionTrace +# ============================================================== + +class TestExecutionTraceEnhanced: + + def test_repr_html(self): + """Test Jupyter _repr_html_ integration.""" + trace = ExecutionTrace(code="x = 1", steps=[], success=True) + html_output = trace._repr_html_() + assert " 0 + + +# ============================================================== +# Visualizer: Custom themes +# ============================================================== + +class TestCustomThemes: + + def test_register_theme(self): + register_theme("my_custom", {"background": "#000000"}) + theme = get_theme("my_custom") + assert theme["background"] == "#000000" + # Missing keys should be filled from dark + assert "foreground" in theme + + def test_get_builtin_theme(self): + theme = get_theme("dark") + assert theme == THEMES["dark"] + + def test_get_unknown_theme_fallback(self): + theme = get_theme("nonexistent_theme_xyz") + assert theme == THEMES["dark"] + + def test_visualizer_uses_custom_theme(self): + register_theme("test_viz", {"background": "#112233"}) + viz = Visualizer(theme="test_viz") + assert viz.theme["background"] == "#112233" + + +# ============================================================== +# Visualizer: format_value +# ============================================================== + +class TestFormatValue: + + def test_short_value(self): + assert format_value("hello") == "hello" + + def test_long_value_truncated(self): + long_val = "x" * 100 + result = format_value(long_val, max_length=50) + assert len(result) == 50 + assert result.endswith("...") + + +# ============================================================== +# Visualizer: to_frames enhanced +# ============================================================== + +class TestToFramesEnhanced: + + def test_frames_include_call_stack(self): + tracer = Tracer(track_call_stack=True) + code = "def f():\n return 1\nresult = f()" + result = tracer.trace(code) + viz = Visualizer() + frames = viz.to_frames(result) + assert len(frames) > 0 + # Frames should have call_stack key + for frame in frames: + assert "call_stack" in frame + assert "heap_objects" in frame + assert "loop_iteration" in frame + assert "duration_ms" in frame + + def test_frames_include_object_id(self): + tracer = Tracer() + result = tracer.trace("x = [1, 2, 3]") + viz = Visualizer() + frames = viz.to_frames(result) + for frame in frames: + for var_data in frame["variables"].values(): + assert "object_id" in var_data + + +# ============================================================== +# Core: Enhanced explain() +# ============================================================== + +class TestExplainEnhanced: + + def test_explain_with_heap(self): + trace = explain("x = [1, 2]", output="silent", track_heap=True) + assert trace.success + + def test_explain_with_profile(self): + trace = explain("x = 1", output="silent", profile=True) + assert trace.success + + def test_explain_with_breakpoints(self): + trace = explain("x = 1\ny = 2", output="silent", breakpoints=[1]) + assert trace.success + + def test_explain_with_custom_theme(self): + register_theme("test_explain_theme", {"background": "#aabbcc"}) + trace = explain("x = 1", output="silent", theme="test_explain_theme") + assert trace.success + + +# ============================================================== +# Exporter: export_html return_string +# ============================================================== + +class TestExportHtmlReturnString: + + def test_return_string(self): + tracer = Tracer() + result = tracer.trace("x = 42") + html_str = export_html(result, return_string=True) + assert isinstance(html_str, str) + assert "ExplainFlow" in html_str + assert "x" in html_str + + def test_return_string_has_call_stack_panel(self): + tracer = Tracer() + result = tracer.trace("x = 42") + html_str = export_html(result, return_string=True) + assert "callStackPanel" in html_str + assert "heapPanel" in html_str + + def test_return_string_has_timing(self): + tracer = Tracer() + result = tracer.trace("x = 42") + html_str = export_html(result, return_string=True) + assert "timing" in html_str + + def test_html_file_still_works(self, tmp_path): + tracer = Tracer() + result = tracer.trace("x = 42") + output_file = tmp_path / "test.html" + path = export_html(result, str(output_file)) + assert Path(path).exists() + + +# ============================================================== +# Exporter: export_markdown enhanced +# ============================================================== + +class TestExportMarkdownEnhanced: + + def test_markdown_basic(self, tmp_path): + tracer = Tracer() + result = tracer.trace("x = 42") + output_file = tmp_path / "test.md" + path = export_markdown(result, str(output_file)) + content = Path(path).read_text() + assert "ExplainFlow" in content + assert "x" in content + + def test_markdown_with_loop(self, tmp_path): + tracer = Tracer() + code = "total = 0\nfor i in range(3):\n total += i" + result = tracer.trace(code) + output_file = tmp_path / "test_loop.md" + path = export_markdown(result, str(output_file)) + content = Path(path).read_text() + assert "Step" in content + + +# ============================================================== +# Version +# ============================================================== + +class TestVersion: + + def test_version_is_1_0_0(self): + from explainflow import __version__ + assert __version__ == "1.0.0" + + +# ============================================================== +# All new public exports +# ============================================================== + +class TestPublicExports: + + def test_heap_object_importable(self): + from explainflow import HeapObject + assert HeapObject is not None + + def test_stack_frame_importable(self): + from explainflow import StackFrame + assert StackFrame is not None + + def test_register_theme_importable(self): + from explainflow import register_theme + assert callable(register_theme) + + def test_get_theme_importable(self): + from explainflow import get_theme + assert callable(get_theme) + + def test_export_video_importable(self): + from explainflow import export_video + assert callable(export_video) + + def test_export_markdown_importable(self): + from explainflow import export_markdown + assert callable(export_markdown) From d5a52a26782395cb74dea6bd0edbc3a0ea8ea3e3 Mon Sep 17 00:00:00 2001 From: DevaVirathan Date: Thu, 12 Feb 2026 12:42:06 +0530 Subject: [PATCH 2/6] feat: Add GitHub workflows for CI, CodeQL analysis, and release process --- .github/dependabot.yml | 32 +++++++ .github/workflows/ci.yml | 129 ++++++++++++++++++++++++++ .github/workflows/codeql.yml | 55 +++++++++++ .github/workflows/release.yml | 168 ++++++++++++++++++++++++++++++++++ 4 files changed, 384 insertions(+) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/ci.yml create mode 100644 .github/workflows/codeql.yml create mode 100644 .github/workflows/release.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..3a0fdc3 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,32 @@ +# ============================================================================= +# Dependabot — Automated dependency updates +# ============================================================================= + +version: 2 + +updates: + # Python (pip) dependencies from pyproject.toml + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "weekly" + day: "monday" + commit-message: + prefix: "deps" + labels: + - "dependencies" + - "python" + open-pull-requests-limit: 10 + + # GitHub Actions versions + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + day: "monday" + commit-message: + prefix: "ci" + labels: + - "dependencies" + - "github-actions" + open-pull-requests-limit: 5 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..1f755b3 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,129 @@ +# ============================================================================= +# ExplainFlow — Continuous Integration +# ============================================================================= +# Runs on every push and pull request to main. +# Matrix: Python 3.9 · 3.10 · 3.11 · 3.12 × ubuntu-latest +# Jobs: lint → test (with coverage) → build verification +# ============================================================================= + +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +concurrency: + group: ci-${{ github.ref }} + cancel-in-progress: true + +permissions: + contents: read + +jobs: + # --------------------------------------------------------------------------- + # Lint & Type-check (fast-fail gate) + # --------------------------------------------------------------------------- + lint: + name: "Lint & Type-check" + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + persist-credentials: false + + - uses: actions/setup-python@v6 + with: + python-version: "3.12" + cache: pip + cache-dependency-path: pyproject.toml + + - name: Install dev dependencies + run: pip install -e ".[dev]" + + - name: Ruff (lint + format check) + run: | + ruff check src/ tests/ + ruff format --check src/ tests/ + + - name: Black (format check) + run: black --check src/ tests/ + + - name: Mypy (type-check) + run: mypy src/explainflow/ + + # --------------------------------------------------------------------------- + # Test matrix + # --------------------------------------------------------------------------- + test: + name: "Test · Python ${{ matrix.python-version }}" + needs: lint + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12"] + + steps: + - uses: actions/checkout@v6 + with: + persist-credentials: false + + - uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: pyproject.toml + + - name: Install dependencies + run: pip install -e ".[all]" + + - name: Run tests with coverage + run: | + pytest tests/ \ + -v \ + --tb=short \ + --cov=explainflow \ + --cov-report=term-missing \ + --cov-report=xml:coverage.xml \ + --cov-fail-under=80 + + - name: Upload coverage artifact + if: matrix.python-version == '3.12' + uses: actions/upload-artifact@v5 + with: + name: coverage-report + path: coverage.xml + retention-days: 7 + + # --------------------------------------------------------------------------- + # Build verification (ensures the package builds cleanly) + # --------------------------------------------------------------------------- + build-check: + name: "Build verification" + needs: test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + persist-credentials: false + + - uses: actions/setup-python@v6 + with: + python-version: "3.12" + cache: pip + + - name: Install build tools + run: pip install build twine + + - name: Build distributions + run: python -m build + + - name: Check distributions + run: twine check dist/* + + - name: Verify package metadata + run: | + pip install dist/*.whl + python -c "import explainflow; print(f'✓ explainflow {explainflow.__version__} installed successfully')" diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index 0000000..f6a2fd4 --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,55 @@ +# ============================================================================= +# ExplainFlow — CodeQL Security Analysis +# ============================================================================= +# Runs GitHub's CodeQL semantic analysis engine to find security +# vulnerabilities in your Python code. Free for public repositories. +# +# Schedule: weekly + every push/PR to main. +# ============================================================================= + +name: "CodeQL" + +on: + push: + branches: [main] + pull_request: + branches: [main] + schedule: + # Run every Monday at 06:00 UTC + - cron: "0 6 * * 1" + +concurrency: + group: codeql-${{ github.ref }} + cancel-in-progress: true + +permissions: + contents: read + +jobs: + analyze: + name: Analyze (Python) + runs-on: ubuntu-latest + permissions: + security-events: write # Required for CodeQL to upload results + contents: read + actions: read + strategy: + fail-fast: false + matrix: + language: ["python"] + steps: + - name: Checkout repository + uses: actions/checkout@v6 + + - name: Initialize CodeQL + uses: github/codeql-action/init@v4 + with: + languages: ${{ matrix.language }} + + # Python is an interpreted language — no build step required. + # CodeQL will auto-detect the source layout. + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v4 + with: + category: "/language:${{ matrix.language }}" diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..d8d7541 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,168 @@ +# ============================================================================= +# ExplainFlow — Release & Publish to PyPI +# ============================================================================= +# Triggered by pushing a version tag (v*.*.*). +# Uses PyPI Trusted Publishing (OIDC) — no API tokens needed. +# +# Prerequisites (one-time setup): +# 1. Go to https://pypi.org/manage/account/publishing/ +# 2. Add a "pending publisher": +# • PyPI project name: explainflow +# • Owner: DevaVirathan +# • Repository: explainflow +# • Workflow name: release.yml +# • Environment: pypi +# 3. In your GitHub repo → Settings → Environments: +# • Create "pypi" environment with required reviewers (recommended) +# • Create "testpypi" environment (no reviewers needed) +# +# Repeat step 2 on https://test.pypi.org with environment "testpypi". +# ============================================================================= + +name: Release + +on: + push: + tags: + - "v*.*.*" + +permissions: + contents: read + +jobs: + # --------------------------------------------------------------------------- + # Run full test suite before publishing + # --------------------------------------------------------------------------- + test: + name: "Pre-release tests" + runs-on: ubuntu-latest + strategy: + fail-fast: true + matrix: + python-version: ["3.9", "3.12"] + steps: + - uses: actions/checkout@v6 + with: + persist-credentials: false + + - uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: pyproject.toml + + - name: Install dependencies + run: pip install -e ".[all]" + + - name: Run tests + run: pytest tests/ -v --tb=short + + # --------------------------------------------------------------------------- + # Build distribution packages + # --------------------------------------------------------------------------- + build: + name: "Build distribution 📦" + needs: test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + persist-credentials: false + + - uses: actions/setup-python@v6 + with: + python-version: "3.12" + + - name: Install build tools + run: python -m pip install build --user + + - name: Build sdist and wheel + run: python -m build + + - name: Verify distributions + run: | + pip install twine + twine check dist/* + + - name: Store distribution packages + uses: actions/upload-artifact@v5 + with: + name: python-package-distributions + path: dist/ + + # --------------------------------------------------------------------------- + # Publish to TestPyPI (every tag push) + # --------------------------------------------------------------------------- + publish-to-testpypi: + name: "Publish to TestPyPI 🧪" + needs: build + runs-on: ubuntu-latest + environment: + name: testpypi + url: https://test.pypi.org/p/explainflow + permissions: + id-token: write # IMPORTANT: mandatory for trusted publishing + steps: + - name: Download distributions + uses: actions/download-artifact@v6 + with: + name: python-package-distributions + path: dist/ + + - name: Publish to TestPyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + repository-url: https://test.pypi.org/legacy/ + + # --------------------------------------------------------------------------- + # Publish to PyPI (only tag pushes, after TestPyPI succeeds) + # --------------------------------------------------------------------------- + publish-to-pypi: + name: "Publish to PyPI 🚀" + needs: publish-to-testpypi + runs-on: ubuntu-latest + environment: + name: pypi + url: https://pypi.org/p/explainflow + permissions: + id-token: write # IMPORTANT: mandatory for trusted publishing + steps: + - name: Download distributions + uses: actions/download-artifact@v6 + with: + name: python-package-distributions + path: dist/ + + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + + # --------------------------------------------------------------------------- + # Create GitHub Release with auto-generated notes + # --------------------------------------------------------------------------- + github-release: + name: "GitHub Release 📝" + needs: publish-to-pypi + runs-on: ubuntu-latest + permissions: + contents: write # Required to create releases + steps: + - uses: actions/checkout@v6 + with: + persist-credentials: false + + - name: Download distributions + uses: actions/download-artifact@v6 + with: + name: python-package-distributions + path: dist/ + + - name: Create GitHub Release + env: + GITHUB_TOKEN: ${{ github.token }} + run: >- + gh release create + "${{ github.ref_name }}" + dist/* + --repo "${{ github.repository }}" + --generate-notes + --title "ExplainFlow ${{ github.ref_name }}" From 99aa6ee8e57a476647d337a07cc5df73a099769b Mon Sep 17 00:00:00 2001 From: DevaVirathan Date: Thu, 12 Feb 2026 12:55:31 +0530 Subject: [PATCH 3/6] Refactor visualizer and tests for improved readability and consistency - Cleaned up docstrings in visualizer.py and test files for uniformity. - Removed unnecessary imports and organized import statements. - Enhanced readability by adjusting formatting and spacing in visualizer methods. - Updated test cases in test_core.py, test_exporter.py, test_new_features.py, test_tracer.py, and test_visualizer.py for consistency and clarity. - Ensured all tests follow a consistent style and structure. - Improved assertions and added comments for better understanding of test cases. --- pyproject.toml | 18 +- src/explainflow/__init__.py | 27 +-- src/explainflow/cli.py | 32 +-- src/explainflow/core.py | 15 +- src/explainflow/exporter.py | 432 +++++++++++++++++++--------------- src/explainflow/models.py | 50 ++-- src/explainflow/tracer.py | 236 ++++++++++++------- src/explainflow/visualizer.py | 113 +++++---- tests/conftest.py | 6 +- tests/test_core.py | 103 ++++---- tests/test_exporter.py | 68 +++--- tests/test_new_features.py | 68 +++--- tests/test_tracer.py | 71 +++--- tests/test_visualizer.py | 89 ++++--- 14 files changed, 739 insertions(+), 589 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4805d40..6f9c7ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -82,8 +82,24 @@ target-version = ["py39", "py310", "py311", "py312"] [tool.ruff] line-length = 88 + +[tool.ruff.lint] select = ["E", "F", "W", "I", "N", "D", "UP"] -ignore = ["D100", "D104"] +ignore = [ + "D100", # Missing docstring in public module + "D104", # Missing docstring in public package + "D203", # 1 blank line before class (conflicts with D211) + "D213", # Multi-line summary second line (conflicts with D212) + "D407", # Missing dashed underline after section + "D413", # Missing blank line after last section + "D105", # Missing docstring in magic method + "D107", # Missing docstring in __init__ + "D401", # First line should be in imperative mood +] + +[tool.ruff.lint.per-file-ignores] +"tests/*" = ["D101", "D102", "D103", "E501"] +"src/explainflow/exporter.py" = ["E501"] [tool.mypy] python_version = "3.9" diff --git a/src/explainflow/__init__.py b/src/explainflow/__init__.py index 44a6217..f7ff62f 100644 --- a/src/explainflow/__init__.py +++ b/src/explainflow/__init__.py @@ -1,5 +1,4 @@ -""" -ExplainFlow - Code Execution Visualizer & Explainer +"""ExplainFlow - Code Execution Visualizer & Explainer. Generate step-by-step visual explanations of Python code execution. """ @@ -7,24 +6,24 @@ __version__ = "1.0.0" __author__ = "DevaVirathan" -from explainflow.models import ( - ExecutionTrace, - ExecutionStep, - StepType, - Variable, - HeapObject, - StackFrame, -) from explainflow.core import explain, explain_function -from explainflow.tracer import Tracer, trace -from explainflow.visualizer import Visualizer, register_theme, get_theme from explainflow.exporter import ( - export_image, export_gif, export_html, - export_video, + export_image, export_markdown, + export_video, ) +from explainflow.models import ( + ExecutionStep, + ExecutionTrace, + HeapObject, + StackFrame, + StepType, + Variable, +) +from explainflow.tracer import Tracer, trace +from explainflow.visualizer import Visualizer, get_theme, register_theme __all__ = [ # Core API diff --git a/src/explainflow/cli.py b/src/explainflow/cli.py index d32314d..31aed59 100644 --- a/src/explainflow/cli.py +++ b/src/explainflow/cli.py @@ -1,5 +1,4 @@ -""" -CLI module for ExplainFlow. +"""CLI module for ExplainFlow. Provides command-line interface using Typer. """ @@ -8,7 +7,6 @@ import sys from pathlib import Path -from typing import Optional def main(): @@ -33,7 +31,7 @@ def run( exists=True, readable=True, ), - output: Optional[Path] = typer.Option( + output: Path | None = typer.Option( None, "--output", "-o", help="Output file (png, gif, mp4, html, md)" ), theme: str = typer.Option( @@ -51,17 +49,12 @@ def run( simple: bool = typer.Option( False, "--simple", "-s", help="Use simple output (no colors)" ), - heap: bool = typer.Option( - False, "--heap", help="Track heap objects" - ), - profile: bool = typer.Option( - False, "--profile", help="Record per-step timing" - ), + heap: bool = typer.Option(False, "--heap", help="Track heap objects"), + profile: bool = typer.Option(False, "--profile", help="Record per-step timing"), ): - """ - Run and explain a Python file step-by-step. + """Run and explain a Python file step-by-step. - Examples: + Examples explainflow run script.py explainflow run script.py -o output.png explainflow run script.py -o animation.gif --fps 2 @@ -72,11 +65,11 @@ def run( """ from explainflow import ( explain, - export_image, export_gif, export_html, - export_video, + export_image, export_markdown, + export_video, ) code = file.read_text() @@ -131,8 +124,7 @@ def explain_code( code: str = typer.Argument(..., help="Python code string to explain"), theme: str = typer.Option("dark", "--theme", "-t", help="Color theme"), ): - """ - Explain a code snippet directly. + """Explain a code snippet directly. Example: explainflow explain-code "x = 5; y = 10; print(x + y)" @@ -147,13 +139,13 @@ def watch( file: Path = typer.Argument(..., help="Python file to watch", exists=True), theme: str = typer.Option("dark", "--theme", "-t", help="Color theme"), ): - """ - Watch a file and re-run explanation on changes. + """Watch a file and re-run explanation on changes. Example: explainflow watch script.py """ import time + from explainflow import explain typer.echo(f"👀 Watching {file.name} for changes... (Ctrl+C to stop)") @@ -166,7 +158,7 @@ def watch( if current_mtime != last_mtime: last_mtime = current_mtime typer.clear() - typer.echo(f"🔄 File changed, re-running...\n") + typer.echo("🔄 File changed, re-running...\n") code = file.read_text() explain(code, output="rich", theme=theme) time.sleep(0.5) diff --git a/src/explainflow/core.py b/src/explainflow/core.py index fffdb54..f63c15e 100644 --- a/src/explainflow/core.py +++ b/src/explainflow/core.py @@ -1,17 +1,15 @@ -""" -Core module for ExplainFlow. +"""Core module for ExplainFlow. Contains the main explain() function. """ from __future__ import annotations -from typing import Literal, Optional +from typing import Literal from explainflow.models import ExecutionTrace from explainflow.tracer import Tracer - OutputMode = Literal["rich", "simple", "silent"] @@ -21,13 +19,12 @@ def explain( max_steps: int = 1000, show_types: bool = True, theme: str = "dark", - breakpoints: Optional[list[int]] = None, + breakpoints: list[int] | None = None, track_heap: bool = False, track_call_stack: bool = True, profile: bool = False, ) -> ExecutionTrace: - """ - Execute and explain code step-by-step. + """Execute and explain code step-by-step. Args: code: Python code string to explain @@ -61,6 +58,7 @@ def explain( if output != "silent": from explainflow.visualizer import Visualizer + visualizer = Visualizer(theme=theme, show_types=show_types) if output == "rich": visualizer.display_rich(trace_result) @@ -71,8 +69,7 @@ def explain( def explain_function(func, *args, **kwargs) -> ExecutionTrace: - """ - Explain a function execution with given arguments. + """Explain a function execution with given arguments. Args: func: Function to trace diff --git a/src/explainflow/exporter.py b/src/explainflow/exporter.py index 8a52572..a00af58 100644 --- a/src/explainflow/exporter.py +++ b/src/explainflow/exporter.py @@ -1,5 +1,4 @@ -""" -Exporter module for ExplainFlow. +"""Exporter module for ExplainFlow. Handles exporting execution traces to images, GIFs, videos, and HTML. """ @@ -7,7 +6,7 @@ from __future__ import annotations from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING if TYPE_CHECKING: from explainflow.core import ExecutionTrace @@ -25,16 +24,15 @@ def export_image( - trace: "ExecutionTrace", + trace: ExecutionTrace, filename: str, theme: str = "dark", - step: Optional[int] = None, + step: int | None = None, width: int = 1200, - show_all_steps: bool = False + show_all_steps: bool = False, ) -> Path: - """ - Export execution trace as a PNG image. - + """Export execution trace as a PNG image. + Args: trace: ExecutionTrace to export filename: Output filename (should end with .png) @@ -42,151 +40,196 @@ def export_image( step: Specific step to export (None for final state) width: Image width in pixels show_all_steps: If True, create a long image with all steps - + Returns: Path to the created image file """ try: from PIL import Image, ImageDraw, ImageFont except ImportError: - raise ImportError("Pillow is required for image export. Install with: pip install pillow") - + raise ImportError( + "Pillow is required for image export. Install with: pip install pillow" + ) + from explainflow.visualizer import THEMES - + colors = THEMES.get(theme, THEMES["dark"]) - + # Calculate dimensions - code_lines = trace.code.split('\n') + code_lines = trace.code.split("\n") num_code_lines = len(code_lines) - + if show_all_steps: # Show all steps in one tall image num_steps = len(trace.steps) step_height = 150 # Height per step - height = HEADER_HEIGHT + (num_code_lines * CODE_LINE_HEIGHT) + (num_steps * step_height) + PADDING * 4 + height = ( + HEADER_HEIGHT + + (num_code_lines * CODE_LINE_HEIGHT) + + (num_steps * step_height) + + PADDING * 4 + ) else: # Single step or final state height = HEADER_HEIGHT + (num_code_lines * CODE_LINE_HEIGHT) + 300 + PADDING * 3 - + # Create image - img = Image.new('RGB', (width, height), colors["background"]) + img = Image.new("RGB", (width, height), colors["background"]) draw = ImageDraw.Draw(img) - + # Try to load a monospace font, fall back to default try: - code_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf", CODE_FONT_SIZE) - font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", DEFAULT_FONT_SIZE) - header_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 18) - except (OSError, IOError): + code_font = ImageFont.truetype( + "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf", CODE_FONT_SIZE + ) + font = ImageFont.truetype( + "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", DEFAULT_FONT_SIZE + ) + header_font = ImageFont.truetype( + "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 18 + ) + except OSError: try: # Try macOS fonts - code_font = ImageFont.truetype("/System/Library/Fonts/Menlo.ttc", CODE_FONT_SIZE) - font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", DEFAULT_FONT_SIZE) + code_font = ImageFont.truetype( + "/System/Library/Fonts/Menlo.ttc", CODE_FONT_SIZE + ) + font = ImageFont.truetype( + "/System/Library/Fonts/Helvetica.ttc", DEFAULT_FONT_SIZE + ) header_font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", 18) - except (OSError, IOError): + except OSError: try: # Try Windows fonts code_font = ImageFont.truetype("consola.ttf", CODE_FONT_SIZE) font = ImageFont.truetype("arial.ttf", DEFAULT_FONT_SIZE) header_font = ImageFont.truetype("arialbd.ttf", 18) - except (OSError, IOError): + except OSError: try: # Try Windows full path fonts import os + windir = os.environ.get("WINDIR", r"C:\Windows") fonts_dir = os.path.join(windir, "Fonts") - code_font = ImageFont.truetype(os.path.join(fonts_dir, "consola.ttf"), CODE_FONT_SIZE) - font = ImageFont.truetype(os.path.join(fonts_dir, "arial.ttf"), DEFAULT_FONT_SIZE) - header_font = ImageFont.truetype(os.path.join(fonts_dir, "arialbd.ttf"), 18) - except (OSError, IOError): + code_font = ImageFont.truetype( + os.path.join(fonts_dir, "consola.ttf"), CODE_FONT_SIZE + ) + font = ImageFont.truetype( + os.path.join(fonts_dir, "arial.ttf"), DEFAULT_FONT_SIZE + ) + header_font = ImageFont.truetype( + os.path.join(fonts_dir, "arialbd.ttf"), 18 + ) + except OSError: # Fall back to default code_font = ImageFont.load_default() font = ImageFont.load_default() header_font = ImageFont.load_default() - + y_offset = PADDING - + # Draw header - draw.text((PADDING, y_offset), "ExplainFlow - Code Execution Trace", fill=colors["header"], font=header_font) + draw.text( + (PADDING, y_offset), + "ExplainFlow - Code Execution Trace", + fill=colors["header"], + font=header_font, + ) y_offset += HEADER_HEIGHT - + # Draw code section draw.rectangle( - [(PADDING, y_offset), (width - PADDING, y_offset + num_code_lines * CODE_LINE_HEIGHT + PADDING)], + [ + (PADDING, y_offset), + (width - PADDING, y_offset + num_code_lines * CODE_LINE_HEIGHT + PADDING), + ], outline=colors["border"], - width=1 + width=1, ) - + code_y = y_offset + PADDING // 2 - + # Determine which line to highlight highlight_line = None current_step_data = None - + if step is not None and 1 <= step <= len(trace.steps): current_step_data = trace.steps[step - 1] highlight_line = current_step_data.line_number elif trace.steps: current_step_data = trace.steps[-1] highlight_line = current_step_data.line_number - + for i, line in enumerate(code_lines, 1): line_y = code_y + (i - 1) * CODE_LINE_HEIGHT - + # Highlight current line if i == highlight_line: draw.rectangle( - [(PADDING + 1, line_y - 2), (width - PADDING - 1, line_y + CODE_LINE_HEIGHT - 2)], - fill=colors["current_line"] + [ + (PADDING + 1, line_y - 2), + (width - PADDING - 1, line_y + CODE_LINE_HEIGHT - 2), + ], + fill=colors["current_line"], ) - + # Line number - draw.text((PADDING + 5, line_y), f"{i:3}", fill=colors["line_number"], font=code_font) - + draw.text( + (PADDING + 5, line_y), f"{i:3}", fill=colors["line_number"], font=code_font + ) + # Code line - draw.text((PADDING + 50, line_y), line, fill=colors["foreground"], font=code_font) - + draw.text( + (PADDING + 50, line_y), line, fill=colors["foreground"], font=code_font + ) + y_offset += num_code_lines * CODE_LINE_HEIGHT + PADDING * 2 - + # Draw step info if current_step_data: - _draw_step_info(draw, current_step_data, y_offset, width, colors, font, code_font) + _draw_step_info( + draw, current_step_data, y_offset, width, colors, font, code_font + ) y_offset += 200 - + # Draw all steps if requested if show_all_steps: for step_data in trace.steps: _draw_step_info(draw, step_data, y_offset, width, colors, font, code_font) y_offset += 150 - + # Save image output_path = Path(filename) img.save(output_path, "PNG") - + return output_path -def _draw_step_info(draw, step, y_offset: int, width: int, colors: dict, font, code_font) -> None: +def _draw_step_info( + draw, step, y_offset: int, width: int, colors: dict, font, code_font +) -> None: """Draw information about a single step.""" # Step header step_text = f"Step {step.step_number}: {step.step_type.value.upper()}" draw.text((PADDING, y_offset), step_text, fill=colors["header"], font=font) y_offset += LINE_HEIGHT - + # Line info line_text = f"Line {step.line_number}: {step.line_content.strip()}" draw.text((PADDING, y_offset), line_text, fill=colors["foreground"], font=code_font) y_offset += LINE_HEIGHT - + # Explanation - draw.text((PADDING, y_offset), f"→ {step.explanation}", fill=colors["success"], font=font) + draw.text( + (PADDING, y_offset), f"→ {step.explanation}", fill=colors["success"], font=font + ) y_offset += LINE_HEIGHT * 1.5 - + # Variables if step.variables: draw.text((PADDING, y_offset), "Variables:", fill=colors["comment"], font=font) y_offset += LINE_HEIGHT - + for var in step.variables.values(): marker = "⟳ " if var.changed else " " color = colors["changed"] if var.changed else colors["variable"] @@ -196,16 +239,15 @@ def _draw_step_info(draw, step, y_offset: int, width: int, colors: dict, font, c def export_gif( - trace: "ExecutionTrace", + trace: ExecutionTrace, filename: str, fps: float = 1.0, theme: str = "dark", width: int = 1200, - loop: bool = True + loop: bool = True, ) -> Path: - """ - Export execution trace as an animated GIF. - + """Export execution trace as an animated GIF. + Args: trace: ExecutionTrace to export filename: Output filename (should end with .gif) @@ -213,34 +255,38 @@ def export_gif( theme: Color theme width: Image width in pixels loop: Whether the GIF should loop - + Returns: Path to the created GIF file """ try: from PIL import Image except ImportError: - raise ImportError("Pillow is required for GIF export. Install with: pip install pillow") - + raise ImportError( + "Pillow is required for GIF export. Install with: pip install pillow" + ) + # Generate frame for each step frames = [] temp_files = [] - + import os import tempfile - + for i in range(1, len(trace.steps) + 1): - temp_filename = os.path.join(tempfile.gettempdir(), f"explainflow_frame_{i}.png") + temp_filename = os.path.join( + tempfile.gettempdir(), f"explainflow_frame_{i}.png" + ) export_image(trace, temp_filename, theme=theme, step=i, width=width) frames.append(Image.open(temp_filename)) temp_files.append(temp_filename) - + if not frames: raise ValueError("No steps to export") - + # Calculate duration per frame in milliseconds duration = int(1000 / fps) - + # Save as GIF output_path = Path(filename) frames[0].save( @@ -248,36 +294,35 @@ def export_gif( save_all=True, append_images=frames[1:], duration=duration, - loop=0 if loop else 1 + loop=0 if loop else 1, ) - + # Clean up temp files for temp_file in temp_files: try: os.remove(temp_file) except OSError: pass - + return output_path def export_video( - trace: "ExecutionTrace", + trace: ExecutionTrace, filename: str, fps: float = 1.0, theme: str = "dark", width: int = 1200, ) -> Path: - """ - Export execution trace as an MP4 video. - + """Export execution trace as an MP4 video. + Args: trace: ExecutionTrace to export filename: Output filename (should end with .mp4) fps: Frames per second (lower = slower animation) theme: Color theme width: Image width in pixels - + Returns: Path to the created video file """ @@ -288,39 +333,44 @@ def export_video( "imageio is required for video export. " "Install with: pip install explainflow[video]" ) - + try: from PIL import Image except ImportError: - raise ImportError("Pillow is required for video export. Install with: pip install pillow") - - import numpy as np + raise ImportError( + "Pillow is required for video export. Install with: pip install pillow" + ) + import os import tempfile - + + import numpy as np + # Generate frames as numpy arrays frames = [] temp_files = [] - + for i in range(1, len(trace.steps) + 1): - temp_filename = os.path.join(tempfile.gettempdir(), f"explainflow_video_frame_{i}.png") + temp_filename = os.path.join( + tempfile.gettempdir(), f"explainflow_video_frame_{i}.png" + ) export_image(trace, temp_filename, theme=theme, step=i, width=width) img = Image.open(temp_filename).convert("RGB") frame_array = np.array(img) frames.append(frame_array) temp_files.append(temp_filename) - + if not frames: raise ValueError("No steps to export") - + # Ensure all frames have the same shape (pad if necessary) max_h = max(f.shape[0] for f in frames) max_w = max(f.shape[1] for f in frames) - + # Make dimensions even (required by many codecs) max_h = max_h + (max_h % 2) max_w = max_w + (max_w % 2) - + padded_frames = [] for frame in frames: h, w = frame.shape[:2] @@ -330,51 +380,50 @@ def export_video( padded_frames.append(padded) else: padded_frames.append(frame) - + frame_stack = np.stack(padded_frames, axis=0) - + # Write video output_path = Path(filename) iio.imwrite(str(output_path), frame_stack, fps=fps) - + # Clean up temp files for temp_file in temp_files: try: os.remove(temp_file) except OSError: pass - + return output_path def export_markdown( - trace: "ExecutionTrace", + trace: ExecutionTrace, filename: str, show_types: bool = True, ) -> Path: - """ - Export execution trace as a Markdown document. - + """Export execution trace as a Markdown document. + Args: trace: ExecutionTrace to export filename: Output filename (should end with .md) show_types: Whether to show variable types - + Returns: Path to the created Markdown file """ lines = [] lines.append("# ExplainFlow - Code Execution Trace\n") - + # Source code lines.append("## Source Code\n") lines.append("```python") lines.append(trace.code.strip()) lines.append("```\n") - + # Steps lines.append("## Execution Steps\n") - + for step in trace.steps: lines.append(f"### Step {step.step_number}: {step.step_type.value.upper()}\n") lines.append(f"**Line {step.line_number}:** `{step.line_content.strip()}`\n") @@ -393,7 +442,9 @@ def export_markdown( changed = "⟳" if var.changed else "" type_info = var.type_name if show_types else "" oid = str(var.object_id) if var.object_id else "" - lines.append(f"| `{var.name}` | `{var.repr_value}` | {type_info} | {changed} | {oid} |") + lines.append( + f"| `{var.name}` | `{var.repr_value}` | {type_info} | {changed} | {oid} |" + ) lines.append("") if step.call_stack: @@ -401,7 +452,9 @@ def export_markdown( for i, frame in enumerate(step.call_stack): indent = "  " * i rv = f" → `{frame.return_value}`" if frame.return_value else "" - lines.append(f"- {indent}↳ **{frame.function_name}** (line {frame.line_number}){rv}") + lines.append( + f"- {indent}↳ **{frame.function_name}** (line {frame.line_number}){rv}" + ) lines.append("") if step.heap_objects: @@ -410,24 +463,26 @@ def export_markdown( lines.append("|-----------|------|-------|------|") for oid, obj in step.heap_objects.items(): refs = len(obj.children) if obj.children else 0 - lines.append(f"| @{oid} | {obj.type_name} | `{obj.repr_value[:60]}` | {refs} |") + lines.append( + f"| @{oid} | {obj.type_name} | `{obj.repr_value[:60]}` | {refs} |" + ) lines.append("") - + # Summary lines.append("## Summary\n") lines.append(f"- **Total steps:** {len(trace.steps)}") lines.append(f"- **Success:** {'✅ Yes' if trace.success else '❌ No'}") - + if trace.final_output: - lines.append(f"\n### Program Output\n") + lines.append("\n### Program Output\n") lines.append("```") lines.append(trace.final_output.rstrip()) lines.append("```\n") - + if not trace.success: - lines.append(f"\n### Error\n") + lines.append("\n### Error\n") lines.append(f"```\n{trace.error_message}\n```\n") - + if trace.final_variables: lines.append("\n### Final Variables\n") lines.append("| Name | Value | Type |") @@ -435,22 +490,21 @@ def export_markdown( for var in trace.final_variables.values(): lines.append(f"| `{var.name}` | `{var.repr_value}` | {var.type_name} |") lines.append("") - + output_path = Path(filename) output_path.write_text("\n".join(lines)) - + return output_path def export_html( - trace: "ExecutionTrace", + trace: ExecutionTrace, filename: str | None = None, theme: str = "dark", interactive: bool = True, return_string: bool = False, -) -> "Path | str": - """ - Export execution trace as an interactive HTML file or string. +) -> Path | str: + """Export execution trace as an interactive HTML file or string. Args: trace: ExecutionTrace to export @@ -468,55 +522,61 @@ def export_html( # Escape HTML in code import html as _html - code_lines = trace.code.split('\n') + + code_lines = trace.code.split("\n") # Build steps JSON steps_data = [] for step in trace.steps: - steps_data.append({ - "number": step.step_number, - "line": step.line_number, - "type": step.step_type.value, - "content": step.line_content, - "explanation": step.explanation, - "variables": { - name: { - "value": var.repr_value, - "type": var.type_name, - "changed": var.changed, - "object_id": var.object_id, - } - for name, var in step.variables.items() - }, - "call_stack": [ - { - "function": sf.function_name, - "line": sf.line_number, - "return_value": sf.return_value, - } - for sf in step.call_stack - ], - "heap_objects": { - str(oid): { - "type": obj.type_name, - "repr": obj.repr_value, - "children": obj.children, - } - for oid, obj in step.heap_objects.items() - }, - "loop_iteration": step.loop_iteration, - "duration_ms": step.duration_ms, - }) - + steps_data.append( + { + "number": step.step_number, + "line": step.line_number, + "type": step.step_type.value, + "content": step.line_content, + "explanation": step.explanation, + "variables": { + name: { + "value": var.repr_value, + "type": var.type_name, + "changed": var.changed, + "object_id": var.object_id, + } + for name, var in step.variables.items() + }, + "call_stack": [ + { + "function": sf.function_name, + "line": sf.line_number, + "return_value": sf.return_value, + } + for sf in step.call_stack + ], + "heap_objects": { + str(oid): { + "type": obj.type_name, + "repr": obj.repr_value, + "children": obj.children, + } + for oid, obj in step.heap_objects.items() + }, + "loop_iteration": step.loop_iteration, + "duration_ms": step.duration_ms, + } + ) + import json + steps_json = json.dumps(steps_data) - + # Build code lines HTML code_lines_html = [] for i, line in enumerate(code_lines, 1): escaped_line = _html.escape(line) or " " - code_lines_html.append(f'
{i}{escaped_line}
') - + code_lines_html.append( + f'
{i}{escaped_line}
' + ) + html_content = f""" @@ -624,8 +684,8 @@ def export_html( }} .controls button:hover {{ opacity: 0.9; }} .controls button:disabled {{ opacity: 0.5; cursor: not-allowed; }} - .step-counter {{ - font-size: 16px; + .step-counter {{ + font-size: 16px; padding: 10px; min-width: 100px; text-align: center; @@ -659,13 +719,13 @@ def export_html(

🔍 ExplainFlow

Code Execution Trace

- +

Source Code

{''.join(code_lines_html)}
- +
@@ -677,24 +737,24 @@ def export_html(
- +

Variables

- + - +
- +
@@ -704,29 +764,29 @@ def export_html(
- +