UMBRA/backend/app/routers/locations.py
Kyle Pope d4117818c7 Preserve house number from user query when Nominatim omits it
Many addresses resolve to just the road in Nominatim (no house_number
in response). Now extracts the leading number from the user's original
search query and prepends it to the road name, so "123 Adelaide Terrace"
stays as "123 Adelaide Terrace" instead of just "Adelaide Terrace".

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 14:52:46 +08:00

212 lines
6.9 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Path, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, or_
from datetime import datetime, timezone
from typing import Optional, List
import asyncio
import json
import urllib.request
import urllib.parse
import logging
import re
from app.database import get_db
from app.models.location import Location
from app.schemas.location import LocationCreate, LocationUpdate, LocationResponse, LocationSearchResult
from app.routers.auth import get_current_user
from app.models.user import User
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/search", response_model=List[LocationSearchResult])
async def search_locations(
q: str = Query(..., min_length=1),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Search locations from local DB and Nominatim OSM."""
results: List[LocationSearchResult] = []
# Local DB search — scoped to user's locations
local_query = (
select(Location)
.where(
Location.user_id == current_user.id,
or_(
Location.name.ilike(f"%{q}%"),
Location.address.ilike(f"%{q}%"),
),
)
.limit(5)
)
local_result = await db.execute(local_query)
local_locations = local_result.scalars().all()
for loc in local_locations:
results.append(
LocationSearchResult(
source="local",
location_id=loc.id,
name=loc.name,
address=loc.address,
)
)
# Nominatim proxy search (run in thread executor to avoid blocking event loop)
def _fetch_nominatim() -> list:
encoded_q = urllib.parse.quote(q)
url = f"https://nominatim.openstreetmap.org/search?q={encoded_q}&format=json&addressdetails=1&limit=5"
req = urllib.request.Request(url, headers={"User-Agent": "UMBRA-LifeManager/1.0"})
with urllib.request.urlopen(req, timeout=5) as resp:
return json.loads(resp.read().decode())
try:
loop = asyncio.get_running_loop()
osm_data = await loop.run_in_executor(None, _fetch_nominatim)
for item in osm_data:
display_name = item.get("display_name", "")
addr = item.get("address", {})
house_number = addr.get("house_number", "")
road = addr.get("road", "")
# If Nominatim didn't return a house_number but the user's
# query starts with one, preserve it from the original query.
if not house_number and road:
m = re.match(r"^(\d+[\w/-]*)\s+", q.strip())
if m:
house_number = m.group(1)
# Build a name that preserves the house number
if house_number and road:
name = f"{house_number} {road}"
elif road:
name = road
else:
# Fallback: first comma-separated segment
name = display_name.split(",", 1)[0].strip()
# Address: everything after the street portion
name_prefix = f"{house_number}, " if house_number else ""
road_prefix = f"{road}, " if road else ""
strip_prefix = name_prefix + road_prefix
if strip_prefix and display_name.startswith(strip_prefix):
address = display_name[len(strip_prefix):].strip()
else:
name_parts = display_name.split(",", 1)
address = name_parts[1].strip() if len(name_parts) > 1 else display_name
results.append(
LocationSearchResult(
source="nominatim",
name=name,
address=address,
)
)
except Exception as e:
logger.warning(f"Nominatim search failed: {e}")
return results
@router.get("/", response_model=List[LocationResponse])
async def get_locations(
category: Optional[str] = Query(None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get all locations with optional category filter."""
query = select(Location).where(Location.user_id == current_user.id)
if category:
query = query.where(Location.category == category)
query = query.order_by(Location.name.asc())
result = await db.execute(query)
locations = result.scalars().all()
return locations
@router.post("/", response_model=LocationResponse, status_code=201)
async def create_location(
location: LocationCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Create a new location."""
new_location = Location(**location.model_dump(), user_id=current_user.id)
db.add(new_location)
await db.commit()
await db.refresh(new_location)
return new_location
@router.get("/{location_id}", response_model=LocationResponse)
async def get_location(
location_id: int = Path(ge=1, le=2147483647),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get a specific location by ID."""
result = await db.execute(
select(Location).where(Location.id == location_id, Location.user_id == current_user.id)
)
location = result.scalar_one_or_none()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
return location
@router.put("/{location_id}", response_model=LocationResponse)
async def update_location(
location_id: int = Path(ge=1, le=2147483647),
location_update: LocationUpdate = ...,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update a location."""
result = await db.execute(
select(Location).where(Location.id == location_id, Location.user_id == current_user.id)
)
location = result.scalar_one_or_none()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
update_data = location_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(location, key, value)
# Guarantee timestamp refresh regardless of DB driver behaviour
location.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
await db.commit()
await db.refresh(location)
return location
@router.delete("/{location_id}", status_code=204)
async def delete_location(
location_id: int = Path(ge=1, le=2147483647),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete a location."""
result = await db.execute(
select(Location).where(Location.id == location_id, Location.user_id == current_user.id)
)
location = result.scalar_one_or_none()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
await db.delete(location)
await db.commit()
return None