""" TOTP MFA router. Endpoints (all under /api/auth — registered in main.py with prefix="/api/auth"): POST /totp/setup — Generate secret + QR + backup codes (auth required) POST /totp/confirm — Verify first code, enable TOTP (auth required) POST /totp-verify — MFA challenge: mfa_token + TOTP/backup code, issues session POST /totp/disable — Disable TOTP (auth required, needs password + code) POST /totp/backup-codes/regenerate — Regenerate backup codes (auth required, needs password + code) GET /totp/status — { enabled, backup_codes_remaining } (auth required) Security: - TOTP secrets encrypted at rest (Fernet/AES-128-CBC, key derived from SECRET_KEY) - Replay prevention via totp_usage table (unique on user_id+code+window) - Backup codes hashed with Argon2id, shown plaintext once only - Failed TOTP attempts increment user.failed_login_count (shared lockout counter) - totp-verify uses mfa_token (not session cookie) — user is not yet authenticated """ import asyncio import uuid import secrets import logging from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Request, Response from pydantic import BaseModel, ConfigDict, Field from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete from sqlalchemy.exc import IntegrityError from app.database import get_db from app.models.user import User from app.models.session import UserSession from app.models.totp_usage import TOTPUsage from app.models.backup_code import BackupCode from app.routers.auth import get_current_user, _set_session_cookie from app.services.audit import get_client_ip from app.services.auth import ( averify_password_with_upgrade, verify_mfa_token, verify_mfa_enforce_token, create_session_token, ) from app.services.totp import ( generate_totp_secret, encrypt_totp_secret, decrypt_totp_secret, get_totp_uri, verify_totp_code, generate_qr_base64, generate_backup_codes, ) from app.config import settings as app_settings # Argon2id for backup code hashing — treat each code like a password from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError logger = logging.getLogger(__name__) router = APIRouter() # Argon2id instance for backup code hashes (same params as password hashing) _ph = PasswordHasher( time_cost=2, memory_cost=19456, parallelism=1, hash_len=32, salt_len=16, ) # --------------------------------------------------------------------------- # Request schemas # --------------------------------------------------------------------------- class TOTPConfirmRequest(BaseModel): model_config = ConfigDict(extra="forbid") code: str = Field(min_length=6, max_length=6) class TOTPVerifyRequest(BaseModel): model_config = ConfigDict(extra="forbid") mfa_token: str = Field(max_length=256) code: Optional[str] = Field(None, min_length=6, max_length=6) # 6-digit TOTP code backup_code: Optional[str] = Field(None, max_length=9) # XXXX-XXXX backup code class TOTPDisableRequest(BaseModel): model_config = ConfigDict(extra="forbid") password: str = Field(max_length=128) code: str = Field(min_length=6, max_length=6) # Current TOTP code required to disable class BackupCodesRegenerateRequest(BaseModel): model_config = ConfigDict(extra="forbid") password: str = Field(max_length=128) code: str = Field(min_length=6, max_length=6) # Current TOTP code required to regenerate class EnforceSetupRequest(BaseModel): model_config = ConfigDict(extra="forbid") mfa_token: str = Field(max_length=256) class EnforceConfirmRequest(BaseModel): model_config = ConfigDict(extra="forbid") mfa_token: str = Field(max_length=256) code: str = Field(min_length=6, max_length=6) # 6-digit TOTP code from authenticator app # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- async def _store_backup_codes(db: AsyncSession, user_id: int, plaintext_codes: list[str]) -> None: """Hash and insert backup codes for the given user.""" # AC-2: Run Argon2id hashing in executor to avoid blocking event loop loop = asyncio.get_running_loop() for code in plaintext_codes: code_hash = await loop.run_in_executor(None, _ph.hash, code) db.add(BackupCode(user_id=user_id, code_hash=code_hash)) await db.commit() async def _delete_backup_codes(db: AsyncSession, user_id: int) -> None: """Delete all backup codes for a user.""" await db.execute(delete(BackupCode).where(BackupCode.user_id == user_id)) await db.commit() async def _verify_backup_code( db: AsyncSession, user_id: int, submitted_code: str ) -> bool: """ Check submitted backup code against all unused hashes for the user. On match, marks the code as used. Returns True if a valid unused code was found. Uses Argon2id verification — constant-time by design. """ result = await db.execute( select(BackupCode).where( BackupCode.user_id == user_id, BackupCode.used_at.is_(None), ) ) unused_codes = result.scalars().all() # AC-2: Run Argon2id verification in executor to avoid blocking event loop loop = asyncio.get_running_loop() for record in unused_codes: try: matched = await loop.run_in_executor(None, _ph.verify, record.code_hash, submitted_code) if matched: record.used_at = datetime.now() await db.commit() return True except (VerifyMismatchError, VerificationError, InvalidHashError): continue return False async def _create_full_session( db: AsyncSession, user: User, request: Request, ) -> str: """Create a UserSession row and return the signed cookie token.""" session_id = uuid.uuid4().hex expires_at = datetime.now() + timedelta(days=app_settings.SESSION_MAX_AGE_DAYS) ip = get_client_ip(request) user_agent = request.headers.get("user-agent") db_session = UserSession( id=session_id, user_id=user.id, expires_at=expires_at, ip_address=ip[:45] if ip else None, user_agent=(user_agent or "")[:255] if user_agent else None, ) db.add(db_session) await db.commit() return create_session_token(user.id, session_id) # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @router.post("/totp/setup") async def totp_setup( db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Generate a new TOTP secret, QR code, and backup codes. Stores the encrypted secret with totp_enabled=False until confirmed. Idempotent: calling again before confirmation overwrites the unconfirmed secret, so browser refreshes mid-setup generate a fresh QR without error. Returns { secret, qr_code_base64, backup_codes } — the only time plaintext values are shown. The `secret` field is the raw base32 for manual entry. """ # Generate new secret (idempotent — overwrite any existing unconfirmed secret) raw_secret = generate_totp_secret() encrypted_secret = encrypt_totp_secret(raw_secret) current_user.totp_secret = encrypted_secret current_user.totp_enabled = False # Not enabled until /confirm called # Generate backup codes — hash before storage, return plaintext once plaintext_codes = generate_backup_codes(10) await _delete_backup_codes(db, current_user.id) # Remove any previous unconfirmed codes await _store_backup_codes(db, current_user.id, plaintext_codes) await db.commit() # Build QR code from provisioning URI uri = get_totp_uri(encrypted_secret, current_user.username) qr_base64 = generate_qr_base64(uri) return { "secret": raw_secret, # Raw base32 for manual authenticator entry "qr_code_base64": qr_base64, # PNG QR code, data:image/png;base64,... "backup_codes": plaintext_codes, # Shown once — user must save these } @router.post("/totp/confirm") async def totp_confirm( data: TOTPConfirmRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Verify the first TOTP code from the authenticator app and enable TOTP. Must be called after /setup while totp_enabled is still False. """ if not current_user.totp_secret: raise HTTPException(status_code=400, detail="TOTP setup not started — call /setup first") if current_user.totp_enabled: raise HTTPException(status_code=400, detail="TOTP is already enabled") matched_window = verify_totp_code(current_user.totp_secret, data.code) if matched_window is None: raise HTTPException(status_code=400, detail="Invalid code — check your authenticator app time sync") current_user.totp_enabled = True await db.commit() return {"message": "TOTP enabled successfully"} @router.post("/totp-verify") async def totp_verify( data: TOTPVerifyRequest, request: Request, response: Response, db: AsyncSession = Depends(get_db), ): """ MFA challenge endpoint — called after a successful password login when TOTP is enabled. Accepts either a 6-digit TOTP code or a backup recovery code. Uses the short-lived mfa_token (from POST /login) instead of a session cookie because the user is not yet fully authenticated at this stage. On success: issues a full session cookie and returns { authenticated: true }. """ if not data.code and not data.backup_code: raise HTTPException(status_code=422, detail="Provide either 'code' or 'backup_code'") # Validate the MFA challenge token (5-minute TTL) user_id = verify_mfa_token(data.mfa_token) if user_id is None: raise HTTPException(status_code=401, detail="MFA session expired — please log in again") result = await db.execute(select(User).where(User.id == user_id, User.is_active == True)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=401, detail="User not found or inactive") if not user.totp_enabled or not user.totp_secret: raise HTTPException(status_code=400, detail="TOTP not configured for this account") # Check account lockout (shared counter with password failures) if user.locked_until and datetime.now() < user.locked_until: remaining = int((user.locked_until - datetime.now()).total_seconds() / 60) + 1 raise HTTPException( status_code=423, detail=f"Account locked. Try again in {remaining} minutes.", ) # --- Backup code path --- if data.backup_code: normalized = data.backup_code.strip().upper() valid = await _verify_backup_code(db, user.id, normalized) if not valid: user.failed_login_count += 1 if user.failed_login_count >= 10: user.locked_until = datetime.now() + timedelta(minutes=30) await db.commit() raise HTTPException(status_code=401, detail="Invalid backup code") # Backup code accepted — reset lockout counter and issue session user.failed_login_count = 0 user.locked_until = None user.last_login_at = datetime.now() await db.commit() token = await _create_full_session(db, user, request) _set_session_cookie(response, token) return {"authenticated": True} # --- TOTP code path --- matched_window = verify_totp_code(user.totp_secret, data.code) if matched_window is None: user.failed_login_count += 1 if user.failed_login_count >= 10: user.locked_until = datetime.now() + timedelta(minutes=30) await db.commit() raise HTTPException(status_code=401, detail="Invalid code") # Replay prevention — record (user_id, code, actual_matching_window) totp_record = TOTPUsage(user_id=user.id, code=data.code, window=matched_window) db.add(totp_record) try: await db.commit() except IntegrityError: await db.rollback() raise HTTPException(status_code=401, detail="Code already used — wait for the next code") # Success — reset lockout counter, update last_login_at, issue full session user.failed_login_count = 0 user.locked_until = None user.last_login_at = datetime.now() await db.commit() token = await _create_full_session(db, user, request) _set_session_cookie(response, token) return {"authenticated": True} @router.post("/totp/disable") async def totp_disable( data: TOTPDisableRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Disable TOTP for the current user. Requires both current password AND a valid TOTP code as confirmation. Clears totp_secret, sets totp_enabled=False, and deletes all backup codes. """ if not current_user.totp_enabled: raise HTTPException(status_code=400, detail="TOTP is not enabled") # Verify password (handles bcrypt→Argon2id upgrade transparently) # AC-2: async wrapper to avoid blocking event loop valid, new_hash = await averify_password_with_upgrade(data.password, current_user.password_hash) if not valid: raise HTTPException(status_code=401, detail="Invalid password") if new_hash: current_user.password_hash = new_hash # Verify TOTP code — both checks required for disable matched_window = verify_totp_code(current_user.totp_secret, data.code) if matched_window is None: raise HTTPException(status_code=401, detail="Invalid TOTP code") # All checks passed — disable TOTP current_user.totp_secret = None current_user.totp_enabled = False await _delete_backup_codes(db, current_user.id) await db.commit() return {"message": "TOTP disabled successfully"} @router.post("/totp/backup-codes/regenerate") async def regenerate_backup_codes( data: BackupCodesRegenerateRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Regenerate backup recovery codes. Requires current password AND a valid TOTP code. Deletes all existing backup codes and generates 10 fresh ones. Returns plaintext codes once — never retrievable again. """ if not current_user.totp_enabled: raise HTTPException(status_code=400, detail="TOTP is not enabled") # AC-2: async wrapper to avoid blocking event loop valid, new_hash = await averify_password_with_upgrade(data.password, current_user.password_hash) if not valid: raise HTTPException(status_code=401, detail="Invalid password") if new_hash: current_user.password_hash = new_hash await db.commit() matched_window = verify_totp_code(current_user.totp_secret, data.code) if matched_window is None: raise HTTPException(status_code=401, detail="Invalid TOTP code") # Regenerate plaintext_codes = generate_backup_codes(10) await _delete_backup_codes(db, current_user.id) await _store_backup_codes(db, current_user.id, plaintext_codes) return {"backup_codes": plaintext_codes} @router.post("/totp/enforce-setup") async def enforce_setup_totp( data: EnforceSetupRequest, db: AsyncSession = Depends(get_db), ): """ Generate TOTP secret + QR code + backup codes during MFA enforcement. Called after login returns mfa_setup_required=True. Uses the mfa_enforce_token (not a session cookie) because the user is not yet fully authenticated. Idempotent: regenerates secret if called again before confirm. Returns { secret, qr_code_base64, backup_codes }. """ user_id = verify_mfa_enforce_token(data.mfa_token) if user_id is None: raise HTTPException(status_code=401, detail="Invalid or expired enforcement token — please log in again") result = await db.execute(select(User).where(User.id == user_id, User.is_active == True)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=401, detail="User not found or inactive") if not user.mfa_enforce_pending: raise HTTPException(status_code=400, detail="MFA enforcement is not pending for this account") if user.totp_enabled: raise HTTPException(status_code=400, detail="TOTP is already enabled for this account") # Generate new secret (idempotent — overwrite any unconfirmed secret) raw_secret = generate_totp_secret() encrypted_secret = encrypt_totp_secret(raw_secret) user.totp_secret = encrypted_secret user.totp_enabled = False # Not enabled until enforce-confirm called # Generate backup codes — hash before storage, return plaintext once plaintext_codes = generate_backup_codes(10) await _delete_backup_codes(db, user.id) await _store_backup_codes(db, user.id, plaintext_codes) await db.commit() uri = get_totp_uri(encrypted_secret, user.username) qr_base64 = generate_qr_base64(uri) return { "secret": raw_secret, "qr_code_base64": qr_base64, "backup_codes": plaintext_codes, } @router.post("/totp/enforce-confirm") async def enforce_confirm_totp( data: EnforceConfirmRequest, request: Request, response: Response, db: AsyncSession = Depends(get_db), ): """ Confirm TOTP setup during enforcement, clear the pending flag, issue a full session. Must be called after /totp/enforce-setup while totp_enabled is still False. On success: enables TOTP, clears mfa_enforce_pending, sets session cookie, returns { authenticated: true }. """ user_id = verify_mfa_enforce_token(data.mfa_token) if user_id is None: raise HTTPException(status_code=401, detail="Invalid or expired enforcement token — please log in again") result = await db.execute(select(User).where(User.id == user_id, User.is_active == True)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=401, detail="User not found or inactive") if not user.mfa_enforce_pending: raise HTTPException(status_code=400, detail="MFA enforcement is not pending for this account") if not user.totp_secret: raise HTTPException(status_code=400, detail="TOTP setup not started — call /totp/enforce-setup first") if user.totp_enabled: raise HTTPException(status_code=400, detail="TOTP is already enabled") # Verify the confirmation code matched_window = verify_totp_code(user.totp_secret, data.code) if matched_window is None: raise HTTPException(status_code=400, detail="Invalid code — check your authenticator app time sync") # Enable TOTP and clear the enforcement flag user.totp_enabled = True user.mfa_enforce_pending = False user.last_login_at = datetime.now() await db.commit() # Issue a full session token = await _create_full_session(db, user, request) _set_session_cookie(response, token) return {"authenticated": True} @router.get("/totp/status") async def totp_status( db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return TOTP enabled state and count of remaining (unused) backup codes.""" remaining = 0 if current_user.totp_enabled: result = await db.execute( select(BackupCode).where( BackupCode.user_id == current_user.id, BackupCode.used_at.is_(None), ) ) remaining = len(result.scalars().all()) return { "enabled": current_user.totp_enabled, "backup_codes_remaining": remaining, }