- New User model (username, argon2id password_hash, totp fields, lockout)
- New UserSession model (DB-backed revocation, replaces in-memory set)
- New services/auth.py: Argon2id hashing, bcrypt→Argon2id upgrade path, URLSafeTimedSerializer session/MFA tokens
- New schemas/auth.py: SetupRequest, LoginRequest, ChangePasswordRequest with OWASP password strength validation
- Full rewrite of routers/auth.py: setup/login/logout/status/change-password with account lockout (10 failures → 30-min, HTTP 423), IP rate limiting retained as outer layer, get_current_user + get_current_settings dependencies replacing get_current_session
- Settings model: drop pin_hash, add user_id FK (nullable for migration)
- Schemas/settings.py: remove SettingsCreate, ChangePinRequest, _validate_pin_length
- Settings router: rewrite to use get_current_user + get_current_settings, preserve ntfy test endpoint
- All 11 consumer routers updated: auth-gate-only routers use get_current_user, routers reading Settings fields use get_current_settings
- config.py: add SESSION_MAX_AGE_DAYS, MFA_TOKEN_MAX_AGE_SECONDS, TOTP_ISSUER
- main.py: import User and UserSession models for Alembic discovery
- requirements.txt: add argon2-cffi>=23.1.0
- Migration 023: create users + user_sessions tables, migrate pin_hash → User row (admin), backfill settings.user_id, drop pin_hash

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
144 lines
6.1 KiB
Python
144 lines
6.1 KiB
Python
"""Auth migration: create users + user_sessions tables, migrate pin_hash to User row,
|
|
add user_id FK to settings, drop pin_hash from settings.
|
|
|
|
Revision ID: 023
|
|
Revises: 022
|
|
Create Date: 2026-02-25
|
|
|
|
Data migration strategy (handles both fresh DB and existing single-user DB):
|
|
1. Create users table
|
|
2. Create user_sessions table
|
|
3. If settings row exists with a pin_hash, create a User row with
|
|
username='admin' and password_hash = pin_hash (bcrypt hash is valid —
|
|
user will be prompted to change password on first login, hash upgrades
|
|
transparently to Argon2id on first successful login).
|
|
4. Add user_id FK column to settings (nullable initially)
|
|
5. Backfill settings.user_id to point at the migrated admin user (if any)
|
|
6. Drop pin_hash from settings
|
|
"""
|
|
from alembic import op
|
|
import sqlalchemy as sa
|
|
from sqlalchemy.dialects import postgresql
|
|
|
|
|
|
# Alembic revision identifiers.
# `revision` names this migration; `down_revision` names the migration it
# applies on top of.
revision = '023'
down_revision = '022'

# This migration declares no branch label and no extra dependency.
branch_labels = None
depends_on = None
|
|
|
|
|
|
def upgrade() -> None:
    """Create the auth tables and move the legacy PIN hash into a ``users`` row.

    Steps, in order:

    1. Create ``users`` and its indexes.
    2. Create ``user_sessions`` with a cascading FK to ``users``.
    3. If a ``settings`` row carries a ``pin_hash``, insert a user named
       ``admin`` whose ``password_hash`` is that stored hash, unchanged.
    4. Add the nullable ``settings.user_id`` FK column and its index.
    5. Point every ``settings`` row at the migrated admin user, if one was
       created in step 3.
    6. Drop ``settings.pin_hash``.

    When no ``settings`` row has a ``pin_hash`` (e.g. a fresh database),
    steps 3 and 5 do nothing and ``settings.user_id`` stays NULL.

    The data steps use raw SQL (``NOW()``, ``INSERT ... RETURNING``) on the
    migration connection rather than ORM models.
    """
    # ------------------------------------------------------------------
    # 1. Create users table
    # ------------------------------------------------------------------
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('username', sa.String(length=50), nullable=False),
        sa.Column('password_hash', sa.String(length=255), nullable=False),
        sa.Column('totp_secret', sa.String(length=500), nullable=True),
        sa.Column('totp_enabled', sa.Boolean(), nullable=False, server_default='false'),
        sa.Column('failed_login_count', sa.Integer(), nullable=False, server_default='0'),
        sa.Column('locked_until', sa.DateTime(), nullable=True),
        sa.Column('is_active', sa.Boolean(), nullable=False, server_default='true'),
        sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('NOW()')),
        sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.text('NOW()')),
        sa.Column('last_login_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index('ix_users_id', 'users', ['id'], unique=False)
    op.create_index('ix_users_username', 'users', ['username'], unique=True)

    # ------------------------------------------------------------------
    # 2. Create user_sessions table
    # ------------------------------------------------------------------
    op.create_table(
        'user_sessions',
        sa.Column('id', sa.String(length=64), nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('NOW()')),
        sa.Column('expires_at', sa.DateTime(), nullable=False),
        sa.Column('revoked', sa.Boolean(), nullable=False, server_default='false'),
        sa.Column('ip_address', sa.String(length=45), nullable=True),
        sa.Column('user_agent', sa.String(length=255), nullable=True),
        sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index('ix_user_sessions_user_id', 'user_sessions', ['user_id'], unique=False)

    # ------------------------------------------------------------------
    # 3. Data migration: create admin User from existing pin_hash (if any)
    #    Uses raw SQL to avoid any ORM dependency issues in the migration.
    # ------------------------------------------------------------------
    bind = op.get_bind()

    # ORDER BY id keeps the choice deterministic should more than one settings
    # row hold a hash; LIMIT without ORDER BY returns an arbitrary row.
    # .scalar() yields None when no row matches, i.e. nothing to migrate.
    existing_pin_hash = bind.execute(
        sa.text(
            "SELECT pin_hash FROM settings "
            "WHERE pin_hash IS NOT NULL ORDER BY id LIMIT 1"
        )
    ).scalar()

    admin_user_id = None
    if existing_pin_hash is not None:
        # The stored hash is copied verbatim into users.password_hash; the
        # new row's id comes back through RETURNING.
        admin_user_id = bind.execute(
            sa.text(
                "INSERT INTO users (username, password_hash, is_active, totp_enabled, "
                "failed_login_count, created_at, updated_at) "
                "VALUES ('admin', :ph, true, false, 0, NOW(), NOW()) RETURNING id"
            ),
            {"ph": existing_pin_hash},
        ).scalar()

    # ------------------------------------------------------------------
    # 4. Add user_id FK column to settings (nullable during migration)
    # ------------------------------------------------------------------
    op.add_column(
        'settings',
        sa.Column(
            'user_id',
            sa.Integer(),
            sa.ForeignKey('users.id', ondelete='CASCADE'),
            nullable=True,
        ),
    )
    op.create_index('ix_settings_user_id', 'settings', ['user_id'], unique=False)

    # ------------------------------------------------------------------
    # 5. Backfill settings.user_id for the migrated admin user
    #    No WHERE clause: every existing settings row is assigned to the
    #    single migrated user.
    # ------------------------------------------------------------------
    if admin_user_id is not None:
        bind.execute(
            sa.text("UPDATE settings SET user_id = :uid"),
            {"uid": admin_user_id},
        )

    # ------------------------------------------------------------------
    # 6. Drop pin_hash from settings — data now lives in users.password_hash
    # ------------------------------------------------------------------
    op.drop_column('settings', 'pin_hash')
|
|
|
|
|
|
def downgrade() -> None:
    """Undo the auth migration's schema changes.

    Re-adds ``settings.pin_hash`` as a nullable column and leaves it empty;
    no hash is copied back from ``users.password_hash``. Then removes
    ``settings.user_id`` and drops ``user_sessions`` and ``users`` together
    with their indexes.
    """
    # Re-create the legacy column; existing rows get NULL.
    legacy_pin_column = sa.Column('pin_hash', sa.String(length=255), nullable=True)
    op.add_column('settings', legacy_pin_column)

    # settings.user_id references users.id, so it is removed before users.
    op.drop_index('ix_settings_user_id', table_name='settings')
    op.drop_column('settings', 'user_id')

    # user_sessions also references users.id and is dropped first.
    op.drop_index('ix_user_sessions_user_id', table_name='user_sessions')
    op.drop_table('user_sessions')

    for users_index in ('ix_users_username', 'ix_users_id'):
        op.drop_index(users_index, table_name='users')
    op.drop_table('users')
|