Name now uses the Nominatim place/building label (e.g. "The Quadrant") when available, falling back to street address. Address field now contains the full formatted address (house number, road, suburb, city, state, postcode, country) instead of just the city/state portion. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
217 lines
7.0 KiB
Python
217 lines
7.0 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Path, Query
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, or_
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, List
|
|
import asyncio
|
|
import json
|
|
import urllib.request
|
|
import urllib.parse
|
|
import logging
|
|
import re
|
|
|
|
from app.database import get_db
|
|
from app.models.location import Location
|
|
from app.schemas.location import LocationCreate, LocationUpdate, LocationResponse, LocationSearchResult
|
|
from app.routers.auth import get_current_user
|
|
from app.models.user import User
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/search", response_model=List[LocationSearchResult])
|
|
async def search_locations(
|
|
q: str = Query(..., min_length=1),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Search locations from local DB and Nominatim OSM."""
|
|
results: List[LocationSearchResult] = []
|
|
|
|
# Local DB search — scoped to user's locations
|
|
local_query = (
|
|
select(Location)
|
|
.where(
|
|
Location.user_id == current_user.id,
|
|
or_(
|
|
Location.name.ilike(f"%{q}%"),
|
|
Location.address.ilike(f"%{q}%"),
|
|
),
|
|
)
|
|
.limit(5)
|
|
)
|
|
local_result = await db.execute(local_query)
|
|
local_locations = local_result.scalars().all()
|
|
|
|
for loc in local_locations:
|
|
results.append(
|
|
LocationSearchResult(
|
|
source="local",
|
|
location_id=loc.id,
|
|
name=loc.name,
|
|
address=loc.address,
|
|
)
|
|
)
|
|
|
|
# Nominatim proxy search (run in thread executor to avoid blocking event loop)
|
|
def _fetch_nominatim() -> list:
|
|
encoded_q = urllib.parse.quote(q)
|
|
url = f"https://nominatim.openstreetmap.org/search?q={encoded_q}&format=json&addressdetails=1&limit=5"
|
|
req = urllib.request.Request(url, headers={"User-Agent": "UMBRA-LifeManager/1.0"})
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
return json.loads(resp.read().decode())
|
|
|
|
try:
|
|
loop = asyncio.get_running_loop()
|
|
osm_data = await loop.run_in_executor(None, _fetch_nominatim)
|
|
for item in osm_data:
|
|
display_name = item.get("display_name", "")
|
|
addr = item.get("address", {})
|
|
house_number = addr.get("house_number", "")
|
|
road = addr.get("road", "")
|
|
|
|
# If Nominatim didn't return a house_number but the user's
|
|
# query starts with one, preserve it from the original query.
|
|
if not house_number and road:
|
|
m = re.match(r"^(\d+[\w/-]*)\s+", q.strip())
|
|
if m:
|
|
house_number = m.group(1)
|
|
|
|
# Name = place/building label from Nominatim (e.g. "The Quadrant").
|
|
# Falls back to street address if no distinct place name exists.
|
|
osm_name = item.get("name", "")
|
|
street = f"{house_number} {road}" if house_number and road else road
|
|
if osm_name and osm_name != road:
|
|
name = osm_name
|
|
elif street:
|
|
name = street
|
|
else:
|
|
name = display_name.split(",", 1)[0].strip()
|
|
|
|
# Address = full street address with suburb/state/postcode.
|
|
addr_parts = []
|
|
if street:
|
|
addr_parts.append(street)
|
|
for key in ("suburb", "city", "state", "postcode", "country"):
|
|
val = addr.get(key, "")
|
|
if val:
|
|
addr_parts.append(val)
|
|
address = ", ".join(addr_parts) if addr_parts else display_name
|
|
results.append(
|
|
LocationSearchResult(
|
|
source="nominatim",
|
|
name=name,
|
|
address=address,
|
|
)
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Nominatim search failed: {e}")
|
|
|
|
return results
|
|
|
|
|
|
@router.get("/", response_model=List[LocationResponse])
|
|
async def get_locations(
|
|
category: Optional[str] = Query(None),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Get all locations with optional category filter."""
|
|
query = select(Location).where(Location.user_id == current_user.id)
|
|
|
|
if category:
|
|
query = query.where(Location.category == category)
|
|
|
|
query = query.order_by(Location.name.asc())
|
|
|
|
result = await db.execute(query)
|
|
locations = result.scalars().all()
|
|
|
|
return locations
|
|
|
|
|
|
@router.post("/", response_model=LocationResponse, status_code=201)
|
|
async def create_location(
|
|
location: LocationCreate,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Create a new location."""
|
|
new_location = Location(**location.model_dump(), user_id=current_user.id)
|
|
db.add(new_location)
|
|
await db.commit()
|
|
await db.refresh(new_location)
|
|
|
|
return new_location
|
|
|
|
|
|
@router.get("/{location_id}", response_model=LocationResponse)
|
|
async def get_location(
|
|
location_id: int = Path(ge=1, le=2147483647),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Get a specific location by ID."""
|
|
result = await db.execute(
|
|
select(Location).where(Location.id == location_id, Location.user_id == current_user.id)
|
|
)
|
|
location = result.scalar_one_or_none()
|
|
|
|
if not location:
|
|
raise HTTPException(status_code=404, detail="Location not found")
|
|
|
|
return location
|
|
|
|
|
|
@router.put("/{location_id}", response_model=LocationResponse)
|
|
async def update_location(
|
|
location_id: int = Path(ge=1, le=2147483647),
|
|
location_update: LocationUpdate = ...,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Update a location."""
|
|
result = await db.execute(
|
|
select(Location).where(Location.id == location_id, Location.user_id == current_user.id)
|
|
)
|
|
location = result.scalar_one_or_none()
|
|
|
|
if not location:
|
|
raise HTTPException(status_code=404, detail="Location not found")
|
|
|
|
update_data = location_update.model_dump(exclude_unset=True)
|
|
|
|
for key, value in update_data.items():
|
|
setattr(location, key, value)
|
|
|
|
# Guarantee timestamp refresh regardless of DB driver behaviour
|
|
location.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
|
|
|
|
await db.commit()
|
|
await db.refresh(location)
|
|
|
|
return location
|
|
|
|
|
|
@router.delete("/{location_id}", status_code=204)
|
|
async def delete_location(
|
|
location_id: int = Path(ge=1, le=2147483647),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Delete a location."""
|
|
result = await db.execute(
|
|
select(Location).where(Location.id == location_id, Location.user_id == current_user.id)
|
|
)
|
|
location = result.scalar_one_or_none()
|
|
|
|
if not location:
|
|
raise HTTPException(status_code=404, detail="Location not found")
|
|
|
|
await db.delete(location)
|
|
await db.commit()
|
|
|
|
return None
|