- New User model (username, argon2id password_hash, totp fields, lockout) - New UserSession model (DB-backed revocation, replaces in-memory set) - New services/auth.py: Argon2id hashing, bcrypt→Argon2id upgrade path, URLSafeTimedSerializer session/MFA tokens - New schemas/auth.py: SetupRequest, LoginRequest, ChangePasswordRequest with OWASP password strength validation - Full rewrite of routers/auth.py: setup/login/logout/status/change-password with account lockout (10 failures → 30-min, HTTP 423), IP rate limiting retained as outer layer, get_current_user + get_current_settings dependencies replacing get_current_session - Settings model: drop pin_hash, add user_id FK (nullable for migration) - Schemas/settings.py: remove SettingsCreate, ChangePinRequest, _validate_pin_length - Settings router: rewrite to use get_current_user + get_current_settings, preserve ntfy test endpoint - All 11 consumer routers updated: auth-gate-only routers use get_current_user, routers reading Settings fields use get_current_settings - config.py: add SESSION_MAX_AGE_DAYS, MFA_TOKEN_MAX_AGE_SECONDS, TOTP_ISSUER - main.py: import User and UserSession models for Alembic discovery - requirements.txt: add argon2-cffi>=23.1.0 - Migration 023: create users + user_sessions tables, migrate pin_hash → User row (admin), backfill settings.user_id, drop pin_hash Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
129 lines
4.4 KiB
Python
129 lines
4.4 KiB
Python
"""
|
|
Authentication service: password hashing, session tokens, MFA tokens.
|
|
|
|
Password strategy:
|
|
- New passwords: Argon2id (OWASP/NIST preferred, PHC winner)
|
|
- Legacy bcrypt hashes (migrated from PIN auth): accepted on login, immediately
|
|
rehashed to Argon2id on first successful use.
|
|
"""
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError
|
|
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
|
|
|
from app.config import settings as app_settings
|
|
|
|
# OWASP-minimum Argon2id parameters (m=19 MB, t=2 iterations, p=1)
|
|
_ph = PasswordHasher(
|
|
time_cost=2,
|
|
memory_cost=19456, # 19 MB in KB
|
|
parallelism=1,
|
|
hash_len=32,
|
|
salt_len=16,
|
|
)
|
|
|
|
# Session serializer — salt differentiates from MFA tokens
|
|
_serializer = URLSafeTimedSerializer(
|
|
secret_key=app_settings.SECRET_KEY,
|
|
salt="umbra-session-v2",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Password helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def hash_password(password: str) -> str:
|
|
"""Hash a password with Argon2id."""
|
|
return _ph.hash(password)
|
|
|
|
|
|
def verify_password(password: str, hashed: str) -> bool:
|
|
"""Verify an Argon2id password hash. Returns False on any failure."""
|
|
try:
|
|
return _ph.verify(hashed, password)
|
|
except (VerifyMismatchError, VerificationError, InvalidHashError):
|
|
return False
|
|
|
|
|
|
def needs_rehash(hashed: str) -> bool:
|
|
"""True if the stored hash was created with outdated parameters."""
|
|
return _ph.check_needs_rehash(hashed)
|
|
|
|
|
|
def verify_password_with_upgrade(password: str, hashed: str) -> tuple[bool, str | None]:
|
|
"""
|
|
Verify a password against a stored hash (Argon2id or legacy bcrypt).
|
|
|
|
Returns (is_valid, new_hash_if_upgrade_needed).
|
|
new_hash is non-None only when the stored hash is bcrypt and the password is
|
|
correct — caller must persist the new hash to complete the migration.
|
|
Also returns a new hash when Argon2id parameters are outdated.
|
|
"""
|
|
if hashed.startswith("$2b$") or hashed.startswith("$2a$"):
|
|
# Legacy bcrypt — verify then immediately rehash to Argon2id
|
|
import bcrypt # noqa: PLC0415 — intentional lazy import; bcrypt is only needed during migration
|
|
try:
|
|
valid = bcrypt.checkpw(password.encode(), hashed.encode())
|
|
except Exception:
|
|
return False, None
|
|
if valid:
|
|
return True, hash_password(password)
|
|
return False, None
|
|
|
|
# Argon2id path
|
|
valid = verify_password(password, hashed)
|
|
new_hash = hash_password(password) if (valid and needs_rehash(hashed)) else None
|
|
return valid, new_hash
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Session tokens
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_session_token(user_id: int, session_id: str) -> str:
|
|
"""Create a signed session cookie payload embedding user_id + session_id."""
|
|
return _serializer.dumps({"uid": user_id, "sid": session_id})
|
|
|
|
|
|
def verify_session_token(token: str, max_age: int | None = None) -> dict | None:
|
|
"""
|
|
Verify a session cookie and return its payload dict, or None if invalid/expired.
|
|
max_age defaults to SESSION_MAX_AGE_DAYS from config.
|
|
"""
|
|
if max_age is None:
|
|
max_age = app_settings.SESSION_MAX_AGE_DAYS * 86400
|
|
try:
|
|
return _serializer.loads(token, max_age=max_age)
|
|
except (BadSignature, SignatureExpired):
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MFA tokens (short-lived, used between password OK and TOTP verification)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# MFA tokens use a distinct salt so they cannot be replayed as session tokens
|
|
_mfa_serializer = URLSafeTimedSerializer(
|
|
secret_key=app_settings.SECRET_KEY,
|
|
salt="mfa-challenge",
|
|
)
|
|
|
|
|
|
def create_mfa_token(user_id: int) -> str:
|
|
"""Create a short-lived signed token for the MFA challenge step."""
|
|
return _mfa_serializer.dumps({"uid": user_id})
|
|
|
|
|
|
def verify_mfa_token(token: str) -> int | None:
|
|
"""
|
|
Verify an MFA challenge token.
|
|
Returns the user_id on success, None if invalid or expired (5-minute TTL).
|
|
"""
|
|
try:
|
|
data = _mfa_serializer.loads(
|
|
token, max_age=app_settings.MFA_TOKEN_MAX_AGE_SECONDS
|
|
)
|
|
return data["uid"]
|
|
except Exception:
|
|
return None
|