From 64bb94b0eff274dbb4e1b7c7e5d461aa56870b4b Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 15 Jan 2026 16:33:14 +0000 Subject: [PATCH] Add Streamlit trajectory visualization tool Created an interactive web-based trajectory viewer for Android World: - Load and visualize .pkl.gz trajectory files - Display episode metadata and step-by-step execution details - View screenshots with UI element bounding boxes - Inspect actions, UI elements, and LLM interactions - Search and filter UI elements - Support for multiple agent types (M3A, T3A, RandomAgent) Files added: - trajectory_viewer.py: Main Streamlit application - trajectory_viewer_requirements.txt: Dependencies - TRAJECTORY_VIEWER_README.md: Comprehensive usage documentation --- TRAJECTORY_VIEWER_README.md | 231 ++++++++++++++ trajectory_viewer.py | 486 +++++++++++++++++++++++++++++ trajectory_viewer_requirements.txt | 3 + 3 files changed, 720 insertions(+) create mode 100644 TRAJECTORY_VIEWER_README.md create mode 100644 trajectory_viewer.py create mode 100644 trajectory_viewer_requirements.txt diff --git a/TRAJECTORY_VIEWER_README.md b/TRAJECTORY_VIEWER_README.md new file mode 100644 index 00000000..0dfec26a --- /dev/null +++ b/TRAJECTORY_VIEWER_README.md @@ -0,0 +1,231 @@ +# Android World 轨迹查看器使用说明 + +## 简介 + +这是一个基于 Streamlit 的交互式可视化工具,用于查看和分析 Android World 任务执行轨迹。 + +## 功能特性 + +✨ **主要功能**: +- 📂 支持加载 `.pkl.gz` 格式的轨迹文件 +- 📊 显示 Episode 元数据(任务目标、成功率、运行时间等) +- 🔍 逐步浏览任务执行的每一步 +- 📸 可视化截图和 UI 元素边界框 +- 🎯 查看每一步的动作详情和执行理由 +- 🗂️ 浏览和搜索 UI 元素列表 +- 💬 查看 LLM 的提示词和响应内容 + +## 安装依赖 + +```bash +pip install -r trajectory_viewer_requirements.txt +``` + +或者手动安装: + +```bash +pip install streamlit numpy Pillow +``` + +## 运行方法 + +在项目根目录下运行: + +```bash +streamlit run trajectory_viewer.py +``` + +程序将自动在浏览器中打开(默认地址:http://localhost:8501) + +## 使用步骤 + +### 1. 加载轨迹文件 + +有两种方式加载轨迹文件: + +**方式一:上传文件** +- 在左侧边栏选择"上传文件" +- 点击文件上传按钮,选择 `.pkl.gz` 文件 + +**方式二:本地路径** +- 在左侧边栏选择"本地路径" +- 输入完整的文件路径,例如:`/path/to/your/trajectory.pkl.gz` + +### 2. 选择 Episode + +如果轨迹文件包含多个 episode,在左侧边栏的下拉菜单中选择要查看的 episode。 + +### 3. 查看 Episode 元数据 + +页面顶部显示当前 episode 的元数据: +- **任务模板**:任务类型 +- **实例 ID**:任务实例标识 +- **成功率**:任务完成的成功程度(0-100%) +- **步骤数**:总共执行了多少步 +- **运行时间**:任务执行耗时 +- **代理名称**:执行任务的 Agent 名称 +- **任务目标**:任务的具体目标描述 + +### 4. 浏览步骤详情 + +使用以下方式在步骤之间导航: +- **上一步/下一步按钮**:逐步浏览 +- **滑块**:快速跳转到特定步骤 + +### 5. 查看步骤内容 + +每个步骤有四个标签页: + +#### 📸 截图标签页 +- 显示当前步骤的屏幕截图 +- 支持多种截图类型(原始截图、带标注的截图等) +- 可选择显示 UI 元素的边界框,每个元素会用绿色框标注,并显示索引编号 +- 点击元素时,可以在"UI 元素"标签页查看详细信息 + +#### 🎯 动作标签页 +- 显示 Agent 执行的动作类型(点击、滑动、输入文本等) +- 显示动作参数(坐标、目标元素索引、输入文本等) +- 显示 Agent 选择该动作的理由 +- 显示步骤执行的总结 + +#### 🗂️ UI 元素标签页 +- 列出当前屏幕上的所有 UI 元素 +- 支持搜索功能,可按文本、描述、ID 搜索 +- 点击元素可查看详细属性: + - 文本内容 + - 内容描述 + - 类名和资源 ID + - 交互属性(可点击、可编辑、可滚动等) + - 完整的原始数据 + +#### 💬 LLM 交互标签页 +- 显示发送给 LLM 的提示词 +- 显示 LLM 的原始响应 +- 包括动作选择和步骤总结两个阶段的交互内容 + +## 数据格式说明 + +### Episode 数据结构 + +```python +{ + 'task_template': str, # 任务模板名称 + 'instance_id': str, # 实例 ID + 'goal': str, # 任务目标 + 'is_successful': float, # 成功率 (0.0-1.0) + 'episode_length': int, # 步骤数 + 'run_time': float, # 运行时间(秒) + 'agent_name': str, # Agent 名称 + 'seed': int, # 随机种子 + 'finish_dtime': datetime, # 完成时间 + 'episode_data': dict, # 步骤数据(字典-列表格式) +} +``` + +### Step 数据结构 + +每个步骤可能包含以下字段(取决于 Agent 类型): + +```python +{ + # 观察数据 + 'raw_screenshot': np.ndarray, # 原始截图 + 'before_screenshot': np.ndarray, # 执行前截图 + 'after_screenshot': np.ndarray, # 执行后截图 + 'before_screenshot_with_som': np.ndarray, # 带标注的截图 + 'ui_elements': list[dict], # UI 元素列表 + + # 动作数据 + 'action_prompt': str, # 动作选择提示词 + 'action_output': str, # 动作输出 + 'action_output_json': dict, # 动作 JSON + 'action_reason': str, # 动作理由 + 'action_raw_response': str, # LLM 原始响应 + + # 总结数据 + 'summary_prompt': str, # 总结提示词 + 'summary': str, # 步骤总结 + 'summary_raw_response': str, # 总结原始响应 +} +``` + +### UI 元素数据结构 + +```python +{ + # 文本信息 + 'text': str, # 显示文本 + 'content_description': str, # 内容描述 + 'hint_text': str, # 提示文本 + + # 位置信息 + 'bbox': dict, # 归一化边界框 (0-1) + 'bbox_pixels': dict, # 像素边界框 + + # 交互属性 + 'is_clickable': bool, # 可点击 + 'is_editable': bool, # 可编辑 + 'is_scrollable': bool, # 可滚动 + 'is_checkable': bool, # 可勾选 + + # 标识信息 + 'class_name': str, # 类名 + 'resource_id': str, # 资源 ID + 'package_name': str, # 包名 +} +``` + +## 常见问题 + +### Q: 支持哪些文件格式? +A: 目前支持 Android World 标准的 `.pkl.gz` 格式(Gzip 压缩的 Pickle 文件)。 + +### Q: 为什么有些步骤没有截图? +A: 不同类型的 Agent 记录的数据不同。例如,RandomAgent 可能只记录基本信息,而 M3A 和 T3A Agent 会记录更详细的截图和交互数据。 + +### Q: UI 元素边界框显示不正确怎么办? +A: 确保轨迹数据中包含 `bbox` 或 `bbox_pixels` 字段。如果使用归一化坐标,程序会自动转换为像素坐标。 + +### Q: 如何快速找到特定的 UI 元素? +A: 在"UI 元素"标签页使用搜索功能,输入元素的文本、描述或资源 ID 进行过滤。 + +### Q: 程序运行缓慢怎么办? +A: 对于包含大量步骤或高分辨率截图的轨迹,加载可能较慢。建议: + - 关闭不需要的边界框显示 + - 避免同时展开多个扩展面板 + - 考虑分批处理大型轨迹文件 + +## 技术栈 + +- **Streamlit**: 交互式 Web 应用框架 +- **NumPy**: 数组处理 +- **Pillow (PIL)**: 图像处理和标注 + +## 扩展和定制 + +### 自定义 UI 元素渲染 + +修改 `render_ui_element()` 函数以添加或修改显示的元素属性。 + +### 添加新的可视化功能 + +在相应的标签页添加新的可视化组件,例如: +- 动作轨迹图 +- 成功率统计 +- 时间线视图 +- 比较多个 episode + +### 导出功能 + +可以添加导出功能,例如: +- 导出特定步骤的截图 +- 导出 LLM 交互记录 +- 生成 PDF 报告 + +## 贡献 + +欢迎提出问题和改进建议! + +## 许可证 + +与 Android World 项目保持一致。 diff --git a/trajectory_viewer.py b/trajectory_viewer.py new file mode 100644 index 00000000..4615215b --- /dev/null +++ b/trajectory_viewer.py @@ -0,0 +1,486 @@ +""" +Android World 轨迹可视化工具 + +使用 Streamlit 创建的交互式轨迹查看器,用于可视化和分析 Android World 任务执行轨迹。 + +运行方式: + streamlit run trajectory_viewer.py +""" + +import gzip +import io +import pickle +from pathlib import Path +from typing import Any, Dict, List, Optional + +import numpy as np +import streamlit as st +from PIL import Image, ImageDraw, ImageFont + + +# ============================================================================ +# 数据加载函数 +# ============================================================================ + +def load_trajectory_file(file_path: str) -> List[Dict[str, Any]]: + """加载 .pkl.gz 格式的轨迹文件""" + try: + with gzip.open(file_path, 'rb') as f: + data = pickle.load(f) + return data if isinstance(data, list) else [data] + except Exception as e: + st.error(f"加载文件失败: {e}") + return [] + + +def load_trajectory_from_bytes(file_bytes: bytes) -> List[Dict[str, Any]]: + """从上传的文件字节加载轨迹数据""" + try: + with gzip.GzipFile(fileobj=io.BytesIO(file_bytes)) as f: + data = pickle.load(f) + return data if isinstance(data, list) else [data] + except Exception as e: + st.error(f"解析文件失败: {e}") + return [] + + +def transpose_dol_to_lod(data: Dict[str, List[Any]]) -> List[Dict[str, Any]]: + """将字典-列表格式转换为列表-字典格式""" + if not data: + return [] + return [dict(zip(data.keys(), values)) for values in zip(*data.values())] + + +# ============================================================================ +# UI 元素可视化 +# ============================================================================ + +def draw_bbox_on_image( + image: np.ndarray, + ui_elements: List[Dict[str, Any]], + selected_index: Optional[int] = None +) -> Image.Image: + """在截图上绘制 UI 元素的边界框""" + if image is None: + return None + + # 转换为 PIL Image + if isinstance(image, np.ndarray): + pil_image = Image.fromarray(image.astype('uint8'), 'RGB') + else: + pil_image = image + + draw = ImageDraw.Draw(pil_image) + + # 尝试加载字体,如果失败则使用默认字体 + try: + font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 12) + except: + font = ImageFont.load_default() + + for idx, elem in enumerate(ui_elements): + # 获取边界框(优先使用像素坐标) + bbox = elem.get('bbox_pixels') or elem.get('bbox') + if not bbox: + continue + + # 提取坐标 + if hasattr(bbox, 'x_min'): + x_min, y_min = bbox.x_min, bbox.y_min + x_max, y_max = bbox.x_max, bbox.y_max + elif isinstance(bbox, dict): + x_min = bbox.get('x_min', 0) + y_min = bbox.get('y_min', 0) + x_max = bbox.get('x_max', 0) + y_max = bbox.get('y_max', 0) + else: + continue + + # 如果是归一化坐标,转换为像素坐标 + if x_max <= 1.0 and y_max <= 1.0: + width, height = pil_image.size + x_min, x_max = int(x_min * width), int(x_max * width) + y_min, y_max = int(y_min * height), int(y_max * height) + + # 选择颜色(选中的元素用红色,其他用绿色) + color = 'red' if idx == selected_index else 'green' + width = 3 if idx == selected_index else 1 + + # 绘制边界框 + draw.rectangle([x_min, y_min, x_max, y_max], outline=color, width=width) + + # 绘制索引标签 + label = f"{idx}" + text_bbox = draw.textbbox((x_min, y_min - 15), label, font=font) + draw.rectangle(text_bbox, fill=color) + draw.text((x_min, y_min - 15), label, fill='white', font=font) + + return pil_image + + +def render_ui_element(elem: Dict[str, Any], index: int) -> str: + """渲染单个 UI 元素的详细信息""" + text = elem.get('text', '') + content_desc = elem.get('content_description', '') + class_name = elem.get('class_name', '') + resource_id = elem.get('resource_id', '') + + # 构建显示文本 + parts = [] + if text: + parts.append(f"📝 Text: `{text}`") + if content_desc: + parts.append(f"📄 Desc: `{content_desc}`") + if class_name: + parts.append(f"🏷️ Class: `{class_name}`") + if resource_id: + parts.append(f"🆔 ID: `{resource_id}`") + + # 添加交互属性 + attrs = [] + if elem.get('is_clickable'): + attrs.append('✅ Clickable') + if elem.get('is_editable'): + attrs.append('✏️ Editable') + if elem.get('is_scrollable'): + attrs.append('📜 Scrollable') + if elem.get('is_checkable'): + attrs.append('☑️ Checkable') + + if attrs: + parts.append(' | '.join(attrs)) + + return '\n'.join(parts) if parts else f"Element {index}" + + +# ============================================================================ +# 主界面 +# ============================================================================ + +def main(): + st.set_page_config( + page_title="Android World 轨迹查看器", + page_icon="📱", + layout="wide" + ) + + st.title("📱 Android World 轨迹查看器") + st.markdown("---") + + # 侧边栏:文件加载 + with st.sidebar: + st.header("📂 加载轨迹文件") + + # 选择加载方式 + load_method = st.radio( + "选择加载方式:", + ["上传文件", "本地路径"] + ) + + episodes = [] + + if load_method == "上传文件": + uploaded_file = st.file_uploader( + "上传 .pkl.gz 文件", + type=['gz', 'pkl'], + help="选择 Android World 保存的轨迹文件" + ) + if uploaded_file: + episodes = load_trajectory_from_bytes(uploaded_file.read()) + else: + file_path = st.text_input( + "输入文件路径:", + placeholder="/path/to/trajectory.pkl.gz" + ) + if file_path and Path(file_path).exists(): + episodes = load_trajectory_file(file_path) + elif file_path: + st.error("文件不存在") + + if not episodes: + st.info("👆 请加载一个轨迹文件") + st.stop() + + st.success(f"✅ 成功加载 {len(episodes)} 个 episode(s)") + + # Episode 选择 + st.markdown("---") + st.header("📊 选择 Episode") + episode_idx = st.selectbox( + "Episode 编号:", + range(len(episodes)), + format_func=lambda x: f"Episode {x}" + ) + + # 获取当前 episode + episode = episodes[episode_idx] + + # 显示 Episode 元数据 + st.header(f"📋 Episode {episode_idx} 元数据") + + col1, col2, col3, col4 = st.columns(4) + + with col1: + st.metric("任务模板", episode.get('task_template', 'N/A')) + st.metric("实例 ID", episode.get('instance_id', 'N/A')) + + with col2: + is_successful = episode.get('is_successful', 0) + success_color = "🟢" if is_successful > 0.5 else "🔴" + st.metric(f"{success_color} 成功率", f"{is_successful:.2%}") + st.metric("步骤数", episode.get('episode_length', 0)) + + with col3: + run_time = episode.get('run_time', 0) + st.metric("运行时间", f"{run_time:.2f}s") + st.metric("代理名称", episode.get('agent_name', 'N/A')) + + with col4: + st.metric("随机种子", episode.get('seed', 'N/A')) + finish_time = episode.get('finish_dtime', 'N/A') + if finish_time != 'N/A': + finish_time = str(finish_time)[:19] # 截断时间戳 + st.metric("完成时间", finish_time) + + # 显示任务目标 + goal = episode.get('goal', 'N/A') + st.info(f"🎯 **任务目标**: {goal}") + + # 异常信息(如果有) + exception_info = episode.get('exception_info') + if exception_info: + st.error(f"⚠️ **异常信息**: {exception_info}") + + st.markdown("---") + + # 获取步骤数据 + episode_data = episode.get('episode_data', {}) + if not episode_data: + st.warning("此 episode 没有步骤数据") + st.stop() + + # 转换为列表-字典格式 + steps = transpose_dol_to_lod(episode_data) + total_steps = len(steps) + + if total_steps == 0: + st.warning("此 episode 没有步骤数据") + st.stop() + + # 步骤导航 + st.header(f"🔍 步骤详情 (共 {total_steps} 步)") + + col1, col2, col3 = st.columns([1, 3, 1]) + + with col1: + if st.button("⬅️ 上一步", disabled=st.session_state.get('step_idx', 0) == 0): + st.session_state.step_idx = max(0, st.session_state.get('step_idx', 0) - 1) + + with col2: + step_idx = st.slider( + "选择步骤:", + 0, total_steps - 1, + st.session_state.get('step_idx', 0), + key='step_slider' + ) + st.session_state.step_idx = step_idx + + with col3: + if st.button("下一步 ➡️", disabled=st.session_state.get('step_idx', 0) >= total_steps - 1): + st.session_state.step_idx = min(total_steps - 1, st.session_state.get('step_idx', 0) + 1) + + # 获取当前步骤 + current_step = steps[step_idx] + + # 创建标签页 + tab1, tab2, tab3, tab4 = st.tabs(["📸 截图", "🎯 动作", "🗂️ UI 元素", "💬 LLM 交互"]) + + # Tab 1: 截图 + with tab1: + st.subheader("截图") + + # 尝试多个可能的截图字段 + screenshot_keys = [ + 'raw_screenshot', + 'before_screenshot_with_som', + 'after_screenshot_with_som', + 'before_screenshot', + 'after_screenshot' + ] + + screenshots_found = {} + for key in screenshot_keys: + if key in current_step and current_step[key] is not None: + screenshots_found[key] = current_step[key] + + if screenshots_found: + # 创建列显示多个截图 + cols = st.columns(len(screenshots_found)) + for idx, (key, screenshot) in enumerate(screenshots_found.items()): + with cols[idx]: + st.markdown(f"**{key.replace('_', ' ').title()}**") + + # 获取对应的 UI 元素 + ui_elements = None + if 'before' in key: + ui_elements = current_step.get('before_ui_elements') or current_step.get('ui_elements', []) + elif 'after' in key: + ui_elements = current_step.get('after_ui_elements', []) + else: + ui_elements = current_step.get('ui_elements', []) + + # 绘制边界框 + if ui_elements and st.checkbox(f"显示 UI 边界框 ({key})", key=f"bbox_{idx}"): + annotated_img = draw_bbox_on_image(screenshot, ui_elements) + st.image(annotated_img, use_container_width=True) + else: + st.image(screenshot, use_container_width=True) + else: + st.info("此步骤没有截图数据") + + # Tab 2: 动作 + with tab2: + st.subheader("执行的动作") + + # 检查动作输出 + action_output = current_step.get('action_output_json') or current_step.get('action_output') + + if action_output: + # 解析动作 + if isinstance(action_output, dict): + action_data = action_output + elif isinstance(action_output, str): + try: + import json + action_data = json.loads(action_output) + except: + action_data = {'raw': action_output} + else: + action_data = {'raw': str(action_output)} + + # 显示动作类型 + action_type = action_data.get('action_type', 'Unknown') + st.markdown(f"### 🎯 动作类型: `{action_type}`") + + # 显示动作参数 + col1, col2 = st.columns(2) + + with col1: + if 'index' in action_data and action_data['index'] is not None: + st.metric("目标元素索引", action_data['index']) + if 'x' in action_data and action_data['x'] is not None: + st.metric("X 坐标", action_data['x']) + if 'text' in action_data and action_data['text']: + st.text_input("输入文本", action_data['text'], disabled=True) + + with col2: + if 'y' in action_data and action_data['y'] is not None: + st.metric("Y 坐标", action_data['y']) + if 'direction' in action_data and action_data['direction']: + st.metric("滚动方向", action_data['direction']) + if 'app_name' in action_data and action_data['app_name']: + st.metric("应用名称", action_data['app_name']) + + # 显示完整动作 JSON + with st.expander("📄 查看完整动作 JSON"): + st.json(action_data) + else: + st.info("此步骤没有动作数据") + + # 显示动作理由(如果有) + action_reason = current_step.get('action_reason') + if action_reason: + st.markdown("### 💭 动作理由") + st.markdown(f"> {action_reason}") + + # 显示步骤总结(如果有) + summary = current_step.get('summary') + if summary: + st.markdown("### 📝 步骤总结") + st.markdown(f"> {summary}") + + # Tab 3: UI 元素 + with tab3: + st.subheader("UI 元素列表") + + # 获取 UI 元素 + ui_elements_keys = ['before_ui_elements', 'after_ui_elements', 'ui_elements', 'before_element_list', 'after_element_list'] + ui_data = {} + + for key in ui_elements_keys: + if key in current_step and current_step[key]: + ui_data[key] = current_step[key] + + if ui_data: + # 选择要显示的 UI 元素集 + selected_ui_key = st.selectbox( + "选择 UI 元素集:", + list(ui_data.keys()), + format_func=lambda x: x.replace('_', ' ').title() + ) + + ui_elements = ui_data[selected_ui_key] + st.info(f"共 {len(ui_elements)} 个 UI 元素") + + # 搜索过滤 + search_term = st.text_input("🔍 搜索 UI 元素(文本、描述、ID):") + + # 过滤元素 + filtered_elements = [] + for idx, elem in enumerate(ui_elements): + if search_term: + text = str(elem.get('text', '')).lower() + desc = str(elem.get('content_description', '')).lower() + res_id = str(elem.get('resource_id', '')).lower() + if search_term.lower() not in text + desc + res_id: + continue + filtered_elements.append((idx, elem)) + + # 显示过滤后的元素 + st.info(f"显示 {len(filtered_elements)} 个元素") + + for idx, elem in filtered_elements: + with st.expander(f"**[{idx}]** {elem.get('text', elem.get('content_description', elem.get('class_name', 'Element')))}"): + st.markdown(render_ui_element(elem, idx)) + + # 显示完整元素数据 + with st.expander("🔍 查看完整数据"): + st.json({k: str(v) if not isinstance(v, (dict, list, int, float, bool, type(None))) else v + for k, v in elem.items()}) + else: + st.info("此步骤没有 UI 元素数据") + + # Tab 4: LLM 交互 + with tab4: + st.subheader("LLM 提示词和响应") + + # 动作提示词 + action_prompt = current_step.get('action_prompt') + if action_prompt: + with st.expander("🤖 动作选择提示词", expanded=True): + st.code(action_prompt, language="text") + + # 动作原始响应 + action_raw_response = current_step.get('action_raw_response') + if action_raw_response: + with st.expander("💬 动作选择响应"): + st.code(action_raw_response, language="text") + + # 总结提示词 + summary_prompt = current_step.get('summary_prompt') + if summary_prompt: + with st.expander("📋 总结提示词"): + st.code(summary_prompt, language="text") + + # 总结原始响应 + summary_raw_response = current_step.get('summary_raw_response') + if summary_raw_response: + with st.expander("📝 总结响应"): + st.code(summary_raw_response, language="text") + + if not any([action_prompt, action_raw_response, summary_prompt, summary_raw_response]): + st.info("此步骤没有 LLM 交互数据") + + +if __name__ == "__main__": + main() diff --git a/trajectory_viewer_requirements.txt b/trajectory_viewer_requirements.txt new file mode 100644 index 00000000..8eac6b46 --- /dev/null +++ b/trajectory_viewer_requirements.txt @@ -0,0 +1,3 @@ +streamlit>=1.28.0 +numpy>=1.24.0 +Pillow>=10.0.0