- models/totp_usage.py: replay-prevention table, unique on (user_id, code, window)
- models/backup_code.py: Argon2id-hashed recovery codes with used_at tracking
- services/totp.py: Fernet encrypt/decrypt, verify_totp_code returns actual window, QR base64, backup code generation
- routers/totp.py: setup (idempotent), confirm, totp-verify (mfa_token + TOTP or backup code), disable, regenerate, status
- alembic/024: creates totp_usage and backup_codes tables
- main.py: register totp router, import new models for Alembic discovery
- requirements.txt: add pyotp>=2.9.0, qrcode[pil]>=7.4.0, cryptography>=42.0.0
- jobs/notifications.py: periodic cleanup for totp_usage (5 min) and expired user_sessions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
75 lines
2.4 KiB
Python
75 lines
2.4 KiB
Python
"""TOTP MFA: create totp_usage and backup_codes tables.

Revision ID: 024
Revises: 023
Create Date: 2026-02-25

Note: totp_secret and totp_enabled columns are already on the users table
from migration 023 — this migration only adds the support tables.
"""
|
|
from alembic import op
|
|
import sqlalchemy as sa
|
|
|
|
# Revision identifiers, used by Alembic to order this migration in the chain.
revision = "024"
# Parent revision: this migration is applied on top of 023.
down_revision = "023"
# No branch labels and no cross-branch dependencies for this revision.
branch_labels = None
depends_on = None
|
|
|
|
|
|
def upgrade() -> None:
    """Create the ``totp_usage`` and ``backup_codes`` tables.

    Both tables get an integer ``id`` primary key, a non-null ``user_id``
    foreign key to ``users.id`` declared with ``ON DELETE CASCADE``, and a
    separate index on ``user_id``.
    """
    # --- totp_usage: tracks used TOTP codes for replay prevention ---
    op.create_table(
        "totp_usage",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column(
            "user_id",
            sa.Integer(),
            sa.ForeignKey("users.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("code", sa.String(6), nullable=False),
        # The actual TOTP time window (floor(unix_time / 30)) that matched
        sa.Column("window", sa.Integer(), nullable=False),
        # Filled in by the database at insert time via the NOW() server default.
        sa.Column(
            "used_at",
            sa.DateTime(),
            server_default=sa.text("NOW()"),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("id"),
        # Unique on (user_id, code, window) — not just (user_id, window) — see model comment
        sa.UniqueConstraint("user_id", "code", "window", name="uq_totp_user_code_window"),
    )
    op.create_index("ix_totp_usage_user_id", "totp_usage", ["user_id"])

    # --- backup_codes: hashed recovery codes (Argon2id) ---
    op.create_table(
        "backup_codes",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column(
            "user_id",
            sa.Integer(),
            sa.ForeignKey("users.id", ondelete="CASCADE"),
            nullable=False,
        ),
        # Argon2id hash of the plaintext recovery code
        sa.Column("code_hash", sa.String(255), nullable=False),
        # Null until redeemed
        sa.Column("used_at", sa.DateTime(), nullable=True),
        # Filled in by the database at insert time via the NOW() server default.
        sa.Column(
            "created_at",
            sa.DateTime(),
            server_default=sa.text("NOW()"),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index("ix_backup_codes_user_id", "backup_codes", ["user_id"])
|
|
|
|
|
|
def downgrade() -> None:
    """Drop the ``backup_codes`` and ``totp_usage`` tables.

    The tables are removed in the reverse of the order ``upgrade`` creates
    them, and each table's ``user_id`` index is dropped before the table
    itself.
    """
    op.drop_index("ix_backup_codes_user_id", table_name="backup_codes")
    op.drop_table("backup_codes")

    op.drop_index("ix_totp_usage_user_id", table_name="totp_usage")
    op.drop_table("totp_usage")
|