This release is 46 versions behind 0.3.104 — the latest version of @paimaexample/collector. Jump to latest
@paimaexample/collector@0.3.50
It is unknown whether this package works with Cloudflare Workers, Node.js, Deno, Bun, Browsers




JSR Score
58%
Published
2 months ago (0.3.50)
import fastify from "npm:fastify@^5.4.0";
import {
  type IInstrumentationScope,
  type ILogRecord,
  TExportLogsServiceRequest,
  TExportMetricsServiceRequest,
  TExportTraceServiceRequest,
} from "./typebox.ts";
import { Value } from "npm:/@sinclair/typebox@^0.34.30/value";
import { otelStringify, parseFixed64 } from "./parse.ts";
import {
  ComponentNames,
  log as logger,
  type Namespace,
  SeverityNumber,
} from "jsr:@paimaexample/log@^0.3.50";
import { ENV } from "jsr:@paimaexample/utils@^0.3.50/node-env";

// this file is based on https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/

// TODO: maybe this should run on a different port and then forward to 4318
const PORT = ENV.OTEL_COLLECTOR_PORT; // default port for OTLP HTTP traces

const server = fastify();

/**
 * Logs a request-handling failure locally under the COLLECTOR component,
 * together with a pretty-printed copy of the request body.
 *
 * @param e the error that was caught by the endpoint handler
 * @param namespace label for the log entry (the handlers pass the endpoint path)
 * @param request the incoming request; only its `body` property is read
 */
// deno-lint-ignore no-explicit-any
function printError(e: any, namespace: string, request: any) {
  let requestBody: string;
  try {
    // JSON.stringify returns undefined (instead of throwing) for an undefined
    // body, so fall back to the same empty string used when serialization fails
    requestBody = JSON.stringify(request.body, null, 2) ?? "";
  } catch (_stringifyError) {
    // the body could not be serialized (e.g. circular reference); log without it
    requestBody = "";
  }
  logger.local(
    ComponentNames.COLLECTOR,
    [namespace],
    SeverityNumber.ERROR,
    (log) => log(e, requestBody),
  );
}

// Export in batches of 50ms to avoid getting in the incorrect order.
const TUI_LOG_URL = ENV.TUI_LOG_URL + ":" + ENV.TUI_LOG_PORT;

/** Minimum spacing, in milliseconds, between two POSTs to the TUI. */
const EXPORT_INTERVAL_MS = 50;

/** `Date.now()` timestamp of the most recent flush (0 = never flushed). */
let lastExportTime = 0;

/** One entry of the JSON array that is POSTed to `${TUI_LOG_URL}/v1/data`. */
type OTelExportData = {
  component: string;
  namespace: Namespace;
  level: SeverityNumber;
  message: string | string[];
};

/** Entries queued since the last flush. */
const exportBatch: OTelExportData[] = [];

/** Handle of the timer that will flush the queued tail, if one is scheduled. */
let pendingFlush: ReturnType<typeof setTimeout> | undefined;

/**
 * POSTs everything currently queued in `exportBatch` to the TUI and empties
 * the queue. Does nothing when the queue is empty.
 */
function flushExportBatch(): void {
  if (pendingFlush !== undefined) {
    clearTimeout(pendingFlush);
    pendingFlush = undefined;
  }
  if (exportBatch.length === 0) {
    return;
  }
  // serialize before clearing: the queue array is reused for the next batch
  const body = JSON.stringify(exportBatch);
  exportBatch.length = 0;
  lastExportTime = Date.now();
  fetch(TUI_LOG_URL + "/v1/data", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body,
    // best-effort delivery: failures to reach the TUI are deliberately ignored
  }).catch(() => {});
}

/**
 * Queues one entry for the TUI, normalizing `message` to an array.
 *
 * The queue is flushed immediately when more than `EXPORT_INTERVAL_MS` has
 * passed since the previous flush. Otherwise a one-shot timer is armed so the
 * queued entries are still sent once the interval elapses, even if no further
 * entry arrives to trigger the flush.
 *
 * @param data the entry to forward
 */
function exportData(data: OTelExportData) {
  exportBatch.push({
    ...data,
    message: Array.isArray(data.message) ? data.message : [data.message],
  });

  const elapsed = Date.now() - lastExportTime;
  if (elapsed > EXPORT_INTERVAL_MS) {
    flushExportBatch();
  } else if (pendingFlush === undefined) {
    pendingFlush = setTimeout(
      flushExportBatch,
      Math.max(0, EXPORT_INTERVAL_MS - elapsed),
    );
  }
}

// DANGER: these endpoints only support JSON and reject other OpenTelemetry data formats
//         in the future, we can support protobuf using `otlp-transformer` once it has a stable release

/**
 * POST /v1/traces: validates the body against TExportTraceServiceRequest and
 * forwards one DEBUG entry per span containing the span's duration.
 * Spans whose scope (or scope name) is missing are skipped.
 * Validation/processing errors are logged via printError and rethrown.
 */
// deno-lint-ignore no-explicit-any
server.post("/v1/traces", async (request: any, reply: any) => {
  try {
    const parsed = Value.Parse(TExportTraceServiceRequest, request.body);
    // TODO: do we want to print traces?
    for (const resourceSpan of parsed.resourceSpans ?? []) {
      for (const scopeSpans of resourceSpan.scopeSpans) {
        const { scope } = scopeSpans;
        if (scope == null || scope.name == null) {
          continue;
        }
        for (const span of scopeSpans.spans ?? []) {
          // timestamps are truncated from nanoseconds to milliseconds here
          const start = Temporal.Instant.fromEpochMilliseconds(
            Number(parseFixed64(span.startTimeUnixNano) / 1_000_000n),
          );
          const end = Temporal.Instant.fromEpochMilliseconds(
            Number(parseFixed64(span.endTimeUnixNano) / 1_000_000n),
          );
          const seconds = start.until(end).total("seconds");
          exportData({
            component: scope.name,
            namespace: [span.name],
            level: SeverityNumber.DEBUG,
            // toFixed(3) means we can display up to ms precision
            // adding more tends to lead to outputs like 0.30000000000000004
            message: `ended (${seconds.toFixed(3)}s)`,
          });
        }
      }
    }
  } catch (e) {
    printError(e, "/v1/traces", request);
    throw e;
  }

  // TODO: proper response
  // returning the reply tells fastify the async handler has already responded
  return reply.status(200).send("Traces received");
});

/**
 * POST /v1/metrics: validates the body against TExportMetricsServiceRequest
 * and forwards one TRACE-level "updated" entry per metric.
 * Metrics whose scope (or scope name) is missing are skipped.
 * Validation/processing errors are logged via printError and rethrown.
 */
// deno-lint-ignore no-explicit-any
server.post("/v1/metrics", async (request: any, reply: any) => {
  try {
    // TODO: do we want to print metrics?
    const parsed = Value.Parse(TExportMetricsServiceRequest, request.body);
    for (const resourceMetric of parsed.resourceMetrics ?? []) {
      for (const scopeMetric of resourceMetric.scopeMetrics) {
        const scope = scopeMetric.scope;
        if (scope == null || scope.name == null) {
          continue;
        }
        for (const metric of scopeMetric.metrics) {
          // trying to figure out how to print all of these to console is non-trivial
          // especially since a lot of it is very metric-specific like parsing attributes
          exportData({
            component: scope.name,
            namespace: [metric.name],
            level: SeverityNumber.TRACE,
            message: "updated",
          });
        }
      }
    }
  } catch (e) {
    printError(e, "/v1/metrics", request);
    throw e;
  }

  // TODO: proper response
  // returning the reply tells fastify the async handler has already responded
  return reply.status(200).send("Metrics received");
});

/**
 * POST /v1/logs: validates the body against TExportLogsServiceRequest and
 * forwards every log record. Records whose scope name starts with "dolos"
 * are converted by handleDolos; all others take their namespace from the
 * "namespace" attribute (empty when absent) and their message from the body.
 * Validation/processing errors are logged via printError and rethrown.
 */
// deno-lint-ignore no-explicit-any
server.post("/v1/logs", async (request: any, reply: any) => {
  try {
    const parsed = Value.Parse(TExportLogsServiceRequest, request.body);
    for (const resourceLog of parsed.resourceLogs ?? []) {
      for (const scopeLog of resourceLog.scopeLogs) {
        const scope = scopeLog.scope;
        if (scope == null || scope.name == null) {
          continue;
        }
        for (const logRecord of scopeLog.logRecords ?? []) {
          if (scope.name.startsWith("dolos")) {
            const parts = handleDolos(scope, logRecord);
            if (parts != null) {
              exportData(parts);
            }
            // `continue`, not `return`: returning here would leave the handler
            // after the first dolos record, dropping the remaining records of
            // the request and never sending the reply below
            continue;
          }

          // TODO: replace the `tslog` timestamp with timeUnixNano
          const attrNamespace = logRecord.attributes.find((a) =>
            a.key === "namespace"
          )?.value;
          const namespace = attrNamespace == null
            ? []
            : JSON.parse(otelStringify(attrNamespace));

          exportData({
            component: scope.name,
            namespace: namespace,
            level: (logRecord.severityNumber as unknown as SeverityNumber) ??
              SeverityNumber.UNSPECIFIED,
            message: JSON.parse(otelStringify(logRecord.body)),
          });
        }
      }
    }
  } catch (e) {
    printError(e, "/v1/logs", request);
    throw e;
  }
  // TODO: proper response
  // returning the reply tells fastify the async handler has already responded
  return reply.status(200).send("Logs received");
});

/**
 * Converts a log record emitted under a `dolos…` scope into an export entry.
 *
 * The scope name is split on "::": the first segment becomes the component
 * and the remaining segments the namespace. The message is the record body
 * followed by one `key: value` string per attribute.
 *
 * @param scope instrumentation scope of the record; its name is split on "::"
 * @param record the log record to convert
 * @returns the export entry, or `undefined` for the `dolos::sync::roll` scope,
 *          which is suppressed as overly noisy
 */
function handleDolos(
  scope: IInstrumentationScope,
  record: ILogRecord,
): undefined | {
  component: string;
  namespace: Namespace;
  level: SeverityNumber;
  message: string[];
} {
  // overly noisy
  if (scope.name === "dolos::sync::roll") {
    return undefined;
  }
  const namespaceParts = scope.name.split("::");

  const attributes = record.attributes.map((attr) =>
    `${attr.key}: ${JSON.parse(otelStringify(attr.value))}`
  );
  return {
    component: namespaceParts[0],
    namespace: namespaceParts.slice(1),
    level: (record.severityNumber as unknown as SeverityNumber) ??
      SeverityNumber.UNSPECIFIED,
    message: [JSON.parse(otelStringify(record.body)), ...attributes],
  };
}

// Start the server
// deno-lint-ignore no-explicit-any
server.listen({ port: PORT, host: "0.0.0.0" }, (err: any, address: any) => {
  if (err) {
    logger.local(
      ComponentNames.COLLECTOR,
      [],
      SeverityNumber.FATAL,
      (log) => log(err),
    );
    // the collector cannot run without its listening socket
    Deno.exit(1);
  }
  logger.local(
    ComponentNames.COLLECTOR,
    [],
    SeverityNumber.INFO,
    (log) => log(`in-memory OpenTelemetry Collector running on ${address}`),
  );
});