63 lines
jobs/retry_manager.py
Routes failed tasks to retry or dead-letter queue based on retry_count and max_retries.
# Failed task retry manager: routes tasks to retry or DLQ.
import logging
from typing import Protocol
 
logger = logging.getLogger(__name__)
 
 
class TaskStore(Protocol):
    """Structural interface for the component that records task status."""

    def update_status(self, task_id: str, status: str, retry_count: int) -> None:
        """Record *status* and *retry_count* for the task named *task_id*."""
 
 
class DeadLetterQueue(Protocol):
    """Structural interface for the sink that receives exhausted tasks."""

    def publish(self, task: dict) -> None:
        """Publish the *task* payload to the dead-letter queue."""
 
 
class TaskRetryManager:
    """Routes a failed task to either the retry queue or the dead-letter queue.

    retry_count is the number of attempts already completed. When it reaches
    max_retries the task has exhausted its budget; it is sent to the DLQ without
    another attempt. The database record must only be updated to 'dlq' after the
    DLQ message is confirmed published so tasks are never silently dropped.
    """

    def __init__(self, store: TaskStore, dlq: DeadLetterQueue) -> None:
        # Collaborators are injected so the routing logic does not depend on a
        # concrete storage or queue implementation.
        self._store = store
        self._dlq = dlq

    def handle_failure(self, task: dict) -> bool:
        """Record a task failure and route it to retry or DLQ.

        Parameters
        ----------
        task : dict
            Must contain task_id (str), retry_count (int), max_retries (int).
            retry_count is the number of attempts already completed.

        Returns
        -------
        bool
            True if the task was sent to the DLQ; False if it was requeued for retry.

        Raises
        ------
        KeyError
            If task_id, retry_count or max_retries is missing from ``task``.
        Exception
            Any error raised by ``DeadLetterQueue.publish`` propagates unchanged.
            In that case the store is not updated, so the task is never marked
            'dlq' without a published DLQ message.
        """
        task_id: str = task["task_id"]
        retry_count: int = task["retry_count"]
        max_retries: int = task["max_retries"]

        # The budget is exhausted once completed attempts *reach* max_retries;
        # a strict ``>`` would grant one attempt beyond the budget.
        if retry_count >= max_retries:
            # Publish first: if publishing raises, we never reach the store
            # update, so the record cannot claim the task is in the DLQ.
            self._dlq.publish(task)
            self._store.update_status(task_id, "dlq", retry_count)
            logger.info(
                "retry_manager: task %r sent to DLQ after %d attempts",
                task_id,
                retry_count,
            )
            return True

        # Budget remains: mark the task pending and count the upcoming attempt.
        self._store.update_status(task_id, "pending", retry_count + 1)
        logger.info(
            "retry_manager: task %r scheduled for retry %d/%d",
            task_id,
            retry_count + 1,
            max_retries,
        )
        return False