diff --git a/backend/app/api/endpoints/templates.py b/backend/app/api/endpoints/templates.py index d1dcf46..a248dde 100644 --- a/backend/app/api/endpoints/templates.py +++ b/backend/app/api/endpoints/templates.py @@ -186,13 +186,51 @@ async def upload_joint_template( parser = ParserFactory.get_parser(sf_path) parse_result = parser.parse(sf_path) if parse_result.success and parse_result.data: + # 获取原始内容 + content = parse_result.data.get("content", "")[:5000] if parse_result.data.get("content") else "" + + # 获取标题(可能在顶层或structured_data内) + titles = parse_result.data.get("titles", []) + if not titles and parse_result.data.get("structured_data"): + titles = parse_result.data.get("structured_data", {}).get("titles", []) + titles = titles[:10] if titles else [] + + # 获取表格数量(可能在顶层或structured_data内) + tables = parse_result.data.get("tables", []) + if not tables and parse_result.data.get("structured_data"): + tables = parse_result.data.get("structured_data", {}).get("tables", []) + tables_count = len(tables) if tables else 0 + + # 获取表格内容摘要(用于 AI 理解源文档结构) + tables_summary = "" + if tables: + tables_summary = "\n【文档中的表格】:\n" + for idx, table in enumerate(tables[:5]): # 最多5个表格 + if isinstance(table, dict): + headers = table.get("headers", []) + rows = table.get("rows", []) + if headers: + tables_summary += f"表格{idx+1}表头: {', '.join(str(h) for h in headers)}\n" + if rows: + tables_summary += f"表格{idx+1}前3行: " + for row_idx, row in enumerate(rows[:3]): + if isinstance(row, list): + tables_summary += " | ".join(str(c) for c in row) + "; " + elif isinstance(row, dict): + tables_summary += " | ".join(str(row.get(h, "")) for h in headers if headers) + "; " + tables_summary += "\n" + source_contents.append({ "filename": sf.filename, "doc_type": sf_ext, - "content": parse_result.data.get("content", "")[:5000] if parse_result.data.get("content") else "", - "titles": parse_result.data.get("titles", [])[:10] if parse_result.data.get("titles") else [], - "tables_count": len(parse_result.data.get("tables", [])) if parse_result.data.get("tables") else 0 + "content": content, + "titles": titles, + "tables_count": tables_count, + "tables_summary": tables_summary }) + logger.info(f"[DEBUG] source_contents built: filename={sf.filename}, content_len={len(content)}, titles_count={len(titles)}, tables_count={tables_count}") + if tables_summary: + logger.info(f"[DEBUG] tables_summary preview: {tables_summary[:300]}") except Exception as e: logger.warning(f"解析源文档失败 {sf.filename}: {e}") @@ -365,12 +403,23 @@ async def fill_template( for f in request.template_fields ] + # 从 template_id 提取文件类型 + template_file_type = "xlsx" # 默认类型 + if request.template_id: + ext = request.template_id.split('.')[-1].lower() + if ext in ["xlsx", "xls"]: + template_file_type = "xlsx" + elif ext == "docx": + template_file_type = "docx" + # 执行填写 result = await template_fill_service.fill_template( template_fields=fields, source_doc_ids=request.source_doc_ids, source_file_paths=request.source_file_paths, - user_hint=request.user_hint + user_hint=request.user_hint, + template_id=request.template_id, + template_file_type=template_file_type ) return result diff --git a/backend/app/services/template_fill_service.py b/backend/app/services/template_fill_service.py index 5ed3781..13db9a2 100644 --- a/backend/app/services/template_fill_service.py +++ b/backend/app/services/template_fill_service.py @@ -60,7 +60,9 @@ class TemplateFillService: template_fields: List[TemplateField], source_doc_ids: Optional[List[str]] = None, source_file_paths: Optional[List[str]] = None, - user_hint: Optional[str] = None + user_hint: Optional[str] = None, + template_id: Optional[str] = None, + template_file_type: Optional[str] = "xlsx" ) -> Dict[str, Any]: """ 填写表格模板 @@ -70,6 +72,8 @@ class TemplateFillService: source_doc_ids: 源文档 MongoDB ID 列表 source_file_paths: 源文档文件路径列表 user_hint: 用户提示(如"请从合同文档中提取") + template_id: 模板文件路径(用于重新生成表头) + template_file_type: 模板文件类型 Returns: 填写结果 @@ -94,6 +98,78 @@ class TemplateFillService: if not source_docs: logger.warning("没有找到源文档,填表结果将全部为空") + # 3. 检查是否需要使用源文档重新生成表头 + # 条件:源文档已加载 AND 现有字段看起来是自动生成的(如"字段1"、"字段2") + needs_regenerate_headers = ( + len(source_docs) > 0 and + len(template_fields) > 0 and + all(self._is_auto_generated_field(f.name) for f in template_fields) + ) + + if needs_regenerate_headers: + logger.info(f"检测到自动生成表头,尝试使用源文档重新生成... (当前字段: {[f.name for f in template_fields]})") + + # 将 SourceDocument 转换为 source_contents 格式 + source_contents = [] + for doc in source_docs: + structured = doc.structured_data if doc.structured_data else {} + + # 获取标题 + titles = structured.get("titles", []) + if not titles: + titles = [] + + # 获取表格 + tables = structured.get("tables", []) + tables_count = len(tables) if tables else 0 + + # 生成表格摘要 + tables_summary = "" + if tables: + tables_summary = "\n【文档中的表格】:\n" + for idx, table in enumerate(tables[:5]): + if isinstance(table, dict): + headers = table.get("headers", []) + rows = table.get("rows", []) + if headers: + tables_summary += f"表格{idx+1}表头: {', '.join(str(h) for h in headers)}\n" + if rows: + tables_summary += f"表格{idx+1}前3行: " + for row_idx, row in enumerate(rows[:3]): + if isinstance(row, list): + tables_summary += " | ".join(str(c) for c in row) + "; " + elif isinstance(row, dict): + tables_summary += " | ".join(str(row.get(h, "")) for h in headers if headers) + "; " + tables_summary += "\n" + + source_contents.append({ + "filename": doc.filename, + "doc_type": doc.doc_type, + "content": doc.content[:5000] if doc.content else "", + "titles": titles[:10] if titles else [], + "tables_count": tables_count, + "tables_summary": tables_summary + }) + + # 使用源文档内容重新生成表头 + if template_id and template_file_type: + logger.info(f"使用源文档重新生成表头: template_id={template_id}, template_file_type={template_file_type}") + new_fields = await self.get_template_fields_from_file( + template_id, + template_file_type, + source_contents=source_contents + ) + if new_fields and len(new_fields) > 0: + logger.info(f"成功重新生成表头: {[f.name for f in new_fields]}") + template_fields = new_fields + else: + logger.warning("重新生成表头返回空结果,使用原始字段") + else: + logger.warning("无法重新生成表头:缺少 template_id 或 template_file_type") + else: + if source_docs and template_fields: + logger.info(f"表头看起来正常(非自动生成),无需重新生成: {[f.name for f in template_fields[:5]]}") + # 2. 对每个字段进行提取 for idx, field in enumerate(template_fields): try: @@ -1532,7 +1608,10 @@ class TemplateFillService: # 调用 AI 生成表头 # 根据源文档内容生成表头 source_info = "" + logger.info(f"[DEBUG] _generate_fields_with_ai received source_contents: {len(source_contents) if source_contents else 0} items") if source_contents: + for sc in source_contents: + logger.info(f"[DEBUG] source doc: filename={sc.get('filename')}, content_len={len(sc.get('content', ''))}, titles={len(sc.get('titles', []))}, tables_count={sc.get('tables_count', 0)}, has_tables_summary={bool(sc.get('tables_summary'))}") source_info = "\n\n【源文档内容摘要】(根据以下文档内容生成表头):\n" for idx, src in enumerate(source_contents[:5]): # 最多5个源文档 filename = src.get("filename", f"文档{idx+1}") @@ -1540,13 +1619,24 @@ class TemplateFillService: content = src.get("content", "")[:3000] # 限制内容长度 titles = src.get("titles", [])[:10] # 最多10个标题 tables_count = src.get("tables_count", 0) + tables_summary = src.get("tables_summary", "") source_info += f"\n--- 文档 {idx+1}: {filename} ({doc_type}) ---\n" + # 处理 titles(可能是字符串列表或字典列表) if titles: - source_info += f"【章节标题】: {', '.join([t.get('text', '') for t in titles[:5]])}\n" + title_texts = [] + for t in titles[:5]: + if isinstance(t, dict): + title_texts.append(t.get('text', '')) + else: + title_texts.append(str(t)) + if title_texts: + source_info += f"【章节标题】: {', '.join(title_texts)}\n" if tables_count > 0: source_info += f"【包含表格数】: {tables_count}\n" - if content: + if tables_summary: + source_info += f"{tables_summary}\n" + elif content: source_info += f"【内容预览】: {content[:1500]}...\n" prompt = f"""你是一个专业的表格设计助手。请根据源文档内容生成合适的表格表头字段。