2026-02-15 16:13:41 +08:00

152 lines
4.3 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Response, Cookie
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
import bcrypt
from itsdangerous import TimestampSigner, BadSignature
from app.database import get_db
from app.models.settings import Settings
from app.schemas.settings import SettingsCreate
from app.config import settings as app_settings
router = APIRouter()
# Initialize signer for session management
signer = TimestampSigner(app_settings.SECRET_KEY)
def hash_pin(pin: str) -> str:
"""Hash a PIN using bcrypt."""
return bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode()
def verify_pin(pin: str, hashed: str) -> bool:
"""Verify a PIN against its hash."""
return bcrypt.checkpw(pin.encode(), hashed.encode())
def create_session_token(user_id: int) -> str:
"""Create a signed session token."""
return signer.sign(str(user_id)).decode()
def verify_session_token(token: str) -> Optional[int]:
"""Verify and extract user ID from session token."""
try:
unsigned = signer.unsign(token, max_age=86400 * 30) # 30 days
return int(unsigned)
except (BadSignature, ValueError):
return None
async def get_current_session(
session_cookie: Optional[str] = Cookie(None, alias="session"),
db: AsyncSession = Depends(get_db)
) -> Settings:
"""Dependency to verify session and return current settings."""
if not session_cookie:
raise HTTPException(status_code=401, detail="Not authenticated")
user_id = verify_session_token(session_cookie)
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid or expired session")
result = await db.execute(select(Settings).where(Settings.id == user_id))
settings_obj = result.scalar_one_or_none()
if not settings_obj:
raise HTTPException(status_code=401, detail="Session invalid")
return settings_obj
@router.post("/setup")
async def setup_pin(
data: SettingsCreate,
response: Response,
db: AsyncSession = Depends(get_db)
):
"""Create initial PIN. Only works if no settings exist."""
result = await db.execute(select(Settings))
existing = result.scalar_one_or_none()
if existing:
raise HTTPException(status_code=400, detail="Setup already completed")
pin_hash = hash_pin(data.pin)
new_settings = Settings(pin_hash=pin_hash)
db.add(new_settings)
await db.commit()
await db.refresh(new_settings)
# Create session
token = create_session_token(new_settings.id)
response.set_cookie(
key="session",
value=token,
httponly=True,
max_age=86400 * 30, # 30 days
samesite="lax"
)
return {"message": "Setup completed successfully", "authenticated": True}
@router.post("/login")
async def login(
data: SettingsCreate,
response: Response,
db: AsyncSession = Depends(get_db)
):
"""Verify PIN and create session."""
result = await db.execute(select(Settings))
settings_obj = result.scalar_one_or_none()
if not settings_obj:
raise HTTPException(status_code=400, detail="Setup required")
if not verify_pin(data.pin, settings_obj.pin_hash):
raise HTTPException(status_code=401, detail="Invalid PIN")
# Create session
token = create_session_token(settings_obj.id)
response.set_cookie(
key="session",
value=token,
httponly=True,
max_age=86400 * 30, # 30 days
samesite="lax"
)
return {"message": "Login successful", "authenticated": True}
@router.post("/logout")
async def logout(response: Response):
"""Clear session cookie."""
response.delete_cookie(key="session")
return {"message": "Logout successful"}
@router.get("/status")
async def auth_status(
session_cookie: Optional[str] = Cookie(None, alias="session"),
db: AsyncSession = Depends(get_db)
):
"""Check authentication status."""
result = await db.execute(select(Settings))
settings_obj = result.scalar_one_or_none()
setup_required = settings_obj is None
authenticated = False
if not setup_required and session_cookie:
user_id = verify_session_token(session_cookie)
authenticated = user_id is not None
return {
"authenticated": authenticated,
"setup_required": setup_required
}