fix: Filter non-serializable objects in pipeline metadata

- Skip CharacterMemory and callback functions during serialization
- Add .pids/ and .serena/ to gitignore
- Add workflow integration documentation

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
empty
2026-01-10 15:56:06 +08:00
parent be2639c596
commit c65b040fe3
3 changed files with 457 additions and 6 deletions

2
.gitignore vendored
View File

@@ -76,3 +76,5 @@ examples/
repositories/ repositories/
*.out *.out
.pids/
.serena/

View File

@@ -0,0 +1,434 @@
# 工作流完整接入示例
# 📝 RunningHub AI 工作流交互使用手册workflow 版本)
## 1. 功能概述
本脚本通过调用 RunningHub AI 平台的 OpenAPI实现从本地加载工作流 JSON、修改节点信息、上传文件、提交任务并自动查询结果的全流程操作。
主要功能包括:
- 读取本地工作流配置JSON 文件)
- 生成可修改节点信息列表nodeInfoList
- 根据节点类型(图片、文本等)修改节点值
- 上传图片、音频、视频文件
- 向 RunningHub 提交任务并实时查询状态
- 输出最终生成结果的文件链接
✅ 适用于有自定义工作流workflowId的高级用户可在不打开网页的情况下自动执行 AI 工作流。
---
## 2. 文件说明与主要函数
### 💡 主要文件
| 文件名 | 功能 |
|-------------|------|
| workflow.py | 主执行脚本 |
| api.json | 从 RunningHub 下载的工作流配置文件(包含节点定义) |
### 🔧 核心函数介绍
| 函数名 | 功能描述 |
|--------|----------|
| load_json(file_path) | 从本地读取并解析工作流 JSON 文件 |
| convert_to_node_info_list(data) | 将 JSON 格式转换为节点信息列表 |
| upload_file(API_KEY, file_path) | 上传本地文件image/audio/video至 RunningHub |
| submit_task(workflowId, node_info_list, API_KEY) | 提交任务,启动 AI 工作流执行 |
| query_task_outputs(task_id, API_KEY) | 轮询任务执行状态并获取结果输出 |
---
## 3. 操作步骤详解
### Step 1输入必要信息
运行脚本后,系统会提示输入以下信息:
```text
请输入你的 api_key:
```
说明:在 RunningHub 控制台“API 调用”中可获得。
示例:`0s2d1***********2n3mk4`
```
请输入 workflowId:
```
示例:`1980468315921559554`
来源于链接末尾https://www.runninghub.cn/workflow/1980237776367083521?source=workspace
然后输入本地工作流 JSON 文件路径:
```
输入您的json文件地址(json文件一定要在自己的工作台中获得获得途径为导出工作流api到本地)
```
示例:`C:\Users\Mayn\Downloads\api.json`
此时脚本会输出工作流中的所有节点信息:
```
等待node_info_list生成包含所有可修改的节点
{'3': {'inputs': {...}}, '4': {...}, '6': {...}, ...}
```
---
### Step 2查看并修改节点
脚本会提示:
```text
请输入 nodeId输入 'exit' 结束修改):
```
输入节点 nodeId如 10脚本会展示该节点的所有字段
```
🧩 找到节点 10 的字段如下:
(0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'})
```
接着输入要修改的字段名:
```
请输入要修改的 fieldName:
```
示例:`image`
---
### Step 3修改字段值
#### 📷 如果是文件类型image/audio/video
```
请输入您本地image文件路径:
```
示例输入:`D:\R.jpg`
上传成功后:
```
等待文件上传中
上传结果: {'code': 0, 'msg': 'success', 'data': {'fileName': 'api/xxx.jpg', 'fileType': 'input'}}
✅ 已更新 image fieldValue: api/xxx.jpg
```
#### 📝 如果是文本或数值类型
```
请输入新的 fieldValue (text):
```
示例输入:`1 girl in classroom`
返回:
```
✅ 已更新 fieldValue: 1 girl in classroom
```
> 可多次修改不同节点,输入 `exit` 结束。
---
### Step 4提交任务
输入完成后,脚本自动提交任务:
```
开始提交任务,请等待
📌 提交任务返回: {'code': 0, 'msg': 'success', 'data': {...}}
📝 taskId: 1980471280073846785
✅ 无节点错误,任务提交成功。
```
---
### Step 5任务状态轮询
脚本每隔 5 秒查询任务状态:
```
⏳ 任务运行中...
⏳ 任务运行中...
🎉 生成结果完成!
```
如果任务失败,会打印详细原因:
```
❌ 任务失败!
节点 SaveImage 失败原因: 'str' object has no attribute 'shape'
Traceback: [...]
```
---
### Step 6查看结果文件
任务成功后会输出生成文件链接:
```
🎉 生成结果完成!
[{'fileUrl': 'https://rh-images.xiaoyaoyou.com/f24a6365b08fa3bc02f55cd1f63e74a7/output/ComfyUI_00001_hnqxe_1761016156.png',
'fileType': 'png',
'taskCostTime': '35',
'nodeId': '17'}]
✅ 任务完成!
```
打开 `fileUrl` 即可查看 AI 生成的图片。
## 4. 完整运行流程概览
1⃣ 输入 API_KEY 和 workflowId
2⃣ 加载本地 JSON 工作流
3⃣ 自动生成可修改节点列表
4⃣ 修改所需节点参数
5⃣ 上传文件(如图片)
6⃣ 提交任务至 RunningHub
7⃣ 轮询任务状态
8⃣ 获取并打印生成结果链接
---
## 5. 示例输出结果
```
请输入你的 api_key: a0fada**************b2ke21
请输入 workflowId: ***8315921559***
输入您的json文件地址(json文件一定要在自己的工作台中获得获得途径为导出工作流api到本地)C:\Users\Mayn\Downloads\api.json
```
```
🧩 找到节点 10 的字段如下:
(0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'})
✅ 已更新 image fieldValue: api/xxx.jpg
```
```
开始提交任务,请等待
📌 提交任务返回: {...}
⏳ 任务运行中...
🎉 生成结果完成!
✅ 任务完成!
```
---
## 6. 小贴士Tips
- 建议使用 Python 3.8+
- 脚本可直接在终端运行:
```bash
python workflow.py
```
- Windows 用户注意文件路径需使用双反斜杠 `\\`
- 若使用代理或云主机,请确保端口 443 可访问 `www.runninghub.cn`
```python
import http.client
import json
import mimetypes
from codecs import encode
import time
import os
import requests
API_HOST = "www.runninghub.cn"
def load_json(file_path):
# 打开并读取 JSON 文件
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f) # 将 JSON 内容解析为 Python 对象dict 或 list
# 打印读取到的数据
print(data)
return data
def convert_to_node_info_list(data):
node_info_list = []
for node_id, node_content in data.items():
inputs = node_content.get("inputs", {})
for field_name, field_value in inputs.items():
# 如果 field_value 是列表或字典,可以选择转换成字符串
if isinstance(field_value, (list, dict)):
field_value = json.dumps(field_value)
else:
field_value = str(field_value)
node_info_list.append({
"nodeId": str(node_id),
"fieldName": str(field_name),
"fieldValue": field_value
})
return node_info_list
def upload_file(API_KEY, file_path):
"""
上传文件到 RunningHub 平台
"""
url = "https://www.runninghub.cn/task/openapi/upload"
headers = {
'Host': 'www.runninghub.cn'
}
data = {
'apiKey': API_KEY,
'fileType': 'input'
}
with open(file_path, 'rb') as f:
files = {'file': f}
response = requests.post(url, headers=headers, files=files, data=data)
return response.json()
# 1⃣ 提交任务
def submit_task(workflowId, node_info_list,API_KEY):
conn = http.client.HTTPSConnection("www.runninghub.cn")
payload = json.dumps({
"apiKey": API_KEY,
"workflowId": workflowId,
"nodeInfoList": node_info_list
})
headers = {
'Host': 'www.runninghub.cn',
'Content-Type': 'application/json'
}
conn.request("POST", "/task/openapi/create", payload, headers)
res = conn.getresponse()
data = res.read()
# ✅ 注意这里:用 json.loads 而不是 json.load
data = json.loads(data.decode("utf-8"))
print(data)
return data
def query_task_outputs(task_id,API_KEY):
conn = http.client.HTTPSConnection(API_HOST)
payload = json.dumps({
"apiKey": API_KEY,
"taskId": task_id
})
headers = {
'Host': API_HOST,
'Content-Type': 'application/json'
}
conn.request("POST", "/task/openapi/outputs", payload, headers)
res = conn.getresponse()
data = json.loads(res.read().decode("utf-8"))
conn.close()
return data
if __name__ == "__main__":
print("下面两个输入用于获得AI工作流所需要的信息api_key为用户的密钥从api调用——进入控制台中获得workflowId此为示例具体的workflowId为你所选择的AI工作流界面上方的链接https://www.runninghub.cn/workflow/1980237776367083521?source=workspace最后的数字为workflowId")
Api_key = input("请输入你的 api_key: ").strip()
workflowId = input("请输入 workflowId: ").strip()
print("请您下载您的工作流API json到本地")
file_path = input("输入您的json文件地址(json文件一定要在自己的工作台中获得获得途径为导出工作流api到本地)").strip()
print("等待node_info_list生成包涵所有的可以修改的node节点")
data = load_json(file_path)
node_info_list = convert_to_node_info_list(data)
print(node_info_list)
print("下面用户可以输入工作流可以修改的节点idnodeId,以及对应的fileName,锁定具体的节点位置在找到具体位置之后输入您需要修改的fileValue信息完成信息的修改用户发送AI工作流请求")
modified_nodes = []
while True:
node_id_input = input("请输入 nodeId输入 'exit' 结束修改): ").strip()
if node_id_input.lower() == "exit":
break
# 找出该 nodeId 对应的所有字段
node_fields = [n for n in node_info_list if n['nodeId'] == node_id_input]
if not node_fields:
print("❌ 未找到该 nodeId 对应的节点")
continue
print(f"\n🧩 找到节点 {node_id_input} 的字段如下:")
for field in enumerate(node_fields):
print(field)
# 让用户选择要修改的字段
field_name_input = input("\n请输入要修改的 fieldName: ").strip()
target_node = next(
(f for f in node_fields if f['fieldName'] == field_name_input), None
)
if not target_node:
print("❌ 未找到该 fieldName")
continue
print(f"选中字段: {target_node}")
# 根据类型处理
if target_node['fieldName'] in ["image", "audio", "video"]:
file_path = input(f"请输入您本地{target_node['fieldName']}文件路径: ").strip()
print("等待文件上传中")
upload_result = upload_file(Api_key, file_path)
print("上传结果:", upload_result)
# 假设 upload_file 已返回解析后的 JSON 字典
if upload_result and upload_result.get("msg") == "success":
uploaded_file_name = upload_result.get("data", {}).get("fileName")
if uploaded_file_name:
target_node['fieldValue'] = uploaded_file_name
print(f"✅ 已更新 {target_node['fieldName']} fieldValue:", uploaded_file_name)
else:
print("❌ 上传失败或返回格式异常:", upload_result)
else:
# 其他类型直接修改
new_value = input(f"请输入新的 fieldValue ({target_node['fieldName']}): ").strip()
target_node['fieldValue'] = new_value
print("✅ 已更新 fieldValue:", new_value)
modified_nodes.append({
"nodeId": target_node['nodeId'],
"fieldName": target_node['fieldName'],
"fieldValue": target_node['fieldValue']
})
print(modified_nodes)
print("开始提交任务,请等待")
# 提交任务
submit_result = submit_task(workflowId, modified_nodes,Api_key)
print("📌 提交任务返回:", submit_result)
if submit_result.get("code") != 0:
print("❌ 提交任务失败:", submit_result)
exit()
task_id = submit_result["data"]["taskId"]
print(f"📝 taskId: {task_id}")
# 解析成功返回
prompt_tips_str = submit_result["data"].get("promptTips")
if prompt_tips_str:
try:
prompt_tips = json.loads(prompt_tips_str)
node_errors = prompt_tips.get("node_errors", {})
if node_errors:
print("⚠️ 节点错误信息如下:")
for node_id, err in node_errors.items():
print(f" 节点 {node_id} 错误: {err}")
else:
print("✅ 无节点错误,任务提交成功。")
except Exception as e:
print("⚠️ 无法解析 promptTips:", e)
else:
print("⚠️ 未返回 promptTips 字段。")
timeout = 600
start_time = time.time()
while True:
outputs_result = query_task_outputs(task_id, Api_key)
code = outputs_result.get("code")
msg = outputs_result.get("msg")
data = outputs_result.get("data")
if code == 0 and data: # 成功
file_url = data[0].get("fileUrl")
print("🎉 生成结果完成!")
print(data)
break
elif code == 805: # 任务失败
failed_reason = data.get("failedReason") if data else None
print("❌ 任务失败!")
if failed_reason:
print(f"节点 {failed_reason.get('node_name')} 失败原因: {failed_reason.get('exception_message')}")
print("Traceback:", failed_reason.get("traceback"))
else:
print(outputs_result)
break
elif code == 804 or code == 813: # 运行中或排队中
status_text = "运行中" if code == 804 else "排队中"
print(f"⏳ 任务{status_text}...")
else:
print("⚠️ 未知状态:", outputs_result)
# 超时检查
if time.time() - start_time > timeout:
print("⏰ 等待超时超过10分钟任务未完成。")
break
time.sleep(5)
print("✅ 任务完成!")
```

View File

@@ -495,11 +495,26 @@ class StandardPipeline(LinearVideoPipeline):
logger.warning("No task_id in storyboard, skipping persistence") logger.warning("No task_id in storyboard, skipping persistence")
return return
# Build metadata # Build metadata - filter out non-serializable objects
input_with_title = ctx.params.copy() clean_input = {}
input_with_title["text"] = ctx.input_text # Ensure text is included for key, value in ctx.params.items():
if not input_with_title.get("title"): # Skip non-serializable objects like CharacterMemory
input_with_title["title"] = storyboard.title if key == "character_memory":
# Convert to serializable dict if present
if value is not None and hasattr(value, 'to_dict'):
clean_input["character_memory"] = value.to_dict()
elif key == "progress_callback":
# Skip callback functions
continue
elif callable(value):
# Skip any callable objects
continue
else:
clean_input[key] = value
clean_input["text"] = ctx.input_text # Ensure text is included
if not clean_input.get("title"):
clean_input["title"] = storyboard.title
metadata = { metadata = {
"task_id": task_id, "task_id": task_id,
@@ -507,7 +522,7 @@ class StandardPipeline(LinearVideoPipeline):
"completed_at": storyboard.completed_at.isoformat() if storyboard.completed_at else None, "completed_at": storyboard.completed_at.isoformat() if storyboard.completed_at else None,
"status": "completed", "status": "completed",
"input": input_with_title, "input": clean_input,
"result": { "result": {
"video_path": result.video_path, "video_path": result.video_path,