import requests
import logging
import json
import requests
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import aiohttp
import logging
from fastapi.responses import JSONResponse
import json
from typing import Optional, List, Dict
import asyncio
# Logging setup: configure the root logger at INFO (basicConfig is a no-op if
# the root logger already has handlers) and create this module's logger.
_ROOT_LOG_LEVEL = logging.INFO
logging.basicConfig(level=_ROOT_LOG_LEVEL)
logger = logging.getLogger(name=__name__)
_THINK_OPEN = "<think>"
_THINK_CLOSE = "</think>"
# (connect, read) timeout in seconds for requests. The read timeout bounds the
# silence between two streamed chunks, not the total generation time.
_DEFAULT_TIMEOUT = (10.0, 300.0)


def _partial_tag_len(text: str, tag: str) -> int:
    """Return the length of the longest proper prefix of *tag* that *text* ends with.

    Used to hold back a tag fragment such as "<thi" that may be completed by
    the next streamed chunk. Returns 0 when *text* cannot end inside *tag*.
    """
    for size in range(min(len(tag) - 1, len(text)), 0, -1):
        if text.endswith(tag[:size]):
            return size
    return 0


class _ThinkSplitter:
    """Incrementally split streamed model output into answer text and think blocks.

    Text between ``<think>`` and ``</think>`` is collected (stripped) as one
    think block; everything else is answer text with whitespace preserved.
    A stray ``</think>`` outside a think block is dropped from the answer.
    Tags split across chunks are handled by holding back a trailing fragment
    until the next :meth:`feed` or :meth:`finish` call.
    """

    def __init__(self, open_tag: str = _THINK_OPEN, close_tag: str = _THINK_CLOSE) -> None:
        self.open_tag = open_tag
        self.close_tag = close_tag
        self.in_think = False
        self.think_blocks: List[str] = []
        self._answer_parts: List[str] = []
        self._current: List[str] = []  # pieces of the think block being read
        self._pending = ""  # possible partial tag carried over to the next chunk

    @property
    def answer(self) -> str:
        """Answer text received so far, outside any think block."""
        return "".join(self._answer_parts)

    def feed(self, text: str) -> None:
        """Consume one streamed chunk of model output."""
        text = self._pending + text
        self._pending = ""
        while text:
            # Inside a think block only the closing tag matters; outside it we
            # also look for a stray closing tag so it never leaks into the answer.
            if self.in_think:
                tags = (self.close_tag,)
            else:
                tags = (self.open_tag, self.close_tag)
            found = [(text.find(tag), tag) for tag in tags]
            found = [(idx, tag) for idx, tag in found if idx != -1]
            if not found:
                # No complete tag: keep a possible tag prefix for the next chunk.
                keep = max(_partial_tag_len(text, tag) for tag in tags)
                if keep:
                    self._pending = text[-keep:]
                    text = text[:-keep]
                self._emit(text)
                return
            idx, tag = min(found, key=lambda item: item[0])
            self._emit(text[:idx])
            if tag == self.open_tag:
                self.in_think = True
            elif self.in_think:
                self.think_blocks.append("".join(self._current).strip())
                self._current = []
                self.in_think = False
            # else: stray closing tag outside a think block -- dropped.
            text = text[idx + len(tag):]

    def finish(self) -> None:
        """Flush held-back text and keep a non-empty unterminated think block."""
        if self._pending:
            self._emit(self._pending)
            self._pending = ""
        if self.in_think:
            unfinished = "".join(self._current)
            if unfinished:
                self.think_blocks.append(unfinished.strip())
            self._current = []
            self.in_think = False

    def _emit(self, text: str) -> None:
        """Append *text* to the open think block or to the answer."""
        if text:
            (self._current if self.in_think else self._answer_parts).append(text)


def call_llm(
    question: str,
    userId: str,
    *,
    url: str = "http://192.168.100.100:7861/chat/kb_chat",
    kb_name: str = "lqbz",
    model: str = "qwen2.5:1.5b",
    timeout=_DEFAULT_TIMEOUT,
) -> Optional[Dict]:
    """Ask the knowledge-base chat service a question and collect the streamed reply.

    Posts a streaming request to the ``kb_chat`` endpoint, reads the
    server-sent ``data:`` lines, and separates the model output into the
    answer text and any ``<think>...</think>`` reasoning blocks.

    Args:
        question: The user question, sent as ``query``.
        userId: Currently unused; kept for call-site compatibility.
        url: Endpoint to POST to.
        kb_name: Knowledge base to query.
        model: Model name passed to the service.
        timeout: ``requests`` timeout -- a number or a ``(connect, read)``
            tuple in seconds, or ``None`` to wait indefinitely.

    Returns:
        ``{"answer": str, "docs": list, "think": list[str]}`` on success, where
        ``answer`` keeps its original line breaks; ``None`` if the service
        returns a non-200 status or the request fails.
    """
    payload = {
        "query": question,
        "mode": "local_kb",
        "kb_name": kb_name,
        "top_k": 3,
        "score_threshold": 2,
        # Fixed one-turn history sent with every request.
        "history": [
            {"content": '你好', "role": "user"},
            {"content": "", "role": "assistant"}
        ],
        "stream": True,  # ask the service to stream the reply
        "model": model,
        "temperature": 0.7,
        "max_tokens": 0,
        "prompt_name": "default",
        "return_direct": False
    }
    splitter = _ThinkSplitter()
    all_docs: List = []
    try:
        with requests.post(url, json=payload, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                logger.error("Error calling service: %s - %s", response.status_code, response.text)
                return None
            for line in response.iter_lines():
                if not line:  # skip keep-alive / event-separator blank lines
                    continue
                decoded_line = line.decode("utf-8", errors="replace")
                if not decoded_line.startswith("data:"):
                    continue
                # SSE allows one optional space after "data:".
                json_str = decoded_line[len("data:"):]
                if json_str.startswith(" "):
                    json_str = json_str[1:]
                if json_str.strip() == "[DONE]":  # end-of-stream sentinel, not JSON
                    continue
                try:
                    json_obj = json.loads(json_str)
                except json.JSONDecodeError as e:
                    logger.error("无法解析的块: %s...", json_str[:50])
                    logger.error("错误信息: %s", e)
                    continue
                if not isinstance(json_obj, dict):
                    logger.error("Unexpected JSON chunk (not an object): %s...", json_str[:50])
                    continue
                # Merge the streamed delta contents of every choice.
                for choice in json_obj.get("choices") or []:
                    content = (choice.get("delta") or {}).get("content") or ""
                    if content:
                        splitter.feed(content)
                # Retrieved documents may arrive in any chunk.
                all_docs.extend(json_obj.get("docs") or [])
    except requests.RequestException as e:
        logger.error("Unexpected error while calling service: %s", e)
        return None
    splitter.finish()
    return {"answer": splitter.answer, "docs": all_docs, "think": splitter.think_blocks}
# Example: call the service and print the answer with its line breaks intact.
if __name__ == "__main__":
    question = "你好"
    userId = "用户ID"
    result = call_llm(question, userId)
    # Check for failure first: call_llm returns None on HTTP or network
    # errors, and indexing None would raise TypeError.
    if result is not None:
        print("**********")
        print(result['think'])
        print("**********")
        print("成功获取结果:")
        print("Answer:\n{}".format(result["answer"]))  # printed as-is, line breaks kept
        if result["docs"]:
            print("Docs:\n{}".format(result["docs"]))
        if result["think"]:
            print("Think Blocks Content:")
            for idx, block in enumerate(result["think"], 1):
                print(f"Block {idx}:\n{block}")
    else:
        print("未能成功获取结果")