Files
AI-Video/docs/工作流完整接入示例.md
empty c65b040fe3 fix: Filter non-serializable objects in pipeline metadata
- Skip CharacterMemory and callback functions during serialization
- Add .pids/ and .serena/ to gitignore
- Add workflow integration documentation

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-10 15:56:06 +08:00

14 KiB
Raw Permalink Blame History

工作流完整接入示例

📝 RunningHub AI 工作流交互使用手册workflow 版本)

1. 功能概述

本脚本通过调用 RunningHub AI 平台的 OpenAPI实现从本地加载工作流 JSON、修改节点信息、上传文件、提交任务并自动查询结果的全流程操作。

主要功能包括:

  • 读取本地工作流配置JSON 文件)
  • 生成可修改节点信息列表nodeInfoList
  • 根据节点类型(图片、文本等)修改节点值
  • 上传图片、音频、视频文件
  • 向 RunningHub 提交任务并实时查询状态
  • 输出最终生成结果的文件链接

适用于有自定义工作流workflowId的高级用户可在不打开网页的情况下自动执行 AI 工作流。


2. 文件说明与主要函数

💡 主要文件

文件名 功能
workflow.py 主执行脚本
api.json 从 RunningHub 下载的工作流配置文件(包含节点定义)

🔧 核心函数介绍

函数名 功能描述
load_json(file_path) 从本地读取并解析工作流 JSON 文件
convert_to_node_info_list(data) 将 JSON 格式转换为节点信息列表
upload_file(API_KEY, file_path) 上传本地文件image/audio/video至 RunningHub
submit_task(workflowId, node_info_list, API_KEY) 提交任务,启动 AI 工作流执行
query_task_outputs(task_id, API_KEY) 轮询任务执行状态并获取结果输出

3. 操作步骤详解

Step 1:输入必要信息

运行脚本后,系统会提示输入以下信息:

请输入你的 api_key:

说明:在 RunningHub 控制台“API 调用”中可获得。 示例:0s2d1***********2n3mk4

请输入 workflowId:

示例:1980468315921559554 来源于链接末尾:https://www.runninghub.cn/workflow/1980237776367083521?source=workspace

然后输入本地工作流 JSON 文件路径:

输入您的json文件地址(json文件一定要在自己的工作台中获得获得途径为导出工作流api到本地)

示例:C:\Users\Mayn\Downloads\api.json

此时脚本会输出工作流中的所有节点信息:

等待node_info_list生成包含所有可修改的节点
{'3': {'inputs': {...}}, '4': {...}, '6': {...}, ...}

Step 2:查看并修改节点

脚本会提示:

请输入 nodeId输入 'exit' 结束修改):

输入节点 nodeId如 10脚本会展示该节点的所有字段

🧩 找到节点 10 的字段如下:
(0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'})

接着输入要修改的字段名:

请输入要修改的 fieldName:

示例:image


Step 3:修改字段值

📷 如果是文件类型image/audio/video

请输入您本地image文件路径:

示例输入:D:\R.jpg

上传成功后:

等待文件上传中
上传结果: {'code': 0, 'msg': 'success', 'data': {'fileName': 'api/xxx.jpg', 'fileType': 'input'}}
✅ 已更新 image fieldValue: api/xxx.jpg

📝 如果是文本或数值类型

请输入新的 fieldValue (text):

示例输入:1 girl in classroom

返回:

✅ 已更新 fieldValue: 1 girl in classroom

可多次修改不同节点,输入 exit 结束。


Step 4:提交任务

输入完成后,脚本自动提交任务:

开始提交任务,请等待
📌 提交任务返回: {'code': 0, 'msg': 'success', 'data': {...}}
📝 taskId: 1980471280073846785
✅ 无节点错误,任务提交成功。

Step 5:任务状态轮询

脚本每隔 5 秒查询任务状态:

⏳ 任务运行中...
⏳ 任务运行中...
🎉 生成结果完成!

如果任务失败,会打印详细原因:

❌ 任务失败!
节点 SaveImage 失败原因: 'str' object has no attribute 'shape'
Traceback: [...]

Step 6:查看结果文件

任务成功后会输出生成文件链接:

🎉 生成结果完成!
[{'fileUrl': 'https://rh-images.xiaoyaoyou.com/f24a6365b08fa3bc02f55cd1f63e74a7/output/ComfyUI_00001_hnqxe_1761016156.png',
  'fileType': 'png',
  'taskCostTime': '35',
  'nodeId': '17'}]
✅ 任务完成!

打开 fileUrl 即可查看 AI 生成的图片。

4. 完整运行流程概览

1 输入 API_KEY 和 workflowId
2 加载本地 JSON 工作流
3 自动生成可修改节点列表
4 修改所需节点参数
5 上传文件(如图片)
6 提交任务至 RunningHub
7 轮询任务状态
8 获取并打印生成结果链接


5. 示例输出结果

请输入你的 api_key: a0fada**************b2ke21
请输入 workflowId: ***8315921559***
输入您的json文件地址(json文件一定要在自己的工作台中获得获得途径为导出工作流api到本地)C:\Users\Mayn\Downloads\api.json
🧩 找到节点 10 的字段如下:
(0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'})
✅ 已更新 image fieldValue: api/xxx.jpg
开始提交任务,请等待
📌 提交任务返回: {...}
⏳ 任务运行中...
🎉 生成结果完成!
✅ 任务完成!

6. 小贴士Tips

  • 建议使用 Python 3.8+
  • 脚本可直接在终端运行:
python workflow.py
  • Windows 用户注意文件路径需使用双反斜杠 \\
  • 若使用代理或云主机,请确保端口 443 可访问 www.runninghub.cn
import http.client
import json
import mimetypes
from codecs import encode
import time
import os
import requests
API_HOST = "www.runninghub.cn"
def load_json(file_path):
    # 打开并读取 JSON 文件
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)  # 将 JSON 内容解析为 Python 对象dict 或 list
    # 打印读取到的数据
    print(data)
    return data
def convert_to_node_info_list(data):
    node_info_list = []

    for node_id, node_content in data.items():
        inputs = node_content.get("inputs", {})
        for field_name, field_value in inputs.items():
            # 如果 field_value 是列表或字典,可以选择转换成字符串
            if isinstance(field_value, (list, dict)):
                field_value = json.dumps(field_value)
            else:
                field_value = str(field_value)

            node_info_list.append({
                "nodeId": str(node_id),
                "fieldName": str(field_name),
                "fieldValue": field_value
            })
    return node_info_list
def upload_file(API_KEY, file_path):
    """
    上传文件到 RunningHub 平台
    """
    url = "https://www.runninghub.cn/task/openapi/upload"
    headers = {
        'Host': 'www.runninghub.cn'
    }
    data = {
        'apiKey': API_KEY,
        'fileType': 'input'
    }
    with open(file_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(url, headers=headers, files=files, data=data)
    return response.json()
# 1⃣ 提交任务
def submit_task(workflowId, node_info_list,API_KEY):
    conn = http.client.HTTPSConnection("www.runninghub.cn")
    payload = json.dumps({
        "apiKey": API_KEY,
        "workflowId": workflowId,
        "nodeInfoList": node_info_list
    })
    headers = {
        'Host': 'www.runninghub.cn',
        'Content-Type': 'application/json'
    }
    conn.request("POST", "/task/openapi/create", payload, headers)
    res = conn.getresponse()
    data = res.read()
    # ✅ 注意这里:用 json.loads 而不是 json.load
    data = json.loads(data.decode("utf-8"))
    print(data)
    return data
def query_task_outputs(task_id,API_KEY):
    conn = http.client.HTTPSConnection(API_HOST)
    payload = json.dumps({
        "apiKey": API_KEY,
        "taskId": task_id
    })
    headers = {
        'Host': API_HOST,
        'Content-Type': 'application/json'
    }
    conn.request("POST", "/task/openapi/outputs", payload, headers)
    res = conn.getresponse()
    data = json.loads(res.read().decode("utf-8"))
    conn.close()
    return data
if __name__ == "__main__":
    print("下面两个输入用于获得AI工作流所需要的信息api_key为用户的密钥从api调用——进入控制台中获得workflowId此为示例具体的workflowId为你所选择的AI工作流界面上方的链接https://www.runninghub.cn/workflow/1980237776367083521?source=workspace最后的数字为workflowId")
    Api_key = input("请输入你的 api_key: ").strip()
    workflowId = input("请输入 workflowId: ").strip()
    print("请您下载您的工作流API json到本地")
    file_path = input("输入您的json文件地址(json文件一定要在自己的工作台中获得获得途径为导出工作流api到本地)").strip()
    print("等待node_info_list生成包涵所有的可以修改的node节点")
    data = load_json(file_path)
    node_info_list =  convert_to_node_info_list(data)
    print(node_info_list)
    print("下面用户可以输入工作流可以修改的节点idnodeId,以及对应的fileName,锁定具体的节点位置在找到具体位置之后输入您需要修改的fileValue信息完成信息的修改用户发送AI工作流请求")
    modified_nodes = []
    while True:
        node_id_input = input("请输入 nodeId输入 'exit' 结束修改): ").strip()
        if node_id_input.lower() == "exit":
            break

        # 找出该 nodeId 对应的所有字段
        node_fields = [n for n in node_info_list if n['nodeId'] == node_id_input]

        if not node_fields:
            print("❌ 未找到该 nodeId 对应的节点")
            continue

        print(f"\n🧩 找到节点 {node_id_input} 的字段如下:")
        for field in enumerate(node_fields):
            print(field)

        # 让用户选择要修改的字段
        field_name_input = input("\n请输入要修改的 fieldName: ").strip()
        target_node = next(
            (f for f in node_fields if f['fieldName'] == field_name_input), None
        )

        if not target_node:
            print("❌ 未找到该 fieldName")
            continue

        print(f"选中字段: {target_node}")
        # 根据类型处理
        if target_node['fieldName'] in ["image", "audio", "video"]:
            file_path = input(f"请输入您本地{target_node['fieldName']}文件路径: ").strip()
            print("等待文件上传中")
            upload_result = upload_file(Api_key, file_path)
            print("上传结果:", upload_result)
            # 假设 upload_file 已返回解析后的 JSON 字典
            if upload_result and upload_result.get("msg") == "success":
                uploaded_file_name = upload_result.get("data", {}).get("fileName")
                if uploaded_file_name:
                    target_node['fieldValue'] = uploaded_file_name
                    print(f"✅ 已更新 {target_node['fieldName']} fieldValue:", uploaded_file_name)
            else:
                print("❌ 上传失败或返回格式异常:", upload_result)
        else:
            # 其他类型直接修改
            new_value = input(f"请输入新的 fieldValue ({target_node['fieldName']}): ").strip()
            target_node['fieldValue'] = new_value
            print("✅ 已更新 fieldValue:", new_value)
        modified_nodes.append({
            "nodeId": target_node['nodeId'],
            "fieldName": target_node['fieldName'],
            "fieldValue": target_node['fieldValue']
        })
    print(modified_nodes)
    print("开始提交任务,请等待")
    # 提交任务
    submit_result = submit_task(workflowId, modified_nodes,Api_key)
    print("📌 提交任务返回:", submit_result)
    if submit_result.get("code") != 0:
        print("❌ 提交任务失败:", submit_result)
        exit()
    task_id = submit_result["data"]["taskId"]
    print(f"📝 taskId: {task_id}")
    # 解析成功返回
    prompt_tips_str = submit_result["data"].get("promptTips")
    if prompt_tips_str:
        try:
            prompt_tips = json.loads(prompt_tips_str)
            node_errors = prompt_tips.get("node_errors", {})
            if node_errors:
                print("⚠️ 节点错误信息如下:")
                for node_id, err in node_errors.items():
                    print(f"  节点 {node_id} 错误: {err}")
            else:
                print("✅ 无节点错误,任务提交成功。")
        except Exception as e:
            print("⚠️ 无法解析 promptTips:", e)
    else:
        print("⚠️ 未返回 promptTips 字段。")
    timeout = 600
    start_time = time.time()
    while True:
        outputs_result = query_task_outputs(task_id, Api_key)
        code = outputs_result.get("code")
        msg = outputs_result.get("msg")
        data = outputs_result.get("data")
        if code == 0 and data:  # 成功
            file_url = data[0].get("fileUrl")
            print("🎉 生成结果完成!")
            print(data)
            break
        elif code == 805:  # 任务失败
            failed_reason = data.get("failedReason") if data else None
            print("❌ 任务失败!")
            if failed_reason:
                print(f"节点 {failed_reason.get('node_name')} 失败原因: {failed_reason.get('exception_message')}")
                print("Traceback:", failed_reason.get("traceback"))
            else:
                print(outputs_result)
            break
        elif code == 804 or code == 813:  # 运行中或排队中
            status_text = "运行中" if code == 804 else "排队中"
            print(f"⏳ 任务{status_text}...")
        else:
            print("⚠️ 未知状态:", outputs_result)
        # 超时检查
        if time.time() - start_time > timeout:
            print("⏰ 等待超时超过10分钟任务未完成。")
            break
        time.sleep(5)
    print("✅ 任务完成!")