- C-01: Wrap accept flow flush/commit in IntegrityError handling (409) - C-02: Use separate remote_timestamps dict instead of pop() on shared profile - W-01: Add birthday sync in Link conversion path (existing person → umbral) - W-02: Add None guard on max(updated_at) comparison in get_person Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
808 lines
31 KiB
Python
808 lines
31 KiB
Python
"""
|
|
Connection router — search, request, respond, manage connections.

Security:
- Timing-safe search (50ms sleep floor)
- Per-receiver pending request cap (5 within 10 minutes)
- Atomic accept via UPDATE...WHERE status='pending' RETURNING *
- All endpoints scoped by current_user.id
- Audit logging for all connection events
|
|
"""
|
|
import asyncio
|
|
from datetime import date as date_type, datetime, timedelta, timezone
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Request
|
|
from sqlalchemy import delete, select, func, and_, update
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.database import get_db
|
|
from app.models.connection_request import ConnectionRequest
|
|
from app.models.notification import Notification
|
|
from app.models.person import Person
|
|
from app.models.settings import Settings
|
|
from app.models.user import User
|
|
from app.models.user_connection import UserConnection
|
|
from app.routers.auth import get_current_user
|
|
from app.schemas.connection import (
|
|
CancelResponse,
|
|
ConnectionRequestResponse,
|
|
ConnectionResponse,
|
|
RespondAcceptResponse,
|
|
RespondRejectResponse,
|
|
RespondRequest,
|
|
SendConnectionRequest,
|
|
SharingOverrideUpdate,
|
|
UmbralSearchRequest,
|
|
UmbralSearchResponse,
|
|
)
|
|
from app.services.audit import get_client_ip, log_audit_event
|
|
from app.services.connection import (
|
|
SHAREABLE_FIELDS,
|
|
create_person_from_connection,
|
|
detach_umbral_contact,
|
|
extract_ntfy_config,
|
|
resolve_shared_profile,
|
|
send_connection_ntfy,
|
|
)
|
|
from app.services.notification import create_notification
|
|
|
|
# Router instance that every endpoint in this module registers on through
# its @router.<method>(...) decorator.
router = APIRouter()
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────
|
|
|
|
async def _get_settings_for_user(db: AsyncSession, user_id: int) -> Settings | None:
    """Load the Settings row whose ``user_id`` column equals ``user_id``.

    Returns the row, or None when the query matches nothing
    (``scalar_one_or_none`` semantics).
    """
    settings_query = select(Settings).where(Settings.user_id == user_id)
    rows = await db.execute(settings_query)
    return rows.scalar_one_or_none()
|
|
|
|
|
|
def _build_request_response(
    req: ConnectionRequest,
    sender: User,
    sender_settings: Settings | None,
    receiver: User,
    receiver_settings: Settings | None,
) -> ConnectionRequestResponse:
    """Assemble the API representation of a connection request.

    The preferred name of either party is taken from its Settings row and
    is None when that row is absent.
    """
    sender_preferred = None
    if sender_settings:
        sender_preferred = sender_settings.preferred_name

    receiver_preferred = None
    if receiver_settings:
        receiver_preferred = receiver_settings.preferred_name

    return ConnectionRequestResponse(
        id=req.id,
        sender_umbral_name=sender.umbral_name,
        sender_preferred_name=sender_preferred,
        receiver_umbral_name=receiver.umbral_name,
        receiver_preferred_name=receiver_preferred,
        status=req.status,
        created_at=req.created_at,
    )
|
|
|
|
|
|
# ── POST /search ────────────────────────────────────────────────────
|
|
|
|
# Minimum wall-clock duration of a /search call, in seconds.
_SEARCH_MIN_SECONDS = 0.05


async def _search_target_is_discoverable(
    db: AsyncSession,
    umbral_name: str,
    current_user: User,
) -> bool:
    """Return True only when ``umbral_name`` names a user the caller may find.

    False is returned for every other case: the caller has no settings row
    or has accept_connections off, the caller searched for themselves, the
    target does not exist, is inactive, has no settings row, or has
    accept_connections off.
    """
    # Sender must have accept_connections enabled to search
    sender_settings = await _get_settings_for_user(db, current_user.id)
    if not sender_settings or not sender_settings.accept_connections:
        return False

    # Don't find yourself
    if umbral_name == current_user.umbral_name:
        return False

    # Always query by umbral_name alone; the flags are checked in Python.
    result = await db.execute(select(User).where(User.umbral_name == umbral_name))
    target = result.scalar_one_or_none()
    if not target or not target.is_active:
        return False

    # Check if they accept connections
    target_settings = await _get_settings_for_user(db, target.id)
    return bool(target_settings and target_settings.accept_connections)


@router.post("/search", response_model=UmbralSearchResponse)
async def search_user(
    body: UmbralSearchRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Timing-safe user search. Always queries by umbral_name alone,
    then checks accept_connections + is_active in Python.
    Generic "not found" for non-existent, opted-out, AND inactive users.

    The response is held back until at least 50ms have elapsed since the
    handler started. Sleeping a fixed 50ms *before* the lookup (the previous
    behaviour) left the total latency dependent on how many queries ran, so
    the different "not found" causes were still distinguishable by timing.
    Padding *after* the lookup makes every outcome take the same time as
    long as the lookup itself finishes within the floor.
    """
    loop = asyncio.get_running_loop()
    started = loop.time()

    found = await _search_target_is_discoverable(db, body.umbral_name, current_user)

    # Pad up to the floor; never sleeps a negative amount when the lookup was slow.
    remaining = _SEARCH_MIN_SECONDS - (loop.time() - started)
    if remaining > 0:
        await asyncio.sleep(remaining)

    return UmbralSearchResponse(found=found)
|
|
|
|
|
|
# ── POST /request ───────────────────────────────────────────────────
|
|
|
|
@router.post("/request", response_model=ConnectionRequestResponse, status_code=201)
async def send_connection_request(
    body: SendConnectionRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a pending connection request from the current user to ``body.umbral_name``.

    Validation happens in this order, each failure raising HTTPException:
      404  target missing or inactive
      400  target is the caller
      403  caller has no settings row or accept_connections is off
      404  target has no settings row or accept_connections is off
      409  a UserConnection from caller to target already exists
      409  a pending request exists in either direction
      429  target already has 5 pending requests created in the last 10 minutes
      400  ``body.person_id`` is not the caller's Person, or is already umbral

    On success the request row, an in-app notification for the receiver and
    an audit event are committed together, and an ntfy push is queued as a
    background task.
    """
    # Resolve the target account by umbral name.
    target_rows = await db.execute(
        select(User).where(User.umbral_name == body.umbral_name)
    )
    target = target_rows.scalar_one_or_none()
    if not (target and target.is_active):
        raise HTTPException(status_code=404, detail="User not found")

    # A user cannot connect to themselves.
    if target.id == current_user.id:
        raise HTTPException(status_code=400, detail="Cannot send a connection request to yourself")

    # The sender has to participate in connections too.
    sender_settings = await _get_settings_for_user(db, current_user.id)
    if not (sender_settings and sender_settings.accept_connections):
        raise HTTPException(
            status_code=403,
            detail="You must enable 'Accept Connections' in your settings before sending requests",
        )

    # Opted-out targets get the same 404 as missing ones.
    target_settings = await _get_settings_for_user(db, target.id)
    if not (target_settings and target_settings.accept_connections):
        raise HTTPException(status_code=404, detail="User not found")

    # Already connected?
    connection_rows = await db.execute(
        select(UserConnection).where(
            UserConnection.user_id == current_user.id,
            UserConnection.connected_user_id == target.id,
        )
    )
    if connection_rows.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="Already connected")

    # Pending request in either direction?
    from_me = and_(
        ConnectionRequest.sender_id == current_user.id,
        ConnectionRequest.receiver_id == target.id,
    )
    from_them = and_(
        ConnectionRequest.sender_id == target.id,
        ConnectionRequest.receiver_id == current_user.id,
    )
    pending_rows = await db.execute(
        select(ConnectionRequest).where(
            ConnectionRequest.status == "pending",
            from_me | from_them,
        )
    )
    if pending_rows.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="A pending request already exists")

    # Per-receiver cap: at most 5 pending requests created in the last 10 minutes.
    window_start = datetime.now() - timedelta(minutes=10)
    recent_pending_query = (
        select(func.count())
        .select_from(ConnectionRequest)
        .where(
            ConnectionRequest.receiver_id == target.id,
            ConnectionRequest.status == "pending",
            ConnectionRequest.created_at >= window_start,
        )
    )
    pending_count = await db.scalar(recent_pending_query) or 0
    if pending_count >= 5:
        raise HTTPException(status_code=429, detail="Too many pending requests for this user")

    # Optional link to one of the sender's existing standard contacts.
    link_person_id = None
    if body.person_id is not None:
        owned_person_rows = await db.execute(
            select(Person).where(
                Person.id == body.person_id,
                Person.user_id == current_user.id,
            )
        )
        link_person = owned_person_rows.scalar_one_or_none()
        if not link_person:
            raise HTTPException(status_code=400, detail="Person not found or not owned by you")
        if link_person.is_umbral_contact:
            raise HTTPException(status_code=400, detail="Person is already an umbral contact")
        link_person_id = body.person_id

    # Insert the request. The flush surfaces a unique-index violation if a
    # concurrent call slipped past the pending check above.
    conn_request = ConnectionRequest(
        sender_id=current_user.id,
        receiver_id=target.id,
        person_id=link_person_id,
    )
    db.add(conn_request)
    try:
        await db.flush()  # assigns conn_request.id, needed as source_id below
    except IntegrityError:
        await db.rollback()
        raise HTTPException(status_code=409, detail="A pending request already exists")

    # Name shown to the receiver: preferred name, falling back to the umbral name.
    sender_preferred = None
    if sender_settings:
        sender_preferred = sender_settings.preferred_name
    sender_display = sender_preferred or current_user.umbral_name

    await create_notification(
        db,
        user_id=target.id,
        type="connection_request",
        title="New Connection Request",
        message=f"{sender_display} wants to connect with you",
        data={"sender_umbral_name": current_user.umbral_name},
        source_type="connection_request",
        source_id=conn_request.id,
    )

    await log_audit_event(
        db,
        action="connection.request_sent",
        actor_id=current_user.id,
        target_id=target.id,
        detail={"receiver_umbral_name": target.umbral_name},
        ip=get_client_ip(request),
    )

    # Pull the ntfy config out before committing so the background task
    # does not have to touch an ORM object afterwards.
    target_ntfy = None
    if target_settings:
        target_ntfy = extract_ntfy_config(target_settings)

    await db.commit()
    await db.refresh(conn_request)

    # Push notification runs after the response is sent.
    background_tasks.add_task(
        send_connection_ntfy,
        target_ntfy,
        sender_display,
        "request_received",
    )

    return _build_request_response(
        conn_request, current_user, sender_settings, target, target_settings
    )
|
|
|
|
|
|
# ── GET /requests/incoming ──────────────────────────────────────────
|
|
|
|
@router.get("/requests/incoming", response_model=list[ConnectionRequestResponse])
async def get_incoming_requests(
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of pending requests whose receiver is the current user, newest first."""
    skip = (page - 1) * per_page
    incoming_query = (
        select(ConnectionRequest)
        .where(
            ConnectionRequest.receiver_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .options(selectinload(ConnectionRequest.sender))
        .order_by(ConnectionRequest.created_at.desc())
        .offset(skip)
        .limit(per_page)
    )
    incoming = (await db.execute(incoming_query)).scalars().all()

    # The receiver is always the caller, so its settings are loaded once.
    receiver_settings = await _get_settings_for_user(db, current_user.id)

    # Sender settings are fetched in a single IN query (skipped for an empty page).
    settings_by_user = {}
    sender_ids = [item.sender_id for item in incoming]
    if sender_ids:
        settings_rows = await db.execute(
            select(Settings).where(Settings.user_id.in_(sender_ids))
        )
        settings_by_user = {row.user_id: row for row in settings_rows.scalars().all()}

    return [
        _build_request_response(
            item,
            item.sender,
            settings_by_user.get(item.sender_id),
            current_user,
            receiver_settings,
        )
        for item in incoming
    ]
|
|
|
|
|
|
# ── GET /requests/outgoing ──────────────────────────────────────────
|
|
|
|
@router.get("/requests/outgoing", response_model=list[ConnectionRequestResponse])
async def get_outgoing_requests(
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of pending requests whose sender is the current user, newest first."""
    skip = (page - 1) * per_page
    outgoing_query = (
        select(ConnectionRequest)
        .where(
            ConnectionRequest.sender_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .options(selectinload(ConnectionRequest.receiver))
        .order_by(ConnectionRequest.created_at.desc())
        .offset(skip)
        .limit(per_page)
    )
    outgoing = (await db.execute(outgoing_query)).scalars().all()

    # The sender is always the caller, so its settings are loaded once.
    sender_settings = await _get_settings_for_user(db, current_user.id)

    # Receiver settings are fetched in a single IN query (skipped for an empty page).
    settings_by_user = {}
    receiver_ids = [item.receiver_id for item in outgoing]
    if receiver_ids:
        settings_rows = await db.execute(
            select(Settings).where(Settings.user_id.in_(receiver_ids))
        )
        settings_by_user = {row.user_id: row for row in settings_rows.scalars().all()}

    return [
        _build_request_response(
            item,
            current_user,
            sender_settings,
            item.receiver,
            settings_by_user.get(item.receiver_id),
        )
        for item in outgoing
    ]
|
|
|
|
|
|
# ── PUT /requests/{id}/respond ──────────────────────────────────────
|
|
|
|
def _convert_person_to_umbral(person: Person, linked_user: User, shared: dict) -> None:
    """Turn an existing standard contact into an umbral contact for ``linked_user``.

    ``shared`` is the resolved shared profile of ``linked_user``. First and
    last name are always replaced from it; the remaining contact fields are
    replaced only when the shared profile has a truthy value, otherwise the
    contact's current value is kept.
    """
    person.linked_user_id = linked_user.id
    person.is_umbral_contact = True
    person.category = "Umbral"

    first_name = shared.get("first_name") or shared.get("preferred_name") or linked_user.umbral_name
    last_name = shared.get("last_name")
    person.first_name = first_name
    person.last_name = last_name

    for field in ("email", "phone", "mobile", "address", "company", "job_title"):
        setattr(person, field, shared.get(field) or getattr(person, field))

    # Sync birthday from shared profile
    birthday_str = shared.get("birthday")
    if birthday_str:
        try:
            person.birthday = date_type.fromisoformat(birthday_str)
        except (ValueError, TypeError):
            # Best effort: an unparseable birthday leaves the existing value untouched.
            pass

    # Recompute display name
    full = ((first_name or '') + ' ' + (last_name or '')).strip()
    person.name = full or linked_user.umbral_name


@router.put("/requests/{request_id}/respond", response_model=RespondAcceptResponse | RespondRejectResponse)
async def respond_to_request(
    body: RespondRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    request_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Accept or reject a connection request. Atomic via UPDATE...WHERE status='pending'.

    The request is resolved by a single UPDATE that only matches when the
    row is still pending and addressed to the current user; when it matches
    nothing a 409 is raised.

    Accept: creates a Person for the receiver, creates or converts a Person
    for the sender (reusing the Person referenced by the request when it is
    still owned by the sender and not yet umbral), inserts both
    UserConnection rows, notifies the sender, writes an audit event and
    queues an ntfy push. An IntegrityError on either flush rolls back and
    raises 409. If the sender no longer exists or is inactive the request is
    stored as rejected and 409 is raised.

    Reject: writes an audit event only.
    """
    now = datetime.now()
    accepting = body.action == "accept"
    # Spelled out rather than derived by string concatenation so the stored
    # status always agrees with the branch taken below.
    new_status = "accepted" if accepting else "rejected"

    # Atomic update — only succeeds if status is still 'pending' and receiver is current user
    result = await db.execute(
        update(ConnectionRequest)
        .where(
            ConnectionRequest.id == request_id,
            ConnectionRequest.receiver_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .values(status=new_status, resolved_at=now)
        .returning(
            ConnectionRequest.id,
            ConnectionRequest.sender_id,
            ConnectionRequest.receiver_id,
            ConnectionRequest.person_id,
        )
    )
    row = result.first()
    if not row:
        raise HTTPException(status_code=409, detail="Request not found or already resolved")

    sender_id = row.sender_id
    request_person_id = row.person_id

    if not accepting:
        # Reject — audit only; no notification is created for either party here.
        await log_audit_event(
            db,
            action="connection.rejected",
            actor_id=current_user.id,
            target_id=sender_id,
            detail={"request_id": request_id},
            ip=get_client_ip(request),
        )
        await db.commit()
        return {"message": "Connection request rejected"}

    # Verify sender is still active
    sender_result = await db.execute(select(User).where(User.id == sender_id))
    sender = sender_result.scalar_one_or_none()
    if not sender or not sender.is_active:
        # Revert to rejected
        await db.execute(
            update(ConnectionRequest)
            .where(ConnectionRequest.id == request_id)
            .values(status="rejected")
        )
        await db.commit()
        raise HTTPException(status_code=409, detail="Sender account is no longer active")

    # Get settings for both users
    sender_settings = await _get_settings_for_user(db, sender_id)
    receiver_settings = await _get_settings_for_user(db, current_user.id)

    # Resolve shared profiles for both directions
    sender_shared = resolve_shared_profile(sender, sender_settings, None) if sender_settings else {}
    receiver_shared = resolve_shared_profile(current_user, receiver_settings, None) if receiver_settings else {}

    # Receiver side: always a new Person describing the sender
    person_for_receiver = create_person_from_connection(
        current_user.id, sender, sender_settings, sender_shared
    )
    db.add(person_for_receiver)

    # Sender side: reuse existing Person if person_id was provided on the request
    person_for_sender = None
    if request_person_id:
        existing_result = await db.execute(
            select(Person).where(Person.id == request_person_id)
        )
        existing_person = existing_result.scalar_one_or_none()
        # Re-validate at accept time: ownership must match sender,
        # and must not already be umbral (prevents double-conversion races)
        if existing_person and existing_person.user_id == sender_id and not existing_person.is_umbral_contact:
            _convert_person_to_umbral(existing_person, current_user, receiver_shared)
            person_for_sender = existing_person

    if person_for_sender is None:
        person_for_sender = create_person_from_connection(
            sender_id, current_user, receiver_settings, receiver_shared
        )
        db.add(person_for_sender)

    try:
        await db.flush()  # populate person IDs
    except IntegrityError:
        await db.rollback()
        raise HTTPException(status_code=409, detail="Connection already exists")

    # Create bidirectional connections
    conn_a = UserConnection(
        user_id=current_user.id,
        connected_user_id=sender_id,
        person_id=person_for_receiver.id,
    )
    conn_b = UserConnection(
        user_id=sender_id,
        connected_user_id=current_user.id,
        person_id=person_for_sender.id,
    )
    db.add(conn_a)
    db.add(conn_b)

    try:
        await db.flush()  # populate connection IDs
    except IntegrityError:
        await db.rollback()
        raise HTTPException(status_code=409, detail="Connection already exists")

    # Read the id while the instance is guaranteed to be loaded; reading it
    # after commit() would need a lazy load if the session expires on commit.
    connection_id = conn_a.id

    # Notification to sender
    receiver_display = (receiver_settings.preferred_name if receiver_settings else None) or current_user.umbral_name
    await create_notification(
        db,
        user_id=sender_id,
        type="connection_accepted",
        title="Connection Accepted",
        message=f"{receiver_display} accepted your connection request",
        data={"connected_umbral_name": current_user.umbral_name},
        source_type="user_connection",
        source_id=conn_b.id,
    )

    await log_audit_event(
        db,
        action="connection.accepted",
        actor_id=current_user.id,
        target_id=sender_id,
        detail={"request_id": request_id},
        ip=get_client_ip(request),
    )

    # Extract ntfy config before commit (avoids detached SA object in background task)
    sender_ntfy = extract_ntfy_config(sender_settings) if sender_settings else None

    await db.commit()

    # ntfy push in background
    background_tasks.add_task(
        send_connection_ntfy,
        sender_ntfy,
        receiver_display,
        "request_accepted",
    )

    return {"message": "Connection accepted", "connection_id": connection_id}
|
|
|
|
|
|
# ── PUT /requests/{id}/cancel ──────────────────────────────────────
|
|
|
|
@router.put("/requests/{request_id}/cancel", response_model=CancelResponse)
async def cancel_request(
    request: Request,
    request_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Cancel a pending request that the current user sent.

    A single UPDATE flips the row to 'cancelled' only while it is still
    pending and owned by the caller; a 409 is raised when nothing matches.
    The receiver's notification for the request is deleted and an audit
    event is recorded in the same transaction.
    """
    resolved_at = datetime.now()

    # Conditional update: sender must be the caller and the status still pending.
    cancel_stmt = (
        update(ConnectionRequest)
        .where(
            ConnectionRequest.id == request_id,
            ConnectionRequest.sender_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .values(status="cancelled", resolved_at=resolved_at)
        .returning(ConnectionRequest.id, ConnectionRequest.receiver_id)
    )
    cancelled = (await db.execute(cancel_stmt)).first()
    if not cancelled:
        raise HTTPException(status_code=409, detail="Request not found or already resolved")

    receiver_id = cancelled.receiver_id

    # Quietly drop the notification the receiver got for this request.
    stale_notification = delete(Notification).where(
        Notification.source_type == "connection_request",
        Notification.source_id == request_id,
        Notification.user_id == receiver_id,
    )
    await db.execute(stale_notification)

    # The audit detail carries the receiver's umbral name ("unknown" if the lookup finds none).
    name_rows = await db.execute(select(User.umbral_name).where(User.id == receiver_id))
    receiver_umbral_name = name_rows.scalar_one_or_none() or "unknown"

    await log_audit_event(
        db,
        action="connection.request_cancelled",
        actor_id=current_user.id,
        target_id=receiver_id,
        detail={"request_id": request_id, "receiver_umbral_name": receiver_umbral_name},
        ip=get_client_ip(request),
    )

    await db.commit()
    return {"message": "Connection request cancelled"}
|
|
|
|
|
|
# ── GET / ───────────────────────────────────────────────────────────
|
|
|
|
@router.get("/", response_model=list[ConnectionResponse])
async def list_connections(
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of the current user's connections, newest first."""
    skip = (page - 1) * per_page
    connections_query = (
        select(UserConnection)
        .where(UserConnection.user_id == current_user.id)
        .options(selectinload(UserConnection.connected_user))
        .order_by(UserConnection.created_at.desc())
        .offset(skip)
        .limit(per_page)
    )
    connections = (await db.execute(connections_query)).scalars().all()

    # One IN query for the counterparts' settings (skipped for an empty page).
    settings_by_user = {}
    connected_ids = [link.connected_user_id for link in connections]
    if connected_ids:
        settings_rows = await db.execute(
            select(Settings).where(Settings.user_id.in_(connected_ids))
        )
        settings_by_user = {row.user_id: row for row in settings_rows.scalars().all()}

    responses = []
    for link in connections:
        counterpart_settings = settings_by_user.get(link.connected_user_id)
        preferred_name = None
        if counterpart_settings:
            preferred_name = counterpart_settings.preferred_name
        responses.append(
            ConnectionResponse(
                id=link.id,
                connected_user_id=link.connected_user_id,
                connected_umbral_name=link.connected_user.umbral_name,
                connected_preferred_name=preferred_name,
                person_id=link.person_id,
                created_at=link.created_at,
            )
        )
    return responses
|
|
|
|
|
|
# ── GET /{id} ───────────────────────────────────────────────────────
|
|
|
|
@router.get("/{connection_id}", response_model=ConnectionResponse)
async def get_connection(
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one connection owned by the current user, or raise 404."""
    lookup = (
        select(UserConnection)
        .where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
        .options(selectinload(UserConnection.connected_user))
    )
    conn = (await db.execute(lookup)).scalar_one_or_none()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")

    # Preferred name comes from the counterpart's settings, when they exist.
    counterpart_settings = await _get_settings_for_user(db, conn.connected_user_id)
    preferred_name = None
    if counterpart_settings:
        preferred_name = counterpart_settings.preferred_name

    return ConnectionResponse(
        id=conn.id,
        connected_user_id=conn.connected_user_id,
        connected_umbral_name=conn.connected_user.umbral_name,
        connected_preferred_name=preferred_name,
        person_id=conn.person_id,
        created_at=conn.created_at,
    )
|
|
|
|
|
|
# ── GET /{id}/shared-profile ────────────────────────────────────────
|
|
|
|
@router.get("/{connection_id}/shared-profile")
async def get_shared_profile(
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the connected user's profile as resolved for this connection.

    Raises 404 when the connection does not exist or is not owned by the
    caller. Returns an empty dict when the connected user has no settings row.
    """
    lookup = (
        select(UserConnection)
        .where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
        .options(selectinload(UserConnection.connected_user))
    )
    conn = (await db.execute(lookup)).scalar_one_or_none()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")

    counterpart_settings = await _get_settings_for_user(db, conn.connected_user_id)
    if counterpart_settings:
        # The overrides stored on this row are passed through to the resolver.
        return resolve_shared_profile(
            conn.connected_user,
            counterpart_settings,
            conn.sharing_overrides,
        )
    return {}
|
|
|
|
|
|
# ── PUT /{id}/sharing-overrides ─────────────────────────────────────
|
|
|
|
@router.put("/{connection_id}/sharing-overrides")
async def update_sharing_overrides(
    body: SharingOverrideUpdate,
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Change what the current user shares with one specific connection.

    The overrides are written to the counterpart's UserConnection row (the
    one pointing back at the caller). Only keys in SHAREABLE_FIELDS that
    were explicitly set in the body are applied; a None value removes the
    override for that key. Raises 404 when either row is missing.
    """
    # The caller's own row tells us who the counterpart is.
    own_rows = await db.execute(
        select(UserConnection).where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
    )
    conn = own_rows.scalar_one_or_none()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")

    # The counterpart's row pointing at the caller holds the overrides.
    reverse_rows = await db.execute(
        select(UserConnection).where(
            UserConnection.user_id == conn.connected_user_id,
            UserConnection.connected_user_id == current_user.id,
        )
    )
    reverse_conn = reverse_rows.scalar_one_or_none()
    if not reverse_conn:
        raise HTTPException(status_code=404, detail="Reverse connection not found")

    # Work on a copy and assign it back as a new dict.
    merged = dict(reverse_conn.sharing_overrides or {})
    for key, value in body.model_dump(exclude_unset=True).items():
        if key not in SHAREABLE_FIELDS:
            continue
        if value is None:
            merged.pop(key, None)
        else:
            merged[key] = value

    # An empty override set is stored as None.
    reverse_conn.sharing_overrides = merged or None

    await db.commit()
    return {"message": "Sharing overrides updated"}
|
|
|
|
|
|
# ── DELETE /{id} ────────────────────────────────────────────────────
|
|
|
|
@router.delete("/{connection_id}", status_code=204)
async def remove_connection(
    request: Request,
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Delete a connection in both directions.

    Both UserConnection rows are removed and the Person record referenced by
    each row is passed to detach_umbral_contact. No notification is created;
    an audit event is recorded. Raises 404 when the caller does not own the
    connection.
    """
    # The caller's side of the connection.
    own_rows = await db.execute(
        select(UserConnection).where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
    )
    conn = own_rows.scalar_one_or_none()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")

    counterpart_id = conn.connected_user_id

    # The counterpart's side; it may be missing.
    reverse_rows = await db.execute(
        select(UserConnection).where(
            UserConnection.user_id == counterpart_id,
            UserConnection.connected_user_id == current_user.id,
        )
    )
    reverse_conn = reverse_rows.scalar_one_or_none()

    # Detach the linked Person on each side: caller's first, then the counterpart's.
    linked_person_ids = [conn.person_id]
    if reverse_conn:
        linked_person_ids.append(reverse_conn.person_id)
    for linked_person_id in linked_person_ids:
        if not linked_person_id:
            continue
        person_rows = await db.execute(select(Person).where(Person.id == linked_person_id))
        person = person_rows.scalar_one_or_none()
        if person:
            await detach_umbral_contact(person)

    # Remove both connection rows.
    await db.delete(conn)
    if reverse_conn:
        await db.delete(reverse_conn)

    await log_audit_event(
        db,
        action="connection.removed",
        actor_id=current_user.id,
        target_id=counterpart_id,
        detail={"connection_id": connection_id},
        ip=get_client_ip(request),
    )

    await db.commit()
    return None
|