- AC-1: Merge get_current_user into single JOIN query (session + user in one round-trip instead of two sequential queries per request) - AC-2: Wrap all Argon2id hash/verify calls in run_in_executor to avoid blocking the async event loop (~150ms per operation) - AW-7: Add connection pool config (pool_size=10, pool_pre_ping=True, pool_recycle=1800) to prevent connection exhaustion under load - AC-4: Batch-fetch tasks in reorder_tasks with IN clause instead of N sequential queries during Kanban drag operations - AW-4: Bulk NtfySent inserts with single commit per user instead of per-notification commits in the dispatch job Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
367 lines
14 KiB
Python
367 lines
14 KiB
Python
"""
|
|
Background notification dispatch job.
|
|
Runs every 60 seconds via APScheduler (registered in main.py lifespan).
|
|
|
|
DATETIME NOTE: All comparisons use datetime.now() without timezone info.
|
|
The DB uses TIMESTAMP WITHOUT TIME ZONE (naive datetimes). The Docker container
|
|
runs UTC. datetime.now() inside the container returns UTC, which matches the
|
|
naive datetimes stored in the DB. Do NOT use datetime.now(timezone.utc) here —
|
|
that would produce a timezone-aware object that cannot be compared with naive DB values.
|
|
"""
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
from sqlalchemy import select, delete, and_
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.database import AsyncSessionLocal
|
|
from app.models.settings import Settings
|
|
from app.models.notification import Notification as AppNotification
|
|
from app.models.reminder import Reminder
|
|
from app.models.calendar_event import CalendarEvent
|
|
from app.models.calendar import Calendar
|
|
from app.models.event_lock import EventLock
|
|
from app.models.todo import Todo
|
|
from app.models.project import Project
|
|
from app.models.ntfy_sent import NtfySent
|
|
from app.models.totp_usage import TOTPUsage
|
|
from app.models.session import UserSession
|
|
from app.models.connection_request import ConnectionRequest
|
|
from app.services.ntfy import send_ntfy_notification
|
|
from app.services.ntfy_templates import (
|
|
build_event_notification,
|
|
build_reminder_notification,
|
|
build_todo_notification,
|
|
build_project_notification,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
from app.config import settings as app_settings
|
|
|
|
|
|
# ── Dedup helpers ─────────────────────────────────────────────────────────────
|
|
|
|
async def _get_sent_keys(db: AsyncSession, user_id: int) -> set[str]:
|
|
"""Batch-fetch recent notification keys for a user (within the 7-day purge window)."""
|
|
cutoff = datetime.now() - timedelta(days=7)
|
|
result = await db.execute(
|
|
select(NtfySent.notification_key).where(
|
|
NtfySent.user_id == user_id,
|
|
NtfySent.sent_at >= cutoff,
|
|
)
|
|
)
|
|
return set(result.scalars().all())
|
|
|
|
|
|
async def _mark_sent(db: AsyncSession, key: str, user_id: int) -> None:
|
|
"""Stage a sent record — caller must commit (AW-4: bulk commit per user)."""
|
|
db.add(NtfySent(notification_key=key, user_id=user_id))
|
|
|
|
|
|
# ── Dispatch functions ────────────────────────────────────────────────────────
|
|
|
|
async def _dispatch_reminders(db: AsyncSession, settings: Settings, now: datetime, sent_keys: set[str]) -> None:
|
|
"""Send notifications for reminders that are currently due and not dismissed/snoozed."""
|
|
# Mirror the filter from /api/reminders/due, scoped to this user
|
|
result = await db.execute(
|
|
select(Reminder).where(
|
|
and_(
|
|
Reminder.user_id == settings.user_id,
|
|
Reminder.remind_at <= now,
|
|
Reminder.is_dismissed == False, # noqa: E712
|
|
Reminder.is_active == True, # noqa: E712
|
|
)
|
|
)
|
|
)
|
|
reminders = result.scalars().all()
|
|
today = now.date()
|
|
|
|
for reminder in reminders:
|
|
if reminder.snoozed_until and reminder.snoozed_until > now:
|
|
continue # respect snooze
|
|
|
|
# Key includes user_id to prevent cross-user dedup collisions
|
|
key = f"reminder:{settings.user_id}:{reminder.id}:{reminder.remind_at.date()}"
|
|
if key in sent_keys:
|
|
continue
|
|
|
|
payload = build_reminder_notification(
|
|
title=reminder.title,
|
|
remind_at=reminder.remind_at,
|
|
today=today,
|
|
description=reminder.description,
|
|
)
|
|
sent = await send_ntfy_notification(
|
|
settings=settings,
|
|
click_url=app_settings.UMBRA_URL,
|
|
**payload,
|
|
)
|
|
if sent:
|
|
await _mark_sent(db, key, settings.user_id)
|
|
sent_keys.add(key)
|
|
|
|
|
|
async def _dispatch_events(db: AsyncSession, settings: Settings, now: datetime, sent_keys: set[str]) -> None:
|
|
"""Send notifications for calendar events within the configured lead time window."""
|
|
lead_minutes = settings.ntfy_event_lead_minutes
|
|
# Window: events starting between now and (now + lead_minutes)
|
|
window_end = now + timedelta(minutes=lead_minutes)
|
|
|
|
# Scope events through calendar ownership
|
|
user_calendar_ids = select(Calendar.id).where(Calendar.user_id == settings.user_id)
|
|
|
|
result = await db.execute(
|
|
select(CalendarEvent).where(
|
|
and_(
|
|
CalendarEvent.calendar_id.in_(user_calendar_ids),
|
|
CalendarEvent.start_datetime >= now,
|
|
CalendarEvent.start_datetime <= window_end,
|
|
# Exclude recurring parent templates — they duplicate the child instance rows.
|
|
# Parent templates have recurrence_rule set but no parent_event_id.
|
|
~and_(
|
|
CalendarEvent.recurrence_rule != None, # noqa: E711
|
|
CalendarEvent.parent_event_id == None, # noqa: E711
|
|
),
|
|
)
|
|
).options(selectinload(CalendarEvent.location))
|
|
)
|
|
events = result.scalars().all()
|
|
today = now.date()
|
|
|
|
for event in events:
|
|
# Key includes user_id to prevent cross-user dedup collisions
|
|
key = f"event:{settings.user_id}:{event.id}:{event.start_datetime.strftime('%Y-%m-%dT%H:%M')}"
|
|
if key in sent_keys:
|
|
continue
|
|
|
|
payload = build_event_notification(
|
|
title=event.title,
|
|
start_datetime=event.start_datetime,
|
|
all_day=event.all_day,
|
|
today=today,
|
|
location_name=event.location.name if event.location else None,
|
|
description=event.description,
|
|
is_starred=event.is_starred,
|
|
)
|
|
sent = await send_ntfy_notification(
|
|
settings=settings,
|
|
click_url=app_settings.UMBRA_URL,
|
|
**payload,
|
|
)
|
|
if sent:
|
|
await _mark_sent(db, key, settings.user_id)
|
|
sent_keys.add(key)
|
|
|
|
|
|
async def _dispatch_todos(db: AsyncSession, settings: Settings, today, sent_keys: set[str]) -> None:
|
|
"""Send notifications for incomplete todos due within the configured lead days."""
|
|
lead_days = settings.ntfy_todo_lead_days
|
|
cutoff = today + timedelta(days=lead_days)
|
|
|
|
result = await db.execute(
|
|
select(Todo).where(
|
|
and_(
|
|
Todo.user_id == settings.user_id,
|
|
Todo.completed == False, # noqa: E712
|
|
Todo.due_date != None, # noqa: E711
|
|
Todo.due_date <= cutoff,
|
|
)
|
|
)
|
|
)
|
|
todos = result.scalars().all()
|
|
|
|
for todo in todos:
|
|
# Key includes user_id to prevent cross-user dedup collisions
|
|
key = f"todo:{settings.user_id}:{todo.id}:{today}"
|
|
if key in sent_keys:
|
|
continue
|
|
|
|
payload = build_todo_notification(
|
|
title=todo.title,
|
|
due_date=todo.due_date,
|
|
today=today,
|
|
priority=todo.priority,
|
|
category=todo.category,
|
|
)
|
|
sent = await send_ntfy_notification(
|
|
settings=settings,
|
|
click_url=app_settings.UMBRA_URL,
|
|
**payload,
|
|
)
|
|
if sent:
|
|
await _mark_sent(db, key, settings.user_id)
|
|
sent_keys.add(key)
|
|
|
|
|
|
async def _dispatch_projects(db: AsyncSession, settings: Settings, today, sent_keys: set[str]) -> None:
|
|
"""Send notifications for projects with deadlines within the configured lead days."""
|
|
lead_days = settings.ntfy_project_lead_days
|
|
cutoff = today + timedelta(days=lead_days)
|
|
|
|
result = await db.execute(
|
|
select(Project).where(
|
|
and_(
|
|
Project.user_id == settings.user_id,
|
|
Project.due_date != None, # noqa: E711
|
|
Project.due_date <= cutoff,
|
|
Project.status != "completed",
|
|
)
|
|
)
|
|
)
|
|
projects = result.scalars().all()
|
|
|
|
for project in projects:
|
|
# Key includes user_id to prevent cross-user dedup collisions
|
|
key = f"project:{settings.user_id}:{project.id}:{today}"
|
|
if key in sent_keys:
|
|
continue
|
|
|
|
payload = build_project_notification(
|
|
name=project.name,
|
|
due_date=project.due_date,
|
|
today=today,
|
|
status=project.status,
|
|
)
|
|
sent = await send_ntfy_notification(
|
|
settings=settings,
|
|
click_url=app_settings.UMBRA_URL,
|
|
**payload,
|
|
)
|
|
if sent:
|
|
await _mark_sent(db, key, settings.user_id)
|
|
sent_keys.add(key)
|
|
|
|
|
|
async def _dispatch_for_user(db: AsyncSession, settings: Settings, now: datetime) -> None:
|
|
"""Run all notification dispatches for a single user's settings."""
|
|
# Batch-fetch all sent keys once per user instead of one query per entity
|
|
sent_keys = await _get_sent_keys(db, settings.user_id)
|
|
|
|
if settings.ntfy_reminders_enabled:
|
|
await _dispatch_reminders(db, settings, now, sent_keys)
|
|
if settings.ntfy_events_enabled:
|
|
await _dispatch_events(db, settings, now, sent_keys)
|
|
if settings.ntfy_todos_enabled:
|
|
await _dispatch_todos(db, settings, now.date(), sent_keys)
|
|
if settings.ntfy_projects_enabled:
|
|
await _dispatch_projects(db, settings, now.date(), sent_keys)
|
|
|
|
# AW-4: Single commit per user instead of per-notification
|
|
await db.commit()
|
|
|
|
|
|
async def _purge_old_sent_records(db: AsyncSession) -> None:
|
|
"""Remove ntfy_sent entries older than 7 days to keep the table lean."""
|
|
# See DATETIME NOTE at top of file re: naive datetime usage
|
|
cutoff = datetime.now() - timedelta(days=7)
|
|
await db.execute(delete(NtfySent).where(NtfySent.sent_at < cutoff))
|
|
await db.commit()
|
|
|
|
|
|
async def _purge_totp_usage(db: AsyncSession) -> None:
|
|
"""Remove TOTP usage records older than 5 minutes — they serve no purpose beyond replay prevention."""
|
|
cutoff = datetime.now() - timedelta(minutes=5)
|
|
await db.execute(delete(TOTPUsage).where(TOTPUsage.used_at < cutoff))
|
|
await db.commit()
|
|
|
|
|
|
async def _purge_expired_sessions(db: AsyncSession) -> None:
|
|
"""Remove expired UserSession rows to keep the sessions table lean."""
|
|
await db.execute(delete(UserSession).where(UserSession.expires_at < datetime.now()))
|
|
await db.commit()
|
|
|
|
|
|
async def _purge_old_notifications(db: AsyncSession) -> None:
|
|
"""Remove in-app notifications older than 90 days."""
|
|
cutoff = datetime.now() - timedelta(days=90)
|
|
await db.execute(delete(AppNotification).where(AppNotification.created_at < cutoff))
|
|
await db.commit()
|
|
|
|
|
|
async def _purge_resolved_requests(db: AsyncSession) -> None:
|
|
"""Remove resolved connection requests after retention period.
|
|
|
|
Rejected/cancelled: 30 days. Accepted: 90 days (longer for audit trail).
|
|
resolved_at must be set when changing status. NULL resolved_at rows are
|
|
preserved (comparison with NULL yields NULL).
|
|
"""
|
|
reject_cutoff = datetime.now() - timedelta(days=30)
|
|
accept_cutoff = datetime.now() - timedelta(days=90)
|
|
await db.execute(
|
|
delete(ConnectionRequest).where(
|
|
ConnectionRequest.status.in_(["rejected", "cancelled"]),
|
|
ConnectionRequest.resolved_at < reject_cutoff,
|
|
)
|
|
)
|
|
await db.execute(
|
|
delete(ConnectionRequest).where(
|
|
ConnectionRequest.status == "accepted",
|
|
ConnectionRequest.resolved_at < accept_cutoff,
|
|
)
|
|
)
|
|
await db.commit()
|
|
|
|
|
|
|
|
async def _purge_expired_locks(db: AsyncSession) -> None:
|
|
"""Remove non-permanent event locks that have expired."""
|
|
await db.execute(
|
|
delete(EventLock).where(
|
|
EventLock.is_permanent == False, # noqa: E712
|
|
EventLock.expires_at < datetime.now(),
|
|
)
|
|
)
|
|
await db.commit()
|
|
|
|
|
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
|
|
|
async def run_notification_dispatch() -> None:
|
|
"""
|
|
Main dispatch function called by APScheduler every 60 seconds.
|
|
Uses AsyncSessionLocal directly — not the get_db() request-scoped dependency.
|
|
|
|
Iterates over ALL users with ntfy enabled. Per-user errors are caught and
|
|
logged individually so one user's failure does not prevent others from
|
|
receiving notifications.
|
|
"""
|
|
try:
|
|
async with AsyncSessionLocal() as db:
|
|
# Fetch all Settings rows that have ntfy enabled
|
|
result = await db.execute(
|
|
select(Settings).where(Settings.ntfy_enabled == True) # noqa: E712
|
|
)
|
|
all_settings = result.scalars().all()
|
|
|
|
if not all_settings:
|
|
return
|
|
|
|
# See DATETIME NOTE at top of file re: naive datetime usage
|
|
now = datetime.now()
|
|
|
|
for user_settings in all_settings:
|
|
try:
|
|
await _dispatch_for_user(db, user_settings, now)
|
|
except Exception:
|
|
# Isolate per-user failures — log and continue to next user
|
|
logger.exception(
|
|
"ntfy dispatch failed for user_id=%s", user_settings.user_id
|
|
)
|
|
|
|
# Daily housekeeping: purge stale dedup records (shared across all users)
|
|
await _purge_old_sent_records(db)
|
|
|
|
# Security housekeeping runs every cycle regardless of ntfy_enabled
|
|
async with AsyncSessionLocal() as db:
|
|
await _purge_totp_usage(db)
|
|
await _purge_expired_sessions(db)
|
|
await _purge_old_notifications(db)
|
|
await _purge_resolved_requests(db)
|
|
await _purge_expired_locks(db)
|
|
|
|
except Exception:
|
|
# Broad catch: job failure must never crash the scheduler or the app
|
|
logger.exception("ntfy dispatch job encountered an unhandled error")
|