From 8f6a4fbc6855e59dddbef72bcfa4c205129cf333 Mon Sep 17 00:00:00 2001 From: "let5sne.win10" Date: Tue, 24 Feb 2026 23:38:33 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8DROI=E5=88=87=E7=89=87y?= =?UTF-8?q?=5Foffset=E4=B8=A2=E5=A4=B1=E5=AF=BC=E8=87=B4=E5=9C=B0=E5=9D=80?= =?UTF-8?q?=E6=8B=BC=E6=8E=A5=E5=A4=B1=E8=B4=A5=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit resize步骤未传递y_offset,导致OCR worker无法还原切片坐标, 跨切片的地址行(如"楼3号")无法与前一行正确拼接。 同时重写extract_with_layout为锚点算法(邮编/电话锚点+单栏/多栏自动切换), 支持单位名含地址关键字、电话同行等场景。 Co-Authored-By: Claude Opus 4.6 --- scripts/test_extract_info.py | 71 +++++++++++++++++++ src/desktop.py | 11 ++- src/ocr_worker_process.py | 8 ++- src/processor.py | 129 +++++++++++++++++++++++------------ 4 files changed, 172 insertions(+), 47 deletions(-) diff --git a/scripts/test_extract_info.py b/scripts/test_extract_info.py index 8f31436..46b8194 100644 --- a/scripts/test_extract_info.py +++ b/scripts/test_extract_info.py @@ -80,10 +80,81 @@ def case_text_fallback() -> None: assert "华南建设小组办公室" in result["联系人/单位名"] or result["联系人/单位名"] == "王五" +def case_company_contact_with_phone() -> None: + """单位名含地址关键字 + 电话同行,地址跨两行。""" + ocr_lines = [ + {"text": "610000", "box": [[80, 40], [180, 40], [180, 80], [80, 80]], "source": "main"}, + {"text": "四川省成都市蒲江县鹤山街道", "box": [[80, 100], [520, 100], [520, 132], [80, 132]], "source": "main"}, + {"text": "健民路246号2栋1楼3号", "box": [[80, 140], [460, 140], [460, 172], [80, 172]], "source": "main"}, + {"text": "蒲江县宏利物流有限公司 15680801653", "box": [[80, 180], [560, 180], [560, 212], [80, 212]], "source": "main"}, + {"text": "20260200425708", "box": [[280, 60], [760, 60], [760, 94], [280, 94]], "source": "number"}, + ] + result = extract_info(ocr_lines) + _print_case("单位名+电话同行(带坐标)", result) + + assert result["邮编"] == "610000" + assert result["电话"] == "15680801653" + assert "四川省成都市蒲江县鹤山街道" in result["地址"] + assert "健民路246号2栋1楼3号" in result["地址"] + assert "蒲江县宏利物流有限公司" not in result["地址"], f"单位名不应混入地址: {result['地址']}" + assert "宏利物流" in result["联系人/单位名"], f"联系人应含单位名: {result['联系人/单位名']}" + assert result["编号"] == "20260200425708" + + +def case_company_contact_separate_line() -> None: + """单位名和电话分两行(无坐标回退)。""" + ocr_texts = [ + "610000", + "四川省成都市蒲江县鹤山街道", + "健民路246号2栋1楼3号", + "蒲江县宏利物流有限公司", + "15680801653", + ] + result = extract_info(ocr_texts) + _print_case("单位名+电话分行(纯文本)", result) + + assert result["邮编"] == "610000" + assert result["电话"] == "15680801653" + assert "四川省成都市蒲江县鹤山街道" in result["地址"] + assert "健民路246号2栋1楼3号" in result["地址"] + assert "宏利物流" in result["联系人/单位名"], f"联系人应含单位名: {result['联系人/单位名']}" + + +def case_split_roi_address() -> None: + """模拟 ROI 切片后坐标已偏移还原的场景:地址跨两个切片。 + + 切片1 (y_offset=0): 邮编 + 地址第一行 + 切片2 (y_offset=200): 地址第二行 + 联系人+电话 + 坐标已在 worker 中加上 y_offset,此处直接传最终坐标。 + """ + ocr_lines = [ + # 切片1 的结果(y_offset=0,坐标不变) + {"text": "610000", "box": [[80, 30], [180, 30], [180, 60], [80, 60]], "source": "main"}, + {"text": "四川省成都市蒲江县鹤山街道健民路246号2栋1", "box": [[80, 80], [560, 80], [560, 112], [80, 112]], "source": "main"}, + # 切片2 的结果(原始 y 约 10~42,加上 y_offset=200 后变成 210~242) + {"text": "楼3号", "box": [[80, 210], [160, 210], [160, 242], [80, 242]], "source": "main"}, + {"text": "蒲江县宏利物流有限公司 15680801653", "box": [[80, 260], [560, 260], [560, 292], [80, 292]], "source": "main"}, + # 编号区域 + {"text": "20260200425708", "box": [[280, 400], [760, 400], [760, 434], [280, 434]], "source": "number"}, + ] + result = extract_info(ocr_lines) + _print_case("ROI切片坐标还原", result) + + assert result["邮编"] == "610000" + assert result["电话"] == "15680801653" + # 关键:地址两行应正确拼接 + assert "健民路246号2栋1" in result["地址"], f"地址应含第一行: {result['地址']}" + assert "楼3号" in result["地址"], f"地址应含第二行: {result['地址']}" + assert "宏利物流" in result["联系人/单位名"], f"联系人应含单位名: {result['联系人/单位名']}" + + def main() -> None: case_layout_multi_column() case_layout_single_column() case_text_fallback() + case_company_contact_with_phone() + case_company_contact_separate_line() + case_split_roi_address() print("\n所有场景断言通过。") diff --git a/src/desktop.py b/src/desktop.py index 9b1992b..22576a9 100644 --- a/src/desktop.py +++ b/src/desktop.py @@ -540,6 +540,9 @@ class MainWindow(QMainWindow): if job_id != self._ocr_job_id: return + logger.info("OCR job=%s 原始文本: %s", job_id, texts) + logger.info("OCR job=%s 解析结果: %s", job_id, record) + self.records.append(record) self.update_table() cost = "" @@ -996,7 +999,7 @@ class MainWindow(QMainWindow): split_count = min(split_count, 4) if split_count <= 1 or roi_box.shape[0] < 120: - roi_inputs.append({"img": roi_box, "source": "main"}) + roi_inputs.append({"img": roi_box, "source": "main", "y_offset": 0}) else: h_box = roi_box.shape[0] step = h_box / float(split_count) @@ -1012,7 +1015,7 @@ class MainWindow(QMainWindow): ) part = roi_box[sy:ey, :] if part is not None and part.size > 0: - roi_inputs.append({"img": part, "source": "main"}) + roi_inputs.append({"img": part, "source": "main", "y_offset": sy}) except Exception: pass @@ -1043,13 +1046,15 @@ class MainWindow(QMainWindow): for item in roi_inputs: img = item.get("img") source = item.get("source", "main") + y_off = item.get("y_offset", 0) + scale = 1.0 try: if img is not None and img.shape[1] > max_w: scale = max_w / img.shape[1] img = cv2.resize(img, (int(img.shape[1] * scale), int(img.shape[0] * scale))) except Exception: pass - resized_inputs.append({"img": img, "source": source}) + resized_inputs.append({"img": img, "source": source, "y_offset": int(y_off * scale)}) logger.info( "UI 触发识别:frame=%s, rois=%s, frame_age=%.3fs", diff --git a/src/ocr_worker_process.py b/src/ocr_worker_process.py index 01d63a1..0704a0e 100644 --- a/src/ocr_worker_process.py +++ b/src/ocr_worker_process.py @@ -48,9 +48,11 @@ def run_ocr_worker(models_base_dir: str, request_q, response_q) -> None: for roi_index, entry in enumerate(images): source = "main" img = entry + y_offset = 0 if isinstance(entry, dict): source = str(entry.get("source", "main")) img = entry.get("img") + y_offset = int(entry.get("y_offset", 0)) elif roi_index > 0: source = "number" if img is None: @@ -68,10 +70,14 @@ def run_ocr_worker(models_base_dir: str, request_q, response_q) -> None: conf = float(line[1][1]) except Exception: conf = None + # 将切片内的局部坐标还原为完整 ROI 坐标 + box = line[0] + if y_offset and isinstance(box, (list, tuple)): + box = [[p[0], p[1] + y_offset] for p in box] ocr_lines.append( { "text": text, - "box": line[0], + "box": box, "conf": conf, "source": source, "roi_index": roi_index, diff --git a/src/processor.py b/src/processor.py index 82923a9..cdf2da8 100644 --- a/src/processor.py +++ b/src/processor.py @@ -11,6 +11,10 @@ ZIP_PATTERN = re.compile(r"(? def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[str, str, bool]: + """基于邮编/电话锚点的版面提取。 + + 两种模式自动切换: + - 单栏模式(信封典型排版):邮编后连续行=地址,电话行去掉电话=联系人 + - 多栏模式:左侧=地址,右侧=联系人(按 split_x 分割) + + 单栏/多栏判断:比较邮编和电话的左边缘(x1),而非中心点(cx), + 避免因文本长度不同导致误判。 + """ main_lines = [line for line in lines if line.source != "number"] if len(main_lines) < 2: return "", "", False @@ -262,6 +275,9 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st if phone_anchor and not data["电话"]: data["电话"] = phone_anchor[1] + if not zip_anchor and not phone_anchor: + return "", "", False + if zip_anchor: start_row = zip_anchor[0].row_idx else: @@ -273,13 +289,62 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st if start_row > end_row: start_row, end_row = end_row, start_row + # ── 单栏/多栏判断:用左边缘 x1 而非中心点 cx ── single_column_mode = False if zip_anchor and phone_anchor: + zip_x1 = zip_anchor[0].x1 if zip_anchor[0].x1 is not None else zip_anchor[0].cx + phone_x1 = phone_anchor[0].x1 if phone_anchor[0].x1 is not None else phone_anchor[0].cx line_widths = [line.width for line in main_lines if line.width > 0] width_ref = median(line_widths) if line_widths else 120.0 - single_column_mode = abs(phone_anchor[0].cx - zip_anchor[0].cx) < max(60.0, width_ref * 0.6) + single_column_mode = abs(phone_x1 - zip_x1) < max(60.0, width_ref * 0.4) - if zip_anchor and phone_anchor and phone_anchor[0].cx > zip_anchor[0].cx and not single_column_mode: + # ════════════════════════════════════════════ + # 单栏模式:邮编后连续行=地址,电话行去掉电话=联系人 + # ════════════════════════════════════════════ + if single_column_mode: + # 从电话行提取联系人 + contact_text = "" + if phone_anchor: + remainder = clean_text(phone_anchor[0].text.replace(phone_anchor[1], "")) + if remainder and not re.fullmatch(r"\d{2,20}", remainder): + contact_text = _sanitize_contact(remainder) + + # 邮编行之后、电话行之前的所有行 → 地址 + address_entries: List[Tuple[int, int, str]] = [] + for line in main_lines: + if line.row_idx < start_row or line.row_idx > end_row: + continue + if phone_anchor and line is phone_anchor[0]: + continue + text = line.text + if zip_anchor and line is zip_anchor[0]: + text = text.replace(zip_anchor[1], "") + text = clean_text(text) + if not text or re.fullmatch(r"\d{6,20}", text): + continue + address_entries.append((line.row_idx, line.col_idx, text)) + + # 联系人为空时,从地址末尾回退一行 + if not contact_text and address_entries: + last_row = max(item[0] for item in address_entries) + last_entries = [item for item in address_entries if item[0] == last_row] + last_text = _join_entries(last_entries) + candidate = _sanitize_contact(last_text) + if candidate: + prev_rows = [item[0] for item in address_entries if item[0] < last_row] + # 与前面地址行有行间距 > 1,或含单位关键字 → 视为联系人 + gap = (last_row - max(prev_rows)) if prev_rows else 999 + if gap > 1 or COMPANY_HINT_PATTERN.search(last_text): + contact_text = candidate + address_entries = [item for item in address_entries if item[0] != last_row] + + address_text = _sanitize_address(_join_entries(address_entries)) + return address_text, contact_text, True + + # ════════════════════════════════════════════ + # 多栏模式:按 split_x 左右分割 + # ════════════════════════════════════════════ + if zip_anchor and phone_anchor and phone_anchor[0].cx > zip_anchor[0].cx: split_x = (zip_anchor[0].cx + phone_anchor[0].cx) / 2.0 elif phone_anchor: split_x = phone_anchor[0].cx - max(40.0, phone_anchor[0].width * 0.6) @@ -288,29 +353,19 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st else: split_x = median([line.cx for line in main_lines]) - address_entries: List[Tuple[int, int, str]] = [] + address_entries = [] contact_entries: List[Tuple[int, int, str]] = [] for line in main_lines: if line.row_idx < start_row or line.row_idx > end_row: continue - text = line.text if zip_anchor and line is zip_anchor[0]: text = text.replace(zip_anchor[1], "") if phone_anchor and line is phone_anchor[0]: text = text.replace(phone_anchor[1], "") text = clean_text(text) - if not text: - continue - if re.fullmatch(r"\d{6,20}", text): - continue - - if single_column_mode: - if phone_anchor and line is phone_anchor[0]: - contact_entries.append((line.row_idx, line.col_idx, text)) - else: - address_entries.append((line.row_idx, line.col_idx, text)) + if not text or re.fullmatch(r"\d{6,20}", text): continue if line.cx <= split_x: @@ -318,7 +373,7 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st else: contact_entries.append((line.row_idx, line.col_idx, text)) - # 联系人优先取靠近电话的一段,降低把地址误分到联系人的概率 + # 联系人优先取靠近电话的一段 if phone_anchor and contact_entries: phone_row = phone_anchor[0].row_idx min_dist = min(abs(item[0] - phone_row) for item in contact_entries) @@ -329,34 +384,13 @@ def _extract_with_layout(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[st contact_text = _sanitize_contact(_join_entries(contact_entries)) address_text = _sanitize_address(_join_entries(address_entries)) - # 如果联系人仍为空,尝试从“电话所在行去掉电话号码”的残余文本提取 + # 多栏模式下联系人为空的回退 if not contact_text and phone_anchor: - fallback_contact = clean_text(phone_anchor[0].text.replace(phone_anchor[1], "")) - if fallback_contact and not re.fullmatch(r"\d{2,20}", fallback_contact): - contact_text = _sanitize_contact(fallback_contact) + remainder = clean_text(phone_anchor[0].text.replace(phone_anchor[1], "")) + if remainder and not re.fullmatch(r"\d{2,20}", remainder): + contact_text = _sanitize_contact(remainder) - # 若仍缺联系人,尝试从靠近电话的地址候选中回退一行 - if not contact_text and phone_anchor and address_entries: - phone_row = phone_anchor[0].row_idx - sorted_candidates = sorted( - address_entries, - key=lambda item: (abs(item[0] - phone_row), -item[0], item[1]), - ) - for row_idx, col_idx, txt in sorted_candidates: - if ADDRESS_HINT_PATTERN.search(txt): - continue - contact_text = _sanitize_contact(txt) - if contact_text: - address_entries = [ - item - for item in address_entries - if not (item[0] == row_idx and item[1] == col_idx and item[2] == txt) - ] - address_text = _sanitize_address(_join_entries(address_entries)) - break - - has_signal = bool(zip_anchor or phone_anchor) - return address_text, contact_text, has_signal + return address_text, contact_text, True def _extract_with_text_order(lines: List[OCRLine], data: Dict[str, str]) -> Tuple[str, str, bool]: @@ -407,13 +441,22 @@ def _extract_with_text_order(lines: List[OCRLine], data: Dict[str, str]) -> Tupl if not contact_text and address_parts: for idx, text in reversed(address_parts): - if ADDRESS_HINT_PATTERN.search(text): + # 含单位关键字的直接采纳;纯地址行跳过 + if ADDRESS_HINT_PATTERN.search(text) and not COMPANY_HINT_PATTERN.search(text): continue contact_text = _sanitize_contact(text) if contact_text: address_parts = [item for item in address_parts if item[0] != idx] break + # 兜底:电话紧邻上一行即使含地址关键字也采纳(如"蒲江县宏利物流有限公司") + if not contact_text and address_parts: + last_idx, last_text = address_parts[-1] + if last_idx == phone_idx - 1: + contact_text = _sanitize_contact(last_text) + if contact_text: + address_parts = address_parts[:-1] + address_text = _sanitize_address("".join(text for _, text in address_parts)) return address_text, contact_text, True @@ -436,7 +479,7 @@ def extract_info(ocr_results: List[Any]) -> Dict[str, str]: data["电话"] = _first_match(PHONE_PATTERN, full_content) data["编号"] = _extract_tracking_number(lines, data["邮编"], data["电话"]) - # 第一优先级:使用版面坐标进行“邮编-电话锚点 + 连续块”解析 + # 第一优先级:使用版面坐标进行"邮编-电话锚点 + 连续块"解析 address_text, contact_text, used_layout = _extract_with_layout(lines, data) if not used_layout: # 第二优先级:无坐标时按文本顺序回退