# WebSocket 实时通信实现指南 本文档详细说明QY LTY Backend系统中WebSocket实时通信的实现架构、使用方法和最佳实践。 ## 1. WebSocket架构概览 ``` WebSocket Architecture: ┌─────────────────────────────────────────────┐ │ Client Layer │ │ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │ │ │Web App │ │Mobile │ │ Device │ │ │ │ │ │ App │ │ SDK │ │ │ └─────────┘ └─────────┘ └─────────────┘ │ └─────────────┬───────────────────────────────┘ │ WebSocket Connection │ ws://domain/ws/device/ ┌─────────────┴───────────────────────────────┐ │ Connection Layer │ │ ┌─────────────────────────────────────────┐ │ │ │ Django Channels │ │ │ │ ┌─────────────┐ ┌─────────────────┐ │ │ │ │ │ ASGI │ │ URL Routing │ │ │ │ │ │ Server │ │ │ │ │ │ │ │ (Daphne) │ │ WebSocket │ │ │ │ │ └─────────────┘ └─────────────────┘ │ │ │ └─────────────────────────────────────────┘ │ └─────────────┬───────────────────────────────┘ │ ┌─────────────┴───────────────────────────────┐ │ Consumer Layer │ │ ┌─────────────────────────────────────────┐ │ │ │ DeviceConsumer │ │ │ │ ┌─────────────┐ ┌─────────────────────┐ │ │ │ │ │ Connect │ │ Disconnect │ │ │ │ │ │ Handler │ │ Handler │ │ │ │ │ └─────────────┘ └─────────────────────┘ │ │ │ │ ┌─────────────┐ ┌─────────────────────┐ │ │ │ │ │ Receive │ │ Send │ │ │ │ │ │ Handler │ │ Handler │ │ │ │ │ └─────────────┘ └─────────────────────┘ │ │ │ └─────────────────────────────────────────┘ │ └─────────────┬───────────────────────────────┘ │ ┌─────────────┴───────────────────────────────┐ │ Channel Layer │ │ ┌─────────────────────────────────────────┐ │ │ │ Redis │ │ │ │ ┌─────────────┐ ┌─────────────────────┐ │ │ │ │ │ Groups │ │ Messages │ │ │ │ │ │ Management │ │ Queue │ │ │ │ │ └─────────────┘ └─────────────────────┘ │ │ │ └─────────────────────────────────────────┘ │ └─────────────────────────────────────────────┘ ``` ## 2. 核心组件实现 ### 2.1 ASGI配置 ```python # qy_lty/asgi.py import os from django.core.asgi import get_asgi_application from channels.routing import ProtocolTypeRouter, URLRouter from channels.auth import AuthMiddlewareStack from device_interaction.routing import websocket_urlpatterns from device_interaction.auth import TokenAuthMiddleware os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'qy_lty.settings') application = ProtocolTypeRouter({ # HTTP请求处理 "http": get_asgi_application(), # WebSocket连接处理 "websocket": TokenAuthMiddleware( URLRouter( websocket_urlpatterns ) ), }) ``` ### 2.2 WebSocket路由配置 ```python # device_interaction/routing.py from django.urls import re_path from . import consumers websocket_urlpatterns = [ # 基础设备连接 re_path(r'^ws/device/$', consumers.DeviceConsumer.as_asgi()), # 基于Token的设备连接 re_path(r'^ws/device/token/(?P[^/]+)/?$', consumers.DeviceConsumer.as_asgi()), # 基于用户ID的连接(已弃用,保留兼容性) re_path(r'^ws/device/(?P\d+)/?$', consumers.DeviceConsumer.as_asgi()), ] ``` ### 2.3 认证中间件 ```python # device_interaction/auth.py import logging from channels.middleware import BaseMiddleware from channels.db import database_sync_to_async from django.contrib.auth.models import AnonymousUser from rest_framework.authtoken.models import Token from urllib.parse import parse_qs logger = logging.getLogger(__name__) class TokenAuthMiddleware(BaseMiddleware): """WebSocket Token认证中间件""" async def __call__(self, scope, receive, send): # 获取查询参数中的token query_params = parse_qs(scope.get('query_string', b'').decode()) token = query_params.get('token', [None])[0] # 或从URL路径中获取token if not token and 'url_route' in scope: url_kwargs = scope['url_route']['kwargs'] token = url_kwargs.get('token') # 验证token并设置用户 if token: user = await self.get_user_from_token(token) scope['user'] = user else: scope['user'] = AnonymousUser() return await super().__call__(scope, receive, send) @database_sync_to_async def get_user_from_token(self, token_key): """从Token获取用户""" try: token = Token.objects.select_related('user').get(key=token_key) return token.user except Token.DoesNotExist: logger.warning(f"Invalid token: {token_key}") return AnonymousUser() ``` ## 3. Consumer实现 ### 3.1 设备Consumer ```python # device_interaction/consumers.py import json import logging from typing import Dict, Any from channels.generic.websocket import AsyncWebsocketConsumer from channels.db import database_sync_to_async from django.contrib.auth.models import AnonymousUser from .models import Device, UserDevice from .services import MessageService, WeatherService logger = logging.getLogger(__name__) class DeviceConsumer(AsyncWebsocketConsumer): """设备WebSocket消费者""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.user = None self.device = None self.user_group_name = None self.device_group_name = None async def connect(self): """处理WebSocket连接""" try: # 获取用户信息 self.user = self.scope.get('user') if isinstance(self.user, AnonymousUser): logger.warning("Anonymous user attempted to connect") await self.close(code=4001) return # 获取用户设备信息 self.device = await self.get_user_primary_device(self.user.id) if not self.device: logger.warning(f"User {self.user.id} has no bound devices") # 允许连接但记录警告 # await self.close(code=4002) # return # 设置组名 self.user_group_name = f"user_{self.user.id}" if self.device: self.device_group_name = f"device_{self.device.id}" # 加入用户组 await self.channel_layer.group_add( self.user_group_name, self.channel_name ) # 加入设备组 if self.device_group_name: await self.channel_layer.group_add( self.device_group_name, self.channel_name ) # 接受连接 await self.accept() # 发送连接成功消息 await self.send_message({ 'type': 'connection_status', 'status': 'connected', 'user_id': str(self.user.id), 'device_id': self.device.id if self.device else None, 'timestamp': self.get_timestamp() }) logger.info(f"User {self.user.id} connected to WebSocket") except Exception as e: logger.error(f"WebSocket connection error: {str(e)}") await self.close(code=4000) async def disconnect(self, close_code): """处理WebSocket断开连接""" try: # 离开用户组 if self.user_group_name: await self.channel_layer.group_discard( self.user_group_name, self.channel_name ) # 离开设备组 if self.device_group_name: await self.channel_layer.group_discard( self.device_group_name, self.channel_name ) if self.user: logger.info(f"User {self.user.id} disconnected from WebSocket (code: {close_code})") except Exception as e: logger.error(f"WebSocket disconnect error: {str(e)}") async def receive(self, text_data): """处理接收到的消息""" try: # 解析消息 data = json.loads(text_data) message_type = data.get('type') message_content = data.get('message') logger.info(f"Received message from user {self.user.id}: {message_type}") # 根据消息类型处理 if message_type == 'chat_message': await self.handle_chat_message(message_content) elif message_type == 'weather': await self.handle_weather_request(message_content) elif message_type == 'sing': await self.handle_sing_request(message_content) elif message_type == 'dance': await self.handle_dance_request(message_content) elif message_type == 'ping': await self.handle_ping() else: await self.send_error(f"Unsupported message type: {message_type}") except json.JSONDecodeError: await self.send_error("Invalid JSON format") except Exception as e: logger.error(f"Message handling error: {str(e)}") await self.send_error("Internal server error") async def handle_chat_message(self, message_content): """处理聊天消息""" try: # 这里可以集成AI聊天服务 response = await self.process_chat_message(message_content) await self.send_message({ 'type': 'chat_response', 'message': response, 'timestamp': self.get_timestamp() }) except Exception as e: await self.send_error(f"Chat processing error: {str(e)}") async def handle_weather_request(self, message_content): """处理天气查询请求""" try: city_name = message_content.get('city', '北京') # 调用天气服务 weather_service = WeatherService() weather_data = await weather_service.get_weather(city_name) if weather_data['success']: await self.send_message({ 'type': 'weather_response', 'data': weather_data['data'], 'timestamp': self.get_timestamp() }) else: await self.send_error(f"Weather query failed: {weather_data['error']}") except Exception as e: await self.send_error(f"Weather service error: {str(e)}") async def handle_sing_request(self, message_content): """处理唱歌请求""" try: song_info = { 'song': message_content.get('song', ''), 'artist': message_content.get('artist', ''), 'action': 'start_singing' } await self.send_message({ 'type': 'sing_response', 'data': song_info, 'message': f"开始播放:{song_info['song']} - {song_info['artist']}", 'timestamp': self.get_timestamp() }) except Exception as e: await self.send_error(f"Sing request error: {str(e)}") async def handle_dance_request(self, message_content): """处理跳舞请求""" try: dance_info = { 'dance': message_content.get('dance', ''), 'style': message_content.get('style', ''), 'duration': message_content.get('duration', '30秒'), 'action': 'start_dancing' } await self.send_message({ 'type': 'dance_response', 'data': dance_info, 'message': f"开始跳舞:{dance_info['dance']} ({dance_info['style']})", 'timestamp': self.get_timestamp() }) except Exception as e: await self.send_error(f"Dance request error: {str(e)}") async def handle_ping(self): """处理心跳包""" await self.send_message({ 'type': 'pong', 'timestamp': self.get_timestamp() }) # 组消息处理器 async def group_message(self, event): """处理组消息""" message = event['message'] await self.send(text_data=json.dumps(message)) async def send_message(self, message: Dict[str, Any]): """发送消息到客户端""" await self.send(text_data=json.dumps(message)) async def send_error(self, error_message: str): """发送错误消息""" await self.send_message({ 'type': 'error', 'message': error_message, 'timestamp': self.get_timestamp() }) @database_sync_to_async def get_user_primary_device(self, user_id: int): """获取用户的主要设备""" try: user_device = UserDevice.objects.select_related('device').filter( user_id=user_id, is_primary=True ).first() return user_device.device if user_device else None except Exception: return None async def process_chat_message(self, message_content: str) -> str: """处理聊天消息(可以集成AI服务)""" # 这里可以调用AI对话服务 # 目前返回简单回复 return f"收到消息:{message_content}" @staticmethod def get_timestamp() -> str: """获取当前时间戳""" from django.utils import timezone return timezone.now().isoformat() ``` ## 4. 消息服务 ### 4.1 消息发送服务 ```python # device_interaction/services.py import json import logging from typing import Dict, Any, List, Optional from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from django.contrib.auth.models import User from .models import UserDevice logger = logging.getLogger(__name__) class MessageService: """消息发送服务""" def __init__(self): self.channel_layer = get_channel_layer() def send_to_user(self, user_id: int, message: Dict[str, Any]) -> bool: """发送消息给指定用户""" try: group_name = f"user_{user_id}" async_to_sync(self.channel_layer.group_send)( group_name, { 'type': 'group_message', 'message': message } ) logger.info(f"Message sent to user {user_id}") return True except Exception as e: logger.error(f"Failed to send message to user {user_id}: {str(e)}") return False def send_to_device(self, device_id: int, message: Dict[str, Any]) -> bool: """发送消息给指定设备""" try: group_name = f"device_{device_id}" async_to_sync(self.channel_layer.group_send)( group_name, { 'type': 'group_message', 'message': message } ) logger.info(f"Message sent to device {device_id}") return True except Exception as e: logger.error(f"Failed to send message to device {device_id}: {str(e)}") return False def broadcast_to_all_users(self, message: Dict[str, Any]) -> int: """广播消息给所有在线用户""" try: # 获取所有在线用户(这里需要维护在线用户列表) online_users = self.get_online_users() success_count = 0 for user_id in online_users: if self.send_to_user(user_id, message): success_count += 1 logger.info(f"Broadcast message sent to {success_count} users") return success_count except Exception as e: logger.error(f"Broadcast failed: {str(e)}") return 0 def get_online_users(self) -> List[int]: """获取在线用户列表(需要实现在线用户追踪)""" # 这里可以通过Redis或数据库维护在线用户列表 # 简化实现,返回空列表 return [] class WeatherService: """天气服务""" async def get_weather(self, city: str) -> Dict[str, Any]: """获取天气信息""" try: # 这里集成真实的天气API # 示例返回模拟数据 weather_data = { 'city': {'name': city}, 'weather': { 'now': { 'temp': '25', 'text': '晴', 'icon': '100' }, 'updateTime': '2023-01-01T12:00:00Z' } } return { 'success': True, 'data': weather_data } except Exception as e: return { 'success': False, 'error': str(e) } ``` ### 4.2 消息队列处理 ```python # device_interaction/message_queue.py import json import redis import logging from typing import Dict, Any from django.conf import settings logger = logging.getLogger(__name__) class MessageQueue: """消息队列处理""" def __init__(self): self.redis_client = redis.Redis.from_url(settings.REDIS_LOCATION) self.queue_name = 'device_messages' def enqueue_message(self, user_id: int, message: Dict[str, Any]) -> bool: """将消息加入队列""" try: message_data = { 'user_id': user_id, 'message': message, 'timestamp': self._get_timestamp() } self.redis_client.lpush( self.queue_name, json.dumps(message_data) ) logger.info(f"Message enqueued for user {user_id}") return True except Exception as e: logger.error(f"Failed to enqueue message: {str(e)}") return False def dequeue_message(self) -> Dict[str, Any]: """从队列中取出消息""" try: message_json = self.redis_client.brpop(self.queue_name, timeout=1) if message_json: return json.loads(message_json[1]) return None except Exception as e: logger.error(f"Failed to dequeue message: {str(e)}") return None def get_queue_size(self) -> int: """获取队列大小""" return self.redis_client.llen(self.queue_name) def clear_queue(self) -> bool: """清空队列""" try: self.redis_client.delete(self.queue_name) return True except Exception as e: logger.error(f"Failed to clear queue: {str(e)}") return False @staticmethod def _get_timestamp() -> str: from django.utils import timezone return timezone.now().isoformat() ``` ## 5. HTTP API集成 ### 5.1 消息发送API ```python # device_interaction/views.py from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.permissions import IsAuthenticated from .services import MessageService class MessageViewSet(ViewSet): """消息管理API""" permission_classes = [IsAuthenticated] def create(self, request): """发送消息""" try: message_type = request.data.get('type') message_content = request.data.get('message') user_id = request.data.get('user_id') if not all([message_type, message_content, user_id]): return Response({ 'status': 'error', 'message': '缺少必要参数' }, status=400) # 构造消息 message = { 'type': message_type, 'message': message_content, 'timestamp': timezone.now().isoformat() } # 发送消息 message_service = MessageService() success = message_service.send_to_user(int(user_id), message) if success: return Response({ 'status': 'success', 'message': '消息发送成功', 'data': message }) else: return Response({ 'status': 'error', 'message': '消息发送失败' }, status=500) except Exception as e: return Response({ 'status': 'error', 'message': str(e) }, status=500) @api_view(['POST']) @permission_classes([IsAuthenticated]) def send_message_to_user(request, user_id): """发送消息给指定用户""" try: message_data = { 'type': request.data.get('type', 'system'), 'message': request.data.get('message'), 'sender': request.user.username, 'timestamp': timezone.now().isoformat() } message_service = MessageService() success = message_service.send_to_user(int(user_id), message_data) return Response({ 'status': 'success' if success else 'error', 'message': '消息发送成功' if success else '消息发送失败' }) except Exception as e: return Response({ 'status': 'error', 'message': str(e) }, status=500) ``` ## 6. 客户端使用示例 ### 6.1 JavaScript客户端 ```javascript // WebSocket客户端示例 class DeviceWebSocketClient { constructor(token, baseUrl = 'ws://localhost:8000') { this.token = token; this.baseUrl = baseUrl; this.ws = null; this.reconnectAttempts = 0; this.maxReconnectAttempts = 5; this.heartbeatInterval = null; this.messageHandlers = new Map(); } connect() { try { const wsUrl = `${this.baseUrl}/ws/device/?token=${this.token}`; this.ws = new WebSocket(wsUrl); this.ws.onopen = (event) => { console.log('WebSocket连接已建立'); this.reconnectAttempts = 0; this.startHeartbeat(); this.onConnect?.(event); }; this.ws.onmessage = (event) => { try { const data = JSON.parse(event.data); this.handleMessage(data); } catch (error) { console.error('消息解析错误:', error); } }; this.ws.onclose = (event) => { console.log('WebSocket连接已关闭:', event.code, event.reason); this.stopHeartbeat(); this.onDisconnect?.(event); if (event.code !== 1000 && this.reconnectAttempts < this.maxReconnectAttempts) { this.reconnect(); } }; this.ws.onerror = (error) => { console.error('WebSocket错误:', error); this.onError?.(error); }; } catch (error) { console.error('WebSocket连接失败:', error); } } disconnect() { if (this.ws) { this.ws.close(1000, '客户端主动断开连接'); } } reconnect() { this.reconnectAttempts++; console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`); setTimeout(() => { this.connect(); }, Math.pow(2, this.reconnectAttempts) * 1000); // 指数退避 } sendMessage(type, message) { if (this.ws && this.ws.readyState === WebSocket.OPEN) { const data = { type: type, message: message, timestamp: new Date().toISOString() }; this.ws.send(JSON.stringify(data)); return true; } else { console.error('WebSocket未连接'); return false; } } // 消息处理 handleMessage(data) { const handler = this.messageHandlers.get(data.type); if (handler) { handler(data); } else { console.log('收到消息:', data); } } // 注册消息处理器 onMessage(type, handler) { this.messageHandlers.set(type, handler); } // 心跳机制 startHeartbeat() { this.heartbeatInterval = setInterval(() => { this.sendMessage('ping', {}); }, 30000); // 30秒心跳 } stopHeartbeat() { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; } } // 便捷方法 sendChatMessage(message) { return this.sendMessage('chat_message', message); } requestWeather(city) { return this.sendMessage('weather', { city: city }); } requestSing(song, artist) { return this.sendMessage('sing', { song: song, artist: artist }); } requestDance(dance, style, duration = '30秒') { return this.sendMessage('dance', { dance: dance, style: style, duration: duration }); } } // 使用示例 const client = new DeviceWebSocketClient('your-auth-token'); // 设置事件处理 client.onConnect = () => { console.log('已连接到设备服务'); }; client.onDisconnect = (event) => { console.log('与设备服务断开连接'); }; // 设置消息处理器 client.onMessage('chat_response', (data) => { console.log('AI回复:', data.message); }); client.onMessage('weather_response', (data) => { console.log('天气信息:', data.data); }); client.onMessage('connection_status', (data) => { console.log('连接状态:', data.status); }); // 连接 client.connect(); // 发送消息示例 setTimeout(() => { client.sendChatMessage('你好,请介绍一下自己'); client.requestWeather('北京'); client.requestSing('青花瓷', '周杰伦'); }, 1000); ``` ### 6.2 Python客户端 ```python # Python WebSocket客户端 import asyncio import websockets import json import logging logger = logging.getLogger(__name__) class DeviceWebSocketClient: def __init__(self, token: str, base_url: str = 'ws://localhost:8000'): self.token = token self.base_url = base_url self.websocket = None self.message_handlers = {} self.running = False async def connect(self): """连接到WebSocket服务器""" try: uri = f"{self.base_url}/ws/device/?token={self.token}" self.websocket = await websockets.connect(uri) self.running = True logger.info("WebSocket连接已建立") # 启动消息接收循环 await self.listen_for_messages() except Exception as e: logger.error(f"WebSocket连接失败: {str(e)}") raise async def disconnect(self): """断开WebSocket连接""" self.running = False if self.websocket: await self.websocket.close() logger.info("WebSocket连接已断开") async def send_message(self, message_type: str, message_content): """发送消息""" if not self.websocket: raise ConnectionError("WebSocket未连接") data = { 'type': message_type, 'message': message_content, 'timestamp': datetime.now().isoformat() } await self.websocket.send(json.dumps(data)) async def listen_for_messages(self): """监听消息""" try: async for message in self.websocket: if not self.running: break try: data = json.loads(message) await self.handle_message(data) except json.JSONDecodeError: logger.error(f"无效的JSON消息: {message}") except websockets.exceptions.ConnectionClosed: logger.info("WebSocket连接已关闭") except Exception as e: logger.error(f"消息监听错误: {str(e)}") async def handle_message(self, data): """处理接收到的消息""" message_type = data.get('type') handler = self.message_handlers.get(message_type) if handler: await handler(data) else: logger.info(f"收到消息: {data}") def register_handler(self, message_type: str, handler): """注册消息处理器""" self.message_handlers[message_type] = handler # 便捷方法 async def send_chat_message(self, message: str): """发送聊天消息""" await self.send_message('chat_message', message) async def request_weather(self, city: str): """请求天气信息""" await self.send_message('weather', {'city': city}) async def request_sing(self, song: str, artist: str): """请求唱歌""" await self.send_message('sing', {'song': song, 'artist': artist}) # 使用示例 async def main(): client = DeviceWebSocketClient('your-auth-token') # 注册消息处理器 async def handle_chat_response(data): print(f"AI回复: {data['message']}") async def handle_weather_response(data): weather = data['data']['weather']['now'] print(f"天气: {weather['text']}, 温度: {weather['temp']}°C") client.register_handler('chat_response', handle_chat_response) client.register_handler('weather_response', handle_weather_response) # 连接并发送消息 await client.connect() await client.send_chat_message('你好') await client.request_weather('上海') # 保持连接 await asyncio.sleep(10) await client.disconnect() # 运行示例 if __name__ == '__main__': asyncio.run(main()) ``` ## 7. 监控和调试 ### 7.1 连接状态监控 ```python # device_interaction/monitoring.py import logging from typing import Dict, List from django.core.cache import cache from django.utils import timezone logger = logging.getLogger(__name__) class WebSocketMonitor: """WebSocket连接监控""" @staticmethod def record_connection(user_id: int, channel_name: str): """记录连接""" connections = cache.get('ws_connections', {}) connections[str(user_id)] = { 'channel_name': channel_name, 'connected_at': timezone.now().isoformat(), 'last_activity': timezone.now().isoformat() } cache.set('ws_connections', connections, timeout=86400) # 24小时 logger.info(f"WebSocket connection recorded for user {user_id}") @staticmethod def record_disconnection(user_id: int): """记录断开连接""" connections = cache.get('ws_connections', {}) if str(user_id) in connections: del connections[str(user_id)] cache.set('ws_connections', connections, timeout=86400) logger.info(f"WebSocket disconnection recorded for user {user_id}") @staticmethod def update_activity(user_id: int): """更新活动时间""" connections = cache.get('ws_connections', {}) if str(user_id) in connections: connections[str(user_id)]['last_activity'] = timezone.now().isoformat() cache.set('ws_connections', connections, timeout=86400) @staticmethod def get_online_users() -> List[int]: """获取在线用户列表""" connections = cache.get('ws_connections', {}) return [int(user_id) for user_id in connections.keys()] @staticmethod def get_connection_stats() -> Dict: """获取连接统计""" connections = cache.get('ws_connections', {}) return { 'total_connections': len(connections), 'connections': connections } ``` ### 7.2 性能监控 ```python # device_interaction/performance.py import time import logging from functools import wraps from django.core.cache import cache logger = logging.getLogger(__name__) def monitor_performance(operation_name: str): """性能监控装饰器""" def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() try: result = await func(*args, **kwargs) duration = time.time() - start_time # 记录性能指标 cache_key = f"ws_perf:{operation_name}" metrics = cache.get(cache_key, []) metrics.append({ 'duration': duration, 'timestamp': time.time(), 'success': True }) # 保留最近100次记录 if len(metrics) > 100: metrics = metrics[-100:] cache.set(cache_key, metrics, timeout=3600) if duration > 1.0: # 超过1秒记录警告 logger.warning(f"Slow WebSocket operation: {operation_name} took {duration:.2f}s") return result except Exception as e: duration = time.time() - start_time logger.error(f"WebSocket operation failed: {operation_name} ({duration:.2f}s): {str(e)}") raise return wrapper return decorator # 使用示例 class DeviceConsumer(AsyncWebsocketConsumer): @monitor_performance('handle_chat_message') async def handle_chat_message(self, message_content): # 处理逻辑 pass ``` ## 8. 最佳实践 ### 8.1 错误处理 ```python # WebSocket错误处理最佳实践 class DeviceConsumer(AsyncWebsocketConsumer): async def handle_error(self, error: Exception, context: str = ""): """统一错误处理""" error_message = str(error) logger.error(f"WebSocket error in {context}: {error_message}") # 发送错误消息给客户端 await self.send_message({ 'type': 'error', 'message': 'Internal server error', 'code': 'INTERNAL_ERROR', 'timestamp': self.get_timestamp() }) ``` ### 8.2 消息验证 ```python # 消息验证 from marshmallow import Schema, fields, ValidationError class MessageSchema(Schema): type = fields.Str(required=True) message = fields.Raw(required=True) timestamp = fields.DateTime(allow_none=True) class DeviceConsumer(AsyncWebsocketConsumer): async def receive(self, text_data): try: data = json.loads(text_data) # 验证消息格式 schema = MessageSchema() validated_data = schema.load(data) # 处理验证后的消息 await self.process_validated_message(validated_data) except json.JSONDecodeError: await self.send_error("Invalid JSON format") except ValidationError as e: await self.send_error(f"Message validation error: {e.messages}") ``` ### 8.3 安全考虑 ```python # 安全措施 class DeviceConsumer(AsyncWebsocketConsumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.message_count = 0 self.last_message_time = time.time() self.rate_limit_window = 60 # 60秒窗口 self.max_messages_per_window = 100 # 每窗口最多100条消息 async def receive(self, text_data): # 速率限制 current_time = time.time() if current_time - self.last_message_time > self.rate_limit_window: self.message_count = 0 self.last_message_time = current_time self.message_count += 1 if self.message_count > self.max_messages_per_window: await self.send_error("Rate limit exceeded") await self.close(code=4008) return # 消息长度限制 if len(text_data) > 10240: # 10KB限制 await self.send_error("Message too large") return # 处理消息 await super().receive(text_data) ``` 这个WebSocket实现指南提供了完整的实时通信解决方案,包括服务端实现、客户端示例、监控和最佳实践。