270 lines
11 KiB
Python
270 lines
11 KiB
Python
# Copyright (C) 2025 AIDC-AI
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
CharacterAnalyzer - VLM-based character appearance extraction
|
|
|
|
Analyzes reference images to extract detailed character descriptions
|
|
for maintaining visual consistency across video frames.
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import re
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional
|
|
|
|
from loguru import logger
|
|
from openai import AsyncOpenAI
|
|
|
|
|
|
@dataclass
|
|
class CharacterAnalysisResult:
|
|
"""Result of character image analysis"""
|
|
|
|
appearance_description: str = "" # Physical features
|
|
clothing_description: str = "" # What they're wearing
|
|
distinctive_features: List[str] = None # Unique identifying features
|
|
|
|
def __post_init__(self):
|
|
if self.distinctive_features is None:
|
|
self.distinctive_features = []
|
|
|
|
def to_prompt_description(self) -> str:
|
|
"""Generate a prompt-ready character description"""
|
|
parts = []
|
|
|
|
if self.appearance_description:
|
|
parts.append(self.appearance_description)
|
|
|
|
if self.clothing_description:
|
|
parts.append(f"wearing {self.clothing_description}")
|
|
|
|
if self.distinctive_features:
|
|
features = ", ".join(self.distinctive_features)
|
|
parts.append(f"with {features}")
|
|
|
|
return ", ".join(parts) if parts else ""
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"appearance_description": self.appearance_description,
|
|
"clothing_description": self.clothing_description,
|
|
"distinctive_features": self.distinctive_features,
|
|
}
|
|
|
|
|
|
class CharacterAnalyzer:
|
|
"""
|
|
VLM-based character appearance analyzer
|
|
|
|
Analyzes reference images to extract detailed character descriptions
|
|
that can be injected into image generation prompts.
|
|
|
|
Example:
|
|
>>> analyzer = CharacterAnalyzer()
|
|
>>> result = await analyzer.analyze_reference_image("character.png")
|
|
>>> print(result.appearance_description)
|
|
"young woman with long black hair, round face, fair skin"
|
|
>>> print(result.to_prompt_description())
|
|
"young woman with long black hair, round face, fair skin, wearing blue hoodie, with round glasses"
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initialize CharacterAnalyzer"""
|
|
pass
|
|
|
|
async def analyze_reference_image(
|
|
self,
|
|
image_path: str,
|
|
) -> CharacterAnalysisResult:
|
|
"""
|
|
Analyze a reference image to extract character appearance
|
|
|
|
Args:
|
|
image_path: Path to the reference image
|
|
|
|
Returns:
|
|
CharacterAnalysisResult with extracted descriptions
|
|
"""
|
|
logger.info(f"Analyzing character reference image: {image_path}")
|
|
|
|
# Check if file exists
|
|
if not os.path.exists(image_path):
|
|
logger.warning(f"Image not found: {image_path}")
|
|
return CharacterAnalysisResult()
|
|
|
|
try:
|
|
# Read and encode image
|
|
with open(image_path, "rb") as f:
|
|
image_data = base64.b64encode(f.read()).decode("utf-8")
|
|
|
|
# Determine image type
|
|
ext = os.path.splitext(image_path)[1].lower()
|
|
media_type = "image/png" if ext == ".png" else "image/jpeg"
|
|
|
|
# VLM prompt for character analysis
|
|
analysis_prompt = """Analyze this character/person image and extract detailed visual descriptions.
|
|
|
|
Provide your analysis in JSON format:
|
|
{
|
|
"appearance_description": "Detailed physical features including: hair (color, length, style), face shape, eye color, skin tone, approximate age, gender, body type. Be specific and descriptive.",
|
|
"clothing_description": "What they're wearing - describe colors, style, and notable items.",
|
|
"distinctive_features": ["list", "of", "unique", "identifying", "features"]
|
|
}
|
|
|
|
Focus on visually distinctive and reproducible features. Be specific enough that another image generator could recreate a similar-looking character.
|
|
|
|
Examples of good distinctive_features: "round glasses", "freckles", "scar on left cheek", "silver earrings", "bright red lipstick"
|
|
|
|
Output ONLY the JSON object, no additional text."""
|
|
|
|
# Build multimodal message
|
|
messages = [
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": analysis_prompt},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:{media_type};base64,{image_data}"
|
|
}
|
|
}
|
|
]
|
|
}
|
|
]
|
|
|
|
# Get VLM configuration
|
|
# Priority: Environment variables > config.yaml > defaults
|
|
from pixelle_video.config import config_manager
|
|
|
|
# Try to get VLM config from config.yaml
|
|
vlm_config = getattr(config_manager.config, 'vlm', None)
|
|
|
|
vlm_provider = os.getenv("VLM_PROVIDER") or (vlm_config.provider if vlm_config and hasattr(vlm_config, 'provider') else "qwen")
|
|
vlm_api_key = os.getenv("VLM_API_KEY") or os.getenv("DASHSCOPE_API_KEY") or (vlm_config.api_key if vlm_config and hasattr(vlm_config, 'api_key') else None)
|
|
vlm_base_url = os.getenv("VLM_BASE_URL") or (vlm_config.base_url if vlm_config and hasattr(vlm_config, 'base_url') else None)
|
|
vlm_model = os.getenv("VLM_MODEL") or (vlm_config.model if vlm_config and hasattr(vlm_config, 'model') else None)
|
|
|
|
# Configure based on provider
|
|
if vlm_provider == "qwen":
|
|
# 通义千问 Qwen VL
|
|
vlm_base_url = vlm_base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
|
|
vlm_model = vlm_model or "qwen-vl-plus" # or qwen-vl-max, qwen3-vl-plus
|
|
logger.info(f"Using Qwen VL: model={vlm_model}")
|
|
elif vlm_provider == "glm":
|
|
# 智谱 GLM-4V
|
|
from pixelle_video.config import config_manager
|
|
llm_config = config_manager.config.llm
|
|
vlm_api_key = vlm_api_key or llm_config.api_key
|
|
vlm_base_url = vlm_base_url or llm_config.base_url
|
|
vlm_model = vlm_model or "glm-4v-flash"
|
|
logger.info(f"Using GLM VL: model={vlm_model}")
|
|
else: # openai or other
|
|
from pixelle_video.config import config_manager
|
|
llm_config = config_manager.config.llm
|
|
vlm_api_key = vlm_api_key or llm_config.api_key
|
|
vlm_base_url = vlm_base_url or llm_config.base_url
|
|
vlm_model = vlm_model or llm_config.model
|
|
logger.info(f"Using {vlm_provider} VL: model={vlm_model}")
|
|
|
|
if not vlm_api_key:
|
|
logger.error("No VLM API key configured. Set VLM_API_KEY or DASHSCOPE_API_KEY environment variable.")
|
|
return CharacterAnalysisResult()
|
|
|
|
# Create OpenAI-compatible client
|
|
client = AsyncOpenAI(
|
|
api_key=vlm_api_key,
|
|
base_url=vlm_base_url
|
|
)
|
|
|
|
# Call VLM
|
|
response = await client.chat.completions.create(
|
|
model=vlm_model,
|
|
messages=messages,
|
|
temperature=0.3,
|
|
max_tokens=2000
|
|
)
|
|
|
|
vlm_response = response.choices[0].message.content if response.choices else None
|
|
|
|
if vlm_response:
|
|
logger.debug(f"VLM character analysis response: {vlm_response[:150] if len(vlm_response) > 150 else vlm_response}...")
|
|
else:
|
|
logger.warning(f"VLM returned empty content. Full response: {response}")
|
|
|
|
# Parse response
|
|
return self._parse_response(vlm_response)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Character analysis failed: {e}")
|
|
return CharacterAnalysisResult()
|
|
|
|
def _parse_response(self, response: str) -> CharacterAnalysisResult:
|
|
"""Parse VLM response into CharacterAnalysisResult"""
|
|
if not response:
|
|
logger.warning("Empty VLM response")
|
|
return CharacterAnalysisResult()
|
|
|
|
# Log full response for debugging
|
|
logger.debug(f"Full VLM response:\n{response}")
|
|
|
|
try:
|
|
# Remove markdown code blocks if present
|
|
cleaned = response.strip()
|
|
if cleaned.startswith("```json"):
|
|
cleaned = cleaned[7:]
|
|
elif cleaned.startswith("```"):
|
|
cleaned = cleaned[3:]
|
|
if cleaned.endswith("```"):
|
|
cleaned = cleaned[:-3]
|
|
cleaned = cleaned.strip()
|
|
|
|
# Try to extract JSON from response
|
|
match = re.search(r'\{[\s\S]*\}', cleaned)
|
|
if match:
|
|
json_str = match.group()
|
|
logger.debug(f"Extracted JSON: {json_str[:200]}...")
|
|
data = json.loads(json_str)
|
|
else:
|
|
logger.warning(f"No JSON found in response, trying direct parse")
|
|
data = json.loads(cleaned)
|
|
|
|
result = CharacterAnalysisResult(
|
|
appearance_description=data.get("appearance_description", ""),
|
|
clothing_description=data.get("clothing_description", ""),
|
|
distinctive_features=data.get("distinctive_features", []),
|
|
)
|
|
|
|
logger.info(f"Character analysis extracted: {result.appearance_description[:80]}...")
|
|
return result
|
|
|
|
except (json.JSONDecodeError, KeyError) as e:
|
|
logger.warning(f"Failed to parse VLM response: {e}")
|
|
logger.debug(f"Response that failed to parse: {response[:500]}")
|
|
|
|
# Try to use the raw response as appearance description (fallback)
|
|
if response and 20 < len(response) < 500:
|
|
# Clean up the response
|
|
fallback = response.strip()
|
|
if "```" in fallback:
|
|
fallback = re.sub(r'```.*?```', '', fallback, flags=re.DOTALL).strip()
|
|
if fallback:
|
|
logger.info(f"Using raw response as appearance: {fallback[:80]}...")
|
|
return CharacterAnalysisResult(
|
|
appearance_description=fallback
|
|
)
|
|
|
|
return CharacterAnalysisResult()
|