From c70bee7295adc0cf37affdeb51f3adbbfa3034fd Mon Sep 17 00:00:00 2001 From: pmc <740076875@qq.com> Date: Mon, 27 Apr 2026 17:41:44 +0800 Subject: [PATCH] feat: update device interaction views Co-Authored-By: Claude Sonnet 4.6 --- qy_lty/device_interaction/views.py | 106 +++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 29 deletions(-) diff --git a/qy_lty/device_interaction/views.py b/qy_lty/device_interaction/views.py index c7d09e5..3a07f22 100644 --- a/qy_lty/device_interaction/views.py +++ b/qy_lty/device_interaction/views.py @@ -47,6 +47,24 @@ from common.swagger_utils import get_standardized_response_schema import logging logger = logging.getLogger(__name__) + +def _merge_subv_segments(segments): + """合并字幕分片:自动检测累积式(每段以前段为前缀)vs 增量式(直接拼接)。 + segments: 已按 seq 升序排好的 [{seq, text}, ...] + """ + if not segments: + return '' + if len(segments) == 1: + return segments[0]['text'] + is_cumulative = all( + segments[i]['text'].startswith(segments[i - 1]['text']) + for i in range(1, len(segments)) + ) + if is_cumulative: + return segments[-1]['text'] + return ''.join(s['text'] for s in segments) + + # Create your views here. # 设备相关视图集 @@ -1303,7 +1321,7 @@ class VolcEngineTokenViewSet(viewsets.ViewSet): subtitle_items.append(subtitle) logger.info('Subtitle: %s', subtitle) - # === 字幕落库(策略 A:仅在 definite && paragraph 时写入)=== + # === 字幕落库(策略 B:按 paragraph 累积拼接 + AI 归属锁)=== try: # 主路径:尝试从 webhook 上下文提取 task_id(火山如带) webhook_task_id = ( @@ -1323,58 +1341,88 @@ class VolcEngineTokenViewSet(viewsets.ViewSet): if not rtc_bot_id: logger.error('RTC_Voice_Agent Bot 未配置,跳过字幕落库') else: + MAX_BUFFER_SEGMENTS = 50 # paragraph 始终不来时的兜底强制 flush for item in subtitle_items: - text = (item.get('text') or '').strip() - if not text: + text = item.get('text') or '' + # 中间识别结果不入库 + if not item.get('definite'): continue - # 策略 A:只在一句话最终段落时落库 - if not (item.get('definite') and item.get('paragraph')): + if not text.strip(): continue user_id_in_subtitle = item.get('userId') or '' sequence = item.get('sequence', 0) + is_paragraph_end = bool(item.get('paragraph')) # 解析 ParadiseUser 归属 paradise_user_id = None if user_id_in_subtitle == 'bot01': - # AI 字幕:主路径 task_id 索引 -> 兜底 last_active_user - if webhook_task_id: - paradise_user_id = cache.get(f"rtc_task_user:{webhook_task_id}") + # AI 字幕:先看"当前 AI 回合归属锁" + paradise_user_id = cache.get('rtc_current_bot_owner') if not paradise_user_id: - paradise_user_id = cache.get('rtc_last_active_user') + # 没锁就用主路径/兜底解析,并加锁 + if webhook_task_id: + paradise_user_id = cache.get(f"rtc_task_user:{webhook_task_id}") + if not paradise_user_id: + paradise_user_id = cache.get('rtc_last_active_user') + if paradise_user_id: + # 续期归属锁,确保整段 AI 回复内一致(无论中间多长) + cache.set('rtc_current_bot_owner', str(paradise_user_id), 300) sender = ChatMessage.SENDER_BOT elif user_id_in_subtitle.isdigit(): paradise_user_id = user_id_in_subtitle sender = ChatMessage.SENDER_USER - # 用户字幕到达,刷新最近活跃用户(给后续 bot01 字幕兜底) + # 用户字幕到达,刷新最近活跃用户(给后续 bot01 兜底) cache.set('rtc_last_active_user', user_id_in_subtitle, 60) else: logger.warning('字幕 userId %r 无法识别,跳过', user_id_in_subtitle) continue if not paradise_user_id: - logger.warning('字幕无法归属用户: userId=%s task_id=%s', - user_id_in_subtitle, webhook_task_id) + logger.warning('字幕无法归属用户: userId=%s task_id=%s sequence=%s', + user_id_in_subtitle, webhook_task_id, sequence) continue - # 防重:同一 (paradise_user_id, sequence) 只写一次 - dedup_key = f"rtc_subv_seen:{paradise_user_id}:{sequence}" - if cache.get(dedup_key): - continue - cache.set(dedup_key, 1, 300) + # 累积到 buffer + buffer_key = f"rtc_subv_buffer:{sender}:{paradise_user_id}" + buf = cache.get(buffer_key) or [] + buf.append({'seq': sequence, 'text': text}) - # 写入 ChatMessage(截断超长,DB 字段上限 2048) - try: - ChatMessage.objects.create( - user_id=int(paradise_user_id), - bot_id=rtc_bot_id, - message=text[:2048], - sender=sender, - message_type=ChatMessage.MESSAGE_TYPE_TEXT, - ) - except Exception as e: - logger.error('字幕落库失败: %s, sender=%s, text=%r', - e, sender, text[:100]) + force_flush = len(buf) > MAX_BUFFER_SEGMENTS + if force_flush: + logger.warning('字幕 buffer 超过 %d 段仍未见 paragraph 边界,强制落库 user=%s sender=%s', + MAX_BUFFER_SEGMENTS, paradise_user_id, sender) + + if is_paragraph_end or force_flush: + # 防重:同一 (sender, paradise_user_id, last_sequence) 只落一次 + dedup_key = f"rtc_subv_flushed:{sender}:{paradise_user_id}:{sequence}" + if cache.get(dedup_key): + cache.delete(buffer_key) + continue + cache.set(dedup_key, 1, 300) + + # 排序拼接(自适应累积/增量) + buf_sorted = sorted(buf, key=lambda x: x.get('seq', 0)) + full_text = _merge_subv_segments(buf_sorted).strip() + + if full_text: + try: + ChatMessage.objects.create( + user_id=int(paradise_user_id), + bot_id=rtc_bot_id, + message=full_text[:2048], + sender=sender, + message_type=ChatMessage.MESSAGE_TYPE_TEXT, + ) + logger.info('字幕落库成功: sender=%s user=%s len=%d segs=%d', + sender, paradise_user_id, len(full_text), len(buf_sorted)) + except Exception as e: + logger.error('字幕落库失败: %s, sender=%s, text=%r', + e, sender, full_text[:100]) + cache.delete(buffer_key) + else: + # 还在回合中,刷新 buffer TTL + cache.set(buffer_key, buf, 300) except Exception as e: logger.error('字幕落库流程异常: %s', e) # === 字幕落库结束 ===