Files
FilesReadSystem/backend/app/core/database/mongodb.py
dj 2c2ab56d2d 修复智能填表功能:支持直接从结构化数据提取列值并完善JSON解析
- 新增 _extract_values_from_structured_data 方法,直接从Excel rows提取列值
- 新增 _extract_values_by_regex 方法,使用正则从损坏的JSON中提取值
- 增大 max_tokens (500→50000) 和 max_length (8000→200000) 限制
- 改进JSON解析逻辑,处理markdown代码块包裹和不完整JSON
- 解决LLM返回被截断的JSON无法正确解析的问题
2026-04-09 19:33:05 +08:00

251 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MongoDB 数据库连接管理模块
提供非结构化数据的存储和查询功能
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from app.config import settings
logger = logging.getLogger(__name__)
class MongoDB:
"""MongoDB 数据库管理类"""
def __init__(self):
self.client: Optional[AsyncIOMotorClient] = None
self.db: Optional[AsyncIOMotorDatabase] = None
async def connect(self):
"""建立 MongoDB 连接"""
try:
self.client = AsyncIOMotorClient(
settings.MONGODB_URL,
serverSelectionTimeoutMS=30000, # 30秒超时适应远程服务器
connectTimeoutMS=30000, # 连接超时
socketTimeoutMS=60000, # Socket 超时
)
self.db = self.client[settings.MONGODB_DB_NAME]
# 验证连接
await self.client.admin.command('ping')
logger.info(f"MongoDB 连接成功: {settings.MONGODB_DB_NAME}")
except Exception as e:
logger.error(f"MongoDB 连接失败: {e}")
raise
async def close(self):
"""关闭 MongoDB 连接"""
if self.client:
self.client.close()
logger.info("MongoDB 连接已关闭")
@property
def documents(self):
"""文档集合 - 存储原始文档和解析结果"""
return self.db["documents"]
@property
def embeddings(self):
"""向量嵌入集合 - 存储文本嵌入向量"""
return self.db["embeddings"]
@property
def rag_index(self):
"""RAG索引集合 - 存储字段语义索引"""
return self.db["rag_index"]
# ==================== 文档操作 ====================
async def insert_document(
self,
doc_type: str,
content: str,
metadata: Dict[str, Any],
structured_data: Optional[Dict[str, Any]] = None,
) -> str:
"""
插入文档
Args:
doc_type: 文档类型 (docx/xlsx/md/txt)
content: 原始文本内容
metadata: 元数据
structured_data: 结构化数据 (表格等)
Returns:
插入文档的ID
"""
document = {
"doc_type": doc_type,
"content": content,
"metadata": metadata,
"structured_data": structured_data,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
}
result = await self.documents.insert_one(document)
doc_id = str(result.inserted_id)
filename = metadata.get("original_filename", "unknown")
logger.info(f"✓ 文档已存入MongoDB: [{doc_type}] {filename} | ID: {doc_id}")
return doc_id
async def get_document(self, doc_id: str) -> Optional[Dict[str, Any]]:
"""根据ID获取文档"""
from bson import ObjectId
doc = await self.documents.find_one({"_id": ObjectId(doc_id)})
if doc:
doc["_id"] = str(doc["_id"])
return doc
async def search_documents(
self,
query: str,
doc_type: Optional[str] = None,
limit: int = 10,
) -> List[Dict[str, Any]]:
"""
搜索文档
Args:
query: 搜索关键词
doc_type: 文档类型过滤
limit: 返回数量
Returns:
文档列表
"""
filter_query = {"content": {"$regex": query}}
if doc_type:
filter_query["doc_type"] = doc_type
cursor = self.documents.find(filter_query).limit(limit)
documents = []
async for doc in cursor:
doc["_id"] = str(doc["_id"])
documents.append(doc)
return documents
async def delete_document(self, doc_id: str) -> bool:
"""删除文档"""
from bson import ObjectId
result = await self.documents.delete_one({"_id": ObjectId(doc_id)})
return result.deleted_count > 0
# ==================== RAG 索引操作 ====================
async def insert_rag_entry(
self,
table_name: str,
field_name: str,
field_description: str,
embedding: List[float],
metadata: Optional[Dict[str, Any]] = None,
) -> str:
"""
插入RAG索引条目
Args:
table_name: 表名
field_name: 字段名
field_description: 字段描述
embedding: 向量嵌入
metadata: 其他元数据
Returns:
插入条目的ID
"""
entry = {
"table_name": table_name,
"field_name": field_name,
"field_description": field_description,
"embedding": embedding,
"metadata": metadata or {},
"created_at": datetime.utcnow(),
}
result = await self.rag_index.insert_one(entry)
return str(result.inserted_id)
async def search_rag(
self,
query_embedding: List[float],
top_k: int = 5,
table_name: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
搜索RAG索引 (使用向量相似度)
Args:
query_embedding: 查询向量
top_k: 返回数量
table_name: 可选的表名过滤
Returns:
相关的索引条目
"""
# MongoDB 5.0+ 支持向量搜索
# 较低版本使用欧氏距离替代
pipeline = [
{
"$addFields": {
"distance": {
"$reduce": {
"input": {"$range": [0, {"$size": "$embedding"}]},
"initialValue": 0,
"in": {
"$add": [
"$$value",
{
"$pow": [
{
"$subtract": [
{"$arrayElemAt": ["$embedding", "$$this"]},
{"$arrayElemAt": [query_embedding, "$$this"]},
]
},
2,
]
},
]
},
}
}
}
},
{"$sort": {"distance": 1}},
{"$limit": top_k},
]
if table_name:
pipeline.insert(0, {"$match": {"table_name": table_name}})
results = []
async for doc in self.rag_index.aggregate(pipeline):
doc["_id"] = str(doc["_id"])
results.append(doc)
return results
# ==================== 集合管理 ====================
async def create_indexes(self):
"""创建索引以优化查询"""
# 文档集合索引
await self.documents.create_index("doc_type")
await self.documents.create_index("created_at")
await self.documents.create_index([("content", "text")])
# RAG索引集合索引
await self.rag_index.create_index("table_name")
await self.rag_index.create_index("field_name")
logger.info("MongoDB 索引创建完成")
# ==================== 全局单例 ====================
mongodb = MongoDB()