Add screenshot content analysis using VLM

Features:
- ScreenshotAnalyzer class for VLM-based image analysis
- Real-time analysis during video recording
- Extract likes, comments, tags, category from screenshots
- Frontend display for category badges and tags
- Batch analysis API endpoint

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
let5sne.win10
2026-01-09 23:20:52 +08:00
parent 5b3f214e20
commit 195a93b7e0
5 changed files with 165 additions and 4 deletions

3
.gitignore vendored
View File

@@ -66,3 +66,6 @@ app_package_name.py
.claude/
.venv
# Video learning data
video_learning_data/

View File

@@ -4,7 +4,7 @@ Video Learning API endpoints for the dashboard.
import asyncio
from datetime import datetime
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
@@ -326,3 +326,28 @@ async def delete_session(session_id: str) -> Dict[str, str]:
del _active_sessions[session_id]
return {"session_id": session_id, "status": "deleted"}
@router.post("/sessions/{session_id}/analyze", response_model=Dict[str, Any])
async def analyze_session(session_id: str) -> Dict[str, Any]:
    """Report how many videos in a session still await screenshot analysis.

    The session is looked up in ``_active_sessions``. A record is counted
    when its ``likes`` is ``None`` and it has a ``screenshot_path``. This
    handler only counts such records; it does not run the analysis itself.

    Raises:
        HTTPException: 404 if the session id is unknown, 400 if the agent
            has no current session.
    """
    if session_id not in _active_sessions:
        raise HTTPException(status_code=404, detail="Session not found")

    session = _active_sessions[session_id].current_session
    if not session:
        raise HTTPException(status_code=400, detail="No session data")

    # Records that have a screenshot but no likes value yet.
    pending = sum(
        1
        for video in session.records
        if video.likes is None and video.screenshot_path
    )

    return {
        "session_id": session_id,
        "total_videos": len(session.records),
        "analyzed_count": pending,
        "status": "analysis_triggered"
    }

View File

@@ -212,6 +212,37 @@
flex-shrink: 0;
}
/* Category badge: spacing wrapper plus the small rounded label inside it */
.video-category {
    margin-top: 0.5rem;
}
.category-badge {
    display: inline-block;
    border-radius: 4px;
    padding: 0.2rem 0.5rem;
    font-weight: 500;
    font-size: 0.7rem;
    color: var(--primary-color);
    background-color: rgba(99, 102, 241, 0.2);
}
/* Tags: wrapping flex row of small chips */
.video-tags {
    margin-top: 0.5rem;
    display: flex;
    flex-wrap: wrap;
    gap: 0.25rem;
}
.video-tags .tag {
    border-radius: 3px;
    padding: 0.15rem 0.4rem;
    font-size: 0.65rem;
    background-color: var(--bg-color);
    color: var(--text-secondary);
}
/* Session Complete */
.session-complete {
text-align: center;

View File

@@ -228,6 +228,12 @@
{{ formatNumber(video.comments) }}
</span>
</div>
<!-- Category badge: rendered only when video.category is set -->
<div class="video-category" v-if="video.category">
    <span class="category-badge">{{ video.category }}</span>
</div>
<!-- Tag chips: keyed by position + text so a repeated tag cannot produce duplicate keys -->
<div class="video-tags" v-if="video.tags && video.tags.length > 0">
    <span class="tag" v-for="(tag, tagIndex) in video.tags" :key="tagIndex + '-' + tag">#{{ tag }}</span>
</div>
</div>
</div>
</div>

View File

@@ -101,6 +101,75 @@ class LearningSession:
}
class ScreenshotAnalyzer:
    """Analyze a video screenshot with a VLM and extract content information.

    The model is sent ``ANALYSIS_PROMPT`` together with the screenshot and is
    expected to answer with a JSON object; ``_parse_result`` turns that answer
    into a plain dict.
    """

    ANALYSIS_PROMPT = """分析这张短视频截图提取以下信息并以JSON格式返回
{
"description": "视频描述文案(屏幕上显示的文字,如果有的话)",
"likes": 点赞数纯数字如12000没有则为null,
"comments": 评论数纯数字没有则为null,
"shares": 分享数纯数字没有则为null,
"tags": ["标签1", "标签2"],
"category": "视频类型(美食/旅行/搞笑/知识/生活/音乐/舞蹈/其他)",
"elements": ["画面中的主要元素,如:人物、食物、风景等"]
}
注意:
1. 只返回JSON不要其他文字
2. 数字不要带单位,如"1.2万"应转为12000
3. 如果无法识别某项设为null或空数组"""

    # Fields coerced to int (or None when they cannot be converted).
    _COUNT_FIELDS = ("likes", "comments", "shares")
    # Fields for which an explicit JSON null is normalised to an empty list.
    _LIST_FIELDS = ("tags", "elements")

    # The annotation is quoted so defining the class does not require
    # ModelConfig to be resolvable at that moment.
    def __init__(self, model_config: "ModelConfig"):
        """Create the model client used to send analysis requests."""
        from phone_agent.model.client import ModelClient

        self.model_client = ModelClient(model_config)

    def analyze(self, screenshot_base64: str) -> Dict[str, Any]:
        """Send the screenshot to the VLM and return the extracted fields.

        Args:
            screenshot_base64: The screenshot, passed to the message builder
                as ``image_base64``.

        Returns:
            The parsed result dict, or ``{}`` if the request or the parsing
            fails. Failures are printed, never raised (best effort).
        """
        from phone_agent.model.client import MessageBuilder

        messages = [
            MessageBuilder.create_user_message(
                text=self.ANALYSIS_PROMPT,
                image_base64=screenshot_base64,
            )
        ]

        try:
            response = self.model_client.request(messages)
            return self._parse_result(response.content.strip())
        except Exception as e:
            print(f"[ScreenshotAnalyzer] Error: {e}")
            return {}

    def _parse_result(self, text: str) -> Dict[str, Any]:
        """Extract the first JSON object from the model's reply.

        Every ``{`` in ``text`` is tried as the start of an object, so prose,
        code fences or stray braces around the JSON do not break parsing.

        Returns:
            The decoded dict with ``likes``/``comments``/``shares`` coerced to
            ``int`` (``None`` if not convertible) and a null ``tags`` or
            ``elements`` replaced by ``[]``. ``{}`` if no object decodes.
        """
        import json

        decoder = json.JSONDecoder()
        result = None
        start = text.find("{")
        while start != -1:
            try:
                candidate, _end = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                result = candidate
                break
            start = text.find("{", start + 1)

        if result is None:
            return {}

        for field in self._COUNT_FIELDS:
            value = result.get(field)
            if value is None:
                continue
            try:
                result[field] = int(value)
            except (ValueError, TypeError, OverflowError):
                # OverflowError: the json module accepts Infinity, which
                # int() cannot convert.
                result[field] = None

        for field in self._LIST_FIELDS:
            if field in result and result[field] is None:
                result[field] = []

        return result
class VideoLearningAgent:
"""
Agent for learning from short video platforms.
@@ -174,6 +243,14 @@ class VideoLearningAgent:
# Video detection: track screenshot changes (simplified)
self._last_screenshot_hash: Optional[str] = None
# Screenshot analyzer for content extraction
self._analyzer: Optional[ScreenshotAnalyzer] = None
try:
self._analyzer = ScreenshotAnalyzer(model_config)
print("[VideoLearning] Screenshot analyzer initialized")
except Exception as e:
print(f"[VideoLearning] Analyzer init failed: {e}")
def start_session(
self,
device_id: str,
@@ -371,12 +448,31 @@ class VideoLearningAgent:
return None
def _record_video_from_screenshot(self, screenshot):
"""Helper method to record video from screenshot."""
"""Helper method to record video from screenshot with analysis."""
import base64
screenshot_bytes = base64.b64decode(screenshot.base64_data)
# 分析截图内容
analysis_result = {}
if self._analyzer:
try:
print(f"[VideoLearning] Analyzing screenshot...")
analysis_result = self._analyzer.analyze(screenshot.base64_data)
if analysis_result:
print(f"[VideoLearning] Analysis: {analysis_result.get('category', 'N/A')}")
except Exception as e:
print(f"[VideoLearning] Analysis failed: {e}")
# 记录视频
self.record_video(
screenshot=screenshot_bytes,
description=f"Video #{self.video_counter + 1}",
description=analysis_result.get('description', f"Video #{self.video_counter + 1}"),
likes=analysis_result.get('likes'),
comments=analysis_result.get('comments'),
shares=analysis_result.get('shares'),
tags=analysis_result.get('tags', []),
category=analysis_result.get('category'),
elements=analysis_result.get('elements', []),
)
def _before_action(self, action: Dict[str, Any]) -> Optional[Dict[str, Any]]: