123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import requests
- import logging
- import json
- import requests
- from fastapi import FastAPI, HTTPException, Request
- from pydantic import BaseModel
- import aiohttp
- import logging
- from fastapi.responses import JSONResponse
- import json
- from typing import Optional, List, Dict
- import asyncio
- # 配置日志
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- def call_llm(question: str, userId: str) -> Optional[Dict]:
- url = "http://192.168.100.100:7861/chat/kb_chat"
- payload = {
- "query": question,
- "mode": "local_kb",
- "kb_name": "lqbz",
- "top_k": 3,
- "score_threshold": 2,
- "history": [
- {"content": '你好', "role": "user"},
- {"content": "", "role": "assistant"}
- ],
- "stream": True, # 设置为True以启用流式传输
- "model": "qwen2.5:1.5b",
- "temperature": 0.7,
- "max_tokens": 0,
- "prompt_name": "default",
- "return_direct": False
- }
- combined_text = ""
- all_docs = []
- think_blocks = [] # 存储所有的think块内容
- current_think_block = "" # 当前的think块内容
- in_think_block = False
- try:
- with requests.post(url, json=payload, stream=True) as response:
- if response.status_code == 200:
- for line in response.iter_lines():
- if line: # 过滤掉keep-alive新行
- decoded_line = line.decode('utf-8')
- if decoded_line.startswith('data: '):
- json_str = decoded_line[len('data: '):]
- try:
- json_obj = json.loads(json_str)
- # 提取choices中的内容并合并
- for choice in json_obj.get("choices", []):
- content = choice.get("delta", {}).get("content", "")
- if content:
- while "<think>" in content or "</think>" in content:
- start_index = content.find("<think>")
- end_index = content.find("</think>")
- if start_index != -1 and end_index != -1:
- # 如果找到了完整的<think>...</think>块
- if not in_think_block:
- in_think_block = True
- think_part = content[start_index + len("<think>"):end_index]
- current_think_block += think_part
- think_blocks.append(current_think_block.strip())
- current_think_block = ""
- content = content[end_index + len("</think>"):]
- in_think_block = False
- elif start_index != -1:
- # 只有<think>开始标签
- if not in_think_block:
- in_think_block = True
- content = content[start_index + len("<think>"):]
- elif end_index != -1:
- # 只有</think>结束标签
- current_think_block += content[:end_index]
- think_blocks.append(current_think_block.strip())
- current_think_block = ""
- content = content[end_index + len("</think>"):]
- in_think_block = False
- break # 结束当前循环,继续处理剩余内容
- else:
- break # 没有找到任何标签,跳出循环
- if in_think_block:
- current_think_block += content
- else:
- combined_text += content
- # 如果在循环结束时仍有未完成的think块,则添加
- if current_think_block and not in_think_block:
- think_blocks.append(current_think_block.strip())
- # 收集docs中的内容
- docs = json_obj.get("docs", [])
- all_docs.extend(docs)
- except json.JSONDecodeError as e:
- logger.error(f"无法解析的块: {json_str[:50]}...")
- logger.error(f"错误信息: {e}")
- # 返回结果,包括保留换行符的answer和think块列表
- return {"answer": combined_text, "docs": all_docs, "think": think_blocks}
- else:
- logger.error(f"Error calling service: {response.status_code} - {response.text}")
- return None
- except requests.RequestException as e:
- logger.error(f"Unexpected error while calling service: {e}")
- return None
- # 示例:如何调用这个函数,并显示保留换行的答案
- if __name__ == "__main__":
- question = "你好"
- userId = "用户ID"
- result = call_llm(question, userId)
- print("**********")
- print(result['think'])
- print("**********")
- if result is not None:
- print("成功获取结果:")
- print("Answer:\n{}".format(result["answer"])) # 直接打印,保留换行符
- if result["docs"]:
- print("Docs:\n{}".format(result["docs"]))
- if result["think"]:
- print("Think Blocks Content:")
- for idx, block in enumerate(result["think"], 1):
- print(f"Block {idx}:\n{block}")
- else:
- print("未能成功获取结果")
|