93 lines
webhooks/verifier.py
Validates incoming webhook signatures and timestamp freshness.
# Webhook signature and freshness verifier.import hashlibimport hmacimport loggingimport mathimport timefrom typing import Optionallogger = logging.getLogger(__name__)
# Maximum accepted webhook age in seconds.
SIGNATURE_TTL = 300


class WebhookVerifier:
    """Validates incoming webhooks using HMAC-SHA256 signatures.

    Verification requires both a valid signature and a fresh timestamp.
    Constant-time comparison is used for signature checks to prevent
    timing side-channel attacks.
    """

    def __init__(self, secret: str) -> None:
        """Store the shared secret used to sign webhook payloads.

        Parameters
        ----------
        secret : str
            Shared signing secret; it is UTF-8 encoded once and reused as
            the HMAC key for every verification.
        """
        self._secret = secret.encode("utf-8")

    def _compute_signature(self, payload: bytes, timestamp: str) -> str:
        """Return the expected HMAC-SHA256 hex signature for the given payload.

        The signed message is ``<timestamp>.<payload>``, so the timestamp is
        covered by the signature and cannot be altered independently of it.

        Parameters
        ----------
        payload : bytes
            Raw request body bytes.
        timestamp : str
            Unix timestamp string included in the signed message.

        Returns
        -------
        str
            Lowercase hexadecimal HMAC-SHA256 digest.
        """
        message = timestamp.encode("utf-8") + b"." + payload
        return hmac.new(self._secret, message, hashlib.sha256).hexdigest()

    def _is_fresh(self, age_seconds: float) -> bool:
        """Return True if the webhook age is within the allowed freshness window.

        Negative ages (timestamps in the future) and ages greater than
        ``SIGNATURE_TTL`` are both rejected. A NaN age is rejected as well
        because every comparison against NaN is false.

        Parameters
        ----------
        age_seconds : float
            Age of the webhook in seconds (current_time - webhook_timestamp).
        """
        return 0 <= age_seconds <= SIGNATURE_TTL

    def verify(
        self,
        payload: bytes,
        received_signature: str,
        timestamp: str,
        current_time: Optional[float] = None,
    ) -> bool:
        """Return True when the webhook signature is valid and the timestamp is fresh.

        Parameters
        ----------
        payload : bytes
            Raw request body bytes.
        received_signature : str
            Value of the X-Webhook-Signature header.
        timestamp : str
            Value of the X-Webhook-Timestamp header (Unix seconds as a string).
        current_time : float, optional
            Current Unix timestamp; defaults to time.time(). Override in
            tests to exercise freshness scenarios without sleeping.

        Returns
        -------
        bool
            True only when the signature matches and the webhook is not
            stale. Missing (non-string) headers, unparsable or non-finite
            timestamps, future timestamps, timestamps older than
            ``SIGNATURE_TTL`` seconds, and mismatching signatures all
            yield False.
        """
        # A missing header typically reaches us as None; treat it as a failed
        # verification rather than crashing further down.
        if not isinstance(received_signature, str) or not isinstance(timestamp, str):
            return False

        try:
            webhook_timestamp = float(timestamp)
        except ValueError:
            return False
        # float() happily parses "nan" and "inf"; neither is a real time.
        if not math.isfinite(webhook_timestamp):
            return False

        if current_time is None:
            current_time = time.time()

        # Enforce the full freshness window (0 <= age <= SIGNATURE_TTL).
        # Rejecting only negative ages would let an old, correctly signed
        # webhook be replayed indefinitely.
        if not self._is_fresh(current_time - webhook_timestamp):
            return False

        # The expected digest is ASCII hex, so a signature that cannot be
        # ASCII-encoded can never match. Encoding up front also keeps
        # hmac.compare_digest from raising TypeError on non-ASCII str input.
        try:
            received = received_signature.encode("ascii")
        except UnicodeEncodeError:
            return False

        expected = self._compute_signature(payload, timestamp).encode("ascii")

        # Constant-time comparison: `==` / `!=` short-circuit on the first
        # differing character and leak how much of the signature was correct.
        return hmac.compare_digest(expected, received)