Data Synchronization

Sync data between external systems using durable polling, batch processing, webhooks, and reconciliation patterns.

Data synchronization is one of the most common workflow use cases. Whether you are polling a CRM for updated records, pushing batches of data to a warehouse, or reacting to webhook notifications, Workflow DevKit gives you durable building blocks that survive crashes, retries, and cold starts.

This guide covers the core patterns you will need. Each pattern is self-contained, so you can jump to whichever one fits your use case.

Polling Pattern

The simplest sync strategy is to poll an external system at regular intervals using sleep() in a loop. The workflow stays alive between polls, so you never miss an interval even if your compute restarts.

workflows/crm-poll.ts
import { sleep } from "workflow";

declare function fetchUpdatedContacts(since: Date): Promise<Contact[]>; // @setup
declare function upsertContacts(contacts: Contact[]): Promise<void>; // @setup
declare type Contact = { id: string; name: string; updatedAt: string }; // @setup

export async function crmPollingSync() {
  "use workflow";

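  // Only changes made after the workflow starts are synced
  let cursor = new Date();

  while (true) {
    const contacts = await fetchUpdatedContacts(cursor);

    if (contacts.length > 0) {
      await upsertContacts(contacts);
      // Advance the cursor to the newest updatedAt returned by the CRM rather
      // than the local clock, so records changed while this iteration was
      // running are not skipped on the next poll
      cursor = new Date(
        contacts.reduce(
          (newest, c) => Math.max(newest, Date.parse(c.updatedAt)),
          cursor.getTime()
        )
      );
    }

    await sleep("5 minutes");
  }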
}

The step functions handle the actual I/O with full Node.js access:

workflows/crm-poll-steps.ts
declare const db: { contacts: { upsert(opts: any): Promise<void> } }; // @setup

export async function fetchUpdatedContacts(since: Date) {
  "use step";

  const response = await fetch(
    `https://api.example-crm.com/contacts?updated_since=${since.toISOString()}`
  );

  if (!response.ok) {
    throw new Error(`CRM API error: ${response.status}`);
  }

  return response.json();
}

// updatedAt arrives as an ISO string from the JSON response, not a Date
export async function upsertContacts(contacts: Array<{ id: string; name: string; updatedAt: string }>) {
  "use step";

  for (const contact of contacts) {
    await db.contacts.upsert({
      where: { externalId: contact.id },
      data: contact,
    });
  }
}

Because sleep() is durable, the workflow does not consume compute while waiting. The runtime suspends execution and resumes it when the interval elapses.
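Advancing the cursor from the data's own timestamps, rather than from the local clock, avoids skipping records that change while a poll is in flight. The sketch below isolates that logic as a plain TypeScript helper; the name `advanceCursor` is hypothetical and not part of Workflow DevKit, and it assumes `updatedAt` is an ISO-8601 string as returned by a JSON API.

```typescript
// Hypothetical helper: compute the next polling cursor from fetched records.
// Assumes updatedAt is an ISO-8601 string, as a JSON API would return it.
type Timestamped = { updatedAt: string };

export function advanceCursor(current: Date, records: Timestamped[]): Date {
  // Start from the current cursor so the cursor never moves backwards
  let newest = current.getTime();
  for (const record of records) {
    const ts = Date.parse(record.updatedAt);
    // Skip unparseable timestamps instead of turning the cursor into NaN
    if (!Number.isNaN(ts) && ts > newest) {
      newest = ts;
    }
  }
  return new Date(newest);
}
```

Because the helper is pure, it is safe to call inside a workflow function and easy to unit test. If the CRM treats `updated_since` as inclusive, the newest record is fetched again on the next poll; an idempotent upsert makes that harmless.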

Batch Sync Pattern

When you need to sync many records at once, process them in parallel with Promise.all. Each record is handled by its own step, so retries and failures are tracked per record rather than for the whole batch.

workflows/batch-sync.ts
declare function fetchPendingRecords(): Promise<PendingRecord[]>; // @setup
declare function syncRecord(record: PendingRecord): Promise<void>; // @setup
declare type PendingRecord = { id: string; data: string }; // @setup

export async function batchSync() {
  "use workflow";

  const records = await fetchPendingRecords();

  await Promise.all(
    records.map((record) => syncRecord(record))
  );
}
workflows/batch-sync-steps.ts
import { getStepMetadata } from "workflow";

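export async function fetchPendingRecords() {
  "use step";

  const response = await fetch("https://api.example.com/records?status=pending");

  if (!response.ok) {
    throw new Error(`Source API error: ${response.status}`);
  }

  return response.json();
}

export async function syncRecord(record: { id: string; data: string }) {
  "use step";

  // Stable across retries of this step, unique per invocation
  const { stepId } = getStepMetadata();

  const response = await fetch(`https://api.destination.com/records/${record.id}`, {
    method: "PUT",
    headers: {
      "Content-Type": "application/json",
      "Idempotency-Key": stepId,
    },
    body: JSON.stringify(record),
  });

  // fetch() does not reject on HTTP errors; throw so the step is retried
  if (!response.ok) {
    throw new Error(`Destination API error: ${response.status}`);
  }
}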

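Each step retries independently on failure, so a transient error on one record does not re-run the others. Note that Promise.all rejects as soon as any one step fails permanently (after exhausting its retries), and the workflow fails with it; if you would rather finish the batch and report the failures, Promise.allSettled is the usual JavaScript alternative. The stepId idempotency key lets the destination discard duplicate writes when a step retries, provided its API honors the Idempotency-Key header. See Idempotency for more on this pattern.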
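For large record sets you may also want to cap how many requests are in flight at once. The sketch below combines fixed-size batches with Promise.allSettled so one failing record is reported instead of aborting the batch. It is plain TypeScript under stated assumptions: `syncInBatches` and `BatchResult` are hypothetical names, `syncOne` stands in for a step such as `syncRecord`, and using it inside a workflow assumes the runtime supports Promise.allSettled as it does Promise.all.

```typescript
// Sketch: sync records in fixed-size batches, isolating per-record failures.
// syncOne is a stand-in for a step function such as syncRecord.
export type BatchResult = {
  succeeded: string[];
  failed: { id: string; reason: unknown }[];
};

export async function syncInBatches<T extends { id: string }>(
  records: T[],
  batchSize: number,
  syncOne: (record: T) => Promise<void>,
): Promise<BatchResult> {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const result: BatchResult = { succeeded: [], failed: [] };

  for (let i = 0; i < records.length; i += batchSize) {
    const batch = records.slice(i, i + batchSize);
    // allSettled waits for every record in the batch, even if some reject
    const outcomes = await Promise.allSettled(batch.map((r) => syncOne(r)));
    outcomes.forEach((outcome, j) => {
      const id = batch[j].id;
      if (outcome.status === "fulfilled") {
        result.succeeded.push(id);
      } else {
        result.failed.push({ id, reason: outcome.reason });
      }
    });
  }
  return result;
}
```

The trade-off is latency: each batch waits for its slowest record before the next batch starts, in exchange for a hard bound of `batchSize` concurrent requests against the destination API.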

Webhook-Driven Sync

Instead of polling, you can react to changes in real time using createWebhook(). Register the webhook URL with the external system, and the workflow resumes whenever a notification arrives.

workflows/webhook-sync.ts
import { createWebhook } from "workflow";

declare function registerWebhook(url: string): Promise<void>; // @setup
declare function processChange(change: Change): Promise<void>; // @setup
declare type Change = { type: string; recordId: string; data: unknown }; // @setup

export async function webhookDrivenSync() {
  "use workflow";

  const webhook = createWebhook<Change>({
    respondWith: Response.json({ received: true }),
  });

  // Register the webhook URL with the external system
  await registerWebhook(webhook.url);

  // Process incoming change notifications
  for await (const request of webhook) {
    const change = await request.json();
    await processChange(change);
  }
}
workflows/webhook-sync-steps.ts
declare const db: { records: { upsert(opts: any): Promise<void> } }; // @setup

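export async function registerWebhook(url: string) {
  "use step";

  const response = await fetch("https://api.example.com/webhooks", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ url, events: ["record.updated", "record.created"] }),
  });

  // Fail the step (so it is retried) if the registration was rejected
  if (!response.ok) {
    throw new Error(`Webhook registration failed: ${response.status}`);
  }
}

export async function processChange(change: { type: string; recordId: string; data: unknown }) {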
  "use step";

  await db.records.upsert({
    where: { externalId: change.recordId },
    data: change.data,
  });
}

The for await loop keeps the workflow alive, processing each event as it arrives. The static respondWith immediately acknowledges every incoming request so the external system does not time out.
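Many webhook providers retry deliveries that were not acknowledged in time, so the same notification can arrive more than once. A minimal sketch of deduplicating by delivery ID is below. It assumes each payload carries a unique `eventId`; that field is hypothetical (the `Change` type above has none), so substitute whatever delivery or event ID your provider sends. `DeliveryDeduper` and `processDeliveries` are illustrative names, and the async iterable stands in for the webhook.

```typescript
// Sketch: skip webhook deliveries that have already been processed.
// eventId is a hypothetical field; use your provider's delivery/event ID.
type Delivery = { eventId: string; recordId: string; data: unknown };

export class DeliveryDeduper {
  private readonly seen = new Set<string>();

  // Returns true the first time an eventId is seen, false for repeats
  accept(delivery: Delivery): boolean {
    if (this.seen.has(delivery.eventId)) return false;
    this.seen.add(delivery.eventId);
    return true;
  }
}

export async function processDeliveries(
  deliveries: AsyncIterable<Delivery>,
  handle: (d: Delivery) => Promise<void>,
): Promise<number> {
  const deduper = new DeliveryDeduper();
  let processed = 0;
  // Like the for await loop above: one event at a time, in arrival order
  for await (const delivery of deliveries) {
    if (!deduper.accept(delivery)) continue;
    await handle(delivery);
    processed++;
  }
  return processed;
}
```

The in-memory Set grows with every distinct event, so for a long-running sync you might bound it or record processed IDs in your database instead. An idempotent upsert, as in `processChange`, is a second line of defense when duplicates slip through.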

See Hooks & Webhooks for the full guide on creating webhooks, handling multiple events, and sending dynamic responses.

Reconciliation Pattern

When two systems can drift out of sync, for example after a missed webhook or a failed poll, reconcile them periodically: fetch both sides, compute a diff, and apply only the records that changed.

workflows/reconcile.ts
import { sleep } from "workflow";

declare function fetchLocalRecords(): Promise<SyncRecord[]>; // @setup
declare function fetchRemoteRecords(): Promise<SyncRecord[]>; // @setup
declare function applyChanges(changes: SyncRecord[]): Promise<void>; // @setup
declare type SyncRecord = { id: string; data: string; updatedAt: number }; // @setup

export async function reconciliationSync() {
  "use workflow";

  while (true) {
    // Fetch both sides in parallel
    const [local, remote] = await Promise.all([
      fetchLocalRecords(),
      fetchRemoteRecords(),
    ]);

    // Compute the diff: records that are new remotely or newer than the local copy.
    // Records deleted remotely are not detected here.
    const localMap = new Map(local.map((r) => [r.id, r]));
    const changes = remote.filter((r) => {
      const existing = localMap.get(r.id);
      return !existing || existing.updatedAt < r.updatedAt;
    });

    if (changes.length > 0) {
      await applyChanges(changes);
    }

    await sleep("1 hour");
  }
}
workflows/reconcile-steps.ts
declare const db: { records: { findMany(): Promise<any[]>; upsert(opts: any): Promise<void> } }; // @setup

export async function fetchLocalRecords() {
  "use step";

  return db.records.findMany();
}

export async function fetchRemoteRecords() {
  "use step";

  const response = await fetch("https://api.example.com/records");

  if (!response.ok) {
    throw new Error(`Remote API error: ${response.status}`);
  }

  return response.json();
}

export async function applyChanges(changes: Array<{ id: string; data: string; updatedAt: number }>) {
  "use step";

  for (const record of changes) {
    await db.records.upsert({
      where: { externalId: record.id },
      data: record,
    });
  }
}

Diffing logic runs inside the workflow function, which is lightweight and deterministic. Only the actual I/O (fetching records and writing changes) happens in steps.
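The diff in `reconciliationSync` only handles creates and updates, so a record deleted remotely is never removed locally. One way to extend it, sketched below as a pure function with the hypothetical name `computeDiff`, is to also return the local IDs that no longer exist remotely. Applying those deletes would need its own step, which is not shown, and comparing `updatedAt` values assumes both systems' clocks roughly agree.

```typescript
// Sketch: diff local and remote snapshots keyed by id.
// Pure and deterministic, so it can run inside the workflow function.
type SyncRecord = { id: string; data: string; updatedAt: number };

export type Diff = { upserts: SyncRecord[]; deletes: string[] };

export function computeDiff(local: SyncRecord[], remote: SyncRecord[]): Diff {
  const localById = new Map(local.map((r) => [r.id, r] as const));
  const remoteIds = new Set(remote.map((r) => r.id));

  // New remotely, or changed remotely since the local copy was written
  const upserts = remote.filter((r) => {
    const existing = localById.get(r.id);
    return !existing || existing.updatedAt < r.updatedAt;
  });

  // Present locally but no longer present remotely
  const deletes = local.filter((r) => !remoteIds.has(r.id)).map((r) => r.id);

  return { upserts, deletes };
}
```

Treating "missing remotely" as "deleted" is only safe when `fetchRemoteRecords` returns the complete remote set; if the remote API paginates or filters, a partial fetch would make this diff delete records that still exist.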

Idempotent Writes

Any step that writes to an external system should send an idempotency key, if the API supports one, so that retries do not create duplicates. Use getStepMetadata() to get a stable identifier for each step invocation.

workflows/idempotent-write.ts
import { getStepMetadata } from "workflow";

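export async function createInvoice(customerId: string, amount: number) {
  "use step";

  // Same value on every retry of this step invocation
  const { stepId } = getStepMetadata();

  const response = await fetch("https://api.billing.com/invoices", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Idempotency-Key": stepId,
    },
    body: JSON.stringify({ customerId, amount }),
  });

  // Throw on HTTP errors so the step retries with the same key
  if (!response.ok) {
    throw new Error(`Billing API error: ${response.status}`);
  }
}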

The stepId is stable across retries but unique per step invocation, making it the ideal idempotency key. See Idempotency for the full explanation.
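The key only helps because the receiving API remembers it. The sketch below models that server-side half in memory to show the contract: the first request with a given key runs, and any request reusing the key (such as a retry of the same step, which sends the same stepId) gets the stored result instead of creating a second invoice. `IdempotencyStore` is a hypothetical name; real APIs persist keys durably and often expire them after some period.

```typescript
// Sketch of the server side of an idempotency key: the first request with a
// key runs the operation; later requests with that key replay its result.
export class IdempotencyStore<T> {
  // Storing the promise also covers concurrent duplicates that arrive
  // before the first attempt has finished
  private readonly results = new Map<string, Promise<T>>();

  execute(key: string, operation: () => Promise<T>): Promise<T> {
    const existing = this.results.get(key);
    if (existing) return existing;

    const pending = operation();
    this.results.set(key, pending);
    // Forget failed attempts so a retry with the same key can run again
    pending.catch(() => this.results.delete(key));
    return pending;
  }
}
```

Dropping failed attempts is a design choice: it lets a retried step succeed after a transient error, while a successful write is never repeated for the same key.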