Skip to content

Server-Sent Events (SSE):如何在 Hono 中实现实时通知?

Server-Sent Events (SSE) 是一种服务器主动向客户端推送文本数据的轻量级协议。它基于 HTTP,使用 text/event-stream MIME 类型,非常适合实现 实时日志、通知、进度更新 等场景。

Hono 通过 c.stream() 方法原生支持 SSE,让你可以轻松构建高效的实时通信。

一、SSE 核心概念

特性说明
单向通信仅服务器 → 客户端
基于 HTTP兼容现有基础设施(无需 WebSocket)
自动重连客户端断线后会自动尝试重连
文本数据只能传输 UTF-8 文本(JSON 常见)
低延迟连接保持打开,无轮询开销

适用场景:股票行情、聊天室(只读)、系统监控、CI/CD 构建日志。

二、Hono 中的 c.stream() 实现 SSE

Hono 的 c.stream() 允许你以流式方式发送数据,完美适配 SSE。

1. 基础 SSE 路由
ts
// routes/sse.ts
import { Hono } from 'hono'

const app = new Hono()

app.get('/sse', async (c) => {
  // 设置正确的 Content-Type
  const stream = new ReadableStream({
    async start(controller) {
      // 模拟持续发送消息
      const interval = setInterval(() => {
        const data = {
          time: new Date().toISOString(),
          message: 'Hello from Hono SSE!',
          random: Math.random()
        }

        // 发送 SSE 格式数据
        controller.enqueue(`data: ${JSON.stringify(data)}\n\n`)
      }, 2000)

      // 清理资源
      c.req.raw.signal.addEventListener('abort', () => {
        clearInterval(interval)
        controller.close()
      })
    }
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  })
})
关键点:
  • data: ... 后必须有 \n\n 表示一条消息结束;
  • 使用 ReadableStreamcontroller.enqueue() 流式发送;
  • 监听 abort 事件,避免内存泄漏。

三、使用 c.stream() 语法糖(推荐)

Hono 提供了更简洁的 c.stream() 方法:

ts
app.get('/sse', (c) => {
  return c.stream(async (stream) => {
    // 发送初始消息
    await stream.write('data: {"msg": "Connected"}\n\n')

    let counter = 0
    const interval = setInterval(async () => {
      if (counter >= 10) {
        await stream.write('data: {"msg": "Ending"}\n\n')
        await stream.close()
        clearInterval(interval)
        return
      }

      const data = {
        id: counter,
        timestamp: Date.now(),
        value: `Message ${counter}`
      }

      // 写入 SSE 数据块
      await stream.write(`id: ${counter}\n`)
      await stream.write(`event: update\n`) // 自定义事件类型
      await stream.write(`data: ${JSON.stringify(data)}\n\n`)

      counter++
    }, 1500)

    // 清理
    c.req.raw.signal.addEventListener('abort', () => {
      clearInterval(interval)
    })
  })
})
输出示例:
http
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

data: {"msg":"Connected"}

id: 0
event: update
data: {"id":0,"timestamp":1730281234567,"value":"Message 0"}

id: 1
event: update
data: {"id":1,"timestamp":1730281236067,"value":"Message 1"}

四、前端接收 SSE 通知(Vue3 示例)

ts
// composables/useSSE.ts
import { ref, onMounted, onUnmounted } from 'vue'

export function useSSE(url: string) {
  const events = ref<any[]>([])
  const error = ref<string | null>(null)
  const isConnected = ref(false)

  let eventSource: EventSource | null = null

  const connect = () => {
    eventSource = new EventSource(url)

    eventSource.onopen = () => {
      console.log('SSE Connected')
      isConnected.value = true
      error.value = null
    }

    eventSource.onmessage = (e) => {
      console.log('Received:', e.data)
      const data = JSON.parse(e.data)
      events.value.push({ type: 'message', data, time: new Date() })
    }

    // 监听自定义事件
    eventSource.addEventListener('update', (e) => {
      const data = JSON.parse(e.data)
      events.value.push({ type: 'update', data, time: new Date() })
    })

    eventSource.onerror = (err) => {
      console.error('SSE Error:', err)
      error.value = 'Connection failed'
      isConnected.value = false
      // 浏览器会自动重连
    }
  }

  const disconnect = () => {
    if (eventSource) {
      eventSource.close()
      eventSource = null
    }
  }

  onMounted(connect)
  onUnmounted(disconnect)

  return { events, error, isConnected, reconnect: connect, disconnect }
}
在组件中使用:
text
<script setup lang="ts">
import { useSSE } from '@/composables/useSSE'

const { events, error, isConnected } = useSSE('/api/sse')
</script>

<template>
  <div>
    <p>Status: {{ isConnected ? 'Connected' : 'Disconnected' }}</p>
    <p v-if="error" style="color: red">{{ error }}</p>
    
    <div v-for="(event, i) in events" :key="i">
      [{{ event.time.toLocaleTimeString() }}] 
      {{ event.type }}: {{ JSON.stringify(event.data) }}
    </div>
  </div>
</template>

五、高级用法:广播通知(多客户端)

使用 WebSocket 风格的广播机制:

text
// lib/sse-broadcaster.ts
type Client = {
  stream: any
  id: string
}

class SSEBroadcaster {
  private clients: Client[] = []

  addClient(id: string, stream: any) {
    this.clients.push({ id, stream })
    console.log(`Client added: ${id}, total: ${this.clients.length}`)

    // 移除时清理
    const remove = () => {
      this.clients = this.clients.filter(c => c.id !== id)
      console.log(`Client removed: ${id}`)
    }

    stream.controller.signal.addEventListener('abort', remove)
  }

  broadcast(data: any) {
    const message = `data: ${JSON.stringify(data)}\n\n`
    this.clients.forEach(client => {
      client.stream.write(message).catch(remove)
    })
  }

  getClientsCount() {
    return this.clients.length
  }
}

export const broadcaster = new SSEBroadcaster()
在路由中使用:
ts
app.get('/sse/news', (c) => {
  const clientId = `client-${Date.now()}-${Math.random().toString(36)}`
  
  return c.stream(async (stream) => {
    broadcaster.addClient(clientId, stream)
    await stream.write(`data: {"welcome": "Welcome ${clientId}!"}\n\n`)

    // 模拟广播
    const timer = setInterval(() => {
      broadcaster.broadcast({ type: 'news', content: 'Breaking news!', time: new Date().toISOString() })
    }, 5000)

    c.req.raw.signal.addEventListener('abort', () => {
      clearInterval(timer)
    })
  })
})

六、最佳实践与注意事项

实践说明
✅ 设置超时避免连接无限期挂起
✅ 添加心跳定期发送 :\n\n 注释防止代理超时
✅ 使用 id: 字段支持断线重连后的消息恢复
✅ 错误处理捕获 stream.write 异常
✅ 资源清理监听 abort 事件释放定时器
✅ 生产环境启用 Gzip减少带宽(需流式压缩支持)
心跳示例:
ts
// 每 30s 发送一个注释,防止 Nginx 等代理断开
setInterval(() => {
  stream.write(':\n\n') // 空注释,不触发 onmessage
}, 30_000)

七、SSE vs WebSocket 对比

特性SSEWebSocket
协议HTTPTCP
方向服务端 → 客户端双向
复杂度
兼容性更好(HTTP 代理友好)需要 WS 支持
二进制
自动重连❌(需手动实现)

结论:如果只需要服务器推送文本,优先选择 SSE。

八、总结

通过 Hono 的 c.stream(),你可以:

  • 轻松实现实时通知、日志流、状态更新
  • 利用 ReadableStream 进行高效流式传输
  • 支持自动重连、自定义事件、ID 跟踪
  • 结合 BFF 模式,为前端提供统一的实时接口。

SSE 是构建现代实时 Web 应用的简单而强大的工具,Hono 让其实现变得异常简洁。