SSE

基本概念

Server-Sent Events(SSE)是一种允许服务器向客户端实时推送更新的 Web 技术。与传统的请求 - 响应模式不同,SSE 建立了一个单向的通信通道,服务器可以主动向客户端发送数据,而客户端只能被动接收。这种特性使得 SSE 非常适合用于实时数据更新的场景,如实时新闻推送、股票行情更新等。

本质

参考文献:阮一峰

  • 严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。

  • 也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。

  • SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE,其他浏览器都支持。

与WebSocket 区别

特性 SSE (Server-Sent Events) WebSocket
通信方向 单工(服务器到客户端) 全双工
协议 基于HTTP 基于 TCP
自动重连 内置断线重连和消息追踪的功能 不在协议范围内,需手动实现
连接数 HTTP/1.1 限制为每个域名 6 个 ;HTTP/2 可协商(默认 100) 主要取决于服务器配置和资源,而非浏览器固定限制
数据格式 仅文本(二进制数据需要编码后传送) 支持文本和二进制
量级 轻量级,使用简单 相对复杂
状态 无状态( 基于HTTP 协议) 有状态
使用场景 单向数据流、通知、实时更新 需要双向通信的应用、聊天、游戏
自定义事件 支持 自定义事件类型 不支持
浏览器支持 较广泛,IE不支持 全面支持

EventSource 对象

参考文献:mdn

EventSource 接口是 web 内容与服务器发送事件通信的接口。一个 EventSource 实例会对 HTTP 服务器开启一个持久化的连接,以 text/event-stream 格式发送事件,此连接会一直保持开启直到通过调用 EventSource.close() 关闭。

兼容性

SSE 的客户端 API 部署在EventSource对象上。下面的代码可以检测浏览器是否支持 SSE。其中:不兼容IE

1
2
3
if ('EventSource' in window) {
// ...
}

Server-Sent Events (SSE)只能使用 GET 请求进行连接。

构造函数

使用 SSE 时,浏览器首先生成一个EventSource实例,向服务器发起连接。

1
const eventSource = new EventSource(url, options);

上面的url可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开withCredentials属性,表示是否一起发送 Cookie。

1
const eventSource = new EventSource(url, { withCredentials: true });

实例属性

  • EventSource.readyState:表明连接的当前状态。该属性只读,可以取以下值。

    • 0:相当于常量EventSource.CONNECTING,表示连接还未建立,或者断线正在重连。

    • 1:相当于常量EventSource.OPEN,表示连接已经建立,可以接受数据。

    • 2:相当于常量EventSource.CLOSED,表示连接已断,且不会重连。

    1
    2
    3
    4
    5
    6
    7
    if (eventSource.readyState === EventSource.CONNECTING) {
    console.log('正在连接服务器...');
    } else if (eventSource.readyState === EventSource.OPEN) {
    console.log('已经连接上服务器!');
    } else if (eventSource.readyState === EventSource.CLOSED) {
    console.log('连接已经关闭。');
    }
  • EventSource.url:表示事件源的URL字符串,该属性只读。

  • EventSource.withCredentials:一个布尔值,指示EventSource对象是否使用 CORS 凭据设置进行实例化(true),或未使用 CORS 凭据设置进行实例化(false,默认值)。

实例方法

EventSource的方法**close()**用于关闭当前的连接,如果调用了这个方法,底部将EventSource.readyState这个属性值设置为2(关闭)

1
eventSource.close();

事件

事件 事件处理程序 描述
open eventSource.onopen 连接建立时触发
message eventSource.onmessage 客户端接收服务端数据时触发
error eventSource.onerror 通信发生错误时触发(比如连接中断)

数据格式

参考文献:前端也能这么丝滑!Node + Vue3 实现 SSE 流式文本输出

服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本,具有如下的 HTTP 头信息。其中Content-Type必须指定 MIME 类型为text/event-stream

1
2
3
Content-Type: text/event-stream  
Cache-Control: no-cache
Connection: keep-alive

后端推送的数据格式有点讲究,必须长这样:

1
data: 你要发的内容\n\n

每条消息都要以两个换行结尾(\n\n),否则前端收不到。比如express后端这样写:

1
res.write("data:hello world\n\n");  // 这里必须使用res.write() 而不能使用其他;(因为其他方法会自动关闭连接)

前端就能在 event.data 里拿到 hello world

除了 data:,其实还可以有这些字段:

  • id: 消息编号,前端可以拿来做断点续传
  • event: 自定义事件类型,默认是message事件,前端可以用 addEventListener 监听该事件
  • retry:服务器可以用retry字段,指定浏览器重新发起连接的时间间隔(毫秒);两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。

具体代码

完整代码地址:https://github.com/fsllala/study/tree/main/sse_study

后端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
const express = require('express');
const cors = require('cors');
const app = express();
const port = 3000;
app.use(cors()); // 解决跨域问题

app.get('/sse', (req, res) => {
// 设置响应头
res.set({
'Content-Type': 'text/event-stream', // SSE的响应头必须设置为text/event-stream
'Cache-Control': 'no-cache', // 禁止缓存,确保每次都能拿到最新的数据。
Connection: 'keep-alive', // 保持TCP长连接 允许服务端持续推送数据
});

// 快速响应: 立即把响应头发送给客户端,建立 SSE 通道。这样客户端就能马上开始接收数据,而不是等到服务端响应结束。
res.flushHeaders();

setInterval(() => {
// data: 数据\n\n (固定格式)
res.write(`data: ${Math.random()}\n\n`); // 这里必须使用res.write() 而不能使用其他;(因为其他方法会自动关闭连接)
}, 1000);
});

app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});

前端

1
2
3
4
5
6
7
8
9
10
11
12
13
const eventSource = new EventSource('http://localhost:3000/sse');

eventSource.onopen = () => {
console.log('SSE连接成功');
};

eventSource.onmessage = (event) => {
console.log(event.data); // 打印数据
};

eventSource.onerror = (event) => {
console.log(event);
};

测试效果如下:

sse01

自定义事件类型

我们可以通过测试效果看到,ID为空,类型为message,这里我们可以进行修改:

1
2
3
4
5
6
7
setInterval(() => { 
res.write(`id: ${Date.now()}\n`); // 事件ID
res.write(`event: changedMessage\n`); // 自定义事件名
res.write(`retry: 1000\n`); // 重试时间
// data: 数据\n\n (固定格式)
res.write(`data: ${Math.random()}\n\n`); // 这里必须使用res.write() 而不能使用其他;(因为其他方法会自动关闭连接)
}, 1000);
1
2
3
eventSource.addEventListener('changedMessage', (event) => {  // 前端监听的事件要和后端类型保持一致
console.log(event.data); // 打印数据
});

image-20250713161326267

打字机效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const express = require('express');
const cors = require('cors');
const fs = require('node:fs');
const app = express();
const port = 3000;
app.use(cors()); // 解决跨域问题

const text = fs.readFileSync('./春晓.txt', 'utf-8'); // 春眠不觉晓,处处闻啼鸟。夜来风雨声,花落知多少。

app.get('/sse', (req, res) => {
// 设置响应头
res.set({
'Content-Type': 'text/event-stream', // SSE的响应头必须设置为text/event-stream
'Cache-Control': 'no-cache', // 禁止缓存,确保每次都能拿到最新的数据。
Connection: 'keep-alive', // 保持TCP长连接 允许服务端持续推送数据
});

res.flushHeaders(); // 快速响应: 让浏览器尽早识别这是一个 SSE 连接

const textArray = [...text];
let index = 0;
let timer = null;

timer = setInterval(() => {
if (index >= text.length) {
clearInterval(timer);
return;
}
res.write(`data: ${textArray[index]}\n\n`);
index++;
}, 100);

});

app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const eventSource = new EventSource('http://localhost:3000/sse');

eventSource.addEventListener('open', () => {
console.log('SSE连接成功');
});


const app = document.getElementById('app') as HTMLDivElement;
const arr: string[] = [];
eventSource.addEventListener('message', (event) => {
arr.push(...event.data.split(''));
app.innerHTML += arr.shift();
});

eventSource.addEventListener('error', (event) => {
console.log(event);
});

daziji

fetch

参考文献:你知道AI如何通过SSE流式渲染到页面的吗(附带完整案例)

Server-Sent Events (SSE)只能使用 GET 请求进行连接。如果接口是POST请求,则不能通过SSE实现,需要借助fetch。(ajax不能读取流数据)

使用场景:

  • 携带请求头信息:原生 EventSource 不支持在构造函数中添加自定义请求头

  • 参数传递有限:只能通过 URL 查询参数传递数据到后端。例如:

    1
    const eventSource = new EventSource('http://localhost:3000/sse?token=abc123&userId=123');

后端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const express = require('express');
const cors = require('cors');

const app = express();
const port = 3000;
app.use(cors()); // 解决跨域问题

// 这里改成post请求
app.post('/sse', (req, res) => {
// 设置响应头
res.set({
'Content-Type': 'text/event-stream', // SSE的响应头必须设置为text/event-stream
'Cache-Control': 'no-cache', // 禁止缓存,确保每次都能拿到最新的数据。
Connection: 'keep-alive', // 保持TCP长连接 允许服务端持续推送数据
});

res.flushHeaders(); // 快速响应: 让浏览器尽早识别这是一个 SSE 连接

timer = setInterval(() => {
res.write(`id: ${Date.now()}\n`);
res.write(`data: ${Math.random()}\n\n`);
}, 1000);
});

app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});

前端

前端需要借助fetch请求POST的SSE。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async function startSSE() {
const response = await fetch('http://localhost:3000/sse', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: 'Hello, world!',
}),
});
const reader = response.body?.getReader(); // 可读流 读取流式数据
const read = await reader?.read(); // reader?.read()是promise
console.log("read", read);
/**
* {done: false, value: Uint8Array(44)}
* done: false 表示数据未结束; true 表示数据结束;
* value: Uint8Array(44) 表示二进制数据内容; (字节数组)
*/
const decoder = new TextDecoder(); // 解码器 读取字节数组
const text = decoder.decode(read?.value);
console.log("text", text);
}
startSSE();

image-20250714220828166

这里虽然SSE一直在推送数据,但是前端仅仅收到一次;因为仅仅await reader?.read()读取了一次,并且通过 decoder.decode(read?.value)解析了一次;如果一直读取解析需要写一个while循环;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async function startSSE() {
const response = await fetch('http://localhost:3000/sse', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: 'Hello, world!',
}),
});
const reader = response.body?.getReader(); // 可读流 读取流式数据
// const read = await reader?.read(); // reader?.read()是promise
// console.log('read', read);
/**
* {done: false, value: Uint8Array(44)}
* done: false 表示数据未结束; true 表示数据结束;
* value: Uint8Array(44) 表示二进制数据内容; (字节数组)
*/
const decoder = new TextDecoder(); // 解码器 读取字节数组
// const text = decoder.decode(read?.value);
// console.log("text", text);

while (true) {
const read = await reader?.read();
if (read?.done) break;
const text = decoder.decode(read?.value);
console.log('text', text);
}
}
startSSE();

获取具体的data数据

1
2
3
4
5
6
7
8
9
10
11
12
while (true) {
const read = await reader?.read();
if (read?.done) break;
const text = decoder.decode(read?.value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data:')) {
const data = line.split('data:')[1];
console.log(data);
}
}
}

但是使用fetch是没有自动重连的;虽然可以自己实现,但是还是太繁琐了,这里可以借助第三方库。

fetch-event-source

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 前端代码  (后端不做修改)
import { fetchEventSource } from '@microsoft/fetch-event-source';

fetchEventSource('http://localhost:3000/sse', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: 'Hello, world!',
}),
onmessage(event) {
console.log('event', event);
},
onerror(event) {
console.log('event', event);
},
});

效果如下动图所示:可以看到可以像EventSource 一样接收数据,并可以自动重连。

fetchSSE

微信小程序

微信小程序不支持标准的 EventSource 和 fetch API。对于 SSE (Server-Sent Events),需要通过wx.request() + enableChunked: true + onChunkReceived 回调来模拟。

参考文献:

具体实现

官网wx.request 在2.20.2基础库后加上了enableChunked(启用分块)属性。

  • 属性:enableChunked启用分块。
  • 方法:RequestTask.onChunkReceived(function listener)监听 Transfer-Encoding Chunk Received 事件。当接收到新的chunk时触发。
  • 方法:RequestTask.offChunkReceived(function listener)移除 Transfer-Encoding Chunk Received 事件的监听函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const requestTask = wx.request({
url: 'http://localhost:3000/sse', // 按照你所对接API接口
enableChunked: true,
header: {'Content-Type': 'application/json'},
method: 'post',
data: 'XXX', // 按照你所对接API接口参数
success: () => {
console.info('发送成功')
},
fail() {
wx.showToast({
title: '发送失败,请重试',
icon: 'error'
})
}
})

// 添加数据块接收处理
requestTask.onChunkReceived(function(res) {
console.log('收到数据块:', res.data)
})

现象:微信开发者工具的Network看不到数据的推送,但是onChunkReceived可以监听并打印出数据的推送;

wx_sse01

解析ArrayBuffer

需要下载text-encoding插件。text-encoding 是一个用于处理文本编码和解码的库,特别适合在微信小程序中处理 ArrayBuffer 数据。这里就涉及到了小程序下载npm微信官方文档-npm 支持

  1. 初始化 package.json:在根目录,cmd 之后 npm init -y
  2. 下载依赖:npm install text-encoding --save
  3. 通过微信开发者工具构建 npm:在微信开发者工具中点击”工具” -> “构建 npm”,将 npm 包构建到小程序中。

image-20250715233259388

  1. 引入依赖包,并使用;
1
2
3
4
5
6
7
8
9
10
// xxx.js
const TextDecoder = require('text-encoding').TextDecoder;
// 使用 TextDecoder 解析 ArrayBuffer
requestTask.onChunkReceived(function(res) {
if (res.data instanceof ArrayBuffer) {
const decoder = new TextDecoder('utf-8');
const str = decoder.decode(new Uint8Array(res.data));
console.log('解码后的文本:', str);
}
});

image-20250715233550805

在 SSE 场景中的完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const TextDecoder = require('text-encoding').TextDecoder;
const requestTask = wx.request({
url: 'http://localhost:3000/sse', // 按照你所对接API接口
enableChunked: true,
header: { 'Content-Type': 'application/json' },
method: 'post',
data: 'XXX', // 按照你所对接API接口参数
success: () => {
console.info('发送成功')
},
fail() {
wx.showToast({
title: '发送失败,请重试',
icon: 'error'
})
}
})

const decoder = new TextDecoder('utf-8');

// 处理接收到的数据块
requestTask.onChunkReceived(function (res) {
if (res.data instanceof ArrayBuffer) {
try {
// 解码为字符串
const str = decoder.decode(new Uint8Array(res.data));
console.log('收到SSE数据:', str);

// 如果数据是JSON格式,可以进一步解析
try {
const jsonData = JSON.parse(str);
console.log('解析后的JSON数据:', jsonData);
} catch (e) {
// 数据可能不是完整的JSON或不是JSON格式
console.log('接收到的是普通文本数据');
}
} catch (error) {
console.error('解码数据时出错:', error);
}
}
});

wx_sse02