pmc a8add9dc6e
All checks were successful
Build and Deploy LTY / build-and-deploy (push) Successful in 29m50s
feat: update device interaction module
- Update apps, consumers, and serializers
- Add scheduler and tasks modules

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 18:03:08 +08:00

601 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from django.core.cache import cache
import logging
logger = logging.getLogger(__name__)
class DeviceConsumer(AsyncWebsocketConsumer):
async def connect(self):
    """Authenticate the peer, join its per-user group and accept the socket.

    An already-authenticated ``scope['user']`` is used as-is.  Otherwise a
    token is taken from the URL via ``get_token_from_url`` and validated
    with ``authenticate_with_token``.  Peers that cannot be authenticated
    are closed with code 4001; any unexpected error closes the socket with
    code 4002.
    """
    try:
        # ``.get`` instead of ``[...]``: if nothing populated ``scope['user']``
        # the key is absent, and indexing would raise KeyError, skipping the
        # token fallback below and closing the socket with 4002.
        current_user = self.scope.get('user')
        if not current_user or not current_user.is_authenticated:
            token = self.get_token_from_url()
            if not token:
                logger.warning("WebSocket connection rejected: User not authenticated and no token provided")
                await self.close(code=4001)
                return

            current_user = await self.authenticate_with_token(token)
            if not current_user:
                logger.warning("WebSocket connection rejected: Invalid token")
                await self.close(code=4001)
                return
            self.scope['user'] = current_user

        # The device MAC is only known once a ``device_info`` message arrives.
        # Initialise it before joining the group so the attribute always exists.
        self.device_mac = None

        # The group is keyed by user id, so every socket of one user shares it.
        self.user_id = str(current_user.id)
        self.group_name = f"device_{self.user_id}"
        logger.info(f'WebSocket connected for user {self.user_id} in group {self.group_name}')

        await self.channel_layer.group_add(
            self.group_name,
            self.channel_name
        )
        await self.accept()
        logger.info('WebSocket connection accepted')
    except Exception as e:
        logger.error(f"Error in WebSocket connect: {str(e)}")
        await self.close(code=4002)
def get_token_from_url(self):
    """Extract the auth token from the WebSocket URL.

    Two sources are tried in order:

    1. the ``token`` keyword captured by the URL router
       (``scope['url_route']['kwargs']``);
    2. the path segment that follows a literal ``token`` segment, e.g.
       ``ws://domain/ws/device/token/abcdef1234/``.

    Returns the token, or ``None`` when neither source yields one or an
    error occurs while reading the scope (the error is logged).
    """
    try:
        scope = self.scope

        # Source 1: named group captured by the router.
        if 'url_route' in scope:
            route = scope['url_route']
            if 'kwargs' in route and 'token' in route['kwargs']:
                return route['kwargs']['token']

        # Source 2: take whatever follows the first "token" path segment.
        segments = scope['path'].split('/')
        if 'token' not in segments:
            return None
        value_pos = segments.index('token') + 1
        return segments[value_pos] if value_pos < len(segments) else None
    except Exception as e:
        logger.error(f"Error extracting token from URL: {str(e)}")
        return None
@database_sync_to_async
def authenticate_with_token(self, token_key):
    """Resolve ``token_key`` to a user through ``RedisTokenAuthentication``.

    Returns the user from the authenticator's result, or ``None`` when the
    authenticator returns ``None`` or anything raises (the error is logged).
    """
    try:
        # Imported inside the function; the original author did this to avoid
        # Django's AppRegistryNotReady error at module import time.
        from userapp.authentication import RedisTokenAuthentication
        from django.contrib.auth.models import AnonymousUser

        class BearerOnlyRequest:
            """Request stand-in that exposes only an Authorization header."""

            def __init__(self, token):
                self.headers = {'Authorization': f'Bearer {token}'}

        outcome = RedisTokenAuthentication().authenticate(BearerOnlyRequest(token_key))
        if outcome is not None:
            # The authenticator returns a 2-tuple; only the user is needed here.
            account, _ = outcome
            return account
        return None
    except Exception as e:
        logger.error(f"Token authentication error: {str(e)}")
        return None
@database_sync_to_async
def update_device_status(self, mac_address, device_data):
    """Persist a device's self-reported state and record its heartbeat.

    Looks up the ``Device`` row by ``mac_address``, copies whichever of the
    reported fields are present in ``device_data``, marks the device as
    ``connected`` and saves it.  The current time is then cached under
    ``device:last_seen:<mac>`` with a 300-second timeout.

    An unknown MAC is logged as a warning and nothing is written.  Errors
    are logged and never raised to the caller.
    """
    # Fields copied verbatim from the report onto the model when present.
    # NOTE(review): ``wifi_password`` is stored exactly as reported; confirm
    # that the model field protects it.
    reported_fields = (
        'battery_level',
        'firmware_version',
        'wifi_name',
        'wifi_password',
        'brightness',
    )
    try:
        from .models import Device

        device = Device.objects.filter(mac_address=mac_address).first()
        if not device:
            # Previously a silent no-op; surface it so unregistered devices
            # can be diagnosed from the logs.
            logger.warning(f"Device status update skipped, unknown MAC: {mac_address}")
            return

        for field_name in reported_fields:
            if field_name in device_data:
                setattr(device, field_name, device_data[field_name])
        device.status = 'connected'
        device.save()

        # Last-seen timestamp; the cache entry expires after 5 minutes.
        cache.set(f"device:last_seen:{mac_address}", time.time(), timeout=300)
        logger.info(f"Updated device status for MAC: {mac_address}")
    except Exception as e:
        logger.error(f"Failed to update device status: {str(e)}")
@database_sync_to_async
def mark_device_offline(self, mac_address):
    """Mark the device with ``mac_address`` as disconnected.

    Only rows currently in the ``connected`` state are updated; the cached
    ``device:last_seen:<mac>`` entry is deleted afterwards.  Errors are
    logged and never raised to the caller.
    """
    try:
        from .models import Device

        still_connected = Device.objects.filter(mac_address=mac_address, status='connected')
        still_connected.update(status='disconnected')

        heartbeat_key = f"device:last_seen:{mac_address}"
        cache.delete(heartbeat_key)

        logger.info(f"Device marked offline: {mac_address}")
    except Exception as e:
        logger.error(f"Failed to mark device offline: {str(e)}")
@database_sync_to_async
def refresh_device_heartbeat(self, mac_address):
    """Refresh the cached last-seen timestamp for ``mac_address``.

    Only the cache entry ``device:last_seen:<mac>`` is written (300-second
    timeout); the database is not touched.

    The write is best-effort: a cache failure is logged instead of raised,
    matching ``update_device_status`` and ``mark_device_offline``.  Without
    this, a cache error would propagate into ``receive`` and abort handling
    of the message that triggered the refresh.
    """
    try:
        cache.set(f"device:last_seen:{mac_address}", time.time(), timeout=300)
    except Exception as e:
        logger.error(f"Failed to refresh device heartbeat: {str(e)}")
async def disconnect(self, close_code):
    """Clean up when the socket closes.

    If a device MAC was recorded on this connection, the device is marked
    offline.  If the connection joined a group, it is removed from it.
    Errors are logged and never raised.
    """
    try:
        # ``device_mac`` may be missing entirely when connect() bailed out early.
        known_mac = getattr(self, 'device_mac', None)
        if known_mac:
            await self.mark_device_offline(known_mac)
            logger.info(f'Device {self.device_mac} marked offline on disconnect')

        # ``group_name`` exists only for connections that got as far as joining.
        if not hasattr(self, 'group_name'):
            return

        await self.channel_layer.group_discard(
            self.group_name,
            self.channel_name
        )
        logger.info(f'WebSocket disconnected for group {self.group_name}')
    except Exception as e:
        logger.error(f"Error in WebSocket disconnect: {str(e)}")
async def receive(self, text_data=None, bytes_data=None):
    """Handle one inbound WebSocket frame and relay it to the user's group.

    Binary frames are rejected.  A text frame must be a JSON object with a
    non-empty ``message`` and an optional ``type`` (default ``'text'``):

    * ``weather``: ``message`` is a JSON string, or an already-decoded
      object/array; the decoded data is relayed and echoed back in ``data``.
    * ``device_info``: ``message`` is a JSON string or object.  When it
      carries ``mac_address`` the MAC is remembered on this connection and
      the device row is updated before the data is relayed.
    * ``sing``, ``dance``, ``touch``, ``flow_light``,
      ``conversation_status``, ``factory_reset``: relayed unchanged.
    * anything else: relayed as a ``chat_message`` event.

    Every outcome is acknowledged to the sender with a
    ``{"status": ..., "message": ...}`` reply; nothing is raised.
    """
    # Relay-only message types and the acknowledgement text sent back to the
    # sender.  The group event type is the same as the message type.
    relay_acks = {
        'sing': '唱歌消息已发送',
        'dance': '跳舞消息已发送',
        'touch': '触摸消息已发送~',
        # This branch used to reply with the touch acknowledgement text.
        'flow_light': '流水灯消息已发送',
        'conversation_status': '对话状态消息已发送',
        'factory_reset': '恢复出厂设置指令已发送',
    }
    try:
        if bytes_data:
            logger.info(f'Received binary message from user {self.user_id}')
            await self._send_ack('error', '不支持二进制数据')
            return

        if not text_data:
            logger.warning("Received message with no content")
            await self._send_ack('error', '消息内容不能为空')
            return

        payload = json.loads(text_data)
        if not isinstance(payload, dict):
            # Valid JSON but not an object (e.g. ``[1, 2]``): treat it like
            # malformed input instead of leaking an AttributeError message.
            logger.error("Received invalid JSON data")
            await self._send_ack('error', '无效的JSON数据')
            return

        message_type = payload.get('type', 'text')
        message = payload.get('message')
        if not message:
            logger.warning("Received message with no content")
            await self._send_ack('error', '消息内容不能为空')
            return

        logger.info(f'Received message from user {self.user_id}: {message}')

        # Any traffic counts as a heartbeat once the device MAC is known.
        if getattr(self, 'device_mac', None):
            await self.refresh_device_heartbeat(self.device_mac)

        if message_type == 'weather':
            try:
                # Accept pre-decoded data as well as a JSON string, the same
                # way the device_info branch below does.
                if isinstance(message, (dict, list)):
                    weather_data = message
                else:
                    weather_data = json.loads(message)
                await self._group_relay('weather', weather_data)
                await self._send_ack('success', '天气信息已发送', data=weather_data)
            except json.JSONDecodeError:
                await self._send_ack('error', '消息格式错误')
        elif message_type == 'device_info':
            # Sent by the device after joining Wi-Fi (MAC, battery level, ...).
            try:
                device_data = json.loads(message) if isinstance(message, str) else message
                mac_address = device_data.get('mac_address')
                if mac_address:
                    # NOTE(review): the MAC is client-supplied and no ownership
                    # check is visible in this file; confirm one exists.
                    self.device_mac = mac_address
                    await self.update_device_status(mac_address, device_data)
                # Broadcast so other sockets of this user also receive it.
                await self._group_relay('device_info', device_data)
                await self._send_ack('success', '设备信息已更新')
            except Exception as e:
                logger.error(f"Error processing device_info: {str(e)}")
                await self._send_ack('error', f'设备信息处理失败: {str(e)}')
        elif isinstance(message_type, str) and message_type in relay_acks:
            await self._group_relay(message_type, message)
            await self._send_ack('success', relay_acks[message_type])
        else:
            # Unknown or default type: plain chat message.
            await self._group_relay('chat_message', message)
            await self._send_ack('success', '消息已发送')
    except json.JSONDecodeError:
        logger.error("Received invalid JSON data")
        await self._send_ack('error', '无效的JSON数据')
    except Exception as e:
        logger.error(f"Error in WebSocket receive: {str(e)}")
        await self._send_ack('error', str(e))

async def _group_relay(self, event_type, message):
    """Broadcast ``message`` to this user's group as an ``event_type`` event."""
    await self.channel_layer.group_send(
        self.group_name,
        {
            'type': event_type,
            'message': message,
            'user_id': self.user_id
        }
    )

async def _send_ack(self, status, message, **extra):
    """Send a ``{"status", "message", ...}`` JSON reply to this socket only."""
    await self.send(text_data=json.dumps({
        'status': status,
        'message': message,
        **extra
    }))
async def chat_message(self, event):
    """Group handler for ``chat_message`` events.

    Forwards the event's ``message`` and ``user_id`` to this socket as a
    JSON text frame.  Failures are logged, not raised.
    """
    try:
        outgoing = {
            'type': 'chat_message',
            'message': event['message'],
            'user_id': event['user_id'],
        }
        await self.send(text_data=json.dumps(outgoing))
    except Exception as e:
        logger.error(f"Error in chat_message: {str(e)}")
async def weather(self, event):
    """Group handler for ``weather`` events.

    Pushes the event's weather ``message`` and ``user_id`` to this socket
    as a JSON text frame.  Failures are logged, not raised.
    """
    try:
        weather_payload, sender_id = event['message'], event['user_id']
        encoded = json.dumps(
            dict(type='weather', message=weather_payload, user_id=sender_id)
        )
        await self.send(text_data=encoded)
    except Exception as e:
        logger.error(f"Error in weather: {str(e)}")
async def sing(self, event):
    """Group handler for ``sing`` events.

    Sends the event's ``message`` and ``user_id`` to this socket as a JSON
    text frame.  Failures are logged, not raised.
    """
    try:
        body = {'type': 'sing'}
        body['message'] = event['message']
        body['user_id'] = event['user_id']
        await self.send(text_data=json.dumps(body))
    except Exception as e:
        logger.error(f"Error in sing: {str(e)}")
async def dance(self, event):
    """Group handler for ``dance`` events (includes ``user_id``).

    Sends the event's ``message`` and ``user_id`` to this socket as a JSON
    text frame.  Failures are logged, not raised.

    NOTE(review): the class body defines ``dance`` a second time further
    down.  Python keeps the later binding, so this version is not the one
    that runs unless that duplicate is removed.
    """
    try:
        frame = json.dumps({
            'type': 'dance',
            'message': event['message'],
            'user_id': event['user_id'],
        })
        await self.send(text_data=frame)
    except Exception as e:
        logger.error(f"Error in dance: {str(e)}")
async def touch(self, event):
    """Group handler for ``touch`` events.

    Sends the event's ``message`` and ``user_id`` to this socket as a JSON
    text frame.  Failures are logged, not raised.
    """
    try:
        touch_message = event['message']
        origin_user = event['user_id']
        serialized = json.dumps(
            {'type': 'touch', 'message': touch_message, 'user_id': origin_user}
        )
        await self.send(text_data=serialized)
    except Exception as e:
        logger.error(f"Error in touch: {str(e)}")
async def flow_light(self, event):
    """Group handler for ``flow_light`` (running-light on/off) events.

    Sends the event's ``message`` and ``user_id`` to this socket as a JSON
    text frame.  Failures are logged, not raised.
    """
    try:
        light_frame = dict(
            type='flow_light',
            message=event['message'],
            user_id=event['user_id'],
        )
        await self.send(text_data=json.dumps(light_frame))
    except Exception as e:
        logger.error(f"Error in flow_light: {str(e)}")
async def conversation_status(self, event):
    """Group handler for ``conversation_status`` events.

    Sends the event's ``message`` to this socket as a JSON text frame
    (``user_id`` is not included) and logs what was sent.  Failures are
    logged, not raised.
    """
    try:
        message = event['message']
        await self.send(text_data=json.dumps({
            'type': 'conversation_status',
            'message': message
        }))
        # ``receive`` relays the client's ``message`` verbatim, so it may be a
        # plain string.  Calling ``.get`` on it unconditionally raised after
        # the frame had already been sent and logged a misleading error.
        stage_code = message.get('stage_code') if isinstance(message, dict) else None
        logger.info(f"Sent conversation status to WebSocket: {stage_code} {message}")
    except Exception as e:
        logger.error(f"Error in conversation_status: {str(e)}")
async def dance(self, event):
    """Group handler for ``dance`` events.

    This is the second ``dance`` definition in the class body, so it is the
    binding Python keeps and the one that handles ``dance`` events.

    Sends the event's ``message`` to this socket as a JSON text frame and
    logs it.  ``user_id`` is forwarded when the event carries one, which
    matches the ``sing``/``touch``/``flow_light`` handlers; events without
    it produce the same frame as before.  Failures are logged, not raised.
    """
    try:
        message = event['message']
        outgoing = {
            'type': 'dance',
            'message': message
        }
        # Events relayed by ``receive`` include ``user_id``; previously it was
        # silently dropped here.
        if 'user_id' in event:
            outgoing['user_id'] = event['user_id']
        await self.send(text_data=json.dumps(outgoing))
        logger.info(f"Sent dance to WebSocket: {message}")
    except Exception as e:
        logger.error(f"Error in dance: {str(e)}")
async def factory_reset(self, event):
    """Group handler for ``factory_reset`` events.

    Sends the factory-reset ``message`` and ``user_id`` from the event to
    this socket as a JSON text frame.  Failures are logged, not raised.
    """
    try:
        reset_command = event['message']
        requested_by = event['user_id']
        reset_frame = json.dumps({
            'type': 'factory_reset',
            'message': reset_command,
            'user_id': requested_by,
        })
        await self.send(text_data=reset_frame)
    except Exception as e:
        logger.error(f"Error in factory_reset: {str(e)}")
async def device_info(self, event):
    """Group handler for ``device_info`` events.

    Forwards the reported device data to this socket as a JSON text frame,
    with ``user_id`` defaulting to an empty string when the event has none,
    then logs the reported MAC.  Failures are logged, not raised.
    """
    try:
        message = event['message']
        info_frame = json.dumps({
            'type': 'device_info',
            'message': message,
            'user_id': event.get('user_id', ''),
        })
        await self.send(text_data=info_frame)
        logger.info(f"Sent device_info to WebSocket: mac={message.get('mac_address', 'unknown')}")
    except Exception as e:
        logger.error(f"Error in device_info: {str(e)}")
async def conversation_subtitle(self, event):
    """Group handler for ``conversation_subtitle`` events.

    Sends the event's ``message`` to this socket as a JSON text frame and
    logs how many entries its ``subtitles`` list holds.  Failures are
    logged, not raised.
    """
    try:
        message = event['message']
        subtitle_frame = json.dumps(
            {'type': 'conversation_subtitle', 'message': message}
        )
        await self.send(text_data=subtitle_frame)
        logger.info(f"Sent subtitle to WebSocket: {len(message.get('subtitles', []))} items")
    except Exception as e:
        logger.error(f"Error in conversation_subtitle: {str(e)}")
async def forward_message(self, event):
    """Group handler for ``forward_message`` events.

    Passes the event's ``message`` straight through to this socket as a
    JSON text frame.  Failures are logged, not raised.
    """
    try:
        passthrough = dict(type='forward_message', message=event['message'])
        await self.send(text_data=json.dumps(passthrough))
    except Exception as e:
        logger.error(f"Error in forward_message: {str(e)}")