83 lines
export/audit_exporter.py
Writes audit events to JSON or CSV files within the configured base directory.
# Audit log export pipeline for compliance review packages.

import csv
import json
import logging
from pathlib import Path
from typing import List

logger = logging.getLogger(__name__)

SUPPORTED_FORMATS = ("json", "csv")

# Fields written to every CSV export, in column order.
CSV_FIELDS = ["event_id", "actor", "action", "resource", "timestamp", "notes"]


class AuditLogExporter:
    """Exports audit events to JSON or CSV for external compliance review.

    All exported files are written within the configured base directory.
    Callers supply a filename; before any file is opened the exporter
    verifies that the resolved output path stays inside the base directory.
    Filenames containing '..' components, absolute paths outside the base
    directory, and paths that escape through symlinks are rejected with
    ValueError.

    CSV output is produced with Python's csv module so that field values
    containing commas, newlines, or quotes are correctly escaped.
    """

    def __init__(self, base_dir: Path) -> None:
        """Remember the canonical (resolved) form of *base_dir*."""
        # Resolve once so later containment checks compare canonical paths.
        self._base_dir = base_dir.resolve()

    def export(
        self,
        events: List[dict],
        filename: str,
        fmt: str = "json",
    ) -> Path:
        """Write audit events to a file within the configured base directory.

        Parameters
        ----------
        events : list of dict
            Audit event records to export.
        filename : str
            Output filename. Must resolve within base_dir — path traversal
            sequences in filename are rejected before any file is opened.
        fmt : str
            Output format: 'json' or 'csv'.

        Returns
        -------
        Path
            Absolute path of the written file.

        Raises
        ------
        ValueError
            If fmt is not in SUPPORTED_FORMATS, or if filename contains a
            '..' component, does not name a file, or resolves outside the
            base directory.
        """
        if fmt not in SUPPORTED_FORMATS:
            raise ValueError(f"Unsupported format: {fmt!r}")

        # Validate the destination before touching the filesystem for writing.
        out_path = self._resolve_output_path(filename)

        if fmt == "json":
            # Explicit UTF-8 keeps the export independent of the host locale.
            out_path.write_text(
                json.dumps(events, indent=2, default=str),
                encoding="utf-8",
            )
        else:
            self._write_csv(events, out_path)

        logger.info("audit_exporter: %d events → %s", len(events), out_path.name)
        return out_path

    def _resolve_output_path(self, filename: str) -> Path:
        """Return the resolved output path, or raise if it leaves base_dir.

        Raises
        ------
        ValueError
            If filename contains a '..' component, resolves to the base
            directory itself, or resolves to a location outside it.
        """
        # Reject traversal components outright, even when they would happen
        # to land back inside the base directory (e.g. 'a/../b.json').
        if ".." in Path(filename).parts:
            raise ValueError(
                f"Path traversal is not permitted in filename: {filename!r}"
            )

        # resolve() follows symlinks; joining with an absolute filename
        # discards the base, so both cases are caught by the check below.
        candidate = (self._base_dir / filename).resolve()
        try:
            relative = candidate.relative_to(self._base_dir)
        except ValueError:
            raise ValueError(
                f"Output path escapes the base directory: {filename!r}"
            ) from None

        # An empty or '.' filename resolves to the base directory itself.
        if not relative.parts:
            raise ValueError(f"Filename must name a file: {filename!r}")
        return candidate

    def _write_csv(self, events: List[dict], out_path: Path) -> None:
        """Write events to a CSV file using Python's csv module.

        The first row is the CSV_FIELDS header. Each following row holds
        the str() of the event's value for every field in CSV_FIELDS, with
        missing fields written as empty strings. Quoting and escaping are
        handled by the csv module, so commas, newlines, and quotes in field
        values do not corrupt the output.
        """
        # newline="" is required by the csv module so it controls line
        # endings itself; "\n" matches the row separator used previously.
        with out_path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle, lineterminator="\n")
            writer.writerow(CSV_FIELDS)
            writer.writerows(
                [str(event.get(field, "")) for field in CSV_FIELDS]
                for event in events
            )