""" TOTP MFA router. Endpoints (all under /api/auth — registered in main.py with prefix="/api/auth"): POST /totp/setup — Generate secret + QR + backup codes (auth required) POST /totp/confirm — Verify first code, enable TOTP (auth required) POST /totp-verify — MFA challenge: mfa_token + TOTP/backup code, issues session POST /totp/disable — Disable TOTP (auth required, needs password + code) POST /totp/backup-codes/regenerate — Regenerate backup codes (auth required, needs password + code) GET /totp/status — { enabled, backup_codes_remaining } (auth required) Security: - TOTP secrets encrypted at rest (Fernet/AES-128-CBC, key derived from SECRET_KEY) - Replay prevention via totp_usage table (unique on user_id+code+window) - Backup codes hashed with Argon2id, shown plaintext once only - Failed TOTP attempts increment user.failed_login_count (shared lockout counter) - totp-verify uses mfa_token (not session cookie) — user is not yet authenticated """ import uuid import secrets import logging from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Request, Response from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete from sqlalchemy.exc import IntegrityError from app.database import get_db from app.models.user import User from app.models.session import UserSession from app.models.totp_usage import TOTPUsage from app.models.backup_code import BackupCode from app.routers.auth import get_current_user, _set_session_cookie from app.services.auth import ( verify_password_with_upgrade, hash_password, verify_mfa_token, create_session_token, ) from app.services.totp import ( generate_totp_secret, encrypt_totp_secret, decrypt_totp_secret, get_totp_uri, verify_totp_code, generate_qr_base64, generate_backup_codes, ) from app.config import settings as app_settings # Argon2id for backup code hashing — treat each code like a password from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError logger = logging.getLogger(__name__) router = APIRouter() # Argon2id instance for backup code hashes (same params as password hashing) _ph = PasswordHasher( time_cost=2, memory_cost=19456, parallelism=1, hash_len=32, salt_len=16, ) # --------------------------------------------------------------------------- # Request schemas # --------------------------------------------------------------------------- class TOTPConfirmRequest(BaseModel): code: str class TOTPVerifyRequest(BaseModel): mfa_token: str code: Optional[str] = None # 6-digit TOTP code backup_code: Optional[str] = None # Alternative: XXXX-XXXX backup code class TOTPDisableRequest(BaseModel): password: str code: str # Current TOTP code required to disable class BackupCodesRegenerateRequest(BaseModel): password: str code: str # Current TOTP code required to regenerate # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- async def _store_backup_codes(db: AsyncSession, user_id: int, plaintext_codes: list[str]) -> None: """Hash and insert backup codes for the given user.""" for code in plaintext_codes: code_hash = _ph.hash(code) db.add(BackupCode(user_id=user_id, code_hash=code_hash)) await db.commit() async def _delete_backup_codes(db: AsyncSession, user_id: int) -> None: """Delete all backup codes for a user.""" await db.execute(delete(BackupCode).where(BackupCode.user_id == user_id)) await db.commit() async def _verify_backup_code( db: AsyncSession, user_id: int, submitted_code: str ) -> bool: """ Check submitted backup code against all unused hashes for the user. On match, marks the code as used. Returns True if a valid unused code was found. Uses Argon2id verification — constant-time by design. """ result = await db.execute( select(BackupCode).where( BackupCode.user_id == user_id, BackupCode.used_at.is_(None), ) ) unused_codes = result.scalars().all() for record in unused_codes: try: if _ph.verify(record.code_hash, submitted_code): record.used_at = datetime.now() await db.commit() return True except (VerifyMismatchError, VerificationError, InvalidHashError): continue return False async def _create_full_session( db: AsyncSession, user: User, request: Request, ) -> str: """Create a UserSession row and return the signed cookie token.""" session_id = uuid.uuid4().hex expires_at = datetime.now() + timedelta(days=app_settings.SESSION_MAX_AGE_DAYS) ip = request.client.host if request.client else None user_agent = request.headers.get("user-agent") db_session = UserSession( id=session_id, user_id=user.id, expires_at=expires_at, ip_address=ip[:45] if ip else None, user_agent=(user_agent or "")[:255] if user_agent else None, ) db.add(db_session) await db.commit() return create_session_token(user.id, session_id) # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @router.post("/totp/setup") async def totp_setup( db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Generate a new TOTP secret, QR code, and backup codes. Stores the encrypted secret with totp_enabled=False until confirmed. Idempotent: calling again before confirmation overwrites the unconfirmed secret, so browser refreshes mid-setup generate a fresh QR without error. Returns { secret, qr_code_base64, backup_codes } — the only time plaintext values are shown. The `secret` field is the raw base32 for manual entry. """ # Generate new secret (idempotent — overwrite any existing unconfirmed secret) raw_secret = generate_totp_secret() encrypted_secret = encrypt_totp_secret(raw_secret) current_user.totp_secret = encrypted_secret current_user.totp_enabled = False # Not enabled until /confirm called # Generate backup codes — hash before storage, return plaintext once plaintext_codes = generate_backup_codes(10) await _delete_backup_codes(db, current_user.id) # Remove any previous unconfirmed codes await _store_backup_codes(db, current_user.id, plaintext_codes) await db.commit() # Build QR code from provisioning URI uri = get_totp_uri(encrypted_secret, current_user.username) qr_base64 = generate_qr_base64(uri) return { "secret": raw_secret, # Raw base32 for manual authenticator entry "qr_code_base64": qr_base64, # PNG QR code, data:image/png;base64,... "backup_codes": plaintext_codes, # Shown once — user must save these } @router.post("/totp/confirm") async def totp_confirm( data: TOTPConfirmRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Verify the first TOTP code from the authenticator app and enable TOTP. Must be called after /setup while totp_enabled is still False. """ if not current_user.totp_secret: raise HTTPException(status_code=400, detail="TOTP setup not started — call /setup first") if current_user.totp_enabled: raise HTTPException(status_code=400, detail="TOTP is already enabled") matched_window = verify_totp_code(current_user.totp_secret, data.code) if matched_window is None: raise HTTPException(status_code=400, detail="Invalid code — check your authenticator app time sync") current_user.totp_enabled = True await db.commit() return {"message": "TOTP enabled successfully"} @router.post("/totp-verify") async def totp_verify( data: TOTPVerifyRequest, request: Request, response: Response, db: AsyncSession = Depends(get_db), ): """ MFA challenge endpoint — called after a successful password login when TOTP is enabled. Accepts either a 6-digit TOTP code or a backup recovery code. Uses the short-lived mfa_token (from POST /login) instead of a session cookie because the user is not yet fully authenticated at this stage. On success: issues a full session cookie and returns { authenticated: true }. """ if not data.code and not data.backup_code: raise HTTPException(status_code=422, detail="Provide either 'code' or 'backup_code'") # Validate the MFA challenge token (5-minute TTL) user_id = verify_mfa_token(data.mfa_token) if user_id is None: raise HTTPException(status_code=401, detail="MFA session expired — please log in again") result = await db.execute(select(User).where(User.id == user_id, User.is_active == True)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=401, detail="User not found or inactive") if not user.totp_enabled or not user.totp_secret: raise HTTPException(status_code=400, detail="TOTP not configured for this account") # Check account lockout (shared counter with password failures) if user.locked_until and datetime.now() < user.locked_until: remaining = int((user.locked_until - datetime.now()).total_seconds() / 60) + 1 raise HTTPException( status_code=423, detail=f"Account locked. Try again in {remaining} minutes.", ) # --- Backup code path --- if data.backup_code: normalized = data.backup_code.strip().upper() valid = await _verify_backup_code(db, user.id, normalized) if not valid: user.failed_login_count += 1 if user.failed_login_count >= 10: user.locked_until = datetime.now() + timedelta(minutes=30) await db.commit() raise HTTPException(status_code=401, detail="Invalid backup code") # Backup code accepted — reset lockout counter and issue session user.failed_login_count = 0 user.locked_until = None user.last_login_at = datetime.now() await db.commit() token = await _create_full_session(db, user, request) _set_session_cookie(response, token) return {"authenticated": True} # --- TOTP code path --- matched_window = verify_totp_code(user.totp_secret, data.code) if matched_window is None: user.failed_login_count += 1 if user.failed_login_count >= 10: user.locked_until = datetime.now() + timedelta(minutes=30) await db.commit() raise HTTPException(status_code=401, detail="Invalid code") # Replay prevention — record (user_id, code, actual_matching_window) totp_record = TOTPUsage(user_id=user.id, code=data.code, window=matched_window) db.add(totp_record) try: await db.commit() except IntegrityError: await db.rollback() raise HTTPException(status_code=401, detail="Code already used — wait for the next code") # Success — reset lockout counter, update last_login_at, issue full session user.failed_login_count = 0 user.locked_until = None user.last_login_at = datetime.now() await db.commit() token = await _create_full_session(db, user, request) _set_session_cookie(response, token) return {"authenticated": True} @router.post("/totp/disable") async def totp_disable( data: TOTPDisableRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Disable TOTP for the current user. Requires both current password AND a valid TOTP code as confirmation. Clears totp_secret, sets totp_enabled=False, and deletes all backup codes. """ if not current_user.totp_enabled: raise HTTPException(status_code=400, detail="TOTP is not enabled") # Verify password (handles bcrypt→Argon2id upgrade transparently) valid, new_hash = verify_password_with_upgrade(data.password, current_user.password_hash) if not valid: raise HTTPException(status_code=401, detail="Invalid password") if new_hash: current_user.password_hash = new_hash # Verify TOTP code — both checks required for disable matched_window = verify_totp_code(current_user.totp_secret, data.code) if matched_window is None: raise HTTPException(status_code=401, detail="Invalid TOTP code") # All checks passed — disable TOTP current_user.totp_secret = None current_user.totp_enabled = False await _delete_backup_codes(db, current_user.id) await db.commit() return {"message": "TOTP disabled successfully"} @router.post("/totp/backup-codes/regenerate") async def regenerate_backup_codes( data: BackupCodesRegenerateRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Regenerate backup recovery codes. Requires current password AND a valid TOTP code. Deletes all existing backup codes and generates 10 fresh ones. Returns plaintext codes once — never retrievable again. """ if not current_user.totp_enabled: raise HTTPException(status_code=400, detail="TOTP is not enabled") valid, new_hash = verify_password_with_upgrade(data.password, current_user.password_hash) if not valid: raise HTTPException(status_code=401, detail="Invalid password") if new_hash: current_user.password_hash = new_hash await db.commit() matched_window = verify_totp_code(current_user.totp_secret, data.code) if matched_window is None: raise HTTPException(status_code=401, detail="Invalid TOTP code") # Regenerate plaintext_codes = generate_backup_codes(10) await _delete_backup_codes(db, current_user.id) await _store_backup_codes(db, current_user.id, plaintext_codes) return {"backup_codes": plaintext_codes} @router.get("/totp/status") async def totp_status( db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Return TOTP enabled state and count of remaining (unused) backup codes.""" remaining = 0 if current_user.totp_enabled: result = await db.execute( select(BackupCode).where( BackupCode.user_id == current_user.id, BackupCode.used_at.is_(None), ) ) remaining = len(result.scalars().all()) return { "enabled": current_user.totp_enabled, "backup_codes_remaining": remaining, }