83 lines
jobs/metrics_rollup.py
Aggregates transaction metrics per merchant and period into the aggregates table.
# Nightly transaction metrics rollup job.
import logging
import math
from typing import List, Protocol

logger = logging.getLogger(__name__)


class EventStore(Protocol):
    """Read-side interface for raw transaction events.

    The rollup reads ``event["amount"]`` from every returned dict and
    converts it with ``float()``, so each event must carry that key.
    """

    def get_events(self, merchant_id: str, period: str) -> List[dict]: ...


class AggregateStore(Protocol):
    """Storage backend for pre-computed merchant metric aggregates.

    delete_aggregates_for_period removes all existing rows for the period
    so that a fresh insert produces a clean, idempotent result.
    """

    def insert_aggregate(
        self, period: str, merchant_id: str, tx_count: int, volume: float
    ) -> None: ...

    def delete_aggregates_for_period(self, period: str) -> None: ...


def _rollup_merchant(
    merchant_id: str,
    period: str,
    events: EventStore,
    aggregates: AggregateStore,
) -> None:
    """Compute and store transaction metrics for a single merchant.

    Fetches the merchant's events for ``period``, counts them, totals their
    ``"amount"`` values and writes one aggregate row.

    Raises
    ------
    KeyError
        If an event has no ``"amount"`` key.
    TypeError, ValueError
        If an ``"amount"`` value cannot be converted to ``float``.
    """
    evts = events.get_events(merchant_id, period)
    tx_count = len(evts)
    # math.fsum tracks partial sums exactly, so long runs of small amounts do
    # not accumulate rounding drift, and it yields 0.0 (a float, matching the
    # insert_aggregate signature) rather than int 0 when there are no events.
    volume = math.fsum(float(e["amount"]) for e in evts)
    aggregates.insert_aggregate(period, merchant_id, tx_count, volume)


def run_monthly_rollup(
    merchant_ids: List[str],
    period: str,
    events: EventStore,
    aggregates: AggregateStore,
) -> bool:
    """Aggregate transaction metrics for all merchants in the given period.

    This function is idempotent: running it twice for the same period
    produces the same rows in the aggregates table. Existing rows for the
    period are deleted before new ones are written.

    A failure for one merchant is logged and counted but does not stop the
    remaining merchants from being processed. Because the period is cleared
    up front, a merchant that fails ends the run with no row for the period.

    Parameters
    ----------
    merchant_ids : list of str
        Merchant accounts to process for the period.
    period : str
        ISO-8601 period string, e.g. '2024-11'.
    events : EventStore
        Source of raw transaction events.
    aggregates : AggregateStore
        Destination for rolled-up metrics.

    Returns
    -------
    bool
        True if all merchants were processed successfully; False if any
        failed, or if the existing rows for the period could not be cleared
        (in which case nothing is written).
    """
    # Clear the period before writing so a rerun replaces rows instead of
    # appending duplicates. If the clear fails, inserting anyway would
    # duplicate rows, so abort and report failure.
    try:
        aggregates.delete_aggregates_for_period(period)
    except Exception:
        logger.exception(
            "rollup: could not clear existing aggregates for period %s",
            period,
        )
        return False

    failures = 0
    for merchant_id in merchant_ids:
        try:
            _rollup_merchant(merchant_id, period, events, aggregates)
        except Exception:
            # Best effort per merchant: keep going, but keep the traceback so
            # the failure can be diagnosed from the logs.
            logger.warning(
                "rollup: merchant %r failed for period %s",
                merchant_id,
                period,
                exc_info=True,
            )
            failures += 1

    if failures:
        logger.error(
            "rollup: %d/%d merchants failed for period %s",
            failures,
            len(merchant_ids),
            period,
        )
    # Report the real outcome; callers rely on False to detect partial runs.
    return failures == 0