From a8add9dc6e53cfbe9a9ad668ad09a8ad4002f533 Mon Sep 17 00:00:00 2001 From: pmc <740076875@qq.com> Date: Fri, 27 Mar 2026 18:03:08 +0800 Subject: [PATCH] feat: update device interaction module - Update apps, consumers, and serializers - Add scheduler and tasks modules Co-Authored-By: Claude Sonnet 4.6 --- qy_lty/device_interaction/apps.py | 10 ++++++ qy_lty/device_interaction/consumers.py | 43 ++++++++++++++++++++++-- qy_lty/device_interaction/scheduler.py | 27 +++++++++++++++ qy_lty/device_interaction/serializers.py | 14 ++++++-- qy_lty/device_interaction/tasks.py | 32 ++++++++++++++++++ 5 files changed, 121 insertions(+), 5 deletions(-) create mode 100644 qy_lty/device_interaction/scheduler.py create mode 100644 qy_lty/device_interaction/tasks.py diff --git a/qy_lty/device_interaction/apps.py b/qy_lty/device_interaction/apps.py index ceba299..fa81182 100644 --- a/qy_lty/device_interaction/apps.py +++ b/qy_lty/device_interaction/apps.py @@ -1,6 +1,16 @@ from django.apps import AppConfig +import logging + +logger = logging.getLogger(__name__) class DeviceInteractionConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "device_interaction" + + def ready(self): + from .scheduler import start + try: + start() + except Exception as e: + logger.error(f"Error starting device online check scheduler: {e}") diff --git a/qy_lty/device_interaction/consumers.py b/qy_lty/device_interaction/consumers.py index b74a558..464931f 100644 --- a/qy_lty/device_interaction/consumers.py +++ b/qy_lty/device_interaction/consumers.py @@ -1,6 +1,8 @@ import json +import time from channels.generic.websocket import AsyncWebsocketConsumer from channels.db import database_sync_to_async +from django.core.cache import cache import logging logger = logging.getLogger(__name__) @@ -38,8 +40,9 @@ class DeviceConsumer(AsyncWebsocketConsumer): ) await self.accept() + self.device_mac = None # 设备MAC,收到device_info后赋值 logger.info('WebSocket connection accepted') - + except Exception as e: logger.error(f"Error in WebSocket connect: {str(e)}") await self.close(code=4002) @@ -103,7 +106,7 @@ class DeviceConsumer(AsyncWebsocketConsumer): @database_sync_to_async def update_device_status(self, mac_address, device_data): """ - 根据设备上报信息更新数据库中的设备状态 + 根据设备上报信息更新数据库中的设备状态,并记录心跳时间到Redis """ try: from .models import Device @@ -115,16 +118,45 @@ class DeviceConsumer(AsyncWebsocketConsumer): device.firmware_version = device_data['firmware_version'] if 'wifi_name' in device_data: device.wifi_name = device_data['wifi_name'] + if 'wifi_password' in device_data: + device.wifi_password = device_data['wifi_password'] if 'brightness' in device_data: device.brightness = device_data['brightness'] device.status = 'connected' device.save() + # 记录最后活跃时间到Redis(超时5分钟自动过期) + cache.set(f"device:last_seen:{mac_address}", time.time(), timeout=300) logger.info(f"Updated device status for MAC: {mac_address}") except Exception as e: logger.error(f"Failed to update device status: {str(e)}") + @database_sync_to_async + def mark_device_offline(self, mac_address): + """ + 将设备标记为离线 + """ + try: + from .models import Device + Device.objects.filter(mac_address=mac_address, status='connected').update(status='disconnected') + cache.delete(f"device:last_seen:{mac_address}") + logger.info(f"Device marked offline: {mac_address}") + except Exception as e: + logger.error(f"Failed to mark device offline: {str(e)}") + + @database_sync_to_async + def refresh_device_heartbeat(self, mac_address): + """ + 刷新设备心跳时间(仅更新Redis,不写DB) + """ + cache.set(f"device:last_seen:{mac_address}", time.time(), timeout=300) + async def disconnect(self, close_code): try: + # 设备断开时标记为离线 + if hasattr(self, 'device_mac') and self.device_mac: + await self.mark_device_offline(self.device_mac) + logger.info(f'Device {self.device_mac} marked offline on disconnect') + # 只有在用户已认证且已加入组的情况下才执行移除操作 if hasattr(self, 'group_name'): # 将用户从组中移除 @@ -174,7 +206,11 @@ class DeviceConsumer(AsyncWebsocketConsumer): return logger.info(f'Received message from user {self.user_id}: {message}') - + + # 刷新设备心跳(如果已知MAC) + if hasattr(self, 'device_mac') and self.device_mac: + await self.refresh_device_heartbeat(self.device_mac) + # 根据消息类型处理 if message_type == 'weather': try: @@ -285,6 +321,7 @@ class DeviceConsumer(AsyncWebsocketConsumer): # 更新数据库中的设备状态 mac_address = device_data.get('mac_address') if mac_address: + self.device_mac = mac_address # 记录当前连接对应的设备MAC await self.update_device_status(mac_address, device_data) # 广播到 group,让手机端也能收到 diff --git a/qy_lty/device_interaction/scheduler.py b/qy_lty/device_interaction/scheduler.py new file mode 100644 index 0000000..c4938d3 --- /dev/null +++ b/qy_lty/device_interaction/scheduler.py @@ -0,0 +1,27 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger +import logging + +logger = logging.getLogger(__name__) + +scheduler = BackgroundScheduler() + + +def start(): + if not scheduler.running: + from device_interaction.tasks import check_device_online_status + + scheduler.add_job( + check_device_online_status, + trigger=IntervalTrigger(seconds=60), + id='check_device_online_status', + replace_existing=True, + ) + logger.info("Device online check scheduler started") + scheduler.start() + + +def stop(): + if scheduler.running: + scheduler.shutdown() + logger.info("Device online check scheduler stopped") diff --git a/qy_lty/device_interaction/serializers.py b/qy_lty/device_interaction/serializers.py index 20f8c9b..1a7eb74 100644 --- a/qy_lty/device_interaction/serializers.py +++ b/qy_lty/device_interaction/serializers.py @@ -1,6 +1,8 @@ +import time from rest_framework import serializers -from .models import DeviceType, DeviceBatch, Device, UserDevice +from django.core.cache import cache from django.utils import timezone +from .models import DeviceType, DeviceBatch, Device, UserDevice class DeviceTypeSerializer(serializers.ModelSerializer): @@ -67,12 +69,20 @@ class UserDeviceSerializer(serializers.ModelSerializer): mac_address = serializers.ReadOnlyField(source='device.mac_address') device_type = serializers.ReadOnlyField(source='device.device_type.name') device_status = serializers.ReadOnlyField(source='device.status') + is_online = serializers.SerializerMethodField() battery_level = serializers.ReadOnlyField(source='device.battery_level') firmware_version = serializers.ReadOnlyField(source='device.firmware_version') wifi_name = serializers.ReadOnlyField(source='device.wifi_name') wifi_password = serializers.ReadOnlyField(source='device.wifi_password') brightness = serializers.ReadOnlyField(source='device.brightness') - + + def get_is_online(self, obj): + """根据Redis中的last_seen判断设备是否在线(3分钟内有上报)""" + last_seen = cache.get(f"device:last_seen:{obj.device.mac_address}") + if last_seen is None: + return False + return (time.time() - float(last_seen)) <= 180 + class Meta: model = UserDevice fields = '__all__' diff --git a/qy_lty/device_interaction/tasks.py b/qy_lty/device_interaction/tasks.py new file mode 100644 index 0000000..8cdf402 --- /dev/null +++ b/qy_lty/device_interaction/tasks.py @@ -0,0 +1,32 @@ +import time +import logging +from django.core.cache import cache + +logger = logging.getLogger(__name__) + +# 超时阈值:3分钟(设备每2分钟上报一次,留1分钟容差) +DEVICE_OFFLINE_TIMEOUT = 180 + + +def check_device_online_status(): + """ + 检查所有状态为 connected 的设备,如果超过3分钟没有上报数据,标记为离线。 + 由定时任务每60秒调用一次。 + """ + from device_interaction.models import Device + + connected_devices = Device.objects.filter(status='connected') + now = time.time() + offline_count = 0 + + for device in connected_devices: + last_seen = cache.get(f"device:last_seen:{device.mac_address}") + if last_seen is None or (now - float(last_seen)) > DEVICE_OFFLINE_TIMEOUT: + device.status = 'disconnected' + device.save(update_fields=['status']) + cache.delete(f"device:last_seen:{device.mac_address}") + offline_count += 1 + logger.info(f"Device {device.mac_address} marked offline (timeout)") + + if offline_count > 0: + logger.info(f"check_device_online_status: {offline_count} device(s) marked offline")