Merge branch 'stage6-track-b-totp-mfa' into stage6-phase4-5-settings-totp-ntfy

# Conflicts:
#	frontend/src/components/settings/NtfySettingsSection.tsx
#	frontend/src/components/settings/TotpSetupSection.tsx
This commit is contained in:
Kyle 2026-02-25 04:29:33 +08:00
commit f136a0820d
8 changed files with 700 additions and 0 deletions

View File

@ -0,0 +1,74 @@
"""TOTP MFA: create totp_usage and backup_codes tables.
Revision ID: 024
Revises: 023
Create Date: 2026-02-25
Note: totp_secret and totp_enabled columns are already on the users table
from migration 023 this migration only adds the support tables.
"""
from alembic import op
import sqlalchemy as sa
revision = "024"
down_revision = "023"
branch_labels = None
depends_on = None
def upgrade() -> None:
# --- totp_usage: tracks used TOTP codes for replay prevention ---
op.create_table(
"totp_usage",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"user_id",
sa.Integer(),
sa.ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("code", sa.String(6), nullable=False),
# The actual TOTP time window (floor(unix_time / 30)) that matched
sa.Column("window", sa.Integer(), nullable=False),
sa.Column(
"used_at",
sa.DateTime(),
server_default=sa.text("NOW()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id"),
# Unique on (user_id, code, window) — not just (user_id, window) — see model comment
sa.UniqueConstraint("user_id", "code", "window", name="uq_totp_user_code_window"),
)
op.create_index("ix_totp_usage_user_id", "totp_usage", ["user_id"])
# --- backup_codes: hashed recovery codes (Argon2id) ---
op.create_table(
"backup_codes",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"user_id",
sa.Integer(),
sa.ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
),
# Argon2id hash of the plaintext recovery code
sa.Column("code_hash", sa.String(255), nullable=False),
# Null until redeemed
sa.Column("used_at", sa.DateTime(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(),
server_default=sa.text("NOW()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("ix_backup_codes_user_id", "backup_codes", ["user_id"])
def downgrade() -> None:
op.drop_index("ix_backup_codes_user_id", table_name="backup_codes")
op.drop_table("backup_codes")
op.drop_index("ix_totp_usage_user_id", table_name="totp_usage")
op.drop_table("totp_usage")

View File

@ -21,6 +21,8 @@ from app.models.calendar_event import CalendarEvent
from app.models.todo import Todo from app.models.todo import Todo
from app.models.project import Project from app.models.project import Project
from app.models.ntfy_sent import NtfySent from app.models.ntfy_sent import NtfySent
from app.models.totp_usage import TOTPUsage
from app.models.session import UserSession
from app.services.ntfy import send_ntfy_notification from app.services.ntfy import send_ntfy_notification
from app.services.ntfy_templates import ( from app.services.ntfy_templates import (
build_event_notification, build_event_notification,
@ -211,6 +213,19 @@ async def _purge_old_sent_records(db: AsyncSession) -> None:
await db.commit() await db.commit()
async def _purge_totp_usage(db: AsyncSession) -> None:
"""Remove TOTP usage records older than 5 minutes — they serve no purpose beyond replay prevention."""
cutoff = datetime.now() - timedelta(minutes=5)
await db.execute(delete(TOTPUsage).where(TOTPUsage.used_at < cutoff))
await db.commit()
async def _purge_expired_sessions(db: AsyncSession) -> None:
"""Remove expired UserSession rows to keep the sessions table lean."""
await db.execute(delete(UserSession).where(UserSession.expires_at < datetime.now()))
await db.commit()
# ── Entry point ─────────────────────────────────────────────────────────────── # ── Entry point ───────────────────────────────────────────────────────────────
async def run_notification_dispatch() -> None: async def run_notification_dispatch() -> None:
@ -242,6 +257,11 @@ async def run_notification_dispatch() -> None:
# Daily housekeeping: purge stale dedup records # Daily housekeeping: purge stale dedup records
await _purge_old_sent_records(db) await _purge_old_sent_records(db)
# Security housekeeping runs every cycle regardless of ntfy_enabled
async with AsyncSessionLocal() as db:
await _purge_totp_usage(db)
await _purge_expired_sessions(db)
except Exception: except Exception:
# Broad catch: job failure must never crash the scheduler or the app # Broad catch: job failure must never crash the scheduler or the app
logger.exception("ntfy dispatch job encountered an unhandled error") logger.exception("ntfy dispatch job encountered an unhandled error")

View File

@ -6,11 +6,14 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.database import engine from app.database import engine
from app.routers import auth, todos, events, calendars, reminders, projects, people, locations, settings as settings_router, dashboard, weather, event_templates from app.routers import auth, todos, events, calendars, reminders, projects, people, locations, settings as settings_router, dashboard, weather, event_templates
from app.routers import totp
from app.jobs.notifications import run_notification_dispatch from app.jobs.notifications import run_notification_dispatch
# Import models so Alembic's autogenerate can discover them # Import models so Alembic's autogenerate can discover them
from app.models import user as _user_model # noqa: F401 from app.models import user as _user_model # noqa: F401
from app.models import session as _session_model # noqa: F401 from app.models import session as _session_model # noqa: F401
from app.models import totp_usage as _totp_usage_model # noqa: F401
from app.models import backup_code as _backup_code_model # noqa: F401
@asynccontextmanager @asynccontextmanager
@ -58,6 +61,7 @@ app.include_router(settings_router.router, prefix="/api/settings", tags=["Settin
app.include_router(dashboard.router, prefix="/api", tags=["Dashboard"]) app.include_router(dashboard.router, prefix="/api", tags=["Dashboard"])
app.include_router(weather.router, prefix="/api/weather", tags=["Weather"]) app.include_router(weather.router, prefix="/api/weather", tags=["Weather"])
app.include_router(event_templates.router, prefix="/api/event-templates", tags=["Event Templates"]) app.include_router(event_templates.router, prefix="/api/event-templates", tags=["Event Templates"])
app.include_router(totp.router, prefix="/api/auth", tags=["TOTP MFA"])
@app.get("/") @app.get("/")

View File

@ -0,0 +1,19 @@
from sqlalchemy import String, ForeignKey, func
from sqlalchemy.orm import Mapped, mapped_column
from datetime import datetime
from typing import Optional
from app.database import Base
class BackupCode(Base):
__tablename__ = "backup_codes"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
# Argon2id hash of the plaintext recovery code — never store plaintext
code_hash: Mapped[str] = mapped_column(String(255), nullable=False)
# Null until redeemed; set to datetime.now() on successful use
used_at: Mapped[Optional[datetime]] = mapped_column(nullable=True, default=None)
created_at: Mapped[datetime] = mapped_column(default=func.now())

View File

@ -0,0 +1,30 @@
from sqlalchemy import String, Integer, ForeignKey, UniqueConstraint, func
from sqlalchemy.orm import Mapped, mapped_column
from datetime import datetime
from app.database import Base
class TOTPUsage(Base):
__tablename__ = "totp_usage"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
# The 6-digit code that was used
code: Mapped[str] = mapped_column(String(6), nullable=False)
# The TOTP time window in which the code was valid (floor(unix_time / 30))
window: Mapped[int] = mapped_column(Integer, nullable=False)
used_at: Mapped[datetime] = mapped_column(default=func.now())
# NOTE on replay prevention design:
# The unique constraint is on (user_id, code, window) — NOT (user_id, window).
# This allows a user to present the T-1 window code and then the T window code
# consecutively — these are different 6-digit OTPs, so both should succeed.
# Constraining only (user_id, window) would reject the second legitimate code.
# The actual attack we prevent is reusing the *same code string in the same window*,
# which is the only true replay. This is compliant with RFC 6238 §5.2.
# The 5-minute MFA token expiry provides the outer time bound on any abuse window.
__table_args__ = (
UniqueConstraint("user_id", "code", "window", name="uq_totp_user_code_window"),
)

416
backend/app/routers/totp.py Normal file
View File

@ -0,0 +1,416 @@
"""
TOTP MFA router.
Endpoints (all under /api/auth registered in main.py with prefix="/api/auth"):
POST /totp/setup Generate secret + QR + backup codes (auth required)
POST /totp/confirm Verify first code, enable TOTP (auth required)
POST /totp-verify MFA challenge: mfa_token + TOTP/backup code, issues session
POST /totp/disable Disable TOTP (auth required, needs password + code)
POST /totp/backup-codes/regenerate Regenerate backup codes (auth required, needs password + code)
GET /totp/status { enabled, backup_codes_remaining } (auth required)
Security:
- TOTP secrets encrypted at rest (Fernet/AES-128-CBC, key derived from SECRET_KEY)
- Replay prevention via totp_usage table (unique on user_id+code+window)
- Backup codes hashed with Argon2id, shown plaintext once only
- Failed TOTP attempts increment user.failed_login_count (shared lockout counter)
- totp-verify uses mfa_token (not session cookie) user is not yet authenticated
"""
import uuid
import secrets
import logging
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
from sqlalchemy.exc import IntegrityError
from app.database import get_db
from app.models.user import User
from app.models.session import UserSession
from app.models.totp_usage import TOTPUsage
from app.models.backup_code import BackupCode
from app.routers.auth import get_current_user, _set_session_cookie
from app.services.auth import (
verify_password_with_upgrade,
hash_password,
verify_mfa_token,
create_session_token,
)
from app.services.totp import (
generate_totp_secret,
encrypt_totp_secret,
decrypt_totp_secret,
get_totp_uri,
verify_totp_code,
generate_qr_base64,
generate_backup_codes,
)
from app.config import settings as app_settings
# Argon2id for backup code hashing — treat each code like a password
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError
logger = logging.getLogger(__name__)
router = APIRouter()
# Argon2id instance for backup code hashes (same params as password hashing)
_ph = PasswordHasher(
time_cost=2,
memory_cost=19456,
parallelism=1,
hash_len=32,
salt_len=16,
)
# ---------------------------------------------------------------------------
# Request schemas
# ---------------------------------------------------------------------------
class TOTPConfirmRequest(BaseModel):
code: str
class TOTPVerifyRequest(BaseModel):
mfa_token: str
code: Optional[str] = None # 6-digit TOTP code
backup_code: Optional[str] = None # Alternative: XXXX-XXXX backup code
class TOTPDisableRequest(BaseModel):
password: str
code: str # Current TOTP code required to disable
class BackupCodesRegenerateRequest(BaseModel):
password: str
code: str # Current TOTP code required to regenerate
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
async def _store_backup_codes(db: AsyncSession, user_id: int, plaintext_codes: list[str]) -> None:
"""Hash and insert backup codes for the given user."""
for code in plaintext_codes:
code_hash = _ph.hash(code)
db.add(BackupCode(user_id=user_id, code_hash=code_hash))
await db.commit()
async def _delete_backup_codes(db: AsyncSession, user_id: int) -> None:
"""Delete all backup codes for a user."""
await db.execute(delete(BackupCode).where(BackupCode.user_id == user_id))
await db.commit()
async def _verify_backup_code(
db: AsyncSession, user_id: int, submitted_code: str
) -> bool:
"""
Check submitted backup code against all unused hashes for the user.
On match, marks the code as used. Returns True if a valid unused code was found.
Uses Argon2id verification constant-time by design.
"""
result = await db.execute(
select(BackupCode).where(
BackupCode.user_id == user_id,
BackupCode.used_at.is_(None),
)
)
unused_codes = result.scalars().all()
for record in unused_codes:
try:
if _ph.verify(record.code_hash, submitted_code):
record.used_at = datetime.now()
await db.commit()
return True
except (VerifyMismatchError, VerificationError, InvalidHashError):
continue
return False
async def _create_full_session(
db: AsyncSession,
user: User,
request: Request,
) -> str:
"""Create a UserSession row and return the signed cookie token."""
session_id = uuid.uuid4().hex
expires_at = datetime.now() + timedelta(days=app_settings.SESSION_MAX_AGE_DAYS)
ip = request.client.host if request.client else None
user_agent = request.headers.get("user-agent")
db_session = UserSession(
id=session_id,
user_id=user.id,
expires_at=expires_at,
ip_address=ip[:45] if ip else None,
user_agent=(user_agent or "")[:255] if user_agent else None,
)
db.add(db_session)
await db.commit()
return create_session_token(user.id, session_id)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@router.post("/totp/setup")
async def totp_setup(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Generate a new TOTP secret, QR code, and backup codes.
Stores the encrypted secret with totp_enabled=False until confirmed.
Idempotent: calling again before confirmation overwrites the unconfirmed secret,
so browser refreshes mid-setup generate a fresh QR without error.
Returns { secret, qr_code_base64, backup_codes } the only time plaintext
values are shown. The `secret` field is the raw base32 for manual entry.
"""
# Generate new secret (idempotent — overwrite any existing unconfirmed secret)
raw_secret = generate_totp_secret()
encrypted_secret = encrypt_totp_secret(raw_secret)
current_user.totp_secret = encrypted_secret
current_user.totp_enabled = False # Not enabled until /confirm called
# Generate backup codes — hash before storage, return plaintext once
plaintext_codes = generate_backup_codes(10)
await _delete_backup_codes(db, current_user.id) # Remove any previous unconfirmed codes
await _store_backup_codes(db, current_user.id, plaintext_codes)
await db.commit()
# Build QR code from provisioning URI
uri = get_totp_uri(encrypted_secret, current_user.username)
qr_base64 = generate_qr_base64(uri)
return {
"secret": raw_secret, # Raw base32 for manual authenticator entry
"qr_code_base64": qr_base64, # PNG QR code, data:image/png;base64,...
"backup_codes": plaintext_codes, # Shown once — user must save these
}
@router.post("/totp/confirm")
async def totp_confirm(
data: TOTPConfirmRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Verify the first TOTP code from the authenticator app and enable TOTP.
Must be called after /setup while totp_enabled is still False.
"""
if not current_user.totp_secret:
raise HTTPException(status_code=400, detail="TOTP setup not started — call /setup first")
if current_user.totp_enabled:
raise HTTPException(status_code=400, detail="TOTP is already enabled")
matched_window = verify_totp_code(current_user.totp_secret, data.code)
if matched_window is None:
raise HTTPException(status_code=400, detail="Invalid code — check your authenticator app time sync")
current_user.totp_enabled = True
await db.commit()
return {"message": "TOTP enabled successfully"}
@router.post("/totp-verify")
async def totp_verify(
data: TOTPVerifyRequest,
request: Request,
response: Response,
db: AsyncSession = Depends(get_db),
):
"""
MFA challenge endpoint called after a successful password login when TOTP is enabled.
Accepts either a 6-digit TOTP code or a backup recovery code.
Uses the short-lived mfa_token (from POST /login) instead of a session cookie
because the user is not yet fully authenticated at this stage.
On success: issues a full session cookie and returns { authenticated: true }.
"""
if not data.code and not data.backup_code:
raise HTTPException(status_code=422, detail="Provide either 'code' or 'backup_code'")
# Validate the MFA challenge token (5-minute TTL)
user_id = verify_mfa_token(data.mfa_token)
if user_id is None:
raise HTTPException(status_code=401, detail="MFA session expired — please log in again")
result = await db.execute(select(User).where(User.id == user_id, User.is_active == True))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=401, detail="User not found or inactive")
if not user.totp_enabled or not user.totp_secret:
raise HTTPException(status_code=400, detail="TOTP not configured for this account")
# Check account lockout (shared counter with password failures)
if user.locked_until and datetime.now() < user.locked_until:
remaining = int((user.locked_until - datetime.now()).total_seconds() / 60) + 1
raise HTTPException(
status_code=423,
detail=f"Account locked. Try again in {remaining} minutes.",
)
# --- Backup code path ---
if data.backup_code:
normalized = data.backup_code.strip().upper()
valid = await _verify_backup_code(db, user.id, normalized)
if not valid:
user.failed_login_count += 1
if user.failed_login_count >= 10:
user.locked_until = datetime.now() + timedelta(minutes=30)
await db.commit()
raise HTTPException(status_code=401, detail="Invalid backup code")
# Backup code accepted — reset lockout counter and issue session
user.failed_login_count = 0
user.locked_until = None
user.last_login_at = datetime.now()
await db.commit()
token = await _create_full_session(db, user, request)
_set_session_cookie(response, token)
return {"authenticated": True}
# --- TOTP code path ---
matched_window = verify_totp_code(user.totp_secret, data.code)
if matched_window is None:
user.failed_login_count += 1
if user.failed_login_count >= 10:
user.locked_until = datetime.now() + timedelta(minutes=30)
await db.commit()
raise HTTPException(status_code=401, detail="Invalid code")
# Replay prevention — record (user_id, code, actual_matching_window)
totp_record = TOTPUsage(user_id=user.id, code=data.code, window=matched_window)
db.add(totp_record)
try:
await db.commit()
except IntegrityError:
await db.rollback()
raise HTTPException(status_code=401, detail="Code already used — wait for the next code")
# Success — reset lockout counter, update last_login_at, issue full session
user.failed_login_count = 0
user.locked_until = None
user.last_login_at = datetime.now()
await db.commit()
token = await _create_full_session(db, user, request)
_set_session_cookie(response, token)
return {"authenticated": True}
@router.post("/totp/disable")
async def totp_disable(
data: TOTPDisableRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Disable TOTP for the current user.
Requires both current password AND a valid TOTP code as confirmation.
Clears totp_secret, sets totp_enabled=False, and deletes all backup codes.
"""
if not current_user.totp_enabled:
raise HTTPException(status_code=400, detail="TOTP is not enabled")
# Verify password (handles bcrypt→Argon2id upgrade transparently)
valid, new_hash = verify_password_with_upgrade(data.password, current_user.password_hash)
if not valid:
raise HTTPException(status_code=401, detail="Invalid password")
if new_hash:
current_user.password_hash = new_hash
# Verify TOTP code — both checks required for disable
matched_window = verify_totp_code(current_user.totp_secret, data.code)
if matched_window is None:
raise HTTPException(status_code=401, detail="Invalid TOTP code")
# All checks passed — disable TOTP
current_user.totp_secret = None
current_user.totp_enabled = False
await _delete_backup_codes(db, current_user.id)
await db.commit()
return {"message": "TOTP disabled successfully"}
@router.post("/totp/backup-codes/regenerate")
async def regenerate_backup_codes(
data: BackupCodesRegenerateRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Regenerate backup recovery codes.
Requires current password AND a valid TOTP code.
Deletes all existing backup codes and generates 10 fresh ones.
Returns plaintext codes once never retrievable again.
"""
if not current_user.totp_enabled:
raise HTTPException(status_code=400, detail="TOTP is not enabled")
valid, new_hash = verify_password_with_upgrade(data.password, current_user.password_hash)
if not valid:
raise HTTPException(status_code=401, detail="Invalid password")
if new_hash:
current_user.password_hash = new_hash
await db.commit()
matched_window = verify_totp_code(current_user.totp_secret, data.code)
if matched_window is None:
raise HTTPException(status_code=401, detail="Invalid TOTP code")
# Regenerate
plaintext_codes = generate_backup_codes(10)
await _delete_backup_codes(db, current_user.id)
await _store_backup_codes(db, current_user.id, plaintext_codes)
return {"backup_codes": plaintext_codes}
@router.get("/totp/status")
async def totp_status(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return TOTP enabled state and count of remaining (unused) backup codes."""
remaining = 0
if current_user.totp_enabled:
result = await db.execute(
select(BackupCode).where(
BackupCode.user_id == current_user.id,
BackupCode.used_at.is_(None),
)
)
remaining = len(result.scalars().all())
return {
"enabled": current_user.totp_enabled,
"backup_codes_remaining": remaining,
}

View File

@ -0,0 +1,134 @@
"""
TOTP service: secret generation/encryption, code verification, QR code generation,
backup code generation.
All TOTP secrets are Fernet-encrypted at rest using a key derived from SECRET_KEY.
Raw secrets are never logged and are only returned to the client once (at setup).
"""
import pyotp
import secrets
import string
import time
import io
import base64
import hashlib
import qrcode
from cryptography.fernet import Fernet
from app.config import settings as app_settings
# ---------------------------------------------------------------------------
# Fernet key derivation
# ---------------------------------------------------------------------------
def _get_fernet() -> Fernet:
"""Derive a 32-byte Fernet key from SECRET_KEY via SHA-256."""
key = hashlib.sha256(app_settings.SECRET_KEY.encode()).digest()
return Fernet(base64.urlsafe_b64encode(key))
# ---------------------------------------------------------------------------
# Secret management
# ---------------------------------------------------------------------------
def generate_totp_secret() -> str:
"""Generate a new random TOTP secret (base32, ~160 bits entropy)."""
return pyotp.random_base32()
def encrypt_totp_secret(raw: str) -> str:
"""Encrypt a raw TOTP secret before storing in the DB."""
return _get_fernet().encrypt(raw.encode()).decode()
def decrypt_totp_secret(encrypted: str) -> str:
"""Decrypt a TOTP secret retrieved from the DB."""
return _get_fernet().decrypt(encrypted.encode()).decode()
# ---------------------------------------------------------------------------
# Provisioning URI and QR code
# ---------------------------------------------------------------------------
def get_totp_uri(encrypted_secret: str, username: str) -> str:
"""Return the otpauth:// provisioning URI for QR code generation."""
raw = decrypt_totp_secret(encrypted_secret)
totp = pyotp.TOTP(raw)
return totp.provisioning_uri(name=username, issuer_name=app_settings.TOTP_ISSUER)
def generate_qr_base64(uri: str) -> str:
"""Return a base64-encoded PNG of the QR code for the provisioning URI."""
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buf = io.BytesIO()
img.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode()
# ---------------------------------------------------------------------------
# Code verification
# ---------------------------------------------------------------------------
def verify_totp_code(encrypted_secret: str, code: str, valid_window: int = 1) -> int | None:
"""
Verify a TOTP code and return the matching time window, or None if invalid.
Checks each window individually (T-valid_window ... T+valid_window) so the
caller knows WHICH window matched required for correct replay-prevention
(the TOTPUsage row must record the actual matching window, not the current one).
Uses secrets.compare_digest for constant-time comparison to prevent timing attacks.
Returns:
int the floor(unix_time / 30) window value that matched
None no window matched (invalid code)
"""
raw = decrypt_totp_secret(encrypted_secret)
totp = pyotp.TOTP(raw)
current_window = int(time.time() // 30)
for offset in range(-valid_window, valid_window + 1):
check_window = current_window + offset
# pyotp.at() accepts a unix timestamp; multiply window back to seconds
expected_code = totp.at(check_window * 30)
if secrets.compare_digest(code.strip(), expected_code):
return check_window # Return the ACTUAL window that matched
return None # No window matched
# ---------------------------------------------------------------------------
# Backup codes
# ---------------------------------------------------------------------------
def generate_backup_codes(count: int = 10) -> list[str]:
"""
Generate recovery backup codes in XXXX-XXXX format.
Uses cryptographically secure randomness (secrets module).
"""
alphabet = string.ascii_uppercase + string.digits
return [
"".join(secrets.choice(alphabet) for _ in range(4))
+ "-"
+ "".join(secrets.choice(alphabet) for _ in range(4))
for _ in range(count)
]
# ---------------------------------------------------------------------------
# Utility
# ---------------------------------------------------------------------------
def get_totp_window() -> int:
"""Return the current TOTP time window (floor(unix_time / 30))."""
return int(time.time() // 30)

View File

@ -7,6 +7,9 @@ pydantic==2.10.4
pydantic-settings==2.7.1 pydantic-settings==2.7.1
bcrypt==4.2.1 bcrypt==4.2.1
argon2-cffi>=23.1.0 argon2-cffi>=23.1.0
pyotp>=2.9.0
qrcode[pil]>=7.4.0
cryptography>=42.0.0
python-multipart==0.0.20 python-multipart==0.0.20
python-dateutil==2.9.0 python-dateutil==2.9.0
itsdangerous==2.2.0 itsdangerous==2.2.0