@consensus-tools/workflows lets you define multi-step workflows as node graphs, execute them with checkpoint persistence, pause for human approval, and schedule recurring runs via cron. It builds on the storage backends and job engine provided by @consensus-tools/core.
pnpm add @consensus-tools/workflows
import { WorkflowRunner } from "@consensus-tools/workflows";
import { createStorage } from "@consensus-tools/core";
const storage = await createStorage(config);
const runner = new WorkflowRunner(storage);
const workflow = await runner.createWorkflow("My Review", {
nodes: [
{ id: "trigger", type: "trigger", label: "Start", config: { source: "manual" } },
{ id: "guard", type: "guard", label: "Code Guard", config: { guardType: "code_merge" } },
{ id: "approve", type: "hitl", label: "Human Approval", config: { channel: "slack" } },
{ id: "merge", type: "action", label: "Merge", config: { action: "github.merge_pr" } },
],
});
const run = await runner.run(workflow.id);
// run.status => "completed" | "waiting" | "failed"
// If the run paused for human approval (status "waiting"), resume it once a decision arrives
const resumed = await runner.resume(workflow.id, run.runId, "approved", "alice");
Create, run, and resume workflows with checkpoint persistence.
class WorkflowRunner {
constructor(storage: IStorage);
createWorkflow(name: string, definition: WorkflowDefinition): Promise<Workflow>;
run(workflowId: string): Promise<WorkflowRun>;
resume(workflowId: string, runId: string, decision: string, actor: string): Promise<WorkflowRun>;
registerTemplate(template: WorkflowTemplate): void;
}
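To make the pause-and-resume flow behind `run` and `resume` more concrete, here is a minimal in-memory sketch. It is not the package's implementation: `MiniRunner`, `MiniNode`, `Checkpoint`, the linear node order, and the rule that any decision other than `"approved"` fails the run are all assumptions made for illustration. The real runner persists checkpoints through `IStorage` rather than a `Map`.

```typescript
// Hypothetical toy types; the real package defines its own.
type MiniNodeType = "trigger" | "guard" | "hitl" | "action";
type MiniStatus = "completed" | "waiting" | "failed";

interface MiniNode {
  id: string;
  type: MiniNodeType;
}

interface Checkpoint {
  runId: string;
  status: MiniStatus;
  nextIndex: number; // position of the next node to execute
  visited: string[]; // ids of nodes that have finished
  approvedBy?: string;
}

class MiniRunner {
  private nodes: MiniNode[];
  private checkpoints = new Map<string, Checkpoint>(); // stands in for storage
  private runCount = 0;

  constructor(nodes: MiniNode[]) {
    this.nodes = nodes;
  }

  // Start a new run and execute nodes until the end or the first hitl node.
  run(): Checkpoint {
    this.runCount += 1;
    const cp: Checkpoint = { runId: `run-${this.runCount}`, status: "waiting", nextIndex: 0, visited: [] };
    this.checkpoints.set(cp.runId, cp);
    return this.advance(cp);
  }

  // Continue a waiting run from its checkpoint (assumed rule: only "approved" continues).
  resume(runId: string, decision: string, actor: string): Checkpoint {
    const cp = this.checkpoints.get(runId);
    const pending = cp ? this.nodes[cp.nextIndex] : undefined;
    if (!cp || !pending || cp.status !== "waiting") {
      throw new Error(`run ${runId} is not waiting for a decision`);
    }
    if (decision !== "approved") {
      cp.status = "failed";
      return { ...cp, visited: [...cp.visited] };
    }
    cp.approvedBy = actor;
    cp.visited.push(pending.id); // the hitl node is now done
    cp.nextIndex += 1;
    return this.advance(cp);
  }

  // Execute nodes in order, stopping at a hitl node; returns a snapshot of the checkpoint.
  private advance(cp: Checkpoint): Checkpoint {
    cp.status = "completed";
    while (cp.nextIndex < this.nodes.length) {
      const node = this.nodes[cp.nextIndex];
      if (!node) break;
      if (node.type === "hitl") {
        cp.status = "waiting";
        break;
      }
      cp.visited.push(node.id);
      cp.nextIndex += 1;
    }
    return { ...cp, visited: [...cp.visited] };
  }
}
```

With the four nodes from the example above (trigger, guard, hitl, action), `run()` in this sketch stops at the hitl node with status `"waiting"`, and `resume(runId, "approved", "alice")` finishes the remaining action node.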
Executes individual nodes in a workflow graph.
class NodeExecutor {
constructor(deps: NodeExecutorDeps);
execute(node: WorkflowNode, context: object, ids: NodeExecIds): Promise<NodeOutput>;
}
Supported node types: trigger, agent, group (parallel), guard, hitl, action.
Validates a workflow definition before execution.
function validateWorkflowDefinition(
definition: WorkflowDefinition
): { valid: boolean; errors?: string[] };
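The source does not list which rules `validateWorkflowDefinition` enforces. As a hedged sketch of the kind of checks such a validator could run, the example below rejects empty graphs, duplicate node ids, and node types outside the six supported ones. `sketchValidate`, `SketchNode`, and `SketchDefinition` are hypothetical names, and the three rules are assumptions; only the return shape mirrors the documented signature.

```typescript
// Node types taken from the "Supported node types" list in this document.
const KNOWN_NODE_TYPES = ["trigger", "agent", "group", "guard", "hitl", "action"];

// Hypothetical, simplified shapes for illustration only.
interface SketchNode {
  id: string;
  type: string;
}

interface SketchDefinition {
  nodes: SketchNode[];
}

function sketchValidate(definition: SketchDefinition): { valid: boolean; errors?: string[] } {
  const errors: string[] = [];
  const seenIds = new Set<string>();

  if (definition.nodes.length === 0) {
    errors.push("workflow has no nodes");
  }
  for (const node of definition.nodes) {
    // Assumed rule: node ids must be unique within a workflow.
    if (seenIds.has(node.id)) errors.push(`duplicate node id: ${node.id}`);
    seenIds.add(node.id);
    // Assumed rule: every node must use a supported type.
    if (!KNOWN_NODE_TYPES.includes(node.type)) {
      errors.push(`unknown node type "${node.type}" on node ${node.id}`);
    }
  }
  // Omit `errors` when valid, matching the optional field in the documented signature.
  return errors.length === 0 ? { valid: true } : { valid: false, errors };
}
```

Collecting every error instead of stopping at the first lets a caller report all problems in one pass.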
Storage-backed cron scheduling with deduplication.
class CronScheduler {
constructor(storage: IStorage, onTick: (workflowId: string) => Promise<void>);
register(workflowId: string, cronExpr: string): Promise<void>;
unregister(workflowId: string): Promise<void>;
list(): Promise<CronEntry[]>;
start(): void;
stop(): void;
}
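How `CronScheduler` deduplicates is not described here. One plausible approach, sketched below under that caveat, is to remember the last calendar minute in which each workflow fired and skip any further tick that lands in the same minute. `TickDeduper` and `minuteKey` are hypothetical names, and the in-memory `Map` stands in for the storage backend.

```typescript
// Key identifying the calendar minute (UTC) a tick falls in, e.g. "2024-01-01T09:00".
function minuteKey(date: Date): string {
  return date.toISOString().slice(0, 16);
}

class TickDeduper {
  // workflowId -> minute key of the last tick that fired (a real scheduler would persist this)
  private lastFired = new Map<string, string>();

  // Returns true the first time a workflow is seen in a given minute, false afterwards.
  shouldFire(workflowId: string, now: Date): boolean {
    const key = minuteKey(now);
    if (this.lastFired.get(workflowId) === key) return false;
    this.lastFired.set(workflowId, key);
    return true;
  }
}
```

Keying on the minute matches the granularity of a 5-field cron expression, so a scheduler that polls several times per minute would still trigger each workflow at most once per matching minute.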
Check if a 5-field cron expression matches the current time.
function shouldRunNow(cronExpr: string): boolean;
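For readers unfamiliar with 5-field cron syntax (minute, hour, day of month, month, day of week), the following sketch shows how such an expression can be matched against a timestamp. It is an illustration, not the source of `shouldRunNow`: `matchesCron` and `fieldMatches` are hypothetical helpers, the date is passed in explicitly and read in UTC (an assumption, since the time zone used by the package is not stated), and only `*`, numbers, ranges, lists, and `/step` are supported.

```typescript
// Does a single cron field (e.g. "*/30", "1-5", "0,15") accept `value`?
function fieldMatches(field: string, value: number, min: number, max: number): boolean {
  return field.split(",").some((part) => {
    // Accepts "*", "n", or "a-b", each optionally followed by "/step".
    const m = /^(?:\*|(\d+)(?:-(\d+))?)(?:\/(\d+))?$/.exec(part);
    if (!m) return false;
    const lo = m[1] === undefined ? min : Number(m[1]);
    const hi = m[1] === undefined ? max : m[2] === undefined ? lo : Number(m[2]);
    const step = m[3] === undefined ? 1 : Number(m[3]);
    if (step < 1) return false;
    return value >= lo && value <= hi && (value - lo) % step === 0;
  });
}

// Simplification: all five fields must match. Classic cron instead ORs the two
// day fields when both are restricted; this sketch does not implement that rule.
function matchesCron(expr: string, date: Date): boolean {
  const fields = expr.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`expected 5 cron fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields as [string, string, string, string, string];
  return (
    fieldMatches(minute, date.getUTCMinutes(), 0, 59) &&
    fieldMatches(hour, date.getUTCHours(), 0, 23) &&
    fieldMatches(dayOfMonth, date.getUTCDate(), 1, 31) &&
    fieldMatches(month, date.getUTCMonth() + 1, 1, 12) && // getUTCMonth() is 0-based
    fieldMatches(dayOfWeek, date.getUTCDay(), 0, 6) // 0 = Sunday
  );
}

// 2024-01-01 was a Monday, so "0 9 * * 1" matches 09:00 UTC on that day.
const mondayNine = new Date(Date.UTC(2024, 0, 1, 9, 0));
console.log(matchesCron("0 9 * * 1", mondayNine)); // true
```

Taking the date as a parameter keeps the matcher easy to test; a wrapper that passes `new Date()` would give the "current time" behavior described above.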
function listTemplates(): TemplateDefinition[];
function getTemplateById(id: string): TemplateDefinition | undefined;
import {
WorkflowRunner,
prMergeGuardTemplate,
listTemplates,
getTemplateById,
} from "@consensus-tools/workflows";
// `storage` is created with createStorage(config), as in the first example
const runner = new WorkflowRunner(storage);
runner.registerTemplate(prMergeGuardTemplate);
const templates = listTemplates();
// => [{ id: "template-github-pr", name: "Template 1 - GitHub PR Merge Guard", ... }, ...]
const tmpl = getTemplateById("template-linear-tasks");
Three built-in templates are included:
| Template | ID | Description |
|---|---|---|
| PR Merge Guard | template-github-pr | GitHub PR -> parallel agent review -> guard -> human approval -> merge |
| Linear Task Decomposition | template-linear-tasks | Linear task -> agent decomposition -> guard -> human approval -> create subtasks |
| Cron Auto-Assign | template-linear-assign | Cron -> fetch unassigned subtasks -> parallel review -> guard -> assign |
import { CronScheduler, shouldRunNow } from "@consensus-tools/workflows";
// Reuses `storage`, `runner`, and `workflow` from the first example
const cron = new CronScheduler(storage, async (workflowId) => {
await runner.run(workflowId);
});
await cron.register(workflow.id, "*/30 * * * *"); // every 30 minutes
cron.start();
// Check manually
shouldRunNow("0 9 * * 1"); // true if it's 9:00 on Monday
await cron.list();
await cron.unregister(workflow.id);
cron.stop();
import { NodeExecutor, validateWorkflowDefinition } from "@consensus-tools/workflows";
// `definition`, `deps`, `node`, `context`, and the three ids are supplied by the caller
const validation = validateWorkflowDefinition(definition);
if (!validation.valid) console.error(validation.errors);
const executor = new NodeExecutor(deps);
const output = await executor.execute(node, context, { boardId, runId, workflowId });
⚠ Validate before executing
Always call validateWorkflowDefinition() before passing a definition to WorkflowRunner or NodeExecutor. Invalid graphs may produce unexpected checkpoint states.
- core -- Provides storage backends and JobEngine used by workflows
- policies -- Resolution strategies used during workflow job resolution