UMBRA/backend/app/services/connection.py
Kyle Pope 8aec5a5078 Sync birthday to umbral contacts on DOB or share_birthday change
When a user updates their date of birth or toggles share_birthday,
all linked Person records (where linked_user_id matches) are updated.
If share_birthday is off, the birthday is cleared on linked records.
Virtual birthday events auto-reflect the change on next calendar poll.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 06:01:35 +08:00

229 lines
7.8 KiB
Python

"""
Connection service — shared profile resolution, Person creation, ntfy dispatch.
SHAREABLE_FIELDS is the single source of truth for which fields can be shared.
"""
import asyncio
import logging
from datetime import date as date_type
from types import SimpleNamespace
from typing import Optional
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.person import Person
from app.models.settings import Settings
from app.models.user import User
from app.services.ntfy import send_ntfy_notification
logger = logging.getLogger(__name__)
# Notification type constants — keep in sync with notifications model CHECK constraint
NOTIF_TYPE_CONNECTION_REQUEST = "connection_request"
NOTIF_TYPE_CONNECTION_ACCEPTED = "connection_accepted"
# Single source of truth — only these fields can be shared via connections
SHAREABLE_FIELDS = frozenset({
"first_name", "last_name", "preferred_name", "email", "phone", "mobile",
"birthday", "address", "company", "job_title",
})
# Maps shareable field names to their Settings model column names
_SETTINGS_FIELD_MAP = {
"first_name": None, # first_name comes from User model
"last_name": None, # last_name comes from User model
"preferred_name": "preferred_name",
"email": None, # email comes from User model
"phone": "phone",
"mobile": "mobile",
"birthday": None, # birthday comes from User model (date_of_birth)
"address": "address",
"company": "company",
"job_title": "job_title",
}
def resolve_shared_profile(
user: User,
settings: Settings,
overrides: Optional[dict] = None,
) -> dict:
"""
Merge global sharing defaults with per-connection overrides.
Returns {field: value} dict of fields the user is sharing.
Only fields in SHAREABLE_FIELDS are included.
"""
overrides = overrides or {}
result = {}
for field in SHAREABLE_FIELDS:
# Determine if this field is shared: override wins, else global default
share_key = f"share_{field}"
global_share = getattr(settings, share_key, False)
is_shared = overrides.get(field, global_share)
if not is_shared:
continue
# Resolve the actual value
if field == "first_name":
result[field] = user.first_name
elif field == "last_name":
result[field] = user.last_name
elif field == "preferred_name":
result[field] = settings.preferred_name
elif field == "email":
result[field] = user.email
elif field == "birthday":
result[field] = str(user.date_of_birth) if user.date_of_birth else None
elif field in _SETTINGS_FIELD_MAP and _SETTINGS_FIELD_MAP[field]:
result[field] = getattr(settings, _SETTINGS_FIELD_MAP[field], None)
return filter_to_shareable(result)
def filter_to_shareable(profile: dict) -> dict:
"""Strip any keys not in SHAREABLE_FIELDS. Defence-in-depth gate."""
return {k: v for k, v in profile.items() if k in SHAREABLE_FIELDS}
def create_person_from_connection(
owner_user_id: int,
connected_user: User,
connected_settings: Settings,
shared_profile: dict,
) -> Person:
"""Create a Person record for a new connection. Does NOT add to session — caller does."""
# Use shared first_name, fall back to preferred_name, then umbral_name
first_name = shared_profile.get("first_name") or shared_profile.get("preferred_name") or connected_user.umbral_name
last_name = shared_profile.get("last_name")
email = shared_profile.get("email")
phone = shared_profile.get("phone")
mobile = shared_profile.get("mobile")
address = shared_profile.get("address")
company = shared_profile.get("company")
job_title = shared_profile.get("job_title")
birthday_str = shared_profile.get("birthday")
birthday = None
if birthday_str:
try:
birthday = date_type.fromisoformat(birthday_str)
except (ValueError, TypeError):
pass
# Compute display name
full = ((first_name or '') + ' ' + (last_name or '')).strip()
display_name = full or connected_user.umbral_name
return Person(
user_id=owner_user_id,
name=display_name,
first_name=first_name,
last_name=last_name,
email=email,
phone=phone,
mobile=mobile,
address=address,
company=company,
job_title=job_title,
birthday=birthday,
category="Umbral",
linked_user_id=connected_user.id,
is_umbral_contact=True,
)
async def sync_birthday_to_contacts(db: AsyncSession, user_id: int) -> None:
"""Sync user's DOB to all Person records where linked_user_id == user_id.
Respects share_birthday setting — if disabled, clears birthday on linked records."""
user = await db.execute(select(User).where(User.id == user_id))
user_obj = user.scalar_one()
settings_result = await db.execute(select(Settings).where(Settings.user_id == user_id))
settings_obj = settings_result.scalar_one_or_none()
share = settings_obj.share_birthday if settings_obj else False
new_birthday = user_obj.date_of_birth if share else None
await db.execute(
update(Person)
.where(Person.linked_user_id == user_id)
.values(birthday=new_birthday)
)
async def detach_umbral_contact(person: Person) -> None:
"""Convert an umbral contact back to a standard contact. Does NOT commit.
Preserves all person data (name, email, phone, etc.) so the user does not
lose contact information when a connection is severed. Only unlinks the
umbral association — the person becomes a standard contact.
"""
person.linked_user_id = None
person.is_umbral_contact = False
person.category = None
def extract_ntfy_config(settings: Settings) -> dict | None:
"""Extract ntfy config values into a plain dict safe for use after session close."""
if not settings.ntfy_enabled or not settings.ntfy_connections_enabled:
return None
return {
"ntfy_enabled": True,
"ntfy_server_url": settings.ntfy_server_url,
"ntfy_topic": settings.ntfy_topic,
"ntfy_auth_token": settings.ntfy_auth_token,
"user_id": settings.user_id,
}
async def send_connection_ntfy(
ntfy_config: dict | None,
sender_name: str,
event_type: str,
) -> None:
"""Send ntfy push for connection events. Non-blocking with 3s timeout.
Accepts a plain dict (from extract_ntfy_config) to avoid accessing
detached SQLAlchemy objects after session close.
"""
if not ntfy_config:
return
title_map = {
"request_received": "New Connection Request",
"request_accepted": "Connection Accepted",
}
message_map = {
"request_received": f"{sender_name} wants to connect with you on Umbra",
"request_accepted": f"{sender_name} accepted your connection request",
}
tag_map = {
"request_received": ["handshake"],
"request_accepted": ["white_check_mark"],
}
title = title_map.get(event_type, "Connection Update")
message = message_map.get(event_type, f"Connection update from {sender_name}")
tags = tag_map.get(event_type, ["bell"])
# Build a settings-like object for send_ntfy_notification (avoids detached SA objects)
settings_proxy = SimpleNamespace(**ntfy_config)
try:
await asyncio.wait_for(
send_ntfy_notification(
settings=settings_proxy,
title=title,
message=message,
tags=tags,
priority=3,
),
timeout=3.0,
)
except asyncio.TimeoutError:
logger.warning("ntfy connection push timed out for user_id=%s", ntfy_config["user_id"])
except Exception:
logger.warning("ntfy connection push failed for user_id=%s", ntfy_config["user_id"])