360 lines
12 KiB
Python
360 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BLE 配网协议测试脚本
|
|
|
|
用途: 模拟手机端,通过 BLE 与 ESP32 设备通信,测试自定义 GATT 配网协议。
|
|
依赖: pip install bleak
|
|
运行:
|
|
# 交互式配网(输入 SSID 和密码)
|
|
python tests/ble_provision_test.py --ssid "MyWiFi" --pwd "12345678"
|
|
|
|
# 扫描 WiFi 列表
|
|
python tests/ble_provision_test.py --scan-wifi
|
|
|
|
# 查询 WiFi 状态
|
|
python tests/ble_provision_test.py --get-status
|
|
|
|
# 指定设备名
|
|
python tests/ble_provision_test.py --device "Airhub_Ble" --ssid "MyWiFi" --pwd "12345678"
|
|
|
|
协议说明:
|
|
Service UUID: 0xABF0
|
|
Write Char: 0xABF1 (手机→设备, 二进制命令)
|
|
Notify Char: 0xABF2 (设备→手机, 二进制响应)
|
|
|
|
命令格式: [cmd(1字节)] + [payload...]
|
|
响应格式: [resp(1字节)] + [data...]
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import sys
|
|
import time
|
|
import struct
|
|
|
|
try:
|
|
from bleak import BleakClient, BleakScanner
|
|
from bleak.backends.characteristic import BleakGATTCharacteristic
|
|
except ImportError:
|
|
print("错误: 缺少 bleak 库,请执行: pip install bleak")
|
|
sys.exit(1)
|
|
|
|
# ============================================================
|
|
# BLE 参数定义 (与 bluetooth_provisioning.h 一致)
|
|
# ============================================================
|
|
|
|
SERVICE_UUID = "0000abf0-0000-1000-8000-00805f9b34fb"
|
|
CHAR_WRITE_UUID = "0000abf1-0000-1000-8000-00805f9b34fb"
|
|
CHAR_NOTIFY_UUID = "0000abf2-0000-1000-8000-00805f9b34fb"
|
|
DEFAULT_DEVICE = "Airhub_" # 前缀匹配,设备名格式: Airhub_xx:xx:xx:xx:xx:xx
|
|
|
|
# 命令码 (手机→设备)
|
|
CMD_SET_SSID = 0x01
|
|
CMD_SET_PASSWORD = 0x02
|
|
CMD_SET_BSSID = 0x03
|
|
CMD_CONNECT_AP = 0x04
|
|
CMD_DISCONNECT_AP = 0x05
|
|
CMD_GET_WIFI_LIST = 0x06
|
|
CMD_DISCONNECT_BLE = 0x07
|
|
CMD_SET_WIFI_MODE = 0x08
|
|
CMD_GET_WIFI_STATUS = 0x09
|
|
CMD_CUSTOM_DATA = 0x10
|
|
|
|
# 响应码 (设备→手机)
|
|
RESP_WIFI_STATUS = 0x81
|
|
RESP_WIFI_LIST = 0x82
|
|
RESP_WIFI_LIST_END = 0x83
|
|
RESP_CUSTOM_DATA = 0x84
|
|
|
|
|
|
class BleProvisionTester:
|
|
"""BLE 配网协议测试器"""
|
|
|
|
def __init__(self, device_name: str, timeout: float = 10.0):
|
|
self.device_name = device_name
|
|
self.timeout = timeout
|
|
self.client = None
|
|
self.notifications = []
|
|
self._notify_event = asyncio.Event()
|
|
|
|
def _on_notify(self, sender: BleakGATTCharacteristic, data: bytearray):
|
|
"""NOTIFY 回调"""
|
|
self.notifications.append(bytes(data))
|
|
self._notify_event.set()
|
|
self._print_notification(data)
|
|
|
|
@staticmethod
|
|
def _print_notification(data: bytearray):
|
|
"""解析并打印 NOTIFY 数据"""
|
|
if len(data) < 1:
|
|
print(f" <- [空数据]")
|
|
return
|
|
|
|
resp_type = data[0]
|
|
hex_str = data.hex(" ")
|
|
|
|
if resp_type == RESP_WIFI_STATUS:
|
|
success = data[1] if len(data) > 1 else 0
|
|
reason = data[2] if len(data) > 2 else 0
|
|
status = "成功" if success == 1 else f"失败 (原因码: {reason})"
|
|
print(f" <- [WiFi状态] {status} (raw: {hex_str})")
|
|
|
|
elif resp_type == RESP_WIFI_LIST:
|
|
if len(data) >= 3:
|
|
rssi = struct.unpack("b", bytes([data[1]]))[0] # 有符号
|
|
ssid_len = data[2]
|
|
ssid = data[3:3 + ssid_len].decode("utf-8", errors="replace")
|
|
print(f" <- [WiFi] RSSI={rssi}dBm SSID=\"{ssid}\"")
|
|
else:
|
|
print(f" <- [WiFi列表] 数据不完整 (raw: {hex_str})")
|
|
|
|
elif resp_type == RESP_WIFI_LIST_END:
|
|
print(f" <- [WiFi列表结束]")
|
|
|
|
elif resp_type == RESP_CUSTOM_DATA:
|
|
payload = data[1:]
|
|
print(f" <- [自定义数据] {payload.hex(' ')} text=\"{payload.decode('utf-8', errors='replace')}\"")
|
|
|
|
else:
|
|
print(f" <- [未知响应 0x{resp_type:02x}] {hex_str}")
|
|
|
|
async def _wait_notifications(self, timeout: float = None, count: int = 1):
|
|
"""等待指定数量的通知"""
|
|
timeout = timeout or self.timeout
|
|
start = len(self.notifications)
|
|
deadline = time.monotonic() + timeout
|
|
while len(self.notifications) - start < count:
|
|
remaining = deadline - time.monotonic()
|
|
if remaining <= 0:
|
|
break
|
|
self._notify_event.clear()
|
|
try:
|
|
await asyncio.wait_for(self._notify_event.wait(), timeout=remaining)
|
|
except asyncio.TimeoutError:
|
|
break
|
|
return self.notifications[start:]
|
|
|
|
async def scan_and_connect(self):
|
|
"""扫描并连接设备(支持前缀匹配)"""
|
|
print(f"正在扫描设备 '{self.device_name}*'...")
|
|
devices = await BleakScanner.discover(timeout=10.0)
|
|
device = None
|
|
for d in devices:
|
|
if d.name and d.name.startswith(self.device_name):
|
|
device = d
|
|
break
|
|
if not device:
|
|
print(f"未找到以 '{self.device_name}' 开头的设备,请确认设备已开机且处于配网模式")
|
|
return False
|
|
|
|
print(f"找到设备: {device.name} ({device.address})")
|
|
print(f"正在连接...")
|
|
|
|
self.client = BleakClient(device, timeout=15.0)
|
|
await self.client.connect()
|
|
print(f"已连接, MTU={self.client.mtu_size}")
|
|
|
|
# 验证服务
|
|
svc = self.client.services.get_service(SERVICE_UUID)
|
|
if not svc:
|
|
print(f"错误: 未发现配网服务 (UUID: 0xABF0)")
|
|
return False
|
|
print(f"发现配网服务 0xABF0")
|
|
|
|
# 启用 NOTIFY
|
|
await self.client.start_notify(CHAR_NOTIFY_UUID, self._on_notify)
|
|
print(f"NOTIFY 已启用 (0xABF2)")
|
|
await asyncio.sleep(0.3)
|
|
return True
|
|
|
|
async def send_cmd(self, cmd: int, payload: bytes = b""):
|
|
"""发送二进制命令"""
|
|
data = bytes([cmd]) + payload
|
|
hex_str = data.hex(" ")
|
|
print(f" -> [0x{cmd:02x}] {hex_str}")
|
|
await self.client.write_gatt_char(CHAR_WRITE_UUID, data, response=True)
|
|
|
|
async def provision_wifi(self, ssid: str, password: str):
|
|
"""执行 WiFi 配网流程"""
|
|
print(f"\n{'='*50}")
|
|
print(f" 开始配网: SSID=\"{ssid}\"")
|
|
print(f"{'='*50}\n")
|
|
|
|
# 第1步: 设置 SSID
|
|
print("[1/3] 发送 SSID...")
|
|
await self.send_cmd(CMD_SET_SSID, ssid.encode("utf-8"))
|
|
await asyncio.sleep(0.3)
|
|
|
|
# 第2步: 设置密码(设置密码后设备会自动发起连接)
|
|
print("[2/3] 发送密码...")
|
|
await self.send_cmd(CMD_SET_PASSWORD, password.encode("utf-8"))
|
|
|
|
# 第3步: 等待连接结果
|
|
print("[3/3] 等待WiFi连接结果 (最长30秒)...")
|
|
result = await self._wait_wifi_result(timeout=30.0)
|
|
|
|
if result is None:
|
|
print("\n超时: 未收到WiFi连接结果")
|
|
# 可尝试显式发送连接命令
|
|
print("尝试发送显式连接命令...")
|
|
await self.send_cmd(CMD_CONNECT_AP)
|
|
result = await self._wait_wifi_result(timeout=30.0)
|
|
|
|
if result is None:
|
|
print("\n配网结果: 超时,未收到设备响应")
|
|
elif result:
|
|
print("\n配网结果: WiFi 连接成功!")
|
|
else:
|
|
print("\n配网结果: WiFi 连接失败")
|
|
|
|
return result
|
|
|
|
async def _wait_wifi_result(self, timeout: float = 30.0):
|
|
"""等待 WiFi 状态通知"""
|
|
deadline = time.monotonic() + timeout
|
|
while time.monotonic() < deadline:
|
|
remaining = deadline - time.monotonic()
|
|
if remaining <= 0:
|
|
break
|
|
self._notify_event.clear()
|
|
try:
|
|
await asyncio.wait_for(self._notify_event.wait(), timeout=remaining)
|
|
except asyncio.TimeoutError:
|
|
break
|
|
# 检查最新通知
|
|
if self.notifications:
|
|
last = self.notifications[-1]
|
|
if len(last) >= 2 and last[0] == RESP_WIFI_STATUS:
|
|
return last[1] == 1 # 1=成功, 0=失败
|
|
return None
|
|
|
|
async def scan_wifi_list(self):
|
|
"""请求 WiFi 扫描列表"""
|
|
print(f"\n{'='*50}")
|
|
print(f" 扫描 WiFi 列表")
|
|
print(f"{'='*50}\n")
|
|
|
|
self.notifications.clear()
|
|
await self.send_cmd(CMD_GET_WIFI_LIST)
|
|
|
|
# WiFi 扫描需要时间,等待列表结束标记
|
|
print("等待扫描结果 (最长15秒)...")
|
|
deadline = time.monotonic() + 15.0
|
|
wifi_list = []
|
|
got_end = False
|
|
|
|
while time.monotonic() < deadline and not got_end:
|
|
remaining = deadline - time.monotonic()
|
|
if remaining <= 0:
|
|
break
|
|
self._notify_event.clear()
|
|
try:
|
|
await asyncio.wait_for(self._notify_event.wait(), timeout=remaining)
|
|
except asyncio.TimeoutError:
|
|
break
|
|
|
|
# 只处理最新一条通知
|
|
if self.notifications:
|
|
n = self.notifications[-1]
|
|
if len(n) >= 3 and n[0] == RESP_WIFI_LIST:
|
|
rssi = struct.unpack("b", bytes([n[1]]))[0]
|
|
ssid_len = n[2]
|
|
ssid = n[3:3 + ssid_len].decode("utf-8", errors="replace")
|
|
wifi_list.append({"ssid": ssid, "rssi": rssi})
|
|
elif len(n) >= 1 and n[0] == RESP_WIFI_LIST_END:
|
|
got_end = True
|
|
|
|
if wifi_list:
|
|
print(f"\n扫描到 {len(wifi_list)} 个WiFi网络:")
|
|
print(f" {'序号':>4} {'RSSI':>6} {'SSID'}")
|
|
print(f" {'─'*4} {'─'*6} {'─'*30}")
|
|
for i, w in enumerate(wifi_list, 1):
|
|
print(f" {i:>4} {w['rssi']:>4}dBm {w['ssid']}")
|
|
else:
|
|
print("未扫描到WiFi网络")
|
|
|
|
return wifi_list
|
|
|
|
async def get_wifi_status(self):
|
|
"""查询 WiFi 状态"""
|
|
print(f"\n{'='*50}")
|
|
print(f" 查询 WiFi 状态")
|
|
print(f"{'='*50}\n")
|
|
|
|
self.notifications.clear()
|
|
await self.send_cmd(CMD_GET_WIFI_STATUS)
|
|
await self._wait_notifications(timeout=5.0, count=1)
|
|
|
|
async def disconnect(self):
|
|
"""断开连接"""
|
|
if self.client and self.client.is_connected:
|
|
try:
|
|
await self.client.stop_notify(CHAR_NOTIFY_UUID)
|
|
except Exception:
|
|
pass
|
|
await self.client.disconnect()
|
|
print("已断开BLE连接")
|
|
|
|
|
|
async def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="BLE 配网协议测试脚本",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
使用示例:
|
|
# WiFi 配网
|
|
python tests/ble_provision_test.py --ssid "MyWiFi" --pwd "12345678"
|
|
|
|
# 扫描 WiFi 列表
|
|
python tests/ble_provision_test.py --scan-wifi
|
|
|
|
# 查询 WiFi 状态
|
|
python tests/ble_provision_test.py --get-status
|
|
"""
|
|
)
|
|
parser.add_argument("--device", default=DEFAULT_DEVICE,
|
|
help=f"BLE 设备名称 (默认: {DEFAULT_DEVICE})")
|
|
parser.add_argument("--ssid", help="要连接的 WiFi SSID")
|
|
parser.add_argument("--pwd", default="", help="WiFi 密码")
|
|
parser.add_argument("--scan-wifi", action="store_true",
|
|
help="扫描 WiFi 列表")
|
|
parser.add_argument("--get-status", action="store_true",
|
|
help="查询 WiFi 连接状态")
|
|
parser.add_argument("--timeout", type=float, default=10.0,
|
|
help="命令超时秒数 (默认: 10.0)")
|
|
args = parser.parse_args()
|
|
|
|
# 至少指定一个操作
|
|
if not args.ssid and not args.scan_wifi and not args.get_status:
|
|
parser.print_help()
|
|
print("\n请指定操作: --ssid/--scan-wifi/--get-status")
|
|
return 1
|
|
|
|
tester = BleProvisionTester(device_name=args.device, timeout=args.timeout)
|
|
|
|
# 扫描连接
|
|
if not await tester.scan_and_connect():
|
|
return 1
|
|
|
|
try:
|
|
# 执行操作
|
|
if args.scan_wifi:
|
|
await tester.scan_wifi_list()
|
|
|
|
if args.get_status:
|
|
await tester.get_wifi_status()
|
|
|
|
if args.ssid:
|
|
result = await tester.provision_wifi(args.ssid, args.pwd)
|
|
return 0 if result else 1
|
|
|
|
finally:
|
|
await tester.disconnect()
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
exit_code = asyncio.run(main())
|
|
sys.exit(exit_code or 0)
|