From c219ec2fcf8bc7e6a7f161b152e3a4986cded171 Mon Sep 17 00:00:00 2001 From: repair-agent Date: Mon, 2 Mar 2026 17:16:26 +0800 Subject: [PATCH] =?UTF-8?q?feat(hw-ws-service):=20=E5=B0=86=20Go=20WebSock?= =?UTF-8?q?et=20=E6=9C=8D=E5=8A=A1=E7=BA=B3=E5=85=A5=20CI/CD=20=E5=B9=B6?= =?UTF-8?q?=E9=80=9A=E8=BF=87=20Traefik=20=E7=BB=9F=E4=B8=80=E5=85=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 变更内容 ### k8s/ingress.yaml - 新增 /xiaozhi/v1/ 路径规则,将 WebSocket 流量路由到 hw-ws-svc:8888 - Traefik 最长前缀优先,/xiaozhi/v1/ 不影响 / 下的 Django 路由 ### hw_service_go/k8s/service.yaml - Service 类型由 LoadBalancer 改为 ClusterIP - 移除阿里云 SLB 注解(通过 Traefik Ingress 统一暴露,不再需要独立公网 IP) ### hw_service_go/k8s/deployment.yaml - 镜像地址改为 ${CI_REGISTRY_IMAGE}/hw-ws-service:latest 占位符 - CI/CD 部署时统一通过 sed 替换为华为云 SWR 实际地址 ### hw_service_go/internal/server/server.go - 新增 GET /xiaozhi/v1/healthz 接口,返回 {"status":"ok","active_connections":N} - 用于部署后验证服务存活及当前连接数 ### .gitea/workflows/deploy.yaml - 新增 Build and Push HW WebSocket Service 步骤,构建并推送 hw_service_go 镜像 - 部署步骤新增 kubectl apply hw_service_go/k8s/deployment.yaml 和 service.yaml - 新增 kubectl rollout restart deployment/hw-ws-service ### run.sh - 本地同时启动 Django + hw_service_go 的开发脚本 Co-Authored-By: Claude Sonnet 4.6 --- .gitea/workflows/deploy.yaml | 13 + hw_service_go/.env.example | 13 + hw_service_go/CLAUDE.md | 396 ++++++++++++++++++ hw_service_go/Dockerfile | 49 +++ hw_service_go/cmd/main.go | 49 +++ hw_service_go/go.mod | 8 + hw_service_go/go.sum | 4 + hw_service_go/internal/audio/audio.go | 13 + hw_service_go/internal/audio/convert.go | 127 ++++++ hw_service_go/internal/config/config.go | 33 ++ .../internal/connection/connection.go | 85 ++++ .../internal/connection/connection_test.go | 177 ++++++++ .../internal/handler/audio_sender.go | 63 +++ .../internal/handler/audio_sender_test.go | 195 +++++++++ hw_service_go/internal/handler/story.go | 73 ++++ hw_service_go/internal/rtcclient/client.go | 100 
+++++ .../internal/rtcclient/client_test.go | 142 +++++++ hw_service_go/internal/server/server.go | 216 ++++++++++ hw_service_go/k8s/deployment.yaml | 82 ++++ hw_service_go/k8s/service.yaml | 15 + hw_service_go/test/PLAN.md | 251 +++++++++++ k8s/ingress.yaml | 7 + run.sh | 103 +++++ 23 files changed, 2214 insertions(+) create mode 100644 hw_service_go/.env.example create mode 100644 hw_service_go/CLAUDE.md create mode 100644 hw_service_go/Dockerfile create mode 100644 hw_service_go/cmd/main.go create mode 100644 hw_service_go/go.mod create mode 100644 hw_service_go/go.sum create mode 100644 hw_service_go/internal/audio/audio.go create mode 100644 hw_service_go/internal/audio/convert.go create mode 100644 hw_service_go/internal/config/config.go create mode 100644 hw_service_go/internal/connection/connection.go create mode 100644 hw_service_go/internal/connection/connection_test.go create mode 100644 hw_service_go/internal/handler/audio_sender.go create mode 100644 hw_service_go/internal/handler/audio_sender_test.go create mode 100644 hw_service_go/internal/handler/story.go create mode 100644 hw_service_go/internal/rtcclient/client.go create mode 100644 hw_service_go/internal/rtcclient/client_test.go create mode 100644 hw_service_go/internal/server/server.go create mode 100644 hw_service_go/k8s/deployment.yaml create mode 100644 hw_service_go/k8s/service.yaml create mode 100644 hw_service_go/test/PLAN.md create mode 100755 run.sh diff --git a/.gitea/workflows/deploy.yaml b/.gitea/workflows/deploy.yaml index d5fbac2..6962451 100644 --- a/.gitea/workflows/deploy.yaml +++ b/.gitea/workflows/deploy.yaml @@ -37,6 +37,15 @@ jobs: --tag ${{ secrets.SWR_SERVER }}/${{ secrets.SWR_ORG }}/rtc-backend:latest \ . 
2>&1 | tee /tmp/build.log + - name: Build and Push HW WebSocket Service + run: | + set -o pipefail + docker buildx build \ + --push \ + --provenance=false \ + --tag ${{ secrets.SWR_SERVER }}/${{ secrets.SWR_ORG }}/hw-ws-service:latest \ + ./hw_service_go 2>&1 | tee -a /tmp/build.log + - name: Setup Kubectl run: | curl -LO "https://dl.k8s.io/release/v1.28.2/bin/linux/amd64/kubectl" || \ @@ -68,13 +77,17 @@ jobs: # 2. 替换镜像地址 sed -i "s|\${CI_REGISTRY_IMAGE}/backend:latest|${{ secrets.SWR_SERVER }}/${{ secrets.SWR_ORG }}/rtc-backend:latest|g" $DEPLOY_FILE + sed -i "s|\${CI_REGISTRY_IMAGE}/hw-ws-service:latest|${{ secrets.SWR_SERVER }}/${{ secrets.SWR_ORG }}/hw-ws-service:latest|g" hw_service_go/k8s/deployment.yaml # 3. 应用配置并捕获输出 set -o pipefail { kubectl apply -f $DEPLOY_FILE kubectl apply -f $INGRESS_FILE + kubectl apply -f hw_service_go/k8s/deployment.yaml + kubectl apply -f hw_service_go/k8s/service.yaml kubectl rollout restart deployment/$DEPLOY_NAME + kubectl rollout restart deployment/hw-ws-service } 2>&1 | tee /tmp/deploy.log - name: Report failure to Log Center diff --git a/hw_service_go/.env.example b/hw_service_go/.env.example new file mode 100644 index 0000000..63c6dda --- /dev/null +++ b/hw_service_go/.env.example @@ -0,0 +1,13 @@ +# hw-ws-service 环境变量示例 +# 复制为 .env 并填入实际值(.env 不提交 git) + +# WebSocket 监听地址(默认 0.0.0.0) +HW_WS_HOST=0.0.0.0 + +# WebSocket 监听端口(默认 8888) +HW_WS_PORT=8888 + +# RTC 后端地址(必填) +# K8s 内部:http://rtc-backend-svc:8000 +# 本地开发:http://localhost:8000 +HW_RTC_BACKEND_URL=http://localhost:8000 diff --git a/hw_service_go/CLAUDE.md b/hw_service_go/CLAUDE.md new file mode 100644 index 0000000..ebb8c42 --- /dev/null +++ b/hw_service_go/CLAUDE.md @@ -0,0 +1,396 @@ +# hw_service_go - Claude Code 开发指南 + +> ESP32 硬件 WebSocket 通讯服务,负责接收设备指令并推送 Opus 音频流。 + +## 技术栈 + +- **Go 1.23+** +- `github.com/gorilla/websocket` — WebSocket 服务器 +- `github.com/hraban/opus` — CGO libopus 编码(需 `opus-dev`) +- `ffmpeg`(系统级二进制)— MP3/AAC 解码为 PCM +- K8s 部署,端口 **8888** + +## 
目录结构 + +``` +hw_service_go/ +├── cmd/main.go # 唯一入口,只做启动和优雅关闭 +├── internal/ +│ ├── config/config.go # 环境变量,只读,不可变 +│ ├── server/server.go # HTTP Upgrader + 连接生命周期 +│ ├── connection/connection.go # 单连接状态,并发安全 +│ ├── handler/ +│ │ ├── story.go # 故事播放主流程 +│ │ └── audio_sender.go # Opus 帧流控发送 +│ ├── audio/convert.go # MP3→PCM→Opus 转码 +│ └── rtcclient/client.go # 调用 Django REST API +├── go.mod / go.sum +└── Dockerfile +``` + +> `internal/` 包不对外暴露,所有跨包通信通过显式函数参数传递,**不使用全局变量**。 + +--- + +## 一、代码规范 + +### 1.1 命名 + +| 类型 | 规范 | 示例 | +|------|------|------| +| 包名 | 小写单词,不含下划线 | `server`, `rtcclient` | +| 导出类型/函数 | UpperCamelCase | `Connection`, `HandleStory` | +| 非导出标识符 | lowerCamelCase | `abortCh`, `sendFrame` | +| 常量 | UpperCamelCase(非全大写)| `FrameSizeMs`, `PreBufferCount` | +| 接口 | 以行为命名,单方法接口加 `-er` 后缀 | `Sender`, `Converter` | +| 错误变量 | `Err` 前缀 | `ErrDeviceNotFound`, `ErrAudioConvert` | + +> **不使用** `SCREAMING_SNAKE_CASE` 常量,这是 C 习惯,不是 Go 惯例。 + +### 1.2 错误处理 + +```go +// ✅ 正确:始终包装上下文 +frames, err := audio.Convert(ctx, url) +if err != nil { + return fmt.Errorf("story handler: convert audio: %w", err) +} + +// ❌ 错误:丢弃错误 +frames, _ = audio.Convert(ctx, url) + +// ❌ 错误:panic 在业务逻辑里(仅允许在 main 初始化阶段) +frames, err = audio.Convert(ctx, url) +if err != nil { panic(err) } +``` + +- 错误链用 `%w`(支持 `errors.Is` / `errors.As`) +- 叶子函数返回 `errors.New()`,中间层用 `fmt.Errorf("context: %w", err)` +- 只在 `cmd/main.go` 初始化失败时允许 `log.Fatal` + +### 1.3 Context 使用 + +```go +// ✅ Context 作为第一个参数 +func (c *Client) FetchStory(ctx context.Context, mac string) (*StoryInfo, error) + +// ✅ 所有 I/O 操作绑定 context(支持超时/取消) +req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) +cmd := exec.CommandContext(ctx, "ffmpeg", ...) 
+ +// ❌ 不存储 context 到结构体字段 +type Handler struct { + ctx context.Context // 禁止 +} +``` + +### 1.4 并发与 goroutine + +```go +// ✅ goroutine 必须有明确的退出机制 +go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-abortCh: + return + case frame := <-frameCh: + ws.WriteMessage(websocket.BinaryMessage, frame) + } + } +}() + +// ❌ 禁止裸 goroutine(无法追踪生命周期) +go processAudio(url) +``` + +- 每启动一个 goroutine,必须确保它**有且只有一个**退出路径 +- 使用 `sync.WaitGroup` 跟踪服务级 goroutine,确保优雅关闭时全部结束 +- Channel 方向声明:`recv <-chan T`(只接收),`send chan<- T`(只发送),减少误用 + +### 1.5 结构体初始化 + +```go +// ✅ 始终使用字段名初始化(顺序变更不会引入 bug) +conn := &Connection{ + WS: ws, + DeviceID: deviceID, + ClientID: clientID, +} + +// ❌ 位置初始化(字段顺序改变后静默错误) +conn := &Connection{ws, deviceID, clientID} +``` + +### 1.6 接口设计 + +```go +// ✅ 在使用方定义接口(而非实现方) +// audio/convert.go 不定义接口,由 handler 包定义它需要的最小接口 +package handler + +type AudioConverter interface { + Convert(ctx context.Context, url string) ([][]byte, error) +} +``` + +--- + +## 二、代码生成规范 + +### 2.1 新增消息类型处理器 + +硬件消息类型通过 `server.go` 的 `switch envelope.Type` 路由。新增类型时: + +1. 在 `handler/` 下创建 `<消息类型>.go`(如 `music.go`) +2. 函数签名必须为:`func Handle<消息类型>(conn *connection.Connection, raw []byte)`(如 `HandleMusic`) +3. 
在 `server.go` 的 switch 中注册 + +```go +// server.go +switch envelope.Type { +case "story": + go handler.HandleStory(conn, raw) +case "music": // 新增 + go handler.HandleMusic(conn, raw) // 新增 +} +``` + +### 2.2 新增配置项 + +所有配置**只能**通过环境变量注入,**不允许**读取配置文件或命令行参数(保持 12-Factor App 原则): + +```go +// config/config.go +type Config struct { + WSPort string // HW_WS_PORT,默认 "8888" + RTCBackendURL string // HW_RTC_BACKEND_URL,必填 + NewFeatureXXX string // HW_NEW_FEATURE_XXX,新增时遵循此格式 +} +``` + +- 环境变量前缀统一为 `HW_` +- 必填项在 `Load()` 中 `log.Fatal` 校验 +- 不使用 `viper` 等配置库(项目够小,标准库足够) + +### 2.3 Dockerfile 变更 + +Dockerfile 使用**多阶段构建**,修改时严格遵守: +- 构建阶段:`golang:1.23-alpine`,只安装编译依赖(`gcc musl-dev opus-dev`) +- 运行阶段:`alpine:3.20`,只安装运行时依赖(`opus ffmpeg ca-certificates`) +- 最终镜像不包含 Go 工具链、源码、测试文件 + +--- + +## 三、安全风险防范 + +### 3.1 ⚠️ exec 命令注入(最高优先级) + +`audio/convert.go` 调用 `exec.Command("ffmpeg", ...)` 时,**所有参数必须是硬编码常量,绝对不能包含任何用户输入**。 + +```go +// ✅ 安全:参数全部硬编码 +cmd := exec.CommandContext(ctx, "ffmpeg", + "-nostdin", + "-i", "pipe:0", // 始终从 stdin 读,不接受文件路径 + "-ar", "16000", + "-ac", "1", + "-f", "s16le", + "pipe:1", +) +cmd.Stdin = resp.Body // HTTP body 通过 stdin 传入,不是命令行参数 + +// ❌ 危险:audio_url 进入命令行参数(命令注入) +cmd := exec.CommandContext(ctx, "ffmpeg", "-i", audioURL, ...) 
+ +// ❌ 危险:使用 shell 执行 +exec.Command("sh", "-c", "ffmpeg -i "+audioURL) +``` + +> `audioURL` 只能作为 HTTP 请求的 URL,由 `net/http` 处理,永远不进入 `exec.Command` 的参数列表。 + +### 3.2 WebSocket 输入验证 + +```go +// server.go:设置消息大小上限,防止内存耗尽攻击 +upgrader := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true // IoT 设备无 Origin,允许所有来源 + }, +} + +// 连接建立后立即设置读限制 +ws.SetReadLimit(4 * 1024) // 文本消息上限 4KB(硬件不会发大消息) +``` + +```go +// 解析 JSON 时验证关键字段 +var msg StoryMessage +if err := json.Unmarshal(raw, &msg); err != nil { + return fmt.Errorf("invalid json: %w", err) +} +// device_id 来自 URL 参数(已在连接时验证),不信任消息体中的 device_id +``` + +### 3.3 资源耗尽防护 + +```go +// server.go:限制最大并发连接数 +const maxConnections = 500 + +func (s *Server) register(conn *Connection) error { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.conns) >= maxConnections { + return ErrTooManyConnections + } + s.conns[conn.DeviceID] = conn + return nil +} + +// 同一设备同时只允许一个连接(防止设备重复连接内存泄漏) +if old, exists := s.conns[conn.DeviceID]; exists { + old.Close() // 踢掉旧连接 +} +``` + +```go +// audio/convert.go:ffmpeg 超时保护(防止卡死) +ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) +defer cancel() +cmd := exec.CommandContext(ctx, "ffmpeg", ...) +``` + +### 3.4 HTTP 客户端安全 + +```go +// rtcclient/client.go:必须设置超时,防止 RTC 后端无响应时 goroutine 泄漏 +var httpClient = &http.Client{ + Timeout: 10 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 50, + IdleConnTimeout: 90 * time.Second, + }, + // 禁止无限重定向 + CheckRedirect: func(req *http.Request, via []*http.Request) error { + if len(via) >= 3 { + return errors.New("too many redirects") + } + return nil + }, +} +``` + +### 3.5 goroutine 泄漏防护 + +```go +// ✅ handler 必须响应 context 取消 +func HandleStory(conn *Connection, raw []byte) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() // 无论何种退出路径,context 都会被取消 + + frames, err := audio.Convert(ctx, story.AudioURL) + // ... 
+} + +// ✅ audio sender 通过 select 同时监听多个退出信号 +select { +case <-time.After(delay): +case <-abortCh: // 用户打断 + return +case <-ctx.Done(): // 超时或连接关闭 + return +} +``` + +### 3.6 日志安全 + +```go +// ✅ 日志中不输出敏感信息 +log.Printf("fetch story for device %s", conn.DeviceID) // MAC 地址可以记录(非个人数据) +log.Printf("audio url: %s", truncate(story.AudioURL, 60)) // URL 截断记录 + +// ❌ 不记录完整 audio_url(可能含签名 token) +log.Printf("audio url: %s", story.AudioURL) +``` + +--- + +## 四、测试规范 + +```go +// 测试文件命名:<被测文件>_test.go +// 测试函数命名:Test<被测函数>_<场景>(如 TestFetchStoryByMAC_Success) + +func TestFetchStoryByMAC_Success(t *testing.T) { ... } +func TestFetchStoryByMAC_DeviceNotFound(t *testing.T) { ... } +func TestSendOpusStream_AbortMidway(t *testing.T) { ... } +``` + +- 使用 `net/http/httptest` mock RTC 后端 HTTP 接口 +- 音频转码测试使用真实小文件(`testdata/short.mp3`,< 5s) +- 不测试 WebSocket 集成逻辑(由端到端脚本覆盖) + +--- + +## 五、常用命令 + +```bash +# 编译(在 hw_service_go/ 目录下) +go build ./... + +# 静态检查 +go vet ./... + +# 本地运行 +HW_RTC_BACKEND_URL=http://localhost:8000 go run ./cmd/main.go + +# 运行测试 +go test ./... -v -race # -race 开启竞态检测 + +# 格式化(提交前必须执行) +gofmt -w . +goimports -w . # 需安装: go install golang.org/x/tools/cmd/goimports@latest + +# 构建 Docker 镜像 +docker build -t hw-ws-service:dev . + +# 查看 goroutine 泄漏(开发调试) +curl http://localhost:8888/debug/pprof/goroutine?debug=1 +``` + +--- + +## 六、开发检查清单 + +**新增功能前:** +- [ ] 消息处理函数签名是否为 `func Handle<消息类型>(conn *connection.Connection, raw []byte)` +- [ ] 是否正确使用 `context.Context` 传递超时 +- [ ] 是否有 goroutine 退出机制(channel / context) + +**提交代码前:** +- [ ] `gofmt -w .` 格式化通过 +- [ ] `go vet ./...` 无警告 +- [ ] `go test ./... 
-race` 无 data race +- [ ] exec.Command 参数**不包含任何**来自外部的数据 +- [ ] 所有 HTTP 客户端调用都有超时设置 +- [ ] 新增环境变量已更新 `.env.example` 和 `k8s/deployment.yaml` + +**安全 review 要点:** +- [ ] `audio/convert.go`:audioURL 是否只经过 `http.Get()`,没有进入 `exec.Command` +- [ ] WebSocket `SetReadLimit` 是否已设置 +- [ ] 新增 goroutine 是否有对应的 `wg.Add(1)` 和 `defer wg.Done()` + +--- + +## 参考资料 + +- [Effective Go](https://go.dev/doc/effective_go) +- [Go Code Review Comments](https://github.com/golang/go/wiki/CodeReviewComments) +- [Uber Go Style Guide](https://github.com/uber-go/guide/blob/master/style.md) +- [gorilla/websocket 文档](https://pkg.go.dev/github.com/gorilla/websocket) +- [hraban/opus 文档](https://pkg.go.dev/github.com/hraban/opus) diff --git a/hw_service_go/Dockerfile b/hw_service_go/Dockerfile new file mode 100644 index 0000000..58b0e93 --- /dev/null +++ b/hw_service_go/Dockerfile @@ -0,0 +1,49 @@ +# ============================================================ +# hw-ws-service Dockerfile — 多阶段构建 +# 构建阶段:Go 编译(含 CGO for libopus) +# 运行阶段:Alpine + libopus + ffmpeg(最终镜像 ~60-80MB) +# ============================================================ + +# ---- 构建阶段 ---- +FROM golang:1.23-alpine AS builder + +# 安装 CGO 编译所需的 C 工具链和 libopus 开发头文件 +RUN apk add --no-cache gcc musl-dev opus-dev + +WORKDIR /app + +# 先拷贝 go.mod/go.sum 利用 Docker 层缓存(依赖未变时跳过 go mod download) +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . 
+ +# CGO_ENABLED=1 必须开启(hraban/opus 是 CGO 库) +# -trimpath 去除本地路径信息(安全性) +# -ldflags="-s -w" 去除调试符号(缩减二进制大小) +RUN CGO_ENABLED=1 GOOS=linux \ + go build \ + -trimpath \ + -ldflags="-s -w" \ + -o hw-ws-service \ + ./cmd/main.go + +# ---- 运行阶段 ---- +FROM alpine:3.20 + +# 运行时依赖: +# opus — libopus 动态库(hraban/opus CGO 绑定需要) +# ffmpeg — MP3/AAC 解码为 PCM +# ca-certificates — HTTPS 请求 OSS 需要根证书 +RUN apk add --no-cache opus ffmpeg ca-certificates && \ + # 创建非 root 运行用户(安全最佳实践) + addgroup -S hwws && adduser -S hwws -G hwws + +COPY --from=builder /app/hw-ws-service /hw-ws-service + +# 以非 root 用户运行 +USER hwws + +EXPOSE 8888 + +ENTRYPOINT ["/hw-ws-service"] diff --git a/hw_service_go/cmd/main.go b/hw_service_go/cmd/main.go new file mode 100644 index 0000000..b581585 --- /dev/null +++ b/hw_service_go/cmd/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/qy/hw-ws-service/internal/config" + "github.com/qy/hw-ws-service/internal/rtcclient" + "github.com/qy/hw-ws-service/internal/server" +) + +func main() { + log.SetFlags(log.LstdFlags | log.Lmsgprefix) + log.SetPrefix("[hw-ws] ") + + cfg := config.Load() + addr := cfg.WSHost + ":" + cfg.WSPort + + client := rtcclient.New(cfg.RTCBackendURL) + srv := server.New(addr, client) + + // 后台启动服务器 + serverErr := make(chan error, 1) + go func() { + serverErr <- srv.ListenAndServe() + }() + + // 监听系统信号(K8s 滚动更新发送 SIGTERM) + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT) + + select { + case err := <-serverErr: + log.Fatalf("server error: %v", err) + case sig := <-sigCh: + log.Printf("received signal: %v, starting graceful shutdown...", sig) + } + + // 优雅关闭:最长 80s(与 K8s terminationGracePeriodSeconds=90 配合) + ctx, cancel := context.WithTimeout(context.Background(), 80*time.Second) + defer cancel() + srv.Shutdown(ctx) + + log.Println("shutdown complete") +} diff --git a/hw_service_go/go.mod b/hw_service_go/go.mod new file mode 
100644 index 0000000..ee95318 --- /dev/null +++ b/hw_service_go/go.mod @@ -0,0 +1,8 @@ +module github.com/qy/hw-ws-service + +go 1.23 + +require ( + github.com/gorilla/websocket v1.5.3 + github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 +) diff --git a/hw_service_go/go.sum b/hw_service_go/go.sum new file mode 100644 index 0000000..c480cbc --- /dev/null +++ b/hw_service_go/go.sum @@ -0,0 +1,4 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302 h1:K7bmEmIesLcvCW0Ic2rCk6LtP5++nTnPmrO8mg5umlA= +github.com/hraban/opus v0.0.0-20230925203106-0188a62cb302/go.mod h1:YQQXrWHN3JEvCtw5ImyTCcPeU/ZLo/YMA+TpB64XdrU= diff --git a/hw_service_go/internal/audio/audio.go b/hw_service_go/internal/audio/audio.go new file mode 100644 index 0000000..d7d5b8c --- /dev/null +++ b/hw_service_go/internal/audio/audio.go @@ -0,0 +1,13 @@ +// Package audio 提供音频格式转换功能:从 URL 下载 MP3,转码为 Opus 帧列表。 +// 全程使用 ffmpeg stdin/stdout pipe,不写临时文件。 +package audio + +const ( + SampleRate = 16000 + Channels = 1 + FrameDurationMs = 60 + // FrameSize 是每个 Opus 帧包含的 PCM 采样数(16bit)。 + FrameSize = SampleRate * FrameDurationMs / 1000 // 960 samples + // PreBufferCount 是流控前快速预发送的帧数,减少硬件首帧延迟。 + PreBufferCount = 3 +) diff --git a/hw_service_go/internal/audio/convert.go b/hw_service_go/internal/audio/convert.go new file mode 100644 index 0000000..7e392ea --- /dev/null +++ b/hw_service_go/internal/audio/convert.go @@ -0,0 +1,127 @@ +package audio + +import ( + "context" + "encoding/binary" + "fmt" + "io" + "net/http" + "os/exec" + "time" + + "github.com/hraban/opus" +) + +// MP3URLToOpusFrames 从 audioURL 下载音频,通过 ffmpeg pipe 解码为 PCM, +// 再用 libopus 编码为 60ms 帧列表,全程流式处理不落磁盘。 +// +// ⚠️ 安全约束:audioURL 只能作为 http.Get 的参数, +// 绝对不能出现在 exec.Command 的参数列表中(防止命令注入)。 +func MP3URLToOpusFrames(ctx context.Context, audioURL string) ([][]byte, 
error) { + // 1. 下载音频(流式,不全量载入内存) + httpCtx, httpCancel := context.WithTimeout(ctx, 60*time.Second) + defer httpCancel() + + req, err := http.NewRequestWithContext(httpCtx, http.MethodGet, audioURL, nil) + if err != nil { + return nil, fmt.Errorf("audio: build request: %w", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("audio: download: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("audio: download status %d", resp.StatusCode) + } + + // 2. ffmpeg:stdin 读原始音频,stdout 输出 s16le PCM(16kHz 单声道) + // 所有参数硬编码,audioURL 不进入命令行(防命令注入) + ffmpegCtx, ffmpegCancel := context.WithTimeout(ctx, 120*time.Second) + defer ffmpegCancel() + + cmd := exec.CommandContext(ffmpegCtx, + "ffmpeg", + "-nostdin", + "-loglevel", "error", // 只输出错误,不污染 stdout pipe + "-i", "pipe:0", // 从 stdin 读输入 + "-ar", "16000", // 目标采样率 + "-ac", "1", // 单声道 + "-f", "s16le", // 输出格式:有符号 16bit 小端 PCM + "pipe:1", // 输出到 stdout + ) + cmd.Stdin = resp.Body // HTTP body 直接接 ffmpeg stdin,不经过磁盘 + + pcmReader, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("audio: stdout pipe: %w", err) + } + stderrPipe, _ := cmd.StderrPipe() + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("audio: start ffmpeg: %w", err) + } + + // 3. 
逐帧读取 PCM 并实时 Opus 编码 + enc, err := opus.NewEncoder(SampleRate, Channels, opus.AppAudio) + if err != nil { + return nil, fmt.Errorf("audio: create encoder: %w", err) + } + + pcmBuf := make([]int16, FrameSize) // 960 int16 samples + opusBuf := make([]byte, 4000) // Opus 输出缓冲(4KB 足够单帧) + var frames [][]byte + + for { + err := binary.Read(pcmReader, binary.LittleEndian, pcmBuf) + if err == io.EOF || err == io.ErrUnexpectedEOF { + // 最后一帧不足时已补零(binary.Read 会读已有字节),直接编码 + if err == io.ErrUnexpectedEOF { + n, encErr := enc.Encode(pcmBuf, opusBuf) + if encErr == nil && n > 0 { + frame := make([]byte, n) + copy(frame, opusBuf[:n]) + frames = append(frames, frame) + } + } + break + } + if err != nil { + // ffmpeg 已结束(context cancel 等),读取结束 + break + } + + n, err := enc.Encode(pcmBuf, opusBuf) + if err != nil { + return nil, fmt.Errorf("audio: opus encode: %w", err) + } + frame := make([]byte, n) + copy(frame, opusBuf[:n]) + frames = append(frames, frame) + } + + // 排空 stderr 避免 ffmpeg 阻塞 + io.Copy(io.Discard, stderrPipe) + + if err := cmd.Wait(); err != nil { + // context 超时导致的退出不视为错误(已有 frames 可以播放) + if ffmpegCtx.Err() == nil { + return nil, fmt.Errorf("audio: ffmpeg exit: %w", err) + } + } + + if len(frames) == 0 { + return nil, fmt.Errorf("audio: no frames produced from %s", truncateURL(audioURL)) + } + + return frames, nil +} + +// truncateURL 截断 URL 用于日志,避免输出带签名的完整 URL。 +func truncateURL(u string) string { + if len(u) > 80 { + return u[:80] + "..." 
+ } + return u +} diff --git a/hw_service_go/internal/config/config.go b/hw_service_go/internal/config/config.go new file mode 100644 index 0000000..922d877 --- /dev/null +++ b/hw_service_go/internal/config/config.go @@ -0,0 +1,33 @@ +package config + +import ( + "log" + "os" +) + +// Config 保存所有服务配置,全部通过环境变量注入(12-Factor App)。 +type Config struct { + WSHost string + WSPort string + RTCBackendURL string +} + +// Load 从环境变量读取配置,必填项缺失时直接 Fatal。 +func Load() *Config { + backendURL := getEnv("HW_RTC_BACKEND_URL", "") + if backendURL == "" { + log.Fatal("config: HW_RTC_BACKEND_URL is required") + } + return &Config{ + WSHost: getEnv("HW_WS_HOST", "0.0.0.0"), + WSPort: getEnv("HW_WS_PORT", "8888"), + RTCBackendURL: backendURL, + } +} + +func getEnv(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} diff --git a/hw_service_go/internal/connection/connection.go b/hw_service_go/internal/connection/connection.go new file mode 100644 index 0000000..30911fd --- /dev/null +++ b/hw_service_go/internal/connection/connection.go @@ -0,0 +1,85 @@ +// Package connection 管理单个 ESP32 硬件 WebSocket 连接的状态。 +package connection + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/gorilla/websocket" +) + +// Connection 保存单个硬件连接的状态,所有方法并发安全。 +type Connection struct { + WS *websocket.Conn + DeviceID string // MAC 地址,来自 URL 参数 device-id + ClientID string // 来自 URL 参数 client-id + + mu sync.Mutex + isPlaying bool + abortCh chan struct{} // close(abortCh) 通知流控 goroutine 中止播放 + + writeMu sync.Mutex // gorilla/websocket 写操作不并发安全,需独立锁 +} + +// New 创建新连接对象。 +func New(ws *websocket.Conn, deviceID, clientID string) *Connection { + return &Connection{ + WS: ws, + DeviceID: deviceID, + ClientID: clientID, + } +} + +// StartPlayback 开始新一轮播放,返回 abortCh 供流控 goroutine 监听。 +// 若已在播放,先中止上一轮再开始新的。 +func (c *Connection) StartPlayback() <-chan struct{} { + c.mu.Lock() + defer c.mu.Unlock() + + // 中止上一轮播放(若有) + if c.isPlaying && c.abortCh != nil { + 
close(c.abortCh) + } + + c.abortCh = make(chan struct{}) + c.isPlaying = true + return c.abortCh +} + +// StopPlayback 结束播放状态。 +func (c *Connection) StopPlayback() { + c.mu.Lock() + defer c.mu.Unlock() + c.isPlaying = false +} + +// IsPlaying 返回当前是否正在播放。 +func (c *Connection) IsPlaying() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.isPlaying +} + +// SendJSON 序列化 v 并以文本帧发送给设备,并发安全。 +func (c *Connection) SendJSON(v any) error { + data, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("connection: marshal json: %w", err) + } + c.writeMu.Lock() + defer c.writeMu.Unlock() + return c.WS.WriteMessage(websocket.TextMessage, data) +} + +// SendBinary 以二进制帧发送 Opus 数据,并发安全。 +func (c *Connection) SendBinary(data []byte) error { + c.writeMu.Lock() + defer c.writeMu.Unlock() + return c.WS.WriteMessage(websocket.BinaryMessage, data) +} + +// Close 关闭底层 WebSocket 连接。 +func (c *Connection) Close() { + c.WS.Close() +} diff --git a/hw_service_go/internal/connection/connection_test.go b/hw_service_go/internal/connection/connection_test.go new file mode 100644 index 0000000..be4a356 --- /dev/null +++ b/hw_service_go/internal/connection/connection_test.go @@ -0,0 +1,177 @@ +package connection_test + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/qy/hw-ws-service/internal/connection" +) + +// makeWSPair creates a real WebSocket pair for testing. +// Returns the server-side conn (what our code uses) and the client-side conn +// (what simulates the hardware). Call cleanup() after the test. 
+func makeWSPair(t *testing.T) (svrWS *websocket.Conn, cliWS *websocket.Conn, cleanup func()) { + t.Helper() + + ch := make(chan *websocket.Conn, 1) + done := make(chan struct{}) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + up := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} + c, err := up.Upgrade(w, r, nil) + if err != nil { + t.Logf("upgrade error: %v", err) + return + } + ch <- c + <-done // hold handler open until cleanup + })) + + wsURL := "ws" + strings.TrimPrefix(srv.URL, "http") + cli, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + close(done) + srv.Close() + t.Fatalf("dial error: %v", err) + } + + svr := <-ch + return svr, cli, func() { + close(done) + svr.Close() + cli.Close() + srv.Close() + } +} + +func TestConnection_InitialState(t *testing.T) { + svrWS, _, cleanup := makeWSPair(t) + defer cleanup() + + conn := connection.New(svrWS, "AA:BB:CC:DD:EE:FF", "client-uuid") + if conn.DeviceID != "AA:BB:CC:DD:EE:FF" { + t.Errorf("DeviceID = %q", conn.DeviceID) + } + if conn.ClientID != "client-uuid" { + t.Errorf("ClientID = %q", conn.ClientID) + } + if conn.IsPlaying() { + t.Error("new connection should not be playing") + } +} + +func TestConnection_StartStopPlayback(t *testing.T) { + svrWS, _, cleanup := makeWSPair(t) + defer cleanup() + + conn := connection.New(svrWS, "dev1", "cli1") + + ch := conn.StartPlayback() + if ch == nil { + t.Fatal("StartPlayback should return a non-nil channel") + } + if !conn.IsPlaying() { + t.Error("IsPlaying should be true after StartPlayback") + } + + // Channel must still be open + select { + case <-ch: + t.Error("abortCh should not be closed yet") + default: + } + + conn.StopPlayback() + if conn.IsPlaying() { + t.Error("IsPlaying should be false after StopPlayback") + } +} + +// TestConnection_StartPlayback_AbortsOld verifies that calling StartPlayback a second +// time closes the previous abort channel, stopping any 
in-progress streaming. +func TestConnection_StartPlayback_AbortsOld(t *testing.T) { + svrWS, _, cleanup := makeWSPair(t) + defer cleanup() + + conn := connection.New(svrWS, "dev1", "cli1") + + ch1 := conn.StartPlayback() + ch2 := conn.StartPlayback() // should close ch1 + + // ch1 must be closed now + select { + case <-ch1: + // expected + case <-time.After(100 * time.Millisecond): + t.Error("first abortCh should be closed by second StartPlayback call") + } + + // ch2 must still be open + select { + case <-ch2: + t.Error("second abortCh should not be closed yet") + default: + } +} + +// TestConnection_SendJSON verifies JSON messages are delivered to the client. +func TestConnection_SendJSON(t *testing.T) { + svrWS, cliWS, cleanup := makeWSPair(t) + defer cleanup() + + conn := connection.New(svrWS, "dev1", "cli1") + + if err := conn.SendJSON(map[string]string{"type": "tts", "state": "start"}); err != nil { + t.Fatalf("SendJSON error: %v", err) + } + + cliWS.SetReadDeadline(time.Now().Add(2 * time.Second)) + msgType, data, err := cliWS.ReadMessage() + if err != nil { + t.Fatalf("client read error: %v", err) + } + if msgType != websocket.TextMessage { + t.Errorf("message type = %d, want TextMessage (%d)", msgType, websocket.TextMessage) + } + + var got map[string]string + if err := json.Unmarshal(data, &got); err != nil { + t.Fatalf("json.Unmarshal error: %v", err) + } + if got["type"] != "tts" { + t.Errorf("type = %q, want %q", got["type"], "tts") + } + if got["state"] != "start" { + t.Errorf("state = %q, want %q", got["state"], "start") + } +} + +// TestConnection_SendBinary verifies binary (Opus) frames are delivered to the client. 
+func TestConnection_SendBinary(t *testing.T) { + svrWS, cliWS, cleanup := makeWSPair(t) + defer cleanup() + + conn := connection.New(svrWS, "dev1", "cli1") + + payload := []byte{0x01, 0x02, 0x03, 0x04} + if err := conn.SendBinary(payload); err != nil { + t.Fatalf("SendBinary error: %v", err) + } + + cliWS.SetReadDeadline(time.Now().Add(2 * time.Second)) + msgType, data, err := cliWS.ReadMessage() + if err != nil { + t.Fatalf("client read error: %v", err) + } + if msgType != websocket.BinaryMessage { + t.Errorf("message type = %d, want BinaryMessage (%d)", msgType, websocket.BinaryMessage) + } + if string(data) != string(payload) { + t.Errorf("payload = %v, want %v", data, payload) + } +} diff --git a/hw_service_go/internal/handler/audio_sender.go b/hw_service_go/internal/handler/audio_sender.go new file mode 100644 index 0000000..f8b5f77 --- /dev/null +++ b/hw_service_go/internal/handler/audio_sender.go @@ -0,0 +1,63 @@ +package handler + +import ( + "time" + + "github.com/qy/hw-ws-service/internal/audio" + "github.com/qy/hw-ws-service/internal/connection" +) + +// SendOpusStream 将 Opus 帧列表按 60ms/帧的节奏流控发送给硬件。 +// +// 流控策略: +// 1. 预缓冲:前 PreBufferCount 帧立即发送,减少硬件首帧延迟 +// 2. 时序流控:按 (帧序号 × 60ms) 计算期望发送时间,select 等待 +// 3. 
打断:监听 abortCh,收到关闭信号立即返回 +func SendOpusStream(conn *connection.Connection, frames [][]byte, abortCh <-chan struct{}) { + if len(frames) == 0 { + return + } + + startTime := time.Now() + playedMs := 0 + + // 阶段1:预缓冲,快速发送前 N 帧 + pre := audio.PreBufferCount + if pre > len(frames) { + pre = len(frames) + } + for _, f := range frames[:pre] { + select { + case <-abortCh: + return + default: + } + conn.SendBinary(f) //nolint:errcheck // 连接断开时下一次 ReadMessage 会返回错误 + } + playedMs = pre * audio.FrameDurationMs + + // 阶段2:时序流控 + for _, f := range frames[pre:] { + expectedAt := startTime.Add(time.Duration(playedMs) * time.Millisecond) + delay := time.Until(expectedAt) + + if delay > 0 { + select { + case <-time.After(delay): + // 到达预期发送时间,继续 + case <-abortCh: + return + } + } + + // delay <= 0:处理比预期慢,追赶进度,直接发送 + select { + case <-abortCh: + return + default: + } + + conn.SendBinary(f) //nolint:errcheck + playedMs += audio.FrameDurationMs + } +} diff --git a/hw_service_go/internal/handler/audio_sender_test.go b/hw_service_go/internal/handler/audio_sender_test.go new file mode 100644 index 0000000..35f486b --- /dev/null +++ b/hw_service_go/internal/handler/audio_sender_test.go @@ -0,0 +1,195 @@ +package handler_test + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/qy/hw-ws-service/internal/audio" + "github.com/qy/hw-ws-service/internal/connection" + "github.com/qy/hw-ws-service/internal/handler" +) + +// makeWSPair creates a real WebSocket pair for testing. +// svrWS is the server side (used by our Connection), cliWS simulates the hardware. 
+func makeWSPair(t *testing.T) (svrWS *websocket.Conn, cliWS *websocket.Conn, cleanup func()) { + t.Helper() + + ch := make(chan *websocket.Conn, 1) + done := make(chan struct{}) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + up := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} + c, err := up.Upgrade(w, r, nil) + if err != nil { + t.Logf("upgrade error: %v", err) + return + } + ch <- c + <-done + })) + + wsURL := "ws" + strings.TrimPrefix(srv.URL, "http") + cli, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + close(done) + srv.Close() + t.Fatalf("dial error: %v", err) + } + + svr := <-ch + return svr, cli, func() { + close(done) + svr.Close() + cli.Close() + srv.Close() + } +} + +// makeFrames creates n fake Opus frames of 4 bytes each. +func makeFrames(n int) [][]byte { + frames := make([][]byte, n) + for i := range frames { + frames[i] = []byte{byte(i), byte(i >> 8), 0x00, 0xff} + } + return frames +} + +// TestSendOpusStream_Empty verifies that an empty frame list returns immediately. +func TestSendOpusStream_Empty(t *testing.T) { + svrWS, _, cleanup := makeWSPair(t) + defer cleanup() + + conn := connection.New(svrWS, "dev1", "cli1") + abort := make(chan struct{}) + + done := make(chan struct{}) + go func() { + handler.SendOpusStream(conn, nil, abort) + close(done) + }() + + select { + case <-done: + case <-time.After(500 * time.Millisecond): + t.Fatal("SendOpusStream did not return immediately for empty frames") + } +} + +// TestSendOpusStream_AllFramesSent verifies that all frames reach the client. +// Uses PreBufferCount+2 frames so both pre-buffer and timed paths are exercised. 
+func TestSendOpusStream_AllFramesSent(t *testing.T) {
+	svrWS, cliWS, cleanup := makeWSPair(t)
+	defer cleanup()
+
+	conn := connection.New(svrWS, "dev1", "cli1")
+
+	totalFrames := audio.PreBufferCount + 2 // PreBufferCount pre-buffered frames + 2 timed frames
+	frames := makeFrames(totalFrames)
+
+	abort := make(chan struct{})
+	senderDone := make(chan struct{})
+	go func() {
+		handler.SendOpusStream(conn, frames, abort)
+		close(senderDone)
+	}()
+
+	// Read all frames from the client side (simulates hardware receiving)
+	received := 0
+	cliWS.SetReadDeadline(time.Now().Add(10 * time.Second))
+	for received < totalFrames {
+		msgType, _, err := cliWS.ReadMessage()
+		if err != nil {
+			t.Fatalf("client read error after %d frames: %v", received, err)
+		}
+		if msgType == websocket.BinaryMessage {
+			received++
+		}
+	}
+
+	select {
+	case <-senderDone:
+	case <-time.After(2 * time.Second):
+		t.Fatal("SendOpusStream did not finish after all frames were sent")
+	}
+
+	if received != totalFrames {
+		t.Errorf("received %d frames, want %d", received, totalFrames)
+	}
+}
+
+// TestSendOpusStream_Abort verifies that closing abortCh stops streaming early.
+func TestSendOpusStream_Abort(t *testing.T) {
+	svrWS, _, cleanup := makeWSPair(t)
+	defer cleanup()
+
+	conn := connection.New(svrWS, "dev1", "cli1")
+
+	// Many frames so timing control is active (pre-buffer finishes quickly,
+	// then the time.After select can receive the abort signal)
+	frames := makeFrames(100)
+
+	abort := make(chan struct{})
+	senderDone := make(chan struct{})
+	go func() {
+		handler.SendOpusStream(conn, frames, abort)
+		close(senderDone)
+	}()
+
+	// Close abort after pre-buffer has had time to finish but before timed frames complete
+	time.Sleep(20 * time.Millisecond)
+	close(abort)
+
+	select {
+	case <-senderDone:
+		// SendOpusStream returned early — correct behaviour
+	case <-time.After(2 * time.Second):
+		t.Fatal("SendOpusStream did not abort within 2s after closing abortCh")
+	}
+}
+
+// TestSendOpusStream_PreBufferOnly verifies frames <= PreBufferCount are all sent
+// without entering the timed loop (should finish nearly instantly).
+func TestSendOpusStream_PreBufferOnly(t *testing.T) {
+	svrWS, cliWS, cleanup := makeWSPair(t)
+	defer cleanup()
+
+	conn := connection.New(svrWS, "dev1", "cli1")
+
+	frames := makeFrames(audio.PreBufferCount) // exactly the pre-buffer count
+	abort := make(chan struct{})
+
+	start := time.Now()
+	senderDone := make(chan struct{})
+	go func() {
+		handler.SendOpusStream(conn, frames, abort)
+		close(senderDone)
+	}()
+
+	received := 0
+	cliWS.SetReadDeadline(time.Now().Add(3 * time.Second))
+	for received < len(frames) {
+		msgType, _, err := cliWS.ReadMessage()
+		if err != nil {
+			t.Fatalf("read error: %v", err)
+		}
+		if msgType == websocket.BinaryMessage {
+			received++
+		}
+	}
+
+	select {
+	case <-senderDone:
+	case <-time.After(time.Second):
+		t.Fatal("sender did not finish")
+	}
+
+	elapsed := time.Since(start)
+	// Pre-buffer frames should not wait on the timer; allow 200ms for overhead
+	if elapsed > 200*time.Millisecond {
+		t.Errorf("pre-buffer-only send took too long: %v (want < 200ms)", elapsed)
+	}
+}
diff --git a/hw_service_go/internal/handler/story.go b/hw_service_go/internal/handler/story.go
new file mode 100644
index 0000000..b90d876
--- /dev/null
+++ b/hw_service_go/internal/handler/story.go
@@ -0,0 +1,73 @@
+package handler
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/qy/hw-ws-service/internal/audio"
+	"github.com/qy/hw-ws-service/internal/connection"
+	"github.com/qy/hw-ws-service/internal/rtcclient"
+)
+
+// HandleStory handles the {"type":"story"} command sent by the hardware.
+// It is meant to be called in its own goroutine so the message read loop is not blocked.
+func HandleStory(conn *connection.Connection, client *rtcclient.Client) {
+	tag := "[story][" + conn.DeviceID + "]"
+
+	// 10-minute deadline; ctx is passed to the story fetch and the audio conversion below
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+	defer cancel()
+
+	// 1. Tell the hardware that TTS is starting
+	if err := conn.SendJSON(map[string]string{"type": "tts", "state": "start"}); err != nil {
+		log.Printf("%s send start failed: %v", tag, err)
+		return
+	}
+
+	// Always send stop, even on an error exit, so the hardware does not get stuck
+	defer func() {
+		conn.StopPlayback()
+		conn.SendJSON(map[string]string{"type": "tts", "state": "stop"}) //nolint:errcheck
+	}()
+
+	// 2. Fetch a story from the RTC backend
+	story, err := client.FetchStoryByMAC(ctx, conn.DeviceID)
+	if err != nil {
+		log.Printf("%s fetch story error: %v", tag, err)
+		return
+	}
+	if story == nil {
+		log.Printf("%s no story available", tag)
+		return
+	}
+	log.Printf("%s playing: %s", tag, story.Title)
+
+	// 3. Download the MP3 and transcode it to Opus frames (CPU-heavy; runs in this goroutine)
+	frames, err := audio.MP3URLToOpusFrames(ctx, story.AudioURL)
+	if err != nil {
+		log.Printf("%s audio convert error: %v", tag, err)
+		return
+	}
+	log.Printf("%s converted %d frames (~%.1fs)", tag, len(frames),
+		float64(len(frames)*audio.FrameDurationMs)/1000)
+
+	// 4. Tell the hardware a sentence is starting (carries the story title)
+	if err := conn.SendJSON(map[string]any{
+		"type":  "tts",
+		"state": "sentence_start",
+		"text":  story.Title,
+	}); err != nil {
+		log.Printf("%s send sentence_start failed: %v", tag, err)
+		return
+	}
+
+	// 5. Start playback and obtain the abort channel
+	abortCh := conn.StartPlayback()
+
+	// 6. Push the Opus frames with flow control
+	SendOpusStream(conn, frames, abortCh)
+
+	log.Printf("%s playback finished", tag)
+	// the deferred func above calls StopPlayback and sends stop
+}
diff --git a/hw_service_go/internal/rtcclient/client.go b/hw_service_go/internal/rtcclient/client.go
new file mode 100644
index 0000000..d02cf7e
--- /dev/null
+++ b/hw_service_go/internal/rtcclient/client.go
@@ -0,0 +1,100 @@
+// Package rtcclient wraps HTTP calls to the RTC backend's Django REST API.
+package rtcclient
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"strings"
+	"time"
+)
+
+// StoryInfo is the story data returned by GET /api/v1/devices/stories/.
+type StoryInfo struct {
+	Title    string `json:"title"`
+	AudioURL string `json:"audio_url"`
+}
+
+// Client is the HTTP client for the RTC backend; it reuses a connection pool.
+type Client struct {
+	baseURL    string
+	httpClient *http.Client
+}
+
+// New creates a Client; baseURL looks like "http://rtc-backend-svc:8000".
+func New(baseURL string) *Client {
+	return &Client{
+		baseURL: strings.TrimRight(baseURL, "/"),
+		httpClient: &http.Client{
+			Timeout: 10 * time.Second,
+			Transport: &http.Transport{
+				MaxIdleConns:        50,
+				MaxIdleConnsPerHost: 10,
+				IdleConnTimeout:     90 * time.Second,
+			},
+			// Cap the number of redirects to avoid endless redirect loops
+			CheckRedirect: func(req *http.Request, via []*http.Request) error {
+				if len(via) >= 3 {
+					return errors.New("rtcclient: too many redirects")
+				}
+				return nil
+			},
+		},
+	}
+}
+
+// rtcResponse is the RTC backend's common response envelope.
+type rtcResponse struct {
+	Code    int             `json:"code"`
+	Message string          `json:"message"`
+	Data    json.RawMessage `json:"data"`
+}
+
+// FetchStoryByMAC fetches a random story by device MAC address.
+// It returns nil, nil when the device/user/story does not exist (not an error; the caller just skips).
+func (c *Client) FetchStoryByMAC(ctx context.Context, mac string) (*StoryInfo, error) {
+	url := c.baseURL + "/api/v1/devices/stories/"
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return nil, fmt.Errorf("rtcclient: build request: %w", err)
+	}
+
+	q := req.URL.Query()
+	q.Set("mac_address", strings.ToUpper(mac))
+	req.URL.RawQuery = q.Encode()
+
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("rtcclient: request failed: %w", err)
+	}
+	defer resp.Body.Close()
+
+	// 404 means the device/user/story does not exist; it is not a server error
+	if resp.StatusCode == http.StatusNotFound {
+		return nil, nil
+	}
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("rtcclient: unexpected status %d", resp.StatusCode)
+	}
+
+	var result rtcResponse
+	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+		return nil, fmt.Errorf("rtcclient: decode response: %w", err)
+	}
+	if result.Code != 0 {
+		return nil, nil // business error (e.g. no story yet): return nil and let the caller handle it
+	}
+
+	var story StoryInfo
+	if err := json.Unmarshal(result.Data, &story); err != nil {
+		return nil, fmt.Errorf("rtcclient: decode story: %w", err)
+	}
+	if story.Title == "" || story.AudioURL == "" {
+		return nil, nil
+	}
+
+	return &story, nil
+}
diff --git a/hw_service_go/internal/rtcclient/client_test.go b/hw_service_go/internal/rtcclient/client_test.go
new file mode 100644
index 0000000..c9ab8c4
--- /dev/null
+++ b/hw_service_go/internal/rtcclient/client_test.go
@@ -0,0 +1,142 @@
+package rtcclient_test
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/qy/hw-ws-service/internal/rtcclient"
+)
+
+func successBody(title, audioURL string) []byte {
+	b, _ := json.Marshal(map[string]any{
+		"code":    0,
+		"message": "success",
+		"data": map[string]string{
+			"title":     title,
+			"audio_url": audioURL,
+		},
+	})
+	return b
+}
+
+func errorBody(code int, msg string) []byte {
+	b, _ := json.Marshal(map[string]any{
+		"code":    code,
+		"message": msg,
+		"data":    nil,
+	})
+	return b
+}
+
+func TestFetchStoryByMAC_Success(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/api/v1/devices/stories/" {
+			t.Errorf("unexpected path: %s", r.URL.Path)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.Write(successBody("小红帽", "https://example.com/story.mp3"))
+	}))
+	defer srv.Close()
+
+	client := rtcclient.New(srv.URL)
+	story, err := client.FetchStoryByMAC(context.Background(), "aa:bb:cc:dd:ee:ff")
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if story == nil {
+		t.Fatal("expected story, got nil")
+	}
+	if story.Title != "小红帽" {
+		t.Errorf("title = %q, want %q", story.Title, "小红帽")
+	}
+	if story.AudioURL != "https://example.com/story.mp3" {
+		t.Errorf("audio_url = %q", story.AudioURL)
+	}
+}
+
+// TestFetchStoryByMAC_MACUppercase verifies the client always sends uppercase MAC.
+func TestFetchStoryByMAC_MACUppercase(t *testing.T) {
+	var gotMAC string
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		gotMAC = r.URL.Query().Get("mac_address")
+		w.Header().Set("Content-Type", "application/json")
+		w.Write(successBody("test", "https://example.com/t.mp3"))
+	}))
+	defer srv.Close()
+
+	client := rtcclient.New(srv.URL)
+	client.FetchStoryByMAC(context.Background(), "aa:bb:cc:dd:ee:ff") //nolint:errcheck
+	if gotMAC != "AA:BB:CC:DD:EE:FF" {
+		t.Errorf("MAC not uppercased: got %q, want %q", gotMAC, "AA:BB:CC:DD:EE:FF")
+	}
+}
+
+// TestFetchStoryByMAC_NotFound verifies that HTTP 404 returns (nil, nil).
+func TestFetchStoryByMAC_NotFound(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusNotFound)
+	}))
+	defer srv.Close()
+
+	client := rtcclient.New(srv.URL)
+	story, err := client.FetchStoryByMAC(context.Background(), "AA:BB:CC:DD:EE:FF")
+	if err != nil {
+		t.Fatalf("unexpected error for 404: %v", err)
+	}
+	if story != nil {
+		t.Errorf("expected nil story for 404, got %+v", story)
+	}
+}
+
+// TestFetchStoryByMAC_BusinessError verifies that code != 0 returns (nil, nil).
+func TestFetchStoryByMAC_BusinessError(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		w.Write(errorBody(404, "暂无可播放的故事")) // no WriteHeader call: HTTP status stays 200, only the JSON code is non-zero
+	}))
+	defer srv.Close()
+
+	client := rtcclient.New(srv.URL)
+	story, err := client.FetchStoryByMAC(context.Background(), "AA:BB:CC:DD:EE:FF")
+	if err != nil {
+		t.Fatalf("unexpected error for business error response: %v", err)
+	}
+	if story != nil {
+		t.Errorf("expected nil story for business error, got %+v", story)
+	}
+}
+
+// TestFetchStoryByMAC_ServerError verifies that HTTP 5xx returns an error.
+func TestFetchStoryByMAC_ServerError(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusInternalServerError)
+	}))
+	defer srv.Close()
+
+	client := rtcclient.New(srv.URL)
+	_, err := client.FetchStoryByMAC(context.Background(), "AA:BB:CC:DD:EE:FF")
+	if err == nil {
+		t.Error("expected error for HTTP 500, got nil")
+	}
+}
+
+// TestFetchStoryByMAC_ContextCanceled verifies that a canceled context returns an error.
+func TestFetchStoryByMAC_ContextCanceled(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Never respond — let the client time out + <-r.Context().Done() + })) + defer srv.Close() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + client := rtcclient.New(srv.URL) + _, err := client.FetchStoryByMAC(ctx, "AA:BB:CC:DD:EE:FF") + if err == nil { + t.Error("expected error for canceled context, got nil") + } +} diff --git a/hw_service_go/internal/server/server.go b/hw_service_go/internal/server/server.go new file mode 100644 index 0000000..5ba90c5 --- /dev/null +++ b/hw_service_go/internal/server/server.go @@ -0,0 +1,216 @@ +// Package server 实现 WebSocket 服务器,管理硬件设备连接的生命周期。 +package server + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "net" + "net/http" + "sync" + + "github.com/gorilla/websocket" + "github.com/qy/hw-ws-service/internal/connection" + "github.com/qy/hw-ws-service/internal/handler" + "github.com/qy/hw-ws-service/internal/rtcclient" +) + +const ( + // maxConnections 最大并发连接数,防止资源耗尽。 + maxConnections = 500 + // maxMessageBytes WebSocket 单条消息上限(4KB),防止内存耗尽攻击。 + maxMessageBytes = 4 * 1024 +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // IoT 设备无浏览器 Origin,允许所有来源 + CheckOrigin: func(r *http.Request) bool { return true }, +} + +// Server 管理所有活跃的设备连接。 +type Server struct { + client *rtcclient.Client + httpServer *http.Server + + mu sync.Mutex + conns map[string]*connection.Connection // key: DeviceID + wg sync.WaitGroup // 跟踪所有连接 goroutine +} + +// New 创建 Server,addr 形如 "0.0.0.0:8888"。 +func New(addr string, client *rtcclient.Client) *Server { + s := &Server{ + client: client, + conns: make(map[string]*connection.Connection), + } + + mux := http.NewServeMux() + mux.HandleFunc("/xiaozhi/v1/healthz", s.handleStatus) + mux.HandleFunc("/xiaozhi/v1/", s.handleConn) + mux.HandleFunc("/healthz", 
func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + s.httpServer = &http.Server{ + Addr: addr, + Handler: mux, + } + return s +} + +// ListenAndServe 启动服务器,阻塞直到服务器关闭。 +func (s *Server) ListenAndServe() error { + log.Printf("server: listening on %s", s.httpServer.Addr) + err := s.httpServer.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) { + return nil + } + return err +} + +// Shutdown 优雅关闭:先停止接受新连接,再等待所有连接 goroutine 退出。 +func (s *Server) Shutdown(ctx context.Context) { + log.Println("server: shutting down...") + s.httpServer.Shutdown(ctx) //nolint:errcheck + + // 等待所有连接 goroutine 退出(由 ctx 超时兜底) + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + log.Println("server: all connections closed gracefully") + case <-ctx.Done(): + log.Println("server: shutdown timeout, forcing close") + } +} + +// handleConn 处理单个 WebSocket 连接的完整生命周期。 +// URL 格式:/xiaozhi/v1/?device-id=&client-id= +func (s *Server) handleConn(w http.ResponseWriter, r *http.Request) { + deviceID := r.URL.Query().Get("device-id") + clientID := r.URL.Query().Get("client-id") + + if deviceID == "" { + http.Error(w, "missing device-id", http.StatusBadRequest) + return + } + + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("server: upgrade failed for %s: %v", deviceID, err) + return + } + + // 设置单条消息大小上限 + ws.SetReadLimit(maxMessageBytes) + + conn := connection.New(ws, deviceID, clientID) + + if err := s.register(conn); err != nil { + log.Printf("server: register %s failed: %v", deviceID, err) + ws.Close() + return + } + + s.wg.Add(1) + defer func() { + conn.StopPlayback() + s.unregister(deviceID) + ws.Close() + s.wg.Done() + log.Printf("server: device %s disconnected, active=%d", deviceID, s.activeCount()) + }() + + log.Printf("server: device %s connected, active=%d", deviceID, s.activeCount()) + + // 消息读取循环 + for { + msgType, raw, err := ws.ReadMessage() + if err != nil { + if 
!websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + if !isNetworkClose(err) { + log.Printf("server: read error for %s: %v", deviceID, err) + } + } + return + } + + // 只处理文本消息(二进制为上行音频,本服务暂不处理) + if msgType != websocket.TextMessage { + continue + } + + var envelope struct { + Type string `json:"type"` + } + if err := json.Unmarshal(raw, &envelope); err != nil { + log.Printf("server: invalid json from %s: %v", deviceID, err) + continue + } + + switch envelope.Type { + case "story": + go handler.HandleStory(conn, s.client) + default: + log.Printf("server: unhandled message type %q from %s", envelope.Type, deviceID) + } + } +} + +// register 注册连接,若同一设备已有连接则踢掉旧连接。 +func (s *Server) register(conn *connection.Connection) error { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.conns) >= maxConnections { + return errors.New("server: max connections reached") + } + + // 同一设备同时只允许一个连接 + if old, exists := s.conns[conn.DeviceID]; exists { + log.Printf("server: kicking old connection for %s", conn.DeviceID) + old.Close() + } + + s.conns[conn.DeviceID] = conn + return nil +} + +func (s *Server) unregister(deviceID string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.conns, deviceID) +} + +func (s *Server) activeCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.conns) +} + +// handleStatus 返回服务状态和当前活跃连接数,用于部署后验证。 +// GET /xiaozhi/v1/healthz → {"status":"ok","active_connections":N} +func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + active := s.activeCount() + fmt.Fprintf(w, `{"status":"ok","active_connections":%d}`, active) +} + +// isNetworkClose 判断是否为普通的网络关闭错误(不需要打印日志)。 +func isNetworkClose(err error) bool { + var netErr *net.OpError + return errors.As(err, &netErr) +} diff --git 
a/hw_service_go/k8s/deployment.yaml b/hw_service_go/k8s/deployment.yaml new file mode 100644 index 0000000..ebb05a1 --- /dev/null +++ b/hw_service_go/k8s/deployment.yaml @@ -0,0 +1,82 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: hw-ws-service + labels: + app: hw-ws-service +spec: + replicas: 2 + selector: + matchLabels: + app: hw-ws-service + # WebSocket 连接有状态,滚动更新时使用 Recreate 或 RollingUpdate + 优雅关闭 + strategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 0 # 始终保持至少 2 个 Pod 可用 + maxSurge: 1 + template: + metadata: + labels: + app: hw-ws-service + spec: + # 优雅关闭总时限:90s(服务内部等待 80s,留 10s 缓冲) + terminationGracePeriodSeconds: 90 + + containers: + - name: hw-ws-service + image: ${CI_REGISTRY_IMAGE}/hw-ws-service:latest + imagePullPolicy: Always + ports: + - name: ws + containerPort: 8888 + protocol: TCP + + env: + - name: HW_WS_HOST + value: "0.0.0.0" + - name: HW_WS_PORT + value: "8888" + - name: HW_RTC_BACKEND_URL + # 集群内部直接访问 rtc-backend Service,不走公网 + value: "http://rtc-backend-svc:8000" + + lifecycle: + preStop: + exec: + # 等待 5s 让 LB/Ingress 将流量从本 Pod 摘除,再开始关闭 + command: ["/bin/sh", "-c", "sleep 5"] + + # 就绪探针:TCP 握手成功才接流量 + readinessProbe: + tcpSocket: + port: 8888 + initialDelaySeconds: 3 + periodSeconds: 5 + failureThreshold: 3 + + # 存活探针:连续失败 3 次才重启(避免短暂抖动误杀) + livenessProbe: + tcpSocket: + port: 8888 + initialDelaySeconds: 10 + periodSeconds: 15 + failureThreshold: 3 + + # 资源限制(根据实际负载调整) + resources: + requests: + cpu: "100m" + memory: "128Mi" + limits: + cpu: "500m" + memory: "512Mi" + + # 优先调度到不同节点,避免单点故障 + topologySpreadConstraints: + - maxSkew: 1 + topologyKey: kubernetes.io/hostname + whenUnsatisfiable: DoNotSchedule + labelSelector: + matchLabels: + app: hw-ws-service diff --git a/hw_service_go/k8s/service.yaml b/hw_service_go/k8s/service.yaml new file mode 100644 index 0000000..aa33c90 --- /dev/null +++ b/hw_service_go/k8s/service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: hw-ws-svc + labels: + 
app: hw-ws-service +spec: + type: ClusterIP + selector: + app: hw-ws-service + ports: + - name: websocket + port: 8888 + targetPort: 8888 + protocol: TCP diff --git a/hw_service_go/test/PLAN.md b/hw_service_go/test/PLAN.md new file mode 100644 index 0000000..3db1569 --- /dev/null +++ b/hw_service_go/test/PLAN.md @@ -0,0 +1,251 @@ +# hw_service_go 本地硬件通讯测试计划 + +> 目标:用浏览器模拟 ESP32 硬件,验证 `hw_service_go` WebSocket 服务能否正常接收指令、获取故事、推送 Opus 音频。 + +--- + +## 一、协议对比分析 + +### 1.1 小智(xiaozhi-server)vs 我们的服务 + +| 维度 | xiaozhi-server | hw_service_go(本服务) | +|------|---------------|------------------------| +| **WebSocket URL** | `ws://host:port/xiaozhi/v1/?device-id=&client-id=` | 完全相同 | +| **连接参数** | `device-id`(MAC)、`client-id`(UUID)| 完全相同 | +| **握手消息** | 需要发送 `hello` JSON | **不需要**,连上即用 | +| **触发指令** | `listen`(语音输入) | **只需发 `{"type":"story"}`** | +| **音频方向** | 双向(硬件上传语音 + 服务下发 TTS)| **单向下行**(服务→硬件,推 Opus) | +| **Opus 编解码** | 需要编码(麦克风)+ 解码(播放)| **只需解码**(浏览器只播放) | +| **认证** | token 参数 | **无需认证**(仅 device-id 校验) | +| **消息复杂度** | hello/listen/stt/llm/tts/mcp | **只有 tts 系列** | + +### 1.2 我们服务的完整消息流 + +``` +浏览器(模拟硬件) hw_service_go Django + │ │ │ + │── WS 连接 ──────────────────────────→│ │ + │ ?device-id=AA:BB:CC:DD:EE:FF │ │ + │ &client-id=test-001 │ │ + │ │ │ + │── {"type":"story"} ────────────────→ │ │ + │ │── GET /api/v1/devices/ │ + │ │ stories/?mac_address │ + │ │ =AA:BB:CC:DD:EE:FF → │ + │ │ │ + │← {"type":"tts","state":"start"} ───── │ │ + │ │← {title, audio_url} ── │ + │ │ │ + │ │── 下载 MP3 ──────────→ CDN + │ │← MP3 二进制流 ─────── │ + │ │ │ + │ │ ffmpeg 转码 PCM→Opus │ + │ │ │ + │← {"type":"tts","state":"sentence_start","text":"故事标题"} ─── │ + │ │ │ + │← [Opus帧1 二进制] ─────────────────── │ 60ms/帧,前3帧预缓冲 │ + │← [Opus帧2 二进制] ─────────────────── │ │ + │← [Opus帧3 二进制] ─────────────────── │ │ + │← [Opus帧N 二进制] ─────────────────── │ 按时序流控发送 │ + │ │ │ + │← {"type":"tts","state":"stop"} ─────── │ │ + │ │ │ +``` + +### 1.3 Opus 音频参数(与小智完全一致) + +| 参数 | 值 | +|------|----| +| 采样率 | 16000 Hz 
| +| 声道 | 1(单声道)| +| 帧时长 | 60ms | +| 每帧采样数 | 960 | +| 编码器 | libopus(WASM) | + +--- + +## 二、前置条件检查 + +在开始测试之前,需要满足以下条件: + +### 2.1 服务运行状态 +- [ ] Django 后端运行在 `http://localhost:8000` +- [ ] `hw_service_go` 运行在 `ws://localhost:8888` +- [ ] 健康检查通过:`curl http://localhost:8888/healthz` 返回 200 + +### 2.2 Django 数据准备(关键!) + +测试必须使用一个在 Django 数据库中**真实存在**的设备 MAC 地址。 + +Django API 查询逻辑(`GET /api/v1/devices/stories/?mac_address=`): +- 根据 MAC 查找设备 → 找到设备绑定的用户 → 查找该用户的故事 +- 任何一步缺失,服务返回 `null`,硬件不会播放任何内容 + +**需要在 Django Admin 或 API 中准备:** +1. 注册一个设备,记下其 MAC 地址(格式:`AA:BB:CC:DD:EE:FF`) +2. 该设备需已绑定用户(owner) +3. 该用户名下有至少一个故事(有 `audio_url` 字段) + +> **快速验证**:`curl "http://localhost:8000/api/v1/devices/stories/?mac_address=你的MAC"` 应返回 `{"code":0,"data":{"title":"...","audio_url":"..."}}` + +--- + +## 三、测试程序设计 + +### 3.1 技术选型 + +| 方案 | 优点 | 缺点 | +|------|------|------| +| **纯 HTML+JS(推荐)** | 零依赖,直接浏览器打开,与小智方案一致 | - | +| Python 脚本 | 简单但无法播放音频 | 无法验证音频播放端到端 | +| Go 命令行 | 需额外音频库 | 环境搭建复杂 | + +**选择方案:纯 HTML+JS 单文件**,复用小智项目的 `libopus.js`(WASM)做解码。 + +### 3.2 文件结构 + +``` +hw_service_go/test/ +├── PLAN.md ← 本文件 +├── test.html ← 测试主页面(待实现) +└── libopus.js ← 复制自小智项目(Opus WASM 解码库) +``` + +`libopus.js` 来源: +``` +/Users/maidong/Desktop/zyc/jikashe/xiaozhi-server/main/xiaozhi-server/test/libopus.js +``` + +### 3.3 测试页面功能 + +``` +┌─────────────────────────────────────────────────────┐ +│ hw_service_go 硬件通讯测试 │ +├─────────────────────────────────────────────────────┤ +│ 服务地址: [ws://localhost:8888/xiaozhi/v1/ ] │ +│ device-id: [AA:BB:CC:DD:EE:FF ] │ +│ client-id: [test-browser-001 ] [随机生成] │ +├─────────────────────────────────────────────────────┤ +│ [连接] [断开] 状态: ● 未连接 │ +├─────────────────────────────────────────────────────┤ +│ [▶ 触发故事播放] [⏹ 停止] │ +├─────────────────────────────────────────────────────┤ +│ 消息日志 [清空] │ +│ ┌───────────────────────────────────────────────┐ │ +│ │ [10:23:01] → 已连接 │ │ +│ │ [10:23:02] → 发送: {"type":"story"} │ │ +│ │ [10:23:02] ← 收到: {"type":"tts","state":"start"} │ +│ │ 
[10:23:03] ← 收到: {"type":"tts","state":"sentence_start","text":"..."} │
+│ │ [10:23:03] ← 收到: [Binary] Opus帧 #1 (38 bytes) │
+│ │ [10:23:03] 🔊 开始播放... │ │
+│ │ [10:23:15] ← 收到: {"type":"tts","state":"stop"} │
+│ │ [10:23:15] 🔊 播放完毕 │ │
+│ └───────────────────────────────────────────────┘ │
+│ │
+│ 统计: 已收到 85 个Opus帧 | 约 5.1s 音频 │
+└─────────────────────────────────────────────────────┘
+```
+
+### 3.4 核心实现逻辑
+
+#### 连接流程
+```javascript
+const ws = new WebSocket(
+  `ws://localhost:8888/xiaozhi/v1/?device-id=${deviceId}&client-id=${clientId}`
+);
+ws.binaryType = 'arraybuffer';
+```
+
+#### 触发故事
+```javascript
+ws.send(JSON.stringify({ type: 'story' }));
+```
+
+#### 接收消息处理
+```javascript
+ws.onmessage = (event) => {
+  if (event.data instanceof ArrayBuffer) {
+    // 二进制:Opus 音频帧
+    const opusFrame = new Uint8Array(event.data);
+    const pcm = opusDecoder.decode(opusFrame); // Int16Array
+    schedulePlay(pcm); // 排队播放
+  } else {
+    // 文本:控制消息
+    const msg = JSON.parse(event.data);
+    handleTtsControl(msg); // 处理 start/sentence_start/stop
+  }
+};
+```
+
+#### Opus 解码 + 播放(与小智方案完全一致)
+- 使用 `libopus.js`(WASM)初始化解码器:16000Hz,单声道
+- 解码:`Int16Array` → `Float32Array`
+- 使用 `AudioContext` + `AudioBufferSourceNode` 按时序排队播放
+- 使用 `BlockingQueue` 缓冲帧,避免播放卡顿
+
+---
+
+## 四、测试用例
+
+### Case 1:基础连接测试
+- 输入正确的 `device-id` 和 `client-id`
+- 期望:WebSocket 连接建立成功,状态变为"已连接"
+
+### Case 2:故事触发测试
+- 发送 `{"type":"story"}`
+- 期望:
+  1. 收到 `{"type":"tts","state":"start"}`
+  2. 收到 `{"type":"tts","state":"sentence_start","text":"<故事标题>"}`
+  3. 陆续收到多个二进制 Opus 帧
+  4. 最终收到 `{"type":"tts","state":"stop"}`
+
+### Case 3:音频播放验证
+- 期望:浏览器实际播放出故事音频,声音正常无杂音、无卡顿
+
+### Case 4:设备不存在测试
+- 使用未注册的 MAC 地址
+- 期望:发送故事指令后先收到 `{"type":"tts","state":"start"}`,随后收到 `{"type":"tts","state":"stop"}`,中间没有 `sentence_start` 和音频帧(服务在查询故事之前就会发送 start;找不到故事时直接结束)
+
+### Case 5:重复触发测试
+- 播放过程中再次点击"触发故事"
+- 期望:旧播放被打断,新故事从头开始(hw_service_go 的 `StartPlayback` 会 close 旧 abortCh)
+
+### Case 6:断线重连测试
+- 连接后断开,再重新连接
+- 期望:可以正常重新发起故事请求
+
+---
+
+## 五、实现步骤
+
+1. 
**复制 libopus.js** + ```bash + cp /Users/maidong/Desktop/zyc/jikashe/xiaozhi-server/main/xiaozhi-server/test/libopus.js \ + /Users/maidong/Desktop/zyc/qy_gitlab/rtc_backend/hw_service_go/test/ + ``` + +2. **编写 test.html**(单文件,嵌入所有 JS) + - 参考小智 `StreamingContext.js` 和 `BlockingQueue.js` 的逻辑 + - 去掉录音/编码部分(我们只需解码) + - 保留 Opus 解码 + AudioContext 播放部分 + - 添加连接配置 UI 和消息日志面板 + +3. **浏览器打开测试** + ``` + 直接用浏览器打开 test.html(file:// 协议即可) + ``` + > 注意:macOS Safari 对 WebSocket + file:// 可能有限制,建议用 Chrome + +4. **按测试用例逐项验证** + +--- + +## 六、已知限制与注意事项 + +| 问题 | 说明 | +|------|------| +| **device-id 必须真实存在** | MAC 地址若未在 Django 数据库注册,服务会静默返回无故事 | +| **ffmpeg 必须安装** | `hw_service_go` 的转码依赖系统 `ffmpeg`,需提前安装 | +| **audio_url 必须可访问** | 故事的 MP3 链接需要能从本机下载(阿里云 OSS 等) | +| **浏览器 AudioContext 限制** | 需要用户交互(点击)后才能创建 AudioContext,不能自动播放 | +| **WASM 加载** | libopus.js 较大(844KB),首次加载需要等待约 1-2 秒 | diff --git a/k8s/ingress.yaml b/k8s/ingress.yaml index a182e2a..1491661 100644 --- a/k8s/ingress.yaml +++ b/k8s/ingress.yaml @@ -14,6 +14,13 @@ spec: - host: qiyuan-rtc-api.airlabs.art http: paths: + - path: /xiaozhi/v1/ + pathType: Prefix + backend: + service: + name: hw-ws-svc + port: + number: 8888 - path: / pathType: Prefix backend: diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..8ceebde --- /dev/null +++ b/run.sh @@ -0,0 +1,103 @@ +#!/bin/bash +# RTC Backend 启动脚本(同时启动 Django + hw_service_go) + +DJANGO_PORT=${1:-8000} +WS_PORT=${2:-8888} +PROJECT_DIR="$(cd "$(dirname "$0")" && pwd)" +VENV_PYTHON="$PROJECT_DIR/venv/bin/python" +GO_SERVICE_DIR="$PROJECT_DIR/hw_service_go" + +echo "=== RTC Backend 启动脚本 ===" +echo "Django 端口: $DJANGO_PORT" +echo "WebSocket 端口: $WS_PORT" +echo "项目: $PROJECT_DIR" +echo "" + +# ---- 检查 Go 环境 ---- +GO_BIN=$(which go 2>/dev/null) +if [ -z "$GO_BIN" ]; then + # Homebrew 安装的 Go 不在默认 PATH,尝试常见位置 + for candidate in \ + /opt/homebrew/bin/go \ + /usr/local/go/bin/go \ + /opt/homebrew/Cellar/go/*/libexec/bin/go; do + if [ -x "$candidate" ]; then + GO_BIN="$candidate" + 
break
+        fi
+    done
+fi
+if [ -z "$GO_BIN" ]; then
+    echo "[x] 未找到 go 命令,请安装 Go 或检查 PATH"
+    exit 1
+fi
+echo "[✓] Go: $GO_BIN"
+
+# ---- check the Python virtualenv ----
+if [ ! -f "$VENV_PYTHON" ]; then
+    echo "[x] 未找到虚拟环境: $VENV_PYTHON"
+    exit 1
+fi
+echo "[✓] 虚拟环境就绪"
+
+# ---- free_port PORT NAME: kill the process(es) listening on PORT; exits 1 if the port stays busy ----
+free_port() {
+    local port=$1 name=$2 pids
+    # -sTCP:LISTEN limits the match to listeners, so clients merely connected to the port are never killed
+    pids=$(lsof -t -iTCP:"$port" -sTCP:LISTEN 2>/dev/null)
+    pids=${pids//$'\n'/ } # lsof prints one PID per line; join them with spaces
+    if [ -n "$pids" ]; then
+        echo "[!] 端口 $port ($name) 被占用,PID: $pids,正在终止..."
+        # shellcheck disable=SC2086
+        kill -9 $pids 2>/dev/null # unquoted on purpose: kill needs one argument per PID
+        sleep 1
+        if [ -n "$(lsof -t -iTCP:"$port" -sTCP:LISTEN 2>/dev/null)" ]; then
+            echo "[x] 端口 $port 释放失败,请手动处理"
+            exit 1
+        fi
+        echo "[✓] 端口 $port 已释放"
+    else
+        echo "[✓] 端口 $port 空闲"
+    fi
+}
+
+free_port "$DJANGO_PORT" "Django"
+free_port "$WS_PORT" "hw_service_go"
+
+# ---- stop both children on exit; cleanup runs exactly once, from the EXIT trap ----
+cleanup() {
+    echo ""
+    echo "=== 正在关闭所有服务... ==="
+    kill "$DJANGO_PID" "$WS_PID" 2>/dev/null
+    wait "$DJANGO_PID" "$WS_PID" 2>/dev/null
+    echo "=== 所有服务已停止 ==="
+}
+trap cleanup EXIT
+trap 'exit 130' SIGINT SIGTERM # turn signals into a normal exit so the EXIT trap fires (once)
+
+# ---- start Django ----
+echo ""
+echo "=== 启动 Django 开发服务器 (0.0.0.0:$DJANGO_PORT) ==="
+cd "$PROJECT_DIR" || exit 1
+"$VENV_PYTHON" manage.py runserver "0.0.0.0:$DJANGO_PORT" &
+DJANGO_PID=$!
+echo "[✓] Django PID: $DJANGO_PID"
+
+# ---- start hw_service_go ----
+echo ""
+echo "=== 启动 hw_service_go WebSocket 服务 (0.0.0.0:$WS_PORT) ==="
+cd "$GO_SERVICE_DIR" || exit 1
+HW_RTC_BACKEND_URL="http://localhost:$DJANGO_PORT" \
+HW_WS_PORT="$WS_PORT" \
+"$GO_BIN" run ./cmd/main.go &
+WS_PID=$!
+echo "[✓] hw_service_go PID: $WS_PID"
+
+echo ""
+echo "=== 所有服务已启动,按 Ctrl+C 停止 ==="
+echo " Django: http://localhost:$DJANGO_PORT"
+echo " WebSocket: ws://localhost:$WS_PORT/xiaozhi/v1/"
+echo " 健康检查: http://localhost:$WS_PORT/healthz"
+echo ""
+# Poll until either child exits (works on bash 3.2, which has no `wait -n`); the EXIT trap then runs cleanup
+while kill -0 "$DJANGO_PID" 2>/dev/null && kill -0 "$WS_PID" 2>/dev/null; do sleep 1; done