42 lines
processing/dedup_processor.py
Deduplicates events by ID and delegates persistence to a callable.
# Event deduplication processor for the webhook ingestion pipeline.
from typing import Callable, Set


class EventDedupProcessor:
    """Deduplicates a stream of events by event_id before persisting.

    Each event is persisted at most once. Re-submitting an event_id that
    has already been successfully persisted is a no-op and returns False.
    Events that fail to persist are retryable — a failed write does not
    mark the event as seen.

    Parameters
    ----------
    persist : callable
        Function that accepts an event dict and writes it to the store.
        Raises on failure; returns None on success.
    """

    def __init__(self, persist: Callable[[dict], None]) -> None:
        # event_ids whose persist call has returned without raising.
        self._seen: Set[str] = set()
        self._persist = persist

    def process(self, event: dict) -> bool:
        """Persist event if not already seen.

        Parameters
        ----------
        event : dict
            Must contain an 'event_id' string key.

        Returns
        -------
        bool
            True if the event was written, False if it was a duplicate.

        Raises
        ------
        KeyError
            If ``event`` has no 'event_id' key.
        Exception
            Whatever ``persist`` raises is propagated unchanged; in that
            case the event is *not* recorded as seen, so a later call with
            the same event_id will attempt the write again.
        """
        event_id: str = event["event_id"]
        if event_id in self._seen:
            return False
        # Write first, record second: if persist raises, the id must stay
        # out of ``_seen`` so the event remains retryable.
        self._persist(event)
        self._seen.add(event_id)
        return True