99 lines
jobs/report_worker.py
Executes a report generation job, serialises output to CSV, and uploads to storage.
# Report generation worker: queries, serialises, and uploads report output.
import csv
import io
import logging
from datetime import date
from typing import List, Protocol

logger = logging.getLogger(__name__)
MAX_DATE_RANGE_DAYS = 90MAX_ROWS = 50_000class ReportStore(Protocol): def update_job_status(self, job_id: str, status: str) -> None: ... def fetch_rows( self, job_id: str, start: date, end: date, max_rows: int ) -> List[dict]: ...class ObjectStorage(Protocol): def upload(self, key: str, data: bytes) -> None: ...def _validate_job(job: dict) -> None: """Raise ValueError if job date range or row limit exceeds allowed maximums. Parameters ---------- job : dict Must contain start_date (date), end_date (date), max_rows (int). Raises ------ ValueError If (end_date - start_date).days > MAX_DATE_RANGE_DAYS or max_rows > MAX_ROWS. """ span = (job["end_date"] - job["start_date"]).days if span > MAX_DATE_RANGE_DAYS: raise ValueError( f"Date range {span} days exceeds maximum {MAX_DATE_RANGE_DAYS}")
if job["max_rows"] > MAX_ROWS: raise ValueError( f"max_rows {job['max_rows']} exceeds maximum {MAX_ROWS}")
def _to_csv_bytes(rows: List[dict]) -> bytes: """Serialise a list of row dicts to UTF-8 CSV bytes.""" if not rows: return b""buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))writer.writeheader()
writer.writerows(rows)
return buf.getvalue().encode("utf-8")def run_report_job( job: dict,store: ReportStore,
storage: ObjectStorage,
) -> None: """Execute a report generation job and upload the result to object storage. Parameters ---------- job : dict Must contain job_id (str), start_date (date), end_date (date), max_rows (int), and output_key (str). store : ReportStore Data query and job status backend. storage : ObjectStorage Destination for the generated report file. Raises ------ ValueError If the job parameters exceed the allowed limits defined in this module. """ job_id: str = job["job_id"] output_key: str = job["output_key"]rows = store.fetch_rows(
job_id, job["start_date"], job["end_date"], job["max_rows"])
data = _to_csv_bytes(rows)
store.update_job_status(job_id, "completed")storage.upload(output_key, data)
logger.info(
"report_worker: job %r complete — %d rows, key %r",job_id,
len(rows),output_key,
)