增强 Word 文档 AI 解析和模板填充功能

This commit is contained in:
zzz
2026-04-10 09:48:57 +08:00
parent 7f67fa89de
commit bedf1af9c0
13 changed files with 2285 additions and 139 deletions

View File

@@ -11,6 +11,7 @@ from app.core.database import mongodb
from app.services.llm_service import llm_service
from app.core.document_parser import ParserFactory
from app.services.markdown_ai_service import markdown_ai_service
from app.services.word_ai_service import word_ai_service
logger = logging.getLogger(__name__)
@@ -173,16 +174,106 @@ class TemplateFillService:
if source_file_paths:
for file_path in source_file_paths:
try:
file_ext = file_path.lower().split('.')[-1]
# 对于 Word 文档,优先使用 AI 解析
if file_ext == 'docx':
# 使用 AI 深度解析 Word 文档
ai_result = await word_ai_service.parse_word_with_ai(
file_path=file_path,
user_hint="请提取文档中的所有结构化数据,包括表格、键值对等"
)
if ai_result.get("success"):
# AI 解析成功,转换为 SourceDocument 格式
# 注意word_ai_service 返回的是顶层数据,不是 {"data": {...}} 包装
parse_type = ai_result.get("type", "unknown")
# 构建 structured_data
doc_structured = {
"ai_parsed": True,
"parse_type": parse_type,
"tables": [],
"key_values": ai_result.get("key_values", {}) if "key_values" in ai_result else {},
"list_items": ai_result.get("list_items", []) if "list_items" in ai_result else [],
"summary": ai_result.get("summary", "") if "summary" in ai_result else ""
}
# 如果 AI 返回了表格数据
if parse_type == "table_data":
headers = ai_result.get("headers", [])
rows = ai_result.get("rows", [])
if headers and rows:
doc_structured["tables"] = [{
"headers": headers,
"rows": rows
}]
doc_structured["columns"] = headers
doc_structured["rows"] = rows
logger.info(f"AI 表格数据: {len(headers)} 列, {len(rows)}")
elif parse_type == "structured_text":
tables = ai_result.get("tables", [])
if tables:
doc_structured["tables"] = tables
logger.info(f"AI 结构化文本提取到 {len(tables)} 个表格")
# 获取摘要内容
content_text = doc_structured.get("summary", "") or ai_result.get("description", "")
source_docs.append(SourceDocument(
doc_id=file_path,
filename=file_path.split("/")[-1] if "/" in file_path else file_path.split("\\")[-1],
doc_type="docx",
content=content_text,
structured_data=doc_structured
))
logger.info(f"AI 解析 Word 文档: {file_path}, type={parse_type}, tables={len(doc_structured.get('tables', []))}")
continue # 跳过后续的基础解析
# 基础解析Excel 或非 AI 解析的 Word
parser = ParserFactory.get_parser(file_path)
result = parser.parse(file_path)
if result.success:
# result.data 的结构取决于解析器类型:
# - Excel 单 sheet: {columns: [...], rows: [...], row_count, column_count}
# - Excel 多 sheet: {sheets: {sheet_name: {columns, rows, ...}}}
# - Word/TXT: {content: "...", structured_data: {...}}
# - Word: {content: "...", paragraphs: [...], tables: [...], structured_data: {...}}
doc_data = result.data if result.data else {}
doc_content = doc_data.get("content", "") if isinstance(doc_data, dict) else ""
doc_structured = doc_data if isinstance(doc_data, dict) and "rows" in doc_data or isinstance(doc_data, dict) and "sheets" in doc_data else {}
# 根据文档类型确定 structured_data 的内容
if "sheets" in doc_data:
# Excel 多 sheet 格式
doc_structured = doc_data
elif "rows" in doc_data:
# Excel 单 sheet 格式
doc_structured = doc_data
elif "tables" in doc_data:
# Word 文档格式(已有表格)
doc_structured = doc_data
elif "paragraphs" in doc_data:
# Word 文档只有段落,没有表格 - 尝试 AI 二次解析
unstructured = doc_data
ai_result = await word_ai_service.parse_word_with_ai(
file_path=file_path,
user_hint="请提取文档中的所有结构化信息"
)
if ai_result.get("success"):
parse_type = ai_result.get("type", "text")
doc_structured = {
"ai_parsed": True,
"parse_type": parse_type,
"tables": ai_result.get("tables", []) if "tables" in ai_result else [],
"key_values": ai_result.get("key_values", {}) if "key_values" in ai_result else {},
"list_items": ai_result.get("list_items", []) if "list_items" in ai_result else [],
"summary": ai_result.get("summary", "") if "summary" in ai_result else "",
"content": doc_content
}
logger.info(f"AI 二次解析 Word 段落文档: type={parse_type}")
else:
doc_structured = unstructured
else:
doc_structured = {}
source_docs.append(SourceDocument(
doc_id=file_path,
@@ -321,11 +412,13 @@ class TemplateFillService:
# ========== 步骤3: 尝试解析 JSON ==========
# 3a. 尝试直接解析整个字符串
parsed_confidence = 0.5 # 默认置信度
try:
result = json.loads(json_text)
extracted_values = self._extract_values_from_json(result)
extracted_values, parsed_confidence = self._extract_values_from_json(result)
if extracted_values:
logger.info(f"✅ 直接解析成功,得到 {len(extracted_values)} 个值")
confidence = parsed_confidence if parsed_confidence > 0 else 0.8 # 成功提取,提高置信度
logger.info(f"✅ 直接解析成功,得到 {len(extracted_values)} 个值,置信度: {confidence}")
else:
logger.warning(f"直接解析成功但未提取到值")
except json.JSONDecodeError as e:
@@ -337,9 +430,10 @@ class TemplateFillService:
if fixed_json:
try:
result = json.loads(fixed_json)
extracted_values = self._extract_values_from_json(result)
extracted_values, parsed_confidence = self._extract_values_from_json(result)
if extracted_values:
logger.info(f"✅ 修复后解析成功,得到 {len(extracted_values)} 个值")
confidence = parsed_confidence if parsed_confidence > 0 else 0.7
logger.info(f"✅ 修复后解析成功,得到 {len(extracted_values)} 个值,置信度: {confidence}")
except json.JSONDecodeError as e2:
logger.warning(f"修复后仍然失败: {e2}")
@@ -347,10 +441,15 @@ class TemplateFillService:
if not extracted_values:
extracted_values = self._extract_values_by_regex(cleaned)
if extracted_values:
logger.info(f"✅ 正则提取成功,得到 {len(extracted_values)} 个值")
confidence = 0.6 # 正则提取置信度
logger.info(f"✅ 正则提取成功,得到 {len(extracted_values)} 个值,置信度: {confidence}")
else:
# 最后的备选:使用旧的文本提取
extracted_values = self._extract_values_from_text(cleaned, field.name)
if extracted_values:
confidence = 0.5
else:
confidence = 0.3 # 最后备选
# 如果仍然没有提取到值
if not extracted_values:
@@ -483,30 +582,27 @@ class TemplateFillService:
doc_content += " | ".join(str(cell) for cell in row) + "\n"
row_count += 1
elif doc.structured_data and doc.structured_data.get("tables"):
# Markdown 表格格式: {tables: [{headers: [...], rows: [...]}]}
# Word 文档的表格格式 - 直接输出完整表格,让 LLM 理解
tables = doc.structured_data.get("tables", [])
for table in tables:
for table_idx, table in enumerate(tables):
if isinstance(table, dict):
headers = table.get("headers", [])
rows = table.get("rows", [])
if rows and headers:
doc_content += f"\n【文档: {doc.filename} - 表格】\n"
doc_content += " | ".join(str(h) for h in headers) + "\n"
for row in rows:
table_rows = table.get("rows", [])
if table_rows:
doc_content += f"\n【文档: {doc.filename} - 表格{table_idx + 1},共 {len(table_rows)} 行】\n"
# 输出表头
if table_rows and isinstance(table_rows[0], list):
doc_content += "表头: " + " | ".join(str(cell) for cell in table_rows[0]) + "\n"
# 输出所有数据行
for row_idx, row in enumerate(table_rows[1:], start=1): # 跳过表头
if isinstance(row, list):
doc_content += " | ".join(str(cell) for cell in row) + "\n"
row_count += 1
# 如果有标题结构,也添加上下文
if doc.structured_data.get("titles"):
titles = doc.structured_data.get("titles", [])
doc_content += f"\n【文档章节结构】\n"
for title in titles[:20]: # 限制前20个标题
doc_content += f"{'#' * title.get('level', 1)} {title.get('text', '')}\n"
# 如果没有提取到表格内容,使用纯文本
if not doc_content.strip():
doc_content = doc.content[:5000] if doc.content else ""
doc_content += "" + str(row_idx) + ": " + " | ".join(str(cell) for cell in row) + "\n"
row_count += 1
elif doc.content:
doc_content = doc.content[:5000]
# 普通文本内容Word 段落、纯文本等)
content_preview = doc.content[:8000] if doc.content else ""
if content_preview:
doc_content = f"\n【文档: {doc.filename} ({doc.doc_type})】\n{content_preview}"
row_count = len(content_preview.split('\n'))
if doc_content:
doc_context = f"【文档: {doc.filename} ({doc.doc_type})】\n{doc_content}"
@@ -614,8 +710,20 @@ class TemplateFillService:
logger.info(f"读取 Excel 表头: {df.shape}, 列: {list(df.columns)[:10]}")
# 如果 DataFrame 列为空或只有默认索引,尝试其他方式
if len(df.columns) == 0 or (len(df.columns) == 1 and df.columns[0] == 0):
# 如果 DataFrame 列为空或只有默认索引0, 1, 2... 或 Unnamed: 0, Unnamed: 1...,尝试其他方式
needs_reparse = len(df.columns) == 0
if not needs_reparse:
# 检查是否所有列都是自动生成的0, 1, 2... 或 Unnamed: 0, Unnamed: 1...
auto_generated_count = 0
for col in df.columns:
col_str = str(col)
if col_str in ['0', '1', '2'] or col_str.startswith('Unnamed'):
auto_generated_count += 1
# 如果超过50%的列是自动生成的,认为表头解析失败
if auto_generated_count >= len(df.columns) * 0.5:
needs_reparse = True
if needs_reparse:
logger.warning(f"表头解析结果异常,重新解析: {df.columns}")
# 尝试读取整个文件获取列信息
df_full = pd.read_excel(file_path, header=None)
@@ -656,16 +764,26 @@ class TemplateFillService:
doc = Document(file_path)
for table_idx, table in enumerate(doc.tables):
for row_idx, row in enumerate(table.rows):
rows = list(table.rows)
if len(rows) < 2:
continue # 跳过少于2行的表格需要表头+数据)
# 第一行是表头,用于识别字段位置
header_cells = [cell.text.strip() for cell in rows[0].cells]
logger.info(f"Word 表格 {table_idx} 表头: {header_cells}")
# 从第二行开始是数据行(比赛模板格式:字段名 | 提示词 | 填写值)
for row_idx, row in enumerate(rows[1:], start=1):
cells = [cell.text.strip() for cell in row.cells]
# 假设第一列是字段名
# 第一列是字段名
if cells and cells[0]:
field_name = cells[0]
# 第二列是提示词
hint = cells[1] if len(cells) > 1 else ""
# 跳过空行或标题
if field_name and field_name not in ["", "字段名", "名称", "项目"]:
# 跳过空行或明显是表头的
if field_name and field_name not in ["", "字段名", "名称", "项目", "序号", "编号"]:
fields.append(TemplateField(
cell=f"T{table_idx}R{row_idx}",
name=field_name,
@@ -673,9 +791,10 @@ class TemplateFillService:
required=True,
hint=hint
))
logger.info(f" 提取字段: {field_name}, hint: {hint}")
except Exception as e:
logger.error(f"从Word提取字段失败: {str(e)}")
logger.error(f"从Word提取字段失败: {str(e)}", exc_info=True)
return fields
@@ -783,23 +902,101 @@ class TemplateFillService:
logger.info(f"从文档 {doc.filename} 提取到 {len(values)} 个值")
break
# 处理 Markdown 表格格式: {tables: [{headers: [...], rows: [...]}]}
# 处理 Word 文档表格格式: {tables: [{headers: [...], rows: [[]], ...}]}
elif structured.get("tables"):
tables = structured.get("tables", [])
for table in tables:
if isinstance(table, dict):
headers = table.get("headers", [])
rows = table.get("rows", [])
values = self._extract_column_values(rows, headers, field_name)
if values:
all_values.extend(values)
logger.info(f"从 Markdown 表格提取到 {len(values)} 个值")
break
# AI 返回格式: {headers: [...], rows: [[row1], [row2], ...]}
# 原始 Word 格式: {rows: [[header], [row1], [row2], ...]}
table_rows = table.get("rows", [])
headers = table.get("headers", []) # AI 返回的 headers
if not headers and table_rows:
# 原始格式:第一个元素是表头
headers = table_rows[0] if table_rows else []
data_rows = table_rows[1:] if len(table_rows) > 1 else []
else:
# AI 返回格式headers 和 rows 分开
data_rows = table_rows
if headers and data_rows:
values = self._extract_values_from_docx_table(data_rows, headers, field_name)
if values:
all_values.extend(values)
logger.info(f"从 Word 表格提取到 {len(values)} 个值")
break
if all_values:
break
return all_values
def _extract_values_from_docx_table(self, table_rows: List, header: List, field_name: str) -> List[str]:
"""
从 Word 文档表格中提取指定列的值
Args:
table_rows: 表格所有行(包括表头)
header: 表头行(可能是真正的表头,也可能是第一行数据)
field_name: 要提取的字段名
Returns:
值列表
"""
if not table_rows or len(table_rows) < 2:
return []
# 第一步:尝试在 header第一行中查找匹配列
target_col_idx = None
for col_idx, col_name in enumerate(header):
col_str = str(col_name).strip()
if field_name.lower() in col_str.lower() or col_str.lower() in field_name.lower():
target_col_idx = col_idx
break
# 如果 header 中没找到,尝试在 table_rows[1](第二行)中查找
# 这是因为有时第一行是数据而不是表头
if target_col_idx is None and len(table_rows) > 1:
second_row = table_rows[1]
if isinstance(second_row, list):
for col_idx, col_name in enumerate(second_row):
col_str = str(col_name).strip()
if field_name.lower() in col_str.lower() or col_str.lower() in field_name.lower():
target_col_idx = col_idx
logger.info(f"在第二行找到匹配列: {field_name} @ 列{col_idx}, header={header}")
break
if target_col_idx is None:
logger.warning(f"未找到匹配列: {field_name}, 表头: {header}")
return []
# 确定从哪一行开始提取数据
# 如果 header 是表头(包含 field_name则从 table_rows[1] 开始提取
# 如果 header 是数据(不包含 field_name则从 table_rows[2] 开始提取
header_contains_field = any(
field_name.lower() in str(col).strip().lower() or str(col).strip().lower() in field_name.lower()
for col in header
)
if header_contains_field:
# header 是表头,从第二行开始提取
data_start_idx = 1
else:
# header 是数据,从第三行开始提取(跳过表头和第一行数据)
data_start_idx = 2
# 提取值
values = []
for row_idx, row in enumerate(table_rows[data_start_idx:], start=data_start_idx):
if isinstance(row, list) and target_col_idx < len(row):
val = str(row[target_col_idx]).strip() if row[target_col_idx] else ""
values.append(val)
elif isinstance(row, dict):
val = str(row.get(target_col_idx, "")).strip()
values.append(val)
logger.info(f"从 Word 表格列 {target_col_idx} 提取到 {len(values)} 个值: {values[:3]}")
return values
def _extract_column_values(self, rows: List, columns: List, field_name: str) -> List[str]:
"""
从 rows 和 columns 中提取指定列的值
@@ -839,27 +1036,37 @@ class TemplateFillService:
return values
def _extract_values_from_json(self, result) -> List[str]:
def _extract_values_from_json(self, result) -> tuple:
"""
从解析后的 JSON 对象/数组中提取值数组
从解析后的 JSON 对象/数组中提取值数组和置信度
Args:
result: json.loads() 返回的对象
Returns:
值列表
(值列表, 置信度) 元组
"""
# 提取置信度
confidence = 0.5
if isinstance(result, dict) and "confidence" in result:
try:
conf = float(result["confidence"])
if 0 <= conf <= 1:
confidence = conf
except (ValueError, TypeError):
pass
if isinstance(result, dict):
# 优先找 values 数组
if "values" in result and isinstance(result["values"], list):
vals = [str(v).strip() for v in result["values"] if v and str(v).strip()]
if vals:
return vals
return vals, confidence
# 尝试找 value 字段
if "value" in result:
val = str(result["value"]).strip()
if val:
return [val]
return [val], confidence
# 尝试找任何数组类型的键
for key in result.keys():
val = result[key]
@@ -867,14 +1074,14 @@ class TemplateFillService:
if all(isinstance(v, (str, int, float, bool)) or v is None for v in val):
vals = [str(v).strip() for v in val if v is not None and str(v).strip()]
if vals:
return vals
return vals, confidence
elif isinstance(val, (str, int, float, bool)):
return [str(val).strip()]
return [str(val).strip()], confidence
elif isinstance(result, list):
vals = [str(v).strip() for v in result if v is not None and str(v).strip()]
if vals:
return vals
return []
return vals, confidence
return [], confidence
def _fix_json(self, json_text: str) -> str:
"""
@@ -1189,14 +1396,15 @@ class TemplateFillService:
json_text = cleaned[json_start:]
try:
result = json.loads(json_text)
values = self._extract_values_from_json(result)
values, parsed_conf = self._extract_values_from_json(result)
if values:
conf = result.get("confidence", parsed_conf) if isinstance(result, dict) else parsed_conf
return FillResult(
field=field.name,
values=values,
value=values[0] if values else "",
source=f"AI分析: {doc.filename}",
confidence=result.get("confidence", 0.8)
confidence=max(conf, 0.8) # 最低0.8
)
except json.JSONDecodeError:
# 尝试修复 JSON
@@ -1204,14 +1412,15 @@ class TemplateFillService:
if fixed:
try:
result = json.loads(fixed)
values = self._extract_values_from_json(result)
values, parsed_conf = self._extract_values_from_json(result)
if values:
conf = result.get("confidence", parsed_conf) if isinstance(result, dict) else parsed_conf
return FillResult(
field=field.name,
values=values,
value=values[0] if values else "",
source=f"AI分析: {doc.filename}",
confidence=result.get("confidence", 0.8)
confidence=max(conf, 0.8) # 最低0.8
)
except json.JSONDecodeError:
pass
@@ -1242,6 +1451,8 @@ class TemplateFillService:
try:
import pandas as pd
content_sample = ""
# 读取 Excel 内容检查是否为空
if file_type in ["xlsx", "xls"]:
df = pd.read_excel(file_path, header=None)
@@ -1265,9 +1476,35 @@ class TemplateFillService:
content_sample = df.iloc[:10].to_string() if len(df) >= 10 else df.to_string()
else:
content_sample = df.to_string()
else:
elif file_type == "docx":
# Word 文档:尝试使用 docx_parser 提取内容
try:
from docx import Document
doc = Document(file_path)
# 提取段落文本
paragraphs = [p.text.strip() for p in doc.paragraphs if p.text.strip()]
tables_text = ""
# 提取表格
if doc.tables:
for table in doc.tables:
for row in table.rows:
row_text = " | ".join(cell.text.strip() for cell in row.cells)
tables_text += row_text + "\n"
content_sample = f"【段落】\n{' '.join(paragraphs[:20])}\n\n【表格】\n{tables_text}"
logger.info(f"Word 文档内容预览: {len(content_sample)} 字符")
except Exception as e:
logger.warning(f"读取 Word 文档失败: {str(e)}")
content_sample = ""
else:
logger.warning(f"不支持的文件类型进行 AI 表头生成: {file_type}")
return None
# 调用 AI 生成表头
prompt = f"""你是一个专业的表格设计助手。请为以下空白表格生成合适的表头字段。