64 lines
src/jobs/retryConsumer.ts
Reads a batch of retry jobs from the queue and re-delivers each with exponential backoff.
// Webhook retry consumer — re-delivers failed events with exponential backoff.
import { logger } from "../logger";
 
/**
 * Base unit, in milliseconds, for the retry backoff delay.
 * processBatch's contract scales it per job as 2^attempt * BASE_DELAY_MS.
 */
export const BASE_DELAY_MS = 5_000;
 
/** One failed webhook event waiting in the queue to be re-delivered. */
export interface RetryJob {
  /** Identifier passed to Queue.acknowledge once the delivery succeeds. */
  eventId: string;
  /** Destination URL the payload is POSTed to. */
  webhookUrl: string;
  /** Request body forwarded as-is to the HTTP client. */
  payload: unknown;
  /** Number of prior delivery attempts, starting at 0 for the first retry. */
  attempt: number;
}
 
/** Queue client contract that processBatch relies on to read, acknowledge and advance past retry jobs. */
export interface Queue {
  /** Reads up to maxSize jobs from the head of the queue. Throws on connectivity failure. */
  readBatch(maxSize: number): Promise<RetryJob[]>;
  /** Acknowledges successful delivery and removes the job from the queue. */
  acknowledge(eventId: string): Promise<void>;
  /** Advances the consumer cursor past a successfully read batch; unacknowledged jobs remain queued. */
  markBatchProcessed(): Promise<void>;
}
 
/** HTTP client contract used for webhook re-delivery. */
export interface HttpClient {
  /**
   * POSTs body to url and resolves with the response status code.
   * processBatch treats a status in the 200-299 range as a successful delivery.
   */
  post(url: string, body: unknown): Promise<{ status: number }>;
}
 
// Returns a promise that settles (with no value) once `ms` milliseconds have
// elapsed; awaited between retry attempts to implement the backoff pause.
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
 
/**
 * Processes one batch of retry jobs.
 *
 * Reads up to 20 jobs, then for each job waits out its backoff delay and
 * re-posts the payload to the job's webhook URL.
 * Retry delay follows exponential backoff: 2^attempt * BASE_DELAY_MS.
 *
 * A job is acknowledged only when the endpoint answers with a 2xx status.
 * Any other status, or an error thrown while delivering or acknowledging,
 * leaves the job in the queue for a later cycle.
 *
 * If the queue read throws, the error is logged, the batch cursor is NOT
 * advanced, and the function returns without attempting any delivery.
 *
 * @param queue  - queue client
 * @param http   - HTTP client for webhook re-delivery
 * @returns a promise that resolves after every job in the batch has been
 *   attempted and the cursor advanced, or after a failed read has been logged
 * @throws whatever `queue.markBatchProcessed()` rejects with
 */
export async function processBatch(queue: Queue, http: HttpClient): Promise<void> {
  let batch: RetryJob[];

  // Only the read is guarded here, so the "read failed" path cannot be
  // confused with anything that happens while delivering jobs.
  try {
    batch = await queue.readBatch(20);
  } catch (err) {
    logger.error("Queue read failed", { error: err });
    // Nothing was read, so the cursor must stay where it is.
    return;
  }

  for (const job of batch) {
    // Exponential backoff: 5s, 10s, 20s, ... for attempt 0, 1, 2, ...
    const delayMs = (2 ** job.attempt) * BASE_DELAY_MS;
    await sleep(delayMs);

    try {
      const result = await http.post(job.webhookUrl, job.payload);
      if (result.status >= 200 && result.status < 300) {
        await queue.acknowledge(job.eventId);
      }
    } catch {
      // Delivery failed — job stays in queue for the next cycle.
    }
  }

  // Reached only after a successful read; unacknowledged jobs remain queued.
  await queue.markBatchProcessed();
}