Kyle Pope a94485b138 Address code review findings across all phases
Phase 1 fixes:
- W-01: Add start_period: 30s to backend healthcheck for migration window
- W-03: Narrow .dockerignore *.md to specific files (preserve alembic/README)

Phase 2 fixes:
- C-01: Wrap Argon2id calls in totp.py (disable, regenerate, backup verify,
  backup store) — missed in initial AC-2 pass
- S-01: Extract async wrappers (ahash_password, averify_password,
  averify_password_with_upgrade) into services/auth.py, refactor all
  callers to use them instead of manual run_in_executor boilerplate
- W-01: Fix ntfy dedup regression — commit per category instead of per-user
  to preserve dedup records if a later category fails

Phase 4 fixes:
- C-01: Fix optimistic drag-and-drop cache key to include date range
- C-02: Replace toISOString() with format() to avoid UTC date shift in
  visible range calculation
- W-02: Initialize visibleRange from current month to eliminate unscoped
  first fetch + immediate refetch

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 00:19:33 +08:00

186 lines
6.7 KiB
Python

"""
Authentication service: password hashing, session tokens, MFA tokens.
Password strategy:
- New passwords: Argon2id (OWASP/NIST preferred, PHC winner)
- Legacy bcrypt hashes (migrated from PIN auth): accepted on login, immediately
rehashed to Argon2id on first successful use.
"""
import asyncio
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from app.config import settings as app_settings
# OWASP-minimum Argon2id parameters (m=19 MB, t=2 iterations, p=1)
_ph = PasswordHasher(
time_cost=2,
memory_cost=19456, # 19 MB in KB
parallelism=1,
hash_len=32,
salt_len=16,
)
# Session serializer — salt differentiates from MFA tokens
_serializer = URLSafeTimedSerializer(
secret_key=app_settings.SECRET_KEY,
salt="umbra-session-v2",
)
# ---------------------------------------------------------------------------
# Password helpers
# ---------------------------------------------------------------------------
def hash_password(password: str) -> str:
"""Hash a password with Argon2id."""
return _ph.hash(password)
def verify_password(password: str, hashed: str) -> bool:
"""Verify an Argon2id password hash. Returns False on any failure."""
try:
return _ph.verify(hashed, password)
except (VerifyMismatchError, VerificationError, InvalidHashError):
return False
def needs_rehash(hashed: str) -> bool:
"""True if the stored hash was created with outdated parameters."""
return _ph.check_needs_rehash(hashed)
def verify_password_with_upgrade(password: str, hashed: str) -> tuple[bool, str | None]:
"""
Verify a password against a stored hash (Argon2id or legacy bcrypt).
Returns (is_valid, new_hash_if_upgrade_needed).
new_hash is non-None only when the stored hash is bcrypt and the password is
correct — caller must persist the new hash to complete the migration.
Also returns a new hash when Argon2id parameters are outdated.
"""
if hashed.startswith("$2b$") or hashed.startswith("$2a$"):
# Legacy bcrypt — verify then immediately rehash to Argon2id
import bcrypt # noqa: PLC0415 — intentional lazy import; bcrypt is only needed during migration
try:
valid = bcrypt.checkpw(password.encode(), hashed.encode())
except Exception:
return False, None
if valid:
return True, hash_password(password)
return False, None
# Argon2id path
valid = verify_password(password, hashed)
new_hash = hash_password(password) if (valid and needs_rehash(hashed)) else None
return valid, new_hash
# ---------------------------------------------------------------------------
# Async wrappers — run CPU-bound Argon2id ops in a thread pool (AC-2/S-01)
# ---------------------------------------------------------------------------
async def ahash_password(password: str) -> str:
"""Async wrapper for hash_password — runs Argon2id in executor."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, hash_password, password)
async def averify_password(password: str, hashed: str) -> bool:
"""Async wrapper for verify_password — runs Argon2id in executor."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, verify_password, password, hashed)
async def averify_password_with_upgrade(password: str, hashed: str) -> tuple[bool, str | None]:
"""Async wrapper for verify_password_with_upgrade — runs Argon2id in executor."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, verify_password_with_upgrade, password, hashed)
# ---------------------------------------------------------------------------
# Session tokens
# ---------------------------------------------------------------------------
def create_session_token(user_id: int, session_id: str) -> str:
"""Create a signed session cookie payload embedding user_id + session_id."""
return _serializer.dumps({"uid": user_id, "sid": session_id})
def verify_session_token(token: str, max_age: int | None = None) -> dict | None:
"""
Verify a session cookie and return its payload dict, or None if invalid/expired.
max_age defaults to SESSION_TOKEN_HARD_CEILING_DAYS (absolute token lifetime).
The sliding window (SESSION_MAX_AGE_DAYS) is enforced via DB expires_at checks,
not by itsdangerous — this decoupling prevents the serializer from rejecting
renewed tokens that were created more than SESSION_MAX_AGE_DAYS ago.
"""
if max_age is None:
max_age = app_settings.SESSION_TOKEN_HARD_CEILING_DAYS * 86400
try:
return _serializer.loads(token, max_age=max_age)
except (BadSignature, SignatureExpired):
return None
# ---------------------------------------------------------------------------
# MFA tokens (short-lived, used between password OK and TOTP verification)
# ---------------------------------------------------------------------------
# MFA tokens use a distinct salt so they cannot be replayed as session tokens
_mfa_serializer = URLSafeTimedSerializer(
secret_key=app_settings.SECRET_KEY,
salt="mfa-challenge",
)
def create_mfa_token(user_id: int) -> str:
"""Create a short-lived signed token for the MFA challenge step."""
return _mfa_serializer.dumps({"uid": user_id})
def verify_mfa_token(token: str) -> int | None:
"""
Verify an MFA challenge token.
Returns the user_id on success, None if invalid or expired (5-minute TTL).
"""
try:
data = _mfa_serializer.loads(
token, max_age=app_settings.MFA_TOKEN_MAX_AGE_SECONDS
)
return data["uid"]
except Exception:
return None
# ---------------------------------------------------------------------------
# MFA enforcement tokens (SEC-03: distinct salt from challenge tokens)
# ---------------------------------------------------------------------------
_mfa_enforce_serializer = URLSafeTimedSerializer(
secret_key=app_settings.SECRET_KEY,
salt="mfa-enforce-setup-v1",
)
def create_mfa_enforce_token(user_id: int) -> str:
"""Create a short-lived token for MFA enforcement setup (not a session)."""
return _mfa_enforce_serializer.dumps({"uid": user_id})
def verify_mfa_enforce_token(token: str) -> int | None:
"""
Verify an MFA enforcement setup token.
Returns user_id on success, None if invalid or expired (5-minute TTL).
"""
try:
data = _mfa_enforce_serializer.loads(
token, max_age=app_settings.MFA_TOKEN_MAX_AGE_SECONDS
)
return data["uid"]
except Exception:
return None