"""
Connection service — shared profile resolution, Person creation, ntfy dispatch.

SHAREABLE_FIELDS is the single source of truth for which fields can be shared.
"""
import asyncio
import logging
from datetime import date
from typing import Optional

from sqlalchemy.ext.asyncio import AsyncSession

from app.models.person import Person
from app.models.settings import Settings
from app.models.user import User
from app.services.ntfy import send_ntfy_notification

logger = logging.getLogger(__name__)

# Single source of truth — only these fields can be shared via connections
SHAREABLE_FIELDS = frozenset({
    "preferred_name",
    "email",
    "phone",
    "mobile",
    "birthday",
    "address",
    "company",
    "job_title",
})

# Maps shareable field names to their Settings model column names.
# None means the value does not live on Settings and is resolved explicitly
# in resolve_shared_profile().
_SETTINGS_FIELD_MAP = {
    "preferred_name": "preferred_name",
    "email": None,  # email comes from User model, not Settings
    "phone": "phone",
    "mobile": "mobile",
    "birthday": None,  # birthday comes from User model (date_of_birth)
    "address": "address",
    "company": "company",
    "job_title": "job_title",
}


def resolve_shared_profile(
    user: User,
    settings: Settings,
    overrides: Optional[dict] = None,
) -> dict:
    """
    Merge global sharing defaults with per-connection overrides.

    For every field in SHAREABLE_FIELDS, an entry in ``overrides`` keyed by the
    field name wins when present; otherwise the ``share_<field>`` attribute on
    ``settings`` decides (treated as False when that attribute is missing).

    Args:
        user: Source of ``email`` and ``date_of_birth``.
        settings: Source of the ``share_*`` flags and the remaining values.
        overrides: Optional ``{field: truthy/falsy}`` per-connection overrides.
            ``None`` or an empty dict means "use the global defaults".

    Returns:
        ``{field: value}`` for the fields the user is sharing. Only fields in
        SHAREABLE_FIELDS are included. A shared field is included even when
        its stored value is None. ``birthday`` is returned as ``str()`` of
        ``user.date_of_birth`` (None when unset).
    """
    overrides = overrides or {}
    result = {}

    # Sorted so the key order of the result is stable across processes;
    # iteration order of a frozenset of strings varies with hash randomization.
    for field in sorted(SHAREABLE_FIELDS):
        # Determine if this field is shared: override wins, else global default
        global_share = getattr(settings, f"share_{field}", False)
        is_shared = overrides.get(field, global_share)
        if not is_shared:
            continue

        # Resolve the actual value
        if field == "preferred_name":
            result[field] = settings.preferred_name
        elif field == "email":
            result[field] = user.email
        elif field == "birthday":
            # Stringified so create_person_from_connection() can parse it back
            # with date.fromisoformat(); presumably date_of_birth is a
            # datetime.date — verify against the User model.
            result[field] = str(user.date_of_birth) if user.date_of_birth else None
        elif _SETTINGS_FIELD_MAP.get(field):
            result[field] = getattr(settings, _SETTINGS_FIELD_MAP[field], None)

    return result


def create_person_from_connection(
    owner_user_id: int,
    connected_user: User,
    connected_settings: Settings,
    shared_profile: dict,
) -> Person:
    """Create a Person record for a new connection. Does NOT add to session — caller does.

    Args:
        owner_user_id: Stored as ``user_id`` on the new Person.
        connected_user: The user on the other side of the connection; supplies
            ``id`` (stored as ``linked_user_id``) and the ``umbral_name``
            fallback.
        connected_settings: Not read by this function; kept in the signature
            for callers.
        shared_profile: ``{field: value}`` dict of shared values, in the shape
            returned by resolve_shared_profile().

    Returns:
        A new Person with ``category="Umbral"`` and ``is_umbral_contact=True``.
        Fields missing from ``shared_profile`` are passed as None.
    """
    # Use shared preferred_name for display, fall back to umbral_name
    first_name = shared_profile.get("preferred_name") or connected_user.umbral_name

    birthday = None
    birthday_str = shared_profile.get("birthday")
    if birthday_str:
        try:
            birthday = date.fromisoformat(birthday_str)
        except (ValueError, TypeError):
            # Best-effort: an unparseable birthday must not block creating the
            # contact. The value itself is not logged because it is personal
            # data.
            logger.warning(
                "Ignoring unparseable shared birthday for linked user_id=%s",
                connected_user.id,
            )

    return Person(
        user_id=owner_user_id,
        # Display name and first name are the same value: first_name already
        # carries the umbral_name fallback.
        name=first_name,
        first_name=first_name,
        email=shared_profile.get("email"),
        phone=shared_profile.get("phone"),
        mobile=shared_profile.get("mobile"),
        address=shared_profile.get("address"),
        company=shared_profile.get("company"),
        job_title=shared_profile.get("job_title"),
        birthday=birthday,
        category="Umbral",
        linked_user_id=connected_user.id,
        is_umbral_contact=True,
    )


async def detach_umbral_contact(person: Person) -> None:
    """Convert an umbral contact back to a standard contact. Does NOT commit.

    Unlinks the Person from the connected user and clears the umbral flag.
    Mutates ``person`` in place; nothing is awaited here.
    """
    person.linked_user_id = None
    person.is_umbral_contact = False

    # NOTE(review): no field values are cleared here, so previously shared
    # values stay on the contact — confirm that this is intended.
    # If no first_name exists, fill from the old name
    if not person.first_name:
        person.first_name = person.name or None


async def send_connection_ntfy(
    settings: Settings,
    sender_name: str,
    event_type: str,
) -> None:
    """Send ntfy push for connection events. Best-effort with a 3s timeout.

    Returns immediately when ``settings.ntfy_connections_enabled`` is falsy.
    Otherwise awaits the send for at most 3 seconds; a timeout or any
    ``Exception`` raised by the send is logged as a warning and not re-raised.

    Args:
        settings: Settings of the user receiving the notification.
        sender_name: Name interpolated into the notification message.
        event_type: ``"request_received"`` or ``"request_accepted"``; any other
            value produces a generic "Connection Update" notification.
    """
    if not settings.ntfy_connections_enabled:
        return

    title_map = {
        "request_received": "New Connection Request",
        "request_accepted": "Connection Accepted",
    }
    message_map = {
        "request_received": f"{sender_name} wants to connect with you on Umbra",
        "request_accepted": f"{sender_name} accepted your connection request",
    }
    tag_map = {
        "request_received": ["handshake"],
        "request_accepted": ["white_check_mark"],
    }

    title = title_map.get(event_type, "Connection Update")
    message = message_map.get(event_type, f"Connection update from {sender_name}")
    tags = tag_map.get(event_type, ["bell"])

    try:
        await asyncio.wait_for(
            send_ntfy_notification(
                settings=settings,
                title=title,
                message=message,
                tags=tags,
                priority=3,
            ),
            timeout=3.0,
        )
    except asyncio.TimeoutError:
        logger.warning("ntfy connection push timed out for user_id=%s", settings.user_id)
    except Exception as exc:
        # Only the exception class is logged: enough to tell failure modes
        # apart without writing the exception text to the log.
        logger.warning(
            "ntfy connection push failed for user_id=%s (%s)",
            settings.user_id,
            type(exc).__name__,
        )