改进智能填表功能:支持Markdown表格提取和修复LLM调用

- 新增对MongoDB存储的tables格式支持,直接从structured_data.tables提取数据
- 修复max_tokens值过大问题(50000→4000),解决DeepSeek API限制
- 增强列名匹配算法,支持模糊匹配
- 添加详细日志便于调试结构化数据提取过程

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dj
2026-04-09 21:42:07 +08:00
parent 718f864926
commit 78417c898a

View File

@@ -77,12 +77,19 @@ class TemplateFillService:
fill_details = [] fill_details = []
logger.info(f"开始填表: {len(template_fields)} 个字段, {len(source_doc_ids or [])} 个源文档") logger.info(f"开始填表: {len(template_fields)} 个字段, {len(source_doc_ids or [])} 个源文档")
logger.info(f"source_doc_ids: {source_doc_ids}")
logger.info(f"source_file_paths: {source_file_paths}")
# 1. 加载源文档内容 # 1. 加载源文档内容
source_docs = await self._load_source_documents(source_doc_ids, source_file_paths) source_docs = await self._load_source_documents(source_doc_ids, source_file_paths)
logger.info(f"加载了 {len(source_docs)} 个源文档") logger.info(f"加载了 {len(source_docs)} 个源文档")
# 打印每个加载的文档的详细信息
for i, doc in enumerate(source_docs):
logger.info(f" 文档[{i}]: id={doc.doc_id}, filename={doc.filename}, doc_type={doc.doc_type}")
logger.info(f" content长度: {len(doc.content)}, structured_data keys: {list(doc.structured_data.keys()) if doc.structured_data else 'None'}")
if not source_docs: if not source_docs:
logger.warning("没有找到源文档,填表结果将全部为空") logger.warning("没有找到源文档,填表结果将全部为空")
@@ -157,14 +164,21 @@ class TemplateFillService:
try: try:
doc = await mongodb.get_document(doc_id) doc = await mongodb.get_document(doc_id)
if doc: if doc:
sd = doc.get("structured_data", {})
sd_keys = list(sd.keys()) if sd else []
logger.info(f"从MongoDB加载文档: {doc_id}, doc_type={doc.get('doc_type')}, structured_data keys={sd_keys}")
if sd.get("tables"):
logger.info(f" tables数量: {len(sd.get('tables', []))}")
if sd["tables"]:
first_table = sd["tables"][0]
logger.info(f" 第一表格: headers={first_table.get('headers', [])[:3]}..., rows数量={len(first_table.get('rows', []))}")
source_docs.append(SourceDocument( source_docs.append(SourceDocument(
doc_id=doc_id, doc_id=doc_id,
filename=doc.get("metadata", {}).get("original_filename", "unknown"), filename=doc.get("metadata", {}).get("original_filename", "unknown"),
doc_type=doc.get("doc_type", "unknown"), doc_type=doc.get("doc_type", "unknown"),
content=doc.get("content", ""), content=doc.get("content", ""),
structured_data=doc.get("structured_data", {}) structured_data=sd
)) ))
logger.info(f"从MongoDB加载文档: {doc_id}")
except Exception as e: except Exception as e:
logger.error(f"从MongoDB加载文档失败 {doc_id}: {str(e)}") logger.error(f"从MongoDB加载文档失败 {doc_id}: {str(e)}")
@@ -178,10 +192,48 @@ class TemplateFillService:
# result.data 的结构取决于解析器类型: # result.data 的结构取决于解析器类型:
# - Excel 单 sheet: {columns: [...], rows: [...], row_count, column_count} # - Excel 单 sheet: {columns: [...], rows: [...], row_count, column_count}
# - Excel 多 sheet: {sheets: {sheet_name: {columns, rows, ...}}} # - Excel 多 sheet: {sheets: {sheet_name: {columns, rows, ...}}}
# - Markdown: {content: "...", tables: [...], structured_data: {tables: [...]}}
# - Word/TXT: {content: "...", structured_data: {...}} # - Word/TXT: {content: "...", structured_data: {...}}
doc_data = result.data if result.data else {} doc_data = result.data if result.data else {}
doc_content = doc_data.get("content", "") if isinstance(doc_data, dict) else "" doc_content = doc_data.get("content", "") if isinstance(doc_data, dict) else ""
doc_structured = doc_data if isinstance(doc_data, dict) and "rows" in doc_data or isinstance(doc_data, dict) and "sheets" in doc_data else {}
# 检查并提取 structured_data
doc_structured = {}
if isinstance(doc_data, dict):
logger.info(f"文档 {file_path} doc_data keys: {list(doc_data.keys())}")
# Excel 多 sheet
if "sheets" in doc_data:
doc_structured = doc_data
logger.info(f" -> 使用 Excel 多 sheet 格式")
# Excel 单 sheet 或有 rows 的格式
elif "rows" in doc_data:
doc_structured = doc_data
logger.info(f" -> 使用 rows 格式,列数: {len(doc_data.get('columns', []))}")
# Markdown 格式tables 可能直接在 doc_data.tables 或在 structured_data.tables 中
elif "tables" in doc_data and doc_data["tables"]:
# Markdown: tables 直接在 doc_data 中
tables = doc_data["tables"]
first_table = tables[0]
doc_structured = {
"headers": first_table.get("headers", []),
"rows": first_table.get("rows", [])
}
logger.info(f" -> 使用 doc_data.tables 格式,表头: {doc_structured.get('headers', [])[:5]}")
elif "structured_data" in doc_data and isinstance(doc_data["structured_data"], dict):
# Markdown: tables 在 structured_data 中
tables = doc_data["structured_data"].get("tables", [])
if tables:
first_table = tables[0]
doc_structured = {
"headers": first_table.get("headers", []),
"rows": first_table.get("rows", [])
}
logger.info(f" -> 使用 structured_data.tables 格式,表头: {doc_structured.get('headers', [])[:5]}")
else:
logger.warning(f" -> structured_data.tables 为空")
else:
logger.warning(f" -> 未识别的文档格式,无 structured_data")
source_docs.append(SourceDocument( source_docs.append(SourceDocument(
doc_id=file_path, doc_id=file_path,
@@ -270,7 +322,7 @@ class TemplateFillService:
response = await self.llm.chat( response = await self.llm.chat(
messages=messages, messages=messages,
temperature=0.1, temperature=0.1,
max_tokens=50000 max_tokens=4000
) )
content = self.llm.extract_message_content(response) content = self.llm.extract_message_content(response)
@@ -675,7 +727,7 @@ class TemplateFillService:
def _extract_values_from_structured_data(self, source_docs: List[SourceDocument], field_name: str) -> List[str]: def _extract_values_from_structured_data(self, source_docs: List[SourceDocument], field_name: str) -> List[str]:
""" """
从结构化数据Excel rows中直接提取指定列的值 从结构化数据Excel rows 或 Markdown tables)中直接提取指定列的值
适用于有 rows 结构的文档数据,无需 LLM 即可提取 适用于有 rows 结构的文档数据,无需 LLM 即可提取
@@ -687,10 +739,15 @@ class TemplateFillService:
值列表,如果无法提取则返回空列表 值列表,如果无法提取则返回空列表
""" """
all_values = [] all_values = []
logger.info(f"[_extract_values_from_structured_data] 开始提取字段: {field_name}")
logger.info(f" source_docs 数量: {len(source_docs)}")
for doc in source_docs: for doc_idx, doc in enumerate(source_docs):
# 尝试从 structured_data 中提取 # 尝试从 structured_data 中提取
structured = doc.structured_data structured = doc.structured_data
logger.info(f" 文档[{doc_idx}]: {doc.filename}, structured类型: {type(structured)}, 是否为空: {not bool(structured)}")
if structured:
logger.info(f" structured_data keys: {list(structured.keys())}")
if not structured: if not structured:
continue continue
@@ -710,6 +767,33 @@ class TemplateFillService:
if all_values: if all_values:
break break
# 处理 Markdown 表格格式: {headers: [...], rows: [...], ...}
elif structured.get("headers") and structured.get("rows"):
headers = structured.get("headers", [])
rows = structured.get("rows", [])
values = self._extract_values_from_markdown_table(headers, rows, field_name)
if values:
all_values.extend(values)
logger.info(f"从 Markdown 文档 {doc.filename} 提取到 {len(values)} 个值")
break
# 处理 MongoDB 存储的 tables 格式: {tables: [{headers, rows, ...}, ...]}
elif structured.get("tables") and isinstance(structured.get("tables"), list):
tables = structured.get("tables", [])
logger.info(f" 检测到 tables 格式,共 {len(tables)} 个表")
for table_idx, table in enumerate(tables):
if isinstance(table, dict):
headers = table.get("headers", [])
rows = table.get("rows", [])
logger.info(f" 表格[{table_idx}]: headers={headers[:3]}..., rows数量={len(rows)}")
values = self._extract_values_from_markdown_table(headers, rows, field_name)
if values:
all_values.extend(values)
logger.info(f"从表格[{table_idx}] 提取到 {len(values)} 个值")
break
if all_values:
break
# 处理单 sheet 格式: {columns: [...], rows: [...]} # 处理单 sheet 格式: {columns: [...], rows: [...]}
elif structured.get("rows"): elif structured.get("rows"):
columns = structured.get("columns", []) columns = structured.get("columns", [])
@@ -722,6 +806,100 @@ class TemplateFillService:
return all_values return all_values
def _extract_values_from_markdown_table(self, headers: List, rows: List, field_name: str) -> List[str]:
    """
    Extract the values of one column from a Markdown-style table.

    Expected table layout:
        headers: ["col1", "col2", ...]
        rows:    [["val1", "val2", ...], ...]

    Args:
        headers: list of column headers
        rows: list of data rows (each row a list of cell values)
        field_name: name of the field/column to extract

    Returns:
        List of formatted cell values for the matched column; empty list
        when the table is empty or no header matches ``field_name``.
    """
    # Guard: nothing to extract from an empty table.
    if not headers or not rows:
        logger.warning(f"Markdown 表格为空: headers={headers}, rows={len(rows) if rows else 0}")
        return []

    # Locate the target column via the enhanced fuzzy matcher.
    col_idx = self._find_best_matching_column(headers, field_name)
    if col_idx is None:
        logger.warning(f"未找到匹配列: {field_name}, 可用表头: {headers}")
        return []

    logger.info(f"列匹配成功: {field_name} -> {headers[col_idx]} (索引: {col_idx})")

    # Pull the cell at col_idx from each row; rows that are not lists or
    # are too short contribute an empty value.
    return [
        self._format_value(row[col_idx] if isinstance(row, list) and col_idx < len(row) else "")
        for row in rows
    ]
def _find_best_matching_column(self, headers: List, field_name: str) -> Optional[int]:
"""
查找最佳匹配的列索引
使用多层匹配策略:
1. 精确匹配(忽略大小写)
2. 子字符串匹配(字段名在表头中,或表头在字段名中)
3. 关键词重叠匹配(中文字符串分割后比对)
Args:
headers: 表头列表
field_name: 要匹配的字段名
Returns:
匹配的列索引,找不到返回 None
"""
field_lower = field_name.lower().strip()
field_keywords = set(field_lower.replace(" ", "").split())
best_match_idx = None
best_match_score = 0
for idx, header in enumerate(headers):
header_str = str(header).strip()
header_lower = header_str.lower()
# 策略1: 精确匹配(忽略大小写)
if header_lower == field_lower:
return idx
# 策略2: 子字符串匹配
if field_lower in header_lower or header_lower in field_lower:
# 计算匹配分数(较长匹配更优先)
score = max(len(field_lower), len(header_lower)) / min(len(field_lower) + 1, len(header_lower) + 1)
if score > best_match_score:
best_match_score = score
best_match_idx = idx
continue
# 策略3: 关键词重叠匹配(适用于中文)
header_keywords = set(header_lower.replace(" ", "").split())
overlap = field_keywords & header_keywords
if overlap and len(overlap) > 0:
score = len(overlap) / max(len(field_keywords), len(header_keywords), 1)
if score > best_match_score:
best_match_score = score
best_match_idx = idx
# 只有当匹配分数超过阈值时才返回
if best_match_score >= 0.3:
logger.info(f"模糊匹配: {field_name} -> {headers[best_match_idx]} (分数: {best_match_score:.2f})")
return best_match_idx
return None
def _extract_column_values(self, rows: List, columns: List, field_name: str) -> List[str]: def _extract_column_values(self, rows: List, columns: List, field_name: str) -> List[str]:
""" """
从 rows 和 columns 中提取指定列的值 从 rows 和 columns 中提取指定列的值
@@ -737,24 +915,22 @@ class TemplateFillService:
if not rows or not columns: if not rows or not columns:
return [] return []
# 查找匹配的列(模糊匹配) # 使用增强的匹配算法查找最佳匹配的列索引
target_col = None target_idx = self._find_best_matching_column(columns, field_name)
for col in columns:
col_str = str(col)
if field_name.lower() in col_str.lower() or col_str.lower() in field_name.lower():
target_col = col
break
if not target_col: if target_idx is None:
logger.warning(f"未找到匹配列: {field_name}, 可用列: {columns}") logger.warning(f"未找到匹配列: {field_name}, 可用列: {columns}")
return [] return []
target_col = columns[target_idx]
logger.info(f"列匹配成功: {field_name} -> {target_col} (索引: {target_idx})")
values = [] values = []
for row in rows: for row in rows:
if isinstance(row, dict): if isinstance(row, dict):
val = row.get(target_col, "") val = row.get(target_col, "")
elif isinstance(row, list) and target_col in columns: elif isinstance(row, list) and target_idx < len(row):
val = row[columns.index(target_col)] val = row[target_idx]
else: else:
val = "" val = ""
values.append(self._format_value(val)) values.append(self._format_value(val))