@consensus-tools/telemetry provides lightweight observability for consensus-tools. It buffers telemetry events, dispatches them to pluggable sinks, traces operations with spans, and redacts sensitive metadata before export.
pnpm add @consensus-tools/telemetry
ℹ Peer dependency
This package requires @consensus-tools/schemas as a peer dependency.
import { EventBuffer, ConsoleSink, FileSink, createEvent } from "@consensus-tools/telemetry";
const buffer = new EventBuffer([new ConsoleSink(), new FileSink("./events.ndjson")]);
buffer.push(createEvent("job.created", "job_123", { title: "Review PR" }));
buffer.push(createEvent("job.resolved", "job_123", { winner: "agent-1" }));
await buffer.flush();
await buffer.close();
EventBuffer is a buffered event pipeline. By default it auto-flushes when 100 events are buffered or every 5 seconds, whichever comes first.
class EventBuffer {
  constructor(sinks: Sink[], maxBufferSize?: number, flushIntervalMs?: number)
  push(event: TelemetryEvent): void
  flush(): Promise<void>
  close(): Promise<void>
}
- sinks: Sink[] -- one or more output sinks
- maxBufferSize: number (optional) -- flush threshold, default 100
- flushIntervalMs: number (optional) -- auto-flush interval in milliseconds, default 5000
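The size threshold can be pictured with a small stand-alone sketch. This is not the package's implementation: SketchBuffer, SketchEvent, and the synchronous write callback are hypothetical names introduced here, the interval timer is left out for brevity, and the sketch assumes that the push which reaches maxBufferSize is the one that triggers the flush.

```typescript
// Hypothetical stand-in for the real TelemetryEvent type.
interface SketchEvent {
  type: string;
  jobId?: string;
}

// Illustrative only: hands a batch to a callback once the threshold is reached.
class SketchBuffer {
  private pending: SketchEvent[] = [];
  private readonly write: (batch: SketchEvent[]) => void;
  private readonly maxBufferSize: number;

  constructor(write: (batch: SketchEvent[]) => void, maxBufferSize: number = 100) {
    this.write = write;
    this.maxBufferSize = maxBufferSize; // 100 mirrors the documented default
  }

  push(event: SketchEvent): void {
    this.pending.push(event);
    if (this.pending.length >= this.maxBufferSize) {
      this.flush();
    }
  }

  flush(): void {
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = []; // swap first so events pushed during write are not lost
    this.write(batch);
  }

  get size(): number {
    return this.pending.length;
  }
}

const batches: SketchEvent[][] = [];
const sketch = new SketchBuffer((batch) => {
  batches.push(batch);
}, 3);

sketch.push({ type: "job.created", jobId: "job_1" });
sketch.push({ type: "job.created", jobId: "job_2" });
// Two events are below the threshold of 3, so nothing has been written yet.
sketch.push({ type: "job.created", jobId: "job_3" });
// The third push reached the threshold and flushed all three as one batch.
sketch.push({ type: "job.resolved", jobId: "job_1" });
sketch.flush(); // a manual flush drains the one remaining event
```

Swapping the pending array before calling the writer is a common way to keep a flush from re-sending events if the writer itself pushes more.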
Create a telemetry event with a timestamp and optional metadata.
function createEvent(type: string, jobId?: string, metadata?: Record<string, unknown>): TelemetryEvent
Create a trace span to measure operation duration.
function createSpan(name: string, parentId?: string): TraceSpan
const parent = createSpan("workflow.run");
const child = createSpan("step.evaluate", parent.spanId);
Close a span, setting endTime and status. Pass an error string to mark the span as failed.
function closeSpan(span: TraceSpan, error?: string): TraceSpan
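const span = createSpan("job.resolve");
const finished = closeSpan(span); // status: "ok"
const failedSpan = createSpan("job.resolve"); // use a fresh span rather than closing the same one twice
const errSpan = closeSpan(failedSpan, "Timeout"); // status: "error"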
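To make the create/close lifecycle concrete, here is a minimal stand-alone sketch. It is not the package's code: only spanId, endTime, and status are named on this page, so the other fields (name, parentId, startTime, error), the counter-based IDs, and the choice to return a copy instead of mutating the input are all assumptions made for illustration.

```typescript
// Hypothetical span shape; only spanId, endTime, and status appear in this page.
interface SketchSpan {
  spanId: string;
  name: string;
  parentId?: string;
  startTime: number;
  endTime?: number;
  status?: "ok" | "error";
  error?: string;
}

let nextSpanNumber = 0;

function sketchCreateSpan(name: string, parentId?: string): SketchSpan {
  nextSpanNumber += 1;
  return { spanId: `span_${nextSpanNumber}`, name, parentId, startTime: Date.now() };
}

// Returns a closed copy and leaves the input span untouched (an assumption).
function sketchCloseSpan(span: SketchSpan, error?: string): SketchSpan {
  return {
    ...span,
    endTime: Date.now(),
    status: error === undefined ? "ok" : "error",
    error,
  };
}

// Duration is only defined once the span has been closed.
function durationMs(span: SketchSpan): number | undefined {
  return span.endTime === undefined ? undefined : span.endTime - span.startTime;
}

const okSpan = sketchCloseSpan(sketchCreateSpan("job.resolve"));
const failedSketchSpan = sketchCloseSpan(sketchCreateSpan("job.resolve"), "Timeout");
const openSpan = sketchCreateSpan("job.pending");
```

Here okSpan ends with status "ok", failedSketchSpan with status "error" and the message "Timeout", and durationMs(openSpan) stays undefined until the span is closed.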
Strip sensitive fields (apiKey, secret, token, password, credential) from event metadata.
function redact(event: TelemetryEvent): TelemetryEvent
const event = createEvent("auth.check", undefined, { user: "alice", token: "sk-abc" });
const safe = redact(event);
// safe.metadata => { user: "alice", token: "[REDACTED]" }
⚠ Always redact before export
Call redact() on events before sending them to external sinks to avoid leaking API keys, tokens, or passwords.
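The redaction step can be sketched as a pure function over the metadata object. This is an illustration, not the package's implementation: sketchRedact is a hypothetical name, and the sketch assumes exact, case-sensitive matching on top-level keys only. Whether the real redact() also handles nested objects or key variants is not stated here and is worth verifying before relying on it.

```typescript
// Key names come from the list above; exact-match, top-level-only is an assumption.
const SENSITIVE_KEYS = new Set(["apiKey", "secret", "token", "password", "credential"]);

// Returns a new object; the input metadata is not modified.
function sketchRedact(metadata: Record<string, unknown>): Record<string, unknown> {
  const safe: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(metadata)) {
    safe[key] = SENSITIVE_KEYS.has(key) ? "[REDACTED]" : value;
  }
  return safe;
}

const original = { user: "alice", token: "sk-abc", nested: { password: "hunter2" } };
const cleaned = sketchRedact(original);
// cleaned.token is "[REDACTED]" while original.token is still "sk-abc".
// nested.password is left as-is because this sketch only checks top-level keys.
```

The nested case is included deliberately: a shallow redactor leaves secrets inside nested metadata untouched, so flat metadata is the safer shape to export.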
ConsoleSink logs each event to stdout in the form [timestamp] type jobId.
class ConsoleSink implements Sink {
  name: "console"
}
Appends events as NDJSON to a file.
class FileSink implements Sink {
  constructor(path: string)
  name: "file"
}
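NDJSON stores one JSON document per line, which is what makes append-only writes and line-by-line reads practical. The sketch below shows the format itself and does not reproduce how FileSink writes to disk; toNdjson, fromNdjson, and the sample records are hypothetical names and data used only for illustration.

```typescript
// Serialize one record per line. Compact JSON.stringify output contains no raw
// newlines, because newlines inside strings are escaped as \n.
function toNdjson(records: object[]): string {
  return records.map((record) => JSON.stringify(record) + "\n").join("");
}

// Parse NDJSON text back into records, skipping blank lines.
function fromNdjson(text: string): unknown[] {
  return text
    .split("\n")
    .filter((line) => line.trim() !== "")
    .map((line) => JSON.parse(line) as unknown);
}

const ndjson = toNdjson([
  { type: "job.created", jobId: "job_123", metadata: { title: "Review PR" } },
  { type: "job.resolved", jobId: "job_123", metadata: { winner: "agent-1\nline two" } },
]);
const parsed = fromNdjson(ndjson);
```

The second record carries a newline inside a string value to show that it survives the round trip without breaking the one-record-per-line layout.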
Implement this interface to send events to any destination.
interface Sink {
  name: string;
  write(event: TelemetryEvent): Promise<void>;
  writeSpan?(span: TraceSpan): Promise<void>;
  flush?(): Promise<void>;
  close?(): Promise<void>;
}
import { EventBuffer } from "@consensus-tools/telemetry";
import type { Sink } from "@consensus-tools/telemetry";
import type { TelemetryEvent } from "@consensus-tools/schemas";
const httpSink: Sink = {
  name: "http",
  async write(event: TelemetryEvent) {
    // POST each event as JSON; the endpoint URL is a placeholder.
    await fetch("https://telemetry.example.com/v1/events", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(event),
    });
  },
  async flush() { /* batch commit */ },
  async close() { /* cleanup */ },
};
const buffer = new EventBuffer([httpSink], 50, 10_000);
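The same interface also lends itself to an in-memory sink for unit tests. The sketch below is self-contained and therefore declares local stand-ins instead of importing the package: SketchEvent and SketchSink are hypothetical types that mirror only the parts of Sink shown above, and MemorySink is not something the package is known to ship.

```typescript
// Local stand-ins so the sketch compiles on its own (hypothetical names).
interface SketchEvent {
  type: string;
  jobId?: string;
  metadata?: Record<string, unknown>;
}

interface SketchSink {
  name: string;
  write(event: SketchEvent): Promise<void>;
  close?(): Promise<void>;
}

// Collects events in an array so a test can inspect what was emitted.
class MemorySink implements SketchSink {
  name = "memory";
  events: SketchEvent[] = [];
  closed = false;

  async write(event: SketchEvent): Promise<void> {
    if (this.closed) throw new Error("MemorySink is closed");
    this.events.push(event);
  }

  async close(): Promise<void> {
    this.closed = true;
  }
}

const memory = new MemorySink();
// write() contains no await, so the push has already happened when the promise
// is returned; real callers should still await it.
void memory.write({ type: "job.created", jobId: "job_123" });
void memory.write({ type: "job.resolved", jobId: "job_123" });
```

In a test, a sink along these lines could be passed wherever a Sink is expected, with assertions made against its events array after flushing.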
import { createSpan, closeSpan } from "@consensus-tools/telemetry";
const parent = createSpan("workflow.run");
const child = createSpan("step.evaluate", parent.spanId);
// ... do work ...
closeSpan(child);
closeSpan(parent);
- Schemas package -- defines TelemetryEvent and TraceSpan types
- Architecture -- where telemetry fits in the event system