82 lines
src/jobs/syncCoordinator.ts
Picks up queued sync batches, distributes items to workers, and persists results.
// Sync coordinator — distributes batches across parallel workers and persists results.
import { logger } from "../logger";

/**
 * Age threshold, in milliseconds, after which a batch still in 'started'
 * is treated as stale (30 minutes).
 */
export const STALE_BATCH_TIMEOUT_MS = 30 * 60_000;

/** Lifecycle states a sync batch can be in. */
export type BatchStatus = "queued" | "started" | "completed" | "failed";

/** A single item of a batch, handed to a worker for processing. */
export interface SyncItem {
  id: string;
  externalId: string;
  /** Opaque to the coordinator; it is passed to the worker untouched. */
  payload: unknown;
}
/**
 * What a worker produces for one item, and the unit that gets persisted
 * through `SyncDb.results.insertMany`.
 */
export interface ProcessedResult {
  // NOTE(review): presumably the `id` of the SyncItem this result belongs to — confirm.
  itemId: string;
  // NOTE(review): numeric timestamp; unit (ms vs s) is not visible here — confirm.
  syncedAt: number;
  data: unknown;
}
/** A unit capable of processing sync items on behalf of the coordinator. */
export interface SyncWorker {
  /**
   * Handles a single sync item.
   *
   * Network or API errors may surface as a thrown error / rejected promise.
   * Such a failure must not stop other workers from finishing their items.
   */
  process(item: SyncItem): Promise<ProcessedResult>;
}
/** Persistence operations the coordinator needs: batch bookkeeping and result storage. */
export interface SyncDb {
  batches: {
    /**
     * Fetches the next batch that is available for processing, or `null`
     * when there is none. Selects 'queued' batches and any 'started' batch
     * whose startedAt is older than staleCutoff.
     */
    pickNext(opts: {
      status: BatchStatus[];
      staleCutoff: number;
    }): Promise<{ id: string; items: SyncItem[] } | null>;

    // NOTE(review): presumably patches only the supplied fields on batch `id` — confirm.
    update(id: string, fields: { status?: BatchStatus; startedAt?: number }): Promise<void>;
  };

  results: {
    /** Stores a list of processed results. */
    insertMany(results: ProcessedResult[]): Promise<void>;
  };
}
/**
 * Runs one coordinator cycle.
 *
 * Picks the next available batch, marks it 'started', assigns its items to
 * the workers round-robin, processes them in parallel, and persists every
 * result that was produced successfully — even if other items failed.
 *
 * Final batch status:
 * - 'completed' when every item succeeded and the results were saved;
 * - 'failed' when at least one item failed, or when saving the results or
 *   marking the batch 'completed' threw.
 *
 * Batches stuck in 'started' for more than STALE_BATCH_TIMEOUT_MS are
 * eligible to be picked up again by a later cycle.
 *
 * @param db - Batch and result storage.
 * @param workers - Workers to distribute items across; must be non-empty.
 * @throws Error when `workers` is empty. Rejections from `pickNext`, from
 *   the 'started' update, and from the final 'failed' update propagate.
 */
export async function runSyncCycle(db: SyncDb, workers: SyncWorker[]): Promise<void> {
  if (workers.length === 0) {
    throw new Error("runSyncCycle requires at least one registered worker");
  }

  const batch = await db.batches.pickNext({
    // 'started' must be listed so batches abandoned mid-run can be recovered.
    // Per the pickNext contract, 'started' batches are only selected once
    // their startedAt is older than staleCutoff, so fresh ones stay untouched.
    status: ["queued", "started"],
    staleCutoff: Date.now() - STALE_BATCH_TIMEOUT_MS,
  });
  if (!batch) return;

  await db.batches.update(batch.id, { status: "started", startedAt: Date.now() });

  try {
    // allSettled (not all): one rejected item must not discard the others.
    // The async wrapper also turns a synchronous throw from process() into
    // a rejection, so it is handled per item like any other failure.
    const settled = await Promise.allSettled(
      batch.items.map(async (item, i) => workers[i % workers.length].process(item)),
    );

    const succeeded: ProcessedResult[] = [];
    const failures: { itemId: string; error: unknown }[] = [];
    settled.forEach((outcome, i) => {
      if (outcome.status === "fulfilled") {
        succeeded.push(outcome.value);
      } else {
        failures.push({ itemId: batch.items[i].id, error: outcome.reason });
      }
    });

    // Persist whatever succeeded before deciding the batch's fate.
    await db.results.insertMany(succeeded);

    if (failures.length === 0) {
      await db.batches.update(batch.id, { status: "completed" });
      return;
    }

    for (const failure of failures) {
      logger.error("Batch processing failed", {
        batchId: batch.id,
        itemId: failure.itemId,
        error: failure.error,
      });
    }
  } catch (err) {
    logger.error("Batch processing failed", { batchId: batch.id, error: err });
  }

  // Reached only when an item failed or persistence/completion threw.
  await db.batches.update(batch.id, { status: "failed" });
}