64 lines
src/jobs/retryConsumer.ts
Reads a batch of retry jobs from the queue and re-delivers each with exponential backoff.
// Webhook retry consumer — re-delivers failed events with exponential backoff.import { logger } from "../logger";export const BASE_DELAY_MS = 5_000;export interface RetryJob { eventId: string; webhookUrl: string; payload: unknown; /** Number of prior delivery attempts, starting at 0 for the first retry. */ attempt: number;}
export interface Queue { /** Reads up to maxSize jobs from the head of the queue. Throws on connectivity failure. */ readBatch(maxSize: number): Promise<RetryJob[]>; /** Acknowledges successful delivery and removes the job from the queue. */ acknowledge(eventId: string): Promise<void>; /** Advances the consumer cursor past a successfully read batch; unacknowledged jobs remain queued. */ markBatchProcessed(): Promise<void>;}
export interface HttpClient { post(url: string, body: unknown): Promise<{ status: number }>;}
// Delay helper used for exponential backoff between retry attempts.function sleep(ms: number): Promise<void> { return new Promise((resolve) => setTimeout(resolve, ms));}
/** * Processes one batch of retry jobs. * Retry delay follows exponential backoff: 2^attempt * BASE_DELAY_MS. * If the queue read throws, the batch cursor must not advance.*
* @param queue - queue client * @param http - HTTP client for webhook re-delivery*/
export async function processBatch(queue: Queue, http: HttpClient): Promise<void> { let batch: RetryJob[] = []; try { batch = await queue.readBatch(20); for (const job of batch) { const delayMs = job.attempt * BASE_DELAY_MS; await sleep(delayMs); try { const result = await http.post(job.webhookUrl, job.payload); if (result.status >= 200 && result.status < 300) { await queue.acknowledge(job.eventId);}
} catch { // Delivery failed — job stays in queue for the next cycle.}
}
} catch (err) { logger.error("Queue read failed", { error: err }); } finally { await queue.markBatchProcessed();}
}