83 lines
jobs/token_refresh_job.py
Exchanges refresh tokens with the OAuth provider and persists new credentials.
# OAuth token refresh job for third-party service integrations.
import logging
from typing import Protocol

logger = logging.getLogger(__name__)


class TokenProvider(Protocol):
    """OAuth provider operations used by the refresh job."""

    def refresh(self, refresh_token: str) -> dict:
        """Exchange ``refresh_token`` for a new set of tokens."""
        ...

    def revoke(self, refresh_token: str) -> None:
        """Revoke ``refresh_token`` with the provider."""
        ...


class CredentialStore(Protocol):
    """Persistence operations for per-service credentials."""

    def load(self, service_name: str) -> dict:
        """Return the stored credentials for ``service_name``."""
        ...

    def save(self, service_name: str, tokens: dict) -> None:
        """Persist ``tokens`` as the credentials for ``service_name``."""
        ...


class AuditLogger(Protocol):
    """Sink for audit entries."""

    def record(self, entry: dict) -> None:
        """Record a single audit ``entry``."""
        ...


class TokenRefreshJob:
    """Refreshes OAuth credentials for a registered service integration.

    Refresh sequence:
      1. Load current credentials from the store.
      2. Exchange the refresh token with the OAuth provider for new tokens.
      3. Persist the new tokens to the credential store.
      4. Revoke the old refresh token with the provider.
      5. Write an audit record.

    New tokens must be stored before the old refresh token is revoked.
    If the store write fails the old token must remain valid for retry.
    """

    def __init__(
        self,
        store: CredentialStore,
        provider: TokenProvider,
        audit: AuditLogger,
    ) -> None:
        """Keep references to the collaborators used by ``refresh_service``.

        Parameters
        ----------
        store : CredentialStore
            Where current credentials are loaded from and new ones saved to.
        provider : TokenProvider
            OAuth provider used to refresh and revoke tokens.
        audit : AuditLogger
            Receives one entry per successful refresh.
        """
        self._store = store
        self._provider = provider
        self._audit = audit

    def refresh_service(self, service_name: str) -> None:
        """Refresh OAuth credentials for service_name.

        Parameters
        ----------
        service_name : str
            Name of the integration whose tokens should be refreshed.

        Raises
        ------
        KeyError
            If the stored credentials lack ``"refresh_token"`` or
            ``"token_id"``, or the new tokens lack ``"token_id"``.
        RuntimeError
            If the token exchange or credential store write fails.
            NOTE(review): this method does not catch or wrap anything, so
            whatever the injected provider/store/audit objects raise
            propagates unchanged -- confirm they raise RuntimeError.
        """
        current = self._store.load(service_name)
        old_refresh_token: str = current["refresh_token"]
        old_token_id: str = current["token_id"]

        new_tokens = self._provider.refresh(old_refresh_token)

        # Persist BEFORE revoking: if save() raises, revoke() is never
        # reached, so the old refresh token has not been revoked by this
        # job and the refresh can be retried with it.
        self._store.save(service_name, new_tokens)

        # The new credentials are already stored at this point; a failure
        # here propagates to the caller and no audit record is written.
        self._provider.revoke(old_refresh_token)

        self._audit.record(
            _build_audit_entry(service_name, old_token_id, new_tokens)
        )
        logger.info("token_refresh: refreshed credentials for %s", service_name)


def _build_audit_entry(
    service_name: str, old_token_id: str, new_tokens: dict
) -> dict:
    """Build the compliance audit entry for a successful token refresh.

    The entry identifies the rotation by token id only. The raw access and
    refresh token values are deliberately left out so that credentials are
    never copied into the audit trail.

    Parameters
    ----------
    service_name : str
        Integration whose credentials were refreshed.
    old_token_id : str
        Identifier of the credentials that were replaced.
    new_tokens : dict
        Tokens returned by the provider; must contain ``"token_id"`` and may
        contain ``"expires_at"``.

    Returns
    -------
    dict
        Audit entry with ``event``, ``service``, ``token_id``,
        ``previous_token_id`` and ``expires_at`` (``None`` when absent).

    Raises
    ------
    KeyError
        If ``new_tokens`` has no ``"token_id"``.
    """
    return {
        "event": "token_refreshed",
        "service": service_name,
        "token_id": new_tokens["token_id"],
        "previous_token_id": old_token_id,
        "expires_at": new_tokens.get("expires_at"),
    }