lty/qy_lty/docs/features/websocket.md
2026-03-17 13:17:02 +08:00

41 KiB
Raw Blame History

WebSocket 实时通信实现指南

本文档详细说明QY LTY Backend系统中WebSocket实时通信的实现架构、使用方法和最佳实践。

1. WebSocket架构概览

WebSocket Architecture:
┌─────────────────────────────────────────────┐
│              Client Layer                   │
│  ┌─────────┐  ┌─────────┐  ┌─────────────┐  │
│  │Web App  │  │Mobile   │  │   Device    │  │
│  │         │  │  App    │  │    SDK      │  │
│  └─────────┘  └─────────┘  └─────────────┘  │
└─────────────┬───────────────────────────────┘
              │ WebSocket Connection
              │ ws://domain/ws/device/
┌─────────────┴───────────────────────────────┐
│            Connection Layer                 │
│  ┌─────────────────────────────────────────┐ │
│  │        Django Channels               │ │
│  │  ┌─────────────┐  ┌─────────────────┐ │ │
│  │  │   ASGI      │  │   URL Routing   │ │ │
│  │  │  Server     │  │                 │ │ │
│  │  │ (Daphne)    │  │  WebSocket      │ │ │
│  │  └─────────────┘  └─────────────────┘ │ │
│  └─────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────┘
              │
┌─────────────┴───────────────────────────────┐
│           Consumer Layer                    │
│  ┌─────────────────────────────────────────┐ │
│  │        DeviceConsumer                   │ │
│  │  ┌─────────────┐  ┌─────────────────────┐ │ │
│  │  │   Connect   │  │     Disconnect      │ │ │
│  │  │   Handler   │  │      Handler        │ │ │
│  │  └─────────────┘  └─────────────────────┘ │ │
│  │  ┌─────────────┐  ┌─────────────────────┐ │ │
│  │  │   Receive   │  │       Send          │ │ │
│  │  │   Handler   │  │     Handler         │ │ │
│  │  └─────────────┘  └─────────────────────┘ │ │
│  └─────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────┘
              │
┌─────────────┴───────────────────────────────┐
│           Channel Layer                     │
│  ┌─────────────────────────────────────────┐ │
│  │            Redis                        │ │
│  │  ┌─────────────┐  ┌─────────────────────┐ │ │
│  │  │   Groups    │  │     Messages        │ │ │
│  │  │ Management  │  │      Queue          │ │ │
│  │  └─────────────┘  └─────────────────────┘ │ │
│  └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────┘

2. 核心组件实现

2.1 ASGI配置

# qy_lty/asgi.py
import os

from django.core.asgi import get_asgi_application

# Settings must be configured and the Django app registry populated BEFORE
# importing modules that pull in ORM models: device_interaction.routing imports
# the consumers (which import .models) and device_interaction.auth imports the
# DRF Token model. Importing them first fails at process start.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'qy_lty.settings')
django_asgi_app = get_asgi_application()

from channels.auth import AuthMiddlewareStack  # noqa: E402,F401
from channels.routing import ProtocolTypeRouter, URLRouter  # noqa: E402
from device_interaction.auth import TokenAuthMiddleware  # noqa: E402
from device_interaction.routing import websocket_urlpatterns  # noqa: E402

application = ProtocolTypeRouter({
    # Plain HTTP requests go to the regular Django ASGI application.
    "http": django_asgi_app,

    # WebSocket connections are token-authenticated, then routed by URL.
    "websocket": TokenAuthMiddleware(
        URLRouter(
            websocket_urlpatterns
        )
    ),
})

2.2 WebSocket路由配置

# device_interaction/routing.py
from django.urls import re_path
from . import consumers

# Every URL below is served by DeviceConsumer; only the pattern differs.
_DEVICE_WS_PATTERNS = (
    # Plain device connection.
    r'^ws/device/$',
    # Device connection carrying the token as a path segment.
    r'^ws/device/token/(?P<token>[^/]+)/?$',
    # Connection addressed by user ID (deprecated, kept for compatibility).
    r'^ws/device/(?P<user_id>\d+)/?$',
)

websocket_urlpatterns = [
    re_path(pattern, consumers.DeviceConsumer.as_asgi())
    for pattern in _DEVICE_WS_PATTERNS
]

2.3 认证中间件

# device_interaction/auth.py
import logging
from channels.middleware import BaseMiddleware
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from rest_framework.authtoken.models import Token
from urllib.parse import parse_qs

logger = logging.getLogger(__name__)

class TokenAuthMiddleware(BaseMiddleware):
    """Authenticate WebSocket connections with a DRF auth token.

    The token is looked up, in order, in the ``token`` query-string parameter,
    in ``scope['url_route']['kwargs']`` (only present when this middleware is
    mounted inside a ``URLRouter``), and finally in the request path itself.
    The resolved user, or ``AnonymousUser`` when no valid token is supplied,
    is stored in ``scope['user']`` before the inner application is called.
    """

    # Path prefix of the token route; the single segment after it is the token.
    token_path_prefix = 'ws/device/token/'

    async def __call__(self, scope, receive, send):
        """Resolve the user for this connection and delegate to the inner app."""
        token = self._extract_token(scope)

        if token:
            scope['user'] = await self.get_user_from_token(token)
        else:
            scope['user'] = AnonymousUser()

        return await super().__call__(scope, receive, send)

    def _extract_token(self, scope):
        """Return the token string carried by the connection, or ``None``."""
        # errors='replace' keeps a malformed (non-UTF-8) query string from
        # aborting the handshake with an unhandled UnicodeDecodeError.
        raw_query = scope.get('query_string', b'') or b''
        query_params = parse_qs(raw_query.decode(errors='replace'))
        token = query_params.get('token', [None])[0]
        if token:
            return token

        url_kwargs = scope.get('url_route', {}).get('kwargs', {})
        token = url_kwargs.get('token')
        if token:
            return token

        # When this middleware wraps the URLRouter, routing has not run yet and
        # 'url_route' is absent, so the path-based token must be read from the
        # raw path instead.
        path = (scope.get('path') or '').lstrip('/')
        if path.startswith(self.token_path_prefix):
            candidate = path[len(self.token_path_prefix):]
            if candidate.endswith('/'):
                candidate = candidate[:-1]
            if candidate and '/' not in candidate:
                return candidate

        return None

    @database_sync_to_async
    def get_user_from_token(self, token_key):
        """Return the user owning ``token_key``, or ``AnonymousUser`` if unknown."""
        try:
            token = Token.objects.select_related('user').get(key=token_key)
        except Token.DoesNotExist:
            # Never log the presented value: it is a credential (or a near-miss
            # of one) and log files are typically readable by more people.
            logger.warning("Invalid token presented for WebSocket authentication")
            return AnonymousUser()
        return token.user

3. Consumer实现

3.1 设备Consumer

# device_interaction/consumers.py
import json
import logging
from typing import Dict, Any
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from .models import Device, UserDevice
from .services import MessageService, WeatherService

logger = logging.getLogger(__name__)

class DeviceConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer serving one authenticated user's device session.

    On connect the socket joins the ``user_<id>`` group and, when the user has
    a primary device, the ``device_<id>`` group. Incoming text frames are JSON
    objects of the form ``{"type": ..., "message": ...}``; the supported types
    are ``chat_message``, ``weather``, ``sing``, ``dance`` and ``ping``.
    """

    # Message types whose 'message' payload must be a JSON object, mapped to
    # the name of the handler method serving them.
    _OBJECT_PAYLOAD_HANDLERS = {
        'weather': 'handle_weather_request',
        'sing': 'handle_sing_request',
        'dance': 'handle_dance_request',
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.user = None
        self.device = None
        self.user_group_name = None
        self.device_group_name = None

    async def connect(self):
        """Authenticate the connection, join its groups and accept it.

        Closes with code 4001 for unauthenticated connections and with code
        4000 when an unexpected error occurs during setup.
        """
        try:
            self.user = self.scope.get('user')

            # A missing user (no auth middleware in the stack) is treated like
            # an anonymous one instead of failing later on ``self.user.id``.
            if self.user is None or isinstance(self.user, AnonymousUser):
                logger.warning("Anonymous user attempted to connect")
                # NOTE(review): close() is called before accept(); the client
                # may see a rejected handshake rather than this custom code.
                # Confirm against the Channels/ASGI server in use.
                await self.close(code=4001)
                return

            self.device = await self.get_user_primary_device(self.user.id)

            if not self.device:
                # Deliberately not fatal: a user without a bound device may
                # still connect and receive user-level messages.
                logger.warning(f"User {self.user.id} has no bound devices")

            self.user_group_name = f"user_{self.user.id}"
            if self.device:
                self.device_group_name = f"device_{self.device.id}"

            await self.channel_layer.group_add(
                self.user_group_name,
                self.channel_name
            )

            if self.device_group_name:
                await self.channel_layer.group_add(
                    self.device_group_name,
                    self.channel_name
                )

            await self.accept()

            # Tell the client the session is ready.
            await self.send_message({
                'type': 'connection_status',
                'status': 'connected',
                'user_id': str(self.user.id),
                'device_id': self.device.id if self.device else None,
                'timestamp': self.get_timestamp()
            })

            logger.info(f"User {self.user.id} connected to WebSocket")

        except Exception as e:
            logger.error(f"WebSocket connection error: {str(e)}")
            await self.close(code=4000)

    async def disconnect(self, close_code):
        """Leave every group that was joined in ``connect``."""
        try:
            if self.user_group_name:
                await self.channel_layer.group_discard(
                    self.user_group_name,
                    self.channel_name
                )

            if self.device_group_name:
                await self.channel_layer.group_discard(
                    self.device_group_name,
                    self.channel_name
                )

            if self.user:
                logger.info(f"User {self.user.id} disconnected from WebSocket (code: {close_code})")

        except Exception as e:
            logger.error(f"WebSocket disconnect error: {str(e)}")

    async def receive(self, text_data=None, bytes_data=None):
        """Parse an incoming frame and dispatch it by its ``type`` field.

        Args:
            text_data: Text frame content, expected to be a JSON object.
            bytes_data: Binary frame content. Binary frames are rejected with
                an error message; the parameter exists because the framework
                passes binary frames as the ``bytes_data`` keyword.
        """
        if text_data is None:
            await self.send_error("Binary frames are not supported")
            return

        try:
            data = json.loads(text_data)

            # Valid JSON that is not an object (list, number, ...) has no
            # 'type' to dispatch on.
            if not isinstance(data, dict):
                await self.send_error("Invalid JSON format")
                return

            message_type = data.get('type')
            message_content = data.get('message')

            logger.info(f"Received message from user {self.user.id}: {message_type}")

            if message_type == 'chat_message':
                await self.handle_chat_message(message_content)
            elif message_type == 'ping':
                await self.handle_ping()
            elif isinstance(message_type, str) and message_type in self._OBJECT_PAYLOAD_HANDLERS:
                # These handlers read fields with .get(); a missing payload
                # means "use the defaults", any other non-object is an error.
                if message_content is None:
                    message_content = {}
                if not isinstance(message_content, dict):
                    await self.send_error(f"Invalid payload for message type: {message_type}")
                    return
                handler = getattr(self, self._OBJECT_PAYLOAD_HANDLERS[message_type])
                await handler(message_content)
            else:
                await self.send_error(f"Unsupported message type: {message_type}")

        except json.JSONDecodeError:
            await self.send_error("Invalid JSON format")
        except Exception as e:
            logger.error(f"Message handling error: {str(e)}")
            await self.send_error("Internal server error")

    async def handle_chat_message(self, message_content):
        """Answer a chat message with a ``chat_response`` frame."""
        try:
            # Hook for an AI chat backend.
            response = await self.process_chat_message(message_content)

            await self.send_message({
                'type': 'chat_response',
                'message': response,
                'timestamp': self.get_timestamp()
            })

        except Exception as e:
            await self.send_error(f"Chat processing error: {str(e)}")

    async def handle_weather_request(self, message_content):
        """Look up the weather for ``message_content['city']`` and reply."""
        try:
            city_name = message_content.get('city', '北京')

            weather_service = WeatherService()
            weather_data = await weather_service.get_weather(city_name)

            if weather_data['success']:
                await self.send_message({
                    'type': 'weather_response',
                    'data': weather_data['data'],
                    'timestamp': self.get_timestamp()
                })
            else:
                await self.send_error(f"Weather query failed: {weather_data['error']}")

        except Exception as e:
            await self.send_error(f"Weather service error: {str(e)}")

    async def handle_sing_request(self, message_content):
        """Reply to a sing request with a ``sing_response`` frame."""
        try:
            song_info = {
                'song': message_content.get('song', ''),
                'artist': message_content.get('artist', ''),
                'action': 'start_singing'
            }

            await self.send_message({
                'type': 'sing_response',
                'data': song_info,
                'message': f"开始播放:{song_info['song']} - {song_info['artist']}",
                'timestamp': self.get_timestamp()
            })

        except Exception as e:
            await self.send_error(f"Sing request error: {str(e)}")

    async def handle_dance_request(self, message_content):
        """Reply to a dance request with a ``dance_response`` frame."""
        try:
            dance_info = {
                'dance': message_content.get('dance', ''),
                'style': message_content.get('style', ''),
                'duration': message_content.get('duration', '30秒'),
                'action': 'start_dancing'
            }

            await self.send_message({
                'type': 'dance_response',
                'data': dance_info,
                'message': f"开始跳舞:{dance_info['dance']} ({dance_info['style']})",
                'timestamp': self.get_timestamp()
            })

        except Exception as e:
            await self.send_error(f"Dance request error: {str(e)}")

    async def handle_ping(self):
        """Answer a heartbeat ``ping`` with a ``pong`` frame."""
        await self.send_message({
            'type': 'pong',
            'timestamp': self.get_timestamp()
        })

    # Channel-layer event handler: invoked for events of type 'group_message'.
    async def group_message(self, event):
        """Forward a group event's ``message`` to the client as JSON."""
        message = event['message']
        await self.send(text_data=json.dumps(message))

    async def send_message(self, message: Dict[str, Any]):
        """Serialize ``message`` to JSON and send it to the client."""
        await self.send(text_data=json.dumps(message))

    async def send_error(self, error_message: str):
        """Send an ``error`` frame carrying ``error_message``."""
        await self.send_message({
            'type': 'error',
            'message': error_message,
            'timestamp': self.get_timestamp()
        })

    @database_sync_to_async
    def get_user_primary_device(self, user_id: int):
        """Return the user's primary device, or ``None`` if there is none.

        Lookup failures are treated as "no device" so the connection can still
        be established, but they are logged instead of silently discarded.
        """
        try:
            user_device = UserDevice.objects.select_related('device').filter(
                user_id=user_id,
                is_primary=True
            ).first()
            return user_device.device if user_device else None
        except Exception:
            logger.exception(f"Failed to load primary device for user {user_id}")
            return None

    async def process_chat_message(self, message_content: str) -> str:
        """Produce the reply to a chat message (placeholder for an AI service)."""
        # Currently just echoes the message back with a fixed prefix.
        return f"收到消息:{message_content}"

    @staticmethod
    def get_timestamp() -> str:
        """Return the current time as an ISO-8601 string."""
        from django.utils import timezone
        return timezone.now().isoformat()

4. 消息服务

4.1 消息发送服务

# device_interaction/services.py
import json
import logging
from typing import Dict, Any, List, Optional
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from django.contrib.auth.models import User
from .models import UserDevice

logger = logging.getLogger(__name__)

class MessageService:
    """Push messages to connected consumers through the channel layer."""

    def __init__(self):
        self.channel_layer = get_channel_layer()

    def _push_to_group(self, target_kind: str, target_id: int, message: Dict[str, Any]) -> bool:
        """Send ``message`` to the ``<target_kind>_<target_id>`` group.

        Returns True when the event was handed to the channel layer, False
        when anything went wrong (the failure is logged).
        """
        try:
            event = {
                'type': 'group_message',
                'message': message
            }
            async_to_sync(self.channel_layer.group_send)(
                f"{target_kind}_{target_id}",
                event
            )
            logger.info(f"Message sent to {target_kind} {target_id}")
            return True
        except Exception as exc:
            logger.error(f"Failed to send message to {target_kind} {target_id}: {str(exc)}")
            return False

    def send_to_user(self, user_id: int, message: Dict[str, Any]) -> bool:
        """Send ``message`` to every connection of the given user."""
        return self._push_to_group('user', user_id, message)

    def send_to_device(self, device_id: int, message: Dict[str, Any]) -> bool:
        """Send ``message`` to every connection bound to the given device."""
        return self._push_to_group('device', device_id, message)

    def broadcast_to_all_users(self, message: Dict[str, Any]) -> int:
        """Send ``message`` to each online user; return how many sends succeeded."""
        try:
            # Relies on get_online_users(), which needs real presence tracking.
            success_count = sum(
                1
                for user_id in self.get_online_users()
                if self.send_to_user(user_id, message)
            )
            logger.info(f"Broadcast message sent to {success_count} users")
            return success_count
        except Exception as exc:
            logger.error(f"Broadcast failed: {str(exc)}")
            return 0

    def get_online_users(self) -> List[int]:
        """Return the IDs of online users.

        Placeholder: presence tracking (e.g. in Redis or the database) is not
        implemented, so this always returns an empty list.
        """
        return []

class WeatherService:
    """Weather lookup service (currently returns canned sample data)."""

    async def get_weather(self, city: str) -> Dict[str, Any]:
        """Return weather information for ``city``.

        The result is ``{'success': True, 'data': ...}`` on success and
        ``{'success': False, 'error': ...}`` on failure. A real weather API is
        meant to be plugged in here; for now the data is a fixed sample.
        """
        try:
            current_conditions = {
                'temp': '25',
                'text': '晴',
                'icon': '100'
            }
            sample = {
                'city': {'name': city},
                'weather': {
                    'now': current_conditions,
                    'updateTime': '2023-01-01T12:00:00Z'
                }
            }
        except Exception as exc:
            return {'success': False, 'error': str(exc)}

        return {'success': True, 'data': sample}

4.2 消息队列处理

# device_interaction/message_queue.py
import json
import redis
import logging
from typing import Dict, Any
from django.conf import settings

logger = logging.getLogger(__name__)

class MessageQueue:
    """Redis-list backed queue of outgoing device messages."""

    def __init__(self):
        self.redis_client = redis.Redis.from_url(settings.REDIS_LOCATION)
        self.queue_name = 'device_messages'

    def enqueue_message(self, user_id: int, message: Dict[str, Any]) -> bool:
        """Push a message for ``user_id`` onto the queue; return True on success."""
        try:
            envelope = json.dumps({
                'user_id': user_id,
                'message': message,
                'timestamp': self._get_timestamp()
            })
            # Pushed on the left and popped from the right by dequeue_message.
            self.redis_client.lpush(self.queue_name, envelope)
            logger.info(f"Message enqueued for user {user_id}")
        except Exception as exc:
            logger.error(f"Failed to enqueue message: {str(exc)}")
            return False
        return True

    def dequeue_message(self) -> Dict[str, Any]:
        """Pop the next message, waiting up to one second.

        Returns the decoded message dict, or ``None`` when the wait times out
        or an error occurs.
        """
        try:
            popped = self.redis_client.brpop(self.queue_name, timeout=1)
            # brpop yields a (key, value) pair; the payload is the second item.
            return json.loads(popped[1]) if popped else None
        except Exception as exc:
            logger.error(f"Failed to dequeue message: {str(exc)}")
            return None

    def get_queue_size(self) -> int:
        """Return the number of messages currently queued."""
        size = self.redis_client.llen(self.queue_name)
        return size

    def clear_queue(self) -> bool:
        """Delete the whole queue; return True on success."""
        try:
            self.redis_client.delete(self.queue_name)
        except Exception as exc:
            logger.error(f"Failed to clear queue: {str(exc)}")
            return False
        return True

    @staticmethod
    def _get_timestamp() -> str:
        """Return the current time as an ISO-8601 string."""
        from django.utils import timezone
        now = timezone.now()
        return now.isoformat()

5. HTTP API集成

5.1 消息发送API

# device_interaction/views.py
from django.utils import timezone
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import ViewSet

from .services import MessageService

class MessageViewSet(ViewSet):
    """Message management API (authenticated users only)."""
    permission_classes = [IsAuthenticated]

    def create(self, request):
        """Send a message to a user's WebSocket connections.

        Expects ``type``, ``message`` and ``user_id`` in the request body.
        Responds 400 when a field is missing or ``user_id`` is not an integer,
        500 when delivery fails, and 200 with the sent message otherwise.
        """
        try:
            message_type = request.data.get('type')
            message_content = request.data.get('message')
            user_id = request.data.get('user_id')

            if not all([message_type, message_content, user_id]):
                return Response({
                    'status': 'error',
                    'message': '缺少必要参数'
                }, status=400)

            # A malformed user_id is a client error, not a server fault.
            try:
                target_user_id = int(user_id)
            except (TypeError, ValueError):
                return Response({
                    'status': 'error',
                    'message': 'user_id必须为整数'
                }, status=400)

            message = {
                'type': message_type,
                'message': message_content,
                'timestamp': timezone.now().isoformat()
            }

            message_service = MessageService()
            success = message_service.send_to_user(target_user_id, message)

            if success:
                return Response({
                    'status': 'success',
                    'message': '消息发送成功',
                    'data': message
                })
            else:
                return Response({
                    'status': 'error',
                    'message': '消息发送失败'
                }, status=500)

        except Exception as e:
            return Response({
                'status': 'error',
                'message': str(e)
            }, status=500)

@api_view(['POST'])
@permission_classes([IsAuthenticated])
def send_message_to_user(request, user_id):
    """Send a message to the user identified by ``user_id``.

    The body may carry ``type`` (defaults to ``'system'``) and ``message``.
    Responds 200 when the message was handed to the channel layer and 500
    when delivery failed or an unexpected error occurred.
    """
    try:
        message_data = {
            'type': request.data.get('type', 'system'),
            'message': request.data.get('message'),
            'sender': request.user.username,
            'timestamp': timezone.now().isoformat()
        }

        message_service = MessageService()
        success = message_service.send_to_user(int(user_id), message_data)

        # A failed delivery must not be reported with HTTP 200; use the same
        # status code as MessageViewSet.create does for this case.
        return Response({
            'status': 'success' if success else 'error',
            'message': '消息发送成功' if success else '消息发送失败'
        }, status=200 if success else 500)

    except Exception as e:
        return Response({
            'status': 'error',
            'message': str(e)
        }, status=500)

6. 客户端使用示例

6.1 JavaScript客户端

// WebSocket客户端示例
/**
 * Browser-side client for the device WebSocket endpoint.
 *
 * Handles connecting with a token, JSON message dispatch by `type`,
 * a 30-second heartbeat, and reconnection with exponential backoff.
 * Optional callbacks: `onConnect`, `onDisconnect`, `onError`.
 */
class DeviceWebSocketClient {
    /**
     * @param {string} token   Auth token sent as the `token` query parameter.
     * @param {string} baseUrl WebSocket origin, e.g. `ws://localhost:8000`.
     */
    constructor(token, baseUrl = 'ws://localhost:8000') {
        this.token = token;
        this.baseUrl = baseUrl;
        this.ws = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.heartbeatInterval = null;
        this.reconnectTimer = null;
        this.messageHandlers = new Map();
    }

    /** Open the connection and wire up the socket event handlers. */
    connect() {
        try {
            // Encode the token: characters such as '&', '+' or '#' would
            // otherwise corrupt the query string.
            const wsUrl = `${this.baseUrl}/ws/device/?token=${encodeURIComponent(this.token)}`;
            this.ws = new WebSocket(wsUrl);

            this.ws.onopen = (event) => {
                console.log('WebSocket连接已建立');
                this.reconnectAttempts = 0;
                this.startHeartbeat();
                this.onConnect?.(event);
            };

            this.ws.onmessage = (event) => {
                try {
                    const data = JSON.parse(event.data);
                    this.handleMessage(data);
                } catch (error) {
                    console.error('消息解析错误:', error);
                }
            };

            this.ws.onclose = (event) => {
                console.log('WebSocket连接已关闭:', event.code, event.reason);
                this.stopHeartbeat();
                this.onDisconnect?.(event);

                // Code 1000 is a normal closure; anything else is retried.
                if (event.code !== 1000 && this.reconnectAttempts < this.maxReconnectAttempts) {
                    this.reconnect();
                }
            };

            this.ws.onerror = (error) => {
                console.error('WebSocket错误:', error);
                this.onError?.(error);
            };
        } catch (error) {
            console.error('WebSocket连接失败:', error);
        }
    }

    /** Close the connection for good (no automatic reconnect). */
    disconnect() {
        // Cancel a pending backoff retry so an explicit disconnect is final.
        if (this.reconnectTimer) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = null;
        }
        if (this.ws) {
            this.ws.close(1000, '客户端主动断开连接');
        }
    }

    /** Schedule another connection attempt with exponential backoff. */
    reconnect() {
        this.reconnectAttempts++;
        console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);

        this.reconnectTimer = setTimeout(() => {
            this.reconnectTimer = null;
            this.connect();
        }, Math.pow(2, this.reconnectAttempts) * 1000); // 2s, 4s, 8s, ...
    }

    /**
     * Send a `{type, message, timestamp}` frame.
     * @returns {boolean} true if the frame was sent, false if not connected.
     */
    sendMessage(type, message) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            const data = {
                type: type,
                message: message,
                timestamp: new Date().toISOString()
            };

            this.ws.send(JSON.stringify(data));
            return true;
        } else {
            console.error('WebSocket未连接');
            return false;
        }
    }

    /** Dispatch a parsed message to the handler registered for its type. */
    handleMessage(data) {
        const handler = this.messageHandlers.get(data.type);
        if (handler) {
            handler(data);
        } else {
            console.log('收到消息:', data);
        }
    }

    /** Register (or replace) the handler for a message type. */
    onMessage(type, handler) {
        this.messageHandlers.set(type, handler);
    }

    /** Start sending a `ping` every 30 seconds. */
    startHeartbeat() {
        // Clear any previous timer so repeated opens never stack intervals.
        this.stopHeartbeat();
        this.heartbeatInterval = setInterval(() => {
            this.sendMessage('ping', {});
        }, 30000);
    }

    /** Stop the heartbeat timer if it is running. */
    stopHeartbeat() {
        if (this.heartbeatInterval) {
            clearInterval(this.heartbeatInterval);
            this.heartbeatInterval = null;
        }
    }

    // Convenience wrappers around sendMessage for each supported type.
    sendChatMessage(message) {
        return this.sendMessage('chat_message', message);
    }

    requestWeather(city) {
        return this.sendMessage('weather', { city: city });
    }

    requestSing(song, artist) {
        return this.sendMessage('sing', { song: song, artist: artist });
    }

    requestDance(dance, style, duration = '30秒') {
        return this.sendMessage('dance', {
            dance: dance,
            style: style,
            duration: duration
        });
    }
}

// 使用示例
const client = new DeviceWebSocketClient('your-auth-token');

// Lifecycle callbacks.
client.onConnect = function () {
    console.log('已连接到设备服务');
};

client.onDisconnect = function (event) {
    console.log('与设备服务断开连接');
};

// One logging handler per server message type.
const responseHandlers = [
    ['chat_response', (data) => { console.log('AI回复:', data.message); }],
    ['weather_response', (data) => { console.log('天气信息:', data.data); }],
    ['connection_status', (data) => { console.log('连接状态:', data.status); }],
];
for (const [type, handler] of responseHandlers) {
    client.onMessage(type, handler);
}

// Open the connection.
client.connect();

// Give the socket a second to open, then send a few sample requests.
function sendSampleRequests() {
    client.sendChatMessage('你好,请介绍一下自己');
    client.requestWeather('北京');
    client.requestSing('青花瓷', '周杰伦');
}
setTimeout(sendSampleRequests, 1000);

6.2 Python客户端

# Python WebSocket客户端
import asyncio
import websockets
import json
import logging

logger = logging.getLogger(__name__)

class DeviceWebSocketClient:
    """Asyncio client for the device WebSocket endpoint.

    Incoming JSON messages are dispatched to coroutine handlers registered per
    message ``type`` with :meth:`register_handler`.
    """

    def __init__(self, token: str, base_url: str = 'ws://localhost:8000'):
        self.token = token
        self.base_url = base_url
        self.websocket = None
        self.message_handlers = {}
        self.running = False
        # Background task running listen_for_messages(), set by connect().
        self._listener_task = None

    async def connect(self, *, block: bool = False):
        """Connect to the server and start receiving messages.

        The receive loop runs as a background task so the caller can go on to
        send messages after this returns. Pass ``block=True`` to wait here
        until the connection is closed instead.

        Raises:
            Exception: whatever the connection attempt raised, after logging.
        """
        from urllib.parse import quote

        try:
            # Quote the token so characters like '&' or '+' survive the URL.
            uri = f"{self.base_url}/ws/device/?token={quote(self.token, safe='')}"
            self.websocket = await websockets.connect(uri)
            self.running = True

            logger.info("WebSocket连接已建立")

            self._listener_task = asyncio.create_task(self.listen_for_messages())
            if block:
                await self._listener_task

        except Exception as e:
            logger.error(f"WebSocket连接失败: {str(e)}")
            raise

    async def disconnect(self):
        """Close the connection and wait for the receive loop to finish."""
        self.running = False
        if self.websocket:
            await self.websocket.close()
            logger.info("WebSocket连接已断开")

        task, self._listener_task = self._listener_task, None
        # A handler may call disconnect() from inside the listener task;
        # awaiting the current task would never complete.
        if task is not None and task is not asyncio.current_task():
            await task

    async def send_message(self, message_type: str, message_content):
        """Send a ``{type, message, timestamp}`` frame.

        Raises:
            ConnectionError: if the client is not connected.
        """
        from datetime import datetime, timezone

        if not self.websocket:
            raise ConnectionError("WebSocket未连接")

        data = {
            'type': message_type,
            'message': message_content,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

        await self.websocket.send(json.dumps(data))

    async def listen_for_messages(self):
        """Receive messages until the connection closes or ``running`` is cleared."""
        try:
            async for message in self.websocket:
                if not self.running:
                    break

                try:
                    data = json.loads(message)
                    await self.handle_message(data)
                except json.JSONDecodeError:
                    logger.error(f"无效的JSON消息: {message}")

        except websockets.exceptions.ConnectionClosed:
            logger.info("WebSocket连接已关闭")
        except Exception as e:
            logger.error(f"消息监听错误: {str(e)}")

    async def handle_message(self, data):
        """Dispatch ``data`` to the handler for its type, or log it if none."""
        message_type = data.get('type')
        handler = self.message_handlers.get(message_type)

        if handler:
            await handler(data)
        else:
            logger.info(f"收到消息: {data}")

    def register_handler(self, message_type: str, handler):
        """Register a coroutine function as the handler for ``message_type``."""
        self.message_handlers[message_type] = handler

    # Convenience wrappers around send_message for each supported type.
    async def send_chat_message(self, message: str):
        """Send a chat message."""
        await self.send_message('chat_message', message)

    async def request_weather(self, city: str):
        """Request the weather for ``city``."""
        await self.send_message('weather', {'city': city})

    async def request_sing(self, song: str, artist: str):
        """Request a song."""
        await self.send_message('sing', {'song': song, 'artist': artist})

# 使用示例
async def main():
    """Connect, send a chat message and a weather request, then disconnect."""

    async def print_chat_reply(data):
        print(f"AI回复: {data['message']}")

    async def print_weather(data):
        now = data['data']['weather']['now']
        print(f"天气: {now['text']}, 温度: {now['temp']}°C")

    client = DeviceWebSocketClient('your-auth-token')

    # Map each response type to the coroutine that prints it.
    for message_type, handler in (
        ('chat_response', print_chat_reply),
        ('weather_response', print_weather),
    ):
        client.register_handler(message_type, handler)

    await client.connect()

    await client.send_chat_message('你好')
    await client.request_weather('上海')

    # Keep the connection open for ten seconds before closing it.
    await asyncio.sleep(10)

    await client.disconnect()


# Run the example when executed as a script.
if __name__ == '__main__':
    asyncio.run(main())

7. 监控和调试

7.1 连接状态监控

# device_interaction/monitoring.py
import logging
from typing import Dict, List
from django.core.cache import cache
from django.utils import timezone

logger = logging.getLogger(__name__)

class WebSocketMonitor:
    """Track live WebSocket connections in the Django cache.

    All connections are kept in a single cache entry, ``'ws_connections'``: a
    dict keyed by the stringified user ID, rewritten with a 24-hour timeout on
    every change.
    """

    @staticmethod
    def record_connection(user_id: int, channel_name: str):
        """Store (or overwrite) the connection entry for ``user_id``."""
        registry = cache.get('ws_connections', {})
        entry = {'channel_name': channel_name}
        entry['connected_at'] = timezone.now().isoformat()
        entry['last_activity'] = timezone.now().isoformat()
        registry[str(user_id)] = entry
        cache.set('ws_connections', registry, timeout=86400)

        logger.info(f"WebSocket connection recorded for user {user_id}")

    @staticmethod
    def record_disconnection(user_id: int):
        """Remove the connection entry for ``user_id`` if one exists."""
        registry = cache.get('ws_connections', {})
        key = str(user_id)
        if key in registry:
            registry.pop(key)
            cache.set('ws_connections', registry, timeout=86400)

        logger.info(f"WebSocket disconnection recorded for user {user_id}")

    @staticmethod
    def update_activity(user_id: int):
        """Refresh ``last_activity`` for ``user_id`` if it is connected."""
        registry = cache.get('ws_connections', {})
        key = str(user_id)
        if key not in registry:
            return
        registry[key]['last_activity'] = timezone.now().isoformat()
        cache.set('ws_connections', registry, timeout=86400)

    @staticmethod
    def get_online_users() -> List[int]:
        """Return the IDs of all users with a recorded connection."""
        registry = cache.get('ws_connections', {})
        return list(map(int, registry))

    @staticmethod
    def get_connection_stats() -> Dict:
        """Return the connection count together with the raw entries."""
        registry = cache.get('ws_connections', {})
        stats = {'total_connections': len(registry)}
        stats['connections'] = registry
        return stats

7.2 性能监控

# device_interaction/performance.py
import time
import logging
from functools import wraps
from django.core.cache import cache

logger = logging.getLogger(__name__)

def _record_ws_metric(operation_name: str, duration: float, success: bool) -> None:
    """Append one sample to the cached metrics list for ``operation_name``.

    Only the 100 most recent samples are kept, with a one-hour cache timeout.
    Monitoring must never break the monitored operation, so any cache failure
    is logged and swallowed here.
    """
    cache_key = f"ws_perf:{operation_name}"
    try:
        metrics = cache.get(cache_key, [])
        metrics.append({
            'duration': duration,
            'timestamp': time.time(),
            'success': success
        })
        cache.set(cache_key, metrics[-100:], timeout=3600)
    except Exception:
        logger.exception(f"Failed to record WebSocket metric for {operation_name}")


def monitor_performance(operation_name: str):
    """Decorator factory that times an async operation.

    Every call is recorded in the cache under ``ws_perf:<operation_name>``
    with its duration and whether it succeeded. Calls slower than one second
    are logged as warnings; failures are logged and re-raised unchanged.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so durations are immune to wall-clock
            # adjustments.
            started = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                duration = time.perf_counter() - started
                _record_ws_metric(operation_name, duration, success=False)
                logger.error(f"WebSocket operation failed: {operation_name} ({duration:.2f}s): {str(e)}")
                raise

            duration = time.perf_counter() - started
            _record_ws_metric(operation_name, duration, success=True)

            if duration > 1.0:  # slower than one second
                logger.warning(f"Slow WebSocket operation: {operation_name} took {duration:.2f}s")

            return result

        return wrapper
    return decorator

# 使用示例
class DeviceConsumer(AsyncWebsocketConsumer):
    """Usage example: timing a consumer handler with monitor_performance."""

    @monitor_performance('handle_chat_message')
    async def handle_chat_message(self, message_content):
        """Handle a chat message (processing logic omitted in this example)."""
        return None

8. 最佳实践

8.1 错误处理

# WebSocket错误处理最佳实践
class DeviceConsumer(AsyncWebsocketConsumer):
    """Error-handling example: log the details, send the client a generic error."""

    async def handle_error(self, error: Exception, context: str = ""):
        """Log ``error`` with its ``context`` and notify the client.

        The client only receives a generic message and code; the exception
        text goes to the server log.
        """
        logger.error(f"WebSocket error in {context}: {str(error)}")

        client_notice = {
            'type': 'error',
            'message': 'Internal server error',
            'code': 'INTERNAL_ERROR',
            'timestamp': self.get_timestamp()
        }
        await self.send_message(client_notice)

8.2 消息验证

# 消息验证
from marshmallow import Schema, fields, ValidationError

class MessageSchema(Schema):
    """Marshmallow schema describing an incoming WebSocket message."""
    # Message type; a required string.
    type = fields.Str(required=True)
    # Message payload; required, accepted as-is without type coercion.
    message = fields.Raw(required=True)
    # Timestamp field; declared as a datetime that may be None.
    timestamp = fields.DateTime(allow_none=True)

class DeviceConsumer(AsyncWebsocketConsumer):
    """Validation example: check each incoming message against MessageSchema."""

    async def receive(self, text_data=None, bytes_data=None):
        """Parse, validate and process one incoming frame.

        Args:
            text_data: Text frame content, expected to be a JSON object.
            bytes_data: Binary frame content. Binary frames are rejected; the
                parameter exists because the framework passes binary frames
                as the ``bytes_data`` keyword.
        """
        if text_data is None:
            await self.send_error("Binary frames are not supported")
            return

        try:
            data = json.loads(text_data)

            # The schema describes an object; lists or scalars are invalid.
            if not isinstance(data, dict):
                await self.send_error("Invalid JSON format")
                return

            schema = MessageSchema()
            validated_data = schema.load(data)

            await self.process_validated_message(validated_data)

        except json.JSONDecodeError:
            await self.send_error("Invalid JSON format")
        except ValidationError as e:
            await self.send_error(f"Message validation error: {e.messages}")

8.3 安全考虑

# 安全措施
class DeviceConsumer(AsyncWebsocketConsumer):
    """Security example: per-connection rate limiting and a frame size cap."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.message_count = 0
        # Start of the current rate-limit window. Monotonic clock, so wall
        # clock adjustments cannot stretch or shrink a window.
        self.last_message_time = time.monotonic()
        self.rate_limit_window = 60  # window length in seconds
        self.max_messages_per_window = 100  # frames allowed per window

    async def receive(self, text_data=None, bytes_data=None):
        """Apply the limits, then hand the frame to the parent ``receive``.

        Exceeding the rate limit sends an error and closes the connection with
        code 4008; an oversized frame is rejected but the connection stays open.
        """
        # Fixed-window rate limit: start a new window once the old one expired.
        current_time = time.monotonic()
        if current_time - self.last_message_time > self.rate_limit_window:
            self.message_count = 0
            self.last_message_time = current_time

        self.message_count += 1

        if self.message_count > self.max_messages_per_window:
            await self.send_error("Rate limit exceeded")
            await self.close(code=4008)
            return

        # Size cap applies to whichever frame kind was received.
        payload = text_data if text_data is not None else bytes_data
        if payload is not None and len(payload) > 10240:  # 10 KB limit
            await self.send_error("Message too large")
            return

        await super().receive(text_data=text_data, bytes_data=bytes_data)

这个WebSocket实现指南提供了完整的实时通信解决方案,包括服务端实现、客户端示例、监控和最佳实践。