Durable Workflow Fragments

Build fragments that run durable workflows via the Workflows service.

You want a fragment with long-running steps, retries, and waits. You do not want to ship a worker loop in every fragment. You already have the Workflows fragment.

Why this is cool: you get durable steps, timers, and retries, but your fragment keeps its own API. Your users do not have to learn the workflows HTTP surface unless they want to. You get the workflow engine without making it the product.

The trick is composition: your fragment defines workflows and consumes the workflows service. The app wires everything together.

Remember: fragment authors do not instantiate the Workflows fragment. This is why usesService("workflows") exists.

The Pattern

  1. Define workflow definitions inside your fragment package.
  2. Export a requiredWorkflows registry.
  3. Declare usesService("workflows") in your fragment definition.
  4. Call the service from your routes.
  5. Use step.do + tx.mutate for atomic writes.
  6. Test with the Scenario DSL.

Step 1: Define a Workflow

The workflow name is a string you will pass to createInstance, sendEvent, and getInstanceStatus. Keep it stable and reuse it.

src/workflows.ts
import { defineWorkflow } from "@fragno-dev/workflows";
import { z } from "zod";
import { myFragmentSchema } from "./schema";

const sessionParamsSchema = z.object({
  sessionId: z.string(),
  agentName: z.string(),
});

export const SessionWorkflow = defineWorkflow(
  { name: "session-workflow", schema: sessionParamsSchema },
  async (event, step) => {
    // Wait for an external signal before continuing.
    const message = await step.waitForEvent("user-message", {
      type: "user_message",
      timeout: "7 days",
    });

    await step.do("persist-message", (tx) => {
      // Writes inside step.do are persisted with the step boundary.
      tx.mutate((ctx) => {
        ctx.forSchema(myFragmentSchema).create("event", {
          sessionId: event.payload.sessionId,
          payload: message.payload ?? null,
        });
      });
    });

    return { ok: true };
  },
);

Step 2: Export the Workflows Your Fragment Requires

This is how the app knows which workflows to register.

src/index.ts
import { SessionWorkflow } from "./workflows";

export const requiredWorkflows = {
  // Exposed so the app can register this workflow in the Workflows fragment.
  session: SessionWorkflow,
} as const;

Step 3: Declare the Workflows Service Dependency

Your fragment consumes the Workflows fragment via usesService. You only need the methods you plan to call.

src/fragment/definition.ts
import { defineFragment, type InstantiatedFragmentFromDefinition } from "@fragno-dev/core";
import { withDatabase } from "@fragno-dev/db";
import type { workflowsFragmentDefinition } from "@fragno-dev/workflows";

import { myFragmentSchema } from "./schema";

type WorkflowsService = InstantiatedFragmentFromDefinition<
  typeof workflowsFragmentDefinition
>["services"];

export type WorkflowServiceApi = Pick<
  WorkflowsService,
  "createInstance" | "sendEvent" | "getInstanceStatus" | "listHistory"
>;

export type MyFragmentConfig = {
  /* fragment config */
};

export const myFragmentDefinition = defineFragment("my-fragment")
  .extend(withDatabase(myFragmentSchema))
  // Declare the dependency so routes can call workflows.*.
  .usesService<"workflows", WorkflowServiceApi>("workflows")
  .build();

Expose a server-side factory that accepts services:

src/fragment/index.ts
import type { FragnoPublicConfigWithDatabase } from "@fragno-dev/db";
import { instantiate } from "@fragno-dev/core";
import type { MyFragmentConfig, WorkflowServiceApi } from "./definition";
import { myFragmentDefinition } from "./definition";
import { myRoutesFactory } from "./routes";

export function createMyFragment(
  config: MyFragmentConfig,
  options: FragnoPublicConfigWithDatabase,
  services: { workflows: WorkflowServiceApi },
) {
  return (
    instantiate(myFragmentDefinition)
      .withConfig(config)
      .withRoutes([myRoutesFactory])
      .withOptions(options)
      // App supplies the workflows service here.
      .withServices(services)
      .build()
  );
}

Step 4: Call the Workflows Service from Routes

This is where your fragment turns domain events into durable workflow progress.

src/fragment/routes.ts
import { defineRoutes } from "@fragno-dev/core";
import { createId } from "@fragno-dev/db/id";
import { z } from "zod";
import { myFragmentDefinition } from "./definition";
import { myFragmentSchema } from "./schema";

const createSessionSchema = z.object({
  agentName: z.string(),
});

export const myRoutesFactory = defineRoutes(myFragmentDefinition).create(
  ({ defineRoute, serviceDeps }) => [
    defineRoute({
      method: "POST",
      path: "/sessions",
      inputSchema: createSessionSchema,
      errorCodes: ["WORKFLOWS_REQUIRED"],
      handler: async function ({ input }, { json, error }) {
        const values = await input.valid();
        const workflows = serviceDeps.workflows;
        if (!workflows) {
          return error({ message: "Workflows service missing.", code: "WORKFLOWS_REQUIRED" });
        }

        const sessionId = createId();
        const created = await this.handlerTx()
          // Create the workflow instance and the session record in one transaction.
          .withServiceCalls(() => [
            workflows.createInstance("session-workflow", {
              id: sessionId,
              params: { sessionId, agentName: values.agentName },
            }),
          ])
          .mutate(({ forSchema }) => {
            const uow = forSchema(myFragmentSchema);
            uow.create("session", {
              id: sessionId,
              agent: values.agentName,
              status: "active",
            });
          })
          .transform(({ serviceResult }) => serviceResult[0])
          .execute();

        return json({ id: sessionId, status: created.details.status });
      },
    }),
  ],
);

Step 5: The “Free Atomicity” Trick

Inside a workflow step you can call tx.mutate to write to your fragment’s schema. That mutation is committed in the same transaction as the workflow step record. So you get two benefits:

  • Atomicity: your fragment write and the workflow step commit together or not at all.
  • Amortization: both writes are batched into a single commit.

This is how you write to your own schema while still letting workflows own the execution model.

src/workflows.ts
await step.do("persist-message", (tx) => {
  tx.mutate((ctx) => {
    ctx.forSchema(myFragmentSchema).create("event", {
      sessionId: event.payload.sessionId,
      payload: message.payload ?? null,
    });
  });

  return { ok: true };
});

Step 6: Test with the Scenario DSL

If your workflow writes to your fragment schema, include a fragment instance in the harness so the tables exist.

src/workflows.scenario.test.ts
import { defineFragment, instantiate } from "@fragno-dev/core";
import { withDatabase } from "@fragno-dev/db";
import { createScenarioSteps, defineScenario, runScenario } from "@fragno-dev/workflows/scenario";

import { requiredWorkflows } from "./index";
import { myFragmentSchema } from "./schema";

const dbFragment = defineFragment("my-fragment")
  // Include your schema so scenario runs have tables to write into.
  .extend(withDatabase(myFragmentSchema))
  .build();

const steps = createScenarioSteps<typeof requiredWorkflows, {}>();

const scenario = defineScenario({
  name: "session-flow",
  workflows: requiredWorkflows,
  harness: {
    // Inject a fragment instance into the harness so workflow steps can mutate schema.
    configureBuilder: (builder) => builder.withFragment("my", instantiate(dbFragment)),
  },
  steps: [
    steps.initializeAndRunUntilIdle({
      workflow: "session",
      id: "sess-1",
      params: { sessionId: "sess-1", agentName: "default" },
    }),
    steps.eventAndRunUntilIdle({
      workflow: "session",
      instanceId: "sess-1",
      event: { type: "user_message", payload: { text: "hi" } },
    }),
  ],
});

await runScenario(scenario);

Integration Note (For App Authors)

This is how your users will integrate it in practice. They instantiate the Workflows fragment, register all workflows from all fragments, pass the workflows service into each fragment, and run the dispatcher.

src/server/init.ts
import { defaultFragnoRuntime } from "@fragno-dev/core";
import { createDurableHooksProcessor } from "@fragno-dev/db/dispatchers/node";
import { migrate } from "@fragno-dev/db";
import { createWorkflowsFragment } from "@fragno-dev/workflows";

import { createMyFragment } from "./fragment";
import { requiredWorkflows } from "./index";

export async function initServer(adapter) {
  const workflowsFragment = createWorkflowsFragment(
    { workflows: { ...requiredWorkflows }, runtime: defaultFragnoRuntime },
    { databaseAdapter: adapter },
  );

  const myFragment = createMyFragment(
    {
      /* fragment config */
    },
    { databaseAdapter: adapter },
    // Wire the workflows service into your fragment.
    { workflows: workflowsFragment.services },
  );

  await migrate(workflowsFragment);
  await migrate(myFragment);

  const dispatcher = createDurableHooksProcessor([workflowsFragment, myFragment], {
    pollIntervalMs: 250,
  });
  // Without a dispatcher, workflows will not advance.
  dispatcher.startPolling();

  return { workflowsFragment, myFragment, dispatcher };
}

If workflows look stuck, check the dispatcher first. It is usually the missing piece.