AI 通过SSE流式渲染到页面
目的
使用SSE+deepseekAPI+react+node.js实现流式输出markdown语法
效果速看
SSE简介
- SSE(Server-Sent Events)译为服务器推送事件,通过
EventSource
接口实现服务器推送通信。 EventSource
实例会对 HTTP 服务器开启一个持久化的连接
,以text/event-stream
格式发送事件
SSE使用
服务端设置
- 请求头设置
Content-Type
为text/event-stream
来开启SSE- 连接状态
Connection
为keep-alive
- 实时推送需要设置
Cache-Control
为no-cahce
停用缓存
const express = require("express");
const cors = require("cors");
const app = express();
// 允许跨域
app.use(cors());
// SSE端点
app.get("/sse", (req, res) => {
// 设置SSE需要的headers
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
// 定时发送消息
let counter = 0;
const intervalId = setInterval(() => {
counter++;
res.write(`event: message\n`);
res.write(`data: 这是第${counter}次推送\n\n`);
if (counter > 10) {
clearInterval(intervalId);
}
}, 1000);
// 客户端断开连接时清理
req.on("close", () => {
clearInterval(intervalId);
console.log("客户端断开连接");
});
});
// 启动服务器
const PORT = 8080;
app.listen(PORT, () => {
console.log(`Server running on http://localhost:${PORT}`);
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
客户端设置
new EventSource
创建实例- 监听
message
事件
<html>
<div id="root"></div>
<script>
const sse = new EventSource("http://localhost:8080/sse");
sse.addEventListener("message", (e) => {
console.log("---数据", e.data);
root.innerHTML += e.data + " ";
//页面渲染
});
</script>
</html>
2
3
4
5
6
7
8
9
10
11
- 缺点:
EventSource
只支持get
请求
可读流实现SSE
ReadableStream
用
fetch
发送post
请求,使用ReadableStream
实现流式传输
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
},
body: JSON.stringify(data),
}).catch((err) => {
console.log("err报错了", err);
});
// 获取 ReadableStream 并创建读取器
const reader = response.body.getReader();
}
2
3
4
5
6
7
8
9
10
11
12
13
reader.read()
得到的value
为字节流
TextDecoder 解码
const decoder = new TextDecoder();
// 持续读取流数据
while (true) {
const { done, value } = await reader.read();
if (done) {
reader.releaseLock();
break;
} // 流结束
console.log("字节流", value);
const chunk = decoder.decode(value);
console.log("解码后数据为", chunk);
}
2
3
4
5
6
7
8
9
10
11
12
markdown解析
引入markdown库
import { marked } from "marked";
marked.parse(value.current)
2
失去重连
- 注意:自己实现SSE的话, SSE将
失去重连
- 解决方法:第三方库
@microsoft/fetch-event-source
import { fetchEventSource } from '@microsoft/fetch-event-source';
fetchEventSource('http://localhost:3000/sse',{
method:'POST',
headers:{
'Content-Type':'application/json'
},
body:JSON.stringify({message:'Hello, SSE!'}),
onmessage:(event)=>{
console.log(JSON.parse(event.data))
},
onerror:(event)=>{
console.log(event)
}
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
具体代码
AI 调用
let OpenAI = require("openai");
const openai = new OpenAI({
baseURL: "https://api.deepseek.com/v1",
apiKey: "XXXXXX",//这里用自己的
});
async function main(content, res) {
try {
const response = await openai.chat.completions.create({
messages: [
{
role: "system",
content:
"You are a translation expert. Please format your responses in Markdown syntax.",
},
{ role: "user", content: content },
],
model: "deepseek-chat",
stream: true,
});
for await (const chunk of response) {
if (chunk.choices && chunk.choices[0] && chunk.choices[0].delta) {
const content = chunk.choices[0].delta.content || "";
res.write(`data: ${JSON.stringify({ text: content })}\n\n`);
}
}
} catch (error) {
console.error("Error in OpenAI request:", error);
res.write(
`event: error\ndata: ${JSON.stringify({ error: error.message })}\n\n`
);
} finally {
res.end();
}
}
module.exports = { main };
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
服务器
const express = require("express");
const app = express();
const PORT = 8080;
const cors = require("cors");
const { main } = require("./ai.js");
// 允许跨域
app.use(cors());
app.use(express.json());
app.post("/sse", (req, res) => {
try {
let body = req.body;
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
main(body.content, res);
} catch (error) {
console.error("Error in handling request:", error);
res.write(
`event: error\ndata: ${JSON.stringify({ error: error.message })}\n\n`
);
res.end();
}
});
app.listen(PORT, () => {
console.log(`Server running on http://localhost:${PORT}`);
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
前端代码
import React, { useState, useRef } from "react";
import { marked } from "marked";
const App = () => {
const value = useRef("");
const [renderedContent, setRenderedContent] = useState("");
const inputRef = useRef(null);
const fetchSSE = async () => {
const url = "http://localhost:8080/sse";
const inputValue = inputRef.current.value;
const data = {
userId: 123,
content: inputValue,
date: new Date().toISOString(),
};
try {
const response =
inputValue &&
(await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
},
body: JSON.stringify(data),
}));
// 获取 ReadableStream 并创建读取器
const reader = response.body.getReader();
const decoder = new TextDecoder();
// 持续读取流数据
while (true) {
const { done, value } = await reader.read();
if (done) {
reader.releaseLock();
break;
} // 流结束
const chunk = decoder.decode(value);
const events = chunk.split("\n\n"); // SSE 事件以双换行分隔
events.forEach((event) => {
if (event.trim() === "") return;
parseSSEEvent(event);
});
}
} catch (err) {
console.log("err报错了", err);
}
};
// 解析单个 SSE 事件
const parseSSEEvent = (event) => {
const lines = event;
let dataObj = JSON.parse(lines.split(": ")[1]);
value.current += dataObj.text;
setRenderedContent(marked.parse(value.current));
};
return (
<div>
<input type="text" ref={inputRef} />
<button onClick={fetchSSE}>发送</button>
<div dangerouslySetInnerHTML={{ __html: renderedContent }} />
</div>
);
};
export default App;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
SSE 简介
Server-Sent Events (SSE) 基于HTTP协议,是一种允许服务器主动向客户端推送数据的技术。
- 与传统的 HTTP 请求-响应模式区别:SSE 允许服务器在建立连接后,可以持久地向客户端发送消息,也就是建立一个长连接。
- 与 Websocket 区别: SSE 并不支持客户端向服务端发送消息,即 SSE 为单工通信。
- 与长轮询区别:不需要客户端反复请求数据
目前大模型问答的数据交互基本都是基于SSE去实现的,采用EventStream的事件流方式,实际效果像打字机一样,一段一段的返回答案。
采用这种方式有两种好处:
- 响应效率:响应效率大大提升,一边响应一边返回结果
- 用户体验:显著提高用户体验,给用户感觉就像是真实的对话一样
传统http:
客户端: POST 完整问题 → 服务器: 处理完成后返回完整答案(延迟高)
-------------------------------------------------------------
SSE:
客户端: POST 问题 → 服务器: 立即返回流式 token(低延迟)
▲ │
└───────────────────────┘ 单工通道
2
3
4
5
6
7
8
9
协议与消息
协议
SSE 协议本质是浏览器发起 http 请求,服务器在接受请求后返回消息,但是需要在响应头需要加下以下几个字段
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
2
3
- Content-Type: text/event-stream:SSE API规定推送事件流的类型为 text/event-stream。
- Cache-Control: no-cache:必须指定浏览器不缓存服务端发送的数据,以确保浏览器可以实时显示服务端发送的数据。
- Connection: keep-alive:SSE 是一个一直保持开启的 TCP 连接
消息格式
每条消息由一行或多行字段组成,每个字段组成形式为:字段名:字段值,同时字段以行为单位,每行一个(即以 \n 结尾)每次推送,可由多个消息组成,每个消息之间以空行分隔(即最后一个字段以\n\n结尾)
// 第一条消息
event:xxx(\n)
data:xxx
id: xx
retry:xx (\n\n)
// 第二条消息
event:xxx
data:xxx
2
3
4
5
6
7
8
9
字段类型
字段类型包含以下四种:
- event:指定事件类型
- 浏览器可以使用 addEventListener 方法在当前 EventSource 对象上进行监听
- 如果消息体没有event字段,则会触发 message 事件上的事件处理函数
- data:消息数据
- 数据内容只能以一个字符串的文本形式进行发送
- 如果需要发送一个对象时,需要将该对象以一个 JSON 格式的字符串的形式进行发送
- id:事件ID,事件的唯一标识符
- 如果发生断连,浏览器可以把收到的最后一个事件ID放到请求头Last-Event-Id 中进行重连
- retry:重连时间
- 由服务器指定重连时间,是一个整数值,单位为ms
- 如果没有指定retry,就由浏览器自行决定每隔多久与服务端建立一次连接
deepseek的具体示例
event: ready(\n)
data: {}(\n\n)
event: update_session(\n)
data: {"updated_at":1745054696.793322}(\n\n)
data: {"v": "好的", "p": "response/thinking_content"}
data: {"v": ",", "o": "APPEND"}
data: {"v": "我现在"}
data: {"v": "需要"}
data: {"v": "详细"}
data: {"v": "理解"}
...
event: close
data: {}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
注意
- 除上述四个字段外,其他所有字段都会被忽略
基于eventSource实现
在浏览器端,可以使用 JavaScript 的 EventSource API 创建 EventSource 对象,监听服务器发送的事件
建立连接
EventSource 接受两个参数:URL 和 options
- URL:请求地址
- options :是一个可选的对象,包含 withCredentials 属性,表示是否发送凭证(cookie、HTTP认证信息等)到服务端,默认为 false
const eventSource = new EventSource('服务器接口地址', { withCredentials: true })
监听事件
EventSource 对象触发的事件主要包括以下四种:
- open 事件:当成功连接到服务端时触发。
- message 事件:当接收到服务器发送的消息时触发。该事件对象的 data 属性包含了服务器发送的消息内容。
- error 事件:当发生错误时触发。该事件对象的 event 属性包含了错误信息。
- 自定义事件:可以监听event参数指定事件
eventSource.addEventListener('open', function(event) {
console.log('Connection opened')
})
eventSource.addEventListener('message', function(event) {
console.log('Received message: ' + event.data);
})
// 监听自定义事件
eventSource.addEventListener('ready', function(event) {
console.log('Received message: ' + event.data);
})
eventSource.addEventListener('error', function(event) {
console.log('Error occurred: ' + event.event);
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
使用限制
- 无法自定义请求头内容
- 只支持get请求,携带参数字段长度有限制
基于fetch-event-source实现
由于EventSource API限制,满足不了实际使用的需求,这边使用库 @microsoft/fetch-event-source
它既支持get及post请求,也支持自定义请求头内容。
具体实现
建立连接后,分别监听四种事件(open,message,close,error)
import { fetchEventSource } from '@microsoft/fetch-event-source';
async getFetchEventSource() {
const ctrl = new AbortController();
await fetchEventSource(`/frmp-rcmp/V1/stream/chat`, {
method: 'get',
headers: {
'Content-Type': 'text/event-stream',
'request-id': this.messageId,
},
openWhenHidden: true, // 页面退至后台保存连接
signal: ctrl.signal, // 用于终止请求
onopen(response) {
// 连接成功时触发
if (response.ok) {
console.log('连接成功');
return;
}
throw new Error('连接失败');
},
onmessage(event) {
// 检查是否是结束标识符
if (event.data == '[DONE]') {
console.log('数据接收完成');
return;
}
// 检查事件类型
if (event.event === 'custom-event') {
console.log('收到自定义事件:', event.data);
const customData = JSON.parse(event.data);
handleCustomEvent(customData); // 处理自定义事件
} else if (event.event === 'message') {
console.log('收到标准消息:', event.data);
const messageData = JSON.parse(event.data);
handleMessageEvent(messageData); // 处理标准消息
}
},
onclose() {
// 连接关闭时触发
console.log('连接终止');
_this.streamEnd = true;
},
onerror(err) {
// 错误处理(默认会抛出异常并自动重试)
console.error('错误:', err);
throw err; // 抛出错误会触发重试机制
},
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
底层实现
这个库的底层是基于fetch进行封装的,核心数据流程图如下:
初始化阶段
- 合并headers,设置默认值
- 绑定visibility事件监听,用于控制页面退至后台是否保持连接
- 初始化AbortController
- 绑定abort事件,用于终止请求
fetch 并未原生提供终止操作方法,可以通过 DOM API [AbortController]实现 fetch 请求终止操作
function fetchEventSource(input, {
headers,
onopen,
onmessage,
onerror,
fetch: inputFetch
}) {
// 1. 合并headers,设置默认accept
const headers = { accept: 'text/event-stream', ...inputHeaders };
// 2. 绑定页面可见性监听(自动重连)
if (!openWhenHidden) {
document.addEventListener('visibilitychange', onVisibilityChange);
}
// 3. 初始化AbortController
let curRequestController = new AbortController();
// 4. 绑定abort事件,用于终止请求
inputSignal?.addEventListener('abort', () => {
dispose();
resolve(); // don't waste time constructing/logging errors
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 页面可见性变化时自动重连
function onVisibilityChange() {
curRequestController.abort();
if (!document.hidden) create();
}
// 用户不可见时暂停连接
document.addEventListener('visibilitychange', onVisibilityChange);
2
3
4
5
6
7
8
请求处理流程
- 调用 fetch 发起 SSE 请求
- 验证是否连接成功,调用 onopen 回调
- 使用 getBytes、getLines 和 getMessages 逐层解析数据流
- 正常关闭,调用onclose回调
async function create() {
// 1. 发起Fetch请求(支持自定义fetch函数)
const response = await fetch(input, {
signal: curRequestController.signal
});
// 2. 验证Content-Type(默认逻辑可覆盖)
await onopen(response);
// 3. 流式处理管道(核心解析逻辑)
await getBytes(
response.body!,
getLines(
getMessages(
id => headers['last-event-id'] = id, // 更新last-event-id
retry => retryInterval = retry, // 动态调整重试间隔
onmessage // 触发用户消息回调
)
)
);
// 4. 正常关闭连接
onclose?.();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
流式数据处理
代码浏览器响应流 (ReadableStream)
→ getBytes() // 按字节读取
→ getLines() // 按行分割(\n\n)
→ getMessages()// 解析SSE事件字段
→ onmessage() // 用户回调
2
3
4
5
getBytes
- 通过流式读取器循环读取流中数据
- 每次读取到一块数据后,调用 getLines 的回调函数 onChunk。
export async function getBytes(stream, onChunk) {
// 创建一个流的读取器
const reader = stream.getReader();
let result;
// 循环读取流中的数据,直到流结束
while (!(result = await reader.read()).done) {
// 将读取到的字节块传递给回调函数 onChunk
onChunk(result.value);
}
}
2
3
4
5
6
7
8
9
10
11
getLines
根据特殊符号(:,\n,\r)将字节块按行分割,得到以下行:
event: custom-event
data: {"key": "value"}
<空行>
data: Hello, world!
2
3
4
5
6
7
核心代码如下
// 遍历缓冲区中的每个字节,寻找换行符以分割行
while (position < bufLength) {
// 如果上一个字符是 \r,则跳过紧随其后的 \n
if (discardTrailingNewline) {
if (buffer[position] === 10) { // 换行符(\n)
lineStart = ++position;
}
discardTrailingNewline = false;
}
let lineEnd = -1; // 当前行的结束位置
// 查找行结束位置(遇到冒号、回车符或换行符时停止)
for (; position < bufLength && lineEnd === -1; ++position) {
switch (buffer[position]) {
case 58: // 冒号(:)
// 如果尚未记录字段名长度,则计算字段名长度
if (fieldLength === -1) {
fieldLength = position - lineStart;
}
break;
case 13: // 回车符(\r)
discardTrailingNewline = true;
case 10: // 换行符(\n)
lineEnd = position; // 标记行结束位置
break;
}
}
// 如果未找到行结束位置,则退出循环
if (lineEnd === -1) {
break;
}
// 调用回调函数 onLine 处理当前行的数据
onLine(buffer.subarray(lineStart, lineEnd), fieldLength);
// 更新下一行的起始位置,并重置字段长度
lineStart = position;
fieldLength = -1;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
getMessages
基于TextDecoder解析每行的字节数组,生成以下消息对象
{ event: 'custom-event', data: '{"key": "value"}' }
{ event: 'message', data: 'Hello, world!' }
2
核心代码如下:
let message = newMessage();
const decoder = new TextDecoder(); // 用于解码字节数组为字符串
// 解码字段名(如 event: 或 data:)
const field = decoder.decode(line.subarray(0, fieldLength));
// 计算字段值的起始位置(考虑冒号后是否有空格)
const valueOffset = fieldLength + (line[fieldLength + 1] === 32 ? 2 : 1);
// 解码字段值
const value = decoder.decode(line.subarray(valueOffset));
// 根据字段名处理不同的字段
switch (field) {
case 'data':
// 如果 data 字段有多行,则用换行符拼接
message.data = message.data
? message.data + '\n' + value
: value;
break;
case 'event':
// 设置事件类型
message.event = value;
break;
case 'id':
// 设置消息 ID,并调用 onId 回调函数
onId(message.id = value);
break;
case 'retry':
// 设置重试间隔时间,并调用 onRetry 回调函数
const retry = parseInt(value, 10);
if (!isNaN(retry)) {
onRetry(message.retry = retry);
}
break;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
错误处理和重试
- 触发error事件
- 设置重试定时器,进行重连
try {
// ... 主流程 ...
} catch (err) {
// 1. 非主动中止的错误才会触发重试
if (!curRequestController.signal.aborted) {
// 2. 用户自定义错误处理策略
const interval = onerror?.(err) ?? retryInterval;
// 3. 动态设置重试定时器
retryTimer = setTimeout(create, interval);
}
}
2
3
4
5
6
7
8
9
10
11
遇到的问题
数据格式不规范
响应体的数据格式一定要严格标准格式来,不然会导致流式输出失效
event:xxx(\n)
data:xxx(\n\n)
event:xxx
data:xxx
2
3
4
5
开启压缩会导致流式输出失效
核心原因:
- 当启用压缩(如 Gzip/Brotli)时,数据需要完整压缩后才能输出,这会破坏流式传输的分块特性:
# 流式传输应有的效果(分块输出)
[Chunk1] --> [Chunk2] --> [Chunk3]
# 开启压缩后的效果(等待全部压缩)
[ Entire Compressed Data ] --> 一次性输出
2
3
4
5
在开发环境将webpack的compress设置为false
- compress 主要用于 响应压缩(Gzip/Brotli),目的是减少传输体积,提升加载速度
const isProduction = process.env.NODE_ENV === 'production';
devServer: {
compress: isProduction, // 开发环境关闭压缩
},
2
3
4
修改测试环境和生产环境的Nginx配置,针对流式接口禁用压缩
- 跟后端约定好接口名称(比如:需要带有stream)
- 对流式输出的接口内容(如实时日志、动态数据)禁用压缩。
- gzip off:对此路由禁用压缩。
- proxy_buffering off:禁止 Nginx 缓冲后端响应数据,确保数据直接流式传输到客户端。
# 匹配带有 stream 的接口路径,并关闭 gzip 压缩
location ~* /stream {
gzip off;
proxy_buffering off; # 关闭代理缓冲
}
2
3
4
5