当前位置: 首页 > news >正文

网站和数据库淘宝培训

网站和数据库,淘宝培训,趣天跨境电商官网,合肥做网站yuanmus本文主要从提供SSE方式接入DeepSeek,并通过fastapi websocket对外提供接入方法。 参考文档: 腾讯云大模型:https://cloud.tencent.com/document/product/1759/109380 fastAPI官网:https://fastapi.tiangolo.com/ WebSocketManager…

本文主要从提供SSE方式接入DeepSeek,并通过fastapi websocket对外提供接入方法。
参考文档:
腾讯云大模型:https://cloud.tencent.com/document/product/1759/109380
fastAPI官网:https://fastapi.tiangolo.com/

WebSocketManager

提供WebsocketManager对websocket连接实例进行管理,代码片段如下:

from fastapi import WebSocketclass WsManager:"""websocket manager"""connectors: dict[str, WebSocket] = {}@staticmethoddef get_connector(connector_id: str) -> WebSocket | None:"""获取连接实例:param connector_id::return:"""connector = WsManager.connectors.get(connector_id)return connector@staticmethoddef add_connector(connector_id: str, connector: WebSocket):"""添加websocket客户端:param connector_id: 客户端ID:param connector: 客户端连接实例:return:"""WsManager.connectors[connector_id] = connector@staticmethoddef remove_connector(connector_id: str):"""移除websocket连接:param connector_id: 连接ID:return:"""try:del WsManager.connectors[connector_id]except Exception:pass@staticmethodasync def send_message(connector_id: str, message: str):connector = WsManager.connectors.get(connector_id)if connector is None:returntry:await connector.send_text(message)except Exception as e:print('消息发送失败')@staticmethodasync def send_message_json(connector_id: str, message: dict):connector = WsManager.connectors.get(connector_id)if connector is None:returntry:await connector.send_json(message)except Exception as e:print('消息发送失败')

接入DeepSeek

import uuid
import os
import requests
import jsonfrom src.core.WsManager import WsManager
from .session import get_sessionTCLOUD_URL = 'lke.tencentcloudapi.com'
TCLOUD_DeepSeek_SSE_URL = "https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse"SECRET_ID = "XXXXXXXXXXXXXXXXXXXX"
SECRET_KEY = "XXXXXXXXXXXXXXXXXXXXX"
REGION = "ap-guangzhou"
TYPE = 5def get_session():"""生成一个 UUID:return session id 字符串"""new_uuid = uuid.uuid4()# 将 UUID 转换为字符串session_id = str(new_uuid)return session_iddef _resolve_message(message: str, prev_message: str):"""处理消息:param message::param prev_message::return:"""if prev_message == '':return Noneif prev_message.startswith("event:"):# 生成消息message = message.replace("data:", '').strip()try:message_json = json.loads(message)message_type = message_json.get('type')payload = message_json.get('payload')if message_type == 'token_stat':# token统计事件return Noneelif message_type == 'thought':# 思考事件return Noneelif message_type == 'error':# 错误事件return Noneelif message_type == 'reference':# 参考来源事件return Nonecontent = payload.get('content')record_id = payload.get('record_id')request_id = payload.get('request_id')session_id = payload.get('session_id')trace_id = payload.get('trace_id')message_id = payload.get('message_id')return {'type': message_type,'data': {'content': content,'record_id': record_id,'request_id': request_id,'session_id': session_id,'trace_id': trace_id,'message_id': message_id,}}except Exception as e:print(e)else:return Noneasync def get_q_a_result(visitor_id: str, question: str):"""获取问答结果:param visitor_id: 访客ID:param question: 问题内容:return: 回答内容"""session_id = get_session()req_data = {"content": f"{question}","bot_app_key": "XXXXXXXX",  # 可以获取方式见下图"visitor_biz_id": visitor_id,"session_id": session_id,"streaming_throttle": 100}res = requests.post(TCLOUD_DeepSeek_SSE_URL, data=json.dumps(req_data),stream=True, headers={"Accept": "text/event-stream"})prev_message: str = ''  # 上一条消息if res.status_code == 200:for line in res.iter_lines():if line:data = line.decode('utf-8')message = _resolve_message(data, prev_message)if message:await WsManager.send_message_json(visitor_id, message)prev_message = dataelse:print('Failed to get data. Status code: {response.status_code}')# 关闭websocket连接connector = WsManager.get_connector(visitor_id)if connector:await connector.close()WsManager.remove_connector(visitor_id)
  1. 进入控制台
    在这里插入图片描述2. 创建应用,并点击调用
    在这里插入图片描述

通过fastapi 以websocket方式对外提供接口

这个接口是,前台连接websocket后,后台直接根据连接参数,开始调用问答,问答结束后自动关闭websocket连接

import asynciofrom fastapi import APIRouter, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.responses import JSONResponsefrom src.core.WsManager import WsManager
from src.utils.tcloudLLM import get_example_recommend
from src.utils.session import get_visitor_idrouter = APIRouter(prefix="/api/tcloud", tags=['腾信云大模型接口'])@router.websocket("/ws/{compound_name}")
async def get_exercises_ws(compound_name: str, websocket: WebSocket, background_tasks: BackgroundTasks):await websocket.accept()visitor_id = get_visitor_id()WsManager.add_connector(visitor_id, websocket)# 本来使用BackgroundTask去做后台任务,结果不知道什么原因,调用不起来,然后换成asyncio处理asyncio.create_task(get_example_recommend(visitor_id, compound_name))  # 创建对应问答任务try:while True:data = await websocket.receive_text()await websocket.send_text(f"Message text was: {data}")except WebSocketDisconnect:WsManager.remove_connector(visitor_id)except Exception:WsManager.remove_connector(visitor_id)

如果需要进行多轮问答,提供为多次问答设置相关的 request_id 参数进行消息串联。

http://www.jinmujx.cn/news/117845.html

相关文章:

  • 做网站必须知道的问题全网营销图片
  • 做美食视频网站广告投放代理商加盟
  • 最新新闻热点事件简短河北seo
  • 哪里有手机网站建设windows优化大师官方免费
  • 移动网站开发书籍seo推广多少钱
  • 黑龙江 网站开发淘宝网页版
  • 自媒体135网站新闻软文广告
  • 医院各科室可以独立做网站吗重庆seo整站优化报价
  • php源码搭建网站流程上海网站排名seo公司
  • 折扣网站搭建网络推广平台收费不便宜
  • 虹口专业做网站优化设计六年级上册数学答案
  • 做外贸是不是要有网站网站运营
  • 把网站提交谷歌百度手机关键词排名工具
  • 扬中新闻网站学校招生网络营销方案
  • 公司网页制作免费石家庄seo代理商
  • 可以做动漫的网站有哪些南宁网站建设网站推广
  • ppt做的最好的网站有哪些谷歌推广教程
  • 网站排行查询seo研究
  • 用Axure做的网站原型百度云中国制造网
  • 漳浦县建设局网站sem代运营公司
  • 使用网站模板快速建站吴忠seo
  • 广州网站建设gzqiyi北大青鸟培训机构官网
  • 门户网站建设自查整改今日国际新闻事件
  • 怎么做网站首页关键词互联网媒体广告公司
  • AWS免费套餐做网站可以吗济南seo优化外包
  • 做微网站的公司怎么搭建自己的网站
  • 嵊州门户网站平台推广精准客源
  • 网站建设 上海珍岛石家庄网站seo外包
  • 做灯箱的网站seo教程自学网
  • 网站开发技术路线网络营销方案策划论文