Use pwdlib with Argon2 by default, adding logic (and tests) to autoupdate old passwords using Bcrypt (#2104)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
This commit is contained in:
Sebastián Ramírez
2026-01-22 07:24:19 -08:00
committed by GitHub
parent a0fe8a236f
commit 730c6e9ebb
8 changed files with 304 additions and 47 deletions

View File

@@ -104,7 +104,8 @@ def update_password_me(
"""
Update own password.
"""
if not verify_password(body.current_password, current_user.hashed_password):
verified, _ = verify_password(body.current_password, current_user.hashed_password)
if not verified:
raise HTTPException(status_code=400, detail="Incorrect password")
if body.current_password == body.new_password:
raise HTTPException(

View File

@@ -2,11 +2,18 @@ from datetime import datetime, timedelta, timezone
from typing import Any
import jwt
from passlib.context import CryptContext
from pwdlib import PasswordHash
from pwdlib.hashers.argon2 import Argon2Hasher
from pwdlib.hashers.bcrypt import BcryptHasher
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
password_hash = PasswordHash(
(
Argon2Hasher(),
BcryptHasher(),
)
)
ALGORITHM = "HS256"
@@ -19,9 +26,11 @@ def create_access_token(subject: str | Any, expires_delta: timedelta) -> str:
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def verify_password(
plain_password: str, hashed_password: str
) -> tuple[bool, str | None]:
return password_hash.verify_and_update(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
return password_hash.hash(password)

View File

@@ -41,8 +41,14 @@ def authenticate(*, session: Session, email: str, password: str) -> User | None:
db_user = get_user_by_email(session=session, email=email)
if not db_user:
return None
if not verify_password(password, db_user.hashed_password):
verified, updated_password_hash = verify_password(password, db_user.hashed_password)
if not verified:
return None
if updated_password_hash:
db_user.hashed_password = updated_password_hash
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user

View File

@@ -7,7 +7,6 @@ dependencies = [
"fastapi[standard]<1.0.0,>=0.114.2",
"python-multipart<1.0.0,>=0.0.7",
"email-validator<3.0.0.0,>=2.1.0.post1",
"passlib[bcrypt]<2.0.0,>=1.7.4",
"tenacity<9.0.0,>=8.2.3",
"pydantic>2.0",
"emails<1.0,>=0.6",
@@ -16,11 +15,10 @@ dependencies = [
"httpx<1.0.0,>=0.25.1",
"psycopg[binary]<4.0.0,>=3.1.13",
"sqlmodel<1.0.0,>=0.0.21",
# Pin bcrypt until passlib supports the latest
"bcrypt==4.3.0",
"pydantic-settings<3.0.0,>=2.2.1",
"sentry-sdk[fastapi]<2.0.0,>=1.40.6",
"pyjwt<3.0.0,>=2.8.0",
"pwdlib[argon2,bcrypt]>=0.3.0",
]
[dependency-groups]
@@ -29,7 +27,6 @@ dev = [
"mypy<2.0.0,>=1.8.0",
"ruff<1.0.0,>=0.2.2",
"prek>=0.2.24,<1.0.0",
"types-passlib<2.0.0.0,>=1.7.7.20240106",
"coverage<8.0.0,>=7.4.3",
]

View File

@@ -1,12 +1,13 @@
from unittest.mock import patch
from fastapi.testclient import TestClient
from pwdlib.hashers.bcrypt import BcryptHasher
from sqlmodel import Session
from app.core.config import settings
from app.core.security import verify_password
from app.core.security import get_password_hash, verify_password
from app.crud import create_user
from app.models import UserCreate
from app.models import User, UserCreate
from app.utils import generate_password_reset_token
from tests.utils.user import user_authentication_headers
from tests.utils.utils import random_email, random_lower_string
@@ -99,7 +100,8 @@ def test_reset_password(client: TestClient, db: Session) -> None:
assert r.json() == {"message": "Password updated successfully"}
db.refresh(user)
assert verify_password(new_password, user.hashed_password)
verified, _ = verify_password(new_password, user.hashed_password)
assert verified
def test_reset_password_invalid_token(
@@ -116,3 +118,68 @@ def test_reset_password_invalid_token(
assert "detail" in response
assert r.status_code == 400
assert response["detail"] == "Invalid token"
def test_login_with_bcrypt_password_upgrades_to_argon2(
client: TestClient, db: Session
) -> None:
"""Test that logging in with a bcrypt password hash upgrades it to argon2."""
email = random_email()
password = random_lower_string()
# Create a bcrypt hash directly (simulating legacy password)
bcrypt_hasher = BcryptHasher()
bcrypt_hash = bcrypt_hasher.hash(password)
assert bcrypt_hash.startswith("$2") # bcrypt hashes start with $2
user = User(email=email, hashed_password=bcrypt_hash, is_active=True)
db.add(user)
db.commit()
db.refresh(user)
assert user.hashed_password.startswith("$2")
login_data = {"username": email, "password": password}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
assert r.status_code == 200
tokens = r.json()
assert "access_token" in tokens
db.refresh(user)
# Verify the hash was upgraded to argon2
assert user.hashed_password.startswith("$argon2")
verified, updated_hash = verify_password(password, user.hashed_password)
assert verified
# Should not need another update since it's already argon2
assert updated_hash is None
def test_login_with_argon2_password_keeps_hash(client: TestClient, db: Session) -> None:
"""Test that logging in with an argon2 password hash does not update it."""
email = random_email()
password = random_lower_string()
# Create an argon2 hash (current default)
argon2_hash = get_password_hash(password)
assert argon2_hash.startswith("$argon2")
# Create user with argon2 hash
user = User(email=email, hashed_password=argon2_hash, is_active=True)
db.add(user)
db.commit()
db.refresh(user)
original_hash = user.hashed_password
login_data = {"username": email, "password": password}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
assert r.status_code == 200
tokens = r.json()
assert "access_token" in tokens
db.refresh(user)
assert user.hashed_password == original_hash
assert user.hashed_password.startswith("$argon2")

View File

@@ -242,7 +242,8 @@ def test_update_password_me(
user_db = db.exec(user_query).first()
assert user_db
assert user_db.email == settings.FIRST_SUPERUSER
assert verify_password(new_password, user_db.hashed_password)
verified, _ = verify_password(new_password, user_db.hashed_password)
assert verified
# Revert to the old password to keep consistency in test
old_data = {
@@ -257,7 +258,10 @@ def test_update_password_me(
db.refresh(user_db)
assert r.status_code == 200
assert verify_password(settings.FIRST_SUPERUSER_PASSWORD, user_db.hashed_password)
verified, _ = verify_password(
settings.FIRST_SUPERUSER_PASSWORD, user_db.hashed_password
)
assert verified
def test_update_password_me_incorrect_password(
@@ -331,7 +335,8 @@ def test_register_user(client: TestClient, db: Session) -> None:
assert user_db
assert user_db.email == username
assert user_db.full_name == full_name
assert verify_password(password, user_db.hashed_password)
verified, _ = verify_password(password, user_db.hashed_password)
assert verified
def test_register_user_already_exists_error(client: TestClient) -> None:

View File

@@ -1,4 +1,5 @@
from fastapi.encoders import jsonable_encoder
from pwdlib.hashers.bcrypt import BcryptHasher
from sqlmodel import Session
from app import crud
@@ -88,4 +89,42 @@ def test_update_user(db: Session) -> None:
user_2 = db.get(User, user.id)
assert user_2
assert user.email == user_2.email
assert verify_password(new_password, user_2.hashed_password)
verified, _ = verify_password(new_password, user_2.hashed_password)
assert verified
def test_authenticate_user_with_bcrypt_upgrades_to_argon2(db: Session) -> None:
"""Test that a user with bcrypt password hash gets upgraded to argon2 on login."""
email = random_email()
password = random_lower_string()
# Create a bcrypt hash directly (simulating legacy password)
bcrypt_hasher = BcryptHasher()
bcrypt_hash = bcrypt_hasher.hash(password)
assert bcrypt_hash.startswith("$2") # bcrypt hashes start with $2
# Create user with bcrypt hash directly in the database
user = User(email=email, hashed_password=bcrypt_hash)
db.add(user)
db.commit()
db.refresh(user)
# Verify the hash is bcrypt before authentication
assert user.hashed_password.startswith("$2")
# Authenticate - this should upgrade the hash to argon2
authenticated_user = crud.authenticate(session=db, email=email, password=password)
assert authenticated_user
assert authenticated_user.email == email
db.refresh(authenticated_user)
# Verify the hash was upgraded to argon2
assert authenticated_user.hashed_password.startswith("$argon2")
verified, updated_hash = verify_password(
password, authenticated_user.hashed_password
)
assert verified
# Should not need another update since it's already argon2
assert updated_hash is None