增加视频生成过程详细信息的持久化l; 优化session管理逻辑;

This commit is contained in:
puke
2025-11-17 17:56:07 +08:00
parent d64292dce1
commit fa8051d826
8 changed files with 1043 additions and 416 deletions

View File

@@ -350,12 +350,88 @@ class CustomPipeline(BasePipeline):
logger.info(f"Size: {file_size / (1024*1024):.2f} MB") logger.info(f"Size: {file_size / (1024*1024):.2f} MB")
logger.info(f"Frames: {len(storyboard.frames)}") logger.info(f"Frames: {len(storyboard.frames)}")
# ========== Step 7: Persist metadata and storyboard ==========
await self._persist_task_data(
storyboard=storyboard,
result=result,
input_params={
"text": text,
"custom_param_example": custom_param_example,
"voice_id": voice_id,
"tts_workflow": tts_workflow,
"tts_speed": tts_speed,
"ref_audio": ref_audio,
"image_workflow": image_workflow,
"frame_template": frame_template,
"bgm_path": bgm_path,
"bgm_volume": bgm_volume,
}
)
return result return result
except Exception as e: except Exception as e:
logger.error(f"Custom pipeline failed: {e}") logger.error(f"Custom pipeline failed: {e}")
raise raise
# ==================== Persistence ====================
async def _persist_task_data(
self,
storyboard: Storyboard,
result: VideoGenerationResult,
input_params: dict
):
"""
Persist task metadata and storyboard to filesystem
Args:
storyboard: Complete storyboard
result: Video generation result
input_params: Input parameters used for generation
"""
try:
task_id = storyboard.config.task_id
if not task_id:
logger.warning("No task_id in storyboard, skipping persistence")
return
# Build metadata
metadata = {
"task_id": task_id,
"created_at": storyboard.created_at.isoformat() if storyboard.created_at else None,
"completed_at": storyboard.completed_at.isoformat() if storyboard.completed_at else None,
"status": "completed",
"input": input_params,
"result": {
"video_path": result.video_path,
"duration": result.duration,
"file_size": result.file_size,
"n_frames": len(storyboard.frames)
},
"config": {
"llm_model": self.core.config.get("llm", {}).get("model", "unknown"),
"llm_base_url": self.core.config.get("llm", {}).get("base_url", "unknown"),
"comfyui_url": self.core.config.get("comfyui", {}).get("comfyui_url", "unknown"),
"runninghub_enabled": bool(self.core.config.get("comfyui", {}).get("runninghub_api_key")),
}
}
# Save metadata
await self.core.persistence.save_task_metadata(task_id, metadata)
logger.info(f"💾 Saved task metadata: {task_id}")
# Save storyboard
await self.core.persistence.save_storyboard(task_id, storyboard)
logger.info(f"💾 Saved storyboard: {task_id}")
except Exception as e:
logger.error(f"Failed to persist task data: {e}")
# Don't raise - persistence failure shouldn't break video generation
# ==================== Custom Helper Methods ==================== # ==================== Custom Helper Methods ====================
# Add your own helper methods here # Add your own helper methods here

View File

@@ -504,9 +504,90 @@ class StandardPipeline(BasePipeline):
logger.info(f" Size: {file_size / (1024*1024):.2f} MB") logger.info(f" Size: {file_size / (1024*1024):.2f} MB")
logger.info(f" Frames: {len(storyboard.frames)}") logger.info(f" Frames: {len(storyboard.frames)}")
# ========== Step 7: Persist metadata and storyboard ==========
await self._persist_task_data(
storyboard=storyboard,
result=result,
input_params={
"text": text,
"mode": mode,
"title": title,
"n_scenes": n_scenes,
"tts_inference_mode": tts_inference_mode,
"tts_voice": tts_voice,
"voice_id": voice_id,
"tts_workflow": tts_workflow,
"tts_speed": tts_speed,
"ref_audio": ref_audio,
"image_workflow": image_workflow,
"prompt_prefix": prompt_prefix,
"frame_template": frame_template,
"template_params": template_params,
"bgm_path": bgm_path,
"bgm_volume": bgm_volume,
"bgm_mode": bgm_mode,
}
)
return result return result
except Exception as e: except Exception as e:
logger.error(f"❌ Video generation failed: {e}") logger.error(f"❌ Video generation failed: {e}")
raise raise
async def _persist_task_data(
self,
storyboard: Storyboard,
result: VideoGenerationResult,
input_params: dict
):
"""
Persist task metadata and storyboard to filesystem
Args:
storyboard: Complete storyboard
result: Video generation result
input_params: Input parameters used for generation
"""
try:
task_id = storyboard.config.task_id
if not task_id:
logger.warning("No task_id in storyboard, skipping persistence")
return
# Build metadata
metadata = {
"task_id": task_id,
"created_at": storyboard.created_at.isoformat() if storyboard.created_at else None,
"completed_at": storyboard.completed_at.isoformat() if storyboard.completed_at else None,
"status": "completed",
"input": input_params,
"result": {
"video_path": result.video_path,
"duration": result.duration,
"file_size": result.file_size,
"n_frames": len(storyboard.frames)
},
"config": {
"llm_model": self.core.config.get("llm", {}).get("model", "unknown"),
"llm_base_url": self.core.config.get("llm", {}).get("base_url", "unknown"),
"comfyui_url": self.core.config.get("comfyui", {}).get("comfyui_url", "unknown"),
"runninghub_enabled": bool(self.core.config.get("comfyui", {}).get("runninghub_api_key")),
}
}
# Save metadata
await self.core.persistence.save_task_metadata(task_id, metadata)
logger.info(f"💾 Saved task metadata: {task_id}")
# Save storyboard
await self.core.persistence.save_storyboard(task_id, storyboard)
logger.info(f"💾 Saved storyboard: {task_id}")
except Exception as e:
logger.error(f"Failed to persist task data: {e}")
# Don't raise - persistence failure shouldn't break video generation

View File

@@ -26,6 +26,7 @@ from pixelle_video.services.tts_service import TTSService
from pixelle_video.services.media import MediaService from pixelle_video.services.media import MediaService
from pixelle_video.services.video import VideoService from pixelle_video.services.video import VideoService
from pixelle_video.services.frame_processor import FrameProcessor from pixelle_video.services.frame_processor import FrameProcessor
from pixelle_video.services.persistence import PersistenceService
from pixelle_video.pipelines.standard import StandardPipeline from pixelle_video.pipelines.standard import StandardPipeline
from pixelle_video.pipelines.custom import CustomPipeline from pixelle_video.pipelines.custom import CustomPipeline
@@ -80,6 +81,7 @@ class PixelleVideoCore:
self.media: Optional[MediaService] = None self.media: Optional[MediaService] = None
self.video: Optional[VideoService] = None self.video: Optional[VideoService] = None
self.frame_processor: Optional[FrameProcessor] = None self.frame_processor: Optional[FrameProcessor] = None
self.persistence: Optional[PersistenceService] = None
# Video generation pipelines (dictionary of pipeline_name -> pipeline_instance) # Video generation pipelines (dictionary of pipeline_name -> pipeline_instance)
self.pipelines = {} self.pipelines = {}
@@ -108,6 +110,7 @@ class PixelleVideoCore:
self.media = MediaService(self.config) self.media = MediaService(self.config)
self.video = VideoService() self.video = VideoService()
self.frame_processor = FrameProcessor(self) self.frame_processor = FrameProcessor(self)
self.persistence = PersistenceService(output_dir="output")
# 2. Register video generation pipelines # 2. Register video generation pipelines
self.pipelines = { self.pipelines = {

View File

@@ -21,6 +21,7 @@ Services:
- MediaService: Media generation (image & video) - MediaService: Media generation (image & video)
- VideoService: Video processing - VideoService: Video processing
- FrameProcessor: Frame processing orchestrator - FrameProcessor: Frame processing orchestrator
- PersistenceService: Task metadata and storyboard persistence
- ComfyBaseService: Base class for ComfyUI-based services - ComfyBaseService: Base class for ComfyUI-based services
""" """
@@ -30,6 +31,7 @@ from pixelle_video.services.tts_service import TTSService
from pixelle_video.services.media import MediaService from pixelle_video.services.media import MediaService
from pixelle_video.services.video import VideoService from pixelle_video.services.video import VideoService
from pixelle_video.services.frame_processor import FrameProcessor from pixelle_video.services.frame_processor import FrameProcessor
from pixelle_video.services.persistence import PersistenceService
# Backward compatibility alias # Backward compatibility alias
ImageService = MediaService ImageService = MediaService
@@ -42,5 +44,6 @@ __all__ = [
"ImageService", # Backward compatibility "ImageService", # Backward compatibility
"VideoService", "VideoService",
"FrameProcessor", "FrameProcessor",
"PersistenceService",
] ]

View File

@@ -479,7 +479,6 @@ class HTMLFrameGenerator:
# Replace variables in HTML (supports DSL syntax: {{param:type=default}}) # Replace variables in HTML (supports DSL syntax: {{param:type=default}})
html = self._replace_parameters(self.template, context) html = self._replace_parameters(self.template, context)
logger.debug(f"html--->{html}")
# Use provided output path or auto-generate # Use provided output path or auto-generate
if output_path is None: if output_path is None:
# Fallback: auto-generate (for backward compatibility) # Fallback: auto-generate (for backward compatibility)

View File

@@ -0,0 +1,460 @@
# Copyright (C) 2025 AIDC-AI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Persistence Service
Handles task metadata and storyboard persistence to filesystem.
"""
import json
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime
from loguru import logger
from pixelle_video.models.storyboard import Storyboard, StoryboardFrame, StoryboardConfig, ContentMetadata
class PersistenceService:
"""
Task persistence service using filesystem (JSON)
File structure:
output/
└── {task_id}/
├── metadata.json # Task metadata (input, result, config)
├── storyboard.json # Storyboard data (frames, prompts)
├── final.mp4
└── frames/
├── 01_audio.mp3
├── 01_image.png
└── ...
Usage:
persistence = PersistenceService()
# Save metadata
await persistence.save_task_metadata(task_id, metadata)
# Save storyboard
await persistence.save_storyboard(task_id, storyboard)
# Load task
metadata = await persistence.load_task_metadata(task_id)
storyboard = await persistence.load_storyboard(task_id)
# List all tasks
tasks = await persistence.list_tasks(status="completed", limit=50)
"""
def __init__(self, output_dir: str = "output"):
"""
Initialize persistence service
Args:
output_dir: Base output directory (default: "output")
"""
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def get_task_dir(self, task_id: str) -> Path:
"""Get task directory path"""
return self.output_dir / task_id
def get_metadata_path(self, task_id: str) -> Path:
"""Get metadata.json path"""
return self.get_task_dir(task_id) / "metadata.json"
def get_storyboard_path(self, task_id: str) -> Path:
"""Get storyboard.json path"""
return self.get_task_dir(task_id) / "storyboard.json"
# ========================================================================
# Metadata Operations
# ========================================================================
async def save_task_metadata(
self,
task_id: str,
metadata: Dict[str, Any]
):
"""
Save task metadata to filesystem
Args:
task_id: Task ID
metadata: Metadata dict with structure:
{
"task_id": str,
"created_at": str,
"completed_at": str (optional),
"status": str,
"input": dict,
"result": dict (optional),
"config": dict
}
"""
try:
task_dir = self.get_task_dir(task_id)
task_dir.mkdir(parents=True, exist_ok=True)
metadata_path = self.get_metadata_path(task_id)
# Ensure task_id is set
metadata["task_id"] = task_id
# Convert datetime objects to ISO format strings
if "created_at" in metadata and isinstance(metadata["created_at"], datetime):
metadata["created_at"] = metadata["created_at"].isoformat()
if "completed_at" in metadata and isinstance(metadata["completed_at"], datetime):
metadata["completed_at"] = metadata["completed_at"].isoformat()
with open(metadata_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
logger.debug(f"Saved task metadata: {task_id}")
except Exception as e:
logger.error(f"Failed to save task metadata {task_id}: {e}")
raise
async def load_task_metadata(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
Load task metadata from filesystem
Args:
task_id: Task ID
Returns:
Metadata dict or None if not found
"""
try:
metadata_path = self.get_metadata_path(task_id)
if not metadata_path.exists():
return None
with open(metadata_path, "r", encoding="utf-8") as f:
metadata = json.load(f)
return metadata
except Exception as e:
logger.error(f"Failed to load task metadata {task_id}: {e}")
return None
async def update_task_status(
self,
task_id: str,
status: str,
error: Optional[str] = None
):
"""
Update task status in metadata
Args:
task_id: Task ID
status: New status (pending, running, completed, failed, cancelled)
error: Error message (optional, for failed status)
"""
try:
metadata = await self.load_task_metadata(task_id)
if not metadata:
logger.warning(f"Cannot update status: task {task_id} not found")
return
metadata["status"] = status
if status in ["completed", "failed", "cancelled"]:
metadata["completed_at"] = datetime.now().isoformat()
if error:
metadata["error"] = error
await self.save_task_metadata(task_id, metadata)
except Exception as e:
logger.error(f"Failed to update task status {task_id}: {e}")
# ========================================================================
# Storyboard Operations
# ========================================================================
async def save_storyboard(
self,
task_id: str,
storyboard: Storyboard
):
"""
Save storyboard to filesystem
Args:
task_id: Task ID
storyboard: Storyboard instance
"""
try:
task_dir = self.get_task_dir(task_id)
task_dir.mkdir(parents=True, exist_ok=True)
storyboard_path = self.get_storyboard_path(task_id)
# Convert storyboard to dict
storyboard_dict = self._storyboard_to_dict(storyboard)
with open(storyboard_path, "w", encoding="utf-8") as f:
json.dump(storyboard_dict, f, indent=2, ensure_ascii=False)
logger.debug(f"Saved storyboard: {task_id}")
except Exception as e:
logger.error(f"Failed to save storyboard {task_id}: {e}")
raise
async def load_storyboard(self, task_id: str) -> Optional[Storyboard]:
"""
Load storyboard from filesystem
Args:
task_id: Task ID
Returns:
Storyboard instance or None if not found
"""
try:
storyboard_path = self.get_storyboard_path(task_id)
if not storyboard_path.exists():
return None
with open(storyboard_path, "r", encoding="utf-8") as f:
storyboard_dict = json.load(f)
# Convert dict to storyboard
storyboard = self._dict_to_storyboard(storyboard_dict)
return storyboard
except Exception as e:
logger.error(f"Failed to load storyboard {task_id}: {e}")
return None
# ========================================================================
# Task Listing & Querying
# ========================================================================
async def list_tasks(
self,
status: Optional[str] = None,
limit: int = 50,
offset: int = 0
) -> List[Dict[str, Any]]:
"""
List tasks with optional filtering
Args:
status: Filter by status (pending, running, completed, failed, cancelled)
limit: Maximum number of tasks to return
offset: Number of tasks to skip
Returns:
List of metadata dicts, sorted by created_at descending
"""
try:
tasks = []
# Scan all task directories
for task_dir in self.output_dir.iterdir():
if not task_dir.is_dir():
continue
metadata_path = task_dir / "metadata.json"
if not metadata_path.exists():
continue
try:
with open(metadata_path, "r", encoding="utf-8") as f:
metadata = json.load(f)
# Filter by status
if status and metadata.get("status") != status:
continue
tasks.append(metadata)
except Exception as e:
logger.warning(f"Failed to load metadata from {task_dir}: {e}")
continue
# Sort by created_at descending
tasks.sort(key=lambda t: t.get("created_at", ""), reverse=True)
# Apply pagination
return tasks[offset:offset + limit]
except Exception as e:
logger.error(f"Failed to list tasks: {e}")
return []
async def task_exists(self, task_id: str) -> bool:
"""Check if task exists"""
return self.get_task_dir(task_id).exists()
async def delete_task(self, task_id: str):
"""
Delete task directory and all files
Args:
task_id: Task ID
"""
try:
task_dir = self.get_task_dir(task_id)
if task_dir.exists():
import shutil
shutil.rmtree(task_dir)
logger.info(f"Deleted task: {task_id}")
except Exception as e:
logger.error(f"Failed to delete task {task_id}: {e}")
raise
# ========================================================================
# Serialization Helpers
# ========================================================================
def _storyboard_to_dict(self, storyboard: Storyboard) -> Dict[str, Any]:
"""Convert Storyboard to dict for JSON serialization"""
return {
"title": storyboard.title,
"config": self._config_to_dict(storyboard.config),
"frames": [self._frame_to_dict(frame) for frame in storyboard.frames],
"content_metadata": self._content_metadata_to_dict(storyboard.content_metadata) if storyboard.content_metadata else None,
"final_video_path": storyboard.final_video_path,
"total_duration": storyboard.total_duration,
"created_at": storyboard.created_at.isoformat() if storyboard.created_at else None,
"completed_at": storyboard.completed_at.isoformat() if storyboard.completed_at else None,
}
def _dict_to_storyboard(self, data: Dict[str, Any]) -> Storyboard:
"""Convert dict to Storyboard instance"""
return Storyboard(
title=data["title"],
config=self._dict_to_config(data["config"]),
frames=[self._dict_to_frame(frame_data) for frame_data in data["frames"]],
content_metadata=self._dict_to_content_metadata(data["content_metadata"]) if data.get("content_metadata") else None,
final_video_path=data.get("final_video_path"),
total_duration=data.get("total_duration", 0.0),
created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None,
completed_at=datetime.fromisoformat(data["completed_at"]) if data.get("completed_at") else None,
)
def _config_to_dict(self, config: StoryboardConfig) -> Dict[str, Any]:
"""Convert StoryboardConfig to dict"""
return {
"task_id": config.task_id,
"n_storyboard": config.n_storyboard,
"min_narration_words": config.min_narration_words,
"max_narration_words": config.max_narration_words,
"min_image_prompt_words": config.min_image_prompt_words,
"max_image_prompt_words": config.max_image_prompt_words,
"video_fps": config.video_fps,
"tts_inference_mode": config.tts_inference_mode,
"voice_id": config.voice_id,
"tts_workflow": config.tts_workflow,
"tts_speed": config.tts_speed,
"ref_audio": config.ref_audio,
"image_width": config.image_width,
"image_height": config.image_height,
"image_workflow": config.image_workflow,
"frame_template": config.frame_template,
"template_params": config.template_params,
}
def _dict_to_config(self, data: Dict[str, Any]) -> StoryboardConfig:
"""Convert dict to StoryboardConfig"""
return StoryboardConfig(
task_id=data.get("task_id"),
n_storyboard=data.get("n_storyboard", 5),
min_narration_words=data.get("min_narration_words", 5),
max_narration_words=data.get("max_narration_words", 20),
min_image_prompt_words=data.get("min_image_prompt_words", 30),
max_image_prompt_words=data.get("max_image_prompt_words", 60),
video_fps=data.get("video_fps", 30),
tts_inference_mode=data.get("tts_inference_mode", "local"),
voice_id=data.get("voice_id"),
tts_workflow=data.get("tts_workflow"),
tts_speed=data.get("tts_speed"),
ref_audio=data.get("ref_audio"),
image_width=data.get("image_width", 1024),
image_height=data.get("image_height", 1024),
image_workflow=data.get("image_workflow"),
frame_template=data.get("frame_template", "1080x1920/default.html"),
template_params=data.get("template_params"),
)
def _frame_to_dict(self, frame: StoryboardFrame) -> Dict[str, Any]:
"""Convert StoryboardFrame to dict"""
return {
"index": frame.index,
"narration": frame.narration,
"image_prompt": frame.image_prompt,
"audio_path": frame.audio_path,
"media_type": frame.media_type,
"image_path": frame.image_path,
"video_path": frame.video_path,
"composed_image_path": frame.composed_image_path,
"video_segment_path": frame.video_segment_path,
"duration": frame.duration,
"created_at": frame.created_at.isoformat() if frame.created_at else None,
}
def _dict_to_frame(self, data: Dict[str, Any]) -> StoryboardFrame:
"""Convert dict to StoryboardFrame"""
return StoryboardFrame(
index=data["index"],
narration=data["narration"],
image_prompt=data["image_prompt"],
audio_path=data.get("audio_path"),
media_type=data.get("media_type"),
image_path=data.get("image_path"),
video_path=data.get("video_path"),
composed_image_path=data.get("composed_image_path"),
video_segment_path=data.get("video_segment_path"),
duration=data.get("duration", 0.0),
created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None,
)
def _content_metadata_to_dict(self, metadata: ContentMetadata) -> Dict[str, Any]:
"""Convert ContentMetadata to dict"""
return {
"title": metadata.title,
"author": metadata.author,
"subtitle": metadata.subtitle,
"genre": metadata.genre,
"summary": metadata.summary,
"publication_year": metadata.publication_year,
"cover_url": metadata.cover_url,
}
def _dict_to_content_metadata(self, data: Dict[str, Any]) -> ContentMetadata:
"""Convert dict to ContentMetadata"""
return ContentMetadata(
title=data["title"],
author=data.get("author"),
subtitle=data.get("subtitle"),
genre=data.get("genre"),
summary=data.get("summary"),
publication_year=data.get("publication_year"),
cover_url=data.get("cover_url"),
)

View File

@@ -24,7 +24,7 @@ dependencies = [
"fastapi>=0.115.0", "fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0", "uvicorn[standard]>=0.32.0",
"python-multipart>=0.0.12", "python-multipart>=0.0.12",
"comfykit>=0.1.9", "comfykit>=0.1.10",
"beautifulsoup4>=4.14.2", "beautifulsoup4>=4.14.2",
] ]

833
uv.lock generated

File diff suppressed because it is too large Load Diff