This release is 10 versions behind 0.3.130 — the latest version of @paimaexample/orchestrator. Jump to latest
@paimaexample/orchestrator@0.3.120
Works with
•JSR Score0%•It is unknown whether this package works with Cloudflare Workers, Node.js, Deno, Bun, Browsers




Downloads3/wk
•Publisheda month ago (0.3.120)
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545#!/usr/bin/env -S deno run --allow-all import { ENV } from "jsr:@paimaexample/utils@^0.3.120/node-env"; import type { ValueOf } from "jsr:@paimaexample/utils@^0.3.120"; import "./http-server.ts"; import { dkill } from "jsr:@sylc/dkill@^0.12.3"; import { initTelemetry, logHandler, setCollectorStarted, setTUIStarted, setStdoutOutput, setStderrOutput, tsLogOrchestratorAdapter, } from "./logging.ts"; import { $, AbortProcessStart, type ProcessComponent, setForegroundProcess, shutdown, } from "./process.ts"; import { ComponentNames } from "jsr:@paimaexample/log@^0.3.120"; import { Tmux } from "./tmux/tmux.ts"; import type { LaunchableComponents } from "jsr:@paimaexample/log@^0.3.120"; import { type Static, Type } from "npm:@sinclair/typebox@0.34.41"; let appConfig: OrchestratorConfigType | null = null; let pFactory: ReturnType<typeof processFactory> | null = null; const ProcessLaunch = Type.Object({ name: Type.String(), description: Type.String({ default: '' }), stopProcessAtPort: Type.Array(Type.Number(), { default: [] }), dependsOn: Type.Array(Type.String(), { default: [] }), args: Type.Array(Type.String()), waitToExit: Type.Boolean({ default: true }), link: Type.String({ default: '' }), critical: Type.Boolean({ default: true }), logs: Type.Union( [Type.Literal('tsLogOrchestratorAdapter'), Type.Literal('raw'), Type.Literal('none')], { default: 'raw' } ), type: Type.Union( [Type.Literal('system-dependency'), Type.Literal('secondary')], { default: 'secondary' } ), // If not provided, the default command is "deno" command: Type.Optional(Type.String()), cwd: Type.Optional(Type.String()), }); /** * Orchestrator configurations * logs: log output mode * killProcessesByPort: ports to kill on startup * processes: components to start */ export const OrchestratorConfig = Type.Object({ // Log options. logs: Type.Union([ // No logs Type.Literal("none"), // Print only errors to terminal Type.Literal("stdout-err"), // Print all logs to terminal Type.Literal("stdout"), // Send to TUI and OTEL collector Type.Literal("development"), // Send to OTEL collector and print to terminal Type.Literal("production"), ], { default: "development" }), // This kills default processes that are open in specific ports. kill: Type.Object({ // TODO: kill.auto is workaround to kill processes that are still running from a previous run. // PGLite 5432. Frequently does not shutdown in some cases. // And other ports are checked. auto: Type.Boolean({ default: true }), }, { default: {} }), // Custom user defined processes to launch. // For example you can launch hardhat evm chains, wait to be ready and deploy contracts. processesToLaunch: Type.Array(Type.Union([ProcessLaunch, Type.Boolean()]), { default: [] }), // This can be customized for different locations of the packages. // nightly: jsr:@paimaexample // release: jsr:@paima // local development: @paima packageName: Type.String({ default: "jsr:@effectstream" }), packageVersion: Type.String({ default: "" }), // Processes to start processes: Type.Object({ // Main Processes [ComponentNames.EFFECTSTREAM_SYNC]: Type.Boolean({ default: true }), // Dev Tools [ComponentNames.CHECKER]: Type.Boolean({ default: true }), [ComponentNames.EFFECTSTREAM_PGLITE]: Type.Boolean({ default: false }), [ComponentNames.TMUX]: Type.Boolean({ default: true }), // DevOps [ComponentNames.COLLECTOR]: Type.Boolean({ default: true }), [ComponentNames.LOKI]: Type.Boolean({ default: true }), }, { default: {} }), }); type OrchestratorConfigType = Static<typeof OrchestratorConfig>; type Task = { name: string; config: Static<typeof ProcessLaunch> | SystemProcess; dependencies: Set<string>; dependents: Set<string>; status: 'pending' | 'running' | 'finished' | 'failed'; process?: ProcessComponent; }; type SystemProcess = { name: string; dependsOn: string[]; launch: () => Promise<ProcessComponent>; }; const setupLogging = (config: OrchestratorConfigType): void => { // Let's setup the output mode for logs. switch (config.logs) { case "none": break; case "stdout-err": setStderrOutput(); break; case "stdout": setStdoutOutput(); setStderrOutput(); break; case "development": setTUIStarted(ENV.TUI_LOG_PORT); initTelemetry(); break; case "production": setStdoutOutput(); setStderrOutput(); initTelemetry(); break; } } export async function start( config: OrchestratorConfigType, ): Promise<void> { // This is to redirect all logs.remote to the orchestrator, // where they will be redirected to the collector. Deno.env.set("EFFECTSTREAM_ORCHESTRATOR", "true"); appConfig = config; pFactory = processFactory(config); setupLogging(config); try { const tasks = new Map<string, Task>(); const processesToRun: (Static<typeof ProcessLaunch> | SystemProcess)[] = [...config.processesToLaunch.filter(p => typeof p !== 'boolean')]; // Build task graph for (const processConfig of processesToRun) { if (tasks.has(processConfig.name)) { console.error(`Error: Duplicate process name "${processConfig.name}" found. Process names must be unique.`); await shutdown(1); return; } tasks.set(processConfig.name, { name: processConfig.name, config: processConfig, dependencies: new Set(processConfig.dependsOn), dependents: new Set(), status: 'pending', }); } for (const task of tasks.values()) { for (const depName of task.dependencies) { const depTask = tasks.get(depName); if (depTask) { depTask.dependents.add(task.name); } else { console.error(`Error: Dependency "${depName}" for process "${task.name}" not found.`); await shutdown(1); return; } } } // Start System Processes const startProcess = processFactory(config); // Add system processes if (config.processes[ComponentNames.LOKI]) { await startProcess[ComponentNames.LOKI](); } if (config.processes[ComponentNames.CHECKER]) { await startProcess[ComponentNames.CHECKER](); } if (config.processes[ComponentNames.TMUX]) { await startProcess[ComponentNames.TMUX](); } if (config.processes[ComponentNames.COLLECTOR]) { await startProcess[ComponentNames.COLLECTOR](); } if (config.processes[ComponentNames.EFFECTSTREAM_PGLITE]) { await startProcess[ComponentNames.EFFECTSTREAM_PGLITE](); } if (config.processes[ComponentNames.APPLY_MIGRATIONS]) { await startProcess[ComponentNames.APPLY_MIGRATIONS](); } // Start User-defined Processes const pending = new Set<string>(tasks.keys()); const runningWaitToFinish = new Map<string, Promise<void>>(); const runningProcesses = new Map<string, Promise<void>>(); const finished = new Set<string>(); let circularDependencyLoopCount = 0; const CIRCULAR_DEPENDENCY_THRESHOLD = 50; let lastPendingSnapshot: Set<string> | null = null; const launchTask = async (task: Task): Promise<void> => { task.status = 'running'; let processComponent: ProcessComponent; if ('launch' in task.config) { // System process processComponent = await task.config.launch(); } else { // User-defined process const { name, args, logs, type, link, stopProcessAtPort, critical, command, cwd } = task.config; if (stopProcessAtPort.length > 0) { await dkill({ ports: stopProcessAtPort }); } try { processComponent = $({ command, args, component: name, log: logHandler({}, logs === 'tsLogOrchestratorAdapter' ? tsLogOrchestratorAdapter : undefined), cwd, abortController: type === "system-dependency" ? abortControllers.system : abortControllers.noncritical, link: link, critical: critical, }); } catch (e) { if (e instanceof AbortProcessStart) { // We stopped all processes, a waiting process is trying to start - as the dependency finished. return; } // Unknown error, throw error to be handled by the main loop. throw e; } } task.process = processComponent; const finish = (): void => { task.status = 'finished'; finished.add(task.name); runningWaitToFinish.delete(task.name); runningProcesses.delete(task.name); for (const dependentName of task.dependents) { const dependentTask = tasks.get(dependentName)!; dependentTask.dependencies.delete(task.name); } // Wake up the main loop if (waiter) { waiter(); waiter = null; } }; const waitToExit = 'waitToExit' in task.config ? task.config.waitToExit : true; if (waitToExit) { task.process.process.status.then(finish).catch(async err => { console.error(`Task ${task.name} failed with error: ${err}`); task.status = 'failed'; await shutdown(1, err); }); } else { finish(); } }; let waiter: (() => void) | null = null; while (pending.size > 0 || runningWaitToFinish.size > 0) { const executableTasks = Array.from(pending) .map(name => tasks.get(name)!) .filter(task => task.dependencies.size === 0); if (executableTasks.length > 0) { for (const task of executableTasks) { pending.delete(task.name); const taskPromise = launchTask(task); const waitToExit = 'waitToExit' in task.config ? task.config.waitToExit : true; if (waitToExit) { runningWaitToFinish.set(task.name, taskPromise); } else { runningProcesses.set(task.name, taskPromise); } } } else if (runningWaitToFinish.size > 0) { for (const pendingTaskName of pending) { const pendingTask = tasks.get(pendingTaskName)!; } await new Promise<void>(resolve => { waiter = resolve; }); } else if (pending.size > 0) { // Check if we have the same pending tasks as last iteration const currentPendingSnapshot = new Set(pending); const isSamePending = lastPendingSnapshot && currentPendingSnapshot.size === lastPendingSnapshot.size && [...currentPendingSnapshot].every(task => lastPendingSnapshot!.has(task)); if (isSamePending) { circularDependencyLoopCount++; } else { circularDependencyLoopCount = 0; } lastPendingSnapshot = currentPendingSnapshot; if (circularDependencyLoopCount >= CIRCULAR_DEPENDENCY_THRESHOLD) { console.error('Error: Circular dependency or missing dependency detected.'); console.error('Pending tasks:'); for (const pendingTaskName of pending) { const pendingTask = tasks.get(pendingTaskName)!; console.error(` - ${pendingTask.name} is waiting for: ${[...pendingTask.dependencies].join(', ')}`); } await shutdown(1); return; } } await new Promise(resolve => setTimeout(resolve, 100)); } // Wait for all processes to finish // Launch Paima Engine Main Sync Process await startProcess[ComponentNames.EFFECTSTREAM_SYNC](); } catch (e) { if (!(e instanceof AbortProcessStart)) { await shutdown(1, e); } } } export { appConfig, pFactory }; export const abortControllers = { // Abort controller for all critical processes system: new AbortController(), // Abort controller for all non-critical processes noncritical: new AbortController(), // Abort controller for Developer UI developerUI: new AbortController(), }; export const processFactory = (config: OrchestratorConfigType): Record< ValueOf<typeof LaunchableComponents>, () => Promise<ProcessComponent> > => ({ [ComponentNames.TMUX]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.TUI_LOG_PORT] }); } await Tmux.install(); const tmux = new Tmux(); await tmux.startSession(); const tmuxConsole = $({ ...tmux.getAttachCommand(), component: ComponentNames.TMUX, abortController: abortControllers.developerUI, critical: true, // log, this process must no be redirected to the collector // this writes directly to the stdout }); tmuxConsole.process.status.then(() => { tmux.killServer(); }); setForegroundProcess(tmuxConsole.process); return tmuxConsole; }, [ComponentNames.EXPLORER]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.EFFECTSTREAM_EXPLORER_PORT] }); } const explorer = $({ args: ["task", "-f", config.packageName + "/explorer", "dev"], component: ComponentNames.EXPLORER, log: logHandler(), abortController: abortControllers.developerUI, critical: false, }); await explorer.process.status; return explorer; }, [ComponentNames.COLLECTOR]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.OTEL_COLLECTOR_PORT, 12345] }); // 12345 is the port for the Grafana Alloy web UI } // deno -A npm:@paimaexample/grafana-alloy grafana-alloy const otlpCollector = $({ args: ["-A", "npm:@paimaexample/grafana-alloy", "grafana-alloy"], // collector always has to post logs directly to console // otherwise, it gets stuck in an infinite loop of sending to itself log: logHandler({ disableCollector: true, disableTUI: true, disableStdOut: true, disableStderr: true, }), component: ComponentNames.COLLECTOR, abortController: abortControllers.noncritical, critical: false, }); void otlpCollector.process.status; await (new Deno.Command("wait-on", { args: [`tcp:${ENV.OTEL_COLLECTOR_PORT}`], })).spawn().status; setCollectorStarted(ENV.OTEL_COLLECTOR_PORT); return otlpCollector; }, [ComponentNames.LOKI]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [3100] }); } const loki = $({ args: ["-A", "npm:@paimaexample/grafana-loki", "grafana-loki"], component: ComponentNames.LOKI, log: logHandler( { disableCollector: true, disableTUI: true, disableStdOut: true, disableStderr: true, } ), abortController: abortControllers.noncritical, critical: false, }); void loki.process.status; return loki; }, [ComponentNames.CHECKER]: async (): Promise<ProcessComponent> => { const checker = $({ args: ["task", "check"], component: ComponentNames.CHECKER, stdout: "inherit", stderr: "inherit", abortController: abortControllers.noncritical, critical: true, }); await Promise.all([checker.process.status]); return checker; }, [ComponentNames.EFFECTSTREAM_SYNC]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.EFFECTSTREAM_API_PORT] }); } const node = $({ args: ["task", "node:start"], log: logHandler({}, tsLogOrchestratorAdapter), component: ComponentNames.EFFECTSTREAM_SYNC, namespace: [], // these should get a "paima" namespace added to them automatically abortController: abortControllers.system, // Allow sync to fail without killing the orchestrator; TUI/R restart can recover it. critical: false, }); await Promise.all([node.process.status]); return node; }, [ComponentNames.EFFECTSTREAM_PGLITE]: async (): Promise<ProcessComponent> => { if (config.kill.auto) { await dkill({ ports: [ENV.DB_PORT] }); } const paimaDb = $({ // TODO: run pgtyped:up only depending on parameters? args: [ "run", "-A", config.packageName + "/db/start-pglite", "--port", String(ENV.DB_PORT), ], log: logHandler(), component: ComponentNames.EFFECTSTREAM_PGLITE, abortController: abortControllers.system, critical: true, }); void paimaDb.process.status; // need to await sub-service start below await (new Deno.Command("wait-on", { args: [`tcp:${ENV.DB_PORT}`], })).spawn().status; return paimaDb; }, [ComponentNames.APPLY_MIGRATIONS]: async (): Promise<ProcessComponent> => { const externalPaimaDb = $({ args: [ "run", "-A", config.packageName + "/db/apply-migrations", ], component: ComponentNames.APPLY_MIGRATIONS, log: logHandler({}, tsLogOrchestratorAdapter), abortController: abortControllers.system, critical: true, }); await externalPaimaDb.process.status; return externalPaimaDb; }, });