Compare commits

..

2 Commits

Author SHA1 Message Date
ed0f51f2a4 Merge branch 'main' of https://gitea.kronecker.cc/OurCodesAreAllRight/FilesReadSystem 2026-04-10 00:26:57 +08:00
ecc0c79475 增强模板填写服务支持表格内容摘要和表头重生成
- 在源文档解析过程中增加表格内容摘要功能,提取表格结构用于AI理解
- 新增表格摘要逻辑,包括表头和前3行数据的提取和格式化
- 添加模板文件类型识别,支持xlsx和docx格式判断
- 实现基于源文档内容的表头自动重生成功能
- 当检测到自动生成的表头时,使用源文档内容重新生成更准确的字段
- 增加详细的调试日志用于跟踪表格处理过程
2026-04-10 00:26:54 +08:00
2 changed files with 146 additions and 7 deletions

View File

@@ -186,13 +186,51 @@ async def upload_joint_template(
parser = ParserFactory.get_parser(sf_path)
parse_result = parser.parse(sf_path)
if parse_result.success and parse_result.data:
# 获取原始内容
content = parse_result.data.get("content", "")[:5000] if parse_result.data.get("content") else ""
# 获取标题可能在顶层或structured_data内
titles = parse_result.data.get("titles", [])
if not titles and parse_result.data.get("structured_data"):
titles = parse_result.data.get("structured_data", {}).get("titles", [])
titles = titles[:10] if titles else []
# 获取表格数量可能在顶层或structured_data内
tables = parse_result.data.get("tables", [])
if not tables and parse_result.data.get("structured_data"):
tables = parse_result.data.get("structured_data", {}).get("tables", [])
tables_count = len(tables) if tables else 0
# 获取表格内容摘要(用于 AI 理解源文档结构)
tables_summary = ""
if tables:
tables_summary = "\n【文档中的表格】:\n"
for idx, table in enumerate(tables[:5]): # 最多5个表格
if isinstance(table, dict):
headers = table.get("headers", [])
rows = table.get("rows", [])
if headers:
tables_summary += f"表格{idx+1}表头: {', '.join(str(h) for h in headers)}\n"
if rows:
tables_summary += f"表格{idx+1}前3行: "
for row_idx, row in enumerate(rows[:3]):
if isinstance(row, list):
tables_summary += " | ".join(str(c) for c in row) + "; "
elif isinstance(row, dict):
tables_summary += " | ".join(str(row.get(h, "")) for h in headers if headers) + "; "
tables_summary += "\n"
source_contents.append({
"filename": sf.filename,
"doc_type": sf_ext,
"content": parse_result.data.get("content", "")[:5000] if parse_result.data.get("content") else "",
"titles": parse_result.data.get("titles", [])[:10] if parse_result.data.get("titles") else [],
"tables_count": len(parse_result.data.get("tables", [])) if parse_result.data.get("tables") else 0
"content": content,
"titles": titles,
"tables_count": tables_count,
"tables_summary": tables_summary
})
logger.info(f"[DEBUG] source_contents built: filename={sf.filename}, content_len={len(content)}, titles_count={len(titles)}, tables_count={tables_count}")
if tables_summary:
logger.info(f"[DEBUG] tables_summary preview: {tables_summary[:300]}")
except Exception as e:
logger.warning(f"解析源文档失败 {sf.filename}: {e}")
@@ -365,12 +403,23 @@ async def fill_template(
for f in request.template_fields
]
# 从 template_id 提取文件类型
template_file_type = "xlsx" # 默认类型
if request.template_id:
ext = request.template_id.split('.')[-1].lower()
if ext in ["xlsx", "xls"]:
template_file_type = "xlsx"
elif ext == "docx":
template_file_type = "docx"
# 执行填写
result = await template_fill_service.fill_template(
template_fields=fields,
source_doc_ids=request.source_doc_ids,
source_file_paths=request.source_file_paths,
user_hint=request.user_hint
user_hint=request.user_hint,
template_id=request.template_id,
template_file_type=template_file_type
)
return result

View File

@@ -60,7 +60,9 @@ class TemplateFillService:
template_fields: List[TemplateField],
source_doc_ids: Optional[List[str]] = None,
source_file_paths: Optional[List[str]] = None,
user_hint: Optional[str] = None
user_hint: Optional[str] = None,
template_id: Optional[str] = None,
template_file_type: Optional[str] = "xlsx"
) -> Dict[str, Any]:
"""
填写表格模板
@@ -70,6 +72,8 @@ class TemplateFillService:
source_doc_ids: 源文档 MongoDB ID 列表
source_file_paths: 源文档文件路径列表
user_hint: 用户提示(如"请从合同文档中提取"
template_id: 模板文件路径(用于重新生成表头)
template_file_type: 模板文件类型
Returns:
填写结果
@@ -94,6 +98,78 @@ class TemplateFillService:
if not source_docs:
logger.warning("没有找到源文档,填表结果将全部为空")
# 3. 检查是否需要使用源文档重新生成表头
# 条件:源文档已加载 AND 现有字段看起来是自动生成的(如"字段1"、"字段2"
needs_regenerate_headers = (
len(source_docs) > 0 and
len(template_fields) > 0 and
all(self._is_auto_generated_field(f.name) for f in template_fields)
)
if needs_regenerate_headers:
logger.info(f"检测到自动生成表头,尝试使用源文档重新生成... (当前字段: {[f.name for f in template_fields]})")
# 将 SourceDocument 转换为 source_contents 格式
source_contents = []
for doc in source_docs:
structured = doc.structured_data if doc.structured_data else {}
# 获取标题
titles = structured.get("titles", [])
if not titles:
titles = []
# 获取表格
tables = structured.get("tables", [])
tables_count = len(tables) if tables else 0
# 生成表格摘要
tables_summary = ""
if tables:
tables_summary = "\n【文档中的表格】:\n"
for idx, table in enumerate(tables[:5]):
if isinstance(table, dict):
headers = table.get("headers", [])
rows = table.get("rows", [])
if headers:
tables_summary += f"表格{idx+1}表头: {', '.join(str(h) for h in headers)}\n"
if rows:
tables_summary += f"表格{idx+1}前3行: "
for row_idx, row in enumerate(rows[:3]):
if isinstance(row, list):
tables_summary += " | ".join(str(c) for c in row) + "; "
elif isinstance(row, dict):
tables_summary += " | ".join(str(row.get(h, "")) for h in headers if headers) + "; "
tables_summary += "\n"
source_contents.append({
"filename": doc.filename,
"doc_type": doc.doc_type,
"content": doc.content[:5000] if doc.content else "",
"titles": titles[:10] if titles else [],
"tables_count": tables_count,
"tables_summary": tables_summary
})
# 使用源文档内容重新生成表头
if template_id and template_file_type:
logger.info(f"使用源文档重新生成表头: template_id={template_id}, template_file_type={template_file_type}")
new_fields = await self.get_template_fields_from_file(
template_id,
template_file_type,
source_contents=source_contents
)
if new_fields and len(new_fields) > 0:
logger.info(f"成功重新生成表头: {[f.name for f in new_fields]}")
template_fields = new_fields
else:
logger.warning("重新生成表头返回空结果,使用原始字段")
else:
logger.warning("无法重新生成表头:缺少 template_id 或 template_file_type")
else:
if source_docs and template_fields:
logger.info(f"表头看起来正常(非自动生成),无需重新生成: {[f.name for f in template_fields[:5]]}")
# 2. 对每个字段进行提取
for idx, field in enumerate(template_fields):
try:
@@ -1532,7 +1608,10 @@ class TemplateFillService:
# 调用 AI 生成表头
# 根据源文档内容生成表头
source_info = ""
logger.info(f"[DEBUG] _generate_fields_with_ai received source_contents: {len(source_contents) if source_contents else 0} items")
if source_contents:
for sc in source_contents:
logger.info(f"[DEBUG] source doc: filename={sc.get('filename')}, content_len={len(sc.get('content', ''))}, titles={len(sc.get('titles', []))}, tables_count={sc.get('tables_count', 0)}, has_tables_summary={bool(sc.get('tables_summary'))}")
source_info = "\n\n【源文档内容摘要】(根据以下文档内容生成表头):\n"
for idx, src in enumerate(source_contents[:5]): # 最多5个源文档
filename = src.get("filename", f"文档{idx+1}")
@@ -1540,13 +1619,24 @@ class TemplateFillService:
content = src.get("content", "")[:3000] # 限制内容长度
titles = src.get("titles", [])[:10] # 最多10个标题
tables_count = src.get("tables_count", 0)
tables_summary = src.get("tables_summary", "")
source_info += f"\n--- 文档 {idx+1}: {filename} ({doc_type}) ---\n"
# 处理 titles可能是字符串列表或字典列表
if titles:
source_info += f"【章节标题】: {', '.join([t.get('text', '') for t in titles[:5]])}\n"
title_texts = []
for t in titles[:5]:
if isinstance(t, dict):
title_texts.append(t.get('text', ''))
else:
title_texts.append(str(t))
if title_texts:
source_info += f"【章节标题】: {', '.join(title_texts)}\n"
if tables_count > 0:
source_info += f"【包含表格数】: {tables_count}\n"
if content:
if tables_summary:
source_info += f"{tables_summary}\n"
elif content:
source_info += f"【内容预览】: {content[:1500]}...\n"
prompt = f"""你是一个专业的表格设计助手。请根据源文档内容生成合适的表格表头字段。