83 lines
jobs/metrics_rollup.py
Aggregates transaction metrics per merchant and period into the aggregates table.
# Nightly transaction metrics rollup job.
import logging
from typing import List, Protocol
 
logger = logging.getLogger(__name__)
 
 
class EventStore(Protocol):
    """Read-only source of raw transaction events, looked up per merchant and period.

    Code in this module reads the ``"amount"`` key of each returned event.
    """

    def get_events(self, merchant_id: str, period: str) -> List[dict]:
        """Return the raw events recorded for *merchant_id* during *period*."""
        ...
 
 
class AggregateStore(Protocol):
    """Persistence backend for pre-computed per-merchant metric aggregates.

    Contract: ``delete_aggregates_for_period`` must remove every stored row
    for the given period, so that inserting a fresh set afterwards yields a
    clean, idempotent result.
    """

    def delete_aggregates_for_period(self, period: str) -> None:
        """Remove all aggregate rows stored for *period*."""
        ...

    def insert_aggregate(
        self, period: str, merchant_id: str, tx_count: int, volume: float
    ) -> None:
        """Store one aggregate row for *merchant_id* in *period*."""
        ...
 
 
def _rollup_merchant(merchant_id: str, period: str, events: EventStore, aggregates: AggregateStore) -> None:
    """Write one aggregate row summarising a single merchant's events for *period*.

    Counts the events returned by ``events.get_events`` and totals their
    ``float(event["amount"])`` values, then hands both to
    ``aggregates.insert_aggregate``. Nothing is caught here: a store error or a
    missing / non-numeric ``"amount"`` propagates to the caller.
    """
    rows = events.get_events(merchant_id, period)
    row_count = len(rows)
    amounts = [float(row["amount"]) for row in rows]
    aggregates.insert_aggregate(period, merchant_id, row_count, sum(amounts))
 
 
def run_monthly_rollup(
    merchant_ids: List[str],
    period: str,
    events: EventStore,
    aggregates: AggregateStore,
) -> bool:
    """Aggregate transaction metrics for all merchants in the given period.

    Existing aggregate rows for ``period`` are deleted before any new rows
    are written. Re-running the job for the same period therefore replaces
    the previous output rather than appending a second set of rows.

    This function does not wrap the delete and the per-merchant inserts in a
    transaction. If a merchant fails after the delete, the period is left
    without a row for that merchant and the function returns False so the
    caller can re-run it.

    Parameters
    ----------
    merchant_ids : list of str
        Merchant accounts to process for the period.
    period : str
        ISO-8601 period string, e.g. '2024-11'.
    events : EventStore
        Source of raw transaction events.
    aggregates : AggregateStore
        Destination for rolled-up metrics.

    Returns
    -------
    bool
        True if the period's existing rows were cleared and every merchant was
        processed successfully. False in two cases: clearing the period failed
        (no new rows are written then), or any merchant failed.
    """
    try:
        # Clear the period first; without this a second run for the same
        # period inserts duplicate rows instead of replacing the old ones.
        aggregates.delete_aggregates_for_period(period)
    except Exception:
        # Do not insert on top of stale rows; report failure to the caller.
        logger.exception(
            "rollup: could not clear existing aggregates for period %s",
            period,
        )
        return False

    failures = 0

    for merchant_id in merchant_ids:
        try:
            _rollup_merchant(merchant_id, period, events, aggregates)
        except Exception:
            # Keep processing the remaining merchants; exc_info keeps the
            # traceback so the failure can actually be diagnosed.
            logger.warning(
                "rollup: merchant %r failed for period %s",
                merchant_id,
                period,
                exc_info=True,
            )
            failures += 1

    if failures:
        logger.error(
            "rollup: %d/%d merchants failed for period %s",
            failures,
            len(merchant_ids),
            period,
        )
        return False

    return True