S-02: Extract extract_credential_raw_id() helper in services/passkey.py — replaces 2 inline rawId parsing blocks in passkeys.py.
S-03: Add PasskeyLoginResponse type, use in useAuth passkeyLoginMutation.
S-04: Add Cancel button to disable-passwordless dialog.
W-03: Invalidate auth queries on disable ceremony error/cancel.
Perf-2: Session cap uses ID-only query + bulk UPDATE instead of loading full ORM objects and flipping booleans individually.
Perf-3: Remove passkey_count from /auth/status hot path (polled every 15s). Use EXISTS for has_passkeys boolean. Count derived from passkeys list query in PasskeySection (passkeys.length).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
241 lines
7.8 KiB
Python
241 lines
7.8 KiB
Python
"""
|
|
Passkey (WebAuthn/FIDO2) service.
|
|
|
|
Handles challenge token creation/verification (itsdangerous + nonce replay protection)
|
|
and wraps py_webauthn library calls for registration and authentication ceremonies.
|
|
"""
|
|
import base64
|
|
import json
|
|
import logging
|
|
import secrets
|
|
import time
|
|
import threading
|
|
|
|
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
|
|
|
from webauthn import (
|
|
generate_registration_options,
|
|
verify_registration_response,
|
|
generate_authentication_options,
|
|
verify_authentication_response,
|
|
options_to_json,
|
|
)
|
|
from webauthn.helpers.structs import (
|
|
PublicKeyCredentialDescriptor,
|
|
AuthenticatorSelectionCriteria,
|
|
AuthenticatorTransport,
|
|
ResidentKeyRequirement,
|
|
UserVerificationRequirement,
|
|
AttestationConveyancePreference,
|
|
)
|
|
from webauthn.helpers import (
|
|
bytes_to_base64url,
|
|
base64url_to_bytes,
|
|
parse_registration_credential_json,
|
|
parse_authentication_credential_json,
|
|
)
|
|
|
|
from app.config import settings as app_settings
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Credential JSON helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def extract_credential_raw_id(credential_json: str) -> str | None:
|
|
"""Extract the base64url-encoded rawId from a WebAuthn credential JSON string.
|
|
|
|
Returns None if parsing fails.
|
|
"""
|
|
try:
|
|
cred_data = json.loads(credential_json)
|
|
return cred_data.get("rawId") or cred_data.get("id") or None
|
|
except (json.JSONDecodeError, KeyError, TypeError):
|
|
return None
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Challenge token management (itsdangerous + nonce replay protection V-01)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Shared serializer used to sign and load challenge tokens. It is keyed by
# the application SECRET_KEY and uses a salt dedicated to WebAuthn challenges.
_challenge_serializer = URLSafeTimedSerializer(
    secret_key=app_settings.SECRET_KEY, salt="webauthn-challenge-v1"
)
|
|
|
|
# Thread-safe nonce cache for single-use enforcement.
|
|
# Keys: nonce string, Values: expiry timestamp.
|
|
# NOTE: This is process-local. If scaling to multiple uvicorn workers,
|
|
# move nonce tracking to Redis or a DB table with unique constraint.
|
|
# Current deployment: single worker (Dockerfile --workers 1).
|
|
_used_nonces: dict[str, float] = {}
|
|
_nonce_lock = threading.Lock()
|
|
|
|
|
|
def create_challenge_token(challenge: bytes, user_id: int | None = None) -> str:
    """Build a signed challenge token.

    The signed payload holds the base64-encoded challenge under "ch", a
    freshly generated random nonce under "n" and, only when user_id is not
    None, the user id under "uid".

    Returns the token string produced by the challenge serializer.
    """
    fresh_nonce = secrets.token_urlsafe(16)
    encoded_challenge = base64.b64encode(challenge).decode()
    token_fields = {"ch": encoded_challenge, "n": fresh_nonce}
    # The user binding is optional; tokens created without it carry no "uid".
    if user_id is not None:
        token_fields["uid"] = user_id
    return _challenge_serializer.dumps(token_fields)
|
|
|
|
|
|
def verify_challenge_token(token: str, expected_user_id: int | None = None) -> bytes | None:
    """Validate a challenge token and consume its nonce.

    Steps, in order:
      1. Load the token with a maximum age of WEBAUTHN_CHALLENGE_TTL.
      2. Reject tokens without a nonce or whose nonce was already recorded.
      3. Record the nonce so the same token is not accepted again.
      4. If expected_user_id is given, require the token's "uid" to equal it
         (user binding, used for registration).

    Returns the decoded challenge bytes, or None when any check fails.
    """
    ttl = app_settings.WEBAUTHN_CHALLENGE_TTL
    try:
        payload = _challenge_serializer.loads(token, max_age=ttl)
    except (BadSignature, SignatureExpired):
        return None

    token_nonce = payload.get("n")
    if not token_nonce:
        return None

    current_time = time.time()
    with _nonce_lock:
        # Lazy cleanup: forget every nonce whose expiry has passed.
        stale_nonces = [
            key for key, expires_at in _used_nonces.items() if expires_at <= current_time
        ]
        for key in stale_nonces:
            _used_nonces.pop(key)

        # A nonce that is still recorded has been seen before: replay.
        if token_nonce in _used_nonces:
            return None

        # Record the nonce as used until its expiry timestamp.
        _used_nonces[token_nonce] = current_time + ttl

    # User binding is checked only after the nonce has been consumed.
    binding_required = expected_user_id is not None
    if binding_required and payload.get("uid") != expected_user_id:
        return None

    return base64.b64decode(payload["ch"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# py_webauthn wrappers
|
|
# All synchronous — ECDSA P-256 verification is ~0.1ms, faster than executor overhead.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_registration_options(
    user_id: int,
    username: str,
    existing_credential_ids: list[bytes],
) -> tuple[str, bytes]:
    """Create the options for a WebAuthn registration ceremony.

    Each id in existing_credential_ids is passed to the library as an
    excluded credential. The user id is sent as the encoded bytes of its
    decimal string form.

    Returns a tuple (options_json_str, challenge_bytes).
    """
    already_registered = [
        PublicKeyCredentialDescriptor(id=credential_id)
        for credential_id in existing_credential_ids
    ]
    selection_criteria = AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.PREFERRED,
        user_verification=UserVerificationRequirement.PREFERRED,
    )

    registration_options = generate_registration_options(
        rp_id=app_settings.WEBAUTHN_RP_ID,
        rp_name=app_settings.WEBAUTHN_RP_NAME,
        user_id=str(user_id).encode(),
        user_name=username,
        attestation=AttestationConveyancePreference.NONE,
        authenticator_selection=selection_criteria,
        exclude_credentials=already_registered,
        timeout=60000,
    )

    return options_to_json(registration_options), registration_options.challenge
|
|
|
|
|
|
def verify_registration(
    credential_json: str,
    challenge: bytes,
) -> "VerifiedRegistration":
    """Check a browser registration response against the expected challenge.

    The credential JSON is parsed and then verified against the configured
    RP id and origin. User verification is not required. No exception is
    caught here, so any error raised by parsing or verification propagates
    to the caller.

    Returns the library's verification result on success.
    """
    parsed_credential = parse_registration_credential_json(credential_json)
    verification = verify_registration_response(
        credential=parsed_credential,
        expected_challenge=challenge,
        expected_rp_id=app_settings.WEBAUTHN_RP_ID,
        expected_origin=app_settings.WEBAUTHN_ORIGIN,
        require_user_verification=False,
    )
    return verification
|
|
|
|
|
|
def build_authentication_options(
    credential_ids_and_transports: list[tuple[bytes, list[str] | None]] | None = None,
) -> tuple[str, bytes]:
    """Generate WebAuthn authentication options.

    Args:
        credential_ids_and_transports: Optional (credential_id, transports)
            pairs. When provided and non-empty, they are sent as
            allowCredentials; otherwise allow_credentials stays None, which
            allows the discoverable credential flow.

    Returns:
        A tuple (options_json_str, challenge_bytes).
    """
    allow_credentials = None
    if credential_ids_and_transports:
        # Built once for the whole call rather than once per transport string.
        # A tuple keeps the original equality-based membership test.
        known_transports = tuple(member.value for member in AuthenticatorTransport)

        allow_credentials = []
        for cid, transports in credential_ids_and_transports:
            # Transport strings that are not AuthenticatorTransport values
            # are silently dropped.
            transport_list = [
                AuthenticatorTransport(t)
                for t in (transports or ())
                if t in known_transports
            ]
            allow_credentials.append(
                PublicKeyCredentialDescriptor(
                    id=cid,
                    # An empty list is passed as None.
                    transports=transport_list or None,
                )
            )

    options = generate_authentication_options(
        rp_id=app_settings.WEBAUTHN_RP_ID,
        allow_credentials=allow_credentials,
        user_verification=UserVerificationRequirement.PREFERRED,
        timeout=60000,
    )

    options_json = options_to_json(options)
    return options_json, options.challenge
|
|
|
|
|
|
def verify_authentication(
    credential_json: str,
    challenge: bytes,
    credential_public_key: bytes,
    credential_current_sign_count: int,
) -> "VerifiedAuthentication":
    """Check a browser authentication response against the expected challenge.

    The credential JSON is parsed and then verified against the configured
    RP id and origin, using the stored public key and current sign count.
    User verification is not required. No exception is caught here, so any
    error raised by parsing or verification propagates to the caller.

    NOTE(review): the original documentation states that sign count anomalies
    are NOT hard-failed and that the caller should log and continue; confirm
    against the library's behavior.

    Returns the library's verification result on success.
    """
    parsed_credential = parse_authentication_credential_json(credential_json)
    verification = verify_authentication_response(
        credential=parsed_credential,
        expected_challenge=challenge,
        expected_rp_id=app_settings.WEBAUTHN_RP_ID,
        expected_origin=app_settings.WEBAUTHN_ORIGIN,
        credential_public_key=credential_public_key,
        credential_current_sign_count=credential_current_sign_count,
        require_user_verification=False,
    )
    return verification
|