82 lines
src/jobs/syncCoordinator.ts
Picks up queued sync batches, distributes items to workers, and persists results.
// Sync coordinator — distributes batches across parallel workers and persists results.
import { logger } from "../logger";
 
/**
 * Age, in milliseconds, after which a batch still marked 'started' is
 * treated as stale (30 minutes).
 */
export const STALE_BATCH_TIMEOUT_MS = 30 * 60 * 1_000;
 
/** Lifecycle states a sync batch can be in. */
export type BatchStatus =
  | "queued"
  | "started"
  | "completed"
  | "failed";
 
/**
 * A single unit of work inside a sync batch. Batches returned by
 * `SyncDb.batches.pickNext` carry an array of these, and each one is the
 * input to `SyncWorker.process`.
 */
export interface SyncItem {
  /** Identifier of this item. NOTE(review): presumably what `ProcessedResult.itemId` refers to — confirm. */
  id: string;
  /** NOTE(review): presumably the item's identifier in the external system being synced — confirm. */
  externalId: string;
  /** Item data, typed `unknown`: consumers must narrow it before use. */
  payload: unknown;
}
 
/**
 * Outcome of processing one sync item. Produced by `SyncWorker.process` and
 * stored in bulk through `SyncDb.results.insertMany`.
 */
export interface ProcessedResult {
  /** NOTE(review): presumably the `id` of the `SyncItem` this result belongs to — confirm. */
  itemId: string;
  /** NOTE(review): presumably a timestamp in epoch milliseconds — confirm the unit with the worker implementations. */
  syncedAt: number;
  /** Result data, typed `unknown`: consumers must narrow it before use. */
  data: unknown;
}
 
/**
 * Contract for a worker that processes individual sync items on behalf of
 * the coordinator.
 */
export interface SyncWorker {
  /**
   * Processes one sync item. May throw on network or API errors.
   * Throwing must not prevent other workers from completing.
   *
   * @param item - The sync item to process.
   * @returns A promise resolving to the processed result for the item.
   */
  process(item: SyncItem): Promise<ProcessedResult>;
}
 
/**
 * Persistence operations the sync coordinator depends on: batch lookup and
 * status tracking, plus bulk storage of processed results.
 */
export interface SyncDb {
  /** Batch selection and bookkeeping. */
  batches: {
    /**
     * Returns the next available batch; the promise may resolve to `null`.
     * Selects 'queued' batches and any 'started' batch whose startedAt is
     * older than staleCutoff.
     */
    pickNext(opts: {
      status: BatchStatus[];
      staleCutoff: number;
    }): Promise<null | { id: string; items: SyncItem[] }>;

    /**
     * Updates the batch identified by `id` with the supplied fields. Both
     * fields are optional, so callers pass only what they want to change.
     */
    update(
      id: string,
      fields: { status?: BatchStatus; startedAt?: number },
    ): Promise<void>;
  };
  /** Storage for processed results. */
  results: {
    /** Stores a list of processed results in one call. */
    insertMany(results: ProcessedResult[]): Promise<void>;
  };
}
 
/**
 * Runs one coordinator cycle: picks the next available batch, distributes
 * items across workers in parallel, and persists all successful results.
 *
 * Contract:
 * - Items are assigned to workers round-robin by item index.
 * - Every worker call is allowed to settle; a failing worker never prevents
 *   the results of the other workers from being saved.
 * - The batch is marked 'completed' only when every item succeeded and the
 *   results were persisted. If any item fails, or persisting fails, the
 *   failure is logged and the batch is marked 'failed'.
 * - Both 'queued' batches and 'started' batches older than
 *   STALE_BATCH_TIMEOUT_MS are requested from `pickNext`, so batches left
 *   behind by a crashed run are picked up again.
 *
 * @param db - Persistence layer for batches and results.
 * @param workers - Registered workers; must contain at least one entry.
 * @returns Resolves when the cycle is finished, or immediately when no
 *   batch is available.
 * @throws Error when `workers` is empty. Rejections from `pickNext` and from
 *   the 'started' / 'failed' status updates propagate to the caller.
 */
export async function runSyncCycle(db: SyncDb, workers: SyncWorker[]): Promise<void> {
  if (workers.length === 0) {
    throw new Error("runSyncCycle requires at least one registered worker");
  }

  const batch = await db.batches.pickNext({
    // 'started' must be requested too: otherwise the stale cutoff below can
    // never match anything and stuck batches are never recovered.
    status: ["queued", "started"],
    staleCutoff: Date.now() - STALE_BATCH_TIMEOUT_MS,
  });

  if (!batch) return;

  await db.batches.update(batch.id, { status: "started", startedAt: Date.now() });

  try {
    // allSettled (not all) so one rejection does not discard the work of the
    // other workers. The async wrapper also turns a synchronous throw from a
    // worker into a per-item rejection.
    const outcomes = await Promise.allSettled(
      batch.items.map(async (item, i) => workers[i % workers.length].process(item)),
    );

    const succeeded: ProcessedResult[] = [];
    const failures: { itemId: string | undefined; error: unknown }[] = [];
    outcomes.forEach((outcome, i) => {
      if (outcome.status === "fulfilled") {
        succeeded.push(outcome.value);
      } else {
        failures.push({ itemId: batch.items[i]?.id, error: outcome.reason });
      }
    });

    // Persist whatever succeeded before deciding the batch's final status.
    if (succeeded.length > 0) {
      await db.results.insertMany(succeeded);
    }

    if (failures.length === 0) {
      await db.batches.update(batch.id, { status: "completed" });
      return;
    }

    logger.error("Batch processing failed", {
      batchId: batch.id,
      error: failures[0]?.error,
      failures,
    });
  } catch (err) {
    // Reached when persisting results or marking the batch completed fails.
    logger.error("Batch processing failed", { batchId: batch.id, error: err });
  }

  await db.batches.update(batch.id, { status: "failed" });
}