fix: 修复ROI切片y_offset丢失导致地址拼接失败的问题

resize步骤未传递y_offset,导致OCR worker无法还原切片坐标,
跨切片的地址行(如"楼3号")无法与前一行正确拼接。
同时重写extract_with_layout为锚点算法(邮编/电话锚点+单栏/多栏自动切换),
支持单位名含地址关键字、电话同行等场景。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
let5sne.win10
2026-02-24 23:38:33 +08:00
parent 6ce4b7b363
commit 8f6a4fbc68
4 changed files with 172 additions and 47 deletions

View File

@@ -80,10 +80,81 @@ def case_text_fallback() -> None:
assert "华南建设小组办公室" in result["联系人/单位名"] or result["联系人/单位名"] == "王五" assert "华南建设小组办公室" in result["联系人/单位名"] or result["联系人/单位名"] == "王五"
def case_company_contact_with_phone() -> None:
"""单位名含地址关键字 + 电话同行,地址跨两行。"""
ocr_lines = [
{"text": "610000", "box": [[80, 40], [180, 40], [180, 80], [80, 80]], "source": "main"},
{"text": "四川省成都市蒲江县鹤山街道", "box": [[80, 100], [520, 100], [520, 132], [80, 132]], "source": "main"},
{"text": "健民路246号2栋1楼3号", "box": [[80, 140], [460, 140], [460, 172], [80, 172]], "source": "main"},
{"text": "蒲江县宏利物流有限公司 15680801653", "box": [[80, 180], [560, 180], [560, 212], [80, 212]], "source": "main"},
{"text": "20260200425708", "box": [[280, 60], [760, 60], [760, 94], [280, 94]], "source": "number"},
]
result = extract_info(ocr_lines)
_print_case("单位名+电话同行(带坐标)", result)
assert result["邮编"] == "610000"
assert result["电话"] == "15680801653"
assert "四川省成都市蒲江县鹤山街道" in result["地址"]
assert "健民路246号2栋1楼3号" in result["地址"]
assert "蒲江县宏利物流有限公司" not in result["地址"], f"单位名不应混入地址: {result['地址']}"
assert "宏利物流" in result["联系人/单位名"], f"联系人应含单位名: {result['联系人/单位名']}"
assert result["编号"] == "20260200425708"
def case_company_contact_separate_line() -> None:
"""单位名和电话分两行(无坐标回退)。"""
ocr_texts = [
"610000",
"四川省成都市蒲江县鹤山街道",
"健民路246号2栋1楼3号",
"蒲江县宏利物流有限公司",
"15680801653",
]
result = extract_info(ocr_texts)
_print_case("单位名+电话分行(纯文本)", result)
assert result["邮编"] == "610000"
assert result["电话"] == "15680801653"
assert "四川省成都市蒲江县鹤山街道" in result["地址"]
assert "健民路246号2栋1楼3号" in result["地址"]
assert "宏利物流" in result["联系人/单位名"], f"联系人应含单位名: {result['联系人/单位名']}"
def case_split_roi_address() -> None:
"""模拟 ROI 切片后坐标已偏移还原的场景:地址跨两个切片。
切片1 (y_offset=0): 邮编 + 地址第一行
切片2 (y_offset=200): 地址第二行 + 联系人+电话
坐标已在 worker 中加上 y_offset此处直接传最终坐标。
"""
ocr_lines = [
# 切片1 的结果y_offset=0坐标不变
{"text": "610000", "box": [[80, 30], [180, 30], [180, 60], [80, 60]], "source": "main"},
{"text": "四川省成都市蒲江县鹤山街道健民路246号2栋1", "box": [[80, 80], [560, 80], [560, 112], [80, 112]], "source": "main"},
# 切片2 的结果(原始 y 约 10~42加上 y_offset=200 后变成 210~242
{"text": "楼3号", "box": [[80, 210], [160, 210], [160, 242], [80, 242]], "source": "main"},
{"text": "蒲江县宏利物流有限公司 15680801653", "box": [[80, 260], [560, 260], [560, 292], [80, 292]], "source": "main"},
# 编号区域
{"text": "20260200425708", "box": [[280, 400], [760, 400], [760, 434], [280, 434]], "source": "number"},
]
result = extract_info(ocr_lines)
_print_case("ROI切片坐标还原", result)
assert result["邮编"] == "610000"
assert result["电话"] == "15680801653"
# 关键:地址两行应正确拼接
assert "健民路246号2栋1" in result["地址"], f"地址应含第一行: {result['地址']}"
assert "楼3号" in result["地址"], f"地址应含第二行: {result['地址']}"
assert "宏利物流" in result["联系人/单位名"], f"联系人应含单位名: {result['联系人/单位名']}"
def main() -> None: def main() -> None:
case_layout_multi_column() case_layout_multi_column()
case_layout_single_column() case_layout_single_column()
case_text_fallback() case_text_fallback()
case_company_contact_with_phone()
case_company_contact_separate_line()
case_split_roi_address()
print("\n所有场景断言通过。") print("\n所有场景断言通过。")

View File

@@ -540,6 +540,9 @@ class MainWindow(QMainWindow):
if job_id != self._ocr_job_id: if job_id != self._ocr_job_id:
return return
logger.info("OCR job=%s 原始文本: %s", job_id, texts)
logger.info("OCR job=%s 解析结果: %s", job_id, record)
self.records.append(record) self.records.append(record)
self.update_table() self.update_table()
cost = "" cost = ""
@@ -996,7 +999,7 @@ class MainWindow(QMainWindow):
split_count = min(split_count, 4) split_count = min(split_count, 4)
if split_count <= 1 or roi_box.shape[0] < 120: if split_count <= 1 or roi_box.shape[0] < 120:
roi_inputs.append({"img": roi_box, "source": "main"}) roi_inputs.append({"img": roi_box, "source": "main", "y_offset": 0})
else: else:
h_box = roi_box.shape[0] h_box = roi_box.shape[0]
step = h_box / float(split_count) step = h_box / float(split_count)
@@ -1012,7 +1015,7 @@ class MainWindow(QMainWindow):
) )
part = roi_box[sy:ey, :] part = roi_box[sy:ey, :]
if part is not None and part.size > 0: if part is not None and part.size > 0:
roi_inputs.append({"img": part, "source": "main"}) roi_inputs.append({"img": part, "source": "main", "y_offset": sy})
except Exception: except Exception:
pass pass
@@ -1043,13 +1046,15 @@ class MainWindow(QMainWindow):
for item in roi_inputs: for item in roi_inputs:
img = item.get("img") img = item.get("img")
source = item.get("source", "main") source = item.get("source", "main")
y_off = item.get("y_offset", 0)
scale = 1.0
try: try:
if img is not None and img.shape[1] > max_w: if img is not None and img.shape[1] > max_w:
scale = max_w / img.shape[1] scale = max_w / img.shape[1]
img = cv2.resize(img, (int(img.shape[1] * scale), int(img.shape[0] * scale))) img = cv2.resize(img, (int(img.shape[1] * scale), int(img.shape[0] * scale)))
except Exception: except Exception:
pass pass
resized_inputs.append({"img": img, "source": source}) resized_inputs.append({"img": img, "source": source, "y_offset": int(y_off * scale)})
logger.info( logger.info(
"UI 触发识别frame=%s, rois=%s, frame_age=%.3fs", "UI 触发识别frame=%s, rois=%s, frame_age=%.3fs",

View File

@@ -48,9 +48,11 @@ def run_ocr_worker(models_base_dir: str, request_q, response_q) -> None:
for roi_index, entry in enumerate(images): for roi_index, entry in enumerate(images):
source = "main" source = "main"
img = entry img = entry
y_offset = 0
if isinstance(entry, dict): if isinstance(entry, dict):
source = str(entry.get("source", "main")) source = str(entry.get("source", "main"))
img = entry.get("img") img = entry.get("img")
y_offset = int(entry.get("y_offset", 0))
elif roi_index > 0: elif roi_index > 0:
source = "number" source = "number"
if img is None: if img is None:
@@ -68,10 +70,14 @@ def run_ocr_worker(models_base_dir: str, request_q, response_q) -> None:
conf = float(line[1][1]) conf = float(line[1][1])
except Exception: except Exception:
conf = None conf = None
# 将切片内的局部坐标还原为完整 ROI 坐标
box = line[0]
if y_offset and isinstance(box, (list, tuple)):
box = [[p[0], p[1] + y_offset] for p in box]
ocr_lines.append( ocr_lines.append(
{ {
"text": text, "text": text,
"box": line[0], "box": box,
"conf": conf, "conf": conf,
"source": source, "source": source,
"roi_index": roi_index, "roi_index": roi_index,

View File

@@ -11,6 +11,10 @@ ZIP_PATTERN = re.compile(r"(?<!\d)(\d{6})(?!\d)")
PHONE_PATTERN = re.compile(r"(?<!\d)(1[3-9]\d{9}|0\d{2,3}-?\d{7,8})(?!\d)") PHONE_PATTERN = re.compile(r"(?<!\d)(1[3-9]\d{9}|0\d{2,3}-?\d{7,8})(?!\d)")
LONG_NUMBER_PATTERN = re.compile(r"(?<!\d)(\d{10,20})(?!\d)") LONG_NUMBER_PATTERN = re.compile(r"(?<!\d)(\d{10,20})(?!\d)")
ADDRESS_HINT_PATTERN = re.compile(r"(省|市|区|县|乡|镇|街|路|村|号|栋|单元|室)") ADDRESS_HINT_PATTERN = re.compile(r"(省|市|区|县|乡|镇|街|路|村|号|栋|单元|室)")
COMPANY_HINT_PATTERN = re.compile(
r"(公司|有限|集团|工厂|物流|商贸|商行|超市|药房|药店|诊所|医院|学校|幼儿园"
r"|办公室|办事处|服务部|经营部|工作室|研究所|事务所|中心|银行|信用社|合作社)"
)
@dataclass @dataclass
@@ -247,6 +251,15 @@ def _extract_tracking_number(lines: List[OCRLine], zip_code: str, phone: str) ->
def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[str, str, bool]: def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[str, str, bool]:
"""基于邮编/电话锚点的版面提取。
两种模式自动切换:
- 单栏模式(信封典型排版):邮编后连续行=地址,电话行去掉电话=联系人
- 多栏模式:左侧=地址,右侧=联系人(按 split_x 分割)
单栏/多栏判断比较邮编和电话的左边缘x1而非中心点cx
避免因文本长度不同导致误判。
"""
main_lines = [line for line in lines if line.source != "number"] main_lines = [line for line in lines if line.source != "number"]
if len(main_lines) < 2: if len(main_lines) < 2:
return "", "", False return "", "", False
@@ -262,6 +275,9 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st
if phone_anchor and not data["电话"]: if phone_anchor and not data["电话"]:
data["电话"] = phone_anchor[1] data["电话"] = phone_anchor[1]
if not zip_anchor and not phone_anchor:
return "", "", False
if zip_anchor: if zip_anchor:
start_row = zip_anchor[0].row_idx start_row = zip_anchor[0].row_idx
else: else:
@@ -273,13 +289,62 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st
if start_row > end_row: if start_row > end_row:
start_row, end_row = end_row, start_row start_row, end_row = end_row, start_row
# ── 单栏/多栏判断:用左边缘 x1 而非中心点 cx ──
single_column_mode = False single_column_mode = False
if zip_anchor and phone_anchor: if zip_anchor and phone_anchor:
zip_x1 = zip_anchor[0].x1 if zip_anchor[0].x1 is not None else zip_anchor[0].cx
phone_x1 = phone_anchor[0].x1 if phone_anchor[0].x1 is not None else phone_anchor[0].cx
line_widths = [line.width for line in main_lines if line.width > 0] line_widths = [line.width for line in main_lines if line.width > 0]
width_ref = median(line_widths) if line_widths else 120.0 width_ref = median(line_widths) if line_widths else 120.0
single_column_mode = abs(phone_anchor[0].cx - zip_anchor[0].cx) < max(60.0, width_ref * 0.6) single_column_mode = abs(phone_x1 - zip_x1) < max(60.0, width_ref * 0.4)
if zip_anchor and phone_anchor and phone_anchor[0].cx > zip_anchor[0].cx and not single_column_mode: # ════════════════════════════════════════════
# 单栏模式:邮编后连续行=地址,电话行去掉电话=联系人
# ════════════════════════════════════════════
if single_column_mode:
# 从电话行提取联系人
contact_text = ""
if phone_anchor:
remainder = clean_text(phone_anchor[0].text.replace(phone_anchor[1], ""))
if remainder and not re.fullmatch(r"\d{2,20}", remainder):
contact_text = _sanitize_contact(remainder)
# 邮编行之后、电话行之前的所有行 → 地址
address_entries: List[Tuple[int, int, str]] = []
for line in main_lines:
if line.row_idx < start_row or line.row_idx > end_row:
continue
if phone_anchor and line is phone_anchor[0]:
continue
text = line.text
if zip_anchor and line is zip_anchor[0]:
text = text.replace(zip_anchor[1], "")
text = clean_text(text)
if not text or re.fullmatch(r"\d{6,20}", text):
continue
address_entries.append((line.row_idx, line.col_idx, text))
# 联系人为空时,从地址末尾回退一行
if not contact_text and address_entries:
last_row = max(item[0] for item in address_entries)
last_entries = [item for item in address_entries if item[0] == last_row]
last_text = _join_entries(last_entries)
candidate = _sanitize_contact(last_text)
if candidate:
prev_rows = [item[0] for item in address_entries if item[0] < last_row]
# 与前面地址行有行间距 > 1或含单位关键字 → 视为联系人
gap = (last_row - max(prev_rows)) if prev_rows else 999
if gap > 1 or COMPANY_HINT_PATTERN.search(last_text):
contact_text = candidate
address_entries = [item for item in address_entries if item[0] != last_row]
address_text = _sanitize_address(_join_entries(address_entries))
return address_text, contact_text, True
# ════════════════════════════════════════════
# 多栏模式:按 split_x 左右分割
# ════════════════════════════════════════════
if zip_anchor and phone_anchor and phone_anchor[0].cx > zip_anchor[0].cx:
split_x = (zip_anchor[0].cx + phone_anchor[0].cx) / 2.0 split_x = (zip_anchor[0].cx + phone_anchor[0].cx) / 2.0
elif phone_anchor: elif phone_anchor:
split_x = phone_anchor[0].cx - max(40.0, phone_anchor[0].width * 0.6) split_x = phone_anchor[0].cx - max(40.0, phone_anchor[0].width * 0.6)
@@ -288,29 +353,19 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st
else: else:
split_x = median([line.cx for line in main_lines]) split_x = median([line.cx for line in main_lines])
address_entries: List[Tuple[int, int, str]] = [] address_entries = []
contact_entries: List[Tuple[int, int, str]] = [] contact_entries: List[Tuple[int, int, str]] = []
for line in main_lines: for line in main_lines:
if line.row_idx < start_row or line.row_idx > end_row: if line.row_idx < start_row or line.row_idx > end_row:
continue continue
text = line.text text = line.text
if zip_anchor and line is zip_anchor[0]: if zip_anchor and line is zip_anchor[0]:
text = text.replace(zip_anchor[1], "") text = text.replace(zip_anchor[1], "")
if phone_anchor and line is phone_anchor[0]: if phone_anchor and line is phone_anchor[0]:
text = text.replace(phone_anchor[1], "") text = text.replace(phone_anchor[1], "")
text = clean_text(text) text = clean_text(text)
if not text: if not text or re.fullmatch(r"\d{6,20}", text):
continue
if re.fullmatch(r"\d{6,20}", text):
continue
if single_column_mode:
if phone_anchor and line is phone_anchor[0]:
contact_entries.append((line.row_idx, line.col_idx, text))
else:
address_entries.append((line.row_idx, line.col_idx, text))
continue continue
if line.cx <= split_x: if line.cx <= split_x:
@@ -318,7 +373,7 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st
else: else:
contact_entries.append((line.row_idx, line.col_idx, text)) contact_entries.append((line.row_idx, line.col_idx, text))
# 联系人优先取靠近电话的一段,降低把地址误分到联系人的概率 # 联系人优先取靠近电话的一段
if phone_anchor and contact_entries: if phone_anchor and contact_entries:
phone_row = phone_anchor[0].row_idx phone_row = phone_anchor[0].row_idx
min_dist = min(abs(item[0] - phone_row) for item in contact_entries) min_dist = min(abs(item[0] - phone_row) for item in contact_entries)
@@ -329,34 +384,13 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st
contact_text = _sanitize_contact(_join_entries(contact_entries)) contact_text = _sanitize_contact(_join_entries(contact_entries))
address_text = _sanitize_address(_join_entries(address_entries)) address_text = _sanitize_address(_join_entries(address_entries))
# 如果联系人为空,尝试从“电话所在行去掉电话号码”的残余文本提取 # 多栏模式下联系人为空的回退
if not contact_text and phone_anchor: if not contact_text and phone_anchor:
fallback_contact = clean_text(phone_anchor[0].text.replace(phone_anchor[1], "")) remainder = clean_text(phone_anchor[0].text.replace(phone_anchor[1], ""))
if fallback_contact and not re.fullmatch(r"\d{2,20}", fallback_contact): if remainder and not re.fullmatch(r"\d{2,20}", remainder):
contact_text = _sanitize_contact(fallback_contact) contact_text = _sanitize_contact(remainder)
# 若仍缺联系人,尝试从靠近电话的地址候选中回退一行 return address_text, contact_text, True
if not contact_text and phone_anchor and address_entries:
phone_row = phone_anchor[0].row_idx
sorted_candidates = sorted(
address_entries,
key=lambda item: (abs(item[0] - phone_row), -item[0], item[1]),
)
for row_idx, col_idx, txt in sorted_candidates:
if ADDRESS_HINT_PATTERN.search(txt):
continue
contact_text = _sanitize_contact(txt)
if contact_text:
address_entries = [
item
for item in address_entries
if not (item[0] == row_idx and item[1] == col_idx and item[2] == txt)
]
address_text = _sanitize_address(_join_entries(address_entries))
break
has_signal = bool(zip_anchor or phone_anchor)
return address_text, contact_text, has_signal
def _extract_with_text_order(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[str, str, bool]: def _extract_with_text_order(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[str, str, bool]:
@@ -407,13 +441,22 @@ def _extract_with_text_order(lines: List[OCRLine], data: Dict[str, str]) -> Tupl
if not contact_text and address_parts: if not contact_text and address_parts:
for idx, text in reversed(address_parts): for idx, text in reversed(address_parts):
if ADDRESS_HINT_PATTERN.search(text): # 含单位关键字的直接采纳;纯地址行跳过
if ADDRESS_HINT_PATTERN.search(text) and not COMPANY_HINT_PATTERN.search(text):
continue continue
contact_text = _sanitize_contact(text) contact_text = _sanitize_contact(text)
if contact_text: if contact_text:
address_parts = [item for item in address_parts if item[0] != idx] address_parts = [item for item in address_parts if item[0] != idx]
break break
# 兜底:电话紧邻上一行即使含地址关键字也采纳(如"蒲江县宏利物流有限公司"
if not contact_text and address_parts:
last_idx, last_text = address_parts[-1]
if last_idx == phone_idx - 1:
contact_text = _sanitize_contact(last_text)
if contact_text:
address_parts = address_parts[:-1]
address_text = _sanitize_address("".join(text for _, text in address_parts)) address_text = _sanitize_address("".join(text for _, text in address_parts))
return address_text, contact_text, True return address_text, contact_text, True
@@ -436,7 +479,7 @@ def extract_info(ocr_results: List[Any]) -> Dict[str, str]:
data["电话"] = _first_match(PHONE_PATTERN, full_content) data["电话"] = _first_match(PHONE_PATTERN, full_content)
data["编号"] = _extract_tracking_number(lines, data["邮编"], data["电话"]) data["编号"] = _extract_tracking_number(lines, data["邮编"], data["电话"])
# 第一优先级:使用版面坐标进行邮编-电话锚点 + 连续块解析 # 第一优先级:使用版面坐标进行"邮编-电话锚点 + 连续块"解析
address_text, contact_text, used_layout = _extract_with_layout(lines, data) address_text, contact_text, used_layout = _extract_with_layout(lines, data)
if not used_layout: if not used_layout:
# 第二优先级:无坐标时按文本顺序回退 # 第二优先级:无坐标时按文本顺序回退