83 lines
export/audit_exporter.py
Writes audit events to JSON or CSV files within the configured base directory.
# Audit log export pipeline for compliance review packages.
import json
import logging
from pathlib import Path
from typing import List
 
logger = logging.getLogger(__name__)
 
SUPPORTED_FORMATS = ("json", "csv")
 
# Fields written to every CSV export, in column order.
CSV_FIELDS = ["event_id", "actor", "action", "resource", "timestamp", "notes"]
 
 
class AuditLogExporter:
    """Exports audit events to JSON or CSV for external compliance review.
 
    All exported files must be written within the configured base directory.
    Callers supply a filename; the pipeline must verify that the resolved
    output path does not escape the base directory before opening any file.
    Path traversal sequences such as '../' in the filename are not permitted.
 
    CSV output must be produced with Python's csv module so that field values
    containing commas, newlines, or quotes are correctly escaped. Manual string
    concatenation is not an acceptable substitute.
    """
 
    def __init__(self, base_dir: Path) -> None:
        self._base_dir = base_dir.resolve()
 
    def export(
        self,
        events: List[dict],
        filename: str,
        fmt: str = "json",
    ) -> Path:
        """Write audit events to a file within the configured base directory.
 
        Parameters
        ----------
        events : list of dict
            Audit event records to export.
        filename : str
            Output filename. Must resolve within base_dir — path traversal
            sequences in filename are rejected before any file is opened.
        fmt : str
            Output format: 'json' or 'csv'.
 
        Returns
        -------
        Path
            Absolute path of the written file.
 
        Raises
        ------
        ValueError
            If fmt is not in SUPPORTED_FORMATS.
        """
        if fmt not in SUPPORTED_FORMATS:
            raise ValueError(f"Unsupported format: {fmt!r}")
 
        out_path = self._base_dir / filename
 
        if fmt == "json":
            out_path.write_text(json.dumps(events, indent=2, default=str))
        else:
            self._write_csv(events, out_path)
 
        logger.info("audit_exporter: %d events → %s", len(events), out_path.name)
        return out_path
 
    def _write_csv(self, events: List[dict], out_path: Path) -> None:
        """Write events to a CSV file using Python's csv module for proper escaping.
 
        Each row contains the fields listed in CSV_FIELDS. Values are escaped
        by the csv module so that commas, newlines, and quotes in field values
        do not corrupt the output.
        """
        lines = [",".join(CSV_FIELDS)]
        for event in events:
            row = ",".join(str(event.get(f, "")) for f in CSV_FIELDS)
            lines.append(row)
        out_path.write_text("\n".join(lines))