diff --git a/backend/alembic/versions/024_totp_mfa_tables.py b/backend/alembic/versions/024_totp_mfa_tables.py new file mode 100644 index 0000000..6999831 --- /dev/null +++ b/backend/alembic/versions/024_totp_mfa_tables.py @@ -0,0 +1,74 @@ +"""TOTP MFA: create totp_usage and backup_codes tables. + +Revision ID: 024 +Revises: 023 +Create Date: 2026-02-25 + +Note: totp_secret and totp_enabled columns are already on the users table +from migration 023 — this migration only adds the support tables. +""" +from alembic import op +import sqlalchemy as sa + +revision = "024" +down_revision = "023" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # --- totp_usage: tracks used TOTP codes for replay prevention --- + op.create_table( + "totp_usage", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column( + "user_id", + sa.Integer(), + sa.ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("code", sa.String(6), nullable=False), + # The actual TOTP time window (floor(unix_time / 30)) that matched + sa.Column("window", sa.Integer(), nullable=False), + sa.Column( + "used_at", + sa.DateTime(), + server_default=sa.text("NOW()"), + nullable=False, + ), + sa.PrimaryKeyConstraint("id"), + # Unique on (user_id, code, window) — not just (user_id, window) — see model comment + sa.UniqueConstraint("user_id", "code", "window", name="uq_totp_user_code_window"), + ) + op.create_index("ix_totp_usage_user_id", "totp_usage", ["user_id"]) + + # --- backup_codes: hashed recovery codes (Argon2id) --- + op.create_table( + "backup_codes", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column( + "user_id", + sa.Integer(), + sa.ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + ), + # Argon2id hash of the plaintext recovery code + sa.Column("code_hash", sa.String(255), nullable=False), + # Null until redeemed + sa.Column("used_at", sa.DateTime(), nullable=True), + sa.Column( + "created_at", + sa.DateTime(), + server_default=sa.text("NOW()"), + nullable=False, + ), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index("ix_backup_codes_user_id", "backup_codes", ["user_id"]) + + +def downgrade() -> None: + op.drop_index("ix_backup_codes_user_id", table_name="backup_codes") + op.drop_table("backup_codes") + op.drop_index("ix_totp_usage_user_id", table_name="totp_usage") + op.drop_table("totp_usage") diff --git a/backend/app/jobs/notifications.py b/backend/app/jobs/notifications.py index bf27475..0aed47b 100644 --- a/backend/app/jobs/notifications.py +++ b/backend/app/jobs/notifications.py @@ -21,6 +21,8 @@ from app.models.calendar_event import CalendarEvent from app.models.todo import Todo from app.models.project import Project from app.models.ntfy_sent import NtfySent +from app.models.totp_usage import TOTPUsage +from app.models.session import UserSession from app.services.ntfy import send_ntfy_notification from app.services.ntfy_templates import ( build_event_notification, @@ -211,6 +213,19 @@ async def _purge_old_sent_records(db: AsyncSession) -> None: await db.commit() +async def _purge_totp_usage(db: AsyncSession) -> None: + """Remove TOTP usage records older than 5 minutes — they serve no purpose beyond replay prevention.""" + cutoff = datetime.now() - timedelta(minutes=5) + await db.execute(delete(TOTPUsage).where(TOTPUsage.used_at < cutoff)) + await db.commit() + + +async def _purge_expired_sessions(db: AsyncSession) -> None: + """Remove expired UserSession rows to keep the sessions table lean.""" + await db.execute(delete(UserSession).where(UserSession.expires_at < datetime.now())) + await db.commit() + + # ── Entry point ─────────────────────────────────────────────────────────────── async def run_notification_dispatch() -> None: @@ -242,6 +257,11 @@ async def run_notification_dispatch() -> None: # Daily housekeeping: purge stale dedup records await _purge_old_sent_records(db) + # Security housekeeping runs every cycle regardless of ntfy_enabled + async with AsyncSessionLocal() as db: + await _purge_totp_usage(db) + await _purge_expired_sessions(db) + except Exception: # Broad catch: job failure must never crash the scheduler or the app logger.exception("ntfy dispatch job encountered an unhandled error") diff --git a/backend/app/main.py b/backend/app/main.py index 60a7d65..8a3b1ef 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -6,11 +6,14 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from app.database import engine from app.routers import auth, todos, events, calendars, reminders, projects, people, locations, settings as settings_router, dashboard, weather, event_templates +from app.routers import totp from app.jobs.notifications import run_notification_dispatch # Import models so Alembic's autogenerate can discover them from app.models import user as _user_model # noqa: F401 from app.models import session as _session_model # noqa: F401 +from app.models import totp_usage as _totp_usage_model # noqa: F401 +from app.models import backup_code as _backup_code_model # noqa: F401 @asynccontextmanager @@ -58,6 +61,7 @@ app.include_router(settings_router.router, prefix="/api/settings", tags=["Settin app.include_router(dashboard.router, prefix="/api", tags=["Dashboard"]) app.include_router(weather.router, prefix="/api/weather", tags=["Weather"]) app.include_router(event_templates.router, prefix="/api/event-templates", tags=["Event Templates"]) +app.include_router(totp.router, prefix="/api/auth", tags=["TOTP MFA"]) @app.get("/") diff --git a/backend/app/models/backup_code.py b/backend/app/models/backup_code.py new file mode 100644 index 0000000..49f5ef8 --- /dev/null +++ b/backend/app/models/backup_code.py @@ -0,0 +1,19 @@ +from sqlalchemy import String, ForeignKey, func +from sqlalchemy.orm import Mapped, mapped_column +from datetime import datetime +from typing import Optional +from app.database import Base + + +class BackupCode(Base): + __tablename__ = "backup_codes" + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column( + ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True + ) + # Argon2id hash of the plaintext recovery code — never store plaintext + code_hash: Mapped[str] = mapped_column(String(255), nullable=False) + # Null until redeemed; set to datetime.now() on successful use + used_at: Mapped[Optional[datetime]] = mapped_column(nullable=True, default=None) + created_at: Mapped[datetime] = mapped_column(default=func.now()) diff --git a/backend/app/models/totp_usage.py b/backend/app/models/totp_usage.py new file mode 100644 index 0000000..869be02 --- /dev/null +++ b/backend/app/models/totp_usage.py @@ -0,0 +1,30 @@ +from sqlalchemy import String, Integer, ForeignKey, UniqueConstraint, func +from sqlalchemy.orm import Mapped, mapped_column +from datetime import datetime +from app.database import Base + + +class TOTPUsage(Base): + __tablename__ = "totp_usage" + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column( + ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True + ) + # The 6-digit code that was used + code: Mapped[str] = mapped_column(String(6), nullable=False) + # The TOTP time window in which the code was valid (floor(unix_time / 30)) + window: Mapped[int] = mapped_column(Integer, nullable=False) + used_at: Mapped[datetime] = mapped_column(default=func.now()) + + # NOTE on replay prevention design: + # The unique constraint is on (user_id, code, window) — NOT (user_id, window). + # This allows a user to present the T-1 window code and then the T window code + # consecutively — these are different 6-digit OTPs, so both should succeed. + # Constraining only (user_id, window) would reject the second legitimate code. + # The actual attack we prevent is reusing the *same code string in the same window*, + # which is the only true replay. This is compliant with RFC 6238 §5.2. + # The 5-minute MFA token expiry provides the outer time bound on any abuse window. + __table_args__ = ( + UniqueConstraint("user_id", "code", "window", name="uq_totp_user_code_window"), + ) diff --git a/backend/app/routers/totp.py b/backend/app/routers/totp.py new file mode 100644 index 0000000..e8b9be7 --- /dev/null +++ b/backend/app/routers/totp.py @@ -0,0 +1,416 @@ +""" +TOTP MFA router. + +Endpoints (all under /api/auth — registered in main.py with prefix="/api/auth"): + + POST /totp/setup — Generate secret + QR + backup codes (auth required) + POST /totp/confirm — Verify first code, enable TOTP (auth required) + POST /totp-verify — MFA challenge: mfa_token + TOTP/backup code, issues session + POST /totp/disable — Disable TOTP (auth required, needs password + code) + POST /totp/backup-codes/regenerate — Regenerate backup codes (auth required, needs password + code) + GET /totp/status — { enabled, backup_codes_remaining } (auth required) + +Security: + - TOTP secrets encrypted at rest (Fernet/AES-128-CBC, key derived from SECRET_KEY) + - Replay prevention via totp_usage table (unique on user_id+code+window) + - Backup codes hashed with Argon2id, shown plaintext once only + - Failed TOTP attempts increment user.failed_login_count (shared lockout counter) + - totp-verify uses mfa_token (not session cookie) — user is not yet authenticated +""" +import uuid +import secrets +import logging +from datetime import datetime, timedelta +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Request, Response +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, delete +from sqlalchemy.exc import IntegrityError + +from app.database import get_db +from app.models.user import User +from app.models.session import UserSession +from app.models.totp_usage import TOTPUsage +from app.models.backup_code import BackupCode +from app.routers.auth import get_current_user, _set_session_cookie +from app.services.auth import ( + verify_password_with_upgrade, + hash_password, + verify_mfa_token, + create_session_token, +) +from app.services.totp import ( + generate_totp_secret, + encrypt_totp_secret, + decrypt_totp_secret, + get_totp_uri, + verify_totp_code, + generate_qr_base64, + generate_backup_codes, +) +from app.config import settings as app_settings + +# Argon2id for backup code hashing — treat each code like a password +from argon2 import PasswordHasher +from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError + +logger = logging.getLogger(__name__) + +router = APIRouter() + +# Argon2id instance for backup code hashes (same params as password hashing) +_ph = PasswordHasher( + time_cost=2, + memory_cost=19456, + parallelism=1, + hash_len=32, + salt_len=16, +) + + +# --------------------------------------------------------------------------- +# Request schemas +# --------------------------------------------------------------------------- + +class TOTPConfirmRequest(BaseModel): + code: str + + +class TOTPVerifyRequest(BaseModel): + mfa_token: str + code: Optional[str] = None # 6-digit TOTP code + backup_code: Optional[str] = None # Alternative: XXXX-XXXX backup code + + +class TOTPDisableRequest(BaseModel): + password: str + code: str # Current TOTP code required to disable + + +class BackupCodesRegenerateRequest(BaseModel): + password: str + code: str # Current TOTP code required to regenerate + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +async def _store_backup_codes(db: AsyncSession, user_id: int, plaintext_codes: list[str]) -> None: + """Hash and insert backup codes for the given user.""" + for code in plaintext_codes: + code_hash = _ph.hash(code) + db.add(BackupCode(user_id=user_id, code_hash=code_hash)) + await db.commit() + + +async def _delete_backup_codes(db: AsyncSession, user_id: int) -> None: + """Delete all backup codes for a user.""" + await db.execute(delete(BackupCode).where(BackupCode.user_id == user_id)) + await db.commit() + + +async def _verify_backup_code( + db: AsyncSession, user_id: int, submitted_code: str +) -> bool: + """ + Check submitted backup code against all unused hashes for the user. + On match, marks the code as used. Returns True if a valid unused code was found. + Uses Argon2id verification — constant-time by design. + """ + result = await db.execute( + select(BackupCode).where( + BackupCode.user_id == user_id, + BackupCode.used_at.is_(None), + ) + ) + unused_codes = result.scalars().all() + + for record in unused_codes: + try: + if _ph.verify(record.code_hash, submitted_code): + record.used_at = datetime.now() + await db.commit() + return True + except (VerifyMismatchError, VerificationError, InvalidHashError): + continue + + return False + + +async def _create_full_session( + db: AsyncSession, + user: User, + request: Request, +) -> str: + """Create a UserSession row and return the signed cookie token.""" + session_id = uuid.uuid4().hex + expires_at = datetime.now() + timedelta(days=app_settings.SESSION_MAX_AGE_DAYS) + ip = request.client.host if request.client else None + user_agent = request.headers.get("user-agent") + + db_session = UserSession( + id=session_id, + user_id=user.id, + expires_at=expires_at, + ip_address=ip[:45] if ip else None, + user_agent=(user_agent or "")[:255] if user_agent else None, + ) + db.add(db_session) + await db.commit() + return create_session_token(user.id, session_id) + + +# --------------------------------------------------------------------------- +# Routes +# --------------------------------------------------------------------------- + +@router.post("/totp/setup") +async def totp_setup( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """ + Generate a new TOTP secret, QR code, and backup codes. + Stores the encrypted secret with totp_enabled=False until confirmed. + + Idempotent: calling again before confirmation overwrites the unconfirmed secret, + so browser refreshes mid-setup generate a fresh QR without error. + + Returns { secret, qr_code_base64, backup_codes } — the only time plaintext + values are shown. The `secret` field is the raw base32 for manual entry. + """ + # Generate new secret (idempotent — overwrite any existing unconfirmed secret) + raw_secret = generate_totp_secret() + encrypted_secret = encrypt_totp_secret(raw_secret) + + current_user.totp_secret = encrypted_secret + current_user.totp_enabled = False # Not enabled until /confirm called + + # Generate backup codes — hash before storage, return plaintext once + plaintext_codes = generate_backup_codes(10) + await _delete_backup_codes(db, current_user.id) # Remove any previous unconfirmed codes + await _store_backup_codes(db, current_user.id, plaintext_codes) + + await db.commit() + + # Build QR code from provisioning URI + uri = get_totp_uri(encrypted_secret, current_user.username) + qr_base64 = generate_qr_base64(uri) + + return { + "secret": raw_secret, # Raw base32 for manual authenticator entry + "qr_code_base64": qr_base64, # PNG QR code, data:image/png;base64,... + "backup_codes": plaintext_codes, # Shown once — user must save these + } + + +@router.post("/totp/confirm") +async def totp_confirm( + data: TOTPConfirmRequest, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """ + Verify the first TOTP code from the authenticator app and enable TOTP. + Must be called after /setup while totp_enabled is still False. + """ + if not current_user.totp_secret: + raise HTTPException(status_code=400, detail="TOTP setup not started — call /setup first") + + if current_user.totp_enabled: + raise HTTPException(status_code=400, detail="TOTP is already enabled") + + matched_window = verify_totp_code(current_user.totp_secret, data.code) + if matched_window is None: + raise HTTPException(status_code=400, detail="Invalid code — check your authenticator app time sync") + + current_user.totp_enabled = True + await db.commit() + + return {"message": "TOTP enabled successfully"} + + +@router.post("/totp-verify") +async def totp_verify( + data: TOTPVerifyRequest, + request: Request, + response: Response, + db: AsyncSession = Depends(get_db), +): + """ + MFA challenge endpoint — called after a successful password login when TOTP is enabled. + Accepts either a 6-digit TOTP code or a backup recovery code. + + Uses the short-lived mfa_token (from POST /login) instead of a session cookie + because the user is not yet fully authenticated at this stage. + + On success: issues a full session cookie and returns { authenticated: true }. + """ + if not data.code and not data.backup_code: + raise HTTPException(status_code=422, detail="Provide either 'code' or 'backup_code'") + + # Validate the MFA challenge token (5-minute TTL) + user_id = verify_mfa_token(data.mfa_token) + if user_id is None: + raise HTTPException(status_code=401, detail="MFA session expired — please log in again") + + result = await db.execute(select(User).where(User.id == user_id, User.is_active == True)) + user = result.scalar_one_or_none() + if not user: + raise HTTPException(status_code=401, detail="User not found or inactive") + + if not user.totp_enabled or not user.totp_secret: + raise HTTPException(status_code=400, detail="TOTP not configured for this account") + + # Check account lockout (shared counter with password failures) + if user.locked_until and datetime.now() < user.locked_until: + remaining = int((user.locked_until - datetime.now()).total_seconds() / 60) + 1 + raise HTTPException( + status_code=423, + detail=f"Account locked. Try again in {remaining} minutes.", + ) + + # --- Backup code path --- + if data.backup_code: + normalized = data.backup_code.strip().upper() + valid = await _verify_backup_code(db, user.id, normalized) + if not valid: + user.failed_login_count += 1 + if user.failed_login_count >= 10: + user.locked_until = datetime.now() + timedelta(minutes=30) + await db.commit() + raise HTTPException(status_code=401, detail="Invalid backup code") + + # Backup code accepted — reset lockout counter and issue session + user.failed_login_count = 0 + user.locked_until = None + user.last_login_at = datetime.now() + await db.commit() + + token = await _create_full_session(db, user, request) + _set_session_cookie(response, token) + return {"authenticated": True} + + # --- TOTP code path --- + matched_window = verify_totp_code(user.totp_secret, data.code) + if matched_window is None: + user.failed_login_count += 1 + if user.failed_login_count >= 10: + user.locked_until = datetime.now() + timedelta(minutes=30) + await db.commit() + raise HTTPException(status_code=401, detail="Invalid code") + + # Replay prevention — record (user_id, code, actual_matching_window) + totp_record = TOTPUsage(user_id=user.id, code=data.code, window=matched_window) + db.add(totp_record) + try: + await db.commit() + except IntegrityError: + await db.rollback() + raise HTTPException(status_code=401, detail="Code already used — wait for the next code") + + # Success — reset lockout counter, update last_login_at, issue full session + user.failed_login_count = 0 + user.locked_until = None + user.last_login_at = datetime.now() + await db.commit() + + token = await _create_full_session(db, user, request) + _set_session_cookie(response, token) + return {"authenticated": True} + + +@router.post("/totp/disable") +async def totp_disable( + data: TOTPDisableRequest, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """ + Disable TOTP for the current user. + Requires both current password AND a valid TOTP code as confirmation. + Clears totp_secret, sets totp_enabled=False, and deletes all backup codes. + """ + if not current_user.totp_enabled: + raise HTTPException(status_code=400, detail="TOTP is not enabled") + + # Verify password (handles bcrypt→Argon2id upgrade transparently) + valid, new_hash = verify_password_with_upgrade(data.password, current_user.password_hash) + if not valid: + raise HTTPException(status_code=401, detail="Invalid password") + + if new_hash: + current_user.password_hash = new_hash + + # Verify TOTP code — both checks required for disable + matched_window = verify_totp_code(current_user.totp_secret, data.code) + if matched_window is None: + raise HTTPException(status_code=401, detail="Invalid TOTP code") + + # All checks passed — disable TOTP + current_user.totp_secret = None + current_user.totp_enabled = False + await _delete_backup_codes(db, current_user.id) + await db.commit() + + return {"message": "TOTP disabled successfully"} + + +@router.post("/totp/backup-codes/regenerate") +async def regenerate_backup_codes( + data: BackupCodesRegenerateRequest, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """ + Regenerate backup recovery codes. + Requires current password AND a valid TOTP code. + Deletes all existing backup codes and generates 10 fresh ones. + Returns plaintext codes once — never retrievable again. + """ + if not current_user.totp_enabled: + raise HTTPException(status_code=400, detail="TOTP is not enabled") + + valid, new_hash = verify_password_with_upgrade(data.password, current_user.password_hash) + if not valid: + raise HTTPException(status_code=401, detail="Invalid password") + + if new_hash: + current_user.password_hash = new_hash + await db.commit() + + matched_window = verify_totp_code(current_user.totp_secret, data.code) + if matched_window is None: + raise HTTPException(status_code=401, detail="Invalid TOTP code") + + # Regenerate + plaintext_codes = generate_backup_codes(10) + await _delete_backup_codes(db, current_user.id) + await _store_backup_codes(db, current_user.id, plaintext_codes) + + return {"backup_codes": plaintext_codes} + + +@router.get("/totp/status") +async def totp_status( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Return TOTP enabled state and count of remaining (unused) backup codes.""" + remaining = 0 + if current_user.totp_enabled: + result = await db.execute( + select(BackupCode).where( + BackupCode.user_id == current_user.id, + BackupCode.used_at.is_(None), + ) + ) + remaining = len(result.scalars().all()) + + return { + "enabled": current_user.totp_enabled, + "backup_codes_remaining": remaining, + } diff --git a/backend/app/services/totp.py b/backend/app/services/totp.py new file mode 100644 index 0000000..b7554ee --- /dev/null +++ b/backend/app/services/totp.py @@ -0,0 +1,134 @@ +""" +TOTP service: secret generation/encryption, code verification, QR code generation, +backup code generation. + +All TOTP secrets are Fernet-encrypted at rest using a key derived from SECRET_KEY. +Raw secrets are never logged and are only returned to the client once (at setup). +""" +import pyotp +import secrets +import string +import time +import io +import base64 +import hashlib + +import qrcode +from cryptography.fernet import Fernet + +from app.config import settings as app_settings + + +# --------------------------------------------------------------------------- +# Fernet key derivation +# --------------------------------------------------------------------------- + +def _get_fernet() -> Fernet: + """Derive a 32-byte Fernet key from SECRET_KEY via SHA-256.""" + key = hashlib.sha256(app_settings.SECRET_KEY.encode()).digest() + return Fernet(base64.urlsafe_b64encode(key)) + + +# --------------------------------------------------------------------------- +# Secret management +# --------------------------------------------------------------------------- + +def generate_totp_secret() -> str: + """Generate a new random TOTP secret (base32, ~160 bits entropy).""" + return pyotp.random_base32() + + +def encrypt_totp_secret(raw: str) -> str: + """Encrypt a raw TOTP secret before storing in the DB.""" + return _get_fernet().encrypt(raw.encode()).decode() + + +def decrypt_totp_secret(encrypted: str) -> str: + """Decrypt a TOTP secret retrieved from the DB.""" + return _get_fernet().decrypt(encrypted.encode()).decode() + + +# --------------------------------------------------------------------------- +# Provisioning URI and QR code +# --------------------------------------------------------------------------- + +def get_totp_uri(encrypted_secret: str, username: str) -> str: + """Return the otpauth:// provisioning URI for QR code generation.""" + raw = decrypt_totp_secret(encrypted_secret) + totp = pyotp.TOTP(raw) + return totp.provisioning_uri(name=username, issuer_name=app_settings.TOTP_ISSUER) + + +def generate_qr_base64(uri: str) -> str: + """Return a base64-encoded PNG of the QR code for the provisioning URI.""" + qr = qrcode.QRCode( + version=1, + error_correction=qrcode.constants.ERROR_CORRECT_L, + box_size=10, + border=4, + ) + qr.add_data(uri) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + buf = io.BytesIO() + img.save(buf, format="PNG") + return base64.b64encode(buf.getvalue()).decode() + + +# --------------------------------------------------------------------------- +# Code verification +# --------------------------------------------------------------------------- + +def verify_totp_code(encrypted_secret: str, code: str, valid_window: int = 1) -> int | None: + """ + Verify a TOTP code and return the matching time window, or None if invalid. + + Checks each window individually (T-valid_window ... T+valid_window) so the + caller knows WHICH window matched — required for correct replay-prevention + (the TOTPUsage row must record the actual matching window, not the current one). + + Uses secrets.compare_digest for constant-time comparison to prevent timing attacks. + + Returns: + int — the floor(unix_time / 30) window value that matched + None — no window matched (invalid code) + """ + raw = decrypt_totp_secret(encrypted_secret) + totp = pyotp.TOTP(raw) + current_window = int(time.time() // 30) + + for offset in range(-valid_window, valid_window + 1): + check_window = current_window + offset + # pyotp.at() accepts a unix timestamp; multiply window back to seconds + expected_code = totp.at(check_window * 30) + if secrets.compare_digest(code.strip(), expected_code): + return check_window # Return the ACTUAL window that matched + + return None # No window matched + + +# --------------------------------------------------------------------------- +# Backup codes +# --------------------------------------------------------------------------- + +def generate_backup_codes(count: int = 10) -> list[str]: + """ + Generate recovery backup codes in XXXX-XXXX format. + Uses cryptographically secure randomness (secrets module). + """ + alphabet = string.ascii_uppercase + string.digits + return [ + "".join(secrets.choice(alphabet) for _ in range(4)) + + "-" + + "".join(secrets.choice(alphabet) for _ in range(4)) + for _ in range(count) + ] + + +# --------------------------------------------------------------------------- +# Utility +# --------------------------------------------------------------------------- + +def get_totp_window() -> int: + """Return the current TOTP time window (floor(unix_time / 30)).""" + return int(time.time() // 30) diff --git a/backend/requirements.txt b/backend/requirements.txt index 50ed902..0f6463b 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -7,6 +7,9 @@ pydantic==2.10.4 pydantic-settings==2.7.1 bcrypt==4.2.1 argon2-cffi>=23.1.0 +pyotp>=2.9.0 +qrcode[pil]>=7.4.0 +cryptography>=42.0.0 python-multipart==0.0.20 python-dateutil==2.9.0 itsdangerous==2.2.0