Compare commits
2 Commits
faff1a5977
...
df35105d16
| Author | SHA1 | Date | |
|---|---|---|---|
| df35105d16 | |||
| 2c2ab56d2d |
@@ -104,8 +104,15 @@ class XlsxParser(BaseParser):
|
|||||||
# pandas 读取失败,尝试 XML 方式
|
# pandas 读取失败,尝试 XML 方式
|
||||||
df = self._read_excel_sheet_xml(file_path, sheet_name=target_sheet, header_row=header_row)
|
df = self._read_excel_sheet_xml(file_path, sheet_name=target_sheet, header_row=header_row)
|
||||||
|
|
||||||
# 检查 DataFrame 是否为空
|
# 检查 DataFrame 是否为空(但如果有列名,仍算有效)
|
||||||
if df is None or df.empty:
|
if df is None:
|
||||||
|
return ParseResult(
|
||||||
|
success=False,
|
||||||
|
error=f"工作表 '{target_sheet}' 读取失败"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 如果 DataFrame 为空但有列名(比如模板文件),仍算有效
|
||||||
|
if df.empty and len(df.columns) == 0:
|
||||||
return ParseResult(
|
return ParseResult(
|
||||||
success=False,
|
success=False,
|
||||||
error=f"工作表 '{target_sheet}' 为空,请检查 Excel 文件内容"
|
error=f"工作表 '{target_sheet}' 为空,请检查 Excel 文件内容"
|
||||||
|
|||||||
@@ -221,8 +221,23 @@ class TemplateFillService:
|
|||||||
confidence=0.0
|
confidence=0.0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# 优先尝试直接从结构化数据中提取列值(适用于 Excel 等有 rows 的数据)
|
||||||
|
direct_values = self._extract_values_from_structured_data(source_docs, field.name)
|
||||||
|
if direct_values:
|
||||||
|
logger.info(f"✅ 字段 {field.name} 直接从结构化数据提取到 {len(direct_values)} 个值")
|
||||||
|
return FillResult(
|
||||||
|
field=field.name,
|
||||||
|
values=direct_values,
|
||||||
|
value=direct_values[0] if direct_values else "",
|
||||||
|
source="结构化数据直接提取",
|
||||||
|
confidence=1.0
|
||||||
|
)
|
||||||
|
|
||||||
|
# 无法从结构化数据提取,使用 LLM
|
||||||
|
logger.info(f"字段 {field.name} 无法直接从结构化数据提取,使用 LLM...")
|
||||||
|
|
||||||
# 构建上下文文本 - 传入字段名,只提取该列数据
|
# 构建上下文文本 - 传入字段名,只提取该列数据
|
||||||
context_text = self._build_context_text(source_docs, field_name=field.name, max_length=8000)
|
context_text = self._build_context_text(source_docs, field_name=field.name, max_length=200000)
|
||||||
|
|
||||||
# 构建提示词
|
# 构建提示词
|
||||||
hint_text = field.hint if field.hint else f"请提取{field.name}的信息"
|
hint_text = field.hint if field.hint else f"请提取{field.name}的信息"
|
||||||
@@ -255,7 +270,7 @@ class TemplateFillService:
|
|||||||
response = await self.llm.chat(
|
response = await self.llm.chat(
|
||||||
messages=messages,
|
messages=messages,
|
||||||
temperature=0.1,
|
temperature=0.1,
|
||||||
max_tokens=500
|
max_tokens=50000
|
||||||
)
|
)
|
||||||
|
|
||||||
content = self.llm.extract_message_content(response)
|
content = self.llm.extract_message_content(response)
|
||||||
@@ -264,60 +279,76 @@ class TemplateFillService:
|
|||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
|
|
||||||
# 尝试提取 JSON,使用更严格的匹配
|
|
||||||
extracted_values = []
|
extracted_values = []
|
||||||
extracted_value = ""
|
extracted_value = ""
|
||||||
extracted_source = "LLM生成"
|
extracted_source = "LLM生成"
|
||||||
confidence = 0.5
|
confidence = 0.5
|
||||||
|
|
||||||
try:
|
logger.info(f"原始 LLM 返回: {content[:500]}")
|
||||||
# 方法1: 尝试直接解析整个 content
|
|
||||||
result = json.loads(content)
|
|
||||||
if isinstance(result, dict):
|
|
||||||
# 优先使用 values 数组格式
|
|
||||||
if "values" in result and isinstance(result["values"], list):
|
|
||||||
extracted_values = [str(v) for v in result["values"]]
|
|
||||||
logger.info(f"字段 {field.name} 使用 values 数组格式: {len(extracted_values)} 个值")
|
|
||||||
elif "value" in result:
|
|
||||||
extracted_value = str(result.get("value", ""))
|
|
||||||
extracted_values = [extracted_value] if extracted_value else []
|
|
||||||
extracted_source = result.get("source", "LLM生成")
|
|
||||||
confidence = float(result.get("confidence", 0.5))
|
|
||||||
logger.info(f"字段 {field.name} 直接 JSON 解析成功")
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
# 方法2: 尝试提取 JSON 对象
|
|
||||||
json_match = re.search(r'\{[\s\S]*\}', content)
|
|
||||||
if json_match:
|
|
||||||
try:
|
|
||||||
result = json.loads(json_match.group())
|
|
||||||
if isinstance(result, dict):
|
|
||||||
# 优先使用 values 数组格式
|
|
||||||
if "values" in result and isinstance(result["values"], list):
|
|
||||||
extracted_values = [str(v) for v in result["values"]]
|
|
||||||
logger.info(f"字段 {field.name} 使用 values 数组格式: {len(extracted_values)} 个值")
|
|
||||||
elif "value" in result:
|
|
||||||
extracted_value = str(result.get("value", ""))
|
|
||||||
extracted_values = [extracted_value] if extracted_value else []
|
|
||||||
extracted_source = result.get("source", "LLM生成")
|
|
||||||
confidence = float(result.get("confidence", 0.5))
|
|
||||||
logger.info(f"字段 {field.name} 正则 JSON 解析成功")
|
|
||||||
else:
|
|
||||||
logger.warning(f"字段 {field.name} JSON 不是字典格式")
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
logger.error(f"字段 {field.name} JSON 解析失败: {str(e)}")
|
|
||||||
# 如果 JSON 解析失败,尝试从文本中提取
|
|
||||||
extracted_values = self._extract_values_from_text(content, field.name)
|
|
||||||
extracted_source = "文本提取"
|
|
||||||
confidence = 0.3
|
|
||||||
else:
|
|
||||||
logger.warning(f"字段 {field.name} 未找到 JSON: {content[:200]}")
|
|
||||||
extracted_values = self._extract_values_from_text(content, field.name)
|
|
||||||
extracted_source = "文本提取"
|
|
||||||
confidence = 0.3
|
|
||||||
|
|
||||||
# 如果没有提取到值,返回空
|
# ========== 步骤1: 彻底清理 markdown 和各种格式问题 ==========
|
||||||
|
# 移除 ```json 和 ``` 标记
|
||||||
|
cleaned = content.strip()
|
||||||
|
cleaned = re.sub(r'^```json\s*', '', cleaned, flags=re.MULTILINE)
|
||||||
|
cleaned = re.sub(r'^```\s*', '', cleaned, flags=re.MULTILINE)
|
||||||
|
cleaned = cleaned.strip()
|
||||||
|
|
||||||
|
logger.info(f"清理后: {cleaned[:500]}")
|
||||||
|
|
||||||
|
# ========== 步骤2: 定位 JSON 开始位置 ==========
|
||||||
|
json_start = -1
|
||||||
|
# 找到第一个 { 或 [
|
||||||
|
for i, c in enumerate(cleaned):
|
||||||
|
if c == '{' or c == '[':
|
||||||
|
json_start = i
|
||||||
|
break
|
||||||
|
|
||||||
|
if json_start == -1:
|
||||||
|
logger.warning(f"无法找到 JSON 开始位置")
|
||||||
|
extracted_values = self._extract_values_from_text(cleaned, field.name)
|
||||||
|
else:
|
||||||
|
json_text = cleaned[json_start:]
|
||||||
|
logger.info(f"JSON 开始位置: {json_start}, 内容: {json_text[:200]}")
|
||||||
|
|
||||||
|
# ========== 步骤3: 尝试解析 JSON ==========
|
||||||
|
# 3a. 尝试直接解析整个字符串
|
||||||
|
try:
|
||||||
|
result = json.loads(json_text)
|
||||||
|
extracted_values = self._extract_values_from_json(result)
|
||||||
|
if extracted_values:
|
||||||
|
logger.info(f"✅ 直接解析成功,得到 {len(extracted_values)} 个值")
|
||||||
|
else:
|
||||||
|
logger.warning(f"直接解析成功但未提取到值")
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
logger.warning(f"直接解析失败: {e}, 尝试修复...")
|
||||||
|
|
||||||
|
# 3b. 尝试修复常见的 JSON 问题
|
||||||
|
# 尝试1: 找到配对的闭合括号
|
||||||
|
fixed_json = self._fix_json(json_text)
|
||||||
|
if fixed_json:
|
||||||
|
try:
|
||||||
|
result = json.loads(fixed_json)
|
||||||
|
extracted_values = self._extract_values_from_json(result)
|
||||||
|
if extracted_values:
|
||||||
|
logger.info(f"✅ 修复后解析成功,得到 {len(extracted_values)} 个值")
|
||||||
|
except json.JSONDecodeError as e2:
|
||||||
|
logger.warning(f"修复后仍然失败: {e2}")
|
||||||
|
|
||||||
|
# 3c. 如果以上都失败,使用正则直接从文本提取 values 数组
|
||||||
|
if not extracted_values:
|
||||||
|
extracted_values = self._extract_values_by_regex(cleaned)
|
||||||
|
if extracted_values:
|
||||||
|
logger.info(f"✅ 正则提取成功,得到 {len(extracted_values)} 个值")
|
||||||
|
else:
|
||||||
|
# 最后的备选:使用旧的文本提取
|
||||||
|
extracted_values = self._extract_values_from_text(cleaned, field.name)
|
||||||
|
|
||||||
|
# 如果仍然没有提取到值
|
||||||
if not extracted_values:
|
if not extracted_values:
|
||||||
extracted_values = [""]
|
extracted_values = [""]
|
||||||
|
logger.warning(f"❌ 字段 {field.name} 没有提取到值")
|
||||||
|
|
||||||
|
logger.info(f"✅✅ 字段 {field.name} 最终返回: {len(extracted_values)} 个值, 示例: {extracted_values[:3]}")
|
||||||
|
|
||||||
return FillResult(
|
return FillResult(
|
||||||
field=field.name,
|
field=field.name,
|
||||||
@@ -497,11 +528,45 @@ class TemplateFillService:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
df = pd.read_excel(file_path, nrows=5)
|
|
||||||
|
# 尝试读取 Excel 文件
|
||||||
|
try:
|
||||||
|
# header=0 表示第一行是表头
|
||||||
|
df = pd.read_excel(file_path, header=0, nrows=5)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"pandas 读取 Excel 表头失败,尝试无表头模式: {e}")
|
||||||
|
# 如果失败,尝试不使用表头模式
|
||||||
|
df = pd.read_excel(file_path, header=None, nrows=5)
|
||||||
|
# 如果没有表头,使用列索引作为列名
|
||||||
|
if df.shape[1] > 0:
|
||||||
|
# 检查第一行是否可以作为表头
|
||||||
|
first_row = df.iloc[0].tolist()
|
||||||
|
if all(pd.notna(v) and str(v).strip() != '' for v in first_row):
|
||||||
|
# 第一行有内容,作为表头
|
||||||
|
df.columns = [str(v) if pd.notna(v) else f"列{i}" for i, v in enumerate(first_row)]
|
||||||
|
df = df.iloc[1:] # 移除表头行
|
||||||
|
else:
|
||||||
|
# 第一行不是有效表头,使用默认列名
|
||||||
|
df.columns = [f"列{i}" for i in range(df.shape[1])]
|
||||||
|
|
||||||
|
logger.info(f"读取 Excel 表头: {df.shape}, 列: {list(df.columns)[:10]}")
|
||||||
|
|
||||||
|
# 如果 DataFrame 列为空或只有默认索引,尝试其他方式
|
||||||
|
if len(df.columns) == 0 or (len(df.columns) == 1 and df.columns[0] == 0):
|
||||||
|
logger.warning(f"表头解析结果异常,重新解析: {df.columns}")
|
||||||
|
# 尝试读取整个文件获取列信息
|
||||||
|
df_full = pd.read_excel(file_path, header=None)
|
||||||
|
if df_full.shape[1] > 0:
|
||||||
|
# 使用第一行作为列名
|
||||||
|
df = df_full
|
||||||
|
df.columns = [str(v) if pd.notna(v) and str(v).strip() else f"列{i}" for i, v in enumerate(df.iloc[0])]
|
||||||
|
df = df.iloc[1:]
|
||||||
|
|
||||||
for idx, col in enumerate(df.columns):
|
for idx, col in enumerate(df.columns):
|
||||||
cell = self._column_to_cell(idx)
|
cell = self._column_to_cell(idx)
|
||||||
col_str = str(col)
|
col_str = str(col)
|
||||||
|
if col_str == '0' or col_str.startswith('Unnamed'):
|
||||||
|
col_str = f"字段{idx+1}"
|
||||||
|
|
||||||
fields.append(TemplateField(
|
fields.append(TemplateField(
|
||||||
cell=cell,
|
cell=cell,
|
||||||
@@ -511,8 +576,10 @@ class TemplateFillService:
|
|||||||
hint=""
|
hint=""
|
||||||
))
|
))
|
||||||
|
|
||||||
|
logger.info(f"从 Excel 提取到 {len(fields)} 个字段")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"从Excel提取字段失败: {str(e)}")
|
logger.error(f"从Excel提取字段失败: {str(e)}", exc_info=True)
|
||||||
|
|
||||||
return fields
|
return fields
|
||||||
|
|
||||||
@@ -606,6 +673,238 @@ class TemplateFillService:
|
|||||||
values = self._extract_values_from_text(text, field_name)
|
values = self._extract_values_from_text(text, field_name)
|
||||||
return values[0] if values else ""
|
return values[0] if values else ""
|
||||||
|
|
||||||
|
def _extract_values_from_structured_data(self, source_docs: List[SourceDocument], field_name: str) -> List[str]:
|
||||||
|
"""
|
||||||
|
从结构化数据(Excel rows)中直接提取指定列的值
|
||||||
|
|
||||||
|
适用于有 rows 结构的文档数据,无需 LLM 即可提取
|
||||||
|
|
||||||
|
Args:
|
||||||
|
source_docs: 源文档列表
|
||||||
|
field_name: 字段名称
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
值列表,如果无法提取则返回空列表
|
||||||
|
"""
|
||||||
|
all_values = []
|
||||||
|
|
||||||
|
for doc in source_docs:
|
||||||
|
# 尝试从 structured_data 中提取
|
||||||
|
structured = doc.structured_data
|
||||||
|
|
||||||
|
if not structured:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 处理多 sheet 格式: {sheets: {sheet_name: {columns, rows}}}
|
||||||
|
if structured.get("sheets"):
|
||||||
|
sheets = structured.get("sheets", {})
|
||||||
|
for sheet_name, sheet_data in sheets.items():
|
||||||
|
if isinstance(sheet_data, dict):
|
||||||
|
columns = sheet_data.get("columns", [])
|
||||||
|
rows = sheet_data.get("rows", [])
|
||||||
|
values = self._extract_column_values(rows, columns, field_name)
|
||||||
|
if values:
|
||||||
|
all_values.extend(values)
|
||||||
|
logger.info(f"从 sheet {sheet_name} 提取到 {len(values)} 个值")
|
||||||
|
break # 只用第一个匹配的 sheet
|
||||||
|
if all_values:
|
||||||
|
break
|
||||||
|
|
||||||
|
# 处理单 sheet 格式: {columns: [...], rows: [...]}
|
||||||
|
elif structured.get("rows"):
|
||||||
|
columns = structured.get("columns", [])
|
||||||
|
rows = structured.get("rows", [])
|
||||||
|
values = self._extract_column_values(rows, columns, field_name)
|
||||||
|
if values:
|
||||||
|
all_values.extend(values)
|
||||||
|
logger.info(f"从文档 {doc.filename} 提取到 {len(values)} 个值")
|
||||||
|
break
|
||||||
|
|
||||||
|
return all_values
|
||||||
|
|
||||||
|
def _extract_column_values(self, rows: List, columns: List, field_name: str) -> List[str]:
|
||||||
|
"""
|
||||||
|
从 rows 和 columns 中提取指定列的值
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rows: 行数据列表
|
||||||
|
columns: 列名列表
|
||||||
|
field_name: 要提取的字段名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
值列表
|
||||||
|
"""
|
||||||
|
if not rows or not columns:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# 查找匹配的列(模糊匹配)
|
||||||
|
target_col = None
|
||||||
|
for col in columns:
|
||||||
|
col_str = str(col)
|
||||||
|
if field_name.lower() in col_str.lower() or col_str.lower() in field_name.lower():
|
||||||
|
target_col = col
|
||||||
|
break
|
||||||
|
|
||||||
|
if not target_col:
|
||||||
|
logger.warning(f"未找到匹配列: {field_name}, 可用列: {columns}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
values = []
|
||||||
|
for row in rows:
|
||||||
|
if isinstance(row, dict):
|
||||||
|
val = row.get(target_col, "")
|
||||||
|
elif isinstance(row, list) and target_col in columns:
|
||||||
|
val = row[columns.index(target_col)]
|
||||||
|
else:
|
||||||
|
val = ""
|
||||||
|
values.append(str(val) if val is not None else "")
|
||||||
|
|
||||||
|
return values
|
||||||
|
|
||||||
|
def _extract_values_from_json(self, result) -> List[str]:
|
||||||
|
"""
|
||||||
|
从解析后的 JSON 对象/数组中提取值数组
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: json.loads() 返回的对象
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
值列表
|
||||||
|
"""
|
||||||
|
if isinstance(result, dict):
|
||||||
|
# 优先找 values 数组
|
||||||
|
if "values" in result and isinstance(result["values"], list):
|
||||||
|
vals = [str(v).strip() for v in result["values"] if v and str(v).strip()]
|
||||||
|
if vals:
|
||||||
|
return vals
|
||||||
|
# 尝试找 value 字段
|
||||||
|
if "value" in result:
|
||||||
|
val = str(result["value"]).strip()
|
||||||
|
if val:
|
||||||
|
return [val]
|
||||||
|
# 尝试找任何数组类型的键
|
||||||
|
for key in result.keys():
|
||||||
|
val = result[key]
|
||||||
|
if isinstance(val, list) and len(val) > 0:
|
||||||
|
if all(isinstance(v, (str, int, float, bool)) or v is None for v in val):
|
||||||
|
vals = [str(v).strip() for v in val if v is not None and str(v).strip()]
|
||||||
|
if vals:
|
||||||
|
return vals
|
||||||
|
elif isinstance(val, (str, int, float, bool)):
|
||||||
|
return [str(val).strip()]
|
||||||
|
elif isinstance(result, list):
|
||||||
|
vals = [str(v).strip() for v in result if v is not None and str(v).strip()]
|
||||||
|
if vals:
|
||||||
|
return vals
|
||||||
|
return []
|
||||||
|
|
||||||
|
def _fix_json(self, json_text: str) -> str:
|
||||||
|
"""
|
||||||
|
尝试修复损坏的 JSON 字符串
|
||||||
|
|
||||||
|
Args:
|
||||||
|
json_text: 原始 JSON 文本
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
修复后的 JSON 文本,如果无法修复则返回空字符串
|
||||||
|
"""
|
||||||
|
import re
|
||||||
|
|
||||||
|
# 如果以 { 开头,尝试找到配对的 }
|
||||||
|
if json_text.startswith('{'):
|
||||||
|
# 统计括号深度
|
||||||
|
depth = 0
|
||||||
|
end_pos = -1
|
||||||
|
for i, c in enumerate(json_text):
|
||||||
|
if c == '{':
|
||||||
|
depth += 1
|
||||||
|
elif c == '}':
|
||||||
|
depth -= 1
|
||||||
|
if depth == 0:
|
||||||
|
end_pos = i + 1
|
||||||
|
break
|
||||||
|
|
||||||
|
if end_pos > 0:
|
||||||
|
fixed = json_text[:end_pos]
|
||||||
|
logger.info(f"修复 JSON (配对括号): {fixed[:200]}")
|
||||||
|
return fixed
|
||||||
|
|
||||||
|
# 如果找不到配对,尝试移除 trailing comma 和其他问题
|
||||||
|
# 移除末尾多余的逗号
|
||||||
|
fixed = re.sub(r',\s*([}\]])', r'\1', json_text)
|
||||||
|
# 确保以 } 结尾
|
||||||
|
fixed = fixed.strip()
|
||||||
|
if fixed and not fixed.endswith('}') and not fixed.endswith(']'):
|
||||||
|
# 尝试补全
|
||||||
|
if fixed.startswith('{') and not fixed.endswith('}'):
|
||||||
|
fixed = fixed + '}'
|
||||||
|
elif fixed.startswith('[') and not fixed.endswith(']'):
|
||||||
|
fixed = fixed + ']'
|
||||||
|
logger.info(f"修复 JSON (正则): {fixed[:200]}")
|
||||||
|
return fixed
|
||||||
|
|
||||||
|
# 如果以 [ 开头
|
||||||
|
elif json_text.startswith('['):
|
||||||
|
depth = 0
|
||||||
|
end_pos = -1
|
||||||
|
for i, c in enumerate(json_text):
|
||||||
|
if c == '[':
|
||||||
|
depth += 1
|
||||||
|
elif c == ']':
|
||||||
|
depth -= 1
|
||||||
|
if depth == 0:
|
||||||
|
end_pos = i + 1
|
||||||
|
break
|
||||||
|
|
||||||
|
if end_pos > 0:
|
||||||
|
fixed = json_text[:end_pos]
|
||||||
|
logger.info(f"修复 JSON (数组配对): {fixed[:200]}")
|
||||||
|
return fixed
|
||||||
|
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def _extract_values_by_regex(self, text: str) -> List[str]:
|
||||||
|
"""
|
||||||
|
使用正则从损坏/不完整的 JSON 文本中提取 values 数组
|
||||||
|
|
||||||
|
即使 JSON 被截断,只要能看到 "values": [...] 就能提取
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: 原始文本
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
值列表
|
||||||
|
"""
|
||||||
|
import re
|
||||||
|
|
||||||
|
# 方法1: 查找 "values": [ 开始的位置
|
||||||
|
values_start = re.search(r'"values"\s*:\s*\[', text)
|
||||||
|
if values_start:
|
||||||
|
# 从 [ 之后开始提取内容
|
||||||
|
start_pos = values_start.end()
|
||||||
|
remaining = text[start_pos:]
|
||||||
|
|
||||||
|
# 提取所有被双引号包裹的字符串值
|
||||||
|
# 使用简单正则:匹配 "..." 捕获引号内的内容
|
||||||
|
values = re.findall(r'"([^"]+)"', remaining)
|
||||||
|
|
||||||
|
if values:
|
||||||
|
# 过滤掉空字符串和很短的(可能是键名)
|
||||||
|
filtered = [v.strip() for v in values if v.strip() and len(v) > 1]
|
||||||
|
if filtered:
|
||||||
|
logger.info(f"正则提取到 {len(filtered)} 个值: {filtered[:3]}")
|
||||||
|
return filtered
|
||||||
|
|
||||||
|
# 方法2: 备选 - 直接查找所有 : "value" 格式的值
|
||||||
|
all_strings = re.findall(r':\s*"([^"]{1,200})"', text)
|
||||||
|
if all_strings:
|
||||||
|
filtered = [s for s in all_strings if s and len(s) < 500]
|
||||||
|
if filtered:
|
||||||
|
logger.info(f"备选正则提取到 {len(filtered)} 个值: {filtered[:3]}")
|
||||||
|
return filtered
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
def _extract_values_from_text(self, text: str, field_name: str) -> List[str]:
|
def _extract_values_from_text(self, text: str, field_name: str) -> List[str]:
|
||||||
"""
|
"""
|
||||||
从非 JSON 文本中提取多个字段值
|
从非 JSON 文本中提取多个字段值
|
||||||
@@ -618,14 +917,51 @@ class TemplateFillService:
|
|||||||
提取的值列表
|
提取的值列表
|
||||||
"""
|
"""
|
||||||
import re
|
import re
|
||||||
|
import json
|
||||||
|
|
||||||
|
# 先尝试解析整个文本为 JSON,检查是否包含嵌套的 values 数组
|
||||||
|
cleaned_text = text.strip()
|
||||||
|
# 移除可能的 markdown 代码块标记
|
||||||
|
cleaned_text = cleaned_text.replace('```json', '').replace('```', '').strip()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 尝试解析整个文本为 JSON
|
||||||
|
parsed = json.loads(cleaned_text)
|
||||||
|
if isinstance(parsed, dict):
|
||||||
|
# 如果是 {"values": [...]} 格式,提取 values
|
||||||
|
if "values" in parsed and isinstance(parsed["values"], list):
|
||||||
|
return [str(v).strip() for v in parsed["values"] if v and str(v).strip()]
|
||||||
|
# 如果是其他 dict 格式,尝试找 values 键
|
||||||
|
for key in ["values", "value", "data", "result"]:
|
||||||
|
if key in parsed and isinstance(parsed[key], list):
|
||||||
|
return [str(v).strip() for v in parsed[key] if v and str(v).strip()]
|
||||||
|
elif key in parsed:
|
||||||
|
return [str(parsed[key]).strip()]
|
||||||
|
elif isinstance(parsed, list):
|
||||||
|
return [str(v).strip() for v in parsed if v and str(v).strip()]
|
||||||
|
except (json.JSONDecodeError, TypeError):
|
||||||
|
pass
|
||||||
|
|
||||||
# 尝试匹配 JSON 数组格式
|
# 尝试匹配 JSON 数组格式
|
||||||
array_match = re.search(r'\[[\s\S]*\]', text)
|
array_match = re.search(r'\[[\s\S]*?\]', text)
|
||||||
if array_match:
|
if array_match:
|
||||||
try:
|
try:
|
||||||
arr = json.loads(array_match.group())
|
arr = json.loads(array_match.group())
|
||||||
if isinstance(arr, list):
|
if isinstance(arr, list):
|
||||||
return [str(v) for v in arr if v]
|
# 检查数组元素是否是 {"values": [...]} 结构
|
||||||
|
if arr and isinstance(arr[0], dict) and "values" in arr[0]:
|
||||||
|
# 提取嵌套的 values
|
||||||
|
result = []
|
||||||
|
for item in arr:
|
||||||
|
if isinstance(item, dict) and "values" in item and isinstance(item["values"], list):
|
||||||
|
result.extend([str(v).strip() for v in item["values"] if v and str(v).strip()])
|
||||||
|
elif isinstance(item, dict):
|
||||||
|
result.append(str(item))
|
||||||
|
else:
|
||||||
|
result.append(str(item))
|
||||||
|
if result:
|
||||||
|
return result
|
||||||
|
return [str(v).strip() for v in arr if v and str(v).strip()]
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user