Server-Sent Events (SSE):如何在 Hono 中实现实时通知?
Server-Sent Events (SSE) 是一种服务器主动向客户端推送文本数据的轻量级协议。它基于 HTTP,使用 text/event-stream MIME 类型,非常适合实现 实时日志、通知、进度更新 等场景。
Hono 通过 c.stream() 方法原生支持 SSE,让你可以轻松构建高效的实时通信。
一、SSE 核心概念
| 特性 | 说明 |
|---|---|
| 单向通信 | 仅服务器 → 客户端 |
| 基于 HTTP | 兼容现有基础设施(无需 WebSocket) |
| 自动重连 | 客户端断线后会自动尝试重连 |
| 文本数据 | 只能传输 UTF-8 文本(JSON 常见) |
| 低延迟 | 连接保持打开,无轮询开销 |
适用场景:股票行情、聊天室(只读)、系统监控、CI/CD 构建日志。
二、Hono 中的 c.stream() 实现 SSE
Hono 的 c.stream() 允许你以流式方式发送数据,完美适配 SSE。
1. 基础 SSE 路由
ts
// routes/sse.ts
import { Hono } from 'hono'
const app = new Hono()
app.get('/sse', async (c) => {
// 设置正确的 Content-Type
const stream = new ReadableStream({
async start(controller) {
// 模拟持续发送消息
const interval = setInterval(() => {
const data = {
time: new Date().toISOString(),
message: 'Hello from Hono SSE!',
random: Math.random()
}
// 发送 SSE 格式数据
controller.enqueue(`data: ${JSON.stringify(data)}\n\n`)
}, 2000)
// 清理资源
c.req.raw.signal.addEventListener('abort', () => {
clearInterval(interval)
controller.close()
})
}
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
})
})关键点:
data: ...后必须有\n\n表示一条消息结束;- 使用
ReadableStream和controller.enqueue()流式发送; - 监听
abort事件,避免内存泄漏。
三、使用 c.stream() 语法糖(推荐)
Hono 提供了更简洁的 c.stream() 方法:
ts
app.get('/sse', (c) => {
return c.stream(async (stream) => {
// 发送初始消息
await stream.write('data: {"msg": "Connected"}\n\n')
let counter = 0
const interval = setInterval(async () => {
if (counter >= 10) {
await stream.write('data: {"msg": "Ending"}\n\n')
await stream.close()
clearInterval(interval)
return
}
const data = {
id: counter,
timestamp: Date.now(),
value: `Message ${counter}`
}
// 写入 SSE 数据块
await stream.write(`id: ${counter}\n`)
await stream.write(`event: update\n`) // 自定义事件类型
await stream.write(`data: ${JSON.stringify(data)}\n\n`)
counter++
}, 1500)
// 清理
c.req.raw.signal.addEventListener('abort', () => {
clearInterval(interval)
})
})
})输出示例:
http
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
data: {"msg":"Connected"}
id: 0
event: update
data: {"id":0,"timestamp":1730281234567,"value":"Message 0"}
id: 1
event: update
data: {"id":1,"timestamp":1730281236067,"value":"Message 1"}四、前端接收 SSE 通知(Vue3 示例)
ts
// composables/useSSE.ts
import { ref, onMounted, onUnmounted } from 'vue'
export function useSSE(url: string) {
const events = ref<any[]>([])
const error = ref<string | null>(null)
const isConnected = ref(false)
let eventSource: EventSource | null = null
const connect = () => {
eventSource = new EventSource(url)
eventSource.onopen = () => {
console.log('SSE Connected')
isConnected.value = true
error.value = null
}
eventSource.onmessage = (e) => {
console.log('Received:', e.data)
const data = JSON.parse(e.data)
events.value.push({ type: 'message', data, time: new Date() })
}
// 监听自定义事件
eventSource.addEventListener('update', (e) => {
const data = JSON.parse(e.data)
events.value.push({ type: 'update', data, time: new Date() })
})
eventSource.onerror = (err) => {
console.error('SSE Error:', err)
error.value = 'Connection failed'
isConnected.value = false
// 浏览器会自动重连
}
}
const disconnect = () => {
if (eventSource) {
eventSource.close()
eventSource = null
}
}
onMounted(connect)
onUnmounted(disconnect)
return { events, error, isConnected, reconnect: connect, disconnect }
}在组件中使用:
text
<script setup lang="ts">
import { useSSE } from '@/composables/useSSE'
const { events, error, isConnected } = useSSE('/api/sse')
</script>
<template>
<div>
<p>Status: {{ isConnected ? 'Connected' : 'Disconnected' }}</p>
<p v-if="error" style="color: red">{{ error }}</p>
<div v-for="(event, i) in events" :key="i">
[{{ event.time.toLocaleTimeString() }}]
{{ event.type }}: {{ JSON.stringify(event.data) }}
</div>
</div>
</template>五、高级用法:广播通知(多客户端)
使用 WebSocket 风格的广播机制:
text
// lib/sse-broadcaster.ts
type Client = {
stream: any
id: string
}
class SSEBroadcaster {
private clients: Client[] = []
addClient(id: string, stream: any) {
this.clients.push({ id, stream })
console.log(`Client added: ${id}, total: ${this.clients.length}`)
// 移除时清理
const remove = () => {
this.clients = this.clients.filter(c => c.id !== id)
console.log(`Client removed: ${id}`)
}
stream.controller.signal.addEventListener('abort', remove)
}
broadcast(data: any) {
const message = `data: ${JSON.stringify(data)}\n\n`
this.clients.forEach(client => {
client.stream.write(message).catch(remove)
})
}
getClientsCount() {
return this.clients.length
}
}
export const broadcaster = new SSEBroadcaster()在路由中使用:
ts
app.get('/sse/news', (c) => {
const clientId = `client-${Date.now()}-${Math.random().toString(36)}`
return c.stream(async (stream) => {
broadcaster.addClient(clientId, stream)
await stream.write(`data: {"welcome": "Welcome ${clientId}!"}\n\n`)
// 模拟广播
const timer = setInterval(() => {
broadcaster.broadcast({ type: 'news', content: 'Breaking news!', time: new Date().toISOString() })
}, 5000)
c.req.raw.signal.addEventListener('abort', () => {
clearInterval(timer)
})
})
})六、最佳实践与注意事项
| 实践 | 说明 |
|---|---|
| ✅ 设置超时 | 避免连接无限期挂起 |
| ✅ 添加心跳 | 定期发送 :\n\n 注释防止代理超时 |
✅ 使用 id: 字段 | 支持断线重连后的消息恢复 |
| ✅ 错误处理 | 捕获 stream.write 异常 |
| ✅ 资源清理 | 监听 abort 事件释放定时器 |
| ✅ 生产环境启用 Gzip | 减少带宽(需流式压缩支持) |
心跳示例:
ts
// 每 30s 发送一个注释,防止 Nginx 等代理断开
setInterval(() => {
stream.write(':\n\n') // 空注释,不触发 onmessage
}, 30_000)七、SSE vs WebSocket 对比
| 特性 | SSE | WebSocket |
|---|---|---|
| 协议 | HTTP | TCP |
| 方向 | 服务端 → 客户端 | 双向 |
| 复杂度 | 低 | 高 |
| 兼容性 | 更好(HTTP 代理友好) | 需要 WS 支持 |
| 二进制 | ❌ | ✅ |
| 自动重连 | ✅ | ❌(需手动实现) |
结论:如果只需要服务器推送文本,优先选择 SSE。
八、总结
通过 Hono 的 c.stream(),你可以:
- 轻松实现实时通知、日志流、状态更新;
- 利用
ReadableStream进行高效流式传输; - 支持自动重连、自定义事件、ID 跟踪;
- 结合 BFF 模式,为前端提供统一的实时接口。
SSE 是构建现代实时 Web 应用的简单而强大的工具,Hono 让其实现变得异常简洁。