63 lines
jobs/retry_manager.py
Routes failed tasks to retry or dead-letter queue based on retry_count and max_retries.
# Failed task retry manager: routes tasks to retry or DLQ.
import logging
from typing import Protocol

# Module-level logger; handle_failure reports routing decisions through it.
logger = logging.getLogger(__name__)
class TaskStore(Protocol):
    """Structural interface for updating a task's stored status."""

    def update_status(self, task_id: str, status: str, retry_count: int) -> None: ...


class DeadLetterQueue(Protocol):
    """Structural interface for publishing a task to the dead-letter queue."""

    def publish(self, task: dict) -> None: ...


class TaskRetryManager:
    """Routes a failed task to either the retry queue or the dead-letter queue.

    retry_count is the number of attempts already completed. When it reaches
    max_retries the task has exhausted its budget; it is sent to the DLQ
    without another attempt.

    The database record must only be updated to 'dlq' after the DLQ message is
    confirmed published so tasks are never silently dropped.
    """

    def __init__(self, store: TaskStore, dlq: DeadLetterQueue) -> None:
        self._store = store
        self._dlq = dlq

    def handle_failure(self, task: dict) -> bool:
        """Record a task failure and route it to retry or DLQ.

        Parameters
        ----------
        task : dict
            Must contain task_id (str), retry_count (int), max_retries (int).

        Returns
        -------
        bool
            True if the task was sent to the DLQ; False if it was requeued
            for retry.

        Raises
        ------
        KeyError
            If ``task`` lacks one of the required keys.
        Exception
            Anything raised by ``DeadLetterQueue.publish`` propagates
            unchanged; in that case the stored status is NOT updated.
        """
        task_id: str = task["task_id"]
        retry_count: int = task["retry_count"]
        max_retries: int = task["max_retries"]

        # ">=" rather than ">": once the completed attempts reach the budget
        # no further attempt is allowed (matches the class contract).
        if retry_count >= max_retries:
            # Publish BEFORE touching the record. If publish raises, the task
            # is not marked 'dlq' and therefore cannot be silently dropped.
            # A normal return from publish() is treated as confirmation.
            self._dlq.publish(task)
            self._store.update_status(task_id, "dlq", retry_count)
            logger.info(
                "retry_manager: task %r sent to DLQ after %d attempts",
                task_id,
                retry_count,
            )
            return True

        # Budget remains: requeue with the attempt counter advanced by one.
        self._store.update_status(task_id, "pending", retry_count + 1)
        logger.info(
            "retry_manager: task %r scheduled for retry %d/%d",
            task_id,
            retry_count + 1,
            max_retries,
        )
        return False