diff --git a/.gitignore b/.gitignore index 3ab7c16..1f0854e 100644 --- a/.gitignore +++ b/.gitignore @@ -76,3 +76,5 @@ examples/ repositories/ *.out +.pids/ +.serena/ diff --git a/docs/工作流完整接入示例.md b/docs/工作流完整接入示例.md new file mode 100644 index 0000000..050a8bc --- /dev/null +++ b/docs/工作流完整接入示例.md @@ -0,0 +1,434 @@ +# 工作流完整接入示例 + +# 📝 RunningHub AI 工作流交互使用手册(workflow 版本) + +## 1. 功能概述 + +本脚本通过调用 RunningHub AI 平台的 OpenAPI,实现从本地加载工作流 JSON、修改节点信息、上传文件、提交任务并自动查询结果的全流程操作。 + +主要功能包括: + +- 读取本地工作流配置(JSON 文件) +- 生成可修改节点信息列表(nodeInfoList) +- 根据节点类型(图片、文本等)修改节点值 +- 上传图片、音频、视频文件 +- 向 RunningHub 提交任务并实时查询状态 +- 输出最终生成结果的文件链接 + +✅ 适用于有自定义工作流(workflowId)的高级用户,可在不打开网页的情况下自动执行 AI 工作流。 + +--- + +## 2. 文件说明与主要函数 + +### 💡 主要文件 + +| 文件名 | 功能 | +|-------------|------| +| workflow.py | 主执行脚本 | +| api.json | 从 RunningHub 下载的工作流配置文件(包含节点定义) | + +### 🔧 核心函数介绍 + +| 函数名 | 功能描述 | +|--------|----------| +| load_json(file_path) | 从本地读取并解析工作流 JSON 文件 | +| convert_to_node_info_list(data) | 将 JSON 格式转换为节点信息列表 | +| upload_file(API_KEY, file_path) | 上传本地文件(image/audio/video)至 RunningHub | +| submit_task(workflowId, node_info_list, API_KEY) | 提交任务,启动 AI 工作流执行 | +| query_task_outputs(task_id, API_KEY) | 轮询任务执行状态并获取结果输出 | + +--- + +## 3. 操作步骤详解 + +### Step 1️⃣:输入必要信息 + +运行脚本后,系统会提示输入以下信息: + +```text +请输入你的 api_key: +``` +说明:在 RunningHub 控制台“API 调用”中可获得。 +示例:`0s2d1***********2n3mk4` +``` +请输入 workflowId: +``` +示例:`1980468315921559554` +来源于链接末尾:https://www.runninghub.cn/workflow/1980237776367083521?source=workspace + +然后输入本地工作流 JSON 文件路径: + +``` +输入您的json文件地址(json文件一定要在自己的工作台中获得,获得途径为导出工作流api到本地): +``` +示例:`C:\Users\Mayn\Downloads\api.json` + +此时脚本会输出工作流中的所有节点信息: + +``` +等待node_info_list生成(包含所有可修改的节点) +{'3': {'inputs': {...}}, '4': {...}, '6': {...}, ...} +``` + +--- + +### Step 2️⃣:查看并修改节点 + +脚本会提示: + +```text +请输入 nodeId(输入 'exit' 结束修改): +``` + +输入节点 nodeId(如 10),脚本会展示该节点的所有字段: + +``` +🧩 找到节点 10 的字段如下: +(0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'}) +``` + +接着输入要修改的字段名: + +``` +请输入要修改的 fieldName: +``` +示例:`image` + +--- + +### Step 3️⃣:修改字段值 + +#### 📷 如果是文件类型(image/audio/video) + +``` +请输入您本地image文件路径: +``` +示例输入:`D:\R.jpg` + +上传成功后: + +``` +等待文件上传中 +上传结果: {'code': 0, 'msg': 'success', 'data': {'fileName': 'api/xxx.jpg', 'fileType': 'input'}} +✅ 已更新 image fieldValue: api/xxx.jpg +``` + +#### 📝 如果是文本或数值类型 + +``` +请输入新的 fieldValue (text): +``` +示例输入:`1 girl in classroom` + +返回: + +``` +✅ 已更新 fieldValue: 1 girl in classroom +``` + +> 可多次修改不同节点,输入 `exit` 结束。 + +--- + +### Step 4️⃣:提交任务 + +输入完成后,脚本自动提交任务: + +``` +开始提交任务,请等待 +📌 提交任务返回: {'code': 0, 'msg': 'success', 'data': {...}} +📝 taskId: 1980471280073846785 +✅ 无节点错误,任务提交成功。 +``` + +--- + +### Step 5️⃣:任务状态轮询 + +脚本每隔 5 秒查询任务状态: + +``` +⏳ 任务运行中... +⏳ 任务运行中... +🎉 生成结果完成! +``` + +如果任务失败,会打印详细原因: + +``` +❌ 任务失败! +节点 SaveImage 失败原因: 'str' object has no attribute 'shape' +Traceback: [...] +``` + +--- + +### Step 6️⃣:查看结果文件 + +任务成功后会输出生成文件链接: + +``` +🎉 生成结果完成! +[{'fileUrl': 'https://rh-images.xiaoyaoyou.com/f24a6365b08fa3bc02f55cd1f63e74a7/output/ComfyUI_00001_hnqxe_1761016156.png', + 'fileType': 'png', + 'taskCostTime': '35', + 'nodeId': '17'}] +✅ 任务完成! +``` + +打开 `fileUrl` 即可查看 AI 生成的图片。 + +## 4. 完整运行流程概览 + +1️⃣ 输入 API_KEY 和 workflowId +2️⃣ 加载本地 JSON 工作流 +3️⃣ 自动生成可修改节点列表 +4️⃣ 修改所需节点参数 +5️⃣ 上传文件(如图片) +6️⃣ 提交任务至 RunningHub +7️⃣ 轮询任务状态 +8️⃣ 获取并打印生成结果链接 + +--- + +## 5. 示例输出结果 + +``` +请输入你的 api_key: a0fada**************b2ke21 +请输入 workflowId: ***8315921559*** +输入您的json文件地址(json文件一定要在自己的工作台中获得,获得途径为导出工作流api到本地):C:\Users\Mayn\Downloads\api.json +``` +``` +🧩 找到节点 10 的字段如下: +(0, {'nodeId': '10', 'fieldName': 'image', 'fieldValue': 'xxx.jpg'}) +✅ 已更新 image fieldValue: api/xxx.jpg +``` +``` +开始提交任务,请等待 +📌 提交任务返回: {...} +⏳ 任务运行中... +🎉 生成结果完成! +✅ 任务完成! +``` + +--- + +## 6. 小贴士(Tips) + +- 建议使用 Python 3.8+ +- 脚本可直接在终端运行: + +```bash +python workflow.py +``` + +- Windows 用户注意文件路径需使用双反斜杠 `\\` +- 若使用代理或云主机,请确保端口 443 可访问 `www.runninghub.cn` + +```python +import http.client +import json +import mimetypes +from codecs import encode +import time +import os +import requests +API_HOST = "www.runninghub.cn" +def load_json(file_path): + # 打开并读取 JSON 文件 + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) # 将 JSON 内容解析为 Python 对象(dict 或 list) + # 打印读取到的数据 + print(data) + return data +def convert_to_node_info_list(data): + node_info_list = [] + + for node_id, node_content in data.items(): + inputs = node_content.get("inputs", {}) + for field_name, field_value in inputs.items(): + # 如果 field_value 是列表或字典,可以选择转换成字符串 + if isinstance(field_value, (list, dict)): + field_value = json.dumps(field_value) + else: + field_value = str(field_value) + + node_info_list.append({ + "nodeId": str(node_id), + "fieldName": str(field_name), + "fieldValue": field_value + }) + return node_info_list +def upload_file(API_KEY, file_path): + """ + 上传文件到 RunningHub 平台 + """ + url = "https://www.runninghub.cn/task/openapi/upload" + headers = { + 'Host': 'www.runninghub.cn' + } + data = { + 'apiKey': API_KEY, + 'fileType': 'input' + } + with open(file_path, 'rb') as f: + files = {'file': f} + response = requests.post(url, headers=headers, files=files, data=data) + return response.json() +# 1️⃣ 提交任务 +def submit_task(workflowId, node_info_list,API_KEY): + conn = http.client.HTTPSConnection("www.runninghub.cn") + payload = json.dumps({ + "apiKey": API_KEY, + "workflowId": workflowId, + "nodeInfoList": node_info_list + }) + headers = { + 'Host': 'www.runninghub.cn', + 'Content-Type': 'application/json' + } + conn.request("POST", "/task/openapi/create", payload, headers) + res = conn.getresponse() + data = res.read() + # ✅ 注意这里:用 json.loads 而不是 json.load + data = json.loads(data.decode("utf-8")) + print(data) + return data +def query_task_outputs(task_id,API_KEY): + conn = http.client.HTTPSConnection(API_HOST) + payload = json.dumps({ + "apiKey": API_KEY, + "taskId": task_id + }) + headers = { + 'Host': API_HOST, + 'Content-Type': 'application/json' + } + conn.request("POST", "/task/openapi/outputs", payload, headers) + res = conn.getresponse() + data = json.loads(res.read().decode("utf-8")) + conn.close() + return data +if __name__ == "__main__": + print("下面两个输入用于获得AI工作流所需要的信息,api_key为用户的密钥从api调用——进入控制台中获得,workflowId(此为示例,具体的workflowId为你所选择的AI工作流界面上方的链接https://www.runninghub.cn/workflow/1980237776367083521?source=workspace,最后的数字为workflowId)") + Api_key = input("请输入你的 api_key: ").strip() + workflowId = input("请输入 workflowId: ").strip() + print("请您下载您的工作流API json到本地") + file_path = input("输入您的json文件地址(json文件一定要在自己的工作台中获得,获得途径为导出工作流api到本地):").strip() + print("等待node_info_list生成(包涵所有的可以修改的node节点)") + data = load_json(file_path) + node_info_list = convert_to_node_info_list(data) + print(node_info_list) + print("下面用户可以输入工作流可以修改的节点id:nodeId,以及对应的fileName,锁定具体的节点位置,在找到具体位置之后,输入您需要修改的fileValue信息完成信息的修改用户发送AI工作流请求") + modified_nodes = [] + while True: + node_id_input = input("请输入 nodeId(输入 'exit' 结束修改): ").strip() + if node_id_input.lower() == "exit": + break + + # 找出该 nodeId 对应的所有字段 + node_fields = [n for n in node_info_list if n['nodeId'] == node_id_input] + + if not node_fields: + print("❌ 未找到该 nodeId 对应的节点") + continue + + print(f"\n🧩 找到节点 {node_id_input} 的字段如下:") + for field in enumerate(node_fields): + print(field) + + # 让用户选择要修改的字段 + field_name_input = input("\n请输入要修改的 fieldName: ").strip() + target_node = next( + (f for f in node_fields if f['fieldName'] == field_name_input), None + ) + + if not target_node: + print("❌ 未找到该 fieldName") + continue + + print(f"选中字段: {target_node}") + # 根据类型处理 + if target_node['fieldName'] in ["image", "audio", "video"]: + file_path = input(f"请输入您本地{target_node['fieldName']}文件路径: ").strip() + print("等待文件上传中") + upload_result = upload_file(Api_key, file_path) + print("上传结果:", upload_result) + # 假设 upload_file 已返回解析后的 JSON 字典 + if upload_result and upload_result.get("msg") == "success": + uploaded_file_name = upload_result.get("data", {}).get("fileName") + if uploaded_file_name: + target_node['fieldValue'] = uploaded_file_name + print(f"✅ 已更新 {target_node['fieldName']} fieldValue:", uploaded_file_name) + else: + print("❌ 上传失败或返回格式异常:", upload_result) + else: + # 其他类型直接修改 + new_value = input(f"请输入新的 fieldValue ({target_node['fieldName']}): ").strip() + target_node['fieldValue'] = new_value + print("✅ 已更新 fieldValue:", new_value) + modified_nodes.append({ + "nodeId": target_node['nodeId'], + "fieldName": target_node['fieldName'], + "fieldValue": target_node['fieldValue'] + }) + print(modified_nodes) + print("开始提交任务,请等待") + # 提交任务 + submit_result = submit_task(workflowId, modified_nodes,Api_key) + print("📌 提交任务返回:", submit_result) + if submit_result.get("code") != 0: + print("❌ 提交任务失败:", submit_result) + exit() + task_id = submit_result["data"]["taskId"] + print(f"📝 taskId: {task_id}") + # 解析成功返回 + prompt_tips_str = submit_result["data"].get("promptTips") + if prompt_tips_str: + try: + prompt_tips = json.loads(prompt_tips_str) + node_errors = prompt_tips.get("node_errors", {}) + if node_errors: + print("⚠️ 节点错误信息如下:") + for node_id, err in node_errors.items(): + print(f" 节点 {node_id} 错误: {err}") + else: + print("✅ 无节点错误,任务提交成功。") + except Exception as e: + print("⚠️ 无法解析 promptTips:", e) + else: + print("⚠️ 未返回 promptTips 字段。") + timeout = 600 + start_time = time.time() + while True: + outputs_result = query_task_outputs(task_id, Api_key) + code = outputs_result.get("code") + msg = outputs_result.get("msg") + data = outputs_result.get("data") + if code == 0 and data: # 成功 + file_url = data[0].get("fileUrl") + print("🎉 生成结果完成!") + print(data) + break + elif code == 805: # 任务失败 + failed_reason = data.get("failedReason") if data else None + print("❌ 任务失败!") + if failed_reason: + print(f"节点 {failed_reason.get('node_name')} 失败原因: {failed_reason.get('exception_message')}") + print("Traceback:", failed_reason.get("traceback")) + else: + print(outputs_result) + break + elif code == 804 or code == 813: # 运行中或排队中 + status_text = "运行中" if code == 804 else "排队中" + print(f"⏳ 任务{status_text}...") + else: + print("⚠️ 未知状态:", outputs_result) + # 超时检查 + if time.time() - start_time > timeout: + print("⏰ 等待超时(超过10分钟),任务未完成。") + break + time.sleep(5) + print("✅ 任务完成!") +``` \ No newline at end of file diff --git a/pixelle_video/pipelines/standard.py b/pixelle_video/pipelines/standard.py index 9962198..774b6c8 100644 --- a/pixelle_video/pipelines/standard.py +++ b/pixelle_video/pipelines/standard.py @@ -495,11 +495,26 @@ class StandardPipeline(LinearVideoPipeline): logger.warning("No task_id in storyboard, skipping persistence") return - # Build metadata - input_with_title = ctx.params.copy() - input_with_title["text"] = ctx.input_text # Ensure text is included - if not input_with_title.get("title"): - input_with_title["title"] = storyboard.title + # Build metadata - filter out non-serializable objects + clean_input = {} + for key, value in ctx.params.items(): + # Skip non-serializable objects like CharacterMemory + if key == "character_memory": + # Convert to serializable dict if present + if value is not None and hasattr(value, 'to_dict'): + clean_input["character_memory"] = value.to_dict() + elif key == "progress_callback": + # Skip callback functions + continue + elif callable(value): + # Skip any callable objects + continue + else: + clean_input[key] = value + + clean_input["text"] = ctx.input_text # Ensure text is included + if not clean_input.get("title"): + clean_input["title"] = storyboard.title metadata = { "task_id": task_id, @@ -507,7 +522,7 @@ class StandardPipeline(LinearVideoPipeline): "completed_at": storyboard.completed_at.isoformat() if storyboard.completed_at else None, "status": "completed", - "input": input_with_title, + "input": clean_input, "result": { "video_path": result.video_path,