#!/usr/bin/env python3 """ BLE 配网协议测试脚本 用途: 模拟手机端,通过 BLE 与 ESP32 设备通信,测试自定义 GATT 配网协议。 依赖: pip install bleak 运行: # 交互式配网(输入 SSID 和密码) python tests/ble_provision_test.py --ssid "MyWiFi" --pwd "12345678" # 扫描 WiFi 列表 python tests/ble_provision_test.py --scan-wifi # 查询 WiFi 状态 python tests/ble_provision_test.py --get-status # 指定设备名 python tests/ble_provision_test.py --device "Airhub_Ble" --ssid "MyWiFi" --pwd "12345678" 协议说明: Service UUID: 0xABF0 Write Char: 0xABF1 (手机→设备, 二进制命令) Notify Char: 0xABF2 (设备→手机, 二进制响应) 命令格式: [cmd(1字节)] + [payload...] 响应格式: [resp(1字节)] + [data...] """ import argparse import asyncio import sys import time import struct try: from bleak import BleakClient, BleakScanner from bleak.backends.characteristic import BleakGATTCharacteristic except ImportError: print("错误: 缺少 bleak 库,请执行: pip install bleak") sys.exit(1) # ============================================================ # BLE 参数定义 (与 bluetooth_provisioning.h 一致) # ============================================================ SERVICE_UUID = "0000abf0-0000-1000-8000-00805f9b34fb" CHAR_WRITE_UUID = "0000abf1-0000-1000-8000-00805f9b34fb" CHAR_NOTIFY_UUID = "0000abf2-0000-1000-8000-00805f9b34fb" DEFAULT_DEVICE = "Airhub_" # 前缀匹配,设备名格式: Airhub_xx:xx:xx:xx:xx:xx # 命令码 (手机→设备) CMD_SET_SSID = 0x01 CMD_SET_PASSWORD = 0x02 CMD_SET_BSSID = 0x03 CMD_CONNECT_AP = 0x04 CMD_DISCONNECT_AP = 0x05 CMD_GET_WIFI_LIST = 0x06 CMD_DISCONNECT_BLE = 0x07 CMD_SET_WIFI_MODE = 0x08 CMD_GET_WIFI_STATUS = 0x09 CMD_CUSTOM_DATA = 0x10 # 响应码 (设备→手机) RESP_WIFI_STATUS = 0x81 RESP_WIFI_LIST = 0x82 RESP_WIFI_LIST_END = 0x83 RESP_CUSTOM_DATA = 0x84 class BleProvisionTester: """BLE 配网协议测试器""" def __init__(self, device_name: str, timeout: float = 10.0): self.device_name = device_name self.timeout = timeout self.client = None self.notifications = [] self._notify_event = asyncio.Event() def _on_notify(self, sender: BleakGATTCharacteristic, data: bytearray): """NOTIFY 回调""" self.notifications.append(bytes(data)) self._notify_event.set() self._print_notification(data) @staticmethod def _print_notification(data: bytearray): """解析并打印 NOTIFY 数据""" if len(data) < 1: print(f" <- [空数据]") return resp_type = data[0] hex_str = data.hex(" ") if resp_type == RESP_WIFI_STATUS: success = data[1] if len(data) > 1 else 0 reason = data[2] if len(data) > 2 else 0 status = "成功" if success == 1 else f"失败 (原因码: {reason})" print(f" <- [WiFi状态] {status} (raw: {hex_str})") elif resp_type == RESP_WIFI_LIST: if len(data) >= 3: rssi = struct.unpack("b", bytes([data[1]]))[0] # 有符号 ssid_len = data[2] ssid = data[3:3 + ssid_len].decode("utf-8", errors="replace") print(f" <- [WiFi] RSSI={rssi}dBm SSID=\"{ssid}\"") else: print(f" <- [WiFi列表] 数据不完整 (raw: {hex_str})") elif resp_type == RESP_WIFI_LIST_END: print(f" <- [WiFi列表结束]") elif resp_type == RESP_CUSTOM_DATA: payload = data[1:] print(f" <- [自定义数据] {payload.hex(' ')} text=\"{payload.decode('utf-8', errors='replace')}\"") else: print(f" <- [未知响应 0x{resp_type:02x}] {hex_str}") async def _wait_notifications(self, timeout: float = None, count: int = 1): """等待指定数量的通知""" timeout = timeout or self.timeout start = len(self.notifications) deadline = time.monotonic() + timeout while len(self.notifications) - start < count: remaining = deadline - time.monotonic() if remaining <= 0: break self._notify_event.clear() try: await asyncio.wait_for(self._notify_event.wait(), timeout=remaining) except asyncio.TimeoutError: break return self.notifications[start:] async def scan_and_connect(self): """扫描并连接设备(支持前缀匹配)""" print(f"正在扫描设备 '{self.device_name}*'...") devices = await BleakScanner.discover(timeout=10.0) device = None for d in devices: if d.name and d.name.startswith(self.device_name): device = d break if not device: print(f"未找到以 '{self.device_name}' 开头的设备,请确认设备已开机且处于配网模式") return False print(f"找到设备: {device.name} ({device.address})") print(f"正在连接...") self.client = BleakClient(device, timeout=15.0) await self.client.connect() print(f"已连接, MTU={self.client.mtu_size}") # 验证服务 svc = self.client.services.get_service(SERVICE_UUID) if not svc: print(f"错误: 未发现配网服务 (UUID: 0xABF0)") return False print(f"发现配网服务 0xABF0") # 启用 NOTIFY await self.client.start_notify(CHAR_NOTIFY_UUID, self._on_notify) print(f"NOTIFY 已启用 (0xABF2)") await asyncio.sleep(0.3) return True async def send_cmd(self, cmd: int, payload: bytes = b""): """发送二进制命令""" data = bytes([cmd]) + payload hex_str = data.hex(" ") print(f" -> [0x{cmd:02x}] {hex_str}") await self.client.write_gatt_char(CHAR_WRITE_UUID, data, response=True) async def provision_wifi(self, ssid: str, password: str): """执行 WiFi 配网流程""" print(f"\n{'='*50}") print(f" 开始配网: SSID=\"{ssid}\"") print(f"{'='*50}\n") # 第1步: 设置 SSID print("[1/3] 发送 SSID...") await self.send_cmd(CMD_SET_SSID, ssid.encode("utf-8")) await asyncio.sleep(0.3) # 第2步: 设置密码(设置密码后设备会自动发起连接) print("[2/3] 发送密码...") await self.send_cmd(CMD_SET_PASSWORD, password.encode("utf-8")) # 第3步: 等待连接结果 print("[3/3] 等待WiFi连接结果 (最长30秒)...") result = await self._wait_wifi_result(timeout=30.0) if result is None: print("\n超时: 未收到WiFi连接结果") # 可尝试显式发送连接命令 print("尝试发送显式连接命令...") await self.send_cmd(CMD_CONNECT_AP) result = await self._wait_wifi_result(timeout=30.0) if result is None: print("\n配网结果: 超时,未收到设备响应") elif result: print("\n配网结果: WiFi 连接成功!") else: print("\n配网结果: WiFi 连接失败") return result async def _wait_wifi_result(self, timeout: float = 30.0): """等待 WiFi 状态通知""" deadline = time.monotonic() + timeout while time.monotonic() < deadline: remaining = deadline - time.monotonic() if remaining <= 0: break self._notify_event.clear() try: await asyncio.wait_for(self._notify_event.wait(), timeout=remaining) except asyncio.TimeoutError: break # 检查最新通知 if self.notifications: last = self.notifications[-1] if len(last) >= 2 and last[0] == RESP_WIFI_STATUS: return last[1] == 1 # 1=成功, 0=失败 return None async def scan_wifi_list(self): """请求 WiFi 扫描列表""" print(f"\n{'='*50}") print(f" 扫描 WiFi 列表") print(f"{'='*50}\n") self.notifications.clear() await self.send_cmd(CMD_GET_WIFI_LIST) # WiFi 扫描需要时间,等待列表结束标记 print("等待扫描结果 (最长15秒)...") deadline = time.monotonic() + 15.0 wifi_list = [] got_end = False while time.monotonic() < deadline and not got_end: remaining = deadline - time.monotonic() if remaining <= 0: break self._notify_event.clear() try: await asyncio.wait_for(self._notify_event.wait(), timeout=remaining) except asyncio.TimeoutError: break # 只处理最新一条通知 if self.notifications: n = self.notifications[-1] if len(n) >= 3 and n[0] == RESP_WIFI_LIST: rssi = struct.unpack("b", bytes([n[1]]))[0] ssid_len = n[2] ssid = n[3:3 + ssid_len].decode("utf-8", errors="replace") wifi_list.append({"ssid": ssid, "rssi": rssi}) elif len(n) >= 1 and n[0] == RESP_WIFI_LIST_END: got_end = True if wifi_list: print(f"\n扫描到 {len(wifi_list)} 个WiFi网络:") print(f" {'序号':>4} {'RSSI':>6} {'SSID'}") print(f" {'─'*4} {'─'*6} {'─'*30}") for i, w in enumerate(wifi_list, 1): print(f" {i:>4} {w['rssi']:>4}dBm {w['ssid']}") else: print("未扫描到WiFi网络") return wifi_list async def get_wifi_status(self): """查询 WiFi 状态""" print(f"\n{'='*50}") print(f" 查询 WiFi 状态") print(f"{'='*50}\n") self.notifications.clear() await self.send_cmd(CMD_GET_WIFI_STATUS) await self._wait_notifications(timeout=5.0, count=1) async def disconnect(self): """断开连接""" if self.client and self.client.is_connected: try: await self.client.stop_notify(CHAR_NOTIFY_UUID) except Exception: pass await self.client.disconnect() print("已断开BLE连接") async def main(): parser = argparse.ArgumentParser( description="BLE 配网协议测试脚本", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # WiFi 配网 python tests/ble_provision_test.py --ssid "MyWiFi" --pwd "12345678" # 扫描 WiFi 列表 python tests/ble_provision_test.py --scan-wifi # 查询 WiFi 状态 python tests/ble_provision_test.py --get-status """ ) parser.add_argument("--device", default=DEFAULT_DEVICE, help=f"BLE 设备名称 (默认: {DEFAULT_DEVICE})") parser.add_argument("--ssid", help="要连接的 WiFi SSID") parser.add_argument("--pwd", default="", help="WiFi 密码") parser.add_argument("--scan-wifi", action="store_true", help="扫描 WiFi 列表") parser.add_argument("--get-status", action="store_true", help="查询 WiFi 连接状态") parser.add_argument("--timeout", type=float, default=10.0, help="命令超时秒数 (默认: 10.0)") args = parser.parse_args() # 至少指定一个操作 if not args.ssid and not args.scan_wifi and not args.get_status: parser.print_help() print("\n请指定操作: --ssid/--scan-wifi/--get-status") return 1 tester = BleProvisionTester(device_name=args.device, timeout=args.timeout) # 扫描连接 if not await tester.scan_and_connect(): return 1 try: # 执行操作 if args.scan_wifi: await tester.scan_wifi_list() if args.get_status: await tester.get_wifi_status() if args.ssid: result = await tester.provision_wifi(args.ssid, args.pwd) return 0 if result else 1 finally: await tester.disconnect() return 0 if __name__ == "__main__": exit_code = asyncio.run(main()) sys.exit(exit_code or 0)