Files
AI-Video/pixelle_video/services/comfy_base_service.py

315 lines
11 KiB
Python

# Copyright (C) 2025 AIDC-AI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
ComfyUI Base Service - Common logic for ComfyUI-based services
"""
import json
import os
from pathlib import Path
from typing import Optional, List, Dict, Any
from comfykit import ComfyKit
from loguru import logger
from pixelle_video.utils.os_util import (
get_resource_path,
list_resource_files,
list_resource_dirs
)
class ComfyBaseService:
"""
Base service for ComfyUI workflow-based capabilities
Provides common functionality for TTS, Image, and other ComfyUI-based services.
Subclasses should define:
- WORKFLOW_PREFIX: Prefix for workflow files (e.g., "image_", "tts_")
- DEFAULT_WORKFLOW: Default workflow filename (e.g., "image_flux.json")
- WORKFLOWS_DIR: Directory containing workflows (default: "workflows")
"""
WORKFLOW_PREFIX: str = "" # Must be overridden by subclass
DEFAULT_WORKFLOW: str = "" # Must be overridden by subclass
WORKFLOWS_DIR: str = "workflows"
def __init__(self, config: dict, service_name: str, core=None):
"""
Initialize ComfyUI base service
Args:
config: Full application config dict
service_name: Service name in config (e.g., "tts", "image")
core: PixelleVideoCore instance (for accessing shared ComfyKit)
"""
# Service-specific config (e.g., config["comfyui"]["tts"])
comfyui_config = config.get("comfyui", {})
self.config = comfyui_config.get(service_name, {})
# Global ComfyUI config (for comfyui_url and runninghub_api_key)
self.global_config = comfyui_config
self.service_name = service_name
self._workflows_cache: Optional[List[str]] = None
# Reference to core (for accessing shared ComfyKit)
self.core = core
def _scan_workflows(self) -> List[Dict[str, Any]]:
"""
Scan workflows/source/*.json files from all source directories (merged from workflows/ and data/workflows/)
Returns:
List of workflow info dicts
Example: [
{
"name": "image_flux.json",
"display_name": "image_flux.json - Selfhost",
"source": "selfhost",
"path": "workflows/selfhost/image_flux.json",
"key": "selfhost/image_flux.json"
},
{
"name": "image_flux.json",
"display_name": "image_flux.json - Runninghub",
"source": "runninghub",
"path": "workflows/runninghub/image_flux.json",
"key": "runninghub/image_flux.json",
"workflow_id": "123456"
}
]
"""
workflows = []
# Get all workflow source directories (merged from workflows/ and data/workflows/)
source_dirs = list_resource_dirs("workflows")
if not source_dirs:
logger.warning("No workflow source directories found")
return workflows
# Scan each source directory for workflow files
for source_name in source_dirs:
# Get all JSON files for this source (merged from both locations)
workflow_files = list_resource_files("workflows", source_name)
# Filter to only files matching the prefix
matching_files = [
f for f in workflow_files
if f.startswith(self.WORKFLOW_PREFIX) and f.endswith('.json')
]
for filename in matching_files:
try:
# Get actual file path (custom > default)
file_path = Path(get_resource_path("workflows", source_name, filename))
workflow_info = self._parse_workflow_file(file_path, source_name)
workflows.append(workflow_info)
logger.debug(f"Found workflow: {workflow_info['key']}")
except Exception as e:
logger.error(f"Failed to parse workflow {source_name}/{filename}: {e}")
# Sort by key (source/name)
return sorted(workflows, key=lambda w: w["key"])
def _parse_workflow_file(self, file_path: Path, source: str) -> Dict[str, Any]:
"""
Parse workflow file and extract metadata
Args:
file_path: Path to workflow JSON file
source: Source directory name (e.g., "selfhost", "runninghub")
Returns:
Workflow info dict with structure:
{
"name": "image_flux.json",
"display_name": "image_flux.json - Runninghub",
"source": "runninghub",
"path": "workflows/runninghub/image_flux.json",
"key": "runninghub/image_flux.json",
"workflow_id": "123456" # Only for RunningHub
}
"""
with open(file_path, 'r', encoding='utf-8') as f:
content = json.load(f)
# Build base info
workflow_info = {
"name": file_path.name,
"display_name": f"{file_path.name} - {source.title()}",
"source": source,
"path": str(file_path),
"key": f"{source}/{file_path.name}"
}
# Check if it's a wrapper format (RunningHub, etc.)
if "source" in content:
# Wrapper format: {"source": "runninghub", "workflow_id": "xxx", ...}
if "workflow_id" in content:
workflow_info["workflow_id"] = content["workflow_id"]
return workflow_info
def _get_default_workflow(self) -> str:
"""
Get default workflow from config (required, no fallback)
Returns:
Default workflow key (e.g., "runninghub/image_flux.json")
Raises:
ValueError: If default_workflow not configured
"""
default_workflow = self.config.get("default_workflow")
if not default_workflow:
raise ValueError(
f"No default workflow configured for {self.service_name}. "
f"Please set 'default_workflow' in config.yaml under '{self.service_name}' section. "
f"Available workflows: {', '.join(self.available)}"
)
return default_workflow
def _resolve_workflow(self, workflow: Optional[str] = None) -> Dict[str, Any]:
"""
Resolve workflow key to workflow info
Args:
workflow: Workflow key (e.g., "runninghub/image_flux.json")
If None, uses default from config
Returns:
Workflow info dict with structure:
{
"name": "image_flux.json",
"display_name": "image_flux.json - Runninghub",
"source": "runninghub",
"path": "workflows/runninghub/image_flux.json",
"key": "runninghub/image_flux.json",
"workflow_id": "123456" # Only for RunningHub
}
Raises:
ValueError: If workflow not found
"""
# 1. If not specified, use default from config
if workflow is None:
workflow = self._get_default_workflow()
# 2. Scan available workflows
available_workflows = self._scan_workflows()
# 3. Find matching workflow by key
for wf_info in available_workflows:
if wf_info["key"] == workflow:
logger.info(f"🎬 Using {self.service_name} workflow: {workflow}")
return wf_info
# 4. Not found - generate error message
available_keys = [wf["key"] for wf in available_workflows]
available_str = ", ".join(available_keys) if available_keys else "none"
raise ValueError(
f"Workflow '{workflow}' not found. "
f"Available workflows: {available_str}"
)
def _prepare_comfykit_config(
self,
comfyui_url: Optional[str] = None,
runninghub_api_key: Optional[str] = None,
) -> Dict[str, Any]:
"""
Prepare ComfyKit configuration
Args:
comfyui_url: ComfyUI URL (optional, overrides config)
runninghub_api_key: RunningHub API key (optional, overrides config)
Returns:
ComfyKit configuration dict
"""
kit_config = {}
# ComfyUI URL (priority: param > global config > env > default)
final_comfyui_url = (
comfyui_url
or self.global_config.get("comfyui_url")
or os.getenv("COMFYUI_BASE_URL")
or "http://127.0.0.1:8188"
)
kit_config["comfyui_url"] = final_comfyui_url
# RunningHub API key (priority: param > global config > env)
final_rh_key = (
runninghub_api_key
or self.global_config.get("runninghub_api_key")
or os.getenv("RUNNINGHUB_API_KEY")
)
if final_rh_key:
kit_config["runninghub_api_key"] = final_rh_key
logger.debug(f"ComfyKit config: {kit_config}")
return kit_config
def list_workflows(self) -> List[Dict[str, Any]]:
"""
List all available workflows with full metadata
Returns:
List of workflow info dicts (sorted by key)
Example:
workflows = service.list_workflows()
# [
# {
# "name": "image_flux.json",
# "display_name": "image_flux.json - Runninghub",
# "source": "runninghub",
# "path": "workflows/runninghub/image_flux.json",
# "key": "runninghub/image_flux.json",
# "workflow_id": "123456"
# },
# ...
# ]
"""
return self._scan_workflows()
@property
def available(self) -> List[str]:
"""
List available workflow keys
Returns:
List of available workflow keys (e.g., ["runninghub/image_flux.json", ...])
Example:
print(f"Available workflows: {service.available}")
"""
workflows = self.list_workflows()
return [wf["key"] for wf in workflows]
def __repr__(self) -> str:
"""String representation"""
default = self._get_default_workflow()
available = ", ".join(self.available) if self.available else "none"
return (
f"<{self.__class__.__name__} "
f"default={default!r} "
f"available=[{available}]>"
)