"""
Background notification dispatch job.

Runs every 60 seconds via APScheduler (registered in main.py lifespan).

DATETIME NOTE: All comparisons use datetime.now() without timezone info.
The DB uses TIMESTAMP WITHOUT TIME ZONE (naive datetimes). The Docker container
runs UTC. datetime.now() inside the container returns UTC, which matches the
naive datetimes stored in the DB. Do NOT use datetime.now(timezone.utc) here —
that would produce a timezone-aware object that cannot be compared with naive
DB values.
"""
import logging
from datetime import date, datetime, timedelta

from sqlalchemy import select, delete, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.database import AsyncSessionLocal
from app.models.settings import Settings
from app.models.notification import Notification as AppNotification
from app.models.reminder import Reminder
from app.models.calendar_event import CalendarEvent
from app.models.calendar import Calendar
from app.models.todo import Todo
from app.models.project import Project
from app.models.ntfy_sent import NtfySent
from app.models.totp_usage import TOTPUsage
from app.models.session import UserSession
from app.models.connection_request import ConnectionRequest
from app.services.ntfy import send_ntfy_notification
from app.services.ntfy_templates import (
    build_event_notification,
    build_reminder_notification,
    build_todo_notification,
    build_project_notification,
)
from app.config import settings as app_settings

logger = logging.getLogger(__name__)

# Retention window for ntfy_sent dedup rows. The dedup lookup and the purge
# must agree on this value, so it is defined once here.
_SENT_RETENTION = timedelta(days=7)


# ── Dedup helpers ─────────────────────────────────────────────────────────────

async def _get_sent_keys(db: AsyncSession, user_id: int) -> set[str]:
    """Batch-fetch recent notification keys for a user (within the 7-day purge window).

    Args:
        db: Open async session used for the query.
        user_id: Owner of the dedup rows to fetch.

    Returns:
        The set of notification_key values recorded for this user whose
        sent_at is within the retention window.
    """
    # See DATETIME NOTE at top of file re: naive datetime usage
    cutoff = datetime.now() - _SENT_RETENTION
    result = await db.execute(
        select(NtfySent.notification_key).where(
            NtfySent.user_id == user_id,
            NtfySent.sent_at >= cutoff,
        )
    )
    return set(result.scalars().all())


async def _mark_sent(db: AsyncSession, key: str, user_id: int) -> None:
    """Persist a dedup row for `key` and commit immediately.

    Committing per notification means a later failure in the same cycle does
    not discard the record of notifications that were already delivered.
    """
    db.add(NtfySent(notification_key=key, user_id=user_id))
    await db.commit()


async def _send_and_mark(
    db: AsyncSession,
    settings: Settings,
    key: str,
    payload,
    sent_keys: set[str],
) -> None:
    """Send one ntfy notification and, if it was sent, record its dedup key.

    Args:
        db: Open async session used to persist the dedup row.
        settings: The recipient user's Settings row.
        key: Dedup key identifying this notification.
        payload: Mapping of keyword arguments for send_ntfy_notification, as
            produced by one of the build_*_notification helpers.
        sent_keys: In-memory cache of already-sent keys; updated in place.
    """
    sent = await send_ntfy_notification(
        settings=settings,
        click_url=app_settings.UMBRA_URL,
        **payload,
    )
    # A falsy result is treated as "not delivered": the key stays unrecorded
    # so the next cycle tries again.
    if sent:
        await _mark_sent(db, key, settings.user_id)
        sent_keys.add(key)


# ── Dispatch functions ────────────────────────────────────────────────────────

async def _dispatch_reminders(
    db: AsyncSession, settings: Settings, now: datetime, sent_keys: set[str]
) -> None:
    """Send notifications for reminders that are currently due and not dismissed/snoozed.

    Args:
        db: Open async session.
        settings: The user's Settings row (provides user_id and ntfy config).
        now: Naive "current time" shared by the whole dispatch cycle.
        sent_keys: Dedup keys already sent for this user; updated in place.
    """
    # Mirror the filter from /api/reminders/due, scoped to this user
    result = await db.execute(
        select(Reminder).where(
            and_(
                Reminder.user_id == settings.user_id,
                Reminder.remind_at <= now,
                Reminder.is_dismissed == False,  # noqa: E712
                Reminder.is_active == True,  # noqa: E712
            )
        )
    )
    reminders = result.scalars().all()
    today = now.date()

    for reminder in reminders:
        if reminder.snoozed_until and reminder.snoozed_until > now:
            continue  # respect snooze

        # Key includes user_id to prevent cross-user dedup collisions
        key = f"reminder:{settings.user_id}:{reminder.id}:{reminder.remind_at.date()}"
        if key in sent_keys:
            continue

        payload = build_reminder_notification(
            title=reminder.title,
            remind_at=reminder.remind_at,
            today=today,
            description=reminder.description,
        )
        await _send_and_mark(db, settings, key, payload, sent_keys)


async def _dispatch_events(
    db: AsyncSession, settings: Settings, now: datetime, sent_keys: set[str]
) -> None:
    """Send notifications for calendar events within the configured lead time window.

    Args:
        db: Open async session.
        settings: The user's Settings row; ntfy_event_lead_minutes sets the window.
        now: Naive "current time" shared by the whole dispatch cycle.
        sent_keys: Dedup keys already sent for this user; updated in place.
    """
    lead_minutes = settings.ntfy_event_lead_minutes
    # Window: events starting between now and (now + lead_minutes)
    window_end = now + timedelta(minutes=lead_minutes)

    # Scope events through calendar ownership
    user_calendar_ids = select(Calendar.id).where(Calendar.user_id == settings.user_id)

    result = await db.execute(
        select(CalendarEvent).where(
            and_(
                CalendarEvent.calendar_id.in_(user_calendar_ids),
                CalendarEvent.start_datetime >= now,
                CalendarEvent.start_datetime <= window_end,
                # Exclude recurring parent templates — they duplicate the child instance rows.
                # Parent templates have recurrence_rule set but no parent_event_id.
                ~and_(
                    CalendarEvent.recurrence_rule != None,  # noqa: E711
                    CalendarEvent.parent_event_id == None,  # noqa: E711
                ),
            )
        ).options(selectinload(CalendarEvent.location))
    )
    events = result.scalars().all()
    today = now.date()

    for event in events:
        # Key includes user_id to prevent cross-user dedup collisions.
        # The start time (to the minute) is part of the key, so a changed
        # start time produces a new key.
        key = f"event:{settings.user_id}:{event.id}:{event.start_datetime.strftime('%Y-%m-%dT%H:%M')}"
        if key in sent_keys:
            continue

        payload = build_event_notification(
            title=event.title,
            start_datetime=event.start_datetime,
            all_day=event.all_day,
            today=today,
            location_name=event.location.name if event.location else None,
            description=event.description,
            is_starred=event.is_starred,
        )
        await _send_and_mark(db, settings, key, payload, sent_keys)


async def _dispatch_todos(
    db: AsyncSession, settings: Settings, today: date, sent_keys: set[str]
) -> None:
    """Send notifications for incomplete todos due within the configured lead days.

    Args:
        db: Open async session.
        settings: The user's Settings row; ntfy_todo_lead_days sets the window.
        today: Current date for this dispatch cycle.
        sent_keys: Dedup keys already sent for this user; updated in place.
    """
    lead_days = settings.ntfy_todo_lead_days
    cutoff = today + timedelta(days=lead_days)

    # No lower bound on due_date: todos whose due date has already passed
    # also match as long as they are incomplete.
    result = await db.execute(
        select(Todo).where(
            and_(
                Todo.user_id == settings.user_id,
                Todo.completed == False,  # noqa: E712
                Todo.due_date != None,  # noqa: E711
                Todo.due_date <= cutoff,
            )
        )
    )
    todos = result.scalars().all()

    for todo in todos:
        # Key includes user_id to prevent cross-user dedup collisions.
        # It also includes today's date, so each todo is deduplicated per day.
        key = f"todo:{settings.user_id}:{todo.id}:{today}"
        if key in sent_keys:
            continue

        payload = build_todo_notification(
            title=todo.title,
            due_date=todo.due_date,
            today=today,
            priority=todo.priority,
            category=todo.category,
        )
        await _send_and_mark(db, settings, key, payload, sent_keys)


async def _dispatch_projects(
    db: AsyncSession, settings: Settings, today: date, sent_keys: set[str]
) -> None:
    """Send notifications for projects with deadlines within the configured lead days.

    Args:
        db: Open async session.
        settings: The user's Settings row; ntfy_project_lead_days sets the window.
        today: Current date for this dispatch cycle.
        sent_keys: Dedup keys already sent for this user; updated in place.
    """
    lead_days = settings.ntfy_project_lead_days
    cutoff = today + timedelta(days=lead_days)

    result = await db.execute(
        select(Project).where(
            and_(
                Project.user_id == settings.user_id,
                Project.due_date != None,  # noqa: E711
                Project.due_date <= cutoff,
                Project.status != "completed",
            )
        )
    )
    projects = result.scalars().all()

    for project in projects:
        # Key includes user_id to prevent cross-user dedup collisions.
        # It also includes today's date, so each project is deduplicated per day.
        key = f"project:{settings.user_id}:{project.id}:{today}"
        if key in sent_keys:
            continue

        payload = build_project_notification(
            name=project.name,
            due_date=project.due_date,
            today=today,
            status=project.status,
        )
        await _send_and_mark(db, settings, key, payload, sent_keys)


async def _dispatch_for_user(db: AsyncSession, settings: Settings, now: datetime) -> None:
    """Run all notification dispatches for a single user's settings.

    Each category is dispatched only when its ntfy_*_enabled flag is set on
    the user's Settings row.
    """
    # Batch-fetch all sent keys once per user instead of one query per entity
    sent_keys = await _get_sent_keys(db, settings.user_id)
    today = now.date()

    if settings.ntfy_reminders_enabled:
        await _dispatch_reminders(db, settings, now, sent_keys)
    if settings.ntfy_events_enabled:
        await _dispatch_events(db, settings, now, sent_keys)
    if settings.ntfy_todos_enabled:
        await _dispatch_todos(db, settings, today, sent_keys)
    if settings.ntfy_projects_enabled:
        await _dispatch_projects(db, settings, today, sent_keys)


# ── Housekeeping ──────────────────────────────────────────────────────────────

async def _purge_old_sent_records(db: AsyncSession) -> None:
    """Remove ntfy_sent entries older than 7 days to keep the table lean."""
    # See DATETIME NOTE at top of file re: naive datetime usage
    cutoff = datetime.now() - _SENT_RETENTION
    await db.execute(delete(NtfySent).where(NtfySent.sent_at < cutoff))
    await db.commit()


async def _purge_totp_usage(db: AsyncSession) -> None:
    """Remove TOTP usage records older than 5 minutes — they serve no purpose beyond replay prevention."""
    cutoff = datetime.now() - timedelta(minutes=5)
    await db.execute(delete(TOTPUsage).where(TOTPUsage.used_at < cutoff))
    await db.commit()


async def _purge_expired_sessions(db: AsyncSession) -> None:
    """Remove expired UserSession rows to keep the sessions table lean."""
    await db.execute(delete(UserSession).where(UserSession.expires_at < datetime.now()))
    await db.commit()


async def _purge_old_notifications(db: AsyncSession) -> None:
    """Remove in-app notifications older than 90 days."""
    cutoff = datetime.now() - timedelta(days=90)
    await db.execute(delete(AppNotification).where(AppNotification.created_at < cutoff))
    await db.commit()


async def _purge_resolved_requests(db: AsyncSession) -> None:
    """Remove resolved connection requests after retention period.

    Rejected/cancelled: 30 days. Accepted: 90 days (longer for audit trail).
    resolved_at must be set when changing status. NULL resolved_at rows are
    preserved (comparison with NULL yields NULL).
    """
    # Single timestamp so both cutoffs are measured from the same instant.
    now = datetime.now()
    reject_cutoff = now - timedelta(days=30)
    accept_cutoff = now - timedelta(days=90)

    await db.execute(
        delete(ConnectionRequest).where(
            ConnectionRequest.status.in_(["rejected", "cancelled"]),
            ConnectionRequest.resolved_at < reject_cutoff,
        )
    )
    await db.execute(
        delete(ConnectionRequest).where(
            ConnectionRequest.status == "accepted",
            ConnectionRequest.resolved_at < accept_cutoff,
        )
    )
    await db.commit()


# ── Entry point ───────────────────────────────────────────────────────────────

async def run_notification_dispatch() -> None:
    """
    Main dispatch function called by APScheduler every 60 seconds.
    Uses AsyncSessionLocal directly — not the get_db() request-scoped dependency.

    Iterates over ALL users with ntfy enabled. Per-user errors are caught and
    logged individually so one user's failure does not prevent others from
    receiving notifications.

    Housekeeping (dedup-record purge plus the security/retention purges) runs
    on every cycle, including cycles where no user has ntfy enabled.
    """
    try:
        async with AsyncSessionLocal() as db:
            # Fetch all Settings rows that have ntfy enabled
            result = await db.execute(
                select(Settings).where(Settings.ntfy_enabled == True)  # noqa: E712
            )
            all_settings = result.scalars().all()

            # Do NOT return early when the list is empty: the housekeeping
            # below must still run even if nobody has ntfy enabled.
            if all_settings:
                # See DATETIME NOTE at top of file re: naive datetime usage
                now = datetime.now()

                for user_settings in all_settings:
                    try:
                        await _dispatch_for_user(db, user_settings, now)
                    except Exception:
                        # Isolate per-user failures — log and continue to next user.
                        # NOTE(review): if the failure left the shared session in a
                        # failed-transaction state, later users in this cycle may
                        # fail too — confirm whether a rollback is needed here.
                        logger.exception(
                            "ntfy dispatch failed for user_id=%s", user_settings.user_id
                        )

            # Daily housekeeping: purge stale dedup records (shared across all users)
            await _purge_old_sent_records(db)

        # Security housekeeping runs every cycle regardless of ntfy_enabled
        async with AsyncSessionLocal() as db:
            await _purge_totp_usage(db)
            await _purge_expired_sessions(db)
            await _purge_old_notifications(db)
            await _purge_resolved_requests(db)

    except Exception:
        # Broad catch: job failure must never crash the scheduler or the app
        logger.exception("ntfy dispatch job encountered an unhandled error")