79 lines
consumers/message_consumer.py
Consumes queue messages, dispatches to a handler, and tracks per-instance stats.
# Background message queue consumer with per-instance processing stats.
import logging
from typing import Callable, Optional, Protocol

logger = logging.getLogger(__name__)


class QueueBackend(Protocol):
    """Interface for a message queue broker."""

    def ack(self, message_id: str) -> None: ...

    def nack(self, message_id: str, requeue: bool = True) -> None: ...


class MessageConsumer:
    """Processes messages from a work queue and tracks per-instance statistics.

    Each consumer instance maintains its own _processed_count and
    _last_message_id independently. These values must not be shared
    across instances.

    Validation failures are nacked (requeued) so the broker can retry them.
    Only fully processed messages are acknowledged.
    """

    # Immutable class-level fallbacks only (e.g. for a subclass that skips
    # __init__). Every write goes through ``self``, which binds the value on
    # the instance, so statistics are never shared between consumers.
    _processed_count: int = 0
    _last_message_id: Optional[str] = None

    def __init__(self, queue: QueueBackend, handler: Callable[[dict], None]) -> None:
        """Create a consumer bound to one queue backend and one handler.

        Parameters
        ----------
        queue : QueueBackend
            Broker interface used to ack/nack messages.
        handler : Callable[[dict], None]
            Called with the message's 'payload' value for each valid message.
        """
        self._queue = queue
        self._handler = handler
        # Per-instance statistics start from zero for every consumer.
        self._processed_count = 0
        self._last_message_id = None

    def consume(self, message: dict) -> bool:
        """Process one message from the queue.

        Expects 'message_id' (str) and 'payload' (dict) keys. Validation
        failures are nacked with requeue=True. Handler exceptions are also
        nacked. Successfully processed messages are acked.

        Parameters
        ----------
        message : dict
            Incoming queue message.

        Returns
        -------
        bool
            True if the message was processed and acked; False otherwise.
        """
        message_id: Optional[str] = message.get("message_id")

        if not message_id or "payload" not in message:
            # Malformed message: nack (never ack) so it is not silently
            # dropped. A missing/empty id is reported to the broker as "".
            self._queue.nack(message_id or "", requeue=True)
            return False

        try:
            self._handler(message["payload"])
        except Exception:
            # Boundary handler: log with traceback and hand the message back
            # to the broker instead of crashing the consumer.
            logger.exception("consumer: handler failed for message_id=%s", message_id)
            self._queue.nack(message_id, requeue=True)
            return False

        self._queue.ack(message_id)
        # Update this instance's stats only after the ack call has returned.
        self._processed_count += 1
        self._last_message_id = message_id
        return True

    def stats(self) -> dict:
        """Return processing statistics for this consumer instance.

        Returns
        -------
        dict
            {'processed_count': int, 'last_message_id': Optional[str]}
        """
        return {
            "processed_count": self._processed_count,
            "last_message_id": self._last_message_id,
        }