from fastapi import APIRouter, Depends, HTTPException, Response, Cookie, Request from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from typing import Optional from collections import defaultdict import time import bcrypt from itsdangerous import TimestampSigner, BadSignature from app.database import get_db from app.models.settings import Settings from app.schemas.settings import SettingsCreate from app.config import settings as app_settings router = APIRouter() # Initialize signer for session management signer = TimestampSigner(app_settings.SECRET_KEY) # Brute-force protection: track failed login attempts per IP _failed_attempts: dict[str, list[float]] = defaultdict(list) _MAX_ATTEMPTS = 5 _WINDOW_SECONDS = 300 # 5-minute lockout window def _check_rate_limit(ip: str) -> None: """Raise 429 if IP has exceeded failed login attempts.""" now = time.time() attempts = _failed_attempts[ip] # Prune old entries outside the window _failed_attempts[ip] = [t for t in attempts if now - t < _WINDOW_SECONDS] if len(_failed_attempts[ip]) >= _MAX_ATTEMPTS: raise HTTPException( status_code=429, detail="Too many failed login attempts. Try again in a few minutes.", ) def _record_failed_attempt(ip: str) -> None: """Record a failed login attempt for the given IP.""" _failed_attempts[ip].append(time.time()) def hash_pin(pin: str) -> str: """Hash a PIN using bcrypt.""" return bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode() def verify_pin(pin: str, hashed: str) -> bool: """Verify a PIN against its hash.""" return bcrypt.checkpw(pin.encode(), hashed.encode()) def create_session_token(user_id: int) -> str: """Create a signed session token.""" return signer.sign(str(user_id)).decode() def verify_session_token(token: str) -> Optional[int]: """Verify and extract user ID from session token.""" try: unsigned = signer.unsign(token, max_age=86400 * 30) # 30 days return int(unsigned) except (BadSignature, ValueError): return None def _set_session_cookie(response: Response, token: str) -> None: """Set the session cookie with secure defaults.""" response.set_cookie( key="session", value=token, httponly=True, secure=True, max_age=86400 * 30, # 30 days samesite="lax", ) async def get_current_session( session_cookie: Optional[str] = Cookie(None, alias="session"), db: AsyncSession = Depends(get_db) ) -> Settings: """Dependency to verify session and return current settings.""" if not session_cookie: raise HTTPException(status_code=401, detail="Not authenticated") user_id = verify_session_token(session_cookie) if user_id is None: raise HTTPException(status_code=401, detail="Invalid or expired session") result = await db.execute(select(Settings).where(Settings.id == user_id)) settings_obj = result.scalar_one_or_none() if not settings_obj: raise HTTPException(status_code=401, detail="Session invalid") return settings_obj @router.post("/setup") async def setup_pin( data: SettingsCreate, response: Response, db: AsyncSession = Depends(get_db) ): """Create initial PIN. Only works if no settings exist.""" result = await db.execute(select(Settings)) existing = result.scalar_one_or_none() if existing: raise HTTPException(status_code=400, detail="Setup already completed") pin_hash = hash_pin(data.pin) new_settings = Settings(pin_hash=pin_hash) db.add(new_settings) await db.commit() await db.refresh(new_settings) # Create session token = create_session_token(new_settings.id) _set_session_cookie(response, token) return {"message": "Setup completed successfully", "authenticated": True} @router.post("/login") async def login( data: SettingsCreate, request: Request, response: Response, db: AsyncSession = Depends(get_db) ): """Verify PIN and create session.""" client_ip = request.client.host if request.client else "unknown" _check_rate_limit(client_ip) result = await db.execute(select(Settings)) settings_obj = result.scalar_one_or_none() if not settings_obj: raise HTTPException(status_code=400, detail="Setup required") if not verify_pin(data.pin, settings_obj.pin_hash): _record_failed_attempt(client_ip) raise HTTPException(status_code=401, detail="Invalid PIN") # Clear failed attempts on successful login _failed_attempts.pop(client_ip, None) # Create session token = create_session_token(settings_obj.id) _set_session_cookie(response, token) return {"message": "Login successful", "authenticated": True} @router.post("/logout") async def logout(response: Response): """Clear session cookie.""" response.delete_cookie(key="session") return {"message": "Logout successful"} @router.get("/status") async def auth_status( session_cookie: Optional[str] = Cookie(None, alias="session"), db: AsyncSession = Depends(get_db) ): """Check authentication status.""" result = await db.execute(select(Settings)) settings_obj = result.scalar_one_or_none() setup_required = settings_obj is None authenticated = False if not setup_required and session_cookie: user_id = verify_session_token(session_cookie) authenticated = user_id is not None return { "authenticated": authenticated, "setup_required": setup_required }