UMBRA/backend/alembic/versions/024_totp_mfa_tables.py
Kyle Pope b134ad9e8b Implement Stage 6 Track B: TOTP MFA (pyotp, Fernet-encrypted secrets, backup codes)
- models/totp_usage.py: replay-prevention table, unique on (user_id, code, window)
- models/backup_code.py: Argon2id-hashed recovery codes with used_at tracking
- services/totp.py: Fernet encrypt/decrypt, verify_totp_code returns actual window, QR base64, backup code generation
- routers/totp.py: setup (idempotent), confirm, totp-verify (mfa_token + TOTP or backup code), disable, regenerate, status
- alembic/024: creates totp_usage and backup_codes tables
- main.py: register totp router, import new models for Alembic discovery
- requirements.txt: add pyotp>=2.9.0, qrcode[pil]>=7.4.0, cryptography>=42.0.0
- jobs/notifications.py: periodic cleanup for totp_usage (5 min) and expired user_sessions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 04:18:05 +08:00

75 lines
2.4 KiB
Python

"""TOTP MFA: create totp_usage and backup_codes tables.
Revision ID: 024
Revises: 023
Create Date: 2026-02-25
Note: totp_secret and totp_enabled columns are already on the users table
from migration 023 — this migration only adds the support tables.
"""
from alembic import op
import sqlalchemy as sa
# Revision identifiers, used by Alembic to order migrations.
revision = "024"
# Parent revision: this migration applies on top of 023.
down_revision = "023"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create the ``totp_usage`` and ``backup_codes`` tables.

    Each table gets a cascading foreign key to ``users.id`` and a
    dedicated index on its ``user_id`` column.
    """
    # ------------------------------------------------------------------
    # totp_usage: already-used TOTP codes, kept for replay prevention
    # ------------------------------------------------------------------
    usage_owner_fk = sa.ForeignKey("users.id", ondelete="CASCADE")
    usage_columns = (
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("user_id", sa.Integer(), usage_owner_fk, nullable=False),
        sa.Column("code", sa.String(6), nullable=False),
        # Time window that actually matched: floor(unix_time / 30).
        sa.Column("window", sa.Integer(), nullable=False),
        sa.Column(
            "used_at",
            sa.DateTime(),
            server_default=sa.text("NOW()"),
            nullable=False,
        ),
    )
    usage_constraints = (
        sa.PrimaryKeyConstraint("id"),
        # Uniqueness spans (user_id, code, window) rather than only
        # (user_id, window); the rationale lives in the model's comment.
        sa.UniqueConstraint(
            "user_id",
            "code",
            "window",
            name="uq_totp_user_code_window",
        ),
    )
    op.create_table("totp_usage", *usage_columns, *usage_constraints)
    op.create_index("ix_totp_usage_user_id", "totp_usage", ["user_id"])

    # ------------------------------------------------------------------
    # backup_codes: recovery codes, stored only as Argon2id hashes
    # ------------------------------------------------------------------
    backup_owner_fk = sa.ForeignKey("users.id", ondelete="CASCADE")
    backup_columns = (
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("user_id", sa.Integer(), backup_owner_fk, nullable=False),
        # Argon2id hash of the plaintext recovery code.
        sa.Column("code_hash", sa.String(255), nullable=False),
        # Stays NULL until the code has been redeemed.
        sa.Column("used_at", sa.DateTime(), nullable=True),
        sa.Column(
            "created_at",
            sa.DateTime(),
            server_default=sa.text("NOW()"),
            nullable=False,
        ),
    )
    op.create_table(
        "backup_codes",
        *backup_columns,
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index("ix_backup_codes_user_id", "backup_codes", ["user_id"])
def downgrade() -> None:
    """Drop the tables created by ``upgrade`` in reverse creation order.

    For each table the ``user_id`` index is dropped first, then the table.
    """
    # (table, index) pairs, listed newest-first so the teardown mirrors upgrade().
    teardown_plan = (
        ("backup_codes", "ix_backup_codes_user_id"),
        ("totp_usage", "ix_totp_usage_user_id"),
    )
    for table, index in teardown_plan:
        op.drop_index(index, table_name=table)
        op.drop_table(table)