UMBRA/backend/app/routers/calendars.py
Kyle Pope dd862bfa48 Fix QA review findings: 3 critical, 5 warnings, 1 suggestion
Critical:
- C-01: Populate member_count in GET /calendars for shared calendars
- C-02: Differentiate 423 lock errors in drag-drop onError (show lock-specific toast)
- C-03: Add expired lock purge to APScheduler housekeeping job

Warnings:
- W-01: Replace setattr loop with explicit field assignment in update_member
- W-02: Cap sync `since` param to 7 days to prevent unbounded scans
- W-05: Remove cosmetic isShared toggle (is_shared is auto-managed by invite flow)
- W-06: Populate preferred_name in _build_member_response from user model
- W-07: Add releaseMutation to release callback dependency array

Suggestion:
- S-06: Remove unused ConvertToSharedRequest schema

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 23:41:08 +08:00

139 lines
4.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Path
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import func, select, update
from typing import List
from app.database import get_db
from app.models.calendar import Calendar
from app.models.calendar_event import CalendarEvent
from app.models.calendar_member import CalendarMember
from app.schemas.calendar import CalendarCreate, CalendarUpdate, CalendarResponse
from app.routers.auth import get_current_user
from app.models.user import User
# Router on which the calendar CRUD endpoints in this module are registered.
router = APIRouter()
@router.get("/", response_model=List[CalendarResponse])
async def get_calendars(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List the calendars owned by the authenticated user.

    Calendars are ordered by ``is_default`` descending, then by name
    ascending. Each returned item carries a ``member_count``: for calendars
    flagged ``is_shared`` it is the number of ``CalendarMember`` rows whose
    status is ``"accepted"``; for every other calendar it is 0.
    """
    owned_query = (
        select(Calendar)
        .where(Calendar.user_id == current_user.id)
        .order_by(Calendar.is_default.desc(), Calendar.name.asc())
    )
    owned = (await db.execute(owned_query)).scalars().all()

    # One grouped query for all shared calendars instead of one per calendar;
    # skipped entirely when the user has no shared calendars.
    shared_ids = [cal.id for cal in owned if cal.is_shared]
    accepted_by_calendar: dict[int, int] = {}
    if shared_ids:
        members_query = (
            select(CalendarMember.calendar_id, func.count())
            .where(
                CalendarMember.calendar_id.in_(shared_ids),
                CalendarMember.status == "accepted",
            )
            .group_by(CalendarMember.calendar_id)
        )
        member_rows = (await db.execute(members_query)).all()
        accepted_by_calendar = {cal_id: total for cal_id, total in member_rows}

    responses = []
    for cal in owned:
        payload = CalendarResponse.model_validate(cal, from_attributes=True)
        # Calendars missing from the map (unshared, or shared with no
        # accepted members) report a count of 0.
        member_count = accepted_by_calendar.get(cal.id, 0)
        responses.append(payload.model_copy(update={"member_count": member_count}))
    return responses
@router.post("/", response_model=CalendarResponse, status_code=201)
async def create_calendar(
    calendar: CalendarCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Create a calendar owned by the authenticated user.

    Only ``name`` and ``color`` are taken from the request body. The new
    calendar is always non-default, non-system and visible. Returns the
    persisted row after it has been refreshed from the database.
    """
    # Flags are fixed here rather than read from the payload, so a client
    # cannot create a default or system calendar through this endpoint.
    attributes = {
        "name": calendar.name,
        "color": calendar.color,
        "is_default": False,
        "is_system": False,
        "is_visible": True,
        "user_id": current_user.id,
    }
    created = Calendar(**attributes)

    db.add(created)
    await db.commit()
    # Reload so database-generated values are present on the returned object.
    await db.refresh(created)
    return created
@router.put("/{calendar_id}", response_model=CalendarResponse)
async def update_calendar(
    calendar_id: int = Path(ge=1, le=2147483647),
    calendar_update: CalendarUpdate = ...,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Apply a partial update to one of the authenticated user's calendars.

    Only fields explicitly present in the request body are written.

    Raises:
        HTTPException 404: no calendar with this id belongs to the user.
        HTTPException 400: the body tries to set ``name`` on a system calendar.
    """
    # Filtering on owner as well as id means another user's calendar is
    # reported as "not found" rather than being exposed.
    owned_match = select(Calendar).where(
        Calendar.id == calendar_id, Calendar.user_id == current_user.id
    )
    target = (await db.execute(owned_match)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Calendar not found")

    changes = calendar_update.model_dump(exclude_unset=True)

    # System calendars: other fields may still be changed, but not the name.
    wants_rename = "name" in changes
    if target.is_system and wants_rename:
        raise HTTPException(status_code=400, detail="Cannot rename system calendars")

    # NOTE(review): every field the schema accepts is copied onto the model;
    # this assumes CalendarUpdate exposes only user-editable fields — confirm.
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    await db.commit()
    await db.refresh(target)
    return target
@router.delete("/{calendar_id}", status_code=204)
async def delete_calendar(
    calendar_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Delete one of the authenticated user's calendars.

    Before the calendar is removed, its events are moved to the user's
    default calendar when one is found.

    Raises:
        HTTPException 404: no calendar with this id belongs to the user.
        HTTPException 400: the calendar is a system calendar or the default one.
    """
    owned_match = select(Calendar).where(
        Calendar.id == calendar_id, Calendar.user_id == current_user.id
    )
    doomed = (await db.execute(owned_match)).scalar_one_or_none()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Calendar not found")
    if doomed.is_system:
        raise HTTPException(status_code=400, detail="Cannot delete system calendars")
    if doomed.is_default:
        raise HTTPException(status_code=400, detail="Cannot delete the default calendar")

    # Find the user's default calendar to receive this calendar's events.
    default_query = select(Calendar).where(
        Calendar.user_id == current_user.id,
        Calendar.is_default == True,  # noqa: E712 - SQL expression, not a Python bool test
    )
    fallback = (await db.execute(default_query)).scalar_one_or_none()

    # NOTE(review): when no default calendar exists the reassignment is
    # skipped; what then happens to the events depends on the FK/cascade
    # configuration, which is not visible here — confirm this is intended.
    if fallback is not None:
        reassign_events = (
            update(CalendarEvent)
            .where(CalendarEvent.calendar_id == calendar_id)
            .values(calendar_id=fallback.id)
        )
        await db.execute(reassign_events)

    # The reassignment and the delete are committed together.
    await db.delete(doomed)
    await db.commit()
    return None