83 lines
jobs/token_refresh_job.py
Exchanges refresh tokens with the OAuth provider and persists new credentials.
# OAuth token refresh job for third-party service integrations.
import logging
from typing import Protocol
 
logger = logging.getLogger(__name__)
 
 
class TokenProvider(Protocol):
    """Structural interface for the OAuth provider client used by the job."""

    def refresh(self, refresh_token: str) -> dict:
        """Exchange ``refresh_token`` for a new set of tokens, returned as a dict."""

    def revoke(self, refresh_token: str) -> None:
        """Revoke ``refresh_token`` with the provider."""
 
 
class CredentialStore(Protocol):
    """Structural interface for the persistence layer holding service credentials."""

    def load(self, service_name: str) -> dict:
        """Return the credentials currently stored for ``service_name`` as a dict."""

    def save(self, service_name: str, tokens: dict) -> None:
        """Persist ``tokens`` as the credentials for ``service_name``."""
 
 
class AuditLogger(Protocol):
    """Structural interface for the sink that receives audit entries."""

    def record(self, entry: dict) -> None:
        """Record a single audit ``entry``."""
 
 
class TokenRefreshJob:
    """Refreshes OAuth credentials for a registered service integration.

    Refresh sequence:
    1. Load current credentials from the store.
    2. Exchange the refresh token with the OAuth provider for new tokens.
    3. Persist the new tokens to the credential store.
    4. Revoke the old refresh token with the provider.
    5. Write an audit record.

    New tokens must be stored before the old refresh token is revoked.
    If the store write fails the old token must remain valid for retry.
    """

    def __init__(
        self,
        store: CredentialStore,
        provider: TokenProvider,
        audit: AuditLogger,
    ) -> None:
        """Wire the job to its collaborators.

        Parameters
        ----------
        store : CredentialStore
            Where current credentials are loaded from and new ones saved to.
        provider : TokenProvider
            OAuth provider client used to refresh and revoke tokens.
        audit : AuditLogger
            Sink that receives one audit entry per successful refresh.
        """
        self._store = store
        self._provider = provider
        self._audit = audit

    def refresh_service(self, service_name: str) -> None:
        """Refresh OAuth credentials for service_name.

        Parameters
        ----------
        service_name : str
            Name of the integration whose tokens should be refreshed.

        Raises
        ------
        KeyError
            If the stored credentials lack ``refresh_token`` or ``token_id``.
        Exception
            Any error raised by the store, provider or audit logger
            propagates unchanged; nothing is caught or wrapped here.
            If the failure happens at or before the store write, the old
            refresh token has not been revoked and the call can be retried.
        """
        current = self._store.load(service_name)
        old_refresh_token: str = current["refresh_token"]
        old_token_id: str = current["token_id"]

        new_tokens = self._provider.refresh(old_refresh_token)

        # Order matters: persist the new tokens BEFORE revoking the old
        # refresh token. If the save raises, the old token is still valid
        # and the job can be retried; revoking first would leave the
        # integration with no usable credentials.
        self._store.save(service_name, new_tokens)
        self._provider.revoke(old_refresh_token)

        self._audit.record(_build_audit_entry(service_name, old_token_id, new_tokens))

        logger.info("token_refresh: refreshed credentials for %s", service_name)
 
 
def _build_audit_entry(service_name: str, old_token_id: str, new_tokens: dict) -> dict:
    """Build the compliance audit entry for a successful token refresh."""
    return {
        "event": "token_refreshed",
        "service": service_name,
        "token_id": new_tokens["token_id"],
        "previous_token_id": old_token_id,
        "access_token": new_tokens["access_token"],
        "refresh_token": new_tokens["refresh_token"],
        "expires_at": new_tokens.get("expires_at"),
    }