Workflows

Multi-step workflow engine with graph execution, checkpoints, human-in-the-loop pauses, and cron scheduling.

Overview

@consensus-tools/workflows lets you define multi-step workflows as node graphs, execute them with checkpoint persistence, pause for human approval, and schedule recurring runs via cron. It builds on the storage layer and job engine provided by @consensus-tools/core.

Installation

pnpm add @consensus-tools/workflows

Quick start

import { WorkflowRunner } from "@consensus-tools/workflows";
import { createStorage } from "@consensus-tools/core";

// `config` is your storage configuration, defined elsewhere in your app
const storage = await createStorage(config);
const runner = new WorkflowRunner(storage);

const workflow = await runner.createWorkflow("My Review", {
  nodes: [
    { id: "trigger", type: "trigger", label: "Start", config: { source: "manual" } },
    { id: "guard", type: "guard", label: "Code Guard", config: { guardType: "code_merge" } },
    { id: "approve", type: "hitl", label: "Human Approval", config: { channel: "slack" } },
    { id: "merge", type: "action", label: "Merge", config: { action: "github.merge_pr" } },
  ],
});

const run = await runner.run(workflow.id);
// run.status => "completed" | "waiting" | "failed"

// Resume after human approval (only meaningful when the run is paused)
if (run.status === "waiting") {
  const resumed = await runner.resume(workflow.id, run.runId, "approved", "alice");
}

API reference

WorkflowRunner

Create, run, and resume workflows with checkpoint persistence.

class WorkflowRunner {
  constructor(storage: IStorage);
  createWorkflow(name: string, definition: WorkflowDefinition): Promise<Workflow>;
  run(workflowId: string): Promise<WorkflowRun>;
  resume(workflowId: string, runId: string, decision: string, actor: string): Promise<WorkflowRun>;
  registerTemplate(template: WorkflowTemplate): void;
}
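To make the run, pause, and resume lifecycle concrete, here is a toy in-memory model. It is a sketch, not the library's implementation: ToyNode, ToyRun, toyRun, toyResume, the "running" status, and the rule that any decision other than "approved" fails the run are all assumptions made for illustration.

```typescript
// Toy model of checkpointed execution with a human-in-the-loop pause.
// Every name here is hypothetical; the real WorkflowRunner may differ.
type ToyNode = { id: string; type: "trigger" | "guard" | "hitl" | "action" };
type ToyRun = {
  runId: string;
  status: "running" | "completed" | "waiting" | "failed";
  nextIndex: number; // checkpoint: index of the next node to execute
  visited: string[]; // ids of nodes that have finished
};

// Stands in for persistent storage.
const checkpoints = new Map<string, ToyRun>();

function saveCheckpoint(run: ToyRun): ToyRun {
  checkpoints.set(run.runId, { ...run, visited: [...run.visited] });
  return run;
}

function advance(nodes: ToyNode[], run: ToyRun): ToyRun {
  while (run.nextIndex < nodes.length) {
    const node = nodes[run.nextIndex];
    if (node === undefined) break;
    if (node.type === "hitl") {
      run.status = "waiting"; // pause here until a decision arrives
      return saveCheckpoint(run);
    }
    run.visited.push(node.id);
    run.nextIndex += 1;
    saveCheckpoint(run); // persist progress after every node
  }
  run.status = "completed";
  return saveCheckpoint(run);
}

function toyRun(nodes: ToyNode[], runId: string): ToyRun {
  return advance(nodes, { runId, status: "running", nextIndex: 0, visited: [] });
}

function toyResume(nodes: ToyNode[], runId: string, decision: string): ToyRun {
  const saved = checkpoints.get(runId);
  if (!saved || saved.status !== "waiting") {
    throw new Error(`run ${runId} is not waiting`);
  }
  const run: ToyRun = { ...saved, visited: [...saved.visited] };
  if (decision !== "approved") {
    run.status = "failed"; // assumption: a rejection ends the run
    return saveCheckpoint(run);
  }
  const gate = nodes[run.nextIndex];
  if (gate) run.visited.push(gate.id);
  run.nextIndex += 1; // step past the hitl node that was awaiting approval
  run.status = "running";
  return advance(nodes, run);
}
```

The point of the sketch is that resuming needs nothing but the stored checkpoint: the caller supplies the run ID and a decision, and execution continues from the saved position.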

NodeExecutor

Executes individual nodes in a workflow graph.

class NodeExecutor {
  constructor(deps: NodeExecutorDeps);
  execute(node: WorkflowNode, context: object, ids: NodeExecIds): Promise<NodeOutput>;
}

Supported node types: trigger, agent, group (parallel), guard, hitl, action.

validateWorkflowDefinition()

Validates a workflow definition before execution.

function validateWorkflowDefinition(
  definition: WorkflowDefinition
): { valid: boolean; errors?: string[] };
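The specific checks the validator performs are not listed here. As a rough illustration of the return shape, the sketch below applies three plausible rules (at least one node, unique node IDs, known node types). The names SketchNode, SketchDefinition, and sketchValidate are hypothetical, and the node shape is inferred from the Quick start rather than taken from the real WorkflowDefinition type.

```typescript
// Hypothetical shapes modelled on the Quick start example.
type SketchNode = {
  id: string;
  type: string;
  label: string;
  config: Record<string, unknown>;
};
type SketchDefinition = { nodes: SketchNode[] };

// Node types named in this document.
const KNOWN_TYPES = new Set(["trigger", "agent", "group", "guard", "hitl", "action"]);

// Assumed rules only; the real validator may check more or different things.
function sketchValidate(
  definition: SketchDefinition
): { valid: boolean; errors?: string[] } {
  const errors: string[] = [];
  const seen = new Set<string>();
  if (definition.nodes.length === 0) errors.push("definition has no nodes");
  for (const node of definition.nodes) {
    if (seen.has(node.id)) errors.push(`duplicate node id: ${node.id}`);
    seen.add(node.id);
    if (!KNOWN_TYPES.has(node.type)) errors.push(`unknown node type: ${node.type}`);
  }
  // Mirror the documented shape: errors is present only when invalid.
  return errors.length === 0 ? { valid: true } : { valid: false, errors };
}
```

Collecting every error before returning, rather than stopping at the first, lets a caller report all problems in one pass.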

CronScheduler

Storage-backed cron scheduling with deduplication.

class CronScheduler {
  constructor(storage: IStorage, onTick: (workflowId: string) => Promise<void>);
  register(workflowId: string, cronExpr: string): Promise<void>;
  unregister(workflowId: string): Promise<void>;
  list(): Promise<CronEntry[]>;
  start(): void;
  stop(): void;
}
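How CronScheduler deduplicates ticks is not described here. One common approach, sketched below under that caveat, is to claim a key made of the workflow ID and the current minute before firing, so a second tick in the same minute is skipped. The names firedKeys, minuteKey, and claimTick are hypothetical, and the in-memory Set stands in for a storage-backed record.

```typescript
// Hypothetical dedup guard; not the library's storage schema.
const firedKeys = new Set<string>(); // stand-in for a persistent table

function minuteKey(workflowId: string, at: Date): string {
  const minute = Math.floor(at.getTime() / 60000); // whole minutes since the epoch
  return `${workflowId}@${minute}`;
}

// Returns true only for the first claim of a given workflow and minute.
function claimTick(workflowId: string, at: Date): boolean {
  const key = minuteKey(workflowId, at);
  if (firedKeys.has(key)) return false; // already fired in this minute
  firedKeys.add(key);
  return true;
}
```

With a shared store instead of a Set, the same check would also keep two scheduler processes from running the same workflow twice, provided the claim is atomic.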

shouldRunNow()

Checks whether a 5-field cron expression (minute, hour, day of month, month, day of week) matches the current time.

function shouldRunNow(cronExpr: string): boolean;
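For readers unfamiliar with 5-field matching, the sketch below shows one way such a check can work. It is not the library's implementation of shouldRunNow(): fieldMatches and matchesCron are hypothetical helpers, the time is passed in explicitly so the result is reproducible, local time is assumed, and all five fields must match (classic cron instead ORs day of month and day of week when both are restricted).

```typescript
// Hypothetical helpers illustrating 5-field cron matching.
// Supported syntax: "*", "*/n", single numbers, "a-b" ranges, comma lists.
function fieldMatches(field: string, value: number, min: number): boolean {
  return field.split(",").some((part) => {
    if (part === "*") return true;
    const step = /^\*\/(\d+)$/.exec(part);
    if (step) {
      const n = Number(step[1]);
      return n > 0 && (value - min) % n === 0; // steps count from the field minimum
    }
    const range = /^(\d+)-(\d+)$/.exec(part);
    if (range) return value >= Number(range[1]) && value <= Number(range[2]);
    return /^\d+$/.test(part) && Number(part) === value;
  });
}

function matchesCron(cronExpr: string, at: Date): boolean {
  const fields = cronExpr.trim().split(/\s+/);
  if (fields.length !== 5) return false;
  // [current value, minimum value] for each field, in cron order.
  const actual: Array<[number, number]> = [
    [at.getMinutes(), 0],
    [at.getHours(), 0],
    [at.getDate(), 1],
    [at.getMonth() + 1, 1], // Date months are 0-based
    [at.getDay(), 0], // 0 = Sunday
  ];
  return actual.every(([value, min], i) => fieldMatches(fields[i] ?? "", value, min));
}
```

For example, matchesCron("0 9 * * 1", new Date(2024, 0, 1, 9, 0)) evaluates the expression against 9:00 on Monday, 1 January 2024.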

Template helpers

function listTemplates(): TemplateDefinition[];
function getTemplateById(id: string): TemplateDefinition | undefined;

Examples

Using built-in templates

import {
  WorkflowRunner,
  prMergeGuardTemplate,
  listTemplates,
  getTemplateById,
} from "@consensus-tools/workflows";

const runner = new WorkflowRunner(storage);
runner.registerTemplate(prMergeGuardTemplate);

const templates = listTemplates();
// => [{ id: "template-github-pr", name: "Template 1 - GitHub PR Merge Guard", ... }, ...]

const tmpl = getTemplateById("template-linear-tasks");

Three built-in templates are included:

| Template | ID | Description |
| --- | --- | --- |
| PR Merge Guard | template-github-pr | GitHub PR -> parallel agent review -> guard -> human approval -> merge |
| Linear Task Decomposition | template-linear-tasks | Linear task -> agent decomposition -> guard -> human approval -> create subtasks |
| Cron Auto-Assign | template-linear-assign | Cron -> fetch unassigned subtasks -> parallel review -> guard -> assign |

Cron scheduling

import { CronScheduler, shouldRunNow } from "@consensus-tools/workflows";

const cron = new CronScheduler(storage, async (workflowId) => {
  await runner.run(workflowId);
});

await cron.register(workflow.id, "*/30 * * * *"); // every 30 minutes
cron.start();

// Check manually
shouldRunNow("0 9 * * 1"); // true if it's 9:00 on Monday

await cron.list();
await cron.unregister(workflow.id);
cron.stop();

Node execution with validation

import { NodeExecutor, validateWorkflowDefinition } from "@consensus-tools/workflows";

const validation = validateWorkflowDefinition(definition);
if (!validation.valid) {
  console.error(validation.errors);
  throw new Error("Invalid workflow definition"); // stop before executing any node
}

const executor = new NodeExecutor(deps);
const output = await executor.execute(node, context, { boardId, runId, workflowId });

Validate before executing

Always call validateWorkflowDefinition() before passing a definition to WorkflowRunner or NodeExecutor. Invalid graphs may produce unexpected checkpoint states.

Related packages

  • core -- Provides storage backends and JobEngine used by workflows
  • policies -- Resolution strategies used during workflow job resolution