UMBRA/backend/app/routers/locations.py
Kyle Pope 80418172db Rework Nominatim results: name as label, address as full street address
Name now uses the Nominatim place/building label (e.g. "The Quadrant")
when available, falling back to street address. Address field now
contains the full formatted address (house number, road, suburb, city,
state, postcode, country) instead of just the city/state portion.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 15:58:20 +08:00

217 lines
7.0 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Path, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, or_
from datetime import datetime, timezone
from typing import Optional, List
import asyncio
import json
import urllib.request
import urllib.parse
import logging
import re
from app.database import get_db
from app.models.location import Location
from app.schemas.location import LocationCreate, LocationUpdate, LocationResponse, LocationSearchResult
from app.routers.auth import get_current_user
from app.models.user import User
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Router on which the location endpoints defined in this file are registered.
router = APIRouter()
# Leading house number in a free-text query, e.g. "12", "12A", "3/45" or "7-9".
_HOUSE_NUMBER_RE = re.compile(r"^(\d+[\w/-]*)\s+")

# Escape character used for LIKE patterns built from user input.
_LIKE_ESCAPE = "/"

# Nominatim address components appended after the street, in display order.
_ADDRESS_KEYS = ("suburb", "city", "state", "postcode", "country")


def _escape_like(term: str, escape: str = _LIKE_ESCAPE) -> str:
    """Return *term* with LIKE wildcards escaped so it is matched literally."""
    # The escape character itself must be doubled first, otherwise the
    # escapes added for "%" and "_" would be escaped again.
    return (
        term.replace(escape, escape + escape)
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def _format_nominatim_item(item: dict, query_house_number: str = "") -> tuple:
    """Derive ``(name, address)`` from a single Nominatim result dict.

    ``name`` is the place/building label when it differs from the road,
    otherwise the street address, otherwise the first segment of
    ``display_name``. ``address`` is the street followed by the suburb, city,
    state, postcode and country that are present, falling back to
    ``display_name`` when none of them are.

    ``query_house_number`` is used when the result has a road but no house
    number of its own.
    """
    # ``or`` fallbacks also cover keys that are present but null.
    display_name = item.get("display_name") or ""
    addr = item.get("address")
    if not isinstance(addr, dict):
        addr = {}

    house_number = addr.get("house_number") or ""
    road = addr.get("road") or ""
    if not house_number and road:
        house_number = query_house_number

    street = f"{house_number} {road}" if house_number and road else road

    osm_name = item.get("name") or ""
    if osm_name and osm_name != road:
        name = osm_name
    elif street:
        name = street
    else:
        name = display_name.split(",", 1)[0].strip()

    addr_parts = [street] if street else []
    addr_parts.extend(addr[key] for key in _ADDRESS_KEYS if addr.get(key))
    address = ", ".join(addr_parts) if addr_parts else display_name
    return name, address


@router.get("/search", response_model=List[LocationSearchResult])
async def search_locations(
    q: str = Query(..., min_length=1),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Search locations from local DB and Nominatim OSM.

    Returns up to 5 of the current user's saved locations whose name or
    address contains ``q`` (case-insensitive, matched literally), followed by
    up to 5 results fetched from Nominatim. A failure of the Nominatim lookup
    is logged and does not fail the request; whatever was collected so far is
    returned.
    """
    results: List[LocationSearchResult] = []

    # Local DB search — scoped to the user's locations. The query is escaped
    # so that "%" and "_" typed by the user are not treated as wildcards.
    pattern = f"%{_escape_like(q)}%"
    local_query = (
        select(Location)
        .where(
            Location.user_id == current_user.id,
            or_(
                Location.name.ilike(pattern, escape=_LIKE_ESCAPE),
                Location.address.ilike(pattern, escape=_LIKE_ESCAPE),
            ),
        )
        .limit(5)
    )
    local_result = await db.execute(local_query)
    results.extend(
        LocationSearchResult(
            source="local",
            location_id=loc.id,
            name=loc.name,
            address=loc.address,
        )
        for loc in local_result.scalars().all()
    )

    # Nominatim proxy search (run in thread executor to avoid blocking event loop)
    def _fetch_nominatim() -> list:
        params = urllib.parse.urlencode(
            {"q": q, "format": "json", "addressdetails": 1, "limit": 5}
        )
        url = f"https://nominatim.openstreetmap.org/search?{params}"
        req = urllib.request.Request(url, headers={"User-Agent": "UMBRA-LifeManager/1.0"})
        with urllib.request.urlopen(req, timeout=5) as resp:
            return json.loads(resp.read().decode())

    # If Nominatim doesn't return a house_number but the user's query starts
    # with one, it is preserved from the original query. The query is the
    # same for every result, so it is parsed once here.
    house_match = _HOUSE_NUMBER_RE.match(q.strip())
    query_house_number = house_match.group(1) if house_match else ""

    # Best-effort: the external lookup must never break the endpoint, so any
    # failure here is logged and the results gathered so far are returned.
    try:
        loop = asyncio.get_running_loop()
        osm_data = await loop.run_in_executor(None, _fetch_nominatim)
        for item in osm_data:
            if not isinstance(item, dict):
                # Unexpected payload shape; skip rather than abort the rest.
                continue
            name, address = _format_nominatim_item(item, query_house_number)
            results.append(
                LocationSearchResult(
                    source="nominatim",
                    name=name,
                    address=address,
                )
            )
    except Exception as e:
        logger.warning("Nominatim search failed: %s", e)

    return results
@router.get("/", response_model=List[LocationResponse])
async def get_locations(
    category: Optional[str] = Query(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List the current user's locations, ordered by name ascending.

    When ``category`` is given (and non-empty) only locations in that
    category are returned.
    """
    conditions = [Location.user_id == current_user.id]
    if category:
        conditions.append(Location.category == category)

    stmt = select(Location).where(*conditions).order_by(Location.name.asc())
    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.post("/", response_model=LocationResponse, status_code=201)
async def create_location(
    location: LocationCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Persist a new location owned by the current user and return it."""
    payload = location.model_dump()
    created = Location(**payload, user_id=current_user.id)

    db.add(created)
    await db.commit()
    # Reload the row after the commit before returning it.
    await db.refresh(created)
    return created
@router.get("/{location_id}", response_model=LocationResponse)
async def get_location(
    location_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return one of the current user's locations by ID.

    Raises:
        HTTPException: 404 when no location with this ID belongs to the user.
    """
    owned_by_id = select(Location).where(
        Location.id == location_id,
        Location.user_id == current_user.id,
    )
    found = (await db.execute(owned_by_id)).scalar_one_or_none()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Location not found")
@router.put("/{location_id}", response_model=LocationResponse)
async def update_location(
    location_id: int = Path(ge=1, le=2147483647),
    location_update: LocationUpdate = ...,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Apply a partial update to one of the current user's locations.

    Only the fields dumped with ``exclude_unset=True`` are written; the
    ``updated_at`` timestamp is always refreshed.

    Raises:
        HTTPException: 404 when no location with this ID belongs to the user.
    """
    owned_by_id = select(Location).where(
        Location.id == location_id,
        Location.user_id == current_user.id,
    )
    existing = (await db.execute(owned_by_id)).scalar_one_or_none()
    if not existing:
        raise HTTPException(status_code=404, detail="Location not found")

    changes = location_update.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(existing, field_name, changes[field_name])

    # Set the timestamp explicitly (as naive UTC) rather than relying on the
    # DB driver to refresh it.
    now_utc = datetime.now(timezone.utc)
    existing.updated_at = now_utc.replace(tzinfo=None)

    await db.commit()
    await db.refresh(existing)
    return existing
@router.delete("/{location_id}", status_code=204)
async def delete_location(
    location_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Remove one of the current user's locations.

    Raises:
        HTTPException: 404 when no location with this ID belongs to the user.
    """
    owned_by_id = select(Location).where(
        Location.id == location_id,
        Location.user_id == current_user.id,
    )
    doomed = (await db.execute(owned_by_id)).scalar_one_or_none()
    if not doomed:
        raise HTTPException(status_code=404, detail="Location not found")

    await db.delete(doomed)
    await db.commit()
    # Nothing is returned; the route is declared with status 204.
    return None