Skip to main content
Home
This release is 46 versions behind 0.3.104 — the latest version of @paimaexample/collector. Jump to latest
It is unknown whether this package works with Cloudflare Workers, Node.js, Deno, Bun, Browsers
It is unknown whether this package works with Cloudflare Workers
It is unknown whether this package works with Node.js
It is unknown whether this package works with Deno
It is unknown whether this package works with Bun
It is unknown whether this package works with Browsers
JSR Score
58%
Published
2 months ago (0.3.50)
Package root>src>http-server.ts
import fastify from "npm:fastify@^5.4.0"; import { type IInstrumentationScope, type ILogRecord, TExportLogsServiceRequest, TExportMetricsServiceRequest, TExportTraceServiceRequest, } from "./typebox.ts"; import { Value } from "npm:/@sinclair/typebox@^0.34.30/value"; import { otelStringify, parseFixed64 } from "./parse.ts"; import { ComponentNames, log as logger, type Namespace, SeverityNumber, } from "jsr:@paimaexample/log@^0.3.50"; import { ENV } from "jsr:@paimaexample/utils@^0.3.50/node-env"; // this file is based on https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/ // TODO: maybe this should run on a different port and then forward to 4318 const PORT = ENV.OTEL_COLLECTOR_PORT; // default port for OTLP HTTP traces const server = fastify(); function printError(e: any, namespace: string, request: any) { let requestBody; try { requestBody = JSON.stringify(request.body, null, 2); } catch (e) { requestBody = ""; } logger.local( ComponentNames.COLLECTOR, [namespace], SeverityNumber.ERROR, (log) => log(e, requestBody), ); } // Export in batches of 50ms to avoid getting in the incorrect order. const TUI_LOG_URL = ENV.TUI_LOG_URL + ":" + ENV.TUI_LOG_PORT; let lastExportTime = 0; type OTelExportData = { component: string; namespace: Namespace; level: SeverityNumber; message: string | string[]; }; const exportBatch: OTelExportData[] = []; // Export in batches of 50ms function exportData(data: OTelExportData) { exportBatch.push({ ...data, message: Array.isArray(data.message) ? data.message : [data.message], }); const now = Date.now(); if (now - lastExportTime > 50) { fetch(TUI_LOG_URL + "/v1/data", { method: "POST", headers: { "Content-Type": "application/json", }, body: JSON.stringify(exportBatch), }).catch(() => {}); exportBatch.length = 0; lastExportTime = now; } } // DANGER: these endpoints only support JSON and reject other OpenTelemetry data formats // in the future, we can support protobuf using `otlp-transformer` once it has a stable release server.post("/v1/traces", async (request: any, reply: any) => { try { const parsed = Value.Parse(TExportTraceServiceRequest, request.body); // TODO: do we want to print traces? for (const log of parsed.resourceSpans ?? []) { for (const scopeSpans of log.scopeSpans) { const { scope } = scopeSpans; if (scope == null || scope.name == null) { continue; } for (const span of scopeSpans.spans ?? []) { const start = Temporal.Instant.fromEpochMilliseconds( Number(parseFixed64(span.startTimeUnixNano) / 1_000_000n), ); const end = Temporal.Instant.fromEpochMilliseconds( Number(parseFixed64(span.endTimeUnixNano) / 1_000_000n), ); const duration = start.until(end); const seconds = duration.total("seconds"); exportData({ component: scope.name, namespace: [span.name], level: SeverityNumber.DEBUG, // toFixed(3) means we can display up to ms precision // adding more tends to lead to outputs like 0.30000000000000004 message: `ended (${seconds.toFixed(3)}s)`, }); } } } } catch (e) { printError(e, "/v1/traces", request); throw e; } // TODO: proper response reply.status(200).send("Traces received"); }); server.post("/v1/metrics", async (request: any, reply: any) => { try { // TODO: do we want to print metrics? const parsed = Value.Parse(TExportMetricsServiceRequest, request.body); for (const metric of parsed.resourceMetrics ?? []) { for (const scopeMetric of metric.scopeMetrics) { const scope = scopeMetric.scope; if (scope == null || scope.name == null) { continue; } for (const metric of scopeMetric.metrics) { // trying to figure out how to print all of these to console is non-trivial // especially since a lot of it is very metric-specific like parsing attributes exportData({ component: scope.name, namespace: [metric.name], level: SeverityNumber.TRACE, message: "updated", }); } } } } catch (e) { printError(e, "/v1/metrics", request); throw e; } // TODO: proper response reply.status(200).send("Metrics received"); }); server.post("/v1/logs", async (request: any, reply: any) => { try { const parsed = Value.Parse(TExportLogsServiceRequest, request.body); for (const log of parsed.resourceLogs ?? []) { for (const scopeLog of log.scopeLogs) { const scope = scopeLog.scope; if (scope == null || scope.name == null) { continue; } for (const logRecord of scopeLog.logRecords ?? []) { if (scope.name.startsWith("dolos")) { const parts = handleDolos(scope, logRecord); if (parts == null) { continue; } exportData(parts); return; } // TODO: replace the `tslog` timestamp with timeUnixNano const namespace = (() => { const attrNamespace = logRecord.attributes.find((a) => a.key === "namespace" )?.value; if (attrNamespace == null) { return []; } return JSON.parse(otelStringify(attrNamespace)); })(); exportData({ component: scope.name, namespace: namespace, level: (logRecord.severityNumber as unknown as SeverityNumber) ?? SeverityNumber.UNSPECIFIED, message: JSON.parse(otelStringify(logRecord.body)), }); } } } } catch (e) { printError(e, "/v1/logs", request); throw e; } // TODO: proper response reply.status(200).send("Logs received"); }); function handleDolos( scope: IInstrumentationScope, record: ILogRecord, ): undefined | { component: string; namespace: Namespace; level: SeverityNumber; message: string[]; } { // overly noisy if (scope.name === "dolos::sync::roll") { return undefined; } const namespaceParts = scope.name.split("::"); const attributes = record.attributes.map((attr) => `${attr.key}: ${JSON.parse(otelStringify(attr.value))}` ); return { component: namespaceParts[0], namespace: namespaceParts.slice(1), level: (record.severityNumber as unknown as SeverityNumber) ?? SeverityNumber.UNSPECIFIED, message: [JSON.parse(otelStringify(record.body)), ...attributes], }; } // Start the server server.listen({ port: PORT, host: "0.0.0.0" }, (err: any, address: any) => { if (err) { logger.local( ComponentNames.COLLECTOR, [], SeverityNumber.FATAL, (log) => log(err), ); Deno.exit(1); } logger.local( ComponentNames.COLLECTOR, [], SeverityNumber.INFO, (log) => log(`in-memory OpenTelemetry Collector running on ${address}`), ); });