491 lines
17 KiB
Python
491 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BLE JSON 通讯模块测试脚本
|
|
|
|
用途: 模拟 APP 端,通过 BLE 与 ESP32 设备通信,验证所有 JSON 命令的收发。
|
|
依赖: pip install bleak
|
|
运行: python tests/ble_json_test.py [--device DEVICE_NAME] [--timeout SECONDS]
|
|
|
|
测试覆盖:
|
|
1. 设备扫描与连接
|
|
2. GATT Service/Characteristic 发现
|
|
3. NOTIFY 启用
|
|
4. 所有 JSON 命令的请求-响应验证
|
|
5. 错误处理 (非法 JSON、缺少参数、未知命令)
|
|
6. msg_id 关联性验证
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
try:
|
|
from bleak import BleakClient, BleakScanner
|
|
from bleak.backends.characteristic import BleakGATTCharacteristic
|
|
except ImportError:
|
|
print("错误: 缺少 bleak 库,请执行: pip install bleak")
|
|
sys.exit(1)
|
|
|
|
# ============================================================
|
|
# BLE 参数定义 (与 ble_service_config.h 一致)
|
|
# ============================================================
|
|
|
|
SERVICE_UUID = "0000ab00-0000-1000-8000-00805f9b34fb"
|
|
CHAR_WRITE_UUID = "0000ab01-0000-1000-8000-00805f9b34fb"
|
|
CHAR_NOTIFY_UUID = "0000ab02-0000-1000-8000-00805f9b34fb"
|
|
CHAR_STATUS_UUID = "0000ab03-0000-1000-8000-00805f9b34fb"
|
|
DEFAULT_DEVICE_NAME = "Kapi_BLE"
|
|
|
|
# ============================================================
|
|
# 测试框架
|
|
# ============================================================
|
|
|
|
@dataclass
|
|
class TestResult:
|
|
name: str
|
|
passed: bool
|
|
detail: str = ""
|
|
duration_ms: float = 0.0
|
|
|
|
|
|
class BleJsonTester:
|
|
"""BLE JSON 通讯测试器"""
|
|
|
|
def __init__(self, device_name: str, timeout: float = 5.0):
|
|
self.device_name = device_name
|
|
self.timeout = timeout
|
|
self.client: Optional[BleakClient] = None
|
|
self.responses: list = []
|
|
self._response_event = asyncio.Event()
|
|
self._msg_id_counter = 0
|
|
self.results: list[TestResult] = []
|
|
|
|
def _next_msg_id(self) -> int:
|
|
self._msg_id_counter += 1
|
|
return self._msg_id_counter
|
|
|
|
def _on_notify(self, sender: BleakGATTCharacteristic, data: bytearray):
|
|
"""NOTIFY 回调: 接收设备返回的 JSON 数据"""
|
|
try:
|
|
text = data.decode("utf-8")
|
|
parsed = json.loads(text)
|
|
self.responses.append(parsed)
|
|
self._response_event.set()
|
|
except (UnicodeDecodeError, json.JSONDecodeError) as e:
|
|
self.responses.append({"_raw": data.hex(), "_error": str(e)})
|
|
self._response_event.set()
|
|
|
|
async def _send_cmd(self, cmd: str, data: Optional[dict] = None,
|
|
msg_id: Optional[int] = None, raw: Optional[str] = None) -> Optional[dict]:
|
|
"""
|
|
发送 JSON 命令并等待响应。
|
|
|
|
Args:
|
|
cmd: 命令名
|
|
data: 命令数据 (可选)
|
|
msg_id: 消息 ID (可选,自动生成)
|
|
raw: 直接发送原始字符串 (跳过 JSON 构建)
|
|
|
|
Returns:
|
|
解析后的 JSON 响应,超时返回 None
|
|
"""
|
|
if msg_id is None:
|
|
msg_id = self._next_msg_id()
|
|
|
|
# 清空之前的响应
|
|
self.responses.clear()
|
|
self._response_event.clear()
|
|
|
|
# 构建并发送
|
|
if raw is not None:
|
|
payload = raw.encode("utf-8")
|
|
else:
|
|
request = {"cmd": cmd, "id": msg_id}
|
|
if data is not None:
|
|
request["data"] = data
|
|
payload = json.dumps(request, separators=(",", ":")).encode("utf-8")
|
|
|
|
await self.client.write_gatt_char(CHAR_WRITE_UUID, payload, response=True)
|
|
|
|
# 等待响应
|
|
try:
|
|
await asyncio.wait_for(self._response_event.wait(), timeout=self.timeout)
|
|
except asyncio.TimeoutError:
|
|
return None
|
|
|
|
return self.responses[-1] if self.responses else None
|
|
|
|
def _record(self, name: str, passed: bool, detail: str = "", duration_ms: float = 0.0):
|
|
self.results.append(TestResult(name, passed, detail, duration_ms))
|
|
status = "PASS" if passed else "FAIL"
|
|
symbol = "+" if passed else "x"
|
|
print(f" [{symbol}] {name}: {status} {detail}")
|
|
|
|
# ============================================================
|
|
# 测试用例
|
|
# ============================================================
|
|
|
|
async def test_scan_device(self):
|
|
"""测试 1: 扫描 BLE 设备"""
|
|
t0 = time.monotonic()
|
|
print(f"\n正在扫描设备 '{self.device_name}'...")
|
|
device = await BleakScanner.find_device_by_name(
|
|
self.device_name, timeout=10.0
|
|
)
|
|
dt = (time.monotonic() - t0) * 1000
|
|
|
|
if device:
|
|
self._record("test_scan_device", True,
|
|
f"找到设备 addr={device.address}", dt)
|
|
return device
|
|
else:
|
|
self._record("test_scan_device", False,
|
|
"未找到设备,请确认设备已开机且蓝牙广播中", dt)
|
|
return None
|
|
|
|
async def test_discover_service(self):
|
|
"""测试 2: 发现 GATT Service 和 Characteristic"""
|
|
t0 = time.monotonic()
|
|
services = self.client.services
|
|
|
|
svc = services.get_service(SERVICE_UUID)
|
|
if not svc:
|
|
self._record("test_discover_service", False,
|
|
f"未发现 Service {SERVICE_UUID}")
|
|
return False
|
|
|
|
chars_found = []
|
|
for uuid in [CHAR_WRITE_UUID, CHAR_NOTIFY_UUID, CHAR_STATUS_UUID]:
|
|
char = svc.get_characteristic(uuid)
|
|
if char:
|
|
chars_found.append(uuid.split("-")[0][-4:].upper())
|
|
else:
|
|
self._record("test_discover_service", False,
|
|
f"缺少 Characteristic {uuid}")
|
|
return False
|
|
|
|
dt = (time.monotonic() - t0) * 1000
|
|
self._record("test_discover_service", True,
|
|
f"Service 0xAB00, Chars: {chars_found}", dt)
|
|
return True
|
|
|
|
async def test_enable_notify(self):
|
|
"""测试 3: 启用 NOTIFY"""
|
|
t0 = time.monotonic()
|
|
try:
|
|
await self.client.start_notify(CHAR_NOTIFY_UUID, self._on_notify)
|
|
dt = (time.monotonic() - t0) * 1000
|
|
self._record("test_enable_notify", True,
|
|
"NOTIFY 已启用 (CCCD 写入成功)", dt)
|
|
return True
|
|
except Exception as e:
|
|
dt = (time.monotonic() - t0) * 1000
|
|
self._record("test_enable_notify", False, str(e), dt)
|
|
return False
|
|
|
|
async def test_ping(self):
|
|
"""测试 4: ping/pong 连通性测试"""
|
|
t0 = time.monotonic()
|
|
msg_id = self._next_msg_id()
|
|
resp = await self._send_cmd("ping", msg_id=msg_id)
|
|
dt = (time.monotonic() - t0) * 1000
|
|
|
|
if resp is None:
|
|
self._record("test_ping", False, "超时未收到响应", dt)
|
|
return
|
|
|
|
ok = (resp.get("cmd") == "ping"
|
|
and resp.get("code") == 0
|
|
and resp.get("msg") == "pong"
|
|
and resp.get("id") == msg_id)
|
|
|
|
self._record("test_ping", ok,
|
|
f"响应: {json.dumps(resp, ensure_ascii=False)}", dt)
|
|
|
|
async def test_status(self):
|
|
"""测试 5: 查询设备状态"""
|
|
t0 = time.monotonic()
|
|
resp = await self._send_cmd("status")
|
|
dt = (time.monotonic() - t0) * 1000
|
|
|
|
if resp is None:
|
|
self._record("test_status", False, "超时未收到响应", dt)
|
|
return
|
|
|
|
data = resp.get("data", {})
|
|
has_state = "s" in data
|
|
code_ok = resp.get("code") == 0
|
|
valid_states = ["unknown", "starting", "configuring", "idle",
|
|
"connecting", "listening", "speaking", "dialog",
|
|
"upgrading", "activating", "fatal_error"]
|
|
state_ok = data.get("s") in valid_states
|
|
|
|
ok = code_ok and has_state and state_ok
|
|
detail = f"state={data.get('s')}, bat={data.get('bat')}, vol={data.get('vol')}"
|
|
self._record("test_status", ok, detail, dt)
|
|
|
|
async def test_dev_info(self):
|
|
"""测试 6: 查询设备信息"""
|
|
t0 = time.monotonic()
|
|
resp = await self._send_cmd("dev_info")
|
|
dt = (time.monotonic() - t0) * 1000
|
|
|
|
if resp is None:
|
|
self._record("test_dev_info", False, "超时未收到响应", dt)
|
|
return
|
|
|
|
data = resp.get("data", {})
|
|
required_fields = ["mac", "board", "fw", "chip"]
|
|
missing = [f for f in required_fields if f not in data]
|
|
|
|
ok = resp.get("code") == 0 and len(missing) == 0
|
|
if ok:
|
|
detail = f"mac={data['mac']}, board={data['board']}, fw={data['fw']}, chip={data['chip']}"
|
|
else:
|
|
detail = f"缺少字段: {missing}, 响应: {json.dumps(resp, ensure_ascii=False)}"
|
|
self._record("test_dev_info", ok, detail, dt)
|
|
|
|
async def test_wifi_list(self):
|
|
"""测试 7: WiFi 扫描列表"""
|
|
t0 = time.monotonic()
|
|
# WiFi 扫描是阻塞的,给更长超时
|
|
old_timeout = self.timeout
|
|
self.timeout = 15.0
|
|
resp = await self._send_cmd("wifi_list")
|
|
self.timeout = old_timeout
|
|
dt = (time.monotonic() - t0) * 1000
|
|
|
|
if resp is None:
|
|
self._record("test_wifi_list", False, "超时未收到响应 (WiFi扫描可能耗时较长)", dt)
|
|
return
|
|
|
|
data = resp.get("data", {})
|
|
wifi_list = data.get("list", [])
|
|
code_ok = resp.get("code") == 0
|
|
is_list = isinstance(wifi_list, list)
|
|
|
|
ok = code_ok and is_list
|
|
if ok and len(wifi_list) > 0:
|
|
first = wifi_list[0]
|
|
has_fields = "ssid" in first and "rssi" in first
|
|
ok = ok and has_fields
|
|
detail = f"扫描到 {len(wifi_list)} 个网络"
|
|
if len(wifi_list) > 0:
|
|
detail += f", 第1个: ssid={first.get('ssid')}, rssi={first.get('rssi')}"
|
|
else:
|
|
detail = f"扫描到 {len(wifi_list)} 个网络"
|
|
|
|
self._record("test_wifi_list", ok, detail, dt)
|
|
|
|
async def test_set_vol(self):
|
|
"""测试 8: 设置音量并验证"""
|
|
# 先获取当前音量
|
|
resp0 = await self._send_cmd("status")
|
|
original_vol = resp0.get("data", {}).get("vol", 50) if resp0 else 50
|
|
|
|
# 设置新音量
|
|
target_vol = 35 if original_vol != 35 else 65
|
|
t0 = time.monotonic()
|
|
resp = await self._send_cmd("set_vol", {"vol": target_vol})
|
|
dt = (time.monotonic() - t0) * 1000
|
|
|
|
if resp is None:
|
|
self._record("test_set_vol", False, "超时未收到响应", dt)
|
|
return
|
|
|
|
set_ok = resp.get("code") == 0
|
|
|
|
# 验证: 再查一次 status 确认音量变化
|
|
await asyncio.sleep(0.3)
|
|
resp2 = await self._send_cmd("status")
|
|
new_vol = resp2.get("data", {}).get("vol") if resp2 else None
|
|
verify_ok = new_vol == target_vol
|
|
|
|
ok = set_ok and verify_ok
|
|
detail = f"设置 vol={target_vol}, set_ok={set_ok}, 验证 vol={new_vol}"
|
|
self._record("test_set_vol", ok, detail, dt)
|
|
|
|
# 恢复原音量
|
|
await self._send_cmd("set_vol", {"vol": original_vol})
|
|
|
|
async def test_set_wifi_missing_ssid(self):
|
|
"""测试 9: set_wifi 缺少 ssid 参数"""
|
|
t0 = time.monotonic()
|
|
resp = await self._send_cmd("set_wifi", {"pwd": "12345678"})
|
|
dt = (time.monotonic() - t0) * 1000
|
|
|
|
if resp is None:
|
|
self._record("test_set_wifi_missing_ssid", False, "超时未收到响应", dt)
|
|
return
|
|
|
|
ok = resp.get("code") == -1 and "ssid" in resp.get("msg", "").lower()
|
|
self._record("test_set_wifi_missing_ssid", ok,
|
|
f"code={resp.get('code')}, msg={resp.get('msg')}", dt)
|
|
|
|
async def test_unknown_cmd(self):
|
|
"""测试 10: 未知命令"""
|
|
t0 = time.monotonic()
|
|
resp = await self._send_cmd("this_cmd_does_not_exist")
|
|
dt = (time.monotonic() - t0) * 1000
|
|
|
|
if resp is None:
|
|
self._record("test_unknown_cmd", False, "超时未收到响应", dt)
|
|
return
|
|
|
|
ok = resp.get("code") == -99
|
|
self._record("test_unknown_cmd", ok,
|
|
f"code={resp.get('code')}, msg={resp.get('msg')}", dt)
|
|
|
|
async def test_invalid_json(self):
|
|
"""测试 11: 发送非法 JSON 数据"""
|
|
t0 = time.monotonic()
|
|
resp = await self._send_cmd("", raw="{this is not valid json!}")
|
|
dt = (time.monotonic() - t0) * 1000
|
|
|
|
if resp is None:
|
|
self._record("test_invalid_json", False, "超时未收到响应 (可能设备未发送错误通知)", dt)
|
|
return
|
|
|
|
# 设备应返回 error 响应
|
|
ok = resp.get("code", 0) != 0 or resp.get("cmd") == "error"
|
|
self._record("test_invalid_json", ok,
|
|
f"响应: {json.dumps(resp, ensure_ascii=False)}", dt)
|
|
|
|
async def test_msg_id_correlation(self):
|
|
"""测试 12: msg_id 关联性验证 — 连续发送多个命令,验证每个响应的 id 正确"""
|
|
t0 = time.monotonic()
|
|
ids = [100, 200, 300]
|
|
results = []
|
|
|
|
for mid in ids:
|
|
self.responses.clear()
|
|
self._response_event.clear()
|
|
|
|
request = {"cmd": "ping", "id": mid}
|
|
payload = json.dumps(request, separators=(",", ":")).encode("utf-8")
|
|
await self.client.write_gatt_char(CHAR_WRITE_UUID, payload, response=True)
|
|
|
|
try:
|
|
await asyncio.wait_for(self._response_event.wait(), timeout=self.timeout)
|
|
resp = self.responses[-1] if self.responses else None
|
|
except asyncio.TimeoutError:
|
|
resp = None
|
|
|
|
if resp:
|
|
results.append((mid, resp.get("id")))
|
|
else:
|
|
results.append((mid, None))
|
|
|
|
await asyncio.sleep(0.2)
|
|
|
|
dt = (time.monotonic() - t0) * 1000
|
|
all_match = all(sent == recv for sent, recv in results)
|
|
detail = ", ".join(f"sent={s}->recv={r}" for s, r in results)
|
|
self._record("test_msg_id_correlation", all_match, detail, dt)
|
|
|
|
# ============================================================
|
|
# 主运行流程
|
|
# ============================================================
|
|
|
|
async def run(self):
|
|
"""执行全部测试"""
|
|
print("=" * 60)
|
|
print(" BLE JSON 通讯模块 - 自动化测试")
|
|
print("=" * 60)
|
|
|
|
# Step 1: 扫描
|
|
device = await self.test_scan_device()
|
|
if not device:
|
|
self._print_summary()
|
|
return
|
|
|
|
# Step 2: 连接
|
|
print(f"\n正在连接 {device.address}...")
|
|
try:
|
|
self.client = BleakClient(device, timeout=10.0)
|
|
await self.client.connect()
|
|
print(f" 已连接, MTU={self.client.mtu_size}")
|
|
except Exception as e:
|
|
print(f" 连接失败: {e}")
|
|
self._print_summary()
|
|
return
|
|
|
|
try:
|
|
# Step 3-12: 运行测试用例
|
|
print("\n--- 服务发现 ---")
|
|
if not await self.test_discover_service():
|
|
return
|
|
|
|
print("\n--- NOTIFY 启用 ---")
|
|
if not await self.test_enable_notify():
|
|
return
|
|
|
|
# 等待 CCCD 写入生效
|
|
await asyncio.sleep(0.5)
|
|
|
|
print("\n--- 功能测试 ---")
|
|
await self.test_ping()
|
|
await self.test_status()
|
|
await self.test_dev_info()
|
|
await self.test_wifi_list()
|
|
await self.test_set_vol()
|
|
|
|
print("\n--- 错误处理测试 ---")
|
|
await self.test_set_wifi_missing_ssid()
|
|
await self.test_unknown_cmd()
|
|
await self.test_invalid_json()
|
|
|
|
print("\n--- 关联性测试 ---")
|
|
await self.test_msg_id_correlation()
|
|
|
|
finally:
|
|
print("\n断开连接...")
|
|
try:
|
|
await self.client.stop_notify(CHAR_NOTIFY_UUID)
|
|
except Exception:
|
|
pass
|
|
await self.client.disconnect()
|
|
|
|
self._print_summary()
|
|
|
|
def _print_summary(self):
|
|
"""打印测试总结"""
|
|
print("\n" + "=" * 60)
|
|
total = len(self.results)
|
|
passed = sum(1 for r in self.results if r.passed)
|
|
failed = total - passed
|
|
|
|
print(f" 测试结果: {passed}/{total} 通过, {failed} 失败")
|
|
print("=" * 60)
|
|
|
|
if failed > 0:
|
|
print("\n失败用例:")
|
|
for r in self.results:
|
|
if not r.passed:
|
|
print(f" [x] {r.name}: {r.detail}")
|
|
print()
|
|
|
|
# 返回退出码
|
|
return 0 if failed == 0 else 1
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="BLE JSON 通讯模块测试")
|
|
parser.add_argument("--device", default=DEFAULT_DEVICE_NAME,
|
|
help=f"BLE 设备名称 (默认: {DEFAULT_DEVICE_NAME})")
|
|
parser.add_argument("--timeout", type=float, default=5.0,
|
|
help="命令响应超时秒数 (默认: 5.0)")
|
|
args = parser.parse_args()
|
|
|
|
tester = BleJsonTester(device_name=args.device, timeout=args.timeout)
|
|
exit_code = asyncio.run(tester.run())
|
|
sys.exit(exit_code or 0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|