F-01 (passkeys.py): Add constant-time DB no-op on login/begin when username not found. Without it the absent credential-fetch query makes the "no user" path measurably faster, leaking username existence via timing. F-02 (session.py, auth.py, passkeys.py, totp.py): Change check_account_lockout from HTTP 423 to 401 — status-code analysis can no longer distinguish a locked account from an invalid credential. record_failed_login now returns remaining attempt count; callers use it for progressive UX warnings (<=3 attempts left, and on the locking attempt) without changing the 401 status code visible to attackers. Session-lock 423 path in get_current_user is unaffected. F-03 (nginx.conf): Replace set_real_ip_from 0.0.0.0/0 with RFC 1918 ranges (172.16.0.0/12, 10.0.0.0/8) to prevent external clients from spoofing X-Forwarded-For to bypass rate limiting. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
521 lines
19 KiB
Python
521 lines
19 KiB
Python
"""
|
|
TOTP MFA router.
|
|
|
|
Endpoints (all under /api/auth — registered in main.py with prefix="/api/auth"):
|
|
|
|
POST /totp/setup — Generate secret + QR + backup codes (auth required)
|
|
POST /totp/confirm — Verify first code, enable TOTP (auth required)
|
|
POST /totp-verify — MFA challenge: mfa_token + TOTP/backup code, issues session
|
|
POST /totp/disable — Disable TOTP (auth required, needs password + code)
|
|
POST /totp/backup-codes/regenerate — Regenerate backup codes (auth required, needs password + code)
|
|
GET /totp/status — { enabled, backup_codes_remaining } (auth required)
|
|
|
|
Security:
|
|
- TOTP secrets encrypted at rest (Fernet/AES-128-CBC, key derived from SECRET_KEY)
|
|
- Replay prevention via totp_usage table (unique on user_id+code+window)
|
|
- Backup codes hashed with Argon2id, shown plaintext once only
|
|
- Failed TOTP attempts increment user.failed_login_count (shared lockout counter)
|
|
- totp-verify uses mfa_token (not session cookie) — user is not yet authenticated
|
|
"""
|
|
import asyncio
|
|
import secrets
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, Response
|
|
from pydantic import BaseModel, ConfigDict, Field
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, delete
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
from app.models.totp_usage import TOTPUsage
|
|
from app.models.backup_code import BackupCode
|
|
from app.routers.auth import get_current_user
|
|
from app.services.audit import get_client_ip
|
|
from app.services.auth import (
|
|
averify_password_with_upgrade,
|
|
verify_mfa_token,
|
|
verify_mfa_enforce_token,
|
|
)
|
|
from app.services.session import (
|
|
create_db_session,
|
|
set_session_cookie,
|
|
check_account_lockout,
|
|
record_failed_login,
|
|
record_successful_login,
|
|
)
|
|
from app.services.totp import (
|
|
generate_totp_secret,
|
|
encrypt_totp_secret,
|
|
decrypt_totp_secret,
|
|
get_totp_uri,
|
|
verify_totp_code,
|
|
generate_qr_base64,
|
|
generate_backup_codes,
|
|
)
|
|
|
|
|
|
# Argon2id for backup code hashing — treat each code like a password
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router holding every TOTP endpoint defined below (per the module docstring it
# is registered in main.py with prefix="/api/auth").
router = APIRouter()

# Dedicated Argon2id hasher for backup-code hashes; the parameters are intended
# to mirror the ones used for password hashing.
_ph = PasswordHasher(time_cost=2, memory_cost=19456, parallelism=1, hash_len=32, salt_len=16)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Request schemas
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TOTPConfirmRequest(BaseModel):
    # Request body for POST /totp/confirm.
    # Unknown fields are rejected rather than silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Code read from the authenticator app; must be exactly six characters.
    code: str = Field(
        min_length=6,
        max_length=6,
    )
|
|
|
|
|
|
class TOTPVerifyRequest(BaseModel):
    # Request body for POST /totp-verify.  Either `code` or `backup_code` is
    # expected; the handler rejects requests that carry neither.
    model_config = ConfigDict(extra="forbid")

    # Short-lived MFA challenge token handed out by the login step.
    mfa_token: str = Field(max_length=256)

    # 6-digit TOTP code
    code: Optional[str] = Field(
        default=None,
        min_length=6,
        max_length=6,
    )

    # XXXX-XXXX backup code
    backup_code: Optional[str] = Field(
        default=None,
        max_length=9,
    )
|
|
|
|
|
|
class TOTPDisableRequest(BaseModel):
    # Request body for POST /totp/disable: both factors must be re-proven.
    model_config = ConfigDict(extra="forbid")

    # Account password (length-capped before it reaches the hasher).
    password: str = Field(max_length=128)

    # Current TOTP code required to disable
    code: str = Field(
        min_length=6,
        max_length=6,
    )
|
|
|
|
|
|
class BackupCodesRegenerateRequest(BaseModel):
    # Request body for POST /totp/backup-codes/regenerate.
    model_config = ConfigDict(extra="forbid")

    # Account password (length-capped before it reaches the hasher).
    password: str = Field(max_length=128)

    # Current TOTP code required to regenerate
    code: str = Field(
        min_length=6,
        max_length=6,
    )
|
|
|
|
|
|
class EnforceSetupRequest(BaseModel):
    # Request body for POST /totp/enforce-setup.
    # Unknown fields are rejected rather than silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Token validated with verify_mfa_enforce_token by the handler.
    mfa_token: str = Field(
        max_length=256,
    )
|
|
|
|
|
|
class EnforceConfirmRequest(BaseModel):
    # Request body for POST /totp/enforce-confirm.
    model_config = ConfigDict(extra="forbid")

    # Token validated with verify_mfa_enforce_token by the handler.
    mfa_token: str = Field(max_length=256)

    # 6-digit TOTP code from authenticator app
    code: str = Field(
        min_length=6,
        max_length=6,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _store_backup_codes(db: AsyncSession, user_id: int, plaintext_codes: list[str]) -> None:
    """
    Persist a batch of backup codes for ``user_id``, storing only their hashes.

    Each plaintext code is hashed with the module's Argon2id hasher, staged as a
    BackupCode row, and the whole batch is committed at the end.
    """
    # AC-2: hashing is CPU-heavy, so it is pushed to the default executor
    # instead of running on the event loop.
    event_loop = asyncio.get_running_loop()
    for plaintext in plaintext_codes:
        hashed = await event_loop.run_in_executor(None, _ph.hash, plaintext)
        new_row = BackupCode(user_id=user_id, code_hash=hashed)
        db.add(new_row)

    await db.commit()
|
|
|
|
|
|
async def _delete_backup_codes(db: AsyncSession, user_id: int) -> None:
    """Remove every backup code row owned by ``user_id`` and commit."""
    purge_stmt = delete(BackupCode).where(BackupCode.user_id == user_id)
    await db.execute(purge_stmt)
    await db.commit()
|
|
|
|
|
|
async def _verify_backup_code(
    db: AsyncSession, user_id: int, submitted_code: str
) -> bool:
    """
    Try to redeem ``submitted_code`` against the user's unused backup codes.

    Every unused BackupCode row for ``user_id`` is checked with Argon2id
    verification.  The first row that verifies is stamped with ``used_at`` and
    committed, and True is returned.  If no row verifies, nothing is modified
    and False is returned.
    """
    unused_query = select(BackupCode).where(
        BackupCode.user_id == user_id,
        BackupCode.used_at.is_(None),
    )
    candidates = (await db.execute(unused_query)).scalars().all()

    # AC-2: Argon2id verification runs in the default executor so the event
    # loop is not blocked.
    event_loop = asyncio.get_running_loop()
    for candidate in candidates:
        try:
            is_match = await event_loop.run_in_executor(
                None, _ph.verify, candidate.code_hash, submitted_code
            )
        except (VerifyMismatchError, VerificationError, InvalidHashError):
            # Mismatch or unusable hash: move on to the next stored code.
            continue

        if not is_match:
            continue

        # Mark the code as consumed so it cannot be redeemed a second time.
        candidate.used_at = datetime.now()
        await db.commit()
        return True

    return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/totp/setup")
async def totp_setup(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Generate a new TOTP secret, QR code, and backup codes.
    Stores the encrypted secret with totp_enabled=False until confirmed.

    Idempotent: calling again before confirmation overwrites the unconfirmed secret,
    so browser refreshes mid-setup generate a fresh QR without error.

    Refuses to run when TOTP is already enabled.  Without this guard a call
    would overwrite the confirmed secret and flip totp_enabled back to False,
    i.e. switch MFA off without the password + code that /totp/disable demands.
    The enforcement variant of this endpoint applies the same check.

    Returns { secret, qr_code_base64, backup_codes } — the only time plaintext
    values are shown. The `secret` field is the raw base32 for manual entry.

    Raises:
        HTTPException(400): TOTP is already enabled for the current user.
    """
    # Only an *unconfirmed* enrolment may be (re)started from here.
    if current_user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is already enabled")

    # Generate new secret (idempotent — overwrite any existing unconfirmed secret)
    raw_secret = generate_totp_secret()
    encrypted_secret = encrypt_totp_secret(raw_secret)

    current_user.totp_secret = encrypted_secret
    current_user.totp_enabled = False  # Not enabled until /confirm called

    # Generate backup codes — hash before storage, return plaintext once
    plaintext_codes = generate_backup_codes(10)
    await _delete_backup_codes(db, current_user.id)  # Remove any previous unconfirmed codes
    await _store_backup_codes(db, current_user.id, plaintext_codes)

    await db.commit()

    # Build QR code from provisioning URI
    # NOTE(review): get_totp_uri is handed the *encrypted* secret; presumably it
    # decrypts internally the way verify_totp_code appears to — confirm.
    uri = get_totp_uri(encrypted_secret, current_user.username)
    qr_base64 = generate_qr_base64(uri)

    return {
        "secret": raw_secret,             # Raw base32 for manual authenticator entry
        "qr_code_base64": qr_base64,      # PNG QR code, data:image/png;base64,...
        "backup_codes": plaintext_codes,  # Shown once — user must save these
    }
|
|
|
|
|
|
@router.post("/totp/confirm")
async def totp_confirm(
    data: TOTPConfirmRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Finish TOTP enrolment by checking the first code from the authenticator app.

    Only valid after /setup has stored a secret and while totp_enabled is still
    False.  On success totp_enabled is set to True and committed.

    Raises:
        HTTPException(400): setup was never started, TOTP is already enabled,
            or the submitted code does not verify.
    """
    # Precondition: a pending secret must exist.
    if not current_user.totp_secret:
        raise HTTPException(status_code=400, detail="TOTP setup not started — call /setup first")

    # Precondition: enrolment must not already be complete.
    if current_user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is already enabled")

    # verify_totp_code yields None when the code matches no accepted window.
    if verify_totp_code(current_user.totp_secret, data.code) is None:
        raise HTTPException(status_code=400, detail="Invalid code — check your authenticator app time sync")

    current_user.totp_enabled = True
    await db.commit()

    return {"message": "TOTP enabled successfully"}
|
|
|
|
|
|
@router.post("/totp-verify")
async def totp_verify(
    data: TOTPVerifyRequest,
    request: Request,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """
    Second step of login for accounts with TOTP enabled.

    The caller presents the short-lived mfa_token obtained from the password
    step (no session cookie exists yet) together with either a 6-digit TOTP
    code or a backup recovery code.  When both are supplied the backup code is
    the one evaluated.

    On success a full session cookie is set and { authenticated: true } is
    returned.  Failed codes are reported to record_failed_login, sharing the
    lockout counter with password failures.
    """
    if not (data.code or data.backup_code):
        raise HTTPException(status_code=422, detail="Provide either 'code' or 'backup_code'")

    # Validate the MFA challenge token (5-minute TTL)
    user_id = verify_mfa_token(data.mfa_token)
    if user_id is None:
        raise HTTPException(status_code=401, detail="MFA session expired — please log in again")

    lookup = select(User).where(User.id == user_id, User.is_active == True)  # noqa: E712
    user = (await db.execute(lookup)).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=401, detail="User not found or inactive")

    if not (user.totp_enabled and user.totp_secret):
        raise HTTPException(status_code=400, detail="TOTP not configured for this account")

    # Lockout check comes before any code is evaluated (shared counter with
    # password failures).
    await check_account_lockout(user)

    # --- Backup code path ---
    if data.backup_code:
        candidate = data.backup_code.strip().upper()
        if not await _verify_backup_code(db, user.id, candidate):
            await _reject_mfa_attempt(db, user, "Invalid backup code")
        return await _complete_mfa_login(db, user, request, response)

    # --- TOTP code path ---
    matched_window = verify_totp_code(user.totp_secret, data.code)
    if matched_window is None:
        await _reject_mfa_attempt(db, user, "Invalid code")

    # Replay prevention — record (user_id, code, actual_matching_window); a
    # duplicate surfaces as an IntegrityError when flushed.
    db.add(TOTPUsage(user_id=user.id, code=data.code, window=matched_window))
    try:
        await db.flush()
    except IntegrityError:
        await db.rollback()
        raise HTTPException(status_code=401, detail="Code already used — wait for the next code")

    return await _complete_mfa_login(db, user, request, response)


async def _reject_mfa_attempt(db: AsyncSession, user: User, detail: str) -> None:
    """Record a failed MFA attempt, commit it, and always raise a 401."""
    attempts_left = await record_failed_login(db, user)
    await db.commit()
    if attempts_left == 0:
        # This failure was the one that locked the account.
        raise HTTPException(status_code=401, detail="Account temporarily locked. Try again in 30 minutes.")
    raise HTTPException(status_code=401, detail=detail)


async def _complete_mfa_login(db: AsyncSession, user: User, request: Request, response: Response) -> dict:
    """Record the successful login, create a session, set its cookie, and commit."""
    await record_successful_login(db, user)

    client_ip = get_client_ip(request)
    agent = request.headers.get("user-agent")
    _, session_token = await create_db_session(db, user, client_ip, agent)
    set_session_cookie(response, session_token)
    await db.commit()
    return {"authenticated": True}
|
|
|
|
|
|
@router.post("/totp/disable")
async def totp_disable(
    data: TOTPDisableRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Disable TOTP for the current user.
    Requires both current password AND a valid TOTP code as confirmation.
    Clears totp_secret, sets totp_enabled=False, and deletes all backup codes.

    Raises:
        HTTPException(400): TOTP is not enabled for the current user.
        HTTPException(401): the password or the TOTP code is invalid.
    """
    if not current_user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is not enabled")

    # Verify password (handles bcrypt→Argon2id upgrade transparently)
    # AC-2: async wrapper to avoid blocking event loop
    valid, new_hash = await averify_password_with_upgrade(data.password, current_user.password_hash)
    if not valid:
        raise HTTPException(status_code=401, detail="Invalid password")

    if new_hash:
        # The password has just been proven correct, so persist the upgraded
        # hash immediately.  This handler makes no later commit if the TOTP
        # check below raises.  Mirrors the backup-code regeneration endpoint.
        current_user.password_hash = new_hash
        await db.commit()

    # Verify TOTP code — both checks required for disable
    matched_window = verify_totp_code(current_user.totp_secret, data.code)
    if matched_window is None:
        raise HTTPException(status_code=401, detail="Invalid TOTP code")

    # All checks passed — disable TOTP
    current_user.totp_secret = None
    current_user.totp_enabled = False
    await _delete_backup_codes(db, current_user.id)
    await db.commit()

    return {"message": "TOTP disabled successfully"}
|
|
|
|
|
|
@router.post("/totp/backup-codes/regenerate")
async def regenerate_backup_codes(
    data: BackupCodesRegenerateRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Replace the user's backup recovery codes with 10 new ones.

    The caller must re-prove both the account password and a current TOTP code.
    All existing backup codes are deleted, and the new plaintext codes are
    returned in this response only.

    Raises:
        HTTPException(400): TOTP is not enabled for the current user.
        HTTPException(401): the password or the TOTP code is invalid.
    """
    if not current_user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is not enabled")

    # AC-2: password verification goes through the async wrapper so the event
    # loop is not blocked.
    password_ok, upgraded_hash = await averify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="Invalid password")

    # Persist an upgraded hash right away, independent of the TOTP check below.
    if upgraded_hash:
        current_user.password_hash = upgraded_hash
        await db.commit()

    if verify_totp_code(current_user.totp_secret, data.code) is None:
        raise HTTPException(status_code=401, detail="Invalid TOTP code")

    # Swap the old set for a fresh one.
    fresh_codes = generate_backup_codes(10)
    await _delete_backup_codes(db, current_user.id)
    await _store_backup_codes(db, current_user.id, fresh_codes)

    return {"backup_codes": fresh_codes}
|
|
|
|
|
|
@router.post("/totp/enforce-setup")
async def enforce_setup_totp(
    data: EnforceSetupRequest,
    db: AsyncSession = Depends(get_db),
):
    """
    Start TOTP enrolment for a user who is being forced to set up MFA.

    Invoked after login reports mfa_setup_required=True.  The caller is
    identified by the mfa_enforce_token instead of a session cookie, because no
    full session exists yet.

    Calling it again before confirmation replaces the pending secret and
    backup codes.  Returns { secret, qr_code_base64, backup_codes }.

    Raises:
        HTTPException(401): the token is invalid/expired, or the user is
            missing or inactive.
        HTTPException(400): enforcement is not pending, or TOTP is already
            enabled.
    """
    user_id = verify_mfa_enforce_token(data.mfa_token)
    if user_id is None:
        raise HTTPException(status_code=401, detail="Invalid or expired enforcement token — please log in again")

    lookup = select(User).where(User.id == user_id, User.is_active == True)  # noqa: E712
    user = (await db.execute(lookup)).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=401, detail="User not found or inactive")

    if not user.mfa_enforce_pending:
        raise HTTPException(status_code=400, detail="MFA enforcement is not pending for this account")

    if user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is already enabled for this account")

    # Fresh secret; any earlier unconfirmed one is simply overwritten.
    plain_secret = generate_totp_secret()
    sealed_secret = encrypt_totp_secret(plain_secret)
    user.totp_secret = sealed_secret
    user.totp_enabled = False  # stays off until enforce-confirm succeeds

    # New backup codes: only hashes are stored, plaintext is returned once.
    recovery_codes = generate_backup_codes(10)
    await _delete_backup_codes(db, user.id)
    await _store_backup_codes(db, user.id, recovery_codes)

    await db.commit()

    provisioning_uri = get_totp_uri(sealed_secret, user.username)

    return {
        "secret": plain_secret,
        "qr_code_base64": generate_qr_base64(provisioning_uri),
        "backup_codes": recovery_codes,
    }
|
|
|
|
|
|
@router.post("/totp/enforce-confirm")
async def enforce_confirm_totp(
    data: EnforceConfirmRequest,
    request: Request,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """
    Confirm TOTP setup during enforcement, clear the pending flag, issue a full session.

    Must be called after /totp/enforce-setup while totp_enabled is still False.
    On success: enables TOTP, clears mfa_enforce_pending, sets session cookie,
    returns { authenticated: true }.

    Raises:
        HTTPException(401): the token is invalid/expired, or the user is
            missing or inactive.
        HTTPException(400): enforcement is not pending, setup was never
            started, TOTP is already enabled, or the code does not verify.
    """
    user_id = verify_mfa_enforce_token(data.mfa_token)
    if user_id is None:
        raise HTTPException(status_code=401, detail="Invalid or expired enforcement token — please log in again")

    result = await db.execute(select(User).where(User.id == user_id, User.is_active == True))  # noqa: E712
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=401, detail="User not found or inactive")

    if not user.mfa_enforce_pending:
        raise HTTPException(status_code=400, detail="MFA enforcement is not pending for this account")

    if not user.totp_secret:
        raise HTTPException(status_code=400, detail="TOTP setup not started — call /totp/enforce-setup first")

    if user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is already enabled")

    # Verify the confirmation code
    matched_window = verify_totp_code(user.totp_secret, data.code)
    if matched_window is None:
        raise HTTPException(status_code=400, detail="Invalid code — check your authenticator app time sync")

    # Enable TOTP and clear the enforcement flag
    user.totp_enabled = True
    user.mfa_enforce_pending = False
    user.last_login_at = datetime.now()
    await db.commit()

    # Issue a full session (now uses shared session service with cap enforcement)
    ip = get_client_ip(request)
    user_agent = request.headers.get("user-agent")
    _, token = await create_db_session(db, user, ip, user_agent)
    set_session_cookie(response, token)

    # Commit again so the session created above is persisted by this handler,
    # matching what /totp-verify does after create_db_session.  Without it the
    # cookie could reference a session row that was never committed.
    # NOTE(review): harmless if create_db_session already commits internally.
    await db.commit()

    return {"authenticated": True}
|
|
|
|
|
|
@router.get("/totp/status")
async def totp_status(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Return TOTP enabled state and count of remaining (unused) backup codes.

    The count is 0 whenever TOTP is not enabled; no query is issued in that
    case.  Otherwise the unused codes are counted in the database instead of
    loading every row just to measure the list.
    """
    # Function-scope import: `func` is not among this module's top-level
    # SQLAlchemy imports.
    from sqlalchemy import func

    remaining = 0
    if current_user.totp_enabled:
        count_stmt = (
            select(func.count())
            .select_from(BackupCode)
            .where(
                BackupCode.user_id == current_user.id,
                BackupCode.used_at.is_(None),
            )
        )
        # COUNT(*) always yields exactly one row holding one integer.
        remaining = (await db.execute(count_stmt)).scalar_one()

    return {
        "enabled": current_user.totp_enabled,
        "backup_codes_remaining": remaining,
    }
|