Kyle Pope 2f58282c31 M-01+M-03: Add input validation and extra=forbid to all request schemas
- Add max_length constraints to all string fields in request schemas,
  matching DB column limits (title:255, description:5000, etc.)
- Add min_length=1 to required name/title fields
- Add ConfigDict(extra="forbid") to all request schemas to reject
  unknown fields (prevents silent field injection)
- Add Path(ge=1, le=2147483647) to all integer path parameters across
  all routers to prevent integer overflow → 500 errors
- Add max_length to TOTP inline schemas (code:6, mfa_token:256, etc.)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 15:43:55 +08:00

535 lines
19 KiB
Python

"""
TOTP MFA router.
Endpoints (all under /api/auth — registered in main.py with prefix="/api/auth"):
POST /totp/setup — Generate secret + QR + backup codes (auth required)
POST /totp/confirm — Verify first code, enable TOTP (auth required)
POST /totp-verify — MFA challenge: mfa_token + TOTP/backup code, issues session
POST /totp/disable — Disable TOTP (auth required, needs password + code)
POST /totp/backup-codes/regenerate — Regenerate backup codes (auth required, needs password + code)
POST /totp/enforce-setup — Generate secret + QR + backup codes during MFA enforcement (mfa_token, no session)
POST /totp/enforce-confirm — Verify first code, enable TOTP, clear the enforcement flag, issues session (mfa_token)
GET /totp/status — { enabled, backup_codes_remaining } (auth required)
Security:
- TOTP secrets encrypted at rest (Fernet/AES-128-CBC, key derived from SECRET_KEY)
- Replay prevention via totp_usage table (unique on user_id+code+window)
- Backup codes hashed with Argon2id, shown plaintext once only
- Failed TOTP attempts increment user.failed_login_count (shared lockout counter)
- totp-verify uses mfa_token (not session cookie) — user is not yet authenticated
"""
import uuid
import secrets
import logging
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
from sqlalchemy.exc import IntegrityError
from app.database import get_db
from app.models.user import User
from app.models.session import UserSession
from app.models.totp_usage import TOTPUsage
from app.models.backup_code import BackupCode
from app.routers.auth import get_current_user, _set_session_cookie
from app.services.auth import (
verify_password_with_upgrade,
hash_password,
verify_mfa_token,
verify_mfa_enforce_token,
create_session_token,
)
from app.services.totp import (
generate_totp_secret,
encrypt_totp_secret,
decrypt_totp_secret,
get_totp_uri,
verify_totp_code,
generate_qr_base64,
generate_backup_codes,
)
from app.config import settings as app_settings
# Argon2id for backup code hashing — treat each code like a password
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that every endpoint in this file attaches to; no prefix is set here
# (the module docstring states main.py mounts it under /api/auth).
router = APIRouter()
# Argon2id instance for backup code hashes (same params as password hashing)
# NOTE(review): argon2-cffi documents memory_cost in KiB, so 19456 is 19 MiB — confirm
# these values stay in sync with the password hasher in app.services.auth.
_ph = PasswordHasher(
    time_cost=2,
    memory_cost=19456,
    parallelism=1,
    hash_len=32,
    salt_len=16,
)
# ---------------------------------------------------------------------------
# Request schemas
# ---------------------------------------------------------------------------
class TOTPConfirmRequest(BaseModel):
    # Request body for POST /totp/confirm.
    # extra="forbid" makes validation fail on any field not declared below.
    model_config = ConfigDict(extra="forbid")

    # Required; must be exactly six characters long.
    code: str = Field(..., min_length=6, max_length=6)
class TOTPVerifyRequest(BaseModel):
    # Request body for POST /totp-verify.
    # extra="forbid" makes validation fail on any field not declared below.
    model_config = ConfigDict(extra="forbid")

    # Required challenge token, capped at 256 characters.
    mfa_token: str = Field(..., max_length=256)
    # 6-digit TOTP code; optional, but exactly six characters when present.
    code: Optional[str] = Field(default=None, min_length=6, max_length=6)
    # XXXX-XXXX backup code; optional, at most nine characters.
    backup_code: Optional[str] = Field(default=None, max_length=9)
class TOTPDisableRequest(BaseModel):
    # Request body for POST /totp/disable.
    # extra="forbid" makes validation fail on any field not declared below.
    model_config = ConfigDict(extra="forbid")

    # Required account password, capped at 128 characters.
    password: str = Field(..., max_length=128)
    # Current TOTP code required to disable; exactly six characters.
    code: str = Field(..., min_length=6, max_length=6)
class BackupCodesRegenerateRequest(BaseModel):
    # Request body for POST /totp/backup-codes/regenerate.
    # extra="forbid" makes validation fail on any field not declared below.
    model_config = ConfigDict(extra="forbid")

    # Required account password, capped at 128 characters.
    password: str = Field(..., max_length=128)
    # Current TOTP code required to regenerate; exactly six characters.
    code: str = Field(..., min_length=6, max_length=6)
class EnforceSetupRequest(BaseModel):
    # Request body for POST /totp/enforce-setup.
    # extra="forbid" makes validation fail on any field not declared below.
    model_config = ConfigDict(extra="forbid")

    # Required enforcement token, capped at 256 characters.
    mfa_token: str = Field(..., max_length=256)
class EnforceConfirmRequest(BaseModel):
    # Request body for POST /totp/enforce-confirm.
    # extra="forbid" makes validation fail on any field not declared below.
    model_config = ConfigDict(extra="forbid")

    # Required enforcement token, capped at 256 characters.
    mfa_token: str = Field(..., max_length=256)
    # 6-digit TOTP code from authenticator app; exactly six characters.
    code: str = Field(..., min_length=6, max_length=6)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
async def _store_backup_codes(db: AsyncSession, user_id: int, plaintext_codes: list[str]) -> None:
    """
    Persist one hashed BackupCode row per plaintext code, then commit.

    Only the Argon2 hash produced by the module-level hasher is stored;
    the plaintext values never reach the database.
    """
    hashed_rows = [
        BackupCode(user_id=user_id, code_hash=_ph.hash(plain))
        for plain in plaintext_codes
    ]
    for row in hashed_rows:
        db.add(row)
    await db.commit()
async def _delete_backup_codes(db: AsyncSession, user_id: int) -> None:
    """Remove every backup code row owned by ``user_id`` (used or unused) and commit."""
    purge_stmt = delete(BackupCode).where(BackupCode.user_id == user_id)
    await db.execute(purge_stmt)
    await db.commit()
async def _verify_backup_code(
    db: AsyncSession, user_id: int, submitted_code: str
) -> bool:
    """
    Try ``submitted_code`` against each of the user's unused backup code hashes.

    The first hash that verifies is stamped with ``used_at`` and committed, and
    True is returned. Hashes that fail to verify (mismatch, verification error,
    or an unparsable stored hash) are skipped. Returns False when none match.
    """
    unused_stmt = select(BackupCode).where(
        BackupCode.user_id == user_id,
        BackupCode.used_at.is_(None),
    )
    candidates = (await db.execute(unused_stmt)).scalars().all()

    for candidate in candidates:
        try:
            matched = _ph.verify(candidate.code_hash, submitted_code)
        except (VerifyMismatchError, VerificationError, InvalidHashError):
            # Not this hash — move on to the next stored code.
            continue
        if matched:
            # Consume the code so it cannot be used a second time.
            candidate.used_at = datetime.now()
            await db.commit()
            return True
    return False
async def _create_full_session(
    db: AsyncSession,
    user: User,
    request: Request,
) -> str:
    """
    Insert a UserSession row for ``user`` and return the token for the session cookie.

    The session id is a random UUID4 hex string and the row expires
    SESSION_MAX_AGE_DAYS from now. The client IP is cut to 45 characters and the
    User-Agent header to 255; either is stored as None when absent or empty.
    """
    client = request.client
    remote_ip = client.host if client else None
    agent_header = request.headers.get("user-agent")

    session_id = uuid.uuid4().hex
    lifetime = timedelta(days=app_settings.SESSION_MAX_AGE_DAYS)

    db.add(
        UserSession(
            id=session_id,
            user_id=user.id,
            expires_at=datetime.now() + lifetime,
            ip_address=remote_ip[:45] if remote_ip else None,
            user_agent=agent_header[:255] if agent_header else None,
        )
    )
    await db.commit()
    return create_session_token(user.id, session_id)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@router.post("/totp/setup")
async def totp_setup(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Generate a new TOTP secret, QR code, and backup codes.

    Stores the encrypted secret with totp_enabled=False until confirmed.
    Idempotent: calling again before confirmation overwrites the unconfirmed secret,
    so browser refreshes mid-setup generate a fresh QR without error.

    Refused with 400 when TOTP is already enabled: starting setup resets
    totp_enabled, replaces the secret and wipes the backup codes, so allowing it
    on an enabled account would switch MFA off without the password + code that
    /totp/disable requires.

    Returns { secret, qr_code_base64, backup_codes } — the only time plaintext
    values are shown. The `secret` field is the raw base32 for manual entry.

    Raises:
        HTTPException(400): TOTP is already enabled for the current user.
    """
    # Same guard the enforcement flow applies before generating a secret.
    if current_user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is already enabled")

    # Generate new secret (idempotent — overwrite any existing unconfirmed secret)
    raw_secret = generate_totp_secret()
    encrypted_secret = encrypt_totp_secret(raw_secret)
    current_user.totp_secret = encrypted_secret
    current_user.totp_enabled = False  # Not enabled until /confirm called

    # Generate backup codes — hash before storage, return plaintext once
    plaintext_codes = generate_backup_codes(10)
    await _delete_backup_codes(db, current_user.id)  # Remove any previous unconfirmed codes
    await _store_backup_codes(db, current_user.id, plaintext_codes)
    await db.commit()

    # Build QR code from provisioning URI.
    # NOTE(review): the encrypted secret is passed here, as it is to
    # verify_totp_code elsewhere — presumably the service decrypts it; confirm.
    uri = get_totp_uri(encrypted_secret, current_user.username)
    qr_base64 = generate_qr_base64(uri)

    return {
        "secret": raw_secret,  # Raw base32 for manual authenticator entry
        "qr_code_base64": qr_base64,  # PNG QR code, data:image/png;base64,...
        "backup_codes": plaintext_codes,  # Shown once — user must save these
    }
@router.post("/totp/confirm")
async def totp_confirm(
    data: TOTPConfirmRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Enable TOTP once the user proves the authenticator app produces a valid code.

    Preconditions: a secret has been stored by /setup and TOTP is not yet
    enabled. Each failed precondition, and an invalid code, answers 400.
    """
    secret = current_user.totp_secret
    if not secret:
        raise HTTPException(status_code=400, detail="TOTP setup not started — call /setup first")
    if current_user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is already enabled")

    # verify_totp_code yields None when the code matches no window.
    if verify_totp_code(secret, data.code) is None:
        raise HTTPException(status_code=400, detail="Invalid code — check your authenticator app time sync")

    current_user.totp_enabled = True
    await db.commit()
    return {"message": "TOTP enabled successfully"}
# Lockout policy applied to failed MFA attempts in totp_verify.
_MFA_MAX_FAILED_ATTEMPTS = 10
_MFA_LOCKOUT_MINUTES = 30


async def _record_failed_mfa_attempt(db: AsyncSession, user: User) -> None:
    """Increment the user's failure counter, lock the account at the threshold, and commit."""
    user.failed_login_count += 1
    if user.failed_login_count >= _MFA_MAX_FAILED_ATTEMPTS:
        user.locked_until = datetime.now() + timedelta(minutes=_MFA_LOCKOUT_MINUTES)
    await db.commit()


async def _complete_mfa_login(
    db: AsyncSession,
    user: User,
    request: Request,
    response: Response,
) -> dict[str, bool]:
    """Clear lockout state, stamp last_login_at, create a session and set its cookie."""
    user.failed_login_count = 0
    user.locked_until = None
    user.last_login_at = datetime.now()
    await db.commit()
    token = await _create_full_session(db, user, request)
    _set_session_cookie(response, token)
    return {"authenticated": True}


@router.post("/totp-verify")
async def totp_verify(
    data: TOTPVerifyRequest,
    request: Request,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """
    MFA challenge endpoint — called after a successful password login when TOTP is enabled.

    Accepts either a 6-digit TOTP code or a backup recovery code; when both are
    supplied the backup code is the one that is checked.
    Uses the short-lived mfa_token (from POST /login) instead of a session cookie
    because the user is not yet fully authenticated at this stage.
    On success: issues a full session cookie and returns { authenticated: true }.

    Raises:
        HTTPException(422): neither `code` nor `backup_code` was provided.
        HTTPException(401): the mfa_token is rejected, the user is missing or
            inactive, the code/backup code is invalid, or the TOTP code was
            already used in its window.
        HTTPException(400): TOTP is not configured for the account.
        HTTPException(423): the account is currently locked.
    """
    if not data.code and not data.backup_code:
        raise HTTPException(status_code=422, detail="Provide either 'code' or 'backup_code'")

    # Validate the MFA challenge token (5-minute TTL)
    user_id = verify_mfa_token(data.mfa_token)
    if user_id is None:
        raise HTTPException(status_code=401, detail="MFA session expired — please log in again")

    # `== True` is intentional: it builds a SQL expression, not a Python comparison.
    result = await db.execute(select(User).where(User.id == user_id, User.is_active == True))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=401, detail="User not found or inactive")
    if not user.totp_enabled or not user.totp_secret:
        raise HTTPException(status_code=400, detail="TOTP not configured for this account")

    # Check account lockout (shared counter with password failures).
    # The clock is read once so the comparison and the message agree.
    now = datetime.now()
    if user.locked_until and now < user.locked_until:
        remaining = int((user.locked_until - now).total_seconds() / 60) + 1
        raise HTTPException(
            status_code=423,
            detail=f"Account locked. Try again in {remaining} minutes.",
        )

    # --- Backup code path ---
    if data.backup_code:
        normalized = data.backup_code.strip().upper()
        if not await _verify_backup_code(db, user.id, normalized):
            await _record_failed_mfa_attempt(db, user)
            raise HTTPException(status_code=401, detail="Invalid backup code")
        return await _complete_mfa_login(db, user, request, response)

    # --- TOTP code path ---
    matched_window = verify_totp_code(user.totp_secret, data.code)
    if matched_window is None:
        await _record_failed_mfa_attempt(db, user)
        raise HTTPException(status_code=401, detail="Invalid code")

    # Replay prevention — record (user_id, code, actual_matching_window).
    # A duplicate row surfaces as IntegrityError on commit.
    db.add(TOTPUsage(user_id=user.id, code=data.code, window=matched_window))
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        raise HTTPException(status_code=401, detail="Code already used — wait for the next code")

    return await _complete_mfa_login(db, user, request, response)
@router.post("/totp/disable")
async def totp_disable(
    data: TOTPDisableRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Turn TOTP off for the authenticated user.

    The caller must supply the account password and a currently valid TOTP
    code. On success the stored secret is cleared, totp_enabled is set to False
    and every backup code is deleted.

    Responds 400 when TOTP is not enabled and 401 when the password or the
    code is rejected.
    """
    if not current_user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is not enabled")

    # First factor: password. A second tuple element, when truthy, is a
    # replacement hash to store for the user.
    password_ok, upgraded_hash = verify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="Invalid password")
    if upgraded_hash:
        current_user.password_hash = upgraded_hash

    # Second factor: TOTP code — None means no window matched.
    if verify_totp_code(current_user.totp_secret, data.code) is None:
        raise HTTPException(status_code=401, detail="Invalid TOTP code")

    # Both factors verified — tear down the TOTP configuration.
    current_user.totp_secret = None
    current_user.totp_enabled = False
    await _delete_backup_codes(db, current_user.id)
    await db.commit()
    return {"message": "TOTP disabled successfully"}
@router.post("/totp/backup-codes/regenerate")
async def regenerate_backup_codes(
    data: BackupCodesRegenerateRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Replace the user's backup recovery codes with ten new ones.

    The caller must supply the account password and a currently valid TOTP
    code. All existing codes are deleted before the new set is stored. The
    plaintext codes appear only in this response; only hashes are persisted.

    Responds 400 when TOTP is not enabled and 401 when the password or the
    code is rejected.
    """
    if not current_user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is not enabled")

    password_ok, upgraded_hash = verify_password_with_upgrade(
        data.password, current_user.password_hash
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="Invalid password")
    if upgraded_hash:
        # Persist the replacement hash right away, before the code check.
        current_user.password_hash = upgraded_hash
        await db.commit()

    if verify_totp_code(current_user.totp_secret, data.code) is None:
        raise HTTPException(status_code=401, detail="Invalid TOTP code")

    # Swap the old set for a fresh one; each helper commits its own work.
    fresh_codes = generate_backup_codes(10)
    await _delete_backup_codes(db, current_user.id)
    await _store_backup_codes(db, current_user.id, fresh_codes)
    return {"backup_codes": fresh_codes}
@router.post("/totp/enforce-setup")
async def enforce_setup_totp(
    data: EnforceSetupRequest,
    db: AsyncSession = Depends(get_db),
):
    """
    Start TOTP enrolment for a user who is being forced to set up MFA.

    Authenticates with the enforcement token carried in the request body rather
    than a session cookie. Each call generates a new secret and a new set of ten
    backup codes, replacing whatever an earlier unconfirmed call stored.

    Returns { secret, qr_code_base64, backup_codes }.

    Responds 401 when the token is rejected or the user is missing/inactive,
    and 400 when enforcement is not pending or TOTP is already enabled.
    """
    token_user_id = verify_mfa_enforce_token(data.mfa_token)
    if token_user_id is None:
        raise HTTPException(status_code=401, detail="Invalid or expired enforcement token — please log in again")

    # `== True` builds a SQL expression; it is not a Python truthiness check.
    lookup = select(User).where(User.id == token_user_id, User.is_active == True)
    user = (await db.execute(lookup)).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=401, detail="User not found or inactive")
    if not user.mfa_enforce_pending:
        raise HTTPException(status_code=400, detail="MFA enforcement is not pending for this account")
    if user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is already enabled for this account")

    # Fresh secret; stays disabled until enforce-confirm succeeds.
    raw_secret = generate_totp_secret()
    encrypted_secret = encrypt_totp_secret(raw_secret)
    user.totp_secret = encrypted_secret
    user.totp_enabled = False

    # Fresh backup codes: old rows out, hashed new rows in.
    backup_codes = generate_backup_codes(10)
    await _delete_backup_codes(db, user.id)
    await _store_backup_codes(db, user.id, backup_codes)
    await db.commit()

    provisioning_uri = get_totp_uri(encrypted_secret, user.username)
    return {
        "secret": raw_secret,
        "qr_code_base64": generate_qr_base64(provisioning_uri),
        "backup_codes": backup_codes,
    }
@router.post("/totp/enforce-confirm")
async def enforce_confirm_totp(
    data: EnforceConfirmRequest,
    request: Request,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """
    Finish enforced TOTP enrolment and log the user in.

    Authenticates with the enforcement token in the request body. When the
    submitted code verifies against the stored secret, TOTP is enabled,
    mfa_enforce_pending is cleared, last_login_at is updated, a session is
    created and its cookie set. Returns { authenticated: true }.

    Responds 401 when the token is rejected or the user is missing/inactive,
    and 400 when enforcement is not pending, setup was never started, TOTP is
    already enabled, or the code is invalid.
    """
    token_user_id = verify_mfa_enforce_token(data.mfa_token)
    if token_user_id is None:
        raise HTTPException(status_code=401, detail="Invalid or expired enforcement token — please log in again")

    # `== True` builds a SQL expression; it is not a Python truthiness check.
    lookup = select(User).where(User.id == token_user_id, User.is_active == True)
    user = (await db.execute(lookup)).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=401, detail="User not found or inactive")
    if not user.mfa_enforce_pending:
        raise HTTPException(status_code=400, detail="MFA enforcement is not pending for this account")
    if not user.totp_secret:
        raise HTTPException(status_code=400, detail="TOTP setup not started — call /totp/enforce-setup first")
    if user.totp_enabled:
        raise HTTPException(status_code=400, detail="TOTP is already enabled")

    # None from verify_totp_code means the code matched no window.
    if verify_totp_code(user.totp_secret, data.code) is None:
        raise HTTPException(status_code=400, detail="Invalid code — check your authenticator app time sync")

    # Flip the account into its enrolled state in a single commit.
    user.totp_enabled = True
    user.mfa_enforce_pending = False
    user.last_login_at = datetime.now()
    await db.commit()

    # The user is now fully authenticated — hand out a session.
    session_token = await _create_full_session(db, user, request)
    _set_session_cookie(response, session_token)
    return {"authenticated": True}
@router.get("/totp/status")
async def totp_status(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Report whether TOTP is enabled and how many backup codes are still unused.

    The backup code table is only queried when TOTP is enabled; otherwise the
    remaining count is reported as 0.
    """
    enabled = current_user.totp_enabled
    unused_count = 0
    if enabled:
        unused_stmt = select(BackupCode).where(
            BackupCode.user_id == current_user.id,
            BackupCode.used_at.is_(None),
        )
        unused_rows = (await db.execute(unused_stmt)).scalars().all()
        unused_count = len(unused_rows)
    return {
        "enabled": enabled,
        "backup_codes_remaining": unused_count,
    }