Files
AI-Video/pixelle_video/services/image_analysis.py

198 lines
7.9 KiB
Python

# Copyright (C) 2025 AIDC-AI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Image Analysis Service - ComfyUI Workflow-based implementation
Uses Florence-2 or other vision models to analyze images and generate descriptions.
"""
from typing import Optional, Literal
from pathlib import Path
from comfykit import ComfyKit
from loguru import logger
from pixelle_video.services.comfy_base_service import ComfyBaseService
class ImageAnalysisService(ComfyBaseService):
    """
    Image analysis service - Workflow-based

    Uses ComfyKit to execute image analysis workflows (e.g., Florence-2,
    BLIP, etc.) and returns a textual description of the image.

    Convention: workflows follow {source}/analyse_image.json pattern
        - runninghub/analyse_image.json (default, cloud-based)
        - selfhost/analyse_image.json (local ComfyUI)

    Usage:
        # Use default (runninghub cloud)
        description = await pixelle_video.image_analysis("path/to/image.jpg")

        # Use local ComfyUI
        description = await pixelle_video.image_analysis(
            "path/to/image.jpg",
            source="selfhost"
        )

        # List available workflows
        workflows = pixelle_video.image_analysis.list_workflows()
    """

    WORKFLOW_PREFIX = "analyse_"
    WORKFLOWS_DIR = "workflows"

    def __init__(self, config: dict, core=None):
        """
        Initialize image analysis service

        Args:
            config: Full application config dict
            core: PixelleVideoCore instance (for accessing shared ComfyKit)
        """
        super().__init__(config, service_name="image_analysis", core=core)

    async def __call__(
        self,
        image_path: str,
        # Workflow source selection
        source: Literal['runninghub', 'selfhost'] = 'runninghub',
        workflow: Optional[str] = None,
        # ComfyUI connection (optional overrides)
        comfyui_url: Optional[str] = None,
        runninghub_api_key: Optional[str] = None,
        # Additional workflow parameters
        **params
    ) -> str:
        """
        Analyze an image using workflow

        Args:
            image_path: Path to the image file (local path or http(s) URL).
                Local paths must exist; URLs are forwarded to the workflow
                unchanged.
            source: Workflow source - 'runninghub' (cloud, default) or
                'selfhost' (local ComfyUI). Only consulted when ``workflow``
                is not given.
            workflow: Workflow filename (optional, overrides source-based
                resolution)
            comfyui_url: ComfyUI URL override. NOTE(review): not consumed by
                this method - execution always goes through the shared
                ComfyKit instance obtained from ``core``.
            runninghub_api_key: RunningHub API key override. NOTE(review):
                not consumed by this method, same as ``comfyui_url``.
            **params: Additional workflow parameters. They are merged over
                the default ``{"image": ...}`` parameter, so an explicit
                ``image`` key replaces it.

        Returns:
            str: Text description of the image

        Raises:
            FileNotFoundError: ``image_path`` is a local path that does not
                exist.
            RuntimeError: The workflow did not complete, or it completed
                without producing any text output.

        Examples:
            # Simplest: use default (runninghub cloud)
            description = await pixelle_video.image_analysis("temp/06.JPG")

            # Use local ComfyUI
            description = await pixelle_video.image_analysis(
                "temp/06.JPG",
                source="selfhost"
            )

            # Use specific workflow (bypass source-based resolution)
            description = await pixelle_video.image_analysis(
                "temp/06.JPG",
                workflow="selfhost/custom_analysis.json"
            )
        """
        from pixelle_video.utils.workflow_util import resolve_workflow_path

        # 1. Validate the image reference. Only local paths can be checked on
        #    disk; a URL would always fail Path.exists(), so it is skipped.
        image_ref = str(image_path)
        if not self._is_url(image_ref) and not Path(image_ref).exists():
            raise FileNotFoundError(f"Image file not found: {image_path}")

        # The connection overrides are part of the signature but nothing below
        # reads them; say so instead of dropping them silently.
        if comfyui_url is not None or runninghub_api_key is not None:
            logger.warning(
                "comfyui_url / runninghub_api_key overrides are ignored by "
                "image analysis; the shared ComfyKit instance is used"
            )

        # 2. Pick the workflow file by convention ({source}/analyse_image.json)
        #    unless the caller named one explicitly.
        if workflow is None:
            workflow = resolve_workflow_path("analyse_image", source)
            logger.info(f"Using {source} workflow: {workflow}")

        # 3. Resolve the workflow (returns structured info)
        workflow_info = self._resolve_workflow(workflow=workflow)

        # 4. Build workflow parameters; caller-supplied params win on conflict.
        workflow_params = {"image": image_ref}
        workflow_params.update(params)
        logger.debug(f"Workflow parameters: {workflow_params}")

        # 5. Execute workflow using shared ComfyKit instance from core
        try:
            # Get shared ComfyKit instance (lazy initialization + config hot-reload)
            kit = await self.core._get_or_create_comfykit()

            # Determine what to pass to ComfyKit based on source
            if workflow_info["source"] == "runninghub" and "workflow_id" in workflow_info:
                # RunningHub: pass workflow_id
                workflow_input = workflow_info["workflow_id"]
                logger.info(f"Executing RunningHub workflow: {workflow_input}")
            else:
                # Selfhost: pass file path
                workflow_input = workflow_info["path"]
                logger.info(f"Executing selfhost workflow: {workflow_input}")

            result = await kit.execute(workflow_input, workflow_params)

            # 6. Extract description from result
            if result.status != "completed":
                error_msg = result.msg or "Unknown error"
                logger.error(f"Image analysis failed: {error_msg}")
                raise RuntimeError(f"Image analysis failed: {error_msg}")

            # The output format varies by source: try inline text first, then
            # fall back to a downloadable text file.
            description = self._first_text_output(result.outputs)
            if not description:
                description = await self._download_text_output(result.outputs)

            if not description:
                logger.error(f"No text found in outputs: {result.outputs}")
                raise RuntimeError("No description generated")

            logger.info(f"✅ Image analyzed: {description[:100]}...")
            return description
        except Exception as e:
            logger.error(f"Image analysis error: {e}")
            raise

    @staticmethod
    def _is_url(value: str) -> bool:
        """Return True when ``value`` is an http(s) URL rather than a local path."""
        return value.lower().startswith(("http://", "https://"))

    @staticmethod
    def _first_text_output(outputs) -> Optional[str]:
        """
        Return the first inline text found in node outputs, or None.

        Handles the selfhost format ``{'6': {'text': ['description text']}}``.
        Entries that are not dicts (e.g. a ``raw_data`` list) are skipped, and
        a bare string under ``text`` is returned whole.
        """
        if not outputs:
            return None
        for node_output in outputs.values():
            if not isinstance(node_output, dict):
                continue
            text = node_output.get("text")
            if isinstance(text, str):
                # Indexing a string would return only its first character.
                text = [text]
            if text:
                return text[0]
        return None

    @staticmethod
    async def _download_text_output(outputs) -> Optional[str]:
        """
        Download the text file referenced by RunningHub ``raw_data``, or None.

        Handles the format
        ``{'raw_data': [{'fileUrl': 'https://...txt', 'fileType': 'txt', ...}]}``.
        Returns the stripped body of the first entry that answers HTTP 200.
        """
        raw_data = outputs.get("raw_data") if isinstance(outputs, dict) else None
        if not raw_data:
            return None

        text_urls = [
            item["fileUrl"]
            for item in raw_data
            if isinstance(item, dict)
            and item.get("fileType") == "txt"
            and "fileUrl" in item
        ]
        if not text_urls:
            return None

        # Imported lazily so aiohttp is only required when a download happens.
        import aiohttp

        async with aiohttp.ClientSession() as session:
            for file_url in text_urls:
                async with session.get(file_url) as resp:
                    if resp.status == 200:
                        return (await resp.text()).strip()
                    logger.warning(
                        f"Failed to download analysis text (HTTP {resp.status}): {file_url}"
                    )
        return None