""" Connection service — shared profile resolution, Person creation, ntfy dispatch. SHAREABLE_FIELDS is the single source of truth for which fields can be shared. """ import asyncio import logging from datetime import date as date_type from types import SimpleNamespace from typing import Optional from sqlalchemy.ext.asyncio import AsyncSession from app.models.person import Person from app.models.settings import Settings from app.models.user import User from app.services.ntfy import send_ntfy_notification logger = logging.getLogger(__name__) # Notification type constants — keep in sync with notifications model CHECK constraint NOTIF_TYPE_CONNECTION_REQUEST = "connection_request" NOTIF_TYPE_CONNECTION_ACCEPTED = "connection_accepted" # Single source of truth — only these fields can be shared via connections SHAREABLE_FIELDS = frozenset({ "first_name", "last_name", "preferred_name", "email", "phone", "mobile", "birthday", "address", "company", "job_title", }) # Maps shareable field names to their Settings model column names _SETTINGS_FIELD_MAP = { "first_name": None, # first_name comes from User model "last_name": None, # last_name comes from User model "preferred_name": "preferred_name", "email": None, # email comes from User model "phone": "phone", "mobile": "mobile", "birthday": None, # birthday comes from User model (date_of_birth) "address": "address", "company": "company", "job_title": "job_title", } def resolve_shared_profile( user: User, settings: Settings, overrides: Optional[dict] = None, ) -> dict: """ Merge global sharing defaults with per-connection overrides. Returns {field: value} dict of fields the user is sharing. Only fields in SHAREABLE_FIELDS are included. """ overrides = overrides or {} result = {} for field in SHAREABLE_FIELDS: # Determine if this field is shared: override wins, else global default share_key = f"share_{field}" global_share = getattr(settings, share_key, False) is_shared = overrides.get(field, global_share) if not is_shared: continue # Resolve the actual value if field == "first_name": result[field] = user.first_name elif field == "last_name": result[field] = user.last_name elif field == "preferred_name": result[field] = settings.preferred_name elif field == "email": result[field] = user.email elif field == "birthday": result[field] = str(user.date_of_birth) if user.date_of_birth else None elif field in _SETTINGS_FIELD_MAP and _SETTINGS_FIELD_MAP[field]: result[field] = getattr(settings, _SETTINGS_FIELD_MAP[field], None) return filter_to_shareable(result) def filter_to_shareable(profile: dict) -> dict: """Strip any keys not in SHAREABLE_FIELDS. Defence-in-depth gate.""" return {k: v for k, v in profile.items() if k in SHAREABLE_FIELDS} def create_person_from_connection( owner_user_id: int, connected_user: User, connected_settings: Settings, shared_profile: dict, ) -> Person: """Create a Person record for a new connection. Does NOT add to session — caller does.""" # Use shared first_name, fall back to preferred_name, then umbral_name first_name = shared_profile.get("first_name") or shared_profile.get("preferred_name") or connected_user.umbral_name last_name = shared_profile.get("last_name") email = shared_profile.get("email") phone = shared_profile.get("phone") mobile = shared_profile.get("mobile") address = shared_profile.get("address") company = shared_profile.get("company") job_title = shared_profile.get("job_title") birthday_str = shared_profile.get("birthday") birthday = None if birthday_str: try: birthday = date_type.fromisoformat(birthday_str) except (ValueError, TypeError): pass # Compute display name full = ((first_name or '') + ' ' + (last_name or '')).strip() display_name = full or connected_user.umbral_name return Person( user_id=owner_user_id, name=display_name, first_name=first_name, last_name=last_name, email=email, phone=phone, mobile=mobile, address=address, company=company, job_title=job_title, birthday=birthday, category="Umbral", linked_user_id=connected_user.id, is_umbral_contact=True, ) async def detach_umbral_contact(person: Person) -> None: """Convert an umbral contact back to a standard contact. Does NOT commit. Preserves all person data (name, email, phone, etc.) so the user does not lose contact information when a connection is severed. Only unlinks the umbral association — the person becomes a standard contact. """ person.linked_user_id = None person.is_umbral_contact = False person.category = None def extract_ntfy_config(settings: Settings) -> dict | None: """Extract ntfy config values into a plain dict safe for use after session close.""" if not settings.ntfy_enabled or not settings.ntfy_connections_enabled: return None return { "ntfy_enabled": True, "ntfy_server_url": settings.ntfy_server_url, "ntfy_topic": settings.ntfy_topic, "ntfy_auth_token": settings.ntfy_auth_token, "user_id": settings.user_id, } async def send_connection_ntfy( ntfy_config: dict | None, sender_name: str, event_type: str, ) -> None: """Send ntfy push for connection events. Non-blocking with 3s timeout. Accepts a plain dict (from extract_ntfy_config) to avoid accessing detached SQLAlchemy objects after session close. """ if not ntfy_config: return title_map = { "request_received": "New Connection Request", "request_accepted": "Connection Accepted", } message_map = { "request_received": f"{sender_name} wants to connect with you on Umbra", "request_accepted": f"{sender_name} accepted your connection request", } tag_map = { "request_received": ["handshake"], "request_accepted": ["white_check_mark"], } title = title_map.get(event_type, "Connection Update") message = message_map.get(event_type, f"Connection update from {sender_name}") tags = tag_map.get(event_type, ["bell"]) # Build a settings-like object for send_ntfy_notification (avoids detached SA objects) settings_proxy = SimpleNamespace(**ntfy_config) try: await asyncio.wait_for( send_ntfy_notification( settings=settings_proxy, title=title, message=message, tags=tags, priority=3, ), timeout=3.0, ) except asyncio.TimeoutError: logger.warning("ntfy connection push timed out for user_id=%s", ntfy_config["user_id"]) except Exception: logger.warning("ntfy connection push failed for user_id=%s", ntfy_config["user_id"])