Workflows

Multi-step workflow engine with graph execution, checkpoints, human-in-the-loop pauses, and cron scheduling.

Overview

@consensus-tools/workflows defines multi-step workflows with node-graph execution, checkpoint persistence, human-in-the-loop pauses, and cron scheduling.

Installation

pnpm add @consensus-tools/workflows

Quick start

import { WorkflowRunner } from "@consensus-tools/workflows";
import { MemoryStorage } from "@consensus-tools/storage";

const storage = new MemoryStorage();
await storage.init();
const runner = new WorkflowRunner(storage);

// Create and run a workflow
const workflow = await runner.createWorkflow("My Review", {
  nodes: [
    { id: "trigger", type: "trigger", label: "Start", config: { source: "manual" } },
    { id: "guard", type: "guard", label: "Code Guard", config: { guardType: "code_merge" } },
    { id: "approve", type: "hitl", label: "Human Approval", config: { channel: "slack" } },
    { id: "merge", type: "action", label: "Merge", config: { action: "github.merge_pr" } },
  ],
});

const run = await runner.run(workflow.id);
// run.status => "pending" | "running" | "completed" | "failed" | "cancelled" | "waiting"

// Resume after human approval
const resumed = await runner.resume(workflow.id, run.runId, "approved", "alice");

API reference

WorkflowRunner

The primary interface for creating, running, and resuming workflows.

import { WorkflowRunner } from "@consensus-tools/workflows";

const runner = new WorkflowRunner(storage);

// Create
const workflow = await runner.createWorkflow("Name", definition);

// Run
const run = await runner.run(workflow.id);

// Resume (after HITL pause)
const resumed = await runner.resume(workflow.id, run.runId, "approved", "alice");

// List runs
const runs = await runner.listRuns(workflow.id);
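
After listing runs, a common next step is finding the ones paused for approval. The following is a minimal sketch, assuming each run record exposes `runId` and `status` as in the quick start. The `RunSummary` type and the `waitingRuns` helper are hypothetical and not part of the package; the sample records stand in for the result of `runner.listRuns(workflow.id)`.

```typescript
// Status values as listed in the quick start comment.
type RunStatus = "pending" | "running" | "completed" | "failed" | "cancelled" | "waiting";

// Hypothetical minimal shape of a run record; the real type may carry more fields.
interface RunSummary {
  runId: string;
  status: RunStatus;
}

// Return the IDs of runs that are paused and waiting for a human decision.
function waitingRuns(runs: RunSummary[]): string[] {
  return runs.filter((r) => r.status === "waiting").map((r) => r.runId);
}

// Hand-written records standing in for `await runner.listRuns(workflow.id)`.
const sampleRuns: RunSummary[] = [
  { runId: "run-1", status: "completed" },
  { runId: "run-2", status: "waiting" },
  { runId: "run-3", status: "failed" },
];

console.log(waitingRuns(sampleRuns)); // [ 'run-2' ]
```

Each returned ID can then be passed to runner.resume together with the workflow ID, a decision, and a user ID.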

Built-in templates

Three pre-built workflow templates are included:

| Template | ID | Description |
| --- | --- | --- |
| PR Merge Guard | template-github-pr | GitHub PR → parallel agent review → guard → human approval → merge |
| Linear Task Decomposition | template-linear-tasks | Linear task → agent decomposition → guard → human approval → create subtasks |
| Cron Auto-Assign | template-linear-assign | Cron → fetch unassigned subtasks → parallel review → guard → assign |

import { WorkflowRunner, prMergeGuardTemplate, listTemplates, getTemplateById } from "@consensus-tools/workflows";

const runner = new WorkflowRunner(storage);
runner.registerTemplate(prMergeGuardTemplate);

const templates = listTemplates();
const tmpl = getTemplateById("template-linear-tasks");

Node types

Node types: trigger, agent, group (parallel), guard, hitl, action.

import { NodeExecutor, validateWorkflowDefinition } from "@consensus-tools/workflows";

const validation = validateWorkflowDefinition(definition);
if (!validation.valid) console.error(validation.errors);

const executor = new NodeExecutor(deps); // deps: { engine, guardEngine, storage, ... }
const output = await executor.execute(node, context, { boardId, runId, workflowId });
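
To make the idea of validating a definition concrete, here is a small sketch of the kind of checks a validator could perform. It is not the library's validateWorkflowDefinition: the `checkDefinition` function, the `SketchNode` shape, and the specific rules (non-empty graph, unique IDs, known node types) are assumptions chosen for illustration. Only the list of node types and the `{ valid, errors }` result shape come from this page.

```typescript
// Node types as listed in this section.
const NODE_TYPES: readonly string[] = ["trigger", "agent", "group", "guard", "hitl", "action"];

// Hypothetical node shape, modelled on the quick start example.
interface SketchNode {
  id: string;
  type: string;
  label: string;
  config: Record<string, unknown>;
}

interface SketchResult {
  valid: boolean;
  errors: string[];
}

// Illustrative checks only: non-empty graph, unique ids, known node types.
function checkDefinition(def: { nodes: SketchNode[] }): SketchResult {
  const errors: string[] = [];
  if (def.nodes.length === 0) errors.push("definition has no nodes");

  const seen = new Set<string>();
  for (const node of def.nodes) {
    if (seen.has(node.id)) errors.push(`duplicate node id: ${node.id}`);
    seen.add(node.id);
    if (!NODE_TYPES.includes(node.type)) {
      errors.push(`unknown node type "${node.type}" on node ${node.id}`);
    }
  }
  return { valid: errors.length === 0, errors };
}

// A deliberately broken definition: repeated id and an unsupported type.
const checked = checkDefinition({
  nodes: [
    { id: "trigger", type: "trigger", label: "Start", config: { source: "manual" } },
    { id: "trigger", type: "webhook", label: "Duplicate", config: {} },
  ],
});
console.log(checked.valid, checked.errors);
```

Collecting every error before returning, rather than stopping at the first, lets a caller report all problems in one pass.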

CronScheduler

import { CronScheduler, shouldRunNow } from "@consensus-tools/workflows";

const cron = new CronScheduler(storage, async (workflowId) => {
  await runner.run(workflowId);
});

await cron.register(workflow.id, "*/30 * * * *"); // every 30 minutes
cron.start();

// Check manually
shouldRunNow("0 9 * * 1"); // true if it's 9:00 on Monday

await cron.list();
await cron.unregister(workflow.id);
cron.stop();
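
For readers unfamiliar with 5-field cron expressions, the sketch below shows one way such an expression can be matched against a point in time. It is not the implementation behind shouldRunNow: `matchesCron` and `fieldMatches` are hypothetical names, the matcher uses local time, and it is simplified (it requires both day fields to match, and does not treat 7 as Sunday or accept month and weekday names).

```typescript
// Match one cron field against a value.
// Supports "*", "*/n", ranges "a-b", lists "a,b,c" and plain numbers.
function fieldMatches(field: string, value: number): boolean {
  return field.split(",").some((part) => {
    if (part === "*") return true;
    if (part.startsWith("*/")) {
      const step = Number(part.slice(2));
      return Number.isInteger(step) && step > 0 && value % step === 0;
    }
    if (part.includes("-")) {
      const [lo, hi] = part.split("-").map(Number);
      return value >= lo && value <= hi;
    }
    return Number(part) === value;
  });
}

// Field order: minute, hour, day of month, month, day of week (0 = Sunday).
function matchesCron(expr: string, at: Date): boolean {
  const fields = expr.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`expected 5 fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
  return (
    fieldMatches(minute, at.getMinutes()) &&
    fieldMatches(hour, at.getHours()) &&
    fieldMatches(dayOfMonth, at.getDate()) &&
    fieldMatches(month, at.getMonth() + 1) && // Date months are 0-based
    fieldMatches(dayOfWeek, at.getDay())
  );
}

// 1 January 2024 was a Monday.
const mondayNine = new Date(2024, 0, 1, 9, 0);
console.log(matchesCron("0 9 * * 1", mondayNine)); // true
console.log(matchesCron("*/30 * * * *", new Date(2024, 0, 1, 9, 15))); // false
```

Passing the time in as a parameter, instead of reading the clock inside the function, keeps the matcher easy to test.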

HITL pause/resume

A HITL node pauses the run and sets its status to "waiting". Call runner.resume(workflowId, runId, decision, userId) to continue. Failed nodes are not retried automatically; an explicit resume is required.
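
Because nothing is retried or resumed automatically, calling code has to decide what to do for each run status. The sketch below shows one possible policy as an exhaustive switch over the status values from the quick start. The `nextStep` helper and its `NextStep` labels are hypothetical and only illustrate the decision; they are not part of the package.

```typescript
// Status values as listed in the quick start comment.
type RunStatus = "pending" | "running" | "completed" | "failed" | "cancelled" | "waiting";

// Hypothetical labels for what the caller should do next.
type NextStep = "wait" | "ask-human" | "done" | "investigate";

// Map a run status to the caller's next step.
function nextStep(status: RunStatus): NextStep {
  switch (status) {
    case "pending":
    case "running":
      return "wait"; // still in progress, check again later
    case "waiting":
      return "ask-human"; // paused at a HITL node; call runner.resume(...) once decided
    case "completed":
    case "cancelled":
      return "done"; // terminal, nothing further to do
    case "failed":
      return "investigate"; // no automatic retry, so a person or script must act
  }
}

console.log(nextStep("waiting")); // ask-human
```

Keeping the switch exhaustive means the TypeScript compiler flags this function if a new status value is later added to the union.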

Exports reference

| Export | Kind | Description |
| --- | --- | --- |
| WorkflowRunner | Class | Create, run, resume workflows with checkpoint persistence |
| NodeExecutor | Class | Execute individual nodes in a workflow graph |
| validateWorkflowDefinition | Function | Validate a workflow definition before execution |
| CronScheduler | Class | IStorage-backed cron scheduling with dedup |
| shouldRunNow(expr) | Function | Check if a 5-field cron expression matches now |
| prMergeGuardTemplate | Const | Template: GitHub PR merge guard |
| linearTaskDecompTemplate | Const | Template: Linear task decomposition |
| cronAutoAssignTemplate | Const | Template: Cron auto-assign subtasks |
| listTemplates() / getTemplateById(id) | Function | Discover built-in templates |
| WorkflowTemplate | Type | Template definition with id, name, steps, and handler |
| WorkflowStepHandler | Type | Handler function for template-based workflow steps |
| WorkflowContext | Type | Context passed to step handlers |
| WorkflowStepResult | Type | Step result with status (completed / failed / waiting) |
| NodeExecutorDeps | Type | Dependencies for NodeExecutor constructor |
| WorkflowNode | Type | Single node in a workflow graph definition |
| WorkflowDefinition | Type | Full workflow definition with nodes array |
| NodeOutput | Type | Output from a single node execution |
| TemplateDefinition | Type | Metadata for a built-in template |

Related packages

  • core -- JobEngine and storage that workflows depends on
  • notifications -- HITL dispatch via Slack, Teams, Discord
  • integrations -- GitHub and Linear adapters used by templates