Add TOTP-based Multi-Factor Authentication (MFA) for local users

Global MFA toggle in Security settings, QR code setup on first login,
6-digit TOTP verification on subsequent logins. Azure AD users exempt.
Admins can reset user MFA. TOTP secrets encrypted at rest with Fernet.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-08 23:14:06 +01:00
parent 647630ff19
commit 3d28f13054
13 changed files with 615 additions and 62 deletions

View File

@@ -1,5 +1,7 @@
"""Authentication API endpoints — login, logout, current user, password change, Azure AD."""
"""Authentication API endpoints — login, logout, current user, password change, MFA, Azure AD."""
import base64
import io
import logging
import secrets
from datetime import datetime
@@ -9,10 +11,18 @@ from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.database import get_db
from app.dependencies import create_access_token, get_current_user
from app.dependencies import create_access_token, create_mfa_token, get_current_user, verify_mfa_token
from app.models import SystemConfig, User
from app.utils.security import decrypt_value, hash_password, verify_password
from app.utils.validators import ChangePasswordRequest, LoginRequest
from app.utils.security import (
decrypt_value,
encrypt_value,
generate_totp_secret,
generate_totp_uri,
hash_password,
verify_password,
verify_totp,
)
from app.utils.validators import ChangePasswordRequest, LoginRequest, MfaTokenRequest, MfaVerifyRequest
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -20,15 +30,7 @@ router = APIRouter()
@router.post("/login")
async def login(payload: LoginRequest, db: Session = Depends(get_db)):
"""Authenticate and return a JWT token.
Args:
payload: Username and password.
db: Database session.
Returns:
JSON with ``access_token`` and ``token_type``.
"""
"""Authenticate with username/password. May require MFA as a second step."""
user = db.query(User).filter(User.username == payload.username).first()
if not user or not verify_password(payload.password, user.password_hash):
raise HTTPException(
@@ -41,6 +43,17 @@ async def login(payload: LoginRequest, db: Session = Depends(get_db)):
detail="Account is disabled.",
)
# Check if MFA is required (only for local users)
if user.auth_provider == "local":
config = db.query(SystemConfig).filter(SystemConfig.id == 1).first()
if config and getattr(config, "mfa_enabled", False):
mfa_token = create_mfa_token(user.username)
return {
"mfa_required": True,
"mfa_token": mfa_token,
"totp_setup_needed": not bool(user.totp_enabled),
}
token = create_access_token(user.username)
logger.info("User %s logged in.", user.username)
return {
@@ -50,24 +63,140 @@ async def login(payload: LoginRequest, db: Session = Depends(get_db)):
}
# ---------------------------------------------------------------------------
# MFA endpoints
# ---------------------------------------------------------------------------
@router.post("/mfa/setup")
async def mfa_setup(payload: MfaTokenRequest, db: Session = Depends(get_db)):
"""Generate a new TOTP secret and QR code for first-time MFA setup."""
username = verify_mfa_token(payload.mfa_token)
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found.")
# Generate new secret and store encrypted (not yet enabled)
secret = generate_totp_secret()
user.totp_secret_encrypted = encrypt_value(secret)
db.commit()
# Generate QR code as base64 data URI
uri = generate_totp_uri(secret, username)
import qrcode
img = qrcode.make(uri)
buf = io.BytesIO()
img.save(buf, format="PNG")
qr_b64 = base64.b64encode(buf.getvalue()).decode()
return {
"secret": secret,
"qr_code": f"data:image/png;base64,{qr_b64}",
"otpauth_uri": uri,
}
@router.post("/mfa/setup/complete")
async def mfa_setup_complete(payload: MfaVerifyRequest, db: Session = Depends(get_db)):
"""Verify the first TOTP code to complete MFA setup, then issue access token."""
username = verify_mfa_token(payload.mfa_token)
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found.")
if not user.totp_secret_encrypted:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="TOTP setup not initiated. Call /auth/mfa/setup first.",
)
secret = decrypt_value(user.totp_secret_encrypted)
if not verify_totp(secret, payload.totp_code):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid TOTP code.",
)
user.totp_enabled = True
db.commit()
token = create_access_token(user.username)
logger.info("User %s completed MFA setup and logged in.", user.username)
return {
"access_token": token,
"token_type": "bearer",
"user": user.to_dict(),
}
@router.post("/mfa/verify")
async def mfa_verify(payload: MfaVerifyRequest, db: Session = Depends(get_db)):
"""Verify a TOTP code for users who already have MFA set up."""
username = verify_mfa_token(payload.mfa_token)
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found.")
if not user.totp_secret_encrypted or not user.totp_enabled:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="TOTP is not set up for this user.",
)
secret = decrypt_value(user.totp_secret_encrypted)
if not verify_totp(secret, payload.totp_code):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid TOTP code.",
)
token = create_access_token(user.username)
logger.info("User %s passed MFA verification.", user.username)
return {
"access_token": token,
"token_type": "bearer",
"user": user.to_dict(),
}
@router.get("/mfa/status")
async def mfa_status(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Return MFA status for the current user and global setting."""
config = db.query(SystemConfig).filter(SystemConfig.id == 1).first()
return {
"mfa_enabled_global": bool(config and getattr(config, "mfa_enabled", False)),
"totp_enabled_user": bool(current_user.totp_enabled),
}
@router.post("/mfa/disable")
async def mfa_disable(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Disable TOTP for the current user."""
current_user.totp_enabled = False
current_user.totp_secret_encrypted = None
db.commit()
logger.info("User %s disabled their TOTP.", current_user.username)
return {"message": "TOTP disabled successfully."}
# ---------------------------------------------------------------------------
# Password change
# ---------------------------------------------------------------------------
@router.post("/logout")
async def logout(current_user: User = Depends(get_current_user)):
"""Logout (client-side token discard).
Returns:
Confirmation message.
"""
"""Logout (client-side token discard)."""
logger.info("User %s logged out.", current_user.username)
return {"message": "Logged out successfully."}
@router.get("/me")
async def get_me(current_user: User = Depends(get_current_user)):
"""Return the current authenticated user's profile.
Returns:
User dict (no password hash).
"""
"""Return the current authenticated user's profile."""
return current_user.to_dict()
@@ -77,16 +206,7 @@ async def change_password(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Change the current user's password.
Args:
payload: Current and new password.
current_user: Authenticated user.
db: Database session.
Returns:
Confirmation message.
"""
"""Change the current user's password."""
if not verify_password(payload.current_password, current_user.password_hash):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
@@ -99,6 +219,9 @@ async def change_password(
return {"message": "Password changed successfully."}
# ---------------------------------------------------------------------------
# Azure AD
# ---------------------------------------------------------------------------
class AzureCallbackRequest(BaseModel):
"""Azure AD auth code callback payload."""
code: str