93 lines
webhooks/verifier.py
Validates incoming webhook signatures and timestamp freshness.
# Webhook signature and freshness verifier.
import hashlib
import hmac
import logging
import math
import time
from typing import Optional
 
logger = logging.getLogger(__name__)
 
# Maximum accepted webhook age in seconds.
SIGNATURE_TTL = 300
 
 
class WebhookVerifier:
    """Validates incoming webhooks using HMAC-SHA256 signatures.
 
    Verification requires both a valid signature and a fresh timestamp.
    Constant-time comparison is used for signature checks to prevent
    timing side-channel attacks.
    """
 
    def __init__(self, secret: str) -> None:
        self._secret = secret.encode("utf-8")
 
    def _compute_signature(self, payload: bytes, timestamp: str) -> str:
        """Return the expected HMAC-SHA256 hex signature for the given payload.
 
        Parameters
        ----------
        payload : bytes
            Raw request body bytes.
        timestamp : str
            Unix timestamp string included in the signed message.
        """
        message = timestamp.encode("utf-8") + b"." + payload
        return hmac.new(self._secret, message, hashlib.sha256).hexdigest()
 
    def _is_fresh(self, age_seconds: float) -> bool:
        """Return True if the webhook age is within the allowed freshness window.
 
        Parameters
        ----------
        age_seconds : float
            Age of the webhook in seconds (current_time - webhook_timestamp).
        """
        return 0 <= age_seconds <= SIGNATURE_TTL
 
    def verify(
        self,
        payload: bytes,
        received_signature: str,
        timestamp: str,
        current_time: Optional[float] = None,
    ) -> bool:
        """Return True when the webhook signature is valid and the timestamp is fresh.
 
        Parameters
        ----------
        payload : bytes
            Raw request body bytes.
        received_signature : str
            Value of the X-Webhook-Signature header.
        timestamp : str
            Value of the X-Webhook-Timestamp header (Unix seconds as a string).
        current_time : float, optional
            Current Unix timestamp; defaults to time.time(). Override in tests
            to exercise freshness scenarios without sleeping.
 
        Returns
        -------
        bool
            True only when the signature matches and the webhook is not stale.
        """
        if current_time is None:
            current_time = time.time()
 
        expected = self._compute_signature(payload, timestamp)
        try:
            webhook_timestamp = float(timestamp)
        except ValueError:
            return False
        if not math.isfinite(webhook_timestamp):
            return False
        webhook_age = current_time - webhook_timestamp
 
        if webhook_age < 0:
            return False
 
        if expected != received_signature:
            return False
 
        return True