Kyle Pope 27c65ce40d Fix Round 2 code review findings: type safety, security, and correctness
Backend:
- Add Literal types for status/priority fields (project_task, todo, project schemas)
- Add AccentColor Literal validation to prevent CSS injection (settings schema)
- Add PIN max-length (72 char bcrypt limit) validation
- Fix event date filtering to use correct range overlap logic
- Add revocation check to auth_status endpoint for consistency
- Config: env-aware SECRET_KEY fail-fast, configurable COOKIE_SECURE

Frontend:
- Add withCredentials to axios for cross-origin cookie support
- Replace .toISOString() with local date formatter in DashboardPage
- Replace `as any` casts with proper indexed type access in forms
- Nginx: add CSP, Referrer-Policy headers; remove deprecated X-XSS-Protection
- Nginx: duplicate security headers in static asset location block

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:18:49 +08:00

208 lines
6.3 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Response, Cookie, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
from collections import defaultdict
import time
import bcrypt
from itsdangerous import TimestampSigner, BadSignature
from app.database import get_db
from app.models.settings import Settings
from app.schemas.settings import SettingsCreate
from app.config import settings as app_settings
# Router that the authentication endpoints in this module are registered on.
router = APIRouter()
# Signer used to create and verify timestamped session tokens, keyed by the
# application's SECRET_KEY.
signer = TimestampSigner(app_settings.SECRET_KEY)
# Brute-force protection: failed-login timestamps (time.time() values) per
# client IP.
_failed_attempts: dict[str, list[float]] = defaultdict(list)
# Number of failures inside the window at which logins are rejected with 429.
_MAX_ATTEMPTS = 5
_WINDOW_SECONDS = 300 # 5-minute lockout window
# Server-side session revocation (in-memory, sufficient for single-user app).
# Holds raw session-cookie values; being in-memory, it does not survive a
# process restart.
_revoked_sessions: set[str] = set()
def _check_rate_limit(ip: str) -> None:
    """Reject the request if *ip* is currently locked out.

    Failure timestamps older than ``_WINDOW_SECONDS`` are discarded first,
    and an IP with no remaining failures is dropped from the tracking table.

    Raises:
        HTTPException: 429 when ``_MAX_ATTEMPTS`` or more failures remain
            inside the window.
    """
    current = time.time()
    recent = [
        stamp
        for stamp in _failed_attempts[ip]
        if current - stamp < _WINDOW_SECONDS
    ]

    if not recent:
        # The lookup above created the key (defaultdict); remove it again so
        # the table does not accumulate empty entries.
        del _failed_attempts[ip]
        return

    _failed_attempts[ip] = recent
    if len(recent) < _MAX_ATTEMPTS:
        return

    raise HTTPException(
        status_code=429,
        detail="Too many failed login attempts. Try again in a few minutes.",
    )
def _record_failed_attempt(ip: str) -> None:
    """Append the current time to the failure history kept for *ip*."""
    history = _failed_attempts[ip]
    history.append(time.time())
def hash_pin(pin: str) -> str:
    """Return a bcrypt hash of *pin* as text, using a freshly generated salt."""
    encoded_pin = pin.encode()
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(encoded_pin, salt)
    return digest.decode()
def verify_pin(pin: str, hashed: str) -> bool:
    """Return whether *pin* matches the stored bcrypt hash *hashed*."""
    candidate = pin.encode()
    stored = hashed.encode()
    return bcrypt.checkpw(candidate, stored)
def create_session_token(user_id: int) -> str:
    """Return a signed, timestamped session token carrying *user_id*."""
    payload = str(user_id)
    signed = signer.sign(payload)
    return signed.decode()
def verify_session_token(token: str) -> Optional[int]:
    """Return the user ID carried by *token*, or ``None`` if it is unusable.

    ``None`` is returned when the signature check fails, when the token is
    older than 30 days, or when the signed payload is not an integer.
    """
    max_age_seconds = 86400 * 30  # 30 days
    try:
        payload = signer.unsign(token, max_age=max_age_seconds)
        user_id = int(payload)
    except (BadSignature, ValueError):
        return None
    else:
        return user_id
def _set_session_cookie(response: Response, token: str) -> None:
    """Attach *token* to *response* as the ``session`` cookie.

    The cookie is HTTP-only, uses SameSite=Lax, lasts 30 days, and takes its
    ``secure`` flag from the application's COOKIE_SECURE setting.
    """
    cookie_options = {
        "key": "session",
        "value": token,
        "httponly": True,
        "secure": app_settings.COOKIE_SECURE,
        "max_age": 86400 * 30,  # 30 days
        "samesite": "lax",
    }
    response.set_cookie(**cookie_options)
async def get_current_session(
    session_cookie: Optional[str] = Cookie(None, alias="session"),
    db: AsyncSession = Depends(get_db),
) -> Settings:
    """Resolve the ``session`` cookie to its Settings row.

    Intended for use as a FastAPI dependency.

    Raises:
        HTTPException: 401 when the cookie is missing, has been revoked,
            fails verification, or refers to no existing Settings row.
    """
    if not session_cookie:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Revocation is checked before the signature so a logged-out token is
    # reported as revoked rather than merely invalid.
    if session_cookie in _revoked_sessions:
        raise HTTPException(status_code=401, detail="Session has been revoked")

    user_id = verify_session_token(session_cookie)
    if user_id is None:
        raise HTTPException(status_code=401, detail="Invalid or expired session")

    query = select(Settings).where(Settings.id == user_id)
    owner = (await db.execute(query)).scalar_one_or_none()
    if owner:
        return owner

    raise HTTPException(status_code=401, detail="Session invalid")
@router.post("/setup")
async def setup_pin(
    data: SettingsCreate,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """Store the initial PIN and log the caller in.

    Succeeds only while no Settings row exists; otherwise responds with 400.
    On success a session cookie is set on the response.
    """
    # SELECT ... FOR UPDATE, as in the original query.
    locked_rows = await db.execute(select(Settings).with_for_update())
    if locked_rows.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="Setup already completed")

    record = Settings(pin_hash=hash_pin(data.pin))
    db.add(record)
    await db.commit()
    # Reload so the database-assigned id is available for the token.
    await db.refresh(record)

    session_token = create_session_token(record.id)
    _set_session_cookie(response, session_token)
    return {"message": "Setup completed successfully", "authenticated": True}
@router.post("/login")
async def login(
    data: SettingsCreate,
    request: Request,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """Check the submitted PIN and, if it matches, start a session.

    Responds with 429 when the caller's IP is rate-limited, 400 when no PIN
    has been set up yet, and 401 when the PIN is wrong (the failure is
    recorded against the caller's IP).
    """
    peer = request.client
    client_ip = peer.host if peer else "unknown"
    _check_rate_limit(client_ip)

    settings_obj = (await db.execute(select(Settings))).scalar_one_or_none()
    if not settings_obj:
        raise HTTPException(status_code=400, detail="Setup required")

    pin_matches = verify_pin(data.pin, settings_obj.pin_hash)
    if not pin_matches:
        _record_failed_attempt(client_ip)
        raise HTTPException(status_code=401, detail="Invalid PIN")

    # A successful login wipes the failure history for this IP.
    _failed_attempts.pop(client_ip, None)

    session_token = create_session_token(settings_obj.id)
    _set_session_cookie(response, session_token)
    return {"message": "Login successful", "authenticated": True}
@router.post("/logout")
async def logout(
    response: Response,
    session_cookie: Optional[str] = Cookie(None, alias="session"),
):
    """Clear the session cookie and revoke the session server-side.

    The cookie is always deleted from the client. The token is added to the
    in-memory revocation set only if it still verifies.
    """
    # This endpoint needs no authentication, so the cookie value is untrusted.
    # Recording only tokens that still verify stops a client from growing the
    # revocation set without bound by sending arbitrary cookie strings.
    # Tokens that fail verification are rejected by the signature check
    # anyway, so skipping them loses nothing.
    if session_cookie and verify_session_token(session_cookie) is not None:
        _revoked_sessions.add(session_cookie)

    response.delete_cookie(
        key="session",
        httponly=True,
        secure=app_settings.COOKIE_SECURE,
        samesite="lax",
    )
    return {"message": "Logout successful"}
@router.get("/status")
async def auth_status(
    session_cookie: Optional[str] = Cookie(None, alias="session"),
    db: AsyncSession = Depends(get_db),
):
    """Report whether setup is still required and whether the caller is logged in.

    Returns:
        A dict with ``authenticated`` and ``setup_required`` booleans.
        ``authenticated`` is true only when setup is complete and the session
        cookie is present, not revoked, verifies, and carries the id of the
        existing Settings row.
    """
    result = await db.execute(select(Settings))
    settings_obj = result.scalar_one_or_none()
    setup_required = settings_obj is None

    authenticated = False
    if (
        not setup_required
        and session_cookie
        and session_cookie not in _revoked_sessions
    ):
        user_id = verify_session_token(session_cookie)
        # Mirror get_current_session: a validly signed token counts only if
        # its id matches an existing Settings row. Without this check, status
        # could report "authenticated" for a token that every protected
        # endpoint rejects with 401.
        authenticated = user_id is not None and user_id == settings_obj.id

    return {
        "authenticated": authenticated,
        "setup_required": setup_required,
    }