Workflow Fragment

Run long-lived workflows with durable steps, events, and retries.

The Workflows Fragment lets you define long-running processes with durable steps, retries, and waits. It stores workflow state in your database and exposes HTTP endpoints to create, control, and observe workflow instances.

Install

npm install @fragno-dev/fragment-workflows @fragno-dev/workflows-dispatcher-node

Define a Workflow

lib/workflows.ts
import {
  WorkflowEntrypoint,
  type WorkflowEvent,
  type WorkflowStep,
} from "@fragno-dev/fragment-workflows";

type ApprovalParams = {
  requestId: string;
  amount: number;
};

type ApprovalEvent = { approved: boolean };

type FulfillmentEvent = { confirmationId: string };

export class ApprovalWorkflow extends WorkflowEntrypoint<unknown, ApprovalParams> {
  async run(event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) {
    const approval = await step.waitForEvent<ApprovalEvent>("approval", {
      type: "approval",
      timeout: "15 min",
    });

    await step.sleep("cooldown", "2 s");

    const fulfillment = await step.waitForEvent<FulfillmentEvent>("fulfillment", {
      type: "fulfillment",
      timeout: "15 min",
    });

    return { request: event.payload, approval, fulfillment };
  }
}

export const workflows = {
  approval: { name: "approval-workflow", workflow: ApprovalWorkflow },
} as const;

Create the Fragment Server

lib/workflows-fragment.ts
import { defaultFragnoRuntime, instantiate } from "@fragno-dev/core";
import type { DatabaseAdapter } from "@fragno-dev/db";
import {
  createWorkflowsRunner,
  workflowsFragmentDefinition,
  workflowsRoutesFactory,
} from "@fragno-dev/fragment-workflows";
import { createInProcessDispatcher } from "@fragno-dev/workflows-dispatcher-node";
import { workflows } from "./workflows";

export function createWorkflowsFragmentServer(adapter: DatabaseAdapter<any>) {
  const runtime = defaultFragnoRuntime;
  let runner: ReturnType<typeof createWorkflowsRunner> | null = null;
  const dispatcher = createInProcessDispatcher({
    wake: () => {
      if (!runner) {
        return;
      }
      void runner.tick({ maxInstances: 5, maxSteps: 50 });
    },
    pollIntervalMs: 2000,
  });

  const config = { workflows, dispatcher, runtime, enableRunnerTick: true };
  const fragment = instantiate(workflowsFragmentDefinition)
    .withConfig(config)
    .withRoutes([workflowsRoutesFactory])
    .withOptions({ databaseAdapter: adapter })
    .build();

  runner = createWorkflowsRunner({ fragment, workflows, runtime });
  config.runner = runner;

  return { fragment, dispatcher };
}

Runtime Injection

Workflows require a FragnoRuntime for time and randomness. Use defaultFragnoRuntime for production, or inject a deterministic runtime for tests and model checking.

Testing Workflows

Use the test harness to drive the runner and control time:

lib/workflows.test.ts
import {
  createWorkflowsTestHarness,
  createWorkflowsTestRuntime,
} from "@fragno-dev/fragment-workflows/test";
import { workflows } from "./workflows";

const runtime = createWorkflowsTestRuntime({ startAt: 0, seed: 42 });
const harness = await createWorkflowsTestHarness({
  workflows,
  adapter: { type: "drizzle-pglite" },
  runtime,
});

const instanceId = await harness.createInstance("approval", {
  params: { requestId: "req_1", amount: 125 },
});

await harness.runUntilIdle();
await harness.sendEvent("approval", instanceId, {
  type: "approval",
  payload: { approved: true },
});

harness.clock.advanceBy("2 s");
await harness.runUntilIdle();

const status = await harness.getStatus("approval", instanceId);

HTTP Tick Integration

If you want an external scheduler to drive execution, keep enableRunnerTick: true and send POST /_runner/tick to your fragment. A durable hook handler or cron can trigger it:

await fetch("https://your-app.example.com/api/workflows/_runner/tick", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ maxInstances: 10, maxSteps: 100 }),
});

Next Steps