UMBRA/backend/app/routers/connections.py
Kyle Pope 73cef1df55 Add umbral name header, preferred name field, and link button for contacts
- Inject umbral_name into shared_fields for umbral contacts (always visible)
- Show @umbralname subtitle in detail panel header
- Add preferred_name to panel fields with synced label for umbral contacts
- Add Link button on standard contacts to tie to umbral user via connection request
- Migration 046: person_id FK on connection_requests with index
- Validate person_id ownership on send, re-validate + convert on accept

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 08:37:01 +08:00

793 lines
30 KiB
Python

"""
Connection router — search, request, respond, manage connections.
Security:
- Timing-safe search (50ms sleep floor)
- Per-receiver pending request cap (5 within 10 minutes)
- Atomic accept via UPDATE...WHERE status='pending' RETURNING *
- All endpoints scoped by current_user.id
- Audit logging for all connection events
"""
import asyncio
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Request
from sqlalchemy import delete, select, func, and_, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.connection_request import ConnectionRequest
from app.models.notification import Notification
from app.models.person import Person
from app.models.settings import Settings
from app.models.user import User
from app.models.user_connection import UserConnection
from app.routers.auth import get_current_user
from app.schemas.connection import (
CancelResponse,
ConnectionRequestResponse,
ConnectionResponse,
RespondAcceptResponse,
RespondRejectResponse,
RespondRequest,
SendConnectionRequest,
SharingOverrideUpdate,
UmbralSearchRequest,
UmbralSearchResponse,
)
from app.services.audit import get_client_ip, log_audit_event
from app.services.connection import (
SHAREABLE_FIELDS,
create_person_from_connection,
detach_umbral_contact,
extract_ntfy_config,
resolve_shared_profile,
send_connection_ntfy,
)
from app.services.notification import create_notification
# Router instance that every endpoint in this module registers on via its
# @router.<method> decorator.
router = APIRouter()
# ── Helpers ──────────────────────────────────────────────────────────
async def _get_settings_for_user(db: AsyncSession, user_id: int) -> Settings | None:
    """Look up the Settings row owned by ``user_id``.

    Returns the matching row, or ``None`` when the user has no settings row.
    """
    stmt = select(Settings).where(Settings.user_id == user_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
def _build_request_response(
    req: ConnectionRequest,
    sender: User,
    sender_settings: Settings | None,
    receiver: User,
    receiver_settings: Settings | None,
) -> ConnectionRequestResponse:
    """Assemble the API representation of a connection request.

    Either settings argument may be missing; the corresponding preferred
    name is then reported as ``None``.
    """
    sender_preferred = None
    if sender_settings:
        sender_preferred = sender_settings.preferred_name

    receiver_preferred = None
    if receiver_settings:
        receiver_preferred = receiver_settings.preferred_name

    payload = {
        "id": req.id,
        "sender_umbral_name": sender.umbral_name,
        "sender_preferred_name": sender_preferred,
        "receiver_umbral_name": receiver.umbral_name,
        "receiver_preferred_name": receiver_preferred,
        "status": req.status,
        "created_at": req.created_at,
    }
    return ConnectionRequestResponse(**payload)
# ── POST /search ────────────────────────────────────────────────────
async def _search_target_is_visible(db: AsyncSession, current_user: User, umbral_name: str) -> bool:
    """Return True only if ``umbral_name`` is another active user who accepts connections.

    The searcher must also have ``accept_connections`` enabled; otherwise
    nothing is ever reported as found.
    """
    # Sender must have accept_connections enabled to search
    sender_settings = await _get_settings_for_user(db, current_user.id)
    if not sender_settings or not sender_settings.accept_connections:
        return False
    # Don't find yourself
    if umbral_name == current_user.umbral_name:
        return False
    result = await db.execute(select(User).where(User.umbral_name == umbral_name))
    target = result.scalar_one_or_none()
    if not target or not target.is_active:
        return False
    # Check if they accept connections
    target_settings = await _get_settings_for_user(db, target.id)
    return bool(target_settings and target_settings.accept_connections)


@router.post("/search", response_model=UmbralSearchResponse)
async def search_user(
    body: UmbralSearchRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Timing-safe user search. Always queries by umbral_name alone,
    then checks accept_connections + is_active in Python.
    Generic "not found" for non-existent, opted-out, AND inactive users.
    The handler is padded to a 50ms minimum duration so that the number of
    queries executed does not show up as a latency difference.
    """
    min_duration = 0.05
    loop = asyncio.get_running_loop()
    deadline = loop.time() + min_duration

    found = await _search_target_is_visible(db, current_user, body.umbral_name)

    # Sleep only for what is left of the floor. A fixed sleep before the work
    # would add constant latency but leave the per-branch query cost visible.
    remaining = deadline - loop.time()
    if remaining > 0:
        await asyncio.sleep(remaining)
    return UmbralSearchResponse(found=found)
# ── POST /request ───────────────────────────────────────────────────
@router.post("/request", response_model=ConnectionRequestResponse, status_code=201)
async def send_connection_request(
    body: SendConnectionRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Send a connection request to another user.

    Validates the target and sender, rejects duplicates, applies the
    per-receiver cap, optionally ties the request to one of the sender's
    existing contacts (``body.person_id``), then stores the request together
    with an in-app notification and an audit event. An ntfy push is queued
    as a background task after the commit.

    Raises:
        HTTPException 404: target does not exist, is inactive, or does not
            accept connections (same generic message in all three cases).
        HTTPException 400: target is the current user, or ``person_id`` is
            not owned by the sender or is already an umbral contact.
        HTTPException 403: sender has not enabled accept_connections.
        HTTPException 409: already connected, or a pending request exists
            in either direction.
        HTTPException 429: the receiver already has 5 pending requests
            created within the last 10 minutes.
    """
    # Resolve target
    result = await db.execute(
        select(User).where(User.umbral_name == body.umbral_name)
    )
    target = result.scalar_one_or_none()
    if not target or not target.is_active:
        raise HTTPException(status_code=404, detail="User not found")
    # Self-request guard
    if target.id == current_user.id:
        raise HTTPException(status_code=400, detail="Cannot send a connection request to yourself")
    # Sender must have accept_connections enabled to participate
    sender_settings = await _get_settings_for_user(db, current_user.id)
    if not sender_settings or not sender_settings.accept_connections:
        raise HTTPException(
            status_code=403,
            detail="You must enable 'Accept Connections' in your settings before sending requests",
        )
    # Check accept_connections on target (same 404 as "no such user" so opt-out is not revealed)
    target_settings = await _get_settings_for_user(db, target.id)
    if not target_settings or not target_settings.accept_connections:
        raise HTTPException(status_code=404, detail="User not found")
    # Check existing connection (only the sender -> target row is inspected)
    existing_conn = await db.execute(
        select(UserConnection).where(
            UserConnection.user_id == current_user.id,
            UserConnection.connected_user_id == target.id,
        )
    )
    if existing_conn.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="Already connected")
    # Check pending request in either direction
    existing_req = await db.execute(
        select(ConnectionRequest).where(
            and_(
                ConnectionRequest.status == "pending",
                (
                    (ConnectionRequest.sender_id == current_user.id) & (ConnectionRequest.receiver_id == target.id)
                ) | (
                    (ConnectionRequest.sender_id == target.id) & (ConnectionRequest.receiver_id == current_user.id)
                ),
            )
        )
    )
    if existing_req.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="A pending request already exists")
    # Per-receiver cap: max 5 pending requests within 10 minutes
    # NOTE(review): datetime.now() is naive local time — confirm created_at is
    # stored using the same convention, otherwise the window is skewed.
    ten_min_ago = datetime.now() - timedelta(minutes=10)
    pending_count = await db.scalar(
        select(func.count())
        .select_from(ConnectionRequest)
        .where(
            ConnectionRequest.receiver_id == target.id,
            ConnectionRequest.status == "pending",
            ConnectionRequest.created_at >= ten_min_ago,
        )
    ) or 0
    if pending_count >= 5:
        raise HTTPException(status_code=429, detail="Too many pending requests for this user")
    # Validate person_id if provided (link existing standard contact)
    link_person_id = None
    if body.person_id is not None:
        # Ownership is enforced in the query itself: only the sender's own Person rows match
        person_result = await db.execute(
            select(Person).where(Person.id == body.person_id, Person.user_id == current_user.id)
        )
        link_person = person_result.scalar_one_or_none()
        if not link_person:
            raise HTTPException(status_code=400, detail="Person not found or not owned by you")
        if link_person.is_umbral_contact:
            raise HTTPException(status_code=400, detail="Person is already an umbral contact")
        link_person_id = body.person_id
    # Create the request (IntegrityError guard for TOCTOU race on partial unique index)
    conn_request = ConnectionRequest(
        sender_id=current_user.id,
        receiver_id=target.id,
        person_id=link_person_id,
    )
    db.add(conn_request)
    try:
        await db.flush()  # populate conn_request.id for source_id
    except IntegrityError:
        await db.rollback()
        raise HTTPException(status_code=409, detail="A pending request already exists")
    # Create in-app notification for receiver (sender_settings already fetched above)
    # Display name falls back to the umbral name when no preferred name is set
    sender_display = (sender_settings.preferred_name if sender_settings else None) or current_user.umbral_name
    await create_notification(
        db,
        user_id=target.id,
        type="connection_request",
        title="New Connection Request",
        message=f"{sender_display} wants to connect with you",
        data={"sender_umbral_name": current_user.umbral_name},
        source_type="connection_request",
        source_id=conn_request.id,
    )
    await log_audit_event(
        db,
        action="connection.request_sent",
        actor_id=current_user.id,
        target_id=target.id,
        detail={"receiver_umbral_name": target.umbral_name},
        ip=get_client_ip(request),
    )
    # Extract ntfy config before commit (avoids detached SA object in background task)
    target_ntfy = extract_ntfy_config(target_settings) if target_settings else None
    await db.commit()
    # Reload the committed row so the response can read its attributes (status, created_at)
    await db.refresh(conn_request)
    # ntfy push in background (non-blocking)
    background_tasks.add_task(
        send_connection_ntfy,
        target_ntfy,
        sender_display,
        "request_received",
    )
    return _build_request_response(conn_request, current_user, sender_settings, target, target_settings)
# ── GET /requests/incoming ──────────────────────────────────────────
@router.get("/requests/incoming", response_model=list[ConnectionRequestResponse])
async def get_incoming_requests(
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of pending requests addressed to the current user, newest first."""
    stmt = (
        select(ConnectionRequest)
        .where(
            ConnectionRequest.receiver_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .options(selectinload(ConnectionRequest.sender))
        .order_by(ConnectionRequest.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    pending = (await db.execute(stmt)).scalars().all()

    # The receiver is always the current user, so load those settings once;
    # sender settings come from a single IN query rather than one per row.
    my_settings = await _get_settings_for_user(db, current_user.id)
    settings_by_sender = {}
    sender_ids = [item.sender_id for item in pending]
    if sender_ids:
        rows = await db.execute(select(Settings).where(Settings.user_id.in_(sender_ids)))
        settings_by_sender = {row.user_id: row for row in rows.scalars().all()}

    return [
        _build_request_response(
            item,
            item.sender,
            settings_by_sender.get(item.sender_id),
            current_user,
            my_settings,
        )
        for item in pending
    ]
# ── GET /requests/outgoing ──────────────────────────────────────────
@router.get("/requests/outgoing", response_model=list[ConnectionRequestResponse])
async def get_outgoing_requests(
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of pending requests the current user has sent, newest first."""
    stmt = (
        select(ConnectionRequest)
        .where(
            ConnectionRequest.sender_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .options(selectinload(ConnectionRequest.receiver))
        .order_by(ConnectionRequest.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    pending = (await db.execute(stmt)).scalars().all()

    # The sender is always the current user, so load those settings once;
    # receiver settings come from a single IN query rather than one per row.
    my_settings = await _get_settings_for_user(db, current_user.id)
    settings_by_receiver = {}
    receiver_ids = [item.receiver_id for item in pending]
    if receiver_ids:
        rows = await db.execute(select(Settings).where(Settings.user_id.in_(receiver_ids)))
        settings_by_receiver = {row.user_id: row for row in rows.scalars().all()}

    return [
        _build_request_response(
            item,
            current_user,
            my_settings,
            item.receiver,
            settings_by_receiver.get(item.receiver_id),
        )
        for item in pending
    ]
# ── PUT /requests/{id}/respond ──────────────────────────────────────
@router.put("/requests/{request_id}/respond", response_model=RespondAcceptResponse | RespondRejectResponse)
async def respond_to_request(
    body: RespondRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    request_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Accept or reject a connection request. Atomic via UPDATE...WHERE status='pending'.

    On accept: creates (or converts) a Person record on each side, creates
    both UserConnection rows, notifies the sender, writes an audit event and
    queues an ntfy push. On reject: only an audit event is written.

    Raises:
        HTTPException 409: the request does not exist, is not addressed to
            the current user, is no longer pending, or (on accept) the
            sender account is missing or inactive.
    """
    # NOTE(review): naive local time — confirm this matches how resolved_at is stored.
    now = datetime.now()
    # Atomic update — only succeeds if status is still 'pending' and receiver is current user
    # The new status is built as action + "ed" ("accept" -> "accepted", "reject" -> "rejected");
    # presumably RespondRequest restricts action to those two values — confirm in the schema.
    result = await db.execute(
        update(ConnectionRequest)
        .where(
            ConnectionRequest.id == request_id,
            ConnectionRequest.receiver_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .values(status=body.action + "ed", resolved_at=now)
        .returning(
            ConnectionRequest.id,
            ConnectionRequest.sender_id,
            ConnectionRequest.receiver_id,
            ConnectionRequest.person_id,
        )
    )
    row = result.first()
    if not row:
        # No row updated: wrong id, wrong receiver, or someone already resolved it
        raise HTTPException(status_code=409, detail="Request not found or already resolved")
    sender_id = row.sender_id
    request_person_id = row.person_id
    if body.action == "accept":
        # Verify sender is still active
        sender_result = await db.execute(select(User).where(User.id == sender_id))
        sender = sender_result.scalar_one_or_none()
        if not sender or not sender.is_active:
            # Revert to rejected (the row was already marked "accepted" by the UPDATE above)
            await db.execute(
                update(ConnectionRequest)
                .where(ConnectionRequest.id == request_id)
                .values(status="rejected")
            )
            await db.commit()
            raise HTTPException(status_code=409, detail="Sender account is no longer active")
        # Get settings for both users
        sender_settings = await _get_settings_for_user(db, sender_id)
        receiver_settings = await _get_settings_for_user(db, current_user.id)
        # Resolve shared profiles for both directions (no per-connection overrides exist yet,
        # hence None; a user without settings shares an empty profile)
        sender_shared = resolve_shared_profile(sender, sender_settings, None) if sender_settings else {}
        receiver_shared = resolve_shared_profile(current_user, receiver_settings, None) if receiver_settings else {}
        # Create Person records for both users
        person_for_receiver = create_person_from_connection(
            current_user.id, sender, sender_settings, sender_shared
        )
        db.add(person_for_receiver)
        # Sender side: reuse existing Person if person_id was provided on the request
        person_for_sender = None
        if request_person_id:
            existing_result = await db.execute(
                select(Person).where(Person.id == request_person_id)
            )
            existing_person = existing_result.scalar_one_or_none()
            # Re-validate at accept time (C-01, W-01): ownership must match sender,
            # and must not already be umbral (prevents double-conversion races)
            if existing_person and existing_person.user_id == sender_id and not existing_person.is_umbral_contact:
                # Convert existing standard contact to umbral
                existing_person.linked_user_id = current_user.id
                existing_person.is_umbral_contact = True
                existing_person.category = "Umbral"
                # Update from shared profile
                first_name = receiver_shared.get("first_name") or receiver_shared.get("preferred_name") or current_user.umbral_name
                last_name = receiver_shared.get("last_name")
                # Names are always overwritten; the remaining fields keep the
                # contact's existing value when the shared profile has none.
                existing_person.first_name = first_name
                existing_person.last_name = last_name
                existing_person.email = receiver_shared.get("email") or existing_person.email
                existing_person.phone = receiver_shared.get("phone") or existing_person.phone
                existing_person.mobile = receiver_shared.get("mobile") or existing_person.mobile
                existing_person.address = receiver_shared.get("address") or existing_person.address
                existing_person.company = receiver_shared.get("company") or existing_person.company
                existing_person.job_title = receiver_shared.get("job_title") or existing_person.job_title
                # Recompute display name
                full = ((first_name or '') + ' ' + (last_name or '')).strip()
                existing_person.name = full or current_user.umbral_name
                person_for_sender = existing_person
        # Fallback: no linked contact, or it failed re-validation — create a fresh Person
        if person_for_sender is None:
            person_for_sender = create_person_from_connection(
                sender_id, current_user, receiver_settings, receiver_shared
            )
            db.add(person_for_sender)
        await db.flush()  # populate person IDs
        # Create bidirectional connections
        # conn_a is the current user's (receiver's) row, conn_b is the sender's row
        conn_a = UserConnection(
            user_id=current_user.id,
            connected_user_id=sender_id,
            person_id=person_for_receiver.id,
        )
        conn_b = UserConnection(
            user_id=sender_id,
            connected_user_id=current_user.id,
            person_id=person_for_sender.id,
        )
        db.add(conn_a)
        db.add(conn_b)
        await db.flush()  # populate conn_a.id / conn_b.id (used for source_id and the response)
        # Notification to sender
        receiver_display = (receiver_settings.preferred_name if receiver_settings else None) or current_user.umbral_name
        await create_notification(
            db,
            user_id=sender_id,
            type="connection_accepted",
            title="Connection Accepted",
            message=f"{receiver_display} accepted your connection request",
            data={"connected_umbral_name": current_user.umbral_name},
            source_type="user_connection",
            source_id=conn_b.id,
        )
        await log_audit_event(
            db,
            action="connection.accepted",
            actor_id=current_user.id,
            target_id=sender_id,
            detail={"request_id": request_id},
            ip=get_client_ip(request),
        )
        # Extract ntfy config before commit (avoids detached SA object in background task)
        sender_ntfy = extract_ntfy_config(sender_settings) if sender_settings else None
        await db.commit()
        # ntfy push in background
        background_tasks.add_task(
            send_connection_ntfy,
            sender_ntfy,
            receiver_display,
            "request_accepted",
        )
        return {"message": "Connection accepted", "connection_id": conn_a.id}
    else:
        # Reject — no notification is created in this branch (the sender is not
        # informed); only the audit event is recorded.
        await log_audit_event(
            db,
            action="connection.rejected",
            actor_id=current_user.id,
            target_id=sender_id,
            detail={"request_id": request_id},
            ip=get_client_ip(request),
        )
        await db.commit()
        return {"message": "Connection request rejected"}
# ── PUT /requests/{id}/cancel ──────────────────────────────────────
@router.put("/requests/{request_id}/cancel", response_model=CancelResponse)
async def cancel_request(
    request: Request,
    request_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Withdraw a pending request the current user sent.

    The status change is a single conditional UPDATE, so a request that was
    already resolved (or belongs to someone else) yields a 409 instead.
    The receiver's in-app notification for the request is removed.
    """
    resolved_at = datetime.now()
    cancel_stmt = (
        update(ConnectionRequest)
        .where(
            ConnectionRequest.id == request_id,
            ConnectionRequest.sender_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .values(status="cancelled", resolved_at=resolved_at)
        .returning(ConnectionRequest.id, ConnectionRequest.receiver_id)
    )
    cancelled = (await db.execute(cancel_stmt)).first()
    if not cancelled:
        raise HTTPException(status_code=409, detail="Request not found or already resolved")

    receiver_id = cancelled.receiver_id

    # Quietly drop the notification the receiver got when the request was sent
    cleanup_stmt = delete(Notification).where(
        Notification.source_type == "connection_request",
        Notification.source_id == request_id,
        Notification.user_id == receiver_id,
    )
    await db.execute(cleanup_stmt)

    # The receiver's umbral_name is only needed for the audit record
    name_lookup = await db.execute(select(User.umbral_name).where(User.id == receiver_id))
    receiver_umbral_name = name_lookup.scalar_one_or_none() or "unknown"

    await log_audit_event(
        db,
        action="connection.request_cancelled",
        actor_id=current_user.id,
        target_id=receiver_id,
        detail={"request_id": request_id, "receiver_umbral_name": receiver_umbral_name},
        ip=get_client_ip(request),
    )
    await db.commit()
    return {"message": "Connection request cancelled"}
# ── GET / ───────────────────────────────────────────────────────────
@router.get("/", response_model=list[ConnectionResponse])
async def list_connections(
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of the current user's connections, newest first."""
    stmt = (
        select(UserConnection)
        .where(UserConnection.user_id == current_user.id)
        .options(selectinload(UserConnection.connected_user))
        .order_by(UserConnection.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    links = (await db.execute(stmt)).scalars().all()

    # One IN query for the settings of every connected user on this page
    settings_by_peer = {}
    peer_ids = [link.connected_user_id for link in links]
    if peer_ids:
        rows = await db.execute(select(Settings).where(Settings.user_id.in_(peer_ids)))
        settings_by_peer = {row.user_id: row for row in rows.scalars().all()}

    def _to_response(link):
        peer_settings = settings_by_peer.get(link.connected_user_id)
        return ConnectionResponse(
            id=link.id,
            connected_user_id=link.connected_user_id,
            connected_umbral_name=link.connected_user.umbral_name,
            connected_preferred_name=peer_settings.preferred_name if peer_settings else None,
            person_id=link.person_id,
            created_at=link.created_at,
        )

    return [_to_response(link) for link in links]
# ── GET /{id} ───────────────────────────────────────────────────────
@router.get("/{connection_id}", response_model=ConnectionResponse)
async def get_connection(
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one connection owned by the current user, or 404 if there is none."""
    stmt = (
        select(UserConnection)
        .where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
        .options(selectinload(UserConnection.connected_user))
    )
    link = (await db.execute(stmt)).scalar_one_or_none()
    if not link:
        raise HTTPException(status_code=404, detail="Connection not found")

    peer_settings = await _get_settings_for_user(db, link.connected_user_id)
    preferred_name = None
    if peer_settings:
        preferred_name = peer_settings.preferred_name

    return ConnectionResponse(
        id=link.id,
        connected_user_id=link.connected_user_id,
        connected_umbral_name=link.connected_user.umbral_name,
        connected_preferred_name=preferred_name,
        person_id=link.person_id,
        created_at=link.created_at,
    )
# ── GET /{id}/shared-profile ────────────────────────────────────────
@router.get("/{connection_id}/shared-profile")
async def get_shared_profile(
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the profile the connected user shares over this connection.

    Responds with an empty object when the connected user has no settings
    row, and with 404 when the connection is not owned by the current user.
    """
    stmt = (
        select(UserConnection)
        .where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
        .options(selectinload(UserConnection.connected_user))
    )
    link = (await db.execute(stmt)).scalar_one_or_none()
    if not link:
        raise HTTPException(status_code=404, detail="Connection not found")

    peer_settings = await _get_settings_for_user(db, link.connected_user_id)
    if peer_settings:
        return resolve_shared_profile(
            link.connected_user,
            peer_settings,
            link.sharing_overrides,
        )
    return {}
# ── PUT /{id}/sharing-overrides ─────────────────────────────────────
@router.put("/{connection_id}/sharing-overrides")
async def update_sharing_overrides(
    body: SharingOverrideUpdate,
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Change what the current user shares with one specific connection.

    The overrides are stored on the counterpart's connection row (the one
    pointing back at the current user). Fields sent as ``null`` are removed
    from the stored overrides; fields not sent are left untouched.
    """
    # Our own row tells us who the counterpart is
    own_lookup = await db.execute(
        select(UserConnection).where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
    )
    own_link = own_lookup.scalar_one_or_none()
    if not own_link:
        raise HTTPException(status_code=404, detail="Connection not found")

    # The counterpart's row pointing to us is where the overrides live
    reverse_lookup = await db.execute(
        select(UserConnection).where(
            UserConnection.user_id == own_link.connected_user_id,
            UserConnection.connected_user_id == current_user.id,
        )
    )
    reverse_conn = reverse_lookup.scalar_one_or_none()
    if not reverse_conn:
        raise HTTPException(status_code=404, detail="Reverse connection not found")

    # Merge into a copy, honouring only keys listed in SHAREABLE_FIELDS
    merged = dict(reverse_conn.sharing_overrides or {})
    for field, choice in body.model_dump(exclude_unset=True).items():
        if field not in SHAREABLE_FIELDS:
            continue
        if choice is None:
            merged.pop(field, None)
        else:
            merged[field] = choice

    # An empty mapping is stored as NULL
    reverse_conn.sharing_overrides = merged or None
    await db.commit()
    return {"message": "Sharing overrides updated"}
# ── DELETE /{id} ────────────────────────────────────────────────────
@router.delete("/{connection_id}", status_code=204)
async def remove_connection(
    request: Request,
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Tear down a connection in both directions.
    Each side's linked Person record is passed to detach_umbral_contact,
    then both UserConnection rows are deleted. No notification is sent;
    only an audit event is written.
    """
    own_lookup = await db.execute(
        select(UserConnection)
        .where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
    )
    conn = own_lookup.scalar_one_or_none()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")
    counterpart_id = conn.connected_user_id

    # The counterpart's row pointing back at us (may be absent)
    reverse_lookup = await db.execute(
        select(UserConnection).where(
            UserConnection.user_id == counterpart_id,
            UserConnection.connected_user_id == current_user.id,
        )
    )
    reverse_conn = reverse_lookup.scalar_one_or_none()

    # Our row first, then the reverse row when it exists
    links = [conn] + ([reverse_conn] if reverse_conn else [])

    # Detach the Person record attached to each row
    for link in links:
        if not link.person_id:
            continue
        person_lookup = await db.execute(select(Person).where(Person.id == link.person_id))
        contact = person_lookup.scalar_one_or_none()
        if contact:
            await detach_umbral_contact(contact)

    # Remove the connection rows themselves
    for link in links:
        await db.delete(link)

    await log_audit_event(
        db,
        action="connection.removed",
        actor_id=current_user.id,
        target_id=counterpart_id,
        detail={"connection_id": connection_id},
        ip=get_client_ip(request),
    )
    await db.commit()
    return None