Kyle Pope 3268bfc5d5 Fix SSRF guard to allow private IPs for LAN ntfy servers (W5)
Remove RFC 1918 blocks from _BLOCKED_NETWORKS — only block loopback
and link-local. Self-hosted ntfy servers are typically on the same LAN.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 04:22:48 +08:00

124 lines
4.0 KiB
Python

"""
ntfy push notification service.
Responsible for:
- SSRF validation of user-supplied server URLs
- Building and sending ntfy JSON payloads via httpx
- Never raising — notification failures must not interrupt application flow
"""
import httpx
import socket
import ipaddress
import logging
from typing import Optional
from app.models.settings import Settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Timeout passed to the httpx client used for ntfy requests.
NTFY_TIMEOUT = 8.0 # seconds — hard cap to prevent hung requests
# Destinations that must never be contacted on behalf of a user-supplied URL:
# loopback, link-local (169.254.0.0/16 also covers the usual cloud metadata
# address) and the unspecified address, which common operating systems treat
# as the local host when it is used as a connect target.
# Private IPs (RFC 1918) are intentionally allowed because UMBRA is
# self-hosted and the user's ntfy server is typically on the same LAN.
_BLOCKED_NETWORKS = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("0.0.0.0/32"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fe80::/10"),
    ipaddress.ip_network("::/128"),
]


def validate_ntfy_host(url: str) -> None:
    """
    SSRF guard: resolve the URL's hostname and reject blocked destinations.

    Every address returned by the resolver is checked against
    ``_BLOCKED_NETWORKS`` (loopback, link-local, unspecified). Private LAN
    addresses are accepted on purpose. IPv4-mapped IPv6 answers such as
    ``::ffff:127.0.0.1`` are checked as the IPv4 address they embed, so they
    cannot be used to sidestep the IPv4 entries.

    Must be called before any outbound HTTP request. This is a blocking call
    (it performs a DNS lookup). It resolves the name independently of whatever
    later opens the connection, so on its own it cannot rule out a DNS answer
    that changes between the two lookups.

    Args:
        url: The user-supplied ntfy server URL.

    Raises:
        ValueError: If the URL has no hostname, the hostname cannot be
            resolved, or any resolved address falls in a blocked range.
            A ValueError raised by ``urlparse`` for a malformed URL
            propagates unchanged.
    """
    from urllib.parse import urlparse

    hostname = urlparse(url).hostname
    if not hostname:
        raise ValueError("Invalid ntfy URL: no hostname")

    try:
        infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve ntfy hostname '{hostname}': {exc}") from exc

    for info in infos:
        # info[4] is the sockaddr tuple; its first element is the textual IP.
        ip = ipaddress.ip_address(info[4][0])

        # An IPv6 address of the form ::ffff:a.b.c.d reaches the IPv4 host
        # a.b.c.d on dual-stack systems, but it is neither inside an IPv4
        # network (version mismatch) nor inside ::1/128 or fe80::/10.
        # Compare the embedded IPv4 address instead.
        embedded_v4 = getattr(ip, "ipv4_mapped", None)
        candidate = embedded_v4 if embedded_v4 is not None else ip

        if any(candidate in net for net in _BLOCKED_NETWORKS):
            raise ValueError(
                f"ntfy hostname '{hostname}' resolves to blocked IP range ({ip})"
            )
def _build_headers(auth_token: Optional[str]) -> dict:
headers = {"Content-Type": "application/json"}
if auth_token:
headers["Authorization"] = f"Bearer {auth_token}"
return headers
async def send_ntfy_notification(
    settings: Settings,
    title: str,
    message: str,
    tags: list[str],
    priority: int = 3,
    click_url: Optional[str] = None,
) -> bool:
    """
    Fire-and-forget ntfy notification.

    Posts one JSON message to ``settings.ntfy_server_url`` for
    ``settings.ntfy_topic``. Returns True on success, False on any failure.
    Never raises — notification failure must not interrupt application flow.

    Args:
        settings: Supplies ``ntfy_enabled``, ``ntfy_server_url``,
            ``ntfy_topic`` and ``ntfy_auth_token``.
        title: Notification title; anything longer than 80 characters is cut
            to 77 characters plus "...".
        message: Notification body; anything longer than 200 characters is
            cut to 197 characters plus "...".
        tags: Sent unchanged as the payload's "tags" field.
        priority: Sent unchanged as the payload's "priority" field.
        click_url: When set, sent as the "click" target and as the URL of a
            single "view" action labelled "Open UMBRA".

    Returns:
        True if the request completed and ``raise_for_status()`` did not
        raise; False if notifications are disabled or unconfigured, the SSRF
        guard rejected the URL, a timeout occurred, the server returned an
        error status, or any other exception was raised while sending.
    """
    # Local import: only needed here to keep the blocking DNS lookup off the
    # event loop.
    import asyncio

    if not settings.ntfy_enabled:
        return False
    if not settings.ntfy_server_url or not settings.ntfy_topic:
        return False

    # Truncate to prevent oversized payloads (security requirement 6.3)
    safe_title = (title[:77] + "...") if len(title) > 80 else title
    safe_message = (message[:197] + "...") if len(message) > 200 else message

    payload: dict = {
        "topic": settings.ntfy_topic,
        "title": safe_title,
        "message": safe_message,
        "tags": tags,
        "priority": priority,
    }
    if click_url:
        payload["click"] = click_url
        payload["actions"] = [
            {"action": "view", "label": "Open UMBRA", "url": click_url, "clear": True}
        ]

    # Phase 1 — SSRF guard. validate_ntfy_host() performs a blocking
    # getaddrinfo(), so it runs in a worker thread and is bounded by the same
    # timeout as the request; a slow resolver must not stall the event loop.
    # This phase has its own try block so that only failures of the guard are
    # reported as SSRF rejections.
    try:
        await asyncio.wait_for(
            asyncio.to_thread(validate_ntfy_host, settings.ntfy_server_url),
            timeout=NTFY_TIMEOUT,
        )
    except ValueError as e:
        # SSRF validation failure
        logger.warning("ntfy SSRF validation rejected URL: %s", e)
        return False
    except asyncio.TimeoutError:
        logger.warning(
            "ntfy hostname resolution timed out (server=%s)", settings.ntfy_server_url
        )
        return False
    except Exception as e:
        logger.warning("ntfy notification failed unexpectedly: %s", type(e).__name__)
        return False

    # Phase 2 — deliver. Redirects are not followed, so a response from the
    # validated host cannot steer the client to a different destination.
    # NOTE: httpx resolves the hostname again when connecting; the guard above
    # does not cover a DNS answer that changes between the two lookups.
    try:
        async with httpx.AsyncClient(timeout=NTFY_TIMEOUT, follow_redirects=False) as client:
            resp = await client.post(
                settings.ntfy_server_url,
                json=payload,
                headers=_build_headers(settings.ntfy_auth_token),
            )
            resp.raise_for_status()
        return True
    except httpx.TimeoutException:
        logger.warning("ntfy notification timed out (server=%s)", settings.ntfy_server_url)
        return False
    except httpx.HTTPStatusError as e:
        logger.warning(
            "ntfy HTTP error %s for topic '%s': %s",
            e.response.status_code,
            settings.ntfy_topic,
            e.response.text[:200],
        )
        return False
    except Exception as e:
        # Only the exception type is logged, not its message.
        logger.warning("ntfy notification failed unexpectedly: %s", type(e).__name__)
        return False