Skip to main content
Home
This release is 10 versions behind 0.3.130 — the latest version of @paimaexample/orchestrator. Jump to latest
Works with
It is unknown whether this package works with Cloudflare Workers, Node.js, Deno, Bun, Browsers
It is unknown whether this package works with Cloudflare Workers
It is unknown whether this package works with Node.js
It is unknown whether this package works with Deno
It is unknown whether this package works with Bun
It is unknown whether this package works with Browsers
JSR Score0%
Downloads3/wk
Publisheda month ago (0.3.120)
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
#!/usr/bin/env -S deno run --allow-all import { ENV } from "jsr:@paimaexample/utils@^0.3.120/node-env"; import type { ValueOf } from "jsr:@paimaexample/utils@^0.3.120"; import "./http-server.ts"; import { dkill } from "jsr:@sylc/dkill@^0.12.3"; import { initTelemetry, logHandler, setCollectorStarted, setTUIStarted, setStdoutOutput, setStderrOutput, tsLogOrchestratorAdapter, } from "./logging.ts"; import { $, AbortProcessStart, type ProcessComponent, setForegroundProcess, shutdown, } from "./process.ts"; import { ComponentNames } from "jsr:@paimaexample/log@^0.3.120"; import { Tmux } from "./tmux/tmux.ts"; import type { LaunchableComponents } from "jsr:@paimaexample/log@^0.3.120"; import { type Static, Type } from "npm:@sinclair/typebox@0.34.41"; let appConfig: OrchestratorConfigType | null = null; let pFactory: ReturnType<typeof processFactory> | null = null; const ProcessLaunch = Type.Object({ name: Type.String(), description: Type.String({ default: '' }), stopProcessAtPort: Type.Array(Type.Number(), { default: [] }), dependsOn: Type.Array(Type.String(), { default: [] }), args: Type.Array(Type.String()), waitToExit: Type.Boolean({ default: true }), link: Type.String({ default: '' }), critical: Type.Boolean({ default: true }), logs: Type.Union( [Type.Literal('tsLogOrchestratorAdapter'), Type.Literal('raw'), Type.Literal('none')], { default: 'raw' } ), type: Type.Union( [Type.Literal('system-dependency'), Type.Literal('secondary')], { default: 'secondary' } ), // If not provided, the default command is "deno" command: Type.Optional(Type.String()), cwd: Type.Optional(Type.String()), }); /** * Orchestrator configurations * logs: log output mode * killProcessesByPort: ports to kill on startup * processes: components to start */ export const OrchestratorConfig = Type.Object({ // Log options. logs: Type.Union([ // No logs Type.Literal("none"), // Print only errors to terminal Type.Literal("stdout-err"), // Print all logs to terminal Type.Literal("stdout"), // Send to TUI and OTEL collector Type.Literal("development"), // Send to OTEL collector and print to terminal Type.Literal("production"), ], { default: "development" }), // This kills default processes that are open in specific ports. kill: Type.Object({ // TODO: kill.auto is workaround to kill processes that are still running from a previous run. // PGLite 5432. Frequently does not shutdown in some cases. // And other ports are checked. auto: Type.Boolean({ default: true }), }, { default: {} }), // Custom user defined processes to launch. // For example you can launch hardhat evm chains, wait to be ready and deploy contracts. processesToLaunch: Type.Array(Type.Union([ProcessLaunch, Type.Boolean()]), { default: [] }), // This can be customized for different locations of the packages. // nightly: jsr:@paimaexample // release: jsr:@paima // local development: @paima packageName: Type.String({ default: "jsr:@effectstream" }), packageVersion: Type.String({ default: "" }), // Processes to start processes: Type.Object({ // Main Processes [ComponentNames.EFFECTSTREAM_SYNC]: Type.Boolean({ default: true }), // Dev Tools [ComponentNames.CHECKER]: Type.Boolean({ default: true }), [ComponentNames.EFFECTSTREAM_PGLITE]: Type.Boolean({ default: false }), [ComponentNames.TMUX]: Type.Boolean({ default: true }), // DevOps [ComponentNames.COLLECTOR]: Type.Boolean({ default: true }), [ComponentNames.LOKI]: Type.Boolean({ default: true }), }, { default: {} }), }); type OrchestratorConfigType = Static<typeof OrchestratorConfig>; type Task = { name: string; config: Static<typeof ProcessLaunch> | SystemProcess; dependencies: Set<string>; dependents: Set<string>; status: 'pending' | 'running' | 'finished' | 'failed'; process?: ProcessComponent; }; type SystemProcess = { name: string; dependsOn: string[]; launch: () => Promise<ProcessComponent>; }; const setupLogging = (config: OrchestratorConfigType): void => { // Let's setup the output mode for logs. switch (config.logs) { case "none": break; case "stdout-err": setStderrOutput(); break; case "stdout": setStdoutOutput(); setStderrOutput(); break; case "development": setTUIStarted(ENV.TUI_LOG_PORT); initTelemetry(); break; case "production": setStdoutOutput(); setStderrOutput(); initTelemetry(); break; } } export async function start( config: OrchestratorConfigType, ): Promise<void> { // This is to redirect all logs.remote to the orchestrator, // where they will be redirected to the collector. Deno.env.set("EFFECTSTREAM_ORCHESTRATOR", "true"); appConfig = config; pFactory = processFactory(config); setupLogging(config); try { const tasks = new Map<string, Task>(); const processesToRun: (Static<typeof ProcessLaunch> | SystemProcess)[] = [...config.processesToLaunch.filter(p => typeof p !== 'boolean')]; // Build task graph for (const processConfig of processesToRun) { if (tasks.has(processConfig.name)) { console.error(`Error: Duplicate process name "${processConfig.name}" found. Process names must be unique.`); await shutdown(1); return; } tasks.set(processConfig.name, { name: processConfig.name, config: processConfig, dependencies: new Set(processConfig.dependsOn), dependents: new Set(), status: 'pending', }); } for (const task of tasks.values()) { for (const depName of task.dependencies) { const depTask = tasks.get(depName); if (depTask) { depTask.dependents.add(task.name); } else { console.error(`Error: Dependency "${depName}" for process "${task.name}" not found.`); await shutdown(1); return; } } } // Start System Processes const startProcess = processFactory(config); // Add system processes if (config.processes[ComponentNames.LOKI]) { await startProcess[ComponentNames.LOKI](); } if (config.processes[ComponentNames.CHECKER]) { await startProcess[ComponentNames.CHECKER](); } if (config.processes[ComponentNames.TMUX]) { await startProcess[ComponentNames.TMUX](); } if (config.processes[ComponentNames.COLLECTOR]) { await startProcess[ComponentNames.COLLECTOR](); } if (config.processes[ComponentNames.EFFECTSTREAM_PGLITE]) { await startProcess[ComponentNames.EFFECTSTREAM_PGLITE](); } if (config.processes[ComponentNames.APPLY_MIGRATIONS]) { await startProcess[ComponentNames.APPLY_MIGRATIONS](); } // Start User-defined Processes const pending = new Set<string>(tasks.keys()); const runningWaitToFinish = new Map<string, Promise<void>>(); const runningProcesses = new Map<string, Promise<void>>(); const finished = new Set<string>(); let circularDependencyLoopCount = 0; const CIRCULAR_DEPENDENCY_THRESHOLD = 50; let lastPendingSnapshot: Set<string> | null = null; const launchTask = async (task: Task): Promise<void> => { task.status = 'running'; let processComponent: ProcessComponent; if ('launch' in task.config) { // System process processComponent = await task.config.launch(); } else { // User-defined process const { name, args, logs, type, link, stopProcessAtPort, critical, command, cwd } = task.config; if (stopProcessAtPort.length > 0) { await dkill({ ports: stopProcessAtPort }); } try { processComponent = $({ command, args, component: name, log: logHandler({}, logs === 'tsLogOrchestratorAdapter' ? tsLogOrchestratorAdapter : undefined), cwd, abortController: type === "system-dependency" ? abortControllers.system : abortControllers.noncritical, link: link, critical: critical, }); } catch (e) { if (e instanceof AbortProcessStart) { // We stopped all processes, a waiting process is trying to start - as the dependency finished. return; } // Unknown error, throw error to be handled by the main loop. throw e; } } task.process = processComponent; const finish = (): void => { task.status = 'finished'; finished.add(task.name); runningWaitToFinish.delete(task.name); runningProcesses.delete(task.name); for (const dependentName of task.dependents) { const dependentTask = tasks.get(dependentName)!; dependentTask.dependencies.delete(task.name); } // Wake up the main loop if (waiter) { waiter(); waiter = null; } }; const waitToExit = 'waitToExit' in task.config ? task.config.waitToExit : true; if (waitToExit) { task.process.process.status.then(finish).catch(async err => { console.error(`Task ${task.name} failed with error: ${err}`); task.status = 'failed'; await shutdown(1, err); }); } else { finish(); } }; let waiter: (() => void) | null = null; while (pending.size > 0 || runningWaitToFinish.size > 0) { const executableTasks = Array.from(pending) .map(name => tasks.get(name)!) .filter(task => task.dependencies.size === 0); if (executableTasks.length > 0) { for (const task of executableTasks) { pending.delete(task.name); const taskPromise = launchTask(task); const waitToExit = 'waitToExit' in task.config ? task.config.waitToExit : true; if (waitToExit) { runningWaitToFinish.set(task.name, taskPromise); } else { runningProcesses.set(task.name, taskPromise); } } } else if (runningWaitToFinish.size > 0) { for (const pendingTaskName of pending) { const pendingTask = tasks.get(pendingTaskName)!; } await new Promise<void>(resolve => { waiter = resolve; }); } else if (pending.size > 0) { // Check if we have the same pending tasks as last iteration const currentPendingSnapshot = new Set(pending); const isSamePending = lastPendingSnapshot && currentPendingSnapshot.size === lastPendingSnapshot.size && [...currentPendingSnapshot].every(task => lastPendingSnapshot!.has(task)); if (isSamePending) { circularDependencyLoopCount++; } else { circularDependencyLoopCount = 0; } lastPendingSnapshot = currentPendingSnapshot; if (circularDependencyLoopCount >= CIRCULAR_DEPENDENCY_THRESHOLD) { console.error('Error: Circular dependency or missing dependency detected.'); console.error('Pending tasks:'); for (const pendingTaskName of pending) { const pendingTask = tasks.get(pendingTaskName)!; console.error(` - ${pendingTask.name} is waiting for: ${[...pendingTask.dependencies].join(', ')}`); } await shutdown(1); return; } } await new Promise(resolve => setTimeout(resolve, 100)); } // Wait for all processes to finish // Launch Paima Engine Main Sync Process await startProcess[ComponentNames.EFFECTSTREAM_SYNC](); } catch (e) { if (!(e instanceof AbortProcessStart)) { await shutdown(1, e); } } } export { appConfig, pFactory }; export const abortControllers = { // Abort controller for all critical processes system: new AbortController(), // Abort controller for all non-critical processes noncritical: new AbortController(), // Abort controller for Developer UI developerUI: new AbortController(), }; export const processFactory = (config: OrchestratorConfigType): Record< ValueOf<typeof LaunchableComponents>, () => Promise<ProcessComponent> > => ({ [ComponentNames.TMUX]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.TUI_LOG_PORT] }); } await Tmux.install(); const tmux = new Tmux(); await tmux.startSession(); const tmuxConsole = $({ ...tmux.getAttachCommand(), component: ComponentNames.TMUX, abortController: abortControllers.developerUI, critical: true, // log, this process must no be redirected to the collector // this writes directly to the stdout }); tmuxConsole.process.status.then(() => { tmux.killServer(); }); setForegroundProcess(tmuxConsole.process); return tmuxConsole; }, [ComponentNames.EXPLORER]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.EFFECTSTREAM_EXPLORER_PORT] }); } const explorer = $({ args: ["task", "-f", config.packageName + "/explorer", "dev"], component: ComponentNames.EXPLORER, log: logHandler(), abortController: abortControllers.developerUI, critical: false, }); await explorer.process.status; return explorer; }, [ComponentNames.COLLECTOR]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.OTEL_COLLECTOR_PORT, 12345] }); // 12345 is the port for the Grafana Alloy web UI } // deno -A npm:@paimaexample/grafana-alloy grafana-alloy const otlpCollector = $({ args: ["-A", "npm:@paimaexample/grafana-alloy", "grafana-alloy"], // collector always has to post logs directly to console // otherwise, it gets stuck in an infinite loop of sending to itself log: logHandler({ disableCollector: true, disableTUI: true, disableStdOut: true, disableStderr: true, }), component: ComponentNames.COLLECTOR, abortController: abortControllers.noncritical, critical: false, }); void otlpCollector.process.status; await (new Deno.Command("wait-on", { args: [`tcp:${ENV.OTEL_COLLECTOR_PORT}`], })).spawn().status; setCollectorStarted(ENV.OTEL_COLLECTOR_PORT); return otlpCollector; }, [ComponentNames.LOKI]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [3100] }); } const loki = $({ args: ["-A", "npm:@paimaexample/grafana-loki", "grafana-loki"], component: ComponentNames.LOKI, log: logHandler( { disableCollector: true, disableTUI: true, disableStdOut: true, disableStderr: true, } ), abortController: abortControllers.noncritical, critical: false, }); void loki.process.status; return loki; }, [ComponentNames.CHECKER]: async (): Promise<ProcessComponent> => { const checker = $({ args: ["task", "check"], component: ComponentNames.CHECKER, stdout: "inherit", stderr: "inherit", abortController: abortControllers.noncritical, critical: true, }); await Promise.all([checker.process.status]); return checker; }, [ComponentNames.EFFECTSTREAM_SYNC]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.EFFECTSTREAM_API_PORT] }); } const node = $({ args: ["task", "node:start"], log: logHandler({}, tsLogOrchestratorAdapter), component: ComponentNames.EFFECTSTREAM_SYNC, namespace: [], // these should get a "paima" namespace added to them automatically abortController: abortControllers.system, // Allow sync to fail without killing the orchestrator; TUI/R restart can recover it. critical: false, }); await Promise.all([node.process.status]); return node; }, [ComponentNames.EFFECTSTREAM_PGLITE]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.DB_PORT] }); } const paimaDb = $({ // TODO: run pgtyped:up only depending on parameters? args: [ "run", "-A", config.packageName + "/db/start-pglite", "--port", String(ENV.DB_PORT), ], log: logHandler(), component: ComponentNames.EFFECTSTREAM_PGLITE, abortController: abortControllers.system, critical: true, }); void paimaDb.process.status; // need to await sub-service start below await (new Deno.Command("wait-on", { args: [`tcp:${ENV.DB_PORT}`], })).spawn().status; return paimaDb; }, [ComponentNames.APPLY_MIGRATIONS]: async (): Promise<ProcessComponent> => { const externalPaimaDb = $({ args: [ "run", "-A", config.packageName + "/db/apply-migrations", ], component: ComponentNames.APPLY_MIGRATIONS, log: logHandler({}, tsLogOrchestratorAdapter), abortController: abortControllers.system, critical: true, }); await externalPaimaDb.process.status; return externalPaimaDb; }, });