This release is 10 versions behind 4.7.10 — the latest version of @hono/hono. Jump to latest
Built and signed on GitHub Actions
Built and signed on GitHub Actions
Web framework built on Web Standards
This package works with Cloudflare Workers, Node.js, Deno, Bun, Browsers




JSR Score
76%
Published
4 months ago (4.6.20)
/**
 * @module
 * Stream utility.
 */

/**
 * Thin wrapper around a writable/readable stream pair that offers
 * convenience methods for writing a streamed body and for reacting to
 * the consumer cancelling the stream.
 *
 * Writes go to the `writable` passed to the constructor; consumers read
 * from `responseReadable`, which forwards chunks pulled from the
 * `_readable` passed to the constructor.
 */
export class StreamingApi {
  private writer: WritableStreamDefaultWriter<Uint8Array>
  private encoder: TextEncoder
  private writable: WritableStream
  private abortSubscribers: (() => void | Promise<void>)[] = []
  responseReadable: ReadableStream
  /**
   * Whether the stream has been aborted.
   */
  aborted: boolean = false
  /**
   * Whether the stream has been closed normally.
   */
  closed: boolean = false

  /**
   * @param writable - Stream that `write`, `writeln`, `pipe` and `close` operate on.
   * @param _readable - Stream whose chunks are forwarded through `responseReadable`.
   */
  constructor(writable: WritableStream, _readable: ReadableStream) {
    this.writable = writable
    this.writer = writable.getWriter()
    this.encoder = new TextEncoder()

    const reader = _readable.getReader()

    // in case the user disconnects, let the reader know to cancel
    // this in-turn results in responseReadable being closed
    // and writeSSE method no longer blocks indefinitely
    this.abortSubscribers.push(async () => {
      await reader.cancel()
    })

    this.responseReadable = new ReadableStream({
      async pull(controller) {
        const { done, value } = await reader.read()
        if (done) {
          controller.close()
        } else {
          controller.enqueue(value)
        }
      },
      cancel: () => {
        this.abort()
      },
    })
  }

  /**
   * Write a chunk to the stream. Strings are encoded with `TextEncoder`.
   * Write failures are swallowed on purpose.
   *
   * @param input - Bytes or text to write.
   * @returns This instance, for chaining.
   */
  async write(input: Uint8Array | string): Promise<StreamingApi> {
    try {
      if (typeof input === 'string') {
        input = this.encoder.encode(input)
      }
      await this.writer.write(input)
    } catch {
      // Do nothing. If you want to handle errors, create a stream by yourself.
    }
    return this
  }

  /**
   * Write a string followed by a newline (`\n`).
   *
   * @param input - Text to write.
   * @returns This instance, for chaining.
   */
  async writeln(input: string): Promise<StreamingApi> {
    await this.write(input + '\n')
    return this
  }

  /**
   * Resolve after `ms` milliseconds (backed by `setTimeout`).
   *
   * @param ms - Delay in milliseconds.
   */
  sleep(ms: number): Promise<unknown> {
    return new Promise((res) => setTimeout(res, ms))
  }

  /**
   * Close the writer. Failures are swallowed on purpose; `closed` is set
   * to `true` either way.
   */
  async close() {
    try {
      await this.writer.close()
    } catch {
      // Do nothing. If you want to handle errors, create a stream by yourself.
    }
    this.closed = true
  }

  /**
   * Pipe `body` into the underlying writable without closing it.
   *
   * The writer lock is released for the duration of the pipe because
   * `pipeTo` needs to lock the destination itself. The lock is taken
   * again afterwards even when piping fails, so `this.writer` is never
   * left pointing at a released writer. A piping failure still rejects
   * the returned promise.
   *
   * @param body - Source stream to forward.
   */
  async pipe(body: ReadableStream) {
    this.writer.releaseLock()
    try {
      await body.pipeTo(this.writable, { preventClose: true })
    } finally {
      this.writer = this.writable.getWriter()
    }
  }

  /**
   * Register a listener that is invoked when the stream is aborted.
   *
   * @param listener - Callback to run on abort; may be async.
   */
  onAbort(listener: () => void | Promise<void>) {
    this.abortSubscribers.push(listener)
  }

  /**
   * Abort the stream.
   * You can call this method when stream is aborted by external event.
   *
   * Only the first call has an effect. Every registered listener is
   * invoked even if an earlier one fails: synchronous throws and
   * rejected promises from listeners are swallowed, in line with the
   * best-effort error policy of `write` and `close`.
   */
  abort() {
    if (this.aborted) {
      return
    }
    this.aborted = true
    // Iterate over a snapshot so listeners registered while aborting are
    // not invoked as part of this pass.
    for (const subscriber of [...this.abortSubscribers]) {
      try {
        // Promise.resolve also covers synchronous (void) listeners.
        void Promise.resolve(subscriber()).catch(() => {
          // Do nothing. If you want to handle errors, handle them in the listener.
        })
      } catch {
        // Do nothing. If you want to handle errors, handle them in the listener.
      }
    }
  }
}