Files
FilesReadSystem/logs/docx_parser_and_template_fill.patch
2026-04-08 20:27:24 +08:00

855 lines
30 KiB
Diff
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

diff --git a/backend/app/api/endpoints/templates.py b/backend/app/api/endpoints/templates.py
index 572d56e..706f281 100644
--- a/backend/app/api/endpoints/templates.py
+++ b/backend/app/api/endpoints/templates.py
@@ -13,7 +13,7 @@ import pandas as pd
from pydantic import BaseModel
from app.services.template_fill_service import template_fill_service, TemplateField
-from app.services.excel_storage_service import excel_storage_service
+from app.services.file_service import file_service
logger = logging.getLogger(__name__)
@@ -28,13 +28,15 @@ class TemplateFieldRequest(BaseModel):
name: str
field_type: str = "text"
required: bool = True
+ hint: str = ""
class FillRequest(BaseModel):
"""填写请求"""
template_id: str
template_fields: List[TemplateFieldRequest]
- source_doc_ids: Optional[List[str]] = None
+ source_doc_ids: Optional[List[str]] = None # MongoDB 文档 ID 列表
+ source_file_paths: Optional[List[str]] = None # 源文档文件路径列表
user_hint: Optional[str] = None
@@ -71,7 +73,6 @@ async def upload_template(
try:
# 保存文件
- from app.services.file_service import file_service
content = await file.read()
saved_path = file_service.save_uploaded_file(
content,
@@ -87,7 +88,7 @@ async def upload_template(
return {
"success": True,
- "template_id": saved_path, # 使用文件路径作为ID
+ "template_id": saved_path,
"filename": file.filename,
"file_type": file_ext,
"fields": [
@@ -95,7 +96,8 @@ async def upload_template(
"cell": f.cell,
"name": f.name,
"field_type": f.field_type,
- "required": f.required
+ "required": f.required,
+ "hint": f.hint
}
for f in template_fields
],
@@ -135,7 +137,8 @@ async def extract_template_fields(
"cell": f.cell,
"name": f.name,
"field_type": f.field_type,
- "required": f.required
+ "required": f.required,
+ "hint": f.hint
}
for f in fields
]
@@ -153,7 +156,7 @@ async def fill_template(
"""
执行表格填写
- 根据提供的字段定义,从已上传的文档中检索信息并填写
+ 根据提供的字段定义,从源文档中检索信息并填写
Args:
request: 填写请求
@@ -168,7 +171,8 @@ async def fill_template(
cell=f.cell,
name=f.name,
field_type=f.field_type,
- required=f.required
+ required=f.required,
+ hint=f.hint
)
for f in request.template_fields
]
@@ -177,6 +181,7 @@ async def fill_template(
result = await template_fill_service.fill_template(
template_fields=fields,
source_doc_ids=request.source_doc_ids,
+ source_file_paths=request.source_file_paths,
user_hint=request.user_hint
)
@@ -194,6 +199,8 @@ async def export_filled_template(
"""
导出填写后的表格
+ 支持 Excel (.xlsx) 和 Word (.docx) 格式
+
Args:
request: 导出请求
@@ -201,25 +208,124 @@ async def export_filled_template(
文件流
"""
try:
- # 创建 DataFrame
- df = pd.DataFrame([request.filled_data])
+ if request.format == "xlsx":
+ return await _export_to_excel(request.filled_data, request.template_id)
+ elif request.format == "docx":
+ return await _export_to_word(request.filled_data, request.template_id)
+ else:
+ raise HTTPException(
+ status_code=400,
+ detail=f"不支持的导出格式: {request.format},仅支持 xlsx/docx"
+ )
- # 导出为 Excel
- output = io.BytesIO()
- with pd.ExcelWriter(output, engine='openpyxl') as writer:
- df.to_excel(writer, index=False, sheet_name='填写结果')
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"导出失败: {str(e)}")
+ raise HTTPException(status_code=500, detail=f"导出失败: {str(e)}")
- output.seek(0)
- # 生成文件名
- filename = f"filled_template.{request.format}"
+async def _export_to_excel(filled_data: dict, template_id: str) -> StreamingResponse:
+ """导出为 Excel 格式"""
+ # 将字典转换为单行 DataFrame
+ df = pd.DataFrame([filled_data])
- return StreamingResponse(
- io.BytesIO(output.getvalue()),
- media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- headers={"Content-Disposition": f"attachment; filename={filename}"}
- )
+ output = io.BytesIO()
+ with pd.ExcelWriter(output, engine='openpyxl') as writer:
+ df.to_excel(writer, index=False, sheet_name='填写结果')
- except Exception as e:
- logger.error(f"导出失败: {str(e)}")
- raise HTTPException(status_code=500, detail=f"导出失败: {str(e)}")
+ output.seek(0)
+
+ filename = f"filled_template.xlsx"
+
+ return StreamingResponse(
+ io.BytesIO(output.getvalue()),
+ media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ headers={"Content-Disposition": f"attachment; filename={filename}"}
+ )
+
+
+async def _export_to_word(filled_data: dict, template_id: str) -> StreamingResponse:
+ """导出为 Word 格式"""
+ from docx import Document
+ from docx.shared import Pt, RGBColor
+ from docx.enum.text import WD_ALIGN_PARAGRAPH
+
+ doc = Document()
+
+ # 添加标题
+ title = doc.add_heading('填写结果', level=1)
+ title.alignment = WD_ALIGN_PARAGRAPH.CENTER
+
+ # 添加填写时间和模板信息
+ from datetime import datetime
+ info_para = doc.add_paragraph()
+ info_para.add_run(f"模板ID: {template_id}\n").bold = True
+ info_para.add_run(f"导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+
+ doc.add_paragraph() # 空行
+
+ # 添加字段表格
+ table = doc.add_table(rows=1, cols=3)
+ table.style = 'Light Grid Accent 1'
+
+ # 表头
+ header_cells = table.rows[0].cells
+ header_cells[0].text = '字段名'
+ header_cells[1].text = '填写值'
+ header_cells[2].text = '状态'
+
+ for field_name, field_value in filled_data.items():
+ row_cells = table.add_row().cells
+ row_cells[0].text = field_name
+ row_cells[1].text = str(field_value) if field_value else ''
+ row_cells[2].text = '已填写' if field_value else '为空'
+
+ # 保存到 BytesIO
+ output = io.BytesIO()
+ doc.save(output)
+ output.seek(0)
+
+ filename = f"filled_template.docx"
+
+ return StreamingResponse(
+ io.BytesIO(output.getvalue()),
+ media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ headers={"Content-Disposition": f"attachment; filename={filename}"}
+ )
+
+
@router.post("/export/excel")
async def export_to_excel(
    filled_data: dict,
    template_id: str = Query(..., description="模板ID")
):
    """
    Export the filled data as an Excel file.

    Failures are logged and reported as HTTP 500, matching the error
    handling of the generic export endpoint in this module.

    Args:
        filled_data: Mapping of field name to filled value
        template_id: Template ID

    Returns:
        Excel file stream

    Raises:
        HTTPException: 500 when the export fails
    """
    try:
        return await _export_to_excel(filled_data, template_id)
    except HTTPException:
        # Already a well-formed HTTP error; pass it through untouched.
        raise
    except Exception as e:
        logger.error(f"导出失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"导出失败: {str(e)}")
+
+
@router.post("/export/word")
async def export_to_word(
    filled_data: dict,
    template_id: str = Query(..., description="模板ID")
):
    """
    Export the filled data as a Word file.

    Failures are logged and reported as HTTP 500, matching the error
    handling of the generic export endpoint in this module.

    Args:
        filled_data: Mapping of field name to filled value
        template_id: Template ID

    Returns:
        Word file stream

    Raises:
        HTTPException: 500 when the export fails
    """
    try:
        return await _export_to_word(filled_data, template_id)
    except HTTPException:
        # Already a well-formed HTTP error; pass it through untouched.
        raise
    except Exception as e:
        logger.error(f"导出失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"导出失败: {str(e)}")
diff --git a/backend/app/core/document_parser/docx_parser.py b/backend/app/core/document_parser/docx_parser.py
index 75e79da..03c341d 100644
--- a/backend/app/core/document_parser/docx_parser.py
+++ b/backend/app/core/document_parser/docx_parser.py
@@ -161,3 +161,133 @@ class DocxParser(BaseParser):
fields[field_name] = match.group(1)
return fields
+
+ def parse_tables_for_template(
+ self,
+ file_path: str
+ ) -> Dict[str, Any]:
+ """
+ 解析 Word 文档中的表格,提取模板字段
+
+ 专门用于比赛场景:解析表格模板,识别需要填写的字段
+
+ Args:
+ file_path: Word 文件路径
+
+ Returns:
+ 包含表格字段信息的字典
+ """
+ from docx import Document
+ from docx.table import Table
+ from docx.oxml.ns import qn
+
+ doc = Document(file_path)
+
+ template_info = {
+ "tables": [],
+ "fields": [],
+ "field_count": 0
+ }
+
+ for table_idx, table in enumerate(doc.tables):
+ table_info = {
+ "table_index": table_idx,
+ "rows": [],
+ "headers": [],
+ "data_rows": [],
+ "field_hints": {} # 字段名称 -> 提示词/描述
+ }
+
+ # 提取表头(第一行)
+ if table.rows:
+ header_cells = [cell.text.strip() for cell in table.rows[0].cells]
+ table_info["headers"] = header_cells
+
+ # 提取数据行
+ for row_idx, row in enumerate(table.rows[1:], 1):
+ row_data = [cell.text.strip() for cell in row.cells]
+ table_info["data_rows"].append(row_data)
+ table_info["rows"].append({
+ "row_index": row_idx,
+ "cells": row_data
+ })
+
+ # 尝试从第二列/第三列提取提示词
+ # 比赛模板通常格式为:字段名 | 提示词 | 填写值
+ if len(table.rows[0].cells) >= 2:
+ for row_idx, row in enumerate(table.rows[1:], 1):
+ cells = [cell.text.strip() for cell in row.cells]
+ if len(cells) >= 2 and cells[0]:
+ # 第一列是字段名
+ field_name = cells[0]
+ # 第二列可能是提示词或描述
+ hint = cells[1] if len(cells) > 1 else ""
+ table_info["field_hints"][field_name] = hint
+
+ template_info["fields"].append({
+ "table_index": table_idx,
+ "row_index": row_idx,
+ "field_name": field_name,
+ "hint": hint,
+ "expected_value": cells[2] if len(cells) > 2 else ""
+ })
+
+ template_info["tables"].append(table_info)
+
+ template_info["field_count"] = len(template_info["fields"])
+ return template_info
+
def extract_template_fields_from_docx(
    self,
    file_path: str
) -> List[Dict[str, Any]]:
    """
    Extract template field definitions from a Word document.

    Parses the document's tables and turns every field entry into a field
    definition whose "cell" locator has the form T<table index>R<row index>.

    Args:
        file_path: Path to the Word file

    Returns:
        List of field definition dicts
    """
    parsed = self.parse_tables_for_template(file_path)

    return [
        {
            "cell": f"T{entry['table_index']}R{entry['row_index']}",  # Table X, Row Y
            "name": entry["field_name"],
            "hint": entry["hint"],
            "table_index": entry["table_index"],
            "row_index": entry["row_index"],
            "field_type": self._infer_field_type_from_hint(entry["hint"]),
            "required": True
        }
        for entry in parsed["fields"]
    ]
+
+ def _infer_field_type_from_hint(self, hint: str) -> str:
+ """
+ 从提示词推断字段类型
+
+ Args:
+ hint: 字段提示词
+
+ Returns:
+ 字段类型 (text/number/date)
+ """
+ hint_lower = hint.lower()
+
+ # 日期关键词
+ date_keywords = ["年", "月", "日", "日期", "时间", "出生"]
+ if any(kw in hint for kw in date_keywords):
+ return "date"
+
+ # 数字关键词
+ number_keywords = ["数量", "金额", "人数", "面积", "增长", "比率", "%", "率"]
+ if any(kw in hint_lower for kw in number_keywords):
+ return "number"
+
+ return "text"
diff --git a/backend/app/services/template_fill_service.py b/backend/app/services/template_fill_service.py
index 2612354..94930fb 100644
--- a/backend/app/services/template_fill_service.py
+++ b/backend/app/services/template_fill_service.py
@@ -4,13 +4,12 @@
从非结构化文档中检索信息并填写到表格模板
"""
import logging
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from app.core.database import mongodb
-from app.services.rag_service import rag_service
from app.services.llm_service import llm_service
-from app.services.excel_storage_service import excel_storage_service
+from app.core.document_parser import ParserFactory
logger = logging.getLogger(__name__)
@@ -22,6 +21,17 @@ class TemplateField:
name: str # 字段名称
field_type: str = "text" # 字段类型: text/number/date
required: bool = True
+ hint: str = "" # 字段提示词
+
+
+@dataclass
+class SourceDocument:
+ """Source document whose text and tables are used to fill template fields."""
+ # Identifier of the document: the MongoDB document ID or the file path it was loaded from.
+ doc_id: str
+ # Display name of the document.
+ filename: str
+ # Document type label; "unknown" when the loader cannot determine it.
+ doc_type: str
+ # Plain-text content of the document.
+ content: str = ""
+ # Parsed structured data; a "tables" entry, when present, is read when building the LLM context.
+ structured_data: Dict[str, Any] = field(default_factory=dict)
@dataclass
@@ -38,12 +48,12 @@ class TemplateFillService:
def __init__(self):
self.llm = llm_service
- self.rag = rag_service
async def fill_template(
self,
template_fields: List[TemplateField],
source_doc_ids: Optional[List[str]] = None,
+ source_file_paths: Optional[List[str]] = None,
user_hint: Optional[str] = None
) -> Dict[str, Any]:
"""
@@ -51,7 +61,8 @@ class TemplateFillService:
Args:
template_fields: 模板字段列表
- source_doc_ids: 源文档ID列表不指定则从所有文档检索
+ source_doc_ids: 源文档 MongoDB ID 列表
+ source_file_paths: 源文档文件路径列表
user_hint: 用户提示(如"请从合同文档中提取"
Returns:
@@ -60,28 +71,23 @@ class TemplateFillService:
filled_data = {}
fill_details = []
+ # 1. 加载源文档内容
+ source_docs = await self._load_source_documents(source_doc_ids, source_file_paths)
+
+ if not source_docs:
+ logger.warning("没有找到源文档,填表结果将全部为空")
+
+ # 2. 对每个字段进行提取
for field in template_fields:
try:
- # 1. 从 RAG 检索相关上下文
- rag_results = await self._retrieve_context(field.name, user_hint)
-
- if not rag_results:
- # 如果没有检索到结果,尝试直接询问 LLM
- result = FillResult(
- field=field.name,
- value="",
- source="未找到相关数据",
- confidence=0.0
- )
- else:
- # 2. 构建 Prompt 让 LLM 提取信息
- result = await self._extract_field_value(
- field=field,
- rag_context=rag_results,
- user_hint=user_hint
- )
-
- # 3. 存储结果
+ # 从源文档中提取字段值
+ result = await self._extract_field_value(
+ field=field,
+ source_docs=source_docs,
+ user_hint=user_hint
+ )
+
+ # 存储结果
filled_data[field.name] = result.value
fill_details.append({
"field": field.name,
@@ -107,75 +113,113 @@ class TemplateFillService:
return {
"success": True,
"filled_data": filled_data,
- "fill_details": fill_details
+ "fill_details": fill_details,
+ "source_doc_count": len(source_docs)
}
- async def _retrieve_context(
+ async def _load_source_documents(
self,
- field_name: str,
- user_hint: Optional[str] = None
- ) -> List[Dict[str, Any]]:
+ source_doc_ids: Optional[List[str]] = None,
+ source_file_paths: Optional[List[str]] = None
+ ) -> List[SourceDocument]:
"""
- 从 RAG 检索相关上下文
+ 加载源文档内容
Args:
- field_name: 字段名称
- user_hint: 用户提示
+ source_doc_ids: MongoDB 文档 ID 列表
+ source_file_paths: 源文档文件路径列表
Returns:
- 检索结果列表
+ 源文档列表
"""
- # 构建查询文本
- query = field_name
- if user_hint:
- query = f"{user_hint} {field_name}"
-
- # 检索相关文档片段
- results = self.rag.retrieve(query=query, top_k=5)
-
- return results
+ source_docs = []
+
+ # 1. 从 MongoDB 加载文档
+ if source_doc_ids:
+ for doc_id in source_doc_ids:
+ try:
+ doc = await mongodb.get_document(doc_id)
+ if doc:
+ source_docs.append(SourceDocument(
+ doc_id=doc_id,
+ filename=doc.get("metadata", {}).get("original_filename", "unknown"),
+ doc_type=doc.get("doc_type", "unknown"),
+ content=doc.get("content", ""),
+ structured_data=doc.get("structured_data", {})
+ ))
+ logger.info(f"从MongoDB加载文档: {doc_id}")
+ except Exception as e:
+ logger.error(f"从MongoDB加载文档失败 {doc_id}: {str(e)}")
+
+ # 2. 从文件路径加载文档
+ if source_file_paths:
+ for file_path in source_file_paths:
+ try:
+ parser = ParserFactory.get_parser(file_path)
+ result = parser.parse(file_path)
+ if result.success:
+ source_docs.append(SourceDocument(
+ doc_id=file_path,
+ filename=result.metadata.get("filename", file_path.split("/")[-1]),
+ doc_type=result.metadata.get("extension", "unknown").replace(".", ""),
+ content=result.data.get("content", ""),
+ structured_data=result.data.get("structured_data", {})
+ ))
+ logger.info(f"从文件加载文档: {file_path}")
+ except Exception as e:
+ logger.error(f"从文件加载文档失败 {file_path}: {str(e)}")
+
+ return source_docs
async def _extract_field_value(
self,
field: TemplateField,
- rag_context: List[Dict[str, Any]],
+ source_docs: List[SourceDocument],
user_hint: Optional[str] = None
) -> FillResult:
"""
- 使用 LLM 从上下文中提取字段值
+ 使用 LLM 从源文档中提取字段值
Args:
field: 字段定义
- rag_context: RAG 检索到的上下文
+ source_docs: 源文档列表
user_hint: 用户提示
Returns:
提取结果
"""
+ if not source_docs:
+ return FillResult(
+ field=field.name,
+ value="",
+ source="无源文档",
+ confidence=0.0
+ )
+
# 构建上下文文本
- context_text = "\n\n".join([
- f"【文档 {i+1}】\n{doc['content']}"
- for i, doc in enumerate(rag_context)
- ])
+ context_text = self._build_context_text(source_docs, max_length=8000)
+
+ # 构建提示词
+ hint_text = field.hint if field.hint else f"请提取{field.name}的信息"
+ if user_hint:
+ hint_text = f"{user_hint}。{hint_text}"
- # 构建 Prompt
- prompt = f"""你是一个数据提取专家。请根据以下文档内容,提取指定字段的信息。
+ prompt = f"""你是一个专业的数据提取专家。请根据以下文档内容,提取指定字段的信息。
需要提取的字段:
- 字段名称:{field.name}
- 字段类型:{field.field_type}
+- 填写提示:{hint_text}
- 是否必填:{'是' if field.required else '否'}
-{'用户提示:' + user_hint if user_hint else ''}
-
参考文档内容:
{context_text}
请严格按照以下 JSON 格式输出,不要添加任何解释:
{{
"value": "提取到的值,如果没有找到则填写空字符串",
- "source": "数据来源的文档描述",
- "confidence": 0.0到1.0之间的置信度
+ "source": "数据来源的文档描述来自xxx文档",
+ "confidence": 0.0到1.0之间的置信度,表示对提取结果的信心程度"
}}
"""
@@ -226,6 +270,54 @@ class TemplateFillService:
confidence=0.0
)
+ def _build_context_text(self, source_docs: List[SourceDocument], max_length: int = 8000) -> str:
+ """
+ 构建上下文文本
+
+ Args:
+ source_docs: 源文档列表
+ max_length: 最大字符数
+
+ Returns:
+ 上下文文本
+ """
+ contexts = []
+ total_length = 0
+
+ for doc in source_docs:
+ # 优先使用结构化数据(表格),其次使用文本内容
+ doc_content = ""
+
+ if doc.structured_data and doc.structured_data.get("tables"):
+ # 如果有表格数据,优先使用
+ tables = doc.structured_data.get("tables", [])
+ for table in tables:
+ if isinstance(table, dict):
+ rows = table.get("rows", [])
+ if rows:
+ doc_content += f"\n【文档: {doc.filename} 表格数据】\n"
+ for row in rows[:20]: # 限制每表最多20行
+ if isinstance(row, list):
+ doc_content += " | ".join(str(cell) for cell in row) + "\n"
+ elif isinstance(row, dict):
+ doc_content += " | ".join(str(v) for v in row.values()) + "\n"
+ elif doc.content:
+ doc_content = doc.content[:5000] # 限制文本长度
+
+ if doc_content:
+ doc_context = f"【文档: {doc.filename} ({doc.doc_type})】\n{doc_content}"
+ if total_length + len(doc_context) <= max_length:
+ contexts.append(doc_context)
+ total_length += len(doc_context)
+ else:
+ # 如果超出长度,截断
+ remaining = max_length - total_length
+ if remaining > 100:
+ contexts.append(doc_context[:remaining])
+ break
+
+ return "\n\n".join(contexts) if contexts else "(源文档内容为空)"
+
async def get_template_fields_from_file(
self,
file_path: str,
@@ -236,7 +328,7 @@ class TemplateFillService:
Args:
file_path: 模板文件路径
- file_type: 文件类型
+ file_type: 文件类型 (xlsx/xls/docx)
Returns:
字段列表
@@ -245,43 +337,108 @@ class TemplateFillService:
try:
if file_type in ["xlsx", "xls"]:
- # 从 Excel 读取表头
- import pandas as pd
- df = pd.read_excel(file_path, nrows=5)
+ fields = await self._get_template_fields_from_excel(file_path)
+ elif file_type == "docx":
+ fields = await self._get_template_fields_from_docx(file_path)
- for idx, col in enumerate(df.columns):
- # 获取单元格位置 (A, B, C, ...)
- cell = self._column_to_cell(idx)
+ except Exception as e:
+ logger.error(f"提取模板字段失败: {str(e)}")
- fields.append(TemplateField(
- cell=cell,
- name=str(col),
- field_type=self._infer_field_type(df[col]),
- required=True
- ))
+ return fields
- elif file_type == "docx":
- # 从 Word 表格读取
- from docx import Document
- doc = Document(file_path)
-
- for table_idx, table in enumerate(doc.tables):
- for row_idx, row in enumerate(table.rows):
- for col_idx, cell in enumerate(row.cells):
- cell_text = cell.text.strip()
- if cell_text:
- fields.append(TemplateField(
- cell=self._column_to_cell(col_idx),
- name=cell_text,
- field_type="text",
- required=True
- ))
+ async def _get_template_fields_from_excel(self, file_path: str) -> List[TemplateField]:
+ """从 Excel 模板提取字段"""
+ fields = []
+
+ try:
+ import pandas as pd
+ df = pd.read_excel(file_path, nrows=5)
+
+ for idx, col in enumerate(df.columns):
+ cell = self._column_to_cell(idx)
+ col_str = str(col)
+
+ fields.append(TemplateField(
+ cell=cell,
+ name=col_str,
+ field_type=self._infer_field_type_from_value(df[col].iloc[0] if len(df) > 0 else ""),
+ required=True,
+ hint=""
+ ))
except Exception as e:
- logger.error(f"提取模板字段失败: {str(e)}")
+ logger.error(f"从Excel提取字段失败: {str(e)}")
return fields
async def _get_template_fields_from_docx(self, file_path: str) -> List[TemplateField]:
    """Extract template fields from the tables of a Word template.

    The first column of each table row is taken as the field name and the
    second column (when present) as its hint. Rows whose first cell is empty
    or is a known header label are skipped. Any error is logged and the
    fields collected so far are returned.
    """
    collected = []
    header_labels = ["", "字段名", "名称", "项目"]

    try:
        from docx import Document

        document = Document(file_path)

        for table_idx, table in enumerate(document.tables):
            for row_idx, row in enumerate(table.rows):
                texts = [cell.text.strip() for cell in row.cells]

                # Nothing usable in the first column: not a field row.
                if not texts or not texts[0]:
                    continue

                label = texts[0]
                # Header rows carry column captions, not fields.
                if label in header_labels:
                    continue

                hint = texts[1] if len(texts) > 1 else ""
                collected.append(TemplateField(
                    cell=f"T{table_idx}R{row_idx}",
                    name=label,
                    field_type=self._infer_field_type_from_hint(hint),
                    required=True,
                    hint=hint
                ))

    except Exception as e:
        logger.error(f"从Word提取字段失败: {str(e)}")

    return collected
+
+ def _infer_field_type_from_hint(self, hint: str) -> str:
+ """从提示词推断字段类型"""
+ hint_lower = hint.lower()
+
+ date_keywords = ["年", "月", "日", "日期", "时间", "出生"]
+ if any(kw in hint for kw in date_keywords):
+ return "date"
+
+ number_keywords = ["数量", "金额", "人数", "面积", "增长", "比率", "%", "率", "总计", "合计"]
+ if any(kw in hint_lower for kw in number_keywords):
+ return "number"
+
+ return "text"
+
+ def _infer_field_type_from_value(self, value: Any) -> str:
+ """从示例值推断字段类型"""
+ if value is None or value == "":
+ return "text"
+
+ value_str = str(value)
+
+ # 检查日期模式
+ import re
+ if re.search(r'\d{4}[年/-]\d{1,2}[月/-]\d{1,2}', value_str):
+ return "date"
+
+ # 检查数值
+ try:
+ float(value_str.replace(',', '').replace('%', ''))
+ return "number"
+ except ValueError:
+ pass
+
+ return "text"
+
def _column_to_cell(self, col_idx: int) -> str:
"""将列索引转换为单元格列名 (0 -> A, 1 -> B, ...)"""
result = ""
@@ -290,17 +447,6 @@ class TemplateFillService:
col_idx = col_idx // 26 - 1
return result
- def _infer_field_type(self, series) -> str:
- """推断字段类型"""
- import pandas as pd
-
- if pd.api.types.is_numeric_dtype(series):
- return "number"
- elif pd.api.types.is_datetime64_any_dtype(series):
- return "date"
- else:
- return "text"
-
# ==================== 全局单例 ====================