UMBRA/backend/app/main.py
Kyle Pope 581efa183a H-01: Add global CSRF middleware for all mutating endpoints
Replace the admin-only verify_xhr dependency with a pure ASGI
CSRFHeaderMiddleware that validates X-Requested-With: XMLHttpRequest
on all POST/PUT/PATCH/DELETE requests globally. Pre-auth endpoints
(login, setup, register, totp-verify, enforce-setup/confirm) are
exempt. This closes the CSRF gap where non-admin routes accepted
requests without origin validation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 15:39:42 +08:00

142 lines
5.4 KiB
Python

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.config import settings
from app.database import engine
from app.routers import auth, todos, events, calendars, reminders, projects, people, locations, settings as settings_router, dashboard, weather, event_templates
from app.routers import totp, admin
from app.jobs.notifications import run_notification_dispatch
# Import models so Alembic's autogenerate can discover them
from app.models import user as _user_model # noqa: F401
from app.models import session as _session_model # noqa: F401
from app.models import totp_usage as _totp_usage_model # noqa: F401
from app.models import backup_code as _backup_code_model # noqa: F401
from app.models import system_config as _system_config_model # noqa: F401
from app.models import audit_log as _audit_log_model # noqa: F401
# ---------------------------------------------------------------------------
# Pure ASGI CSRF middleware — SEC-08 (global)
# ---------------------------------------------------------------------------
class CSRFHeaderMiddleware:
"""
Require X-Requested-With: XMLHttpRequest on all state-mutating requests.
Browsers never send this header cross-origin without a CORS preflight,
which our CORS policy blocks. This prevents CSRF attacks from simple
form submissions and cross-origin fetches.
Uses pure ASGI (not BaseHTTPMiddleware) to avoid streaming/memory overhead.
"""
_EXEMPT_PATHS = frozenset({
"/health",
"/",
"/api/auth/login",
"/api/auth/setup",
"/api/auth/register",
"/api/auth/totp-verify",
"/api/auth/totp/enforce-setup",
"/api/auth/totp/enforce-confirm",
})
_MUTATING_METHODS = frozenset({"POST", "PUT", "PATCH", "DELETE"})
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
method = scope.get("method", "")
path = scope.get("path", "")
if method in self._MUTATING_METHODS and path not in self._EXEMPT_PATHS:
headers = dict(scope.get("headers", []))
if headers.get(b"x-requested-with") != b"XMLHttpRequest":
body = b'{"detail":"Invalid request origin"}'
await send({
"type": "http.response.start",
"status": 403,
"headers": [
[b"content-type", b"application/json"],
[b"content-length", str(len(body)).encode()],
],
})
await send({"type": "http.response.body", "body": body})
return
await self.app(scope, receive, send)
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = AsyncIOScheduler()
scheduler.add_job(
run_notification_dispatch,
"interval",
minutes=1,
id="ntfy_dispatch",
max_instances=1, # prevent overlap if a run takes longer than 60s
)
scheduler.start()
yield
scheduler.shutdown(wait=False)
await engine.dispose()
_is_dev = settings.ENVIRONMENT == "development"
app = FastAPI(
title="UMBRA API",
description="Backend API for UMBRA application",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs" if _is_dev else None,
redoc_url="/redoc" if _is_dev else None,
openapi_url="/openapi.json" if _is_dev else None,
)
# Middleware stack — added in reverse order (last added = outermost).
# CSRF is added first (innermost), then CORS wraps it (outermost).
# This ensures CORS headers appear on CSRF 403 responses.
app.add_middleware(CSRFHeaderMiddleware)
# CORS configuration — outermost layer
app.add_middleware(
CORSMiddleware,
allow_origins=[o.strip() for o in settings.CORS_ORIGINS.split(",") if o.strip()],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "Cookie", "X-Requested-With"],
)
# Include routers with /api prefix
app.include_router(auth.router, prefix="/api/auth", tags=["Authentication"])
app.include_router(todos.router, prefix="/api/todos", tags=["Todos"])
app.include_router(events.router, prefix="/api/events", tags=["Calendar Events"])
app.include_router(calendars.router, prefix="/api/calendars", tags=["Calendars"])
app.include_router(reminders.router, prefix="/api/reminders", tags=["Reminders"])
app.include_router(projects.router, prefix="/api/projects", tags=["Projects"])
app.include_router(people.router, prefix="/api/people", tags=["People"])
app.include_router(locations.router, prefix="/api/locations", tags=["Locations"])
app.include_router(settings_router.router, prefix="/api/settings", tags=["Settings"])
app.include_router(dashboard.router, prefix="/api", tags=["Dashboard"])
app.include_router(weather.router, prefix="/api/weather", tags=["Weather"])
app.include_router(event_templates.router, prefix="/api/event-templates", tags=["Event Templates"])
app.include_router(totp.router, prefix="/api/auth", tags=["TOTP MFA"])
app.include_router(admin.router, prefix="/api/admin", tags=["Admin"])
@app.get("/")
async def root():
return {"message": "UMBRA API is running"}
@app.get("/health")
async def health_check():
return {"status": "healthy"}