UMBRA/backend/app/jobs/notifications.py
Kyle Pope 4a98b67b0b Address all QA review warnings and suggestions for entity pages
- W1: Add ntfy_has_token property to Settings model for safe from_attributes usage
- W2: Eager-load event location and pass location_name to ntfy template builder
- W3: Add missing accent color swatches (red, pink, yellow) to match backend Literal
- W7: Cap IP rate-limit dict at 10k entries with stale-entry purge to prevent OOM
- W9: Include user_id in SettingsResponse for multi-user readiness

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 07:48:45 +08:00

270 lines
9.6 KiB
Python

"""
Background notification dispatch job.
Runs every 60 seconds via APScheduler (registered in main.py lifespan).
DATETIME NOTE: All comparisons use datetime.now() without timezone info.
The DB uses TIMESTAMP WITHOUT TIME ZONE (naive datetimes). The Docker container
runs UTC. datetime.now() inside the container returns UTC, which matches the
naive datetimes stored in the DB. Do NOT use datetime.now(timezone.utc) here —
that would produce a timezone-aware object that cannot be compared with naive DB values.
"""
import logging
from datetime import datetime, timedelta
from sqlalchemy import select, delete, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import AsyncSessionLocal
from app.models.settings import Settings
from app.models.reminder import Reminder
from app.models.calendar_event import CalendarEvent
from app.models.todo import Todo
from app.models.project import Project
from app.models.ntfy_sent import NtfySent
from app.models.totp_usage import TOTPUsage
from app.models.session import UserSession
from app.services.ntfy import send_ntfy_notification
from app.services.ntfy_templates import (
build_event_notification,
build_reminder_notification,
build_todo_notification,
build_project_notification,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Base URL handed to send_ntfy_notification() as click_url for every
# notification dispatched from this module.
# NOTE(review): hard-coded private address — presumably the deployed UMBRA
# web UI; confirm, and consider sourcing it from configuration.
UMBRA_URL = "http://10.0.69.35"
# ── Dedup helpers ─────────────────────────────────────────────────────────────
async def _already_sent(db: AsyncSession, key: str) -> bool:
    """Return True when an NtfySent row with notification_key == key exists."""
    lookup = select(NtfySent).where(NtfySent.notification_key == key)
    existing = (await db.execute(lookup)).scalar_one_or_none()
    return existing is not None
async def _mark_sent(db: AsyncSession, key: str) -> None:
    """Record `key` as delivered by inserting an NtfySent row and committing."""
    record = NtfySent(notification_key=key)
    db.add(record)
    # Commit immediately so the dedup row is persisted before the next
    # notification in the caller's loop is attempted.
    await db.commit()
# ── Dispatch functions ────────────────────────────────────────────────────────
async def _dispatch_reminders(db: AsyncSession, settings: Settings, now: datetime) -> None:
    """
    Notify about every reminder that is due at `now`.

    A reminder qualifies when remind_at <= now, it is active and not dismissed
    (the same filter as /api/reminders/due), and it is not currently snoozed.
    Each reminder is sent at most once per dedup key; the key is only recorded
    when send_ntfy_notification() reports success, so failed sends are retried
    on the next run.
    """
    due_query = select(Reminder).where(
        and_(
            Reminder.remind_at <= now,
            Reminder.is_dismissed == False,  # noqa: E712
            Reminder.is_active == True,  # noqa: E712
        )
    )
    due_reminders = (await db.execute(due_query)).scalars().all()
    current_day = now.date()

    for item in due_reminders:
        # Respect snooze: skip while the snooze deadline is still in the future.
        still_snoozed = item.snoozed_until and item.snoozed_until > now
        if still_snoozed:
            continue

        # Dedup key = reminder id + calendar date of remind_at, so one
        # reminder/date pair is delivered once while its NtfySent row exists.
        dedup_key = f"reminder:{item.id}:{item.remind_at.date()}"
        if await _already_sent(db, dedup_key):
            continue

        message = build_reminder_notification(
            title=item.title,
            remind_at=item.remind_at,
            today=current_day,
            description=item.description,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=UMBRA_URL,
            **message,
        )
        if not delivered:
            continue
        await _mark_sent(db, dedup_key)
async def _dispatch_events(db: AsyncSession, settings: Settings, now: datetime) -> None:
    """
    Notify about calendar events that start within the configured lead window.

    The window is [now, now + settings.ntfy_event_lead_minutes], both ends
    inclusive. The event's location relationship is eager-loaded so its name
    can be passed to the template builder. A dedup key is recorded only after
    a successful send.
    """
    horizon = now + timedelta(minutes=settings.ntfy_event_lead_minutes)
    upcoming_query = (
        select(CalendarEvent)
        .where(
            and_(
                CalendarEvent.start_datetime >= now,
                CalendarEvent.start_datetime <= horizon,
            )
        )
        .options(selectinload(CalendarEvent.location))
    )
    upcoming = (await db.execute(upcoming_query)).scalars().all()
    current_day = now.date()

    for entry in upcoming:
        # Minute-precision start time in the key: the same event is not
        # re-sent on later runs while it is still inside the window.
        start_stamp = entry.start_datetime.strftime("%Y-%m-%dT%H:%M")
        dedup_key = f"event:{entry.id}:{start_stamp}"
        if await _already_sent(db, dedup_key):
            continue

        venue = entry.location
        venue_name = venue.name if venue else None

        message = build_event_notification(
            title=entry.title,
            start_datetime=entry.start_datetime,
            all_day=entry.all_day,
            today=current_day,
            location_name=venue_name,
            description=entry.description,
            is_starred=entry.is_starred,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=UMBRA_URL,
            **message,
        )
        if not delivered:
            continue
        await _mark_sent(db, dedup_key)
async def _dispatch_todos(db: AsyncSession, settings: Settings, today) -> None:
    """
    Send notifications for incomplete todos due within the configured lead days.

    Args:
        db: Session used for the query and for the dedup bookkeeping.
        settings: Supplies ntfy_todo_lead_days and is forwarded to the sender.
        today: Current calendar date (the caller passes datetime.now().date()).

    A todo is selected when it is not completed and has a due_date on or
    before today + lead_days. There is no lower bound, so overdue todos are
    included as well. The dedup key contains `today`, which means a
    qualifying todo is notified at most once per calendar day; the key is
    recorded only after a successful send.
    """
    lead_days = settings.ntfy_todo_lead_days
    cutoff = today + timedelta(days=lead_days)

    result = await db.execute(
        select(Todo).where(
            and_(
                Todo.completed == False,  # noqa: E712
                Todo.due_date != None,  # noqa: E711
                Todo.due_date <= cutoff,
            )
        )
    )
    todos = result.scalars().all()

    for todo in todos:
        # Per-day key: the same todo may be announced again tomorrow.
        key = f"todo:{todo.id}:{today}"
        if await _already_sent(db, key):
            continue

        payload = build_todo_notification(
            title=todo.title,
            due_date=todo.due_date,
            today=today,
            priority=todo.priority,
            category=todo.category,
        )
        sent = await send_ntfy_notification(
            settings=settings,
            click_url=UMBRA_URL,
            **payload,
        )
        if sent:
            await _mark_sent(db, key)
async def _dispatch_projects(db: AsyncSession, settings: Settings, today) -> None:
    """
    Notify about projects whose deadline falls within the configured lead days.

    Selects projects that have a due_date on or before
    today + settings.ntfy_project_lead_days and whose status is not
    "completed". The dedup key contains `today`, so a qualifying project is
    announced at most once per calendar day, and only marked after a
    successful send.
    """
    deadline_limit = today + timedelta(days=settings.ntfy_project_lead_days)
    pending_query = select(Project).where(
        and_(
            Project.due_date != None,  # noqa: E711
            Project.due_date <= deadline_limit,
            Project.status != "completed",
        )
    )
    pending = (await db.execute(pending_query)).scalars().all()

    for proj in pending:
        dedup_key = f"project:{proj.id}:{today}"
        if await _already_sent(db, dedup_key):
            continue

        message = build_project_notification(
            name=proj.name,
            due_date=proj.due_date,
            today=today,
            status=proj.status,
        )
        delivered = await send_ntfy_notification(
            settings=settings,
            click_url=UMBRA_URL,
            **message,
        )
        if not delivered:
            continue
        await _mark_sent(db, dedup_key)
async def _purge_old_sent_records(db: AsyncSession) -> None:
    """Delete NtfySent rows whose sent_at is more than 7 days in the past."""
    # Naive datetime on purpose — see DATETIME NOTE at top of file.
    oldest_kept = datetime.now() - timedelta(days=7)
    stale_rows = delete(NtfySent).where(NtfySent.sent_at < oldest_kept)
    await db.execute(stale_rows)
    await db.commit()
async def _purge_totp_usage(db: AsyncSession) -> None:
    """Delete TOTPUsage rows older than 5 minutes; they only exist for replay prevention."""
    replay_window = timedelta(minutes=5)
    threshold = datetime.now() - replay_window
    expired_codes = delete(TOTPUsage).where(TOTPUsage.used_at < threshold)
    await db.execute(expired_codes)
    await db.commit()
async def _purge_expired_sessions(db: AsyncSession) -> None:
    """Delete UserSession rows whose expires_at lies before the current time."""
    current_time = datetime.now()
    expired_sessions = delete(UserSession).where(UserSession.expires_at < current_time)
    await db.execute(expired_sessions)
    await db.commit()
# ── Entry point ───────────────────────────────────────────────────────────────
async def run_notification_dispatch() -> None:
    """
    Main dispatch function called by APScheduler every 60 seconds.

    Uses AsyncSessionLocal directly — not the get_db() request-scoped dependency.

    Two independent phases run on every invocation:

    1. ntfy dispatch — only when a Settings row exists and ntfy_enabled is
       set; each category is further gated by its own ntfy_*_enabled flag.
       Stale dedup records are purged at the end of this phase.
    2. Security housekeeping (TOTP replay records, expired sessions) — always,
       regardless of ntfy_enabled and regardless of whether phase 1 failed.

    Each phase opens its own session and has its own broad exception handler,
    so a failure in one phase never blocks the other and nothing propagates
    to the scheduler.
    """
    # ── Phase 1: ntfy dispatch ────────────────────────────────────────────
    try:
        async with AsyncSessionLocal() as db:
            result = await db.execute(select(Settings))
            settings = result.scalar_one_or_none()

            # Deliberately an `if` block rather than an early return: returning
            # here would also skip the security housekeeping in phase 2.
            if settings and settings.ntfy_enabled:
                # See DATETIME NOTE at top of file re: naive datetime usage
                now = datetime.now()
                today = now.date()

                if settings.ntfy_reminders_enabled:
                    await _dispatch_reminders(db, settings, now)
                if settings.ntfy_events_enabled:
                    await _dispatch_events(db, settings, now)
                if settings.ntfy_todos_enabled:
                    await _dispatch_todos(db, settings, today)
                if settings.ntfy_projects_enabled:
                    await _dispatch_projects(db, settings, today)

                # Runs every cycle (not daily): purge stale dedup records
                await _purge_old_sent_records(db)
    except Exception:
        # Broad catch: job failure must never crash the scheduler or the app
        logger.exception("ntfy dispatch job encountered an unhandled error")

    # ── Phase 2: security housekeeping ────────────────────────────────────
    # Runs every cycle regardless of ntfy_enabled and of phase 1's outcome.
    try:
        async with AsyncSessionLocal() as db:
            await _purge_totp_usage(db)
            await _purge_expired_sessions(db)
    except Exception:
        # Broad catch: job failure must never crash the scheduler or the app
        logger.exception("security housekeeping job encountered an unhandled error")