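WebSocket Real-Time Communication Implementation Guide
This document describes the architecture, usage, and best practices for WebSocket real-time communication in the QY LTY Backend system.
1. WebSocket Architecture Overview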
WebSocket Architecture:
┌─────────────────────────────────────────────┐
│ Client Layer │
│ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │
│ │Web App │ │Mobile │ │ Device │ │
│ │ │ │ App │ │ SDK │ │
│ └─────────┘ └─────────┘ └─────────────┘ │
└─────────────┬───────────────────────────────┘
│ WebSocket Connection
│ ws://domain/ws/device/
┌─────────────┴───────────────────────────────┐
│ Connection Layer │
│ ┌─────────────────────────────────────────┐ │
│ │ Django Channels │ │
│ │ ┌─────────────┐ ┌─────────────────┐ │ │
│ │ │ ASGI │ │ URL Routing │ │ │
│ │ │ Server │ │ │ │ │
│ │ │ (Daphne) │ │ WebSocket │ │ │
│ │ └─────────────┘ └─────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────┘
│
┌─────────────┴───────────────────────────────┐
│ Consumer Layer │
│ ┌─────────────────────────────────────────┐ │
│ │ DeviceConsumer │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Connect │ │ Disconnect │ │ │
│ │ │ Handler │ │ Handler │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Receive │ │ Send │ │ │
│ │ │ Handler │ │ Handler │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────┘
│
┌─────────────┴───────────────────────────────┐
│ Channel Layer │
│ ┌─────────────────────────────────────────┐ │
│ │ Redis │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Groups │ │ Messages │ │ │
│ │ │ Management │ │ Queue │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
2. Core Components
2.1 ASGI Configuration
# qy_lty/asgi.py
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'qy_lty.settings')

# Initialize Django before importing modules that touch the ORM
# (the routing and auth modules import models and the DRF Token model).
django_asgi_app = get_asgi_application()

from channels.routing import ProtocolTypeRouter, URLRouter  # noqa: E402
from device_interaction.auth import TokenAuthMiddleware  # noqa: E402
from device_interaction.routing import websocket_urlpatterns  # noqa: E402

application = ProtocolTypeRouter({
    # Handles regular HTTP requests
    "http": django_asgi_app,
    # Handles WebSocket connections
    "websocket": TokenAuthMiddleware(
        URLRouter(websocket_urlpatterns)
    ),
})
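The Channel Layer in the architecture diagram is backed by Redis, but the guide does not show the matching settings. The fragment below is a minimal sketch, assuming the channels_redis package is installed and Redis listens on 127.0.0.1:6379; the host, port, and settings file location are placeholders, not values taken from this project.

```python
# qy_lty/settings.py (fragment; host and port are assumed placeholders)
CHANNEL_LAYERS = {
    "default": {
        # Redis-backed channel layer provided by the channels_redis package
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}
```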
2.2 WebSocket Routing
# device_interaction/routing.py
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    # Basic device connection
    re_path(r'^ws/device/$', consumers.DeviceConsumer.as_asgi()),
    # Token-based device connection
    re_path(r'^ws/device/token/(?P<token>[^/]+)/?$',
            consumers.DeviceConsumer.as_asgi()),
    # User-ID-based connection (deprecated, kept for compatibility)
    re_path(r'^ws/device/(?P<user_id>\d+)/?$',
            consumers.DeviceConsumer.as_asgi()),
]
2.3 Authentication Middleware
# device_interaction/auth.py
import logging
import re
from urllib.parse import parse_qs

from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token

logger = logging.getLogger(__name__)

# Matches the token route declared in routing.py
TOKEN_PATH_RE = re.compile(r'^/?ws/device/token/(?P<token>[^/]+)/?$')


class TokenAuthMiddleware(BaseMiddleware):
    """Token authentication middleware for WebSocket connections."""

    async def __call__(self, scope, receive, send):
        # Read the token from the query string first
        query_params = parse_qs(scope.get('query_string', b'').decode())
        token = query_params.get('token', [None])[0]

        # Otherwise read it from the URL path. scope['url_route'] is filled in
        # by URLRouter, which runs after this middleware, so the path is
        # matched directly here.
        if not token:
            match = TOKEN_PATH_RE.match(scope.get('path', ''))
            if match:
                token = match.group('token')

        # Validate the token and attach the user to the scope
        if token:
            scope['user'] = await self.get_user_from_token(token)
        else:
            scope['user'] = AnonymousUser()
        return await super().__call__(scope, receive, send)

    @database_sync_to_async
    def get_user_from_token(self, token_key):
        """Return the user that owns the token, or AnonymousUser."""
        try:
            token = Token.objects.select_related('user').get(key=token_key)
            return token.user
        except Token.DoesNotExist:
            # Log only a prefix so the full credential never reaches the logs
            logger.warning("Invalid token: %s...", token_key[:6])
            return AnonymousUser()
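The query-string handling in the middleware can be exercised without Django or Channels. The sketch below isolates that step; extract_token is a hypothetical helper name introduced only for illustration, and it relies on the fact that an ASGI scope carries the query string as bytes.

```python
from typing import Optional
from urllib.parse import parse_qs


def extract_token(scope: dict) -> Optional[str]:
    """Return the first non-empty ?token= value from an ASGI scope, or None."""
    raw = scope.get("query_string", b"")
    # parse_qs drops blank values by default, so "token=" yields no entry
    params = parse_qs(raw.decode("utf-8", errors="replace"))
    values = params.get("token", [])
    return values[0] if values else None


if __name__ == "__main__":
    print(extract_token({"query_string": b"token=abc123&x=1"}))
```

When the parameter is repeated, the first value wins, which matches the `[0]` indexing used in the middleware.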
3. Consumer Implementation
3.1 Device Consumer
# device_interaction/consumers.py
import json
import logging
from typing import Any, Dict

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.contrib.auth.models import AnonymousUser

from .models import Device, UserDevice
from .services import MessageService, WeatherService

logger = logging.getLogger(__name__)


class DeviceConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for devices."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.user = None
        self.device = None
        self.user_group_name = None
        self.device_group_name = None

    async def connect(self):
        """Handle a new WebSocket connection."""
        try:
            # Reject unauthenticated users
            self.user = self.scope.get('user')
            if self.user is None or isinstance(self.user, AnonymousUser):
                logger.warning("Anonymous user attempted to connect")
                await self.close(code=4001)
                return

            # Look up the user's primary device
            self.device = await self.get_user_primary_device(self.user.id)
            if not self.device:
                logger.warning(f"User {self.user.id} has no bound devices")
                # The connection is still allowed; only a warning is logged.
                # await self.close(code=4002)
                # return

            # Build the group names
            self.user_group_name = f"user_{self.user.id}"
            if self.device:
                self.device_group_name = f"device_{self.device.id}"

            # Join the user group
            await self.channel_layer.group_add(
                self.user_group_name,
                self.channel_name
            )
            # Join the device group
            if self.device_group_name:
                await self.channel_layer.group_add(
                    self.device_group_name,
                    self.channel_name
                )

            # Accept the connection
            await self.accept()

            # Confirm the connection to the client
            await self.send_message({
                'type': 'connection_status',
                'status': 'connected',
                'user_id': str(self.user.id),
                'device_id': self.device.id if self.device else None,
                'timestamp': self.get_timestamp()
            })
            logger.info(f"User {self.user.id} connected to WebSocket")
        except Exception as e:
            logger.error(f"WebSocket connection error: {str(e)}")
            await self.close(code=4000)

    async def disconnect(self, close_code):
        """Handle a WebSocket disconnection."""
        try:
            # Leave the user group
            if self.user_group_name:
                await self.channel_layer.group_discard(
                    self.user_group_name,
                    self.channel_name
                )
            # Leave the device group
            if self.device_group_name:
                await self.channel_layer.group_discard(
                    self.device_group_name,
                    self.channel_name
                )
            if self.user:
                logger.info(f"User {self.user.id} disconnected from WebSocket (code: {close_code})")
        except Exception as e:
            logger.error(f"WebSocket disconnect error: {str(e)}")

    async def receive(self, text_data=None, bytes_data=None):
        """Handle a message received from the client."""
        try:
            # Channels passes binary frames as bytes_data; only text is supported
            if text_data is None:
                await self.send_error("Binary messages are not supported")
                return

            # Parse the message
            data = json.loads(text_data)
            if not isinstance(data, dict):
                await self.send_error("Message must be a JSON object")
                return
            message_type = data.get('type')
            message_content = data.get('message')
            logger.info(f"Received message from user {self.user.id}: {message_type}")

            # Dispatch by message type
            if message_type == 'chat_message':
                await self.handle_chat_message(message_content)
            elif message_type == 'weather':
                await self.handle_weather_request(message_content)
            elif message_type == 'sing':
                await self.handle_sing_request(message_content)
            elif message_type == 'dance':
                await self.handle_dance_request(message_content)
            elif message_type == 'ping':
                await self.handle_ping()
            else:
                await self.send_error(f"Unsupported message type: {message_type}")
        except json.JSONDecodeError:
            await self.send_error("Invalid JSON format")
        except Exception as e:
            logger.error(f"Message handling error: {str(e)}")
            await self.send_error("Internal server error")

    async def handle_chat_message(self, message_content):
        """Handle a chat message."""
        try:
            # An AI chat service can be integrated here
            response = await self.process_chat_message(message_content)
            await self.send_message({
                'type': 'chat_response',
                'message': response,
                'timestamp': self.get_timestamp()
            })
        except Exception as e:
            await self.send_error(f"Chat processing error: {str(e)}")

    async def handle_weather_request(self, message_content):
        """Handle a weather query."""
        try:
            # Fall back to an empty dict when the client sends no payload
            city_name = (message_content or {}).get('city', 'Beijing')
            # Call the weather service
            weather_service = WeatherService()
            weather_data = await weather_service.get_weather(city_name)
            if weather_data['success']:
                await self.send_message({
                    'type': 'weather_response',
                    'data': weather_data['data'],
                    'timestamp': self.get_timestamp()
                })
            else:
                await self.send_error(f"Weather query failed: {weather_data['error']}")
        except Exception as e:
            await self.send_error(f"Weather service error: {str(e)}")

    async def handle_sing_request(self, message_content):
        """Handle a sing request."""
        try:
            song_info = {
                'song': message_content.get('song', ''),
                'artist': message_content.get('artist', ''),
                'action': 'start_singing'
            }
            await self.send_message({
                'type': 'sing_response',
                'data': song_info,
                'message': f"Now playing: {song_info['song']} - {song_info['artist']}",
                'timestamp': self.get_timestamp()
            })
        except Exception as e:
            await self.send_error(f"Sing request error: {str(e)}")

    async def handle_dance_request(self, message_content):
        """Handle a dance request."""
        try:
            dance_info = {
                'dance': message_content.get('dance', ''),
                'style': message_content.get('style', ''),
                'duration': message_content.get('duration', '30 seconds'),
                'action': 'start_dancing'
            }
            await self.send_message({
                'type': 'dance_response',
                'data': dance_info,
                'message': f"Now dancing: {dance_info['dance']} ({dance_info['style']})",
                'timestamp': self.get_timestamp()
            })
        except Exception as e:
            await self.send_error(f"Dance request error: {str(e)}")

    async def handle_ping(self):
        """Answer a heartbeat ping."""
        await self.send_message({
            'type': 'pong',
            'timestamp': self.get_timestamp()
        })

    # Group message handler (invoked for group_send events of type 'group_message')
    async def group_message(self, event):
        """Forward a group message to the client."""
        message = event['message']
        await self.send(text_data=json.dumps(message))

    async def send_message(self, message: Dict[str, Any]):
        """Send a message to the client."""
        await self.send(text_data=json.dumps(message))

    async def send_error(self, error_message: str):
        """Send an error message to the client."""
        await self.send_message({
            'type': 'error',
            'message': error_message,
            'timestamp': self.get_timestamp()
        })

    @database_sync_to_async
    def get_user_primary_device(self, user_id: int):
        """Return the user's primary device, or None."""
        try:
            user_device = UserDevice.objects.select_related('device').filter(
                user_id=user_id,
                is_primary=True
            ).first()
            return user_device.device if user_device else None
        except Exception:
            # Log instead of failing silently so lookup errors stay visible
            logger.exception(f"Primary device lookup failed for user {user_id}")
            return None

    async def process_chat_message(self, message_content: str) -> str:
        """Process a chat message (an AI service can be integrated here)."""
        # Placeholder: echoes the message until a dialogue service is wired in
        return f"Message received: {message_content}"

    @staticmethod
    def get_timestamp() -> str:
        """Return the current time as an ISO 8601 string."""
        from django.utils import timezone
        return timezone.now().isoformat()
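The if/elif chain in receive grows with every new message type. One alternative is a dispatch table keyed by the type field. The sketch below shows the idea with plain asyncio and no Channels dependency; Dispatcher, register, and the two sample handlers are hypothetical names, not part of the consumer above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Any], Awaitable[dict]]


class Dispatcher:
    """Maps a message 'type' to an async handler (illustrative only)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, message_type: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            self._handlers[message_type] = func
            return func
        return decorator

    async def dispatch(self, data: dict) -> dict:
        message_type = data.get("type")
        handler = self._handlers.get(message_type)
        if handler is None:
            # Same error shape the consumer uses for unknown types
            return {"type": "error",
                    "message": f"Unsupported message type: {message_type}"}
        return await handler(data.get("message"))


dispatcher = Dispatcher()


@dispatcher.register("ping")
async def handle_ping(_content: Any) -> dict:
    return {"type": "pong"}


@dispatcher.register("chat_message")
async def handle_chat(content: Any) -> dict:
    return {"type": "chat_response", "message": f"Received: {content}"}


if __name__ == "__main__":
    print(asyncio.run(dispatcher.dispatch({"type": "ping"})))
```

With this layout, adding a message type means registering one more function, and the unsupported-type branch lives in a single place.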
4. Message Services
4.1 Message Sending Service
# device_interaction/services.py
import logging
from typing import Any, Dict, List

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

logger = logging.getLogger(__name__)


class MessageService:
    """Sends messages to connected clients through the channel layer."""

    def __init__(self):
        self.channel_layer = get_channel_layer()

    def send_to_user(self, user_id: int, message: Dict[str, Any]) -> bool:
        """Send a message to a specific user."""
        try:
            group_name = f"user_{user_id}"
            async_to_sync(self.channel_layer.group_send)(
                group_name,
                {
                    # Routed to DeviceConsumer.group_message
                    'type': 'group_message',
                    'message': message
                }
            )
            logger.info(f"Message sent to user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to send message to user {user_id}: {str(e)}")
            return False

    def send_to_device(self, device_id: int, message: Dict[str, Any]) -> bool:
        """Send a message to a specific device."""
        try:
            group_name = f"device_{device_id}"
            async_to_sync(self.channel_layer.group_send)(
                group_name,
                {
                    'type': 'group_message',
                    'message': message
                }
            )
            logger.info(f"Message sent to device {device_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to send message to device {device_id}: {str(e)}")
            return False

    def broadcast_to_all_users(self, message: Dict[str, Any]) -> int:
        """Broadcast a message to all online users; returns the success count."""
        try:
            # Requires an online-user list to be maintained elsewhere
            online_users = self.get_online_users()
            success_count = 0
            for user_id in online_users:
                if self.send_to_user(user_id, message):
                    success_count += 1
            logger.info(f"Broadcast message sent to {success_count} users")
            return success_count
        except Exception as e:
            logger.error(f"Broadcast failed: {str(e)}")
            return 0

    def get_online_users(self) -> List[int]:
        """Return online user IDs (online-user tracking is not implemented yet)."""
        # The list could be maintained in Redis or the database.
        # Simplified implementation: returns an empty list.
        return []


class WeatherService:
    """Weather service."""

    async def get_weather(self, city: str) -> Dict[str, Any]:
        """Return weather information for a city."""
        try:
            # A real weather API would be called here.
            # The example returns mock data.
            weather_data = {
                'city': {'name': city},
                'weather': {
                    'now': {
                        'temp': '25',
                        'text': 'Sunny',
                        'icon': '100'
                    },
                    'updateTime': '2023-01-01T12:00:00Z'
                }
            }
            return {
                'success': True,
                'data': weather_data
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
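MessageService sends events with 'type': 'group_message', and DeviceConsumer defines a group_message method. Channels connects the two by name: the event type, with dots replaced by underscores, is looked up as a method on the consumer. The sketch below is a simplified illustration of that convention, not the library's actual code; handler_name_for and FakeConsumer are hypothetical names.

```python
def handler_name_for(event: dict) -> str:
    """Map an event 'type' to the consumer method name that handles it."""
    name = event["type"].replace(".", "_")
    if name.startswith("_"):
        # Guards against events that would reach private methods
        raise ValueError("event type must not start with an underscore")
    return name


class FakeConsumer:
    """Stand-in for a consumer; records what it would send to the client."""

    def __init__(self) -> None:
        self.sent = []

    def group_message(self, event: dict) -> None:
        self.sent.append(event["message"])


def deliver(consumer: FakeConsumer, event: dict) -> None:
    # Look up the handler by name, as a consumer's dispatch step would
    getattr(consumer, handler_name_for(event))(event)


if __name__ == "__main__":
    consumer = FakeConsumer()
    deliver(consumer, {"type": "group.message", "message": {"type": "system"}})
    print(consumer.sent)
```

A practical consequence is that renaming group_message in the consumer without updating the 'type' string in MessageService breaks delivery for every group event.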
4.2 Message Queue Handling
# device_interaction/message_queue.py
import json
import logging
from typing import Any, Dict, Optional

import redis
from django.conf import settings

logger = logging.getLogger(__name__)


class MessageQueue:
    """Redis-list-backed message queue."""

    def __init__(self):
        self.redis_client = redis.Redis.from_url(settings.REDIS_LOCATION)
        self.queue_name = 'device_messages'

    def enqueue_message(self, user_id: int, message: Dict[str, Any]) -> bool:
        """Add a message to the queue."""
        try:
            message_data = {
                'user_id': user_id,
                'message': message,
                'timestamp': self._get_timestamp()
            }
            # LPUSH at the head; BRPOP in dequeue_message reads from the tail (FIFO)
            self.redis_client.lpush(
                self.queue_name,
                json.dumps(message_data)
            )
            logger.info(f"Message enqueued for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to enqueue message: {str(e)}")
            return False

    def dequeue_message(self) -> Optional[Dict[str, Any]]:
        """Take the oldest message from the queue, or None after a 1-second wait."""
        try:
            # brpop returns a (key, value) tuple, or None on timeout
            message_json = self.redis_client.brpop(self.queue_name, timeout=1)
            if message_json:
                return json.loads(message_json[1])
            return None
        except Exception as e:
            logger.error(f"Failed to dequeue message: {str(e)}")
            return None

    def get_queue_size(self) -> int:
        """Return the number of queued messages."""
        return self.redis_client.llen(self.queue_name)

    def clear_queue(self) -> bool:
        """Remove all queued messages."""
        try:
            self.redis_client.delete(self.queue_name)
            return True
        except Exception as e:
            logger.error(f"Failed to clear queue: {str(e)}")
            return False

    @staticmethod
    def _get_timestamp() -> str:
        from django.utils import timezone
        return timezone.now().isoformat()
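MessageQueue pushes with LPUSH and pops with BRPOP, which together give first-in, first-out ordering. The sketch below mirrors that ordering with an in-memory deque so it can be run without Redis; InMemoryQueue is a hypothetical stand-in and omits the blocking wait that BRPOP provides.

```python
import json
from collections import deque
from typing import Any, Dict, Optional


class InMemoryQueue:
    """FIFO queue with the same head-push / tail-pop layout as the Redis list."""

    def __init__(self) -> None:
        self._items: deque = deque()

    def enqueue(self, user_id: int, message: Dict[str, Any]) -> None:
        # Equivalent of LPUSH: add at the head (left side)
        payload = json.dumps({"user_id": user_id, "message": message})
        self._items.appendleft(payload)

    def dequeue(self) -> Optional[Dict[str, Any]]:
        # Equivalent of RPOP: remove from the tail, so the oldest item leaves first
        if not self._items:
            return None
        return json.loads(self._items.pop())

    def size(self) -> int:
        return len(self._items)


if __name__ == "__main__":
    queue = InMemoryQueue()
    queue.enqueue(1, {"type": "system", "message": "first"})
    queue.enqueue(2, {"type": "system", "message": "second"})
    print(queue.dequeue()["user_id"], queue.dequeue()["user_id"])
```

Swapping either side (for example RPUSH with BRPOP) would turn the list into a stack, so the two calls need to stay paired.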
5. HTTP API Integration
5.1 Message Sending API
# device_interaction/views.py
from django.utils import timezone
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet

from .services import MessageService


class MessageViewSet(ViewSet):
    """Message management API."""
    permission_classes = [IsAuthenticated]

    def create(self, request):
        """Send a message."""
        try:
            message_type = request.data.get('type')
            message_content = request.data.get('message')
            user_id = request.data.get('user_id')
            if not all([message_type, message_content, user_id]):
                return Response({
                    'status': 'error',
                    'message': 'Missing required parameters'
                }, status=400)

            # Build the message
            message = {
                'type': message_type,
                'message': message_content,
                'timestamp': timezone.now().isoformat()
            }

            # Send the message
            message_service = MessageService()
            success = message_service.send_to_user(int(user_id), message)
            if success:
                return Response({
                    'status': 'success',
                    'message': 'Message sent',
                    'data': message
                })
            else:
                return Response({
                    'status': 'error',
                    'message': 'Failed to send message'
                }, status=500)
        except (TypeError, ValueError):
            # user_id was not a valid integer
            return Response({
                'status': 'error',
                'message': 'Invalid user_id'
            }, status=400)
        except Exception as e:
            return Response({
                'status': 'error',
                'message': str(e)
            }, status=500)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def send_message_to_user(request, user_id):
    """Send a message to a specific user."""
    try:
        message_data = {
            'type': request.data.get('type', 'system'),
            'message': request.data.get('message'),
            'sender': request.user.username,
            'timestamp': timezone.now().isoformat()
        }
        message_service = MessageService()
        success = message_service.send_to_user(int(user_id), message_data)
        return Response({
            'status': 'success' if success else 'error',
            'message': 'Message sent' if success else 'Failed to send message'
        })
    except Exception as e:
        return Response({
            'status': 'error',
            'message': str(e)
        }, status=500)
6. Client Usage Examples
6.1 JavaScript Client
// WebSocket client example
class DeviceWebSocketClient {
  constructor(token, baseUrl = 'ws://localhost:8000') {
    this.token = token;
    this.baseUrl = baseUrl;
    this.ws = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.heartbeatInterval = null;
    this.messageHandlers = new Map();
  }

  connect() {
    try {
      // Encode the token so special characters survive the query string
      const wsUrl = `${this.baseUrl}/ws/device/?token=${encodeURIComponent(this.token)}`;
      this.ws = new WebSocket(wsUrl);

      this.ws.onopen = (event) => {
        console.log('WebSocket connection established');
        this.reconnectAttempts = 0;
        this.startHeartbeat();
        this.onConnect?.(event);
      };

      this.ws.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          this.handleMessage(data);
        } catch (error) {
          console.error('Failed to parse message:', error);
        }
      };

      this.ws.onclose = (event) => {
        console.log('WebSocket connection closed:', event.code, event.reason);
        this.stopHeartbeat();
        this.onDisconnect?.(event);
        // Reconnect unless the close was a normal one (code 1000)
        if (event.code !== 1000 && this.reconnectAttempts < this.maxReconnectAttempts) {
          this.reconnect();
        }
      };

      this.ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        this.onError?.(error);
      };
    } catch (error) {
      console.error('WebSocket connection failed:', error);
    }
  }

  disconnect() {
    if (this.ws) {
      this.ws.close(1000, 'Client closed the connection');
    }
  }

  reconnect() {
    this.reconnectAttempts++;
    console.log(`Reconnecting (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
    setTimeout(() => {
      this.connect();
    }, Math.pow(2, this.reconnectAttempts) * 1000); // exponential backoff: 2s, 4s, 8s, ...
  }

  sendMessage(type, message) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      const data = {
        type: type,
        message: message,
        timestamp: new Date().toISOString()
      };
      this.ws.send(JSON.stringify(data));
      return true;
    } else {
      console.error('WebSocket is not connected');
      return false;
    }
  }

  // Message handling
  handleMessage(data) {
    const handler = this.messageHandlers.get(data.type);
    if (handler) {
      handler(data);
    } else {
      console.log('Message received:', data);
    }
  }

  // Register a message handler
  onMessage(type, handler) {
    this.messageHandlers.set(type, handler);
  }

  // Heartbeat
  startHeartbeat() {
    this.heartbeatInterval = setInterval(() => {
      this.sendMessage('ping', {});
    }, 30000); // one ping every 30 seconds
  }

  stopHeartbeat() {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  // Convenience methods
  sendChatMessage(message) {
    return this.sendMessage('chat_message', message);
  }

  requestWeather(city) {
    return this.sendMessage('weather', { city: city });
  }

  requestSing(song, artist) {
    return this.sendMessage('sing', { song: song, artist: artist });
  }

  requestDance(dance, style, duration = '30 seconds') {
    return this.sendMessage('dance', {
      dance: dance,
      style: style,
      duration: duration
    });
  }
}

// Usage example
const client = new DeviceWebSocketClient('your-auth-token');

// Set event callbacks
client.onConnect = () => {
  console.log('Connected to the device service');
};
client.onDisconnect = (event) => {
  console.log('Disconnected from the device service');
};

// Register message handlers
client.onMessage('chat_response', (data) => {
  console.log('AI reply:', data.message);
});
client.onMessage('weather_response', (data) => {
  console.log('Weather:', data.data);
});
client.onMessage('connection_status', (data) => {
  console.log('Connection status:', data.status);
});

// Connect
client.connect();

// Send a few messages once the connection has had time to open
setTimeout(() => {
  client.sendChatMessage('Hello, please introduce yourself');
  client.requestWeather('Beijing');
  client.requestSing('青花瓷', '周杰伦');
}, 1000);
6.2 Python Client
# Python WebSocket client
import asyncio
import json
import logging
from datetime import datetime, timezone
from urllib.parse import quote

import websockets

logger = logging.getLogger(__name__)


class DeviceWebSocketClient:
    def __init__(self, token: str, base_url: str = 'ws://localhost:8000'):
        self.token = token
        self.base_url = base_url
        self.websocket = None
        self.message_handlers = {}
        self.running = False
        self._listener = None

    async def connect(self):
        """Connect to the WebSocket server and start the receive loop."""
        try:
            uri = f"{self.base_url}/ws/device/?token={quote(self.token)}"
            self.websocket = await websockets.connect(uri)
            self.running = True
            logger.info("WebSocket connection established")
            # Run the receive loop in the background; awaiting it here would
            # block connect() until the connection closes.
            self._listener = asyncio.create_task(self.listen_for_messages())
        except Exception as e:
            logger.error(f"WebSocket connection failed: {str(e)}")
            raise

    async def disconnect(self):
        """Close the WebSocket connection."""
        self.running = False
        if self.websocket:
            await self.websocket.close()
        if self._listener:
            await self._listener
        logger.info("WebSocket connection closed")

    async def send_message(self, message_type: str, message_content):
        """Send a message."""
        if not self.websocket:
            raise ConnectionError("WebSocket is not connected")
        data = {
            'type': message_type,
            'message': message_content,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }
        await self.websocket.send(json.dumps(data))

    async def listen_for_messages(self):
        """Receive messages until the connection closes."""
        try:
            async for message in self.websocket:
                if not self.running:
                    break
                try:
                    data = json.loads(message)
                    await self.handle_message(data)
                except json.JSONDecodeError:
                    logger.error(f"Invalid JSON message: {message}")
        except websockets.exceptions.ConnectionClosed:
            logger.info("WebSocket connection closed")
        except Exception as e:
            logger.error(f"Message listener error: {str(e)}")

    async def handle_message(self, data):
        """Dispatch a received message to its handler."""
        message_type = data.get('type')
        handler = self.message_handlers.get(message_type)
        if handler:
            await handler(data)
        else:
            logger.info(f"Message received: {data}")

    def register_handler(self, message_type: str, handler):
        """Register a message handler."""
        self.message_handlers[message_type] = handler

    # Convenience methods
    async def send_chat_message(self, message: str):
        """Send a chat message."""
        await self.send_message('chat_message', message)

    async def request_weather(self, city: str):
        """Request weather information."""
        await self.send_message('weather', {'city': city})

    async def request_sing(self, song: str, artist: str):
        """Request a song."""
        await self.send_message('sing', {'song': song, 'artist': artist})


# Usage example
async def main():
    client = DeviceWebSocketClient('your-auth-token')

    # Register message handlers
    async def handle_chat_response(data):
        print(f"AI reply: {data['message']}")

    async def handle_weather_response(data):
        weather = data['data']['weather']['now']
        print(f"Weather: {weather['text']}, temperature: {weather['temp']}°C")

    client.register_handler('chat_response', handle_chat_response)
    client.register_handler('weather_response', handle_weather_response)

    # Connect and send messages
    await client.connect()
    await client.send_chat_message('Hello')
    await client.request_weather('Shanghai')

    # Keep the connection open long enough to receive replies
    await asyncio.sleep(10)
    await client.disconnect()


# Run the example
if __name__ == '__main__':
    asyncio.run(main())
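The JavaScript client retries with a delay of 2^n seconds for attempt n, up to five attempts, while the Python client has no retry policy. The sketch below computes that same schedule in Python and adds an optional cap and jitter; backoff_delays and jittered are hypothetical helpers, and the 60-second cap and the jitter are assumptions that the JavaScript client does not use.

```python
import random
from typing import List


def backoff_delays(max_attempts: int = 5,
                   base_seconds: float = 1.0,
                   cap_seconds: float = 60.0) -> List[float]:
    """Delay before each reconnect attempt: base * 2**attempt, capped."""
    return [
        min(cap_seconds, base_seconds * 2 ** attempt)
        for attempt in range(1, max_attempts + 1)
    ]


def jittered(delay: float, rng: random.Random) -> float:
    """Pick a random delay in [0, delay] so clients do not reconnect in lockstep."""
    return rng.uniform(0.0, delay)


if __name__ == "__main__":
    rng = random.Random(0)
    for attempt, delay in enumerate(backoff_delays(), start=1):
        print(f"attempt {attempt}: wait up to {delay:.0f}s, "
              f"jittered {jittered(delay, rng):.2f}s")
```

Jitter matters most after a server restart, when every client would otherwise retry at the same instants.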
7. Monitoring and Debugging
7.1 Connection Status Monitoring
# device_interaction/monitoring.py
import logging
from typing import Dict, List

from django.core.cache import cache
from django.utils import timezone

logger = logging.getLogger(__name__)


class WebSocketMonitor:
    """Tracks WebSocket connections in the cache.

    Each update is a read-modify-write of a single cache key, so concurrent
    updates can overwrite each other; treat the data as approximate.
    """

    @staticmethod
    def record_connection(user_id: int, channel_name: str):
        """Record a new connection."""
        connections = cache.get('ws_connections', {})
        connections[str(user_id)] = {
            'channel_name': channel_name,
            'connected_at': timezone.now().isoformat(),
            'last_activity': timezone.now().isoformat()
        }
        cache.set('ws_connections', connections, timeout=86400)  # 24 hours
        logger.info(f"WebSocket connection recorded for user {user_id}")

    @staticmethod
    def record_disconnection(user_id: int):
        """Record a disconnection."""
        connections = cache.get('ws_connections', {})
        if str(user_id) in connections:
            del connections[str(user_id)]
            cache.set('ws_connections', connections, timeout=86400)
            logger.info(f"WebSocket disconnection recorded for user {user_id}")

    @staticmethod
    def update_activity(user_id: int):
        """Update the last-activity time."""
        connections = cache.get('ws_connections', {})
        if str(user_id) in connections:
            connections[str(user_id)]['last_activity'] = timezone.now().isoformat()
            cache.set('ws_connections', connections, timeout=86400)

    @staticmethod
    def get_online_users() -> List[int]:
        """Return the IDs of online users."""
        connections = cache.get('ws_connections', {})
        return [int(user_id) for user_id in connections.keys()]

    @staticmethod
    def get_connection_stats() -> Dict:
        """Return connection statistics."""
        connections = cache.get('ws_connections', {})
        return {
            'total_connections': len(connections),
            'connections': connections
        }
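WebSocketMonitor stores a last_activity timestamp for each user but never uses it. If a worker dies before record_disconnection runs, the entry stays until the 24-hour cache timeout. One option is to drop entries that have been idle too long. The sketch below is a pure function over the same dictionary shape; prune_stale and the two-minute threshold are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict


def prune_stale(connections: Dict[str, dict],
                now: datetime,
                max_idle: timedelta) -> Dict[str, dict]:
    """Return only the connections whose last_activity is within max_idle."""
    return {
        user_id: info
        for user_id, info in connections.items()
        # last_activity is an ISO 8601 string, as written by isoformat()
        if now - datetime.fromisoformat(info["last_activity"]) <= max_idle
    }


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    connections = {
        "1": {"last_activity": (now - timedelta(seconds=30)).isoformat()},
        "2": {"last_activity": (now - timedelta(minutes=10)).isoformat()},
    }
    print(sorted(prune_stale(connections, now, timedelta(minutes=2))))
```

With the 30-second client heartbeat from section 6.1, a threshold of a few minutes tolerates a couple of missed pings, provided update_activity is called when a ping arrives.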
7.2 Performance Monitoring
# device_interaction/performance.py
import logging
import time
from functools import wraps

from channels.generic.websocket import AsyncWebsocketConsumer
from django.core.cache import cache

logger = logging.getLogger(__name__)


def monitor_performance(operation_name: str):
    """Decorator that records the duration of an async operation."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time

                # Record the metric. The cache calls are synchronous and
                # briefly block the event loop.
                cache_key = f"ws_perf:{operation_name}"
                metrics = cache.get(cache_key, [])
                metrics.append({
                    'duration': duration,
                    'timestamp': time.time(),
                    'success': True
                })
                # Keep the 100 most recent records
                if len(metrics) > 100:
                    metrics = metrics[-100:]
                cache.set(cache_key, metrics, timeout=3600)

                if duration > 1.0:  # warn when an operation takes over 1 second
                    logger.warning(f"Slow WebSocket operation: {operation_name} took {duration:.2f}s")
                return result
            except Exception as e:
                duration = time.time() - start_time
                logger.error(f"WebSocket operation failed: {operation_name} ({duration:.2f}s): {str(e)}")
                raise
        return wrapper
    return decorator


# Usage example
class DeviceConsumer(AsyncWebsocketConsumer):
    @monitor_performance('handle_chat_message')
    async def handle_chat_message(self, message_content):
        # Handler logic goes here
        pass
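The decorator stores up to 100 raw records per operation, but the guide does not show how to read them back. The sketch below summarizes a list in the same shape (dictionaries with a 'duration' key) into count, average, 95th percentile, and maximum; summarize is a hypothetical helper, and the nearest-rank percentile is one of several common definitions.

```python
import math
from typing import Dict, List


def summarize(metrics: List[dict]) -> Dict[str, float]:
    """Summarize recorded durations (in seconds) for one operation."""
    durations = sorted(m["duration"] for m in metrics)
    if not durations:
        return {"count": 0, "avg": 0.0, "p95": 0.0, "max": 0.0}
    # Nearest-rank percentile: the smallest sample with at least 95% of
    # all samples at or below it
    rank = max(1, math.ceil(0.95 * len(durations)))
    return {
        "count": len(durations),
        "avg": sum(durations) / len(durations),
        "p95": durations[rank - 1],
        "max": durations[-1],
    }


if __name__ == "__main__":
    sample = [{"duration": d, "success": True} for d in (1.0, 2.0, 3.0, 4.0)]
    print(summarize(sample))
```

In the Django process the input would come from `cache.get("ws_perf:handle_chat_message", [])`, using the cache key format from the decorator.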
8. Best Practices
8.1 Error Handling
# WebSocket error handling best practice
class DeviceConsumer(AsyncWebsocketConsumer):
    async def handle_error(self, error: Exception, context: str = ""):
        """Central error handler: log the details, send a generic message."""
        error_message = str(error)
        logger.error(f"WebSocket error in {context}: {error_message}")
        # Send a generic error to the client; internal details stay in the log
        await self.send_message({
            'type': 'error',
            'message': 'Internal server error',
            'code': 'INTERNAL_ERROR',
            'timestamp': self.get_timestamp()
        })
8.2 Message Validation
# Message validation
import json

from channels.generic.websocket import AsyncWebsocketConsumer
from marshmallow import Schema, ValidationError, fields


class MessageSchema(Schema):
    type = fields.Str(required=True)
    message = fields.Raw(required=True)
    timestamp = fields.DateTime(allow_none=True)


class DeviceConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data=None, bytes_data=None):
        try:
            data = json.loads(text_data)
            # Validate the message format
            schema = MessageSchema()
            validated_data = schema.load(data)
            # Handle the validated message; process_validated_message stands
            # for the dispatch logic from section 3.1
            await self.process_validated_message(validated_data)
        except json.JSONDecodeError:
            await self.send_error("Invalid JSON format")
        except ValidationError as e:
            await self.send_error(f"Message validation error: {e.messages}")
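8.3 Security Considerations
# Security measures
import time

from channels.generic.websocket import AsyncWebsocketConsumer


class DeviceConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.message_count = 0
        self.last_message_time = time.time()
        self.rate_limit_window = 60  # 60-second window
        self.max_messages_per_window = 100  # at most 100 messages per window

    async def receive(self, text_data=None, bytes_data=None):
        # Rate limiting (fixed window, per connection)
        current_time = time.time()
        if current_time - self.last_message_time > self.rate_limit_window:
            self.message_count = 0
            self.last_message_time = current_time
        self.message_count += 1
        if self.message_count > self.max_messages_per_window:
            await self.send_error("Rate limit exceeded")
            await self.close(code=4008)
            return

        # Message size limit, measured in bytes rather than characters
        if text_data is not None and len(text_data.encode('utf-8')) > 10240:  # 10 KB limit
            await self.send_error("Message too large")
            return

        # Handle the message. AsyncWebsocketConsumer.receive does nothing by
        # default, so in practice these checks sit at the top of the receive
        # method from section 3.1, or in a subclass of that consumer.
        await super().receive(text_data)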
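The counter above is a fixed window: a client can send 100 messages at the end of one window and 100 more at the start of the next. A sliding window closes that gap by tracking individual timestamps. The sketch below is one possible variant, not the limiter used in this project; SlidingWindowLimiter is a hypothetical class, and the clock value is passed in so the behavior can be checked deterministically.

```python
from collections import deque


class SlidingWindowLimiter:
    """Allows at most max_messages in any window_seconds-long interval."""

    def __init__(self, max_messages: int = 100,
                 window_seconds: float = 60.0) -> None:
        self.max_messages = max_messages
        self.window_seconds = window_seconds
        self._timestamps = deque()

    def allow(self, now: float) -> bool:
        # Drop timestamps that have left the window
        while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
            self._timestamps.popleft()
        if len(self._timestamps) >= self.max_messages:
            return False
        # Only accepted messages count against the limit
        self._timestamps.append(now)
        return True


if __name__ == "__main__":
    limiter = SlidingWindowLimiter(max_messages=2, window_seconds=10.0)
    for t in (0.0, 1.0, 2.0, 10.0, 10.5, 11.0):
        print(t, limiter.allow(t))
```

In a consumer, `allow(time.monotonic())` would be called at the top of receive. The cost is memory proportional to max_messages per connection, which is small at the limits used here.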
This guide covers the server-side implementation, client examples, monitoring, and best practices for WebSocket real-time communication.