79 lines
consumers/message_consumer.py
Consumes queue messages, dispatches to a handler, and tracks per-instance stats.
# Background message queue consumer with per-instance processing stats.
import logging
from typing import Callable, Optional, Protocol
 
logger = logging.getLogger(__name__)
 
 
class QueueBackend(Protocol):
    """Structural interface a message queue broker must satisfy.

    Any object exposing compatible ``ack`` and ``nack`` methods conforms;
    explicit subclassing is not required.
    """

    def ack(self, message_id: str) -> None:
        """Acknowledge the message identified by *message_id*."""
        ...

    def nack(self, message_id: str, requeue: bool = True) -> None:
        """Negatively acknowledge *message_id*.

        *requeue* defaults to True; consumers in this module pass
        ``requeue=True`` so the broker can retry the message.
        """
        ...
 
 
class MessageConsumer:
    """Process messages from a work queue and track per-instance statistics.

    Each consumer instance maintains its own ``_processed_count`` and
    ``_last_message_id`` independently. These values are stored on the
    instance and are never shared across instances.

    Validation failures are nacked (requeued) so the broker can retry them.
    Only fully processed messages are acknowledged.
    """

    # Immutable class-level fallbacks only. All reads and writes go through
    # ``self`` so the live counters belong to the instance, not the class.
    _processed_count: int = 0
    _last_message_id: Optional[str] = None

    def __init__(self, queue: "QueueBackend", handler: Callable[[dict], None]) -> None:
        """Bind the consumer to a broker and a payload handler.

        Parameters
        ----------
        queue : QueueBackend
            Broker used to ack/nack messages.
        handler : Callable[[dict], None]
            Called with each message's ``payload``; any exception it raises
            causes the message to be nacked.
        """
        self._queue = queue
        self._handler = handler
        # Per-instance statistics: start every consumer from a clean slate.
        self._processed_count = 0
        self._last_message_id = None

    def consume(self, message: dict) -> bool:
        """Process one message from the queue.

        Expects 'message_id' (str) and 'payload' (dict) keys.
        Validation failures are nacked with requeue=True.
        Handler exceptions are also nacked. Successfully processed
        messages are acked.

        Parameters
        ----------
        message : dict
            Incoming queue message.

        Returns
        -------
        bool
            True if the message was processed and acked; False otherwise.
        """
        message_id: Optional[str] = message.get("message_id")
        if not message_id or "payload" not in message:
            # Invalid messages must not be acked: acking would silently drop
            # them. Nack with requeue so the broker can retry. When the id is
            # missing, an empty string is passed to the broker.
            self._queue.nack(message_id or "", requeue=True)
            return False

        try:
            self._handler(message["payload"])
        except Exception:
            # Deliberately broad: any handler failure results in a requeue
            # rather than crashing the consumer loop.
            logger.exception("consumer: handler failed for message_id=%s", message_id)
            self._queue.nack(message_id, requeue=True)
            return False

        self._queue.ack(message_id)
        # Update stats only after the ack call returned, and only on this
        # instance.
        self._processed_count += 1
        self._last_message_id = message_id
        return True

    def stats(self) -> dict:
        """Return processing statistics for this consumer instance.

        Returns
        -------
        dict
            {'processed_count': int, 'last_message_id': Optional[str]}
        """
        return {
            "processed_count": self._processed_count,
            "last_message_id": self._last_message_id,
        }