UMBRA/backend/app/routers/passkeys.py
Kyle Pope ed98924716 Action remaining QA suggestions + performance optimizations
S-02: Extract extract_credential_raw_id() helper in services/passkey.py
  — replaces 2 inline rawId parsing blocks in passkeys.py
S-03: Add PasskeyLoginResponse type, use in useAuth passkeyLoginMutation
S-04: Add Cancel button to disable-passwordless dialog
W-03: Invalidate auth queries on disable ceremony error/cancel

Perf-2: Session cap uses ID-only query + bulk UPDATE instead of loading
  full ORM objects and flipping booleans individually
Perf-3: Remove passkey_count from /auth/status hot path (polled every
  15s). Use EXISTS for has_passkeys boolean. Count derived from passkeys
  list query in PasskeySection (passkeys.length).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 02:34:00 +08:00

676 lines
24 KiB
Python

"""
Passkey (WebAuthn/FIDO2) router.
Endpoints (all under /api/auth/passkeys — registered in main.py):
POST /register/begin — Start passkey registration (auth + password required)
POST /register/complete — Complete registration ceremony (auth required)
POST /login/begin — Start passkey authentication (public, CSRF-exempt)
POST /login/complete — Complete authentication ceremony (public, CSRF-exempt)
GET / — List registered passkeys (auth required)
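DELETE /{id} — Remove a passkey (auth + password required)
PUT /passwordless/enable — Enable passwordless login (auth + password required)
POST /passwordless/disable/begin — Start the passkey ceremony that disables passwordless login (auth required)
PUT /passwordless/disable — Complete that ceremony and disable passwordless login (auth required)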
Security:
- Challenge tokens signed with itsdangerous (60s TTL, single-use nonce)
- Registration binds challenge to user_id, validated on complete (S-01)
- Registration requires password re-entry (V-02)
- Generic 401 on all auth failures (no credential enumeration)
- Constant-time response on login/begin (V-03)
- Failed passkey logins increment shared lockout counter
- Passkey login bypasses TOTP (passkey IS 2FA)
"""
import asyncio
import json
import logging
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Path, Request, Response
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.database import get_db
from app.models.passkey_credential import PasskeyCredential
from app.models.system_config import SystemConfig
from app.models.user import User
from app.routers.auth import get_current_user
from app.services.audit import get_client_ip, log_audit_event
from app.services.auth import averify_password_with_upgrade, verify_session_token
from app.services.session import (
create_db_session,
set_session_cookie,
check_account_lockout,
record_failed_login,
record_successful_login,
)
from app.services.passkey import (
create_challenge_token,
verify_challenge_token,
build_registration_options,
verify_registration as verify_registration_response_svc,
build_authentication_options,
verify_authentication as verify_authentication_response_svc,
extract_credential_raw_id,
)
from app.models.session import UserSession
from webauthn.helpers import bytes_to_base64url, base64url_to_bytes
logger = logging.getLogger(__name__)
router = APIRouter()
# ---------------------------------------------------------------------------
# Request/Response schemas
# ---------------------------------------------------------------------------
class PasskeyRegisterBeginRequest(BaseModel):
    """Request body for POST /register/begin."""

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Current account password, re-entered before registration may start.
    password: str = Field(..., max_length=128)
class PasskeyRegisterCompleteRequest(BaseModel):
    """Request body for POST /register/complete."""

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Browser credential response, carried as a JSON-encoded string.
    credential: str = Field(..., max_length=8192)
    # Signed challenge token handed out by /register/begin.
    challenge_token: str = Field(..., max_length=2048)
    # Name stored alongside the new passkey.
    name: str = Field(..., min_length=1, max_length=100)
class PasskeyLoginBeginRequest(BaseModel):
    """Request body for POST /login/begin."""

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Optional: when given, the user's credentials are looked up so they can
    # be passed to the authentication-options builder.
    username: str | None = Field(default=None, max_length=50)
class PasskeyLoginCompleteRequest(BaseModel):
    """Request body for POST /login/complete."""

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Browser assertion response, carried as a JSON-encoded string.
    credential: str = Field(..., max_length=8192)
    # Signed challenge token handed out by /login/begin.
    challenge_token: str = Field(..., max_length=2048)
    # True -> unlock the caller's existing locked session instead of
    # creating a new session.
    unlock: bool = Field(default=False)
class PasskeyDeleteRequest(BaseModel):
    """Request body for DELETE /{credential_id}."""

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Current account password, required to confirm the deletion.
    password: str = Field(..., max_length=128)
class PasswordlessEnableRequest(BaseModel):
    """Request body for PUT /passwordless/enable."""

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Current account password, required to confirm the change.
    password: str = Field(..., max_length=128)
class PasswordlessDisableRequest(BaseModel):
    """Request body for PUT /passwordless/disable."""

    # Reject any key that is not declared below.
    model_config = ConfigDict(extra="forbid")

    # Browser assertion response, carried as a JSON-encoded string.
    credential: str = Field(..., max_length=8192)
    # Signed challenge token handed out by /passwordless/disable/begin.
    challenge_token: str = Field(..., max_length=2048)
# ---------------------------------------------------------------------------
# Registration endpoints (authenticated)
# ---------------------------------------------------------------------------
@router.post("/register/begin")
async def passkey_register_begin(
    data: PasskeyRegisterBeginRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Start passkey registration. Requires password re-entry (V-02).

    Verifies the supplied password, then builds registration options that
    exclude the credentials the user has already registered and signs a
    challenge token bound to the user's id.

    Returns:
        A dict with ``options`` (the registration options parsed from JSON)
        and ``challenge_token``.

    Raises:
        HTTPException: 401 if the password does not verify.
    """
    # Snapshot the scalar attributes needed later BEFORE any commit. A commit
    # may expire ORM-loaded state, and reading an expired attribute afterwards
    # would require a refresh (same "extract before commit" rule that
    # register/complete follows for its response data).
    user_id = current_user.id
    username = current_user.username

    # V-02: Verify password before allowing registration
    valid, new_hash = await averify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not valid:
        raise HTTPException(status_code=401, detail="Invalid password")
    if new_hash:
        # The verifier handed back a replacement hash; persist it right away
        # because this endpoint has no later commit.
        current_user.password_hash = new_hash
        await db.commit()

    # Load existing credential IDs for exclusion
    result = await db.execute(
        select(PasskeyCredential.credential_id).where(
            PasskeyCredential.user_id == user_id
        )
    )
    # IDs are stored base64url-encoded; the options builder is given raw bytes.
    existing_ids = [base64url_to_bytes(row[0]) for row in result.all()]

    options_json, challenge = build_registration_options(
        user_id=user_id,
        username=username,
        existing_credential_ids=existing_ids,
    )
    # Bind the challenge to this user so /register/complete can cross-check it.
    token = create_challenge_token(challenge, user_id=user_id)
    return {
        "options": json.loads(options_json),
        "challenge_token": token,
    }
def _extract_transports_json(credential_json: str) -> str | None:
    """Return ``response.transports`` from a credential JSON string, re-encoded as JSON.

    The value comes straight from the client, so it is only accepted when it
    is a list of strings; anything else (missing key, malformed JSON, wrong
    shape) yields ``None`` instead of being stored.
    """
    try:
        transports = json.loads(credential_json)["response"]["transports"]
    except (json.JSONDecodeError, KeyError, TypeError):
        # TypeError covers payloads whose top level or "response" member is
        # not a JSON object (e.g. a list or a bare string).
        return None
    if not isinstance(transports, list):
        return None
    if not all(isinstance(item, str) for item in transports):
        return None
    return json.dumps(transports)


@router.post("/register/complete")
async def passkey_register_complete(
    data: PasskeyRegisterCompleteRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Complete passkey registration ceremony.

    Validates the user-bound challenge token, verifies the browser's
    registration response, stores the new credential, clears
    ``mfa_enforce_pending`` if it was set, and writes an audit event.

    Returns:
        A dict with the new passkey's ``id``, ``name``, ``created_at``
        (stringified, or None) and ``backed_up``.

    Raises:
        HTTPException: 401 for an invalid/expired challenge token, 400 when
            the registration response fails verification, 409 when the
            credential id is already stored.
    """
    # Verify challenge token — cross-check user binding (S-01) + single-use nonce (V-01)
    challenge = verify_challenge_token(
        data.challenge_token, expected_user_id=current_user.id
    )
    if challenge is None:
        raise HTTPException(status_code=401, detail="Invalid or expired challenge")

    try:
        verified = verify_registration_response_svc(
            credential_json=data.credential,
            challenge=challenge,
        )
    except Exception as e:
        # Any verifier failure maps to one generic 400; details stay in logs.
        logger.warning("Passkey registration verification failed: %s", e)
        raise HTTPException(
            status_code=400, detail="Registration verification failed"
        ) from e

    # Credential IDs are stored base64url-encoded.
    credential_id_b64 = bytes_to_base64url(verified.credential_id)

    # Reject an already-registered credential id up front.
    # NOTE(review): check-then-insert is not atomic on its own; presumably a
    # unique constraint on credential_id backs this up — confirm in the model.
    existing = await db.execute(
        select(PasskeyCredential).where(
            PasskeyCredential.credential_id == credential_id_b64
        )
    )
    if existing.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="Credential already registered")

    # Transport hints are not taken from the verification result; they are
    # read (and shape-checked) from the browser's credential JSON instead.
    transports_json = _extract_transports_json(data.credential)

    # Backup state as reported by the verifier; defaults to False when the
    # attribute is absent.
    backed_up = getattr(verified, 'credential_backed_up', False)

    new_credential = PasskeyCredential(
        user_id=current_user.id,
        credential_id=credential_id_b64,
        public_key=bytes_to_base64url(verified.credential_public_key),
        sign_count=verified.sign_count,
        name=data.name,
        transports=transports_json,
        backed_up=backed_up,
    )
    db.add(new_credential)

    # B-02: If user has mfa_enforce_pending, clear it (passkey = MFA)
    if current_user.mfa_enforce_pending:
        current_user.mfa_enforce_pending = False

    # Extract response data BEFORE commit (ORM expiry rule)
    response_data = {
        "id": None,  # will be set after flush
        "name": new_credential.name,
        "created_at": None,
        "backed_up": backed_up,
    }
    await db.flush()
    response_data["id"] = new_credential.id
    response_data["created_at"] = (
        str(new_credential.created_at) if new_credential.created_at else None
    )

    await log_audit_event(
        db, action="passkey.registered", actor_id=current_user.id,
        detail={"credential_name": data.name},
        ip=get_client_ip(request),
    )
    await db.commit()
    return response_data
# ---------------------------------------------------------------------------
# Authentication endpoints (unauthenticated — CSRF-exempt)
# ---------------------------------------------------------------------------
@router.post("/login/begin")
async def passkey_login_begin(
    data: PasskeyLoginBeginRequest,
    db: AsyncSession = Depends(get_db),
):
    """Begin a passkey login ceremony (public endpoint, CSRF-exempt).

    When a username is supplied and matches a user with stored passkeys,
    those credentials are handed to the options builder; in every other case
    the builder receives ``None``. The response always carries ``options``
    and a ``challenge_token``.
    """
    allow_list = None

    if data.username:
        normalized = data.username.lower().strip()
        user_lookup = await db.execute(
            select(User).where(User.username == normalized)
        )
        account = user_lookup.scalar_one_or_none()

        if not account:
            # F-01: unknown username. Issue a throwaway credential query so
            # this path costs roughly the same as the real lookup below;
            # skipping it would make the "no such user" case measurably
            # faster and reveal whether the username exists.
            await db.execute(
                select(PasskeyCredential.credential_id)
                .where(PasskeyCredential.user_id == 0)
                .limit(1)
            )
        else:
            stored = (
                await db.execute(
                    select(
                        PasskeyCredential.credential_id,
                        PasskeyCredential.transports,
                    ).where(PasskeyCredential.user_id == account.id)
                )
            ).all()
            # (raw id bytes, decoded transports or None) per credential; an
            # empty result collapses to None.
            allow_list = [
                (base64url_to_bytes(cid), json.loads(hints) if hints else None)
                for cid, hints in stored
            ] or None

    # V-03: options are produced whether or not the user exists or owns any
    # passkeys, so the response has the same shape on every path.
    options_json, challenge = build_authentication_options(
        credential_ids_and_transports=allow_list,
    )
    return {
        "options": json.loads(options_json),
        "challenge_token": create_challenge_token(challenge),
    }
@router.post("/login/complete")
async def passkey_login_complete(
    data: PasskeyLoginCompleteRequest,
    request: Request,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """Complete passkey authentication. CSRF-exempt, public endpoint.

    Flow: validate the challenge token, resolve the credential and its owner,
    run the lockout and active checks, verify the assertion, update the
    credential's sign count / last-used time, then either

    * ``data.unlock`` is true: unlock the session named by the caller's
      ``session`` cookie and return ``{"unlocked": True}``; or
    * otherwise: create a new session, set the session cookie, clear
      ``must_change_password`` / ``mfa_enforce_pending`` and return
      ``{"authenticated": True}``.

    Raises:
        HTTPException: 401 with the generic detail "Authentication failed"
            for every failure handled here.
            NOTE(review): ``check_account_lockout`` presumably raises its own
            error for locked accounts — confirm in app.services.session.
    """
    # Verify challenge token (60s TTL, single-use nonce V-01)
    challenge = verify_challenge_token(data.challenge_token)
    if challenge is None:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Parse credential_id from browser response (S-02: shared helper)
    raw_id_b64 = extract_credential_raw_id(data.credential)
    if not raw_id_b64:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Look up credential + user in a single JOIN query (W-1 perf fix)
    result = await db.execute(
        select(PasskeyCredential, User)
        .join(User, User.id == PasskeyCredential.user_id)
        .where(PasskeyCredential.credential_id == raw_id_b64)
    )
    row = result.one_or_none()
    if not row:
        raise HTTPException(status_code=401, detail="Authentication failed")
    credential, user = row.tuple()

    # Check account lockout (C-03)
    await check_account_lockout(user)

    # Check active status (C-03)
    if not user.is_active:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Verify the authentication response
    try:
        verified = verify_authentication_response_svc(
            credential_json=data.credential,
            challenge=challenge,
            credential_public_key=base64url_to_bytes(credential.public_key),
            credential_current_sign_count=credential.sign_count,
        )
    except Exception as e:
        logger.warning("Passkey authentication verification failed for user %s: %s", user.id, e)
        # Increment failed login counter (shared with password auth)
        remaining = await record_failed_login(db, user)
        await log_audit_event(
            db, action="passkey.login_failed", actor_id=user.id,
            detail={"reason": "verification_failed", "attempts_remaining": remaining},
            ip=get_client_ip(request),
        )
        # Commit before raising so the counter and audit row are persisted.
        await db.commit()
        # Generic message for all failures — don't leak lockout state (C-02/F-02)
        raise HTTPException(status_code=401, detail="Authentication failed") from e

    # Update sign count (log anomaly but don't fail — S-05)
    new_sign_count = verified.new_sign_count
    if new_sign_count < credential.sign_count and credential.sign_count > 0:
        logger.warning(
            "Sign count anomaly for user %s credential %s: expected >= %d, got %d",
            user.id, credential.id, credential.sign_count, new_sign_count,
        )
        await log_audit_event(
            db, action="passkey.sign_count_anomaly", actor_id=user.id,
            detail={
                "credential_id": credential.id,
                "expected": credential.sign_count,
                "received": new_sign_count,
            },
            ip=get_client_ip(request),
        )
    credential.sign_count = new_sign_count
    credential.last_used_at = datetime.now()

    # Passkey unlock — re-authenticate into a locked session instead of creating a new one
    if data.unlock:
        session_cookie = request.cookies.get("session")
        payload = verify_session_token(session_cookie) if session_cookie else None
        # The cookie must belong to the same user the passkey authenticated.
        if not payload or payload.get("uid") != user.id:
            raise HTTPException(status_code=401, detail="Authentication failed")
        # A token without a session id is treated like any other bad cookie
        # (generic 401) rather than surfacing as a KeyError / 500.
        session_id = payload.get("sid")
        if session_id is None:
            raise HTTPException(status_code=401, detail="Authentication failed")
        sess_result = await db.execute(
            select(UserSession).where(
                UserSession.id == session_id,
                UserSession.user_id == user.id,
                UserSession.revoked == False,  # noqa: E712 — SQL expression, not a Python comparison
            )
        )
        db_sess = sess_result.scalar_one_or_none()
        if not db_sess:
            raise HTTPException(status_code=401, detail="Authentication failed")
        db_sess.is_locked = False
        db_sess.locked_at = None
        # Reset failed login counter on successful passkey unlock (W-02)
        await record_successful_login(db, user)
        await log_audit_event(
            db, action="passkey.unlock_success", actor_id=user.id,
            ip=get_client_ip(request),
        )
        await db.commit()
        return {"unlocked": True}

    # Record successful login
    await record_successful_login(db, user)

    # Create session (shared service — enforces session cap)
    client_ip = get_client_ip(request)
    user_agent = request.headers.get("user-agent")
    _, token = await create_db_session(db, user, client_ip, user_agent)
    set_session_cookie(response, token)

    # Handle special flags for passkey login
    result_data: dict = {"authenticated": True}

    # W-05: Passkey login auto-clears must_change_password — user can't provide
    # old password in the forced-change form since they authenticated via passkey.
    if user.must_change_password:
        user.must_change_password = False

    # Passkey satisfies MFA — if mfa_enforce_pending, clear it (before commit)
    if user.mfa_enforce_pending:
        user.mfa_enforce_pending = False

    await log_audit_event(
        db, action="passkey.login_success", actor_id=user.id,
        detail={"credential_name": credential.name},
        ip=client_ip,
    )
    await db.commit()
    return result_data
# ---------------------------------------------------------------------------
# Passwordless toggle endpoints (authenticated)
# ---------------------------------------------------------------------------
@router.put("/passwordless/enable")
async def passwordless_enable(
    data: PasswordlessEnableRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Turn on passwordless login for the authenticated user.

    Checks, in this order:
    1. The supplied password verifies (401 otherwise).
    2. The SystemConfig row with id 1 has allow_passwordless set (403 otherwise).
    3. The user owns at least 2 passkeys (400 otherwise).
    """
    password_ok, upgraded_hash = await averify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="Invalid password")
    if upgraded_hash:
        # Stored together with the flag change by the commit at the end.
        current_user.password_hash = upgraded_hash

    # System-wide switch
    system_config = (
        await db.execute(select(SystemConfig).where(SystemConfig.id == 1))
    ).scalar_one_or_none()
    if not (system_config and system_config.allow_passwordless):
        raise HTTPException(
            status_code=403,
            detail="Passwordless login is not enabled on this system",
        )

    # Two passkeys minimum, so losing one does not lock the user out.
    count_stmt = (
        select(func.count())
        .select_from(PasskeyCredential)
        .where(PasskeyCredential.user_id == current_user.id)
    )
    registered = (await db.execute(count_stmt)).scalar_one()
    if registered < 2:
        raise HTTPException(
            status_code=400,
            detail="At least 2 passkeys must be registered before enabling passwordless login",
        )

    current_user.passwordless_enabled = True
    await log_audit_event(
        db,
        action="passkey.passwordless_enabled",
        actor_id=current_user.id,
        ip=get_client_ip(request),
    )
    await db.commit()
    return {"passwordless_enabled": True}
@router.post("/passwordless/disable/begin")
async def passwordless_disable_begin(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Start the passkey ceremony that precedes disabling passwordless login.

    Responds with the authentication options for the browser plus a
    challenge token bound to the current user.
    """
    stored = (
        await db.execute(
            select(
                PasskeyCredential.credential_id,
                PasskeyCredential.transports,
            ).where(PasskeyCredential.user_id == current_user.id)
        )
    ).all()

    # (raw id bytes, decoded transports or None) per credential; None when
    # the user has no passkeys at all.
    allow_list = [
        (base64url_to_bytes(cid), json.loads(hints) if hints else None)
        for cid, hints in stored
    ] or None

    options_json, challenge = build_authentication_options(
        credential_ids_and_transports=allow_list,
    )
    return {
        "options": json.loads(options_json),
        # User-bound so the completing endpoint can cross-check the caller.
        "challenge_token": create_challenge_token(
            challenge, user_id=current_user.id
        ),
    }
@router.put("/passwordless/disable")
async def passwordless_disable(
    data: PasswordlessDisableRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Finish the passkey ceremony and switch passwordless login off.

    The asserted credential must be one of the current user's own passkeys.
    An invalid or expired challenge and every later failure all answer 401.
    """
    # User-bound challenge token (single-use nonce V-01, cross-user binding S-01)
    challenge = verify_challenge_token(
        data.challenge_token, expected_user_id=current_user.id
    )
    if challenge is None:
        raise HTTPException(status_code=401, detail="Invalid or expired challenge")

    # rawId via the shared helper (S-02)
    raw_id_b64 = extract_credential_raw_id(data.credential)
    if not raw_id_b64:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Scope the lookup to the caller's own credentials (IDOR prevention)
    owned_stmt = select(PasskeyCredential).where(
        PasskeyCredential.credential_id == raw_id_b64,
        PasskeyCredential.user_id == current_user.id,
    )
    passkey = (await db.execute(owned_stmt)).scalar_one_or_none()
    if not passkey:
        raise HTTPException(status_code=401, detail="Authentication failed")

    try:
        assertion = verify_authentication_response_svc(
            credential_json=data.credential,
            challenge=challenge,
            credential_public_key=base64url_to_bytes(passkey.public_key),
            credential_current_sign_count=passkey.sign_count,
        )
    except Exception as e:
        logger.warning(
            "Passwordless disable: auth verification failed for user %s: %s",
            current_user.id, e,
        )
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Record the use of the credential, then flip the flag.
    passkey.sign_count = assertion.new_sign_count
    passkey.last_used_at = datetime.now()
    current_user.passwordless_enabled = False

    await log_audit_event(
        db,
        action="passkey.passwordless_disabled",
        actor_id=current_user.id,
        ip=get_client_ip(request),
    )
    await db.commit()
    return {"passwordless_enabled": False}
# ---------------------------------------------------------------------------
# Management endpoints (authenticated)
# ---------------------------------------------------------------------------
@router.get("/")
async def list_passkeys(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the current user's passkeys, most recently created first."""

    def _stamp(value):
        # Timestamps go out as strings; unset values stay None.
        return str(value) if value else None

    query = (
        select(PasskeyCredential)
        .where(PasskeyCredential.user_id == current_user.id)
        .order_by(PasskeyCredential.created_at.desc())
    )
    passkeys = (await db.execute(query)).scalars().all()

    listing = []
    for passkey in passkeys:
        listing.append(
            {
                "id": passkey.id,
                "name": passkey.name,
                "created_at": _stamp(passkey.created_at),
                "last_used_at": _stamp(passkey.last_used_at),
                "backed_up": passkey.backed_up,
            }
        )
    return listing
@router.delete("/{credential_id}")
async def delete_passkey(
    request: Request,
    credential_id: int = Path(ge=1, le=2147483647),
    data: PasskeyDeleteRequest = ...,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove one of the caller's passkeys after password confirmation (S-06).

    Answers 401 for a wrong password, 404 when the id is not one of the
    caller's passkeys, and 409 when passwordless login is enabled and the
    caller has 2 or fewer passkeys.
    """
    password_ok, upgraded_hash = await averify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="Invalid password")
    if upgraded_hash:
        current_user.password_hash = upgraded_hash

    # Match on both row id and owner (IDOR prevention)
    owned_stmt = select(PasskeyCredential).where(
        PasskeyCredential.id == credential_id,
        PasskeyCredential.user_id == current_user.id,
    )
    target = (await db.execute(owned_stmt)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Passkey not found")

    # Passwordless accounts must keep at least 2 passkeys after the delete.
    if current_user.passwordless_enabled:
        count_stmt = (
            select(func.count())
            .select_from(PasskeyCredential)
            .where(PasskeyCredential.user_id == current_user.id)
        )
        owned_total = (await db.execute(count_stmt)).scalar_one()
        if owned_total <= 2:
            raise HTTPException(
                status_code=409,
                detail="Cannot delete: passwordless requires at least 2 passkeys",
            )

    # Keep the name for the audit record before the row is deleted.
    removed_name = target.name
    await db.delete(target)
    await log_audit_event(
        db,
        action="passkey.deleted",
        actor_id=current_user.id,
        detail={"credential_name": removed_name, "credential_db_id": credential_id},
        ip=get_client_ip(request),
    )
    await db.commit()
    return {"message": "Passkey removed"}