UMBRA/backend/app/routers/passkeys.py
Kyle Pope 9234880648 Fix SyntaxError: reorder delete_passkey params
Move `request: Request` (no default) before parameters with defaults
to fix 'parameter without default follows parameter with default'.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 23:05:31 +08:00

444 lines
16 KiB
Python

"""
Passkey (WebAuthn/FIDO2) router.
Endpoints (all under /api/auth/passkeys — registered in main.py):
POST /register/begin — Start passkey registration (auth + password required)
POST /register/complete — Complete registration ceremony (auth required)
POST /login/begin — Start passkey authentication (public, CSRF-exempt)
POST /login/complete — Complete authentication ceremony (public, CSRF-exempt)
GET / — List registered passkeys (auth required)
DELETE /{id} — Remove a passkey (auth + password required)
Security:
- Challenge tokens signed with itsdangerous (60s TTL, single-use nonce)
- Registration binds challenge to user_id, validated on complete (S-01)
- Registration requires password re-entry (V-02)
- Generic 401 on all auth failures (no credential enumeration)
- Constant-time response on login/begin (V-03)
- Failed passkey logins increment shared lockout counter
- Passkey login bypasses TOTP (passkey IS 2FA)
"""
import asyncio
import json
import logging
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Path, Request, Response
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.database import get_db
from app.models.user import User
from app.models.passkey_credential import PasskeyCredential
from app.routers.auth import get_current_user
from app.services.audit import get_client_ip, log_audit_event
from app.services.auth import averify_password_with_upgrade
from app.services.session import (
create_db_session,
set_session_cookie,
check_account_lockout,
record_failed_login,
record_successful_login,
)
from app.services.passkey import (
create_challenge_token,
verify_challenge_token,
build_registration_options,
verify_registration as verify_registration_response_svc,
build_authentication_options,
verify_authentication as verify_authentication_response_svc,
)
from webauthn.helpers import bytes_to_base64url, base64url_to_bytes
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router that every endpoint below attaches to via its @router.<method> decorator.
# Per the module docstring, it is mounted under /api/auth/passkeys in main.py.
router = APIRouter()
# ---------------------------------------------------------------------------
# Request/Response schemas
# ---------------------------------------------------------------------------
# Request body for POST /register/begin: the caller re-enters their password.
class PasskeyRegisterBeginRequest(BaseModel):
    # Unknown fields are rejected rather than ignored.
    model_config = ConfigDict(extra="forbid")

    # Required; at most 128 characters.
    password: str = Field(..., max_length=128)
# Request body for POST /register/complete.
class PasskeyRegisterCompleteRequest(BaseModel):
    # Unknown fields are rejected rather than ignored.
    model_config = ConfigDict(extra="forbid")

    # JSON-encoded credential produced by the browser (parsed with json.loads
    # by the handler); required, at most 8192 characters.
    credential: str = Field(..., max_length=8192)
    # Signed token returned by /register/begin; required, at most 2048 characters.
    challenge_token: str = Field(..., max_length=2048)
    # User-chosen label for the passkey; required, 1-100 characters.
    name: str = Field(..., min_length=1, max_length=100)
# Request body for POST /login/begin.
class PasskeyLoginBeginRequest(BaseModel):
    # Unknown fields are rejected rather than ignored.
    model_config = ConfigDict(extra="forbid")

    # Optional; when supplied the handler looks up that user's credentials.
    # At most 50 characters.
    username: str | None = Field(default=None, max_length=50)
# Request body for POST /login/complete.
class PasskeyLoginCompleteRequest(BaseModel):
    # Unknown fields are rejected rather than ignored.
    model_config = ConfigDict(extra="forbid")

    # JSON-encoded assertion produced by the browser; required, at most 8192 characters.
    credential: str = Field(..., max_length=8192)
    # Signed token returned by /login/begin; required, at most 2048 characters.
    challenge_token: str = Field(..., max_length=2048)
# Request body for DELETE /{credential_id}: password confirmation.
class PasskeyDeleteRequest(BaseModel):
    # Unknown fields are rejected rather than ignored.
    model_config = ConfigDict(extra="forbid")

    # Required; at most 128 characters.
    password: str = Field(..., max_length=128)
# ---------------------------------------------------------------------------
# Registration endpoints (authenticated)
# ---------------------------------------------------------------------------
@router.post("/register/begin")
async def passkey_register_begin(
    data: PasskeyRegisterBeginRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Start passkey registration. Requires password re-entry (V-02)."""
    # Flow:
    #   1. Re-verify the caller's password (401 "Invalid password" on mismatch).
    #   2. Persist an upgraded password hash if the verifier returned one.
    #   3. Build registration options that exclude the user's existing
    #      credentials, and sign the challenge into a user-bound token.
    # Returns {"options": <dict>, "challenge_token": <str>}.

    # Snapshot the identity fields before any commit. Elsewhere this file
    # extracts ORM data BEFORE commit (ORM expiry rule); reading them here
    # keeps the code below from touching a possibly expired instance after
    # the hash-upgrade commit.
    user_id = current_user.id
    username = current_user.username

    # V-02: Verify password before allowing registration
    valid, new_hash = await averify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not valid:
        raise HTTPException(status_code=401, detail="Invalid password")
    if new_hash:
        # The verifier handed back a replacement hash; store it right away.
        current_user.password_hash = new_hash
        await db.commit()

    # Load existing credential IDs for exclusion. They are stored as
    # base64url text and decoded to bytes for the options builder.
    result = await db.execute(
        select(PasskeyCredential.credential_id).where(
            PasskeyCredential.user_id == user_id
        )
    )
    existing_ids = [base64url_to_bytes(row[0]) for row in result.all()]

    options_json, challenge = build_registration_options(
        user_id=user_id,
        username=username,
        existing_credential_ids=existing_ids,
    )
    # Bind the challenge to this user so /register/complete can cross-check it.
    token = create_challenge_token(challenge, user_id=user_id)
    return {
        "options": json.loads(options_json),
        "challenge_token": token,
    }
def _transports_json_from_credential(credential_json: str) -> str | None:
    """Return the browser-reported transport hints as a JSON string, or None.

    The hints live at ``response.transports`` in the credential JSON sent by
    the client. Extraction is best-effort: anything that is not a JSON object
    carrying a list of strings at that path yields None instead of raising.
    """
    try:
        parsed = json.loads(credential_json)
    except (ValueError, RecursionError):
        # Malformed or pathologically nested JSON: no usable hints.
        return None
    if not isinstance(parsed, dict):
        return None
    response_part = parsed.get("response")
    if not isinstance(response_part, dict):
        return None
    transports = response_part.get("transports")
    # Client-controlled data: only persist the expected shape, because
    # /login/begin later decodes this value and feeds it to the options builder.
    if not isinstance(transports, list):
        return None
    if not all(isinstance(item, str) for item in transports):
        return None
    return json.dumps(transports)


@router.post("/register/complete")
async def passkey_register_complete(
    data: PasskeyRegisterCompleteRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Complete passkey registration ceremony."""
    # Raises:
    #   401 "Invalid or expired challenge"    - challenge token rejected
    #   400 "Registration verification failed" - verifier raised
    #   409 "Credential already registered"    - credential_id already stored
    # Returns {"id", "name", "created_at", "backed_up"} for the new passkey.

    # Verify challenge token — cross-check user binding (S-01) + single-use nonce (V-01)
    challenge = verify_challenge_token(
        data.challenge_token, expected_user_id=current_user.id
    )
    if challenge is None:
        raise HTTPException(status_code=401, detail="Invalid or expired challenge")

    try:
        verified = verify_registration_response_svc(
            credential_json=data.credential,
            challenge=challenge,
        )
    except Exception as e:
        # Any verifier failure maps to a generic 400; the reason is only logged.
        logger.warning("Passkey registration verification failed: %s", e)
        raise HTTPException(
            status_code=400, detail="Registration verification failed"
        ) from e

    # Credential IDs are stored as base64url text.
    credential_id_b64 = bytes_to_base64url(verified.credential_id)

    # Check for duplicate (race condition safety)
    existing = await db.execute(
        select(PasskeyCredential).where(
            PasskeyCredential.credential_id == credential_id_b64
        )
    )
    if existing.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="Credential already registered")

    # Transports come from the browser response, not from the verification
    # result, so they are parsed out of the credential JSON.
    transports_json = _transports_json_from_credential(data.credential)

    # Determine backup state from py_webauthn flags (False when not exposed).
    backed_up = getattr(verified, "credential_backed_up", False)

    new_credential = PasskeyCredential(
        user_id=current_user.id,
        credential_id=credential_id_b64,
        public_key=bytes_to_base64url(verified.credential_public_key),
        sign_count=verified.sign_count,
        name=data.name,
        transports=transports_json,
        backed_up=backed_up,
    )
    db.add(new_credential)

    # B-02: If user has mfa_enforce_pending, clear it (passkey = MFA)
    if current_user.mfa_enforce_pending:
        current_user.mfa_enforce_pending = False

    # Extract response data BEFORE commit (ORM expiry rule)
    response_data = {
        "id": None,  # filled in after flush
        "name": new_credential.name,
        "created_at": None,  # filled in after flush
        "backed_up": backed_up,
    }
    await db.flush()
    response_data["id"] = new_credential.id
    response_data["created_at"] = (
        str(new_credential.created_at) if new_credential.created_at else None
    )

    await log_audit_event(
        db, action="passkey.registered", actor_id=current_user.id,
        detail={"credential_name": data.name},
        ip=get_client_ip(request),
    )
    await db.commit()
    return response_data
# ---------------------------------------------------------------------------
# Authentication endpoints (unauthenticated — CSRF-exempt)
# ---------------------------------------------------------------------------
@router.post("/login/begin")
async def passkey_login_begin(
    data: PasskeyLoginBeginRequest,
    db: AsyncSession = Depends(get_db),
):
    """Start passkey authentication. CSRF-exempt, public endpoint."""
    # Returns {"options": <dict>, "challenge_token": <str>}. When a username is
    # given and that user has passkeys, their credential IDs and transports are
    # passed to the options builder; otherwise None is passed.
    allow_list = None

    if data.username:
        # Usernames are matched lower-cased with surrounding whitespace removed.
        normalized = data.username.lower().strip()
        user_lookup = await db.execute(
            select(User).where(User.username == normalized)
        )
        user = user_lookup.scalar_one_or_none()

        if user:
            stored = await db.execute(
                select(
                    PasskeyCredential.credential_id,
                    PasskeyCredential.transports,
                ).where(PasskeyCredential.user_id == user.id)
            )
            # (credential id bytes, transports list or None) per stored passkey;
            # an empty result collapses back to None.
            allow_list = [
                (base64url_to_bytes(row[0]), json.loads(row[1]) if row[1] else None)
                for row in stored.all()
            ] or None

    # V-03: Generate options regardless of whether user exists or has passkeys.
    # NOTE(review): the credential query above only runs for known usernames —
    # confirm the constant-time goal still holds in practice.
    options_json, challenge = build_authentication_options(
        credential_ids_and_transports=allow_list,
    )
    # No user binding here: the login challenge token is created without a user_id.
    challenge_token = create_challenge_token(challenge)

    return {
        "options": json.loads(options_json),
        "challenge_token": challenge_token,
    }
@router.post("/login/complete")
async def passkey_login_complete(
    data: PasskeyLoginCompleteRequest,
    request: Request,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """Complete passkey authentication. CSRF-exempt, public endpoint."""
    # Every failure raised directly by this handler is the same generic
    # 401 "Authentication failed" (no credential enumeration).
    # check_account_lockout may raise on its own terms.
    # On success: updates the credential's sign count / last-used time, opens a
    # session, sets the session cookie and returns {"authenticated": True}
    # (plus "must_change_password": True when that flag is set on the user).

    # Verify challenge token (60s TTL, single-use nonce V-01)
    challenge = verify_challenge_token(data.challenge_token)
    if challenge is None:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Parse credential_id from browser response. The payload is untrusted:
    # valid JSON that is not an object (e.g. "[]") or a non-string id must end
    # in the generic 401 rather than an unhandled exception.
    try:
        cred_data = json.loads(data.credential)
    except (ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError; RecursionError covers
        # pathologically nested input.
        raise HTTPException(status_code=401, detail="Authentication failed") from None
    raw_id_b64 = None
    if isinstance(cred_data, dict):
        raw_id_b64 = cred_data.get("rawId") or cred_data.get("id")
    if not isinstance(raw_id_b64, str) or not raw_id_b64:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Look up credential + user in a single JOIN query (W-1 perf fix)
    result = await db.execute(
        select(PasskeyCredential, User)
        .join(User, User.id == PasskeyCredential.user_id)
        .where(PasskeyCredential.credential_id == raw_id_b64)
    )
    row = result.one_or_none()
    if not row:
        raise HTTPException(status_code=401, detail="Authentication failed")
    credential, user = row.tuple()

    # Check account lockout (C-03)
    await check_account_lockout(user)

    # Check active status (C-03)
    if not user.is_active:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Verify the authentication response
    try:
        verified = verify_authentication_response_svc(
            credential_json=data.credential,
            challenge=challenge,
            credential_public_key=base64url_to_bytes(credential.public_key),
            credential_current_sign_count=credential.sign_count,
        )
    except Exception as e:
        logger.warning(
            "Passkey authentication verification failed for user %s: %s", user.id, e
        )
        # Increment failed login counter (shared with password auth)
        await record_failed_login(db, user)
        await log_audit_event(
            db, action="passkey.login_failed", actor_id=user.id,
            detail={"reason": "verification_failed"},
            ip=get_client_ip(request),
        )
        # Commit so the failure counter and audit row are persisted.
        await db.commit()
        raise HTTPException(status_code=401, detail="Authentication failed") from e

    # Update sign count (log anomaly but don't fail — S-05)
    new_sign_count = verified.new_sign_count
    if new_sign_count < credential.sign_count and credential.sign_count > 0:
        logger.warning(
            "Sign count anomaly for user %s credential %s: expected >= %d, got %d",
            user.id, credential.id, credential.sign_count, new_sign_count,
        )
        await log_audit_event(
            db, action="passkey.sign_count_anomaly", actor_id=user.id,
            detail={
                "credential_id": credential.id,
                "expected": credential.sign_count,
                "received": new_sign_count,
            },
            ip=get_client_ip(request),
        )
    credential.sign_count = new_sign_count
    # NOTE(review): naive local timestamp — confirm it matches the column's
    # timezone convention.
    credential.last_used_at = datetime.now()

    # Record successful login
    await record_successful_login(db, user)

    # Create session (shared service — enforces session cap)
    client_ip = get_client_ip(request)
    user_agent = request.headers.get("user-agent")
    _, token = await create_db_session(db, user, client_ip, user_agent)
    set_session_cookie(response, token)

    # B-01: Handle must_change_password and mfa_enforce_pending flags
    result_data: dict = {"authenticated": True}
    if user.must_change_password:
        result_data["must_change_password"] = True

    # Passkey satisfies MFA — if mfa_enforce_pending, clear it (before commit)
    if user.mfa_enforce_pending:
        user.mfa_enforce_pending = False

    await log_audit_event(
        db, action="passkey.login_success", actor_id=user.id,
        detail={"credential_name": credential.name},
        ip=client_ip,
    )
    await db.commit()
    return result_data
# ---------------------------------------------------------------------------
# Management endpoints (authenticated)
# ---------------------------------------------------------------------------
@router.get("/")
async def list_passkeys(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List all passkeys for the current user."""
    # Only the caller's own credentials are selected, newest first.
    query = (
        select(PasskeyCredential)
        .where(PasskeyCredential.user_id == current_user.id)
        .order_by(PasskeyCredential.created_at.desc())
    )
    owned = (await db.execute(query)).scalars().all()

    listing = []
    for passkey in owned:
        # Timestamps are rendered with str(); a missing value is reported as None.
        created = str(passkey.created_at) if passkey.created_at else None
        last_used = str(passkey.last_used_at) if passkey.last_used_at else None
        listing.append(
            {
                "id": passkey.id,
                "name": passkey.name,
                "created_at": created,
                "last_used_at": last_used,
                "backed_up": passkey.backed_up,
            }
        )
    return listing
@router.delete("/{credential_id}")
async def delete_passkey(
    request: Request,
    credential_id: int = Path(ge=1, le=2147483647),
    data: PasskeyDeleteRequest = ...,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete a passkey. Requires password confirmation (S-06)."""
    # Raises 401 "Invalid password" on a bad password and 404 "Passkey not found"
    # when no passkey with this id belongs to the caller.
    password_ok, upgraded_hash = await averify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="Invalid password")
    if upgraded_hash:
        # Staged here; written by the commit at the end of the handler.
        current_user.password_hash = upgraded_hash

    # Filter on both the row id and the owner so another user's passkey can
    # never be matched (IDOR prevention).
    owned_query = select(PasskeyCredential).where(
        PasskeyCredential.id == credential_id,
        PasskeyCredential.user_id == current_user.id,
    )
    target = (await db.execute(owned_query)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Passkey not found")

    # Keep the label for the audit record before the row is deleted.
    removed_name = target.name
    await db.delete(target)

    audit_detail = {"credential_name": removed_name, "credential_db_id": credential_id}
    await log_audit_event(
        db, action="passkey.deleted", actor_id=current_user.id,
        detail=audit_detail,
        ip=get_client_ip(request),
    )
    await db.commit()
    return {"message": "Passkey removed"}