""" Connection router — search, request, respond, manage connections. Security: - Timing-safe search (50ms sleep floor) - Per-receiver pending request cap (5 within 10 minutes) - Atomic accept via UPDATE...WHERE status='pending' RETURNING * - All endpoints scoped by current_user.id - Audit logging for all connection events """ import asyncio from datetime import datetime, timedelta, timezone from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Request from sqlalchemy import select, func, and_, update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.database import get_db from app.models.connection_request import ConnectionRequest from app.models.notification import Notification from app.models.person import Person from app.models.settings import Settings from app.models.user import User from app.models.user_connection import UserConnection from app.routers.auth import get_current_user from app.schemas.connection import ( ConnectionRequestResponse, ConnectionResponse, RespondRequest, SendConnectionRequest, SharingOverrideUpdate, UmbralSearchRequest, UmbralSearchResponse, ) from app.services.audit import get_client_ip, log_audit_event from app.services.connection import ( SHAREABLE_FIELDS, create_person_from_connection, detach_umbral_contact, resolve_shared_profile, send_connection_ntfy, ) from app.services.notification import create_notification router = APIRouter() # ── Helpers ────────────────────────────────────────────────────────── async def _get_settings_for_user(db: AsyncSession, user_id: int) -> Settings | None: result = await db.execute(select(Settings).where(Settings.user_id == user_id)) return result.scalar_one_or_none() def _build_request_response( req: ConnectionRequest, sender: User, sender_settings: Settings | None, receiver: User, receiver_settings: Settings | None, ) -> ConnectionRequestResponse: return ConnectionRequestResponse( id=req.id, sender_umbral_name=sender.umbral_name, sender_preferred_name=sender_settings.preferred_name if sender_settings else None, receiver_umbral_name=receiver.umbral_name, receiver_preferred_name=receiver_settings.preferred_name if receiver_settings else None, status=req.status, created_at=req.created_at, ) # ── POST /search ──────────────────────────────────────────────────── @router.post("/search", response_model=UmbralSearchResponse) async def search_user( body: UmbralSearchRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Timing-safe user search. Always queries by umbral_name alone, then checks accept_connections + is_active in Python. Generic "not found" for non-existent, opted-out, AND inactive users. 50ms sleep floor to eliminate timing side-channel. """ # Always sleep to prevent timing attacks await asyncio.sleep(0.05) # Sender must have accept_connections enabled to search sender_settings = await _get_settings_for_user(db, current_user.id) if not sender_settings or not sender_settings.accept_connections: return UmbralSearchResponse(found=False) # Don't find yourself if body.umbral_name == current_user.umbral_name: return UmbralSearchResponse(found=False) result = await db.execute( select(User).where(User.umbral_name == body.umbral_name) ) target = result.scalar_one_or_none() if not target or not target.is_active: return UmbralSearchResponse(found=False) # Check if they accept connections target_settings = await _get_settings_for_user(db, target.id) if not target_settings or not target_settings.accept_connections: return UmbralSearchResponse(found=False) return UmbralSearchResponse(found=True) # ── POST /request ─────────────────────────────────────────────────── @router.post("/request", response_model=ConnectionRequestResponse, status_code=201) async def send_connection_request( body: SendConnectionRequest, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Send a connection request to another user.""" # Resolve target result = await db.execute( select(User).where(User.umbral_name == body.umbral_name) ) target = result.scalar_one_or_none() if not target or not target.is_active: raise HTTPException(status_code=404, detail="User not found") # Self-request guard if target.id == current_user.id: raise HTTPException(status_code=400, detail="Cannot send a connection request to yourself") # Sender must have accept_connections enabled to participate sender_settings = await _get_settings_for_user(db, current_user.id) if not sender_settings or not sender_settings.accept_connections: raise HTTPException( status_code=403, detail="You must enable 'Accept Connections' in your settings before sending requests", ) # Check accept_connections on target target_settings = await _get_settings_for_user(db, target.id) if not target_settings or not target_settings.accept_connections: raise HTTPException(status_code=404, detail="User not found") # Check existing connection existing_conn = await db.execute( select(UserConnection).where( UserConnection.user_id == current_user.id, UserConnection.connected_user_id == target.id, ) ) if existing_conn.scalar_one_or_none(): raise HTTPException(status_code=409, detail="Already connected") # Check pending request in either direction existing_req = await db.execute( select(ConnectionRequest).where( and_( ConnectionRequest.status == "pending", ( (ConnectionRequest.sender_id == current_user.id) & (ConnectionRequest.receiver_id == target.id) ) | ( (ConnectionRequest.sender_id == target.id) & (ConnectionRequest.receiver_id == current_user.id) ), ) ) ) if existing_req.scalar_one_or_none(): raise HTTPException(status_code=409, detail="A pending request already exists") # Per-receiver cap: max 5 pending requests within 10 minutes ten_min_ago = datetime.now() - timedelta(minutes=10) pending_count = await db.scalar( select(func.count()) .select_from(ConnectionRequest) .where( ConnectionRequest.receiver_id == target.id, ConnectionRequest.status == "pending", ConnectionRequest.created_at >= ten_min_ago, ) ) or 0 if pending_count >= 5: raise HTTPException(status_code=429, detail="Too many pending requests for this user") # Create the request conn_request = ConnectionRequest( sender_id=current_user.id, receiver_id=target.id, ) db.add(conn_request) await db.flush() # populate conn_request.id for source_id # Create in-app notification for receiver (sender_settings already fetched above) sender_display = (sender_settings.preferred_name if sender_settings else None) or current_user.umbral_name await create_notification( db, user_id=target.id, type="connection_request", title="New Connection Request", message=f"{sender_display} wants to connect with you", data={"sender_umbral_name": current_user.umbral_name}, source_type="connection_request", source_id=conn_request.id, ) await log_audit_event( db, action="connection.request_sent", actor_id=current_user.id, target_id=target.id, detail={"receiver_umbral_name": target.umbral_name}, ip=get_client_ip(request), ) await db.commit() await db.refresh(conn_request) # ntfy push in background (non-blocking) background_tasks.add_task( send_connection_ntfy, target_settings, sender_display, "request_received", ) return _build_request_response(conn_request, current_user, sender_settings, target, target_settings) # ── GET /requests/incoming ────────────────────────────────────────── @router.get("/requests/incoming", response_model=list[ConnectionRequestResponse]) async def get_incoming_requests( page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List pending connection requests received by the current user.""" offset = (page - 1) * per_page result = await db.execute( select(ConnectionRequest) .where( ConnectionRequest.receiver_id == current_user.id, ConnectionRequest.status == "pending", ) .options(selectinload(ConnectionRequest.sender)) .order_by(ConnectionRequest.created_at.desc()) .offset(offset) .limit(per_page) ) requests = result.scalars().all() # Fetch current user's settings once, batch-fetch sender settings receiver_settings = await _get_settings_for_user(db, current_user.id) sender_ids = [req.sender_id for req in requests] if sender_ids: settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(sender_ids))) settings_by_user = {s.user_id: s for s in settings_result.scalars().all()} else: settings_by_user = {} responses = [] for req in requests: sender_settings = settings_by_user.get(req.sender_id) responses.append(_build_request_response(req, req.sender, sender_settings, current_user, receiver_settings)) return responses # ── GET /requests/outgoing ────────────────────────────────────────── @router.get("/requests/outgoing", response_model=list[ConnectionRequestResponse]) async def get_outgoing_requests( page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List pending connection requests sent by the current user.""" offset = (page - 1) * per_page result = await db.execute( select(ConnectionRequest) .where( ConnectionRequest.sender_id == current_user.id, ConnectionRequest.status == "pending", ) .options(selectinload(ConnectionRequest.receiver)) .order_by(ConnectionRequest.created_at.desc()) .offset(offset) .limit(per_page) ) requests = result.scalars().all() # Fetch current user's settings once, batch-fetch receiver settings sender_settings = await _get_settings_for_user(db, current_user.id) receiver_ids = [req.receiver_id for req in requests] if receiver_ids: settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(receiver_ids))) settings_by_user = {s.user_id: s for s in settings_result.scalars().all()} else: settings_by_user = {} responses = [] for req in requests: receiver_settings = settings_by_user.get(req.receiver_id) responses.append(_build_request_response(req, current_user, sender_settings, req.receiver, receiver_settings)) return responses # ── PUT /requests/{id}/respond ────────────────────────────────────── @router.put("/requests/{request_id}/respond") async def respond_to_request( body: RespondRequest, request: Request, background_tasks: BackgroundTasks, request_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Accept or reject a connection request. Atomic via UPDATE...WHERE status='pending'.""" now = datetime.now() # Atomic update — only succeeds if status is still 'pending' and receiver is current user result = await db.execute( update(ConnectionRequest) .where( ConnectionRequest.id == request_id, ConnectionRequest.receiver_id == current_user.id, ConnectionRequest.status == "pending", ) .values(status=body.action + "ed", resolved_at=now) .returning(ConnectionRequest.id, ConnectionRequest.sender_id, ConnectionRequest.receiver_id) ) row = result.first() if not row: raise HTTPException(status_code=409, detail="Request not found or already resolved") sender_id = row.sender_id if body.action == "accept": # Verify sender is still active sender_result = await db.execute(select(User).where(User.id == sender_id)) sender = sender_result.scalar_one_or_none() if not sender or not sender.is_active: # Revert to rejected await db.execute( update(ConnectionRequest) .where(ConnectionRequest.id == request_id) .values(status="rejected") ) await db.commit() raise HTTPException(status_code=409, detail="Sender account is no longer active") # Get settings for both users sender_settings = await _get_settings_for_user(db, sender_id) receiver_settings = await _get_settings_for_user(db, current_user.id) # Resolve shared profiles for both directions sender_shared = resolve_shared_profile(sender, sender_settings, None) if sender_settings else {} receiver_shared = resolve_shared_profile(current_user, receiver_settings, None) if receiver_settings else {} # Create Person records for both users person_for_receiver = create_person_from_connection( current_user.id, sender, sender_settings, sender_shared ) person_for_sender = create_person_from_connection( sender_id, current_user, receiver_settings, receiver_shared ) db.add(person_for_receiver) db.add(person_for_sender) await db.flush() # populate person IDs # Create bidirectional connections conn_a = UserConnection( user_id=current_user.id, connected_user_id=sender_id, person_id=person_for_receiver.id, ) conn_b = UserConnection( user_id=sender_id, connected_user_id=current_user.id, person_id=person_for_sender.id, ) db.add(conn_a) db.add(conn_b) await db.flush() # populate conn_a.id for source_id # Notification to sender receiver_display = (receiver_settings.preferred_name if receiver_settings else None) or current_user.umbral_name await create_notification( db, user_id=sender_id, type="connection_accepted", title="Connection Accepted", message=f"{receiver_display} accepted your connection request", data={"connected_umbral_name": current_user.umbral_name}, source_type="user_connection", source_id=conn_b.id, ) await log_audit_event( db, action="connection.accepted", actor_id=current_user.id, target_id=sender_id, detail={"request_id": request_id}, ip=get_client_ip(request), ) await db.commit() # ntfy push in background if sender_settings: background_tasks.add_task( send_connection_ntfy, sender_settings, receiver_display, "request_accepted", ) return {"message": "Connection accepted", "connection_id": conn_a.id} else: # Reject — only create notification for receiver (not sender per plan) await log_audit_event( db, action="connection.rejected", actor_id=current_user.id, target_id=sender_id, detail={"request_id": request_id}, ip=get_client_ip(request), ) await db.commit() return {"message": "Connection request rejected"} # ── GET / ─────────────────────────────────────────────────────────── @router.get("/", response_model=list[ConnectionResponse]) async def list_connections( page: int = Query(1, ge=1), per_page: int = Query(50, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List all connections for the current user.""" offset = (page - 1) * per_page result = await db.execute( select(UserConnection) .where(UserConnection.user_id == current_user.id) .options(selectinload(UserConnection.connected_user)) .order_by(UserConnection.created_at.desc()) .offset(offset) .limit(per_page) ) connections = result.scalars().all() # Batch-fetch settings for connected users connected_ids = [conn.connected_user_id for conn in connections] if connected_ids: settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(connected_ids))) settings_by_user = {s.user_id: s for s in settings_result.scalars().all()} else: settings_by_user = {} responses = [] for conn in connections: conn_settings = settings_by_user.get(conn.connected_user_id) responses.append(ConnectionResponse( id=conn.id, connected_user_id=conn.connected_user_id, connected_umbral_name=conn.connected_user.umbral_name, connected_preferred_name=conn_settings.preferred_name if conn_settings else None, person_id=conn.person_id, created_at=conn.created_at, )) return responses # ── GET /{id} ─────────────────────────────────────────────────────── @router.get("/{connection_id}", response_model=ConnectionResponse) async def get_connection( connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get a single connection detail.""" result = await db.execute( select(UserConnection) .where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) .options(selectinload(UserConnection.connected_user)) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") conn_settings = await _get_settings_for_user(db, conn.connected_user_id) return ConnectionResponse( id=conn.id, connected_user_id=conn.connected_user_id, connected_umbral_name=conn.connected_user.umbral_name, connected_preferred_name=conn_settings.preferred_name if conn_settings else None, person_id=conn.person_id, created_at=conn.created_at, ) # ── GET /{id}/shared-profile ──────────────────────────────────────── @router.get("/{connection_id}/shared-profile") async def get_shared_profile( connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get the resolved shared profile for a connection.""" result = await db.execute( select(UserConnection) .where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) .options(selectinload(UserConnection.connected_user)) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") conn_settings = await _get_settings_for_user(db, conn.connected_user_id) if not conn_settings: return {} return resolve_shared_profile( conn.connected_user, conn_settings, conn.sharing_overrides, ) # ── PUT /{id}/sharing-overrides ───────────────────────────────────── @router.put("/{connection_id}/sharing-overrides") async def update_sharing_overrides( body: SharingOverrideUpdate, connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Update what YOU share with a specific connection.""" # Get our connection to know who the counterpart is our_conn = await db.execute( select(UserConnection).where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) ) conn = our_conn.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") # Find the reverse connection (their row pointing to us) reverse_result = await db.execute( select(UserConnection).where( UserConnection.user_id == conn.connected_user_id, UserConnection.connected_user_id == current_user.id, ) ) reverse_conn = reverse_result.scalar_one_or_none() if not reverse_conn: raise HTTPException(status_code=404, detail="Reverse connection not found") # Merge validated overrides — only SHAREABLE_FIELDS keys existing = dict(reverse_conn.sharing_overrides or {}) update_data = body.model_dump(exclude_unset=True) for key, value in update_data.items(): if key in SHAREABLE_FIELDS: if value is None: existing.pop(key, None) else: existing[key] = value reverse_conn.sharing_overrides = existing if existing else None await db.commit() return {"message": "Sharing overrides updated"} # ── DELETE /{id} ──────────────────────────────────────────────────── @router.delete("/{connection_id}", status_code=204) async def remove_connection( request: Request, connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Remove a connection. Removes BOTH UserConnection rows. Detaches BOTH Person records (sets linked_user_id=null, is_umbral_contact=false). Silent — no notification sent. """ # Get our connection result = await db.execute( select(UserConnection) .where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") counterpart_id = conn.connected_user_id # Find reverse connection reverse_result = await db.execute( select(UserConnection).where( UserConnection.user_id == counterpart_id, UserConnection.connected_user_id == current_user.id, ) ) reverse_conn = reverse_result.scalar_one_or_none() # Detach Person records if conn.person_id: person_result = await db.execute(select(Person).where(Person.id == conn.person_id)) person = person_result.scalar_one_or_none() if person: await detach_umbral_contact(person) if reverse_conn and reverse_conn.person_id: person_result = await db.execute(select(Person).where(Person.id == reverse_conn.person_id)) person = person_result.scalar_one_or_none() if person: await detach_umbral_contact(person) # Delete both connections await db.delete(conn) if reverse_conn: await db.delete(reverse_conn) await log_audit_event( db, action="connection.removed", actor_id=current_user.id, target_id=counterpart_id, detail={"connection_id": connection_id}, ip=get_client_ip(request), ) await db.commit() return None