"""GDPR DSAR export pipeline (gdpr/export_pipeline.py).

Assembles a GDPR export bundle from multiple backend services: collects,
packages, and delivers user data.
"""
import json
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Protocol
 
 
class ServiceBackend(Protocol):
    """Structural type for a microservice that can supply a user's data.

    Any object with a ``name`` attribute and a ``fetch_user_data`` method of
    the shape declared below satisfies this protocol; no inheritance is needed.
    """

    # Identifier of the service.
    name: str

    def fetch_user_data(self, user_id: str) -> Dict[str, Any]:
        """Return the data this service holds for ``user_id`` as a dictionary."""
        ...
 
 
class GdprExportError(Exception):
    """Signals that a GDPR export could not be completed."""
 
 
class GdprExportPipeline:
    """Collect personal data from registered services and package an export bundle.

    The pipeline calls ``fetch_user_data`` on every registered service, stores
    each result under that service's ``name`` in one combined JSON document, and
    writes the document as ``export.json`` inside a ZIP archive under the
    configured storage path. If any service fetch fails, the pipeline raises
    GdprExportError *before* anything is written, so the request can be retried
    instead of shipping an incomplete bundle.
    """

    # NOTE(review): a denylist of fields that must never appear in a
    # user-facing export was announced at this spot but is not defined in this
    # class, and no field filtering is performed below. Confirm where the
    # "approved fields only" guarantee is enforced (presumably in each service).

    def __init__(self, services: list, storage_path: Path) -> None:
        """Remember the services to query and where archives are written.

        Parameters
        ----------
        services : list
            Objects exposing a ``name`` attribute and a
            ``fetch_user_data(user_id)`` method.
        storage_path : Path
            Directory in which export archives are created.
        """
        self._services = services
        self._storage_path = storage_path

    def export_user_data(self, user_id: str) -> Path:
        """Assemble and return the path to the GDPR export bundle for user_id.

        Parameters
        ----------
        user_id : str
            The identifier of the requesting data subject. It becomes part of
            the archive filename, so it must be a single, non-empty path
            component.

        Returns
        -------
        Path
            Path to the written ZIP archive.

        Raises
        ------
        GdprExportError
            If ``user_id`` is empty or contains path separators, or if any
            service fetch fails. The original service exception is chained as
            ``__cause__``.
        """
        # The id is interpolated into the filename; refuse anything that would
        # resolve outside (or nest below) the storage directory.
        if not user_id or Path(user_id).name != user_id:
            raise GdprExportError(f"invalid user_id for export: {user_id!r}")

        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

        bundle: Dict[str, Any] = {}
        for service in self._services:
            # Keep the try body to the one call that can fail, and fail the
            # whole export rather than embedding internal error text in a
            # user-facing bundle.
            try:
                data = service.fetch_user_data(user_id)
            except Exception as exc:
                raise GdprExportError(
                    f"failed to fetch data from service {service.name!r}: {exc}"
                ) from exc
            bundle[service.name] = data

        # All fetches succeeded; only now touch the filesystem.
        out_path = self._storage_path / f"{user_id}_{ts}.zip"
        with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as zf:
            # default=str stringifies values json cannot encode natively
            # (e.g. datetimes) instead of aborting the export.
            zf.writestr("export.json", json.dumps(bundle, default=str))

        return out_path