# 工作流完整接入示例 # 📝 RunningHub AI 工作流交互使用手册(workflow 版本) ## 1. 功能概述 本脚本通过调用 RunningHub AI 平台的 OpenAPI,实现从本地加载工作流 JSON、修改节点信息、上传文件、提交任务并自动查询结果的全流程操作。 主要功能包括: - 读取本地工作流配置(JSON 文件) - 生成可修改节点信息列表(nodeInfoList) - 根据节点类型(图片、文本等)修改节点值 - 上传图片、音频、视频文件 - 向 RunningHub 提交任务并实时查询状态 - 输出最终生成结果的文件链接 ✅ 适用于有自定义工作流(workflowId)的高级用户,可在不打开网页的情况下自动执行 AI 工作流。 --- ## 2. 文件说明与主要函数 ### 💡 主要文件 | 文件名 | 功能 | |-------------|------| | workflow.py | 主执行脚本 | | api.json | 从 RunningHub 下载的工作流配置文件(包含节点定义) | ### 🔧 核心函数介绍 | 函数名 | 功能描述 | |--------|----------| | load_json(file_path) | 从本地读取并解析工作流 JSON 文件 | | convert_to_node_info_list(data) | 将 JSON 格式转换为节点信息列表 | | upload_file(API_KEY, file_path) | 上传本地文件(image/audio/video)至 RunningHub | | submit_task(workflowId, node_info_list, API_KEY) | 提交任务,启动 AI 工作流执行 | | query_task_outputs(task_id, API_KEY) | 轮询任务执行状态并获取结果输出 | --- ## 3. 操作步骤详解 ### Step 1️⃣:输入必要信息 运行脚本后,系统会提示输入以下信息: ```text 请输入你的 api_key: ``` 说明:在 RunningHub 控制台“API 调用”中可获得。 示例:`0s2d1***********2n3mk4` ``` 请输入 workflowId: ``` 示例:`1980468315921559554` 来源于链接末尾:https://www.runninghub.cn/workflow/1980237776367083521?source=workspace 然后输入本地工作流 JSON 文件路径: ``` 输入您的json文件地址(json文件一定要在自己的工作台中获得,获得途径为导出工作流api到本地): ``` 示例:`C:\Users\Mayn\Downloads\api.json` 此时脚本会输出工作流中的所有节点信息: ``` 等待node_info_list生成(包含所有可修改的节点) {'3': {'inputs': {...}}, '4': {...}, '6': {...}, ...} ``` --- ### Step 2️⃣:查看并修改节点 脚本会提示: ```text 请输入 nodeId(输入 'exit' 结束修改): ``` 输入节点 nodeId(如 10),脚本会展示该节点的所有字段: ``` 🧩 找到节点 10 的字段如下: (0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'}) ``` 接着输入要修改的字段名: ``` 请输入要修改的 fieldName: ``` 示例:`image` --- ### Step 3️⃣:修改字段值 #### 📷 如果是文件类型(image/audio/video) ``` 请输入您本地image文件路径: ``` 示例输入:`D:\R.jpg` 上传成功后: ``` 等待文件上传中 上传结果: {'code': 0, 'msg': 'success', 'data': {'fileName': 'api/xxx.jpg', 'fileType': 'input'}} ✅ 已更新 image fieldValue: api/xxx.jpg ``` #### 📝 如果是文本或数值类型 ``` 请输入新的 fieldValue (text): ``` 示例输入:`1 girl in classroom` 返回: ``` ✅ 已更新 fieldValue: 1 girl in classroom ``` > 可多次修改不同节点,输入 `exit` 结束。 --- ### Step 4️⃣:提交任务 输入完成后,脚本自动提交任务: ``` 开始提交任务,请等待 📌 提交任务返回: {'code': 0, 'msg': 'success', 'data': {...}} 📝 taskId: 1980471280073846785 ✅ 无节点错误,任务提交成功。 ``` --- ### Step 5️⃣:任务状态轮询 脚本每隔 5 秒查询任务状态: ``` ⏳ 任务运行中... ⏳ 任务运行中... 🎉 生成结果完成! ``` 如果任务失败,会打印详细原因: ``` ❌ 任务失败! 节点 SaveImage 失败原因: 'str' object has no attribute 'shape' Traceback: [...] ``` --- ### Step 6️⃣:查看结果文件 任务成功后会输出生成文件链接: ``` 🎉 生成结果完成! [{'fileUrl': 'https://rh-images.xiaoyaoyou.com/f24a6365b08fa3bc02f55cd1f63e74a7/output/ComfyUI_00001_hnqxe_1761016156.png', 'fileType': 'png', 'taskCostTime': '35', 'nodeId': '17'}] ✅ 任务完成! ``` 打开 `fileUrl` 即可查看 AI 生成的图片。 ## 4. 完整运行流程概览 1️⃣ 输入 API_KEY 和 workflowId 2️⃣ 加载本地 JSON 工作流 3️⃣ 自动生成可修改节点列表 4️⃣ 修改所需节点参数 5️⃣ 上传文件(如图片) 6️⃣ 提交任务至 RunningHub 7️⃣ 轮询任务状态 8️⃣ 获取并打印生成结果链接 --- ## 5. 示例输出结果 ``` 请输入你的 api_key: a0fada**************b2ke21 请输入 workflowId: ***8315921559*** 输入您的json文件地址(json文件一定要在自己的工作台中获得,获得途径为导出工作流api到本地):C:\Users\Mayn\Downloads\api.json ``` ``` 🧩 找到节点 10 的字段如下: (0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'}) ✅ 已更新 image fieldValue: api/xxx.jpg ``` ``` 开始提交任务,请等待 📌 提交任务返回: {...} ⏳ 任务运行中... 🎉 生成结果完成! ✅ 任务完成! ``` --- ## 6. 小贴士(Tips) - 建议使用 Python 3.8+ - 脚本可直接在终端运行: ```bash python workflow.py ``` - Windows 用户注意文件路径需使用双反斜杠 `\\` - 若使用代理或云主机,请确保端口 443 可访问 `www.runninghub.cn` ```python import http.client import json import mimetypes from codecs import encode import time import os import requests API_HOST = "www.runninghub.cn" def load_json(file_path): # 打开并读取 JSON 文件 with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) # 将 JSON 内容解析为 Python 对象(dict 或 list) # 打印读取到的数据 print(data) return data def convert_to_node_info_list(data): node_info_list = [] for node_id, node_content in data.items(): inputs = node_content.get("inputs", {}) for field_name, field_value in inputs.items(): # 如果 field_value 是列表或字典,可以选择转换成字符串 if isinstance(field_value, (list, dict)): field_value = json.dumps(field_value) else: field_value = str(field_value) node_info_list.append({ "nodeId": str(node_id), "fieldName": str(field_name), "fieldValue": field_value }) return node_info_list def upload_file(API_KEY, file_path): """ 上传文件到 RunningHub 平台 """ url = "https://www.runninghub.cn/task/openapi/upload" headers = { 'Host': 'www.runninghub.cn' } data = { 'apiKey': API_KEY, 'fileType': 'input' } with open(file_path, 'rb') as f: files = {'file': f} response = requests.post(url, headers=headers, files=files, data=data) return response.json() # 1️⃣ 提交任务 def submit_task(workflowId, node_info_list,API_KEY): conn = http.client.HTTPSConnection("www.runninghub.cn") payload = json.dumps({ "apiKey": API_KEY, "workflowId": workflowId, "nodeInfoList": node_info_list }) headers = { 'Host': 'www.runninghub.cn', 'Content-Type': 'application/json' } conn.request("POST", "/task/openapi/create", payload, headers) res = conn.getresponse() data = res.read() # ✅ 注意这里:用 json.loads 而不是 json.load data = json.loads(data.decode("utf-8")) print(data) return data def query_task_outputs(task_id,API_KEY): conn = http.client.HTTPSConnection(API_HOST) payload = json.dumps({ "apiKey": API_KEY, "taskId": task_id }) headers = { 'Host': API_HOST, 'Content-Type': 'application/json' } conn.request("POST", "/task/openapi/outputs", payload, headers) res = conn.getresponse() data = json.loads(res.read().decode("utf-8")) conn.close() return data if __name__ == "__main__": print("下面两个输入用于获得AI工作流所需要的信息,api_key为用户的密钥从api调用——进入控制台中获得,workflowId(此为示例,具体的workflowId为你所选择的AI工作流界面上方的链接https://www.runninghub.cn/workflow/1980237776367083521?source=workspace,最后的数字为workflowId)") Api_key = input("请输入你的 api_key: ").strip() workflowId = input("请输入 workflowId: ").strip() print("请您下载您的工作流API json到本地") file_path = input("输入您的json文件地址(json文件一定要在自己的工作台中获得,获得途径为导出工作流api到本地):").strip() print("等待node_info_list生成(包涵所有的可以修改的node节点)") data = load_json(file_path) node_info_list = convert_to_node_info_list(data) print(node_info_list) print("下面用户可以输入工作流可以修改的节点id:nodeId,以及对应的fileName,锁定具体的节点位置,在找到具体位置之后,输入您需要修改的fileValue信息完成信息的修改用户发送AI工作流请求") modified_nodes = [] while True: node_id_input = input("请输入 nodeId(输入 'exit' 结束修改): ").strip() if node_id_input.lower() == "exit": break # 找出该 nodeId 对应的所有字段 node_fields = [n for n in node_info_list if n['nodeId'] == node_id_input] if not node_fields: print("❌ 未找到该 nodeId 对应的节点") continue print(f"\n🧩 找到节点 {node_id_input} 的字段如下:") for field in enumerate(node_fields): print(field) # 让用户选择要修改的字段 field_name_input = input("\n请输入要修改的 fieldName: ").strip() target_node = next( (f for f in node_fields if f['fieldName'] == field_name_input), None ) if not target_node: print("❌ 未找到该 fieldName") continue print(f"选中字段: {target_node}") # 根据类型处理 if target_node['fieldName'] in ["image", "audio", "video"]: file_path = input(f"请输入您本地{target_node['fieldName']}文件路径: ").strip() print("等待文件上传中") upload_result = upload_file(Api_key, file_path) print("上传结果:", upload_result) # 假设 upload_file 已返回解析后的 JSON 字典 if upload_result and upload_result.get("msg") == "success": uploaded_file_name = upload_result.get("data", {}).get("fileName") if uploaded_file_name: target_node['fieldValue'] = uploaded_file_name print(f"✅ 已更新 {target_node['fieldName']} fieldValue:", uploaded_file_name) else: print("❌ 上传失败或返回格式异常:", upload_result) else: # 其他类型直接修改 new_value = input(f"请输入新的 fieldValue ({target_node['fieldName']}): ").strip() target_node['fieldValue'] = new_value print("✅ 已更新 fieldValue:", new_value) modified_nodes.append({ "nodeId": target_node['nodeId'], "fieldName": target_node['fieldName'], "fieldValue": target_node['fieldValue'] }) print(modified_nodes) print("开始提交任务,请等待") # 提交任务 submit_result = submit_task(workflowId, modified_nodes,Api_key) print("📌 提交任务返回:", submit_result) if submit_result.get("code") != 0: print("❌ 提交任务失败:", submit_result) exit() task_id = submit_result["data"]["taskId"] print(f"📝 taskId: {task_id}") # 解析成功返回 prompt_tips_str = submit_result["data"].get("promptTips") if prompt_tips_str: try: prompt_tips = json.loads(prompt_tips_str) node_errors = prompt_tips.get("node_errors", {}) if node_errors: print("⚠️ 节点错误信息如下:") for node_id, err in node_errors.items(): print(f" 节点 {node_id} 错误: {err}") else: print("✅ 无节点错误,任务提交成功。") except Exception as e: print("⚠️ 无法解析 promptTips:", e) else: print("⚠️ 未返回 promptTips 字段。") timeout = 600 start_time = time.time() while True: outputs_result = query_task_outputs(task_id, Api_key) code = outputs_result.get("code") msg = outputs_result.get("msg") data = outputs_result.get("data") if code == 0 and data: # 成功 file_url = data[0].get("fileUrl") print("🎉 生成结果完成!") print(data) break elif code == 805: # 任务失败 failed_reason = data.get("failedReason") if data else None print("❌ 任务失败!") if failed_reason: print(f"节点 {failed_reason.get('node_name')} 失败原因: {failed_reason.get('exception_message')}") print("Traceback:", failed_reason.get("traceback")) else: print(outputs_result) break elif code == 804 or code == 813: # 运行中或排队中 status_text = "运行中" if code == 804 else "排队中" print(f"⏳ 任务{status_text}...") else: print("⚠️ 未知状态:", outputs_result) # 超时检查 if time.time() - start_time > timeout: print("⏰ 等待超时(超过10分钟),任务未完成。") break time.sleep(5) print("✅ 任务完成!") ```