UMBRA/backend/app/routers/calendars.py
Kyle Pope d8bdae8ec3 Implement multi-user RBAC: database, auth, routing, admin API (Phases 1-6)
Phase 1: Add role, mfa_enforce_pending, must_change_password to users table.
Create system_config (singleton) and audit_log tables. Migration 026.

Phase 2: Add user_id FK to all 8 data tables (todos, reminders, projects,
calendars, people, locations, event_templates, ntfy_sent) with 4-step
nullable→backfill→FK→NOT NULL pattern. Migrations 027-034.

Phase 3: Harden auth schemas (extra="forbid" on RegisterRequest), add
MFA enforcement token serializer with distinct salt, rewrite auth router
with require_role() factory and registration endpoint.

Phase 4: Scope all 12 routers by user_id, fix dependency type bugs,
bound weather cache (SEC-15), multi-user ntfy dispatch.

Phase 5: Create admin router (14 endpoints), admin schemas, audit
service, rate limiting in nginx. SEC-08 CSRF via X-Requested-With.

Phase 6: Update frontend types, useAuth hook (role/isAdmin/register),
App.tsx (AdminRoute guard), Sidebar (admin link), api.ts (XHR header).

Security findings addressed: SEC-01, SEC-02, SEC-03, SEC-04, SEC-05,
SEC-06, SEC-07, SEC-08, SEC-12, SEC-13, SEC-15.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 19:06:25 +08:00

117 lines
3.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from typing import List
from app.database import get_db
from app.models.calendar import Calendar
from app.models.calendar_event import CalendarEvent
from app.schemas.calendar import CalendarCreate, CalendarUpdate, CalendarResponse
from app.routers.auth import get_current_user
from app.models.user import User
# Router that the calendar endpoints in this module register themselves on.
router = APIRouter()
@router.get("/", response_model=List[CalendarResponse])
async def get_calendars(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List the calendars that belong to the authenticated user.

    Results are ordered by ``is_default`` descending, then by ``name``
    ascending, which puts rows flagged as default ahead of the rest.
    """
    # Scope the query to the caller so other users' calendars never leak.
    owned_calendars = (
        select(Calendar)
        .where(Calendar.user_id == current_user.id)
        .order_by(Calendar.is_default.desc(), Calendar.name.asc())
    )
    rows = await db.execute(owned_calendars)
    return rows.scalars().all()
@router.post("/", response_model=CalendarResponse, status_code=201)
async def create_calendar(
    calendar: CalendarCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Create a calendar owned by the authenticated user.

    Only ``name`` and ``color`` are read from the request body. The
    ``is_default``, ``is_system`` and ``is_visible`` flags are fixed here,
    so a client cannot create a default or system calendar through this
    endpoint.
    """
    fields = {
        "name": calendar.name,
        "color": calendar.color,
        # Server-controlled flags: new calendars are ordinary and visible.
        "is_default": False,
        "is_system": False,
        "is_visible": True,
        # Ownership always comes from the session, never from the payload.
        "user_id": current_user.id,
    }
    created = Calendar(**fields)

    db.add(created)
    await db.commit()
    # Reload the row after the commit before returning it.
    await db.refresh(created)
    return created
@router.put("/{calendar_id}", response_model=CalendarResponse)
async def update_calendar(
    calendar_id: int,
    calendar_update: CalendarUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Apply a partial update to one of the authenticated user's calendars.

    Only fields explicitly present in the request body are written
    (``exclude_unset=True``).

    Raises:
        HTTPException: 404 if no calendar with ``calendar_id`` belongs to
            the current user; 400 if the request sets ``name`` on a system
            calendar.
    """
    # Filtering on user_id makes another user's calendar look like a 404.
    owned_lookup = select(Calendar).where(
        Calendar.id == calendar_id,
        Calendar.user_id == current_user.id,
    )
    target = (await db.execute(owned_lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Calendar not found")

    changes = calendar_update.model_dump(exclude_unset=True)

    # System calendars may still be updated (e.g. toggling visibility),
    # but their name is protected.
    wants_rename = "name" in changes
    if target.is_system and wants_rename:
        raise HTTPException(status_code=400, detail="Cannot rename system calendars")

    # NOTE(review): every submitted field is copied onto the row as-is;
    # which fields are allowed depends on CalendarUpdate - confirm there.
    for field, value in changes.items():
        setattr(target, field, value)

    await db.commit()
    await db.refresh(target)
    return target
@router.delete("/{calendar_id}", status_code=204)
async def delete_calendar(
    calendar_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Delete one of the authenticated user's calendars.

    Before the calendar is removed, its events are moved to the user's
    default calendar when one exists. The reassignment and the delete are
    committed together.

    Raises:
        HTTPException: 404 if no calendar with ``calendar_id`` belongs to
            the current user; 400 if the calendar is a system calendar or
            is the default calendar.
    """
    owned_lookup = select(Calendar).where(
        Calendar.id == calendar_id,
        Calendar.user_id == current_user.id,
    )
    doomed = (await db.execute(owned_lookup)).scalar_one_or_none()
    if not doomed:
        raise HTTPException(status_code=404, detail="Calendar not found")

    # Protected calendars can never be removed through this endpoint.
    if doomed.is_system:
        raise HTTPException(status_code=400, detail="Cannot delete system calendars")
    if doomed.is_default:
        raise HTTPException(status_code=400, detail="Cannot delete the default calendar")

    # Find the user's default calendar to receive this calendar's events.
    default_lookup = select(Calendar).where(
        Calendar.user_id == current_user.id,
        Calendar.is_default == True,  # noqa: E712 - builds a SQL expression
    )
    fallback = (await db.execute(default_lookup)).scalar_one_or_none()

    # NOTE(review): with no default calendar the events are left pointing at
    # the calendar being deleted; what happens to them then depends on the
    # foreign-key configuration, which is not visible here - confirm.
    if fallback:
        reassign_events = (
            update(CalendarEvent)
            .where(CalendarEvent.calendar_id == calendar_id)
            .values(calendar_id=fallback.id)
        )
        await db.execute(reassign_events)

    await db.delete(doomed)
    await db.commit()
    return None