Files
AI-Video/docs/工作流完整接入示例.md
empty c65b040fe3 fix: Filter non-serializable objects in pipeline metadata
- Skip CharacterMemory and callback functions during serialization
- Add .pids/ and .serena/ to gitignore
- Add workflow integration documentation

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-10 15:56:06 +08:00

434 lines
14 KiB
Markdown
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 工作流完整接入示例
# 📝 RunningHub AI 工作流交互使用手册workflow 版本)
## 1. 功能概述
本脚本通过调用 RunningHub AI 平台的 OpenAPI实现从本地加载工作流 JSON、修改节点信息、上传文件、提交任务并自动查询结果的全流程操作。
主要功能包括:
- 读取本地工作流配置JSON 文件)
- 生成可修改节点信息列表nodeInfoList
- 根据节点类型(图片、文本等)修改节点值
- 上传图片、音频、视频文件
- 向 RunningHub 提交任务并实时查询状态
- 输出最终生成结果的文件链接
✅ 适用于有自定义工作流workflowId的高级用户可在不打开网页的情况下自动执行 AI 工作流。
---
## 2. 文件说明与主要函数
### 💡 主要文件
| 文件名 | 功能 |
|-------------|------|
| workflow.py | 主执行脚本 |
| api.json | 从 RunningHub 下载的工作流配置文件(包含节点定义) |
### 🔧 核心函数介绍
| 函数名 | 功能描述 |
|--------|----------|
| load_json(file_path) | 从本地读取并解析工作流 JSON 文件 |
| convert_to_node_info_list(data) | 将 JSON 格式转换为节点信息列表 |
| upload_file(API_KEY, file_path) | 上传本地文件image/audio/video至 RunningHub |
| submit_task(workflowId, node_info_list, API_KEY) | 提交任务,启动 AI 工作流执行 |
| query_task_outputs(task_id, API_KEY) | 轮询任务执行状态并获取结果输出 |
---
## 3. 操作步骤详解
### Step 1输入必要信息
运行脚本后,系统会提示输入以下信息:
```text
请输入你的 api_key:
```
说明:在 RunningHub 控制台“API 调用”中可获得。
示例:`0s2d1***********2n3mk4`
```
请输入 workflowId:
```
示例:`1980468315921559554`
来源于链接末尾https://www.runninghub.cn/workflow/1980237776367083521?source=workspace
然后输入本地工作流 JSON 文件路径:
```
输入您的json文件地址(json文件一定要在自己的工作台中获得获得途径为导出工作流api到本地)
```
示例:`C:\Users\Mayn\Downloads\api.json`
此时脚本会输出工作流中的所有节点信息:
```
等待node_info_list生成包含所有可修改的节点
{'3': {'inputs': {...}}, '4': {...}, '6': {...}, ...}
```
---
### Step 2查看并修改节点
脚本会提示:
```text
请输入 nodeId输入 'exit' 结束修改):
```
输入节点 nodeId如 10脚本会展示该节点的所有字段
```
🧩 找到节点 10 的字段如下:
(0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'})
```
接着输入要修改的字段名:
```
请输入要修改的 fieldName:
```
示例:`image`
---
### Step 3修改字段值
#### 📷 如果是文件类型image/audio/video
```
请输入您本地image文件路径:
```
示例输入:`D:\R.jpg`
上传成功后:
```
等待文件上传中
上传结果: {'code': 0, 'msg': 'success', 'data': {'fileName': 'api/xxx.jpg', 'fileType': 'input'}}
✅ 已更新 image fieldValue: api/xxx.jpg
```
#### 📝 如果是文本或数值类型
```
请输入新的 fieldValue (text):
```
示例输入:`1 girl in classroom`
返回:
```
✅ 已更新 fieldValue: 1 girl in classroom
```
> 可多次修改不同节点,输入 `exit` 结束。
---
### Step 4提交任务
输入完成后,脚本自动提交任务:
```
开始提交任务,请等待
📌 提交任务返回: {'code': 0, 'msg': 'success', 'data': {...}}
📝 taskId: 1980471280073846785
✅ 无节点错误,任务提交成功。
```
---
### Step 5任务状态轮询
脚本每隔 5 秒查询任务状态:
```
⏳ 任务运行中...
⏳ 任务运行中...
🎉 生成结果完成!
```
如果任务失败,会打印详细原因:
```
❌ 任务失败!
节点 SaveImage 失败原因: 'str' object has no attribute 'shape'
Traceback: [...]
```
---
### Step 6查看结果文件
任务成功后会输出生成文件链接:
```
🎉 生成结果完成!
[{'fileUrl': 'https://rh-images.xiaoyaoyou.com/f24a6365b08fa3bc02f55cd1f63e74a7/output/ComfyUI_00001_hnqxe_1761016156.png',
'fileType': 'png',
'taskCostTime': '35',
'nodeId': '17'}]
✅ 任务完成!
```
打开 `fileUrl` 即可查看 AI 生成的图片。
## 4. 完整运行流程概览
1⃣ 输入 API_KEY 和 workflowId
2⃣ 加载本地 JSON 工作流
3⃣ 自动生成可修改节点列表
4⃣ 修改所需节点参数
5⃣ 上传文件(如图片)
6⃣ 提交任务至 RunningHub
7⃣ 轮询任务状态
8⃣ 获取并打印生成结果链接
---
## 5. 示例输出结果
```
请输入你的 api_key: a0fada**************b2ke21
请输入 workflowId: ***8315921559***
输入您的json文件地址(json文件一定要在自己的工作台中获得获得途径为导出工作流api到本地)C:\Users\Mayn\Downloads\api.json
```
```
🧩 找到节点 10 的字段如下:
(0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'})
✅ 已更新 image fieldValue: api/xxx.jpg
```
```
开始提交任务,请等待
📌 提交任务返回: {...}
⏳ 任务运行中...
🎉 生成结果完成!
✅ 任务完成!
```
---
## 6. 小贴士Tips
- 建议使用 Python 3.8+
- 脚本可直接在终端运行:
```bash
python workflow.py
```
- Windows 用户注意文件路径需使用双反斜杠 `\\`
- 若使用代理或云主机,请确保端口 443 可访问 `www.runninghub.cn`
```python
import http.client
import json
import mimetypes
from codecs import encode
import time
import os
import requests
API_HOST = "www.runninghub.cn"
def load_json(file_path):
# 打开并读取 JSON 文件
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f) # 将 JSON 内容解析为 Python 对象dict 或 list
# 打印读取到的数据
print(data)
return data
def convert_to_node_info_list(data):
node_info_list = []
for node_id, node_content in data.items():
inputs = node_content.get("inputs", {})
for field_name, field_value in inputs.items():
# 如果 field_value 是列表或字典,可以选择转换成字符串
if isinstance(field_value, (list, dict)):
field_value = json.dumps(field_value)
else:
field_value = str(field_value)
node_info_list.append({
"nodeId": str(node_id),
"fieldName": str(field_name),
"fieldValue": field_value
})
return node_info_list
def upload_file(API_KEY, file_path):
"""
上传文件到 RunningHub 平台
"""
url = "https://www.runninghub.cn/task/openapi/upload"
headers = {
'Host': 'www.runninghub.cn'
}
data = {
'apiKey': API_KEY,
'fileType': 'input'
}
with open(file_path, 'rb') as f:
files = {'file': f}
response = requests.post(url, headers=headers, files=files, data=data)
return response.json()
# 1⃣ 提交任务
def submit_task(workflowId, node_info_list,API_KEY):
conn = http.client.HTTPSConnection("www.runninghub.cn")
payload = json.dumps({
"apiKey": API_KEY,
"workflowId": workflowId,
"nodeInfoList": node_info_list
})
headers = {
'Host': 'www.runninghub.cn',
'Content-Type': 'application/json'
}
conn.request("POST", "/task/openapi/create", payload, headers)
res = conn.getresponse()
data = res.read()
# ✅ 注意这里:用 json.loads 而不是 json.load
data = json.loads(data.decode("utf-8"))
print(data)
return data
def query_task_outputs(task_id,API_KEY):
conn = http.client.HTTPSConnection(API_HOST)
payload = json.dumps({
"apiKey": API_KEY,
"taskId": task_id
})
headers = {
'Host': API_HOST,
'Content-Type': 'application/json'
}
conn.request("POST", "/task/openapi/outputs", payload, headers)
res = conn.getresponse()
data = json.loads(res.read().decode("utf-8"))
conn.close()
return data
if __name__ == "__main__":
print("下面两个输入用于获得AI工作流所需要的信息api_key为用户的密钥从api调用——进入控制台中获得workflowId此为示例具体的workflowId为你所选择的AI工作流界面上方的链接https://www.runninghub.cn/workflow/1980237776367083521?source=workspace最后的数字为workflowId")
Api_key = input("请输入你的 api_key: ").strip()
workflowId = input("请输入 workflowId: ").strip()
print("请您下载您的工作流API json到本地")
file_path = input("输入您的json文件地址(json文件一定要在自己的工作台中获得获得途径为导出工作流api到本地)").strip()
print("等待node_info_list生成包涵所有的可以修改的node节点")
data = load_json(file_path)
node_info_list = convert_to_node_info_list(data)
print(node_info_list)
print("下面用户可以输入工作流可以修改的节点idnodeId,以及对应的fileName,锁定具体的节点位置在找到具体位置之后输入您需要修改的fileValue信息完成信息的修改用户发送AI工作流请求")
modified_nodes = []
while True:
node_id_input = input("请输入 nodeId输入 'exit' 结束修改): ").strip()
if node_id_input.lower() == "exit":
break
# 找出该 nodeId 对应的所有字段
node_fields = [n for n in node_info_list if n['nodeId'] == node_id_input]
if not node_fields:
print("❌ 未找到该 nodeId 对应的节点")
continue
print(f"\n🧩 找到节点 {node_id_input} 的字段如下:")
for field in enumerate(node_fields):
print(field)
# 让用户选择要修改的字段
field_name_input = input("\n请输入要修改的 fieldName: ").strip()
target_node = next(
(f for f in node_fields if f['fieldName'] == field_name_input), None
)
if not target_node:
print("❌ 未找到该 fieldName")
continue
print(f"选中字段: {target_node}")
# 根据类型处理
if target_node['fieldName'] in ["image", "audio", "video"]:
file_path = input(f"请输入您本地{target_node['fieldName']}文件路径: ").strip()
print("等待文件上传中")
upload_result = upload_file(Api_key, file_path)
print("上传结果:", upload_result)
# 假设 upload_file 已返回解析后的 JSON 字典
if upload_result and upload_result.get("msg") == "success":
uploaded_file_name = upload_result.get("data", {}).get("fileName")
if uploaded_file_name:
target_node['fieldValue'] = uploaded_file_name
print(f"✅ 已更新 {target_node['fieldName']} fieldValue:", uploaded_file_name)
else:
print("❌ 上传失败或返回格式异常:", upload_result)
else:
# 其他类型直接修改
new_value = input(f"请输入新的 fieldValue ({target_node['fieldName']}): ").strip()
target_node['fieldValue'] = new_value
print("✅ 已更新 fieldValue:", new_value)
modified_nodes.append({
"nodeId": target_node['nodeId'],
"fieldName": target_node['fieldName'],
"fieldValue": target_node['fieldValue']
})
print(modified_nodes)
print("开始提交任务,请等待")
# 提交任务
submit_result = submit_task(workflowId, modified_nodes,Api_key)
print("📌 提交任务返回:", submit_result)
if submit_result.get("code") != 0:
print("❌ 提交任务失败:", submit_result)
exit()
task_id = submit_result["data"]["taskId"]
print(f"📝 taskId: {task_id}")
# 解析成功返回
prompt_tips_str = submit_result["data"].get("promptTips")
if prompt_tips_str:
try:
prompt_tips = json.loads(prompt_tips_str)
node_errors = prompt_tips.get("node_errors", {})
if node_errors:
print("⚠️ 节点错误信息如下:")
for node_id, err in node_errors.items():
print(f" 节点 {node_id} 错误: {err}")
else:
print("✅ 无节点错误,任务提交成功。")
except Exception as e:
print("⚠️ 无法解析 promptTips:", e)
else:
print("⚠️ 未返回 promptTips 字段。")
timeout = 600
start_time = time.time()
while True:
outputs_result = query_task_outputs(task_id, Api_key)
code = outputs_result.get("code")
msg = outputs_result.get("msg")
data = outputs_result.get("data")
if code == 0 and data: # 成功
file_url = data[0].get("fileUrl")
print("🎉 生成结果完成!")
print(data)
break
elif code == 805: # 任务失败
failed_reason = data.get("failedReason") if data else None
print("❌ 任务失败!")
if failed_reason:
print(f"节点 {failed_reason.get('node_name')} 失败原因: {failed_reason.get('exception_message')}")
print("Traceback:", failed_reason.get("traceback"))
else:
print(outputs_result)
break
elif code == 804 or code == 813: # 运行中或排队中
status_text = "运行中" if code == 804 else "排队中"
print(f"⏳ 任务{status_text}...")
else:
print("⚠️ 未知状态:", outputs_result)
# 超时检查
if time.time() - start_time > timeout:
print("⏰ 等待超时超过10分钟任务未完成。")
break
time.sleep(5)
print("✅ 任务完成!")
```