""" Authentication router — username/password with DB-backed sessions and account lockout. Session flow: POST /setup → create User + Settings row → issue session cookie POST /login → verify credentials → check lockout → insert UserSession → issue cookie → if TOTP enabled: return mfa_token instead of full session POST /logout → mark session revoked in DB → delete cookie GET /status → verify user exists + session valid Security layers: 1. IP-based in-memory rate limit (5 attempts / 5 min) — outer guard, username enumeration 2. DB-backed account lockout (10 failures → 30-min lock, HTTP 423) — per-user guard 3. Session revocation stored in DB (survives container restarts) 4. bcrypt→Argon2id transparent upgrade on first login with migrated hash """ import uuid import time from collections import defaultdict from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Request, Response, Cookie from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.database import get_db from app.models.user import User from app.models.session import UserSession from app.models.settings import Settings from app.schemas.auth import SetupRequest, LoginRequest, ChangePasswordRequest, VerifyPasswordRequest from app.services.auth import ( hash_password, verify_password_with_upgrade, create_session_token, verify_session_token, create_mfa_token, ) from app.config import settings as app_settings router = APIRouter() # --------------------------------------------------------------------------- # IP-based in-memory rate limit (retained as outer layer for all login attempts) # --------------------------------------------------------------------------- _failed_attempts: dict[str, list[float]] = defaultdict(list) _MAX_IP_ATTEMPTS = 5 _IP_WINDOW_SECONDS = 300 # 5 minutes _MAX_TRACKED_IPS = 10000 # cap to prevent unbounded memory growth def _check_ip_rate_limit(ip: str) -> None: """Raise 429 if the IP has exceeded the failure window.""" now = time.time() # Purge all stale entries if the dict is oversized (spray attack defense) if len(_failed_attempts) > _MAX_TRACKED_IPS: stale_ips = [k for k, v in _failed_attempts.items() if all(now - t >= _IP_WINDOW_SECONDS for t in v)] for k in stale_ips: del _failed_attempts[k] # If still over cap after purge, clear everything (all entries are within window # but we can't let memory grow unbounded — login will still hit account lockout) if len(_failed_attempts) > _MAX_TRACKED_IPS: _failed_attempts.clear() _failed_attempts[ip] = [t for t in _failed_attempts[ip] if now - t < _IP_WINDOW_SECONDS] if not _failed_attempts[ip]: _failed_attempts.pop(ip, None) elif len(_failed_attempts[ip]) >= _MAX_IP_ATTEMPTS: raise HTTPException( status_code=429, detail="Too many failed login attempts. Try again in a few minutes.", ) def _record_ip_failure(ip: str) -> None: _failed_attempts[ip].append(time.time()) # --------------------------------------------------------------------------- # Cookie helper # --------------------------------------------------------------------------- def _set_session_cookie(response: Response, token: str) -> None: response.set_cookie( key="session", value=token, httponly=True, secure=app_settings.COOKIE_SECURE, max_age=app_settings.SESSION_MAX_AGE_DAYS * 86400, samesite="lax", ) # --------------------------------------------------------------------------- # Auth dependencies — export get_current_user and get_current_settings # --------------------------------------------------------------------------- async def get_current_user( request: Request, session_cookie: Optional[str] = Cookie(None, alias="session"), db: AsyncSession = Depends(get_db), ) -> User: """ Dependency that verifies the session cookie and returns the authenticated User. Replaces the old get_current_session (which returned Settings). Any router that hasn't been updated will get a compile-time type error. """ if not session_cookie: raise HTTPException(status_code=401, detail="Not authenticated") payload = verify_session_token(session_cookie) if payload is None: raise HTTPException(status_code=401, detail="Invalid or expired session") user_id: int = payload.get("uid") session_id: str = payload.get("sid") if user_id is None or session_id is None: raise HTTPException(status_code=401, detail="Malformed session token") # Verify session is active in DB (covers revocation + expiry) session_result = await db.execute( select(UserSession).where( UserSession.id == session_id, UserSession.user_id == user_id, UserSession.revoked == False, UserSession.expires_at > datetime.now(), ) ) db_session = session_result.scalar_one_or_none() if not db_session: raise HTTPException(status_code=401, detail="Session has been revoked or expired") user_result = await db.execute( select(User).where(User.id == user_id, User.is_active == True) ) user = user_result.scalar_one_or_none() if not user: raise HTTPException(status_code=401, detail="User not found or inactive") return user async def get_current_settings( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ) -> Settings: """ Convenience dependency for routers that need Settings access. Always chain after get_current_user — never use standalone. """ result = await db.execute( select(Settings).where(Settings.user_id == current_user.id) ) settings_obj = result.scalar_one_or_none() if not settings_obj: raise HTTPException(status_code=500, detail="Settings not found for user") return settings_obj # --------------------------------------------------------------------------- # Account lockout helpers # --------------------------------------------------------------------------- async def _check_account_lockout(user: User) -> None: """Raise HTTP 423 if the account is currently locked.""" if user.locked_until and datetime.now() < user.locked_until: remaining = int((user.locked_until - datetime.now()).total_seconds() / 60) + 1 raise HTTPException( status_code=423, detail=f"Account locked. Try again in {remaining} minutes.", ) async def _record_failed_login(db: AsyncSession, user: User) -> None: """Increment failure counter; lock account after 10 failures.""" user.failed_login_count += 1 if user.failed_login_count >= 10: user.locked_until = datetime.now() + timedelta(minutes=30) await db.commit() async def _record_successful_login(db: AsyncSession, user: User) -> None: """Reset failure counter and update last_login_at.""" user.failed_login_count = 0 user.locked_until = None user.last_login_at = datetime.now() await db.commit() # --------------------------------------------------------------------------- # Session creation helper # --------------------------------------------------------------------------- async def _create_db_session( db: AsyncSession, user: User, ip: str, user_agent: str | None, ) -> tuple[str, str]: """Insert a UserSession row and return (session_id, signed_cookie_token).""" session_id = uuid.uuid4().hex expires_at = datetime.now() + timedelta(days=app_settings.SESSION_MAX_AGE_DAYS) db_session = UserSession( id=session_id, user_id=user.id, expires_at=expires_at, ip_address=ip[:45] if ip else None, # clamp to column width user_agent=(user_agent or "")[:255] if user_agent else None, ) db.add(db_session) await db.commit() token = create_session_token(user.id, session_id) return session_id, token # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @router.post("/setup") async def setup( data: SetupRequest, response: Response, request: Request, db: AsyncSession = Depends(get_db), ): """ First-time setup: create the User record and a linked Settings row. Only works when no users exist (i.e., fresh install). """ existing = await db.execute(select(User)) if existing.scalar_one_or_none(): raise HTTPException(status_code=400, detail="Setup already completed") password_hash = hash_password(data.password) new_user = User(username=data.username, password_hash=password_hash) db.add(new_user) await db.flush() # assign new_user.id before creating Settings # Create Settings row linked to this user with all defaults new_settings = Settings(user_id=new_user.id) db.add(new_settings) await db.commit() ip = request.client.host if request.client else "unknown" user_agent = request.headers.get("user-agent") _, token = await _create_db_session(db, new_user, ip, user_agent) _set_session_cookie(response, token) return {"message": "Setup completed successfully", "authenticated": True} @router.post("/login") async def login( data: LoginRequest, request: Request, response: Response, db: AsyncSession = Depends(get_db), ): """ Authenticate with username + password. Returns: { authenticated: true } — on success (no TOTP) { authenticated: false, totp_required: true, mfa_token: "..." } — TOTP pending HTTP 401 — wrong credentials (generic; never reveals which field is wrong) HTTP 423 — account locked HTTP 429 — IP rate limited """ client_ip = request.client.host if request.client else "unknown" _check_ip_rate_limit(client_ip) # Lookup user — do NOT differentiate "user not found" from "wrong password" result = await db.execute(select(User).where(User.username == data.username)) user = result.scalar_one_or_none() if not user: _record_ip_failure(client_ip) raise HTTPException(status_code=401, detail="Invalid username or password") await _check_account_lockout(user) # Transparent bcrypt→Argon2id upgrade valid, new_hash = verify_password_with_upgrade(data.password, user.password_hash) if not valid: _record_ip_failure(client_ip) await _record_failed_login(db, user) raise HTTPException(status_code=401, detail="Invalid username or password") # Persist upgraded hash if migration happened if new_hash: user.password_hash = new_hash # Clear IP failures and update user state _failed_attempts.pop(client_ip, None) await _record_successful_login(db, user) # If TOTP is enabled, issue a short-lived MFA challenge token instead of a full session if user.totp_enabled: mfa_token = create_mfa_token(user.id) return { "authenticated": False, "totp_required": True, "mfa_token": mfa_token, } user_agent = request.headers.get("user-agent") _, token = await _create_db_session(db, user, client_ip, user_agent) _set_session_cookie(response, token) return {"authenticated": True} @router.post("/logout") async def logout( response: Response, session_cookie: Optional[str] = Cookie(None, alias="session"), db: AsyncSession = Depends(get_db), ): """Revoke the current session in DB and clear the cookie.""" if session_cookie: payload = verify_session_token(session_cookie) if payload: session_id = payload.get("sid") if session_id: result = await db.execute( select(UserSession).where(UserSession.id == session_id) ) db_session = result.scalar_one_or_none() if db_session: db_session.revoked = True await db.commit() response.delete_cookie( key="session", httponly=True, secure=app_settings.COOKIE_SECURE, samesite="lax", ) return {"message": "Logout successful"} @router.get("/status") async def auth_status( session_cookie: Optional[str] = Cookie(None, alias="session"), db: AsyncSession = Depends(get_db), ): """ Check authentication status and whether initial setup has been performed. Used by the frontend to decide whether to show login vs setup screen. """ user_result = await db.execute(select(User)) existing_user = user_result.scalar_one_or_none() setup_required = existing_user is None authenticated = False if not setup_required and session_cookie: payload = verify_session_token(session_cookie) if payload: user_id = payload.get("uid") session_id = payload.get("sid") if user_id and session_id: session_result = await db.execute( select(UserSession).where( UserSession.id == session_id, UserSession.user_id == user_id, UserSession.revoked == False, UserSession.expires_at > datetime.now(), ) ) authenticated = session_result.scalar_one_or_none() is not None return {"authenticated": authenticated, "setup_required": setup_required} @router.post("/verify-password") async def verify_password( data: VerifyPasswordRequest, request: Request, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Verify the current user's password without changing anything. Used by the frontend lock screen to re-authenticate without a full login. Also handles transparent bcrypt→Argon2id upgrade. Shares the same rate-limit and lockout guards as /login. """ client_ip = request.client.host if request.client else "unknown" _check_ip_rate_limit(client_ip) await _check_account_lockout(current_user) valid, new_hash = verify_password_with_upgrade(data.password, current_user.password_hash) if not valid: _record_ip_failure(client_ip) await _record_failed_login(db, current_user) raise HTTPException(status_code=401, detail="Invalid password") _failed_attempts.pop(client_ip, None) # Persist upgraded hash if migration happened if new_hash: current_user.password_hash = new_hash await db.commit() return {"verified": True} @router.post("/change-password") async def change_password( data: ChangePasswordRequest, request: Request, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Change the current user's password. Requires old password verification.""" client_ip = request.client.host if request.client else "unknown" _check_ip_rate_limit(client_ip) await _check_account_lockout(current_user) valid, _ = verify_password_with_upgrade(data.old_password, current_user.password_hash) if not valid: _record_ip_failure(client_ip) await _record_failed_login(db, current_user) raise HTTPException(status_code=401, detail="Invalid current password") current_user.password_hash = hash_password(data.new_password) await db.commit() return {"message": "Password changed successfully"}