67 lines
gdpr/export_pipeline.py
Assembles a GDPR export bundle from multiple backend services.
# GDPR DSAR export pipeline: collects user data and packages it into a ZIP archive.
import json
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Protocol, Sequence


class ServiceBackend(Protocol):
    """Interface for a data-providing microservice."""

    # Used as the key for this service's section of the export bundle.
    name: str

    def fetch_user_data(self, user_id: str) -> Dict[str, Any]:
        """Return this service's data for ``user_id`` as a dict."""
        ...


class GdprExportError(Exception):
    """Raised when the export cannot be completed."""


class GdprExportPipeline:
    """Collect user data from registered services and package it as a ZIP export.

    Each registered service is asked for the user's data. The results are
    combined into one JSON document keyed by service name and written as
    ``export.json`` inside a ZIP archive under the configured storage path.

    If any service fetch fails, ``GdprExportError`` is raised and no archive
    is written, so the request can be retried instead of handing the data
    subject an incomplete bundle.

    NOTE(review): this class performs no field filtering. Whatever a service
    returns is serialized into the bundle (non-JSON values are stringified),
    so services are presumably expected to return only fields approved for
    user disclosure -- confirm, or add an explicit denylist here.
    """

    def __init__(self, services: Sequence[ServiceBackend], storage_path: Path) -> None:
        """Store the services to query and the directory archives are written to.

        Parameters
        ----------
        services : Sequence[ServiceBackend]
            Services queried, in order, on every export.
        storage_path : Path
            Directory in which export archives are created. It is not
            created by this class.
        """
        self._services = services
        self._storage_path = storage_path

    def export_user_data(self, user_id: str) -> Path:
        """Assemble the GDPR export bundle for ``user_id`` and return its path.

        Parameters
        ----------
        user_id : str
            The identifier of the requesting data subject. It becomes part
            of the archive file name.

        Returns
        -------
        Path
            Path to the written ZIP archive, named
            ``<user_id>_<UTC timestamp>.zip`` inside the storage path.

        Raises
        ------
        GdprExportError
            If any service fetch fails. The original exception is attached
            as ``__cause__``; no archive is written in that case.
        ValueError
            If ``user_id`` would make the archive name contain a path
            component (for example a path separator).
        """
        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        archive_name = f"{user_id}_{ts}.zip"

        # user_id is interpolated into the file name. Anything that parses as
        # more than a bare file name (separators, absolute paths, drive
        # prefixes) could place the archive outside the storage directory.
        if Path(archive_name).name != archive_name:
            raise ValueError("user_id must not contain path separators")

        # Fetch everything before touching the filesystem so that a failing
        # service never leaves a partial archive behind.
        bundle: Dict[str, Any] = {}
        for service in self._services:
            try:
                data = service.fetch_user_data(user_id)
            except Exception as exc:
                # The underlying error text stays in the chained cause rather
                # than the message, and never reaches the user-facing bundle.
                raise GdprExportError(
                    f"failed to fetch user data from service {service.name!r}"
                ) from exc
            bundle[service.name] = data

        out_path = self._storage_path / archive_name
        with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as zf:
            # default=str stringifies values json cannot encode natively
            # (e.g. datetime objects) instead of failing the export.
            zf.writestr("export.json", json.dumps(bundle, default=str))
        return out_path