Phase 1: Add role, mfa_enforce_pending, must_change_password to users table. Create system_config (singleton) and audit_log tables. Migration 026. Phase 2: Add user_id FK to all 8 data tables (todos, reminders, projects, calendars, people, locations, event_templates, ntfy_sent) with 4-step nullable→backfill→FK→NOT NULL pattern. Migrations 027-034. Phase 3: Harden auth schemas (extra="forbid" on RegisterRequest), add MFA enforcement token serializer with distinct salt, rewrite auth router with require_role() factory and registration endpoint. Phase 4: Scope all 12 routers by user_id, fix dependency type bugs, bound weather cache (SEC-15), multi-user ntfy dispatch. Phase 5: Create admin router (14 endpoints), admin schemas, audit service, rate limiting in nginx. SEC-08 CSRF via X-Requested-With. Phase 6: Update frontend types, useAuth hook (role/isAdmin/register), App.tsx (AdminRoute guard), Sidebar (admin link), api.ts (XHR header). Security findings addressed: SEC-01, SEC-02, SEC-03, SEC-04, SEC-05, SEC-06, SEC-07, SEC-08, SEC-12, SEC-13, SEC-15. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
158 lines
5.4 KiB
Python
158 lines
5.4 KiB
Python
"""
|
|
Authentication service: password hashing, session tokens, MFA tokens.
|
|
|
|
Password strategy:
|
|
- New passwords: Argon2id (OWASP/NIST preferred, PHC winner)
|
|
- Legacy bcrypt hashes (migrated from PIN auth): accepted on login, immediately
|
|
rehashed to Argon2id on first successful use.
|
|
"""
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError
|
|
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
|
|
|
from app.config import settings as app_settings
|
|
|
|
# Shared Argon2 hasher tuned to the OWASP-minimum Argon2id profile:
# 19 MiB of memory, 2 passes, a single lane.
_ph = PasswordHasher(
    memory_cost=19 * 1024,  # expressed in KiB -> 19456 KiB == 19 MiB
    time_cost=2,
    parallelism=1,
    salt_len=16,
    hash_len=32,
)
|
|
|
|
# Signer for session cookies. Its salt is unique to sessions, so a token
# minted by one of the MFA serializers will not validate here.
_serializer = URLSafeTimedSerializer(
    salt="umbra-session-v2",
    secret_key=app_settings.SECRET_KEY,
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Password helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def hash_password(password: str) -> str:
    """Return the encoded hash of *password* produced by the module hasher.

    The hash is generated by the shared ``_ph`` PasswordHasher instance, so
    it carries that instance's cost parameters.
    """
    encoded = _ph.hash(password)
    return encoded
|
|
|
|
|
|
def verify_password(password: str, hashed: str) -> bool:
    """Check *password* against the stored hash *hashed*.

    A mismatch, a verification failure, or a malformed/unsupported hash
    string all yield ``False`` instead of raising.
    """
    try:
        # Note the argument order expected by the hasher: stored hash first.
        outcome = _ph.verify(hashed, password)
    except (InvalidHashError, VerificationError, VerifyMismatchError):
        return False
    return outcome
|
|
|
|
|
|
def needs_rehash(hashed: str) -> bool:
    """Report whether *hashed* should be regenerated.

    Delegates to the module hasher's ``check_needs_rehash``, which compares
    the parameters encoded in the hash with the hasher's current settings.
    """
    is_outdated = _ph.check_needs_rehash(hashed)
    return is_outdated
|
|
|
|
|
|
def verify_password_with_upgrade(password: str, hashed: str) -> tuple[bool, str | None]:
    """
    Check a password against a stored hash and signal when it must be re-stored.

    Two hash families are recognised:
    - strings starting with ``$2b$`` or ``$2a$`` are treated as legacy bcrypt;
    - everything else is handed to the Argon2 verifier.

    Returns ``(is_valid, replacement_hash)``. ``replacement_hash`` is a fresh
    hash from ``hash_password`` when the password is correct AND the stored
    hash is either bcrypt or flagged by ``needs_rehash``; otherwise it is
    ``None``. The caller is responsible for persisting the replacement.
    """
    if hashed.startswith(("$2b$", "$2a$")):
        # Legacy branch: bcrypt is imported lazily because it is only needed
        # while old hashes are still being migrated.
        import bcrypt  # noqa: PLC0415 — intentional lazy import; bcrypt is only needed during migration

        try:
            matches = bcrypt.checkpw(password.encode(), hashed.encode())
        except Exception:
            # Any bcrypt failure (e.g. a malformed hash) counts as a rejection.
            matches = False

        if not matches:
            return False, None
        return True, hash_password(password)

    # Argon2 branch.
    is_valid = verify_password(password, hashed)
    if is_valid and needs_rehash(hashed):
        return is_valid, hash_password(password)
    return is_valid, None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Session tokens
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_session_token(user_id: int, session_id: str) -> str:
    """Build the signed session-cookie value for a user/session pair.

    The signed payload is a dict with ``uid`` (the user id) and ``sid``
    (the session id).
    """
    payload = {"uid": user_id, "sid": session_id}
    return _serializer.dumps(payload)
|
|
|
|
|
|
def verify_session_token(token: str, max_age: int | None = None) -> dict | None:
    """
    Decode a session cookie value.

    Returns the payload on success, or ``None`` when the signature is bad
    or the token is older than the allowed age.

    ``max_age`` is in seconds; when omitted it is derived from
    ``SESSION_MAX_AGE_DAYS`` in the application settings.
    """
    ttl_seconds = (
        app_settings.SESSION_MAX_AGE_DAYS * 86400 if max_age is None else max_age
    )
    try:
        payload = _serializer.loads(token, max_age=ttl_seconds)
    except (BadSignature, SignatureExpired):
        return None
    return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MFA tokens (short-lived, used between password OK and TOTP verification)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Signer for MFA challenge tokens. The salt differs from the session
# serializer's, so a challenge token will not validate as a session token.
_mfa_serializer = URLSafeTimedSerializer(
    salt="mfa-challenge",
    secret_key=app_settings.SECRET_KEY,
)
|
|
|
|
|
|
def create_mfa_token(user_id: int) -> str:
    """Issue a signed MFA challenge token carrying *user_id* under ``uid``.

    Expiry is enforced at verification time, not here.
    """
    payload = {"uid": user_id}
    return _mfa_serializer.dumps(payload)
|
|
|
|
|
|
def verify_mfa_token(token: str) -> int | None:
    """
    Validate an MFA challenge token.

    Returns the ``uid`` stored in the token, or ``None`` on any failure
    (bad signature, expired token, unexpected payload, ...). The maximum
    age comes from ``MFA_TOKEN_MAX_AGE_SECONDS`` in the application settings
    (documented as a 5-minute TTL — confirm against config).
    """
    try:
        # The settings lookup is kept inside the try so that every failure
        # on this path collapses to None, exactly like a bad token.
        payload = _mfa_serializer.loads(
            token,
            max_age=app_settings.MFA_TOKEN_MAX_AGE_SECONDS,
        )
        user_id = payload["uid"]
    except Exception:
        return None
    return user_id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MFA enforcement tokens (SEC-03: distinct salt from challenge tokens)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Signer for MFA enforcement-setup tokens; salted separately from both the
# session serializer and the MFA challenge serializer.
_mfa_enforce_serializer = URLSafeTimedSerializer(
    salt="mfa-enforce-setup-v1",
    secret_key=app_settings.SECRET_KEY,
)
|
|
|
|
|
|
def create_mfa_enforce_token(user_id: int) -> str:
    """Issue a signed MFA enforcement-setup token carrying *user_id* under ``uid``.

    This token is signed by the enforcement serializer, not the session one,
    so it is not a session token. Expiry is enforced at verification time.
    """
    payload = {"uid": user_id}
    return _mfa_enforce_serializer.dumps(payload)
|
|
|
|
|
|
def verify_mfa_enforce_token(token: str) -> int | None:
    """
    Validate an MFA enforcement-setup token.

    Returns the ``uid`` stored in the token, or ``None`` on any failure
    (bad signature, expired token, unexpected payload, ...). The maximum
    age comes from ``MFA_TOKEN_MAX_AGE_SECONDS`` in the application settings
    (documented as a 5-minute TTL — confirm against config).
    """
    try:
        # The settings lookup is kept inside the try so that every failure
        # on this path collapses to None, exactly like a bad token.
        payload = _mfa_enforce_serializer.loads(
            token,
            max_age=app_settings.MFA_TOKEN_MAX_AGE_SECONDS,
        )
        user_id = payload["uid"]
    except Exception:
        return None
    return user_id
|