lty/qy_lty/docs/features/websocket.md
2026-03-17 13:17:02 +08:00


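# WebSocket Real-Time Communication Implementation Guide
This document describes the architecture, usage, and best practices for WebSocket real-time communication in the QY LTY Backend system.
## 1. WebSocket Architecture Overview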
```
WebSocket Architecture:
┌─────────────────────────────────────────────┐
│                Client Layer                 │
│  ┌─────────┐  ┌─────────┐  ┌─────────────┐  │
│  │ Web App │  │ Mobile  │  │   Device    │  │
│  │         │  │   App   │  │     SDK     │  │
│  └─────────┘  └─────────┘  └─────────────┘  │
└─────────────┬───────────────────────────────┘
              │ WebSocket Connection
              │ ws://domain/ws/device/
┌─────────────┴───────────────────────────────┐
│              Connection Layer               │
│ ┌─────────────────────────────────────────┐ │
│ │             Django Channels             │ │
│ │  ┌─────────────┐  ┌─────────────────┐   │ │
│ │  │    ASGI     │  │   URL Routing   │   │ │
│ │  │   Server    │  │                 │   │ │
│ │  │  (Daphne)   │  │    WebSocket    │   │ │
│ │  └─────────────┘  └─────────────────┘   │ │
│ └─────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────┘
┌─────────────┴───────────────────────────────┐
│               Consumer Layer                │
│ ┌─────────────────────────────────────────┐ │
│ │             DeviceConsumer              │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │   Connect   │ │     Disconnect      │ │ │
│ │ │   Handler   │ │       Handler       │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │   Receive   │ │        Send         │ │ │
│ │ │   Handler   │ │       Handler       │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────┘
┌─────────────┴───────────────────────────────┐
│                Channel Layer                │
│ ┌─────────────────────────────────────────┐ │
│ │                  Redis                  │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │   Groups    │ │      Messages       │ │ │
│ │ │ Management  │ │        Queue        │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
```
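The Channel Layer at the bottom of the diagram is backed by Redis, but this guide does not show how it is wired up. A minimal sketch of the settings entry follows, assuming the `channels_redis` package is installed; the host and port are placeholders, not values taken from this project.

```python
# settings.py (sketch): host and port below are assumed placeholders
CHANNEL_LAYERS = {
    "default": {
        # Redis-backed channel layer provided by the channels_redis package
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}
```

Groups and messages are then stored in that Redis instance, so every worker process that shares the setting can exchange messages through the same groups.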
## 2. Core Components
### 2.1 ASGI Configuration
```python
# qy_lty/asgi.py
import os

from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'qy_lty.settings')

# Initialize Django before importing modules that load ORM models
# (routing imports the consumers, and auth imports the Token model).
django_asgi_app = get_asgi_application()

from device_interaction.auth import TokenAuthMiddleware  # noqa: E402
from device_interaction.routing import websocket_urlpatterns  # noqa: E402

application = ProtocolTypeRouter({
    # Handles HTTP requests
    "http": django_asgi_app,
    # Handles WebSocket connections
    "websocket": TokenAuthMiddleware(
        URLRouter(
            websocket_urlpatterns
        )
    ),
})
```
### 2.2 WebSocket Routing Configuration
```python
# device_interaction/routing.py
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    # Basic device connection
    re_path(r'^ws/device/$', consumers.DeviceConsumer.as_asgi()),
    # Token-based device connection
    re_path(r'^ws/device/token/(?P<token>[^/]+)/?$',
            consumers.DeviceConsumer.as_asgi()),
    # Connection by user ID (deprecated, kept for compatibility)
    re_path(r'^ws/device/(?P<user_id>\d+)/?$',
            consumers.DeviceConsumer.as_asgi()),
]
```
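To see which of the three routes a given path would hit, the same regular expressions can be exercised with the standard `re` module. This is only a sketch: the labels and the `resolve` helper are hypothetical, and stripping the leading slash before matching is an assumption about how the router normalizes paths.

```python
import re

# Same regular expressions as websocket_urlpatterns, in the same order.
# The labels are illustrative only.
PATTERNS = [
    ("device", re.compile(r'^ws/device/$')),
    ("device_token", re.compile(r'^ws/device/token/(?P<token>[^/]+)/?$')),
    ("device_user_id", re.compile(r'^ws/device/(?P<user_id>\d+)/?$')),
]


def resolve(path: str):
    """Return (label, kwargs) for the first pattern matching the path, or None."""
    candidate = path.lstrip('/')  # assumption: matching ignores the leading slash
    for label, pattern in PATTERNS:
        match = pattern.match(candidate)
        if match:
            return label, match.groupdict()
    return None


print(resolve('/ws/device/'))
print(resolve('/ws/device/token/abc123'))
print(resolve('/ws/device/42/'))
print(resolve('/ws/device/abc/'))
```

Because the user ID route only accepts digits, a path such as `/ws/device/abc/` matches none of the patterns.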
### 2.3 Authentication Middleware
```python
# device_interaction/auth.py
import logging
import re
from urllib.parse import parse_qs

from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token

logger = logging.getLogger(__name__)

# Mirrors the token route in routing.py
TOKEN_PATH_RE = re.compile(r'^/?ws/device/token/(?P<token>[^/]+)/?$')


class TokenAuthMiddleware(BaseMiddleware):
    """Token authentication middleware for WebSocket connections."""

    async def __call__(self, scope, receive, send):
        # Read the token from the query string
        query_params = parse_qs(scope.get('query_string', b'').decode())
        token = query_params.get('token', [None])[0]

        # Otherwise read it from the URL path. scope['url_route'] is only
        # filled in by URLRouter, which runs after this middleware, so the
        # path is parsed directly here.
        if not token:
            match = TOKEN_PATH_RE.match(scope.get('path', ''))
            if match:
                token = match.group('token')

        # Validate the token and attach the user to the scope
        if token:
            scope['user'] = await self.get_user_from_token(token)
        else:
            scope['user'] = AnonymousUser()

        return await super().__call__(scope, receive, send)

    @database_sync_to_async
    def get_user_from_token(self, token_key):
        """Look up the user that owns the token."""
        try:
            token = Token.objects.select_related('user').get(key=token_key)
            return token.user
        except Token.DoesNotExist:
            # Log only a prefix so credentials do not end up in the logs
            logger.warning(f"Invalid token: {token_key[:6]}...")
            return AnonymousUser()
```
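The query-string lookup used by the middleware can be tried in isolation with the standard library. The helper name `token_from_query_string` is hypothetical; the parsing itself is the same `parse_qs` call shown above.

```python
from urllib.parse import parse_qs


def token_from_query_string(query_string: bytes):
    """Return the first `token` query parameter, or None when it is absent."""
    params = parse_qs(query_string.decode())
    return params.get('token', [None])[0]


print(token_from_query_string(b'token=abc123'))
print(token_from_query_string(b'other=1'))
```

Note that `parse_qs` drops parameters with blank values by default, so `?token=` is treated the same as a missing token.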
## 3. Consumer Implementation
### 3.1 Device Consumer
```python
# device_interaction/consumers.py
import json
import logging
from typing import Any, Dict

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.utils import timezone

from .models import UserDevice
from .services import WeatherService

logger = logging.getLogger(__name__)


class DeviceConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for device connections."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.user = None
        self.device = None
        self.user_group_name = None
        self.device_group_name = None

    async def connect(self):
        """Handle a new WebSocket connection."""
        try:
            # Resolve the user set by the authentication middleware
            self.user = self.scope.get('user')
            if self.user is None or not self.user.is_authenticated:
                logger.warning("Anonymous user attempted to connect")
                await self.close(code=4001)
                return

            # Look up the user's primary device
            self.device = await self.get_user_primary_device(self.user.id)
            if not self.device:
                # Allow the connection but log a warning
                logger.warning(f"User {self.user.id} has no bound devices")
                # await self.close(code=4002)
                # return

            # Build the group names
            self.user_group_name = f"user_{self.user.id}"
            if self.device:
                self.device_group_name = f"device_{self.device.id}"

            # Join the user group
            await self.channel_layer.group_add(
                self.user_group_name,
                self.channel_name
            )

            # Join the device group
            if self.device_group_name:
                await self.channel_layer.group_add(
                    self.device_group_name,
                    self.channel_name
                )

            # Accept the connection
            await self.accept()

            # Confirm the connection to the client
            await self.send_message({
                'type': 'connection_status',
                'status': 'connected',
                'user_id': str(self.user.id),
                'device_id': self.device.id if self.device else None,
                'timestamp': self.get_timestamp()
            })
            logger.info(f"User {self.user.id} connected to WebSocket")
        except Exception as e:
            logger.error(f"WebSocket connection error: {str(e)}")
            await self.close(code=4000)

    async def disconnect(self, close_code):
        """Handle a WebSocket disconnect."""
        try:
            # Leave the user group
            if self.user_group_name:
                await self.channel_layer.group_discard(
                    self.user_group_name,
                    self.channel_name
                )

            # Leave the device group
            if self.device_group_name:
                await self.channel_layer.group_discard(
                    self.device_group_name,
                    self.channel_name
                )

            if self.user_group_name:
                logger.info(f"User {self.user.id} disconnected from WebSocket (code: {close_code})")
        except Exception as e:
            logger.error(f"WebSocket disconnect error: {str(e)}")

    async def receive(self, text_data=None, bytes_data=None):
        """Handle a message received from the client."""
        if text_data is None:
            # Only JSON text frames are part of this protocol
            await self.send_error("Binary frames are not supported")
            return
        try:
            # Parse the message
            data = json.loads(text_data)
            message_type = data.get('type')
            message_content = data.get('message')
            logger.info(f"Received message from user {self.user.id}: {message_type}")

            # Dispatch on the message type
            if message_type == 'chat_message':
                await self.handle_chat_message(message_content)
            elif message_type == 'weather':
                await self.handle_weather_request(message_content)
            elif message_type == 'sing':
                await self.handle_sing_request(message_content)
            elif message_type == 'dance':
                await self.handle_dance_request(message_content)
            elif message_type == 'ping':
                await self.handle_ping()
            else:
                await self.send_error(f"Unsupported message type: {message_type}")
        except json.JSONDecodeError:
            await self.send_error("Invalid JSON format")
        except Exception as e:
            logger.error(f"Message handling error: {str(e)}")
            await self.send_error("Internal server error")

    async def handle_chat_message(self, message_content):
        """Handle a chat message."""
        try:
            # An AI chat service can be integrated here
            response = await self.process_chat_message(message_content)
            await self.send_message({
                'type': 'chat_response',
                'message': response,
                'timestamp': self.get_timestamp()
            })
        except Exception as e:
            await self.send_error(f"Chat processing error: {str(e)}")

    async def handle_weather_request(self, message_content):
        """Handle a weather query."""
        try:
            city_name = (message_content or {}).get('city', '北京')

            # Call the weather service
            weather_service = WeatherService()
            weather_data = await weather_service.get_weather(city_name)
            if weather_data['success']:
                await self.send_message({
                    'type': 'weather_response',
                    'data': weather_data['data'],
                    'timestamp': self.get_timestamp()
                })
            else:
                await self.send_error(f"Weather query failed: {weather_data['error']}")
        except Exception as e:
            await self.send_error(f"Weather service error: {str(e)}")

    async def handle_sing_request(self, message_content):
        """Handle a sing request."""
        try:
            content = message_content or {}
            song_info = {
                'song': content.get('song', ''),
                'artist': content.get('artist', ''),
                'action': 'start_singing'
            }
            await self.send_message({
                'type': 'sing_response',
                'data': song_info,
                'message': f"Now playing: {song_info['song']} - {song_info['artist']}",
                'timestamp': self.get_timestamp()
            })
        except Exception as e:
            await self.send_error(f"Sing request error: {str(e)}")

    async def handle_dance_request(self, message_content):
        """Handle a dance request."""
        try:
            content = message_content or {}
            dance_info = {
                'dance': content.get('dance', ''),
                'style': content.get('style', ''),
                'duration': content.get('duration', '30s'),
                'action': 'start_dancing'
            }
            await self.send_message({
                'type': 'dance_response',
                'data': dance_info,
                'message': f"Now dancing: {dance_info['dance']} ({dance_info['style']})",
                'timestamp': self.get_timestamp()
            })
        except Exception as e:
            await self.send_error(f"Dance request error: {str(e)}")

    async def handle_ping(self):
        """Answer a heartbeat ping."""
        await self.send_message({
            'type': 'pong',
            'timestamp': self.get_timestamp()
        })

    # Group message handler (invoked for events with type 'group_message')
    async def group_message(self, event):
        """Forward a group message to the client."""
        message = event['message']
        await self.send(text_data=json.dumps(message))

    async def send_message(self, message: Dict[str, Any]):
        """Send a message to the client."""
        await self.send(text_data=json.dumps(message))

    async def send_error(self, error_message: str):
        """Send an error message to the client."""
        await self.send_message({
            'type': 'error',
            'message': error_message,
            'timestamp': self.get_timestamp()
        })

    @database_sync_to_async
    def get_user_primary_device(self, user_id: int):
        """Return the user's primary device, or None."""
        try:
            user_device = UserDevice.objects.select_related('device').filter(
                user_id=user_id,
                is_primary=True
            ).first()
            return user_device.device if user_device else None
        except Exception:
            return None

    async def process_chat_message(self, message_content: str) -> str:
        """Process a chat message (an AI service can be integrated here)."""
        # An AI dialogue service can be called here.
        # For now, return a simple acknowledgement.
        return f"Message received: {message_content}"

    @staticmethod
    def get_timestamp() -> str:
        """Return the current time as an ISO 8601 string."""
        return timezone.now().isoformat()
```
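The `if`/`elif` chain in `receive` can also be written as a lookup table, which keeps the dispatch logic in one place as message types are added. The sketch below is self-contained and does not use Channels: `Dispatcher` is a hypothetical stand-in for the consumer, and its handlers are stubs that append to a list instead of writing to a socket.

```python
import asyncio
import json


class Dispatcher:
    """Stand-in for the consumer; handler names mirror DeviceConsumer, bodies are stubs."""

    def __init__(self):
        self.sent = []  # collects outgoing messages instead of writing to a socket
        self.handlers = {
            'chat_message': self.handle_chat_message,
            'ping': self.handle_ping,
        }

    async def handle_chat_message(self, content):
        self.sent.append({'type': 'chat_response', 'message': f"Message received: {content}"})

    async def handle_ping(self, content):
        self.sent.append({'type': 'pong'})

    async def send_error(self, text):
        self.sent.append({'type': 'error', 'message': text})

    async def receive(self, text_data: str):
        try:
            data = json.loads(text_data)
        except json.JSONDecodeError:
            await self.send_error("Invalid JSON format")
            return
        if not isinstance(data, dict):
            await self.send_error("Message must be a JSON object")
            return
        handler = self.handlers.get(data.get('type'))
        if handler is None:
            await self.send_error(f"Unsupported message type: {data.get('type')}")
            return
        await handler(data.get('message'))


async def demo():
    dispatcher = Dispatcher()
    await dispatcher.receive('{"type": "ping"}')
    await dispatcher.receive('{"type": "fly"}')
    await dispatcher.receive('not json')
    return dispatcher.sent


results = asyncio.run(demo())
print(results)
```

Adding a new message type then only requires a new handler method and one entry in `handlers`.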
## 4. Message Services
### 4.1 Message Sending Service
```python
# device_interaction/services.py
import logging
from typing import Any, Dict, List

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

logger = logging.getLogger(__name__)


class MessageService:
    """Sends messages to connected clients through the channel layer."""

    def __init__(self):
        self.channel_layer = get_channel_layer()

    def send_to_user(self, user_id: int, message: Dict[str, Any]) -> bool:
        """Send a message to a specific user."""
        try:
            group_name = f"user_{user_id}"
            async_to_sync(self.channel_layer.group_send)(
                group_name,
                {
                    # Dispatched to DeviceConsumer.group_message
                    'type': 'group_message',
                    'message': message
                }
            )
            logger.info(f"Message sent to user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to send message to user {user_id}: {str(e)}")
            return False

    def send_to_device(self, device_id: int, message: Dict[str, Any]) -> bool:
        """Send a message to a specific device."""
        try:
            group_name = f"device_{device_id}"
            async_to_sync(self.channel_layer.group_send)(
                group_name,
                {
                    'type': 'group_message',
                    'message': message
                }
            )
            logger.info(f"Message sent to device {device_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to send message to device {device_id}: {str(e)}")
            return False

    def broadcast_to_all_users(self, message: Dict[str, Any]) -> int:
        """Broadcast a message to all online users; returns the number reached."""
        try:
            # Requires an online-user list to be maintained (see get_online_users)
            online_users = self.get_online_users()
            success_count = 0
            for user_id in online_users:
                if self.send_to_user(user_id, message):
                    success_count += 1
            logger.info(f"Broadcast message sent to {success_count} users")
            return success_count
        except Exception as e:
            logger.error(f"Broadcast failed: {str(e)}")
            return 0

    def get_online_users(self) -> List[int]:
        """Return the online users (online-user tracking still has to be implemented)."""
        # The list can be maintained in Redis or in the database.
        # Simplified placeholder: return an empty list.
        return []


class WeatherService:
    """Weather service."""

    async def get_weather(self, city: str) -> Dict[str, Any]:
        """Fetch weather information for a city."""
        try:
            # Integrate a real weather API here.
            # The example returns mock data.
            weather_data = {
                'city': {'name': city},
                'weather': {
                    'now': {
                        'temp': '25',
                        'text': 'Sunny',
                        'icon': '100'
                    },
                    'updateTime': '2023-01-01T12:00:00Z'
                }
            }
            return {
                'success': True,
                'data': weather_data
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
```
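`get_online_users` is left as a stub above, so `broadcast_to_all_users` currently reaches nobody. One possible shape for the missing tracking is a per-user connection count, sketched here in memory. `PresenceRegistry` is a hypothetical name, and an in-process structure like this only works for a single worker; a multi-process deployment would need shared storage such as Redis.

```python
from collections import Counter
from typing import List


class PresenceRegistry:
    """Counts open connections per user so a user with several tabs stays online."""

    def __init__(self):
        self._connections = Counter()

    def connected(self, user_id: int) -> None:
        """Record one more open connection for the user."""
        self._connections[user_id] += 1

    def disconnected(self, user_id: int) -> None:
        """Record a closed connection; the user goes offline when the count hits zero."""
        if self._connections[user_id] > 1:
            self._connections[user_id] -= 1
        else:
            self._connections.pop(user_id, None)

    def online_users(self) -> List[int]:
        """Return the IDs of users with at least one open connection."""
        return sorted(self._connections)


registry = PresenceRegistry()
registry.connected(1)
registry.connected(1)   # second tab for the same user
registry.connected(2)
registry.disconnected(1)
print(registry.online_users())
```

Calling `connected` from the consumer's `connect` and `disconnected` from its `disconnect` would keep the registry in step with the sockets.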
### 4.2 Message Queue Handling
```python
# device_interaction/message_queue.py
import json
import logging
from typing import Any, Dict, Optional

import redis
from django.conf import settings
from django.utils import timezone

logger = logging.getLogger(__name__)


class MessageQueue:
    """Redis-backed message queue."""

    def __init__(self):
        self.redis_client = redis.Redis.from_url(settings.REDIS_LOCATION)
        self.queue_name = 'device_messages'

    def enqueue_message(self, user_id: int, message: Dict[str, Any]) -> bool:
        """Add a message to the queue."""
        try:
            message_data = {
                'user_id': user_id,
                'message': message,
                'timestamp': self._get_timestamp()
            }
            self.redis_client.lpush(
                self.queue_name,
                json.dumps(message_data)
            )
            logger.info(f"Message enqueued for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to enqueue message: {str(e)}")
            return False

    def dequeue_message(self) -> Optional[Dict[str, Any]]:
        """Take the oldest message off the queue; None if nothing arrives within 1 second."""
        try:
            # brpop returns a (queue_name, payload) pair, or None on timeout
            message_json = self.redis_client.brpop(self.queue_name, timeout=1)
            if message_json:
                return json.loads(message_json[1])
            return None
        except Exception as e:
            logger.error(f"Failed to dequeue message: {str(e)}")
            return None

    def get_queue_size(self) -> int:
        """Return the number of queued messages."""
        return self.redis_client.llen(self.queue_name)

    def clear_queue(self) -> bool:
        """Remove all queued messages."""
        try:
            self.redis_client.delete(self.queue_name)
            return True
        except Exception as e:
            logger.error(f"Failed to clear queue: {str(e)}")
            return False

    @staticmethod
    def _get_timestamp() -> str:
        return timezone.now().isoformat()
```
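The queue pushes on the left (`lpush`) and pops on the right (`brpop`), which gives first-in, first-out ordering. The sketch below imitates those two operations with a `deque` so the ordering can be checked without a Redis server; the `lpush` and `rpop` functions here are local stand-ins, not Redis client calls.

```python
from collections import deque

queue = deque()


def lpush(item):
    """Mimics Redis LPUSH: insert at the head (left) of the list."""
    queue.appendleft(item)


def rpop():
    """Mimics Redis RPOP: remove from the tail (right); None when empty."""
    return queue.pop() if queue else None


for n in (1, 2, 3):
    lpush(f"message-{n}")

# Messages come out in the order they were pushed; the fourth pop finds nothing.
order = [rpop(), rpop(), rpop(), rpop()]
print(order)
```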
## 5. HTTP API Integration
### 5.1 Message Sending API
```python
# device_interaction/views.py
from django.utils import timezone
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet

from .services import MessageService


class MessageViewSet(ViewSet):
    """Message management API."""
    permission_classes = [IsAuthenticated]

    def create(self, request):
        """Send a message."""
        try:
            message_type = request.data.get('type')
            message_content = request.data.get('message')
            user_id = request.data.get('user_id')
            if not all([message_type, message_content, user_id]):
                return Response({
                    'status': 'error',
                    'message': 'Missing required parameters'
                }, status=400)

            # A non-numeric user_id is a client error, not a server error
            try:
                target_user_id = int(user_id)
            except (TypeError, ValueError):
                return Response({
                    'status': 'error',
                    'message': 'user_id must be an integer'
                }, status=400)

            # Build the message
            message = {
                'type': message_type,
                'message': message_content,
                'timestamp': timezone.now().isoformat()
            }

            # Send the message
            message_service = MessageService()
            success = message_service.send_to_user(target_user_id, message)
            if success:
                return Response({
                    'status': 'success',
                    'message': 'Message sent',
                    'data': message
                })
            else:
                return Response({
                    'status': 'error',
                    'message': 'Failed to send message'
                }, status=500)
        except Exception as e:
            return Response({
                'status': 'error',
                'message': str(e)
            }, status=500)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def send_message_to_user(request, user_id):
    """Send a message to a specific user."""
    try:
        message_data = {
            'type': request.data.get('type', 'system'),
            'message': request.data.get('message'),
            'sender': request.user.username,
            'timestamp': timezone.now().isoformat()
        }
        message_service = MessageService()
        success = message_service.send_to_user(int(user_id), message_data)
        return Response({
            'status': 'success' if success else 'error',
            'message': 'Message sent' if success else 'Failed to send message'
        }, status=200 if success else 500)
    except Exception as e:
        return Response({
            'status': 'error',
            'message': str(e)
        }, status=500)
```
## 6. Client Usage Examples
### 6.1 JavaScript Client
```javascript
// WebSocket client example
class DeviceWebSocketClient {
    constructor(token, baseUrl = 'ws://localhost:8000') {
        this.token = token;
        this.baseUrl = baseUrl;
        this.ws = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.heartbeatInterval = null;
        this.messageHandlers = new Map();
    }

    connect() {
        try {
            // Encode the token so special characters survive the query string
            const wsUrl = `${this.baseUrl}/ws/device/?token=${encodeURIComponent(this.token)}`;
            this.ws = new WebSocket(wsUrl);

            this.ws.onopen = (event) => {
                console.log('WebSocket connection established');
                this.reconnectAttempts = 0;
                this.startHeartbeat();
                this.onConnect?.(event);
            };

            this.ws.onmessage = (event) => {
                try {
                    const data = JSON.parse(event.data);
                    this.handleMessage(data);
                } catch (error) {
                    console.error('Failed to parse message:', error);
                }
            };

            this.ws.onclose = (event) => {
                console.log('WebSocket connection closed:', event.code, event.reason);
                this.stopHeartbeat();
                this.onDisconnect?.(event);
                // Reconnect unless the close was a normal one (code 1000)
                if (event.code !== 1000 && this.reconnectAttempts < this.maxReconnectAttempts) {
                    this.reconnect();
                }
            };

            this.ws.onerror = (error) => {
                console.error('WebSocket error:', error);
                this.onError?.(error);
            };
        } catch (error) {
            console.error('WebSocket connection failed:', error);
        }
    }

    disconnect() {
        if (this.ws) {
            this.ws.close(1000, 'Client closed the connection');
        }
    }

    reconnect() {
        this.reconnectAttempts++;
        console.log(`Reconnecting (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
        setTimeout(() => {
            this.connect();
        }, Math.pow(2, this.reconnectAttempts) * 1000); // exponential backoff
    }

    sendMessage(type, message) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            const data = {
                type: type,
                message: message,
                timestamp: new Date().toISOString()
            };
            this.ws.send(JSON.stringify(data));
            return true;
        } else {
            console.error('WebSocket is not connected');
            return false;
        }
    }

    // Message handling
    handleMessage(data) {
        const handler = this.messageHandlers.get(data.type);
        if (handler) {
            handler(data);
        } else {
            console.log('Message received:', data);
        }
    }

    // Register a handler for a message type
    onMessage(type, handler) {
        this.messageHandlers.set(type, handler);
    }

    // Heartbeat
    startHeartbeat() {
        this.heartbeatInterval = setInterval(() => {
            this.sendMessage('ping', {});
        }, 30000); // ping every 30 seconds
    }

    stopHeartbeat() {
        if (this.heartbeatInterval) {
            clearInterval(this.heartbeatInterval);
            this.heartbeatInterval = null;
        }
    }

    // Convenience methods
    sendChatMessage(message) {
        return this.sendMessage('chat_message', message);
    }

    requestWeather(city) {
        return this.sendMessage('weather', { city: city });
    }

    requestSing(song, artist) {
        return this.sendMessage('sing', { song: song, artist: artist });
    }

    requestDance(dance, style, duration = '30s') {
        return this.sendMessage('dance', {
            dance: dance,
            style: style,
            duration: duration
        });
    }
}

// Usage example
const client = new DeviceWebSocketClient('your-auth-token');

// Set up event handlers
client.onConnect = () => {
    console.log('Connected to the device service');
};
client.onDisconnect = (event) => {
    console.log('Disconnected from the device service');
};

// Set up message handlers
client.onMessage('chat_response', (data) => {
    console.log('AI reply:', data.message);
});
client.onMessage('weather_response', (data) => {
    console.log('Weather:', data.data);
});
client.onMessage('connection_status', (data) => {
    console.log('Connection status:', data.status);
});

// Connect
client.connect();

// Send a few example messages
setTimeout(() => {
    client.sendChatMessage('Hello, please introduce yourself');
    client.requestWeather('北京');
    client.requestSing('青花瓷', '周杰伦');
}, 1000);
```
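The reconnect logic in the JavaScript client waits `2 ** attempt * 1000` milliseconds before each retry and gives up after five attempts. The resulting schedule is computed below in Python; the function name is hypothetical, the formula and the limit of five are taken from the client above.

```python
def reconnect_delays_ms(max_attempts: int = 5):
    """Delay before each reconnect attempt, using the client's 2**attempt * 1000 formula."""
    return [2 ** attempt * 1000 for attempt in range(1, max_attempts + 1)]


delays = reconnect_delays_ms()
total_seconds = sum(delays) / 1000
print(delays, total_seconds)
```

With the default settings the client therefore spends about a minute retrying before it stops. Adding random jitter to each delay is a common refinement when many clients may reconnect at once.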
### 6.2 Python Client
```python
# Python WebSocket client
import asyncio
import json
import logging
from datetime import datetime

import websockets

logger = logging.getLogger(__name__)


class DeviceWebSocketClient:
    def __init__(self, token: str, base_url: str = 'ws://localhost:8000'):
        self.token = token
        self.base_url = base_url
        self.websocket = None
        self.message_handlers = {}
        self.running = False
        self._listener_task = None

    async def connect(self):
        """Connect to the WebSocket server."""
        try:
            uri = f"{self.base_url}/ws/device/?token={self.token}"
            self.websocket = await websockets.connect(uri)
            self.running = True
            logger.info("WebSocket connection established")
            # Run the receive loop in the background so connect() returns
            # and the caller can send messages while listening.
            self._listener_task = asyncio.create_task(self.listen_for_messages())
        except Exception as e:
            logger.error(f"WebSocket connection failed: {str(e)}")
            raise

    async def disconnect(self):
        """Close the WebSocket connection."""
        self.running = False
        if self.websocket:
            await self.websocket.close()
        if self._listener_task:
            # Wait for the receive loop to finish
            await self._listener_task
            self._listener_task = None
        logger.info("WebSocket connection closed")

    async def send_message(self, message_type: str, message_content):
        """Send a message."""
        if not self.websocket:
            raise ConnectionError("WebSocket is not connected")
        data = {
            'type': message_type,
            'message': message_content,
            'timestamp': datetime.now().isoformat()
        }
        await self.websocket.send(json.dumps(data))

    async def listen_for_messages(self):
        """Receive messages until the connection closes."""
        try:
            async for message in self.websocket:
                if not self.running:
                    break
                try:
                    data = json.loads(message)
                    await self.handle_message(data)
                except json.JSONDecodeError:
                    logger.error(f"Invalid JSON message: {message}")
        except websockets.exceptions.ConnectionClosed:
            logger.info("WebSocket connection closed")
        except Exception as e:
            logger.error(f"Message listener error: {str(e)}")

    async def handle_message(self, data):
        """Dispatch a received message to its handler."""
        message_type = data.get('type')
        handler = self.message_handlers.get(message_type)
        if handler:
            await handler(data)
        else:
            logger.info(f"Message received: {data}")

    def register_handler(self, message_type: str, handler):
        """Register an async handler for a message type."""
        self.message_handlers[message_type] = handler

    # Convenience methods
    async def send_chat_message(self, message: str):
        """Send a chat message."""
        await self.send_message('chat_message', message)

    async def request_weather(self, city: str):
        """Request weather information."""
        await self.send_message('weather', {'city': city})

    async def request_sing(self, song: str, artist: str):
        """Request a song."""
        await self.send_message('sing', {'song': song, 'artist': artist})


# Usage example
async def main():
    client = DeviceWebSocketClient('your-auth-token')

    # Register message handlers
    async def handle_chat_response(data):
        print(f"AI reply: {data['message']}")

    async def handle_weather_response(data):
        weather = data['data']['weather']['now']
        print(f"Weather: {weather['text']}, temperature: {weather['temp']}°C")

    client.register_handler('chat_response', handle_chat_response)
    client.register_handler('weather_response', handle_weather_response)

    # Connect and send messages
    await client.connect()
    await client.send_chat_message('Hello')
    await client.request_weather('上海')

    # Keep the connection open long enough to receive the replies
    await asyncio.sleep(10)
    await client.disconnect()


# Run the example
if __name__ == '__main__':
    asyncio.run(main())
```
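A receive loop only ends when the connection closes, so a client that wants to send while it listens has to run that loop concurrently. The sketch below shows the pattern with `asyncio.create_task` and a hypothetical `FakeSocket` that echoes messages back, so it runs without a server or the `websockets` package.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket: echoes whatever is sent back to the reader."""

    def __init__(self):
        self._incoming = asyncio.Queue()

    async def send(self, text: str) -> None:
        await self._incoming.put(f"echo:{text}")

    async def close(self) -> None:
        await self._incoming.put(None)  # sentinel that ends iteration

    def __aiter__(self):
        return self

    async def __anext__(self) -> str:
        item = await self._incoming.get()
        if item is None:
            raise StopAsyncIteration
        return item


async def demo():
    socket = FakeSocket()
    received = []

    async def listen():
        async for message in socket:
            received.append(message)

    listener = asyncio.create_task(listen())  # runs concurrently with the sends below
    await socket.send("hello")
    await socket.send("weather")
    await socket.close()
    await listener  # wait for the listener to drain the queue and finish
    return received


received = asyncio.run(demo())
print(received)
```

If the loop were awaited directly instead of being scheduled as a task, the two `send` calls would not run until the socket had already closed.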
## 7. Monitoring and Debugging
### 7.1 Connection Status Monitoring
```python
# device_interaction/monitoring.py
import logging
from typing import Dict, List

from django.core.cache import cache
from django.utils import timezone

logger = logging.getLogger(__name__)


class WebSocketMonitor:
    """Tracks WebSocket connections in the Django cache."""

    @staticmethod
    def record_connection(user_id: int, channel_name: str):
        """Record a new connection."""
        connections = cache.get('ws_connections', {})
        connections[str(user_id)] = {
            'channel_name': channel_name,
            'connected_at': timezone.now().isoformat(),
            'last_activity': timezone.now().isoformat()
        }
        cache.set('ws_connections', connections, timeout=86400)  # 24 hours
        logger.info(f"WebSocket connection recorded for user {user_id}")

    @staticmethod
    def record_disconnection(user_id: int):
        """Record a disconnect."""
        connections = cache.get('ws_connections', {})
        if str(user_id) in connections:
            del connections[str(user_id)]
            cache.set('ws_connections', connections, timeout=86400)
            logger.info(f"WebSocket disconnection recorded for user {user_id}")

    @staticmethod
    def update_activity(user_id: int):
        """Update the last-activity time."""
        connections = cache.get('ws_connections', {})
        if str(user_id) in connections:
            connections[str(user_id)]['last_activity'] = timezone.now().isoformat()
            cache.set('ws_connections', connections, timeout=86400)

    @staticmethod
    def get_online_users() -> List[int]:
        """Return the IDs of online users."""
        connections = cache.get('ws_connections', {})
        return [int(user_id) for user_id in connections.keys()]

    @staticmethod
    def get_connection_stats() -> Dict:
        """Return connection statistics."""
        connections = cache.get('ws_connections', {})
        return {
            'total_connections': len(connections),
            'connections': connections
        }
```
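Entries in `ws_connections` are only removed when a disconnect is recorded, so a connection that drops without a clean close would stay listed. One way to handle that is to filter on `last_activity`. The helper below is a hypothetical sketch that works on the same dictionary shape and assumes the timestamps are timezone-aware ISO 8601 strings.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict


def prune_stale(connections: Dict[str, dict], now: datetime,
                max_idle: timedelta) -> Dict[str, dict]:
    """Return only the entries whose last_activity is within max_idle of now."""
    return {
        user_id: info
        for user_id, info in connections.items()
        if now - datetime.fromisoformat(info['last_activity']) <= max_idle
    }


now = datetime(2024, 1, 1, 12, 5, tzinfo=timezone.utc)
connections = {
    '1': {'last_activity': '2024-01-01T12:04:00+00:00'},  # active one minute ago
    '2': {'last_activity': '2024-01-01T11:00:00+00:00'},  # idle for over an hour
}
active = prune_stale(connections, now, timedelta(minutes=2))
print(sorted(active))
```

With a 30-second client heartbeat, a threshold of a couple of minutes leaves room for a few missed pings before a user is treated as offline.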
### 7.2 Performance Monitoring
```python
# device_interaction/performance.py
import logging
import time
from functools import wraps

from channels.generic.websocket import AsyncWebsocketConsumer
from django.core.cache import cache

logger = logging.getLogger(__name__)


def monitor_performance(operation_name: str):
    """Decorator that records the duration of an async operation."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so it is safe for measuring durations
            start_time = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
                duration = time.perf_counter() - start_time

                # Record the metric
                cache_key = f"ws_perf:{operation_name}"
                metrics = cache.get(cache_key, [])
                metrics.append({
                    'duration': duration,
                    'timestamp': time.time(),
                    'success': True
                })
                # Keep the most recent 100 records
                if len(metrics) > 100:
                    metrics = metrics[-100:]
                cache.set(cache_key, metrics, timeout=3600)

                if duration > 1.0:  # warn when an operation takes over 1 second
                    logger.warning(f"Slow WebSocket operation: {operation_name} took {duration:.2f}s")
                return result
            except Exception as e:
                duration = time.perf_counter() - start_time
                logger.error(f"WebSocket operation failed: {operation_name} ({duration:.2f}s): {str(e)}")
                raise
        return wrapper
    return decorator


# Usage example
class DeviceConsumer(AsyncWebsocketConsumer):
    @monitor_performance('handle_chat_message')
    async def handle_chat_message(self, message_content):
        # Handler logic goes here
        pass
```
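The decorator stores a list of `{'duration', 'timestamp', 'success'}` records per operation. A small helper can turn such a list into summary figures for a dashboard or log line. `summarize` is a hypothetical name, and the 95th percentile uses the nearest-rank method, which is one of several common definitions.

```python
from typing import Dict, List


def summarize(metrics: List[dict]) -> Dict[str, float]:
    """Summarize recorded durations: count, average, nearest-rank p95, and maximum."""
    durations = sorted(m['duration'] for m in metrics)
    if not durations:
        return {'count': 0, 'avg': 0.0, 'p95': 0.0, 'max': 0.0}
    # Nearest rank: ceil(0.95 * n), computed with integers to avoid float rounding
    rank = (95 * len(durations) + 99) // 100
    return {
        'count': len(durations),
        'avg': sum(durations) / len(durations),
        'p95': durations[rank - 1],
        'max': durations[-1],
    }


# Twenty sample records with durations 0.1s, 0.2s, ... 2.0s
samples = [{'duration': d / 10, 'success': True} for d in range(1, 21)]
summary = summarize(samples)
print(summary)
```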
## 8. Best Practices
### 8.1 Error Handling
```python
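# WebSocket error handling best practice
# (excerpt: send_message and get_timestamp come from the DeviceConsumer in section 3.1)
import logging

from channels.generic.websocket import AsyncWebsocketConsumer

logger = logging.getLogger(__name__)


class DeviceConsumer(AsyncWebsocketConsumer):
    async def handle_error(self, error: Exception, context: str = ""):
        """Unified error handling."""
        error_message = str(error)
        logger.error(f"WebSocket error in {context}: {error_message}")
        # Send a generic error to the client; the details stay in the server log
        await self.send_message({
            'type': 'error',
            'message': 'Internal server error',
            'code': 'INTERNAL_ERROR',
            'timestamp': self.get_timestamp()
        })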
```
### 8.2 Message Validation
```python
# Message validation
# (excerpt: send_error comes from the DeviceConsumer in section 3.1;
# process_validated_message is a placeholder for the dispatch logic)
import json

from channels.generic.websocket import AsyncWebsocketConsumer
from marshmallow import Schema, ValidationError, fields


class MessageSchema(Schema):
    type = fields.Str(required=True)
    message = fields.Raw(required=True)
    timestamp = fields.DateTime(allow_none=True)


class DeviceConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data=None, bytes_data=None):
        try:
            data = json.loads(text_data)
            # Validate the message format
            schema = MessageSchema()
            validated_data = schema.load(data)
            # Handle the validated message
            await self.process_validated_message(validated_data)
        except (TypeError, json.JSONDecodeError):
            await self.send_error("Invalid JSON format")
        except ValidationError as e:
            await self.send_error(f"Message validation error: {e.messages}")
```
### 8.3 Security Considerations
```python
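# Security measures
# (the subclass name is illustrative; it wraps the DeviceConsumer from section 3.1
# so that super().receive() reaches the real message dispatch)
import time

from .consumers import DeviceConsumer


class RateLimitedDeviceConsumer(DeviceConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.message_count = 0
        self.last_message_time = time.time()
        self.rate_limit_window = 60  # 60-second window
        self.max_messages_per_window = 100  # at most 100 messages per window

    async def receive(self, text_data=None, bytes_data=None):
        # Rate limiting: start a new window once the current one has expired
        current_time = time.time()
        if current_time - self.last_message_time > self.rate_limit_window:
            self.message_count = 0
            self.last_message_time = current_time
        self.message_count += 1
        if self.message_count > self.max_messages_per_window:
            await self.send_error("Rate limit exceeded")
            await self.close(code=4008)
            return

        # Message length limit (10,240 characters, roughly 10 KB)
        if text_data is not None and len(text_data) > 10240:
            await self.send_error("Message too large")
            return

        # Hand the message to the regular dispatch logic
        await super().receive(text_data)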
```
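The fixed-window counter used for rate limiting can be pulled out into a small class, which makes it testable without opening a socket. The sketch below is a standalone illustration: `FixedWindowRateLimiter` is a hypothetical name, and the injectable `clock` parameter is an addition that lets a test control time.

```python
import time
from typing import Callable


class FixedWindowRateLimiter:
    """Allows at most max_messages per window_seconds, counted in fixed windows."""

    def __init__(self, max_messages: int = 100, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_messages = max_messages
        self.window_seconds = window_seconds
        self.clock = clock
        self.window_start = clock()
        self.count = 0

    def allow(self) -> bool:
        """Count one message and report whether it is within the limit."""
        now = self.clock()
        if now - self.window_start > self.window_seconds:
            # The previous window has expired: start a new one
            self.window_start = now
            self.count = 0
        self.count += 1
        return self.count <= self.max_messages


# Demonstration with a fake clock so the window can be advanced by hand
fake_now = [0.0]
limiter = FixedWindowRateLimiter(max_messages=2, window_seconds=60,
                                 clock=lambda: fake_now[0])
first_window = [limiter.allow() for _ in range(3)]
fake_now[0] = 61.0
after_reset = limiter.allow()
print(first_window, after_reset)
```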
This guide covers a complete real-time communication solution: server-side implementation, client examples, monitoring, and best practices.