42 lines
processing/dedup_processor.py
Deduplicates events by ID and delegates persistence to a callable.
# Event deduplication processor for the webhook ingestion pipeline.
from typing import Callable, Set
 
 
class EventDedupProcessor:
    """Deduplicates a stream of events by event_id before persisting.

    Each event is persisted at most once. Re-submitting an event_id that has
    already been successfully persisted is a no-op and returns False. Events
    that fail to persist are retryable — a failed write does not mark the
    event as seen.

    Seen ids are held in an in-memory set for the lifetime of the instance.
    No locking is performed, so callers processing the same event_id
    concurrently must synchronize externally.

    Parameters
    ----------
    persist : callable
        Function that accepts an event dict and writes it to the store.
        Raises on failure; returns None on success.
    """

    def __init__(self, persist: Callable[[dict], None]) -> None:
        # event_ids whose persist() call has completed without raising.
        self._seen: Set[str] = set()
        self._persist = persist

    def process(self, event: dict) -> bool:
        """Persist event if not already seen.

        Parameters
        ----------
        event : dict
            Must contain an 'event_id' string key.

        Returns
        -------
        bool
            True if the event was written, False if it was a duplicate.

        Raises
        ------
        KeyError
            If ``event`` has no 'event_id' key.
        Exception
            Anything raised by ``persist`` propagates unchanged; in that case
            the event_id is NOT recorded, so the event may be retried.
        """
        event_id: str = event["event_id"]
        if event_id in self._seen:
            return False
        # Persist first: only mark the id as seen once the write succeeded,
        # otherwise a failed write would make the event permanently skipped.
        self._persist(event)
        self._seen.add(event_id)
        return True