UMBRA/backend/app/routers/people.py
Kyle Pope cb9f74a387 Entity pages enhancement: backend model extensions, shared components, Locations rebuild, panel animations
- Add migrations 019/020: extend Person (first/last name, nickname, is_favourite, company, job_title, mobile, category) and Location (is_frequent, contact_number, email)
- Update Person/Location models, schemas, and routers with new fields + name denormalisation
- Create shared component library: EntityTable, EntityDetailPanel, CategoryFilterBar, CopyableField, CategoryAutocomplete, useTableVisibility hook
- Rebuild LocationsPage: table layout with sortable columns, detail side panel, category filter bar, frequent pinned section
- Extend LocationForm with contact number, email, frequent toggle, category autocomplete
- Add animated panel transitions to ProjectDetail (55/45 split with cubic-bezier easing)
- Update TypeScript interfaces for Person and Location

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 21:10:26 +08:00

143 lines
4.1 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from datetime import datetime, timezone
from typing import Optional, List
from app.database import get_db
from app.models.person import Person
from app.schemas.person import PersonCreate, PersonUpdate, PersonResponse
from app.routers.auth import get_current_session
from app.models.settings import Settings
# Router that the people endpoints in this module register themselves on.
router = APIRouter()
def _compute_display_name(
    first_name: Optional[str],
    last_name: Optional[str],
    nickname: Optional[str],
    name: Optional[str],
) -> str:
    """Denormalise a display name from the individual name fields.

    Precedence: nickname wins; else 'First Last' (either part may be
    missing); else the legacy ``name`` value; else an empty string.

    Each component is stripped before use so that whitespace-only values
    count as missing and padded values do not leave stray spaces inside
    the joined result.

    Args:
        first_name: Given name, or None.
        last_name: Family name, or None.
        nickname: Preferred short name, or None.
        name: Existing/legacy display name used as the last fallback.

    Returns:
        The display name to store, never None.
    """
    alias = (nickname or '').strip()
    if alias:
        return alias

    # Join only the non-empty parts so a missing first or last name does
    # not introduce a leading/trailing or doubled separator.
    cleaned = [(part or '').strip() for part in (first_name, last_name)]
    full = ' '.join(part for part in cleaned if part)
    if full:
        return full

    return name or ''
@router.get("/", response_model=List[PersonResponse])
async def get_people(
    search: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    db: AsyncSession = Depends(get_db),
    current_user: Settings = Depends(get_current_session)
):
    """Get all people with optional search and category filter.

    Args:
        search: Optional text; when given, only people whose ``name``
            contains it (case-insensitive) are returned.
        category: Optional category; when given, only people whose
            ``category`` equals it exactly are returned.
        db: Async database session supplied by ``get_db``.
        current_user: Value resolved by ``get_current_session``; not used
            in the body (presumably present to require a valid session —
            confirm against the auth router).

    Returns:
        The matching people ordered by ``name`` ascending.
    """
    query = select(Person)
    if search:
        # Treat the user's text literally: without escaping, '%' and '_'
        # in the search term would act as LIKE wildcards. '/' is used as
        # the escape character, so it must itself be escaped first.
        escaped = (
            search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        )
        query = query.where(Person.name.ilike(f"%{escaped}%", escape="/"))
    if category:
        query = query.where(Person.category == category)
    query = query.order_by(Person.name.asc())
    result = await db.execute(query)
    people = result.scalars().all()
    return people
@router.post("/", response_model=PersonResponse, status_code=201)
async def create_person(
    person: PersonCreate,
    db: AsyncSession = Depends(get_db),
    current_user: Settings = Depends(get_current_session),
):
    """Persist a new person and store its denormalised display name.

    The record is built from the request payload, its ``name`` is then
    overwritten with the value derived from the name fields, and the row
    is committed and reloaded before being returned.
    """
    payload = person.model_dump()
    record = Person(**payload)

    # Derive the stored display name from the freshly populated fields.
    record.name = _compute_display_name(
        first_name=record.first_name,
        last_name=record.last_name,
        nickname=record.nickname,
        name=record.name,
    )

    db.add(record)
    await db.commit()
    await db.refresh(record)
    return record
@router.get("/{person_id}", response_model=PersonResponse)
async def get_person(
    person_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: Settings = Depends(get_current_session),
):
    """Return the person with the given ID, or respond 404 if none exists."""
    stmt = select(Person).where(Person.id == person_id)
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Person not found")
@router.put("/{person_id}", response_model=PersonResponse)
async def update_person(
    person_id: int,
    person_update: PersonUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: Settings = Depends(get_current_session),
):
    """Apply a partial update to a person and rebuild its display name.

    Only fields explicitly present in the request are written. Responds
    404 when no person has the given ID.
    """
    stmt = select(Person).where(Person.id == person_id)
    record = (await db.execute(stmt)).scalar_one_or_none()
    if not record:
        raise HTTPException(status_code=404, detail="Person not found")

    changes = person_update.model_dump(exclude_unset=True)
    for field, new_value in changes.items():
        setattr(record, field, new_value)

    # The display name depends on the fields just written, so derive it
    # only after every change has been applied.
    record.name = _compute_display_name(
        first_name=record.first_name,
        last_name=record.last_name,
        nickname=record.nickname,
        name=record.name,
    )

    # Set the timestamp explicitly (as a naive UTC value) rather than
    # relying on the DB driver to refresh it.
    utc_now = datetime.now(timezone.utc)
    record.updated_at = utc_now.replace(tzinfo=None)

    await db.commit()
    await db.refresh(record)
    return record
@router.delete("/{person_id}", status_code=204)
async def delete_person(
    person_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: Settings = Depends(get_current_session),
):
    """Remove the person with the given ID; respond 404 if it is absent."""
    stmt = select(Person).where(Person.id == person_id)
    target = (await db.execute(stmt)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Person not found")

    await db.delete(target)
    await db.commit()