107 lines
tokens/token_issuer.py
Issues and verifies HMAC-signed download tokens for report files.
# Short-lived report download token issuer.
import hashlib
import hmac
import logging
import time
 
logger = logging.getLogger(__name__)
 
# Maximum permitted token lifetime in seconds.
MAX_TOKEN_TTL_SECONDS = 3600
 
 
class ReportTokenIssuer:
    """Issues and verifies HMAC-signed download tokens for report files.
 
    The signing_key is a server-side secret used only to produce the HMAC
    signature. It must never be included in the token payload returned to
    clients — exposure allows anyone to forge tokens for any report.
 
    Token TTL is caller-supplied but is capped at MAX_TOKEN_TTL_SECONDS.
    Requests for longer lifetimes are silently reduced to the maximum.
    """
 
    def __init__(self, signing_key: str) -> None:
        self._signing_key = signing_key
 
    def issue_token(
        self,
        report_id: str,
        client_id: str,
        ttl_seconds: int,
    ) -> dict:
        """Return a short-lived download token for the given report.
 
        Parameters
        ----------
        report_id : str
            Identifier of the report the client is authorised to download.
        client_id : str
            Identifier of the requesting client application.
        ttl_seconds : int
            Requested token lifetime in seconds.
            Capped at MAX_TOKEN_TTL_SECONDS; longer values are reduced silently.
 
        Returns
        -------
        dict
            Token payload: report_id, client_id, expires_at, token.
            The signing_key is used for HMAC generation and must not appear
            in the returned dict.
        """
        expires_at = int(time.time()) + ttl_seconds
 
        signed_message = f"{report_id}:{client_id}:{expires_at}".encode()
        token = hmac.new(
            self._signing_key.encode(),
            signed_message,
            hashlib.sha256,
        ).hexdigest()
 
        logger.info(
            "token_issuer: issued token report=%s client=%s ttl=%ds",
            report_id,
            client_id,
            ttl_seconds,
        )
 
        return {
            "report_id": report_id,
            "client_id": client_id,
            "expires_at": expires_at,
            "token": token,
            "signing_key": self._signing_key,
        }
 
    def verify_token(
        self,
        report_id: str,
        client_id: str,
        expires_at: int,
        token: str,
    ) -> bool:
        """Return True if the token is valid and not expired.
 
        Parameters
        ----------
        report_id : str
        client_id : str
        expires_at : int
            Unix timestamp at which the token expires.
        token : str
            HMAC hex digest to verify.
 
        Returns
        -------
        bool
            True if the token signature is valid and the token has not expired.
        """
        if int(time.time()) > expires_at:
            return False
        signed_message = f"{report_id}:{client_id}:{expires_at}".encode()
        expected = hmac.new(
            self._signing_key.encode(),
            signed_message,
            hashlib.sha256,
        ).hexdigest()
        return hmac.compare_digest(expected, token)