Kyle Pope fbc452a004 Implement Stage 6 Track A: PIN → Username/Password auth migration
- New User model (username, argon2id password_hash, totp fields, lockout)
- New UserSession model (DB-backed revocation, replaces in-memory set)
- New services/auth.py: Argon2id hashing, bcrypt→Argon2id upgrade path, URLSafeTimedSerializer session/MFA tokens
- New schemas/auth.py: SetupRequest, LoginRequest, ChangePasswordRequest with OWASP password strength validation
- Full rewrite of routers/auth.py: setup/login/logout/status/change-password with account lockout (10 failures → 30-min, HTTP 423), IP rate limiting retained as outer layer, get_current_user + get_current_settings dependencies replacing get_current_session
- Settings model: drop pin_hash, add user_id FK (nullable for migration)
- schemas/settings.py: remove SettingsCreate, ChangePinRequest, _validate_pin_length
- Settings router: rewrite to use get_current_user + get_current_settings, preserve ntfy test endpoint
- All 11 consumer routers updated: auth-gate-only routers use get_current_user, routers reading Settings fields use get_current_settings
- config.py: add SESSION_MAX_AGE_DAYS, MFA_TOKEN_MAX_AGE_SECONDS, TOTP_ISSUER
- main.py: import User and UserSession models for Alembic discovery
- requirements.txt: add argon2-cffi>=23.1.0
- Migration 023: create users + user_sessions tables, migrate pin_hash → User row (admin), backfill settings.user_id, drop pin_hash

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 04:12:37 +08:00

129 lines
4.4 KiB
Python

"""
Authentication service: password hashing, session tokens, MFA tokens.
Password strategy:
- New passwords: Argon2id (OWASP/NIST preferred, PHC winner)
- Legacy bcrypt hashes (migrated from PIN auth): accepted on login, immediately
rehashed to Argon2id on first successful use.
"""
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from app.config import settings as app_settings
# Shared Argon2id hasher using the OWASP-minimum profile:
# 19 MiB of memory, 2 iterations, parallelism of 1.
_ph = PasswordHasher(
    memory_cost=19456,  # 19 * 1024, i.e. 19 MiB expressed in KiB
    time_cost=2,
    parallelism=1,
    salt_len=16,
    hash_len=32,
)
# Signer for session cookies. The salt differs from the one used by the MFA
# serializer, so the two kinds of token are not interchangeable.
_serializer = URLSafeTimedSerializer(
    app_settings.SECRET_KEY,
    salt="umbra-session-v2",
)
# ---------------------------------------------------------------------------
# Password helpers
# ---------------------------------------------------------------------------
def hash_password(password: str) -> str:
    """Return a hash of ``password`` produced by the module-level hasher ``_ph``."""
    encoded_hash = _ph.hash(password)
    return encoded_hash
def verify_password(password: str, hashed: str) -> bool:
    """
    Check ``password`` against a stored hash using the module-level hasher.

    Returns the result of ``_ph.verify`` when it succeeds. A mismatch, a
    verification error or an unparseable hash is reported as False instead
    of raising.
    """
    try:
        outcome = _ph.verify(hashed, password)
    except (InvalidHashError, VerificationError, VerifyMismatchError):
        return False
    return outcome
def needs_rehash(hashed: str) -> bool:
    """Report whether ``_ph`` considers the stored hash's parameters out of date."""
    is_outdated = _ph.check_needs_rehash(hashed)
    return is_outdated
def verify_password_with_upgrade(password: str, hashed: str) -> tuple[bool, str | None]:
    """
    Check a password against a stored Argon2id or legacy bcrypt hash.

    Returns:
        ``(is_valid, replacement_hash)``. ``replacement_hash`` is a fresh
        Argon2id hash when the password is correct AND the stored hash is
        either bcrypt or Argon2id with outdated parameters; otherwise None.
        The caller must persist a non-None ``replacement_hash`` to complete
        the upgrade.
    """
    if not hashed.startswith(("$2b$", "$2a$")):
        # Argon2id path: only a successful verification can trigger a rehash.
        if not verify_password(password, hashed):
            return False, None
        replacement = hash_password(password) if needs_rehash(hashed) else None
        return True, replacement

    # Legacy bcrypt path: verify, then hand back an Argon2id hash on success.
    import bcrypt  # noqa: PLC0415 — intentional lazy import; bcrypt is only needed during migration

    try:
        matches = bcrypt.checkpw(password.encode(), hashed.encode())
    except Exception:
        # Any bcrypt failure is treated as a failed login.
        return False, None
    if not matches:
        return False, None
    return True, hash_password(password)
# ---------------------------------------------------------------------------
# Session tokens
# ---------------------------------------------------------------------------
def create_session_token(user_id: int, session_id: str) -> str:
    """
    Build the signed session-cookie value.

    The payload carries ``user_id`` under the key "uid" and ``session_id``
    under the key "sid", serialized by the session serializer.
    """
    payload = {"uid": user_id, "sid": session_id}
    return _serializer.dumps(payload)
def verify_session_token(token: str, max_age: int | None = None) -> dict | None:
    """
    Validate a session cookie and hand back its payload.

    Args:
        token: The signed cookie value.
        max_age: Maximum accepted token age in seconds. When None, it is
            derived from ``SESSION_MAX_AGE_DAYS`` in the app config.

    Returns:
        The deserialized payload, or None when the signature is bad or the
        token has expired.
    """
    ttl_seconds = (
        app_settings.SESSION_MAX_AGE_DAYS * 86400 if max_age is None else max_age
    )
    try:
        payload = _serializer.loads(token, max_age=ttl_seconds)
    except (SignatureExpired, BadSignature):
        return None
    return payload
# ---------------------------------------------------------------------------
# MFA tokens (short-lived, used between password OK and TOTP verification)
# ---------------------------------------------------------------------------
# Dedicated signer for MFA challenge tokens. Its salt is distinct from the
# session serializer's, so an MFA token cannot be replayed as a session token.
_mfa_serializer = URLSafeTimedSerializer(
    app_settings.SECRET_KEY,
    salt="mfa-challenge",
)
def create_mfa_token(user_id: int) -> str:
    """
    Build the signed token used for the MFA challenge step.

    The payload carries ``user_id`` under the key "uid". Expiry is enforced
    when the token is verified, not here.
    """
    payload = {"uid": user_id}
    return _mfa_serializer.dumps(payload)
def verify_mfa_token(token: str) -> int | None:
    """
    Verify an MFA challenge token.

    Args:
        token: The signed token produced by ``create_mfa_token``.

    Returns:
        The value stored under "uid" in the payload, or None when the token
        is missing, tampered with, malformed, or older than
        ``MFA_TOKEN_MAX_AGE_SECONDS`` from the app config.

    Only token-level failures are turned into None. Configuration or
    programming errors (e.g. a missing ``MFA_TOKEN_MAX_AGE_SECONDS``)
    propagate instead of silently rejecting every MFA attempt.
    """
    if not isinstance(token, (str, bytes)):
        # An absent or non-string token is simply invalid, not an error.
        return None
    try:
        payload = _mfa_serializer.loads(
            token, max_age=app_settings.MFA_TOKEN_MAX_AGE_SECONDS
        )
    except BadSignature:
        # SignatureExpired derives from BadSignature in itsdangerous, so this
        # covers expired tokens as well as forged or malformed ones.
        return None
    if not isinstance(payload, dict):
        # A correctly signed payload of unexpected shape carries no user id.
        return None
    return payload.get("uid")