HTTP 请求流式响应
约 3945 字大约 13 分钟
2025-01-21
HTTP 请求流式响应(Streaming Response)
是一种技术,允许服务器在不等待整个响应准备就绪的情况下,将数据逐步发送到客户端。这种方式减少了延迟,并能在长时间运行的任务或大数据传输场景中显著提高用户体验。
最近两年做 AI Agent
相关项目,最常用的就是流式响应,这样能够提升用户体验,减少等待时间。
特点
流式响应的主要有如下特点
- 分块传输(Chunked Transfer Encoding):
- 服务器将响应分成多个小块发送,而不是一次性返回完整的内容。
- 浏览器或客户端可以逐步解析收到的数据,而无需等待整个响应完成。
- 减少首字节延迟(TTFB):
- 服务器可以在计算或处理过程中逐步输出结果,而不是等待最终计算完成再返回全部数据。
- 持续连接:
- 服务器保持连接,持续向客户端发送更新数据,适用于实时应用。
应用场景
更具流式响应的特点来看,其应用场景主要包括:
- 大数据处理(如日志流、视频流)
- 实时推送(如聊天应用、监控系统)
- 长时间运行的任务(如导出大文件、数据分析)
- Server-Sent Events (SSE)
- AI 任务流式响应(如 ChatGPT、OpenAI API 响应)
提示 这里说一下 LLM
与 流式响应
的结合,LLM
的推理的过程就是,不断地参数下一个字符,这天然就流式的,所以其和流式响应的结合是很完美的。比如 ChatGPT
,其返回的 HTTP response
里 header Content-Type
的值就是 text/event-stream
,这样就可以实现流式响应,提升用户体验。我们
使用
数据格式
首先接口如果需要支持流式返回,需要把 HTTP response
里 header Content-Type
值设置为 text/event-stream
,表面响应是流式的。
流式响应
使用两个换行符来分隔前后的消息,每条消息支持4种属性,属性名和属性值之间用冒号区分,新起一行(即使用一个换行符)创建下一个属性。
id: D420B6E8-2F51-4235-B778-5C1C494681E8
event: city-notification
retry: 3000
data: Plainville
id: 3AC67AA0-2852-4D30-9103-23CEF4B43D6D
event: city-notification
retry: 3000
data: Bellevue
上面有两条消息(两个 server sent event
),两条消息之间有一个空行,即 Plainville
与 下一个 'id'
之间有2个换行符。
每条消息都有4个属性:
id
- 当前消息的id
event
- 当前消息的类型,根据业务需要自行定义。比如例子里是city-notification
,如果是天气推送,这个值可以是weather
;新闻推送,这个值可以是news
data
- 这条消息的内容,只能为文本类型(SSE
使用的是UTF8
编码的文本格式)。可以直接一个字符串,也可以是JSON
字符串或者自定义的字符串格式retry
- 值必须是数字(非数字自动忽略), 不同于其他的属性,这个属性跟当前消息无关,而是跟这次SSE
连接相关 —— 如果连接中断,客户端应该间隔多少毫秒再尝试重新连接。
只有上述四种属性是合法,其他属性都会被忽略。
提示 这只是规范,我们可以不用都遵循,比如 event
属性,我们完全可以不使用。
使用 FastAPI
来实现流式响应
需要接口支持流式返回,需要注意的是 HTTP response
里 header Content-Type
的值需要为 text/event-stream
。
下面是一个精简示例,隐藏了一些具体实现,只展示大概结构。
'''
Author: matiastang
Date: 2024-06-12 18:03:17
LastEditors: matiastang
LastEditTime: 2024-07-23 11:28:41
FilePath: /qk-agents/app/router/openai/chat.py
Description: openai chat
'''
from typing import Generator
from fastapi.responses import StreamingResponse
from fastapi import APIRouter, Request
from pydantic import BaseModel, Field
from agent.types.enum import OpenAIModel
from agent.openai.openai_agent import OpenAIAgent
class ChatBody(BaseModel):
"""
chat body
"""
# 问题
question: str = Field(min_length=1)
router = APIRouter(
prefix="/agent/openai",
tags=["agent.openai"],
)
@router.post('/chat')
def openai_chat(body: ChatBody, request: Request):
request_id: str = request.state.request_id
question = body.question
# 请求
response = OpenAIAgent(agent_name).chat(
question,
)
def stream_generator():
for chunk in response:
yield f"data: {chunk}\n\n"
return StreamingResponse(stream_generator(), media_type='text/event-stream')
可见使用fastapi框架来实现流式响应很简单,只需要在返回值里使用 StreamingResponse
,并传入一个生成器函数即可。而LLM的返回值是一个生成器,所以直接使用即可。
注意 这里只展示了一个大概,不保证能直接运行。具体实现细节可以根据业务需求调整。
使用 fetch
来处理流式响应
fetch
本身不支持直接支持流式输出,但可以使用 ReadableStream
和 TextDecoder
等 Web Streams API
来完成流式响应的处理。
const url = '测试地址'
fetch(url, {
method: 'POST',
body: JSON.stringify({
// ...参数
}),
dataType: 'text/event-stream',
headers:{
"Content-Type": 'application/json',
},
})
.then(res => {
// 检查响应是否成功
if (!res.ok) {
throw new Error('Network response was not ok');
}
return res.body
})
.then(async(body) => {
const reader = body?.getReader()
if (!reader) {
return;
}
const textDecoder = new TextDecoder()
let result = true
while (result) {
// 读取数据流
const { done, value } = await reader.read()
if (done) {
console.log("Stream ended")
result = false
break
}
const chunkText = textDecoder.decode(value)
console.info('chunk:', chunkText)
}
})
.catch(error => {
console.error('Fetch error:', error)
})
上面与我们之前的其他fetch请求有几个不同的地方:
dataType: 'text/event-stream'
,表示请求的数据类型是text/event-stream
,即流式响应。如果不设置,默认是application/json
。res.body
是一个ReadableStream
,需要使用getReader()
方法获取一个ReadableStreamDefaultReader
对象,然后使用read()
方法来读取数据流。TextDecoder
用于将Uint8Array
转换为字符串,因为ReadableStream
的数据是Uint8Array
类型。while (result)
循环用于不断读取数据流,直到数据流结束。
问题
记录一下在实际开发中使用流式响应时,遇到了一些问题。
流式效果不明显
在测试流式响应时,发现效果不明显。前端的表现效果是:结果是分块显示的,但是每次是很多个字符一起显示,输出不是很流畅。记得有一段时间通义千问也是有这种现象,我后面使用通义千问模型的时候,发现是他们模型响应的问题导致的,每包数据量比较大,导致前端效果不明显。和我之前遇到的情况不一样,我在接口打印了模型的输出,发现是一两个字符一包返回的,但在前端接收到的数据来看是很多个字符一起返回的,所以怀疑是Nginx的问题。最好发现是 Nginx
缓冲了响应数据,导致流式效果不明显。解决方法是在 Nginx
配置文件中禁用响应缓冲,具体配置如下:
# llm测试转发
location /api/llmtest/ {
proxy_pass http://127.0.0.1:8028/;
# 禁用Nginx的响应缓冲
proxy_buffering off;
# 设置其他必要的HTTP头部
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 可选:设置超时时间
proxy_read_timeout 300;
proxy_send_timeout 300;
}
大概意思就是,接口虽然是一两个字符一包一包的响应的,但是Nginx在转发的时候,把数据缓存起来,达到一定数量了之后,在一起返回给前端,所以前端看到的效果是很多个字符一起显示的。禁用Nginx的响应缓冲,就可以看到流式效果了。
粘包与截包
虽然我们的服务是一两个字符一包一包的响应的,Nginx
代理也是收到一包就转发一包的,但是在流式传输中,还是可能会出现粘包和截包现象。粘包是指多个数据包被合并成一个数据包发送,而截包是指数据包被截断,导致数据丢失,这是两个不同的问题。
提示 粘包和截包现象在TCP协议中是常见的,因为TCP协议是基于字节流的,没有消息边界。所以处理流式传输时有这些问题。
思考 粘包和截包现象,个人感觉主要还是网络波动导致的 收到数据包的速度 和 处理数据包的数据的速度 不一致导致的。当接收的速度快,处理的速度慢时,当处理完一包的数据,去读取下一包数据时,获取的是几包数据合并的数据,这就是粘包现象。当接收的速度慢,处理的速度快时,当处理完一包的数据,去读取下一包数据时,获取的数据包不完整,这就是截包现象。之前做过实验,在前端处理完一包数据,等待一点儿时间再去读取下一包数据时可以读去到多包数据(明显比不延时多)。后端发送一包数据后,延迟一下,再发送下一包数据,前端发生 粘包 的情况就会明显降低(同样的接口明显比不延时的时候低)。延时只是测试,每包都延时一点儿会导致最后的显示用时明显高于不延时的时候,所以延时只是测试,不能作为实际使用。
粘包
问题分析
前面也说了粘包现象是指前端一次读取到多个数据包合并成的数据。比如后端发送了h
、e
、l
、l
、o
,5包数据,前端一次读取到的是hello
,这就是粘包现象。
这就出现了一个疑问,前端一次接收到了hello
和分多次接收h
、e
、l
、l
、o
,然后再合并为hello
影响也不大,这个想法是正常的,当我们处理好了接口迭代和Nginx
转发,出现粘包就是网络波动引起的,如果每包数据都是字符串的话,那这个合并的问题就不大。但是,有时候我们流式响应的并不是一个简单的字符串,而是一个json
字符串,比如:{v: 'h'}
、{v: 'e'}
、{v: 'l'}
、{v: 'l'}
、{v: 'o'}
,前端一次读取到的是类似{v: 'h'}{v: 'e'}{v: 'l'}{v: 'l'}{v: 'o'}
的字符串,这就影响到我们解析了。
粘包解决方法
前面也说了,对于 粘包 之后不影响处理逻辑和显示的,我们可以不管,仅仅是流式效果不明显。比如每包数据为字符串。
对应每包数据有附加数据,是JSON字符串的,粘包之后会影响到我么解析数据,这种情况下,可以考虑使用一些技术分离出每包数据,对于JSON字符串来说,我们可以通过正则再次把其分成多包数据,再处理。或者为了稳定,我们可以在每包数据中添加一个标识符以此来分离出每包数据。
前面说的方案都是补救措施。我们应该考虑一下,正确的设计我们的分包逻辑,这样可以减少我们处理这种异常的情况,比如:我们把一些附加数据,放到最后一包里面,比如h
、e
、l
、l
、o
、{id: '123'}
,这样的话即使前面的包发送 粘包 也不会影响我们的逻辑和显示,我们只需要处理最后一包可能得 粘包 现象,比如o{id: '123'}
,这也很好处理。
截包
如果接口返回的每包数据过大,则会出现掉数据和合并多包的情况。一般返回分片的字符串没什么问题,如果是包装了一些数据的json
字符串,则需要多调试一下,防止数据量过大导致掉内容或合并的情况。(感觉这些情况请求的缓存有关系)
- 如果是掉数据的情况就没法了,只能想其他办法。比如把每包中相同的数据提取出来,在开始或结尾的时候单独发送,以减少每包的数据量。
- 如果是出现了多包合并的情况,可以在服务端响应的时候延时一点儿缓解,但是不能彻底解决。可以再在解码之后处理,比如手动拆解,然后合并数据。
上面的处理方式,都需要多进行测试验证才行。对于我这边是可以行的解决方案。
流传输中会出现黏包现象
思考
其他因素的影响
在 fetch API
中,reader.read()
读取流式响应数据时,没有固定的最大长度限制,但其行为受以下因素影响:
浏览器实现的限制
不同的浏览器对流处理的实现可能存在差异,通常会受到以下因素的限制:
- 缓冲区大小:
浏览器可能会对接收到的数据进行内部缓冲,例如 Chrome 会将数据存储在一定大小的缓冲区中(如 64KB、256KB 等),然后再交给应用处理。
- 内存管理:
如果长时间不消费流数据,浏览器可能会暂停接收,防止内存占用过高。
- 网络流控制(Flow Control):
浏览器和服务器之间可能会有 TCP 级别的流控,控制数据流的接收速率。
网络和服务器的限制
- 服务器端的
Transfer-Encoding: chunked
- 服务器通常会使用 chunked 编码分块发送数据,每个块大小不固定(可能几 KB,也可能更大)。
- 如果服务器没有限制,浏览器可以无限持续地接收流。
响应时间限制:
某些服务器可能会在长时间不读取的情况下断开连接。
fetch 和 ReadableStream 相关的限制
- 在 fetch 的 ReadableStream API 中,数据会被逐步读取,每次
reader.read()
可能不会获取到完整的块,而是以流式分块方式传递,常见特性: - 每次
read()
读取的大小是不确定的,由底层流控制。 - 在文本数据中,可能会在多次读取中拆分 UTF-8 字符,需注意字符编码处理。
- 可以在
stream.pipeTo()
等管道方法中控制数据的速率。
处理大数据流的建议
如果你的流式响应数据很大,建议采取以下方式来优化处理:
增量处理数据(避免等待整个流完成)
while (result) {
const { done, value } = await reader.read();
if (done) break;
const chunkText = textDecoder.decode(value, { stream: true }); // 使用stream参数增量解码
console.info('Received chunk:', chunkText);
}
分段处理数据,例如检测特定分隔符(如 \n
)再处理:
let buffer = '';
while (result) {
const { done, value } = await reader.read();
if (done) break;
buffer += textDecoder.decode(value, { stream: true });
let parts = buffer.split('\n');
parts.forEach(part => {
if (part.trim()) console.log("Processed:", part);
});
buffer = parts.pop(); // 保留未完整的部分
}
限制读取速率(防止消耗过多内存)
- 可以在读取数据时加入
await new Promise(r => setTimeout(r, 10))
限制读取频率。
浏览器可调参数
部分浏览器提供隐藏参数,可以调整流处理行为,例如 Chrome
可以在 about:config
中调整 net.prefetch
或相关流式配置项(非标准)。
总结如下:
- 在
fetch API
中,reader.read()
读取流式响应数据时,没有固定的最大长度限制,但受限于浏览器的缓冲区、网络流控制和服务器端的策略。 - 浏览器会自动处理数据流,但大数据传输时应注意内存使用。
- 可以使用
TextDecoder({ stream: true })
增量解码数据,避免数据过大导致阻塞。
如果你的应用需要处理超大流数据,建议监控内存消耗,并进行增量处理以提高性能和可靠性。