New files: - models/passkey_credential.py: PasskeyCredential model with indexed credential_id - alembic 061: Create passkey_credentials table - services/passkey.py: Challenge token management (itsdangerous + nonce replay protection) and py_webauthn wrappers for registration/authentication - routers/passkeys.py: 6 endpoints (register begin/complete, login begin/complete, list, delete) with full security hardening Changes: - config.py: WEBAUTHN_RP_ID, RP_NAME, ORIGIN, CHALLENGE_TTL settings - main.py: Mount passkey router, add CSRF exemptions for login endpoints - auth.py: Add has_passkeys to /auth/status response - nginx.conf: Rate limiting on all passkey endpoints, Permissions-Policy updated for publickey-credentials-get/create - requirements.txt: Add webauthn>=2.1.0 Security: password re-entry for registration (V-02), single-use nonce challenges (V-01), constant-time login/begin (V-03), shared lockout counter, generic 401 errors, audit logging on all events. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
221 lines
7.0 KiB
Python
221 lines
7.0 KiB
Python
"""
|
|
Passkey (WebAuthn/FIDO2) service.
|
|
|
|
Handles challenge token creation/verification (itsdangerous + nonce replay protection)
|
|
and wraps py_webauthn library calls for registration and authentication ceremonies.
|
|
"""
|
|
import base64
|
|
import json
|
|
import logging
|
|
import secrets
|
|
import time
|
|
import threading
|
|
|
|
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
|
|
|
from webauthn import (
|
|
generate_registration_options,
|
|
verify_registration_response,
|
|
generate_authentication_options,
|
|
verify_authentication_response,
|
|
options_to_json,
|
|
)
|
|
from webauthn.helpers.structs import (
|
|
PublicKeyCredentialDescriptor,
|
|
AuthenticatorSelectionCriteria,
|
|
AuthenticatorTransport,
|
|
ResidentKeyRequirement,
|
|
UserVerificationRequirement,
|
|
AttestationConveyancePreference,
|
|
)
|
|
from webauthn.helpers import bytes_to_base64url, base64url_to_bytes
|
|
|
|
from app.config import settings as app_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Challenge token management (itsdangerous + nonce replay protection V-01)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_challenge_serializer = URLSafeTimedSerializer(
|
|
secret_key=app_settings.SECRET_KEY,
|
|
salt="webauthn-challenge-v1",
|
|
)
|
|
|
|
# Thread-safe nonce cache for single-use enforcement
|
|
# Keys: nonce string, Values: expiry timestamp
|
|
_used_nonces: dict[str, float] = {}
|
|
_nonce_lock = threading.Lock()
|
|
|
|
|
|
def create_challenge_token(challenge: bytes, user_id: int | None = None) -> str:
|
|
"""Sign challenge + nonce + optional user_id. Returns opaque token string."""
|
|
nonce = secrets.token_urlsafe(16)
|
|
payload = {
|
|
"ch": base64.b64encode(challenge).decode(),
|
|
"n": nonce,
|
|
}
|
|
if user_id is not None:
|
|
payload["uid"] = user_id
|
|
return _challenge_serializer.dumps(payload)
|
|
|
|
|
|
def verify_challenge_token(token: str, expected_user_id: int | None = None) -> bytes | None:
|
|
"""Verify token (TTL from config), enforce single-use via nonce.
|
|
|
|
If expected_user_id provided, cross-check user binding (for registration).
|
|
Returns challenge bytes or None on failure.
|
|
"""
|
|
try:
|
|
data = _challenge_serializer.loads(
|
|
token, max_age=app_settings.WEBAUTHN_CHALLENGE_TTL
|
|
)
|
|
except (BadSignature, SignatureExpired):
|
|
return None
|
|
|
|
nonce = data.get("n")
|
|
if not nonce:
|
|
return None
|
|
|
|
now = time.time()
|
|
with _nonce_lock:
|
|
# Lazy cleanup of expired nonces
|
|
expired = [k for k, v in _used_nonces.items() if v <= now]
|
|
for k in expired:
|
|
del _used_nonces[k]
|
|
|
|
# Check for replay
|
|
if nonce in _used_nonces:
|
|
return None
|
|
|
|
# Mark nonce as used
|
|
_used_nonces[nonce] = now + app_settings.WEBAUTHN_CHALLENGE_TTL
|
|
|
|
# Cross-check user binding for registration tokens
|
|
if expected_user_id is not None:
|
|
if data.get("uid") != expected_user_id:
|
|
return None
|
|
|
|
return base64.b64decode(data["ch"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# py_webauthn wrappers
|
|
# All synchronous — ECDSA P-256 verification is ~0.1ms, faster than executor overhead.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_registration_options(
|
|
user_id: int,
|
|
username: str,
|
|
existing_credential_ids: list[bytes],
|
|
) -> tuple[str, bytes]:
|
|
"""Generate WebAuthn registration options.
|
|
|
|
Returns (options_json_str, challenge_bytes).
|
|
"""
|
|
exclude_credentials = [
|
|
PublicKeyCredentialDescriptor(id=cid)
|
|
for cid in existing_credential_ids
|
|
]
|
|
|
|
options = generate_registration_options(
|
|
rp_id=app_settings.WEBAUTHN_RP_ID,
|
|
rp_name=app_settings.WEBAUTHN_RP_NAME,
|
|
user_id=str(user_id).encode(),
|
|
user_name=username,
|
|
attestation=AttestationConveyancePreference.NONE,
|
|
authenticator_selection=AuthenticatorSelectionCriteria(
|
|
resident_key=ResidentKeyRequirement.PREFERRED,
|
|
user_verification=UserVerificationRequirement.PREFERRED,
|
|
),
|
|
exclude_credentials=exclude_credentials,
|
|
timeout=60000,
|
|
)
|
|
|
|
options_json = options_to_json(options)
|
|
return options_json, options.challenge
|
|
|
|
|
|
def verify_registration(
|
|
credential_json: str,
|
|
challenge: bytes,
|
|
) -> "VerifiedRegistration":
|
|
"""Verify a registration response from the browser.
|
|
|
|
Returns VerifiedRegistration on success, raises on failure.
|
|
"""
|
|
from webauthn.helpers.structs import RegistrationCredential
|
|
|
|
credential = RegistrationCredential.model_validate_json(credential_json)
|
|
return verify_registration_response(
|
|
credential=credential,
|
|
expected_challenge=challenge,
|
|
expected_rp_id=app_settings.WEBAUTHN_RP_ID,
|
|
expected_origin=app_settings.WEBAUTHN_ORIGIN,
|
|
require_user_verification=False,
|
|
)
|
|
|
|
|
|
def build_authentication_options(
|
|
credential_ids_and_transports: list[tuple[bytes, list[str] | None]] | None = None,
|
|
) -> tuple[str, bytes]:
|
|
"""Generate WebAuthn authentication options.
|
|
|
|
If credential_ids_and_transports provided, includes allowCredentials.
|
|
Otherwise, allows discoverable credential flow.
|
|
Returns (options_json_str, challenge_bytes).
|
|
"""
|
|
allow_credentials = None
|
|
if credential_ids_and_transports:
|
|
allow_credentials = []
|
|
for cid, transports in credential_ids_and_transports:
|
|
transport_list = None
|
|
if transports:
|
|
transport_list = [
|
|
AuthenticatorTransport(t)
|
|
for t in transports
|
|
if t in [e.value for e in AuthenticatorTransport]
|
|
]
|
|
allow_credentials.append(
|
|
PublicKeyCredentialDescriptor(
|
|
id=cid,
|
|
transports=transport_list or None,
|
|
)
|
|
)
|
|
|
|
options = generate_authentication_options(
|
|
rp_id=app_settings.WEBAUTHN_RP_ID,
|
|
allow_credentials=allow_credentials,
|
|
user_verification=UserVerificationRequirement.PREFERRED,
|
|
timeout=60000,
|
|
)
|
|
|
|
options_json = options_to_json(options)
|
|
return options_json, options.challenge
|
|
|
|
|
|
def verify_authentication(
|
|
credential_json: str,
|
|
challenge: bytes,
|
|
credential_public_key: bytes,
|
|
credential_current_sign_count: int,
|
|
) -> "VerifiedAuthentication":
|
|
"""Verify an authentication response from the browser.
|
|
|
|
Returns VerifiedAuthentication on success, raises on failure.
|
|
Sign count anomalies are NOT hard-failed — caller should log and continue.
|
|
"""
|
|
from webauthn.helpers.structs import AuthenticationCredential
|
|
|
|
credential = AuthenticationCredential.model_validate_json(credential_json)
|
|
return verify_authentication_response(
|
|
credential=credential,
|
|
expected_challenge=challenge,
|
|
expected_rp_id=app_settings.WEBAUTHN_RP_ID,
|
|
expected_origin=app_settings.WEBAUTHN_ORIGIN,
|
|
credential_public_key=credential_public_key,
|
|
credential_current_sign_count=credential_current_sign_count,
|
|
require_user_verification=False,
|
|
)
|