diff --git a/backend/.env.example b/backend/.env.example index 7e23672..b10fd86 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -37,8 +37,8 @@ LLM_MODEL_NAME="MiniMax-Text-01" # 上传文件存储目录 (相对于项目根目录) UPLOAD_DIR="./data/uploads" -# ChromaDB 向量数据库持久化目录 -CHROMADB_PERSIST_DIR="./data/chromadb" +# Faiss 向量数据库持久化目录 (LangChain + Faiss 实现) +FAISS_INDEX_DIR="./data/faiss" # ==================== RAG 配置 ==================== # Embedding 模型名称 diff --git a/backend/app/api/__pycache__/__init__.cpython-312.pyc b/backend/app/api/__pycache__/__init__.cpython-312.pyc index ee51b9a..1a3fce8 100644 Binary files a/backend/app/api/__pycache__/__init__.cpython-312.pyc and b/backend/app/api/__pycache__/__init__.cpython-312.pyc differ diff --git a/backend/app/api/endpoints/__pycache__/documents.cpython-312.pyc b/backend/app/api/endpoints/__pycache__/documents.cpython-312.pyc new file mode 100644 index 0000000..ec1367a Binary files /dev/null and b/backend/app/api/endpoints/__pycache__/documents.cpython-312.pyc differ diff --git a/backend/app/api/endpoints/documents.py b/backend/app/api/endpoints/documents.py index ba27bff..a0bd91c 100644 --- a/backend/app/api/endpoints/documents.py +++ b/backend/app/api/endpoints/documents.py @@ -2,17 +2,20 @@ 文档管理 API 接口 支持多格式文档(docx/xlsx/md/txt)上传、解析、存储和RAG索引 +集成 Excel 存储和 AI 生成字段描述 """ +import logging import uuid -from datetime import datetime from typing import List, Optional from fastapi import APIRouter, UploadFile, File, HTTPException, Query, BackgroundTasks from pydantic import BaseModel from app.services.file_service import file_service -from app.core.database import mongodb, mysql_db +from app.core.database import mongodb, redis_db from app.services.rag_service import rag_service +from app.services.table_rag_service import table_rag_service +from app.services.excel_storage_service import excel_storage_service from app.core.document_parser import ParserFactory, ParseResult logger = logging.getLogger(__name__) @@ -31,7 +34,7 @@ class UploadResponse(BaseModel): class TaskStatusResponse(BaseModel): task_id: str - status: str # pending, processing, success, failure + status: str progress: int = 0 message: Optional[str] = None result: Optional[dict] = None @@ -44,7 +47,6 @@ class TaskStatusResponse(BaseModel): async def upload_document( background_tasks: BackgroundTasks, file: UploadFile = File(...), - doc_type: Optional[str] = Query(None, description="文档类型: docx/xlsx/md/txt"), parse_all_sheets: bool = Query(False, description="是否解析所有工作表(仅Excel)"), sheet_name: Optional[str] = Query(None, description="指定工作表(仅Excel)"), header_row: int = Query(0, description="表头行号(仅Excel)") @@ -56,13 +58,15 @@ async def upload_document( 1. 保存到本地存储 2. 解析内容 3. 存入 MongoDB (原始内容) - 4. 如果是 Excel,存入 MySQL (结构化数据) - 5. 建立 RAG 索引 + 4. 如果是 Excel: + - 存入 MySQL (结构化数据) + - AI 生成字段描述 + - 建立 RAG 索引 + 5. 建立 RAG 索引 (非结构化文档) """ if not file.filename: raise HTTPException(status_code=400, detail="文件名为空") - # 根据扩展名确定文档类型 file_ext = file.filename.split('.')[-1].lower() if file_ext not in ['docx', 'xlsx', 'xls', 'md', 'txt']: raise HTTPException( @@ -70,21 +74,16 @@ async def upload_document( detail=f"不支持的文件类型: {file_ext},仅支持 docx/xlsx/xls/md/txt" ) - # 生成任务ID task_id = str(uuid.uuid4()) try: - # 读取文件内容 content = await file.read() - - # 保存文件 saved_path = file_service.save_uploaded_file( content, file.filename, subfolder=file_ext ) - # 后台处理文档 background_tasks.add_task( process_document, task_id=task_id, @@ -114,13 +113,8 @@ async def upload_document( async def upload_documents( background_tasks: BackgroundTasks, files: List[UploadFile] = File(...), - doc_type: Optional[str] = Query(None, description="文档类型") ): - """ - 批量上传文档 - - 所有文档会异步处理,处理完成后可通过 task_id 查询状态 - """ + """批量上传文档""" if not files: raise HTTPException(status_code=400, detail="没有上传文件") @@ -131,25 +125,15 @@ async def upload_documents( for file in files: if not file.filename: continue - content = await file.read() - saved_path = file_service.save_uploaded_file( - content, - file.filename, - subfolder="batch" - ) + saved_path = file_service.save_uploaded_file(content, file.filename, subfolder="batch") saved_paths.append({ "path": saved_path, "filename": file.filename, "ext": file.filename.split('.')[-1].lower() }) - # 后台处理所有文档 - background_tasks.add_task( - process_documents_batch, - task_id=task_id, - files=saved_paths - ) + background_tasks.add_task(process_documents_batch, task_id=task_id, files=saved_paths) return UploadResponse( task_id=task_id, @@ -173,13 +157,10 @@ async def process_document( parse_options: dict ): """处理单个文档""" - from app.core.database import redis_db - try: - # 更新状态: 处理中 + # 状态: 解析中 await redis_db.set_task_status( - task_id, - status="processing", + task_id, status="processing", meta={"progress": 10, "message": "正在解析文档"} ) @@ -190,11 +171,10 @@ async def process_document( if not result.success: raise Exception(result.error or "解析失败") - # 更新状态: 存储数据 + # 状态: 存储中 await redis_db.set_task_status( - task_id, - status="processing", - meta={"progress": 40, "message": "正在存储数据"} + task_id, status="processing", + meta={"progress": 30, "message": "正在存储数据"} ) # 存储到 MongoDB @@ -209,24 +189,53 @@ async def process_document( structured_data=result.data.get("structured_data") ) - # 如果是 Excel,存储到 MySQL + # 如果是 Excel,存储到 MySQL + AI生成描述 + RAG索引 if doc_type in ["xlsx", "xls"]: - await store_excel_to_mysql(file_path, original_filename, result) + await redis_db.set_task_status( + task_id, status="processing", + meta={"progress": 50, "message": "正在存储到MySQL并生成字段描述"} + ) - # 更新状态: 建立 RAG 索引 + # 使用 TableRAG 服务完成建表和RAG索引 + rag_result = await table_rag_service.build_table_rag_index( + file_path=file_path, + filename=original_filename, + sheet_name=parse_options.get("sheet_name"), + header_row=parse_options.get("header_row", 0) + ) + + if rag_result.get("success"): + logger.info(f"RAG索引构建成功: {original_filename}") + else: + logger.warning(f"RAG索引构建失败: {rag_result.get('error')}") + + else: + # 非结构化文档 + await redis_db.set_task_status( + task_id, status="processing", + meta={"progress": 60, "message": "正在建立索引"} + ) + + # 如果文档中有表格数据,提取并存储到 MySQL + RAG + structured_data = result.data.get("structured_data", {}) + tables = structured_data.get("tables", []) + + if tables: + # 对每个表格建立 MySQL 表和 RAG 索引 + for table_info in tables: + await table_rag_service.index_document_table( + doc_id=doc_id, + filename=original_filename, + table_data=table_info, + source_doc_type=doc_type + ) + + # 同时对文档内容建立 RAG 索引 + await index_document_to_rag(doc_id, original_filename, result, doc_type) + + # 完成 await redis_db.set_task_status( - task_id, - status="processing", - meta={"progress": 70, "message": "正在建立索引"} - ) - - # 建立 RAG 索引 - await index_document_to_rag(doc_id, original_filename, result, doc_type) - - # 更新状态: 完成 - await redis_db.set_task_status( - task_id, - status="success", + task_id, status="success", meta={ "progress": 100, "message": "处理完成", @@ -244,20 +253,16 @@ async def process_document( except Exception as e: logger.error(f"文档处理失败: {str(e)}") await redis_db.set_task_status( - task_id, - status="failure", + task_id, status="failure", meta={"error": str(e)} ) async def process_documents_batch(task_id: str, files: List[dict]): """批量处理文档""" - from app.core.database import redis_db - try: await redis_db.set_task_status( - task_id, - status="processing", + task_id, status="processing", meta={"progress": 0, "message": "开始批量处理"} ) @@ -278,6 +283,29 @@ async def process_documents_batch(task_id: str, files: List[dict]): }, structured_data=result.data.get("structured_data") ) + + # Excel 处理 + if file_info["ext"] in ["xlsx", "xls"]: + await table_rag_service.build_table_rag_index( + file_path=file_info["path"], + filename=file_info["filename"] + ) + else: + # 非结构化文档:处理其中的表格 + 内容索引 + structured_data = result.data.get("structured_data", {}) + tables = structured_data.get("tables", []) + + if tables: + for table_info in tables: + await table_rag_service.index_document_table( + doc_id=doc_id, + filename=file_info["filename"], + table_data=table_info, + source_doc_type=file_info["ext"] + ) + + await index_document_to_rag(doc_id, file_info["filename"], result, file_info["ext"]) + results.append({"filename": file_info["filename"], "doc_id": doc_id, "success": True}) else: results.append({"filename": file_info["filename"], "success": False, "error": result.error}) @@ -285,61 +313,38 @@ async def process_documents_batch(task_id: str, files: List[dict]): except Exception as e: results.append({"filename": file_info["filename"], "success": False, "error": str(e)}) - # 更新进度 progress = int((i + 1) / len(files) * 100) await redis_db.set_task_status( - task_id, - status="processing", + task_id, status="processing", meta={"progress": progress, "message": f"已处理 {i+1}/{len(files)}"} ) await redis_db.set_task_status( - task_id, - status="success", + task_id, status="success", meta={"progress": 100, "message": "批量处理完成", "results": results} ) except Exception as e: logger.error(f"批量处理失败: {str(e)}") await redis_db.set_task_status( - task_id, - status="failure", + task_id, status="failure", meta={"error": str(e)} ) -async def store_excel_to_mysql(file_path: str, filename: str, result: ParseResult): - """将 Excel 数据存储到 MySQL""" - # TODO: 实现 Excel 数据到 MySQL 的转换和存储 - # 需要根据表头动态创建表结构 - pass - - async def index_document_to_rag(doc_id: str, filename: str, result: ParseResult, doc_type: str): - """将文档索引到 RAG""" + """将非结构化文档索引到 RAG""" try: - if doc_type in ["xlsx", "xls"]: - # Excel 文件: 索引字段信息 - columns = result.metadata.get("columns", []) - for col in columns: - rag_service.index_field( - table_name=filename, - field_name=col, - field_description=f"Excel表格 {filename} 的列 {col}", - sample_values=None - ) - else: - # 其他文档: 索引文档内容 - content = result.data.get("content", "") - if content: - rag_service.index_document_content( - doc_id=doc_id, - content=content[:5000], # 限制长度 - metadata={ - "filename": filename, - "doc_type": doc_type - } - ) + content = result.data.get("content", "") + if content: + rag_service.index_document_content( + doc_id=doc_id, + content=content[:5000], + metadata={ + "filename": filename, + "doc_type": doc_type + } + ) except Exception as e: logger.warning(f"RAG 索引失败: {str(e)}") @@ -365,7 +370,3 @@ async def parse_uploaded_document( except Exception as e: logger.error(f"解析文档失败: {str(e)}") raise HTTPException(status_code=500, detail=f"解析失败: {str(e)}") - - -# 需要添加 import -import logging diff --git a/backend/app/api/endpoints/templates.py b/backend/app/api/endpoints/templates.py index 2248b1c..572d56e 100644 --- a/backend/app/api/endpoints/templates.py +++ b/backend/app/api/endpoints/templates.py @@ -4,6 +4,7 @@ 提供模板上传、解析和填写功能 """ import io +import logging from typing import List, Optional from fastapi import APIRouter, File, HTTPException, Query, UploadFile @@ -222,7 +223,3 @@ async def export_filled_template( except Exception as e: logger.error(f"导出失败: {str(e)}") raise HTTPException(status_code=500, detail=f"导出失败: {str(e)}") - - -# ==================== 需要添加的 import ==================== -import logging diff --git a/backend/app/config.py b/backend/app/config.py index dc80837..5e0e871 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -34,7 +34,7 @@ class Settings(BaseSettings): UPLOAD_DIR: str = "data/uploads" # ==================== RAG/向量数据库配置 ==================== - CHROMADB_PERSIST_DIR: str = "data/chromadb" + FAISS_INDEX_DIR: str = "data/faiss" # 允许 Pydantic 从 .env 文件读取 model_config = SettingsConfigDict( diff --git a/backend/app/core/database/__pycache__/__init__.cpython-312.pyc b/backend/app/core/database/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..d0c9f86 Binary files /dev/null and b/backend/app/core/database/__pycache__/__init__.cpython-312.pyc differ diff --git a/backend/app/core/database/__pycache__/mysql.cpython-312.pyc b/backend/app/core/database/__pycache__/mysql.cpython-312.pyc new file mode 100644 index 0000000..0f1ebba Binary files /dev/null and b/backend/app/core/database/__pycache__/mysql.cpython-312.pyc differ diff --git a/backend/app/core/document_parser/__pycache__/__init__.cpython-312.pyc b/backend/app/core/document_parser/__pycache__/__init__.cpython-312.pyc index b60eb18..25edcb6 100644 Binary files a/backend/app/core/document_parser/__pycache__/__init__.cpython-312.pyc and b/backend/app/core/document_parser/__pycache__/__init__.cpython-312.pyc differ diff --git a/backend/app/core/document_parser/__pycache__/docx_parser.cpython-312.pyc b/backend/app/core/document_parser/__pycache__/docx_parser.cpython-312.pyc new file mode 100644 index 0000000..5c1d8b0 Binary files /dev/null and b/backend/app/core/document_parser/__pycache__/docx_parser.cpython-312.pyc differ diff --git a/backend/app/core/document_parser/__pycache__/md_parser.cpython-312.pyc b/backend/app/core/document_parser/__pycache__/md_parser.cpython-312.pyc new file mode 100644 index 0000000..5793eb6 Binary files /dev/null and b/backend/app/core/document_parser/__pycache__/md_parser.cpython-312.pyc differ diff --git a/backend/app/core/document_parser/__pycache__/txt_parser.cpython-312.pyc b/backend/app/core/document_parser/__pycache__/txt_parser.cpython-312.pyc new file mode 100644 index 0000000..6f5c39b Binary files /dev/null and b/backend/app/core/document_parser/__pycache__/txt_parser.cpython-312.pyc differ diff --git a/backend/app/services/excel_storage_service.py b/backend/app/services/excel_storage_service.py index 5f348e1..eb6d98a 100644 --- a/backend/app/services/excel_storage_service.py +++ b/backend/app/services/excel_storage_service.py @@ -246,6 +246,150 @@ class ExcelStorageService: logger.error(f"存储 Excel 到 MySQL 失败: {str(e)}") return {"success": False, "error": str(e)} + async def store_structured_data( + self, + table_name: str, + data: Dict[str, Any], + source_doc_id: str = None + ) -> Dict[str, Any]: + """ + 将结构化数据(从非结构化文档提取的表格)存储到 MySQL + + Args: + table_name: 表名 + data: 结构化数据,格式为: + { + "columns": ["col1", "col2"], # 列名 + "rows": [["val1", "val2"], ["val3", "val4"]] # 数据行 + } + source_doc_id: 源文档 ID + + Returns: + 存储结果 + """ + results = { + "success": True, + "table_name": table_name, + "row_count": 0, + "columns": [] + } + + try: + columns = data.get("columns", []) + rows = data.get("rows", []) + + if not columns or not rows: + return {"success": False, "error": "数据为空"} + + # 清理列名 + sanitized_columns = [self._sanitize_column_name(c) for c in columns] + + # 推断列类型 + column_types = {} + for i, col in enumerate(columns): + col_values = [row[i] for row in rows if i < len(row)] + # 根据数据推断类型 + col_type = self._infer_type_from_values(col_values) + column_types[col] = col_type + results["columns"].append({ + "original_name": col, + "sanitized_name": self._sanitize_column_name(col), + "type": col_type + }) + + # 创建表 + model_class = self._create_table_model(table_name, columns, column_types) + + # 创建表结构 + async with self.mysql_db.get_session() as session: + model_class.__table__.create(session.bind, checkfirst=True) + + # 插入数据 + records = [] + for row in rows: + record = {} + for i, col in enumerate(columns): + if i >= len(row): + continue + col_name = self._sanitize_column_name(col) + value = row[i] + col_type = column_types.get(col, "TEXT") + + # 处理空值 + if value is None or str(value).strip() == '': + record[col_name] = None + elif col_type == "INTEGER": + try: + record[col_name] = int(value) + except (ValueError, TypeError): + record[col_name] = None + elif col_type == "FLOAT": + try: + record[col_name] = float(value) + except (ValueError, TypeError): + record[col_name] = None + else: + record[col_name] = str(value) + + records.append(record) + + # 批量插入 + async with self.mysql_db.get_session() as session: + for record in records: + session.add(model_class(**record)) + await session.commit() + + results["row_count"] = len(records) + logger.info(f"结构化数据已存储到 MySQL 表 {table_name},共 {len(records)} 行") + + return results + + except Exception as e: + logger.error(f"存储结构化数据到 MySQL 失败: {str(e)}") + return {"success": False, "error": str(e)} + + def _infer_type_from_values(self, values: List[Any]) -> str: + """ + 根据值列表推断列类型 + + Args: + values: 值列表 + + Returns: + 类型名称 + """ + non_null_values = [v for v in values if v is not None and str(v).strip() != ''] + if not non_null_values: + return "TEXT" + + # 检查是否全是整数 + is_integer = all(self._is_integer(v) for v in non_null_values) + if is_integer: + return "INTEGER" + + # 检查是否全是浮点数 + is_float = all(self._is_float(v) for v in non_null_values) + if is_float: + return "FLOAT" + + return "TEXT" + + def _is_integer(self, value: Any) -> bool: + """判断值是否可以转为整数""" + try: + int(value) + return True + except (ValueError, TypeError): + return False + + def _is_float(self, value: Any) -> bool: + """判断值是否可以转为浮点数""" + try: + float(value) + return True + except (ValueError, TypeError): + return False + async def query_table( self, table_name: str, diff --git a/backend/app/services/table_rag_service.py b/backend/app/services/table_rag_service.py new file mode 100644 index 0000000..4471e1d --- /dev/null +++ b/backend/app/services/table_rag_service.py @@ -0,0 +1,491 @@ +""" +表结构 RAG 索引服务 + +AI 自动生成表字段的语义描述,并建立向量索引 +""" +import logging +from typing import Any, Dict, List, Optional + +import pandas as pd + +from app.services.llm_service import llm_service +from app.services.rag_service import rag_service +from app.services.excel_storage_service import excel_storage_service +from app.core.database.mysql import mysql_db + +logger = logging.getLogger(__name__) + + +class TableRAGService: + """ + 表结构 RAG 索引服务 + + 核心功能: + 1. AI 根据表头和数据生成字段语义描述 + 2. 将字段描述存入向量数据库 (RAG) + 3. 支持自然语言查询表字段 + """ + + def __init__(self): + self.llm = llm_service + self.rag = rag_service + self.excel_storage = excel_storage_service + + async def generate_field_description( + self, + table_name: str, + field_name: str, + sample_values: List[Any], + all_fields: Dict[str, List[Any]] = None + ) -> str: + """ + 使用 AI 生成字段的语义描述 + + Args: + table_name: 表名 + field_name: 字段名 + sample_values: 字段示例值 (前10个) + all_fields: 其他字段的示例值,用于上下文理解 + + Returns: + 字段的语义描述 + """ + # 构建 Prompt + context = "" + if all_fields: + context = "\n其他字段示例:\n" + for fname, values in all_fields.items(): + if fname != field_name and values: + context += f"- {fname}: {', '.join([str(v) for v in values[:3]])}\n" + + prompt = f"""你是一个数据语义分析专家。请根据字段名和示例值,推断该字段的语义含义。 + +表名:{table_name} +字段名:{field_name} +示例值:{', '.join([str(v) for v in sample_values[:10] if v is not None])} +{context} + +请生成一段简洁的字段语义描述(不超过50字),说明: +1. 该字段代表什么含义 +2. 数据格式或单位(如果有) +3. 可能的业务用途 + +只输出描述文字,不要其他内容。""" + + try: + messages = [ + {"role": "system", "content": "你是一个专业的数据分析师。"}, + {"role": "user", "content": prompt} + ] + + response = await self.llm.chat( + messages=messages, + temperature=0.3, + max_tokens=200 + ) + + description = self.llm.extract_message_content(response) + return description.strip() + + except Exception as e: + logger.error(f"生成字段描述失败: {str(e)}") + return f"{field_name}: 数据字段" + + async def build_table_rag_index( + self, + file_path: str, + filename: str, + sheet_name: Optional[str] = None, + header_row: int = 0, + sample_size: int = 10 + ) -> Dict[str, Any]: + """ + 为 Excel 表构建完整的 RAG 索引 + + 流程: + 1. 读取 Excel 获取字段信息 + 2. AI 生成每个字段的语义描述 + 3. 将字段描述存入向量数据库 + + Args: + file_path: Excel 文件路径 + filename: 原始文件名 + sheet_name: 工作表名称 + header_row: 表头行号 + sample_size: 每个字段采样的数据条数 + + Returns: + 索引构建结果 + """ + results = { + "success": True, + "table_name": "", + "field_count": 0, + "indexed_fields": [], + "errors": [] + } + + try: + # 1. 读取 Excel + if sheet_name: + df = pd.read_excel(file_path, sheet_name=sheet_name, header=header_row) + else: + df = pd.read_excel(file_path, header=header_row) + + if df.empty: + return {"success": False, "error": "Excel 文件为空"} + + # 清理列名 + df.columns = [str(c) for c in df.columns] + table_name = excel_storage._sanitize_table_name(filename) + results["table_name"] = table_name + results["field_count"] = len(df.columns) + + # 2. 初始化 RAG (如果需要) + if not self.rag._initialized: + self.rag._init_vector_store() + + # 3. 为每个字段生成描述并索引 + all_fields_data = {} + for col in df.columns: + # 采样示例值 + sample_values = df[col].dropna().head(sample_size).tolist() + all_fields_data[col] = sample_values + + # 批量生成描述(避免过多 API 调用) + indexed_count = 0 + for col in df.columns: + try: + sample_values = all_fields_data[col] + + # 生成描述 + description = await self.generate_field_description( + table_name=table_name, + field_name=col, + sample_values=sample_values, + all_fields=all_fields_data + ) + + # 存入 RAG + self.rag.index_field( + table_name=table_name, + field_name=col, + field_description=description, + sample_values=[str(v) for v in sample_values[:5]] + ) + + indexed_count += 1 + results["indexed_fields"].append({ + "field": col, + "description": description + }) + + logger.info(f"字段已索引: {table_name}.{col}") + + except Exception as e: + error_msg = f"字段 {col} 索引失败: {str(e)}" + logger.error(error_msg) + results["errors"].append(error_msg) + + # 4. 存储到 MySQL + store_result = await self.excel_storage.store_excel( + file_path=file_path, + filename=filename, + sheet_name=sheet_name, + header_row=header_row + ) + + if store_result.get("success"): + results["mysql_table"] = store_result.get("table_name") + results["row_count"] = store_result.get("row_count") + else: + results["mysql_warning"] = "MySQL 存储失败: " + str(store_result.get("error")) + + results["indexed_count"] = indexed_count + logger.info(f"表 {table_name} RAG 索引构建完成,共 {indexed_count} 个字段") + + return results + + except Exception as e: + logger.error(f"构建 RAG 索引失败: {str(e)}") + return {"success": False, "error": str(e)} + + async def index_document_table( + self, + doc_id: str, + filename: str, + table_data: Dict[str, Any], + source_doc_type: str + ) -> Dict[str, Any]: + """ + 为非结构化文档中提取的表格建立 MySQL 存储和 RAG 索引 + + Args: + doc_id: 源文档 ID + filename: 源文件名 + table_data: 表格数据,支持两种格式: + 1. docx/txt格式: {"rows": [["col1", "col2"], ["val1", "val2"]], ...} + 2. md格式: {"headers": [...], "rows": [...], ...} + source_doc_type: 源文档类型 (docx/md/txt) + + Returns: + 索引构建结果 + """ + results = { + "success": True, + "table_name": "", + "field_count": 0, + "indexed_fields": [], + "errors": [] + } + + try: + # 兼容两种格式 + if "headers" in table_data: + # md 格式:headers 和 rows 分开 + columns = table_data.get("headers", []) + data_rows = table_data.get("rows", []) + else: + # docx/txt 格式:第一行作为表头 + rows = table_data.get("rows", []) + if not rows or len(rows) < 2: + return {"success": False, "error": "表格数据不足"} + columns = rows[0] + data_rows = rows[1:] + + # 生成表名:源文件 + 表格索引 + base_name = self.excel_storage._sanitize_table_name(filename) + table_name = f"{base_name}_table{table_data.get('table_index', 0)}" + + results["table_name"] = table_name + results["field_count"] = len(columns) + + # 1. 初始化 RAG + if not self.rag._initialized: + self.rag._init_vector_store() + + # 2. 准备结构化数据 + structured_data = { + "columns": columns, + "rows": data_rows + } + + # 3. 存储到 MySQL + store_result = await self.excel_storage.store_structured_data( + table_name=table_name, + data=structured_data, + source_doc_id=doc_id + ) + + if store_result.get("success"): + results["mysql_table"] = store_result.get("table_name") + results["row_count"] = store_result.get("row_count") + else: + results["mysql_warning"] = "MySQL 存储失败: " + str(store_result.get("error")) + + # 4. 为每个字段生成描述并索引 + all_fields_data = {} + for i, col in enumerate(columns): + col_values = [row[i] for row in data_rows if i < len(row)] + all_fields_data[col] = col_values + + indexed_count = 0 + for col in columns: + try: + col_values = all_fields_data.get(col, []) + + # 生成描述 + description = await self.generate_field_description( + table_name=table_name, + field_name=col, + sample_values=col_values[:10], + all_fields=all_fields_data + ) + + # 存入 RAG + self.rag.index_field( + table_name=table_name, + field_name=col, + field_description=description, + sample_values=[str(v) for v in col_values[:5]] + ) + + indexed_count += 1 + results["indexed_fields"].append({ + "field": col, + "description": description + }) + + logger.info(f"文档表格字段已索引: {table_name}.{col}") + + except Exception as e: + error_msg = f"字段 {col} 索引失败: {str(e)}" + logger.error(error_msg) + results["errors"].append(error_msg) + + results["indexed_count"] = indexed_count + logger.info(f"文档表格 {table_name} RAG 索引构建完成,共 {indexed_count} 个字段") + + return results + + except Exception as e: + logger.error(f"构建文档表格 RAG 索引失败: {str(e)}") + return {"success": False, "error": str(e)} + + async def query_table_by_natural_language( + self, + user_query: str, + top_k: int = 5 + ) -> Dict[str, Any]: + """ + 根据自然语言查询相关表字段 + + Args: + user_query: 用户查询 + top_k: 返回数量 + + Returns: + 匹配的字段信息 + """ + try: + # 1. RAG 检索 + rag_results = self.rag.retrieve(user_query, top_k=top_k) + + # 2. 解析检索结果 + matched_fields = [] + for result in rag_results: + metadata = result.get("metadata", {}) + matched_fields.append({ + "table_name": metadata.get("table_name", ""), + "field_name": metadata.get("field_name", ""), + "description": result.get("content", ""), + "score": result.get("score", 0), + "sample_values": [] # 可以后续补充 + }) + + return { + "success": True, + "query": user_query, + "matched_fields": matched_fields, + "count": len(matched_fields) + } + + except Exception as e: + logger.error(f"查询失败: {str(e)}") + return {"success": False, "error": str(e)} + + async def get_table_fields_with_description( + self, + table_name: str + ) -> List[Dict[str, Any]]: + """ + 获取表的字段及其描述 + + Args: + table_name: 表名 + + Returns: + 字段列表 + """ + try: + # 从 RAG 检索该表的所有字段 + results = self.rag.retrieve_by_table(table_name, top_k=50) + + fields = [] + for result in results: + metadata = result.get("metadata", {}) + fields.append({ + "table_name": metadata.get("table_name", ""), + "field_name": metadata.get("field_name", ""), + "description": result.get("content", ""), + "score": result.get("score", 0) + }) + + return fields + + except Exception as e: + logger.error(f"获取字段失败: {str(e)}") + return [] + + async def rebuild_all_table_indexes(self) -> Dict[str, Any]: + """ + 重建所有表的 RAG 索引 + + 从 MySQL 读取所有表结构,重新生成描述并索引 + """ + try: + # 清空现有索引 + self.rag.clear() + + # 获取所有表 + tables = await self.excel_storage.list_tables() + + results = { + "success": True, + "tables_processed": 0, + "total_fields": 0, + "errors": [] + } + + for table_name in tables: + try: + # 获取表结构 + schema = await self.excel_storage.get_table_schema(table_name) + + if not schema: + continue + + # 初始化 RAG + if not self.rag._initialized: + self.rag._init_vector_store() + + # 为每个字段生成描述并索引 + for col_info in schema: + field_name = col_info.get("COLUMN_NAME", "") + if field_name in ["id", "created_at", "updated_at"]: + continue + + # 采样数据 + samples = await self.excel_storage.query_table( + table_name, + columns=[field_name], + limit=10 + ) + sample_values = [r.get(field_name) for r in samples if r.get(field_name)] + + # 生成描述 + description = await self.generate_field_description( + table_name=table_name, + field_name=field_name, + sample_values=sample_values + ) + + # 索引 + self.rag.index_field( + table_name=table_name, + field_name=field_name, + field_description=description, + sample_values=[str(v) for v in sample_values[:5]] + ) + + results["total_fields"] += 1 + + results["tables_processed"] += 1 + logger.info(f"表 {table_name} 索引重建完成") + + except Exception as e: + error_msg = f"表 {table_name} 索引失败: {str(e)}" + logger.error(error_msg) + results["errors"].append(error_msg) + + logger.info(f"全部 {results['tables_processed']} 个表索引重建完成") + return results + + except Exception as e: + logger.error(f"重建索引失败: {str(e)}") + return {"success": False, "error": str(e)} + + +# ==================== 全局单例 ==================== + +table_rag_service = TableRAGService()