Implement Track C: NTFY push notification integration

- Add ntfy columns to Settings model (server_url, topic, auth_token, enabled, per-type toggles, lead times)
- Create NtfySent dedup model to prevent duplicate notifications
- Create ntfy service with SSRF validation and async httpx send
- Create ntfy_templates service with per-type payload builders
- Create APScheduler background dispatch job (60s interval, events/reminders/todos/projects)
- Register scheduler in main.py lifespan with max_instances=1
- Update SettingsUpdate with ntfy validators (URL scheme, topic regex, lead time ranges)
- Update SettingsResponse with ntfy fields; ntfy_has_token computed, token never exposed
- Add POST /api/settings/ntfy/test endpoint
- Update GET/PUT settings to use explicit _to_settings_response() helper
- Add Alembic migration 022 for ntfy settings columns + ntfy_sent table
- Add httpx==0.27.2 and apscheduler==3.10.4 to requirements.txt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Kyle 2026-02-25 04:04:23 +08:00
parent 7f0ae0b6ef
commit 67456c78dd
10 changed files with 825 additions and 5 deletions

View File

@ -0,0 +1,88 @@
"""Add ntfy settings columns and ntfy_sent deduplication table
Revision ID: 022
Revises: 021
Create Date: 2026-02-25
"""
from alembic import op
import sqlalchemy as sa
revision = '022'
down_revision = '021'
branch_labels = None
depends_on = None
def upgrade() -> None:
# ── New columns on settings ──────────────────────────────────────────────
op.add_column('settings', sa.Column('ntfy_server_url', sa.String(500), nullable=True))
op.add_column('settings', sa.Column('ntfy_topic', sa.String(255), nullable=True))
op.add_column('settings', sa.Column('ntfy_auth_token', sa.String(500), nullable=True))
op.add_column('settings', sa.Column(
'ntfy_enabled', sa.Boolean(), nullable=False,
server_default=sa.text('false')
))
op.add_column('settings', sa.Column(
'ntfy_events_enabled', sa.Boolean(), nullable=False,
server_default=sa.text('true')
))
op.add_column('settings', sa.Column(
'ntfy_reminders_enabled', sa.Boolean(), nullable=False,
server_default=sa.text('true')
))
op.add_column('settings', sa.Column(
'ntfy_todos_enabled', sa.Boolean(), nullable=False,
server_default=sa.text('true')
))
op.add_column('settings', sa.Column(
'ntfy_projects_enabled', sa.Boolean(), nullable=False,
server_default=sa.text('true')
))
op.add_column('settings', sa.Column(
'ntfy_event_lead_minutes', sa.Integer(), nullable=False,
server_default=sa.text('15')
))
op.add_column('settings', sa.Column(
'ntfy_todo_lead_days', sa.Integer(), nullable=False,
server_default=sa.text('1')
))
op.add_column('settings', sa.Column(
'ntfy_project_lead_days', sa.Integer(), nullable=False,
server_default=sa.text('2')
))
# ── New ntfy_sent deduplication table ────────────────────────────────────
op.create_table(
'ntfy_sent',
sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True),
sa.Column('notification_key', sa.String(255), nullable=False),
sa.Column(
'sent_at', sa.DateTime(), nullable=False,
server_default=sa.text('now()')
),
)
op.create_index(
'ix_ntfy_sent_notification_key',
'ntfy_sent',
['notification_key'],
unique=True,
)
def downgrade() -> None:
# ── Drop ntfy_sent table ─────────────────────────────────────────────────
op.drop_index('ix_ntfy_sent_notification_key', table_name='ntfy_sent')
op.drop_table('ntfy_sent')
# ── Remove settings columns ──────────────────────────────────────────────
op.drop_column('settings', 'ntfy_project_lead_days')
op.drop_column('settings', 'ntfy_todo_lead_days')
op.drop_column('settings', 'ntfy_event_lead_minutes')
op.drop_column('settings', 'ntfy_projects_enabled')
op.drop_column('settings', 'ntfy_todos_enabled')
op.drop_column('settings', 'ntfy_reminders_enabled')
op.drop_column('settings', 'ntfy_events_enabled')
op.drop_column('settings', 'ntfy_enabled')
op.drop_column('settings', 'ntfy_auth_token')
op.drop_column('settings', 'ntfy_topic')
op.drop_column('settings', 'ntfy_server_url')

View File

@ -0,0 +1,247 @@
"""
Background notification dispatch job.
Runs every 60 seconds via APScheduler (registered in main.py lifespan).
DATETIME NOTE: All comparisons use datetime.now() without timezone info.
The DB uses TIMESTAMP WITHOUT TIME ZONE (naive datetimes). The Docker container
runs UTC. datetime.now() inside the container returns UTC, which matches the
naive datetimes stored in the DB. Do NOT use datetime.now(timezone.utc) here
that would produce a timezone-aware object that cannot be compared with naive DB values.
"""
import logging
from datetime import datetime, timedelta
from sqlalchemy import select, delete, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import AsyncSessionLocal
from app.models.settings import Settings
from app.models.reminder import Reminder
from app.models.calendar_event import CalendarEvent
from app.models.todo import Todo
from app.models.project import Project
from app.models.ntfy_sent import NtfySent
from app.services.ntfy import send_ntfy_notification
from app.services.ntfy_templates import (
build_event_notification,
build_reminder_notification,
build_todo_notification,
build_project_notification,
)
logger = logging.getLogger(__name__)
UMBRA_URL = "http://10.0.69.35"
# ── Dedup helpers ─────────────────────────────────────────────────────────────
async def _already_sent(db: AsyncSession, key: str) -> bool:
result = await db.execute(
select(NtfySent).where(NtfySent.notification_key == key)
)
return result.scalar_one_or_none() is not None
async def _mark_sent(db: AsyncSession, key: str) -> None:
db.add(NtfySent(notification_key=key))
await db.commit()
# ── Dispatch functions ────────────────────────────────────────────────────────
async def _dispatch_reminders(db: AsyncSession, settings: Settings, now: datetime) -> None:
"""Send notifications for reminders that are currently due and not dismissed/snoozed."""
# Mirror the filter from /api/reminders/due
result = await db.execute(
select(Reminder).where(
and_(
Reminder.remind_at <= now,
Reminder.is_dismissed == False, # noqa: E712
Reminder.is_active == True, # noqa: E712
)
)
)
reminders = result.scalars().all()
today = now.date()
for reminder in reminders:
if reminder.snoozed_until and reminder.snoozed_until > now:
continue # respect snooze
# Key ties notification to the specific day to handle re-fires after midnight
key = f"reminder:{reminder.id}:{reminder.remind_at.date()}"
if await _already_sent(db, key):
continue
payload = build_reminder_notification(
title=reminder.title,
remind_at=reminder.remind_at,
today=today,
description=reminder.description,
)
sent = await send_ntfy_notification(
settings=settings,
click_url=UMBRA_URL,
**payload,
)
if sent:
await _mark_sent(db, key)
async def _dispatch_events(db: AsyncSession, settings: Settings, now: datetime) -> None:
"""Send notifications for calendar events within the configured lead time window."""
lead_minutes = settings.ntfy_event_lead_minutes
# Window: events starting between now and (now + lead_minutes)
window_end = now + timedelta(minutes=lead_minutes)
result = await db.execute(
select(CalendarEvent).where(
and_(
CalendarEvent.start_datetime >= now,
CalendarEvent.start_datetime <= window_end,
)
)
)
events = result.scalars().all()
today = now.date()
for event in events:
# Key includes the minute-precision start to avoid re-firing during the window
key = f"event:{event.id}:{event.start_datetime.strftime('%Y-%m-%dT%H:%M')}"
if await _already_sent(db, key):
continue
payload = build_event_notification(
title=event.title,
start_datetime=event.start_datetime,
all_day=event.all_day,
today=today,
description=event.description,
is_starred=event.is_starred,
)
sent = await send_ntfy_notification(
settings=settings,
click_url=UMBRA_URL,
**payload,
)
if sent:
await _mark_sent(db, key)
async def _dispatch_todos(db: AsyncSession, settings: Settings, today) -> None:
"""Send notifications for incomplete todos due within the configured lead days."""
from datetime import date as date_type
lead_days = settings.ntfy_todo_lead_days
cutoff = today + timedelta(days=lead_days)
result = await db.execute(
select(Todo).where(
and_(
Todo.completed == False, # noqa: E712
Todo.due_date != None, # noqa: E711
Todo.due_date <= cutoff,
)
)
)
todos = result.scalars().all()
for todo in todos:
key = f"todo:{todo.id}:{today}"
if await _already_sent(db, key):
continue
payload = build_todo_notification(
title=todo.title,
due_date=todo.due_date,
today=today,
priority=todo.priority,
category=todo.category,
)
sent = await send_ntfy_notification(
settings=settings,
click_url=UMBRA_URL,
**payload,
)
if sent:
await _mark_sent(db, key)
async def _dispatch_projects(db: AsyncSession, settings: Settings, today) -> None:
"""Send notifications for projects with deadlines within the configured lead days."""
lead_days = settings.ntfy_project_lead_days
cutoff = today + timedelta(days=lead_days)
result = await db.execute(
select(Project).where(
and_(
Project.due_date != None, # noqa: E711
Project.due_date <= cutoff,
Project.status != "completed",
)
)
)
projects = result.scalars().all()
for project in projects:
key = f"project:{project.id}:{today}"
if await _already_sent(db, key):
continue
payload = build_project_notification(
name=project.name,
due_date=project.due_date,
today=today,
status=project.status,
)
sent = await send_ntfy_notification(
settings=settings,
click_url=UMBRA_URL,
**payload,
)
if sent:
await _mark_sent(db, key)
async def _purge_old_sent_records(db: AsyncSession) -> None:
"""Remove ntfy_sent entries older than 7 days to keep the table lean."""
# See DATETIME NOTE at top of file re: naive datetime usage
cutoff = datetime.now() - timedelta(days=7)
await db.execute(delete(NtfySent).where(NtfySent.sent_at < cutoff))
await db.commit()
# ── Entry point ───────────────────────────────────────────────────────────────
async def run_notification_dispatch() -> None:
"""
Main dispatch function called by APScheduler every 60 seconds.
Uses AsyncSessionLocal directly not the get_db() request-scoped dependency.
"""
try:
async with AsyncSessionLocal() as db:
result = await db.execute(select(Settings))
settings = result.scalar_one_or_none()
if not settings or not settings.ntfy_enabled:
return
# See DATETIME NOTE at top of file re: naive datetime usage
now = datetime.now()
today = now.date()
if settings.ntfy_reminders_enabled:
await _dispatch_reminders(db, settings, now)
if settings.ntfy_events_enabled:
await _dispatch_events(db, settings, now)
if settings.ntfy_todos_enabled:
await _dispatch_todos(db, settings, today)
if settings.ntfy_projects_enabled:
await _dispatch_projects(db, settings, today)
# Daily housekeeping: purge stale dedup records
await _purge_old_sent_records(db)
except Exception:
# Broad catch: job failure must never crash the scheduler or the app
logger.exception("ntfy dispatch job encountered an unhandled error")

View File

@ -2,13 +2,26 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.database import engine
from app.routers import auth, todos, events, calendars, reminders, projects, people, locations, settings as settings_router, dashboard, weather, event_templates
from app.jobs.notifications import run_notification_dispatch
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = AsyncIOScheduler()
scheduler.add_job(
run_notification_dispatch,
"interval",
minutes=1,
id="ntfy_dispatch",
max_instances=1, # prevent overlap if a run takes longer than 60s
)
scheduler.start()
yield
scheduler.shutdown(wait=False)
await engine.dispose()

View File

@ -0,0 +1,24 @@
from sqlalchemy import String, func
from sqlalchemy.orm import Mapped, mapped_column
from datetime import datetime
from app.database import Base
class NtfySent(Base):
"""
Deduplication table for ntfy notifications.
Prevents the background job from re-sending the same notification
within a given time window.
Key format: "{type}:{entity_id}:{date_window}"
Examples:
"reminder:42:2026-02-25"
"event:17:2026-02-25T09:00"
"todo:8:2026-02-25"
"project:3:2026-02-25"
"""
__tablename__ = "ntfy_sent"
id: Mapped[int] = mapped_column(primary_key=True)
notification_key: Mapped[str] = mapped_column(String(255), unique=True, index=True)
sent_at: Mapped[datetime] = mapped_column(default=func.now(), server_default=func.now())

View File

@ -1,6 +1,7 @@
from sqlalchemy import String, Integer, Float, func
from sqlalchemy import String, Integer, Float, Boolean, func
from sqlalchemy.orm import Mapped, mapped_column
from datetime import datetime
from typing import Optional
from app.database import Base
@ -16,5 +17,23 @@ class Settings(Base):
weather_lat: Mapped[float | None] = mapped_column(Float, nullable=True, default=None)
weather_lon: Mapped[float | None] = mapped_column(Float, nullable=True, default=None)
first_day_of_week: Mapped[int] = mapped_column(Integer, default=0) # 0=Sunday, 1=Monday
# ntfy push notification configuration
ntfy_server_url: Mapped[Optional[str]] = mapped_column(String(500), nullable=True, default=None)
ntfy_topic: Mapped[Optional[str]] = mapped_column(String(255), nullable=True, default=None)
ntfy_auth_token: Mapped[Optional[str]] = mapped_column(String(500), nullable=True, default=None)
ntfy_enabled: Mapped[bool] = mapped_column(Boolean, default=False, server_default="false")
# Per-type notification toggles (default on so they work immediately once master is enabled)
ntfy_events_enabled: Mapped[bool] = mapped_column(Boolean, default=True, server_default="true")
ntfy_reminders_enabled: Mapped[bool] = mapped_column(Boolean, default=True, server_default="true")
ntfy_todos_enabled: Mapped[bool] = mapped_column(Boolean, default=True, server_default="true")
ntfy_projects_enabled: Mapped[bool] = mapped_column(Boolean, default=True, server_default="true")
# Lead time controls
ntfy_event_lead_minutes: Mapped[int] = mapped_column(Integer, default=15, server_default="15")
ntfy_todo_lead_days: Mapped[int] = mapped_column(Integer, default=1, server_default="1")
ntfy_project_lead_days: Mapped[int] = mapped_column(Integer, default=2, server_default="2")
created_at: Mapped[datetime] = mapped_column(default=func.now())
updated_at: Mapped[datetime] = mapped_column(default=func.now(), onupdate=func.now())

View File

@ -10,13 +10,43 @@ from app.routers.auth import get_current_session, hash_pin, verify_pin
router = APIRouter()
def _to_settings_response(s: Settings) -> SettingsResponse:
"""
Explicitly construct SettingsResponse, computing ntfy_has_token
from the stored token without ever exposing the token value.
"""
return SettingsResponse(
id=s.id,
accent_color=s.accent_color,
upcoming_days=s.upcoming_days,
preferred_name=s.preferred_name,
weather_city=s.weather_city,
weather_lat=s.weather_lat,
weather_lon=s.weather_lon,
first_day_of_week=s.first_day_of_week,
ntfy_server_url=s.ntfy_server_url,
ntfy_topic=s.ntfy_topic,
ntfy_enabled=s.ntfy_enabled,
ntfy_events_enabled=s.ntfy_events_enabled,
ntfy_reminders_enabled=s.ntfy_reminders_enabled,
ntfy_todos_enabled=s.ntfy_todos_enabled,
ntfy_projects_enabled=s.ntfy_projects_enabled,
ntfy_event_lead_minutes=s.ntfy_event_lead_minutes,
ntfy_todo_lead_days=s.ntfy_todo_lead_days,
ntfy_project_lead_days=s.ntfy_project_lead_days,
ntfy_has_token=bool(s.ntfy_auth_token), # derived — never expose the token value
created_at=s.created_at,
updated_at=s.updated_at,
)
@router.get("/", response_model=SettingsResponse)
async def get_settings(
db: AsyncSession = Depends(get_db),
current_user: Settings = Depends(get_current_session)
):
"""Get current settings (excluding PIN hash)."""
return current_user
"""Get current settings (excluding PIN hash and ntfy auth token)."""
return _to_settings_response(current_user)
@router.put("/", response_model=SettingsResponse)
@ -25,7 +55,7 @@ async def update_settings(
db: AsyncSession = Depends(get_db),
current_user: Settings = Depends(get_current_session)
):
"""Update settings (accent color, upcoming days)."""
"""Update settings."""
update_data = settings_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
@ -34,7 +64,7 @@ async def update_settings(
await db.commit()
await db.refresh(current_user)
return current_user
return _to_settings_response(current_user)
@router.put("/pin")
@ -52,3 +82,53 @@ async def change_pin(
await db.commit()
return {"message": "PIN changed successfully"}
@router.post("/ntfy/test")
async def test_ntfy(
db: AsyncSession = Depends(get_db),
current_user: Settings = Depends(get_current_session)
):
"""
Send a test ntfy notification to verify the user's configuration.
Requires ntfy_server_url and ntfy_topic to be set.
Note: ntfy_enabled does not need to be True to run the test the service
call bypasses that check because we pass settings directly.
"""
if not current_user.ntfy_server_url or not current_user.ntfy_topic:
raise HTTPException(
status_code=400,
detail="ntfy server URL and topic must be configured before sending a test"
)
# SSRF-validate the URL before attempting the outbound request
from app.services.ntfy import validate_ntfy_host, send_ntfy_notification
try:
validate_ntfy_host(current_user.ntfy_server_url)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Temporarily treat ntfy as enabled for the test send even if master switch is off
class _TestSettings:
"""Thin wrapper that forces ntfy_enabled=True for the test call."""
ntfy_enabled = True
ntfy_server_url = current_user.ntfy_server_url
ntfy_topic = current_user.ntfy_topic
ntfy_auth_token = current_user.ntfy_auth_token
success = await send_ntfy_notification(
settings=_TestSettings(), # type: ignore[arg-type]
title="UMBRA Test Notification",
message="If you see this, your ntfy integration is working correctly.",
tags=["white_check_mark"],
priority=3,
click_url="http://10.0.69.35",
)
if not success:
raise HTTPException(
status_code=502,
detail="Failed to deliver test notification — check server URL, topic, and auth token"
)
return {"message": "Test notification sent successfully"}

View File

@ -1,9 +1,12 @@
import re
from pydantic import BaseModel, ConfigDict, field_validator
from datetime import datetime
from typing import Literal, Optional
AccentColor = Literal["cyan", "blue", "green", "purple", "red", "orange", "pink", "yellow"]
_NTFY_TOPIC_RE = re.compile(r'^[a-zA-Z0-9_-]{1,64}$')
def _validate_pin_length(v: str, label: str = "PIN") -> str:
if len(v) < 4:
@ -31,6 +34,20 @@ class SettingsUpdate(BaseModel):
weather_lon: float | None = None
first_day_of_week: int | None = None
# ntfy configuration fields
ntfy_server_url: Optional[str] = None
ntfy_topic: Optional[str] = None
# Empty string means "clear the token"; None means "leave unchanged"
ntfy_auth_token: Optional[str] = None
ntfy_enabled: Optional[bool] = None
ntfy_events_enabled: Optional[bool] = None
ntfy_reminders_enabled: Optional[bool] = None
ntfy_todos_enabled: Optional[bool] = None
ntfy_projects_enabled: Optional[bool] = None
ntfy_event_lead_minutes: Optional[int] = None
ntfy_todo_lead_days: Optional[int] = None
ntfy_project_lead_days: Optional[int] = None
@field_validator('first_day_of_week')
@classmethod
def validate_first_day(cls, v: int | None) -> int | None:
@ -52,6 +69,62 @@ class SettingsUpdate(BaseModel):
raise ValueError('Longitude must be between -180 and 180')
return v
@field_validator('ntfy_server_url')
@classmethod
def validate_ntfy_url(cls, v: Optional[str]) -> Optional[str]:
if v is None or v == "":
return None
from urllib.parse import urlparse
parsed = urlparse(v)
if parsed.scheme not in ("http", "https"):
raise ValueError("ntfy server URL must use http or https")
if not parsed.netloc:
raise ValueError("ntfy server URL must include a hostname")
# Strip trailing slash — ntfy base URL must not have one
return v.rstrip("/")
@field_validator('ntfy_topic')
@classmethod
def validate_ntfy_topic(cls, v: Optional[str]) -> Optional[str]:
if v is None or v == "":
return None
if not _NTFY_TOPIC_RE.match(v):
raise ValueError(
"ntfy topic must be 1-64 alphanumeric characters, hyphens, or underscores"
)
return v
@field_validator('ntfy_auth_token')
@classmethod
def validate_ntfy_token(cls, v: Optional[str]) -> Optional[str]:
# Empty string signals "clear the token" — normalise to None for storage
if v == "":
return None
if v is not None and len(v) > 500:
raise ValueError("ntfy auth token must be at most 500 characters")
return v
@field_validator('ntfy_event_lead_minutes')
@classmethod
def validate_event_lead(cls, v: Optional[int]) -> Optional[int]:
if v is not None and not (1 <= v <= 1440):
raise ValueError("ntfy_event_lead_minutes must be between 1 and 1440")
return v
@field_validator('ntfy_todo_lead_days')
@classmethod
def validate_todo_lead(cls, v: Optional[int]) -> Optional[int]:
if v is not None and not (0 <= v <= 30):
raise ValueError("ntfy_todo_lead_days must be between 0 and 30")
return v
@field_validator('ntfy_project_lead_days')
@classmethod
def validate_project_lead(cls, v: Optional[int]) -> Optional[int]:
if v is not None and not (0 <= v <= 30):
raise ValueError("ntfy_project_lead_days must be between 0 and 30")
return v
class SettingsResponse(BaseModel):
id: int
@ -62,6 +135,21 @@ class SettingsResponse(BaseModel):
weather_lat: float | None = None
weather_lon: float | None = None
first_day_of_week: int = 0
# ntfy fields — ntfy_auth_token is NEVER included here (security requirement 6.2)
ntfy_server_url: Optional[str] = None
ntfy_topic: Optional[str] = None
ntfy_enabled: bool = False
ntfy_events_enabled: bool = True
ntfy_reminders_enabled: bool = True
ntfy_todos_enabled: bool = True
ntfy_projects_enabled: bool = True
ntfy_event_lead_minutes: int = 15
ntfy_todo_lead_days: int = 1
ntfy_project_lead_days: int = 2
# Derived field: True if a token is stored, never exposes the value itself
ntfy_has_token: bool = False
created_at: datetime
updated_at: datetime

View File

@ -0,0 +1,125 @@
"""
ntfy push notification service.
Responsible for:
- SSRF validation of user-supplied server URLs
- Building and sending ntfy JSON payloads via httpx
- Never raising notification failures must not interrupt application flow
"""
import httpx
import socket
import ipaddress
import logging
from typing import Optional
from app.models.settings import Settings
logger = logging.getLogger(__name__)
NTFY_TIMEOUT = 8.0 # seconds — hard cap to prevent hung requests
# RFC 1918 + loopback + link-local ranges that must never be contacted
_BLOCKED_NETWORKS = [
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("127.0.0.0/8"),
ipaddress.ip_network("169.254.0.0/16"),
ipaddress.ip_network("::1/128"),
ipaddress.ip_network("fc00::/7"),
]
def validate_ntfy_host(url: str) -> None:
"""
SSRF guard: resolve the hostname and reject if it points to any private/blocked range.
Raises ValueError on failure. Must be called before any outbound HTTP request.
"""
from urllib.parse import urlparse
hostname = urlparse(url).hostname
if not hostname:
raise ValueError("Invalid ntfy URL: no hostname")
try:
infos = socket.getaddrinfo(hostname, None)
except socket.gaierror as exc:
raise ValueError(f"Cannot resolve ntfy hostname '{hostname}': {exc}") from exc
for info in infos:
ip = ipaddress.ip_address(info[4][0])
for net in _BLOCKED_NETWORKS:
if ip in net:
raise ValueError(
f"ntfy hostname '{hostname}' resolves to blocked IP range ({ip})"
)
def _build_headers(auth_token: Optional[str]) -> dict:
headers = {"Content-Type": "application/json"}
if auth_token:
headers["Authorization"] = f"Bearer {auth_token}"
return headers
async def send_ntfy_notification(
settings: Settings,
title: str,
message: str,
tags: list[str],
priority: int = 3,
click_url: Optional[str] = None,
) -> bool:
"""
Fire-and-forget ntfy notification.
Returns True on success, False on any failure.
Never raises notification failure must not interrupt application flow.
"""
if not settings.ntfy_enabled:
return False
if not settings.ntfy_server_url or not settings.ntfy_topic:
return False
# Truncate to prevent oversized payloads (security requirement 6.3)
safe_title = (title[:77] + "...") if len(title) > 80 else title
safe_message = (message[:197] + "...") if len(message) > 200 else message
payload: dict = {
"topic": settings.ntfy_topic,
"title": safe_title,
"message": safe_message,
"tags": tags,
"priority": priority,
}
if click_url:
payload["click"] = click_url
payload["actions"] = [
{"action": "view", "label": "Open UMBRA", "url": click_url, "clear": True}
]
try:
# SSRF guard: validate resolved IP before making the request
validate_ntfy_host(settings.ntfy_server_url)
async with httpx.AsyncClient(timeout=NTFY_TIMEOUT, follow_redirects=False) as client:
resp = await client.post(
settings.ntfy_server_url,
json=payload,
headers=_build_headers(settings.ntfy_auth_token),
)
resp.raise_for_status()
return True
except ValueError as e:
# SSRF validation failure
logger.warning("ntfy SSRF validation rejected URL: %s", e)
return False
except httpx.TimeoutException:
logger.warning("ntfy notification timed out (server=%s)", settings.ntfy_server_url)
return False
except httpx.HTTPStatusError as e:
logger.warning(
"ntfy HTTP error %s for topic '%s': %s",
e.response.status_code,
settings.ntfy_topic,
e.response.text[:200],
)
return False
except Exception as e:
logger.warning("ntfy notification failed unexpectedly: %s", type(e).__name__)
return False

View File

@ -0,0 +1,134 @@
"""
Notification template builders for ntfy push notifications.
Each build_* function returns a dict with keys: title, message, tags, priority.
These are passed directly to send_ntfy_notification().
"""
from datetime import datetime, date
from typing import Optional
# ── Shared helpers ────────────────────────────────────────────────────────────
def urgency_label(target_date: date, today: date) -> str:
"""Human-readable urgency string relative to today."""
delta = (target_date - today).days
if delta < 0:
return f"OVERDUE ({abs(delta)}d ago)"
elif delta == 0:
return "Today"
elif delta == 1:
return "Tomorrow"
elif delta <= 7:
return f"in {delta} days"
else:
return target_date.strftime("%d %b")
def day_str(dt: datetime, today: date) -> str:
"""Return 'Today', 'Tomorrow', or a short date string."""
d = dt.date()
if d == today:
return "Today"
delta = (d - today).days
if delta == 1:
return "Tomorrow"
return dt.strftime("%a %d %b")
def time_str(dt: datetime, all_day: bool = False) -> str:
"""Return 'All day' or HH:MM."""
if all_day:
return "All day"
return dt.strftime("%H:%M")
def _truncate(text: str, max_len: int) -> str:
"""Truncate with ellipsis if over limit."""
return (text[:max_len - 3] + "...") if len(text) > max_len else text
# ── Template builders ─────────────────────────────────────────────────────────
def build_event_notification(
title: str,
start_datetime: datetime,
all_day: bool,
today: date,
location_name: Optional[str] = None,
description: Optional[str] = None,
is_starred: bool = False,
) -> dict:
"""Build notification payload for a calendar event reminder."""
day = day_str(start_datetime, today)
time = time_str(start_datetime, all_day)
loc = f" @ {location_name}" if location_name else ""
desc = f"{description[:80]}" if description else ""
return {
"title": _truncate(f"Calendar: {title}", 80),
"message": _truncate(f"{day} at {time}{loc}{desc}", 200),
"tags": ["calendar"],
"priority": 4 if is_starred else 3,
}
def build_reminder_notification(
title: str,
remind_at: datetime,
today: date,
description: Optional[str] = None,
) -> dict:
"""Build notification payload for a general reminder."""
day = day_str(remind_at, today)
time = time_str(remind_at)
desc = f"{description[:80]}" if description else ""
return {
"title": _truncate(f"Reminder: {title}", 80),
"message": _truncate(f"{day} at {time}{desc}", 200),
"tags": ["bell"],
"priority": 3,
}
def build_todo_notification(
title: str,
due_date: date,
today: date,
priority: str = "medium",
category: Optional[str] = None,
) -> dict:
"""Build notification payload for a todo due date alert."""
urgency = urgency_label(due_date, today)
priority_label = priority.capitalize()
cat = f" [{category}]" if category else ""
# High priority for today/overdue, default otherwise
ntfy_priority = 4 if (due_date - today).days <= 0 else 3
return {
"title": _truncate(f"Due {urgency}: {title}", 80),
"message": _truncate(f"Priority: {priority_label}{cat}", 200),
"tags": ["white_check_mark"],
"priority": ntfy_priority,
}
def build_project_notification(
name: str,
due_date: date,
today: date,
status: str = "in_progress",
) -> dict:
"""Build notification payload for a project deadline alert."""
urgency = urgency_label(due_date, today)
# Format status label
status_label = status.replace("_", " ").title()
ntfy_priority = 4 if due_date <= today else 3
return {
"title": _truncate(f"Project Deadline: {name}", 80),
"message": _truncate(f"Due {urgency} — Status: {status_label}", 200),
"tags": ["briefcase"],
"priority": ntfy_priority,
}

View File

@ -9,3 +9,5 @@ bcrypt==4.2.1
python-multipart==0.0.20
python-dateutil==2.9.0
itsdangerous==2.2.0
httpx==0.27.2
apscheduler==3.10.4