This release is 3 versions behind 1.1.4 — the latest version of @iuioiua/redis. Jump to latest
Built and signed on GitHub Actions
Fast, lightweight Redis client built upon the Web Streams API.
This package works with Cloudflare Workers, Node.js, Deno, Bun, Browsers




JSR Score
100%
Published
2 months ago (1.1.1)
// deno-lint-ignore-file no-explicit-any
import { chunk } from "jsr:/@std/collections@^1.0.10/chunk";
import { concat } from "jsr:/@std/bytes@^1.0.5/concat";

/**
 * A Redis client for interacting with a Redis server.
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { assertEquals } from "@std/assert/equals";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * const reply1 = await redisClient.sendCommand(["SET", "hello", "world"]);
 * assertEquals(reply1, "OK");
 *
 * const reply2 = await redisClient.sendCommand(["GET", "hello"]);
 * assertEquals(reply2, "world");
 * ```
 *
 * @module
 */

/** Command sent to a Redis server. */
export type Command = (string | number | Uint8Array)[];
/** Reply received from a Redis server and triggered by a command. */
export type Reply =
  | string
  | number
  | null
  | boolean
  | bigint
  | Record<string, any>
  | Reply[];

const encoder = new TextEncoder();
const decoder = new TextDecoder();

// First byte of a RESP line, identifying the type of the value that follows.
const ARRAY_PREFIX = "*".charCodeAt(0);
const ATTRIBUTE_PREFIX = "|".charCodeAt(0);
const BIG_NUMBER_PREFIX = "(".charCodeAt(0);
const BLOB_ERROR_PREFIX = "!".charCodeAt(0);
const BOOLEAN_PREFIX = "#".charCodeAt(0);
const BULK_STRING_PREFIX = "$".charCodeAt(0);
const DOUBLE_PREFIX = ",".charCodeAt(0);
const ERROR_PREFIX = "-".charCodeAt(0);
const INTEGER_PREFIX = ":".charCodeAt(0);
const MAP_PREFIX = "%".charCodeAt(0);
const NULL_PREFIX = "_".charCodeAt(0);
const PUSH_PREFIX = ">".charCodeAt(0);
const SET_PREFIX = "~".charCodeAt(0);
const SIMPLE_STRING_PREFIX = "+".charCodeAt(0);
const VERBATIM_STRING_PREFIX = "=".charCodeAt(0);

// Pre-encoded fragments reused when building requests and splitting lines.
const CRLF_BYTES = encoder.encode("\r\n");
const ARRAY_PREFIX_BYTES = encoder.encode("*");
const BULK_STRING_PREFIX_BYTES = encoder.encode("$");

/**
 * Transforms a command, which is an array of arguments, into an RESP request.
 *
 * The request is an array header (`*<count>\r\n`) followed by one bulk string
 * (`$<byteLength>\r\n<bytes>\r\n`) per argument. `Uint8Array` arguments are
 * sent as-is; strings and numbers are stringified and UTF-8 encoded.
 *
 * @see {@link https://redis.io/docs/reference/protocol-spec/#send-commands-to-a-redis-server}
 */
function createRequest(command: Command): Uint8Array {
  const lines = [
    ARRAY_PREFIX_BYTES,
    encoder.encode(command.length.toString()),
    CRLF_BYTES,
  ];
  for (const arg of command) {
    const bytes = arg instanceof Uint8Array
      ? arg
      : encoder.encode(arg.toString());
    lines.push(
      BULK_STRING_PREFIX_BYTES,
      encoder.encode(bytes.byteLength.toString()),
      CRLF_BYTES,
      bytes,
      CRLF_BYTES,
    );
  }
  return concat(lines);
}

/**
 * Splits a byte stream into CRLF-terminated lines, yielding each line without
 * its terminator. Bytes after the last complete CRLF stay buffered until more
 * data arrives.
 */
async function* readLines(
  readable: ReadableStream<Uint8Array<ArrayBufferLike>>,
) {
  let buffer: Uint8Array<ArrayBufferLike> = new Uint8Array();
  for await (const piece of readable) {
    buffer = concat([buffer, piece]) as Uint8Array<ArrayBufferLike>;
    let searchFrom = 0;
    while (true) {
      const index = buffer.indexOf(CRLF_BYTES[0], searchFrom);
      // No CR at all, or the CR is the last buffered byte: its LF (if any)
      // has not arrived yet, so wait for the next piece.
      if (index === -1 || index + 1 >= buffer.length) break;
      if (buffer[index + 1] !== CRLF_BYTES[1]) {
        // A lone CR is payload data, not a terminator. Keep scanning past it
        // instead of stopping, otherwise every later line would be stuck
        // behind it forever.
        searchFrom = index + 1;
        continue;
      }
      yield buffer.subarray(0, index);
      buffer = buffer.subarray(index + 2);
      searchFrom = 0;
    }
  }
}

/**
 * Reads `length` consecutive replies, one after another, into an array.
 * Rejects as soon as any of the replies rejects.
 */
function readNReplies(
  iterator: AsyncIterableIterator<Uint8Array>,
  length: number,
  raw = false,
): Promise<Reply[]> {
  return Array.fromAsync({ length }, () => readReply(iterator, raw));
}

/** Decodes a line as text, dropping its leading type-prefix byte. */
function parseLine(value: Uint8Array): string {
  return decoder.decode(value.slice(1));
}

/**
 * Pulls the next line from the iterator.
 *
 * @throws {Error} If the iterator is exhausted, i.e. the underlying stream
 * ended before another line was available.
 */
async function nextLine(
  iterator: AsyncIterableIterator<Uint8Array>,
): Promise<Uint8Array> {
  const { value, done } = await iterator.next();
  if (done) {
    throw new Error("Connection closed before a complete reply was received");
  }
  return value;
}

/**
 * Reads a length-prefixed payload (bulk string, verbatim string or blob
 * error) of `length` bytes.
 *
 * The line splitter cuts at every CRLF, including ones inside the payload,
 * so lines are re-joined with CRLF until the declared length is reached. If
 * `length` is not a number, the next single line is returned as-is.
 */
async function readBulk(
  iterator: AsyncIterableIterator<Uint8Array>,
  length: number,
): Promise<Uint8Array> {
  let data = await nextLine(iterator);
  while (data.byteLength < length) {
    data = concat([data, CRLF_BYTES, await nextLine(iterator)]);
  }
  return data;
}

/**
 * Reads and decodes one complete reply from the line iterator, recursing for
 * aggregate types.
 *
 * @param iterator Lines produced by {@linkcode readLines}.
 * @param raw When `true`, bulk/verbatim string payloads are returned as
 * `Uint8Array` instead of decoded text. Not forwarded to the elements of
 * arrays, pushes or maps.
 * @returns The decoded reply. Rejects with the error message (a string) for
 * simple and blob errors.
 * @throws {Error} If the stream ends before the reply is complete.
 */
async function readReply(
  iterator: AsyncIterableIterator<Uint8Array>,
  raw = false,
): Promise<Reply> {
  const value = await nextLine(iterator);
  switch (value[0]) {
    case ARRAY_PREFIX:
    case PUSH_PREFIX: {
      const length = Number(parseLine(value));
      // A length of -1 is the RESP2 null array
      return length === -1 ? null : await readNReplies(iterator, length);
    }
    case ATTRIBUTE_PREFIX: {
      // TODO: include attribute data somehow
      // Attributes are key-value pairs, hence twice as many replies
      const length = Number(parseLine(value)) * 2;
      // Read but don't return attribute data
      await readNReplies(iterator, length);
      return readReply(iterator, raw);
    }
    case BIG_NUMBER_PREFIX:
      return BigInt(parseLine(value));
    case BLOB_ERROR_PREFIX: {
      // The payload is read verbatim: it must not be re-interpreted as a
      // prefixed reply just because of its first byte.
      const payload = await readBulk(iterator, Number(parseLine(value)));
      return Promise.reject(decoder.decode(payload));
    }
    case BOOLEAN_PREFIX:
      return parseLine(value) === "t";
    case BULK_STRING_PREFIX:
    case VERBATIM_STRING_PREFIX: {
      const header = parseLine(value);
      // A length of -1 is the RESP2 null bulk string
      if (header === "-1") return null;
      // Read the payload directly rather than through prefix dispatch, so
      // that stored values such as "+OK", ":1" or "*2" come back unchanged.
      const payload = await readBulk(iterator, Number(header));
      return raw ? payload : decoder.decode(payload);
    }
    case DOUBLE_PREFIX:
    case INTEGER_PREFIX: {
      const text = parseLine(value);
      switch (text) {
        case "inf":
          return Infinity;
        case "-inf":
          return -Infinity;
        default:
          return Number(text);
      }
    }
    case ERROR_PREFIX:
      return Promise.reject(parseLine(value));
    case MAP_PREFIX: {
      // Maps arrive as a flat key, value, key, value, ... sequence
      const length = Number(parseLine(value)) * 2;
      const array = await readNReplies(iterator, length);
      return Object.fromEntries(chunk(array, 2));
    }
    case NULL_PREFIX:
      return null;
    case SET_PREFIX:
      return new Set(
        await readNReplies(iterator, Number(parseLine(value)), raw),
      );
    case SIMPLE_STRING_PREFIX:
      return parseLine(value);
    // No prefix
    default:
      return raw ? value : decoder.decode(value);
  }
}

/**
 * A Redis client for interacting with a Redis server.
 *
 * @example Send RESPv2 commands
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { assertEquals } from "@std/assert/equals";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * const reply1 = await redisClient.sendCommand(["SET", "hello", "world"]);
 * assertEquals(reply1, "OK");
 *
 * const reply2 = await redisClient.sendCommand(["GET", "hello"]);
 * assertEquals(reply2, "world");
 * ```
 *
 * @example Send RESP3 commands
 *
 * Switch to
 * {@link https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md | RESP3}
 * by sending a {@link https://redis.io/docs/latest/commands/hello/ | HELLO}
 * command with the version number 3.
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { assertEquals } from "@std/assert/equals";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * // Switch to RESP3
 * await redisClient.sendCommand(["HELLO", 3]);
 *
 * const reply1 = await redisClient.sendCommand(["HSET", "myhash", "foo", 1, "bar", 2]);
 * assertEquals(reply1, 2);
 *
 * const reply2 = await redisClient.sendCommand(["HGETALL", "myhash"]);
 * assertEquals(reply2, { foo: "1", bar: "2" });
 * ```
 *
 * @example Receive raw data
 *
 * Receive raw data by setting the `raw` parameter to `true` for your given
 * method. This functionality is exclusive to bulk string replies.
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { assertEquals } from "@std/assert/equals";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * const data = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
 *
 * const reply1 = await redisClient.sendCommand(["SET", "data", data]);
 * assertEquals(reply1, "OK");
 *
 * const reply2 = await redisClient.sendCommand(["GET", "data"], true);
 * assertEquals(reply2, data);
 * ```
 *
 * @example Execute operations with timeouts
 *
 * See the Deno Standard Library's
 * {@linkcode https://jsr.io/@std/async/doc/~/deadline | deadline()} for more
 * information. This function can be applied to any asynchronous operation.
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { deadline } from "jsr:@std/async/deadline";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * // Rejects with a timeout error if the command takes longer than 100 milliseconds.
 * await deadline(redisClient.sendCommand(["GET", "foo"]), 100);
 * ```
 *
 * @example Retry operations
 *
 * See the Deno Standard Library's
 * {@linkcode https://jsr.io/@std/async/doc/~/retry | retry()} for more
 * information. This function can be applied to any asynchronous operation.
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { retry } from "jsr:@std/async/retry";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * // Retries to connect until successful using the exponential backoff algorithm.
 * await retry(() => redisClient.sendCommand(["GET", "foo"]));
 * ```
 *
 * @example Pipeline commands
 *
 * See
 * {@link https://redis.io/docs/latest/develop/use/pipelining/ | Redis pipelining}
 * for more information.
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { assertEquals } from "@std/assert/equals";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * const replies = await redisClient.pipelineCommands([
 *   ["INCR", "Y"],
 *   ["INCR", "Y"],
 *   ["INCR", "Y"],
 *   ["INCR", "Y"],
 * ]);
 * assertEquals(replies, [1, 2, 3, 4]);
 * ```
 *
 * @example Use pub/sub channels
 *
 * See
 * {@link https://redis.io/docs/latest/develop/interact/pubsub/ | Redis Pub/Sub}
 * for more information.
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { assertEquals } from "@std/assert/equals";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * await redisClient.writeCommand(["SUBSCRIBE", "mychannel"]);
 * for await (const reply of redisClient.readReplies()) {
 *   assertEquals(reply, ["subscribe", "mychannel", 1]);
 *   break;
 * }
 * await redisClient.writeCommand(["UNSUBSCRIBE", "mychannel"]);
 * ```
 *
 * @example Perform transaction
 *
 * See {@link https://redis.io/docs/latest/develop/interact/transactions/ | Transactions}
 * for more information.
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { assertEquals } from "@std/assert/equals";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * assertEquals(await redisClient.sendCommand(["MULTI"]), "OK");
 * assertEquals(await redisClient.sendCommand(["INCR", "QUX"]), "QUEUED");
 * assertEquals(await redisClient.sendCommand(["INCR", "QUX"]), "QUEUED");
 * assertEquals(await redisClient.sendCommand(["EXEC"]), [1, 2]);
 * ```
 *
 * @example Execute Lua scripts
 *
 * See
 * {@link https://redis.io/docs/latest/develop/interact/programmability/eval-intro/ | Scripting with Lua}
 * for more information.
 *
 * ```ts ignore
 * import { RedisClient } from "@iuioiua/redis";
 * import { assertEquals } from "@std/assert/equals";
 *
 * const redisConn = await Deno.connect({ port: 6379 });
 * const redisClient = new RedisClient(redisConn);
 *
 * const reply1 = await redisClient.sendCommand(["EVAL", "return ARGV[1]", 0, "hello"]);
 * assertEquals(reply1, "hello");
 *
 * const reply2 = await redisClient.sendCommand([
 *   "FUNCTION",
 *   "LOAD",
 *   "#!lua name=mylib\nredis.register_function('knockknock', function() return 'Who\\'s there?' end)",
 * ]);
 * assertEquals(reply2, "mylib");
 *
 * const reply3 = await redisClient.sendCommand(["FCALL", "knockknock", 0]);
 * assertEquals(reply3, "Who's there?");
 * ```
 */
export class RedisClient {
  #writer: WritableStreamDefaultWriter<Uint8Array>;
  #lines: AsyncIterableIterator<Uint8Array>;
  // Tail of the task chain; by construction it never rejects (see #enqueue).
  #queue: Promise<any> = Promise.resolve();

  /**
   * Constructs a client on top of an open connection. The writable side is
   * locked by acquiring its writer, and the readable side is consumed as
   * CRLF-delimited lines.
   */
  constructor(
    conn: {
      readable: ReadableStream<Uint8Array>;
      writable: WritableStream<Uint8Array>;
    },
  ) {
    this.#writer = conn.writable.getWriter();
    this.#lines = readLines(conn.readable);
  }

  /**
   * Runs `task` after every previously enqueued task has settled, so that
   * requests and their replies are matched up in order.
   */
  #enqueue<T>(task: () => Promise<T>): Promise<T> {
    const result = this.#queue.then(task);
    // Chain later tasks on a tail that swallows this task's failure. If the
    // rejected promise itself were stored, every subsequent `.then(task)`
    // would skip its task and re-reject with this stale error, leaving the
    // client unusable after a single error reply.
    this.#queue = result.catch(() => {});
    return result;
  }

  /**
   * Sends a command to the Redis server and returns the reply.
   *
   * Rejects with the error message if the server replies with an error, or
   * with the underlying error if writing the request fails.
   *
   * @example Basic usage
   *
   * ```ts ignore
   * import { RedisClient } from "@iuioiua/redis";
   * import { assertEquals } from "@std/assert/equals";
   *
   * const redisConn = await Deno.connect({ port: 6379 });
   * const redisClient = new RedisClient(redisConn);
   *
   * const reply1 = await redisClient.sendCommand(["SET", "hello", "world"]);
   * assertEquals(reply1, "OK");
   *
   * const reply2 = await redisClient.sendCommand(["GET", "hello"]);
   * assertEquals(reply2, "world");
   * ```
   */
  sendCommand(command: Command, raw = false): Promise<Reply> {
    return this.#enqueue(async () => {
      // Start the write and the read together, but also observe the write so
      // that a failed write surfaces to the caller rather than becoming an
      // unhandled rejection while the read waits indefinitely.
      const [, reply] = await Promise.all([
        this.#writer.write(createRequest(command)),
        readReply(this.#lines, raw),
      ]);
      return reply;
    });
  }

  /**
   * Writes a command to the Redis server without listening for a reply.
   *
   * @example Basic usage
   * ```ts ignore
   * import { RedisClient } from "@iuioiua/redis";
   * import { assertEquals } from "@std/assert/equals";
   *
   * const redisConn = await Deno.connect({ port: 6379 });
   * const redisClient = new RedisClient(redisConn);
   *
   * await redisClient.writeCommand(["SUBSCRIBE", "mychannel"]);
   * for await (const reply of redisClient.readReplies()) {
   *   assertEquals(reply, ["subscribe", "mychannel", 1]);
   *   break;
   * }
   * await redisClient.writeCommand(["UNSUBSCRIBE", "mychannel"]);
   * ```
   */
  writeCommand(command: Command): Promise<void> {
    return this.#enqueue(() => this.#writer.write(createRequest(command)));
  }

  /**
   * Used for pub/sub. Listens for replies from the Redis server.
   *
   * Replies are read directly from the connection; this method does not go
   * through the command queue.
   *
   * See
   * {@link https://redis.io/docs/latest/develop/interact/pubsub/ | Redis Pub/Sub}
   * for more information.
   *
   * @example Basic usage
   * ```ts ignore
   * import { RedisClient } from "@iuioiua/redis";
   * import { assertEquals } from "@std/assert/equals";
   *
   * const redisConn = await Deno.connect({ port: 6379 });
   * const redisClient = new RedisClient(redisConn);
   *
   * await redisClient.writeCommand(["SUBSCRIBE", "mychannel"]);
   * for await (const reply of redisClient.readReplies()) {
   *   assertEquals(reply, ["subscribe", "mychannel", 1]);
   *   break;
   * }
   * await redisClient.writeCommand(["UNSUBSCRIBE", "mychannel"]);
   * ```
   */
  async *readReplies(raw = false): AsyncIterableIterator<Reply> {
    while (true) {
      yield readReply(this.#lines, raw);
    }
  }

  /**
   * Pipelines commands to the Redis server and returns the replies.
   *
   * All commands are written in a single request, then one reply is read per
   * command. Rejects if any reply is an error or if writing fails.
   *
   * See
   * {@link https://redis.io/docs/latest/develop/use/pipelining/ | Redis pipelining}
   * for more information.
   *
   * @example Basic usage
   *
   * ```ts ignore
   * import { RedisClient } from "@iuioiua/redis";
   * import { assertEquals } from "@std/assert/equals";
   *
   * const redisConn = await Deno.connect({ port: 6379 });
   * const redisClient = new RedisClient(redisConn);
   *
   * const replies = await redisClient.pipelineCommands([
   *   ["INCR", "Y"],
   *   ["INCR", "Y"],
   *   ["INCR", "Y"],
   *   ["INCR", "Y"],
   * ]);
   * assertEquals(replies, [1, 2, 3, 4]);
   * ```
   */
  pipelineCommands(commands: Command[], raw = false): Promise<Reply[]> {
    return this.#enqueue(async () => {
      const bytes = concat(commands.map(createRequest));
      // Same reasoning as in sendCommand: never leave the write unobserved.
      const [, replies] = await Promise.all([
        this.#writer.write(bytes),
        readNReplies(this.#lines, commands.length, raw),
      ]);
      return replies;
    });
  }
}