- Add max_length constraints to all string fields in request schemas, matching DB column limits (title:255, description:5000, etc.) - Add min_length=1 to required name/title fields - Add ConfigDict(extra="forbid") to all request schemas to reject unknown fields (prevents silent field injection) - Add Path(ge=1, le=2147483647) to all integer path parameters across all routers to prevent integer overflow → 500 errors - Add max_length to TOTP inline schemas (code:6, mfa_token:256, etc.) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
188 lines
5.8 KiB
Python
188 lines
5.8 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Path, Query
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, or_
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, List
|
|
import asyncio
|
|
import json
|
|
import urllib.request
|
|
import urllib.parse
|
|
import logging
|
|
|
|
from app.database import get_db
|
|
from app.models.location import Location
|
|
from app.schemas.location import LocationCreate, LocationUpdate, LocationResponse, LocationSearchResult
|
|
from app.routers.auth import get_current_user
|
|
from app.models.user import User
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Router object; the endpoint decorators in this file attach their routes to it.
router = APIRouter()
|
|
|
|
|
|
@router.get("/search", response_model=List[LocationSearchResult])
async def search_locations(
    q: str = Query(..., min_length=1),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Search the caller's saved locations and the Nominatim OSM geocoder.

    Args:
        q: Free-text search term (at least one character).
        db: Async database session.
        current_user: Authenticated user; local matches are restricted to
            locations owned by this user.

    Returns:
        A list of ``LocationSearchResult``: at most five local matches
        (``source="local"``) followed by whatever Nominatim returned for a
        request made with ``limit=5`` (``source="nominatim"``). If the
        Nominatim lookup or parsing fails, a warning is logged and the
        results collected so far are returned.
    """
    results: List[LocationSearchResult] = []

    # Escape LIKE metacharacters so a "%" or "_" typed by the user is matched
    # literally instead of acting as a wildcard. The escape character itself
    # has to be escaped first.
    escaped_q = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    pattern = f"%{escaped_q}%"

    # Local DB search — scoped to user's locations
    local_query = (
        select(Location)
        .where(
            Location.user_id == current_user.id,
            or_(
                Location.name.ilike(pattern, escape="\\"),
                Location.address.ilike(pattern, escape="\\"),
            ),
        )
        .limit(5)
    )
    local_result = await db.execute(local_query)
    local_locations = local_result.scalars().all()

    for loc in local_locations:
        results.append(
            LocationSearchResult(
                source="local",
                location_id=loc.id,
                name=loc.name,
                address=loc.address,
            )
        )

    # Nominatim proxy search (run in thread executor to avoid blocking event loop)
    def _fetch_nominatim() -> list:
        encoded_q = urllib.parse.quote(q)
        url = f"https://nominatim.openstreetmap.org/search?q={encoded_q}&format=json&limit=5"
        req = urllib.request.Request(url, headers={"User-Agent": "UMBRA-LifeManager/1.0"})
        with urllib.request.urlopen(req, timeout=5) as resp:
            return json.loads(resp.read().decode())

    try:
        loop = asyncio.get_running_loop()
        osm_data = await loop.run_in_executor(None, _fetch_nominatim)
        for item in osm_data:
            # display_name is split at the first comma: the leading part
            # becomes the name, the remainder the address. Without a comma
            # the whole string is used for both.
            display_name = item.get("display_name", "")
            name_parts = display_name.split(",", 1)
            name = name_parts[0].strip()
            address = name_parts[1].strip() if len(name_parts) > 1 else display_name
            results.append(
                LocationSearchResult(
                    source="nominatim",
                    name=name,
                    address=address,
                )
            )
    except Exception as e:
        # Best-effort: an external lookup failure must not fail the request.
        logger.warning("Nominatim search failed: %s", e)

    return results
|
|
|
|
|
|
@router.get("/", response_model=List[LocationResponse])
async def get_locations(
    category: Optional[str] = Query(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List the current user's locations in ascending name order.

    When a non-empty ``category`` is supplied, only locations whose category
    equals it are returned.
    """
    # Ownership is always enforced; the category condition is optional.
    filters = [Location.user_id == current_user.id]
    if category:
        filters.append(Location.category == category)

    stmt = select(Location).where(*filters).order_by(Location.name.asc())
    rows = await db.execute(stmt)
    return rows.scalars().all()
|
|
|
|
|
|
@router.post("/", response_model=LocationResponse, status_code=201)
async def create_location(
    location: LocationCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Store a new location owned by the current user and return it."""
    fields = location.model_dump()
    record = Location(**fields, user_id=current_user.id)

    db.add(record)
    await db.commit()
    # Re-read the row after the commit before handing it back.
    await db.refresh(record)
    return record
|
|
|
|
|
|
@router.get("/{location_id}", response_model=LocationResponse)
async def get_location(
    location_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return one of the current user's locations, looked up by ID.

    Raises:
        HTTPException: 404 when no location with this ID belongs to the user.
    """
    stmt = select(Location).where(
        Location.id == location_id,
        Location.user_id == current_user.id,
    )
    found = (await db.execute(stmt)).scalar_one_or_none()

    if found:
        return found
    raise HTTPException(status_code=404, detail="Location not found")
|
|
|
|
|
|
@router.put("/{location_id}", response_model=LocationResponse)
async def update_location(
    location_id: int = Path(ge=1, le=2147483647),
    location_update: LocationUpdate = ...,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Apply a partial update to one of the current user's locations.

    Only fields explicitly present in the request are written; ``updated_at``
    is always set to the current UTC time (stored without tzinfo).

    Raises:
        HTTPException: 404 when no location with this ID belongs to the user.
    """
    stmt = select(Location).where(
        Location.id == location_id,
        Location.user_id == current_user.id,
    )
    target = (await db.execute(stmt)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Location not found")

    # exclude_unset leaves out fields the client did not send.
    changes = location_update.model_dump(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    # Set the timestamp explicitly instead of relying on the DB driver.
    now_utc = datetime.now(timezone.utc)
    target.updated_at = now_utc.replace(tzinfo=None)

    await db.commit()
    await db.refresh(target)
    return target
|
|
|
|
|
|
@router.delete("/{location_id}", status_code=204)
async def delete_location(
    location_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Remove one of the current user's locations.

    Raises:
        HTTPException: 404 when no location with this ID belongs to the user.
    """
    stmt = select(Location).where(
        Location.id == location_id,
        Location.user_id == current_user.id,
    )
    doomed = (await db.execute(stmt)).scalar_one_or_none()
    if not doomed:
        raise HTTPException(status_code=404, detail="Location not found")

    await db.delete(doomed)
    await db.commit()
    # Nothing is returned: the route is declared with status 204.
|