Toast accept button captured a stale `respond` reference from the Sonner closure. Use respondRef pattern so clicks always dispatch through the current mutation. Backend respond endpoint now catches unhandled exceptions and returns proper JSON with detail field instead of plain-text 500s. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
828 lines
32 KiB
Python
828 lines
32 KiB
Python
"""
|
|
Connection router — search, request, respond, manage connections.
|
|
|
|
Security:
|
|
- Timing-safe search (50ms sleep floor)
|
|
- Per-receiver pending request cap (5 within 10 minutes)
|
|
- Atomic accept via UPDATE...WHERE status='pending' RETURNING *
|
|
- All endpoints scoped by current_user.id
|
|
- Audit logging for all connection events
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
from datetime import date as date_type, datetime, timedelta, timezone
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Request
|
|
from sqlalchemy import delete, select, func, and_, update
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.database import get_db
|
|
from app.models.connection_request import ConnectionRequest
|
|
from app.models.notification import Notification
|
|
from app.models.person import Person
|
|
from app.models.settings import Settings
|
|
from app.models.user import User
|
|
from app.models.user_connection import UserConnection
|
|
from app.routers.auth import get_current_user
|
|
from app.schemas.connection import (
|
|
CancelResponse,
|
|
ConnectionRequestResponse,
|
|
ConnectionResponse,
|
|
RespondAcceptResponse,
|
|
RespondRejectResponse,
|
|
RespondRequest,
|
|
SendConnectionRequest,
|
|
SharingOverrideUpdate,
|
|
UmbralSearchRequest,
|
|
UmbralSearchResponse,
|
|
)
|
|
from app.services.audit import get_client_ip, log_audit_event
|
|
from app.services.connection import (
|
|
SHAREABLE_FIELDS,
|
|
create_person_from_connection,
|
|
detach_umbral_contact,
|
|
extract_ntfy_config,
|
|
resolve_shared_profile,
|
|
send_connection_ntfy,
|
|
)
|
|
from app.services.notification import create_notification
|
|
|
|
router = APIRouter()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────
|
|
|
|
async def _get_settings_for_user(db: AsyncSession, user_id: int) -> Settings | None:
|
|
result = await db.execute(select(Settings).where(Settings.user_id == user_id))
|
|
return result.scalar_one_or_none()
|
|
|
|
|
|
def _build_request_response(
|
|
req: ConnectionRequest,
|
|
sender: User,
|
|
sender_settings: Settings | None,
|
|
receiver: User,
|
|
receiver_settings: Settings | None,
|
|
) -> ConnectionRequestResponse:
|
|
return ConnectionRequestResponse(
|
|
id=req.id,
|
|
sender_umbral_name=sender.umbral_name,
|
|
sender_preferred_name=sender_settings.preferred_name if sender_settings else None,
|
|
receiver_umbral_name=receiver.umbral_name,
|
|
receiver_preferred_name=receiver_settings.preferred_name if receiver_settings else None,
|
|
status=req.status,
|
|
created_at=req.created_at,
|
|
)
|
|
|
|
|
|
# ── POST /search ────────────────────────────────────────────────────
|
|
|
|
@router.post("/search", response_model=UmbralSearchResponse)
|
|
async def search_user(
|
|
body: UmbralSearchRequest,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""
|
|
Timing-safe user search. Always queries by umbral_name alone,
|
|
then checks accept_connections + is_active in Python.
|
|
Generic "not found" for non-existent, opted-out, AND inactive users.
|
|
50ms sleep floor to eliminate timing side-channel.
|
|
"""
|
|
# Always sleep to prevent timing attacks
|
|
await asyncio.sleep(0.05)
|
|
|
|
# Sender must have accept_connections enabled to search
|
|
sender_settings = await _get_settings_for_user(db, current_user.id)
|
|
if not sender_settings or not sender_settings.accept_connections:
|
|
return UmbralSearchResponse(found=False)
|
|
|
|
# Don't find yourself
|
|
if body.umbral_name == current_user.umbral_name:
|
|
return UmbralSearchResponse(found=False)
|
|
|
|
result = await db.execute(
|
|
select(User).where(User.umbral_name == body.umbral_name)
|
|
)
|
|
target = result.scalar_one_or_none()
|
|
|
|
if not target or not target.is_active:
|
|
return UmbralSearchResponse(found=False)
|
|
|
|
# Check if they accept connections
|
|
target_settings = await _get_settings_for_user(db, target.id)
|
|
if not target_settings or not target_settings.accept_connections:
|
|
return UmbralSearchResponse(found=False)
|
|
|
|
return UmbralSearchResponse(found=True)
|
|
|
|
|
|
# ── POST /request ───────────────────────────────────────────────────
|
|
|
|
@router.post("/request", response_model=ConnectionRequestResponse, status_code=201)
|
|
async def send_connection_request(
|
|
body: SendConnectionRequest,
|
|
request: Request,
|
|
background_tasks: BackgroundTasks,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Send a connection request to another user."""
|
|
# Resolve target
|
|
result = await db.execute(
|
|
select(User).where(User.umbral_name == body.umbral_name)
|
|
)
|
|
target = result.scalar_one_or_none()
|
|
|
|
if not target or not target.is_active:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
# Self-request guard
|
|
if target.id == current_user.id:
|
|
raise HTTPException(status_code=400, detail="Cannot send a connection request to yourself")
|
|
|
|
# Sender must have accept_connections enabled to participate
|
|
sender_settings = await _get_settings_for_user(db, current_user.id)
|
|
if not sender_settings or not sender_settings.accept_connections:
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="You must enable 'Accept Connections' in your settings before sending requests",
|
|
)
|
|
|
|
# Check accept_connections on target
|
|
target_settings = await _get_settings_for_user(db, target.id)
|
|
if not target_settings or not target_settings.accept_connections:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
# Check existing connection
|
|
existing_conn = await db.execute(
|
|
select(UserConnection).where(
|
|
UserConnection.user_id == current_user.id,
|
|
UserConnection.connected_user_id == target.id,
|
|
)
|
|
)
|
|
if existing_conn.scalar_one_or_none():
|
|
raise HTTPException(status_code=409, detail="Already connected")
|
|
|
|
# Check pending request in either direction
|
|
existing_req = await db.execute(
|
|
select(ConnectionRequest).where(
|
|
and_(
|
|
ConnectionRequest.status == "pending",
|
|
(
|
|
(ConnectionRequest.sender_id == current_user.id) & (ConnectionRequest.receiver_id == target.id)
|
|
) | (
|
|
(ConnectionRequest.sender_id == target.id) & (ConnectionRequest.receiver_id == current_user.id)
|
|
),
|
|
)
|
|
)
|
|
)
|
|
if existing_req.scalar_one_or_none():
|
|
raise HTTPException(status_code=409, detail="A pending request already exists")
|
|
|
|
# Per-receiver cap: max 5 pending requests within 10 minutes
|
|
ten_min_ago = datetime.now() - timedelta(minutes=10)
|
|
pending_count = await db.scalar(
|
|
select(func.count())
|
|
.select_from(ConnectionRequest)
|
|
.where(
|
|
ConnectionRequest.receiver_id == target.id,
|
|
ConnectionRequest.status == "pending",
|
|
ConnectionRequest.created_at >= ten_min_ago,
|
|
)
|
|
) or 0
|
|
if pending_count >= 5:
|
|
raise HTTPException(status_code=429, detail="Too many pending requests for this user")
|
|
|
|
# Validate person_id if provided (link existing standard contact)
|
|
link_person_id = None
|
|
if body.person_id is not None:
|
|
person_result = await db.execute(
|
|
select(Person).where(Person.id == body.person_id, Person.user_id == current_user.id)
|
|
)
|
|
link_person = person_result.scalar_one_or_none()
|
|
if not link_person:
|
|
raise HTTPException(status_code=400, detail="Person not found or not owned by you")
|
|
if link_person.is_umbral_contact:
|
|
raise HTTPException(status_code=400, detail="Person is already an umbral contact")
|
|
link_person_id = body.person_id
|
|
|
|
# Create the request (IntegrityError guard for TOCTOU race on partial unique index)
|
|
conn_request = ConnectionRequest(
|
|
sender_id=current_user.id,
|
|
receiver_id=target.id,
|
|
person_id=link_person_id,
|
|
)
|
|
db.add(conn_request)
|
|
try:
|
|
await db.flush() # populate conn_request.id for source_id
|
|
except IntegrityError:
|
|
await db.rollback()
|
|
raise HTTPException(status_code=409, detail="A pending request already exists")
|
|
|
|
# Create in-app notification for receiver (sender_settings already fetched above)
|
|
sender_display = (sender_settings.preferred_name if sender_settings else None) or current_user.umbral_name
|
|
|
|
await create_notification(
|
|
db,
|
|
user_id=target.id,
|
|
type="connection_request",
|
|
title="New Connection Request",
|
|
message=f"{sender_display} wants to connect with you",
|
|
data={"sender_umbral_name": current_user.umbral_name},
|
|
source_type="connection_request",
|
|
source_id=conn_request.id,
|
|
)
|
|
|
|
await log_audit_event(
|
|
db,
|
|
action="connection.request_sent",
|
|
actor_id=current_user.id,
|
|
target_id=target.id,
|
|
detail={"receiver_umbral_name": target.umbral_name},
|
|
ip=get_client_ip(request),
|
|
)
|
|
|
|
# Extract ntfy config before commit (avoids detached SA object in background task)
|
|
target_ntfy = extract_ntfy_config(target_settings) if target_settings else None
|
|
|
|
await db.commit()
|
|
await db.refresh(conn_request)
|
|
|
|
# ntfy push in background (non-blocking)
|
|
background_tasks.add_task(
|
|
send_connection_ntfy,
|
|
target_ntfy,
|
|
sender_display,
|
|
"request_received",
|
|
)
|
|
|
|
return _build_request_response(conn_request, current_user, sender_settings, target, target_settings)
|
|
|
|
|
|
# ── GET /requests/incoming ──────────────────────────────────────────
|
|
|
|
@router.get("/requests/incoming", response_model=list[ConnectionRequestResponse])
|
|
async def get_incoming_requests(
|
|
page: int = Query(1, ge=1),
|
|
per_page: int = Query(20, ge=1, le=100),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""List pending connection requests received by the current user."""
|
|
offset = (page - 1) * per_page
|
|
result = await db.execute(
|
|
select(ConnectionRequest)
|
|
.where(
|
|
ConnectionRequest.receiver_id == current_user.id,
|
|
ConnectionRequest.status == "pending",
|
|
)
|
|
.options(selectinload(ConnectionRequest.sender))
|
|
.order_by(ConnectionRequest.created_at.desc())
|
|
.offset(offset)
|
|
.limit(per_page)
|
|
)
|
|
requests = result.scalars().all()
|
|
|
|
# Fetch current user's settings once, batch-fetch sender settings
|
|
receiver_settings = await _get_settings_for_user(db, current_user.id)
|
|
sender_ids = [req.sender_id for req in requests]
|
|
if sender_ids:
|
|
settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(sender_ids)))
|
|
settings_by_user = {s.user_id: s for s in settings_result.scalars().all()}
|
|
else:
|
|
settings_by_user = {}
|
|
|
|
responses = []
|
|
for req in requests:
|
|
sender_settings = settings_by_user.get(req.sender_id)
|
|
responses.append(_build_request_response(req, req.sender, sender_settings, current_user, receiver_settings))
|
|
|
|
return responses
|
|
|
|
|
|
# ── GET /requests/outgoing ──────────────────────────────────────────
|
|
|
|
@router.get("/requests/outgoing", response_model=list[ConnectionRequestResponse])
|
|
async def get_outgoing_requests(
|
|
page: int = Query(1, ge=1),
|
|
per_page: int = Query(20, ge=1, le=100),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""List pending connection requests sent by the current user."""
|
|
offset = (page - 1) * per_page
|
|
result = await db.execute(
|
|
select(ConnectionRequest)
|
|
.where(
|
|
ConnectionRequest.sender_id == current_user.id,
|
|
ConnectionRequest.status == "pending",
|
|
)
|
|
.options(selectinload(ConnectionRequest.receiver))
|
|
.order_by(ConnectionRequest.created_at.desc())
|
|
.offset(offset)
|
|
.limit(per_page)
|
|
)
|
|
requests = result.scalars().all()
|
|
|
|
# Fetch current user's settings once, batch-fetch receiver settings
|
|
sender_settings = await _get_settings_for_user(db, current_user.id)
|
|
receiver_ids = [req.receiver_id for req in requests]
|
|
if receiver_ids:
|
|
settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(receiver_ids)))
|
|
settings_by_user = {s.user_id: s for s in settings_result.scalars().all()}
|
|
else:
|
|
settings_by_user = {}
|
|
|
|
responses = []
|
|
for req in requests:
|
|
receiver_settings = settings_by_user.get(req.receiver_id)
|
|
responses.append(_build_request_response(req, current_user, sender_settings, req.receiver, receiver_settings))
|
|
|
|
return responses
|
|
|
|
|
|
# ── PUT /requests/{id}/respond ──────────────────────────────────────
|
|
|
|
@router.put("/requests/{request_id}/respond", response_model=RespondAcceptResponse | RespondRejectResponse)
|
|
async def respond_to_request(
|
|
body: RespondRequest,
|
|
request: Request,
|
|
background_tasks: BackgroundTasks,
|
|
request_id: int = Path(ge=1, le=2147483647),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Accept or reject a connection request. Atomic via UPDATE...WHERE status='pending'."""
|
|
try:
|
|
return await _respond_to_request_inner(body, request, background_tasks, request_id, db, current_user)
|
|
except HTTPException:
|
|
raise
|
|
except Exception:
|
|
# get_db middleware auto-rollbacks on unhandled exceptions
|
|
logger.exception("Unhandled error in respond_to_request (request_id=%s, user=%s)", request_id, current_user.id)
|
|
raise HTTPException(status_code=500, detail=f"Internal server error while processing connection response (request {request_id})")
|
|
|
|
|
|
async def _respond_to_request_inner(
|
|
body: RespondRequest,
|
|
request: Request,
|
|
background_tasks: BackgroundTasks,
|
|
request_id: int,
|
|
db: AsyncSession,
|
|
current_user: User,
|
|
) -> RespondAcceptResponse | RespondRejectResponse:
|
|
now = datetime.now()
|
|
|
|
# Atomic update — only succeeds if status is still 'pending' and receiver is current user
|
|
result = await db.execute(
|
|
update(ConnectionRequest)
|
|
.where(
|
|
ConnectionRequest.id == request_id,
|
|
ConnectionRequest.receiver_id == current_user.id,
|
|
ConnectionRequest.status == "pending",
|
|
)
|
|
.values(status=body.action + "ed", resolved_at=now)
|
|
.returning(
|
|
ConnectionRequest.id,
|
|
ConnectionRequest.sender_id,
|
|
ConnectionRequest.receiver_id,
|
|
ConnectionRequest.person_id,
|
|
)
|
|
)
|
|
row = result.first()
|
|
if not row:
|
|
raise HTTPException(status_code=409, detail="Request not found or already resolved")
|
|
|
|
sender_id = row.sender_id
|
|
request_person_id = row.person_id
|
|
|
|
if body.action == "accept":
|
|
# Verify sender is still active
|
|
sender_result = await db.execute(select(User).where(User.id == sender_id))
|
|
sender = sender_result.scalar_one_or_none()
|
|
if not sender or not sender.is_active:
|
|
# Revert to rejected
|
|
await db.execute(
|
|
update(ConnectionRequest)
|
|
.where(ConnectionRequest.id == request_id)
|
|
.values(status="rejected")
|
|
)
|
|
await db.commit()
|
|
raise HTTPException(status_code=409, detail="Sender account is no longer active")
|
|
|
|
# Get settings for both users
|
|
sender_settings = await _get_settings_for_user(db, sender_id)
|
|
receiver_settings = await _get_settings_for_user(db, current_user.id)
|
|
|
|
# Resolve shared profiles for both directions
|
|
sender_shared = resolve_shared_profile(sender, sender_settings, None) if sender_settings else {}
|
|
receiver_shared = resolve_shared_profile(current_user, receiver_settings, None) if receiver_settings else {}
|
|
|
|
# Create Person records for both users
|
|
person_for_receiver = create_person_from_connection(
|
|
current_user.id, sender, sender_settings, sender_shared
|
|
)
|
|
db.add(person_for_receiver)
|
|
|
|
# Sender side: reuse existing Person if person_id was provided on the request
|
|
person_for_sender = None
|
|
if request_person_id:
|
|
existing_result = await db.execute(
|
|
select(Person).where(Person.id == request_person_id)
|
|
)
|
|
existing_person = existing_result.scalar_one_or_none()
|
|
# Re-validate at accept time: ownership must match sender,
|
|
# and must not already be umbral (prevents double-conversion races)
|
|
if existing_person and existing_person.user_id == sender_id and not existing_person.is_umbral_contact:
|
|
# Convert existing standard contact to umbral
|
|
existing_person.linked_user_id = current_user.id
|
|
existing_person.is_umbral_contact = True
|
|
existing_person.category = "Umbral"
|
|
# Update from shared profile
|
|
first_name = receiver_shared.get("first_name") or receiver_shared.get("preferred_name") or current_user.umbral_name
|
|
last_name = receiver_shared.get("last_name")
|
|
existing_person.first_name = first_name
|
|
existing_person.last_name = last_name
|
|
existing_person.email = receiver_shared.get("email") or existing_person.email
|
|
existing_person.phone = receiver_shared.get("phone") or existing_person.phone
|
|
existing_person.mobile = receiver_shared.get("mobile") or existing_person.mobile
|
|
existing_person.address = receiver_shared.get("address") or existing_person.address
|
|
existing_person.company = receiver_shared.get("company") or existing_person.company
|
|
existing_person.job_title = receiver_shared.get("job_title") or existing_person.job_title
|
|
# Sync birthday from shared profile
|
|
birthday_str = receiver_shared.get("birthday")
|
|
if birthday_str:
|
|
try:
|
|
existing_person.birthday = date_type.fromisoformat(birthday_str)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
# Recompute display name
|
|
full = ((first_name or '') + ' ' + (last_name or '')).strip()
|
|
existing_person.name = full or current_user.umbral_name
|
|
person_for_sender = existing_person
|
|
|
|
if person_for_sender is None:
|
|
person_for_sender = create_person_from_connection(
|
|
sender_id, current_user, receiver_settings, receiver_shared
|
|
)
|
|
db.add(person_for_sender)
|
|
|
|
try:
|
|
await db.flush() # populate person IDs
|
|
except IntegrityError:
|
|
await db.rollback()
|
|
raise HTTPException(status_code=409, detail="Connection already exists")
|
|
|
|
# Create bidirectional connections
|
|
conn_a = UserConnection(
|
|
user_id=current_user.id,
|
|
connected_user_id=sender_id,
|
|
person_id=person_for_receiver.id,
|
|
)
|
|
conn_b = UserConnection(
|
|
user_id=sender_id,
|
|
connected_user_id=current_user.id,
|
|
person_id=person_for_sender.id,
|
|
)
|
|
db.add(conn_a)
|
|
db.add(conn_b)
|
|
|
|
try:
|
|
await db.flush() # populate conn_a.id for source_id
|
|
except IntegrityError:
|
|
await db.rollback()
|
|
raise HTTPException(status_code=409, detail="Connection already exists")
|
|
|
|
# Notification to sender
|
|
receiver_display = (receiver_settings.preferred_name if receiver_settings else None) or current_user.umbral_name
|
|
await create_notification(
|
|
db,
|
|
user_id=sender_id,
|
|
type="connection_accepted",
|
|
title="Connection Accepted",
|
|
message=f"{receiver_display} accepted your connection request",
|
|
data={"connected_umbral_name": current_user.umbral_name},
|
|
source_type="user_connection",
|
|
source_id=conn_b.id,
|
|
)
|
|
|
|
await log_audit_event(
|
|
db,
|
|
action="connection.accepted",
|
|
actor_id=current_user.id,
|
|
target_id=sender_id,
|
|
detail={"request_id": request_id},
|
|
ip=get_client_ip(request),
|
|
)
|
|
|
|
# Extract ntfy config before commit (avoids detached SA object in background task)
|
|
sender_ntfy = extract_ntfy_config(sender_settings) if sender_settings else None
|
|
|
|
await db.commit()
|
|
|
|
# ntfy push in background
|
|
background_tasks.add_task(
|
|
send_connection_ntfy,
|
|
sender_ntfy,
|
|
receiver_display,
|
|
"request_accepted",
|
|
)
|
|
|
|
return {"message": "Connection accepted", "connection_id": conn_a.id}
|
|
|
|
else:
|
|
# Reject — only create notification for receiver (not sender per plan)
|
|
await log_audit_event(
|
|
db,
|
|
action="connection.rejected",
|
|
actor_id=current_user.id,
|
|
target_id=sender_id,
|
|
detail={"request_id": request_id},
|
|
ip=get_client_ip(request),
|
|
)
|
|
await db.commit()
|
|
return {"message": "Connection request rejected"}
|
|
|
|
|
|
# ── PUT /requests/{id}/cancel ──────────────────────────────────────
|
|
|
|
@router.put("/requests/{request_id}/cancel", response_model=CancelResponse)
|
|
async def cancel_request(
|
|
request: Request,
|
|
request_id: int = Path(ge=1, le=2147483647),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Cancel an outgoing connection request. Atomic via UPDATE...WHERE status='pending'."""
|
|
now = datetime.now()
|
|
|
|
# Atomic update — only succeeds if sender is current user and status is still pending
|
|
result = await db.execute(
|
|
update(ConnectionRequest)
|
|
.where(
|
|
ConnectionRequest.id == request_id,
|
|
ConnectionRequest.sender_id == current_user.id,
|
|
ConnectionRequest.status == "pending",
|
|
)
|
|
.values(status="cancelled", resolved_at=now)
|
|
.returning(ConnectionRequest.id, ConnectionRequest.receiver_id)
|
|
)
|
|
row = result.first()
|
|
if not row:
|
|
raise HTTPException(status_code=409, detail="Request not found or already resolved")
|
|
|
|
receiver_id = row.receiver_id
|
|
|
|
# Silent cleanup: remove the notification sent to the receiver
|
|
await db.execute(
|
|
delete(Notification).where(
|
|
Notification.source_type == "connection_request",
|
|
Notification.source_id == request_id,
|
|
Notification.user_id == receiver_id,
|
|
)
|
|
)
|
|
|
|
# Look up receiver umbral_name for audit detail
|
|
receiver_result = await db.execute(select(User.umbral_name).where(User.id == receiver_id))
|
|
receiver_umbral_name = receiver_result.scalar_one_or_none() or "unknown"
|
|
|
|
await log_audit_event(
|
|
db,
|
|
action="connection.request_cancelled",
|
|
actor_id=current_user.id,
|
|
target_id=receiver_id,
|
|
detail={"request_id": request_id, "receiver_umbral_name": receiver_umbral_name},
|
|
ip=get_client_ip(request),
|
|
)
|
|
|
|
await db.commit()
|
|
return {"message": "Connection request cancelled"}
|
|
|
|
|
|
# ── GET / ───────────────────────────────────────────────────────────
|
|
|
|
@router.get("/", response_model=list[ConnectionResponse])
|
|
async def list_connections(
|
|
page: int = Query(1, ge=1),
|
|
per_page: int = Query(50, ge=1, le=100),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""List all connections for the current user."""
|
|
offset = (page - 1) * per_page
|
|
result = await db.execute(
|
|
select(UserConnection)
|
|
.where(UserConnection.user_id == current_user.id)
|
|
.options(selectinload(UserConnection.connected_user))
|
|
.order_by(UserConnection.created_at.desc())
|
|
.offset(offset)
|
|
.limit(per_page)
|
|
)
|
|
connections = result.scalars().all()
|
|
|
|
# Batch-fetch settings for connected users
|
|
connected_ids = [conn.connected_user_id for conn in connections]
|
|
if connected_ids:
|
|
settings_result = await db.execute(select(Settings).where(Settings.user_id.in_(connected_ids)))
|
|
settings_by_user = {s.user_id: s for s in settings_result.scalars().all()}
|
|
else:
|
|
settings_by_user = {}
|
|
|
|
responses = []
|
|
for conn in connections:
|
|
conn_settings = settings_by_user.get(conn.connected_user_id)
|
|
responses.append(ConnectionResponse(
|
|
id=conn.id,
|
|
connected_user_id=conn.connected_user_id,
|
|
connected_umbral_name=conn.connected_user.umbral_name,
|
|
connected_preferred_name=conn_settings.preferred_name if conn_settings else None,
|
|
person_id=conn.person_id,
|
|
created_at=conn.created_at,
|
|
))
|
|
|
|
return responses
|
|
|
|
|
|
# ── GET /{id} ───────────────────────────────────────────────────────
|
|
|
|
@router.get("/{connection_id}", response_model=ConnectionResponse)
|
|
async def get_connection(
|
|
connection_id: int = Path(ge=1, le=2147483647),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Get a single connection detail."""
|
|
result = await db.execute(
|
|
select(UserConnection)
|
|
.where(
|
|
UserConnection.id == connection_id,
|
|
UserConnection.user_id == current_user.id,
|
|
)
|
|
.options(selectinload(UserConnection.connected_user))
|
|
)
|
|
conn = result.scalar_one_or_none()
|
|
if not conn:
|
|
raise HTTPException(status_code=404, detail="Connection not found")
|
|
|
|
conn_settings = await _get_settings_for_user(db, conn.connected_user_id)
|
|
return ConnectionResponse(
|
|
id=conn.id,
|
|
connected_user_id=conn.connected_user_id,
|
|
connected_umbral_name=conn.connected_user.umbral_name,
|
|
connected_preferred_name=conn_settings.preferred_name if conn_settings else None,
|
|
person_id=conn.person_id,
|
|
created_at=conn.created_at,
|
|
)
|
|
|
|
|
|
# ── GET /{id}/shared-profile ────────────────────────────────────────
|
|
|
|
@router.get("/{connection_id}/shared-profile")
|
|
async def get_shared_profile(
|
|
connection_id: int = Path(ge=1, le=2147483647),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Get the resolved shared profile for a connection."""
|
|
result = await db.execute(
|
|
select(UserConnection)
|
|
.where(
|
|
UserConnection.id == connection_id,
|
|
UserConnection.user_id == current_user.id,
|
|
)
|
|
.options(selectinload(UserConnection.connected_user))
|
|
)
|
|
conn = result.scalar_one_or_none()
|
|
if not conn:
|
|
raise HTTPException(status_code=404, detail="Connection not found")
|
|
|
|
conn_settings = await _get_settings_for_user(db, conn.connected_user_id)
|
|
if not conn_settings:
|
|
return {}
|
|
|
|
return resolve_shared_profile(
|
|
conn.connected_user,
|
|
conn_settings,
|
|
conn.sharing_overrides,
|
|
)
|
|
|
|
|
|
# ── PUT /{id}/sharing-overrides ─────────────────────────────────────
|
|
|
|
@router.put("/{connection_id}/sharing-overrides")
|
|
async def update_sharing_overrides(
|
|
body: SharingOverrideUpdate,
|
|
connection_id: int = Path(ge=1, le=2147483647),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""Update what YOU share with a specific connection."""
|
|
# Get our connection to know who the counterpart is
|
|
our_conn = await db.execute(
|
|
select(UserConnection).where(
|
|
UserConnection.id == connection_id,
|
|
UserConnection.user_id == current_user.id,
|
|
)
|
|
)
|
|
conn = our_conn.scalar_one_or_none()
|
|
if not conn:
|
|
raise HTTPException(status_code=404, detail="Connection not found")
|
|
|
|
# Find the reverse connection (their row pointing to us)
|
|
reverse_result = await db.execute(
|
|
select(UserConnection).where(
|
|
UserConnection.user_id == conn.connected_user_id,
|
|
UserConnection.connected_user_id == current_user.id,
|
|
)
|
|
)
|
|
reverse_conn = reverse_result.scalar_one_or_none()
|
|
if not reverse_conn:
|
|
raise HTTPException(status_code=404, detail="Reverse connection not found")
|
|
|
|
# Merge validated overrides — only SHAREABLE_FIELDS keys
|
|
existing = dict(reverse_conn.sharing_overrides or {})
|
|
update_data = body.model_dump(exclude_unset=True)
|
|
for key, value in update_data.items():
|
|
if key in SHAREABLE_FIELDS:
|
|
if value is None:
|
|
existing.pop(key, None)
|
|
else:
|
|
existing[key] = value
|
|
|
|
reverse_conn.sharing_overrides = existing if existing else None
|
|
|
|
await db.commit()
|
|
return {"message": "Sharing overrides updated"}
|
|
|
|
|
|
# ── DELETE /{id} ────────────────────────────────────────────────────
|
|
|
|
@router.delete("/{connection_id}", status_code=204)
|
|
async def remove_connection(
|
|
request: Request,
|
|
connection_id: int = Path(ge=1, le=2147483647),
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""
|
|
Remove a connection. Removes BOTH UserConnection rows.
|
|
Detaches BOTH Person records (sets linked_user_id=null, is_umbral_contact=false).
|
|
Silent — no notification sent.
|
|
"""
|
|
# Get our connection
|
|
result = await db.execute(
|
|
select(UserConnection)
|
|
.where(
|
|
UserConnection.id == connection_id,
|
|
UserConnection.user_id == current_user.id,
|
|
)
|
|
)
|
|
conn = result.scalar_one_or_none()
|
|
if not conn:
|
|
raise HTTPException(status_code=404, detail="Connection not found")
|
|
|
|
counterpart_id = conn.connected_user_id
|
|
|
|
# Find reverse connection
|
|
reverse_result = await db.execute(
|
|
select(UserConnection).where(
|
|
UserConnection.user_id == counterpart_id,
|
|
UserConnection.connected_user_id == current_user.id,
|
|
)
|
|
)
|
|
reverse_conn = reverse_result.scalar_one_or_none()
|
|
|
|
# Detach Person records
|
|
if conn.person_id:
|
|
person_result = await db.execute(select(Person).where(Person.id == conn.person_id))
|
|
person = person_result.scalar_one_or_none()
|
|
if person:
|
|
await detach_umbral_contact(person)
|
|
|
|
if reverse_conn and reverse_conn.person_id:
|
|
person_result = await db.execute(select(Person).where(Person.id == reverse_conn.person_id))
|
|
person = person_result.scalar_one_or_none()
|
|
if person:
|
|
await detach_umbral_contact(person)
|
|
|
|
# Delete both connections
|
|
await db.delete(conn)
|
|
if reverse_conn:
|
|
await db.delete(reverse_conn)
|
|
|
|
await log_audit_event(
|
|
db,
|
|
action="connection.removed",
|
|
actor_id=current_user.id,
|
|
target_id=counterpart_id,
|
|
detail={"connection_id": connection_id},
|
|
ip=get_client_ip(request),
|
|
)
|
|
|
|
await db.commit()
|
|
return None
|