UMBRA/backend/app/routers/connections.py
Kyle Pope 9bcf5ace5d Fix search cold-cache gate, 429 handling, and datetime.now() violations
ConnectionSearch.tsx:
- Add loading guard for useSettings() — prevents cold cache from showing
  "enable connections" gate when settings haven't loaded yet
- Add 429 rate limit handling in search catch block — shows user-friendly
  message instead of silently showing "User not found"
- Import axios for isAxiosError type guard

connections.py:
- Fix 3x datetime.now() → datetime.now(timezone.utc) per hard rule
  (lines 187, 378, 565)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 20:02:52 +08:00

832 lines
32 KiB
Python

"""
Connection router — search, request, respond, manage connections.
Security:
- Timing-safe search (50ms sleep floor)
- Per-receiver pending request cap (5 within 10 minutes)
- Atomic accept via UPDATE...WHERE status='pending' RETURNING *
- All endpoints scoped by current_user.id
- Audit logging for all connection events
"""
import asyncio
import logging
from datetime import date as date_type, datetime, timedelta, timezone
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Path, Query, Request
from sqlalchemy import delete, select, func, and_, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.connection_request import ConnectionRequest
from app.models.notification import Notification
from app.models.person import Person
from app.models.settings import Settings
from app.models.user import User
from app.models.user_connection import UserConnection
from app.routers.auth import get_current_user
from app.schemas.connection import (
CancelResponse,
ConnectionRequestResponse,
ConnectionResponse,
RespondAcceptResponse,
RespondRejectResponse,
RespondRequest,
SendConnectionRequest,
SharingOverrideUpdate,
UmbralSearchRequest,
UmbralSearchResponse,
)
from app.services.audit import get_client_ip, log_audit_event
from app.services.connection import (
SHAREABLE_FIELDS,
create_person_from_connection,
detach_umbral_contact,
extract_ntfy_config,
resolve_shared_profile,
send_connection_ntfy,
)
from app.services.notification import create_notification
router = APIRouter()
logger = logging.getLogger(__name__)
# ── Helpers ──────────────────────────────────────────────────────────
async def _get_settings_for_user(db: AsyncSession, user_id: int) -> Settings | None:
    """Fetch the Settings row whose ``user_id`` matches.

    Returns ``None`` when no row matches.
    """
    stmt = select(Settings).where(Settings.user_id == user_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
def _build_request_response(
    req: ConnectionRequest,
    sender: User,
    sender_settings: Settings | None,
    receiver: User,
    receiver_settings: Settings | None,
) -> ConnectionRequestResponse:
    """Assemble the API representation of a connection request.

    Each party's preferred name is read from its settings object and is
    ``None`` when that settings object is missing.
    """
    sender_preferred = None
    if sender_settings:
        sender_preferred = sender_settings.preferred_name

    receiver_preferred = None
    if receiver_settings:
        receiver_preferred = receiver_settings.preferred_name

    return ConnectionRequestResponse(
        id=req.id,
        sender_umbral_name=sender.umbral_name,
        sender_preferred_name=sender_preferred,
        receiver_umbral_name=receiver.umbral_name,
        receiver_preferred_name=receiver_preferred,
        status=req.status,
        created_at=req.created_at,
    )
# ── POST /search ────────────────────────────────────────────────────
async def _search_target_is_visible(
    db: AsyncSession,
    current_user: User,
    umbral_name: str,
) -> bool:
    """Return True only when ``umbral_name`` names a user the caller may discover.

    False is returned for every non-discoverable case without distinguishing
    between them: caller has connections disabled, caller searched for
    themselves, name does not exist, target is inactive, or target has
    connections disabled.
    """
    # Sender must have accept_connections enabled to search
    sender_settings = await _get_settings_for_user(db, current_user.id)
    if not sender_settings or not sender_settings.accept_connections:
        return False
    # Don't find yourself
    if umbral_name == current_user.umbral_name:
        return False
    result = await db.execute(select(User).where(User.umbral_name == umbral_name))
    target = result.scalar_one_or_none()
    if not target or not target.is_active:
        return False
    # Check if they accept connections
    target_settings = await _get_settings_for_user(db, target.id)
    return bool(target_settings and target_settings.accept_connections)


@router.post("/search", response_model=UmbralSearchResponse)
async def search_user(
    body: UmbralSearchRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Timing-padded user search.

    Returns a generic ``found=False`` for non-existent, opted-out, AND
    inactive users (and when the caller has connections disabled or searches
    for themselves).

    The response is held back until at least 50ms have elapsed since the
    handler started. The different branches run a different number of
    queries, so an unconditional sleep would only shift every latency by a
    constant and leave the difference observable. Padding up to a deadline
    makes all outcomes take the same time whenever the lookup itself
    finishes within 50ms.
    """
    loop = asyncio.get_running_loop()
    # Monotonic deadline: immune to wall-clock adjustments.
    deadline = loop.time() + 0.05

    found = await _search_target_is_visible(db, current_user, body.umbral_name)

    # Sleep only for the remainder so the total is a true floor, not an add-on.
    remaining = deadline - loop.time()
    if remaining > 0:
        await asyncio.sleep(remaining)
    return UmbralSearchResponse(found=found)
# ── POST /request ───────────────────────────────────────────────────
@router.post("/request", response_model=ConnectionRequestResponse, status_code=201)
async def send_connection_request(
    body: SendConnectionRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Send a connection request to another user.

    Raises:
        HTTPException 403: the sender has not enabled 'Accept Connections'.
        HTTPException 404: the target does not exist, is inactive, or does
            not accept connections (all reported identically).
        HTTPException 400: self-request, or ``person_id`` is not a standard
            contact owned by the sender.
        HTTPException 409: already connected, or a pending request exists in
            either direction.
        HTTPException 429: the target already has 5 pending requests created
            within the last 10 minutes.
    """
    # Sender must have accept_connections enabled to participate.
    # This check runs BEFORE the target lookup on purpose: if it ran after,
    # a sender with connections disabled would see 404 for unknown names but
    # 403 for existing ones, turning this endpoint into a user-enumeration
    # oracle that bypasses the /search gate.
    sender_settings = await _get_settings_for_user(db, current_user.id)
    if not sender_settings or not sender_settings.accept_connections:
        raise HTTPException(
            status_code=403,
            detail="You must enable 'Accept Connections' in your settings before sending requests",
        )

    # Resolve target
    result = await db.execute(
        select(User).where(User.umbral_name == body.umbral_name)
    )
    target = result.scalar_one_or_none()
    if not target or not target.is_active:
        raise HTTPException(status_code=404, detail="User not found")

    # Self-request guard
    if target.id == current_user.id:
        raise HTTPException(status_code=400, detail="Cannot send a connection request to yourself")

    # Check accept_connections on target (same 404 as "does not exist")
    target_settings = await _get_settings_for_user(db, target.id)
    if not target_settings or not target_settings.accept_connections:
        raise HTTPException(status_code=404, detail="User not found")

    # Check existing connection
    existing_conn = await db.execute(
        select(UserConnection).where(
            UserConnection.user_id == current_user.id,
            UserConnection.connected_user_id == target.id,
        )
    )
    if existing_conn.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="Already connected")

    # Check pending request in either direction
    existing_req = await db.execute(
        select(ConnectionRequest).where(
            and_(
                ConnectionRequest.status == "pending",
                (
                    (ConnectionRequest.sender_id == current_user.id) & (ConnectionRequest.receiver_id == target.id)
                ) | (
                    (ConnectionRequest.sender_id == target.id) & (ConnectionRequest.receiver_id == current_user.id)
                ),
            )
        )
    )
    if existing_req.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="A pending request already exists")

    # Per-receiver cap: max 5 pending requests within 10 minutes
    ten_min_ago = datetime.now(timezone.utc) - timedelta(minutes=10)
    pending_count = await db.scalar(
        select(func.count())
        .select_from(ConnectionRequest)
        .where(
            ConnectionRequest.receiver_id == target.id,
            ConnectionRequest.status == "pending",
            ConnectionRequest.created_at >= ten_min_ago,
        )
    ) or 0
    if pending_count >= 5:
        raise HTTPException(status_code=429, detail="Too many pending requests for this user")

    # Validate person_id if provided (link existing standard contact)
    link_person_id = None
    if body.person_id is not None:
        person_result = await db.execute(
            select(Person).where(Person.id == body.person_id, Person.user_id == current_user.id)
        )
        link_person = person_result.scalar_one_or_none()
        if not link_person:
            raise HTTPException(status_code=400, detail="Person not found or not owned by you")
        if link_person.is_umbral_contact:
            raise HTTPException(status_code=400, detail="Person is already an umbral contact")
        link_person_id = body.person_id

    # Create the request (IntegrityError guard for TOCTOU race on partial unique index)
    conn_request = ConnectionRequest(
        sender_id=current_user.id,
        receiver_id=target.id,
        person_id=link_person_id,
    )
    db.add(conn_request)
    try:
        await db.flush()  # populate conn_request.id for source_id
    except IntegrityError:
        await db.rollback()
        raise HTTPException(status_code=409, detail="A pending request already exists")

    # Create in-app notification for receiver (sender_settings already fetched above)
    sender_display = (sender_settings.preferred_name if sender_settings else None) or current_user.umbral_name
    await create_notification(
        db,
        user_id=target.id,
        type="connection_request",
        title="New Connection Request",
        message=f"{sender_display} wants to connect with you",
        data={"sender_umbral_name": current_user.umbral_name},
        source_type="connection_request",
        source_id=conn_request.id,
    )
    await log_audit_event(
        db,
        action="connection.request_sent",
        actor_id=current_user.id,
        target_id=target.id,
        detail={"receiver_umbral_name": target.umbral_name},
        ip=get_client_ip(request),
    )

    # Extract ntfy config before commit (avoids detached SA object in background task)
    target_ntfy = extract_ntfy_config(target_settings) if target_settings else None
    await db.commit()
    await db.refresh(conn_request)

    # ntfy push in background (non-blocking)
    background_tasks.add_task(
        send_connection_ntfy,
        target_ntfy,
        sender_display,
        "request_received",
    )
    return _build_request_response(conn_request, current_user, sender_settings, target, target_settings)
# ── GET /requests/incoming ──────────────────────────────────────────
@router.get("/requests/incoming", response_model=list[ConnectionRequestResponse])
async def get_incoming_requests(
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of pending requests addressed to the current user, newest first."""
    stmt = (
        select(ConnectionRequest)
        .where(
            ConnectionRequest.receiver_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .options(selectinload(ConnectionRequest.sender))
        .order_by(ConnectionRequest.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    pending = (await db.execute(stmt)).scalars().all()

    # The receiver is always the current user, so load those settings once.
    my_settings = await _get_settings_for_user(db, current_user.id)

    # One IN query for all senders on this page instead of one query per row.
    settings_lookup = {}
    sender_ids = [item.sender_id for item in pending]
    if sender_ids:
        rows = await db.execute(select(Settings).where(Settings.user_id.in_(sender_ids)))
        settings_lookup = {entry.user_id: entry for entry in rows.scalars().all()}

    return [
        _build_request_response(
            item,
            item.sender,
            settings_lookup.get(item.sender_id),
            current_user,
            my_settings,
        )
        for item in pending
    ]
# ── GET /requests/outgoing ──────────────────────────────────────────
@router.get("/requests/outgoing", response_model=list[ConnectionRequestResponse])
async def get_outgoing_requests(
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of pending requests sent by the current user, newest first."""
    stmt = (
        select(ConnectionRequest)
        .where(
            ConnectionRequest.sender_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .options(selectinload(ConnectionRequest.receiver))
        .order_by(ConnectionRequest.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    pending = (await db.execute(stmt)).scalars().all()

    # The sender is always the current user, so load those settings once.
    my_settings = await _get_settings_for_user(db, current_user.id)

    # One IN query for all receivers on this page instead of one query per row.
    settings_lookup = {}
    receiver_ids = [item.receiver_id for item in pending]
    if receiver_ids:
        rows = await db.execute(select(Settings).where(Settings.user_id.in_(receiver_ids)))
        settings_lookup = {entry.user_id: entry for entry in rows.scalars().all()}

    return [
        _build_request_response(
            item,
            current_user,
            my_settings,
            item.receiver,
            settings_lookup.get(item.receiver_id),
        )
        for item in pending
    ]
# ── PUT /requests/{id}/respond ──────────────────────────────────────
@router.put("/requests/{request_id}/respond", response_model=RespondAcceptResponse | RespondRejectResponse)
async def respond_to_request(
    body: RespondRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    request_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Accept or reject an incoming connection request.

    Thin error boundary around ``_respond_to_request_inner``: HTTPExceptions
    pass through untouched, anything else is logged with its traceback and
    surfaced to the client as a 500.
    """
    try:
        outcome = await _respond_to_request_inner(
            body,
            request,
            background_tasks,
            request_id,
            db,
            current_user,
        )
    except HTTPException:
        # Deliberate API errors already carry the right status code.
        raise
    except Exception:
        # No explicit rollback here; per the original author's note this is
        # left to get_db (NOTE(review): get_db is not visible here — confirm).
        logger.exception("Unhandled error in respond_to_request (request_id=%s, user=%s)", request_id, current_user.id)
        raise HTTPException(status_code=500, detail=f"Internal server error while processing connection response (request {request_id})")
    return outcome
async def _respond_to_request_inner(
    body: RespondRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    request_id: int,
    db: AsyncSession,
    current_user: User,
) -> RespondAcceptResponse | RespondRejectResponse:
    """Resolve a pending connection request addressed to ``current_user``.

    The request row is claimed with a single conditional UPDATE, so only one
    caller can move it out of 'pending'. On accept, a Person record is
    created (or an existing one converted) for each side, two UserConnection
    rows are inserted, the sender is notified, and an audit event is logged.
    On reject, only an audit event is logged.

    Returns:
        A plain dict: ``{"message", "connection_id"}`` on accept, or
        ``{"message"}`` on reject.

    Raises:
        HTTPException 409: the request is not pending / not addressed to the
            caller, the sender is no longer active, or an IntegrityError
            occurred while creating the connection.
    """
    now = datetime.now(timezone.utc)
    # Atomic update — only succeeds if status is still 'pending' and receiver is current user
    result = await db.execute(
        update(ConnectionRequest)
        .where(
            ConnectionRequest.id == request_id,
            ConnectionRequest.receiver_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        # Status is derived by suffixing the action: "accept" -> "accepted",
        # "reject" -> "rejected". Presumably RespondRequest restricts action
        # to those two values — TODO confirm against the schema.
        .values(status=body.action + "ed", resolved_at=now)
        .returning(
            ConnectionRequest.id,
            ConnectionRequest.sender_id,
            ConnectionRequest.receiver_id,
            ConnectionRequest.person_id,
        )
    )
    row = result.first()
    if not row:
        # No row matched: wrong id, not our request, or no longer pending.
        raise HTTPException(status_code=409, detail="Request not found or already resolved")
    sender_id = row.sender_id
    request_person_id = row.person_id
    if body.action == "accept":
        # Verify sender is still active
        sender_result = await db.execute(select(User).where(User.id == sender_id))
        sender = sender_result.scalar_one_or_none()
        if not sender or not sender.is_active:
            # Revert to rejected
            # (overwrites the "accepted" status written above; resolved_at is kept)
            await db.execute(
                update(ConnectionRequest)
                .where(ConnectionRequest.id == request_id)
                .values(status="rejected")
            )
            # Commit before raising so the rejection is persisted.
            await db.commit()
            raise HTTPException(status_code=409, detail="Sender account is no longer active")
        # Get settings for both users
        sender_settings = await _get_settings_for_user(db, sender_id)
        receiver_settings = await _get_settings_for_user(db, current_user.id)
        # Resolve shared profiles for both directions
        # (no per-connection overrides exist yet, hence the None third argument;
        # an empty profile is used when a user has no settings row)
        sender_shared = resolve_shared_profile(sender, sender_settings, None) if sender_settings else {}
        receiver_shared = resolve_shared_profile(current_user, receiver_settings, None) if receiver_settings else {}
        # Create Person records for both users
        person_for_receiver = create_person_from_connection(
            current_user.id, sender, sender_settings, sender_shared
        )
        db.add(person_for_receiver)
        # Sender side: reuse existing Person if person_id was provided on the request
        person_for_sender = None
        if request_person_id:
            existing_result = await db.execute(
                select(Person).where(Person.id == request_person_id)
            )
            existing_person = existing_result.scalar_one_or_none()
            # Re-validate at accept time: ownership must match sender,
            # and must not already be umbral (prevents double-conversion races)
            if existing_person and existing_person.user_id == sender_id and not existing_person.is_umbral_contact:
                # Convert existing standard contact to umbral
                existing_person.linked_user_id = current_user.id
                existing_person.is_umbral_contact = True
                existing_person.category = "Umbral"
                # Update from shared profile
                # Name fields are always overwritten; first_name falls back to
                # preferred_name and then to the receiver's umbral_name.
                first_name = receiver_shared.get("first_name") or receiver_shared.get("preferred_name") or current_user.umbral_name
                last_name = receiver_shared.get("last_name")
                existing_person.first_name = first_name
                existing_person.last_name = last_name
                # Contact fields keep the sender's existing value when the
                # shared profile has nothing (or a falsy value) for them.
                existing_person.email = receiver_shared.get("email") or existing_person.email
                existing_person.phone = receiver_shared.get("phone") or existing_person.phone
                existing_person.mobile = receiver_shared.get("mobile") or existing_person.mobile
                existing_person.address = receiver_shared.get("address") or existing_person.address
                existing_person.company = receiver_shared.get("company") or existing_person.company
                existing_person.job_title = receiver_shared.get("job_title") or existing_person.job_title
                # Sync birthday from shared profile
                birthday_str = receiver_shared.get("birthday")
                if birthday_str:
                    try:
                        existing_person.birthday = date_type.fromisoformat(birthday_str)
                    except (ValueError, TypeError):
                        # Unparseable birthday: keep whatever the contact already had.
                        pass
                # Recompute display name
                full = ((first_name or '') + ' ' + (last_name or '')).strip()
                existing_person.name = full or current_user.umbral_name
                person_for_sender = existing_person
        if person_for_sender is None:
            # No usable existing contact (none supplied, or re-validation
            # failed): create a fresh Person for the sender's side.
            person_for_sender = create_person_from_connection(
                sender_id, current_user, receiver_settings, receiver_shared
            )
            db.add(person_for_sender)
        try:
            await db.flush()  # populate person IDs
        except IntegrityError:
            # Nothing has been committed yet, so this rollback presumably also
            # undoes the status UPDATE above, leaving the request pending.
            await db.rollback()
            raise HTTPException(status_code=409, detail="Connection already exists")
        # Create bidirectional connections
        conn_a = UserConnection(
            user_id=current_user.id,
            connected_user_id=sender_id,
            person_id=person_for_receiver.id,
        )
        conn_b = UserConnection(
            user_id=sender_id,
            connected_user_id=current_user.id,
            person_id=person_for_sender.id,
        )
        db.add(conn_a)
        db.add(conn_b)
        try:
            await db.flush()  # populate conn_a.id for source_id
        except IntegrityError:
            await db.rollback()
            raise HTTPException(status_code=409, detail="Connection already exists")
        # Notification to sender
        receiver_display = (receiver_settings.preferred_name if receiver_settings else None) or current_user.umbral_name
        await create_notification(
            db,
            user_id=sender_id,
            type="connection_accepted",
            title="Connection Accepted",
            message=f"{receiver_display} accepted your connection request",
            data={"connected_umbral_name": current_user.umbral_name},
            source_type="user_connection",
            # conn_b is the sender's own connection row (user_id=sender_id).
            source_id=conn_b.id,
        )
        await log_audit_event(
            db,
            action="connection.accepted",
            actor_id=current_user.id,
            target_id=sender_id,
            detail={"request_id": request_id},
            ip=get_client_ip(request),
        )
        # Extract ntfy config before commit (avoids detached SA object in background task)
        sender_ntfy = extract_ntfy_config(sender_settings) if sender_settings else None
        try:
            await db.commit()
        except IntegrityError:
            await db.rollback()
            raise HTTPException(status_code=409, detail="Connection already exists")
        # ntfy push in background
        background_tasks.add_task(
            send_connection_ntfy,
            sender_ntfy,
            receiver_display,
            "request_accepted",
        )
        # conn_a is the caller's (receiver's) connection row.
        return {"message": "Connection accepted", "connection_id": conn_a.id}
    else:
        # Reject — silent towards the sender: this branch creates no
        # notification at all, it only records the audit event.
        # NOTE(review): the previous comment mentioned a receiver notification,
        # but none is created here — confirm that is intended.
        await log_audit_event(
            db,
            action="connection.rejected",
            actor_id=current_user.id,
            target_id=sender_id,
            detail={"request_id": request_id},
            ip=get_client_ip(request),
        )
        await db.commit()
        return {"message": "Connection request rejected"}
# ── PUT /requests/{id}/cancel ──────────────────────────────────────
@router.put("/requests/{request_id}/cancel", response_model=CancelResponse)
async def cancel_request(
    request: Request,
    request_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Withdraw a pending request that the current user sent.

    The row is claimed with one conditional UPDATE, so the cancel only takes
    effect while the request is still 'pending' and owned by the caller;
    otherwise a 409 is raised.
    """
    cancel_stmt = (
        update(ConnectionRequest)
        .where(
            ConnectionRequest.id == request_id,
            ConnectionRequest.sender_id == current_user.id,
            ConnectionRequest.status == "pending",
        )
        .values(status="cancelled", resolved_at=datetime.now(timezone.utc))
        .returning(ConnectionRequest.id, ConnectionRequest.receiver_id)
    )
    cancelled = (await db.execute(cancel_stmt)).first()
    if not cancelled:
        raise HTTPException(status_code=409, detail="Request not found or already resolved")

    receiver_id = cancelled.receiver_id

    # Quietly drop the in-app notification the receiver got for this request.
    stale_notifications = delete(Notification).where(
        Notification.source_type == "connection_request",
        Notification.source_id == request_id,
        Notification.user_id == receiver_id,
    )
    await db.execute(stale_notifications)

    # The audit record carries the receiver's umbral_name ("unknown" if the lookup finds nothing).
    name_lookup = await db.execute(select(User.umbral_name).where(User.id == receiver_id))
    receiver_umbral_name = name_lookup.scalar_one_or_none() or "unknown"

    await log_audit_event(
        db,
        action="connection.request_cancelled",
        actor_id=current_user.id,
        target_id=receiver_id,
        detail={"request_id": request_id, "receiver_umbral_name": receiver_umbral_name},
        ip=get_client_ip(request),
    )
    await db.commit()
    return {"message": "Connection request cancelled"}
# ── GET / ───────────────────────────────────────────────────────────
@router.get("/", response_model=list[ConnectionResponse])
async def list_connections(
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of the current user's connections, newest first."""
    stmt = (
        select(UserConnection)
        .where(UserConnection.user_id == current_user.id)
        .options(selectinload(UserConnection.connected_user))
        .order_by(UserConnection.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    links = (await db.execute(stmt)).scalars().all()

    # One IN query for the settings of every counterpart on this page.
    peer_settings = {}
    peer_ids = [link.connected_user_id for link in links]
    if peer_ids:
        rows = await db.execute(select(Settings).where(Settings.user_id.in_(peer_ids)))
        peer_settings = {entry.user_id: entry for entry in rows.scalars().all()}

    def _serialize(link: UserConnection) -> ConnectionResponse:
        # preferred name is None when the counterpart has no settings row
        their_settings = peer_settings.get(link.connected_user_id)
        return ConnectionResponse(
            id=link.id,
            connected_user_id=link.connected_user_id,
            connected_umbral_name=link.connected_user.umbral_name,
            connected_preferred_name=their_settings.preferred_name if their_settings else None,
            person_id=link.person_id,
            created_at=link.created_at,
        )

    return [_serialize(link) for link in links]
# ── GET /{id} ───────────────────────────────────────────────────────
@router.get("/{connection_id}", response_model=ConnectionResponse)
async def get_connection(
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one connection owned by the current user, or 404 if there is none."""
    stmt = (
        select(UserConnection)
        .where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
        .options(selectinload(UserConnection.connected_user))
    )
    link = (await db.execute(stmt)).scalar_one_or_none()
    if not link:
        raise HTTPException(status_code=404, detail="Connection not found")

    their_settings = await _get_settings_for_user(db, link.connected_user_id)
    preferred_name = None
    if their_settings:
        preferred_name = their_settings.preferred_name

    return ConnectionResponse(
        id=link.id,
        connected_user_id=link.connected_user_id,
        connected_umbral_name=link.connected_user.umbral_name,
        connected_preferred_name=preferred_name,
        person_id=link.person_id,
        created_at=link.created_at,
    )
# ── GET /{id}/shared-profile ────────────────────────────────────────
@router.get("/{connection_id}/shared-profile")
async def get_shared_profile(
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the counterpart's profile as resolved for this connection.

    Responds 404 when the connection does not belong to the caller, and with
    an empty object when the counterpart has no settings row.
    """
    stmt = (
        select(UserConnection)
        .where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
        .options(selectinload(UserConnection.connected_user))
    )
    link = (await db.execute(stmt)).scalar_one_or_none()
    if not link:
        raise HTTPException(status_code=404, detail="Connection not found")

    their_settings = await _get_settings_for_user(db, link.connected_user_id)
    if their_settings:
        # The overrides stored on the caller's own row are applied here.
        return resolve_shared_profile(
            link.connected_user,
            their_settings,
            link.sharing_overrides,
        )
    return {}
# ── PUT /{id}/sharing-overrides ─────────────────────────────────────
@router.put("/{connection_id}/sharing-overrides")
async def update_sharing_overrides(
    body: SharingOverrideUpdate,
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Change what the current user shares with one specific connection.

    The overrides are written to the counterpart's row (the one pointing
    back at the caller), not to the caller's own row.
    """
    # The caller's own row tells us who the counterpart is.
    own_lookup = await db.execute(
        select(UserConnection).where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
    )
    own_link = own_lookup.scalar_one_or_none()
    if not own_link:
        raise HTTPException(status_code=404, detail="Connection not found")

    # The counterpart's row pointing back at the caller holds the overrides.
    mirror_lookup = await db.execute(
        select(UserConnection).where(
            UserConnection.user_id == own_link.connected_user_id,
            UserConnection.connected_user_id == current_user.id,
        )
    )
    mirror_link = mirror_lookup.scalar_one_or_none()
    if not mirror_link:
        raise HTTPException(status_code=404, detail="Reverse connection not found")

    # Work on a copy; only explicitly-sent fields that are shareable are touched.
    merged = dict(mirror_link.sharing_overrides or {})
    for field, new_value in body.model_dump(exclude_unset=True).items():
        if field not in SHAREABLE_FIELDS:
            continue
        if new_value is None:
            # An explicit null clears the override for that field.
            merged.pop(field, None)
        else:
            merged[field] = new_value

    # Store None rather than an empty mapping when no overrides remain.
    mirror_link.sharing_overrides = merged or None
    await db.commit()
    return {"message": "Sharing overrides updated"}
# ── DELETE /{id} ────────────────────────────────────────────────────
@router.delete("/{connection_id}", status_code=204)
async def remove_connection(
    request: Request,
    connection_id: int = Path(ge=1, le=2147483647),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Tear down a connection from both sides.

    Deletes the caller's UserConnection row and, when present, the
    counterpart's mirror row. The Person record linked from each row is
    passed to ``detach_umbral_contact`` first. No notification is created;
    only an audit event is logged.
    """
    own_lookup = await db.execute(
        select(UserConnection)
        .where(
            UserConnection.id == connection_id,
            UserConnection.user_id == current_user.id,
        )
    )
    own_link = own_lookup.scalar_one_or_none()
    if not own_link:
        raise HTTPException(status_code=404, detail="Connection not found")

    counterpart_id = own_link.connected_user_id

    # The counterpart's row pointing back at the caller (may be absent).
    mirror_lookup = await db.execute(
        select(UserConnection).where(
            UserConnection.user_id == counterpart_id,
            UserConnection.connected_user_id == current_user.id,
        )
    )
    mirror_link = mirror_lookup.scalar_one_or_none()

    # Detach the Person behind each row: caller's side first, then the mirror.
    for link in (own_link, mirror_link):
        if not link or not link.person_id:
            continue
        person_lookup = await db.execute(select(Person).where(Person.id == link.person_id))
        contact = person_lookup.scalar_one_or_none()
        if contact:
            await detach_umbral_contact(contact)

    # Remove both connection rows.
    await db.delete(own_link)
    if mirror_link:
        await db.delete(mirror_link)

    await log_audit_event(
        db,
        action="connection.removed",
        actor_id=current_user.id,
        target_id=counterpart_id,
        detail={"connection_id": connection_id},
        ip=get_client_ip(request),
    )
    await db.commit()
    return None