UMBRA/backend/app/routers/connections.py
Kyle Pope 0e94b6e1f7 Fix QA review findings: race condition, detached session, validation
- C-01: Wrap connection request flush in IntegrityError handler for
  TOCTOU race on partial unique index
- W-02: Extract ntfy config into plain dict before commit to avoid
  DetachedInstanceError in background tasks
- W-04: Add integer range validation (1–2147483647) on notification IDs
- W-07: Add typed response models for respond_to_request endpoint
- W-09: Document resolved_at requirement for future cancel endpoint
- S-02: Use Literal type for ConnectionRequestResponse.status
- S-04: Check ntfy master switch in extract_ntfy_config
- S-05: Move date import to module level in connection service

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 06:36:14 +08:00

684 lines
25 KiB
Python

"""
Connection router — search, request, respond, manage connections.
Security:
- Timing-safe search (50ms sleep floor)
- Per-receiver pending request cap (5 within 10 minutes)
- Atomic accept via UPDATE...WHERE status='pending' RETURNING *
- All endpoints scoped by current_user.id
- Audit logging for all connection events
"""
import asyncio
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Request
from sqlalchemy import select, func, and_, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.connection_request import ConnectionRequest
from app.models.notification import Notification
from app.models.person import Person
from app.models.settings import Settings
from app.models.user import User
from app.models.user_connection import UserConnection
from app.routers.auth import get_current_user
from app.schemas.connection import (
ConnectionRequestResponse,
ConnectionResponse,
RespondAcceptResponse,
RespondRejectResponse,
RespondRequest,
SendConnectionRequest,
SharingOverrideUpdate,
UmbralSearchRequest,
UmbralSearchResponse,
)
from app.services.audit import get_client_ip, log_audit_event
from app.services.connection import (
SHAREABLE_FIELDS,
create_person_from_connection,
detach_umbral_contact,
extract_ntfy_config,
resolve_shared_profile,
send_connection_ntfy,
)
from app.services.notification import create_notification
router = APIRouter()
# ── Helpers ──────────────────────────────────────────────────────────
async def _get_settings_for_user(db: AsyncSession, user_id: int) -> Settings | None:
result = await db.execute(select(Settings).where(Settings.user_id == user_id))
return result.scalar_one_or_none()
def _build_request_response(
req: ConnectionRequest,
sender: User,
sender_settings: Settings | None,
receiver: User,
receiver_settings: Settings | None,
) -> ConnectionRequestResponse:
return ConnectionRequestResponse(
id=req.id,
sender_umbral_name=sender.umbral_name,
sender_preferred_name=sender_settings.preferred_name if sender_settings else None,
receiver_umbral_name=receiver.umbral_name,
receiver_preferred_name=receiver_settings.preferred_name if receiver_settings else None,
status=req.status,
created_at=req.created_at,
)
# ── POST /search ────────────────────────────────────────────────────
@router.post("/search", response_model=UmbralSearchResponse)
async def search_user(
body: UmbralSearchRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Timing-safe user search. Always queries by umbral_name alone,
then checks accept_connections + is_active in Python.
Generic "not found" for non-existent, opted-out, AND inactive users.
50ms sleep floor to eliminate timing side-channel.
"""
# Always sleep to prevent timing attacks
await asyncio.sleep(0.05)
# Sender must have accept_connections enabled to search
sender_settings = await _get_settings_for_user(db, current_user.id)
if not sender_settings or not sender_settings.accept_connections:
return UmbralSearchResponse(found=False)
# Don't find yourself
if body.umbral_name == current_user.umbral_name:
return UmbralSearchResponse(found=False)
result = await db.execute(
select(User).where(User.umbral_name == body.umbral_name)
)
target = result.scalar_one_or_none()
if not target or not target.is_active:
return UmbralSearchResponse(found=False)
# Check if they accept connections
target_settings = await _get_settings_for_user(db, target.id)
if not target_settings or not target_settings.accept_connections:
return UmbralSearchResponse(found=False)
return UmbralSearchResponse(found=True)
# ── POST /request ───────────────────────────────────────────────────
@router.post("/request", response_model=ConnectionRequestResponse, status_code=201)
async def send_connection_request(
body: SendConnectionRequest,
request: Request,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Send a connection request to another user."""
# Resolve target
result = await db.execute(
select(User).where(User.umbral_name == body.umbral_name)
)
target = result.scalar_one_or_none()
if not target or not target.is_active:
raise HTTPException(status_code=404, detail="User not found")
# Self-request guard
if target.id == current_user.id:
raise HTTPException(status_code=400, detail="Cannot send a connection request to yourself")
# Sender must have accept_connections enabled to participate
sender_settings = await _get_settings_for_user(db, current_user.id)
if not sender_settings or not sender_settings.accept_connections:
raise HTTPException(
status_code=403,
detail="You must enable 'Accept Connections' in your settings before sending requests",
)
# Check accept_connections on target
target_settings = await _get_settings_for_user(db, target.id)
if not target_settings or not target_settings.accept_connections:
raise HTTPException(status_code=404, detail="User not found")
# Check existing connection
existing_conn = await db.execute(
select(UserConnection).where(
UserConnection.user_id == current_user.id,
UserConnection.connected_user_id == target.id,
)
)
if existing_conn.scalar_one_or_none():
raise HTTPException(status_code=409, detail="Already connected")
# Check pending request in either direction
existing_req = await db.execute(
select(ConnectionRequest).where(
and_(
ConnectionRequest.status == "pending",
(
(ConnectionRequest.sender_id == current_user.id) & (ConnectionRequest.receiver_id == target.id)
) | (
(ConnectionRequest.sender_id == target.id) & (ConnectionRequest.receiver_id == current_user.id)
),
)
)
)
if existing_req.scalar_one_or_none():
raise HTTPException(status_code=409, detail="A pending request already exists")
# Per-receiver cap: max 5 pending requests within 10 minutes
ten_min_ago = datetime.now() - timedelta(minutes=10)
pending_count = await db.scalar(
select(func.count())
.select_from(ConnectionRequest)
.where(
ConnectionRequest.receiver_id == target.id,
ConnectionRequest.status == "pending",
ConnectionRequest.created_at >= ten_min_ago,
)
) or 0
if pending_count >= 5:
raise HTTPException(status_code=429, detail="Too many pending requests for this user")
# Create the request (IntegrityError guard for TOCTOU race on partial unique index)
conn_request = ConnectionRequest(
sender_id=current_user.id,
receiver_id=target.id,
)
db.add(conn_request)
try:
await db.flush() # populate conn_request.id for source_id
except IntegrityError:
await db.rollback()
raise HTTPException(status_code=409, detail="A pending request already exists")
# Create in-app notification for receiver (sender_settings already fetched above)
sender_display = (sender_settings.preferred_name if sender_settings else None) or current_user.umbral_name
await create_notification(
db,
user_id=target.id,
type="connection_request",
title="New Connection Request",
message=f"{sender_display} wants to connect with you",
data={"sender_umbral_name": current_user.umbral_name},
source_type="connection_request",
source_id=conn_request.id,
)
await log_audit_event(
db,
action="connection.request_sent",
actor_id=current_user.id,
target_id=target.id,
detail={"receiver_umbral_name": target.umbral_name},
ip=get_client_ip(request),
)
# Extract ntfy config before commit (avoids detached SA object in background task)
target_ntfy = extract_ntfy_config(target_settings) if target_settings else None
await db.commit()
await db.refresh(conn_request)
# ntfy push in background (non-blocking)
background_tasks.add_task(
send_connection_ntfy,
target_ntfy,
sender_display,
"request_received",
)
return _build_request_response(conn_request, current_user, sender_settings, target, target_settings)
# ── GET /requests/incoming ──────────────────────────────────────────
@router.get("/requests/incoming", response_model=list[ConnectionRequestResponse])
async def get_incoming_requests(
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List pending connection requests received by the current user."""
offset = (page - 1) * per_page
result = await db.execute(
select(ConnectionRequest)
.where(
ConnectionRequest.receiver_id == current_user.id,
ConnectionRequest.status == "pending",
)
.options(selectinload(ConnectionRequest.sender))
.order_by(ConnectionRequest.created_at.desc())
.offset(offset)
.limit(per_page)
)
requests = result.scalars().all()
# Fetch current user's settings once, batch-fetch sender settings
receiver_settings = await _get_settings_for_user(db, current_user.id)
sender_ids = [req.sender_id for req in requests]
if sender_ids:
settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(sender_ids)))
settings_by_user = {s.user_id: s for s in settings_result.scalars().all()}
else:
settings_by_user = {}
responses = []
for req in requests:
sender_settings = settings_by_user.get(req.sender_id)
responses.append(_build_request_response(req, req.sender, sender_settings, current_user, receiver_settings))
return responses
# ── GET /requests/outgoing ──────────────────────────────────────────
@router.get("/requests/outgoing", response_model=list[ConnectionRequestResponse])
async def get_outgoing_requests(
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List pending connection requests sent by the current user."""
offset = (page - 1) * per_page
result = await db.execute(
select(ConnectionRequest)
.where(
ConnectionRequest.sender_id == current_user.id,
ConnectionRequest.status == "pending",
)
.options(selectinload(ConnectionRequest.receiver))
.order_by(ConnectionRequest.created_at.desc())
.offset(offset)
.limit(per_page)
)
requests = result.scalars().all()
# Fetch current user's settings once, batch-fetch receiver settings
sender_settings = await _get_settings_for_user(db, current_user.id)
receiver_ids = [req.receiver_id for req in requests]
if receiver_ids:
settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(receiver_ids)))
settings_by_user = {s.user_id: s for s in settings_result.scalars().all()}
else:
settings_by_user = {}
responses = []
for req in requests:
receiver_settings = settings_by_user.get(req.receiver_id)
responses.append(_build_request_response(req, current_user, sender_settings, req.receiver, receiver_settings))
return responses
# ── PUT /requests/{id}/respond ──────────────────────────────────────
@router.put("/requests/{request_id}/respond", response_model=RespondAcceptResponse | RespondRejectResponse)
async def respond_to_request(
body: RespondRequest,
request: Request,
background_tasks: BackgroundTasks,
request_id: int = Path(ge=1, le=2147483647),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Accept or reject a connection request. Atomic via UPDATE...WHERE status='pending'."""
now = datetime.now()
# Atomic update — only succeeds if status is still 'pending' and receiver is current user
result = await db.execute(
update(ConnectionRequest)
.where(
ConnectionRequest.id == request_id,
ConnectionRequest.receiver_id == current_user.id,
ConnectionRequest.status == "pending",
)
.values(status=body.action + "ed", resolved_at=now)
.returning(ConnectionRequest.id, ConnectionRequest.sender_id, ConnectionRequest.receiver_id)
)
row = result.first()
if not row:
raise HTTPException(status_code=409, detail="Request not found or already resolved")
sender_id = row.sender_id
if body.action == "accept":
# Verify sender is still active
sender_result = await db.execute(select(User).where(User.id == sender_id))
sender = sender_result.scalar_one_or_none()
if not sender or not sender.is_active:
# Revert to rejected
await db.execute(
update(ConnectionRequest)
.where(ConnectionRequest.id == request_id)
.values(status="rejected")
)
await db.commit()
raise HTTPException(status_code=409, detail="Sender account is no longer active")
# Get settings for both users
sender_settings = await _get_settings_for_user(db, sender_id)
receiver_settings = await _get_settings_for_user(db, current_user.id)
# Resolve shared profiles for both directions
sender_shared = resolve_shared_profile(sender, sender_settings, None) if sender_settings else {}
receiver_shared = resolve_shared_profile(current_user, receiver_settings, None) if receiver_settings else {}
# Create Person records for both users
person_for_receiver = create_person_from_connection(
current_user.id, sender, sender_settings, sender_shared
)
person_for_sender = create_person_from_connection(
sender_id, current_user, receiver_settings, receiver_shared
)
db.add(person_for_receiver)
db.add(person_for_sender)
await db.flush() # populate person IDs
# Create bidirectional connections
conn_a = UserConnection(
user_id=current_user.id,
connected_user_id=sender_id,
person_id=person_for_receiver.id,
)
conn_b = UserConnection(
user_id=sender_id,
connected_user_id=current_user.id,
person_id=person_for_sender.id,
)
db.add(conn_a)
db.add(conn_b)
await db.flush() # populate conn_a.id for source_id
# Notification to sender
receiver_display = (receiver_settings.preferred_name if receiver_settings else None) or current_user.umbral_name
await create_notification(
db,
user_id=sender_id,
type="connection_accepted",
title="Connection Accepted",
message=f"{receiver_display} accepted your connection request",
data={"connected_umbral_name": current_user.umbral_name},
source_type="user_connection",
source_id=conn_b.id,
)
await log_audit_event(
db,
action="connection.accepted",
actor_id=current_user.id,
target_id=sender_id,
detail={"request_id": request_id},
ip=get_client_ip(request),
)
# Extract ntfy config before commit (avoids detached SA object in background task)
sender_ntfy = extract_ntfy_config(sender_settings) if sender_settings else None
await db.commit()
# ntfy push in background
background_tasks.add_task(
send_connection_ntfy,
sender_ntfy,
receiver_display,
"request_accepted",
)
return {"message": "Connection accepted", "connection_id": conn_a.id}
else:
# Reject — only create notification for receiver (not sender per plan)
await log_audit_event(
db,
action="connection.rejected",
actor_id=current_user.id,
target_id=sender_id,
detail={"request_id": request_id},
ip=get_client_ip(request),
)
await db.commit()
return {"message": "Connection request rejected"}
# ── GET / ───────────────────────────────────────────────────────────
@router.get("/", response_model=list[ConnectionResponse])
async def list_connections(
page: int = Query(1, ge=1),
per_page: int = Query(50, ge=1, le=100),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List all connections for the current user."""
offset = (page - 1) * per_page
result = await db.execute(
select(UserConnection)
.where(UserConnection.user_id == current_user.id)
.options(selectinload(UserConnection.connected_user))
.order_by(UserConnection.created_at.desc())
.offset(offset)
.limit(per_page)
)
connections = result.scalars().all()
# Batch-fetch settings for connected users
connected_ids = [conn.connected_user_id for conn in connections]
if connected_ids:
settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(connected_ids)))
settings_by_user = {s.user_id: s for s in settings_result.scalars().all()}
else:
settings_by_user = {}
responses = []
for conn in connections:
conn_settings = settings_by_user.get(conn.connected_user_id)
responses.append(ConnectionResponse(
id=conn.id,
connected_user_id=conn.connected_user_id,
connected_umbral_name=conn.connected_user.umbral_name,
connected_preferred_name=conn_settings.preferred_name if conn_settings else None,
person_id=conn.person_id,
created_at=conn.created_at,
))
return responses
# ── GET /{id} ───────────────────────────────────────────────────────
@router.get("/{connection_id}", response_model=ConnectionResponse)
async def get_connection(
connection_id: int = Path(ge=1, le=2147483647),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get a single connection detail."""
result = await db.execute(
select(UserConnection)
.where(
UserConnection.id == connection_id,
UserConnection.user_id == current_user.id,
)
.options(selectinload(UserConnection.connected_user))
)
conn = result.scalar_one_or_none()
if not conn:
raise HTTPException(status_code=404, detail="Connection not found")
conn_settings = await _get_settings_for_user(db, conn.connected_user_id)
return ConnectionResponse(
id=conn.id,
connected_user_id=conn.connected_user_id,
connected_umbral_name=conn.connected_user.umbral_name,
connected_preferred_name=conn_settings.preferred_name if conn_settings else None,
person_id=conn.person_id,
created_at=conn.created_at,
)
# ── GET /{id}/shared-profile ────────────────────────────────────────
@router.get("/{connection_id}/shared-profile")
async def get_shared_profile(
connection_id: int = Path(ge=1, le=2147483647),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get the resolved shared profile for a connection."""
result = await db.execute(
select(UserConnection)
.where(
UserConnection.id == connection_id,
UserConnection.user_id == current_user.id,
)
.options(selectinload(UserConnection.connected_user))
)
conn = result.scalar_one_or_none()
if not conn:
raise HTTPException(status_code=404, detail="Connection not found")
conn_settings = await _get_settings_for_user(db, conn.connected_user_id)
if not conn_settings:
return {}
return resolve_shared_profile(
conn.connected_user,
conn_settings,
conn.sharing_overrides,
)
# ── PUT /{id}/sharing-overrides ─────────────────────────────────────
@router.put("/{connection_id}/sharing-overrides")
async def update_sharing_overrides(
body: SharingOverrideUpdate,
connection_id: int = Path(ge=1, le=2147483647),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Update what YOU share with a specific connection."""
# Get our connection to know who the counterpart is
our_conn = await db.execute(
select(UserConnection).where(
UserConnection.id == connection_id,
UserConnection.user_id == current_user.id,
)
)
conn = our_conn.scalar_one_or_none()
if not conn:
raise HTTPException(status_code=404, detail="Connection not found")
# Find the reverse connection (their row pointing to us)
reverse_result = await db.execute(
select(UserConnection).where(
UserConnection.user_id == conn.connected_user_id,
UserConnection.connected_user_id == current_user.id,
)
)
reverse_conn = reverse_result.scalar_one_or_none()
if not reverse_conn:
raise HTTPException(status_code=404, detail="Reverse connection not found")
# Merge validated overrides — only SHAREABLE_FIELDS keys
existing = dict(reverse_conn.sharing_overrides or {})
update_data = body.model_dump(exclude_unset=True)
for key, value in update_data.items():
if key in SHAREABLE_FIELDS:
if value is None:
existing.pop(key, None)
else:
existing[key] = value
reverse_conn.sharing_overrides = existing if existing else None
await db.commit()
return {"message": "Sharing overrides updated"}
# ── DELETE /{id} ────────────────────────────────────────────────────
@router.delete("/{connection_id}", status_code=204)
async def remove_connection(
request: Request,
connection_id: int = Path(ge=1, le=2147483647),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Remove a connection. Removes BOTH UserConnection rows.
Detaches BOTH Person records (sets linked_user_id=null, is_umbral_contact=false).
Silent — no notification sent.
"""
# Get our connection
result = await db.execute(
select(UserConnection)
.where(
UserConnection.id == connection_id,
UserConnection.user_id == current_user.id,
)
)
conn = result.scalar_one_or_none()
if not conn:
raise HTTPException(status_code=404, detail="Connection not found")
counterpart_id = conn.connected_user_id
# Find reverse connection
reverse_result = await db.execute(
select(UserConnection).where(
UserConnection.user_id == counterpart_id,
UserConnection.connected_user_id == current_user.id,
)
)
reverse_conn = reverse_result.scalar_one_or_none()
# Detach Person records
if conn.person_id:
person_result = await db.execute(select(Person).where(Person.id == conn.person_id))
person = person_result.scalar_one_or_none()
if person:
await detach_umbral_contact(person)
if reverse_conn and reverse_conn.person_id:
person_result = await db.execute(select(Person).where(Person.id == reverse_conn.person_id))
person = person_result.scalar_one_or_none()
if person:
await detach_umbral_contact(person)
# Delete both connections
await db.delete(conn)
if reverse_conn:
await db.delete(reverse_conn)
await log_audit_event(
db,
action="connection.removed",
actor_id=current_user.id,
target_id=counterpart_id,
detail={"connection_id": connection_id},
ip=get_client_ip(request),
)
await db.commit()
return None