websocketConfig.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. # -*- coding: utf-8 -*-
  2. import urllib
  3. from asgiref.sync import sync_to_async, async_to_sync
  4. from channels.db import database_sync_to_async
  5. from channels.generic.websocket import AsyncJsonWebsocketConsumer, AsyncWebsocketConsumer
  6. import json
  7. from channels.layers import get_channel_layer
  8. from jwt import InvalidSignatureError
  9. from rest_framework.request import Request
  10. from application import settings
  11. send_dict = {}
  12. # 发送消息结构体
  13. def set_message(sender, msg_type, msg, refresh_unread=False):
  14. text = {
  15. 'sender': sender,
  16. 'contentType': msg_type,
  17. 'content': msg,
  18. 'refresh_unread': refresh_unread
  19. }
  20. return text
  21. # 异步获取消息中心的目标用户
  22. @database_sync_to_async
  23. def _get_message_center_instance(message_id):
  24. from dvadmin.system.models import MessageCenter
  25. _MessageCenter = MessageCenter.objects.filter(id=message_id).values_list('target_user', flat=True)
  26. if _MessageCenter:
  27. return _MessageCenter
  28. else:
  29. return []
  30. @database_sync_to_async
  31. def _get_message_unread(user_id):
  32. """获取用户的未读消息数量"""
  33. from dvadmin.system.models import MessageCenterTargetUser
  34. count = MessageCenterTargetUser.objects.filter(users=user_id, is_read=False).count()
  35. return count or 0
  36. def request_data(scope):
  37. query_string = scope.get('query_string', b'').decode('utf-8')
  38. qs = urllib.parse.parse_qs(query_string)
  39. return qs
  40. class DvadminWebSocket(AsyncJsonWebsocketConsumer):
  41. async def connect(self):
  42. try:
  43. import jwt
  44. self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"]
  45. decoded_result = jwt.decode(self.service_uid, settings.SECRET_KEY, algorithms=["HS256"])
  46. if decoded_result:
  47. self.user_id = decoded_result.get('user_id')
  48. self.room_name = "user_" + str(self.user_id)
  49. # 收到连接时候处理,
  50. await self.channel_layer.group_add(
  51. "dvadmin",
  52. self.channel_name
  53. )
  54. await self.channel_layer.group_add(
  55. self.room_name,
  56. self.channel_name
  57. )
  58. await self.accept()
  59. # 主动推送消息
  60. unread_count = await _get_message_unread(self.user_id)
  61. if unread_count == 0:
  62. # 发送连接成功
  63. await self.send_json(set_message('system', 'SYSTEM', '连接成功'))
  64. else:
  65. await self.send_json(
  66. set_message('system', 'SYSTEM', "请查看您的未读消息~",
  67. refresh_unread=True))
  68. except InvalidSignatureError:
  69. await self.disconnect(None)
  70. async def disconnect(self, close_code):
  71. # Leave room group
  72. await self.channel_layer.group_discard(self.room_name, self.channel_name)
  73. await self.channel_layer.group_discard("dvadmin", self.channel_name)
  74. print("连接关闭")
  75. try:
  76. await self.close(close_code)
  77. except Exception:
  78. pass
  79. class MegCenter(DvadminWebSocket):
  80. """
  81. 消息中心
  82. """
  83. async def receive(self, text_data):
  84. # 接受客户端的信息,你处理的函数
  85. text_data_json = json.loads(text_data)
  86. # message_id = text_data_json.get('message_id', None)
  87. # user_list = await _get_message_center_instance(message_id)
  88. # for send_user in user_list:
  89. # await self.channel_layer.group_send(
  90. # "user_" + str(send_user),
  91. # {'type': 'push.message', 'json': text_data_json}
  92. # )
  93. async def push_message(self, event):
  94. """消息发送"""
  95. message = event['json']
  96. await self.send(text_data=json.dumps(message))
  97. def websocket_push(room_name,message):
  98. """
  99. 主动推送
  100. @param room_name: 群组名称
  101. @param message: 消息内容
  102. """
  103. channel_layer = get_channel_layer()
  104. async_to_sync(channel_layer.group_send)(
  105. room_name,
  106. {
  107. "type": "push.message",
  108. "json": message
  109. }
  110. )