Backend fixes: - session.py: record_failed/successful_login use flush() not commit() — callers own transaction boundary (BUG-2 atomicity fix) - auth.py: Add explicit commits after record_failed_login where callers raise immediately; add commit before TOTP mfa_token return path - passkeys.py: JOIN credential+user lookup in login/complete (W-1 perf) - passkeys.py: Move mfa_enforce_pending clear before main commit (S-2) - passkeys.py: Add Path(ge=1, le=2147483647) on DELETE endpoint (BUG-3) - auth.py: Switch has_passkeys from COUNT to EXISTS with LIMIT 1 (W-2) - passkey.py: Add single-worker nonce cache comment (H-1) Frontend fixes: - PasskeySection: emerald→green badge colors (W-3 palette) - PasskeySection: text-[11px]/text-[10px]→text-xs (W-4 a11y minimum) - PasskeySection: Scope deleteMutation.isPending to per-item (W-5) - nginx.conf: Permissions-Policy publickey-credentials use (self) (H-2) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
444 lines
16 KiB
Python
444 lines
16 KiB
Python
"""
|
|
Passkey (WebAuthn/FIDO2) router.

Endpoints (all under /api/auth/passkeys — registered in main.py):

  POST   /register/begin    — Start passkey registration (auth + password required)
  POST   /register/complete — Complete registration ceremony (auth required)
  POST   /login/begin       — Start passkey authentication (public, CSRF-exempt)
  POST   /login/complete    — Complete authentication ceremony (public, CSRF-exempt)
  GET    /                  — List registered passkeys (auth required)
  DELETE /{id}              — Remove a passkey (auth + password required)

Security:
  - Challenge tokens signed with itsdangerous (60s TTL, single-use nonce)
  - Registration binds challenge to user_id, validated on complete (S-01)
  - Registration requires password re-entry (V-02)
  - Generic 401 on all auth failures (no credential enumeration)
  - Constant-time response on login/begin (V-03)
  - Failed passkey logins increment shared lockout counter
  - Passkey login bypasses TOTP (passkey IS 2FA)
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Path, Request, Response
|
|
from pydantic import BaseModel, ConfigDict, Field
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
from app.models.passkey_credential import PasskeyCredential
|
|
from app.routers.auth import get_current_user
|
|
from app.services.audit import get_client_ip, log_audit_event
|
|
from app.services.auth import averify_password_with_upgrade
|
|
from app.services.session import (
|
|
create_db_session,
|
|
set_session_cookie,
|
|
check_account_lockout,
|
|
record_failed_login,
|
|
record_successful_login,
|
|
)
|
|
from app.services.passkey import (
|
|
create_challenge_token,
|
|
verify_challenge_token,
|
|
build_registration_options,
|
|
verify_registration as verify_registration_response_svc,
|
|
build_authentication_options,
|
|
verify_authentication as verify_authentication_response_svc,
|
|
)
|
|
from webauthn.helpers import bytes_to_base64url, base64url_to_bytes
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router that collects every passkey endpoint declared in this module.
# Per the module docstring it is mounted under /api/auth/passkeys in main.py.
router = APIRouter()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Request/Response schemas
# ---------------------------------------------------------------------------
|
|
|
|
class PasskeyRegisterBeginRequest(BaseModel):
    """Body of ``POST /register/begin``.

    Unknown fields are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Account password, re-entered to authorise registration (V-02).
    password: str = Field(..., max_length=128)
|
|
|
|
|
|
class PasskeyRegisterCompleteRequest(BaseModel):
    """Body of ``POST /register/complete``.

    Unknown fields are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # JSON-encoded credential produced by the browser's WebAuthn call.
    credential: str = Field(..., max_length=8192)
    # Signed token handed out by /register/begin.
    challenge_token: str = Field(..., max_length=2048)
    # Human-readable label stored alongside the passkey.
    name: str = Field(..., min_length=1, max_length=100)
|
|
|
|
|
|
class PasskeyLoginBeginRequest(BaseModel):
    """Body of ``POST /login/begin``.

    Unknown fields are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Optional: when given, the handler looks up that user's credentials;
    # when omitted, authentication options are built without a credential list.
    username: str | None = Field(default=None, max_length=50)
|
|
|
|
|
|
class PasskeyLoginCompleteRequest(BaseModel):
    """Body of ``POST /login/complete``.

    Unknown fields are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # JSON-encoded assertion produced by the browser's WebAuthn call.
    credential: str = Field(..., max_length=8192)
    # Signed token handed out by /login/begin.
    challenge_token: str = Field(..., max_length=2048)
|
|
|
|
|
|
class PasskeyDeleteRequest(BaseModel):
    """Body of ``DELETE /{credential_id}``.

    Unknown fields are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Account password, re-entered to confirm the deletion (S-06).
    password: str = Field(..., max_length=128)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Registration endpoints (authenticated)
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/register/begin")
async def passkey_register_begin(
    data: PasskeyRegisterBeginRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Start passkey registration. Requires password re-entry (V-02).

    Verifies the supplied password against the current user's stored hash,
    persists an upgraded hash when the verifier returns one, then builds
    WebAuthn registration options that exclude the credentials the user has
    already registered.

    Returns:
        A dict with ``options`` (the registration options decoded from JSON)
        and ``challenge_token`` (a signed token created with the user's id).

    Raises:
        HTTPException: 401 when the password does not verify.
    """
    # V-02: Verify password before allowing registration
    valid, new_hash = await averify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not valid:
        raise HTTPException(status_code=401, detail="Invalid password")

    # Read everything needed from the ORM object BEFORE a possible commit.
    # If the session expires instances on commit (the SQLAlchemy default),
    # touching these attributes afterwards would require an implicit reload,
    # which is not available from async code.
    user_id = current_user.id
    username = current_user.username

    if new_hash:
        current_user.password_hash = new_hash
        await db.commit()

    # Load existing credential IDs for exclusion
    result = await db.execute(
        select(PasskeyCredential.credential_id).where(
            PasskeyCredential.user_id == user_id
        )
    )
    existing_ids = [base64url_to_bytes(row[0]) for row in result.all()]

    options_json, challenge = build_registration_options(
        user_id=user_id,
        username=username,
        existing_credential_ids=existing_ids,
    )
    # Bind the challenge to this user so /register/complete can cross-check it.
    token = create_challenge_token(challenge, user_id=user_id)

    return {
        "options": json.loads(options_json),
        "challenge_token": token,
    }
|
|
|
|
|
|
@router.post("/register/complete")
async def passkey_register_complete(
    data: PasskeyRegisterCompleteRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Complete passkey registration ceremony.

    Validates the challenge token against the current user, verifies the
    browser's registration response, stores the new credential, clears
    ``mfa_enforce_pending`` if it was set, writes an audit event and commits
    everything in one transaction.

    Returns:
        A dict with the new credential's ``id``, ``name``, ``created_at``
        (stringified, or ``None``) and ``backed_up`` flag.

    Raises:
        HTTPException: 401 when the challenge token is rejected, 400 when the
            registration response fails verification, 409 when the credential
            id is already stored.
    """
    # Verify challenge token — cross-check user binding (S-01) + single-use nonce (V-01)
    challenge = verify_challenge_token(
        data.challenge_token, expected_user_id=current_user.id
    )
    if challenge is None:
        raise HTTPException(status_code=401, detail="Invalid or expired challenge")

    try:
        verified = verify_registration_response_svc(
            credential_json=data.credential,
            challenge=challenge,
        )
    except Exception as e:
        logger.warning("Passkey registration verification failed: %s", e)
        raise HTTPException(
            status_code=400, detail="Registration verification failed"
        ) from e

    # Store credential
    credential_id_b64 = bytes_to_base64url(verified.credential_id)

    # Check for duplicate before inserting
    existing = await db.execute(
        select(PasskeyCredential).where(
            PasskeyCredential.credential_id == credential_id_b64
        )
    )
    if existing.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="Credential already registered")

    # Transport hints come from the browser response, so parse them out of the
    # credential JSON. This is best-effort: any unexpected shape simply leaves
    # the hints unset instead of failing the registration.
    transports_json = None
    try:
        cred_data = json.loads(data.credential)
    except json.JSONDecodeError:
        cred_data = None
    if isinstance(cred_data, dict):
        browser_response = cred_data.get("response")
        if isinstance(browser_response, dict) and "transports" in browser_response:
            transports_json = json.dumps(browser_response["transports"])

    # Determine backup state from py_webauthn flags
    backed_up = getattr(verified, 'credential_backed_up', False)

    new_credential = PasskeyCredential(
        user_id=current_user.id,
        credential_id=credential_id_b64,
        public_key=bytes_to_base64url(verified.credential_public_key),
        sign_count=verified.sign_count,
        name=data.name,
        transports=transports_json,
        backed_up=backed_up,
    )
    db.add(new_credential)

    # B-02: If user has mfa_enforce_pending, clear it (passkey = MFA)
    if current_user.mfa_enforce_pending:
        current_user.mfa_enforce_pending = False

    # Extract response data BEFORE commit (ORM expiry rule)
    response_data = {
        "id": None,  # will be set after flush
        "name": new_credential.name,
        "created_at": None,
        "backed_up": backed_up,
    }

    # Flush so the database assigns the primary key without ending the transaction.
    await db.flush()
    response_data["id"] = new_credential.id
    response_data["created_at"] = (
        str(new_credential.created_at) if new_credential.created_at else None
    )

    await log_audit_event(
        db, action="passkey.registered", actor_id=current_user.id,
        detail={"credential_name": data.name},
        ip=get_client_ip(request),
    )
    await db.commit()

    return response_data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Authentication endpoints (unauthenticated — CSRF-exempt)
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/login/begin")
async def passkey_login_begin(
    data: PasskeyLoginBeginRequest,
    db: AsyncSession = Depends(get_db),
):
    """Start passkey authentication. CSRF-exempt, public endpoint.

    When a username is supplied and that user has stored passkeys, their
    credential ids and transport hints are passed to the options builder;
    in every other case the builder receives ``None``.

    Returns:
        A dict with ``options`` (the authentication options decoded from
        JSON) and ``challenge_token`` (a signed token for the challenge).
    """
    allow_list = None

    if data.username:
        # Look up user's credentials for allowCredentials
        normalized_name = data.username.lower().strip()
        user_lookup = await db.execute(
            select(User).where(User.username == normalized_name)
        )
        account = user_lookup.scalar_one_or_none()
        if account:
            stored = await db.execute(
                select(
                    PasskeyCredential.credential_id,
                    PasskeyCredential.transports,
                ).where(PasskeyCredential.user_id == account.id)
            )
            found = stored.all()
            if found:
                # Each entry pairs the raw credential id with its decoded
                # transport list (None when no transports were stored).
                allow_list = [
                    (
                        base64url_to_bytes(found_row[0]),
                        json.loads(found_row[1]) if found_row[1] else None,
                    )
                    for found_row in found
                ]

    # V-03: Generate options regardless of whether user exists or has passkeys.
    # Identical response shape prevents timing enumeration.
    options_json, challenge = build_authentication_options(
        credential_ids_and_transports=allow_list,
    )
    signed_challenge = create_challenge_token(challenge)

    return {
        "options": json.loads(options_json),
        "challenge_token": signed_challenge,
    }
|
|
|
|
|
|
def _extract_credential_id(credential_json: str) -> str | None:
    """Return the credential id from a browser assertion, or None if unusable.

    The payload comes from an unauthenticated client, so every shape is
    checked: it must decode to a JSON object whose ``rawId`` (preferred) or
    ``id`` is a non-empty string.
    """
    try:
        payload = json.loads(credential_json)
    except (json.JSONDecodeError, RecursionError):
        return None
    if not isinstance(payload, dict):
        return None
    raw_id = payload.get("rawId") or payload.get("id")
    if not isinstance(raw_id, str) or not raw_id:
        return None
    return raw_id


@router.post("/login/complete")
async def passkey_login_complete(
    data: PasskeyLoginCompleteRequest,
    request: Request,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """Complete passkey authentication. CSRF-exempt, public endpoint.

    Validates the challenge token, resolves the credential and its owner,
    verifies the assertion, updates the credential's sign count and
    last-used time, records the successful login, creates a session and
    sets the session cookie, then commits.

    Returns:
        ``{"authenticated": True}``, plus ``"must_change_password": True``
        when that flag is set on the user.

    Raises:
        HTTPException: 401 with the generic detail "Authentication failed"
            for a rejected challenge token, an unparseable credential, an
            unknown credential, an inactive user or a failed verification.
            ``check_account_lockout`` may raise as well; its behaviour is
            defined in app.services.session.
    """
    # Verify challenge token (60s TTL, single-use nonce V-01)
    challenge = verify_challenge_token(data.challenge_token)
    if challenge is None:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Parse credential_id from browser response. Any malformed payload
    # (invalid JSON, non-object, missing or non-string id) gets the same
    # generic 401 as every other failure instead of an unhandled error.
    raw_id_b64 = _extract_credential_id(data.credential)
    if raw_id_b64 is None:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Look up credential + user in a single JOIN query (W-1 perf fix)
    result = await db.execute(
        select(PasskeyCredential, User)
        .join(User, User.id == PasskeyCredential.user_id)
        .where(PasskeyCredential.credential_id == raw_id_b64)
    )
    row = result.one_or_none()
    if not row:
        raise HTTPException(status_code=401, detail="Authentication failed")
    credential, user = row.tuple()

    # Check account lockout (C-03)
    await check_account_lockout(user)

    # Check active status (C-03)
    if not user.is_active:
        raise HTTPException(status_code=401, detail="Authentication failed")

    # Verify the authentication response
    try:
        verified = verify_authentication_response_svc(
            credential_json=data.credential,
            challenge=challenge,
            credential_public_key=base64url_to_bytes(credential.public_key),
            credential_current_sign_count=credential.sign_count,
        )
    except Exception as e:
        logger.warning(
            "Passkey authentication verification failed for user %s: %s",
            user.id, e,
        )
        # Increment failed login counter (shared with password auth)
        await record_failed_login(db, user)
        await log_audit_event(
            db, action="passkey.login_failed", actor_id=user.id,
            detail={"reason": "verification_failed"},
            ip=get_client_ip(request),
        )
        # Commit here because the request ends by raising right after.
        await db.commit()
        raise HTTPException(status_code=401, detail="Authentication failed") from e

    # Update sign count (log anomaly but don't fail — S-05)
    new_sign_count = verified.new_sign_count
    if new_sign_count < credential.sign_count and credential.sign_count > 0:
        logger.warning(
            "Sign count anomaly for user %s credential %s: expected >= %d, got %d",
            user.id, credential.id, credential.sign_count, new_sign_count,
        )
        await log_audit_event(
            db, action="passkey.sign_count_anomaly", actor_id=user.id,
            detail={
                "credential_id": credential.id,
                "expected": credential.sign_count,
                "received": new_sign_count,
            },
            ip=get_client_ip(request),
        )

    credential.sign_count = new_sign_count
    credential.last_used_at = datetime.now()

    # Record successful login
    await record_successful_login(db, user)

    # Create session (shared service — enforces session cap)
    client_ip = get_client_ip(request)
    user_agent = request.headers.get("user-agent")
    _, token = await create_db_session(db, user, client_ip, user_agent)
    set_session_cookie(response, token)

    # B-01: Handle must_change_password and mfa_enforce_pending flags
    result_data: dict = {"authenticated": True}
    if user.must_change_password:
        result_data["must_change_password"] = True
    # Passkey satisfies MFA — if mfa_enforce_pending, clear it (before commit)
    if user.mfa_enforce_pending:
        user.mfa_enforce_pending = False

    await log_audit_event(
        db, action="passkey.login_success", actor_id=user.id,
        detail={"credential_name": credential.name},
        ip=client_ip,
    )
    await db.commit()

    return result_data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Management endpoints (authenticated)
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/")
async def list_passkeys(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List all passkeys for the current user.

    Returns:
        A list of dicts ordered by ``created_at`` descending, each with
        ``id``, ``name``, ``created_at``, ``last_used_at`` (timestamps are
        stringified, or ``None`` when unset) and ``backed_up``.
    """
    newest_first = (
        select(PasskeyCredential)
        .where(PasskeyCredential.user_id == current_user.id)
        .order_by(PasskeyCredential.created_at.desc())
    )
    query_result = await db.execute(newest_first)

    listing = []
    for passkey in query_result.scalars().all():
        created = str(passkey.created_at) if passkey.created_at else None
        last_used = str(passkey.last_used_at) if passkey.last_used_at else None
        listing.append(
            {
                "id": passkey.id,
                "name": passkey.name,
                "created_at": created,
                "last_used_at": last_used,
                "backed_up": passkey.backed_up,
            }
        )
    return listing
|
|
|
|
|
|
@router.delete("/{credential_id}")
async def delete_passkey(
    # ``request`` has no default, so it must precede the defaulted parameters;
    # FastAPI binds handler arguments by name, so the order is otherwise free.
    request: Request,
    credential_id: int = Path(ge=1, le=2147483647),
    data: PasskeyDeleteRequest = ...,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete a passkey. Requires password confirmation (S-06).

    Verifies the supplied password, looks the credential up by database id
    restricted to the current user, deletes it, writes an audit event and
    commits. An upgraded password hash returned by the verifier is saved in
    the same commit.

    Returns:
        ``{"message": "Passkey removed"}``.

    Raises:
        HTTPException: 401 when the password does not verify, 404 when no
            credential with that id belongs to the current user.
    """
    # Verify password
    valid, new_hash = await averify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not valid:
        raise HTTPException(status_code=401, detail="Invalid password")
    if new_hash:
        current_user.password_hash = new_hash

    # Look up credential — verify ownership (IDOR prevention)
    result = await db.execute(
        select(PasskeyCredential).where(
            PasskeyCredential.id == credential_id,
            PasskeyCredential.user_id == current_user.id,
        )
    )
    credential = result.scalar_one_or_none()
    if not credential:
        raise HTTPException(status_code=404, detail="Passkey not found")

    # Keep the name for the audit record before the row is deleted.
    cred_name = credential.name
    await db.delete(credential)

    await log_audit_event(
        db, action="passkey.deleted", actor_id=current_user.id,
        detail={"credential_name": cred_name, "credential_db_id": credential_id},
        ip=get_client_ip(request),
    )
    await db.commit()

    return {"message": "Passkey removed"}
|