Telemetry

Lightweight observability with buffered events, pluggable sinks, tracing spans, and sensitive data redaction.

Overview

@consensus-tools/telemetry is the observability package for consensus-tools. It buffers telemetry events, dispatches them to pluggable sinks, traces operations with spans, and provides a redact() helper for masking sensitive metadata before export.

Installation

pnpm add @consensus-tools/telemetry

Peer dependency

This package requires @consensus-tools/schemas as a peer dependency.

Quick start

import { EventBuffer, ConsoleSink, FileSink, createEvent } from "@consensus-tools/telemetry";

const buffer = new EventBuffer([new ConsoleSink(), new FileSink("./events.ndjson")]);

buffer.push(createEvent("job.created", "job_123", { title: "Review PR" }));
buffer.push(createEvent("job.resolved", "job_123", { winner: "agent-1" }));

await buffer.flush();
await buffer.close();

API reference

EventBuffer

Buffered event pipeline. By default it flushes when 100 events are buffered or every 5 seconds, whichever comes first.

class EventBuffer {
  constructor(sinks: Sink[], maxBufferSize?: number, flushIntervalMs?: number)
  push(event: TelemetryEvent): void
  flush(): Promise<void>
  close(): Promise<void>
}
  • sinks Sink[] -- one or more output sinks
  • maxBufferSize number (optional) -- flush threshold, default 100
  • flushIntervalMs number (optional) -- auto-flush interval, default 5000
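The flush policy described above (a size threshold plus a periodic timer) can be illustrated with a small stand-alone sketch. This is not the package's implementation: SketchBuffer, SketchEvent, and SketchSink are hypothetical names, and the event shape is an assumption made so the snippet runs without the package installed.

```typescript
// Hypothetical stand-ins; the real types come from @consensus-tools/telemetry
// and @consensus-tools/schemas.
interface SketchEvent { type: string; timestamp: string; jobId?: string }
interface SketchSink { name: string; write(event: SketchEvent): Promise<void> }

class SketchBuffer {
  private pending: SketchEvent[] = [];
  private readonly sinks: SketchSink[];
  private readonly maxBufferSize: number;
  private readonly timer: ReturnType<typeof setInterval>;

  constructor(sinks: SketchSink[], maxBufferSize = 100, flushIntervalMs = 5000) {
    this.sinks = sinks;
    this.maxBufferSize = maxBufferSize;
    // The periodic flush covers quiet periods where the threshold is never reached.
    this.timer = setInterval(() => void this.flush(), flushIntervalMs);
  }

  push(event: SketchEvent): void {
    this.pending.push(event);
    // The size-based flush keeps memory bounded during bursts.
    if (this.pending.length >= this.maxBufferSize) void this.flush();
  }

  async flush(): Promise<void> {
    // Detach the batch first so events pushed during the writes are not duplicated.
    const batch = this.pending.splice(0);
    await Promise.all(batch.flatMap((e) => this.sinks.map((s) => s.write(e))));
  }

  async close(): Promise<void> {
    clearInterval(this.timer); // stop the timer so the process can exit
    await this.flush();        // drain whatever is still buffered
  }
}
```

A lower maxBufferSize trades throughput for fresher data in the sinks; a longer flushIntervalMs does the opposite. Error handling for failed sink writes is left out of the sketch.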

createEvent(type, jobId?, metadata?)

Create a telemetry event with a timestamp and optional metadata.

function createEvent(type: string, jobId?: string, metadata?: Record<string, unknown>): TelemetryEvent

createSpan(name, parentId?)

Create a trace span to measure operation duration.

function createSpan(name: string, parentId?: string): TraceSpan

const parent = createSpan("workflow.run");
const child = createSpan("step.evaluate", parent.spanId);

closeSpan(span, error?)

Close a span, setting endTime and status. Pass an error string to mark the span as failed.

function closeSpan(span: TraceSpan, error?: string): TraceSpan

const okSpan = closeSpan(createSpan("job.resolve"));             // status: "ok"
const errSpan = closeSpan(createSpan("job.resolve"), "Timeout"); // status: "error"
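One way to make sure every span is closed, including on failure, is a small wrapper around createSpan and closeSpan. The sketch below is hypothetical: withSpan is not part of the package, and the local createSpan, closeSpan, and SpanLike are stand-ins that mirror the documented signatures so the snippet runs on its own. Fields other than spanId, endTime, and status are assumptions.

```typescript
// Stand-ins for the package exports; in real code, import createSpan and
// closeSpan from "@consensus-tools/telemetry" instead.
interface SpanLike {
  spanId: string; name: string; parentId?: string;
  startTime: number; endTime?: number; status?: "ok" | "error"; error?: string;
}
let nextSpanId = 0;
function createSpan(name: string, parentId?: string): SpanLike {
  return { spanId: `span_${++nextSpanId}`, name, parentId, startTime: Date.now() };
}
function closeSpan(span: SpanLike, error?: string): SpanLike {
  return { ...span, endTime: Date.now(), status: error ? "error" : "ok", error };
}

// Hypothetical helper: run fn inside a span, close the span exactly once,
// hand the closed span to onClose, and let any error propagate to the caller.
function withSpan<T>(
  name: string,
  fn: (span: SpanLike) => T,
  onClose: (span: SpanLike) => void,
  parentId?: string,
): T {
  const span = createSpan(name, parentId);
  let failure: string | undefined;
  try {
    return fn(span);
  } catch (err) {
    failure = err instanceof Error ? err.message : String(err);
    throw err;
  } finally {
    // Runs after both the return and the throw path.
    onClose(closeSpan(span, failure));
  }
}
```

The helper is synchronous to keep the sketch short; an async variant would await fn(span) inside the try block. The onClose callback is where a closed span could be handed to a sink's writeSpan method.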

redact(event)

Mask sensitive fields (apiKey, secret, token, password, credential) in event metadata by replacing their values with "[REDACTED]".

function redact(event: TelemetryEvent): TelemetryEvent

const event = createEvent("auth.check", undefined, { user: "alice", token: "sk-abc" });
const safe = redact(event);
// safe.metadata => { user: "alice", token: "[REDACTED]" }
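The masking behavior shown above can be approximated in a few lines. This is a sketch, not the package's code: redactMetadata is a hypothetical name, and it assumes keys are matched exactly and only at the top level of the metadata object. The real redact() may match differently, for example case-insensitively or in nested objects.

```typescript
// Key names taken from the list documented for redact().
const SENSITIVE_KEYS = new Set(["apiKey", "secret", "token", "password", "credential"]);

// Hypothetical helper: return a copy of metadata with sensitive values masked.
function redactMetadata(metadata: Record<string, unknown>): Record<string, unknown> {
  const safe: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(metadata)) {
    safe[key] = SENSITIVE_KEYS.has(key) ? "[REDACTED]" : value;
  }
  return safe; // a new object; the input is left untouched
}

const rawMetadata = { user: "alice", token: "sk-abc" };
const maskedMetadata = redactMetadata(rawMetadata);
// maskedMetadata => { user: "alice", token: "[REDACTED]" }
```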

Always redact before export

Call redact() on events before sending them to external sinks to avoid leaking API keys, tokens, or passwords.
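To avoid relying on every call site to remember this, redaction can be applied at the sink boundary. The sketch below wraps a sink so each event passes through a redaction function first. withRedaction, SinkLike, and EventLike are hypothetical names, and the types are simplified stand-ins. In real code the redactFn argument would be the package's redact function.

```typescript
// Simplified stand-ins for TelemetryEvent and Sink (assumed shapes).
interface EventLike { type: string; metadata?: Record<string, unknown> }
interface SinkLike<E> {
  name: string;
  write(event: E): Promise<void>;
  flush?(): Promise<void>;
  close?(): Promise<void>;
}

// Hypothetical helper: returns a sink that redacts before delegating.
function withRedaction<E>(sink: SinkLike<E>, redactFn: (event: E) => E): SinkLike<E> {
  return {
    name: sink.name,
    write: (event) => sink.write(redactFn(event)),
    // Forward lifecycle calls when the wrapped sink supports them.
    flush: () => sink.flush?.() ?? Promise.resolve(),
    close: () => sink.close?.() ?? Promise.resolve(),
  };
}

// Usage with an in-memory sink and a stand-in redaction function.
const received: EventLike[] = [];
const inner: SinkLike<EventLike> = { name: "mem", async write(e) { received.push(e); } };
const maskToken = (e: EventLike): EventLike =>
  ({ ...e, metadata: { ...e.metadata, token: "[REDACTED]" } });
const safeSink = withRedaction(inner, maskToken);
void safeSink.write({ type: "auth.check", metadata: { user: "alice", token: "sk-abc" } });
```

The sketch does not forward writeSpan; a complete wrapper would need to pass spans through as well.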

ConsoleSink

Logs each event to stdout in the form: [timestamp] type jobId.

class ConsoleSink implements Sink {
  name: "console"
}

FileSink

Appends events as NDJSON to a file.

class FileSink implements Sink {
  constructor(path: string)
  name: "file"
}

Sink (interface)

Implement this interface to send events to any destination.

interface Sink {
  name: string;
  write(event: TelemetryEvent): Promise<void>;
  writeSpan?(span: TraceSpan): Promise<void>;
  flush?(): Promise<void>;
  close?(): Promise<void>;
}
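Only name and write are required, so the smallest useful sink is one that collects events in memory, which is handy in unit tests. The sketch below is an illustration under assumptions: MemorySink is a hypothetical class, and EventLike and SinkLike are local stand-ins for TelemetryEvent and Sink so the snippet runs without the package.

```typescript
// Stand-ins for the real types (assumed event shape).
interface EventLike {
  type: string; timestamp: string; jobId?: string; metadata?: Record<string, unknown>;
}
interface SinkLike {
  name: string;
  write(event: EventLike): Promise<void>;
  flush?(): Promise<void>;
  close?(): Promise<void>;
}

// Hypothetical test sink: keeps every event it receives in an array.
class MemorySink implements SinkLike {
  readonly name = "memory";
  readonly events: EventLike[] = [];

  async write(event: EventLike): Promise<void> {
    this.events.push(event);
  }

  // Convenience for tests: return only the events of one type.
  ofType(type: string): EventLike[] {
    return this.events.filter((e) => e.type === type);
  }
}

const memorySink = new MemorySink();
void memorySink.write({ type: "job.created", timestamp: "2024-01-01T00:00:00Z", jobId: "job_123" });
void memorySink.write({ type: "job.resolved", timestamp: "2024-01-01T00:00:05Z", jobId: "job_123" });
```

If the shapes match the real interfaces, an instance like this could be passed to the EventBuffer constructor in a test and inspected after flush().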

Examples

Custom HTTP sink

import { EventBuffer, type Sink } from "@consensus-tools/telemetry";
import type { TelemetryEvent } from "@consensus-tools/schemas";

const httpSink: Sink = {
  name: "http",
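  async write(event: TelemetryEvent) {
    const res = await fetch("https://telemetry.example.com/v1/events", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(event),
    });
    // Surface HTTP failures instead of silently dropping the event.
    if (!res.ok) throw new Error(`Telemetry POST failed: ${res.status}`);
  },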
  async flush() { /* batch commit */ },
  async close() { /* cleanup */ },
};

const buffer = new EventBuffer([httpSink], 50, 10_000);
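The flush hook in the example above is left as a stub. One possible way to fill it in is to queue events in write and send them as a single request in flush. The sketch below is hypothetical: createBatchingSink and the Send type are illustrative names, the event shape is assumed, and it presumes the receiving endpoint accepts a JSON array of events. The transport is injected so the sink can be exercised without a network.

```typescript
// Stand-in for TelemetryEvent (assumed shape).
interface EventLike { type: string; timestamp: string; jobId?: string }
// Transport function: receives the serialized batch and delivers it.
type Send = (body: string) => Promise<void>;

// Hypothetical factory for a sink that batches events until flush().
function createBatchingSink(send: Send) {
  let queue: EventLike[] = [];

  async function flush(): Promise<void> {
    if (queue.length === 0) return;
    const batch = queue;
    queue = []; // detach so new writes land in a fresh queue
    try {
      await send(JSON.stringify(batch));
    } catch (err) {
      queue = batch.concat(queue); // put the batch back so a later flush can retry
      throw err;
    }
  }

  return {
    name: "http-batch",
    async write(event: EventLike): Promise<void> { queue.push(event); },
    flush,
    close: flush, // nothing else to release in this sketch
  };
}
```

In practice the send function would wrap a fetch call like the one in the example above. Requeueing on failure keeps events for a retry but lets the queue grow while the endpoint is down, so a production sink would likely cap it.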

Nested tracing

import { createSpan, closeSpan } from "@consensus-tools/telemetry";

const parent = createSpan("workflow.run");
const child = createSpan("step.evaluate", parent.spanId);
// ... do work ...
closeSpan(child);
closeSpan(parent);