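refactor: remove the web environment, focus on the desktop app, fix the macOS hang

Web environment removal:
- Delete web-related files: src/app.py, heartbeat.py
- Replace requirements.txt with requirements-desktop.txt
- Update README.md: remove the web UI and deployment sections
- Update the tech stack description: Streamlit → PyQt6
- Add usb_bundle/ to .gitignore

Desktop app improvements:
- Refactor OCRService: use a standalone Python thread instead of QThread
- Preload the paddleocr module on the main thread to fix the hang on macOS
- Add an offline OCR initialization module (src/ocr_offline.py)
- Add a model preparation script (scripts/prepare_models.py)
- Add a camera diagnostic tool (scripts/camera_probe.py)

Feature scope:
- Desktop app (src/desktop.py): real-time camera capture and recognition
- CLI batch processing (src/main.py): batch-process the images in a directory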

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
empty
2026-02-14 17:31:05 +08:00
parent 35d05d4701
commit 0ee00e6be7
10 changed files with 919 additions and 443 deletions

5
.gitignore vendored

@@ -7,4 +7,9 @@ __pycache__/
.DS_Store
.venv/
venv/
.serena/
models/*
!models/.gitkeep
usb_bundle/


@@ -7,7 +7,7 @@
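- Automatically recognizes text on envelope images
- Structured extraction: serial number, postal code, address, contact, phone
- Supports batch processing, with results exported to Excel
- Provides a web interface that is simple to use
- Provides a desktop app with real-time camera capture and recognition
## System Requirements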
@@ -41,76 +41,51 @@ python src/main.py
# Results are saved to data/output/result.xlsx
```
**Web interface**
**Desktop app**
```bash
streamlit run src/app.py --server.port 8501
python src/desktop.py
# Open http://localhost:8501 in a browser
# Launches a PyQt6 window; select a camera for real-time capture and recognition
```
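## Deployment Options
---
### Option 1: Intranet server deployment (recommended)
## Windows offline desktop build (zip directory package)
Suited to factories with multiple users and an intranet
The desktop entry point of this project is `src/desktop.py` (PyQt6 + OpenCV), suited to offline use at on-site workstations
### 1. Prepare the offline models (run once on a machine with internet access)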
```bash
# Start the service (listen on all network interfaces)
streamlit run src/app.py --server.address 0.0.0.0 --server.port 8501
# Workers open it in a browser: http://SERVER_IP:8501
pip install -r requirements.txt
python scripts/prepare_models.py --models-dir models
```
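### Option 2: Docker container deployment
When it finishes, a `models/whl/...` directory structure is generated; this `models/` directory must be distributed alongside the final exe.
Suited to scenarios that need an isolated environment or rapid deployment.
### 2. Windows packaging (PyInstaller onedir mode recommended)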
```bash
# Build the image
docker build -t envelope-ocr .
Build the Windows package on a Windows machine (do not cross-package from another platform).
# Run the container
docker run -d -p 8501:8501 --name envelope-ocr envelope-ocr
```powershell
pip install -r requirements.txt
pip install pyinstaller
pyinstaller --noconfirm --clean --windowed --onedir `
--name "post-ocr-desktop" `
--paths "src" `
--collect-all "Cython" `
--collect-all "paddleocr" `
--collect-all "paddle" `
--add-data "models;models" `
"src/desktop.py"
```
Dockerfile:
```dockerfile
FROM python:3.10-slim
RUN apt-get update && apt-get install -y libgl1-mesa-glx libglib2.0-0 && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY . .
RUN pip install --no-cache-dir -r requirements.txt
EXPOSE 8501
CMD ["streamlit", "run", "src/app.py", "--server.address", "0.0.0.0"]
```
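After packaging, compress the entire `dist\post-ocr-desktop\` directory into a zip for delivery.
### Option 3: System service (start on boot)
Suited to production environments that need long-term stable operation.
Create the service file `/etc/systemd/system/envelope-ocr.service`: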
```ini
[Unit]
Description=Envelope OCR Service
After=network.target
[Service]
User=www-data
WorkingDirectory=/opt/post-ocr
ExecStart=/usr/bin/streamlit run src/app.py --server.address 0.0.0.0 --server.port 8501
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
```
Enable the service:
```bash
sudo systemctl daemon-reload
sudo systemctl enable envelope-ocr
sudo systemctl start envelope-ocr
```
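Notes:
- By default, this project uses the offline model directory layout of PaddleOCR 2.10.0 (PP-OCRv4, Chinese)
- If `models/` is missing, the program reports an error immediately, to avoid triggering a network download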
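The two notes above imply a lookup-then-fail-fast step at startup. The following is a minimal sketch of that idea, not the project's actual `get_models_base_dir` (which lives in `src/ocr_offline.py` and is not shown here). The function names `resolve_models_dir` and `require_models_dir` are hypothetical, and the assumed layout is `models/` next to the executable in a packaged build, or in the project root when running from source.

```python
from __future__ import annotations

from pathlib import Path


def resolve_models_dir(frozen: bool, executable: str, source_file: str) -> Path:
    """Return where the models/ directory is expected to live.

    Assumption: a packaged build keeps models/ next to the executable, and a
    source checkout keeps it in the project root (the parent of src/).
    """
    if frozen:
        return Path(executable).resolve().parent / "models"
    return Path(source_file).resolve().parent.parent / "models"


def require_models_dir(models_dir: Path) -> Path:
    """Fail fast when models/ is absent instead of attempting any download."""
    if not models_dir.is_dir():
        raise FileNotFoundError(f"offline models directory not found: {models_dir}")
    return models_dir
```

In an application this could be called as `resolve_models_dir(bool(getattr(sys, "frozen", False)), sys.executable, __file__)`, since PyInstaller sets `sys.frozen` in packaged builds.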
## Directory Structure
@@ -121,7 +96,7 @@ post-ocr/
│ └── output/ # Result Excel files and processing logs
├── src/
│ ├── main.py # Command-line entry point
│ ├── app.py # Web interface
│ ├── desktop.py # Desktop app entry point
│ └── processor.py # Core processing logic
├── requirements.txt
└── README.md
@@ -130,7 +105,7 @@ post-ocr/
## Tech Stack
- OCR engine: PaddleOCR 2.10 (PP-OCRv4)
- Web framework: Streamlit
- Desktop framework: PyQt6
- Data processing: Pandas
## FAQ


@@ -1,53 +0,0 @@
#!/usr/bin/env python3
"""Heartbeat program - keeps the service alive"""
import sys
import time
import subprocess
import requests
from datetime import datetime
# Disable output buffering
sys.stdout.reconfigure(line_buffering=True)
def log(msg):
print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] {msg}", flush=True)
def check_streamlit():
"""Check the Streamlit service"""
try:
r = requests.get("http://localhost:8501", timeout=5)
return r.status_code == 200
except:
return False
def restart_streamlit():
"""Restart Streamlit"""
subprocess.run(["pkill", "-f", "streamlit run"], capture_output=True)
time.sleep(2)
subprocess.Popen(
["streamlit", "run", "src/app.py", "--server.port", "8501", "--server.address", "0.0.0.0"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
print(f"[{datetime.now():%H:%M:%S}] Streamlit 已重启")
def main():
log("心跳程序启动")
while True:
if not check_streamlit():
log("Streamlit 无响应,正在重启...")
restart_streamlit()
time.sleep(10)
else:
log("✓ 服务正常")
time.sleep(60) # check once per minute
if __name__ == "__main__":
main()


@@ -1,8 +0,0 @@
# Desktop dependencies (installed on the local computer)
paddleocr>=2.6,<3
paddlepaddle>=2.5,<3
pandas
openpyxl
pydantic
PyQt6
opencv-python


@@ -1,7 +1,8 @@
# Desktop dependencies (installed on the local computer)
paddleocr>=2.6,<3
paddlepaddle>=2.5,<3
pandas
openpyxl
pydantic
tqdm
streamlit
PyQt6
opencv-python

67
scripts/camera_probe.py Executable file

@@ -0,0 +1,67 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Camera probe script (for diagnosing a blank image from macOS/iPhone Continuity Camera)

Usage:
    source .venv/bin/activate
    python scripts/camera_probe.py

Output:
- For cameras 0-9, lists whether each can be opened, whether a valid frame can be read, and the frame size and mean brightness
"""

from __future__ import annotations

import sys


def open_cap(cv2, cam_id: int):
    if sys.platform == "darwin" and hasattr(cv2, "CAP_AVFOUNDATION"):
        return cv2.VideoCapture(cam_id, cv2.CAP_AVFOUNDATION)
    return cv2.VideoCapture(cam_id)


def main() -> int:
    import cv2  # pylint: disable=import-error

    print(f"平台: {sys.platform}")
    print(f"OpenCV: {cv2.__version__}")
    print("")

    found_any = False
    for cam_id in range(10):
        cap = open_cap(cv2, cam_id)
        opened = cap.isOpened()
        ok = False
        shape = None
        mean = None
        if opened:
            for _ in range(30):
                ret, frame = cap.read()
                if ret and frame is not None and frame.size > 0:
                    ok = True
                    shape = frame.shape
                    mean = float(frame.mean())
                    break
        cap.release()

        if opened:
            found_any = True
        status = "OK" if ok else ("打开但无画面" if opened else "无法打开")
        print(f"摄像头 {cam_id}: {status}", end="")
        if ok:
            print(f" | shape={shape} | mean={mean:.1f}")
        else:
            print("")

    if not found_any:
        print("\n未检测到可打开的摄像头。")
    else:
        print("\n如果出现“打开但无画面”,优先检查 macOS 相机权限。")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
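
The warm-up read loop used by the probe script (retry `read()` until a non-empty frame arrives) can be isolated and exercised without a real camera. The sketch below is only an illustration of that loop: `first_valid_frame`, `FakeFrame`, and `FakeCapture` are hypothetical names, and the fake objects stand in for an OpenCV capture whose `read()` returns `(ok, frame)`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Optional, Tuple


def first_valid_frame(cap: Any, max_attempts: int = 30) -> Optional[Any]:
    """Read up to max_attempts frames and return the first non-empty one."""
    for _ in range(max_attempts):
        ok, frame = cap.read()
        if ok and frame is not None and frame.size > 0:
            return frame
    return None  # opened, but no usable image within the attempt budget


@dataclass
class FakeFrame:
    """Stand-in for an image array; only the .size attribute is modeled."""
    size: int


class FakeCapture:
    """Stand-in for a camera whose first `empty_reads` reads fail."""

    def __init__(self, empty_reads: int) -> None:
        self._remaining = empty_reads

    def read(self) -> Tuple[bool, Optional[FakeFrame]]:
        if self._remaining > 0:
            self._remaining -= 1
            return False, None
        return True, FakeFrame(size=640 * 480 * 3)
```

A device that needs a few reads before delivering an image passes this check, while one that never delivers a frame is reported as opened but blank, which is the case the script points to macOS camera permissions for.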

59
scripts/prepare_models.py Executable file

@@ -0,0 +1,59 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Offline model preparation script (recommended to run once on a machine with internet access)

Purpose:
- Download the models required by PaddleOCR 2.10.0 (PP-OCRv4, Chinese) into the specified models/ directory
- That models/ directory can be shipped as-is with the Windows zip directory package for fully offline operation

Design notes:
- The script only downloads or fills in missing models; it never deletes or overwrites, to avoid accidentally removing a user's existing models (a high-risk operation)
"""

from __future__ import annotations

import argparse
import os
from pathlib import Path


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="准备 post-ocr 离线模型PP-OCRv4 中文)")
    parser.add_argument(
        "--models-dir",
        default="models",
        help="模型输出目录默认models建议与 exe 同级)",
    )
    parser.add_argument(
        "--show-log",
        action="store_true",
        help="显示 PaddleOCR 初始化日志(默认关闭)",
    )
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    models_dir = Path(args.models_dir).resolve()
    models_dir.mkdir(parents=True, exist_ok=True)

    # Key step: point PaddleOCR's default base_dir at our chosen models/
    os.environ["PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK"] = "True"
    os.environ["PADDLE_OCR_BASE_DIR"] = str(models_dir)

    # Deferred import: ensures the environment variables take effect before the module loads
    from paddleocr import PaddleOCR  # pylint: disable=import-error

    print(f"将下载/补齐模型到: {models_dir}")
    print("首次执行需要联网下载(约数百 MB请耐心等待。")

    # Initialization automatically downloads the det/rec/cls models to BASE_DIR/whl/...
    PaddleOCR(lang="ch", show_log=args.show_log, use_angle_cls=False)

    print("完成。你可以将该 models/ 目录随 zip 目录包一起分发(与 exe 同级)。")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())


@@ -1,247 +0,0 @@
import os
import tempfile
import base64
import pandas as pd
import streamlit as st
import streamlit.components.v1 as components
from paddleocr import PaddleOCR
from processor import extract_info, save_to_excel
os.environ["PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK"] = "True"
st.set_page_config(
page_title="信封信息提取系统",
page_icon="📮",
layout="centered",
initial_sidebar_state="collapsed",
)
st.markdown("""
<style>
.stApp { max-width: 100%; }
.stButton>button { width: 100%; height: 3em; font-size: 1.2em; }
.stDownloadButton>button { width: 100%; height: 3em; }
</style>
""", unsafe_allow_html=True)
st.title("📮 信封信息提取")
@st.cache_resource
def load_ocr():
return PaddleOCR(use_textline_orientation=True, lang="ch", show_log=False)
ocr = load_ocr()
def process_image(image_data):
"""Process image data"""
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
tmp.write(image_data)
tmp_path = tmp.name
try:
result = ocr.ocr(tmp_path, cls=False)
ocr_texts = []
if result and result[0]:
for line in result[0]:
if line and len(line) >= 2:
ocr_texts.append(line[1][0])
return extract_info(ocr_texts), ocr_texts
finally:
os.unlink(tmp_path)
# Custom camera component with an overlaid scan box
CAMERA_COMPONENT = """
<div id="camera-container" style="position:relative; width:100%; max-width:500px; margin:0 auto;">
<video id="video" autoplay playsinline style="width:100%; border-radius:10px; background:#000;"></video>
<!-- 扫描框叠加层 -->
<div id="overlay" style="
position: absolute;
top: 8%;
left: 50%;
transform: translateX(-50%);
width: 88%;
height: 70%;
border: 3px solid #00ff00;
box-sizing: border-box;
pointer-events: none;
">
<!-- 四角 -->
<div style="position:absolute; top:-3px; left:-3px; width:20px; height:20px; border-top:4px solid #00ff00; border-left:4px solid #00ff00;"></div>
<div style="position:absolute; top:-3px; right:-3px; width:20px; height:20px; border-top:4px solid #00ff00; border-right:4px solid #00ff00;"></div>
<div style="position:absolute; bottom:-3px; left:-3px; width:20px; height:20px; border-bottom:4px solid #00ff00; border-left:4px solid #00ff00;"></div>
<div style="position:absolute; bottom:-3px; right:-3px; width:20px; height:20px; border-bottom:4px solid #00ff00; border-right:4px solid #00ff00;"></div>
<!-- 字段提示:邮编(左上)、地址(中间)、联系人+电话(底部) -->
<div style="position:absolute; top:8px; left:10px; color:rgba(255,255,255,0.6); font-size:12px;">邮编</div>
<div style="position:absolute; top:35%; left:10px; right:10px; color:rgba(255,255,255,0.6); font-size:12px; border-bottom:1px dashed rgba(255,255,255,0.3); padding-bottom:30%;">地址</div>
<div style="position:absolute; bottom:8px; left:10px; color:rgba(255,255,255,0.6); font-size:12px;">联系人</div>
<div style="position:absolute; bottom:8px; right:10px; color:rgba(255,255,255,0.6); font-size:12px;">电话</div>
</div>
<!-- 编号提示在框外底部 -->
<div style="position:absolute; bottom:18%; left:50%; transform:translateX(-50%); color:rgba(255,255,255,0.6); font-size:11px;">
↑ 编号在此处 ↑
</div>
<canvas id="canvas" style="display:none;"></canvas>
<p id="hint" style="text-align:center; color:#666; margin:10px 0; font-size:14px;">
📌 将信封背面对齐绿色框,编号对准底部
</p>
<button id="capture-btn" onclick="capturePhoto()" style="
width: 100%;
padding: 15px;
font-size: 18px;
background: #ff4b4b;
color: white;
border: none;
border-radius: 8px;
cursor: pointer;
margin-top: 10px;
">📷 拍照识别</button>
</div>
<script>
const video = document.getElementById('video');
const canvas = document.getElementById('canvas');
const hint = document.getElementById('hint');
// 启动后置摄像头
async function startCamera() {
try {
const stream = await navigator.mediaDevices.getUserMedia({
video: { facingMode: 'environment', width: { ideal: 1280 }, height: { ideal: 720 } }
});
video.srcObject = stream;
} catch (err) {
hint.textContent = '❌ 无法访问摄像头: ' + err.message;
console.error(err);
}
}
function capturePhoto() {
canvas.width = video.videoWidth;
canvas.height = video.videoHeight;
canvas.getContext('2d').drawImage(video, 0, 0);
const dataUrl = canvas.toDataURL('image/jpeg', 0.9);
// 发送到 Streamlit
window.parent.postMessage({
type: 'streamlit:setComponentValue',
value: dataUrl
}, '*');
hint.textContent = '✅ 已拍照,正在识别...';
document.getElementById('capture-btn').disabled = true;
}
startCamera();
</script>
"""
# Initialize session state
if "records" not in st.session_state:
st.session_state.records = []
# Choose the input method
tab_camera, tab_upload = st.tabs(["📷 拍照扫描", "📁 上传图片"])
with tab_camera:
# Use the custom camera component
photo_data = components.html(CAMERA_COMPONENT, height=550)
# Check whether there is captured photo data
if "captured_image" not in st.session_state:
st.session_state.captured_image = None
# File upload as a fallback (used to receive data passed from JS)
uploaded_photo = st.file_uploader(
"或直接上传照片",
type=["jpg", "jpeg", "png"],
key="camera_upload",
label_visibility="collapsed"
)
if uploaded_photo:
with st.spinner("识别中..."):
record, raw_texts = process_image(uploaded_photo.getvalue())
st.success("✅ 识别完成!")
col1, col2 = st.columns(2)
with col1:
st.image(uploaded_photo, caption="拍摄图片", use_container_width=True)
with col2:
st.metric("邮编", record.get("邮编", "-"))
st.metric("电话", record.get("电话", "-"))
st.metric("联系人", record.get("联系人/单位名", "-"))
st.text_area("地址", record.get("地址", ""), disabled=True, height=68)
st.text_input("编号", record.get("编号", ""), disabled=True)
if st.button("✅ 添加到列表", type="primary", key="add_camera"):
record["来源"] = "拍照"
st.session_state.records.append(record)
st.success(f"已添加!当前共 {len(st.session_state.records)} 条记录")
st.rerun()
with tab_upload:
uploaded_files = st.file_uploader(
"选择图片文件",
type=["jpg", "jpeg", "png", "bmp"],
accept_multiple_files=True,
label_visibility="collapsed",
)
if uploaded_files:
if st.button("🚀 开始识别", type="primary"):
progress = st.progress(0)
for i, file in enumerate(uploaded_files):
with st.spinner(f"处理 {file.name}..."):
record, _ = process_image(file.getvalue())
record["来源"] = file.name
st.session_state.records.append(record)
progress.progress((i + 1) / len(uploaded_files))
st.success(f"完成!已添加 {len(uploaded_files)} 条记录")
st.rerun()
# Show the collected records
st.divider()
st.subheader(f"📋 已收集 {len(st.session_state.records)} 条记录")
if st.session_state.records:
df = pd.DataFrame(st.session_state.records)
cols = ["来源", "编号", "邮编", "地址", "联系人/单位名", "电话"]
df = df.reindex(columns=[c for c in cols if c in df.columns])
st.dataframe(df, use_container_width=True, hide_index=True)
col1, col2 = st.columns(2)
with col1:
output_path = tempfile.mktemp(suffix=".xlsx")
df.to_excel(output_path, index=False)
with open(output_path, "rb") as f:
st.download_button(
"📥 下载 Excel",
data=f,
file_name="信封提取结果.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
os.unlink(output_path)
with col2:
if st.button("🗑️ 清空列表"):
st.session_state.records = []
st.rerun()
else:
st.info("👆 使用上方拍照或上传功能添加记录")


@@ -6,8 +6,11 @@
import os
import sys
import cv2
import tempfile
import pandas as pd
import time
import logging
import threading
import queue
from datetime import datetime
from pathlib import Path
@@ -17,55 +20,232 @@ from PyQt6.QtWidgets import (
QFileDialog, QMessageBox, QGroupBox, QSplitter, QHeaderView,
QStatusBar, QProgressBar
)
from PyQt6.QtCore import Qt, QTimer, QThread, pyqtSignal
from PyQt6.QtGui import QImage, QPixmap, QFont, QAction
from PyQt6.QtCore import Qt, QTimer, pyqtSignal, QObject, pyqtSlot
from PyQt6.QtGui import QImage, QPixmap, QFont, QAction, QKeySequence, QShortcut
from paddleocr import PaddleOCR
from processor import extract_info
from ocr_offline import create_offline_ocr, get_models_base_dir
os.environ["PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK"] = "True"
logger = logging.getLogger("post_ocr.desktop")
class OCRWorker(QThread):
"""OCR recognition thread"""
finished = pyqtSignal(dict, list)
error = pyqtSignal(str)
def __init__(self, ocr, image_path):
def setup_logging() -> Path:
"""
Logging output:
- Printed to the terminal in real time
- Written to data/output/desktop.log (to help users report and troubleshoot problems)
"""
level_name = os.environ.get("POST_OCR_LOG_LEVEL", "INFO").upper().strip()
level = getattr(logging, level_name, logging.INFO)
log_dir = Path("data/output").resolve()
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / "desktop.log"
fmt = "%(asctime)s.%(msecs)03d %(levelname)s [%(threadName)s] %(name)s: %(message)s"
datefmt = "%Y-%m-%d %H:%M:%S"
root = logging.getLogger()
root.setLevel(level)
# Remove old handlers to avoid duplicate output
for h in list(root.handlers):
root.removeHandler(h)
sh = logging.StreamHandler(stream=sys.stdout)
sh.setLevel(level)
sh.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
root.addHandler(sh)
fh = logging.FileHandler(log_file, encoding="utf-8")
fh.setLevel(level)
fh.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
root.addHandler(fh)
logger.info("日志已初始化level=%s, file=%s", level_name, str(log_file))
return log_file
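class OCRService(QObject):
"""
OCR background service (runs in a standard Python thread).
Key points:
- Avoid QThread: on macOS, importing paddleocr inside a QThread (Dummy-*) can hang
- The PaddleOCR instance is created and used inside the background thread, to avoid hangs/deadlocks caused by cross-thread calls
- Jobs are processed serially on a single thread, so concurrent inference cannot exhaust memory or cause contention in the underlying libraries
"""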
finished = pyqtSignal(int, dict, list)
error = pyqtSignal(int, str)
ready = pyqtSignal()
init_error = pyqtSignal(str)
busy_changed = pyqtSignal(bool)
def __init__(self, models_base_dir: Path):
super().__init__()
self.ocr = ocr
self.image_path = image_path
self._models_base_dir = models_base_dir
self._ocr = None
self._busy = False
self._stop_event = threading.Event()
self._queue: "queue.Queue[tuple[int, object] | None]" = queue.Queue()
self._thread = threading.Thread(target=self._run, name="OCRThread", daemon=True)
def _set_busy(self, busy: bool) -> None:
if self._busy != busy:
self._busy = busy
self.busy_changed.emit(busy)
def start(self) -> None:
"""Start the background thread and run the warmup."""
self._thread.start()
def stop(self, timeout_ms: int = 8000) -> bool:
"""Ask the background thread to stop and wait for it to exit (the thread is a daemon, so a failed exit does not block the process)."""
def run(self):
try:
result = self.ocr.ocr(self.image_path, cls=False)
ocr_texts = []
if result and result[0]:
for line in result[0]:
if line and len(line) >= 2:
ocr_texts.append(line[1][0])
record = extract_info(ocr_texts)
self.finished.emit(record, ocr_texts)
self._stop_event.set()
# Use a sentinel to wake the thread blocked in queue.get()
try:
self._queue.put_nowait(None)
except Exception:
pass
self._thread.join(timeout=max(0.0, timeout_ms / 1000.0))
return not self._thread.is_alive()
except Exception:
return False
def _ensure_ocr(self) -> None:
if self._ocr is None:
logger.info("OCR ensure_ocr: 开始创建 PaddleOCR线程=%s", threading.current_thread().name)
self._ocr = create_offline_ocr(models_base_dir=self._models_base_dir, show_log=False)
logger.info("OCR ensure_ocr: PaddleOCR 创建完成")
self.ready.emit()
def _warmup(self) -> None:
"""Load the OCR model ahead of time, so the first recognition does not trigger initialization and appear to hang."""
logger.info("OCR 预热开始(线程=%s", threading.current_thread().name)
self._ensure_ocr()
logger.info("OCR 预热完成")
def _run(self) -> None:
try:
self._warmup()
except Exception as e:
self.error.emit(str(e))
logger.exception("OCR 预热失败:%s", str(e))
self.init_error.emit(str(e))
return
while not self._stop_event.is_set():
item = None
try:
item = self._queue.get()
except Exception:
continue
if item is None:
# sentinel: stop
break
job_id, images = item
if self._stop_event.is_set():
break
self._process_job(job_id, images)
@pyqtSlot(int, object)
def process(self, job_id: int, images: object) -> None:
"""Receive a UI request: enqueue the job so the background thread processes it serially."""
if self._stop_event.is_set():
self.error.emit(job_id, "OCR 服务正在关闭,请稍后重试。")
return
# Reject outright when busy or when a job is already queued, so a backlog cannot make it look like recognition never finishes
if self._busy or (not self._queue.empty()):
self.error.emit(job_id, "OCR 正在进行中,请稍后再试。")
return
try:
# Note: no time-consuming work here, only enqueueing, to avoid blocking the UI
self._queue.put_nowait((job_id, images))
except Exception as e:
self.error.emit(job_id, f"OCR 入队失败:{str(e)}")
def _process_job(self, job_id: int, images: object) -> None:
self._set_busy(True)
try:
self._ensure_ocr()
if not isinstance(images, (list, tuple)) or len(images) == 0:
raise ValueError("内部错误:未传入有效图片数据")
shapes = []
for img in images:
try:
shapes.append(getattr(img, "shape", None))
except Exception:
shapes.append(None)
logger.info("OCR job=%s 开始images=%s", job_id, shapes)
ocr_texts: list[str] = []
for img in images:
if img is None:
continue
result = self._ocr.ocr(img, cls=False)
if result and result[0]:
for line in result[0]:
if line and len(line) >= 2:
ocr_texts.append(line[1][0])
record = extract_info(ocr_texts)
logger.info(
"OCR job=%s 完成lines=%s, record_keys=%s",
job_id,
len(ocr_texts),
list(record.keys()),
)
self.finished.emit(job_id, record, ocr_texts)
except Exception as e:
logger.exception("OCR job=%s 失败:%s", job_id, str(e))
self.error.emit(job_id, str(e))
finally:
self._set_busy(False)
class MainWindow(QMainWindow):
request_ocr = pyqtSignal(int, object)
def __init__(self):
super().__init__()
self.setWindowTitle("📮 信封信息提取系统")
self.setMinimumSize(1200, 700)
# Initialize OCR
self.statusBar().showMessage("正在加载 OCR 模型...")
QApplication.processEvents()
self.ocr = PaddleOCR(use_textline_orientation=True, lang="ch", show_log=False)
self.statusBar().showMessage("OCR 模型加载完成")
# OCR worker thread (keeps the UI from freezing)
self._ocr_job_id = 0
self._ocr_start_time_by_job: dict[int, float] = {}
self._ocr_ready = False
self._ocr_busy = False
self._shutting_down = False
self._ocr_timeout_prompted = False
# Camera
self.cap = None
self.timer = QTimer()
self.timer.timeout.connect(self.update_frame)
self._frame_fail_count = 0
# Status bar progress (shown while recognizing)
self._progress = QProgressBar()
self._progress.setMaximumWidth(160)
self._progress.setVisible(False)
self.statusBar().addPermanentWidget(self._progress)
# OCR watchdog: shows elapsed time and offers a restart when recognition appears stuck
self._ocr_watchdog = QTimer()
self._ocr_watchdog.setInterval(300)
self._ocr_watchdog.timeout.connect(self._tick_ocr_watchdog)
# Data
self.records = []
@@ -73,6 +253,191 @@ class MainWindow(QMainWindow):
self.init_ui()
self.load_cameras()
# Main-thread preload: on macOS, paddleocr must be imported on the main thread, otherwise the background thread hangs
self.statusBar().showMessage("正在加载 OCR 模块...")
QApplication.processEvents()
try:
logger.info("主线程预加载import paddleocr")
import paddleocr # noqa: F401
logger.info("主线程预加载paddleocr 导入完成")
except Exception as e:
logger.error("主线程预加载失败:%s", e, exc_info=True)
QMessageBox.critical(self, "启动失败", f"无法加载 OCR 模块:{e}")
raise
# Start the OCR service after UI initialization, so btn_capture already exists when the ready/busy signal callbacks fire
self.statusBar().showMessage("正在启动 OCR 服务...")
QApplication.processEvents()
try:
self._init_ocr_service()
except FileNotFoundError as e:
QMessageBox.critical(self, "离线模型缺失", str(e))
raise
except Exception as e:
QMessageBox.critical(self, "启动失败", str(e))
raise
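def shutdown(self, force: bool = False) -> None:
"""Stop the camera and shut down the background service, so no background job keeps running on exit."""
if self._shutting_down:
return
self._shutting_down = True
# Stop the camera first to avoid reading more frames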
try:
if self.cap:
self.timer.stop()
self.cap.release()
self.cap = None
except Exception:
pass
try:
self._stop_ocr_service(force=force)
except Exception:
pass
def _stop_ocr_service(self, force: bool = False) -> None:
"""Stop only the OCR service (used for timeout restart/exit)."""
try:
self._ocr_watchdog.stop()
except Exception:
pass
self._ocr_ready = False
self._ocr_busy = False
self._ocr_timeout_prompted = False
try:
self._progress.setVisible(False)
except Exception:
pass
try:
svc = getattr(self, "_ocr_service", None)
if svc is not None:
ok = svc.stop(timeout_ms=8000 if force else 3000)
if (not ok) and force:
# A Python thread cannot be reliably force-killed; only log a warning here and continue the exit flow.
logger.warning("OCR 服务停止超时:后台线程可能仍在运行,建议重启应用。")
except Exception:
pass
try:
self._ocr_service = None
except Exception:
pass
def _restart_ocr_service(self) -> None:
"""Restart the OCR service (used for timeout recovery)."""
if self._shutting_down:
return
self.statusBar().showMessage("正在重启 OCR 服务...")
self._stop_ocr_service(force=True)
self._init_ocr_service()
def _init_ocr_service(self) -> None:
models_dir = get_models_base_dir()
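# Verify first that the model path exists (if it is missing, raise straight to the UI)
# create_offline_ocr performs a more thorough check internally; the model is not created here, to avoid blocking the UI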
if not models_dir.exists():
raise FileNotFoundError(f"离线模型目录不存在:{models_dir}")
self._ocr_service = OCRService(models_base_dir=models_dir)
# Note: OCRService uses a Python thread internally for warmup and inference.
# QueuedConnection is forced here so that UI callbacks always run on the main thread.
self.request_ocr.connect(self._ocr_service.process, Qt.ConnectionType.QueuedConnection)
self._ocr_service.ready.connect(self._on_ocr_ready, Qt.ConnectionType.QueuedConnection)
self._ocr_service.init_error.connect(self._on_ocr_init_error, Qt.ConnectionType.QueuedConnection)
self._ocr_service.busy_changed.connect(self._on_ocr_busy_changed, Qt.ConnectionType.QueuedConnection)
self._ocr_service.finished.connect(self._on_ocr_finished_job, Qt.ConnectionType.QueuedConnection)
self._ocr_service.error.connect(self._on_ocr_error_job, Qt.ConnectionType.QueuedConnection)
self._ocr_service.start()
def _on_ocr_ready(self) -> None:
try:
self._ocr_ready = True
self.statusBar().showMessage("OCR 模型已加载(离线)")
btn = getattr(self, "btn_capture", None)
if btn is not None:
btn.setEnabled(self.cap is not None and not self._ocr_busy)
logger.info("OCR ready")
except Exception as e:
logger.exception("处理 OCR ready 回调失败:%s", str(e))
def _on_ocr_init_error(self, error: str) -> None:
self.statusBar().showMessage("OCR 模型加载失败")
QMessageBox.critical(self, "OCR 初始化失败", error)
logger.error("OCR init error: %s", error)
def _on_ocr_busy_changed(self, busy: bool) -> None:
try:
self._ocr_busy = busy
if busy:
self._progress.setRange(0, 0) # indeterminate progress bar
self._progress.setVisible(True)
self._ocr_timeout_prompted = False
self._ocr_watchdog.start()
else:
self._progress.setVisible(False)
self._ocr_watchdog.stop()
btn = getattr(self, "btn_capture", None)
if btn is not None:
btn.setEnabled(self.cap is not None and self._ocr_ready and not busy)
except Exception as e:
logger.exception("处理 OCR busy 回调失败:%s", str(e))
def _tick_ocr_watchdog(self) -> None:
"""While recognition is running: update the elapsed time; on timeout, ask whether to restart the OCR service."""
if not self._ocr_busy:
return
start_t = self._ocr_start_time_by_job.get(self._ocr_job_id)
if start_t is None:
return
cost = time.monotonic() - start_t
self.statusBar().showMessage(f"正在识别...(已用 {cost:.1f}s")
# Timeout protection: lets the user recover when the underlying inference occasionally gets stuck
if cost >= 45 and not self._ocr_timeout_prompted:
self._ocr_timeout_prompted = True
reply = QMessageBox.question(
self,
"识别超时",
"识别已超过 45 秒仍未完成,可能卡住。\n\n是否重启 OCR 服务?\n(若仍无响应,建议直接退出并重新打开应用)",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
)
if reply == QMessageBox.StandardButton.Yes:
self._restart_ocr_service()
def _on_ocr_finished_job(self, job_id: int, record: dict, texts: list) -> None:
start_t = self._ocr_start_time_by_job.pop(job_id, None)
# Handle only the most recent request, so stale results are not written back
if job_id != self._ocr_job_id:
return
self.records.append(record)
self.update_table()
cost = ""
if start_t is not None:
cost = f"(耗时 {time.monotonic() - start_t:.1f}s"
self.statusBar().showMessage(f"识别完成: {record.get('联系人/单位名', '未知')}{cost}")
logger.info("OCR job=%s UI 回写完成 %s", job_id, cost)
def _on_ocr_error_job(self, job_id: int, error: str) -> None:
self._ocr_start_time_by_job.pop(job_id, None)
if job_id != self._ocr_job_id:
return
self.statusBar().showMessage("识别失败")
QMessageBox.warning(self, "识别失败", error)
logger.error("OCR job=%s error: %s", job_id, error)
def init_ui(self):
central = QWidget()
self.setCentralWidget(central)
@@ -111,7 +476,7 @@ class MainWindow(QMainWindow):
self.btn_capture.setFont(QFont("", 14))
self.btn_capture.setStyleSheet("background-color: #ff4b4b; color: white; border-radius: 8px;")
self.btn_capture.clicked.connect(self.capture_and_recognize)
self.btn_capture.setEnabled(False)
self.btn_capture.setEnabled(False) # enabled once the camera is connected and OCR is ready
left_layout.addWidget(self.btn_capture)
# Right side: result list
@@ -152,27 +517,81 @@ class MainWindow(QMainWindow):
layout.addWidget(splitter)
# Shortcut
self.shortcut_capture = QAction(self)
self.shortcut_capture.setShortcut("Space")
self.shortcut_capture.triggered.connect(self.capture_and_recognize)
self.addAction(self.shortcut_capture)
# On macOS/Qt, Space is often swallowed by widgets (button activation, table selection, etc.); ApplicationShortcut is more reliable
self._shortcut_capture2 = QShortcut(QKeySequence("Space"), self)
self._shortcut_capture2.setContext(Qt.ShortcutContext.ApplicationShortcut)
self._shortcut_capture2.activated.connect(self.capture_and_recognize)
def load_cameras(self):
"""Scan for available cameras"""
self.cam_combo.clear()
for i in range(10):
cap = cv2.VideoCapture(i)
if cap.isOpened():
ret, _ = cap.read()
if ret:
self.cam_combo.addItem(f"摄像头 {i}", i)
cap.release()
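# On macOS, device indices can change (especially with Continuity Camera or virtual cameras), so scanning a wider range is more reliable.
# To narrow the probe range, set the environment variable POST_OCR_MAX_CAMERAS, for example POST_OCR_MAX_CAMERAS=3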
try:
max_probe = int(os.environ.get("POST_OCR_MAX_CAMERAS", "").strip() or "10")
except Exception:
max_probe = 10
logger.info("开始扫描摄像头max_probe=%s", max_probe)
if self.cam_combo.count() == 0:
self.cam_combo.addItem("未检测到摄像头", -1)
self.statusBar().showMessage("未检测到摄像头,请连接 Droidcam")
found = 0
for i in range(max_probe):
cap = None
try:
cap = self._open_capture(i)
if cap is None or (not cap.isOpened()):
continue
# Warm-up: some devices return an empty first frame or have high latency (especially phones/virtual cameras)
has_frame = False
for _ in range(25):
ret, frame = cap.read()
if ret and frame is not None and frame.size > 0:
has_frame = True
break
label = f"摄像头 {i}" if has_frame else f"摄像头 {i}(未验证画面)"
self.cam_combo.addItem(label, i)
logger.info("摄像头探测id=%s opened, has_frame=%s", i, has_frame)
found += 1
finally:
try:
if cap is not None:
cap.release()
except Exception:
pass
if found == 0:
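# Automatic detection can fail because of permissions, a busy device, or device latency; still offer manual entries so the user is not stuck at "no device"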
for i in range(max_probe):
self.cam_combo.addItem(f"摄像头 {i}(手动尝试)", i)
self.statusBar().showMessage(
"未能自动检测到可用摄像头。"
"如为 macOS请在 系统设置->隐私与安全->相机 中允许当前终端/应用访问;"
"并确保 iPhone 已解锁且未被其他应用占用。"
)
else:
self.statusBar().showMessage(f"检测到 {self.cam_combo.count()} 个摄像头")
self.statusBar().showMessage(f"检测到 {found} 个摄像头")
logger.info("摄像头扫描结束found=%s", found)
def _open_capture(self, cam_id: int):
"""
Open a camera.
On macOS, prefer the AVFoundation backend (it works better with Continuity Camera and similar devices).
"""
if sys.platform == "darwin" and hasattr(cv2, "CAP_AVFOUNDATION"):
cap = cv2.VideoCapture(cam_id, cv2.CAP_AVFOUNDATION)
try:
if cap is not None and cap.isOpened():
return cap
except Exception:
pass
try:
if cap is not None:
cap.release()
except Exception:
pass
return cv2.VideoCapture(cam_id)
def toggle_camera(self):
"""Connect/disconnect the camera"""
@@ -182,19 +601,51 @@ class MainWindow(QMainWindow):
QMessageBox.warning(self, "错误", "请先选择有效的摄像头")
return
self.cap = cv2.VideoCapture(cam_id)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
self.cap = self._open_capture(cam_id)
if self.cap.isOpened():
# Do not force a resolution: some devices (especially virtual cameras/Continuity Camera) output a black image when one is forced
# self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
# self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# Warm-up reads, to detect the "opens but no image" case early
ok = False
for _ in range(20):
ret, frame = self.cap.read()
if ret and frame is not None and frame.size > 0:
ok = True
break
if not ok:
self.cap.release()
self.cap = None
QMessageBox.warning(
self,
"摄像头无画面",
"摄像头已打开,但读取不到画面。\n\n"
"排查建议:\n"
"1) macOS系统设置 -> 隐私与安全 -> 相机,允许当前运行的终端/应用访问\n"
"2) 连续互通相机:保持 iPhone 解锁并靠近 Mac且未被其他应用占用\n"
"3) 依次切换“摄像头 0/1/2”尝试\n",
)
return
self.timer.start(30) # ~33 FPS
self.btn_connect.setText("⏹ 断开")
self.btn_capture.setEnabled(True)
self.btn_capture.setEnabled(self._ocr_ready and not self._ocr_busy)
self.cam_combo.setEnabled(False)
self.statusBar().showMessage("摄像头已连接")
else:
self.cap = None
QMessageBox.warning(self, "错误", "无法打开摄像头")
QMessageBox.warning(
self,
"无法打开摄像头",
"无法打开摄像头。\n\n"
"排查建议:\n"
"1) macOS系统设置 -> 隐私与安全 -> 相机,允许当前运行的终端/应用访问\n"
"2) 如果有其他应用正在使用摄像头(微信/会议软件/浏览器),请先退出再试\n"
"3) 连续互通相机:保持 iPhone 解锁并靠近 Mac且未被其他应用占用\n"
"4) 在下拉框中切换不同编号0/1/2/3...)重试\n",
)
else:
self.timer.stop()
self.cap.release()
@@ -211,7 +662,8 @@ class MainWindow(QMainWindow):
return
ret, frame = self.cap.read()
if ret:
if ret and frame is not None and frame.size > 0:
self._frame_fail_count = 0
# Draw the scan box
h, w = frame.shape[:2]
# Box position: the top 70%, with the serial number below
@@ -247,10 +699,21 @@ class MainWindow(QMainWindow):
qimg = QImage(rgb.data, w, h, ch * w, QImage.Format.Format_RGB888)
scaled = qimg.scaled(self.video_label.size(), Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation)
self.video_label.setPixmap(QPixmap.fromImage(scaled))
else:
self._frame_fail_count += 1
if self._frame_fail_count == 1:
self.statusBar().showMessage("摄像头无画面:请检查权限/切换摄像头")
def capture_and_recognize(self):
"""Capture a photo and recognize it"""
if self.cap is None:
self.statusBar().showMessage("请先连接摄像头")
return
if not self._ocr_ready:
self.statusBar().showMessage("OCR 模型尚未就绪,请稍等")
return
if self._ocr_busy:
self.statusBar().showMessage("正在识别中,请稍后再按空格")
return
ret, frame = self.cap.read()
@@ -258,35 +721,56 @@ class MainWindow(QMainWindow):
self.statusBar().showMessage("拍照失败")
return
# Save a temporary file
tmp_path = tempfile.mktemp(suffix=".jpg")
cv2.imwrite(tmp_path, frame)
# Crop two ROIs (main info box + serial number area), which greatly reduces the pixel count and improves speed and stability
h, w = frame.shape[:2]
x1, y1 = int(w * 0.06), int(h * 0.08)
x2 = int(w * 0.94)
y2_box = int(h * 0.78)
roi_images = []
try:
roi_box = frame[y1:y2_box, x1:x2]
if roi_box is not None and roi_box.size > 0:
roi_images.append(roi_box)
except Exception:
pass
try:
# The serial number is usually at the bottom center, so a small region is enough
nx1, nx2 = int(w * 0.30), int(w * 0.70)
ny1, ny2 = int(h * 0.80), int(h * 0.98)
roi_num = frame[ny1:ny2, nx1:nx2]
if roi_num is not None and roi_num.size > 0:
roi_images.append(roi_num)
except Exception:
pass
if not roi_images:
self.statusBar().showMessage("Capture failed: no valid region was cropped")
return
# Downscale very high-resolution crops (improves stability and speed)
resized_images = []
for img in roi_images:
try:
max_w = 1400
if img.shape[1] > max_w:
scale = max_w / img.shape[1]
img = cv2.resize(img, (int(img.shape[1] * scale), int(img.shape[0] * scale)))
except Exception:
pass
resized_images.append(img)
logger.info("UI triggered recognition: frame=%s, rois=%s", getattr(frame, "shape", None), [getattr(i, "shape", None) for i in resized_images])
self.statusBar().showMessage("Recognizing...")
self.btn_capture.setEnabled(False)
# Start the OCR thread
self.worker = OCRWorker(self.ocr, tmp_path)
self.worker.finished.connect(lambda r, t: self.on_ocr_finished(r, t, tmp_path))
self.worker.error.connect(lambda e: self.on_ocr_error(e, tmp_path))
self.worker.start()
def on_ocr_finished(self, record, texts, tmp_path):
"""OCR finished"""
os.unlink(tmp_path)
self.btn_capture.setEnabled(True)
# Append to the records
self.records.append(record)
self.update_table()
self.statusBar().showMessage(f"Recognition finished: {record.get('联系人/单位名', 'unknown')}")
def on_ocr_error(self, error, tmp_path):
"""OCR error"""
os.unlink(tmp_path)
self.btn_capture.setEnabled(True)
self.statusBar().showMessage(f"Recognition failed: {error}")
# Dispatch to the OCR worker thread
self._ocr_job_id += 1
job_id = self._ocr_job_id
self._ocr_start_time_by_job[job_id] = time.monotonic()
self.request_ocr.emit(job_id, resized_images)
def update_table(self):
"""Update the table"""
@@ -334,18 +818,33 @@ class MainWindow(QMainWindow):
def closeEvent(self, event):
"""Close the window"""
if self.cap:
self.timer.stop()
self.cap.release()
if self._ocr_busy:
reply = QMessageBox.question(
self,
"Recognition in progress",
"Recognition is in progress; closing now may interrupt the task.\n\nForce quit?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
)
if reply == QMessageBox.StandardButton.No:
event.ignore()
return
self.shutdown(force=True)
event.accept()
return
self.shutdown(force=False)
event.accept()
def main():
log_file = setup_logging()
app = QApplication(sys.argv)
app.setStyle("Fusion")
window = MainWindow()
window.show()
app.aboutToQuit.connect(lambda: window.shutdown(force=False))
logger.info("Application started, PID=%s, log=%s", os.getpid(), str(log_file))
sys.exit(app.exec())
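
The ROI cropping in `capture_and_recognize` above can be sketched in isolation. This is a minimal standalone sketch, not the code from `src/desktop.py`: the helper names `roi_boxes` and `scaled_size` are hypothetical, and it works on plain integers so it runs without OpenCV. The fractions (0.06/0.08/0.94/0.78 for the main box, 0.30 to 0.70 by 0.80 to 0.98 for the serial-number box) and the 1400-pixel width cap are taken from the diff.

```python
from typing import List, Tuple

Box = Tuple[int, int, int, int]  # (x1, y1, x2, y2) in pixels


def roi_boxes(h: int, w: int) -> List[Box]:
    """Return the main-info box and the serial-number box for an h x w frame."""
    main_box = (int(w * 0.06), int(h * 0.08), int(w * 0.94), int(h * 0.78))
    num_box = (int(w * 0.30), int(h * 0.80), int(w * 0.70), int(h * 0.98))
    # Drop degenerate boxes (zero width or height), mirroring the size > 0 check.
    return [b for b in (main_box, num_box) if b[2] > b[0] and b[3] > b[1]]


def scaled_size(width: int, height: int, max_w: int = 1400) -> Tuple[int, int]:
    """Return (width, height) shrunk proportionally so that width <= max_w."""
    if width <= max_w:
        return width, height
    scale = max_w / width
    return int(width * scale), int(height * scale)


if __name__ == "__main__":
    # Hypothetical 4K frame: print each crop box and the size it would be resized to.
    for x1, y1, x2, y2 in roi_boxes(h=2160, w=3840):
        print((x1, y1, x2, y2), "->", scaled_size(x2 - x1, y2 - y1))
```

With real frames, each box would be applied as `frame[y1:y2, x1:x2]` and the target size passed to `cv2.resize`, as the diff does.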

src/ocr_offline.py Normal file

@@ -0,0 +1,178 @@
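# -*- coding: utf-8 -*-
"""
Offline OCR initialization utilities

Goals:
1. When the Windows build ships as a zip directory bundle, the models travel with it so the program works fully offline
2. If the models are missing, fail with a clear error and stop PaddleOCR from downloading them automatically
3. Unify the OCR initialization logic for the desktop / Web / CLI entry points to avoid parameter drift
"""
from __future__ import annotations

import os
import sys
from dataclasses import dataclass
from pathlib import Path
import logging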


@dataclass(frozen=True)
class OCRModelPaths:
    """Model directory layout for PP-OCRv4 (Chinese), matching the default download layout of paddleocr==2.10.0"""

    base_dir: Path
    det_dir: Path
    rec_dir: Path
    cls_dir: Path


def _is_frozen() -> bool:
    """Return True when running from a PyInstaller bundle"""
    return bool(getattr(sys, "frozen", False))


def get_app_base_dir() -> Path:
    """
    Return the application root directory:
    - Development: the project root (parent of src)
    - Packaged: the directory containing the exe
    """
    if _is_frozen():
        return Path(sys.executable).resolve().parent
    return Path(__file__).resolve().parent.parent


def get_models_base_dir(app_base_dir: Path | None = None) -> Path:
    """Default model directory: models/ next to the application"""
    base = app_base_dir or get_app_base_dir()
    return base / "models"


def get_ppocr_v4_ch_model_paths(models_base_dir: Path | None = None) -> OCRModelPaths:
    """
    Return the default PP-OCRv4 (Chinese) model directories.

    Note: this layout matches what PaddleOCR 2.x downloads into ~/.paddleocr by default;
    we only point BASE_DIR at the bundled models/ so everything works offline.
    """
    base = models_base_dir or get_models_base_dir()
    det_dir = base / "whl" / "det" / "ch" / "ch_PP-OCRv4_det_infer"
    rec_dir = base / "whl" / "rec" / "ch" / "ch_PP-OCRv4_rec_infer"
    cls_dir = base / "whl" / "cls" / "ch_ppocr_mobile_v2.0_cls_infer"
    return OCRModelPaths(base_dir=base, det_dir=det_dir, rec_dir=rec_dir, cls_dir=cls_dir)


def _configure_windows_dll_search_path(app_base_dir: Path) -> None:
    """
    On Windows, the dynamic libraries PaddlePaddle depends on (mklml.dll etc.) usually live in the bundle at:
    - <exe_dir>/_internal/paddle/libs

    In some cases the loader does not pick up that path automatically (error code 126),
    so it must be added to the DLL search path explicitly.
    """
    if not sys.platform.startswith("win"):
        return

    # os.add_dll_directory is available on Windows from Python 3.8
    add_dll_dir = getattr(os, "add_dll_directory", None)
    internal_dir = app_base_dir / "_internal"
    candidates = [
        internal_dir / "paddle" / "libs",
        internal_dir / "paddle",
        internal_dir,
        app_base_dir,
    ]

    # Also prepend to PATH for loaders that do not go through add_dll_directory
    path_parts = [os.environ.get("PATH", "")]
    for p in candidates:
        if p.exists():
            if add_dll_dir is not None:
                try:
                    add_dll_dir(str(p))
                except Exception:
                    # add_dll_directory can fail for some permission/path cases; PATH is the fallback
                    pass
            path_parts.insert(0, str(p))
    os.environ["PATH"] = ";".join([x for x in path_parts if x])


def _check_infer_dir(dir_path: Path) -> bool:
    """Return True if an inference model directory is complete (contains at least inference.pdmodel and inference.pdiparams)"""
    return (dir_path / "inference.pdmodel").exists() and (dir_path / "inference.pdiparams").exists()


def verify_offline_models_or_raise(model_paths: OCRModelPaths) -> None:
    """
    Verify that the offline models exist.

    Design choices:
    - Raise directly: the caller (desktop/UI/CLI) decides how to present the error
    - Never continue initializing with models missing: this avoids triggering PaddleOCR's automatic download
    """
    missing = []
    if not _check_infer_dir(model_paths.det_dir):
        missing.append(str(model_paths.det_dir))
    if not _check_infer_dir(model_paths.rec_dir):
        missing.append(str(model_paths.rec_dir))
    if not _check_infer_dir(model_paths.cls_dir):
        missing.append(str(model_paths.cls_dir))

    if missing:
        hint = (
            "Offline models are missing; cannot start in offline mode.\n\n"
            "Missing directories:\n- "
            + "\n- ".join(missing)
            + "\n\n"
            "How to fix:\n"
            "1) On a machine with network access, run: python scripts/prepare_models.py --models-dir models\n"
            "2) Ship the generated models/ directory with the zip bundle (next to the exe)"
        )
        raise FileNotFoundError(hint)


def create_offline_ocr(models_base_dir: Path | None = None, show_log: bool = False):
    """
    Create a PaddleOCR instance (offline mode).

    Key points:
    - Point the default download/lookup directory at the bundled models/ via the PADDLE_OCR_BASE_DIR
      environment variable (matches paddleocr==2.10.0 behavior)
    - Pass the det/rec/cls model directories explicitly so a directory mismatch cannot trigger a re-download
    - If models are missing, fail early to prevent a network download
    """
    log = logging.getLogger("post_ocr.ocr")
    model_paths = get_ppocr_v4_ch_model_paths(models_base_dir=models_base_dir)
    verify_offline_models_or_raise(model_paths)

    # In a packaged Windows build, configure the DLL search path first so mklml.dll etc. load (error code 126 otherwise)
    _configure_windows_dll_search_path(get_app_base_dir())

    # Disable the online model-source check (faster startup) and point the default base_dir at the bundled models/
    os.environ["PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK"] = "True"
    os.environ["PADDLE_OCR_BASE_DIR"] = str(model_paths.base_dir)

    # Deferred import: make sure the environment variables are set before the paddleocr module is loaded
    log.info("create_offline_ocr: importing paddleocr (base_dir=%s)", str(model_paths.base_dir))
    from paddleocr import PaddleOCR  # pylint: disable=import-error

    # Note: paddleocr==2.10.0 does not support 3.x pipeline parameters such as use_textline_orientation
    log.info("create_offline_ocr: creating PaddleOCR(det=%s, rec=%s)", str(model_paths.det_dir), str(model_paths.rec_dir))
    ocr = PaddleOCR(
        lang="ch",
        show_log=show_log,
        use_angle_cls=False,
        det_model_dir=str(model_paths.det_dir),
        rec_model_dir=str(model_paths.rec_dir),
        cls_model_dir=str(model_paths.cls_dir),
    )
    log.info("create_offline_ocr: PaddleOCR created")
    return ocr
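
The fail-fast check in `verify_offline_models_or_raise` can be exercised without PaddleOCR installed. The sketch below is a standalone illustration, not the module itself: `missing_model_dirs` and `demo` are hypothetical helpers, while the file names `inference.pdmodel` / `inference.pdiparams` and the directory names are the ones the diff uses. It builds a temporary `models/whl` tree in which only the detection model is present and reports which directories would block an offline start.

```python
import tempfile
from pathlib import Path
from typing import Iterable, List

REQUIRED_FILES = ("inference.pdmodel", "inference.pdiparams")


def missing_model_dirs(model_dirs: Iterable[Path]) -> List[Path]:
    """Return the directories that lack any of the required inference files."""
    return [
        d for d in model_dirs
        if not all((d / name).exists() for name in REQUIRED_FILES)
    ]


def demo() -> List[str]:
    """Create a fake models/ tree with only the detection model and list what is missing."""
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp) / "models" / "whl"
        det = base / "det" / "ch" / "ch_PP-OCRv4_det_infer"
        rec = base / "rec" / "ch" / "ch_PP-OCRv4_rec_infer"
        cls = base / "cls" / "ch_ppocr_mobile_v2.0_cls_infer"
        # Only the detection model directory gets its two inference files.
        det.mkdir(parents=True)
        for name in REQUIRED_FILES:
            (det / name).touch()
        return [p.name for p in missing_model_dirs([det, rec, cls])]


if __name__ == "__main__":
    print(demo())
```

A caller would raise `FileNotFoundError` when the returned list is non-empty, which is what the function in the diff does before PaddleOCR is ever imported.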