# Copyright (C) 2025 AIDC-AI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
|
|
CharacterAnalyzer - VLM-based character appearance extraction
|
|
|
|
Analyzes reference images to extract detailed character descriptions
|
|
for maintaining visual consistency across video frames.
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import re
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional
|
|
|
|
from loguru import logger
|
|
from openai import AsyncOpenAI
|
|
|
|
|
|
@dataclass
class CharacterAnalysisResult:
    """Result of character image analysis"""

    appearance_description: str = ""  # Physical features
    clothing_description: str = ""  # What they're wearing
    distinctive_features: Optional[List[str]] = None  # Unique identifying features

    def __post_init__(self):
        if self.distinctive_features is None:
            self.distinctive_features = []

    def to_prompt_description(self) -> str:
        """Generate a prompt-ready character description"""
        parts = []

        if self.appearance_description:
            parts.append(self.appearance_description)

        if self.clothing_description:
            parts.append(f"wearing {self.clothing_description}")

        if self.distinctive_features:
            features = ", ".join(self.distinctive_features)
            parts.append(f"with {features}")

        return ", ".join(parts) if parts else ""

    def to_dict(self) -> dict:
        return {
            "appearance_description": self.appearance_description,
            "clothing_description": self.clothing_description,
            "distinctive_features": self.distinctive_features,
        }
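
# Illustrative example (hypothetical values) of how the three fields combine
# in to_prompt_description():
#
#     CharacterAnalysisResult(
#         appearance_description="young woman, long black hair, fair skin",
#         clothing_description="blue hoodie",
#         distinctive_features=["round glasses"],
#     ).to_prompt_description()
#     # -> "young woman, long black hair, fair skin, wearing blue hoodie, with round glasses"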

class CharacterAnalyzer:
    """
    VLM-based character appearance analyzer

    Analyzes reference images to extract detailed character descriptions
    that can be injected into image generation prompts.

    Example:
        >>> analyzer = CharacterAnalyzer()
        >>> result = await analyzer.analyze_reference_image("character.png")
        >>> print(result.appearance_description)
        "young woman with long black hair, round face, fair skin"
        >>> print(result.to_prompt_description())
        "young woman with long black hair, round face, fair skin, wearing blue hoodie, with round glasses"
    """

    def __init__(self):
        """Initialize CharacterAnalyzer"""
        pass

    async def analyze_reference_image(
        self,
        image_path: str,
    ) -> CharacterAnalysisResult:
        """
        Analyze a reference image to extract character appearance

        Args:
            image_path: Path to the reference image

        Returns:
            CharacterAnalysisResult with extracted descriptions
        """
        logger.info(f"Analyzing character reference image: {image_path}")

        # Check if file exists
        if not os.path.exists(image_path):
            logger.warning(f"Image not found: {image_path}")
            return CharacterAnalysisResult()

        try:
            # Read and encode image
            with open(image_path, "rb") as f:
                image_data = base64.b64encode(f.read()).decode("utf-8")

            # Determine image type
            ext = os.path.splitext(image_path)[1].lower()
            media_type = "image/png" if ext == ".png" else "image/jpeg"

            # VLM prompt for character analysis - optimized for storyboard consistency
            # Focus on CONSTANT features (identity), exclude VARIABLE features (pose/expression)
            analysis_prompt = """Analyze this character/person for VIDEO STORYBOARD consistency.

GOAL: Extract features that should remain CONSISTENT across different video frames.
The output will be injected into image generation prompts for multiple scenes.

Extract ONLY these CONSTANT features:
1. Identity: gender, approximate age group (child/young/middle-aged/elderly)
2. Hair: color, length, style (NOT affected by wind/movement)
3. Face: skin tone, face shape (NOT expressions)
4. Clothing: type and colors (assume same outfit throughout video)
5. Distinctive: glasses, accessories, tattoos, scars, unique marks

DO NOT include:
- Expressions (smile, frown) - changes per scene
- Poses/gestures - changes per scene
- View angle - determined by scene composition
- Lighting/shadows - scene-dependent
- Background elements

Output JSON format (simple strings for direct prompt injection):
{
    "identity": "elderly man" or "young woman" etc,
    "appearance": "short gray hair, light skin, round face",
    "clothing": "brown sweater vest over white shirt, dark trousers",
    "distinctive": ["round glasses", "silver watch"]
}

Output ONLY the JSON, no explanation."""

            # Build multimodal message
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": analysis_prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{media_type};base64,{image_data}"
                            }
                        }
                    ]
                }
            ]

            # Get VLM configuration
            # Priority: Environment variables > config.yaml > defaults
            from pixelle_video.config import config_manager

            # VLM config from config.yaml (now part of PixelleVideoConfig)
            vlm_config = config_manager.config.vlm

            # Environment variables override config.yaml
            vlm_provider = os.getenv("VLM_PROVIDER") or vlm_config.provider or "qwen"
            vlm_api_key = os.getenv("VLM_API_KEY") or os.getenv("DASHSCOPE_API_KEY") or vlm_config.api_key
            vlm_base_url = os.getenv("VLM_BASE_URL") or vlm_config.base_url
            vlm_model = os.getenv("VLM_MODEL") or vlm_config.model

            # Configure based on provider
            if vlm_provider == "qwen":
                # Tongyi Qianwen (Qwen VL)
                vlm_base_url = vlm_base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
                vlm_model = vlm_model or "qwen-vl-plus"  # or qwen-vl-max, qwen3-vl-plus
                logger.info(f"Using Qwen VL: model={vlm_model}")
            elif vlm_provider == "glm":
                # Zhipu GLM-4V
                llm_config = config_manager.config.llm
                vlm_api_key = vlm_api_key or llm_config.api_key
                vlm_base_url = vlm_base_url or llm_config.base_url
                vlm_model = vlm_model or "glm-4v-flash"
                logger.info(f"Using GLM VL: model={vlm_model}")
            else:  # openai or other
                llm_config = config_manager.config.llm
                vlm_api_key = vlm_api_key or llm_config.api_key
                vlm_base_url = vlm_base_url or llm_config.base_url
                vlm_model = vlm_model or llm_config.model
                logger.info(f"Using {vlm_provider} VL: model={vlm_model}")

            if not vlm_api_key:
                logger.error("No VLM API key configured. Set VLM_API_KEY or DASHSCOPE_API_KEY environment variable.")
                return CharacterAnalysisResult()
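
            # Illustrative example (assumed deployment setup): selecting Qwen VL
            # purely via environment variables, which take precedence over config.yaml:
            #
            #   export VLM_PROVIDER=qwen
            #   export DASHSCOPE_API_KEY=sk-xxxx      # or VLM_API_KEY
            #   export VLM_MODEL=qwen-vl-max          # optional; defaults to qwen-vl-plus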

            # Create OpenAI-compatible client
            client = AsyncOpenAI(
                api_key=vlm_api_key,
                base_url=vlm_base_url
            )

            # Call VLM
            response = await client.chat.completions.create(
                model=vlm_model,
                messages=messages,
                temperature=0.3,
                max_tokens=2000
            )

            vlm_response = response.choices[0].message.content if response.choices else None

            if vlm_response:
                logger.debug(f"VLM character analysis response: {vlm_response[:150]}...")
            else:
                logger.warning(f"VLM returned empty content. Full response: {response}")

            # Parse response
            return self._parse_response(vlm_response)

        except Exception as e:
            logger.error(f"Character analysis failed: {e}")
            return CharacterAnalysisResult()

    def _parse_response(self, response: str) -> CharacterAnalysisResult:
        """Parse VLM response into CharacterAnalysisResult"""
        if not response:
            logger.warning("Empty VLM response")
            return CharacterAnalysisResult()

        # Log full response for debugging
        logger.debug(f"Full VLM response:\n{response}")

        try:
            # Remove markdown code blocks if present
            cleaned = response.strip()
            if cleaned.startswith("```json"):
                cleaned = cleaned[7:]
            elif cleaned.startswith("```"):
                cleaned = cleaned[3:]
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
            cleaned = cleaned.strip()

            # Try to extract JSON from response
            match = re.search(r'\{[\s\S]*\}', cleaned)
            if match:
                json_str = match.group()
                logger.debug(f"Extracted JSON: {json_str[:200]}...")
                data = json.loads(json_str)
            else:
                logger.warning("No JSON found in response, trying direct parse")
                data = json.loads(cleaned)

            # Handle nested JSON structures - flatten to strings
            # New field names: identity, appearance, clothing, distinctive
            identity = data.get("identity", "")
            appearance = data.get("appearance", "") or data.get("appearance_description", "")

            if isinstance(appearance, dict):
                # Flatten nested object to descriptive string
                parts = []
                for key, value in appearance.items():
                    if isinstance(value, dict):
                        details = ", ".join(f"{k}: {v}" for k, v in value.items())
                        parts.append(f"{key} ({details})")
                    else:
                        parts.append(f"{key}: {value}")
                appearance = "; ".join(parts)

            # Combine identity + appearance for full description
            if identity and appearance:
                full_appearance = f"{identity}, {appearance}"
            else:
                full_appearance = identity or appearance

            clothing = data.get("clothing", "") or data.get("clothing_description", "")
            if isinstance(clothing, dict):
                # Flatten nested clothing description
                parts = []
                for person, items in clothing.items():
                    if isinstance(items, dict):
                        details = ", ".join(f"{k}: {v}" for k, v in items.items())
                        parts.append(f"{person} ({details})")
                    else:
                        parts.append(f"{person}: {items}")
                clothing = "; ".join(parts)

            distinctive = data.get("distinctive", []) or data.get("distinctive_features", [])
            if not isinstance(distinctive, list):
                distinctive = [str(distinctive)]

            result = CharacterAnalysisResult(
                appearance_description=full_appearance,
                clothing_description=clothing,
                distinctive_features=distinctive,
            )

            logger.info(f"Character analysis extracted: {result.appearance_description[:80] if result.appearance_description else 'empty'}...")
            return result

        except (json.JSONDecodeError, KeyError) as e:
            logger.warning(f"Failed to parse VLM response: {e}")
            logger.debug(f"Response that failed to parse: {response[:500]}")

            # Try to use the raw response as appearance description (fallback)
            if response and 20 < len(response) < 500:
                # Clean up the response
                fallback = response.strip()
                if "```" in fallback:
                    fallback = re.sub(r'```.*?```', '', fallback, flags=re.DOTALL).strip()
                if fallback:
                    logger.info(f"Using raw response as appearance: {fallback[:80]}...")
                    return CharacterAnalysisResult(
                        appearance_description=fallback
                    )

            return CharacterAnalysisResult()
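
# Illustrative usage sketch (assumes VLM credentials are configured via
# environment variables or config.yaml):
#
#   import asyncio
#
#   async def main():
#       analyzer = CharacterAnalyzer()
#       result = await analyzer.analyze_reference_image("character.png")
#       print(result.to_prompt_description())
#
#   asyncio.run(main())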