""" Passkey (WebAuthn/FIDO2) router. Endpoints (all under /api/auth/passkeys — registered in main.py): POST /register/begin — Start passkey registration (auth + password required) POST /register/complete — Complete registration ceremony (auth required) POST /login/begin — Start passkey authentication (public, CSRF-exempt) POST /login/complete — Complete authentication ceremony (public, CSRF-exempt) GET / — List registered passkeys (auth required) DELETE /{id} — Remove a passkey (auth + password required) Security: - Challenge tokens signed with itsdangerous (60s TTL, single-use nonce) - Registration binds challenge to user_id, validated on complete (S-01) - Registration requires password re-entry (V-02) - Generic 401 on all auth failures (no credential enumeration) - Constant-time response on login/begin (V-03) - Failed passkey logins increment shared lockout counter - Passkey login bypasses TOTP (passkey IS 2FA) """ import asyncio import json import logging from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Path, Request, Response from pydantic import BaseModel, ConfigDict, Field from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.database import get_db from app.models.user import User from app.models.passkey_credential import PasskeyCredential from app.routers.auth import get_current_user from app.services.audit import get_client_ip, log_audit_event from app.services.auth import averify_password_with_upgrade from app.services.session import ( create_db_session, set_session_cookie, check_account_lockout, record_failed_login, record_successful_login, ) from app.services.passkey import ( create_challenge_token, verify_challenge_token, build_registration_options, verify_registration as verify_registration_response_svc, build_authentication_options, verify_authentication as verify_authentication_response_svc, ) from webauthn.helpers import bytes_to_base64url, base64url_to_bytes logger = 
logging.getLogger(__name__) router = APIRouter() # --------------------------------------------------------------------------- # Request/Response schemas # --------------------------------------------------------------------------- class PasskeyRegisterBeginRequest(BaseModel): model_config = ConfigDict(extra="forbid") password: str = Field(max_length=128) class PasskeyRegisterCompleteRequest(BaseModel): model_config = ConfigDict(extra="forbid") credential: str = Field(max_length=8192) challenge_token: str = Field(max_length=2048) name: str = Field(min_length=1, max_length=100) class PasskeyLoginBeginRequest(BaseModel): model_config = ConfigDict(extra="forbid") username: str | None = Field(None, max_length=50) class PasskeyLoginCompleteRequest(BaseModel): model_config = ConfigDict(extra="forbid") credential: str = Field(max_length=8192) challenge_token: str = Field(max_length=2048) class PasskeyDeleteRequest(BaseModel): model_config = ConfigDict(extra="forbid") password: str = Field(max_length=128) # --------------------------------------------------------------------------- # Registration endpoints (authenticated) # --------------------------------------------------------------------------- @router.post("/register/begin") async def passkey_register_begin( data: PasskeyRegisterBeginRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Start passkey registration. 
Requires password re-entry (V-02).""" # V-02: Verify password before allowing registration valid, new_hash = await averify_password_with_upgrade( data.password, current_user.password_hash ) if not valid: raise HTTPException(status_code=401, detail="Invalid password") if new_hash: current_user.password_hash = new_hash await db.commit() # Load existing credential IDs for exclusion result = await db.execute( select(PasskeyCredential.credential_id).where( PasskeyCredential.user_id == current_user.id ) ) existing_ids = [ base64url_to_bytes(row[0]) for row in result.all() ] options_json, challenge = build_registration_options( user_id=current_user.id, username=current_user.username, existing_credential_ids=existing_ids, ) token = create_challenge_token(challenge, user_id=current_user.id) return { "options": json.loads(options_json), "challenge_token": token, } @router.post("/register/complete") async def passkey_register_complete( data: PasskeyRegisterCompleteRequest, request: Request, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Complete passkey registration ceremony.""" # Verify challenge token — cross-check user binding (S-01) + single-use nonce (V-01) challenge = verify_challenge_token( data.challenge_token, expected_user_id=current_user.id ) if challenge is None: raise HTTPException(status_code=401, detail="Invalid or expired challenge") try: verified = verify_registration_response_svc( credential_json=data.credential, challenge=challenge, ) except Exception as e: logger.warning("Passkey registration verification failed: %s", e) raise HTTPException(status_code=400, detail="Registration verification failed") # Store credential credential_id_b64 = bytes_to_base64url(verified.credential_id) # Check for duplicate (race condition safety) existing = await db.execute( select(PasskeyCredential).where( PasskeyCredential.credential_id == credential_id_b64 ) ) if existing.scalar_one_or_none(): raise HTTPException(status_code=409, 
detail="Credential already registered") # Extract transport hints if available transports_json = None if hasattr(verified, 'credential_device_type'): pass # py_webauthn doesn't expose transports on VerifiedRegistration # Transports come from the browser response — parse from credential JSON try: cred_data = json.loads(data.credential) if "response" in cred_data and "transports" in cred_data["response"]: transports_json = json.dumps(cred_data["response"]["transports"]) except (json.JSONDecodeError, KeyError): pass # Determine backup state from py_webauthn flags backed_up = getattr(verified, 'credential_backed_up', False) new_credential = PasskeyCredential( user_id=current_user.id, credential_id=credential_id_b64, public_key=bytes_to_base64url(verified.credential_public_key), sign_count=verified.sign_count, name=data.name, transports=transports_json, backed_up=backed_up, ) db.add(new_credential) # B-02: If user has mfa_enforce_pending, clear it (passkey = MFA) if current_user.mfa_enforce_pending: current_user.mfa_enforce_pending = False # Extract response data BEFORE commit (ORM expiry rule) response_data = { "id": None, # will be set after flush "name": new_credential.name, "created_at": None, "backed_up": backed_up, } await db.flush() response_data["id"] = new_credential.id response_data["created_at"] = str(new_credential.created_at) if new_credential.created_at else None await log_audit_event( db, action="passkey.registered", actor_id=current_user.id, detail={"credential_name": data.name}, ip=get_client_ip(request), ) await db.commit() return response_data # --------------------------------------------------------------------------- # Authentication endpoints (unauthenticated — CSRF-exempt) # --------------------------------------------------------------------------- @router.post("/login/begin") async def passkey_login_begin( data: PasskeyLoginBeginRequest, db: AsyncSession = Depends(get_db), ): """Start passkey authentication. 
CSRF-exempt, public endpoint.""" credential_data = None if data.username: # Look up user's credentials for allowCredentials result = await db.execute( select(User).where(User.username == data.username.lower().strip()) ) user = result.scalar_one_or_none() if user: cred_result = await db.execute( select( PasskeyCredential.credential_id, PasskeyCredential.transports, ).where(PasskeyCredential.user_id == user.id) ) rows = cred_result.all() if rows: credential_data = [] for row in rows: cid_bytes = base64url_to_bytes(row[0]) transports = json.loads(row[1]) if row[1] else None credential_data.append((cid_bytes, transports)) # V-03: Generate options regardless of whether user exists or has passkeys. # Identical response shape prevents timing enumeration. options_json, challenge = build_authentication_options( credential_ids_and_transports=credential_data, ) token = create_challenge_token(challenge) return { "options": json.loads(options_json), "challenge_token": token, } @router.post("/login/complete") async def passkey_login_complete( data: PasskeyLoginCompleteRequest, request: Request, response: Response, db: AsyncSession = Depends(get_db), ): """Complete passkey authentication. 
CSRF-exempt, public endpoint.""" # Verify challenge token (60s TTL, single-use nonce V-01) challenge = verify_challenge_token(data.challenge_token) if challenge is None: raise HTTPException(status_code=401, detail="Authentication failed") # Parse credential_id from browser response try: cred_data = json.loads(data.credential) raw_id_b64 = cred_data.get("rawId") or cred_data.get("id", "") except (json.JSONDecodeError, KeyError): raise HTTPException(status_code=401, detail="Authentication failed") # Look up credential + user in a single JOIN query (W-1 perf fix) result = await db.execute( select(PasskeyCredential, User) .join(User, User.id == PasskeyCredential.user_id) .where(PasskeyCredential.credential_id == raw_id_b64) ) row = result.one_or_none() if not row: raise HTTPException(status_code=401, detail="Authentication failed") credential, user = row.tuple() # Check account lockout (C-03) await check_account_lockout(user) # Check active status (C-03) if not user.is_active: raise HTTPException(status_code=401, detail="Authentication failed") # Verify the authentication response try: verified = verify_authentication_response_svc( credential_json=data.credential, challenge=challenge, credential_public_key=base64url_to_bytes(credential.public_key), credential_current_sign_count=credential.sign_count, ) except Exception as e: logger.warning("Passkey authentication verification failed for user %s: %s", user.id, e) # Increment failed login counter (shared with password auth) await record_failed_login(db, user) await log_audit_event( db, action="passkey.login_failed", actor_id=user.id, detail={"reason": "verification_failed"}, ip=get_client_ip(request), ) await db.commit() raise HTTPException(status_code=401, detail="Authentication failed") # Update sign count (log anomaly but don't fail — S-05) new_sign_count = verified.new_sign_count if new_sign_count < credential.sign_count and credential.sign_count > 0: logger.warning( "Sign count anomaly for user %s credential %s: 
expected >= %d, got %d", user.id, credential.id, credential.sign_count, new_sign_count, ) await log_audit_event( db, action="passkey.sign_count_anomaly", actor_id=user.id, detail={ "credential_id": credential.id, "expected": credential.sign_count, "received": new_sign_count, }, ip=get_client_ip(request), ) credential.sign_count = new_sign_count credential.last_used_at = datetime.now() # Record successful login await record_successful_login(db, user) # Create session (shared service — enforces session cap) client_ip = get_client_ip(request) user_agent = request.headers.get("user-agent") _, token = await create_db_session(db, user, client_ip, user_agent) set_session_cookie(response, token) # B-01: Handle must_change_password and mfa_enforce_pending flags result_data: dict = {"authenticated": True} if user.must_change_password: result_data["must_change_password"] = True # Passkey satisfies MFA — if mfa_enforce_pending, clear it (before commit) if user.mfa_enforce_pending: user.mfa_enforce_pending = False await log_audit_event( db, action="passkey.login_success", actor_id=user.id, detail={"credential_name": credential.name}, ip=client_ip, ) await db.commit() return result_data # --------------------------------------------------------------------------- # Management endpoints (authenticated) # --------------------------------------------------------------------------- @router.get("/") async def list_passkeys( db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List all passkeys for the current user.""" result = await db.execute( select(PasskeyCredential) .where(PasskeyCredential.user_id == current_user.id) .order_by(PasskeyCredential.created_at.desc()) ) credentials = result.scalars().all() return [ { "id": c.id, "name": c.name, "created_at": str(c.created_at) if c.created_at else None, "last_used_at": str(c.last_used_at) if c.last_used_at else None, "backed_up": c.backed_up, } for c in credentials ] 
@router.delete("/{credential_id}")
async def delete_passkey(
    request: Request,
    credential_id: int = Path(ge=1, le=2147483647),
    data: PasskeyDeleteRequest = ...,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete a passkey. Requires password confirmation (S-06).

    Args:
        request: Incoming request; used only to resolve the client IP for
            the audit event.
        credential_id: Database id of the passkey row (path parameter).
        data: Request body carrying the caller's current password.
        db: Async database session.
        current_user: The authenticated user.

    Returns:
        ``{"message": "Passkey removed"}`` on success.

    Raises:
        HTTPException: 401 when the password does not verify, 404 when no
            passkey with this id belongs to the current user.
    """
    # Snapshot the id first so nothing below reads the ORM object after a
    # commit (safe whether or not the session expires on commit).
    user_id = current_user.id

    # Verify password
    valid, new_hash = await averify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not valid:
        raise HTTPException(status_code=401, detail="Invalid password")
    if new_hash:
        # Persist the replacement hash immediately, as /register/begin does;
        # otherwise it would be discarded whenever the 404 below fires.
        current_user.password_hash = new_hash
        await db.commit()

    # Look up credential — verify ownership (IDOR prevention)
    result = await db.execute(
        select(PasskeyCredential).where(
            PasskeyCredential.id == credential_id,
            PasskeyCredential.user_id == user_id,
        )
    )
    credential = result.scalar_one_or_none()
    if not credential:
        raise HTTPException(status_code=404, detail="Passkey not found")

    # Keep the name for the audit record before the row is deleted.
    cred_name = credential.name
    await db.delete(credential)

    await log_audit_event(
        db,
        action="passkey.deleted",
        actor_id=user_id,
        detail={"credential_name": cred_name, "credential_db_id": credential_id},
        ip=get_client_ip(request),
    )
    await db.commit()

    return {"message": "Passkey removed"}