Merge remote changes with RAG service optimization

- Keep user's RAG service integration for faster extraction
- Add remote's word_ai_service support
- Preserve user's parallel extraction and field header optimizations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dj
2026-04-14 17:25:13 +08:00
14 changed files with 2057 additions and 83 deletions

View File

@@ -10,6 +10,8 @@ import os
from app.services.excel_ai_service import excel_ai_service
from app.services.markdown_ai_service import markdown_ai_service
from app.services.template_fill_service import template_fill_service
from app.services.word_ai_service import word_ai_service
logger = logging.getLogger(__name__)
@@ -340,3 +342,144 @@ async def get_markdown_outline(
except Exception as e:
logger.error(f"获取 Markdown 大纲失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取大纲失败: {str(e)}")
@router.post("/analyze/txt")
async def analyze_txt(
file: UploadFile = File(...),
):
"""
上传并使用 AI 分析 TXT 文本文件,提取结构化数据
将非结构化文本转换为结构化表格数据,便于后续填表使用
Args:
file: 上传的 TXT 文件
Returns:
dict: 分析结果,包含结构化表格数据
"""
if not file.filename:
raise HTTPException(status_code=400, detail="文件名为空")
file_ext = file.filename.split('.')[-1].lower()
if file_ext not in ['txt', 'text']:
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型: {file_ext},仅支持 .txt"
)
try:
# 读取文件内容
content = await file.read()
# 保存到临时文件
with tempfile.NamedTemporaryFile(mode='wb', suffix='.txt', delete=False) as tmp:
tmp.write(content)
tmp_path = tmp.name
try:
logger.info(f"开始 AI 分析 TXT 文件: {file.filename}")
# 使用 template_fill_service 的 AI 分析方法
result = await template_fill_service.analyze_txt_with_ai(
content=content.decode('utf-8', errors='replace'),
filename=file.filename
)
if result:
logger.info(f"TXT AI 分析成功: {file.filename}")
return {
"success": True,
"filename": file.filename,
"structured_data": result
}
else:
logger.warning(f"TXT AI 分析返回空结果: {file.filename}")
return {
"success": False,
"filename": file.filename,
"error": "AI 分析未能提取到结构化数据",
"structured_data": None
}
finally:
# 清理临时文件
if os.path.exists(tmp_path):
os.unlink(tmp_path)
except HTTPException:
raise
except Exception as e:
logger.error(f"TXT AI 分析过程中出错: {str(e)}")
raise HTTPException(status_code=500, detail=f"分析失败: {str(e)}")
# ==================== Word 文档 AI 解析 ====================
@router.post("/analyze/word")
async def analyze_word(
file: UploadFile = File(...),
user_hint: str = Query("", description="用户提示词,如'请提取表格数据'")
):
"""
使用 AI 解析 Word 文档,提取结构化数据
适用于从非结构化的 Word 文档中提取表格数据、键值对等信息
Args:
file: 上传的 Word 文件
user_hint: 用户提示词
Returns:
dict: 包含结构化数据的解析结果
"""
if not file.filename:
raise HTTPException(status_code=400, detail="文件名为空")
file_ext = file.filename.split('.')[-1].lower()
if file_ext not in ['docx']:
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型: {file_ext},仅支持 .docx"
)
try:
# 保存上传的文件
content = await file.read()
suffix = f".{file_ext}"
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(content)
tmp_path = tmp.name
try:
# 使用 AI 解析 Word 文档
result = await word_ai_service.parse_word_with_ai(
file_path=tmp_path,
user_hint=user_hint or "请提取文档中的所有结构化数据,包括表格、键值对等"
)
if result.get("success"):
return {
"success": True,
"filename": file.filename,
"result": result
}
else:
return {
"success": False,
"filename": file.filename,
"error": result.get("error", "AI 解析失败"),
"result": None
}
finally:
# 清理临时文件
if os.path.exists(tmp_path):
os.unlink(tmp_path)
except HTTPException:
raise
except Exception as e:
logger.error(f"Word AI 分析过程中出错: {str(e)}")
raise HTTPException(status_code=500, detail=f"分析失败: {str(e)}")

View File

@@ -257,6 +257,50 @@ async def process_document(
structured_data=result.data.get("structured_data")
)
# 如果是 Word 文档,使用 AI 深度解析
if doc_type == "docx":
await redis_db.set_task_status(
task_id, status="processing",
meta={"progress": 40, "message": "正在使用 AI 解析 Word 文档"}
)
try:
from app.services.word_ai_service import word_ai_service
logger.info(f"开始 AI 解析 Word 文档: {original_filename}")
ai_result = await word_ai_service.parse_word_with_ai(
file_path=file_path,
user_hint="请提取文档中的所有结构化数据,包括表格、键值对、列表项等"
)
if ai_result.get("success"):
# 更新 MongoDB 文档,添加 AI 解析结果
ai_data = {
"ai_parsed": True,
"parse_type": ai_result.get("type", "unknown"),
"headers": ai_result.get("headers", []),
"rows": ai_result.get("rows", []),
"tables": ai_result.get("tables", []),
"key_values": ai_result.get("key_values", {}),
"list_items": ai_result.get("list_items", []),
"summary": ai_result.get("summary", ""),
"description": ai_result.get("description", "")
}
await mongodb.update_document(doc_id, {
"ai_analysis": ai_data,
"structured_data": {
**result.data.get("structured_data", {}),
**ai_data
}
})
logger.info(f"Word AI 解析成功: {original_filename}, type={ai_result.get('type')}")
else:
logger.warning(f"Word AI 解析返回失败: {ai_result.get('error')}")
except Exception as e:
logger.error(f"Word AI 解析异常: {str(e)}", exc_info=True)
# 如果是 Excel存储到 MySQL + AI生成描述 + RAG索引
if doc_type in ["xlsx", "xls"]:
await update_task_status(

View File

@@ -89,6 +89,13 @@ class ExportRequest(BaseModel):
format: str = "xlsx" # xlsx 或 docx
class FillAndExportRequest(BaseModel):
"""填充并导出请求 - 直接填充原始模板"""
template_path: str # 模板文件路径
filled_data: dict # 填写数据,格式: {字段名: [值1, 值2, ...]} 或 {字段名: 单个值}
format: str = "xlsx" # xlsx 或 docx
# ==================== 接口实现 ====================
@router.post("/upload")
@@ -692,3 +699,427 @@ async def export_to_word(
Word 文件流
"""
return await _export_to_word(filled_data, template_id)
@router.post("/fill-and-export")
async def fill_and_export_template(
request: FillAndExportRequest,
):
"""
填充原始模板并导出
直接打开原始模板文件,将数据填入模板的表格中,然后导出
Args:
request: 填充并导出请求
Returns:
填充后的模板文件流
"""
import os
logger.info(f"=== fill-and-export 请求 ===")
logger.info(f"template_path: {request.template_path}")
logger.info(f"format: {request.format}")
logger.info(f"filled_data: {request.filled_data}")
logger.info(f"filled_data 类型: {type(request.filled_data)}")
logger.info(f"filled_data 键数量: {len(request.filled_data) if request.filled_data else 0}")
logger.info(f"=========================")
template_path = request.template_path
# 检查模板文件是否存在
if not os.path.exists(template_path):
raise HTTPException(status_code=404, detail=f"模板文件不存在: {template_path}")
file_ext = os.path.splitext(template_path)[1].lower()
try:
if file_ext in ['.xlsx', '.xls']:
return await _fill_and_export_excel(template_path, request.filled_data)
elif file_ext == '.docx':
return await _fill_and_export_word(template_path, request.filled_data)
else:
raise HTTPException(
status_code=400,
detail=f"不支持的模板格式: {file_ext},仅支持 xlsx/xls/docx"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"填充模板失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"填充模板失败: {str(e)}")
async def _fill_and_export_word(template_path: str, filled_data: dict) -> StreamingResponse:
"""
填充原始 Word 模板
打开原始 Word 模板,找到表格,将数据填入对应单元格
Args:
template_path: 模板文件路径
filled_data: 填写数据 {字段名: [值1, 值2, ...]}
Returns:
填充后的 Word 文件流
"""
from docx import Document
logger.info(f"填充 Word 模板: {template_path}")
logger.info(f"填写数据字段: {list(filled_data.keys())}")
# 打开原始模板
doc = Document(template_path)
# 找到第一个表格(比赛模板通常是第一个表格)
if not doc.tables:
logger.warning("Word 模板中没有表格,创建新表格")
# 如果没有表格,创建一个
table = doc.add_table(rows=len(filled_data) + 1, cols=2)
# 表头
header_cells = table.rows[0].cells
header_cells[0].text = '字段名'
header_cells[1].text = '填写值'
# 数据行
for idx, (field_name, values) in enumerate(filled_data.items()):
row_cells = table.rows[idx + 1].cells
row_cells[0].text = field_name
if isinstance(values, list):
row_cells[1].text = '; '.join(str(v) for v in values if v)
else:
row_cells[1].text = str(values) if values else ''
else:
# 填充第一个表格
table = doc.tables[0]
logger.info(f"找到表格,行数: {len(table.rows)}, 列数: {len(table.columns)}")
# 打印表格内容(调试用)
logger.info("=== 表格内容预览 ===")
for row_idx, row in enumerate(table.rows[:5]): # 只打印前5行
row_texts = [cell.text.strip() for cell in row.cells]
logger.info(f"{row_idx}: {row_texts}")
logger.info("========================")
# 构建字段名到列索引的映射
field_to_col = {}
if table.rows:
# 假设第一行是表头
header_row = table.rows[0]
for col_idx, cell in enumerate(header_row.cells):
field_name = cell.text.strip()
if field_name:
field_to_col[field_name] = col_idx
field_to_col[field_name.lower()] = col_idx # 忽略大小写
logger.info(f"表头字段映射: {field_to_col}")
logger.info(f"待填充数据字段: {list(filled_data.keys())}")
# 填充数据
filled_count = 0
for field_name, values in filled_data.items():
# 查找匹配的列
col_idx = field_to_col.get(field_name)
if col_idx is None:
# 尝试模糊匹配
for c_idx in range(len(table.columns)):
header_text = table.rows[0].cells[c_idx].text.strip().lower()
if field_name.lower() in header_text or header_text in field_name.lower():
col_idx = c_idx
logger.info(f"模糊匹配成功: '{field_name}' -> 列 {col_idx}")
break
else:
col_idx = None
if col_idx is not None and col_idx < len(table.columns):
# 填充该列的所有数据行
if isinstance(values, list):
value_str = '; '.join(str(v) for v in values if v)
else:
value_str = str(values) if values else ''
# 填充每一行(从第二行开始,跳过表头)
for row_idx in range(1, min(len(table.rows), len(values) + 1) if isinstance(values, list) else 2):
try:
cell = table.rows[row_idx].cells[col_idx]
if isinstance(values, list) and row_idx - 1 < len(values):
cell.text = str(values[row_idx - 1]) if values[row_idx - 1] else ''
elif not isinstance(values, list):
if row_idx == 1:
cell.text = str(values) if values else ''
except Exception as e:
logger.warning(f"填充单元格失败 [{row_idx}][{col_idx}]: {e}")
filled_count += 1
logger.info(f"✅ 字段 '{field_name}' -> 列 {col_idx}, 值: {value_str[:50]}")
else:
logger.warning(f"❌ 未找到字段 '{field_name}' 对应的列")
logger.info(f"填充完成: {filled_count}/{len(filled_data)} 个字段")
# 保存到 BytesIO
output = io.BytesIO()
doc.save(output)
output.seek(0)
filename = f"filled_template.docx"
logger.info(f"Word 模板填充完成")
return StreamingResponse(
io.BytesIO(output.getvalue()),
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
async def _fill_and_export_excel(template_path: str, filled_data: dict) -> StreamingResponse:
"""
填充原始 Excel 模板
打开原始 Excel 模板,找到对应列,将数据填入
Args:
template_path: 模板文件路径
filled_data: 填写数据 {字段名: [值1, 值2, ...]}
Returns:
填充后的 Excel 文件流
"""
from openpyxl import load_workbook
import os
logger.info(f"填充 Excel 模板: {template_path}")
logger.info(f"填写数据: {list(filled_data.keys())}")
# 检查文件是否存在
if not os.path.exists(template_path):
raise HTTPException(status_code=404, detail=f"模板文件不存在: {template_path}")
# 打开原始模板
wb = load_workbook(template_path)
ws = wb.active # 获取当前工作表
logger.info(f"工作表: {ws.title}, 行数: {ws.max_row}, 列数: {ws.max_column}")
# 读取表头行(假设第一行是表头)
header_row = 1
field_to_col = {}
for col_idx in range(1, ws.max_column + 1):
cell_value = ws.cell(row=header_row, column=col_idx).value
if cell_value:
field_name = str(cell_value).strip()
field_to_col[field_name] = col_idx
field_to_col[field_name.lower()] = col_idx # 忽略大小写
logger.info(f"表头字段映射: {field_to_col}")
# 计算最大行数
max_rows = 1
for values in filled_data.values():
if isinstance(values, list):
max_rows = max(max_rows, len(values))
# 填充数据
for field_name, values in filled_data.items():
# 查找匹配的列
col_idx = field_to_col.get(field_name)
if col_idx is None:
# 尝试模糊匹配
for col_idx in range(1, ws.max_column + 1):
header_text = str(ws.cell(row=header_row, column=col_idx).value or '').strip().lower()
if field_name.lower() in header_text or header_text in field_name.lower():
break
else:
col_idx = None
if col_idx is not None:
# 填充数据(从第二行开始)
if isinstance(values, list):
for row_idx, value in enumerate(values, start=2):
ws.cell(row=row_idx, column=col_idx, value=value if value else '')
else:
ws.cell(row=2, column=col_idx, value=values if values else '')
logger.info(f"字段 {field_name} -> 列 {col_idx}, 值数量: {len(values) if isinstance(values, list) else 1}")
else:
logger.warning(f"未找到字段 {field_name} 对应的列")
# 如果需要扩展行数
current_max_row = ws.max_row
if max_rows > current_max_row - 1: # -1 是因为表头占一行
# 扩展样式(简单复制最后一行)
for row_idx in range(current_max_row + 1, max_rows + 2):
for col_idx in range(1, ws.max_column + 1):
source_cell = ws.cell(row=current_max_row, column=col_idx)
target_cell = ws.cell(row=row_idx, column=col_idx)
# 复制值(如果有对应数据)
if isinstance(filled_data.get(str(ws.cell(row=1, column=col_idx).value), []), list):
data_idx = row_idx - 2
data_list = filled_data.get(str(ws.cell(row=1, column=col_idx).value), [])
if data_idx < len(data_list):
target_cell.value = data_list[data_idx]
# 保存到 BytesIO
output = io.BytesIO()
wb.save(output)
output.seek(0)
# 关闭工作簿
wb.close()
filename = f"filled_template.xlsx"
logger.info(f"Excel 模板填充完成")
return StreamingResponse(
io.BytesIO(output.getvalue()),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
# ==================== Word 文档结构化字段提取接口 ====================
@router.post("/parse-word-structure")
async def parse_word_structure(
file: UploadFile = File(...),
):
"""
上传 Word 文档,提取结构化字段并存入数据库
专门用于比赛场景:从 Word 表格模板中提取字段定义
(字段名、提示词、字段类型等)并存入 MongoDB
Args:
file: 上传的 Word 文件
Returns:
提取的结构化字段信息
"""
if not file.filename:
raise HTTPException(status_code=400, detail="文件名为空")
file_ext = file.filename.split('.')[-1].lower()
if file_ext != 'docx':
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型: {file_ext},仅支持 docx"
)
try:
# 1. 保存文件
content = await file.read()
saved_path = file_service.save_uploaded_file(
content,
file.filename,
subfolder="word_templates"
)
logger.info(f"Word 文件已保存: {saved_path}")
# 2. 解析文档,提取结构化数据
parser = ParserFactory.get_parser(saved_path)
parse_result = parser.parse(saved_path)
if not parse_result.success:
raise HTTPException(status_code=400, detail=parse_result.error)
# 3. 提取表格模板字段
from app.core.document_parser.docx_parser import DocxParser
docx_parser = DocxParser()
template_fields = docx_parser.extract_template_fields_from_docx(saved_path)
logger.info(f"从 Word 文档提取到 {len(template_fields)} 个字段")
# 4. 提取完整的结构化信息
template_info = docx_parser.parse_tables_for_template(saved_path)
# 5. 存储到 MongoDB
doc_id = await mongodb.insert_document(
doc_type="docx",
content=parse_result.data.get("content", ""),
metadata={
**parse_result.metadata,
"original_filename": file.filename,
"file_path": saved_path,
"template_fields": template_fields,
"table_count": len(template_info.get("tables", [])),
"field_count": len(template_fields)
},
structured_data={
**parse_result.data.get("structured_data", {}),
"template_fields": template_fields,
"template_info": template_info
}
)
logger.info(f"Word 文档结构化信息已存入 MongoDB, doc_id: {doc_id}")
# 6. 返回结果
return {
"success": True,
"doc_id": doc_id,
"filename": file.filename,
"file_path": saved_path,
"field_count": len(template_fields),
"fields": template_fields,
"tables": template_info.get("tables", []),
"metadata": {
"paragraph_count": parse_result.metadata.get("paragraph_count", 0),
"table_count": parse_result.metadata.get("table_count", 0),
"word_count": parse_result.metadata.get("word_count", 0),
"has_tables": parse_result.metadata.get("has_tables", False)
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"解析 Word 文档结构失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"解析失败: {str(e)}")
@router.get("/word-fields/{doc_id}")
async def get_word_template_fields(
doc_id: str,
):
"""
根据 doc_id 获取 Word 文档的模板字段信息
Args:
doc_id: MongoDB 文档 ID
Returns:
模板字段信息
"""
try:
doc = await mongodb.get_document(doc_id)
if not doc:
raise HTTPException(status_code=404, detail=f"文档不存在: {doc_id}")
# 从 structured_data 中提取模板字段信息
structured_data = doc.get("structured_data", {})
template_fields = structured_data.get("template_fields", [])
template_info = structured_data.get("template_info", {})
return {
"success": True,
"doc_id": doc_id,
"filename": doc.get("metadata", {}).get("original_filename", ""),
"fields": template_fields,
"tables": template_info.get("tables", []),
"field_count": len(template_fields),
"metadata": doc.get("metadata", {})
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取 Word 模板字段失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取失败: {str(e)}")