Backend: - Add Literal types for status/priority fields (project_task, todo, project schemas) - Add AccentColor Literal validation to prevent CSS injection (settings schema) - Add PIN max-length (72 char bcrypt limit) validation - Fix event date filtering to use correct range overlap logic - Add revocation check to auth_status endpoint for consistency - Config: env-aware SECRET_KEY fail-fast, configurable COOKIE_SECURE Frontend: - Add withCredentials to axios for cross-origin cookie support - Replace .toISOString() with local date formatter in DashboardPage - Replace `as any` casts with proper indexed type access in forms - Nginx: add CSP, Referrer-Policy headers; remove deprecated X-XSS-Protection - Nginx: duplicate security headers in static asset location block Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
208 lines
6.3 KiB
Python
208 lines
6.3 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Response, Cookie, Request
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
from typing import Optional
|
|
from collections import defaultdict
|
|
import time
|
|
import bcrypt
|
|
from itsdangerous import TimestampSigner, BadSignature
|
|
|
|
from app.database import get_db
|
|
from app.models.settings import Settings
|
|
from app.schemas.settings import SettingsCreate
|
|
from app.config import settings as app_settings
|
|
|
|
# Router on which this module's authentication endpoints are registered.
router = APIRouter()

# Signer used to create and verify session tokens; keyed by the app SECRET_KEY.
signer = TimestampSigner(app_settings.SECRET_KEY)

# Brute-force protection: timestamps of failed login attempts, keyed by client IP.
_failed_attempts: dict[str, list[float]] = defaultdict(list)
_MAX_ATTEMPTS = 5  # failed attempts allowed inside one window
_WINDOW_SECONDS = 5 * 60  # 5-minute lockout window

# Server-side session revocation (in-memory, sufficient for single-user app).
# Stores the raw session cookie values that have been logged out.
_revoked_sessions: set[str] = set()
|
|
|
|
|
|
def _check_rate_limit(ip: str) -> None:
    """Reject the request if *ip* has too many recent failed logins.

    Failed-attempt timestamps older than ``_WINDOW_SECONDS`` are pruned
    first; the remaining count is then compared against ``_MAX_ATTEMPTS``.

    Args:
        ip: Key under which failed attempts are tracked.

    Raises:
        HTTPException: 429 when the pruned list holds ``_MAX_ATTEMPTS`` or
            more entries.
    """
    now = time.time()
    recent = [
        stamp for stamp in _failed_attempts[ip] if now - stamp < _WINDOW_SECONDS
    ]

    if not recent:
        # Nothing left inside the window: drop the key instead of keeping
        # an empty list around.
        del _failed_attempts[ip]
        return

    _failed_attempts[ip] = recent
    if len(recent) >= _MAX_ATTEMPTS:
        raise HTTPException(
            status_code=429,
            detail="Too many failed login attempts. Try again in a few minutes.",
        )
|
|
|
|
|
|
def _record_failed_attempt(ip: str) -> None:
    """Append the current time to the failed-login history of *ip*."""
    history = _failed_attempts[ip]
    history.append(time.time())
|
|
|
|
|
|
def hash_pin(pin: str) -> str:
    """Return a bcrypt hash of *pin* as text.

    A fresh salt is generated on every call, so hashing the same PIN twice
    yields different strings.
    """
    pin_bytes = pin.encode()
    digest = bcrypt.hashpw(pin_bytes, bcrypt.gensalt())
    return digest.decode()
|
|
|
|
|
|
def verify_pin(pin: str, hashed: str) -> bool:
    """Check a PIN against a stored bcrypt hash.

    Args:
        pin: The PIN supplied by the client.
        hashed: The stored bcrypt hash, as text.

    Returns:
        True if *pin* matches *hashed*; False if it does not match or if the
        comparison cannot be performed at all.
    """
    try:
        return bcrypt.checkpw(pin.encode(), hashed.encode())
    except ValueError:
        # bcrypt raises ValueError for a stored value that is not a valid
        # bcrypt hash, and str.encode() raises UnicodeEncodeError (a
        # ValueError subclass) for text that cannot be encoded. Neither can
        # be a match, so report a failed verification rather than letting
        # the caller crash with a 500.
        # NOTE(review): newer bcrypt releases reportedly also raise
        # ValueError for inputs longer than 72 bytes — confirm for the
        # pinned version.
        return False
|
|
|
|
|
|
def create_session_token(user_id: int) -> str:
    """Return a signed session token whose payload is *user_id*.

    The id is stringified, signed with the module-level ``signer`` and the
    signed bytes are decoded to text.
    """
    signed = signer.sign(str(user_id))
    return signed.decode()
|
|
|
|
|
|
def verify_session_token(token: str) -> Optional[int]:
    """Validate a session token and return the user id it carries.

    The token is unsigned with the module-level ``signer`` using a maximum
    age of 30 days.

    Returns:
        The integer payload, or None when unsigning raises ``BadSignature``
        or the payload cannot be converted to an integer.
    """
    max_age_seconds = 86400 * 30  # 30 days
    try:
        payload = signer.unsign(token, max_age=max_age_seconds)
        return int(payload)
    except (BadSignature, ValueError):
        return None
|
|
|
|
|
|
def _set_session_cookie(response: Response, token: str) -> None:
    """Attach *token* to *response* as the ``session`` cookie.

    The cookie is HTTP-only, SameSite=lax, lives for 30 days, and takes its
    ``secure`` flag from ``app_settings.COOKIE_SECURE``.
    """
    thirty_days = 86400 * 30
    response.set_cookie(
        key="session",
        value=token,
        max_age=thirty_days,
        httponly=True,
        samesite="lax",
        secure=app_settings.COOKIE_SECURE,
    )
|
|
|
|
|
|
async def get_current_session(
    session_cookie: Optional[str] = Cookie(None, alias="session"),
    db: AsyncSession = Depends(get_db),
) -> Settings:
    """Dependency that resolves the ``session`` cookie to a Settings row.

    Checks run in this order: cookie present, cookie not revoked, token
    verifies, and a Settings row exists whose id equals the token's user id.

    Returns:
        The Settings row matching the user id carried by the token.

    Raises:
        HTTPException: 401 if any of the checks above fails.
    """
    if not session_cookie:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Logged-out tokens are rejected before their signature is even checked.
    if session_cookie in _revoked_sessions:
        raise HTTPException(status_code=401, detail="Session has been revoked")

    user_id = verify_session_token(session_cookie)
    if user_id is None:
        raise HTTPException(status_code=401, detail="Invalid or expired session")

    lookup = select(Settings).where(Settings.id == user_id)
    settings_obj = (await db.execute(lookup)).scalar_one_or_none()
    if not settings_obj:
        raise HTTPException(status_code=401, detail="Session invalid")

    return settings_obj
|
|
|
|
|
|
@router.post("/setup")
async def setup_pin(
    data: SettingsCreate,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """Create the initial PIN and log the caller in.

    Only succeeds while no Settings row exists. On success the hashed PIN is
    stored, and a session cookie for the new row is set on the response.

    Raises:
        HTTPException: 400 if a Settings row already exists.
    """
    rows = await db.execute(select(Settings).with_for_update())
    if rows.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="Setup already completed")

    new_settings = Settings(pin_hash=hash_pin(data.pin))
    db.add(new_settings)
    await db.commit()
    # Reload the row after the commit before reading its id for the token.
    await db.refresh(new_settings)

    session_token = create_session_token(new_settings.id)
    _set_session_cookie(response, session_token)

    return {"message": "Setup completed successfully", "authenticated": True}
|
|
|
|
|
|
@router.post("/login")
async def login(
    data: SettingsCreate,
    request: Request,
    response: Response,
    db: AsyncSession = Depends(get_db),
):
    """Verify the submitted PIN and, if it matches, start a session.

    Failed attempts are tracked per client address; a successful login
    clears that address's history.

    Raises:
        HTTPException: 429 from the rate limiter, 400 if no Settings row
            exists yet, 401 if the PIN does not match.
    """
    # NOTE(review): behind a reverse proxy request.client.host may be the
    # proxy's address rather than the end user's — confirm deployment setup.
    if request.client:
        client_ip = request.client.host
    else:
        client_ip = "unknown"
    _check_rate_limit(client_ip)

    settings_obj = (await db.execute(select(Settings))).scalar_one_or_none()
    if not settings_obj:
        raise HTTPException(status_code=400, detail="Setup required")

    pin_matches = verify_pin(data.pin, settings_obj.pin_hash)
    if not pin_matches:
        _record_failed_attempt(client_ip)
        raise HTTPException(status_code=401, detail="Invalid PIN")

    # A successful login wipes the failure history for this address.
    _failed_attempts.pop(client_ip, None)

    session_token = create_session_token(settings_obj.id)
    _set_session_cookie(response, session_token)

    return {"message": "Login successful", "authenticated": True}
|
|
|
|
|
|
@router.post("/logout")
async def logout(
    response: Response,
    session_cookie: Optional[str] = Cookie(None, alias="session"),
):
    """Clear the session cookie and revoke the session server-side.

    Args:
        response: Response on which the ``session`` cookie is deleted.
        session_cookie: Current value of the ``session`` cookie, if any.

    Returns:
        A confirmation message; the endpoint succeeds whether or not a
        session cookie was sent.
    """
    # Only remember tokens that would still authenticate. Forged or expired
    # values are already rejected by verify_session_token, so storing them
    # adds nothing and would let unauthenticated callers grow the in-memory
    # revocation set without bound.
    if session_cookie and verify_session_token(session_cookie) is not None:
        _revoked_sessions.add(session_cookie)

    # Attributes mirror the ones used when the cookie is set.
    response.delete_cookie(
        key="session",
        httponly=True,
        secure=app_settings.COOKIE_SECURE,
        samesite="lax",
    )
    return {"message": "Logout successful"}
|
|
|
|
|
|
@router.get("/status")
async def auth_status(
    session_cookie: Optional[str] = Cookie(None, alias="session"),
    db: AsyncSession = Depends(get_db),
):
    """Report whether setup is still required and whether the caller is logged in.

    Args:
        session_cookie: Current value of the ``session`` cookie, if any.
        db: Database session used to look up the Settings row.

    Returns:
        A dict with two booleans: ``setup_required`` is True when no
        Settings row exists; ``authenticated`` is True only when the cookie
        is present, not revoked, verifies, and carries the id of the stored
        Settings row.
    """
    result = await db.execute(select(Settings))
    settings_obj = result.scalar_one_or_none()

    setup_required = settings_obj is None
    authenticated = False

    if (
        not setup_required
        and session_cookie
        and session_cookie not in _revoked_sessions
    ):
        user_id = verify_session_token(session_cookie)
        # Require the token to name the stored row, matching the id lookup
        # done by get_current_session; otherwise a validly signed token for
        # a row that no longer exists would be reported as authenticated
        # here but rejected by protected endpoints.
        authenticated = user_id is not None and user_id == settings_obj.id

    return {
        "authenticated": authenticated,
        "setup_required": setup_required,
    }
|