UMBRA/backend/app/jobs/notifications.py
Kyle Pope 67456c78dd Implement Track C: NTFY push notification integration
- Add ntfy columns to Settings model (server_url, topic, auth_token, enabled, per-type toggles, lead times)
- Create NtfySent dedup model to prevent duplicate notifications
- Create ntfy service with SSRF validation and async httpx send
- Create ntfy_templates service with per-type payload builders
- Create APScheduler background dispatch job (60s interval, events/reminders/todos/projects)
- Register scheduler in main.py lifespan with max_instances=1
- Update SettingsUpdate with ntfy validators (URL scheme, topic regex, lead time ranges)
- Update SettingsResponse with ntfy fields; ntfy_has_token computed, token never exposed
- Add POST /api/settings/ntfy/test endpoint
- Update GET/PUT settings to use explicit _to_settings_response() helper
- Add Alembic migration 022 for ntfy settings columns + ntfy_sent table
- Add httpx==0.27.2 and apscheduler==3.10.4 to requirements.txt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 04:04:23 +08:00

248 lines
8.6 KiB
Python

"""
Background notification dispatch job.
Runs every 60 seconds via APScheduler (registered in main.py lifespan).
DATETIME NOTE: All comparisons use datetime.now() without timezone info.
The DB uses TIMESTAMP WITHOUT TIME ZONE (naive datetimes). The Docker container
runs UTC. datetime.now() inside the container returns UTC, which matches the
naive datetimes stored in the DB. Do NOT use datetime.now(timezone.utc) here —
that would produce a timezone-aware object that cannot be compared with naive DB values.
"""
import logging
from datetime import datetime, timedelta
from sqlalchemy import select, delete, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import AsyncSessionLocal
from app.models.settings import Settings
from app.models.reminder import Reminder
from app.models.calendar_event import CalendarEvent
from app.models.todo import Todo
from app.models.project import Project
from app.models.ntfy_sent import NtfySent
from app.services.ntfy import send_ntfy_notification
from app.services.ntfy_templates import (
build_event_notification,
build_reminder_notification,
build_todo_notification,
build_project_notification,
)
# Module-level logger; run_notification_dispatch uses it to record unhandled job errors.
logger = logging.getLogger(__name__)
# URL handed to send_ntfy_notification as click_url for every notification in this module.
# NOTE(review): hard-coded private address — presumably the deployed UMBRA host;
# confirm, and consider sourcing it from configuration instead.
UMBRA_URL = "http://10.0.69.35"
# ── Dedup helpers ─────────────────────────────────────────────────────────────
async def _already_sent(db: AsyncSession, key: str) -> bool:
    """Return True when a dedup record with this notification key already exists.

    Args:
        db: Open async session used for the lookup.
        key: Value matched against ``NtfySent.notification_key``.
    """
    lookup = select(NtfySent).where(NtfySent.notification_key == key)
    existing = (await db.execute(lookup)).scalar_one_or_none()
    return existing is not None
async def _mark_sent(db: AsyncSession, key: str) -> None:
    """Store a dedup record for ``key`` and commit it straight away.

    Args:
        db: Open async session the record is added to and committed on.
        key: Notification key written to ``NtfySent.notification_key``.
    """
    sent_record = NtfySent(notification_key=key)
    db.add(sent_record)
    await db.commit()
# ── Dispatch functions ────────────────────────────────────────────────────────
async def _dispatch_reminders(db: AsyncSession, settings: Settings, now: datetime) -> None:
    """Notify about due reminders that are active, not dismissed and not snoozed.

    Args:
        db: Open async session used for the query and the dedup bookkeeping.
        settings: Settings row forwarded to ``send_ntfy_notification``.
        now: Current naive datetime; reminders with ``remind_at <= now`` are due.
    """
    today = now.date()

    # Same filter as /api/reminders/due
    due_query = select(Reminder).where(
        Reminder.remind_at <= now,
        Reminder.is_dismissed == False,  # noqa: E712
        Reminder.is_active == True,  # noqa: E712
    )
    due_reminders = (await db.execute(due_query)).scalars().all()

    for item in due_reminders:
        # A snooze that has not yet expired suppresses the notification.
        if item.snoozed_until and item.snoozed_until > now:
            continue

        # The key embeds the remind_at date, so a reminder is sent at most once
        # per remind_at day; a reminder moved to another day gets a fresh key.
        dedup_key = f"reminder:{item.id}:{item.remind_at.date()}"
        if await _already_sent(db, dedup_key):
            continue

        payload = build_reminder_notification(
            title=item.title,
            remind_at=item.remind_at,
            today=today,
            description=item.description,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=UMBRA_URL,
            **payload,
        )
        # Only record the key after a successful send so failures are retried
        # on the next run.
        if not delivered:
            continue
        await _mark_sent(db, dedup_key)
async def _dispatch_events(db: AsyncSession, settings: Settings, now: datetime) -> None:
    """Notify about calendar events that start within the configured lead time.

    Args:
        db: Open async session used for the query and the dedup bookkeeping.
        settings: Settings row; supplies ``ntfy_event_lead_minutes`` and is
            forwarded to ``send_ntfy_notification``.
        now: Current naive datetime; lower bound of the notification window.
    """
    today = now.date()
    # Window covers events starting in [now, now + lead minutes].
    horizon = now + timedelta(minutes=settings.ntfy_event_lead_minutes)

    upcoming_query = select(CalendarEvent).where(
        CalendarEvent.start_datetime >= now,
        CalendarEvent.start_datetime <= horizon,
    )
    upcoming = (await db.execute(upcoming_query)).scalars().all()

    for entry in upcoming:
        # Minute-precision start time in the key stops the event from firing
        # again on later runs while it is still inside the window.
        start_stamp = entry.start_datetime.strftime('%Y-%m-%dT%H:%M')
        dedup_key = f"event:{entry.id}:{start_stamp}"
        if await _already_sent(db, dedup_key):
            continue

        payload = build_event_notification(
            title=entry.title,
            start_datetime=entry.start_datetime,
            all_day=entry.all_day,
            today=today,
            description=entry.description,
            is_starred=entry.is_starred,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=UMBRA_URL,
            **payload,
        )
        # Unsent notifications are left unmarked so the next run retries them.
        if not delivered:
            continue
        await _mark_sent(db, dedup_key)
async def _dispatch_todos(db: AsyncSession, settings: Settings, today) -> None:
    """Send notifications for incomplete todos due within the configured lead days.

    The query has no lower bound on ``due_date``, so overdue todos match as well.
    The dedup key embeds ``today``, which limits each todo to one notification
    per day for as long as it keeps matching.

    Args:
        db: Open async session used for the query and the dedup bookkeeping.
        settings: Settings row; supplies ``ntfy_todo_lead_days`` and is
            forwarded to ``send_ntfy_notification``.
        today: Current date; ``today + lead days`` is the latest due date included.
    """
    lead_days = settings.ntfy_todo_lead_days
    cutoff = today + timedelta(days=lead_days)
    result = await db.execute(
        select(Todo).where(
            and_(
                Todo.completed == False,  # noqa: E712
                Todo.due_date != None,  # noqa: E711
                Todo.due_date <= cutoff,
            )
        )
    )
    todos = result.scalars().all()
    for todo in todos:
        key = f"todo:{todo.id}:{today}"
        if await _already_sent(db, key):
            continue
        payload = build_todo_notification(
            title=todo.title,
            due_date=todo.due_date,
            today=today,
            priority=todo.priority,
            category=todo.category,
        )
        sent = await send_ntfy_notification(
            settings=settings,
            click_url=UMBRA_URL,
            **payload,
        )
        # Mark only after a successful send so a failed delivery is retried.
        if sent:
            await _mark_sent(db, key)
async def _dispatch_projects(db: AsyncSession, settings: Settings, today) -> None:
    """Notify about unfinished projects whose deadline falls within the lead days.

    The dedup key embeds ``today``, so a matching project produces at most one
    notification per day.

    Args:
        db: Open async session used for the query and the dedup bookkeeping.
        settings: Settings row; supplies ``ntfy_project_lead_days`` and is
            forwarded to ``send_ntfy_notification``.
        today: Current date; ``today + lead days`` is the latest deadline included.
    """
    latest_due = today + timedelta(days=settings.ntfy_project_lead_days)

    deadline_query = select(Project).where(
        Project.due_date != None,  # noqa: E711
        Project.due_date <= latest_due,
        Project.status != "completed",
    )
    pending = (await db.execute(deadline_query)).scalars().all()

    for proj in pending:
        dedup_key = f"project:{proj.id}:{today}"
        if await _already_sent(db, dedup_key):
            continue

        payload = build_project_notification(
            name=proj.name,
            due_date=proj.due_date,
            today=today,
            status=proj.status,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=UMBRA_URL,
            **payload,
        )
        # Leave the key unrecorded on failure so the next run tries again.
        if not delivered:
            continue
        await _mark_sent(db, dedup_key)
async def _purge_old_sent_records(db: AsyncSession) -> None:
    """Delete ntfy_sent rows whose ``sent_at`` is more than 7 days old, then commit.

    Args:
        db: Open async session the delete is executed and committed on.
    """
    retention = timedelta(days=7)
    # Naive datetime on purpose — see DATETIME NOTE at top of file.
    oldest_kept = datetime.now() - retention
    purge_stmt = delete(NtfySent).where(NtfySent.sent_at < oldest_kept)
    await db.execute(purge_stmt)
    await db.commit()
# ── Entry point ───────────────────────────────────────────────────────────────
async def run_notification_dispatch() -> None:
    """
    Entry point invoked by APScheduler every 60 seconds.

    Opens its own session through AsyncSessionLocal (not the request-scoped
    get_db() dependency), loads the Settings row, and runs each enabled
    dispatcher in turn, followed by a purge of stale dedup records. Does
    nothing when there is no Settings row or ntfy is disabled. Never raises:
    any exception is logged and swallowed.
    """
    try:
        async with AsyncSessionLocal() as db:
            settings = (await db.execute(select(Settings))).scalar_one_or_none()
            if not (settings and settings.ntfy_enabled):
                return

            # Naive datetime on purpose — see DATETIME NOTE at top of file.
            now = datetime.now()
            today = now.date()

            # (settings flag, dispatcher, time argument), in execution order.
            # Flags are read lazily, right before each dispatcher would run.
            plan = (
                ("ntfy_reminders_enabled", _dispatch_reminders, now),
                ("ntfy_events_enabled", _dispatch_events, now),
                ("ntfy_todos_enabled", _dispatch_todos, today),
                ("ntfy_projects_enabled", _dispatch_projects, today),
            )
            for flag_name, dispatcher, moment in plan:
                if getattr(settings, flag_name):
                    await dispatcher(db, settings, moment)

            # Housekeeping on every enabled run: drop stale dedup records.
            await _purge_old_sent_records(db)
    except Exception:
        # Deliberately broad: a failing job must not take down the scheduler or the app.
        logger.exception("ntfy dispatch job encountered an unhandled error")