UMBRA/backend/app/jobs/notifications.py
Kyle Pope 337b50c7ce Fix QA review findings: source_id, N+1 queries, event bubbling, type mismatches
Critical fixes:
- C-01: Add receiver_umbral_name/receiver_preferred_name to frontend ConnectionRequest type
- C-02: Flush connection request before notification to populate source_id
- C-03: Add umbral_name to ProfileResponse/UserProfile, use in Settings Social card
- C-04: Remove dead code in sharing-overrides endpoint, merge instead of replace

Warning fixes:
- W-01/W-02: Batch-fetch settings in incoming/outgoing/list connection endpoints (N+1 fix)
- W-04: Add _purge_resolved_requests job for rejected/cancelled requests (30-day retention)
- W-10: Add e.stopPropagation() to notification mark-read and delete buttons

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 02:29:04 +08:00

338 lines
13 KiB
Python

"""
Background notification dispatch job.
Runs every 60 seconds via APScheduler (registered in main.py lifespan).
DATETIME NOTE: All comparisons use datetime.now() without timezone info.
The DB uses TIMESTAMP WITHOUT TIME ZONE (naive datetimes). The Docker container
runs UTC. datetime.now() inside the container returns UTC, which matches the
naive datetimes stored in the DB. Do NOT use datetime.now(timezone.utc) here —
that would produce a timezone-aware object that cannot be compared with naive DB values.
"""
import logging
from datetime import datetime, timedelta
from sqlalchemy import select, delete, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import AsyncSessionLocal
from app.models.settings import Settings
from app.models.notification import Notification as AppNotification
from app.models.reminder import Reminder
from app.models.calendar_event import CalendarEvent
from app.models.calendar import Calendar
from app.models.todo import Todo
from app.models.project import Project
from app.models.ntfy_sent import NtfySent
from app.models.totp_usage import TOTPUsage
from app.models.session import UserSession
from app.models.connection_request import ConnectionRequest
from app.services.ntfy import send_ntfy_notification
from app.services.ntfy_templates import (
build_event_notification,
build_reminder_notification,
build_todo_notification,
build_project_notification,
)
logger = logging.getLogger(__name__)
from app.config import settings as app_settings
# ── Dedup helpers ─────────────────────────────────────────────────────────────
async def _get_sent_keys(db: AsyncSession, user_id: int) -> set[str]:
    """Load the dedup keys recorded for ``user_id`` over the last 7 days.

    A single query returns every ``NtfySent.notification_key`` whose
    ``sent_at`` is at or after the cutoff, so callers can test membership
    in memory instead of querying once per entity.
    """
    # Naive datetime.now(), matching how the rest of this module compares times.
    oldest_relevant = datetime.now() - timedelta(days=7)
    stmt = select(NtfySent.notification_key).where(
        NtfySent.user_id == user_id,
        NtfySent.sent_at >= oldest_relevant,
    )
    rows = await db.execute(stmt)
    recent_keys = rows.scalars().all()
    return set(recent_keys)
async def _mark_sent(db: AsyncSession, key: str, user_id: int) -> None:
    """Persist a dedup record for ``key`` / ``user_id`` and commit right away."""
    record = NtfySent(notification_key=key, user_id=user_id)
    db.add(record)
    # Committed per notification rather than batched by the caller.
    await db.commit()
# ── Dispatch functions ────────────────────────────────────────────────────────
async def _dispatch_reminders(db: AsyncSession, settings: Settings, now: datetime, sent_keys: set[str]) -> None:
    """Push an ntfy notification for each of the user's due reminders.

    A reminder qualifies when it belongs to ``settings.user_id``, is active,
    is not dismissed, has ``remind_at`` at or before ``now`` and is not
    snoozed past ``now``. Every successful send is stored under a dedup key,
    which is also added to ``sent_keys`` in place.
    """
    owner_id = settings.user_id

    # Mirrors the filter from /api/reminders/due, restricted to this user.
    due_stmt = select(Reminder).where(
        Reminder.user_id == owner_id,
        Reminder.remind_at <= now,
        Reminder.is_dismissed == False,  # noqa: E712
        Reminder.is_active == True,  # noqa: E712
    )
    due_reminders = (await db.execute(due_stmt)).scalars().all()
    current_date = now.date()

    for item in due_reminders:
        snooze_end = item.snoozed_until
        if snooze_end and snooze_end > now:
            # Still snoozed: leave it for a later cycle.
            continue

        # user_id is part of the key so two users can never collide on dedup.
        dedup_key = f"reminder:{owner_id}:{item.id}:{item.remind_at.date()}"
        if dedup_key in sent_keys:
            continue

        message = build_reminder_notification(
            title=item.title,
            remind_at=item.remind_at,
            today=current_date,
            description=item.description,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=app_settings.UMBRA_URL,
            **message,
        )
        if not delivered:
            # Not recorded, so the next cycle will try this reminder again.
            continue

        await _mark_sent(db, dedup_key, owner_id)
        sent_keys.add(dedup_key)
async def _dispatch_events(db: AsyncSession, settings: Settings, now: datetime, sent_keys: set[str]) -> None:
    """Push an ntfy notification for calendar events that start soon.

    "Soon" means ``start_datetime`` lies between ``now`` and
    ``now + settings.ntfy_event_lead_minutes`` (both ends inclusive). Only
    events on calendars owned by ``settings.user_id`` are considered.
    Successful sends are recorded under a dedup key, which is also added to
    ``sent_keys`` in place.
    """
    owner_id = settings.user_id
    horizon = now + timedelta(minutes=settings.ntfy_event_lead_minutes)

    # Events are scoped to the user through calendar ownership.
    owned_calendar_ids = select(Calendar.id).where(Calendar.user_id == owner_id)

    # Recurring parent templates have recurrence_rule set but no
    # parent_event_id; they duplicate the child instance rows and are skipped.
    is_recurring_template = and_(
        CalendarEvent.recurrence_rule != None,  # noqa: E711
        CalendarEvent.parent_event_id == None,  # noqa: E711
    )

    upcoming_stmt = (
        select(CalendarEvent)
        .where(
            CalendarEvent.calendar_id.in_(owned_calendar_ids),
            CalendarEvent.start_datetime >= now,
            CalendarEvent.start_datetime <= horizon,
            ~is_recurring_template,
        )
        .options(selectinload(CalendarEvent.location))
    )
    upcoming = (await db.execute(upcoming_stmt)).scalars().all()
    current_date = now.date()

    for entry in upcoming:
        # user_id is part of the key so two users can never collide on dedup.
        start_stamp = entry.start_datetime.strftime('%Y-%m-%dT%H:%M')
        dedup_key = f"event:{owner_id}:{entry.id}:{start_stamp}"
        if dedup_key in sent_keys:
            continue

        place = entry.location
        message = build_event_notification(
            title=entry.title,
            start_datetime=entry.start_datetime,
            all_day=entry.all_day,
            today=current_date,
            location_name=place.name if place else None,
            description=entry.description,
            is_starred=entry.is_starred,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=app_settings.UMBRA_URL,
            **message,
        )
        if not delivered:
            continue

        await _mark_sent(db, dedup_key, owner_id)
        sent_keys.add(dedup_key)
async def _dispatch_todos(db: AsyncSession, settings: Settings, today, sent_keys: set[str]) -> None:
    """Push an ntfy notification for the user's incomplete todos that are due.

    A todo qualifies when it has a due date on or before
    ``today + settings.ntfy_todo_lead_days``; there is no lower bound on the
    due date. The dedup key embeds ``today``, so dedup is per todo per date.
    Successful sends are recorded and added to ``sent_keys`` in place.
    """
    owner_id = settings.user_id
    latest_due = today + timedelta(days=settings.ntfy_todo_lead_days)

    due_stmt = select(Todo).where(
        Todo.user_id == owner_id,
        Todo.completed == False,  # noqa: E712
        Todo.due_date != None,  # noqa: E711
        Todo.due_date <= latest_due,
    )
    pending = (await db.execute(due_stmt)).scalars().all()

    for task in pending:
        # user_id is part of the key so two users can never collide on dedup.
        dedup_key = f"todo:{owner_id}:{task.id}:{today}"
        if dedup_key in sent_keys:
            continue

        message = build_todo_notification(
            title=task.title,
            due_date=task.due_date,
            today=today,
            priority=task.priority,
            category=task.category,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=app_settings.UMBRA_URL,
            **message,
        )
        if not delivered:
            continue

        await _mark_sent(db, dedup_key, owner_id)
        sent_keys.add(dedup_key)
async def _dispatch_projects(db: AsyncSession, settings: Settings, today, sent_keys: set[str]) -> None:
    """Push an ntfy notification for the user's projects nearing their deadline.

    A project qualifies when its status is not ``"completed"`` and it has a
    due date on or before ``today + settings.ntfy_project_lead_days``; there
    is no lower bound on the due date. The dedup key embeds ``today``, so
    dedup is per project per date. Successful sends are recorded and added
    to ``sent_keys`` in place.
    """
    owner_id = settings.user_id
    latest_due = today + timedelta(days=settings.ntfy_project_lead_days)

    due_stmt = select(Project).where(
        Project.user_id == owner_id,
        Project.due_date != None,  # noqa: E711
        Project.due_date <= latest_due,
        Project.status != "completed",
    )
    open_projects = (await db.execute(due_stmt)).scalars().all()

    for proj in open_projects:
        # user_id is part of the key so two users can never collide on dedup.
        dedup_key = f"project:{owner_id}:{proj.id}:{today}"
        if dedup_key in sent_keys:
            continue

        message = build_project_notification(
            name=proj.name,
            due_date=proj.due_date,
            today=today,
            status=proj.status,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=app_settings.UMBRA_URL,
            **message,
        )
        if not delivered:
            continue

        await _mark_sent(db, dedup_key, owner_id)
        sent_keys.add(dedup_key)
async def _dispatch_for_user(db: AsyncSession, settings: Settings, now: datetime) -> None:
    """Run every enabled notification category for one user's settings row.

    The dedup keys are loaded once up front and the same set is shared by all
    categories. Reminders and events receive ``now``; todos and projects
    receive ``now.date()``.
    """
    # One query per user instead of one dedup lookup per entity.
    sent_keys = await _get_sent_keys(db, settings.user_id)
    today = now.date()

    # (settings flag, dispatcher, time argument), run in this order.
    steps = (
        ("ntfy_reminders_enabled", _dispatch_reminders, now),
        ("ntfy_events_enabled", _dispatch_events, now),
        ("ntfy_todos_enabled", _dispatch_todos, today),
        ("ntfy_projects_enabled", _dispatch_projects, today),
    )
    for flag_name, dispatch, moment in steps:
        # Each flag is read only when its step is reached.
        if getattr(settings, flag_name):
            await dispatch(db, settings, moment, sent_keys)
async def _purge_old_sent_records(db: AsyncSession) -> None:
    """Delete ntfy_sent rows whose ``sent_at`` is more than 7 days old, then commit."""
    # Naive datetime.now() on purpose; see the DATETIME NOTE in the module docstring.
    expiry = datetime.now() - timedelta(days=7)
    stale_rows = delete(NtfySent).where(NtfySent.sent_at < expiry)
    await db.execute(stale_rows)
    await db.commit()
async def _purge_totp_usage(db: AsyncSession) -> None:
    """Delete TOTP usage rows older than 5 minutes, then commit.

    The records exist only for replay prevention and serve no purpose
    beyond that window.
    """
    expiry = datetime.now() - timedelta(minutes=5)
    stale_rows = delete(TOTPUsage).where(TOTPUsage.used_at < expiry)
    await db.execute(stale_rows)
    await db.commit()
async def _purge_expired_sessions(db: AsyncSession) -> None:
    """Delete UserSession rows whose ``expires_at`` is already in the past, then commit."""
    right_now = datetime.now()
    expired_rows = delete(UserSession).where(UserSession.expires_at < right_now)
    await db.execute(expired_rows)
    await db.commit()
async def _purge_old_notifications(db: AsyncSession) -> None:
    """Delete in-app notifications created more than 90 days ago, then commit."""
    expiry = datetime.now() - timedelta(days=90)
    stale_rows = delete(AppNotification).where(AppNotification.created_at < expiry)
    await db.execute(stale_rows)
    await db.commit()
async def _purge_resolved_requests(db: AsyncSession) -> None:
    """Delete rejected/cancelled connection requests resolved over 30 days ago, then commit."""
    expiry = datetime.now() - timedelta(days=30)
    stale_rows = delete(ConnectionRequest).where(
        ConnectionRequest.status.in_(["rejected", "cancelled"]),
        ConnectionRequest.resolved_at < expiry,
    )
    await db.execute(stale_rows)
    await db.commit()
# ── Entry point ───────────────────────────────────────────────────────────────
async def run_notification_dispatch() -> None:
    """
    Main dispatch function called by APScheduler every 60 seconds.

    Uses AsyncSessionLocal directly — not the get_db() request-scoped dependency.

    Phase 1 iterates over ALL users with ntfy enabled. Per-user errors are
    caught and logged individually so one user's failure does not prevent
    others from receiving notifications. The ntfy dedup table is purged
    afterwards.

    Phase 2 purges TOTP usage records, expired sessions, old in-app
    notifications and resolved connection requests. It runs on every cycle
    that reaches it, including cycles where no user has ntfy enabled.

    Never raises: any unhandled error is logged so a failing cycle cannot
    crash the scheduler or the app.
    """
    try:
        async with AsyncSessionLocal() as db:
            # Fetch all Settings rows that have ntfy enabled
            result = await db.execute(
                select(Settings).where(Settings.ntfy_enabled == True)  # noqa: E712
            )
            all_settings = result.scalars().all()

            # Do NOT return early when the list is empty: a `return` here would
            # also skip the security housekeeping below, leaving TOTP replay
            # records and expired sessions unpurged on instances where nobody
            # uses ntfy.
            if all_settings:
                # See DATETIME NOTE at top of file re: naive datetime usage
                now = datetime.now()

                for user_settings in all_settings:
                    try:
                        await _dispatch_for_user(db, user_settings, now)
                    except Exception:
                        # Isolate per-user failures — log and continue to next user
                        # NOTE(review): the shared session is not rolled back here;
                        # confirm that a DB-level failure for one user leaves it
                        # usable for the users that follow.
                        logger.exception(
                            "ntfy dispatch failed for user_id=%s", user_settings.user_id
                        )

                # Dedup housekeeping (shared across all users). Runs on every
                # cycle that dispatched; it removes records older than 7 days.
                await _purge_old_sent_records(db)

        # Security housekeeping runs every cycle regardless of ntfy_enabled
        async with AsyncSessionLocal() as db:
            await _purge_totp_usage(db)
            await _purge_expired_sessions(db)
            await _purge_old_notifications(db)
            await _purge_resolved_requests(db)

    except Exception:
        # Broad catch: job failure must never crash the scheduler or the app
        logger.exception("ntfy dispatch job encountered an unhandled error")