Baji_Rtc_Toy/tests/ble_json_test.py

491 lines
17 KiB
Python

#!/usr/bin/env python3
"""
BLE JSON 通讯模块测试脚本
用途: 模拟 APP 端,通过 BLE 与 ESP32 设备通信,验证所有 JSON 命令的收发。
依赖: pip install bleak
运行: python tests/ble_json_test.py [--device DEVICE_NAME] [--timeout SECONDS]
测试覆盖:
1. 设备扫描与连接
2. GATT Service/Characteristic 发现
3. NOTIFY 启用
4. 所有 JSON 命令的请求-响应验证
5. 错误处理 (非法 JSON、缺少参数、未知命令)
6. msg_id 关联性验证
"""
import argparse
import asyncio
import json
import sys
import time
from dataclasses import dataclass, field
from typing import Optional
try:
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
except ImportError:
print("错误: 缺少 bleak 库,请执行: pip install bleak")
sys.exit(1)
# ============================================================
# BLE 参数定义 (与 ble_service_config.h 一致)
# ============================================================
SERVICE_UUID = "0000ab00-0000-1000-8000-00805f9b34fb"
CHAR_WRITE_UUID = "0000ab01-0000-1000-8000-00805f9b34fb"
CHAR_NOTIFY_UUID = "0000ab02-0000-1000-8000-00805f9b34fb"
CHAR_STATUS_UUID = "0000ab03-0000-1000-8000-00805f9b34fb"
DEFAULT_DEVICE_NAME = "Kapi_BLE"
# ============================================================
# 测试框架
# ============================================================
@dataclass
class TestResult:
name: str
passed: bool
detail: str = ""
duration_ms: float = 0.0
class BleJsonTester:
"""BLE JSON 通讯测试器"""
def __init__(self, device_name: str, timeout: float = 5.0):
self.device_name = device_name
self.timeout = timeout
self.client: Optional[BleakClient] = None
self.responses: list = []
self._response_event = asyncio.Event()
self._msg_id_counter = 0
self.results: list[TestResult] = []
def _next_msg_id(self) -> int:
self._msg_id_counter += 1
return self._msg_id_counter
def _on_notify(self, sender: BleakGATTCharacteristic, data: bytearray):
"""NOTIFY 回调: 接收设备返回的 JSON 数据"""
try:
text = data.decode("utf-8")
parsed = json.loads(text)
self.responses.append(parsed)
self._response_event.set()
except (UnicodeDecodeError, json.JSONDecodeError) as e:
self.responses.append({"_raw": data.hex(), "_error": str(e)})
self._response_event.set()
async def _send_cmd(self, cmd: str, data: Optional[dict] = None,
msg_id: Optional[int] = None, raw: Optional[str] = None) -> Optional[dict]:
"""
发送 JSON 命令并等待响应。
Args:
cmd: 命令名
data: 命令数据 (可选)
msg_id: 消息 ID (可选,自动生成)
raw: 直接发送原始字符串 (跳过 JSON 构建)
Returns:
解析后的 JSON 响应,超时返回 None
"""
if msg_id is None:
msg_id = self._next_msg_id()
# 清空之前的响应
self.responses.clear()
self._response_event.clear()
# 构建并发送
if raw is not None:
payload = raw.encode("utf-8")
else:
request = {"cmd": cmd, "id": msg_id}
if data is not None:
request["data"] = data
payload = json.dumps(request, separators=(",", ":")).encode("utf-8")
await self.client.write_gatt_char(CHAR_WRITE_UUID, payload, response=True)
# 等待响应
try:
await asyncio.wait_for(self._response_event.wait(), timeout=self.timeout)
except asyncio.TimeoutError:
return None
return self.responses[-1] if self.responses else None
def _record(self, name: str, passed: bool, detail: str = "", duration_ms: float = 0.0):
self.results.append(TestResult(name, passed, detail, duration_ms))
status = "PASS" if passed else "FAIL"
symbol = "+" if passed else "x"
print(f" [{symbol}] {name}: {status} {detail}")
# ============================================================
# 测试用例
# ============================================================
async def test_scan_device(self):
"""测试 1: 扫描 BLE 设备"""
t0 = time.monotonic()
print(f"\n正在扫描设备 '{self.device_name}'...")
device = await BleakScanner.find_device_by_name(
self.device_name, timeout=10.0
)
dt = (time.monotonic() - t0) * 1000
if device:
self._record("test_scan_device", True,
f"找到设备 addr={device.address}", dt)
return device
else:
self._record("test_scan_device", False,
"未找到设备,请确认设备已开机且蓝牙广播中", dt)
return None
async def test_discover_service(self):
"""测试 2: 发现 GATT Service 和 Characteristic"""
t0 = time.monotonic()
services = self.client.services
svc = services.get_service(SERVICE_UUID)
if not svc:
self._record("test_discover_service", False,
f"未发现 Service {SERVICE_UUID}")
return False
chars_found = []
for uuid in [CHAR_WRITE_UUID, CHAR_NOTIFY_UUID, CHAR_STATUS_UUID]:
char = svc.get_characteristic(uuid)
if char:
chars_found.append(uuid.split("-")[0][-4:].upper())
else:
self._record("test_discover_service", False,
f"缺少 Characteristic {uuid}")
return False
dt = (time.monotonic() - t0) * 1000
self._record("test_discover_service", True,
f"Service 0xAB00, Chars: {chars_found}", dt)
return True
async def test_enable_notify(self):
"""测试 3: 启用 NOTIFY"""
t0 = time.monotonic()
try:
await self.client.start_notify(CHAR_NOTIFY_UUID, self._on_notify)
dt = (time.monotonic() - t0) * 1000
self._record("test_enable_notify", True,
"NOTIFY 已启用 (CCCD 写入成功)", dt)
return True
except Exception as e:
dt = (time.monotonic() - t0) * 1000
self._record("test_enable_notify", False, str(e), dt)
return False
async def test_ping(self):
"""测试 4: ping/pong 连通性测试"""
t0 = time.monotonic()
msg_id = self._next_msg_id()
resp = await self._send_cmd("ping", msg_id=msg_id)
dt = (time.monotonic() - t0) * 1000
if resp is None:
self._record("test_ping", False, "超时未收到响应", dt)
return
ok = (resp.get("cmd") == "ping"
and resp.get("code") == 0
and resp.get("msg") == "pong"
and resp.get("id") == msg_id)
self._record("test_ping", ok,
f"响应: {json.dumps(resp, ensure_ascii=False)}", dt)
async def test_status(self):
"""测试 5: 查询设备状态"""
t0 = time.monotonic()
resp = await self._send_cmd("status")
dt = (time.monotonic() - t0) * 1000
if resp is None:
self._record("test_status", False, "超时未收到响应", dt)
return
data = resp.get("data", {})
has_state = "s" in data
code_ok = resp.get("code") == 0
valid_states = ["unknown", "starting", "configuring", "idle",
"connecting", "listening", "speaking", "dialog",
"upgrading", "activating", "fatal_error"]
state_ok = data.get("s") in valid_states
ok = code_ok and has_state and state_ok
detail = f"state={data.get('s')}, bat={data.get('bat')}, vol={data.get('vol')}"
self._record("test_status", ok, detail, dt)
async def test_dev_info(self):
"""测试 6: 查询设备信息"""
t0 = time.monotonic()
resp = await self._send_cmd("dev_info")
dt = (time.monotonic() - t0) * 1000
if resp is None:
self._record("test_dev_info", False, "超时未收到响应", dt)
return
data = resp.get("data", {})
required_fields = ["mac", "board", "fw", "chip"]
missing = [f for f in required_fields if f not in data]
ok = resp.get("code") == 0 and len(missing) == 0
if ok:
detail = f"mac={data['mac']}, board={data['board']}, fw={data['fw']}, chip={data['chip']}"
else:
detail = f"缺少字段: {missing}, 响应: {json.dumps(resp, ensure_ascii=False)}"
self._record("test_dev_info", ok, detail, dt)
async def test_wifi_list(self):
"""测试 7: WiFi 扫描列表"""
t0 = time.monotonic()
# WiFi 扫描是阻塞的,给更长超时
old_timeout = self.timeout
self.timeout = 15.0
resp = await self._send_cmd("wifi_list")
self.timeout = old_timeout
dt = (time.monotonic() - t0) * 1000
if resp is None:
self._record("test_wifi_list", False, "超时未收到响应 (WiFi扫描可能耗时较长)", dt)
return
data = resp.get("data", {})
wifi_list = data.get("list", [])
code_ok = resp.get("code") == 0
is_list = isinstance(wifi_list, list)
ok = code_ok and is_list
if ok and len(wifi_list) > 0:
first = wifi_list[0]
has_fields = "ssid" in first and "rssi" in first
ok = ok and has_fields
detail = f"扫描到 {len(wifi_list)} 个网络"
if len(wifi_list) > 0:
detail += f", 第1个: ssid={first.get('ssid')}, rssi={first.get('rssi')}"
else:
detail = f"扫描到 {len(wifi_list)} 个网络"
self._record("test_wifi_list", ok, detail, dt)
async def test_set_vol(self):
"""测试 8: 设置音量并验证"""
# 先获取当前音量
resp0 = await self._send_cmd("status")
original_vol = resp0.get("data", {}).get("vol", 50) if resp0 else 50
# 设置新音量
target_vol = 35 if original_vol != 35 else 65
t0 = time.monotonic()
resp = await self._send_cmd("set_vol", {"vol": target_vol})
dt = (time.monotonic() - t0) * 1000
if resp is None:
self._record("test_set_vol", False, "超时未收到响应", dt)
return
set_ok = resp.get("code") == 0
# 验证: 再查一次 status 确认音量变化
await asyncio.sleep(0.3)
resp2 = await self._send_cmd("status")
new_vol = resp2.get("data", {}).get("vol") if resp2 else None
verify_ok = new_vol == target_vol
ok = set_ok and verify_ok
detail = f"设置 vol={target_vol}, set_ok={set_ok}, 验证 vol={new_vol}"
self._record("test_set_vol", ok, detail, dt)
# 恢复原音量
await self._send_cmd("set_vol", {"vol": original_vol})
async def test_set_wifi_missing_ssid(self):
"""测试 9: set_wifi 缺少 ssid 参数"""
t0 = time.monotonic()
resp = await self._send_cmd("set_wifi", {"pwd": "12345678"})
dt = (time.monotonic() - t0) * 1000
if resp is None:
self._record("test_set_wifi_missing_ssid", False, "超时未收到响应", dt)
return
ok = resp.get("code") == -1 and "ssid" in resp.get("msg", "").lower()
self._record("test_set_wifi_missing_ssid", ok,
f"code={resp.get('code')}, msg={resp.get('msg')}", dt)
async def test_unknown_cmd(self):
"""测试 10: 未知命令"""
t0 = time.monotonic()
resp = await self._send_cmd("this_cmd_does_not_exist")
dt = (time.monotonic() - t0) * 1000
if resp is None:
self._record("test_unknown_cmd", False, "超时未收到响应", dt)
return
ok = resp.get("code") == -99
self._record("test_unknown_cmd", ok,
f"code={resp.get('code')}, msg={resp.get('msg')}", dt)
async def test_invalid_json(self):
"""测试 11: 发送非法 JSON 数据"""
t0 = time.monotonic()
resp = await self._send_cmd("", raw="{this is not valid json!}")
dt = (time.monotonic() - t0) * 1000
if resp is None:
self._record("test_invalid_json", False, "超时未收到响应 (可能设备未发送错误通知)", dt)
return
# 设备应返回 error 响应
ok = resp.get("code", 0) != 0 or resp.get("cmd") == "error"
self._record("test_invalid_json", ok,
f"响应: {json.dumps(resp, ensure_ascii=False)}", dt)
async def test_msg_id_correlation(self):
"""测试 12: msg_id 关联性验证 — 连续发送多个命令,验证每个响应的 id 正确"""
t0 = time.monotonic()
ids = [100, 200, 300]
results = []
for mid in ids:
self.responses.clear()
self._response_event.clear()
request = {"cmd": "ping", "id": mid}
payload = json.dumps(request, separators=(",", ":")).encode("utf-8")
await self.client.write_gatt_char(CHAR_WRITE_UUID, payload, response=True)
try:
await asyncio.wait_for(self._response_event.wait(), timeout=self.timeout)
resp = self.responses[-1] if self.responses else None
except asyncio.TimeoutError:
resp = None
if resp:
results.append((mid, resp.get("id")))
else:
results.append((mid, None))
await asyncio.sleep(0.2)
dt = (time.monotonic() - t0) * 1000
all_match = all(sent == recv for sent, recv in results)
detail = ", ".join(f"sent={s}->recv={r}" for s, r in results)
self._record("test_msg_id_correlation", all_match, detail, dt)
# ============================================================
# 主运行流程
# ============================================================
async def run(self):
"""执行全部测试"""
print("=" * 60)
print(" BLE JSON 通讯模块 - 自动化测试")
print("=" * 60)
# Step 1: 扫描
device = await self.test_scan_device()
if not device:
self._print_summary()
return
# Step 2: 连接
print(f"\n正在连接 {device.address}...")
try:
self.client = BleakClient(device, timeout=10.0)
await self.client.connect()
print(f" 已连接, MTU={self.client.mtu_size}")
except Exception as e:
print(f" 连接失败: {e}")
self._print_summary()
return
try:
# Step 3-12: 运行测试用例
print("\n--- 服务发现 ---")
if not await self.test_discover_service():
return
print("\n--- NOTIFY 启用 ---")
if not await self.test_enable_notify():
return
# 等待 CCCD 写入生效
await asyncio.sleep(0.5)
print("\n--- 功能测试 ---")
await self.test_ping()
await self.test_status()
await self.test_dev_info()
await self.test_wifi_list()
await self.test_set_vol()
print("\n--- 错误处理测试 ---")
await self.test_set_wifi_missing_ssid()
await self.test_unknown_cmd()
await self.test_invalid_json()
print("\n--- 关联性测试 ---")
await self.test_msg_id_correlation()
finally:
print("\n断开连接...")
try:
await self.client.stop_notify(CHAR_NOTIFY_UUID)
except Exception:
pass
await self.client.disconnect()
self._print_summary()
def _print_summary(self):
"""打印测试总结"""
print("\n" + "=" * 60)
total = len(self.results)
passed = sum(1 for r in self.results if r.passed)
failed = total - passed
print(f" 测试结果: {passed}/{total} 通过, {failed} 失败")
print("=" * 60)
if failed > 0:
print("\n失败用例:")
for r in self.results:
if not r.passed:
print(f" [x] {r.name}: {r.detail}")
print()
# 返回退出码
return 0 if failed == 0 else 1
def main():
parser = argparse.ArgumentParser(description="BLE JSON 通讯模块测试")
parser.add_argument("--device", default=DEFAULT_DEVICE_NAME,
help=f"BLE 设备名称 (默认: {DEFAULT_DEVICE_NAME})")
parser.add_argument("--timeout", type=float, default=5.0,
help="命令响应超时秒数 (默认: 5.0)")
args = parser.parse_args()
tester = BleJsonTester(device_name=args.device, timeout=args.timeout)
exit_code = asyncio.run(tester.run())
sys.exit(exit_code or 0)
if __name__ == "__main__":
main()