""" Passkey (WebAuthn/FIDO2) service. Handles challenge token creation/verification (itsdangerous + nonce replay protection) and wraps py_webauthn library calls for registration and authentication ceremonies. """ import base64 import json import logging import secrets import time import threading from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired from webauthn import ( generate_registration_options, verify_registration_response, generate_authentication_options, verify_authentication_response, options_to_json, ) from webauthn.helpers.structs import ( PublicKeyCredentialDescriptor, AuthenticatorSelectionCriteria, AuthenticatorTransport, ResidentKeyRequirement, UserVerificationRequirement, AttestationConveyancePreference, ) from webauthn.helpers import bytes_to_base64url, base64url_to_bytes from app.config import settings as app_settings logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Challenge token management (itsdangerous + nonce replay protection V-01) # --------------------------------------------------------------------------- _challenge_serializer = URLSafeTimedSerializer( secret_key=app_settings.SECRET_KEY, salt="webauthn-challenge-v1", ) # Thread-safe nonce cache for single-use enforcement # Keys: nonce string, Values: expiry timestamp _used_nonces: dict[str, float] = {} _nonce_lock = threading.Lock() def create_challenge_token(challenge: bytes, user_id: int | None = None) -> str: """Sign challenge + nonce + optional user_id. Returns opaque token string.""" nonce = secrets.token_urlsafe(16) payload = { "ch": base64.b64encode(challenge).decode(), "n": nonce, } if user_id is not None: payload["uid"] = user_id return _challenge_serializer.dumps(payload) def verify_challenge_token(token: str, expected_user_id: int | None = None) -> bytes | None: """Verify token (TTL from config), enforce single-use via nonce. If expected_user_id provided, cross-check user binding (for registration). Returns challenge bytes or None on failure. """ try: data = _challenge_serializer.loads( token, max_age=app_settings.WEBAUTHN_CHALLENGE_TTL ) except (BadSignature, SignatureExpired): return None nonce = data.get("n") if not nonce: return None now = time.time() with _nonce_lock: # Lazy cleanup of expired nonces expired = [k for k, v in _used_nonces.items() if v <= now] for k in expired: del _used_nonces[k] # Check for replay if nonce in _used_nonces: return None # Mark nonce as used _used_nonces[nonce] = now + app_settings.WEBAUTHN_CHALLENGE_TTL # Cross-check user binding for registration tokens if expected_user_id is not None: if data.get("uid") != expected_user_id: return None return base64.b64decode(data["ch"]) # --------------------------------------------------------------------------- # py_webauthn wrappers # All synchronous — ECDSA P-256 verification is ~0.1ms, faster than executor overhead. # --------------------------------------------------------------------------- def build_registration_options( user_id: int, username: str, existing_credential_ids: list[bytes], ) -> tuple[str, bytes]: """Generate WebAuthn registration options. Returns (options_json_str, challenge_bytes). """ exclude_credentials = [ PublicKeyCredentialDescriptor(id=cid) for cid in existing_credential_ids ] options = generate_registration_options( rp_id=app_settings.WEBAUTHN_RP_ID, rp_name=app_settings.WEBAUTHN_RP_NAME, user_id=str(user_id).encode(), user_name=username, attestation=AttestationConveyancePreference.NONE, authenticator_selection=AuthenticatorSelectionCriteria( resident_key=ResidentKeyRequirement.PREFERRED, user_verification=UserVerificationRequirement.PREFERRED, ), exclude_credentials=exclude_credentials, timeout=60000, ) options_json = options_to_json(options) return options_json, options.challenge def verify_registration( credential_json: str, challenge: bytes, ) -> "VerifiedRegistration": """Verify a registration response from the browser. Returns VerifiedRegistration on success, raises on failure. """ from webauthn.helpers.structs import RegistrationCredential credential = RegistrationCredential.model_validate_json(credential_json) return verify_registration_response( credential=credential, expected_challenge=challenge, expected_rp_id=app_settings.WEBAUTHN_RP_ID, expected_origin=app_settings.WEBAUTHN_ORIGIN, require_user_verification=False, ) def build_authentication_options( credential_ids_and_transports: list[tuple[bytes, list[str] | None]] | None = None, ) -> tuple[str, bytes]: """Generate WebAuthn authentication options. If credential_ids_and_transports provided, includes allowCredentials. Otherwise, allows discoverable credential flow. Returns (options_json_str, challenge_bytes). """ allow_credentials = None if credential_ids_and_transports: allow_credentials = [] for cid, transports in credential_ids_and_transports: transport_list = None if transports: transport_list = [ AuthenticatorTransport(t) for t in transports if t in [e.value for e in AuthenticatorTransport] ] allow_credentials.append( PublicKeyCredentialDescriptor( id=cid, transports=transport_list or None, ) ) options = generate_authentication_options( rp_id=app_settings.WEBAUTHN_RP_ID, allow_credentials=allow_credentials, user_verification=UserVerificationRequirement.PREFERRED, timeout=60000, ) options_json = options_to_json(options) return options_json, options.challenge def verify_authentication( credential_json: str, challenge: bytes, credential_public_key: bytes, credential_current_sign_count: int, ) -> "VerifiedAuthentication": """Verify an authentication response from the browser. Returns VerifiedAuthentication on success, raises on failure. Sign count anomalies are NOT hard-failed — caller should log and continue. """ from webauthn.helpers.structs import AuthenticationCredential credential = AuthenticationCredential.model_validate_json(credential_json) return verify_authentication_response( credential=credential, expected_challenge=challenge, expected_rp_id=app_settings.WEBAUTHN_RP_ID, expected_origin=app_settings.WEBAUTHN_ORIGIN, credential_public_key=credential_public_key, credential_current_sign_count=credential_current_sign_count, require_user_verification=False, )