This release is 9 versions behind 4.7.9 — the latest version of @hono/hono. Jump to latest
Built and signed on GitHub ActionsBuilt and signed on GitHub Actions
Built and signed on GitHub Actions
Web framework built on Web Standards
This package works with Cloudflare Workers, Node.js, Deno, Bun, Browsers




JSR Score
76%
Published
4 months ago (4.6.20)
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394import type { Context } from '../../context.ts' import { HtmlEscapedCallbackPhase, resolveCallback } from '../../utils/html.ts' import { StreamingApi } from '../../utils/stream.ts' import { isOldBunVersion } from './utils.ts' export interface SSEMessage { data: string | Promise<string> event?: string id?: string retry?: number } export class SSEStreamingApi extends StreamingApi { constructor(writable: WritableStream, readable: ReadableStream) { super(writable, readable) } async writeSSE(message: SSEMessage) { const data = await resolveCallback(message.data, HtmlEscapedCallbackPhase.Stringify, false, {}) const dataLines = (data as string) .split('\n') .map((line) => { return `data: ${line}` }) .join('\n') const sseData = [ message.event && `event: ${message.event}`, dataLines, message.id && `id: ${message.id}`, message.retry && `retry: ${message.retry}`, ] .filter(Boolean) .join('\n') + '\n\n' await this.write(sseData) } } const run = async ( stream: SSEStreamingApi, cb: (stream: SSEStreamingApi) => Promise<void>, onError?: (e: Error, stream: SSEStreamingApi) => Promise<void> ): Promise<void> => { try { await cb(stream) } catch (e) { if (e instanceof Error && onError) { await onError(e, stream) await stream.writeSSE({ event: 'error', data: e.message, }) } else { console.error(e) } } finally { stream.close() } } const contextStash: WeakMap<ReadableStream, Context> = new WeakMap<ReadableStream, Context>() export const streamSSE = ( c: Context, cb: (stream: SSEStreamingApi) => Promise<void>, onError?: (e: Error, stream: SSEStreamingApi) => Promise<void> ): Response => { const { readable, writable } = new TransformStream() const stream = new SSEStreamingApi(writable, readable) // Until Bun v1.1.27, Bun didn't call cancel() on the ReadableStream for Response objects from Bun.serve() if (isOldBunVersion()) { c.req.raw.signal.addEventListener('abort', () => { if (!stream.closed) { stream.abort() } }) } // in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming contextStash.set(stream.responseReadable, c) c.header('Transfer-Encoding', 'chunked') c.header('Content-Type', 'text/event-stream') c.header('Cache-Control', 'no-cache') c.header('Connection', 'keep-alive') run(stream, cb, onError) return c.newResponse(stream.responseReadable) }