Compare commits

..

12 Commits

Author SHA1 Message Date
empty
b68612dd53 feat: 切换可选RapidOCR后端并修复macOS识别卡住 2026-02-25 09:36:37 +08:00
let5sne.win10
7ebd5badf5 fix: 添加freeze_support防止打包后子进程重复启动主窗口
PyInstaller + Windows spawn模式下,子进程会重新执行主脚本,
缺少freeze_support()导致无限循环创建窗口和OCR进程。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 23:52:09 +08:00
let5sne.win10
8f6a4fbc68 fix: 修复ROI切片y_offset丢失导致地址拼接失败的问题
resize步骤未传递y_offset,导致OCR worker无法还原切片坐标,
跨切片的地址行(如"楼3号")无法与前一行正确拼接。
同时重写extract_with_layout为锚点算法(邮编/电话锚点+单栏/多栏自动切换),
支持单位名含地址关键字、电话同行等场景。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 23:38:33 +08:00
empty
6ce4b7b363 feat: 提升OCR稳定性并支持多栏地址解析 2026-02-24 22:45:11 +08:00
let5sne.win10
1d6ee0a95e feat: 添加Android app自适应图标
- 深蓝渐变背景 + 白色摄像头/信封前景的vector drawable图标
- 支持Android 8.0+ 自适应图标(圆形/圆角方形等)
- Manifest添加icon和roundIcon引用

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 21:40:07 +08:00
let5sne.win10
b2ec97215f feat: 美化Android app UI,Material风格重设计
- 新增深蓝工业风配色方案和圆角卡片/按钮样式
- 顶部标题栏带运行状态指示灯
- 按钮增加disabled灰色状态,启动/停止/拍照三色区分
- 状态卡片支持ScrollView滚动,内容不再截断
- monospace字体显示连接命令,提升可读性

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 21:36:41 +08:00
let5sne.win10
d2bc33d8c4 chore: 移除误追踪的构建报告文件
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 20:58:41 +08:00
let5sne.win10
46fdff508a chore: 清理gradle废弃配置,移除enableJetifier
- 移除已废弃的enableJetifier=true
- 添加generateSyncIssueWhenLibraryConstraintsAreEnabled抑制重复警告
- 其余deprecated属性暂保留(删除会导致构建失败)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 20:54:22 +08:00
let5sne.win10
99b1849e7f fix: 修复Android app UI不显示信息及拍照保存问题
- 切换到Light主题,解决深色模式下文字不可见
- 重新布局为横屏左右分栏,显示设备IP和连接方式
- 拍照改用MediaStore写入系统相册,修复原acquireLatestImage竞争导致的空帧
- 修正ImageFormat导入包路径
- 补充gradle-wrapper.jar

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 20:50:33 +08:00
let5sne.win10
86cb704eae fix: 解决打包后三个运行时问题
- rthook_paddle.py: stub paddle.utils.cpp_extension,避免Cython缺文件崩溃
- build_exe.py: 显式收集paddle DLLs(mklml.dll等)
- ocr_offline.py: 非ASCII路径自动复制模型到临时目录,绕过PaddlePaddle C++路径限制

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 20:31:28 +08:00
let5sne.win10
dfbab1b61e feat: 重写打包脚本,支持PaddleOCR 2.x离线分发
- build_exe.py: 适配2.x,onedir模式,自动复制models/到dist
- ocr_offline.py: 打包态自动检测并使用本地离线模型

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 19:30:27 +08:00
let5sne.win10
bac1818ed0 chore: 更新.gitignore,排除日志和临时文件
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 19:20:16 +08:00
31 changed files with 2036 additions and 1012 deletions

17
.gitignore vendored
View File

@@ -64,3 +64,20 @@ build/
dist/
*.exe
*.db
# ==================================================
# 日志 / 临时文件
# ==================================================
*.log
nul
hook-paddlex.py
# ==================================================
# IDE / 工具配置(本地)
# ==================================================
.claude/
# ==================================================
# Android - release APK已单独管理
# ==================================================
android-app/app/release/

View File

@@ -28,6 +28,8 @@
sudo apt-get install -y libgl1-mesa-glx libglib2.0-0
# 安装 Python 依赖
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
@@ -36,18 +38,42 @@ pip install -r requirements.txt
**命令行批处理**
```bash
# 将图片放入 data/input/ 目录
python src/main.py
.venv/bin/python src/main.py
# 结果保存在 data/output/result.xlsx
```
**桌面应用**
```bash
python src/desktop.py
.venv/bin/python src/desktop.py
# 启动 PyQt6 窗口,可选择摄像头实时拍照识别
```
### 3. OCR 后端切换RapidOCR / PaddleOCR
默认后端为 **RapidOCR(ONNX)**,可通过环境变量切换:
```bash
# 默认RapidOCR推荐跨平台更稳
POST_OCR_BACKEND=rapidocr .venv/bin/python src/desktop.py
# 强制使用 PaddleOCR
POST_OCR_BACKEND=paddle .venv/bin/python src/desktop.py
# 自动:优先 RapidOCR失败回退 PaddleOCR
POST_OCR_BACKEND=auto .venv/bin/python src/desktop.py
```
常用相关环境变量:
- `POST_OCR_BACKEND_FALLBACK_PADDLE=1|0`:是否允许回退到 Paddle默认
- `POST_OCR_BACKEND=auto` 时为 `1`
- 用户显式 `POST_OCR_BACKEND=rapidocr` 时为 `0`
- `POST_OCR_MP_START_METHOD=spawn|fork`:强制指定 OCR 子进程启动方式macOS 默认rapidocr 用 `spawn`paddle 用 `fork`
- `POST_OCR_MAIN_SPLIT=1~4`:主 ROI 分片数(默认 2
- `POST_OCR_MAX_ROI_WIDTH=600+`:识别前缩放宽度上限(默认 960
- `POST_OCR_JOB_TIMEOUT_SEC`:单次识别超时秒数(默认 25
---
## Windows 桌面离线版zip 目录包)

View File

@@ -10,8 +10,10 @@
<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:roundIcon="@mipmap/ic_launcher_round"
android:label="USB摄像头"
android:theme="@style/Theme.AppCompat.NoActionBar">
android:theme="@style/Theme.AppCompat.Light.NoActionBar">
<activity
android:name=".MainActivity"

View File

@@ -1,21 +1,16 @@
package com.usbwebcam
import android.content.ContentValues
import android.content.Context
import android.graphics.Bitmap
import android.graphics.ImageFormat
import android.graphics.Rect
import android.graphics.YuvImage
import android.hardware.camera2.*
import android.media.Image
import android.graphics.ImageFormat
import android.media.ImageReader
import android.os.Build
import android.os.Handler
import android.os.HandlerThread
import android.provider.MediaStore
import android.util.Size
import android.view.Surface
import android.view.SurfaceHolder
import java.io.ByteArrayOutputStream
import java.io.File
import java.io.FileOutputStream
import java.text.SimpleDateFormat
import java.util.*
@@ -29,6 +24,9 @@ class CameraHelper(
private var backgroundThread: HandlerThread? = null
private var backgroundHandler: Handler? = null
@Volatile
private var latestJpeg: ByteArray? = null
private val cameraManager = context.getSystemService(Context.CAMERA_SERVICE) as CameraManager
private val cameraId = cameraManager.cameraIdList.firstOrNull {
cameraManager.getCameraCharacteristics(it)
@@ -84,7 +82,6 @@ class CameraHelper(
private fun createCameraPreviewSession() {
try {
// 获取支持的尺寸
val characteristics = cameraManager.getCameraCharacteristics(cameraId)
val map = characteristics.get(CameraCharacteristics.SCALER_STREAM_CONFIGURATION_MAP)
val previewSize = map?.getOutputSizes(SurfaceHolder::class.java)?.firstOrNull {
@@ -125,26 +122,47 @@ class CameraHelper(
val buffer = image.planes[0].buffer
val bytes = ByteArray(buffer.remaining())
buffer.get(bytes)
latestJpeg = bytes
onFrame(bytes, image.width, image.height)
} finally {
image.close()
}
}
fun captureAndSave() {
// 保存最一帧
imageReader?.acquireLatestImage()?.use { image ->
val buffer = image.planes[0].buffer
val bytes = ByteArray(buffer.remaining())
buffer.get(bytes)
/**
* 保存最一帧到系统相册,返回是否成功
*/
fun captureAndSave(): Boolean {
val jpeg = latestJpeg ?: return false
val timestamp = SimpleDateFormat("yyyyMMdd_HHmmss", Locale.getDefault()).format(Date())
val file = File(
context.getExternalFilesDir(null),
"envelope_$timestamp.jpg"
)
val timestamp = SimpleDateFormat("yyyyMMdd_HHmmss", Locale.getDefault()).format(Date())
val filename = "envelope_$timestamp.jpg"
FileOutputStream(file).use { it.write(bytes) }
val contentValues = ContentValues().apply {
put(MediaStore.Images.Media.DISPLAY_NAME, filename)
put(MediaStore.Images.Media.MIME_TYPE, "image/jpeg")
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
put(MediaStore.Images.Media.RELATIVE_PATH, "Pictures/信封拍照")
put(MediaStore.Images.Media.IS_PENDING, 1)
}
}
val resolver = context.contentResolver
val uri = resolver.insert(MediaStore.Images.Media.EXTERNAL_CONTENT_URI, contentValues)
?: return false
return try {
resolver.openOutputStream(uri)?.use { it.write(jpeg) }
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
contentValues.clear()
contentValues.put(MediaStore.Images.Media.IS_PENDING, 0)
resolver.update(uri, contentValues, null, null)
}
true
} catch (e: Exception) {
resolver.delete(uri, null, null)
e.printStackTrace()
false
}
}
@@ -156,25 +174,15 @@ class CameraHelper(
e.printStackTrace()
}
try {
captureSession?.close()
} catch (e: Exception) {
e.printStackTrace()
}
try { captureSession?.close() } catch (_: Exception) {}
captureSession = null
try {
imageReader?.close()
} catch (e: Exception) {
e.printStackTrace()
}
try { imageReader?.close() } catch (_: Exception) {}
imageReader = null
try {
cameraDevice?.close()
} catch (e: Exception) {
e.printStackTrace()
}
try { cameraDevice?.close() } catch (_: Exception) {}
cameraDevice = null
latestJpeg = null
}
}

View File

@@ -3,12 +3,14 @@ package com.usbwebcam
import android.Manifest
import android.content.pm.PackageManager
import android.os.Bundle
import android.view.View
import android.widget.Button
import android.widget.TextView
import android.widget.Toast
import androidx.activity.result.contract.ActivityResultContracts
import androidx.appcompat.app.AppCompatActivity
import androidx.core.content.ContextCompat
import java.net.NetworkInterface
class MainActivity : AppCompatActivity() {
private var mjpegServer: MjpegServer? = null
@@ -28,29 +30,24 @@ class MainActivity : AppCompatActivity() {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val btnStart = findViewById<Button>(R.id.btn_start)
val btnStop = findViewById<Button>(R.id.btn_stop)
val btnCapture = findViewById<Button>(R.id.btn_capture)
val tvStatus = findViewById<TextView>(R.id.tv_status)
btnStart.setOnClickListener {
findViewById<Button>(R.id.btn_start).setOnClickListener {
if (checkPermission()) {
startCamera()
}
}
btnStop.setOnClickListener {
findViewById<Button>(R.id.btn_stop).setOnClickListener {
stopCamera()
}
btnCapture.setOnClickListener {
cameraHelper?.captureAndSave()
Toast.makeText(this, "图片已保存到相册", Toast.LENGTH_SHORT).show()
findViewById<Button>(R.id.btn_capture).setOnClickListener {
val saved = cameraHelper?.captureAndSave() == true
val msg = if (saved) "图片已保存到相册" else "保存失败,请先启动服务"
Toast.makeText(this, msg, Toast.LENGTH_SHORT).show()
}
if (ContextCompat.checkSelfPermission(
this,
Manifest.permission.CAMERA
this, Manifest.permission.CAMERA
) == PackageManager.PERMISSION_GRANTED
) {
startCamera()
@@ -59,8 +56,7 @@ class MainActivity : AppCompatActivity() {
private fun checkPermission(): Boolean {
return if (ContextCompat.checkSelfPermission(
this,
Manifest.permission.CAMERA
this, Manifest.permission.CAMERA
) == PackageManager.PERMISSION_GRANTED
) {
true
@@ -70,6 +66,19 @@ class MainActivity : AppCompatActivity() {
}
}
private fun getDeviceIp(): String {
try {
NetworkInterface.getNetworkInterfaces()?.toList()?.forEach { intf ->
intf.inetAddresses?.toList()?.forEach { addr ->
if (!addr.isLoopbackAddress && addr is java.net.Inet4Address) {
return addr.hostAddress ?: "未知"
}
}
}
} catch (_: Exception) {}
return "未知"
}
private fun startCamera() {
if (cameraHelper != null) return
@@ -77,17 +86,27 @@ class MainActivity : AppCompatActivity() {
cameraHelper = CameraHelper(this) { frame, _, _ ->
mjpegServer?.updateFrame(frame)
}
// Start the server and update UI once started
mjpegServer?.start {
runOnUiThread {
findViewById<TextView>(R.id.tv_status).text = """
服务运行中
端口: 8080
IP: 无需IP (ADB模式)
USB连接命令:
adb forward tcp:8080 tcp:8080
""".trimIndent()
val ip = getDeviceIp()
findViewById<TextView>(R.id.tv_status).text =
"● 服务运行中\n\n" +
"端口: 8080\n" +
"设备IP: $ip\n\n" +
"USB连接 (推荐):\n" +
" adb forward tcp:8080 tcp:8080\n\n" +
"WiFi连接:\n" +
" http://$ip:8080"
findViewById<TextView>(R.id.tv_ip).apply {
text = "在电脑端浏览器打开上述地址即可查看画面"
visibility = View.VISIBLE
}
findViewById<TextView>(R.id.tv_indicator).apply {
text = "● 运行中"
setTextColor(getColor(R.color.status_running))
}
findViewById<Button>(R.id.btn_start).isEnabled = false
findViewById<Button>(R.id.btn_stop).isEnabled = true
}
@@ -104,6 +123,11 @@ class MainActivity : AppCompatActivity() {
runOnUiThread {
findViewById<TextView>(R.id.tv_status).text = "服务已停止"
findViewById<TextView>(R.id.tv_ip).visibility = View.GONE
findViewById<TextView>(R.id.tv_indicator).apply {
text = "⏸ 未启动"
setTextColor(getColor(R.color.status_stopped))
}
findViewById<Button>(R.id.btn_start).isEnabled = true
findViewById<Button>(R.id.btn_stop).isEnabled = false
}

View File

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>
<ripple xmlns:android="http://schemas.android.com/apk/res/android"
android:color="#40FFFFFF">
<item>
<shape android:shape="rectangle">
<solid android:color="@color/accent_green" />
<corners android:radius="10dp" />
</shape>
</item>
</ripple>

View File

@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="utf-8"?>
<ripple xmlns:android="http://schemas.android.com/apk/res/android"
android:color="#40FFFFFF">
<item>
<selector>
<item android:state_enabled="false">
<shape android:shape="rectangle">
<solid android:color="#B0BEC5" />
<corners android:radius="10dp" />
</shape>
</item>
<item>
<shape android:shape="rectangle">
<solid android:color="@color/accent_red" />
<corners android:radius="10dp" />
</shape>
</item>
</selector>
</item>
</ripple>

View File

@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="utf-8"?>
<ripple xmlns:android="http://schemas.android.com/apk/res/android"
android:color="#40FFFFFF">
<item>
<selector>
<item android:state_enabled="false">
<shape android:shape="rectangle">
<solid android:color="#B0BEC5" />
<corners android:radius="10dp" />
</shape>
</item>
<item>
<shape android:shape="rectangle">
<solid android:color="@color/accent_blue" />
<corners android:radius="10dp" />
</shape>
</item>
</selector>
</item>
</ripple>

View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<shape xmlns:android="http://schemas.android.com/apk/res/android"
android:shape="rectangle">
<solid android:color="@color/bg_card" />
<corners android:radius="12dp" />
<stroke android:width="1dp" android:color="@color/divider" />
</shape>

View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<shape xmlns:android="http://schemas.android.com/apk/res/android"
android:shape="rectangle">
<solid android:color="@color/primary" />
<corners android:radius="12dp" />
</shape>

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- 图标背景:深蓝渐变 -->
<vector xmlns:android="http://schemas.android.com/apk/res/android"
android:width="108dp"
android:height="108dp"
android:viewportWidth="108"
android:viewportHeight="108">
<path
android:fillColor="#1B2838"
android:pathData="M0,0 L108,0 L108,108 L0,108 Z" />
<!-- 装饰:对角线条纹 -->
<path
android:fillColor="#22374D"
android:pathData="M0,90 L90,0 L108,0 L108,18 L18,108 L0,108 Z" />
<path
android:fillColor="#22374D"
android:pathData="M0,60 L60,0 L72,0 L0,72 Z" />
</vector>

View File

@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- 图标前景:摄像头 + 信封组合 -->
<vector xmlns:android="http://schemas.android.com/apk/res/android"
android:width="108dp"
android:height="108dp"
android:viewportWidth="108"
android:viewportHeight="108">
<!-- 摄像头机身 -->
<path
android:fillColor="#FFFFFF"
android:pathData="M34,40 L74,40 C76.2,40 78,41.8 78,44 L78,68 C78,70.2 76.2,72 74,72 L34,72 C31.8,72 30,70.2 30,68 L30,44 C30,41.8 31.8,40 34,40 Z" />
<!-- 摄像头镜头外圈 -->
<path
android:fillColor="#1B2838"
android:pathData="M54,48 m-11,0 a11,11 0,1 1,22 0 a11,11 0,1 1,-22 0" />
<!-- 摄像头镜头内圈 -->
<path
android:fillColor="#42A5F5"
android:pathData="M54,48 m-7,0 a7,7 0,1 1,14 0 a7,7 0,1 1,-14 0" />
<!-- 镜头高光 -->
<path
android:fillColor="#90CAF9"
android:pathData="M50,45 m-2,0 a2,2 0,1 1,4 0 a2,2 0,1 1,-4 0" />
<!-- 闪光灯 -->
<path
android:fillColor="#FFD54F"
android:pathData="M68,44 m-2,0 a2,2 0,1 1,4 0 a2,2 0,1 1,-4 0" />
<!-- 信封图标(右下角) -->
<path
android:fillColor="#FFFFFF"
android:pathData="M60,62 L76,62 C77.1,62 78,62.9 78,64 L78,72 C78,73.1 77.1,74 76,74 L60,74 C58.9,74 58,73.1 58,72 L58,64 C58,62.9 58.9,62 60,62 Z" />
<path
android:pathData="M58,63 L68,69 L78,63"
android:strokeWidth="1.2"
android:strokeColor="#1B2838"
android:fillColor="#00000000" />
</vector>

View File

@@ -3,49 +3,152 @@
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical"
android:padding="24dp"
android:gravity="center">
<TextView
android:id="@+id/tv_status"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="等待启动..."
android:textSize="16sp"
android:textColor="#333"
android:layout_marginBottom="32dp" />
android:background="@color/bg_main">
<!-- 顶部标题栏 -->
<LinearLayout
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_width="match_parent"
android:layout_height="56dp"
android:orientation="horizontal"
android:gravity="center">
android:gravity="center_vertical"
android:paddingHorizontal="20dp"
android:background="@drawable/bg_header">
<Button
android:id="@+id/btn_start"
android:layout_width="120dp"
android:layout_height="60dp"
android:text="启动"
<TextView
android:layout_width="0dp"
android:layout_height="wrap_content"
android:layout_weight="1"
android:text="📷 USB 摄像头服务"
android:textSize="18sp"
android:layout_marginEnd="16dp" />
android:textColor="@color/text_on_dark"
android:textStyle="bold" />
<Button
android:id="@+id/btn_stop"
android:layout_width="120dp"
android:layout_height="60dp"
android:text="停止"
android:textSize="18sp"
android:enabled="false" />
<TextView
android:id="@+id/tv_indicator"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="⏸ 未启动"
android:textSize="13sp"
android:textColor="@color/status_stopped" />
</LinearLayout>
<Button
android:id="@+id/btn_capture"
android:layout_width="200dp"
android:layout_height="70dp"
android:text="拍照保存"
android:textSize="20sp"
android:layout_marginTop="24dp"
android:backgroundTint="#4CAF50" />
<!-- 主内容区 -->
<LinearLayout
android:layout_width="match_parent"
android:layout_height="0dp"
android:layout_weight="1"
android:orientation="horizontal"
android:padding="12dp">
<!-- 左侧:状态信息卡片 -->
<LinearLayout
android:layout_width="0dp"
android:layout_height="match_parent"
android:layout_weight="1"
android:orientation="vertical"
android:layout_marginEnd="12dp">
<LinearLayout
style="@style/CardStyle"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical">
<TextView
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="服务状态"
android:textSize="12sp"
android:textColor="@color/text_secondary"
android:textStyle="bold"
android:textAllCaps="true"
android:letterSpacing="0.08"
android:layout_marginBottom="8dp" />
<View
android:layout_width="match_parent"
android:layout_height="1dp"
android:background="@color/divider"
android:layout_marginBottom="8dp" />
<ScrollView
android:layout_width="match_parent"
android:layout_height="0dp"
android:layout_weight="1"
android:fillViewport="true">
<LinearLayout
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:orientation="vertical">
<TextView
android:id="@+id/tv_status"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="等待启动..."
android:textSize="14sp"
android:textColor="@color/text_primary"
android:lineSpacingExtra="5dp"
android:fontFamily="monospace" />
<TextView
android:id="@+id/tv_ip"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:textSize="12sp"
android:textColor="@color/text_hint"
android:layout_marginTop="8dp"
android:visibility="gone" />
</LinearLayout>
</ScrollView>
</LinearLayout>
</LinearLayout>
<!-- 右侧:操作按钮 -->
<LinearLayout
android:layout_width="180dp"
android:layout_height="match_parent"
android:orientation="vertical"
android:gravity="center">
<Button
android:id="@+id/btn_start"
style="@style/BtnPrimary"
android:layout_width="match_parent"
android:layout_height="52dp"
android:text="▶ 启动服务"
android:layout_marginBottom="10dp" />
<Button
android:id="@+id/btn_stop"
style="@style/BtnDanger"
android:layout_width="match_parent"
android:layout_height="52dp"
android:text="■ 停止服务"
android:enabled="false"
android:layout_marginBottom="24dp" />
<View
android:layout_width="match_parent"
android:layout_height="1dp"
android:background="@color/divider"
android:layout_marginBottom="24dp" />
<Button
android:id="@+id/btn_capture"
style="@style/BtnCapture"
android:layout_width="match_parent"
android:layout_height="60dp"
android:text="📸 拍照保存" />
</LinearLayout>
</LinearLayout>
</LinearLayout>

View File

@@ -0,0 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<adaptive-icon xmlns:android="http://schemas.android.com/apk/res/android">
<background android:drawable="@drawable/ic_launcher_background" />
<foreground android:drawable="@drawable/ic_launcher_foreground" />
</adaptive-icon>

View File

@@ -0,0 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<adaptive-icon xmlns:android="http://schemas.android.com/apk/res/android">
<background android:drawable="@drawable/ic_launcher_background" />
<foreground android:drawable="@drawable/ic_launcher_foreground" />
</adaptive-icon>

View File

@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="utf-8"?>
<resources>
<!-- 主色调:深蓝工业风 -->
<color name="primary">#1B2838</color>
<color name="primary_dark">#0F1923</color>
<color name="primary_light">#2A3F56</color>
<!-- 强调色 -->
<color name="accent_green">#43A047</color>
<color name="accent_green_dark">#2E7D32</color>
<color name="accent_red">#E53935</color>
<color name="accent_red_dark">#C62828</color>
<color name="accent_blue">#1E88E5</color>
<color name="accent_blue_dark">#1565C0</color>
<!-- 背景 -->
<color name="bg_main">#ECEFF1</color>
<color name="bg_card">#FFFFFF</color>
<color name="bg_status_bar">#1B2838</color>
<!-- 文字 -->
<color name="text_primary">#212121</color>
<color name="text_secondary">#546E7A</color>
<color name="text_hint">#90A4AE</color>
<color name="text_on_dark">#ECEFF1</color>
<color name="text_on_primary">#FFFFFF</color>
<!-- 状态指示 -->
<color name="status_running">#43A047</color>
<color name="status_stopped">#90A4AE</color>
<!-- 分隔线 -->
<color name="divider">#CFD8DC</color>
</resources>

View File

@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="utf-8"?>
<resources>
<!-- 卡片样式 -->
<style name="CardStyle">
<item name="android:background">@drawable/bg_card</item>
<item name="android:padding">16dp</item>
</style>
<!-- 按钮基础样式 -->
<style name="BtnBase">
<item name="android:textSize">16sp</item>
<item name="android:textStyle">bold</item>
<item name="android:textAllCaps">false</item>
<item name="android:stateListAnimator">@null</item>
<item name="android:elevation">2dp</item>
</style>
<style name="BtnPrimary" parent="BtnBase">
<item name="android:background">@drawable/bg_btn_primary</item>
<item name="android:textColor">@color/text_on_primary</item>
</style>
<style name="BtnDanger" parent="BtnBase">
<item name="android:background">@drawable/bg_btn_danger</item>
<item name="android:textColor">@color/text_on_primary</item>
</style>
<style name="BtnCapture" parent="BtnBase">
<item name="android:background">@drawable/bg_btn_capture</item>
<item name="android:textColor">@color/text_on_primary</item>
<item name="android:textSize">18sp</item>
</style>
</resources>

View File

@@ -6,7 +6,7 @@ buildscript {
mavenCentral()
}
dependencies {
classpath 'com.android.tools.build:gradle:9.0.0'
classpath 'com.android.tools.build:gradle:9.0.1'
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
}

File diff suppressed because one or more lines are too long

View File

@@ -1,7 +1,6 @@
# Project-wide Gradle settings
org.gradle.jvmargs=-Xmx2048m -Dfile.encoding=UTF-8
android.useAndroidX=true
android.enableJetifier=true
android.defaults.buildfeatures.resvalues=true
android.sdk.defaultTargetSdkToCompileSdkIfUnset=false
android.enableAppCompileTimeRClass=false
@@ -12,3 +11,4 @@ android.r8.strictFullModeForKeepRules=false
android.r8.optimizedResourceShrinking=false
android.builtInKotlin=false
android.newDsl=false
android.generateSyncIssueWhenLibraryConstraintsAreEnabled=false

Binary file not shown.

View File

@@ -1,63 +1,108 @@
#!/usr/bin/env python3
"""
打包脚本 - 将桌面程序打包成独立可执行文件
使用方法: pip install pyinstaller && python build_exe.py
打包脚本 - 将桌面程序打包成可离线分发的目录包
使用方法: python build_exe.py [--debug]
调试版本: python build_exe.py --debug
产出: dist/信封信息提取系统/
├── 信封信息提取系统.exe
├── _internal/ (运行时依赖)
└── models/ (OCR 模型,需提前通过 prepare_models.py 准备)
"""
import os
import subprocess
import sys
import shutil
from pathlib import Path
PROJECT_ROOT = Path(__file__).parent)
PROJECT_ROOT = Path(__file__).parent
DIST_NAME = "信封信息提取系统"
# paddle DLLs 所在目录mklml.dll 等不会被 PyInstaller 自动收集)
import paddle as _paddle
PADDLE_LIBS = str(Path(_paddle.__file__).parent / "libs")
def build(debug=False):
"""使用 PyInstaller 打包"""
"""使用 PyInstaller 打包onedir 模式)"""
print("正在打包,请稍候...")
print(f"工作目录: {PROJECT_ROOT}")
print(f"模式: {'调试(带控制台)' if debug else '正式(无控制台)'}")
print("-" * 50)
# 使用 Python -m PyInstaller 方式
# --paths 将 src 目录添加到 Python 路径,避免导入问题
cmd = [
sys.executable,
"-m", "PyInstaller",
"--name=信封信息提取系统",
"--onefile",
"--clean",
f"--name={DIST_NAME}",
"--onedir",
"--noconfirm",
"--paths=src",
# --- hidden imports ---
"--hidden-import=cv2",
"--hidden-import=PIL",
"--hidden-import=processor",
"--hidden-import=ocr_offline",
"--hidden-import=paddleocr",
"--hidden-import=paddle",
# --- paddle DLLsmklml.dll 等不会被自动收集) ---
f"--add-binary={PADDLE_LIBS}/*.dll{os.pathsep}paddle/libs",
# --- runtime hook: stub 掉 paddle 开发模块,避免 Cython 缺文件崩溃 ---
"--runtime-hook=rthook_paddle.py",
# --- 收集 paddleocr 全部数据(模型配置、字典等) ---
"--collect-all=paddleocr",
# --- 元数据(部分库在运行时通过 importlib.metadata 查版本) ---
"--copy-metadata=paddlepaddle",
"--copy-metadata=paddleocr",
]
# 调试模式:显示控制台窗口,便于查看错误
if not debug:
cmd.append("--windowed")
else:
cmd.append("--console")
cmd.append("src/desktop.py")
try:
subprocess.run(cmd, check=True, cwd=str(PROJECT_ROOT))
print("-" * 50)
print("打包完成!")
exe_path = PROJECT_ROOT / "dist" / "信封信息提取系统.exe"
if exe_path.exists():
size_mb = exe_path.stat().st_size / 1024 / 1024
print(f"可执行文件: {exe_path}")
print(f"文件大小: {size_mb:.1f} MB")
else:
print("警告: 未找到输出文件")
except subprocess.CalledProcessError as e:
print("-" * 50)
print(f"打包失败: {e}")
sys.exit(1)
except FileNotFoundError:
print("-" * 50)
print("错误: 未找到 PyInstaller")
print("请先安装: pip install pyinstaller")
print("错误: 未找到 PyInstaller,请先安装: pip install pyinstaller")
sys.exit(1)
print("-" * 50)
# 复制 models/ 到输出目录(与 exe 同级)
dist_dir = PROJECT_ROOT / "dist" / DIST_NAME
models_src = PROJECT_ROOT / "models"
models_dst = dist_dir / "models"
if models_src.exists() and any(models_src.rglob("*.pdmodel")):
print(f"复制离线模型: {models_src} -> {models_dst}")
if models_dst.exists():
shutil.rmtree(models_dst)
shutil.copytree(models_src, models_dst)
else:
print("⚠️ 未找到离线模型。请先执行:")
print(" python scripts/prepare_models.py --models-dir models")
print(" 然后重新打包,或手动将 models/ 放到 dist 目录中。")
# 统计大小
exe_path = dist_dir / f"{DIST_NAME}.exe"
if exe_path.exists():
folder_size = sum(
f.stat().st_size for f in dist_dir.rglob("*") if f.is_file()
) / 1024 / 1024
print(f"\n打包完成!")
print(f"输出目录: {dist_dir}")
print(f"总大小: {folder_size:.1f} MB")
print(f"\n分发方式: 将 {dist_dir.name}/ 整个文件夹打成 zip 即可。")
else:
print("警告: 未找到输出的 exe 文件")
if __name__ == "__main__":
debug = "--debug" in sys.argv

View File

@@ -1,5 +1,6 @@
# 桌面版依赖(本地电脑安装)
# ⚠️ PaddleOCR 3.x 有 PIR+oneDNN 兼容性问题,必须使用 2.x
rapidocr-onnxruntime
paddleocr==2.10.0
paddlepaddle==2.6.2

33
rthook_paddle.py Normal file
View File

@@ -0,0 +1,33 @@
"""
PyInstaller runtime hook: stub 掉 paddle 中仅开发时需要的模块,
避免打包后因缺少 Cython Utility 文件而崩溃。
"""
import types
import sys
class _Stub(types.ModuleType):
"""空模块 stub所有属性访问返回空类"""
def __getattr__(self, name):
if name.startswith("_"):
raise AttributeError(name)
return type(name, (), {})
def _inject(name):
if name not in sys.modules:
m = _Stub(name)
m.__path__ = []
m.__package__ = name
m.__spec__ = None
sys.modules[name] = m
# paddle.utils.cpp_extension 会拉入 Cython 编译器,推理不需要
for _p in [
"paddle.utils.cpp_extension",
"paddle.utils.cpp_extension.cpp_extension",
"paddle.utils.cpp_extension.extension_utils",
"paddle.utils.cpp_extension.jit_compile",
]:
_inject(_p)

View File

@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""
解析器快速自测脚本
运行方式:
python scripts/test_extract_info.py
"""
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from processor import extract_info # noqa: E402
def _print_case(name: str, result: dict) -> None:
print(f"\n=== {name} ===")
for key in ["编号", "邮编", "地址", "联系人/单位名", "电话"]:
print(f"{key}: {result.get(key, '')}")
def case_layout_multi_column() -> None:
"""多栏场景:左侧地址、右侧单位+联系人。"""
ocr_lines = [
{"text": "518000", "box": [[80, 40], [180, 40], [180, 80], [80, 80]], "source": "main"},
{"text": "广东省深圳市南山区", "box": [[80, 100], [450, 100], [450, 132], [80, 132]], "source": "main"},
{"text": "科技园高新南一道18号", "box": [[80, 140], [520, 140], [520, 172], [80, 172]], "source": "main"},
{"text": "创新大厦3栋1201", "box": [[80, 180], [420, 180], [420, 212], [80, 212]], "source": "main"},
{"text": "华南建设小组办公室", "box": [[620, 182], [960, 182], [960, 214], [620, 214]], "source": "main"},
{"text": "张三13800138000", "box": [[620, 222], [960, 222], [960, 254], [620, 254]], "source": "main"},
{"text": "202602241234567890", "box": [[280, 60], [760, 60], [760, 94], [280, 94]], "source": "number"},
]
result = extract_info(ocr_lines)
_print_case("多栏版面", result)
assert result["邮编"] == "518000"
assert result["电话"] == "13800138000"
assert "广东省深圳市南山区" in result["地址"]
assert "科技园高新南一道18号" in result["地址"]
assert "华南建设小组办公室" in result["联系人/单位名"]
assert result["编号"] == "202602241234567890"
def case_layout_single_column() -> None:
"""单列场景:邮编后连续地址,电话行包含联系人。"""
ocr_lines = [
{"text": "200120", "box": [[90, 42], [188, 42], [188, 76], [90, 76]], "source": "main"},
{"text": "上海市浦东新区世纪大道100号", "box": [[90, 96], [620, 96], [620, 128], [90, 128]], "source": "main"},
{"text": "A座1201室", "box": [[90, 136], [300, 136], [300, 168], [90, 168]], "source": "main"},
{"text": "李四021-12345678", "box": [[90, 178], [420, 178], [420, 210], [90, 210]], "source": "main"},
]
result = extract_info(ocr_lines)
_print_case("单列版面", result)
assert result["邮编"] == "200120"
assert result["电话"] == "021-12345678"
assert "上海市浦东新区世纪大道100号" in result["地址"]
assert "A座1201室" in result["地址"]
assert result["联系人/单位名"] == "李四"
def case_text_fallback() -> None:
"""无坐标回退:纯文本顺序规则。"""
ocr_texts = [
"518000",
"广东省深圳市南山区科技园",
"高新南一道18号",
"华南建设小组办公室",
"王五 13911112222",
]
result = extract_info(ocr_texts)
_print_case("纯文本回退", result)
assert result["邮编"] == "518000"
assert result["电话"] == "13911112222"
assert "广东省深圳市南山区科技园" in result["地址"]
assert "高新南一道18号" in result["地址"]
assert "华南建设小组办公室" in result["联系人/单位名"] or result["联系人/单位名"] == "王五"
def case_company_contact_with_phone() -> None:
"""单位名含地址关键字 + 电话同行,地址跨两行。"""
ocr_lines = [
{"text": "610000", "box": [[80, 40], [180, 40], [180, 80], [80, 80]], "source": "main"},
{"text": "四川省成都市蒲江县鹤山街道", "box": [[80, 100], [520, 100], [520, 132], [80, 132]], "source": "main"},
{"text": "健民路246号2栋1楼3号", "box": [[80, 140], [460, 140], [460, 172], [80, 172]], "source": "main"},
{"text": "蒲江县宏利物流有限公司 15680801653", "box": [[80, 180], [560, 180], [560, 212], [80, 212]], "source": "main"},
{"text": "20260200425708", "box": [[280, 60], [760, 60], [760, 94], [280, 94]], "source": "number"},
]
result = extract_info(ocr_lines)
_print_case("单位名+电话同行(带坐标)", result)
assert result["邮编"] == "610000"
assert result["电话"] == "15680801653"
assert "四川省成都市蒲江县鹤山街道" in result["地址"]
assert "健民路246号2栋1楼3号" in result["地址"]
assert "蒲江县宏利物流有限公司" not in result["地址"], f"单位名不应混入地址: {result['地址']}"
assert "宏利物流" in result["联系人/单位名"], f"联系人应含单位名: {result['联系人/单位名']}"
assert result["编号"] == "20260200425708"
def case_company_contact_separate_line() -> None:
"""单位名和电话分两行(无坐标回退)。"""
ocr_texts = [
"610000",
"四川省成都市蒲江县鹤山街道",
"健民路246号2栋1楼3号",
"蒲江县宏利物流有限公司",
"15680801653",
]
result = extract_info(ocr_texts)
_print_case("单位名+电话分行(纯文本)", result)
assert result["邮编"] == "610000"
assert result["电话"] == "15680801653"
assert "四川省成都市蒲江县鹤山街道" in result["地址"]
assert "健民路246号2栋1楼3号" in result["地址"]
assert "宏利物流" in result["联系人/单位名"], f"联系人应含单位名: {result['联系人/单位名']}"
def case_split_roi_address() -> None:
"""模拟 ROI 切片后坐标已偏移还原的场景:地址跨两个切片。
切片1 (y_offset=0): 邮编 + 地址第一行
切片2 (y_offset=200): 地址第二行 + 联系人+电话
坐标已在 worker 中加上 y_offset此处直接传最终坐标。
"""
ocr_lines = [
# 切片1 的结果y_offset=0坐标不变
{"text": "610000", "box": [[80, 30], [180, 30], [180, 60], [80, 60]], "source": "main"},
{"text": "四川省成都市蒲江县鹤山街道健民路246号2栋1", "box": [[80, 80], [560, 80], [560, 112], [80, 112]], "source": "main"},
# 切片2 的结果(原始 y 约 10~42加上 y_offset=200 后变成 210~242
{"text": "楼3号", "box": [[80, 210], [160, 210], [160, 242], [80, 242]], "source": "main"},
{"text": "蒲江县宏利物流有限公司 15680801653", "box": [[80, 260], [560, 260], [560, 292], [80, 292]], "source": "main"},
# 编号区域
{"text": "20260200425708", "box": [[280, 400], [760, 400], [760, 434], [280, 434]], "source": "number"},
]
result = extract_info(ocr_lines)
_print_case("ROI切片坐标还原", result)
assert result["邮编"] == "610000"
assert result["电话"] == "15680801653"
# 关键:地址两行应正确拼接
assert "健民路246号2栋1" in result["地址"], f"地址应含第一行: {result['地址']}"
assert "楼3号" in result["地址"], f"地址应含第二行: {result['地址']}"
assert "宏利物流" in result["联系人/单位名"], f"联系人应含单位名: {result['联系人/单位名']}"
def main() -> None:
case_layout_multi_column()
case_layout_single_column()
case_text_fallback()
case_company_contact_with_phone()
case_company_contact_separate_line()
case_split_roi_address()
print("\n所有场景断言通过。")
if __name__ == "__main__":
main()

View File

@@ -11,6 +11,7 @@ import time
import logging
import threading
import queue
import multiprocessing as mp
import subprocess
from datetime import datetime
from pathlib import Path
@@ -24,8 +25,8 @@ from PyQt6.QtWidgets import (
from PyQt6.QtCore import Qt, QTimer, pyqtSignal, QObject, pyqtSlot
from PyQt6.QtGui import QImage, QPixmap, QFont, QAction, QKeySequence, QShortcut
from processor import extract_info
from ocr_offline import create_offline_ocr, get_models_base_dir
from ocr_offline import get_models_base_dir
from ocr_worker_process import run_ocr_worker
logger = logging.getLogger("post_ocr.desktop")
@@ -70,12 +71,12 @@ def setup_logging() -> Path:
class OCRService(QObject):
"""
OCR 后台服务(运行在标准 Python 线程内)。
OCR 后台服务(运行在独立子进程中)。
关键点:
- 避免使用 QThread在 macOS 上QThread(Dummy-*) 内 import paddleocr 可能卡死
- PaddleOCR 实例在后台线程内创建并使用,避免跨线程调用导致卡死/死锁
- 单线程串行处理任务:避免并发推理挤爆内存或引发底层库竞争
- PaddleOCR 初始化与推理都放到子进程,避免阻塞 UI 主进程
- 主进程只做任务投递与结果回调
- 子进程异常或卡住时,可通过重启服务恢复
"""
finished = pyqtSignal(int, dict, list)
@@ -87,11 +88,27 @@ class OCRService(QObject):
def __init__(self, models_base_dir: Path):
super().__init__()
self._models_base_dir = models_base_dir
self._ocr = None
self._busy = False
self.backend_name = "unknown"
self._stop_event = threading.Event()
self._queue: "queue.Queue[tuple[int, object] | None]" = queue.Queue()
self._thread = threading.Thread(target=self._run, name="OCRThread", daemon=True)
backend_req = os.environ.get("POST_OCR_BACKEND", "rapidocr").strip().lower() or "rapidocr"
if sys.platform == "darwin":
# macOS + PyQt/OpenCV 场景下 fork 对 ONNX 推理稳定性较差rapidocr 默认走 spawn。
# Paddle 在 macOS 历史上与 spawn 组合更容易出现卡住,因此保留 fork。
method_default = "fork" if backend_req == "paddle" else "spawn"
else:
method_default = "spawn"
method = os.environ.get("POST_OCR_MP_START_METHOD", method_default).strip() or method_default
try:
self._ctx = mp.get_context(method)
except ValueError:
method = method_default
self._ctx = mp.get_context(method_default)
logger.info("OCR multiprocessing start_method=%s (backend_req=%s)", method, backend_req)
self._req_q = None
self._resp_q = None
self._proc = None
self._reader_thread = None
def _set_busy(self, busy: bool) -> None:
if self._busy != busy:
@@ -99,118 +116,157 @@ class OCRService(QObject):
self.busy_changed.emit(busy)
def start(self) -> None:
"""启动后台线程并执行 warmup"""
"""启动 OCR 子进程与响应监听线程"""
self._thread.start()
self._stop_event.clear()
self._req_q = self._ctx.Queue(maxsize=1)
self._resp_q = self._ctx.Queue()
self._proc = self._ctx.Process(
target=run_ocr_worker,
args=(str(self._models_base_dir), self._req_q, self._resp_q),
name="OCRProcess",
daemon=True,
)
self._proc.start()
self._reader_thread = threading.Thread(
target=self._read_responses,
name="OCRRespReader",
daemon=True,
)
self._reader_thread.start()
def stop(self, timeout_ms: int = 8000) -> bool:
"""请求停止后台线程并等待退出(后台线程为 daemon退出失败也不阻塞进程"""
"""停止 OCR 子进程与监听线程"""
try:
self._stop_event.set()
# 用 sentinel 唤醒阻塞在 queue.get() 的线程
try:
self._queue.put_nowait(None)
if self._req_q is not None:
self._req_q.put_nowait(None)
except Exception:
pass
self._thread.join(timeout=max(0.0, timeout_ms / 1000.0))
return not self._thread.is_alive()
if self._reader_thread is not None:
self._reader_thread.join(timeout=max(0.0, timeout_ms / 1000.0))
proc_alive = False
if self._proc is not None:
self._proc.join(timeout=max(0.0, timeout_ms / 1000.0))
if self._proc.is_alive():
proc_alive = True
self._proc.terminate()
self._proc.join(timeout=1.0)
self._set_busy(False)
return not proc_alive
except Exception:
self._set_busy(False)
return False
finally:
self._proc = None
self._reader_thread = None
self._req_q = None
self._resp_q = None
def _ensure_ocr(self) -> None:
if self._ocr is None:
logger.info("OCR ensure_ocr: 开始创建 PaddleOCR线程=%s", threading.current_thread().name)
self._ocr = create_offline_ocr(models_base_dir=self._models_base_dir)
logger.info("OCR ensure_ocr: PaddleOCR 创建完成")
self.ready.emit()
def _warmup(self) -> None:
"""提前加载 OCR 模型,避免首次识别时才初始化导致“像卡死”"""
logger.info("OCR 预热开始(线程=%s", threading.current_thread().name)
self._ensure_ocr()
logger.info("OCR 预热完成")
def _run(self) -> None:
try:
self._warmup()
except Exception as e:
logger.exception("OCR 预热失败:%s", str(e))
self.init_error.emit(str(e))
return
def _read_responses(self) -> None:
"""读取 OCR 子进程响应并转发为 Qt 信号。"""
while not self._stop_event.is_set():
item = None
try:
item = self._queue.get()
except Exception:
if self._resp_q is None:
return
msg = self._resp_q.get(timeout=0.2)
except queue.Empty:
continue
except Exception:
if not self._stop_event.is_set():
self.init_error.emit("OCR 子进程通信失败")
return
if item is None:
# sentinel: stop
break
job_id, images = item
if self._stop_event.is_set():
break
self._process_job(job_id, images)
if not isinstance(msg, dict):
continue
msg_type = str(msg.get("type", "")).strip()
if msg_type == "progress":
job_id = msg.get("job_id", "-")
stage = msg.get("stage", "")
extra = []
if "images" in msg:
extra.append(f"images={msg.get('images')}")
if "texts" in msg:
extra.append(f"texts={msg.get('texts')}")
suffix = f" ({', '.join(extra)})" if extra else ""
logger.info("OCR 子进程进度 job=%s stage=%s%s", job_id, stage, suffix)
continue
if msg_type == "ready":
self.backend_name = str(msg.get("backend", "unknown"))
logger.info(
"OCR 子进程已就绪 pid=%s backend=%s",
getattr(self._proc, "pid", None),
self.backend_name,
)
self.ready.emit()
continue
if msg_type == "init_error":
self._set_busy(False)
self.init_error.emit(str(msg.get("error", "OCR 初始化失败")))
continue
if msg_type == "result":
self._set_busy(False)
try:
job_id = int(msg.get("job_id"))
except Exception:
job_id = -1
record = msg.get("record") if isinstance(msg.get("record"), dict) else {}
texts = msg.get("texts") if isinstance(msg.get("texts"), list) else []
self.finished.emit(job_id, record, texts)
continue
if msg_type == "error":
self._set_busy(False)
try:
job_id = int(msg.get("job_id"))
except Exception:
job_id = -1
self.error.emit(job_id, str(msg.get("error", "OCR 处理失败")))
continue
@pyqtSlot(int, object)
def process(self, job_id: int, images: object) -> None:
"""接收 UI 请求:把任务放进队列,由后台线程串行处理"""
"""接收 UI 请求并投递到 OCR 子进程"""
if self._stop_event.is_set():
self.error.emit(job_id, "OCR 服务正在关闭,请稍后重试。")
return
# 忙碌或已有排队任务时,直接拒绝,避免积压导致“看起来一直在识别”
if self._busy or (not self._queue.empty()):
if self._proc is None or (not self._proc.is_alive()):
self.error.emit(job_id, "OCR 服务未就绪,请稍后重试。")
return
if self._busy:
self.error.emit(job_id, "OCR 正在进行中,请稍后再试。")
return
if not isinstance(images, (list, tuple)) or len(images) == 0:
self.error.emit(job_id, "内部错误:未传入有效图片数据")
return
try:
# 注意:这里不做耗时工作,只入队,避免阻塞 UI
self._queue.put_nowait((job_id, images))
except Exception as e:
self.error.emit(job_id, f"OCR 入队失败:{str(e)}")
def _process_job(self, job_id: int, images: object) -> None:
self._set_busy(True)
try:
self._ensure_ocr()
if not isinstance(images, (list, tuple)) or len(images) == 0:
raise ValueError("内部错误:未传入有效图片数据")
shapes = []
for img in images:
for item in images:
img = item
source = "main"
if isinstance(item, dict):
img = item.get("img")
source = str(item.get("source", "main"))
try:
shapes.append(getattr(img, "shape", None))
shapes.append({"source": source, "shape": getattr(img, "shape", None)})
except Exception:
shapes.append(None)
logger.info("OCR job=%s 开始images=%s", job_id, shapes)
shapes.append({"source": source, "shape": None})
logger.info("OCR job=%s 投递到子进程images=%s", job_id, shapes)
ocr_texts: list[str] = []
for img in images:
if img is None:
continue
result = self._ocr.ocr(img, cls=False)
if result and result[0]:
for line in result[0]:
if line and len(line) >= 2:
ocr_texts.append(line[1][0])
record = extract_info(ocr_texts)
logger.info(
"OCR job=%s 完成lines=%s, record_keys=%s",
job_id,
len(ocr_texts),
list(record.keys()),
)
self.finished.emit(job_id, record, ocr_texts)
except Exception as e:
logger.exception("OCR job=%s 失败:%s", job_id, str(e))
self.error.emit(job_id, str(e))
finally:
self._set_busy(True)
if self._req_q is None:
raise RuntimeError("OCR 请求队列不可用")
self._req_q.put_nowait((int(job_id), list(images)))
except queue.Full:
self._set_busy(False)
self.error.emit(job_id, "OCR 队列已满,请稍后再试。")
except Exception as e:
self._set_busy(False)
self.error.emit(job_id, f"OCR 入队失败:{str(e)}")
class MainWindow(QMainWindow):
@@ -223,17 +279,22 @@ class MainWindow(QMainWindow):
# OCR 工作线程(避免 UI 卡死)
self._ocr_job_id = 0
self._ocr_pending_job_id = None
self._ocr_start_time_by_job: dict[int, float] = {}
self._ocr_ready = False
self._ocr_busy = False
self._shutting_down = False
self._ocr_timeout_prompted = False
self._ocr_restarting = False
# 摄像头
self.cap = None
self.timer = QTimer()
self.timer.timeout.connect(self.update_frame)
self._frame_fail_count = 0
self._last_frame = None
self._last_frame_ts = 0.0
self._capture_in_progress = False
# 状态栏进度(识别中显示)
self._progress = QProgressBar()
@@ -252,17 +313,44 @@ class MainWindow(QMainWindow):
self.init_ui()
self.load_cameras()
# 主线程预加载:在 macOS 上,必须在主线程 import paddleocr,否则后台线程会卡死
self.statusBar().showMessage("正在加载 OCR 模块...")
QApplication.processEvents()
try:
logger.info("主线程预加载import paddleocr")
import paddleocr # noqa: F401
logger.info("主线程预加载paddleocr 导入完成")
except Exception as e:
logger.error("主线程预加载失败:%s", e, exc_info=True)
QMessageBox.critical(self, "启动失败", f"无法加载 OCR 模块:{e}")
raise
# 历史上主线程直接 import paddleocr 偶发卡死
# 默认跳过该步骤,避免 UI 被阻塞;如需诊断可打开轻量预检(子进程 + 超时)。
if os.environ.get("POST_OCR_PRECHECK_IMPORT", "0").strip() == "1":
timeout_sec = 8
try:
timeout_sec = max(
2,
int(
os.environ.get("POST_OCR_PRECHECK_TIMEOUT_SEC", "8").strip()
or "8"
),
)
except Exception:
timeout_sec = 8
self.statusBar().showMessage("正在预检 OCR 模块...")
QApplication.processEvents()
try:
logger.info("OCR 预检开始子进程timeout=%ss", timeout_sec)
proc = subprocess.run(
[sys.executable, "-c", "import paddleocr"],
capture_output=True,
text=True,
timeout=timeout_sec,
)
if proc.returncode == 0:
logger.info("OCR 预检通过")
else:
logger.warning(
"OCR 预检失败rc=%s%s",
proc.returncode,
(proc.stderr or "").strip(),
)
except subprocess.TimeoutExpired:
logger.warning("OCR 预检超时(%ss跳过预检继续启动。", timeout_sec)
except Exception as e:
logger.warning("OCR 预检异常:%s(忽略并继续)", str(e))
else:
logger.info("已跳过主线程 OCR 预检POST_OCR_PRECHECK_IMPORT=0")
# OCR 服务放在 UI 初始化之后启动,避免 ready/busy 信号回调时 btn_capture 尚未创建
self.statusBar().showMessage("正在启动 OCR 服务...")
@@ -308,6 +396,8 @@ class MainWindow(QMainWindow):
self._ocr_ready = False
self._ocr_busy = False
self._ocr_timeout_prompted = False
self._ocr_pending_job_id = None
self._ocr_start_time_by_job.clear()
try:
self._progress.setVisible(False)
except Exception:
@@ -316,10 +406,13 @@ class MainWindow(QMainWindow):
try:
svc = getattr(self, "_ocr_service", None)
if svc is not None:
try:
self.request_ocr.disconnect(svc.process)
except Exception:
pass
ok = svc.stop(timeout_ms=8000 if force else 3000)
if (not ok) and force:
# Python 线程无法可靠“强杀”,这里只做提示并继续退出流程。
logger.warning("OCR 服务停止超时:后台线程可能仍在运行,建议重启应用。")
logger.warning("OCR 服务停止超时:子进程可能仍在退出中,建议重启应用。")
except Exception:
pass
@@ -333,9 +426,15 @@ class MainWindow(QMainWindow):
if self._shutting_down:
return
self.statusBar().showMessage("正在重启 OCR 服务...")
self._stop_ocr_service(force=True)
self._init_ocr_service()
if self._ocr_restarting:
return
self._ocr_restarting = True
try:
self.statusBar().showMessage("正在重启 OCR 服务...")
self._stop_ocr_service(force=True)
self._init_ocr_service()
finally:
self._ocr_restarting = False
def _init_ocr_service(self) -> None:
models_dir = get_models_base_dir()
@@ -347,7 +446,7 @@ class MainWindow(QMainWindow):
self._ocr_service = OCRService(models_base_dir=models_dir)
# 注意OCRService 内部使用 Python 线程做 warmup 与推理。
# 注意OCRService 内部使用独立子进程做 warmup 与推理。
# 这里强制使用 QueuedConnection确保 UI 回调始终在主线程执行。
self.request_ocr.connect(self._ocr_service.process, Qt.ConnectionType.QueuedConnection)
self._ocr_service.ready.connect(self._on_ocr_ready, Qt.ConnectionType.QueuedConnection)
@@ -361,11 +460,16 @@ class MainWindow(QMainWindow):
def _on_ocr_ready(self) -> None:
try:
self._ocr_ready = True
self.statusBar().showMessage("OCR 模型已加载(离线)")
backend = "unknown"
try:
backend = str(getattr(self._ocr_service, "backend_name", "unknown"))
except Exception:
backend = "unknown"
self.statusBar().showMessage(f"OCR 模型已加载({backend}")
btn = getattr(self, "btn_capture", None)
if btn is not None:
btn.setEnabled(self.cap is not None and not self._ocr_busy)
logger.info("OCR ready")
logger.info("OCR ready backend=%s", backend)
except Exception as e:
logger.exception("处理 OCR ready 回调失败:%s", str(e))
@@ -378,6 +482,8 @@ class MainWindow(QMainWindow):
try:
self._ocr_busy = busy
if busy:
# OCR 线程已开始处理,提交阶段不再算“待接收”
self._ocr_pending_job_id = None
self._progress.setRange(0, 0) # 不确定进度条
self._progress.setVisible(True)
self._ocr_timeout_prompted = False
@@ -391,8 +497,27 @@ class MainWindow(QMainWindow):
except Exception as e:
logger.exception("处理 OCR busy 回调失败:%s", str(e))
def _guard_ocr_submission(self, job_id: int) -> None:
"""
兜底保护:
如果提交后一段时间仍未进入 busy 状态,说明任务可能未被 OCR 线程接收,
主动恢复按钮,避免界面一直停留在“正在识别...”。
"""
if job_id != self._ocr_pending_job_id:
return
if self._ocr_busy:
return
self._ocr_pending_job_id = None
self._ocr_start_time_by_job.pop(job_id, None)
logger.warning("OCR job=%s 提交后未被接收,已自动恢复 UI 状态", job_id)
self.statusBar().showMessage("识别请求未被处理,请重试一次(已自动恢复)")
if self.btn_capture is not None:
self.btn_capture.setEnabled(self.cap is not None and self._ocr_ready)
def _tick_ocr_watchdog(self) -> None:
"""识别进行中:更新耗时,超时则提示是否重启 OCR 服务。"""
"""识别进行中:更新耗时,超时自动重启 OCR 服务。"""
if not self._ocr_busy:
return
@@ -402,25 +527,39 @@ class MainWindow(QMainWindow):
cost = time.monotonic() - start_t
self.statusBar().showMessage(f"正在识别...(已用 {cost:.1f}s")
# 超时保护:底层推理偶发卡住时,让用户可以自救
if cost >= 45 and not self._ocr_timeout_prompted:
# 超时保护:底层推理偶发卡住时,自动重启 OCR 服务并恢复可用状态
timeout_sec = 25
try:
timeout_sec = max(
8, int(os.environ.get("POST_OCR_JOB_TIMEOUT_SEC", "25").strip() or "25")
)
except Exception:
timeout_sec = 25
if cost >= timeout_sec and not self._ocr_timeout_prompted:
self._ocr_timeout_prompted = True
reply = QMessageBox.question(
logger.warning("OCR job=%s 超时 %.1fs自动重启 OCR 服务", self._ocr_job_id, cost)
self.statusBar().showMessage(f"识别超时({cost:.1f}s正在自动恢复...")
# 当前任务视为失败并回收,避免界面一直等待结果
self._ocr_start_time_by_job.pop(self._ocr_job_id, None)
self._restart_ocr_service()
QMessageBox.warning(
self,
"识别超时",
"识别已超过 45 秒仍未完成,可能卡住。\n\n是否重启 OCR 服务\n(若仍无响应,建议直接退出并重新打开应用)",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
"本次识别超时,已自动重启 OCR 服务\n请再次拍照识别。",
)
if reply == QMessageBox.StandardButton.Yes:
self._restart_ocr_service()
def _on_ocr_finished_job(self, job_id: int, record: dict, texts: list) -> None:
if self._ocr_pending_job_id == job_id:
self._ocr_pending_job_id = None
start_t = self._ocr_start_time_by_job.pop(job_id, None)
# 只处理最新一次请求,避免旧结果回写
if job_id != self._ocr_job_id:
return
logger.info("OCR job=%s 原始文本: %s", job_id, texts)
logger.info("OCR job=%s 解析结果: %s", job_id, record)
self.records.append(record)
self.update_table()
cost = ""
@@ -428,14 +567,18 @@ class MainWindow(QMainWindow):
cost = f"(耗时 {time.monotonic() - start_t:.1f}s"
self.statusBar().showMessage(f"识别完成: {record.get('联系人/单位名', '未知')}{cost}")
logger.info("OCR job=%s UI 回写完成 %s", job_id, cost)
self.btn_capture.setEnabled(self.cap is not None and self._ocr_ready and not self._ocr_busy)
def _on_ocr_error_job(self, job_id: int, error: str) -> None:
if self._ocr_pending_job_id == job_id:
self._ocr_pending_job_id = None
self._ocr_start_time_by_job.pop(job_id, None)
if job_id != self._ocr_job_id:
return
self.statusBar().showMessage("识别失败")
QMessageBox.warning(self, "识别失败", error)
logger.error("OCR job=%s error: %s", job_id, error)
self.btn_capture.setEnabled(self.cap is not None and self._ocr_ready and not self._ocr_busy)
def init_ui(self):
central = QWidget()
@@ -519,6 +662,7 @@ class MainWindow(QMainWindow):
# macOS/Qt 下 Space 经常被控件吞掉(按钮激活/表格选择等),用 ApplicationShortcut 更稳
self._shortcut_capture2 = QShortcut(QKeySequence("Space"), self)
self._shortcut_capture2.setContext(Qt.ShortcutContext.ApplicationShortcut)
self._shortcut_capture2.setAutoRepeat(False)
self._shortcut_capture2.activated.connect(self.capture_and_recognize)
def load_cameras(self):
@@ -770,6 +914,13 @@ class MainWindow(QMainWindow):
ret, frame = self.cap.read()
if ret and frame is not None and frame.size > 0:
self._frame_fail_count = 0
# 缓存原始帧,拍照时直接使用,避免按空格再读摄像头导致主线程阻塞
try:
self._last_frame = frame.copy()
self._last_frame_ts = time.monotonic()
except Exception:
self._last_frame = frame
self._last_frame_ts = time.monotonic()
# 绘制扫描框
h, w = frame.shape[:2]
# 框的位置:上方 70%,编号在下方
@@ -812,6 +963,9 @@ class MainWindow(QMainWindow):
def capture_and_recognize(self):
"""拍照并识别"""
if self._capture_in_progress:
self.statusBar().showMessage("正在拍照,请稍候")
return
if self.cap is None:
self.statusBar().showMessage("请先连接摄像头")
return
@@ -822,61 +976,128 @@ class MainWindow(QMainWindow):
self.statusBar().showMessage("正在识别中,请稍后再按空格")
return
ret, frame = self.cap.read()
if not ret:
self.statusBar().showMessage("拍照失败")
return
# 裁剪两块 ROI主信息框 + 编号区域),显著减小像素量,提升速度与稳定性
h, w = frame.shape[:2]
x1, y1 = int(w * 0.06), int(h * 0.08)
x2 = int(w * 0.94)
y2_box = int(h * 0.78)
roi_images = []
self._capture_in_progress = True
try:
roi_box = frame[y1:y2_box, x1:x2]
if roi_box is not None and roi_box.size > 0:
roi_images.append(roi_box)
except Exception:
pass
# 直接使用预览缓存帧,避免在按键回调中阻塞式 read 摄像头导致卡顿
frame = None
now = time.monotonic()
if self._last_frame is not None and (now - self._last_frame_ts) <= 1.5:
try:
frame = self._last_frame.copy()
except Exception:
frame = self._last_frame
try:
# 编号一般在底部中间,取较小区域即可
nx1, nx2 = int(w * 0.30), int(w * 0.70)
ny1, ny2 = int(h * 0.80), int(h * 0.98)
roi_num = frame[ny1:ny2, nx1:nx2]
if roi_num is not None and roi_num.size > 0:
roi_images.append(roi_num)
except Exception:
pass
if frame is None:
self.statusBar().showMessage("尚未拿到稳定画面,请稍后再按空格")
return
if not roi_images:
self.statusBar().showMessage("拍照失败:未截取到有效区域")
return
# 裁剪主信息 ROI 与编号 ROI
h, w = frame.shape[:2]
x1, y1 = int(w * 0.06), int(h * 0.08)
x2 = int(w * 0.94)
y2_box = int(h * 0.78)
# 超大分辨率下适当缩放(提高稳定性与速度)
resized_images = []
for img in roi_images:
roi_inputs = []
try:
max_w = 1400
if img.shape[1] > max_w:
scale = max_w / img.shape[1]
img = cv2.resize(img, (int(img.shape[1] * scale), int(img.shape[0] * scale)))
roi_box = frame[y1:y2_box, x1:x2]
if roi_box is not None and roi_box.size > 0:
# 主信息区域切成多段,规避大图整块检测偶发卡住
split_count = 2
try:
split_count = max(
1,
int(
os.environ.get("POST_OCR_MAIN_SPLIT", "2").strip()
or "2"
),
)
except Exception:
split_count = 2
split_count = min(split_count, 4)
if split_count <= 1 or roi_box.shape[0] < 120:
roi_inputs.append({"img": roi_box, "source": "main", "y_offset": 0})
else:
h_box = roi_box.shape[0]
step = h_box / float(split_count)
overlap = max(8, int(h_box * 0.06))
for i in range(split_count):
sy = int(max(0, i * step - (overlap if i > 0 else 0)))
ey = int(
min(
h_box,
(i + 1) * step
+ (overlap if i < split_count - 1 else 0),
)
)
part = roi_box[sy:ey, :]
if part is not None and part.size > 0:
roi_inputs.append({"img": part, "source": "main", "y_offset": sy})
except Exception:
pass
resized_images.append(img)
logger.info("UI 触发识别frame=%s, rois=%s", getattr(frame, "shape", None), [getattr(i, "shape", None) for i in resized_images])
try:
# 编号一般在底部中间,取较小区域即可
nx1, nx2 = int(w * 0.30), int(w * 0.70)
ny1, ny2 = int(h * 0.80), int(h * 0.98)
roi_num = frame[ny1:ny2, nx1:nx2]
if roi_num is not None and roi_num.size > 0:
roi_inputs.append({"img": roi_num, "source": "number"})
except Exception:
pass
self.statusBar().showMessage("正在识别...")
self.btn_capture.setEnabled(False)
if not roi_inputs:
self.statusBar().showMessage("拍照失败:未截取到有效区域")
return
# 派发到 OCR 工作线程
self._ocr_job_id += 1
job_id = self._ocr_job_id
self._ocr_start_time_by_job[job_id] = time.monotonic()
self.request_ocr.emit(job_id, resized_images)
# 超大分辨率下适当缩放(提高稳定性与速度)
resized_inputs = []
max_w = 960
try:
max_w = max(
600, int(os.environ.get("POST_OCR_MAX_ROI_WIDTH", "960").strip() or "960")
)
except Exception:
max_w = 960
for item in roi_inputs:
img = item.get("img")
source = item.get("source", "main")
y_off = item.get("y_offset", 0)
scale = 1.0
try:
if img is not None and img.shape[1] > max_w:
scale = max_w / img.shape[1]
img = cv2.resize(img, (int(img.shape[1] * scale), int(img.shape[0] * scale)))
except Exception:
pass
resized_inputs.append({"img": img, "source": source, "y_offset": int(y_off * scale)})
logger.info(
"UI 触发识别frame=%s, rois=%s, frame_age=%.3fs",
getattr(frame, "shape", None),
[
{
"source": item.get("source", "main"),
"shape": getattr(item.get("img"), "shape", None),
}
for item in resized_inputs
],
max(0.0, now - self._last_frame_ts),
)
self.statusBar().showMessage("正在识别...")
self.btn_capture.setEnabled(False)
# 派发到 OCR 工作线程
self._ocr_job_id += 1
job_id = self._ocr_job_id
self._ocr_pending_job_id = job_id
self._ocr_start_time_by_job[job_id] = time.monotonic()
self.request_ocr.emit(job_id, resized_inputs)
QTimer.singleShot(2000, lambda j=job_id: self._guard_ocr_submission(j))
finally:
self._capture_in_progress = False
def update_table(self):
"""更新表格"""
@@ -943,6 +1164,7 @@ class MainWindow(QMainWindow):
def main():
mp.freeze_support()
log_file = setup_logging()
app = QApplication(sys.argv)
app.setStyle("Fusion")

View File

@@ -1,8 +1,10 @@
import os
import glob
import cv2
import pandas as pd
from tqdm import tqdm
from paddleocr import PaddleOCR
from pathlib import Path
from ocr_engine import create_ocr_engine
from processor import extract_info, save_to_excel
# 禁用联网检查,加快启动速度
@@ -10,8 +12,9 @@ os.environ["PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK"] = "True"
def main():
# 初始化 PaddleOCR
ocr = PaddleOCR(use_textline_orientation=True, lang="ch")
# 初始化 OCR 引擎(默认 rapidocr可通过环境变量切换
models_dir = Path("models")
ocr_engine = create_ocr_engine(models_base_dir=models_dir)
input_dir = "data/input"
output_dir = "data/output"
@@ -36,19 +39,35 @@ def main():
for img_path in tqdm(image_paths):
try:
# 1. 执行 OCR 识别
result = ocr.ocr(img_path, cls=False)
img = cv2.imread(img_path)
if img is None:
errors.append(
{"file": os.path.basename(img_path), "error": "图片读取失败"}
)
continue
lines = ocr_engine.infer_lines(img)
# 2. 提取文字行
ocr_texts = []
if result and result[0]:
for line in result[0]:
# line 格式: [box, (text, confidence)]
if line and len(line) >= 2:
ocr_texts.append(line[1][0])
ocr_lines = []
for line in lines:
text = str(line.text).strip()
if not text:
continue
ocr_texts.append(text)
ocr_lines.append(
{
"text": text,
"box": line.box,
"conf": line.conf,
"source": "main",
"roi_index": 0,
}
)
# 3. 结构化解析
if ocr_texts:
record = extract_info(ocr_texts)
record = extract_info(ocr_lines if ocr_lines else ocr_texts)
all_records.append(record)
else:
errors.append(

217
src/ocr_engine.py Normal file
View File

@@ -0,0 +1,217 @@
from __future__ import annotations
import logging
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, List, Optional
logger = logging.getLogger("post_ocr.ocr_engine")
@dataclass
class OCRLine:
text: str
box: Any
conf: Optional[float] = None
class BaseOCREngine:
backend_name: str = "unknown"
def infer_lines(self, img: Any) -> List[OCRLine]:
raise NotImplementedError
def _to_float(val: Any) -> Optional[float]:
try:
return float(val)
except Exception:
return None
class PaddleOCREngine(BaseOCREngine):
backend_name = "paddle"
def __init__(self, models_base_dir: Path):
from ocr_offline import create_offline_ocr
self._ocr = create_offline_ocr(models_base_dir=models_base_dir)
def infer_lines(self, img: Any) -> List[OCRLine]:
result = self._ocr.ocr(img, cls=False)
lines: List[OCRLine] = []
if result and result[0]:
for line in result[0]:
if not line or len(line) < 2:
continue
text = str(line[1][0]) if isinstance(line[1], (list, tuple)) and line[1] else ""
if not text:
continue
conf = None
if isinstance(line[1], (list, tuple)) and len(line[1]) >= 2:
conf = _to_float(line[1][1])
lines.append(OCRLine(text=text, box=line[0], conf=conf))
return lines
class RapidOCREngine(BaseOCREngine):
backend_name = "rapidocr"
def __init__(self, models_base_dir: Path):
# 按官方包名导入rapidocr-onnxruntime -> rapidocr_onnxruntime
from rapidocr_onnxruntime import RapidOCR
kwargs: dict[str, Any] = {}
# 可选:如果用户准备了本地 ONNX 模型,可通过环境变量覆盖路径
det_path = os.environ.get("POST_OCR_RAPID_DET_MODEL", "").strip()
cls_path = os.environ.get("POST_OCR_RAPID_CLS_MODEL", "").strip()
rec_path = os.environ.get("POST_OCR_RAPID_REC_MODEL", "").strip()
dict_path = os.environ.get("POST_OCR_RAPID_KEYS_PATH", "").strip()
if det_path:
kwargs["det_model_path"] = det_path
if cls_path:
kwargs["cls_model_path"] = cls_path
if rec_path:
kwargs["rec_model_path"] = rec_path
if dict_path:
kwargs["rec_keys_path"] = dict_path
self._ocr = RapidOCR(**kwargs)
self._models_base_dir = models_base_dir
def _parse_result_item(self, item: Any) -> Optional[OCRLine]:
if isinstance(item, dict):
text = str(item.get("text") or item.get("txt") or "").strip()
if not text:
return None
box = item.get("box") or item.get("points")
conf = _to_float(item.get("score", item.get("conf")))
return OCRLine(text=text, box=box, conf=conf)
if not isinstance(item, (list, tuple)):
return None
# 常见格式1: [box, text, score]
if len(item) >= 2 and isinstance(item[1], str):
box = item[0]
text = item[1].strip()
conf = _to_float(item[2]) if len(item) >= 3 else None
if text:
return OCRLine(text=text, box=box, conf=conf)
return None
# 常见格式2Paddle风格: [box, (text, score)]
if len(item) >= 2 and isinstance(item[1], (list, tuple)) and len(item[1]) >= 1:
text = str(item[1][0]).strip()
if not text:
return None
conf = _to_float(item[1][1]) if len(item[1]) >= 2 else None
return OCRLine(text=text, box=item[0], conf=conf)
return None
def infer_lines(self, img: Any) -> List[OCRLine]:
# RapidOCR 常见返回:(ocr_res, elapse)
raw = self._ocr(img)
result = raw[0] if isinstance(raw, tuple) and len(raw) >= 1 else raw
if result is None:
return []
lines: List[OCRLine] = []
# 一些版本返回对象boxes/txts/scores
if hasattr(result, "boxes") and hasattr(result, "txts"):
boxes = list(getattr(result, "boxes") or [])
txts = list(getattr(result, "txts") or [])
scores = list(getattr(result, "scores") or [])
for idx, text in enumerate(txts):
t = str(text).strip()
if not t:
continue
box = boxes[idx] if idx < len(boxes) else None
conf = _to_float(scores[idx]) if idx < len(scores) else None
lines.append(OCRLine(text=t, box=box, conf=conf))
return lines
if isinstance(result, (list, tuple)):
for item in result:
parsed = self._parse_result_item(item)
if parsed is not None:
lines.append(parsed)
return lines
def create_ocr_engine(models_base_dir: Path) -> BaseOCREngine:
"""
创建 OCR 引擎。
环境变量:
- POST_OCR_BACKEND: rapidocr | paddle | auto默认 rapidocr
- POST_OCR_BACKEND_FALLBACK_PADDLE: 1/0不设置时按后端类型决定
"""
backend_env = os.environ.get("POST_OCR_BACKEND")
backend = (backend_env or "rapidocr").strip().lower() or "rapidocr"
fallback_env = os.environ.get("POST_OCR_BACKEND_FALLBACK_PADDLE")
if fallback_env is None or fallback_env.strip() == "":
# 规则:
# 1) auto 模式默认允许回退
# 2) 用户显式指定 rapidocr 时,默认不静默回退(避免“看似切到 rapidocr 实际仍是 paddle”
# 3) 其他场景保持兼容,默认允许回退
if backend == "auto":
allow_fallback = True
elif backend == "rapidocr" and backend_env is not None:
allow_fallback = False
else:
allow_fallback = True
else:
allow_fallback = fallback_env.strip().lower() not in {"0", "false", "off", "no"}
logger.info(
"create_ocr_engine: request=%s explicit=%s fallback=%s python=%s",
backend,
backend_env is not None,
allow_fallback,
sys.executable,
)
if backend in {"rapidocr", "onnx"}:
try:
engine = RapidOCREngine(models_base_dir=models_base_dir)
logger.info("create_ocr_engine: using backend=%s", engine.backend_name)
return engine
except Exception as e:
logger.exception("create_ocr_engine: rapidocr 初始化失败")
if allow_fallback:
logger.warning("create_ocr_engine: 已回退到 paddle")
engine = PaddleOCREngine(models_base_dir=models_base_dir)
logger.info("create_ocr_engine: using backend=%s", engine.backend_name)
return engine
raise RuntimeError(
"POST_OCR_BACKEND=rapidocr 初始化失败,且未启用回退。"
"请先安装 rapidocr-onnxruntime或设置 POST_OCR_BACKEND_FALLBACK_PADDLE=1。"
) from e
if backend == "paddle":
engine = PaddleOCREngine(models_base_dir=models_base_dir)
logger.info("create_ocr_engine: using backend=%s", engine.backend_name)
return engine
# auto: 优先 rapidocr失败回退 paddle
if backend == "auto":
try:
engine = RapidOCREngine(models_base_dir=models_base_dir)
logger.info("create_ocr_engine: using backend=%s", engine.backend_name)
return engine
except Exception:
logger.exception("create_ocr_engine: auto 模式 rapidocr 初始化失败,回退 paddle")
engine = PaddleOCREngine(models_base_dir=models_base_dir)
logger.info("create_ocr_engine: using backend=%s", engine.backend_name)
return engine
# 未知值兜底
logger.warning("create_ocr_engine: 未知后端 '%s',回退 paddle", backend)
engine = PaddleOCREngine(models_base_dir=models_base_dir)
logger.info("create_ocr_engine: using backend=%s", engine.backend_name)
return engine

View File

@@ -11,10 +11,43 @@ from __future__ import annotations
import os
import sys
import shutil
import tempfile
from pathlib import Path
import logging
def _ensure_ascii_model_dir(model_dir: Path) -> str:
"""
PaddlePaddle C++ 推理引擎在 Windows 上不支持非 ASCII 路径。
如果模型路径含非 ASCII 字符,复制到临时目录(仅首次复制,后续复用)。
"""
s = str(model_dir)
try:
s.encode("ascii")
return s # 纯 ASCII直接用
except UnicodeEncodeError:
pass
# 路径含非 ASCII复制到 %TEMP%/post_ocr_models/<子目录名>
safe_base = Path(tempfile.gettempdir()) / "post_ocr_models"
safe_dir = safe_base / model_dir.name
# 用 pdmodel 文件大小做简单校验,避免每次都复制
src_marker = model_dir / "inference.pdmodel"
dst_marker = safe_dir / "inference.pdmodel"
if dst_marker.exists() and dst_marker.stat().st_size == src_marker.stat().st_size:
return str(safe_dir)
# 复制模型
log = logging.getLogger("post_ocr.ocr")
log.info("模型路径含非ASCII字符复制到: %s", safe_dir)
if safe_dir.exists():
shutil.rmtree(safe_dir)
shutil.copytree(model_dir, safe_dir)
return str(safe_dir)
def _is_frozen() -> bool:
"""判断是否为 PyInstaller 打包后的运行环境"""
return bool(getattr(sys, "frozen", False))
@@ -73,7 +106,8 @@ def create_offline_ocr(models_base_dir: Path | None = None):
"""
创建 PaddleOCR 2.x 实例PP-OCRv4 中文)。
首次运行会自动下载模型到 ~/.paddleocr/whl/。
- 打包态:使用 models/ 目录下的离线模型(完全离线)
- 开发态:首次运行自动下载到 ~/.paddleocr/whl/
"""
log = logging.getLogger("post_ocr.ocr")
@@ -83,11 +117,43 @@ def create_offline_ocr(models_base_dir: Path | None = None):
log.info("create_offline_ocr: importing paddleocr")
from paddleocr import PaddleOCR
# 构建 PaddleOCR 参数
# 说明:在部分 macOS/CPU 环境下oneDNN(MKLDNN) 可能出现卡住,默认关闭以换取稳定性。
kwargs = dict(lang="ch", use_angle_cls=False, show_log=False)
disable_mkldnn = os.environ.get("POST_OCR_DISABLE_MKLDNN", "1").strip() == "1"
if disable_mkldnn:
os.environ["FLAGS_use_mkldnn"] = "0"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
kwargs["enable_mkldnn"] = False
try:
kwargs["cpu_threads"] = max(
1, int(os.environ.get("POST_OCR_CPU_THREADS", "1").strip() or "1")
)
except Exception:
kwargs["cpu_threads"] = 1
# 如果 models/ 目录存在离线模型,显式指定路径(打包分发场景)
models_dir = models_base_dir or get_models_base_dir()
det_dir = models_dir / "ch_PP-OCRv4_det_infer"
rec_dir = models_dir / "ch_PP-OCRv4_rec_infer"
if (det_dir / "inference.pdmodel").exists() and (rec_dir / "inference.pdmodel").exists():
log.info("使用离线模型: %s", models_dir)
kwargs["det_model_dir"] = _ensure_ascii_model_dir(det_dir)
kwargs["rec_model_dir"] = _ensure_ascii_model_dir(rec_dir)
log.info("det_model_dir=%s, rec_model_dir=%s", kwargs["det_model_dir"], kwargs["rec_model_dir"])
else:
log.info("未找到离线模型,将使用默认路径(可能需要联网下载)")
log.info("create_offline_ocr: creating PaddleOCR(lang=ch)")
ocr = PaddleOCR(
lang="ch",
use_angle_cls=False,
show_log=False,
)
try:
ocr = PaddleOCR(**kwargs)
except TypeError:
# 兼容个别 PaddleOCR 版本不支持的参数
kwargs.pop("enable_mkldnn", None)
kwargs.pop("cpu_threads", None)
ocr = PaddleOCR(**kwargs)
log.info("create_offline_ocr: PaddleOCR created")
return ocr

98
src/ocr_worker_process.py Normal file
View File

@@ -0,0 +1,98 @@
from __future__ import annotations
# 必须在所有 paddle/numpy import 之前设置,否则 macOS spawn 子进程推理会死锁
import os
import logging
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["FLAGS_use_mkldnn"] = "0"
os.environ["PADDLE_DISABLE_SIGNAL_HANDLER"] = "1"
os.environ["PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK"] = "True"
from pathlib import Path
from typing import Any
from ocr_engine import create_ocr_engine
from processor import extract_info
logger = logging.getLogger("post_ocr.ocr_worker")
def run_ocr_worker(models_base_dir: str, request_q, response_q) -> None:
"""
OCR 子进程主循环:
- 在子进程内初始化 PaddleOCR避免阻塞主 UI 进程
- 接收任务并返回结构化结果
"""
try:
response_q.put({"type": "progress", "stage": "init_start"})
engine = create_ocr_engine(models_base_dir=Path(models_base_dir))
response_q.put({"type": "ready", "backend": getattr(engine, "backend_name", "unknown")})
except Exception as e:
logger.exception("OCR 子进程初始化失败")
response_q.put({"type": "init_error", "error": str(e)})
return
while True:
item = request_q.get()
if item is None:
break
job_id = -1
try:
job_id, images = item
if not isinstance(images, (list, tuple)) or len(images) == 0:
raise ValueError("内部错误:未传入有效图片数据")
response_q.put({"type": "progress", "job_id": int(job_id), "stage": "job_received", "images": len(images)})
ocr_texts: list[str] = []
ocr_lines: list[dict[str, Any]] = []
for roi_index, entry in enumerate(images):
source = "main"
img = entry
y_offset = 0
if isinstance(entry, dict):
source = str(entry.get("source", "main"))
img = entry.get("img")
y_offset = int(entry.get("y_offset", 0))
elif roi_index > 0:
source = "number"
if img is None:
continue
response_q.put({"type": "progress", "job_id": int(job_id), "stage": f"roi_{roi_index}_start"})
lines = engine.infer_lines(img)
response_q.put({"type": "progress", "job_id": int(job_id), "stage": f"roi_{roi_index}_done"})
for line in lines:
text = str(line.text).strip()
if not text:
continue
ocr_texts.append(text)
# 将切片内的局部坐标还原为完整 ROI 坐标
box = line.box
if y_offset and isinstance(box, (list, tuple)):
box = [[p[0], p[1] + y_offset] for p in box]
ocr_lines.append(
{
"text": text,
"box": box,
"conf": line.conf,
"source": source,
"roi_index": roi_index,
}
)
record = extract_info(ocr_lines if ocr_lines else ocr_texts)
response_q.put({"type": "progress", "job_id": int(job_id), "stage": "parse_done", "texts": len(ocr_texts)})
response_q.put(
{
"type": "result",
"job_id": int(job_id),
"record": record,
"texts": ocr_texts,
}
)
except Exception as e:
logger.exception("OCR 子进程处理任务失败 job=%s", job_id)
response_q.put({"type": "error", "job_id": int(job_id), "error": str(e)})

View File

@@ -1,9 +1,68 @@
import re
from dataclasses import dataclass
from statistics import median
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
import pandas as pd
from typing import List, Dict, Any
from pydantic import BaseModel, Field
ZIP_PATTERN = re.compile(r"(?<!\d)(\d{6})(?!\d)")
PHONE_PATTERN = re.compile(r"(?<!\d)(1[3-9]\d{9}|0\d{2,3}-?\d{7,8})(?!\d)")
LONG_NUMBER_PATTERN = re.compile(r"(?<!\d)(\d{10,20})(?!\d)")
ADDRESS_HINT_PATTERN = re.compile(r"(省|市|区|县|乡|镇|街|路|村|号|栋|单元|室)")
COMPANY_HINT_PATTERN = re.compile(
r"(公司|有限|集团|工厂|物流|商贸|商行|超市|药房|药店|诊所|医院|学校|幼儿园"
r"|办公室|办事处|服务部|经营部|工作室|研究所|事务所|中心|银行|信用社|合作社)"
)
@dataclass
class OCRLine:
text: str
source: str
order: int
x1: Optional[float] = None
y1: Optional[float] = None
x2: Optional[float] = None
y2: Optional[float] = None
row_idx: int = -1
col_idx: int = -1
@property
def has_pos(self) -> bool:
return (
self.x1 is not None
and self.y1 is not None
and self.x2 is not None
and self.y2 is not None
)
@property
def cx(self) -> float:
if not self.has_pos:
return float(self.order)
return (self.x1 + self.x2) / 2.0 # type: ignore[operator]
@property
def cy(self) -> float:
if not self.has_pos:
return float(self.order)
return (self.y1 + self.y2) / 2.0 # type: ignore[operator]
@property
def height(self) -> float:
if not self.has_pos:
return 0.0
return max(0.0, float(self.y2) - float(self.y1))
@property
def width(self) -> float:
if not self.has_pos:
return 0.0
return max(0.0, float(self.x2) - float(self.x1))
class EnvelopeRecord(BaseModel):
编号: str = ""
邮编: str = ""
@@ -13,75 +72,451 @@ class EnvelopeRecord(BaseModel):
def clean_text(text: str) -> str:
"""清理OCR识别出的杂质字符"""
return text.strip().replace(" ", "")
"""清理 OCR 识别文本中的空白和无意义分隔符。"""
if not text:
return ""
text = text.replace("\u3000", " ").strip()
return re.sub(r"\s+", "", text)
def extract_info(ocr_results: List[str]) -> Dict[str, str]:
def _parse_box(raw_box: Any) -> Tuple[Optional[float], Optional[float], Optional[float], Optional[float]]:
if not isinstance(raw_box, (list, tuple)) or len(raw_box) < 4:
return None, None, None, None
xs: List[float] = []
ys: List[float] = []
for p in raw_box:
if not isinstance(p, (list, tuple)) or len(p) < 2:
continue
try:
xs.append(float(p[0]))
ys.append(float(p[1]))
except Exception:
continue
if not xs or not ys:
return None, None, None, None
return min(xs), min(ys), max(xs), max(ys)
def _to_ocr_line(item: Any, idx: int) -> Optional[OCRLine]:
if isinstance(item, str):
text = clean_text(item)
if not text:
return None
return OCRLine(text=text, source="main", order=idx)
if not isinstance(item, dict):
return None
text = clean_text(str(item.get("text", "")))
if not text:
return None
source = str(item.get("source", "main"))
x1, y1, x2, y2 = _parse_box(item.get("box"))
if x1 is None:
# 兼容直接传坐标的输入
try:
x1 = float(item.get("x1"))
y1 = float(item.get("y1"))
x2 = float(item.get("x2"))
y2 = float(item.get("y2"))
except Exception:
x1, y1, x2, y2 = None, None, None, None
return OCRLine(text=text, source=source, order=idx, x1=x1, y1=y1, x2=x2, y2=y2)
def _normalize_ocr_results(ocr_results: Sequence[Any]) -> List[OCRLine]:
lines: List[OCRLine] = []
seen = set()
for idx, item in enumerate(ocr_results):
line = _to_ocr_line(item, idx)
if line is None:
continue
if line.has_pos:
key = (
line.text,
line.source,
round(line.cx, 1),
round(line.cy, 1),
)
else:
key = (line.text, line.source, line.order)
if key in seen:
continue
seen.add(key)
lines.append(line)
return lines
def _first_match(pattern: re.Pattern[str], text: str) -> str:
m = pattern.search(text)
if not m:
return ""
if m.lastindex:
return m.group(1)
return m.group(0)
def _find_anchor(lines: Iterable[OCRLine], pattern: re.Pattern[str], prefer_bottom: bool) -> Optional[Tuple[OCRLine, str]]:
candidates: List[Tuple[OCRLine, str]] = []
for line in lines:
m = pattern.search(line.text)
if not m:
continue
token = m.group(1) if m.lastindex else m.group(0)
candidates.append((line, token))
if not candidates:
return None
if prefer_bottom:
return max(candidates, key=lambda item: (item[0].row_idx, item[0].cy, item[0].cx, item[0].order))
return min(candidates, key=lambda item: (item[0].row_idx, item[0].cy, item[0].cx, item[0].order))
def _build_rows(lines: List[OCRLine]) -> List[List[OCRLine]]:
positioned = [line for line in lines if line.has_pos]
if not positioned:
return []
positioned.sort(key=lambda line: (line.cy, line.cx))
heights = [line.height for line in positioned if line.height > 1.0]
h_med = median(heights) if heights else 20.0
y_threshold = max(8.0, h_med * 0.65)
rows: List[List[OCRLine]] = []
for line in positioned:
if not rows:
rows.append([line])
continue
row = rows[-1]
mean_y = sum(item.cy for item in row) / len(row)
if abs(line.cy - mean_y) <= y_threshold:
row.append(line)
else:
rows.append([line])
for row_idx, row in enumerate(rows):
row.sort(key=lambda line: (line.cx, line.x1 or 0.0))
for col_idx, line in enumerate(row):
line.row_idx = row_idx
line.col_idx = col_idx
return rows
def _sanitize_address(text: str) -> str:
text = clean_text(text)
text = re.sub(r"^(地址|收件地址|详细地址)[:]?", "", text)
return text
def _sanitize_contact(text: str) -> str:
text = clean_text(text)
text = re.sub(r"^(收件人|联系人|单位|收)[:]?", "", text)
return text.strip(",。;;:")
def _join_entries(entries: List[Tuple[int, int, str]]) -> str:
if not entries:
return ""
entries.sort(key=lambda item: (item[0], item[1]))
merged: List[str] = []
for _, _, text in entries:
if not text:
continue
if merged and merged[-1] == text:
continue
merged.append(text)
return "".join(merged)
def _extract_tracking_number(lines: List[OCRLine], zip_code: str, phone: str) -> str:
phone_digits = re.sub(r"\D", "", phone)
candidates: List[Tuple[int, int, str]] = []
for line in lines:
for match in LONG_NUMBER_PATTERN.finditer(line.text):
number = match.group(1)
if number == zip_code:
continue
if phone and (number == phone or number == phone_digits):
continue
src_score = 2 if line.source == "number" else 1
candidates.append((src_score, len(number), number))
if not candidates:
return ""
candidates.sort(reverse=True)
return candidates[0][2]
def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[str, str, bool]:
"""基于邮编/电话锚点的版面提取。
两种模式自动切换:
- 单栏模式(信封典型排版):邮编后连续行=地址,电话行去掉电话=联系人
- 多栏模式:左侧=地址,右侧=联系人(按 split_x 分割)
单栏/多栏判断比较邮编和电话的左边缘x1而非中心点cx
避免因文本长度不同导致误判。
"""
从OCR结果列表中提取结构化信息。
main_lines = [line for line in lines if line.source != "number"]
if len(main_lines) < 2:
return "", "", False
rows = _build_rows(main_lines)
if not rows:
return "", "", False
zip_anchor = _find_anchor(main_lines, ZIP_PATTERN, prefer_bottom=False)
phone_anchor = _find_anchor(main_lines, PHONE_PATTERN, prefer_bottom=True)
if zip_anchor and not data["邮编"]:
data["邮编"] = zip_anchor[1]
if phone_anchor and not data["电话"]:
data["电话"] = phone_anchor[1]
if not zip_anchor and not phone_anchor:
return "", "", False
if zip_anchor:
start_row = zip_anchor[0].row_idx
else:
start_row = min(line.row_idx for line in main_lines)
if phone_anchor:
end_row = phone_anchor[0].row_idx
else:
end_row = max(line.row_idx for line in main_lines)
if start_row > end_row:
start_row, end_row = end_row, start_row
# ── 单栏/多栏判断:用左边缘 x1 而非中心点 cx ──
single_column_mode = False
if zip_anchor and phone_anchor:
zip_x1 = zip_anchor[0].x1 if zip_anchor[0].x1 is not None else zip_anchor[0].cx
phone_x1 = phone_anchor[0].x1 if phone_anchor[0].x1 is not None else phone_anchor[0].cx
line_widths = [line.width for line in main_lines if line.width > 0]
width_ref = median(line_widths) if line_widths else 120.0
single_column_mode = abs(phone_x1 - zip_x1) < max(60.0, width_ref * 0.4)
# ════════════════════════════════════════════
# 单栏模式:邮编后连续行=地址,电话行去掉电话=联系人
# ════════════════════════════════════════════
if single_column_mode:
# 从电话行提取联系人
contact_text = ""
if phone_anchor:
remainder = clean_text(phone_anchor[0].text.replace(phone_anchor[1], ""))
if remainder and not re.fullmatch(r"\d{2,20}", remainder):
contact_text = _sanitize_contact(remainder)
# 邮编行之后、电话行之前的所有行 → 地址
address_entries: List[Tuple[int, int, str]] = []
for line in main_lines:
if line.row_idx < start_row or line.row_idx > end_row:
continue
if phone_anchor and line is phone_anchor[0]:
continue
text = line.text
if zip_anchor and line is zip_anchor[0]:
text = text.replace(zip_anchor[1], "")
text = clean_text(text)
if not text or re.fullmatch(r"\d{6,20}", text):
continue
address_entries.append((line.row_idx, line.col_idx, text))
# 联系人为空时,从地址末尾回退一行
if not contact_text and address_entries:
last_row = max(item[0] for item in address_entries)
last_entries = [item for item in address_entries if item[0] == last_row]
last_text = _join_entries(last_entries)
candidate = _sanitize_contact(last_text)
if candidate:
prev_rows = [item[0] for item in address_entries if item[0] < last_row]
# 与前面地址行有行间距 > 1或含单位关键字 → 视为联系人
gap = (last_row - max(prev_rows)) if prev_rows else 999
if gap > 1 or COMPANY_HINT_PATTERN.search(last_text):
contact_text = candidate
address_entries = [item for item in address_entries if item[0] != last_row]
address_text = _sanitize_address(_join_entries(address_entries))
return address_text, contact_text, True
# ════════════════════════════════════════════
# 多栏模式:按 split_x 左右分割
# ════════════════════════════════════════════
if zip_anchor and phone_anchor and phone_anchor[0].cx > zip_anchor[0].cx:
split_x = (zip_anchor[0].cx + phone_anchor[0].cx) / 2.0
elif phone_anchor:
split_x = phone_anchor[0].cx - max(40.0, phone_anchor[0].width * 0.6)
elif zip_anchor:
split_x = zip_anchor[0].cx + max(80.0, zip_anchor[0].width * 1.5)
else:
split_x = median([line.cx for line in main_lines])
address_entries = []
contact_entries: List[Tuple[int, int, str]] = []
for line in main_lines:
if line.row_idx < start_row or line.row_idx > end_row:
continue
text = line.text
if zip_anchor and line is zip_anchor[0]:
text = text.replace(zip_anchor[1], "")
if phone_anchor and line is phone_anchor[0]:
text = text.replace(phone_anchor[1], "")
text = clean_text(text)
if not text or re.fullmatch(r"\d{6,20}", text):
continue
if line.cx <= split_x:
address_entries.append((line.row_idx, line.col_idx, text))
else:
contact_entries.append((line.row_idx, line.col_idx, text))
# 联系人优先取靠近电话的一段
if phone_anchor and contact_entries:
phone_row = phone_anchor[0].row_idx
min_dist = min(abs(item[0] - phone_row) for item in contact_entries)
contact_entries = [
item for item in contact_entries if abs(item[0] - phone_row) <= min_dist + 1
]
contact_text = _sanitize_contact(_join_entries(contact_entries))
address_text = _sanitize_address(_join_entries(address_entries))
# 多栏模式下联系人为空的回退
if not contact_text and phone_anchor:
remainder = clean_text(phone_anchor[0].text.replace(phone_anchor[1], ""))
if remainder and not re.fullmatch(r"\d{2,20}", remainder):
contact_text = _sanitize_contact(remainder)
return address_text, contact_text, True
def _extract_with_text_order(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[str, str, bool]:
if not lines:
return "", "", False
zip_idx = -1
zip_token = ""
for idx, line in enumerate(lines):
m = ZIP_PATTERN.search(line.text)
if m:
zip_idx = idx
zip_token = m.group(1)
break
phone_idx = -1
phone_token = ""
for idx in range(len(lines) - 1, -1, -1):
m = PHONE_PATTERN.search(lines[idx].text)
if m:
phone_idx = idx
phone_token = m.group(1)
break
if zip_idx < 0 or phone_idx < 0 or zip_idx > phone_idx:
return "", "", False
if not data["邮编"]:
data["邮编"] = zip_token
if not data["电话"]:
data["电话"] = phone_token
address_parts: List[Tuple[int, str]] = []
contact_text = ""
for idx in range(zip_idx, phone_idx + 1):
text = lines[idx].text
if idx == zip_idx:
text = text.replace(zip_token, "")
if idx == phone_idx:
text = text.replace(phone_token, "")
text = clean_text(text)
if not text:
continue
if idx == phone_idx:
contact_text = _sanitize_contact(text)
else:
address_parts.append((idx, text))
if not contact_text and address_parts:
for idx, text in reversed(address_parts):
# 含单位关键字的直接采纳;纯地址行跳过
if ADDRESS_HINT_PATTERN.search(text) and not COMPANY_HINT_PATTERN.search(text):
continue
contact_text = _sanitize_contact(text)
if contact_text:
address_parts = [item for item in address_parts if item[0] != idx]
break
# 兜底:电话紧邻上一行即使含地址关键字也采纳(如"蒲江县宏利物流有限公司"
if not contact_text and address_parts:
last_idx, last_text = address_parts[-1]
if last_idx == phone_idx - 1:
contact_text = _sanitize_contact(last_text)
if contact_text:
address_parts = address_parts[:-1]
address_text = _sanitize_address("".join(text for _, text in address_parts))
return address_text, contact_text, True
def extract_info(ocr_results: List[Any]) -> Dict[str, str]:
"""
从 OCR 结果中提取结构化信息。
支持两类输入:
1. 纯文本列表:`List[str]`
2. 带坐标的行对象列表:`List[{"text": "...", "box": [[x,y],...], "source": "..."}]`
"""
data = {"编号": "", "邮编": "", "地址": "", "联系人/单位名": "", "电话": ""}
lines = _normalize_ocr_results(ocr_results)
if not lines:
return data
full_content = " ".join(ocr_results)
full_content = " ".join(line.text for line in lines)
data["邮编"] = _first_match(ZIP_PATTERN, full_content)
data["电话"] = _first_match(PHONE_PATTERN, full_content)
data["编号"] = _extract_tracking_number(lines, data["邮编"], data["电话"])
# 1. 提取邮编 (6位数字)
zip_match = re.search(r"\b(\d{6})\b", full_content)
if zip_match:
data["邮编"] = zip_match.group(1)
# 第一优先级:使用版面坐标进行"邮编-电话锚点 + 连续块"解析
address_text, contact_text, used_layout = _extract_with_layout(lines, data)
if not used_layout:
# 第二优先级:无坐标时按文本顺序回退
address_text, contact_text, _ = _extract_with_text_order(lines, data)
# 2. 提取电话 (11位手机号或带区号固话)
phone_match = re.search(r"(1[3-9]\d{9}|0\d{2,3}-\d{7,8})", full_content)
if phone_match:
data["电话"] = phone_match.group(0)
data["地址"] = _sanitize_address(address_text)
data["联系人/单位名"] = _sanitize_contact(contact_text)
# 3. 提取联系人 (通常在电话前面,或者是独立的短行)
# 遍历每一行寻找包含电话的行
for line in ocr_results:
if data["电话"] and data["电话"] in line:
# 移除电话部分,剩下的可能是姓名
name_part = line.replace(data["电话"], "").strip()
# 进一步清洗姓名(移除符号)
name_part = re.sub(r"[^\w\u4e00-\u9fa5]", "", name_part)
if name_part:
data["联系人/单位名"] = name_part
break
# 如果还没找到联系人,尝试找不含数字的短行
# 最终兜底:联系人和地址任一为空时,补旧规则避免完全丢字段
if not data["联系人/单位名"]:
for line in ocr_results:
clean_line = re.sub(r"[^\w\u4e00-\u9fa5]", "", line)
if 2 <= len(clean_line) <= 10 and not re.search(r"\d", clean_line):
data["联系人/单位名"] = clean_line
break
for line in lines:
text = clean_text(line.text)
if not text:
continue
if data["电话"] and data["电话"] in text:
name_part = _sanitize_contact(text.replace(data["电话"], ""))
if name_part:
data["联系人/单位名"] = name_part
break
if not data["联系人/单位名"]:
for line in lines:
text = clean_text(line.text)
if 2 <= len(text) <= 20 and not re.search(r"\d", text):
data["联系人/单位名"] = _sanitize_contact(text)
break
# 4. 提取地址
address_match = re.search(
r"([^,,。\s]*(?:省|市|区|县|乡|镇|路|街|村|组|号)[^,,。\s]*)", full_content
)
if address_match:
data["地址"] = address_match.group(1)
else:
# 兜底:寻找较长的包含地名特征的行
for line in ocr_results:
if any(k in line for k in ["", "", "", "", "", "", ""]):
data["地址"] = line.strip()
break
# 5. 提取编号 (长数字串)
# 排除邮编和电话后的最长数字串
long_numbers = re.findall(r"\b\d{10,20}\b", full_content)
for num in long_numbers:
if num != data["电话"]:
data["编号"] = num
break
if not data["地址"]:
hint_lines = [line.text for line in lines if ADDRESS_HINT_PATTERN.search(line.text)]
if hint_lines:
hint_lines.sort(key=lambda txt: len(clean_text(txt)), reverse=True)
data["地址"] = _sanitize_address(hint_lines[0])
return data
def save_to_excel(records: List[Dict[str, Any]], output_path: str):
df = pd.DataFrame(records)
# 调整列顺序
cols = ["编号", "邮编", "地址", "联系人/单位名", "电话"]
df = df.reindex(columns=cols)
df.to_excel(output_path, index=False)