import json
import logging
import time

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.core.cache import cache

logger = logging.getLogger(__name__)


class DeviceConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that relays messages between a user's connections.

    Every connection belonging to the same user joins the channel-layer group
    ``device_<user_id>``.  Messages received from one socket are re-broadcast
    to that group, and each group event is pushed back down the socket by the
    handler method named after the event's ``type``.

    A connection that reports ``device_info`` with a ``mac_address`` is
    remembered as a device connection: its ``Device`` row is updated, a
    "last seen" key is kept fresh in the Django cache, and the row is marked
    ``disconnected`` when the socket closes.
    """

    # Lifetime (seconds) of the ``device:last_seen:<mac>`` cache key.
    _HEARTBEAT_TTL = 300

    # Message types that ``receive`` relays to the group unchanged, mapped to
    # the acknowledgement text returned to the sender.
    _RELAY_ACKS = {
        'sing': '唱歌消息已发送',
        'dance': '跳舞消息已发送',
        'touch': '触摸消息已发送~',
        'flow_light': '流水灯消息已发送',
        'conversation_status': '对话状态消息已发送',
        'factory_reset': '恢复出厂设置指令已发送',
        'device_state': '设备状态消息已发送',
    }

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(self):
        """Authenticate the peer, join the per-user group and accept.

        The user placed in ``scope['user']`` by upstream middleware is used
        when it is authenticated; otherwise a token taken from the URL is
        validated.  The socket is closed with code 4001 when authentication
        fails and with 4002 on any unexpected error.
        """
        # MAC of the device behind this socket; filled in by ``device_info``.
        self.device_mac = None
        try:
            # ``.get`` so that a missing auth middleware falls through to
            # token authentication instead of raising KeyError.
            user = self.scope.get('user')
            if not user or not user.is_authenticated:
                token = self.get_token_from_url()
                if not token:
                    logger.warning("WebSocket connection rejected: User not authenticated and no token provided")
                    await self.close(code=4001)
                    return

                user = await self.authenticate_with_token(token)
                if not user:
                    logger.warning("WebSocket connection rejected: Invalid token")
                    await self.close(code=4001)
                    return
                self.scope['user'] = user

            # The group name is derived from the user id.
            self.user_id = str(user.id)
            self.group_name = f"device_{self.user_id}"
            logger.info(f'WebSocket connected for user {self.user_id} in group {self.group_name}')

            await self.channel_layer.group_add(self.group_name, self.channel_name)
            await self.accept()
            logger.info('WebSocket connection accepted')
        except Exception as e:
            logger.error(f"Error in WebSocket connect: {str(e)}")
            await self.close(code=4002)

    def get_token_from_url(self):
        """Return the auth token carried in the WebSocket URL, or ``None``.

        Two sources are tried in order:

        1. the ``token`` named group captured by the URL router
           (``scope['url_route']['kwargs']``);
        2. the path segment that follows a literal ``token`` segment, e.g.
           ``/ws/device/token/abcdef1234/``.
        """
        try:
            route_kwargs = self.scope.get('url_route', {}).get('kwargs', {})
            if 'token' in route_kwargs:
                return route_kwargs['token']

            segments = self.scope['path'].split('/')
            if 'token' in segments:
                value_index = segments.index('token') + 1
                if value_index < len(segments):
                    # A trailing "token/" yields an empty segment; report it
                    # as "no token" rather than as an empty string.
                    return segments[value_index] or None
            return None
        except Exception as e:
            logger.error(f"Error extracting token from URL: {str(e)}")
            return None

    @database_sync_to_async
    def authenticate_with_token(self, token_key):
        """Validate ``token_key`` with ``RedisTokenAuthentication``.

        Returns the authenticated user, or ``None`` when the authenticator
        returns ``None`` or raises.
        """
        try:
            # Imported lazily to avoid Django's AppRegistryNotReady at module
            # import time.
            from userapp.authentication import RedisTokenAuthentication

            class MockRequest:
                """Minimal request stand-in exposing only a bearer header."""

                def __init__(self, token):
                    self.headers = {'Authorization': f'Bearer {token}'}

            result = RedisTokenAuthentication().authenticate(MockRequest(token_key))
            if result is None:
                return None
            user, _ = result
            return user
        except Exception as e:
            logger.error(f"Token authentication error: {str(e)}")
            return None

    async def disconnect(self, close_code):
        """Mark the attached device offline and leave the user group."""
        try:
            device_mac = getattr(self, 'device_mac', None)
            if device_mac:
                await self.mark_device_offline(device_mac)
                logger.info(f'Device {device_mac} marked offline on disconnect')

            # ``group_name`` only exists once authentication succeeded.
            if hasattr(self, 'group_name'):
                await self.channel_layer.group_discard(self.group_name, self.channel_name)
                logger.info(f'WebSocket disconnected for group {self.group_name}')
        except Exception as e:
            logger.error(f"Error in WebSocket disconnect: {str(e)}")

    # ------------------------------------------------------------------
    # Device persistence helpers
    # ------------------------------------------------------------------

    @database_sync_to_async
    def update_device_status(self, mac_address, device_data):
        """Persist reported device fields and record a heartbeat.

        Copies whichever of ``battery_level``, ``firmware_version``,
        ``wifi_name``, ``wifi_password`` and ``brightness`` are present in
        ``device_data`` onto the ``Device`` row with this MAC address, sets
        its status to ``connected`` and stores the current time under
        ``device:last_seen:<mac>`` in the cache.  Errors are logged, never
        raised.
        """
        try:
            from .models import Device

            # NOTE(review): the lookup is by client-supplied MAC only and is
            # not scoped to the connected user - confirm whether Device has an
            # owner field that should be part of this filter.
            device = Device.objects.filter(mac_address=mac_address).first()
            if device:
                for field in ('battery_level', 'firmware_version', 'wifi_name',
                              'wifi_password', 'brightness'):
                    if field in device_data:
                        setattr(device, field, device_data[field])
                device.status = 'connected'
                device.save()
                logger.info(f"Updated device status for MAC: {mac_address}")
            else:
                logger.warning(f"No device found for MAC: {mac_address}")

            # The heartbeat is recorded whether or not a row matched, just as
            # ``refresh_device_heartbeat`` does on every later message.
            cache.set(f"device:last_seen:{mac_address}", time.time(),
                      timeout=self._HEARTBEAT_TTL)
        except Exception as e:
            logger.error(f"Failed to update device status: {str(e)}")

    @database_sync_to_async
    def mark_device_offline(self, mac_address):
        """Flip a connected device to ``disconnected`` and drop its heartbeat."""
        try:
            from .models import Device

            Device.objects.filter(
                mac_address=mac_address, status='connected'
            ).update(status='disconnected')
            cache.delete(f"device:last_seen:{mac_address}")
            logger.info(f"Device marked offline: {mac_address}")
        except Exception as e:
            logger.error(f"Failed to mark device offline: {str(e)}")

    @database_sync_to_async
    def refresh_device_heartbeat(self, mac_address):
        """Refresh the cached last-seen timestamp (cache only, no DB write)."""
        cache.set(f"device:last_seen:{mac_address}", time.time(),
                  timeout=self._HEARTBEAT_TTL)

    # ------------------------------------------------------------------
    # Inbound socket traffic
    # ------------------------------------------------------------------

    async def receive(self, text_data=None, bytes_data=None):
        """Handle one frame from the socket and relay it to the user group.

        Text frames must be a JSON object of the form
        ``{"type": <str>, "message": <payload>}``; ``type`` defaults to
        ``"text"``.  ``weather`` and ``device_info`` get dedicated handling,
        the types listed in ``_RELAY_ACKS`` are relayed as-is, and anything
        else is relayed as ``chat_message``.  The sender always receives a
        ``{"status": ..., "message": ...}`` reply.  Binary frames are
        rejected.
        """
        try:
            if bytes_data:
                logger.info(f'Received binary message from user {self.user_id}')
                await self._send_reply('error', '不支持二进制数据')
                return

            if not text_data:
                logger.warning("Received message with no content")
                await self._send_reply('error', '消息内容不能为空')
                return

            payload = json.loads(text_data)
            if not isinstance(payload, dict):
                # Valid JSON but not an object (list, number, ...): reject it
                # explicitly instead of failing on ``.get`` below.
                logger.warning("Received JSON payload that is not an object")
                await self._send_reply('error', '无效的JSON数据')
                return

            message_type = payload.get('type', 'text')
            message = payload.get('message')

            # NOTE(review): this also rejects falsy payloads such as 0, False
            # or {} - confirm no message type legitimately sends those.
            if not message:
                logger.warning("Received message with no content")
                await self._send_reply('error', '消息内容不能为空')
                return

            # Only the type is logged: ``device_info`` payloads can carry
            # ``wifi_password`` and must not end up in the log.
            logger.info(f'Received message from user {self.user_id}: type={message_type}')

            # Any traffic from a known device counts as a heartbeat.
            device_mac = getattr(self, 'device_mac', None)
            if device_mac:
                await self.refresh_device_heartbeat(device_mac)

            if message_type == 'weather':
                await self._handle_weather(message)
            elif message_type == 'device_info':
                await self._handle_device_info(message)
            elif isinstance(message_type, str) and message_type in self._RELAY_ACKS:
                await self._broadcast(message_type, message)
                await self._send_reply('success', self._RELAY_ACKS[message_type])
            else:
                # Unknown types are treated as plain chat text.
                await self._broadcast('chat_message', message)
                await self._send_reply('success', '消息已发送')
        except json.JSONDecodeError:
            logger.error("Received invalid JSON data")
            await self._send_reply('error', '无效的JSON数据')
        except Exception as e:
            logger.error(f"Error in WebSocket receive: {str(e)}")
            await self._send_reply('error', str(e))

    async def _handle_weather(self, message):
        """Decode a weather payload, broadcast it and acknowledge the sender."""
        try:
            # Accept both a JSON-encoded string and an already-decoded value,
            # in the same way ``device_info`` does.
            weather_data = json.loads(message) if isinstance(message, str) else message
        except json.JSONDecodeError:
            await self._send_reply('error', '消息格式错误')
            return

        await self._broadcast('weather', weather_data)
        await self._send_reply('success', '天气信息已发送', data=weather_data)

    async def _handle_device_info(self, message):
        """Process a device report: persist it, broadcast it, acknowledge it."""
        try:
            device_data = json.loads(message) if isinstance(message, str) else message
            if not isinstance(device_data, dict):
                raise ValueError('device_info payload must be a JSON object')

            mac_address = device_data.get('mac_address')
            if mac_address:
                # Remember which device sits behind this connection so that
                # heartbeats and the offline marker target the right row.
                self.device_mac = mac_address
                await self.update_device_status(mac_address, device_data)

            # Broadcast so the other connections in the group see the report.
            await self._broadcast('device_info', device_data)
            await self._send_reply('success', '设备信息已更新')
        except Exception as e:
            logger.error(f"Error processing device_info: {str(e)}")
            await self._send_reply('error', f'设备信息处理失败: {str(e)}')

    async def _send_reply(self, status, message, **extra):
        """Send a ``{"status", "message", ...}`` reply to this socket only."""
        await self.send(text_data=json.dumps({'status': status, 'message': message, **extra}))

    async def _broadcast(self, event_type, message):
        """Send an event, stamped with the sender's user id, to the user group."""
        await self.channel_layer.group_send(
            self.group_name,
            {'type': event_type, 'message': message, 'user_id': self.user_id},
        )

    async def _push_event(self, event_type, message, **extra):
        """Write a ``{"type", "message", ...}`` frame to this socket."""
        await self.send(text_data=json.dumps({'type': event_type, 'message': message, **extra}))

    # ------------------------------------------------------------------
    # Group event handlers (invoked by the channel layer, one per ``type``)
    # ------------------------------------------------------------------

    async def chat_message(self, event):
        """Forward a chat message event to the socket."""
        try:
            await self._push_event('chat_message', event['message'], user_id=event['user_id'])
        except Exception as e:
            logger.error(f"Error in chat_message: {str(e)}")

    async def weather(self, event):
        """Forward a weather event to the socket."""
        try:
            await self._push_event('weather', event['message'], user_id=event['user_id'])
        except Exception as e:
            logger.error(f"Error in weather: {str(e)}")

    async def sing(self, event):
        """Forward a sing event to the socket."""
        try:
            await self._push_event('sing', event['message'], user_id=event['user_id'])
        except Exception as e:
            logger.error(f"Error in sing: {str(e)}")

    async def dance(self, event):
        """Forward a dance event to the socket.

        ``user_id`` is included when the event carries one and omitted
        otherwise, so both senders that supply it and senders that do not
        are served by this single handler.
        """
        try:
            message = event['message']
            extra = {'user_id': event['user_id']} if 'user_id' in event else {}
            await self._push_event('dance', message, **extra)
            logger.info(f"Sent dance to WebSocket: {message}")
        except Exception as e:
            logger.error(f"Error in dance: {str(e)}")

    async def touch(self, event):
        """Forward a touch event to the socket."""
        try:
            await self._push_event('touch', event['message'], user_id=event['user_id'])
        except Exception as e:
            logger.error(f"Error in touch: {str(e)}")

    async def flow_light(self, event):
        """Forward a flowing-light on/off event to the socket."""
        try:
            await self._push_event('flow_light', event['message'], user_id=event['user_id'])
        except Exception as e:
            logger.error(f"Error in flow_light: {str(e)}")

    async def conversation_status(self, event):
        """Forward a conversation-status event to the socket (no ``user_id``)."""
        try:
            message = event['message']
            await self._push_event('conversation_status', message)
            # The payload may be a plain string; only dicts have a stage code.
            stage_code = message.get('stage_code') if isinstance(message, dict) else None
            logger.info(f"Sent conversation status to WebSocket: {stage_code}, {message}")
        except Exception as e:
            logger.error(f"Error in conversation_status: {str(e)}")

    async def factory_reset(self, event):
        """Forward a factory-reset command event to the socket."""
        try:
            await self._push_event('factory_reset', event['message'], user_id=event['user_id'])
        except Exception as e:
            logger.error(f"Error in factory_reset: {str(e)}")

    async def device_state(self, event):
        """Forward a device-state event to the socket."""
        try:
            await self._push_event('device_state', event['message'], user_id=event['user_id'])
        except Exception as e:
            logger.error(f"Error in device_state: {str(e)}")

    async def device_info(self, event):
        """Forward a device report to the socket.

        ``user_id`` defaults to an empty string when the event has none.
        """
        try:
            message = event['message']
            await self._push_event('device_info', message, user_id=event.get('user_id', ''))
            mac = message.get('mac_address', 'unknown') if isinstance(message, dict) else 'unknown'
            logger.info(f"Sent device_info to WebSocket: mac={mac}")
        except Exception as e:
            logger.error(f"Error in device_info: {str(e)}")

    async def conversation_subtitle(self, event):
        """Forward a conversation-subtitle event to the socket (no ``user_id``)."""
        try:
            message = event['message']
            await self._push_event('conversation_subtitle', message)
            subtitles = message.get('subtitles', []) if isinstance(message, dict) else []
            logger.info(f"Sent subtitle to WebSocket: {len(subtitles)} items")
        except Exception as e:
            logger.error(f"Error in conversation_subtitle: {str(e)}")

    async def forward_message(self, event):
        """Forward an arbitrary message event to the socket unchanged."""
        try:
            await self._push_event('forward_message', event['message'])
        except Exception as e:
            logger.error(f"Error in forward_message: {str(e)}")