Workflow Fragment

Run long-lived workflows with durable steps, events, and retries.

The Workflows Fragment lets you define long-running processes with durable steps, retries, and waits. It stores workflow state in your database and exposes HTTP endpoints to create, control, and observe workflow instances.

Install

npm install @fragno-dev/workflows @fragno-dev/db

Define a Workflow

lib/workflows.ts
import { defineWorkflow, type WorkflowEvent, type WorkflowStep } from "@fragno-dev/workflows";

type ApprovalParams = {
  requestId: string;
  amount: number;
};

type ApprovalEvent = { approved: boolean };

type FulfillmentEvent = { confirmationId: string };

export const ApprovalWorkflow = defineWorkflow(
  { name: "approval-workflow" },
  async (event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) => {
    const approval = await step.waitForEvent<ApprovalEvent>("approval", {
      type: "approval",
      timeout: "15 min",
    });

    await step.sleep("cooldown", "2 s");

    const fulfillment = await step.waitForEvent<FulfillmentEvent>("fulfillment", {
      type: "fulfillment",
      timeout: "15 min",
    });

    return { request: event.payload, approval, fulfillment };
  },
);

export const workflows = {
  approval: ApprovalWorkflow,
} as const;

Step-scoped mutations

Use step-scoped mutations to register database work that should commit with the step record. These mutations run after the step callback returns and are skipped on replay, so they cannot influence the step return value.

Step-scoped mutations commit in the same transaction that persists the step boundary. Service calls run before buffered mutations and step state updates. Unique constraint violations during step commit fail the step as non-retryable.

await step.do("persist-user", async (tx) => {
  const profile = await buildUserProfile();

  tx.serviceCalls(() => [usersService.createUser(profile), auditService.logUserCreate(profile)]);

  return profile.id;
});

If you need reads that affect the step output, run an explicit handler transaction inside the step body instead of relying on the step-scoped buffer.

Schema validation + output typing

If you provide a Standard Schema, params are validated on create/createBatch. If you provide an outputSchema, the workflow output is typed end-to-end.

import { z } from "zod";

const paramsSchema = z.object({ requestId: z.string(), amount: z.number() });
const outputSchema = z.object({ confirmationId: z.string() });

export const ApprovalWorkflow = defineWorkflow(
  { name: "approval-workflow", schema: paramsSchema, outputSchema },
  async (event, step) => {
    // ...
    return { confirmationId: "conf_123" };
  },
);

Create the Fragment Server

lib/workflows-fragment.ts
import { defaultFragnoRuntime, instantiate } from "@fragno-dev/core";
import { type DatabaseAdapter } from "@fragno-dev/db";
import { createDurableHooksProcessor } from "@fragno-dev/db/dispatchers/node";
import { workflowsFragmentDefinition, workflowsRoutesFactory } from "@fragno-dev/workflows";
import { workflows } from "./workflows";

export function createWorkflowsFragmentServer(adapter: DatabaseAdapter<any>) {
  const runtime = defaultFragnoRuntime;

  const config = { workflows, runtime };
  const fragment = instantiate(workflowsFragmentDefinition)
    .withConfig(config)
    .withRoutes([workflowsRoutesFactory])
    .withOptions({ databaseAdapter: adapter })
    .build();

  const dispatcher = createDurableHooksProcessor([fragment], {
    pollIntervalMs: 2000,
  });

  return { fragment, dispatcher };
}

Runtime Injection

Workflows require a FragnoRuntime for time and randomness. Use defaultFragnoRuntime for production, or inject a deterministic runtime for tests and model checking.

Testing Workflows

Use the test harness to drive workflow ticks and control time:

lib/workflows.test.ts
import { createWorkflowsTestHarness, createWorkflowsTestRuntime } from "@fragno-dev/workflows/test";
import { workflows } from "./workflows";

const runtime = createWorkflowsTestRuntime({ startAt: 0, seed: 42 });
const harness = await createWorkflowsTestHarness({
  workflows,
  adapter: { type: "drizzle-pglite" },
  runtime,
});

const instanceId = await harness.createInstance("approval", {
  params: { requestId: "req_1", amount: 125 },
});

await harness.runUntilIdle();
await harness.sendEvent("approval", instanceId, {
  type: "approval",
  payload: { approved: true },
});

harness.clock.advanceBy("2 s");
await harness.runUntilIdle();

const status = await harness.getStatus("approval", instanceId);

Scenario DSL

The scenario DSL lets you describe multi-step flows using a deterministic clock with minimal setup. It uses the workflows test harness under the hood and cleans up automatically.

lib/workflows.scenario.test.ts
import { defineScenario, runScenario, steps } from "@fragno-dev/workflows/scenario";
import { workflows } from "./workflows";

const scenario = defineScenario({
  name: "approval-flow",
  workflows,
  steps: [
    steps.initializeAndRunUntilIdle({
      workflow: "approval",
      id: "approval-1",
      params: { requestId: "req_1", amount: 125 },
    }),
    steps.eventAndRunUntilIdle({
      workflow: "approval",
      instanceId: "approval-1",
      event: { type: "approval", payload: { approved: true } },
    }),
  ],
});

await runScenario(scenario);

For a full walkthrough, see the Scenario DSL guide.

External Scheduling

For external schedulers, run a durable hooks dispatcher (Node or Cloudflare DO) so hooks are processed when work is enqueued.

Next Steps