99 lines
jobs/report_worker.py
Executes a report generation job, serialises output to CSV, and uploads to storage.
# Report generation worker: queries, serialises, and uploads report output.
import csv
import io
import logging
from datetime import date
from typing import List, Protocol
 
logger = logging.getLogger(__name__)
 
MAX_DATE_RANGE_DAYS = 90
MAX_ROWS = 50_000
 
 
class ReportStore(Protocol):
    """Structural interface for the query and job-status backend.

    Any object providing these two methods satisfies the protocol;
    ``run_report_job`` in this module calls both of them.
    """

    def update_job_status(self, job_id: str, status: str) -> None:
        """Record ``status`` for the job identified by ``job_id``."""

    def fetch_rows(
        self,
        job_id: str,
        start: date,
        end: date,
        max_rows: int,
    ) -> List[dict]:
        """Return the report rows for ``job_id`` as a list of dicts.

        NOTE(review): presumably rows are restricted to the ``start``..``end``
        range and capped at ``max_rows`` by the implementation; confirm
        against the concrete store.
        """
 
 
class ObjectStorage(Protocol):
    """Structural interface for the destination that receives report files.

    ``run_report_job`` in this module calls ``upload`` with the job's
    output key and the serialised CSV bytes.
    """

    def upload(self, key: str, data: bytes) -> None:
        """Store ``data`` under ``key``."""
 
 
def _validate_job(job: dict) -> None:
    """Reject a job whose date range or row limit is above the module caps.

    Parameters
    ----------
    job : dict
        Must contain start_date (date), end_date (date), max_rows (int).

    Raises
    ------
    ValueError
        If the number of days from start_date to end_date is greater than
        MAX_DATE_RANGE_DAYS, or if max_rows is greater than MAX_ROWS.
        The date range is checked first.
    """
    # Whole days between the two dates; negative when end precedes start.
    delta = job["end_date"] - job["start_date"]
    span = delta.days

    range_too_long = span > MAX_DATE_RANGE_DAYS
    if range_too_long:
        message = f"Date range {span} days exceeds maximum {MAX_DATE_RANGE_DAYS}"
        raise ValueError(message)

    # max_rows is only looked up once the date range has passed.
    too_many_rows = job["max_rows"] > MAX_ROWS
    if too_many_rows:
        message = f"max_rows {job['max_rows']} exceeds maximum {MAX_ROWS}"
        raise ValueError(message)
 
 
def _to_csv_bytes(rows: List[dict]) -> bytes:
    """Render row dicts as a UTF-8 encoded CSV document.

    The header is taken from the keys of the first row, in order. An empty
    ``rows`` yields empty bytes (no header line is written).
    """
    if rows:
        columns = list(rows[0])
        with io.StringIO() as sink:
            out = csv.DictWriter(sink, fieldnames=columns)
            out.writeheader()
            for record in rows:
                out.writerow(record)
            text = sink.getvalue()
        return text.encode("utf-8")
    return b""
 
 
def run_report_job(
    job: dict,
    store: ReportStore,
    storage: ObjectStorage,
) -> None:
    """Execute a report generation job and upload the result to object storage.

    The job is validated before any query runs, and it is marked
    "completed" only after the upload has succeeded. Exceptions raised by
    ``store`` or ``storage`` propagate to the caller; if the upload fails,
    the job status is left unchanged.

    Parameters
    ----------
    job : dict
        Must contain job_id (str), start_date (date), end_date (date),
        max_rows (int), and output_key (str).
    store : ReportStore
        Data query and job status backend.
    storage : ObjectStorage
        Destination for the generated report file.

    Raises
    ------
    ValueError
        If the job parameters exceed the allowed limits defined in this module.
    KeyError
        If a required key is missing from ``job``.
    """
    job_id: str = job["job_id"]
    output_key: str = job["output_key"]

    # Enforce the module limits up front so an oversized request never
    # reaches the store.
    _validate_job(job)

    rows = store.fetch_rows(
        job_id, job["start_date"], job["end_date"], job["max_rows"]
    )
    data = _to_csv_bytes(rows)

    # Upload first: a job must not be recorded as completed unless its
    # output actually exists in storage.
    storage.upload(output_key, data)
    store.update_job_status(job_id, "completed")

    logger.info(
        "report_worker: job %r complete — %d rows, key %r",
        job_id,
        len(rows),
        output_key,
    )