From 5ce2fb8aed3332a14a163a83f0bc30895203ae7e Mon Sep 17 00:00:00 2001 From: Misaka12843 Date: Fri, 27 Feb 2026 11:48:30 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E4=BC=A0=E8=BE=93=E8=AF=AD=E5=BA=8F=E9=94=99=E8=AF=AF=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../stream_backend.py" | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git "a/\347\233\270\345\205\263\345\220\216\347\253\257\351\241\271\347\233\256/\346\265\201\345\274\217\350\276\223\345\207\272/stream_backend.py" "b/\347\233\270\345\205\263\345\220\216\347\253\257\351\241\271\347\233\256/\346\265\201\345\274\217\350\276\223\345\207\272/stream_backend.py" index ec36be8..94b697b 100644 --- "a/\347\233\270\345\205\263\345\220\216\347\253\257\351\241\271\347\233\256/\346\265\201\345\274\217\350\276\223\345\207\272/stream_backend.py" +++ "b/\347\233\270\345\205\263\345\220\216\347\253\257\351\241\271\347\233\256/\346\265\201\345\274\217\350\276\223\345\207\272/stream_backend.py" @@ -1,5 +1,5 @@ # coding: utf-8 -VERSION = "1.0.2" +VERSION = "1.0.3" import asyncio from contextlib import asynccontextmanager @@ -14,6 +14,12 @@ import logging CLEANUP_INTERVAL = 24 * 60 * 60 # 清理过期任务的间隔(秒) +FORCE_THRESHOLD = 150 # 强制分割的阈值 + +# 当 AI 生成并成功分割出一个完整分段后,后端强制等待的时间。 +# 根据你的前端消耗速度,可自行调整 0.2 ~ 1.0 等值 +STREAM_PART_DELAY = 0.5 + SPLIT_STR_TUPLE = (',', ',', '。', '!', '!', '?', '?', ';', ';', ':', ':', '~', '--', '——', '...', '……', '\n', '\t', '\r') SYM_PAIRS = { "(": (")", 11), @@ -51,7 +57,6 @@ SPLIT_TOKENS = set(SPLIT_STR_TUPLE) ALL_TOKENS = sorted(OPEN_TOKENS | CLOSE_TOKENS | SPLIT_TOKENS, key=len, reverse=True) -FORCE_THRESHOLD = 150 # 强制分割的阈值 stream_data: Dict[str, Dict[str, Any]] = {} stream_lock = threading.Lock() # 线程锁 @@ -138,6 +143,8 @@ def process_stream(response, stream_id: str): try: part = "" for chunk in response: + has_new_part = False # !!!新增:用来标记本轮是否切分出了新的片段!!! + with stream_lock: if stream_id not in stream_data: return @@ -154,13 +161,21 @@ def process_stream(response, stream_id: str): if seg: (idx, token) = seg + # 找到符号,截断并存入 parts 数组 data['parts'].append(part[: idx + len(token)]) part = part[idx + len(token):] + has_new_part = True # 标记产生新片段 if cal_len(part) >= FORCE_THRESHOLD: # 如果长度超过阈值,则强制分割 data['parts'].append(part) part = "" + has_new_part = True # 标记产生新片段 + + # !!!新增:如果产生了新的分段,在此等待延迟,注意千万不要放在 with stream_lock 中阻塞! + if has_new_part: + time.sleep(STREAM_PART_DELAY) + # 兜底:处理结束后如果 part 还有剩余的尾巴,加进去 with stream_lock: if stream_id in stream_data: if part: @@ -201,7 +216,7 @@ async def start_completion( with stream_lock: stream_data[stream_id] = { - 'timestamp': time.time(), + 'time': time.time(), # !!!修复:将原代码的 timestamp 改为 time,与清理函数一致,防止崩塌 !!! 'model': body_obj['model'], 'prompt_tokens': prompt_tokens, 'parts': [], @@ -252,7 +267,7 @@ async def poll_completion( return { "status": data['status'], "results": results, - "next_after": len(parts) + "next_after": len(parts) } except HTTPException as he: logger.error(f"HTTP异常:{str(he)}")