Kyle Pope 92efeba2ec Fix QA review findings: update docs and comments
- W-01: Update README.md security section to reflect removed in-memory
  rate limiter and add /setup to nginx rate-limited endpoint list
- W-02: Replace misleading ALLOW_LAN_NTFY reference with actionable
  guidance to edit _BLOCKED_NETWORKS directly
- S-04: Add comment explaining burst=3 on /api/auth/setup vs burst=5

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 02:42:59 +08:00

129 lines
4.5 KiB
Python

"""
ntfy push notification service.
Responsible for:
- SSRF validation of user-supplied server URLs
- Building and sending ntfy JSON payloads via httpx
- Never raising — notification failures must not interrupt application flow
"""
import asyncio
import ipaddress
import logging
import socket
from typing import Optional

import httpx

from app.models.settings import Settings
# Module-level logger named after this module; every warning emitted by the
# notification path below goes through it.
logger = logging.getLogger(__name__)
# Passed as the httpx client timeout when posting a notification.
NTFY_TIMEOUT = 8.0 # seconds — hard cap to prevent hung requests
# Loopback, unspecified, link-local, and all RFC 1918 private ranges are blocked
# to prevent SSRF against Docker-internal services. If a self-hosted ntfy server
# on the LAN is required, remove the RFC 1918 entries from _BLOCKED_NETWORKS and
# document the accepted risk.
_BLOCKED_NETWORKS = [
    ipaddress.ip_network("0.0.0.0/8"),       # "this network" — 0.0.0.0 is commonly routed to the local host
    ipaddress.ip_network("127.0.0.0/8"),     # IPv4 loopback
    ipaddress.ip_network("10.0.0.0/8"),      # RFC 1918 private
    ipaddress.ip_network("172.16.0.0/12"),   # RFC 1918 private — covers Docker bridge 172.17-31.x
    ipaddress.ip_network("192.168.0.0/16"),  # RFC 1918 private
    ipaddress.ip_network("169.254.0.0/16"),  # IPv4 link-local
    ipaddress.ip_network("::/128"),          # IPv6 unspecified address
    ipaddress.ip_network("::1/128"),         # IPv6 loopback
    ipaddress.ip_network("fe80::/10"),       # IPv6 link-local
    ipaddress.ip_network("fc00::/7"),        # IPv6 ULA (covers fd00::/8)
]


def validate_ntfy_host(url: str) -> None:
    """
    SSRF guard: resolve the hostname and reject if it points to any private/blocked range.

    Every address returned by the resolver is checked, so a hostname with
    several records is rejected if any one of them is blocked. IPv4-mapped
    IPv6 results (``::ffff:a.b.c.d``) are additionally checked as the embedded
    IPv4 address, because they would otherwise slip past the IPv4 entries in
    ``_BLOCKED_NETWORKS``.

    Must be called before any outbound HTTP request. This function performs a
    blocking DNS lookup.

    Args:
        url: The user-supplied ntfy server URL.

    Returns:
        None when every resolved address is outside the blocked ranges.

    Raises:
        ValueError: If the URL has no hostname, the hostname cannot be
            resolved, or any resolved address falls in a blocked range.
    """
    from urllib.parse import urlparse

    hostname = urlparse(url).hostname
    if not hostname:
        raise ValueError("Invalid ntfy URL: no hostname")

    try:
        infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve ntfy hostname '{hostname}': {exc}") from exc

    for info in infos:
        # info[4] is the sockaddr tuple; its first item is the textual address.
        # Link-local IPv6 results may carry a "%zone" suffix, which has no
        # bearing on which range the address belongs to.
        ip = ipaddress.ip_address(info[4][0].split("%", 1)[0])

        # An IPv6 address is never "in" an IPv4 network, so also test the
        # embedded IPv4 address of an IPv4-mapped result.
        candidates = [ip]
        mapped = getattr(ip, "ipv4_mapped", None)
        if mapped is not None:
            candidates.append(mapped)

        if any(addr in net for addr in candidates for net in _BLOCKED_NETWORKS):
            raise ValueError(
                f"ntfy hostname '{hostname}' resolves to blocked IP range ({ip})"
            )
def _build_headers(auth_token: Optional[str]) -> dict:
headers = {"Content-Type": "application/json"}
if auth_token:
headers["Authorization"] = f"Bearer {auth_token}"
return headers
async def send_ntfy_notification(
    settings: Settings,
    title: str,
    message: str,
    tags: list[str],
    priority: int = 3,
    click_url: Optional[str] = None,
) -> bool:
    """
    Fire-and-forget ntfy notification.

    Builds the JSON payload, runs the SSRF guard against the configured
    server URL, then POSTs the payload to that URL.

    Args:
        settings: Supplies ``ntfy_enabled``, ``ntfy_server_url``,
            ``ntfy_topic`` and ``ntfy_auth_token``.
        title: Notification title; anything over 80 characters is cut to
            77 characters plus "...".
        message: Notification body; anything over 200 characters is cut to
            197 characters plus "...".
        tags: Sent unchanged as the payload's ``tags`` field.
        priority: Sent unchanged as the payload's ``priority`` field.
        click_url: When truthy, sent as ``click`` and also attached as an
            "Open UMBRA" view action.

    Returns:
        True when the POST completed with a non-error status. False when
        notifications are disabled or unconfigured, when the SSRF guard
        rejects the URL, or on any other failure.

    Never raises — notification failure must not interrupt application flow.
    """
    if not settings.ntfy_enabled:
        return False
    if not settings.ntfy_server_url or not settings.ntfy_topic:
        return False

    # Truncate to prevent oversized payloads (security requirement 6.3)
    safe_title = (title[:77] + "...") if len(title) > 80 else title
    safe_message = (message[:197] + "...") if len(message) > 200 else message

    payload: dict = {
        "topic": settings.ntfy_topic,
        "title": safe_title,
        "message": safe_message,
        "tags": tags,
        "priority": priority,
    }
    if click_url:
        payload["click"] = click_url
        payload["actions"] = [
            {"action": "view", "label": "Open UMBRA", "url": click_url, "clear": True}
        ]

    try:
        # SSRF guard: validate resolved IP before making the request.
        # validate_ntfy_host does a blocking DNS lookup, so it runs in a worker
        # thread to keep the event loop responsive. Its ValueError is handled
        # right here so that a ValueError raised later by the HTTP client is
        # not misreported as an SSRF rejection.
        # NOTE(review): the HTTP client resolves the hostname again when it
        # connects, so the check and the connection can see different DNS
        # answers; pinning the validated address would close that gap.
        try:
            await asyncio.to_thread(validate_ntfy_host, settings.ntfy_server_url)
        except ValueError as e:
            logger.warning("ntfy SSRF validation rejected URL: %s", e)
            return False

        # Redirects are not followed, so a response cannot steer the request
        # to a host that never went through validation.
        async with httpx.AsyncClient(timeout=NTFY_TIMEOUT, follow_redirects=False) as client:
            resp = await client.post(
                settings.ntfy_server_url,
                json=payload,
                headers=_build_headers(settings.ntfy_auth_token),
            )
            resp.raise_for_status()
            return True
    except httpx.TimeoutException:
        logger.warning("ntfy notification timed out (server=%s)", settings.ntfy_server_url)
        return False
    except httpx.HTTPStatusError as e:
        logger.warning(
            "ntfy HTTP error %s for topic '%s': %s",
            e.response.status_code,
            settings.ntfy_topic,
            e.response.text[:200],
        )
        return False
    except Exception as e:
        # Deliberate catch-all: this function must never raise. Only the
        # exception type is logged.
        logger.warning("ntfy notification failed unexpectedly: %s", type(e).__name__)
        return False