Workflow Fragment
Run long-lived workflows with durable steps, events, and retries.
The Workflows Fragment lets you define long-running processes with durable steps, retries, and waits. It stores workflow state in your database and exposes HTTP endpoints to create, control, and observe workflow instances.
Install
npm install @fragno-dev/workflows @fragno-dev/dbDefine a Workflow
import { defineWorkflow, type WorkflowEvent, type WorkflowStep } from "@fragno-dev/workflows";
type ApprovalParams = {
requestId: string;
amount: number;
};
type ApprovalEvent = { approved: boolean };
type FulfillmentEvent = { confirmationId: string };
export const ApprovalWorkflow = defineWorkflow(
{ name: "approval-workflow" },
async (event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) => {
const approval = await step.waitForEvent<ApprovalEvent>("approval", {
type: "approval",
timeout: "15 min",
});
await step.sleep("cooldown", "2 s");
const fulfillment = await step.waitForEvent<FulfillmentEvent>("fulfillment", {
type: "fulfillment",
timeout: "15 min",
});
return { request: event.payload, approval, fulfillment };
},
);
export const workflows = {
approval: ApprovalWorkflow,
} as const;Step-scoped mutations
Use step-scoped mutations to register database work that should commit with the step record. These mutations run after the step callback returns and are skipped on replay, so they cannot influence the step return value.
Step-scoped mutations commit in the same transaction that persists the step boundary. Service calls run before buffered mutations and step state updates. Unique constraint violations during step commit fail the step as non-retryable.
await step.do("persist-user", async (tx) => {
const profile = await buildUserProfile();
tx.serviceCalls(() => [usersService.createUser(profile), auditService.logUserCreate(profile)]);
return profile.id;
});If you need reads that affect the step output, run an explicit handler transaction inside the step body instead of relying on the step-scoped buffer.
Schema validation + output typing
If you provide a Standard Schema, params are validated on create/createBatch. If you provide an
outputSchema, the workflow output is typed end-to-end.
import { z } from "zod";
const paramsSchema = z.object({ requestId: z.string(), amount: z.number() });
const outputSchema = z.object({ confirmationId: z.string() });
export const ApprovalWorkflow = defineWorkflow(
{ name: "approval-workflow", schema: paramsSchema, outputSchema },
async (event, step) => {
// ...
return { confirmationId: "conf_123" };
},
);Create the Fragment Server
import { defaultFragnoRuntime, instantiate } from "@fragno-dev/core";
import { type DatabaseAdapter } from "@fragno-dev/db";
import { createDurableHooksProcessor } from "@fragno-dev/db/dispatchers/node";
import { workflowsFragmentDefinition, workflowsRoutesFactory } from "@fragno-dev/workflows";
import { workflows } from "./workflows";
export function createWorkflowsFragmentServer(adapter: DatabaseAdapter<any>) {
const runtime = defaultFragnoRuntime;
const config = { workflows, runtime };
const fragment = instantiate(workflowsFragmentDefinition)
.withConfig(config)
.withRoutes([workflowsRoutesFactory])
.withOptions({ databaseAdapter: adapter })
.build();
const dispatcher = createDurableHooksProcessor([fragment], {
pollIntervalMs: 2000,
});
return { fragment, dispatcher };
}Runtime Injection
Workflows require a FragnoRuntime for time and randomness. Use defaultFragnoRuntime for
production, or inject a deterministic runtime for tests and model checking.
Testing Workflows
Use the test harness to drive workflow ticks and control time:
import { createWorkflowsTestHarness, createWorkflowsTestRuntime } from "@fragno-dev/workflows/test";
import { workflows } from "./workflows";
const runtime = createWorkflowsTestRuntime({ startAt: 0, seed: 42 });
const harness = await createWorkflowsTestHarness({
workflows,
adapter: { type: "drizzle-pglite" },
runtime,
});
const instanceId = await harness.createInstance("approval", {
params: { requestId: "req_1", amount: 125 },
});
await harness.runUntilIdle();
await harness.sendEvent("approval", instanceId, {
type: "approval",
payload: { approved: true },
});
harness.clock.advanceBy("2 s");
await harness.runUntilIdle();
const status = await harness.getStatus("approval", instanceId);Scenario DSL
The scenario DSL lets you describe multi-step flows using a deterministic clock with minimal setup. It uses the workflows test harness under the hood and cleans up automatically.
import { defineScenario, runScenario, steps } from "@fragno-dev/workflows/scenario";
import { workflows } from "./workflows";
const scenario = defineScenario({
name: "approval-flow",
workflows,
steps: [
steps.initializeAndRunUntilIdle({
workflow: "approval",
id: "approval-1",
params: { requestId: "req_1", amount: 125 },
}),
steps.eventAndRunUntilIdle({
workflow: "approval",
instanceId: "approval-1",
event: { type: "approval", payload: { approved: true } },
}),
],
});
await runScenario(scenario);For a full walkthrough, see the Scenario DSL guide.
External Scheduling
For external schedulers, run a durable hooks dispatcher (Node or Cloudflare DO) so hooks are processed when work is enqueued.
Next Steps
- Explore the HTTP surface in the API routes reference.
- Manage workflows with the CLI.