Skip to main content
Home
Works with
It is unknown whether this package works with Cloudflare Workers, Node.js, Deno, Bun, Browsers
It is unknown whether this package works with Cloudflare Workers
It is unknown whether this package works with Node.js
It is unknown whether this package works with Deno
It is unknown whether this package works with Bun
It is unknown whether this package works with Browsers
JSR Score: 41%
Published a month ago (0.1.0)
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213
import { ShapeStream, isChangeMessage, isControlMessage, isVisibleInSnapshot, } from "npm:@electric-sql/client@^1.2.0"; import { Store } from "npm:@tanstack/store@^0.8.0"; import DebugModule from "npm:debug@^4.4.3"; import { DeduplicatedLoadSubset } from "npm:@tanstack/db@^0.5.9"; import { ExpectedNumberInAwaitTxIdError, StreamAbortedError, TimeoutWaitingForMatchError, TimeoutWaitingForTxIdError, } from "./errors.ts"; import { compileSQLWithColumnAliases } from "./sql-compiler.ts"; import type { BaseCollectionConfig, CollectionConfig, DeleteMutationFnParams, InsertMutationFnParams, LoadSubsetOptions, SyncConfig, SyncMode, UpdateMutationFnParams, UtilsRecord, } from "npm:@tanstack/db@^0.5.9"; import type { StandardSchemaV1 } from "npm:@standard-schema/spec@^1.0.0"; import type { ControlMessage, GetExtensions, Message, PostgresSnapshot, Row, ShapeStreamOptions, } from "npm:@electric-sql/client@^1.2.0"; // Re-export for user convenience in custom match functions export { isChangeMessage, isControlMessage } from "npm:@electric-sql/client@^1.2.0"; const debug = DebugModule.debug(`ts/db:electric`); /** * Symbol for internal test hooks (hidden from public API) */ export const ELECTRIC_TEST_HOOKS: Symbol = Symbol(`electricTestHooks`); /** * Internal test hooks interface (for testing only) */ export interface ElectricTestHooks { /** * Called before marking collection ready after first up-to-date in progressive mode * Allows tests to pause and validate snapshot phase before atomic swap completes */ beforeMarkingReady?: () => Promise<void>; } type GelSchemaMapping = { [pgTableName: string]: { name: string; properties: Record<string, string>; links: Record<string, string>; }; }; type GelReverseSchemaMapping = { [gelObjectName: string]: { id: string; properties: Record<string, string>; links: Record<string, string>; }; }; /** * Type representing a transaction ID in ElectricSQL */ export type Txid = number; /** * Custom match function type - receives stream messages and returns 
boolean * indicating if the mutation has been synchronized */ export type MatchFunction<T extends Row<unknown>> = ( message: Message<T>, ) => boolean; /** * Matching strategies for Electric synchronization * Handlers can return: * - Txid strategy: { txid: number | number[], timeout?: number } (recommended) * - Void (no return value) - mutation completes without waiting * * The optional timeout property specifies how long to wait for the txid(s) in milliseconds. * If not specified, defaults to 5000ms. */ export type MatchingStrategy = { txid: Txid | Array<Txid>; timeout?: number; } | void; /** * Type representing a snapshot end message */ type SnapshotEndMessage = ControlMessage & { headers: { control: `snapshot-end` }; }; // The `InferSchemaOutput` and `ResolveType` are copied from the `@tanstack/db` package // but we modified `InferSchemaOutput` slightly to restrict the schema output to `Row<unknown>` // This is needed in order for `GetExtensions` to be able to infer the parser extensions type from the schema type InferSchemaOutput<T> = T extends StandardSchemaV1 ? StandardSchemaV1.InferOutput<T> extends Row<unknown> ? StandardSchemaV1.InferOutput<T> : Record<string, unknown> : Record<string, unknown>; /** * The mode of sync to use for the collection. 
* @default `eager` * @description * - `eager`: * - syncs all data immediately on preload * - collection will be marked as ready once the sync is complete * - there is no incremental sync * - `on-demand`: * - syncs data in incremental snapshots when the collection is queried * - collection will be marked as ready immediately after the first snapshot is synced * - `progressive`: * - syncs all data for the collection in the background * - uses incremental snapshots during the initial sync to provide a fast path to the data required for queries * - collection will be marked as ready once the full sync is complete */ export type ElectricSyncMode = SyncMode | `progressive`; /** * Configuration interface for Electric collection options * @template T - The type of items in the collection * @template TSchema - The schema type for validation */ export interface ElectricCollectionConfig< T extends Row<unknown> = Row<unknown>, TSchema extends StandardSchemaV1 = never, > extends Omit< BaseCollectionConfig< T, string | number, TSchema, ElectricCollectionUtils<T>, any >, `onInsert` | `onUpdate` | `onDelete` | `syncMode` > { /** * Configuration options for the ElectricSQL ShapeStream */ shapeOptions: ShapeStreamOptions<GetExtensions<T>>; syncMode?: ElectricSyncMode; /** * Internal test hooks (for testing only) * Hidden via Symbol to prevent accidental usage in production */ [ELECTRIC_TEST_HOOKS]?: ElectricTestHooks; /** * Optional asynchronous handler function called before an insert operation * @param params Object containing transaction and collection information * @returns Promise resolving to { txid, timeout? 
} or void * @example * // Basic Electric insert handler with txid (recommended) * onInsert: async ({ transaction }) => { * const newItem = transaction.mutations[0].modified * const result = await api.todos.create({ * data: newItem * }) * return { txid: result.txid } * } * * @example * // Insert handler with custom timeout * onInsert: async ({ transaction }) => { * const newItem = transaction.mutations[0].modified * const result = await api.todos.create({ * data: newItem * }) * return { txid: result.txid, timeout: 10000 } // Wait up to 10 seconds * } * * @example * // Insert handler with multiple items - return array of txids * onInsert: async ({ transaction }) => { * const items = transaction.mutations.map(m => m.modified) * const results = await Promise.all( * items.map(item => api.todos.create({ data: item })) * ) * return { txid: results.map(r => r.txid) } * } * * @example * // Use awaitMatch utility for custom matching * onInsert: async ({ transaction, collection }) => { * const newItem = transaction.mutations[0].modified * await api.todos.create({ data: newItem }) * await collection.utils.awaitMatch( * (message) => isChangeMessage(message) && * message.headers.operation === 'insert' && * message.value.name === newItem.name * ) * } */ onInsert?: ( params: InsertMutationFnParams< T, string | number, ElectricCollectionUtils<T> >, ) => Promise<MatchingStrategy>; /** * Optional asynchronous handler function called before an update operation * @param params Object containing transaction and collection information * @returns Promise resolving to { txid, timeout? 
} or void * @example * // Basic Electric update handler with txid (recommended) * onUpdate: async ({ transaction }) => { * const { original, changes } = transaction.mutations[0] * const result = await api.todos.update({ * where: { id: original.id }, * data: changes * }) * return { txid: result.txid } * } * * @example * // Use awaitMatch utility for custom matching * onUpdate: async ({ transaction, collection }) => { * const { original, changes } = transaction.mutations[0] * await api.todos.update({ where: { id: original.id }, data: changes }) * await collection.utils.awaitMatch( * (message) => isChangeMessage(message) && * message.headers.operation === 'update' && * message.value.id === original.id * ) * } */ onUpdate?: ( params: UpdateMutationFnParams< T, string | number, ElectricCollectionUtils<T> >, ) => Promise<MatchingStrategy>; /** * Optional asynchronous handler function called before a delete operation * @param params Object containing transaction and collection information * @returns Promise resolving to { txid, timeout? 
} or void * @example * // Basic Electric delete handler with txid (recommended) * onDelete: async ({ transaction }) => { * const mutation = transaction.mutations[0] * const result = await api.todos.delete({ * id: mutation.original.id * }) * return { txid: result.txid } * } * * @example * // Use awaitMatch utility for custom matching * onDelete: async ({ transaction, collection }) => { * const mutation = transaction.mutations[0] * await api.todos.delete({ id: mutation.original.id }) * await collection.utils.awaitMatch( * (message) => isChangeMessage(message) && * message.headers.operation === 'delete' && * message.value.id === mutation.original.id * ) * } */ onDelete?: ( params: DeleteMutationFnParams< T, string | number, ElectricCollectionUtils<T> >, ) => Promise<MatchingStrategy>; } function isUpToDateMessage<T extends Row<unknown>>( message: Message<T>, ): message is ControlMessage & { up_to_date: true } { return isControlMessage(message) && message.headers.control === `up-to-date`; } function isMustRefetchMessage<T extends Row<unknown>>( message: Message<T>, ): message is ControlMessage & { headers: { control: `must-refetch` } } { return ( isControlMessage(message) && message.headers.control === `must-refetch` ); } function isSnapshotEndMessage<T extends Row<unknown>>( message: Message<T>, ): message is SnapshotEndMessage { return ( isControlMessage(message) && message.headers.control === `snapshot-end` ); } function parseSnapshotMessage(message: SnapshotEndMessage): PostgresSnapshot { return { xmin: message.headers.xmin, xmax: message.headers.xmax, xip_list: message.headers.xip_list, }; } // Check if a message contains txids in its headers function hasTxids<T extends Row<unknown>>( message: Message<T>, ): message is Message<T> & { headers: { txids?: Array<Txid> } } { return `txids` in message.headers && Array.isArray(message.headers.txids); } /** * Creates a deduplicated loadSubset handler for progressive/on-demand modes * Returns null for eager mode, or a 
DeduplicatedLoadSubset instance for other modes. * Handles fetching snapshots in progressive mode during buffering phase, * and requesting snapshots in on-demand mode */ function createLoadSubsetDedupe<T extends Row<unknown>>({ stream, syncMode, isBufferingInitialSync, begin, write, commit, collectionId, gelColumnAliases = {}, }: { stream: ShapeStream<T>; syncMode: ElectricSyncMode; isBufferingInitialSync: () => boolean; begin: () => void; write: (mutation: { type: `insert` | `update` | `delete`; value: T; metadata: Record<string, unknown>; }) => void; commit: () => void; collectionId?: string; gelColumnAliases?: Record<string, string>; }): DeduplicatedLoadSubset | null { // Eager mode doesn't need subset loading if (syncMode === `eager`) { return null; } // close over gelColumnAliases so this function still matches protocol const loadSubset = async (opts: LoadSubsetOptions) => { // In progressive mode, use fetchSnapshot during snapshot phase if (isBufferingInitialSync()) { // Progressive mode snapshot phase: fetch and apply immediately const snapshotParams = compileSQLWithColumnAliases<T>( opts, gelColumnAliases, ); try { const { data: rows } = await stream.fetchSnapshot(snapshotParams); // Check again if we're still buffering - we might have received up-to-date // and completed the atomic swap while waiting for the snapshot if (!isBufferingInitialSync()) { debug( `${collectionId ? `[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching`, ); return; } // Apply snapshot data in a sync transaction (only if we have data) if (rows.length > 0) { begin(); for (const row of rows) { write({ type: `insert`, value: row.value, metadata: { ...row.headers, }, }); } commit(); debug( `${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows`, ); } } catch (error) { debug( `${collectionId ? 
`[${collectionId}] ` : ``}Error fetching snapshot: %o`, error, ); throw error; } } else if (syncMode === `progressive`) { // Progressive mode after full sync complete: no need to load more return; } else { // On-demand mode: use requestSnapshot const snapshotParams = compileSQLWithColumnAliases<T>( opts, gelColumnAliases, ); await stream.requestSnapshot(snapshotParams); } }; return new DeduplicatedLoadSubset({ loadSubset }); } /** * Type for the awaitTxId utility function */ export type AwaitTxIdFn = (txId: Txid, timeout?: number) => Promise<boolean>; /** * Type for the awaitMatch utility function */ export type AwaitMatchFn<T extends Row<unknown>> = ( matchFn: MatchFunction<T>, timeout?: number, ) => Promise<boolean>; /** * Electric collection utilities type */ export interface ElectricCollectionUtils< T extends Row<unknown> = Row<unknown>, > extends UtilsRecord { awaitTxId: AwaitTxIdFn; awaitMatch: AwaitMatchFn<T>; } /** * Creates Electric collection options for use with a standard Collection * * @template T - The explicit type of items in the collection (highest priority) * @template TSchema - The schema type for validation and type inference (second priority) * @template TFallback - The fallback type if no explicit or schema type is provided * @param config - Configuration options for the Electric collection * @returns Collection options with utilities */ type GelCollectionConfig = { gelEntityName: string; gelInternalSchema: GelReverseSchemaMapping; }; export function gelElectricCollectionOptions<TSchema extends StandardSchemaV1>( config: ElectricCollectionConfig<InferSchemaOutput<TSchema>, TSchema> & GelCollectionConfig & { schema: TSchema; }, ): Omit< CollectionConfig<InferSchemaOutput<TSchema>, string | number, TSchema>, "utils" > & { id?: string; utils: ElectricCollectionUtils<InferSchemaOutput<TSchema>>; schema: TSchema; }; // Overload for when no schema is provided export function gelElectricCollectionOptions<TRow extends Row<unknown>>( config: 
ElectricCollectionConfig<TRow> & GelCollectionConfig & { schema?: never; // prohibit schema }, ): Omit<CollectionConfig<TRow, string | number>, "utils"> & { id?: string; utils: ElectricCollectionUtils<TRow>; schema?: never; // no schema in the result }; export function gelElectricCollectionOptions<T extends Row<unknown>>( config: ElectricCollectionConfig<T, any> & GelCollectionConfig, ): Omit< CollectionConfig<T, string | number, any, ElectricCollectionUtils<T>>, `utils` > & { id?: string; utils: ElectricCollectionUtils<T>; schema?: any; } { const seenTxids = new Store<Set<Txid>>(new Set([])); const seenSnapshots = new Store<Array<PostgresSnapshot>>([]); const internalSyncMode = config.syncMode ?? `eager`; const finalSyncMode = internalSyncMode === `progressive` ? `on-demand` : internalSyncMode; const pendingMatches = new Store< Map< string, { matchFn: (message: Message<any>) => boolean; resolve: (value: boolean) => void; reject: (error: Error) => void; timeoutId: ReturnType<typeof setTimeout>; matched: boolean; } > >(new Map()); // Buffer messages since last up-to-date to handle race conditions const currentBatchMessages = new Store<Array<Message<any>>>([]); /** * Helper function to remove multiple matches from the pendingMatches store */ const removePendingMatches = (matchIds: Array<string>) => { if (matchIds.length > 0) { pendingMatches.setState((current) => { const newMatches = new Map(current); matchIds.forEach((id) => newMatches.delete(id)); return newMatches; }); } }; /** * Helper function to resolve and cleanup matched pending matches */ const resolveMatchedPendingMatches = () => { const matchesToResolve: Array<string> = []; pendingMatches.state.forEach((match, matchId) => { if (match.matched) { clearTimeout(match.timeoutId); match.resolve(true); matchesToResolve.push(matchId); debug( `${config.id ? 
`[${config.id}] ` : ``}awaitMatch resolved on up-to-date for match %s`, matchId, ); } }); removePendingMatches(matchesToResolve); }; const sync = createGelElectricSync<T>(config.shapeOptions, { seenTxids, seenSnapshots, syncMode: internalSyncMode, pendingMatches, currentBatchMessages, removePendingMatches, resolveMatchedPendingMatches, collectionId: config.id, testHooks: config[ELECTRIC_TEST_HOOKS], internalGelSchema: config.gelInternalSchema, gelEntityName: config.gelEntityName, }); /** * Wait for a specific transaction ID to be synced * @param txId The transaction ID to wait for as a number * @param timeout Optional timeout in milliseconds (defaults to 5000ms) * @returns Promise that resolves when the txId is synced */ const awaitTxId: AwaitTxIdFn = async ( txId: Txid, timeout: number = 5000, ): Promise<boolean> => { debug( `${config.id ? `[${config.id}] ` : ``}awaitTxId called with txid %d`, txId, ); if (typeof txId !== `number`) { throw new ExpectedNumberInAwaitTxIdError(typeof txId, config.id); } // First check if the txid is in the seenTxids store const hasTxid = seenTxids.state.has(txId); if (hasTxid) return true; // Then check if the txid is in any of the seen snapshots const hasSnapshot = seenSnapshots.state.some((snapshot) => isVisibleInSnapshot(txId, snapshot), ); if (hasSnapshot) return true; return new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { unsubscribeSeenTxids(); unsubscribeSeenSnapshots(); reject(new TimeoutWaitingForTxIdError(txId, config.id)); }, timeout); const unsubscribeSeenTxids = seenTxids.subscribe(() => { if (seenTxids.state.has(txId)) { debug( `${config.id ? 
`[${config.id}] ` : ``}awaitTxId found match for txid %o`, txId, ); clearTimeout(timeoutId); unsubscribeSeenTxids(); unsubscribeSeenSnapshots(); resolve(true); } }); const unsubscribeSeenSnapshots = seenSnapshots.subscribe(() => { const visibleSnapshot = seenSnapshots.state.find((snapshot) => isVisibleInSnapshot(txId, snapshot), ); if (visibleSnapshot) { debug( `${config.id ? `[${config.id}] ` : ``}awaitTxId found match for txid %o in snapshot %o`, txId, visibleSnapshot, ); clearTimeout(timeoutId); unsubscribeSeenSnapshots(); unsubscribeSeenTxids(); resolve(true); } }); }); }; /** * Wait for a custom match function to find a matching message * @param matchFn Function that returns true when a message matches * @param timeout Optional timeout in milliseconds (defaults to 5000ms) * @returns Promise that resolves when a matching message is found */ const awaitMatch: AwaitMatchFn<any> = async ( matchFn: MatchFunction<any>, timeout: number = 3000, ): Promise<boolean> => { debug( `${config.id ? `[${config.id}] ` : ``}awaitMatch called with custom function`, ); return new Promise((resolve, reject) => { const matchId = Math.random().toString(36); const cleanupMatch = () => { pendingMatches.setState((current) => { const newMatches = new Map(current); newMatches.delete(matchId); return newMatches; }); }; const onTimeout = () => { cleanupMatch(); reject(new TimeoutWaitingForMatchError(config.id)); }; const timeoutId = setTimeout(onTimeout, timeout); // We need access to the stream messages to check against the match function // This will be handled by the sync configuration const checkMatch = (message: Message<any>) => { if (matchFn(message)) { debug( `${config.id ? 
`[${config.id}] ` : ``}awaitMatch found matching message, waiting for up-to-date`, ); // Mark as matched but don't resolve yet - wait for up-to-date pendingMatches.setState((current) => { const newMatches = new Map(current); const existing = newMatches.get(matchId); if (existing) { newMatches.set(matchId, { ...existing, matched: true }); } return newMatches; }); return true; } return false; }; // Check against current batch messages first to handle race conditions for (const message of currentBatchMessages.state) { if (matchFn(message)) { debug( `${config.id ? `[${config.id}] ` : ``}awaitMatch found immediate match in current batch, waiting for up-to-date`, ); // Register match as already matched pendingMatches.setState((current) => { const newMatches = new Map(current); newMatches.set(matchId, { matchFn: checkMatch, resolve, reject, timeoutId, matched: true, // Already matched }); return newMatches; }); return; } } // Store the match function for the sync process to use // We'll add this to a pending matches store pendingMatches.setState((current) => { const newMatches = new Map(current); newMatches.set(matchId, { matchFn: checkMatch, resolve, reject, timeoutId, matched: false, }); return newMatches; }); }); }; /** * Process matching strategy and wait for synchronization */ const processMatchingStrategy = async ( result: MatchingStrategy, ): Promise<void> => { // Only wait if result contains txid if (result && `txid` in result) { const timeout = result.timeout; // Handle both single txid and array of txids if (Array.isArray(result.txid)) { await Promise.all(result.txid.map((txid) => awaitTxId(txid, timeout))); } else { await awaitTxId(result.txid, timeout); } } // If result is void/undefined, don't wait - mutation completes immediately }; // Create wrapper handlers for direct persistence operations that handle different matching strategies const wrappedOnInsert = config.onInsert ? 
async (
      params: InsertMutationFnParams<
        any,
        string | number,
        ElectricCollectionUtils<T>
      >,
    ) => {
      // Run the user's handler first, then block until its matching strategy
      // (e.g. returned txids) has been observed in the sync stream.
      const handlerResult = await config.onInsert!(params);
      await processMatchingStrategy(handlerResult);
      return handlerResult;
    }
  : undefined;

const wrappedOnUpdate = config.onUpdate
  ? async (
      params: UpdateMutationFnParams<
        any,
        string | number,
        ElectricCollectionUtils<T>
      >,
    ) => {
      const handlerResult = await config.onUpdate!(params);
      await processMatchingStrategy(handlerResult);
      return handlerResult;
    }
  : undefined;

const wrappedOnDelete = config.onDelete
  ? async (
      params: DeleteMutationFnParams<
        any,
        string | number,
        ElectricCollectionUtils<T>
      >,
    ) => {
      const handlerResult = await config.onDelete!(params);
      await processMatchingStrategy(handlerResult);
      return handlerResult;
    }
  : undefined;

// Extract standard Collection config properties
// (Electric-specific keys are consumed here and not passed through.)
const {
  shapeOptions: _shapeOptions,
  onInsert: _onInsert,
  onUpdate: _onUpdate,
  onDelete: _onDelete,
  ...restConfig
} = config;

// Final collection options: user config plus the Electric sync
// implementation, wrapped persistence handlers, and await* utilities.
return {
  ...restConfig,
  syncMode: finalSyncMode,
  sync,
  onInsert: wrappedOnInsert,
  onUpdate: wrappedOnUpdate,
  onDelete: wrappedOnDelete,
  utils: {
    awaitTxId,
    awaitMatch,
  },
};
}

/**
 * Internal function to create ElectricSQL sync configuration
 *
 * @param shapeOptions Options forwarded to the underlying ShapeStream
 * @param options Shared stores and callbacks wired in by the caller
 *   (txid/snapshot tracking, pending-match registry, Gel schema mapping).
 * @returns A SyncConfig whose `sync` subscribes to the shape stream and
 *   whose `getSyncMetadata` reports the synced relation.
 */
function createGelElectricSync<T extends Row<unknown>>(
  shapeOptions: ShapeStreamOptions<GetExtensions<T>>,
  options: {
    syncMode: ElectricSyncMode;
    seenTxids: Store<Set<Txid>>;
    seenSnapshots: Store<Array<PostgresSnapshot>>;
    pendingMatches: Store<
      Map<
        string,
        {
          matchFn: (message: Message<T>) => boolean;
          resolve: (value: boolean) => void;
          reject: (error: Error) => void;
          timeoutId: ReturnType<typeof setTimeout>;
          matched: boolean;
        }
      >
    >;
    currentBatchMessages: Store<Array<Message<T>>>;
    removePendingMatches: (matchIds: Array<string>) => void;
    resolveMatchedPendingMatches: () => void;
    collectionId?: string;
    testHooks?: ElectricTestHooks;
    internalGelSchema: GelReverseSchemaMapping;
    gelEntityName: string;
  },
): SyncConfig<T> {
  const {
    seenTxids,
    seenSnapshots,
    syncMode,
    pendingMatches,
currentBatchMessages,
    removePendingMatches,
    resolveMatchedPendingMatches,
    collectionId,
    testHooks,
    internalGelSchema,
    gelEntityName,
  } = options;

  const MAX_BATCH_MESSAGES = 1000; // Safety limit for message buffer

  // Store for the relation schema information
  const relationSchema = new Store<string | undefined>(undefined);

  /**
   * Get the sync metadata for insert operations
   * @returns Record containing relation information
   */
  const getSyncMetadata = (): Record<string, unknown> => {
    // Use the stored schema if available, otherwise default to 'public'
    const schema = relationSchema.state || `public`;
    return {
      relation: shapeOptions.params?.table
        ? [schema, shapeOptions.params.table]
        : undefined,
    };
  };

  // Assigned inside sync() and invoked from cleanup().
  let unsubscribeStream: () => void;

  return {
    sync: (params: Parameters<SyncConfig<T>[`sync`]>[0]) => {
      const { begin, write, commit, markReady, truncate, collection } = params;

      // Wrap markReady to wait for test hook in progressive mode
      let progressiveReadyGate: Promise<void> | null = null;
      const wrappedMarkReady = (isBuffering: boolean) => {
        // Only create gate if we're in buffering phase (first up-to-date)
        if (
          isBuffering &&
          syncMode === `progressive` &&
          testHooks?.beforeMarkingReady
        ) {
          // Create a new gate promise for this sync cycle
          // NOTE(review): a rejected hook promise is unhandled here, so
          // markReady would never run — confirm test hooks always resolve.
          progressiveReadyGate = testHooks.beforeMarkingReady();
          progressiveReadyGate.then(() => {
            markReady();
          });
        } else {
          // No hook, not buffering, or already past first up-to-date
          markReady();
        }
      };

      // Abort controller for the stream - wraps the signal if provided
      const abortController = new AbortController();
      if (shapeOptions.signal) {
        shapeOptions.signal.addEventListener(
          `abort`,
          () => {
            abortController.abort();
          },
          {
            once: true,
          },
        );
        // The caller's signal may already be aborted when we wire it up.
        if (shapeOptions.signal.aborted) {
          abortController.abort();
        }
      }

      // Cleanup pending matches on abort: reject every outstanding awaitMatch
      // waiter so callers are not left hanging, then empty the registry.
      abortController.signal.addEventListener(`abort`, () => {
        pendingMatches.setState((current) => {
          current.forEach((match) => {
            clearTimeout(match.timeoutId);
            match.reject(new StreamAbortedError());
          });
          return new Map(); // Clear all pending matches
        });
      });

      const stream = new ShapeStream({
        ...shapeOptions,
        // In on-demand mode, we only want to sync changes, so we set the log to `changes_only`
        log: syncMode === `on-demand` ? `changes_only` : undefined,
        // In on-demand mode, we only need the changes from the point of time the collection was created
        // so we default to `now` when there is no saved offset.
        offset:
          shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined),
        signal: abortController.signal,
        onError: (errorParams) => {
          // Just immediately mark ready if there's an error to avoid blocking
          // apps waiting for `.preload()` to finish.
          // Note that Electric sends a 409 error on a `must-refetch` message, but the
          // ShapeStream handles this and it will not reach this handler, therefore
          // this markReady will not be triggered by a `must-refetch`.
          markReady();
          if (shapeOptions.onError) {
            return shapeOptions.onError(errorParams);
          } else {
            console.error(
              `An error occurred while syncing collection: ${collection.id}, \n` +
                `it has been marked as ready to avoid blocking apps waiting for '.preload()' to finish. 
\n` +
                `You can provide an 'onError' handler on the shapeOptions to handle this error, and this message will not be logged.`,
              errorParams,
            );
          }
          return;
        },
      });

      // Per-subscription transaction state plus txid/snapshot accumulators
      // that are flushed to the shared stores on up-to-date.
      let transactionStarted = false;
      const newTxids = new Set<Txid>();
      const newSnapshots: Array<PostgresSnapshot> = [];
      let hasReceivedUpToDate = false; // Track if we've completed initial sync in progressive mode

      // Progressive mode state
      // Helper to determine if we're buffering the initial sync
      const isBufferingInitialSync = () =>
        syncMode === `progressive` && !hasReceivedUpToDate;
      const bufferedMessages: Array<Message<T>> = []; // Buffer change messages during initial sync

      // Column-alias lookup for this Gel entity; without it stream columns
      // cannot be mapped back to Gel names, so fail fast.
      const aliasesForThisTable = internalGelSchema[gelEntityName];
      if (!aliasesForThisTable)
        throw new Error("Gel object not found in schema");

      // Create deduplicated loadSubset wrapper for non-eager modes
      // This prevents redundant snapshot requests when multiple concurrent
      // live queries request overlapping or subset predicates
      const loadSubsetDedupe = createLoadSubsetDedupe({
        stream,
        syncMode,
        isBufferingInitialSync,
        begin,
        write,
        commit,
        collectionId,
        gelColumnAliases: {
          ...aliasesForThisTable.links,
          ...aliasesForThisTable.properties,
        },
      });

      unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
        let hasUpToDate = false;
        let hasSnapshotEnd = false;

        for (const message of messages) {
          // Add message to current batch buffer (for race condition handling)
          if (isChangeMessage(message)) {
            currentBatchMessages.setState((currentBuffer) => {
              const newBuffer = [...currentBuffer, message];
              // Limit buffer size for safety (drop the oldest entries)
              if (newBuffer.length > MAX_BATCH_MESSAGES) {
                newBuffer.splice(0, newBuffer.length - MAX_BATCH_MESSAGES);
              }
              return newBuffer;
            });
          }

          // Check for txids in the message and add them to our store
          // Skip during buffered initial sync in progressive mode (txids will be extracted during atomic swap)
          if (hasTxids(message) && !isBufferingInitialSync()) {
            message.headers.txids?.forEach((txid) => newTxids.add(txid));
          }

          // Check pending matches against this message
          // Note: matchFn will mark matches internally, we don't resolve here
          const matchesToRemove: Array<string> = [];
          pendingMatches.state.forEach((match, matchId) => {
            if (!match.matched) {
              try {
                match.matchFn(message);
              } catch (err) {
                // If matchFn throws, clean up and reject the promise
                clearTimeout(match.timeoutId);
                match.reject(
                  err instanceof Error ? err : new Error(String(err)),
                );
                matchesToRemove.push(matchId);
                debug(`matchFn error: %o`, err);
              }
            }
          });
          // Remove matches that errored
          removePendingMatches(matchesToRemove);

          if (isChangeMessage(message)) {
            // Check if the message contains schema information
            const schema = message.headers.schema;
            if (schema && typeof schema === `string`) {
              // Store the schema for future use if it's a valid string
              relationSchema.setState(() => schema);
            }

            // In buffered initial sync of progressive mode, buffer messages instead of writing
            if (isBufferingInitialSync()) {
              bufferedMessages.push(message);
            } else {
              // Normal processing: write changes immediately
              if (!transactionStarted) {
                begin();
                transactionStarted = true;
              }
              write({
                type: message.headers.operation,
                value: message.value,
                // Include the primary key and relation info in the metadata
                metadata: {
                  ...message.headers,
                },
              });
            }
          } else if (isSnapshotEndMessage(message)) {
            // Skip snapshot-end tracking during buffered initial sync (will be extracted during atomic swap)
            if (!isBufferingInitialSync()) {
              newSnapshots.push(parseSnapshotMessage(message));
            }
            hasSnapshotEnd = true;
          } else if (isUpToDateMessage(message)) {
            hasUpToDate = true;
          } else if (isMustRefetchMessage(message)) {
            debug(
              `${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`,
            );

            // Start a transaction and truncate the collection
            if (!transactionStarted) {
              begin();
              transactionStarted = true;
            }
            truncate();

            // Reset the loadSubset deduplication state since we're starting fresh
            // This ensures that previously loaded predicates don't prevent refetching after truncate
            loadSubsetDedupe?.reset();

            // Reset flags so we continue accumulating changes until next up-to-date
            hasUpToDate = false;
            hasSnapshotEnd = false;
            hasReceivedUpToDate = false; // Reset for progressive mode (isBufferingInitialSync will reflect this)
            bufferedMessages.length = 0; // Clear buffered messages
          }
        }

        if (hasUpToDate || hasSnapshotEnd) {
          // PROGRESSIVE MODE: Atomic swap on first up-to-date
          if (isBufferingInitialSync() && hasUpToDate) {
            debug(
              `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`,
            );

            // Start atomic swap transaction
            begin();

            // Truncate to clear all snapshot data
            truncate();

            // Apply all buffered change messages and extract txids/snapshots
            for (const bufferedMsg of bufferedMessages) {
              if (isChangeMessage(bufferedMsg)) {
                write({
                  type: bufferedMsg.headers.operation,
                  value: bufferedMsg.value,
                  metadata: {
                    ...bufferedMsg.headers,
                  },
                });

                // Extract txids from buffered messages (will be committed to store after transaction)
                if (hasTxids(bufferedMsg)) {
                  bufferedMsg.headers.txids?.forEach((txid) =>
                    newTxids.add(txid),
                  );
                }
              } else if (isSnapshotEndMessage(bufferedMsg)) {
                // Extract snapshots from buffered messages (will be committed to store after transaction)
                newSnapshots.push(parseSnapshotMessage(bufferedMsg));
              }
            }

            // Commit the atomic swap
            commit();

            // Exit buffering phase by marking that we've received up-to-date
            // isBufferingInitialSync() will now return false
            bufferedMessages.length = 0;

            debug(
              `${collectionId ?
`[${collectionId}] ` : ``}Progressive mode: Atomic swap complete, now in normal sync mode`,
            );
          } else {
            // Normal mode or on-demand: commit transaction if one was started
            // In eager mode, only commit on snapshot-end if we've already received
            // the first up-to-date, because the snapshot-end in the log could be from
            // a significant period before the stream is actually up to date
            const shouldCommit =
              hasUpToDate || syncMode === `on-demand` || hasReceivedUpToDate;
            if (transactionStarted && shouldCommit) {
              commit();
              transactionStarted = false;
            }
          }

          // Clear the current batch buffer since we're now up-to-date
          currentBatchMessages.setState(() => []);

          if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
            // Mark the collection as ready now that sync is up to date
            wrappedMarkReady(isBufferingInitialSync());
          }

          // Track that we've received the first up-to-date for progressive mode
          if (hasUpToDate) {
            hasReceivedUpToDate = true;
          }

          // Always commit txids when we receive up-to-date, regardless of transaction state
          seenTxids.setState((currentTxids) => {
            const clonedSeen = new Set<Txid>(currentTxids);
            if (newTxids.size > 0) {
              debug(
                `${collectionId ? `[${collectionId}] ` : ``}new txids synced from pg %O`,
                Array.from(newTxids),
              );
            }
            newTxids.forEach((txid) => clonedSeen.add(txid));
            newTxids.clear();
            return clonedSeen;
          });

          // Always commit snapshots when we receive up-to-date, regardless of transaction state
          seenSnapshots.setState((currentSnapshots) => {
            const seen = [...currentSnapshots, ...newSnapshots];
            newSnapshots.forEach((snapshot) =>
              debug(
                `${collectionId ? `[${collectionId}] ` : ``}new snapshot synced from pg %o`,
                snapshot,
              ),
            );
            newSnapshots.length = 0;
            return seen;
          });

          // Resolve all matched pending matches on up-to-date
          resolveMatchedPendingMatches();
        }
      });

      // Return the deduplicated loadSubset if available (on-demand or progressive mode)
      // The loadSubset method is auto-bound, so it can be safely returned directly
      return {
        loadSubset: loadSubsetDedupe?.loadSubset,
        cleanup: () => {
          // Unsubscribe from the stream
          unsubscribeStream();
          // Abort the abort controller to stop the stream
          abortController.abort();
          // Reset deduplication tracking so collection can load fresh data if restarted
          loadSubsetDedupe?.reset();
        },
      };
    },
    // Expose the getSyncMetadata function
    getSyncMetadata,
  };
}