""" ntfy push notification service. Responsible for: - SSRF validation of user-supplied server URLs - Building and sending ntfy JSON payloads via httpx - Never raising — notification failures must not interrupt application flow """ import httpx import socket import ipaddress import logging from typing import Optional from app.models.settings import Settings logger = logging.getLogger(__name__) NTFY_TIMEOUT = 8.0 # seconds — hard cap to prevent hung requests # RFC 1918 + loopback + link-local ranges that must never be contacted _BLOCKED_NETWORKS = [ ipaddress.ip_network("10.0.0.0/8"), ipaddress.ip_network("172.16.0.0/12"), ipaddress.ip_network("192.168.0.0/16"), ipaddress.ip_network("127.0.0.0/8"), ipaddress.ip_network("169.254.0.0/16"), ipaddress.ip_network("::1/128"), ipaddress.ip_network("fc00::/7"), ] def validate_ntfy_host(url: str) -> None: """ SSRF guard: resolve the hostname and reject if it points to any private/blocked range. Raises ValueError on failure. Must be called before any outbound HTTP request. """ from urllib.parse import urlparse hostname = urlparse(url).hostname if not hostname: raise ValueError("Invalid ntfy URL: no hostname") try: infos = socket.getaddrinfo(hostname, None) except socket.gaierror as exc: raise ValueError(f"Cannot resolve ntfy hostname '{hostname}': {exc}") from exc for info in infos: ip = ipaddress.ip_address(info[4][0]) for net in _BLOCKED_NETWORKS: if ip in net: raise ValueError( f"ntfy hostname '{hostname}' resolves to blocked IP range ({ip})" ) def _build_headers(auth_token: Optional[str]) -> dict: headers = {"Content-Type": "application/json"} if auth_token: headers["Authorization"] = f"Bearer {auth_token}" return headers async def send_ntfy_notification( settings: Settings, title: str, message: str, tags: list[str], priority: int = 3, click_url: Optional[str] = None, ) -> bool: """ Fire-and-forget ntfy notification. Returns True on success, False on any failure. Never raises — notification failure must not interrupt application flow. """ if not settings.ntfy_enabled: return False if not settings.ntfy_server_url or not settings.ntfy_topic: return False # Truncate to prevent oversized payloads (security requirement 6.3) safe_title = (title[:77] + "...") if len(title) > 80 else title safe_message = (message[:197] + "...") if len(message) > 200 else message payload: dict = { "topic": settings.ntfy_topic, "title": safe_title, "message": safe_message, "tags": tags, "priority": priority, } if click_url: payload["click"] = click_url payload["actions"] = [ {"action": "view", "label": "Open UMBRA", "url": click_url, "clear": True} ] try: # SSRF guard: validate resolved IP before making the request validate_ntfy_host(settings.ntfy_server_url) async with httpx.AsyncClient(timeout=NTFY_TIMEOUT, follow_redirects=False) as client: resp = await client.post( settings.ntfy_server_url, json=payload, headers=_build_headers(settings.ntfy_auth_token), ) resp.raise_for_status() return True except ValueError as e: # SSRF validation failure logger.warning("ntfy SSRF validation rejected URL: %s", e) return False except httpx.TimeoutException: logger.warning("ntfy notification timed out (server=%s)", settings.ntfy_server_url) return False except httpx.HTTPStatusError as e: logger.warning( "ntfy HTTP error %s for topic '%s': %s", e.response.status_code, settings.ntfy_topic, e.response.text[:200], ) return False except Exception as e: logger.warning("ntfy notification failed unexpectedly: %s", type(e).__name__) return False