Backend pentest remediation (PT-03/05/06/07)

PT-03: Make UMBRA_URL configurable via env var (default http://localhost).
Replaces hardcoded http://10.0.69.35 in notification dispatch job and
ntfy test endpoint. Add UMBRA_URL to .env.example.

PT-05: Add explicit path="/" to session cookie for clarity.

PT-06: Add concurrent session limit (MAX_SESSIONS_PER_USER, default 10).
When exceeded, oldest sessions are revoked. New login always succeeds.

PT-07: Escape LIKE metacharacters (%, _) in admin audit log action
filter to prevent wildcard abuse.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Kyle 2026-03-02 17:43:27 +08:00
parent 7721bf5cec
commit ab7e4a7c7e
6 changed files with 40 additions and 7 deletions

View File

@ -18,6 +18,9 @@ ENVIRONMENT=development
# CORS allowed origins (comma-separated, default: http://localhost:5173) # CORS allowed origins (comma-separated, default: http://localhost:5173)
# CORS_ORIGINS=https://umbra.example.com # CORS_ORIGINS=https://umbra.example.com
# Public URL for ntfy notification click links (default: http://localhost)
# UMBRA_URL=https://umbra.example.com
# Timezone (applied to backend + db containers via env_file) # Timezone (applied to backend + db containers via env_file)
TZ=Australia/Perth TZ=Australia/Perth

View File

@ -23,6 +23,12 @@ class Settings(BaseSettings):
# CORS allowed origins (comma-separated) # CORS allowed origins (comma-separated)
CORS_ORIGINS: str = "http://localhost:5173" CORS_ORIGINS: str = "http://localhost:5173"
# Public-facing URL used in ntfy notification click links
UMBRA_URL: str = "http://localhost"
# Concurrent session limit per user (oldest evicted when exceeded)
MAX_SESSIONS_PER_USER: int = 10
model_config = SettingsConfigDict( model_config = SettingsConfigDict(
env_file=".env", env_file=".env",
env_file_encoding="utf-8", env_file_encoding="utf-8",

View File

@ -35,7 +35,7 @@ from app.services.ntfy_templates import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
UMBRA_URL = "http://10.0.69.35" from app.config import settings as app_settings
# ── Dedup helpers ───────────────────────────────────────────────────────────── # ── Dedup helpers ─────────────────────────────────────────────────────────────
@ -92,7 +92,7 @@ async def _dispatch_reminders(db: AsyncSession, settings: Settings, now: datetim
) )
sent = await send_ntfy_notification( sent = await send_ntfy_notification(
settings=settings, settings=settings,
click_url=UMBRA_URL, click_url=app_settings.UMBRA_URL,
**payload, **payload,
) )
if sent: if sent:
@ -144,7 +144,7 @@ async def _dispatch_events(db: AsyncSession, settings: Settings, now: datetime,
) )
sent = await send_ntfy_notification( sent = await send_ntfy_notification(
settings=settings, settings=settings,
click_url=UMBRA_URL, click_url=app_settings.UMBRA_URL,
**payload, **payload,
) )
if sent: if sent:
@ -184,7 +184,7 @@ async def _dispatch_todos(db: AsyncSession, settings: Settings, today, sent_keys
) )
sent = await send_ntfy_notification( sent = await send_ntfy_notification(
settings=settings, settings=settings,
click_url=UMBRA_URL, click_url=app_settings.UMBRA_URL,
**payload, **payload,
) )
if sent: if sent:
@ -223,7 +223,7 @@ async def _dispatch_projects(db: AsyncSession, settings: Settings, today, sent_k
) )
sent = await send_ntfy_notification( sent = await send_ntfy_notification(
settings=settings, settings=settings,
click_url=UMBRA_URL, click_url=app_settings.UMBRA_URL,
**payload, **payload,
) )
if sent: if sent:

View File

@ -770,7 +770,9 @@ async def get_audit_log(
) )
if action: if action:
base_q = base_q.where(AuditLog.action.like(f"{action}%")) # Escape LIKE metacharacters so user input is treated literally
safe_action = action.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
base_q = base_q.where(AuditLog.action.like(f"{safe_action}%", escape="\\"))
if target_user_id is not None: if target_user_id is not None:
base_q = base_q.where(AuditLog.target_user_id == target_user_id) base_q = base_q.where(AuditLog.target_user_id == target_user_id)

View File

@ -66,6 +66,7 @@ def _set_session_cookie(response: Response, token: str) -> None:
secure=app_settings.COOKIE_SECURE, secure=app_settings.COOKIE_SECURE,
max_age=app_settings.SESSION_MAX_AGE_DAYS * 86400, max_age=app_settings.SESSION_MAX_AGE_DAYS * 86400,
samesite="lax", samesite="lax",
path="/",
) )
@ -219,6 +220,26 @@ async def _create_db_session(
) )
db.add(db_session) db.add(db_session)
await db.flush() await db.flush()
# Enforce concurrent session limit: revoke oldest sessions beyond the cap
active_sessions = (
await db.execute(
select(UserSession)
.where(
UserSession.user_id == user.id,
UserSession.revoked == False, # noqa: E712
UserSession.expires_at > datetime.now(),
)
.order_by(UserSession.created_at.asc())
)
).scalars().all()
max_sessions = app_settings.MAX_SESSIONS_PER_USER
if len(active_sessions) > max_sessions:
for old_session in active_sessions[: len(active_sessions) - max_sessions]:
old_session.revoked = True
await db.flush()
token = create_session_token(user.id, session_id) token = create_session_token(user.id, session_id)
return session_id, token return session_id, token

View File

@ -2,6 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db from app.database import get_db
from app.config import settings as app_settings
from app.models.settings import Settings from app.models.settings import Settings
from app.models.user import User from app.models.user import User
from app.schemas.settings import SettingsUpdate, SettingsResponse from app.schemas.settings import SettingsUpdate, SettingsResponse
@ -108,7 +109,7 @@ async def test_ntfy(
message="If you see this, your ntfy integration is working correctly.", message="If you see this, your ntfy integration is working correctly.",
tags=["white_check_mark"], tags=["white_check_mark"],
priority=3, priority=3,
click_url="http://10.0.69.35", click_url=app_settings.UMBRA_URL,
) )
if not success: if not success: