""" Connection router — search, request, respond, manage connections. Security: - Timing-safe search (50ms sleep floor) - Per-receiver pending request cap (5 within 10 minutes) - Atomic accept via UPDATE...WHERE status='pending' RETURNING * - All endpoints scoped by current_user.id - Audit logging for all connection events """ import asyncio import logging from datetime import date as date_type, datetime, timedelta from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Request from sqlalchemy import delete, select, func, and_, update from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.database import get_db from app.models.connection_request import ConnectionRequest from app.models.notification import Notification from app.models.person import Person from app.models.settings import Settings from app.models.user import User from app.models.user_connection import UserConnection from app.routers.auth import get_current_user from app.schemas.connection import ( CancelResponse, ConnectionRequestResponse, ConnectionResponse, RespondAcceptResponse, RespondRejectResponse, RespondRequest, SendConnectionRequest, SharingOverrideUpdate, UmbralSearchRequest, UmbralSearchResponse, ) from app.services.audit import get_client_ip, log_audit_event from app.services.connection import ( NOTIF_TYPE_CONNECTION_ACCEPTED, NOTIF_TYPE_CONNECTION_REQUEST, SHAREABLE_FIELDS, create_person_from_connection, detach_umbral_contact, extract_ntfy_config, resolve_shared_profile, send_connection_ntfy, ) from app.services.notification import create_notification router = APIRouter() logger = logging.getLogger(__name__) # ── Helpers ────────────────────────────────────────────────────────── async def _get_settings_for_user(db: AsyncSession, user_id: int) -> Settings | None: result = await db.execute(select(Settings).where(Settings.user_id == user_id)) return result.scalar_one_or_none() def _build_request_response( req: ConnectionRequest, sender: User, sender_settings: Settings | None, receiver: User, receiver_settings: Settings | None, ) -> ConnectionRequestResponse: return ConnectionRequestResponse( id=req.id, sender_umbral_name=sender.umbral_name, sender_preferred_name=sender_settings.preferred_name if sender_settings else None, receiver_umbral_name=receiver.umbral_name, receiver_preferred_name=receiver_settings.preferred_name if receiver_settings else None, status=req.status, created_at=req.created_at, ) # ── POST /search ──────────────────────────────────────────────────── @router.post("/search", response_model=UmbralSearchResponse) async def search_user( body: UmbralSearchRequest, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Timing-safe user search. Always queries by umbral_name alone, then checks accept_connections + is_active in Python. Generic "not found" for non-existent, opted-out, AND inactive users. 50ms sleep floor to eliminate timing side-channel. """ # Always sleep to prevent timing attacks await asyncio.sleep(0.05) # Sender must have accept_connections enabled to search sender_settings = await _get_settings_for_user(db, current_user.id) if not sender_settings or not sender_settings.accept_connections: return UmbralSearchResponse(found=False) # Don't find yourself if body.umbral_name == current_user.umbral_name: return UmbralSearchResponse(found=False) result = await db.execute( select(User).where(User.umbral_name == body.umbral_name) ) target = result.scalar_one_or_none() if not target or not target.is_active: return UmbralSearchResponse(found=False) # Check if they accept connections target_settings = await _get_settings_for_user(db, target.id) if not target_settings or not target_settings.accept_connections: return UmbralSearchResponse(found=False) return UmbralSearchResponse(found=True) # ── POST /request ─────────────────────────────────────────────────── @router.post("/request", response_model=ConnectionRequestResponse, status_code=201) async def send_connection_request( body: SendConnectionRequest, request: Request, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Send a connection request to another user.""" # Resolve target result = await db.execute( select(User).where(User.umbral_name == body.umbral_name) ) target = result.scalar_one_or_none() if not target or not target.is_active: raise HTTPException(status_code=404, detail="User not found") # Self-request guard if target.id == current_user.id: raise HTTPException(status_code=400, detail="Cannot send a connection request to yourself") # Sender must have accept_connections enabled to participate sender_settings = await _get_settings_for_user(db, current_user.id) if not sender_settings or not sender_settings.accept_connections: raise HTTPException( status_code=403, detail="You must enable 'Accept Connections' in your settings before sending requests", ) # Check accept_connections on target target_settings = await _get_settings_for_user(db, target.id) if not target_settings or not target_settings.accept_connections: raise HTTPException(status_code=404, detail="User not found") # Check existing connection existing_conn = await db.execute( select(UserConnection).where( UserConnection.user_id == current_user.id, UserConnection.connected_user_id == target.id, ) ) if existing_conn.scalar_one_or_none(): raise HTTPException(status_code=409, detail="Already connected") # Check pending request in either direction existing_req = await db.execute( select(ConnectionRequest).where( and_( ConnectionRequest.status == "pending", ( (ConnectionRequest.sender_id == current_user.id) & (ConnectionRequest.receiver_id == target.id) ) | ( (ConnectionRequest.sender_id == target.id) & (ConnectionRequest.receiver_id == current_user.id) ), ) ) ) if existing_req.scalar_one_or_none(): raise HTTPException(status_code=409, detail="A pending request already exists") # Per-receiver cap: max 5 pending requests within 10 minutes ten_min_ago = datetime.now() - timedelta(minutes=10) pending_count = await db.scalar( select(func.count()) .select_from(ConnectionRequest) .where( ConnectionRequest.receiver_id == target.id, ConnectionRequest.status == "pending", ConnectionRequest.created_at >= ten_min_ago, ) ) or 0 if pending_count >= 5: raise HTTPException(status_code=429, detail="Too many pending requests for this user") # Validate person_id if provided (link existing standard contact) link_person_id = None if body.person_id is not None: person_result = await db.execute( select(Person).where(Person.id == body.person_id, Person.user_id == current_user.id) ) link_person = person_result.scalar_one_or_none() if not link_person: raise HTTPException(status_code=400, detail="Person not found or not owned by you") if link_person.is_umbral_contact: raise HTTPException(status_code=400, detail="Person is already an umbral contact") link_person_id = body.person_id # Create the request (IntegrityError guard for TOCTOU race on partial unique index) conn_request = ConnectionRequest( sender_id=current_user.id, receiver_id=target.id, person_id=link_person_id, ) db.add(conn_request) try: await db.flush() # populate conn_request.id for source_id except IntegrityError: await db.rollback() raise HTTPException(status_code=409, detail="A pending request already exists") # Create in-app notification for receiver (sender_settings already fetched above) sender_display = (sender_settings.preferred_name if sender_settings else None) or current_user.umbral_name await create_notification( db, user_id=target.id, type=NOTIF_TYPE_CONNECTION_REQUEST, title="New Connection Request", message=f"{sender_display} wants to connect with you", data={"sender_umbral_name": current_user.umbral_name}, source_type=NOTIF_TYPE_CONNECTION_REQUEST, source_id=conn_request.id, ) await log_audit_event( db, action="connection.request_sent", actor_id=current_user.id, target_id=target.id, detail={"receiver_umbral_name": target.umbral_name}, ip=get_client_ip(request), ) # Extract ntfy config before commit (avoids detached SA object in background task) target_ntfy = extract_ntfy_config(target_settings) if target_settings else None # Build response BEFORE commit — commit expires all ORM objects, and accessing # their attributes after commit triggers lazy loads → MissingGreenlet in async SA. response = _build_request_response(conn_request, current_user, sender_settings, target, target_settings) await db.commit() # ntfy push in background (non-blocking) background_tasks.add_task( send_connection_ntfy, target_ntfy, sender_display, "request_received", ) return response # ── GET /requests/incoming ────────────────────────────────────────── @router.get("/requests/incoming", response_model=list[ConnectionRequestResponse]) async def get_incoming_requests( page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List pending connection requests received by the current user.""" offset = (page - 1) * per_page result = await db.execute( select(ConnectionRequest) .where( ConnectionRequest.receiver_id == current_user.id, ConnectionRequest.status == "pending", ) .options(selectinload(ConnectionRequest.sender)) .order_by(ConnectionRequest.created_at.desc()) .offset(offset) .limit(per_page) ) requests = result.scalars().all() # Fetch current user's settings once, batch-fetch sender settings receiver_settings = await _get_settings_for_user(db, current_user.id) sender_ids = [req.sender_id for req in requests] if sender_ids: settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(sender_ids))) settings_by_user = {s.user_id: s for s in settings_result.scalars().all()} else: settings_by_user = {} responses = [] for req in requests: sender_settings = settings_by_user.get(req.sender_id) responses.append(_build_request_response(req, req.sender, sender_settings, current_user, receiver_settings)) return responses # ── GET /requests/outgoing ────────────────────────────────────────── @router.get("/requests/outgoing", response_model=list[ConnectionRequestResponse]) async def get_outgoing_requests( page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List pending connection requests sent by the current user.""" offset = (page - 1) * per_page result = await db.execute( select(ConnectionRequest) .where( ConnectionRequest.sender_id == current_user.id, ConnectionRequest.status == "pending", ) .options(selectinload(ConnectionRequest.receiver)) .order_by(ConnectionRequest.created_at.desc()) .offset(offset) .limit(per_page) ) requests = result.scalars().all() # Fetch current user's settings once, batch-fetch receiver settings sender_settings = await _get_settings_for_user(db, current_user.id) receiver_ids = [req.receiver_id for req in requests] if receiver_ids: settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(receiver_ids))) settings_by_user = {s.user_id: s for s in settings_result.scalars().all()} else: settings_by_user = {} responses = [] for req in requests: receiver_settings = settings_by_user.get(req.receiver_id) responses.append(_build_request_response(req, current_user, sender_settings, req.receiver, receiver_settings)) return responses # ── PUT /requests/{id}/respond ────────────────────────────────────── @router.put("/requests/{request_id}/respond", response_model=RespondAcceptResponse | RespondRejectResponse) async def respond_to_request( body: RespondRequest, request: Request, background_tasks: BackgroundTasks, request_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Accept or reject a connection request. Atomic via UPDATE...WHERE status='pending'.""" try: return await _respond_to_request_inner(body, request, background_tasks, request_id, db, current_user) except HTTPException: raise except Exception: # get_db middleware auto-rollbacks on unhandled exceptions logger.exception("Unhandled error in respond_to_request (request_id=%s, user=%s)", request_id, current_user.id) raise HTTPException(status_code=500, detail=f"Internal server error while processing connection response (request {request_id})") async def _respond_to_request_inner( body: RespondRequest, request: Request, background_tasks: BackgroundTasks, request_id: int, db: AsyncSession, current_user: User, ) -> RespondAcceptResponse | RespondRejectResponse: now = datetime.now() # Atomic update — only succeeds if status is still 'pending' and receiver is current user result = await db.execute( update(ConnectionRequest) .where( ConnectionRequest.id == request_id, ConnectionRequest.receiver_id == current_user.id, ConnectionRequest.status == "pending", ) .values(status=body.action + "ed", resolved_at=now) .returning( ConnectionRequest.id, ConnectionRequest.sender_id, ConnectionRequest.receiver_id, ConnectionRequest.person_id, ) ) row = result.first() if not row: raise HTTPException(status_code=409, detail="Request not found or already resolved") sender_id = row.sender_id request_person_id = row.person_id if body.action == "accept": # Verify sender is still active sender_result = await db.execute(select(User).where(User.id == sender_id)) sender = sender_result.scalar_one_or_none() if not sender or not sender.is_active: # Revert to rejected await db.execute( update(ConnectionRequest) .where(ConnectionRequest.id == request_id) .values(status="rejected") ) await db.commit() raise HTTPException(status_code=409, detail="Sender account is no longer active") # Get settings for both users sender_settings = await _get_settings_for_user(db, sender_id) receiver_settings = await _get_settings_for_user(db, current_user.id) # Resolve shared profiles for both directions sender_shared = resolve_shared_profile(sender, sender_settings, None) if sender_settings else {} receiver_shared = resolve_shared_profile(current_user, receiver_settings, None) if receiver_settings else {} # Create Person records for both users person_for_receiver = create_person_from_connection( current_user.id, sender, sender_settings, sender_shared ) db.add(person_for_receiver) # Sender side: reuse existing Person if person_id was provided on the request person_for_sender = None if request_person_id: existing_result = await db.execute( select(Person).where(Person.id == request_person_id) ) existing_person = existing_result.scalar_one_or_none() # Re-validate at accept time: ownership must match sender, # and must not already be umbral (prevents double-conversion races) if existing_person and existing_person.user_id == sender_id and not existing_person.is_umbral_contact: # Convert existing standard contact to umbral existing_person.linked_user_id = current_user.id existing_person.is_umbral_contact = True existing_person.category = "Umbral" # Update from shared profile first_name = receiver_shared.get("first_name") or receiver_shared.get("preferred_name") or current_user.umbral_name last_name = receiver_shared.get("last_name") existing_person.first_name = first_name existing_person.last_name = last_name existing_person.email = receiver_shared.get("email") or existing_person.email existing_person.phone = receiver_shared.get("phone") or existing_person.phone existing_person.mobile = receiver_shared.get("mobile") or existing_person.mobile existing_person.address = receiver_shared.get("address") or existing_person.address existing_person.company = receiver_shared.get("company") or existing_person.company existing_person.job_title = receiver_shared.get("job_title") or existing_person.job_title # Sync birthday from shared profile birthday_str = receiver_shared.get("birthday") if birthday_str: try: existing_person.birthday = date_type.fromisoformat(birthday_str) except (ValueError, TypeError): pass # Recompute display name full = ((first_name or '') + ' ' + (last_name or '')).strip() existing_person.name = full or current_user.umbral_name person_for_sender = existing_person if person_for_sender is None: person_for_sender = create_person_from_connection( sender_id, current_user, receiver_settings, receiver_shared ) db.add(person_for_sender) try: await db.flush() # populate person IDs except IntegrityError: await db.rollback() raise HTTPException(status_code=409, detail="Connection already exists") # Create bidirectional connections conn_a = UserConnection( user_id=current_user.id, connected_user_id=sender_id, person_id=person_for_receiver.id, ) conn_b = UserConnection( user_id=sender_id, connected_user_id=current_user.id, person_id=person_for_sender.id, ) db.add(conn_a) db.add(conn_b) try: await db.flush() # populate conn_a.id for source_id except IntegrityError: await db.rollback() raise HTTPException(status_code=409, detail="Connection already exists") # Notification to sender receiver_display = (receiver_settings.preferred_name if receiver_settings else None) or current_user.umbral_name await create_notification( db, user_id=sender_id, type=NOTIF_TYPE_CONNECTION_ACCEPTED, title="Connection Accepted", message=f"{receiver_display} accepted your connection request", data={"connected_umbral_name": current_user.umbral_name}, source_type="user_connection", source_id=conn_b.id, ) await log_audit_event( db, action="connection.accepted", actor_id=current_user.id, target_id=sender_id, detail={"request_id": request_id}, ip=get_client_ip(request), ) # Extract ntfy config before commit (avoids detached SA object in background task) sender_ntfy = extract_ntfy_config(sender_settings) if sender_settings else None try: await db.commit() except IntegrityError: await db.rollback() raise HTTPException(status_code=409, detail="Connection already exists") # ntfy push in background background_tasks.add_task( send_connection_ntfy, sender_ntfy, receiver_display, "request_accepted", ) return {"message": "Connection accepted", "connection_id": conn_a.id} else: # Reject — only create notification for receiver (not sender per plan) await log_audit_event( db, action="connection.rejected", actor_id=current_user.id, target_id=sender_id, detail={"request_id": request_id}, ip=get_client_ip(request), ) await db.commit() return {"message": "Connection request rejected"} # ── PUT /requests/{id}/cancel ────────────────────────────────────── @router.put("/requests/{request_id}/cancel", response_model=CancelResponse) async def cancel_request( request: Request, request_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Cancel an outgoing connection request. Atomic via UPDATE...WHERE status='pending'.""" now = datetime.now() # Atomic update — only succeeds if sender is current user and status is still pending result = await db.execute( update(ConnectionRequest) .where( ConnectionRequest.id == request_id, ConnectionRequest.sender_id == current_user.id, ConnectionRequest.status == "pending", ) .values(status="cancelled", resolved_at=now) .returning(ConnectionRequest.id, ConnectionRequest.receiver_id) ) row = result.first() if not row: raise HTTPException(status_code=409, detail="Request not found or already resolved") receiver_id = row.receiver_id # Silent cleanup: remove the notification sent to the receiver await db.execute( delete(Notification).where( Notification.source_type == "connection_request", Notification.source_id == request_id, Notification.user_id == receiver_id, ) ) # Look up receiver umbral_name for audit detail receiver_result = await db.execute(select(User.umbral_name).where(User.id == receiver_id)) receiver_umbral_name = receiver_result.scalar_one_or_none() or "unknown" await log_audit_event( db, action="connection.request_cancelled", actor_id=current_user.id, target_id=receiver_id, detail={"request_id": request_id, "receiver_umbral_name": receiver_umbral_name}, ip=get_client_ip(request), ) await db.commit() return {"message": "Connection request cancelled"} # ── GET / ─────────────────────────────────────────────────────────── @router.get("/", response_model=list[ConnectionResponse]) async def list_connections( page: int = Query(1, ge=1), per_page: int = Query(50, ge=1, le=100), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """List all connections for the current user.""" offset = (page - 1) * per_page result = await db.execute( select(UserConnection) .where(UserConnection.user_id == current_user.id) .options(selectinload(UserConnection.connected_user)) .order_by(UserConnection.created_at.desc()) .offset(offset) .limit(per_page) ) connections = result.scalars().all() # Batch-fetch settings for connected users connected_ids = [conn.connected_user_id for conn in connections] if connected_ids: settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(connected_ids))) settings_by_user = {s.user_id: s for s in settings_result.scalars().all()} else: settings_by_user = {} responses = [] for conn in connections: conn_settings = settings_by_user.get(conn.connected_user_id) responses.append(ConnectionResponse( id=conn.id, connected_user_id=conn.connected_user_id, connected_umbral_name=conn.connected_user.umbral_name, connected_preferred_name=conn_settings.preferred_name if conn_settings else None, person_id=conn.person_id, created_at=conn.created_at, )) return responses # ── GET /{id} ─────────────────────────────────────────────────────── @router.get("/{connection_id}", response_model=ConnectionResponse) async def get_connection( connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get a single connection detail.""" result = await db.execute( select(UserConnection) .where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) .options(selectinload(UserConnection.connected_user)) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") conn_settings = await _get_settings_for_user(db, conn.connected_user_id) return ConnectionResponse( id=conn.id, connected_user_id=conn.connected_user_id, connected_umbral_name=conn.connected_user.umbral_name, connected_preferred_name=conn_settings.preferred_name if conn_settings else None, person_id=conn.person_id, created_at=conn.created_at, ) # ── GET /{id}/shared-profile ──────────────────────────────────────── @router.get("/{connection_id}/shared-profile") async def get_shared_profile( connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Get the resolved shared profile for a connection.""" result = await db.execute( select(UserConnection) .where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) .options(selectinload(UserConnection.connected_user)) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") conn_settings = await _get_settings_for_user(db, conn.connected_user_id) if not conn_settings: return {} return resolve_shared_profile( conn.connected_user, conn_settings, conn.sharing_overrides, ) # ── PUT /{id}/sharing-overrides ───────────────────────────────────── @router.put("/{connection_id}/sharing-overrides") async def update_sharing_overrides( body: SharingOverrideUpdate, connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """Update what YOU share with a specific connection.""" # Get our connection to know who the counterpart is our_conn = await db.execute( select(UserConnection).where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) ) conn = our_conn.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") # Find the reverse connection (their row pointing to us) reverse_result = await db.execute( select(UserConnection).where( UserConnection.user_id == conn.connected_user_id, UserConnection.connected_user_id == current_user.id, ) ) reverse_conn = reverse_result.scalar_one_or_none() if not reverse_conn: raise HTTPException(status_code=404, detail="Reverse connection not found") # Merge validated overrides — only SHAREABLE_FIELDS keys existing = dict(reverse_conn.sharing_overrides or {}) update_data = body.model_dump(exclude_unset=True) for key, value in update_data.items(): if key in SHAREABLE_FIELDS: if value is None: existing.pop(key, None) else: existing[key] = value reverse_conn.sharing_overrides = existing if existing else None await db.commit() return {"message": "Sharing overrides updated"} # ── DELETE /{id} ──────────────────────────────────────────────────── @router.delete("/{connection_id}", status_code=204) async def remove_connection( request: Request, connection_id: int = Path(ge=1, le=2147483647), db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Remove a connection. Removes BOTH UserConnection rows. Detaches BOTH Person records (sets linked_user_id=null, is_umbral_contact=false). Silent — no notification sent. """ # Get our connection result = await db.execute( select(UserConnection) .where( UserConnection.id == connection_id, UserConnection.user_id == current_user.id, ) ) conn = result.scalar_one_or_none() if not conn: raise HTTPException(status_code=404, detail="Connection not found") counterpart_id = conn.connected_user_id # Find reverse connection reverse_result = await db.execute( select(UserConnection).where( UserConnection.user_id == counterpart_id, UserConnection.connected_user_id == current_user.id, ) ) reverse_conn = reverse_result.scalar_one_or_none() # Detach Person records if conn.person_id: person_result = await db.execute(select(Person).where(Person.id == conn.person_id)) person = person_result.scalar_one_or_none() if person: await detach_umbral_contact(person) if reverse_conn and reverse_conn.person_id: person_result = await db.execute(select(Person).where(Person.id == reverse_conn.person_id)) person = person_result.scalar_one_or_none() if person: await detach_umbral_contact(person) # Delete both connections await db.delete(conn) if reverse_conn: await db.delete(reverse_conn) await log_audit_event( db, action="connection.removed", actor_id=current_user.id, target_id=counterpart_id, detail={"connection_id": connection_id}, ip=get_client_ip(request), ) await db.commit() return None