Workflow Fragment
Run long-lived workflows with durable steps, events, and retries.
The Workflows Fragment lets you define long-running processes with durable steps, retries, and waits. It stores workflow state in your database and exposes HTTP endpoints to create, control, and observe workflow instances.
Install
npm install @fragno-dev/fragment-workflows @fragno-dev/workflows-dispatcher-nodeDefine a Workflow
import {
WorkflowEntrypoint,
type WorkflowEvent,
type WorkflowStep,
} from "@fragno-dev/fragment-workflows";
type ApprovalParams = {
requestId: string;
amount: number;
};
type ApprovalEvent = { approved: boolean };
type FulfillmentEvent = { confirmationId: string };
export class ApprovalWorkflow extends WorkflowEntrypoint<unknown, ApprovalParams> {
async run(event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) {
const approval = await step.waitForEvent<ApprovalEvent>("approval", {
type: "approval",
timeout: "15 min",
});
await step.sleep("cooldown", "2 s");
const fulfillment = await step.waitForEvent<FulfillmentEvent>("fulfillment", {
type: "fulfillment",
timeout: "15 min",
});
return { request: event.payload, approval, fulfillment };
}
}
export const workflows = {
approval: { name: "approval-workflow", workflow: ApprovalWorkflow },
} as const;Create the Fragment Server
import { defaultFragnoRuntime, instantiate } from "@fragno-dev/core";
import type { DatabaseAdapter } from "@fragno-dev/db";
import {
createWorkflowsRunner,
workflowsFragmentDefinition,
workflowsRoutesFactory,
} from "@fragno-dev/fragment-workflows";
import { createInProcessDispatcher } from "@fragno-dev/workflows-dispatcher-node";
import { workflows } from "./workflows";
export function createWorkflowsFragmentServer(adapter: DatabaseAdapter<any>) {
const runtime = defaultFragnoRuntime;
let runner: ReturnType<typeof createWorkflowsRunner> | null = null;
const dispatcher = createInProcessDispatcher({
wake: () => {
if (!runner) {
return;
}
void runner.tick({ maxInstances: 5, maxSteps: 50 });
},
pollIntervalMs: 2000,
});
const config = { workflows, dispatcher, runtime, enableRunnerTick: true };
const fragment = instantiate(workflowsFragmentDefinition)
.withConfig(config)
.withRoutes([workflowsRoutesFactory])
.withOptions({ databaseAdapter: adapter })
.build();
runner = createWorkflowsRunner({ fragment, workflows, runtime });
config.runner = runner;
return { fragment, dispatcher };
}Runtime Injection
Workflows require a FragnoRuntime for time and randomness. Use defaultFragnoRuntime for
production, or inject a deterministic runtime for tests and model checking.
Testing Workflows
Use the test harness to drive the runner and control time:
import {
createWorkflowsTestHarness,
createWorkflowsTestRuntime,
} from "@fragno-dev/fragment-workflows/test";
import { workflows } from "./workflows";
const runtime = createWorkflowsTestRuntime({ startAt: 0, seed: 42 });
const harness = await createWorkflowsTestHarness({
workflows,
adapter: { type: "drizzle-pglite" },
runtime,
});
const instanceId = await harness.createInstance("approval", {
params: { requestId: "req_1", amount: 125 },
});
await harness.runUntilIdle();
await harness.sendEvent("approval", instanceId, {
type: "approval",
payload: { approved: true },
});
harness.clock.advanceBy("2 s");
await harness.runUntilIdle();
const status = await harness.getStatus("approval", instanceId);HTTP Tick Integration
If you want an external scheduler to drive execution, keep enableRunnerTick: true and send
POST /_runner/tick to your fragment. A durable hook handler or cron can trigger it:
await fetch("https://your-app.example.com/api/workflows/_runner/tick", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ maxInstances: 10, maxSteps: 100 }),
});Next Steps
- Explore the HTTP surface in the API routes reference.
- Manage workflows with the CLI.