""" Authentication service: password hashing, session tokens, MFA tokens. Password strategy: - New passwords: Argon2id (OWASP/NIST preferred, PHC winner) - Legacy bcrypt hashes (migrated from PIN auth): accepted on login, immediately rehashed to Argon2id on first successful use. """ import asyncio from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired from app.config import settings as app_settings # OWASP-minimum Argon2id parameters (m=19 MB, t=2 iterations, p=1) _ph = PasswordHasher( time_cost=2, memory_cost=19456, # 19 MB in KB parallelism=1, hash_len=32, salt_len=16, ) # Session serializer — salt differentiates from MFA tokens _serializer = URLSafeTimedSerializer( secret_key=app_settings.SECRET_KEY, salt="umbra-session-v2", ) # --------------------------------------------------------------------------- # Password helpers # --------------------------------------------------------------------------- def hash_password(password: str) -> str: """Hash a password with Argon2id.""" return _ph.hash(password) def verify_password(password: str, hashed: str) -> bool: """Verify an Argon2id password hash. Returns False on any failure.""" try: return _ph.verify(hashed, password) except (VerifyMismatchError, VerificationError, InvalidHashError): return False def needs_rehash(hashed: str) -> bool: """True if the stored hash was created with outdated parameters.""" return _ph.check_needs_rehash(hashed) def verify_password_with_upgrade(password: str, hashed: str) -> tuple[bool, str | None]: """ Verify a password against a stored hash (Argon2id or legacy bcrypt). Returns (is_valid, new_hash_if_upgrade_needed). new_hash is non-None only when the stored hash is bcrypt and the password is correct — caller must persist the new hash to complete the migration. Also returns a new hash when Argon2id parameters are outdated. """ if hashed.startswith("$2b$") or hashed.startswith("$2a$"): # Legacy bcrypt — verify then immediately rehash to Argon2id import bcrypt # noqa: PLC0415 — intentional lazy import; bcrypt is only needed during migration try: valid = bcrypt.checkpw(password.encode(), hashed.encode()) except Exception: return False, None if valid: return True, hash_password(password) return False, None # Argon2id path valid = verify_password(password, hashed) new_hash = hash_password(password) if (valid and needs_rehash(hashed)) else None return valid, new_hash # --------------------------------------------------------------------------- # Async wrappers — run CPU-bound Argon2id ops in a thread pool (AC-2/S-01) # --------------------------------------------------------------------------- async def ahash_password(password: str) -> str: """Async wrapper for hash_password — runs Argon2id in executor.""" loop = asyncio.get_running_loop() return await loop.run_in_executor(None, hash_password, password) async def averify_password(password: str, hashed: str) -> bool: """Async wrapper for verify_password — runs Argon2id in executor.""" loop = asyncio.get_running_loop() return await loop.run_in_executor(None, verify_password, password, hashed) async def averify_password_with_upgrade(password: str, hashed: str) -> tuple[bool, str | None]: """Async wrapper for verify_password_with_upgrade — runs Argon2id in executor.""" loop = asyncio.get_running_loop() return await loop.run_in_executor(None, verify_password_with_upgrade, password, hashed) # --------------------------------------------------------------------------- # Session tokens # --------------------------------------------------------------------------- def create_session_token(user_id: int, session_id: str) -> str: """Create a signed session cookie payload embedding user_id + session_id.""" return _serializer.dumps({"uid": user_id, "sid": session_id}) def verify_session_token(token: str, max_age: int | None = None) -> dict | None: """ Verify a session cookie and return its payload dict, or None if invalid/expired. max_age defaults to SESSION_TOKEN_HARD_CEILING_DAYS (absolute token lifetime). The sliding window (SESSION_MAX_AGE_DAYS) is enforced via DB expires_at checks, not by itsdangerous — this decoupling prevents the serializer from rejecting renewed tokens that were created more than SESSION_MAX_AGE_DAYS ago. """ if max_age is None: max_age = app_settings.SESSION_TOKEN_HARD_CEILING_DAYS * 86400 try: return _serializer.loads(token, max_age=max_age) except (BadSignature, SignatureExpired): return None # --------------------------------------------------------------------------- # MFA tokens (short-lived, used between password OK and TOTP verification) # --------------------------------------------------------------------------- # MFA tokens use a distinct salt so they cannot be replayed as session tokens _mfa_serializer = URLSafeTimedSerializer( secret_key=app_settings.SECRET_KEY, salt="mfa-challenge", ) def create_mfa_token(user_id: int) -> str: """Create a short-lived signed token for the MFA challenge step.""" return _mfa_serializer.dumps({"uid": user_id}) def verify_mfa_token(token: str) -> int | None: """ Verify an MFA challenge token. Returns the user_id on success, None if invalid or expired (5-minute TTL). """ try: data = _mfa_serializer.loads( token, max_age=app_settings.MFA_TOKEN_MAX_AGE_SECONDS ) return data["uid"] except Exception: return None # --------------------------------------------------------------------------- # MFA enforcement tokens (SEC-03: distinct salt from challenge tokens) # --------------------------------------------------------------------------- _mfa_enforce_serializer = URLSafeTimedSerializer( secret_key=app_settings.SECRET_KEY, salt="mfa-enforce-setup-v1", ) def create_mfa_enforce_token(user_id: int) -> str: """Create a short-lived token for MFA enforcement setup (not a session).""" return _mfa_enforce_serializer.dumps({"uid": user_id}) def verify_mfa_enforce_token(token: str) -> int | None: """ Verify an MFA enforcement setup token. Returns user_id on success, None if invalid or expired (5-minute TTL). """ try: data = _mfa_enforce_serializer.loads( token, max_age=app_settings.MFA_TOKEN_MAX_AGE_SECONDS ) return data["uid"] except Exception: return None