/users/password: implemented endpoint

also added uuid to JTIs in JWTs to prevent issues in tests
This commit is contained in:
dogeystamp 2023-05-22 20:52:46 -04:00
parent 0bd36b9a95
commit ca0c6512fe
Signed by: dogeystamp
GPG Key ID: 7225FE3592EFFA38
3 changed files with 97 additions and 5 deletions

View File

@ -87,13 +87,17 @@ class User(db.Model):
permissions.AllFlags = Permissions permissions.AllFlags = Permissions
self.permissions = permissions self.permissions = permissions
self.password = bcrypt.generate_password_hash( self.password = self.gen_hash(password)
password, current_app.config.get("BCRYPT_LOG_ROUNDS")
).decode()
self.username = username self.username = username
self.url = url_for("users_blueprint.user_api", username=self.username) self.url = url_for("users_blueprint.user_api", username=self.username)
self.register_date = datetime.datetime.now() self.register_date = datetime.datetime.now()
def gen_hash(self, psswd):
"""Generates a hash from a password."""
return bcrypt.generate_password_hash(
psswd, current_app.config.get("BCRYPT_LOG_ROUNDS")
).decode()
def encode_token(self, jti=None): def encode_token(self, jti=None):
"""Generates an authentication token""" """Generates an authentication token"""
payload = { payload = {

View File

@ -8,6 +8,7 @@ from sachet.server.models import (
) )
from sachet.server.views_common import ModelAPI, ModelListAPI, auth_required from sachet.server.views_common import ModelAPI, ModelListAPI, auth_required
from sachet.server import bcrypt, db from sachet.server import bcrypt, db
import uuid
users_blueprint = Blueprint("users_blueprint", __name__) users_blueprint = Blueprint("users_blueprint", __name__)
@ -21,7 +22,7 @@ class LoginAPI(MethodView):
return jsonify(resp), 401 return jsonify(resp), 401
if bcrypt.check_password_hash(user.password, post_data.get("password", "")): if bcrypt.check_password_hash(user.password, post_data.get("password", "")):
token = user.encode_token() token = user.encode_token(jti=f"login_{uuid.uuid4()}")
resp = { resp = {
"status": "success", "status": "success",
"message": "Logged in.", "message": "Logged in.",
@ -88,12 +89,58 @@ users_blueprint.add_url_rule(
) )
class PasswordAPI(MethodView):
"""Endpoint to change passwords."""
@auth_required
def post(self, auth_user=None):
post_data = request.get_json()
old_psswd = post_data.get("old")
new_psswd = post_data.get("new")
if not old_psswd or not new_psswd:
return (
jsonify(
{
"status": "fail",
"message": "Specify the 'old' password and the 'new' password.",
}
),
400,
)
if not bcrypt.check_password_hash(auth_user.password, old_psswd):
return (
jsonify(
{
"status": "fail",
"message": "Invalid 'old' password.",
}
),
400,
)
else:
auth_user.password = auth_user.gen_hash(new_psswd)
db.session.commit()
return jsonify(
{
"status": "success",
"message": "Password changed.",
}
)
users_blueprint.add_url_rule(
"/users/password", view_func=PasswordAPI.as_view("password_api"), methods=["POST"]
)
class ExtendAPI(MethodView): class ExtendAPI(MethodView):
"""Endpoint to take a token and get a new one with a later expiry date.""" """Endpoint to take a token and get a new one with a later expiry date."""
@auth_required @auth_required
def post(self, auth_user=None): def post(self, auth_user=None):
token = auth_user.encode_token(jti="renew") token = auth_user.encode_token(jti=f"renew{uuid.uuid4()}")
resp = { resp = {
"status": "success", "status": "success",
"message": "Renewed token.", "message": "Renewed token.",

View File

@ -137,6 +137,47 @@ def test_logout(client, tokens, validate_info, auth):
assert resp.status_code == 401 assert resp.status_code == 401
def test_password_change(client, tokens, users, auth):
"""Test changing passwords."""
# test that we're logged in
resp = client.get("/users/jeff", headers=auth("jeff"))
assert resp.status_code == 200
# change password
resp = client.post(
"/users/password",
json=dict(old=users["jeff"]["password"], new="new_password"),
headers=auth("jeff"),
)
assert resp.status_code == 200
# revoke old token
resp = client.post(
"/users/logout", json=dict(token=tokens["jeff"]), headers=auth("jeff")
)
assert resp.status_code == 200
# test that we're logged out
resp = client.get(
"/users/jeff", headers=auth("jeff"), json=dict(token=tokens["jeff"])
)
assert resp.status_code == 401
# sign in with new token
resp = client.post(
"/users/login", json=dict(username="jeff", password="new_password")
)
assert resp.status_code == 200
data = resp.get_json()
new_token = data.get("auth_token")
assert new_token
# test that we're logged in
resp = client.get("/users/jeff", headers=dict(Authorization=f"bearer {new_token}"))
assert resp.status_code == 200
def test_admin_revoke(client, tokens, validate_info, auth): def test_admin_revoke(client, tokens, validate_info, auth):
"""Test that an admin can revoke any token from other users.""" """Test that an admin can revoke any token from other users."""