UMBRA/backend/app/services/passkey.py
Kyle Pope ab84c7bc53 Fix review findings: transaction atomicity, perf, and UI polish
Backend fixes:
- session.py: record_failed/successful_login use flush() not commit()
  — callers own transaction boundary (BUG-2 atomicity fix)
- auth.py: Add explicit commits after record_failed_login where callers
  raise immediately; add commit before TOTP mfa_token return path
- passkeys.py: JOIN credential+user lookup in login/complete (W-1 perf)
- passkeys.py: Move mfa_enforce_pending clear before main commit (S-2)
- passkeys.py: Add Path(ge=1, le=2147483647) on DELETE endpoint (BUG-3)
- auth.py: Switch has_passkeys from COUNT to EXISTS with LIMIT 1 (W-2)
- passkey.py: Add single-worker nonce cache comment (H-1)

Frontend fixes:
- PasskeySection: emerald→green badge colors (W-3 palette)
- PasskeySection: text-[11px]/text-[10px]→text-xs (W-4 a11y minimum)
- PasskeySection: Scope deleteMutation.isPending to per-item (W-5)
- nginx.conf: Permissions-Policy publickey-credentials use (self) (H-2)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 22:59:59 +08:00

224 lines
7.2 KiB
Python

"""
Passkey (WebAuthn/FIDO2) service.
Handles challenge token creation/verification (itsdangerous + nonce replay protection)
and wraps py_webauthn library calls for registration and authentication ceremonies.
"""
import base64
import json
import logging
import secrets
import time
import threading
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from webauthn import (
generate_registration_options,
verify_registration_response,
generate_authentication_options,
verify_authentication_response,
options_to_json,
)
from webauthn.helpers.structs import (
PublicKeyCredentialDescriptor,
AuthenticatorSelectionCriteria,
AuthenticatorTransport,
ResidentKeyRequirement,
UserVerificationRequirement,
AttestationConveyancePreference,
)
from webauthn.helpers import bytes_to_base64url, base64url_to_bytes
from app.config import settings as app_settings
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Challenge token management (itsdangerous + nonce replay protection V-01)
# ---------------------------------------------------------------------------

# Signs and timestamps challenge payloads. The dedicated salt keeps these
# tokens distinct from anything else signed with the same SECRET_KEY.
_challenge_serializer = URLSafeTimedSerializer(
    app_settings.SECRET_KEY,
    salt="webauthn-challenge-v1",
)

# Single-use enforcement for challenge tokens.
# _nonce_lock guards every read and write of _used_nonces.
# _used_nonces maps nonce string -> expiry timestamp (seconds, time.time()).
# NOTE: Both objects are process-local. If scaling to multiple uvicorn
# workers, move nonce tracking to Redis or a DB table with a unique
# constraint. Current deployment: single worker (Dockerfile --workers 1).
_nonce_lock = threading.Lock()
_used_nonces: dict[str, float] = {}
def create_challenge_token(challenge: bytes, user_id: int | None = None) -> str:
    """Wrap a WebAuthn challenge in a signed, timestamped token.

    The token payload carries the base64-encoded challenge under ``"ch"``,
    a freshly generated random nonce under ``"n"`` and, only when
    ``user_id`` is given, that id under ``"uid"``.

    Args:
        challenge: Raw challenge bytes to embed.
        user_id: Optional user id to bind the token to.

    Returns:
        The serialized token string produced by the module's challenge
        serializer.
    """
    token_fields = {
        "ch": base64.b64encode(challenge).decode(),
        # Random per-token value; verification uses it to reject reuse.
        "n": secrets.token_urlsafe(16),
    }
    if user_id is not None:
        token_fields["uid"] = user_id
    return _challenge_serializer.dumps(token_fields)
def verify_challenge_token(token: str, expected_user_id: int | None = None) -> bytes | None:
    """Validate a challenge token and return the challenge it carries.

    The serializer checks the token's signature and age against
    ``WEBAUTHN_CHALLENGE_TTL``. The token's nonce is then recorded in the
    process-local nonce cache so the same token is not accepted twice by
    this process.

    Args:
        token: Token string previously produced by ``create_challenge_token``.
        expected_user_id: When given, the token's ``"uid"`` field must equal
            this value (user binding, used for registration).

    Returns:
        The decoded challenge bytes, or ``None`` when the signature is bad
        or expired, the nonce is missing or already used, or the user
        binding does not match.

    Note:
        The nonce is consumed before the user-binding check, so a token
        presented with the wrong ``expected_user_id`` is still burned.
    """
    ttl = app_settings.WEBAUTHN_CHALLENGE_TTL
    # itsdangerous compares whole-second timestamps and rejects only when
    # age > max_age, so a token can still validate until just under
    # ttl + 1 seconds after it was signed. Retaining the nonce for only
    # ``now + ttl`` could let it be purged while the token is still
    # accepted; the grace margin closes that replay window.
    nonce_grace_seconds = 2

    try:
        data = _challenge_serializer.loads(token, max_age=ttl)
    except (BadSignature, SignatureExpired):
        return None

    nonce = data.get("n")
    if not nonce:
        return None

    now = time.time()
    with _nonce_lock:
        # Lazy cleanup: collect expired keys first so the dict is not
        # mutated while it is being iterated.
        expired = [k for k, v in _used_nonces.items() if v <= now]
        for k in expired:
            del _used_nonces[k]

        # Replay: this nonce was already redeemed.
        if nonce in _used_nonces:
            return None

        # Mark the nonce as used until the token can no longer validate.
        _used_nonces[nonce] = now + ttl + nonce_grace_seconds

    # Cross-check user binding for registration tokens.
    if expected_user_id is not None and data.get("uid") != expected_user_id:
        return None

    return base64.b64decode(data["ch"])
# ---------------------------------------------------------------------------
# py_webauthn wrappers
# All synchronous — ECDSA P-256 verification is ~0.1ms, faster than executor overhead.
# ---------------------------------------------------------------------------
def build_registration_options(
    user_id: int,
    username: str,
    existing_credential_ids: list[bytes],
) -> tuple[str, bytes]:
    """Build the options for a WebAuthn registration ceremony.

    Args:
        user_id: Numeric user id; sent to the library as the UTF-8 bytes of
            its decimal string form.
        username: Passed to the library as ``user_name``.
        existing_credential_ids: Credential ids to list in
            ``exclude_credentials``.

    Returns:
        ``(options_json_str, challenge_bytes)``: the options serialized with
        ``options_to_json`` and the challenge taken from the generated options.
    """
    already_registered = [
        PublicKeyCredentialDescriptor(id=credential_id)
        for credential_id in existing_credential_ids
    ]
    # Resident keys and user verification are both requested as PREFERRED,
    # not REQUIRED.
    selection = AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.PREFERRED,
        user_verification=UserVerificationRequirement.PREFERRED,
    )
    registration_options = generate_registration_options(
        rp_id=app_settings.WEBAUTHN_RP_ID,
        rp_name=app_settings.WEBAUTHN_RP_NAME,
        user_id=str(user_id).encode(),
        user_name=username,
        attestation=AttestationConveyancePreference.NONE,
        authenticator_selection=selection,
        exclude_credentials=already_registered,
        timeout=60000,
    )
    return options_to_json(registration_options), registration_options.challenge
def verify_registration(
    credential_json: str,
    challenge: bytes,
) -> "VerifiedRegistration":
    """Check a browser's registration response against the issued challenge.

    The JSON is parsed into a ``RegistrationCredential`` and handed to
    ``verify_registration_response`` together with the configured RP id and
    origin. User verification is not required
    (``require_user_verification=False``).

    Args:
        credential_json: JSON string of the credential sent by the browser.
        challenge: The challenge bytes expected in the response.

    Returns:
        Whatever ``verify_registration_response`` returns on success.
        Parsing or verification failures propagate as exceptions; nothing
        is caught here.
    """
    # Imported locally, as in the original implementation.
    from webauthn.helpers.structs import RegistrationCredential

    verified = verify_registration_response(
        credential=RegistrationCredential.model_validate_json(credential_json),
        expected_challenge=challenge,
        expected_rp_id=app_settings.WEBAUTHN_RP_ID,
        expected_origin=app_settings.WEBAUTHN_ORIGIN,
        require_user_verification=False,
    )
    return verified
def build_authentication_options(
    credential_ids_and_transports: list[tuple[bytes, list[str] | None]] | None = None,
) -> tuple[str, bytes]:
    """Build the options for a WebAuthn authentication ceremony.

    Args:
        credential_ids_and_transports: Optional ``(credential_id, transports)``
            pairs. When non-empty, each pair becomes an ``allowCredentials``
            entry. When ``None`` or empty, ``allow_credentials`` is passed as
            ``None`` (discoverable credential flow).

    Returns:
        ``(options_json_str, challenge_bytes)``: the options serialized with
        ``options_to_json`` and the challenge taken from the generated options.
    """
    allow_credentials = None
    if credential_ids_and_transports:
        # Built once instead of once per transport string. Kept as a tuple
        # (not a set) so an unhashable stored value is simply filtered out
        # rather than raising TypeError.
        known_transports = tuple(member.value for member in AuthenticatorTransport)
        allow_credentials = []
        for cid, transports in credential_ids_and_transports:
            # Transport strings that are not AuthenticatorTransport values
            # are dropped silently.
            transport_list = [
                AuthenticatorTransport(t)
                for t in (transports or ())
                if t in known_transports
            ]
            allow_credentials.append(
                PublicKeyCredentialDescriptor(
                    id=cid,
                    # An empty list is sent as None (no transport hint).
                    transports=transport_list or None,
                )
            )

    options = generate_authentication_options(
        rp_id=app_settings.WEBAUTHN_RP_ID,
        allow_credentials=allow_credentials,
        user_verification=UserVerificationRequirement.PREFERRED,
        timeout=60000,
    )
    options_json = options_to_json(options)
    return options_json, options.challenge
def verify_authentication(
    credential_json: str,
    challenge: bytes,
    credential_public_key: bytes,
    credential_current_sign_count: int,
) -> "VerifiedAuthentication":
    """Check a browser's authentication response against the issued challenge.

    The JSON is parsed into an ``AuthenticationCredential`` and handed to
    ``verify_authentication_response`` together with the configured RP id
    and origin, the stored public key and the stored sign count. User
    verification is not required (``require_user_verification=False``).

    Args:
        credential_json: JSON string of the assertion sent by the browser.
        challenge: The challenge bytes expected in the response.
        credential_public_key: Stored public key of the credential.
        credential_current_sign_count: Stored sign count, passed through
            unchanged.

    Returns:
        Whatever ``verify_authentication_response`` returns on success.
        Parsing or verification failures propagate as exceptions; nothing
        is caught here.

    NOTE(review): the previous docstring said sign count anomalies are not
    hard-failed and the caller should log and continue. This wrapper does
    not handle them itself, so whether an anomaly raises depends on the
    library — confirm against the installed py_webauthn version.
    """
    # Imported locally, as in the original implementation.
    from webauthn.helpers.structs import AuthenticationCredential

    verified = verify_authentication_response(
        credential=AuthenticationCredential.model_validate_json(credential_json),
        expected_challenge=challenge,
        expected_rp_id=app_settings.WEBAUTHN_RP_ID,
        expected_origin=app_settings.WEBAUTHN_ORIGIN,
        credential_public_key=credential_public_key,
        credential_current_sign_count=credential_current_sign_count,
        require_user_verification=False,
    )
    return verified