@@ -221,8 +221,23 @@ class TemplateFillService:
confidence = 0.0
)
# 优先尝试直接从结构化数据中提取列值(适用于 Excel 等有 rows 的数据)
direct_values = self . _extract_values_from_structured_data ( source_docs , field . name )
if direct_values :
logger . info ( f " ✅ 字段 { field . name } 直接从结构化数据提取到 { len ( direct_values ) } 个值 " )
return FillResult (
field = field . name ,
values = direct_values ,
value = direct_values [ 0 ] if direct_values else " " ,
source = " 结构化数据直接提取 " ,
confidence = 1.0
)
# 无法从结构化数据提取,使用 LLM
logger . info ( f " 字段 { field . name } 无法直接从结构化数据提取,使用 LLM... " )
# 构建上下文文本 - 传入字段名,只提取该列数据
context_text = self . _build_context_text ( source_docs , field_name = field . name , max_length = 8 000)
context_text = self . _build_context_text ( source_docs , field_name = field . name , max_length = 200 000)
# 构建提示词
hint_text = field . hint if field . hint else f " 请提取 { field . name } 的信息 "
@@ -255,7 +270,7 @@ class TemplateFillService:
response = await self . llm . chat (
messages = messages ,
temperature = 0.1 ,
max_tokens = 500
max_tokens = 50000
)
content = self . llm . extract_message_content ( response )
@@ -264,60 +279,76 @@ class TemplateFillService:
import json
import re
# 尝试提取 JSON, 使用更严格的匹配
extracted_values = [ ]
extracted_value = " "
extracted_source = " LLM生成 "
confidence = 0.5
try :
# 方法1: 尝试直接解析整个 content
result = json . loads ( content )
if isinstance ( result , dict ) :
# 优先使用 values 数组格式
if " values " in result and isinstance ( result [ " values " ] , list ) :
extracted_values = [ str ( v ) for v in result [ " values " ] ]
logger . info ( f " 字段 { field . name } 使用 values 数组格式: { len ( extracted_values ) } 个值 " )
elif " value " in result :
extracted_value = str ( result . get ( " value " , " " ) )
extracted_values = [ extracted_value ] if extracted_value else [ ]
extracted_source = result . get ( " source " , " LLM生成 " )
confidence = float ( result . get ( " confidence " , 0.5 ) )
logger . info ( f " 字段 { field . name } 直接 JSON 解析成功 " )
except json . JSONDecodeError :
# 方法2: 尝试提取 JSON 对象
json_match = re . search ( r ' \ { [ \ s \ S]* \ } ' , content )
if json_match :
try :
result = json . loads ( json_match . group ( ) )
if isinstance ( result , dict ) :
# 优先使用 values 数组格式
if " values " in result and isinstance ( result [ " values " ] , list ) :
extracted_values = [ str ( v ) for v in result [ " values " ] ]
logger . info ( f " 字段 { field . name } 使用 values 数组格式: { len ( extracted_values ) } 个值 " )
elif " value " in result :
extracted_value = str ( result . get ( " value " , " " ) )
extracted_values = [ extracted_value ] if extracted_value else [ ]
extracted_source = result . get ( " source " , " LLM生成 " )
confidence = float ( result . get ( " confidence " , 0.5 ) )
logger . info ( f " 字段 { field . name } 正则 JSON 解析成功 " )
else :
logger . warning ( f " 字段 { field . name } JSON 不是字典格式 " )
except json . JSONDecodeError as e :
logger . error ( f " 字段 { field . name } JSON 解析失败: { str ( e ) } " )
# 如果 JSON 解析失败,尝试从文本中提取
extracted_values = self . _extract_values_from_text ( content , field . name )
extracted_source = " 文本提取 "
confidence = 0.3
else :
logger . warning ( f " 字段 { field . name } 未找到 JSON: { content [ : 200 ] } " )
extracted_values = self . _extract_values_from_text ( content , field . name )
extracted_source = " 文本提取 "
confidence = 0.3
logger . info ( f " 原始 LLM 返回: { content [ : 500 ] } " )
# 如果没有提取到值,返回空
# ========== 步骤1: 彻底清理 markdown 和各种格式问题 ==========
# 移除 ```json 和 ``` 标记
cleaned = content . strip ( )
cleaned = re . sub ( r ' ^```json \ s* ' , ' ' , cleaned , flags = re . MULTILINE )
cleaned = re . sub ( r ' ^``` \ s* ' , ' ' , cleaned , flags = re . MULTILINE )
cleaned = cleaned . strip ( )
logger . info ( f " 清理后: { cleaned [ : 500 ] } " )
# ========== 步骤2: 定位 JSON 开始位置 ==========
json_start = - 1
# 找到第一个 { 或 [
for i , c in enumerate ( cleaned ) :
if c == ' { ' or c == ' [ ' :
json_start = i
break
if json_start == - 1 :
logger . warning ( f " 无法找到 JSON 开始位置 " )
extracted_values = self . _extract_values_from_text ( cleaned , field . name )
else :
json_text = cleaned [ json_start : ]
logger . info ( f " JSON 开始位置: { json_start } , 内容: { json_text [ : 200 ] } " )
# ========== 步骤3: 尝试解析 JSON ==========
# 3a. 尝试直接解析整个字符串
try :
result = json . loads ( json_text )
extracted_values = self . _extract_values_from_json ( result )
if extracted_values :
logger . info ( f " ✅ 直接解析成功,得到 { len ( extracted_values ) } 个值 " )
else :
logger . warning ( f " 直接解析成功但未提取到值 " )
except json . JSONDecodeError as e :
logger . warning ( f " 直接解析失败: { e } , 尝试修复... " )
# 3b. 尝试修复常见的 JSON 问题
# 尝试1: 找到配对的闭合括号
fixed_json = self . _fix_json ( json_text )
if fixed_json :
try :
result = json . loads ( fixed_json )
extracted_values = self . _extract_values_from_json ( result )
if extracted_values :
logger . info ( f " ✅ 修复后解析成功,得到 { len ( extracted_values ) } 个值 " )
except json . JSONDecodeError as e2 :
logger . warning ( f " 修复后仍然失败: { e2 } " )
# 3c. 如果以上都失败,使用正则直接从文本提取 values 数组
if not extracted_values :
extracted_values = self . _extract_values_by_regex ( cleaned )
if extracted_values :
logger . info ( f " ✅ 正则提取成功,得到 { len ( extracted_values ) } 个值 " )
else :
# 最后的备选:使用旧的文本提取
extracted_values = self . _extract_values_from_text ( cleaned , field . name )
# 如果仍然没有提取到值
if not extracted_values :
extracted_values = [ " " ]
logger . warning ( f " ❌ 字段 { field . name } 没有提取到值 " )
logger . info ( f " ✅✅ 字段 { field . name } 最终返回: { len ( extracted_values ) } 个值, 示例: { extracted_values [ : 3 ] } " )
return FillResult (
field = field . name ,
@@ -497,11 +528,45 @@ class TemplateFillService:
try :
import pandas as pd
df = pd . read_excel ( file_path , nrows = 5 )
# 尝试读取 Excel 文件
try :
# header=0 表示第一行是表头
df = pd . read_excel ( file_path , header = 0 , nrows = 5 )
except Exception as e :
logger . warning ( f " pandas 读取 Excel 表头失败,尝试无表头模式: { e } " )
# 如果失败,尝试不使用表头模式
df = pd . read_excel ( file_path , header = None , nrows = 5 )
# 如果没有表头,使用列索引作为列名
if df . shape [ 1 ] > 0 :
# 检查第一行是否可以作为表头
first_row = df . iloc [ 0 ] . tolist ( )
if all ( pd . notna ( v ) and str ( v ) . strip ( ) != ' ' for v in first_row ) :
# 第一行有内容,作为表头
df . columns = [ str ( v ) if pd . notna ( v ) else f " 列 { i } " for i , v in enumerate ( first_row ) ]
df = df . iloc [ 1 : ] # 移除表头行
else :
# 第一行不是有效表头,使用默认列名
df . columns = [ f " 列 { i } " for i in range ( df . shape [ 1 ] ) ]
logger . info ( f " 读取 Excel 表头: { df . shape } , 列: { list ( df . columns ) [ : 10 ] } " )
# 如果 DataFrame 列为空或只有默认索引,尝试其他方式
if len ( df . columns ) == 0 or ( len ( df . columns ) == 1 and df . columns [ 0 ] == 0 ) :
logger . warning ( f " 表头解析结果异常,重新解析: { df . columns } " )
# 尝试读取整个文件获取列信息
df_full = pd . read_excel ( file_path , header = None )
if df_full . shape [ 1 ] > 0 :
# 使用第一行作为列名
df = df_full
df . columns = [ str ( v ) if pd . notna ( v ) and str ( v ) . strip ( ) else f " 列 { i } " for i , v in enumerate ( df . iloc [ 0 ] ) ]
df = df . iloc [ 1 : ]
for idx , col in enumerate ( df . columns ) :
cell = self . _column_to_cell ( idx )
col_str = str ( col )
if col_str == ' 0 ' or col_str . startswith ( ' Unnamed ' ) :
col_str = f " 字段 { idx + 1 } "
fields . append ( TemplateField (
cell = cell ,
@@ -511,8 +576,10 @@ class TemplateFillService:
hint = " "
) )
logger . info ( f " 从 Excel 提取到 { len ( fields ) } 个字段 " )
except Exception as e :
logger . error ( f " 从Excel提取字段失败: { str ( e ) } " )
logger . error ( f " 从Excel提取字段失败: { str ( e ) } " , exc_info = True )
return fields
@@ -606,6 +673,238 @@ class TemplateFillService:
values = self . _extract_values_from_text ( text , field_name )
return values [ 0 ] if values else " "
def _extract_values_from_structured_data(self, source_docs: List[SourceDocument], field_name: str) -> List[str]:
    """
    Pull the values of one column straight out of tabular document data.

    Only documents whose ``structured_data`` is non-empty are inspected, so a
    matching column can be returned without involving the LLM.

    Args:
        source_docs: Source documents, searched in list order.
        field_name: Name of the field whose column should be extracted.

    Returns:
        The values from the first sheet or document that yields a match,
        or an empty list when nothing matches.
    """
    for doc in source_docs:
        data = doc.structured_data
        if not data:
            continue

        # Multi-sheet layout: {sheets: {sheet_name: {columns, rows}}}
        if data.get("sheets"):
            for sheet_name, sheet in data.get("sheets", {}).items():
                if not isinstance(sheet, dict):
                    continue
                values = self._extract_column_values(
                    sheet.get("rows", []),
                    sheet.get("columns", []),
                    field_name,
                )
                if values:
                    logger.info(f"从 sheet {sheet_name} 提取到 {len(values)} 个值")
                    # Only the first matching sheet is used.
                    return list(values)
            # A document that has sheets is never read as a single-sheet document.
            continue

        # Single-sheet layout: {columns: [...], rows: [...]}
        if data.get("rows"):
            values = self._extract_column_values(
                data.get("rows", []),
                data.get("columns", []),
                field_name,
            )
            if values:
                logger.info(f"从文档 {doc.filename} 提取到 {len(values)} 个值")
                return list(values)

    return []
def _extract_column_values ( self , rows : List , columns : List , field_name : str ) - > List [ str ] :
"""
从 rows 和 columns 中提取指定列的值
Args:
rows: 行数据列表
columns: 列名列表
field_name: 要提取的字段名
Returns:
值列表
"""
if not rows or not columns :
return [ ]
# 查找匹配的列(模糊匹配)
target_col = None
for col in columns :
col_str = str ( col )
if field_name . lower ( ) in col_str . lower ( ) or col_str . lower ( ) in field_name . lower ( ) :
target_col = col
break
if not target_col :
logger . warning ( f " 未找到匹配列: { field_name } , 可用列: { columns } " )
return [ ]
values = [ ]
for row in rows :
if isinstance ( row , dict ) :
val = row . get ( target_col , " " )
elif isinstance ( row , list ) and target_col in columns :
val = row [ columns . index ( target_col ) ]
else :
val = " "
values . append ( str ( val ) if val is not None else " " )
return values
def _extract_values_from_json ( self , result ) - > List [ str ] :
"""
从解析后的 JSON 对象/数组中提取值数组
Args:
result: json.loads() 返回的对象
Returns:
值列表
"""
if isinstance ( result , dict ) :
# 优先找 values 数组
if " values " in result and isinstance ( result [ " values " ] , list ) :
vals = [ str ( v ) . strip ( ) for v in result [ " values " ] if v and str ( v ) . strip ( ) ]
if vals :
return vals
# 尝试找 value 字段
if " value " in result :
val = str ( result [ " value " ] ) . strip ( )
if val :
return [ val ]
# 尝试找任何数组类型的键
for key in result . keys ( ) :
val = result [ key ]
if isinstance ( val , list ) and len ( val ) > 0 :
if all ( isinstance ( v , ( str , int , float , bool ) ) or v is None for v in val ) :
vals = [ str ( v ) . strip ( ) for v in val if v is not None and str ( v ) . strip ( ) ]
if vals :
return vals
elif isinstance ( val , ( str , int , float , bool ) ) :
return [ str ( val ) . strip ( ) ]
elif isinstance ( result , list ) :
vals = [ str ( v ) . strip ( ) for v in result if v is not None and str ( v ) . strip ( ) ]
if vals :
return vals
return [ ]
def _fix_json ( self , json_text : str ) - > str :
"""
尝试修复损坏的 JSON 字符串
Args:
json_text: 原始 JSON 文本
Returns:
修复后的 JSON 文本,如果无法修复则返回空字符串
"""
import re
# 如果以 { 开头,尝试找到配对的 }
if json_text . startswith ( ' { ' ) :
# 统计括号深度
depth = 0
end_pos = - 1
for i , c in enumerate ( json_text ) :
if c == ' { ' :
depth + = 1
elif c == ' } ' :
depth - = 1
if depth == 0 :
end_pos = i + 1
break
if end_pos > 0 :
fixed = json_text [ : end_pos ]
logger . info ( f " 修复 JSON (配对括号): { fixed [ : 200 ] } " )
return fixed
# 如果找不到配对,尝试移除 trailing comma 和其他问题
# 移除末尾多余的逗号
fixed = re . sub ( r ' , \ s*([} \ ]]) ' , r ' \ 1 ' , json_text )
# 确保以 } 结尾
fixed = fixed . strip ( )
if fixed and not fixed . endswith ( ' } ' ) and not fixed . endswith ( ' ] ' ) :
# 尝试补全
if fixed . startswith ( ' { ' ) and not fixed . endswith ( ' } ' ) :
fixed = fixed + ' } '
elif fixed . startswith ( ' [ ' ) and not fixed . endswith ( ' ] ' ) :
fixed = fixed + ' ] '
logger . info ( f " 修复 JSON (正则): { fixed [ : 200 ] } " )
return fixed
# 如果以 [ 开头
elif json_text . startswith ( ' [ ' ) :
depth = 0
end_pos = - 1
for i , c in enumerate ( json_text ) :
if c == ' [ ' :
depth + = 1
elif c == ' ] ' :
depth - = 1
if depth == 0 :
end_pos = i + 1
break
if end_pos > 0 :
fixed = json_text [ : end_pos ]
logger . info ( f " 修复 JSON (数组配对): { fixed [ : 200 ] } " )
return fixed
return " "
def _extract_values_by_regex ( self , text : str ) - > List [ str ] :
"""
使用正则从损坏/不完整的 JSON 文本中提取 values 数组
即使 JSON 被截断,只要能看到 " values " : [...] 就能提取
Args:
text: 原始文本
Returns:
值列表
"""
import re
# 方法1: 查找 "values": [ 开始的位置
values_start = re . search ( r ' " values " \ s*: \ s* \ [ ' , text )
if values_start :
# 从 [ 之后开始提取内容
start_pos = values_start . end ( )
remaining = text [ start_pos : ]
# 提取所有被双引号包裹的字符串值
# 使用简单正则:匹配 "..." 捕获引号内的内容
values = re . findall ( r ' " ([^ " ]+) " ' , remaining )
if values :
# 过滤掉空字符串和很短的(可能是键名)
filtered = [ v . strip ( ) for v in values if v . strip ( ) and len ( v ) > 1 ]
if filtered :
logger . info ( f " 正则提取到 { len ( filtered ) } 个值: { filtered [ : 3 ] } " )
return filtered
# 方法2: 备选 - 直接查找所有 : "value" 格式的值
all_strings = re . findall ( r ' : \ s* " ([^ " ] { 1,200}) " ' , text )
if all_strings :
filtered = [ s for s in all_strings if s and len ( s ) < 500 ]
if filtered :
logger . info ( f " 备选正则提取到 { len ( filtered ) } 个值: { filtered [ : 3 ] } " )
return filtered
return [ ]
def _extract_values_from_text ( self , text : str , field_name : str ) - > List [ str ] :
"""
从非 JSON 文本中提取多个字段值
@@ -618,14 +917,51 @@ class TemplateFillService:
提取的值列表
"""
import re
import json
# 先尝试解析整个文本为 JSON, 检查是否包含嵌套的 values 数组
cleaned_text = text . strip ( )
# 移除可能的 markdown 代码块标记
cleaned_text = cleaned_text . replace ( ' ```json ' , ' ' ) . replace ( ' ``` ' , ' ' ) . strip ( )
try :
# 尝试解析整个文本为 JSON
parsed = json . loads ( cleaned_text )
if isinstance ( parsed , dict ) :
# 如果是 {"values": [...]} 格式,提取 values
if " values " in parsed and isinstance ( parsed [ " values " ] , list ) :
return [ str ( v ) . strip ( ) for v in parsed [ " values " ] if v and str ( v ) . strip ( ) ]
# 如果是其他 dict 格式,尝试找 values 键
for key in [ " values " , " value " , " data " , " result " ] :
if key in parsed and isinstance ( parsed [ key ] , list ) :
return [ str ( v ) . strip ( ) for v in parsed [ key ] if v and str ( v ) . strip ( ) ]
elif key in parsed :
return [ str ( parsed [ key ] ) . strip ( ) ]
elif isinstance ( parsed , list ) :
return [ str ( v ) . strip ( ) for v in parsed if v and str ( v ) . strip ( ) ]
except ( json . JSONDecodeError , TypeError ) :
pass
# 尝试匹配 JSON 数组格式
array_match = re . search ( r ' \ [[ \ s \ S]* \ ] ' , text )
array_match = re . search ( r ' \ [[ \ s \ S]*? \ ] ' , text )
if array_match :
try :
arr = json . loads ( array_match . group ( ) )
if isinstance ( arr , list ) :
return [ str ( v ) for v in arr if v ]
# 检查数组元素是否是 {"values": [...]} 结构
if arr and isinstance ( arr [ 0 ] , dict ) and " values " in arr [ 0 ] :
# 提取嵌套的 values
result = [ ]
for item in arr :
if isinstance ( item , dict ) and " values " in item and isinstance ( item [ " values " ] , list ) :
result . extend ( [ str ( v ) . strip ( ) for v in item [ " values " ] if v and str ( v ) . strip ( ) ] )
elif isinstance ( item , dict ) :
result . append ( str ( item ) )
else :
result . append ( str ( item ) )
if result :
return result
return [ str ( v ) . strip ( ) for v in arr if v and str ( v ) . strip ( ) ]
except :
pass