""" Connection router — search, request, respond, manage connections. Security: - Timing-safe search (50ms sleep floor) - Per-receiver pending request cap (5 within 10 minutes) - Atomic accept via UPDATE...WHERE status='pending' RETURNING * - All endpoints scoped by current_user.id - Audit logging for all connection events """ import asyncio from datetime import datetime, timedelta, timezone from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Request from sqlalchemy import select, func, and_, update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.database import get_db from app.models.connection_request import ConnectionRequest from app.models.notification import Notification from app.models.person import Person from app.models.settings import Settings from app.models.user import User from app.models.user_connection import UserConnection from app.routers.auth import get_current_user from app.schemas.connection import ( ConnectionRequestResponse, ConnectionResponse, RespondRequest, SendConnectionRequest, SharingOverrideUpdate, UmbralSearchRequest, UmbralSearchResponse, ) from app.services.audit import get_client_ip, log_audit_event from app.services.connection import ( SHAREABLE_FIELDS, create_person_from_connection, detach_umbral_contact, resolve_shared_profile, send_connection_ntfy, ) from app.services.notification import create_notification router = APIRouter() # ── Helpers ────────────────────────────────────────────────────────── async def _get_settings_for_user(db: AsyncSession, user_id: int) -> Settings | None: result = await db.execute(select(Settings).where(Settings.user_id == user_id)) return result.scalar_one_or_none() def _build_request_response( req: ConnectionRequest, sender: User, sender_settings: Settings | None, receiver: User, receiver_settings: Settings | None, ) -> ConnectionRequestResponse: return ConnectionRequestResponse( id=req.id, sender_umbral_name=sender.umbral_name, sender_preferred_name=sender_settings.preferred_name if sender_settings else None, receiver_umbral_name=receiver.umbral_name, receiver_preferred_name=receiver_settings.preferred_name if receiver_settings else None, status=req.status, created_at=req.created_at, ) # ── POST /search ──────────────────────────────────────────────────── @router.post("/search", response_model=UmbralSearchResponse) async def search_user( body: UmbralSearchRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Timing-safe user search. Always queries by umbral_name alone, then checks accept_connections + is_active in Python. Generic "not found" for non-existent, opted-out, AND inactive users. 50ms sleep floor to eliminate timing side-channel. """ # Always sleep to prevent timing attacks await asyncio.sleep(0.05) # Don't find yourself if body.umbral_name == current_user.umbral_name: return UmbralSearchResponse(found=False) result = await db.execute( select(User).where(User.umbral_name == body.umbral_name) ) target = result.scalar_one_or_none() if not target or not target.is_active: return UmbralSearchResponse(found=False) # Check if they accept connections target_settings = await _get_settings_for_user(db, target.id) if not target_settings or not target_settings.accept_connections: return UmbralSearchResponse(found=False) return UmbralSearchResponse(found=True) # ── POST /request ─────────────────────────────────────────────────── @router.post("/request", response_model=ConnectionRequestResponse, status_code=201) async def send_connection_request( body: SendConnectionRequest, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Send a connection request to another user.""" # Resolve target result = await db.execute( select(User).where(User.umbral_name == body.umbral_name) ) target = result.scalar_one_or_none() if not target or not target.is_active: raise HTTPException(status_code=404, detail="User not found") # Self-request guard if target.id == current_user.id: raise HTTPException(status_code=400, detail="Cannot send a connection request to yourself") # Check accept_connections target_settings = await _get_settings_for_user(db, target.id) if not target_settings or not target_settings.accept_connections: raise HTTPException(status_code=404, detail="User not found") # Check existing connection existing_conn = await db.execute( select(UserConnection).where( UserConnection.user_id == current_user.id, UserConnection.connected_user_id == target.id, ) ) if existing_conn.scalar_one_or_none(): raise HTTPException(status_code=409, detail="Already connected") # Check pending request in either direction existing_req = await db.execute( select(ConnectionRequest).where( and_( ConnectionRequest.status == "pending", ( (ConnectionRequest.sender_id == current_user.id) & (ConnectionRequest.receiver_id == target.id) ) | ( (ConnectionRequest.sender_id == target.id) & (ConnectionRequest.receiver_id == current_user.id) ), ) ) ) if existing_req.scalar_one_or_none(): raise HTTPException(status_code=409, detail="A pending request already exists") # Per-receiver cap: max 5 pending requests within 10 minutes ten_min_ago = datetime.now() - timedelta(minutes=10) pending_count = await db.scalar( select(func.count()) .select_from(ConnectionRequest) .where( ConnectionRequest.receiver_id == target.id, ConnectionRequest.status == "pending", ConnectionRequest.created_at >= ten_min_ago, ) ) or 0 if pending_count >= 5: raise HTTPException(status_code=429, detail="Too many pending requests for this user") # Create the request conn_request = ConnectionRequest( sender_id=current_user.id, receiver_id=target.id, ) db.add(conn_request) # Create in-app notification for receiver sender_settings = await _get_settings_for_user(db, current_user.id) sender_display = (sender_settings.preferred_name if sender_settings else None) or current_user.umbral_name await create_notification( db, user_id=target.id, type="connection_request", title="New Connection Request", message=f"{sender_display} wants to connect with you", data={"sender_umbral_name": current_user.umbral_name}, source_type="connection_request", source_id=None, # Will be set after flush ) await log_audit_event( db, action="connection.request_sent", actor_id=current_user.id, target_id=target.id, detail={"receiver_umbral_name": target.umbral_name}, ip=get_client_ip(request), ) await db.commit() await db.refresh(conn_request) # ntfy push in background (non-blocking) background_tasks.add_task( send_connection_ntfy, target_settings, sender_display, "request_received", ) return _build_request_response(conn_request, current_user, sender_settings, target, target_settings) # ── GET /requests/incoming ────────────────────────────────────────── @router.get("/requests/incoming", response_model=list[ConnectionRequestResponse]) async def get_incoming_requests( page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List pending connection requests received by the current user.""" offset = (page - 1) * per_page result = await db.execute( select(ConnectionRequest) .where( ConnectionRequest.receiver_id == current_user.id, ConnectionRequest.status == "pending", ) .options(selectinload(ConnectionRequest.sender)) .order_by(ConnectionRequest.created_at.desc()) .offset(offset) .limit(per_page) ) requests = result.scalars().all() responses = [] for req in requests: sender_settings = await _get_settings_for_user(db, req.sender_id) receiver_settings = await _get_settings_for_user(db, current_user.id) responses.append(_build_request_response(req, req.sender, sender_settings, current_user, receiver_settings)) return responses # ── GET /requests/outgoing ────────────────────────────────────────── @router.get("/requests/outgoing", response_model=list[ConnectionRequestResponse]) async def get_outgoing_requests( page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List pending connection requests sent by the current user.""" offset = (page - 1) * per_page result = await db.execute( select(ConnectionRequest) .where( ConnectionRequest.sender_id == current_user.id, ConnectionRequest.status == "pending", ) .options(selectinload(ConnectionRequest.receiver)) .order_by(ConnectionRequest.created_at.desc()) .offset(offset) .limit(per_page) ) requests = result.scalars().all() responses = [] for req in requests: sender_settings = await _get_settings_for_user(db, current_user.id) receiver_settings = await _get_settings_for_user(db, req.receiver_id) responses.append(_build_request_response(req, current_user, sender_settings, req.receiver, receiver_settings)) return responses # ── PUT /requests/{id}/respond ────────────────────────────────────── @router.put("/requests/{request_id}/respond") async def respond_to_request( body: RespondRequest, request: Request, background_tasks: BackgroundTasks, request_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Accept or reject a connection request. Atomic via UPDATE...WHERE status='pending'.""" now = datetime.now() # Atomic update — only succeeds if status is still 'pending' and receiver is current user result = await db.execute( update(ConnectionRequest) .where( ConnectionRequest.id == request_id, ConnectionRequest.receiver_id == current_user.id, ConnectionRequest.status == "pending", ) .values(status=body.action + "ed", resolved_at=now) .returning(ConnectionRequest.id, ConnectionRequest.sender_id, ConnectionRequest.receiver_id) ) row = result.first() if not row: raise HTTPException(status_code=409, detail="Request not found or already resolved") sender_id = row.sender_id if body.action == "accept": # Verify sender is still active sender_result = await db.execute(select(User).where(User.id == sender_id)) sender = sender_result.scalar_one_or_none() if not sender or not sender.is_active: # Revert to rejected await db.execute( update(ConnectionRequest) .where(ConnectionRequest.id == request_id) .values(status="rejected") ) await db.commit() raise HTTPException(status_code=409, detail="Sender account is no longer active") # Get settings for both users sender_settings = await _get_settings_for_user(db, sender_id) receiver_settings = await _get_settings_for_user(db, current_user.id) # Resolve shared profiles for both directions sender_shared = resolve_shared_profile(sender, sender_settings, None) if sender_settings else {} receiver_shared = resolve_shared_profile(current_user, receiver_settings, None) if receiver_settings else {} # Create Person records for both users person_for_receiver = create_person_from_connection( current_user.id, sender, sender_settings, sender_shared ) person_for_sender = create_person_from_connection( sender_id, current_user, receiver_settings, receiver_shared ) db.add(person_for_receiver) db.add(person_for_sender) await db.flush() # populate person IDs # Create bidirectional connections conn_a = UserConnection( user_id=current_user.id, connected_user_id=sender_id, person_id=person_for_receiver.id, ) conn_b = UserConnection( user_id=sender_id, connected_user_id=current_user.id, person_id=person_for_sender.id, ) db.add(conn_a) db.add(conn_b) # Notification to sender receiver_display = (receiver_settings.preferred_name if receiver_settings else None) or current_user.umbral_name await create_notification( db, user_id=sender_id, type="connection_accepted", title="Connection Accepted", message=f"{receiver_display} accepted your connection request", data={"connected_umbral_name": current_user.umbral_name}, source_type="user_connection", source_id=None, ) await log_audit_event( db, action="connection.accepted", actor_id=current_user.id, target_id=sender_id, detail={"request_id": request_id}, ip=get_client_ip(request), ) await db.commit() # ntfy push in background if sender_settings: background_tasks.add_task( send_connection_ntfy, sender_settings, receiver_display, "request_accepted", ) return {"message": "Connection accepted", "connection_id": conn_a.id} else: # Reject — only create notification for receiver (not sender per plan) await log_audit_event( db, action="connection.rejected", actor_id=current_user.id, target_id=sender_id, detail={"request_id": request_id}, ip=get_client_ip(request), ) await db.commit() return {"message": "Connection request rejected"} # ── GET / ─────────────────────────────────────────────────────────── @router.get("/", response_model=list[ConnectionResponse]) async def list_connections( page: int = Query(1, ge=1), per_page: int = Query(50, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List all connections for the current user.""" offset = (page - 1) * per_page result = await db.execute( select(UserConnection) .where(UserConnection.user_id == current_user.id) .options(selectinload(UserConnection.connected_user)) .order_by(UserConnection.created_at.desc()) .offset(offset) .limit(per_page) ) connections = result.scalars().all() responses = [] for conn in connections: conn_settings = await _get_settings_for_user(db, conn.connected_user_id) responses.append(ConnectionResponse( id=conn.id, connected_user_id=conn.connected_user_id, connected_umbral_name=conn.connected_user.umbral_name, connected_preferred_name=conn_settings.preferred_name if conn_settings else None, person_id=conn.person_id, created_at=conn.created_at, )) return responses # ── GET /{id} ─────────────────────────────────────────────────────── @router.get("/{connection_id}", response_model=ConnectionResponse) async def get_connection( connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get a single connection detail.""" result = await db.execute( select(UserConnection) .where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) .options(selectinload(UserConnection.connected_user)) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") conn_settings = await _get_settings_for_user(db, conn.connected_user_id) return ConnectionResponse( id=conn.id, connected_user_id=conn.connected_user_id, connected_umbral_name=conn.connected_user.umbral_name, connected_preferred_name=conn_settings.preferred_name if conn_settings else None, person_id=conn.person_id, created_at=conn.created_at, ) # ── GET /{id}/shared-profile ──────────────────────────────────────── @router.get("/{connection_id}/shared-profile") async def get_shared_profile( connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get the resolved shared profile for a connection.""" result = await db.execute( select(UserConnection) .where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) .options(selectinload(UserConnection.connected_user)) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") conn_settings = await _get_settings_for_user(db, conn.connected_user_id) if not conn_settings: return {} return resolve_shared_profile( conn.connected_user, conn_settings, conn.sharing_overrides, ) # ── PUT /{id}/sharing-overrides ───────────────────────────────────── @router.put("/{connection_id}/sharing-overrides") async def update_sharing_overrides( body: SharingOverrideUpdate, connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Update what YOU share with a specific connection.""" # Find the connection where the OTHER user connects to YOU result = await db.execute( select(UserConnection).where( UserConnection.connected_user_id == current_user.id, UserConnection.user_id != current_user.id, ) ) # We need the reverse connection (where we are the connected_user) # Actually, we need to find the connection from the counterpart's perspective # The connection_id is OUR connection. The sharing overrides go on the # counterpart's connection row (since they determine what they see from us). # Wait — per the plan, sharing overrides control what WE share with THEM. # So they go on their connection row pointing to us. # First, get our connection to know who the counterpart is our_conn = await db.execute( select(UserConnection).where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) ) conn = our_conn.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") # Find the reverse connection (their row pointing to us) reverse_result = await db.execute( select(UserConnection).where( UserConnection.user_id == conn.connected_user_id, UserConnection.connected_user_id == current_user.id, ) ) reverse_conn = reverse_result.scalar_one_or_none() if not reverse_conn: raise HTTPException(status_code=404, detail="Reverse connection not found") # Build validated overrides dict — only SHAREABLE_FIELDS keys overrides = {} update_data = body.model_dump(exclude_unset=True) for key, value in update_data.items(): if key in SHAREABLE_FIELDS: overrides[key] = value reverse_conn.sharing_overrides = overrides if overrides else None await db.commit() return {"message": "Sharing overrides updated"} # ── DELETE /{id} ──────────────────────────────────────────────────── @router.delete("/{connection_id}", status_code=204) async def remove_connection( request: Request, connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Remove a connection. Removes BOTH UserConnection rows. Detaches BOTH Person records (sets linked_user_id=null, is_umbral_contact=false). Silent — no notification sent. """ # Get our connection result = await db.execute( select(UserConnection) .where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") counterpart_id = conn.connected_user_id # Find reverse connection reverse_result = await db.execute( select(UserConnection).where( UserConnection.user_id == counterpart_id, UserConnection.connected_user_id == current_user.id, ) ) reverse_conn = reverse_result.scalar_one_or_none() # Detach Person records if conn.person_id: person_result = await db.execute(select(Person).where(Person.id == conn.person_id)) person = person_result.scalar_one_or_none() if person: await detach_umbral_contact(person) if reverse_conn and reverse_conn.person_id: person_result = await db.execute(select(Person).where(Person.id == reverse_conn.person_id)) person = person_result.scalar_one_or_none() if person: await detach_umbral_contact(person) # Delete both connections await db.delete(conn) if reverse_conn: await db.delete(reverse_conn) await log_audit_event( db, action="connection.removed", actor_id=current_user.id, target_id=counterpart_id, detail={"connection_id": connection_id}, ip=get_client_ip(request), ) await db.commit() return None