Baji_Rtc_Toy/tests/ble_provision_test.py

360 lines
12 KiB
Python

#!/usr/bin/env python3
"""
BLE 配网协议测试脚本
用途: 模拟手机端,通过 BLE 与 ESP32 设备通信,测试自定义 GATT 配网协议。
依赖: pip install bleak
运行:
# 交互式配网(输入 SSID 和密码)
python tests/ble_provision_test.py --ssid "MyWiFi" --pwd "12345678"
# 扫描 WiFi 列表
python tests/ble_provision_test.py --scan-wifi
# 查询 WiFi 状态
python tests/ble_provision_test.py --get-status
# 指定设备名
python tests/ble_provision_test.py --device "Airhub_Ble" --ssid "MyWiFi" --pwd "12345678"
协议说明:
Service UUID: 0xABF0
Write Char: 0xABF1 (手机→设备, 二进制命令)
Notify Char: 0xABF2 (设备→手机, 二进制响应)
命令格式: [cmd(1字节)] + [payload...]
响应格式: [resp(1字节)] + [data...]
"""
import argparse
import asyncio
import sys
import time
import struct
try:
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
except ImportError:
print("错误: 缺少 bleak 库,请执行: pip install bleak")
sys.exit(1)
# ============================================================
# BLE 参数定义 (与 bluetooth_provisioning.h 一致)
# ============================================================
SERVICE_UUID = "0000abf0-0000-1000-8000-00805f9b34fb"
CHAR_WRITE_UUID = "0000abf1-0000-1000-8000-00805f9b34fb"
CHAR_NOTIFY_UUID = "0000abf2-0000-1000-8000-00805f9b34fb"
DEFAULT_DEVICE = "Airhub_" # 前缀匹配,设备名格式: Airhub_xx:xx:xx:xx:xx:xx
# 命令码 (手机→设备)
CMD_SET_SSID = 0x01
CMD_SET_PASSWORD = 0x02
CMD_SET_BSSID = 0x03
CMD_CONNECT_AP = 0x04
CMD_DISCONNECT_AP = 0x05
CMD_GET_WIFI_LIST = 0x06
CMD_DISCONNECT_BLE = 0x07
CMD_SET_WIFI_MODE = 0x08
CMD_GET_WIFI_STATUS = 0x09
CMD_CUSTOM_DATA = 0x10
# 响应码 (设备→手机)
RESP_WIFI_STATUS = 0x81
RESP_WIFI_LIST = 0x82
RESP_WIFI_LIST_END = 0x83
RESP_CUSTOM_DATA = 0x84
class BleProvisionTester:
"""BLE 配网协议测试器"""
def __init__(self, device_name: str, timeout: float = 10.0):
self.device_name = device_name
self.timeout = timeout
self.client = None
self.notifications = []
self._notify_event = asyncio.Event()
def _on_notify(self, sender: BleakGATTCharacteristic, data: bytearray):
"""NOTIFY 回调"""
self.notifications.append(bytes(data))
self._notify_event.set()
self._print_notification(data)
@staticmethod
def _print_notification(data: bytearray):
"""解析并打印 NOTIFY 数据"""
if len(data) < 1:
print(f" <- [空数据]")
return
resp_type = data[0]
hex_str = data.hex(" ")
if resp_type == RESP_WIFI_STATUS:
success = data[1] if len(data) > 1 else 0
reason = data[2] if len(data) > 2 else 0
status = "成功" if success == 1 else f"失败 (原因码: {reason})"
print(f" <- [WiFi状态] {status} (raw: {hex_str})")
elif resp_type == RESP_WIFI_LIST:
if len(data) >= 3:
rssi = struct.unpack("b", bytes([data[1]]))[0] # 有符号
ssid_len = data[2]
ssid = data[3:3 + ssid_len].decode("utf-8", errors="replace")
print(f" <- [WiFi] RSSI={rssi}dBm SSID=\"{ssid}\"")
else:
print(f" <- [WiFi列表] 数据不完整 (raw: {hex_str})")
elif resp_type == RESP_WIFI_LIST_END:
print(f" <- [WiFi列表结束]")
elif resp_type == RESP_CUSTOM_DATA:
payload = data[1:]
print(f" <- [自定义数据] {payload.hex(' ')} text=\"{payload.decode('utf-8', errors='replace')}\"")
else:
print(f" <- [未知响应 0x{resp_type:02x}] {hex_str}")
async def _wait_notifications(self, timeout: float = None, count: int = 1):
"""等待指定数量的通知"""
timeout = timeout or self.timeout
start = len(self.notifications)
deadline = time.monotonic() + timeout
while len(self.notifications) - start < count:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
self._notify_event.clear()
try:
await asyncio.wait_for(self._notify_event.wait(), timeout=remaining)
except asyncio.TimeoutError:
break
return self.notifications[start:]
async def scan_and_connect(self):
"""扫描并连接设备(支持前缀匹配)"""
print(f"正在扫描设备 '{self.device_name}*'...")
devices = await BleakScanner.discover(timeout=10.0)
device = None
for d in devices:
if d.name and d.name.startswith(self.device_name):
device = d
break
if not device:
print(f"未找到以 '{self.device_name}' 开头的设备,请确认设备已开机且处于配网模式")
return False
print(f"找到设备: {device.name} ({device.address})")
print(f"正在连接...")
self.client = BleakClient(device, timeout=15.0)
await self.client.connect()
print(f"已连接, MTU={self.client.mtu_size}")
# 验证服务
svc = self.client.services.get_service(SERVICE_UUID)
if not svc:
print(f"错误: 未发现配网服务 (UUID: 0xABF0)")
return False
print(f"发现配网服务 0xABF0")
# 启用 NOTIFY
await self.client.start_notify(CHAR_NOTIFY_UUID, self._on_notify)
print(f"NOTIFY 已启用 (0xABF2)")
await asyncio.sleep(0.3)
return True
async def send_cmd(self, cmd: int, payload: bytes = b""):
"""发送二进制命令"""
data = bytes([cmd]) + payload
hex_str = data.hex(" ")
print(f" -> [0x{cmd:02x}] {hex_str}")
await self.client.write_gatt_char(CHAR_WRITE_UUID, data, response=True)
async def provision_wifi(self, ssid: str, password: str):
"""执行 WiFi 配网流程"""
print(f"\n{'='*50}")
print(f" 开始配网: SSID=\"{ssid}\"")
print(f"{'='*50}\n")
# 第1步: 设置 SSID
print("[1/3] 发送 SSID...")
await self.send_cmd(CMD_SET_SSID, ssid.encode("utf-8"))
await asyncio.sleep(0.3)
# 第2步: 设置密码(设置密码后设备会自动发起连接)
print("[2/3] 发送密码...")
await self.send_cmd(CMD_SET_PASSWORD, password.encode("utf-8"))
# 第3步: 等待连接结果
print("[3/3] 等待WiFi连接结果 (最长30秒)...")
result = await self._wait_wifi_result(timeout=30.0)
if result is None:
print("\n超时: 未收到WiFi连接结果")
# 可尝试显式发送连接命令
print("尝试发送显式连接命令...")
await self.send_cmd(CMD_CONNECT_AP)
result = await self._wait_wifi_result(timeout=30.0)
if result is None:
print("\n配网结果: 超时,未收到设备响应")
elif result:
print("\n配网结果: WiFi 连接成功!")
else:
print("\n配网结果: WiFi 连接失败")
return result
async def _wait_wifi_result(self, timeout: float = 30.0):
"""等待 WiFi 状态通知"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
self._notify_event.clear()
try:
await asyncio.wait_for(self._notify_event.wait(), timeout=remaining)
except asyncio.TimeoutError:
break
# 检查最新通知
if self.notifications:
last = self.notifications[-1]
if len(last) >= 2 and last[0] == RESP_WIFI_STATUS:
return last[1] == 1 # 1=成功, 0=失败
return None
async def scan_wifi_list(self):
"""请求 WiFi 扫描列表"""
print(f"\n{'='*50}")
print(f" 扫描 WiFi 列表")
print(f"{'='*50}\n")
self.notifications.clear()
await self.send_cmd(CMD_GET_WIFI_LIST)
# WiFi 扫描需要时间,等待列表结束标记
print("等待扫描结果 (最长15秒)...")
deadline = time.monotonic() + 15.0
wifi_list = []
got_end = False
while time.monotonic() < deadline and not got_end:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
self._notify_event.clear()
try:
await asyncio.wait_for(self._notify_event.wait(), timeout=remaining)
except asyncio.TimeoutError:
break
# 只处理最新一条通知
if self.notifications:
n = self.notifications[-1]
if len(n) >= 3 and n[0] == RESP_WIFI_LIST:
rssi = struct.unpack("b", bytes([n[1]]))[0]
ssid_len = n[2]
ssid = n[3:3 + ssid_len].decode("utf-8", errors="replace")
wifi_list.append({"ssid": ssid, "rssi": rssi})
elif len(n) >= 1 and n[0] == RESP_WIFI_LIST_END:
got_end = True
if wifi_list:
print(f"\n扫描到 {len(wifi_list)} 个WiFi网络:")
print(f" {'序号':>4} {'RSSI':>6} {'SSID'}")
print(f" {''*4} {''*6} {''*30}")
for i, w in enumerate(wifi_list, 1):
print(f" {i:>4} {w['rssi']:>4}dBm {w['ssid']}")
else:
print("未扫描到WiFi网络")
return wifi_list
async def get_wifi_status(self):
"""查询 WiFi 状态"""
print(f"\n{'='*50}")
print(f" 查询 WiFi 状态")
print(f"{'='*50}\n")
self.notifications.clear()
await self.send_cmd(CMD_GET_WIFI_STATUS)
await self._wait_notifications(timeout=5.0, count=1)
async def disconnect(self):
"""断开连接"""
if self.client and self.client.is_connected:
try:
await self.client.stop_notify(CHAR_NOTIFY_UUID)
except Exception:
pass
await self.client.disconnect()
print("已断开BLE连接")
async def main():
parser = argparse.ArgumentParser(
description="BLE 配网协议测试脚本",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
# WiFi 配网
python tests/ble_provision_test.py --ssid "MyWiFi" --pwd "12345678"
# 扫描 WiFi 列表
python tests/ble_provision_test.py --scan-wifi
# 查询 WiFi 状态
python tests/ble_provision_test.py --get-status
"""
)
parser.add_argument("--device", default=DEFAULT_DEVICE,
help=f"BLE 设备名称 (默认: {DEFAULT_DEVICE})")
parser.add_argument("--ssid", help="要连接的 WiFi SSID")
parser.add_argument("--pwd", default="", help="WiFi 密码")
parser.add_argument("--scan-wifi", action="store_true",
help="扫描 WiFi 列表")
parser.add_argument("--get-status", action="store_true",
help="查询 WiFi 连接状态")
parser.add_argument("--timeout", type=float, default=10.0,
help="命令超时秒数 (默认: 10.0)")
args = parser.parse_args()
# 至少指定一个操作
if not args.ssid and not args.scan_wifi and not args.get_status:
parser.print_help()
print("\n请指定操作: --ssid/--scan-wifi/--get-status")
return 1
tester = BleProvisionTester(device_name=args.device, timeout=args.timeout)
# 扫描连接
if not await tester.scan_and_connect():
return 1
try:
# 执行操作
if args.scan_wifi:
await tester.scan_wifi_list()
if args.get_status:
await tester.get_wifi_status()
if args.ssid:
result = await tester.provision_wifi(args.ssid, args.pwd)
return 0 if result else 1
finally:
await tester.disconnect()
return 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code or 0)