@aklinker1/job-queue@0.6.9Built and signed on GitHub ActionsBuilt and signed on GitHub Actions
Lightweight single process job queue
@aklinker1/job-queue
Docs • Changelog • API Reference
Lightweight single process job queue.
Features
- 💾 Persistence
- 🎛️ Multiple queues with adjustable weights
- 🔄 Error handling and retries
- 🦕 Deno, Bun, and Node support
- 🖥️ Web UI
Usage
Basic Usage
import { createJobQueue } from "@aklinker1/job-queue"; import { createSqlitePersister } from "@aklinker1/job-queue/persisters/sqlite"; import { Database } from "@db/sqlite"; // 1. Open database const db = new Database("queue.db", { int64: true }); // 2. Create a queue const queue = createJobQueue({ persister: createSqlitePersister(db), }); // 3. Define a job const processDocumentJob = queue.defineJob({ name: "processDocument", perform: (file: string) => { // ... }, }); // 4. Run the job processDocumentJob.performAsync("/path/to/file.pdf");
Jobs
Use queue.defineJob to create as many jobs as you need. Jobs can be chained together:
import { walk, type WalkEntry } from "@std/fs"; const processDirectory = queue.defineJob({ name: "processDirectory", perform: async (dir: string) => { const entries = await walk(dir, { exts: [".pdf"], }); entries.forEach((entry) => { processPdf.performAsync(entry); }); }, }); const processPdf = queue.defineJob({ name: "processPdf", perform: async (entry: WalkEntry) => { // Do something with the PDF }, });
Job arguments must be serializable via JSON.stringify. So you can't pass class instances, circular objects, or functions.
Jobs must be idempotent (they must be safe to re-run). If the application is stopped (from power loss, the app restarts, or from failure), jobs will be interrupted half-way through and they must be designed to be re-ran safely.
Basically, each job is guaranteed to ran at least once, but not only once, and you need to design your jobs around this behavior.
Scheduling Jobs
There are three ways to schedule a job to run:
performAsync(...args)run a job ASAPperformIn(msec, ...args)run a job after a durationperformOn(date, ...args)run a job at a specific date
Cron
This library does not provide any APIs for scheduling or running tasks on an interval. Instead, use an existing library like croner (available on both NPM and JSR) to schedule tasks:
import { Cron } from "croner"; const exampleJob = queue.defineJob({ // ... }); // Schedule to run every day at midnight new Cron("@daily", () => exampleJob.performAsync());
Error Handling
By default, a job is retried 25 times over 21 days with an exponential backoff, same as Sidekiq. Once it has failed 25 times, it will be marked as "dead" and can be re-ran via the JS API or the Web UI.
You can't customize the backoff behavior, but you can customize the max retry count globally or per job.
const queue = createJobQueue({ // ... retry: 5, }); // Will retry 5 times (6 runs in total) const job1 = queue.defineJob({ // ... }); // Will retry 10 times (11 runs in total) const job2 = queue.defineJob({ // ... retry: 10, }); // Never retry (only run once) const job3 = queue.defineJob({ // ... retry: false, });
Add Dashboard to Web App

See createServer docs.
Runtimes
You can use @aklinker1/job-queue in your runtime of choice. Just use any of the compatible database packages:
- Deno:
@db/sqlite - Bun:
bun:sqlite - Node:
better-sqlite3
Deno
import { createJobQueue } from "@aklinker1/job-queue"; import { createSqlitePersister } from "@aklinker1/job-queue/persisters/sqlite"; import { Database } from "@db/sqlite"; const db = new Database("queue.db", { // Required - @aklinker1/job-queue uses integer columns to store timestamps, // which will overflow the int32 data type used by default int64: true, }); const queue = createJobQueue({ persister: createSqlitePersister(db), });
Bun
import { createJobQueue } from "@aklinker1/job-queue"; import { createSqlitePersister } from "@aklinker1/job-queue/persisters/sqlite"; import { Database } from "bun:sqlite"; const db = new Database("queue.db"); const queue = createJobQueue({ persister: createSqlitePersister(db), });
NodeJS
import { createJobQueue } from "@aklinker1/job-queue"; import { createSqlitePersister } from "@aklinker1/job-queue/persisters/sqlite"; import Database from "better-sqlite3"; const db = new Database("queue.db"); const queue = createJobQueue({ persister: createSqlitePersister(db), });
Add Package
deno add jsr:@aklinker1/job-queue
Import symbol
import * as job_queue from "@aklinker1/job-queue";
Import directly with a jsr specifier
import * as job_queue from "jsr:@aklinker1/job-queue";
Add Package
pnpm i jsr:@aklinker1/job-queue
pnpm dlx jsr add @aklinker1/job-queue
Import symbol
import * as job_queue from "@aklinker1/job-queue";
Add Package
yarn add jsr:@aklinker1/job-queue
yarn dlx jsr add @aklinker1/job-queue
Import symbol
import * as job_queue from "@aklinker1/job-queue";
Add Package
vlt install jsr:@aklinker1/job-queue
Import symbol
import * as job_queue from "@aklinker1/job-queue";
Add Package
npx jsr add @aklinker1/job-queue
Import symbol
import * as job_queue from "@aklinker1/job-queue";
Add Package
bunx jsr add @aklinker1/job-queue
Import symbol
import * as job_queue from "@aklinker1/job-queue";