diff --git a/README.md b/README.md index ca2531e..6da25b8 100644 --- a/README.md +++ b/README.md @@ -34,3 +34,11 @@ Tests are available with the following command: ``` pytest --cov --cov-report term-missing ``` + +### linting + +Please use the linter before submitting code. + +``` +black . +``` diff --git a/requirements.txt b/requirements.txt index c2e7a50..94b3771 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ attrs==22.2.0 bcrypt==4.0.1 bitmask @ git+https://github.com/dogeystamp/bitmask@8524113fcdc22a570bda77d440374f5f269fdb79 +black==23.3.0 click==8.1.3 coverage==7.2.1 exceptiongroup==1.1.0 @@ -18,7 +19,10 @@ Jinja2==3.1.2 MarkupSafe==2.1.2 marshmallow==3.19.0 marshmallow-sqlalchemy==0.29.0 +mypy-extensions==1.0.0 packaging==23.0 +pathspec==0.11.1 +platformdirs==3.2.0 pluggy==1.0.0 PyJWT==2.6.0 pytest==7.2.2 diff --git a/sachet/server/__init__.py b/sachet/server/__init__.py index e323c0c..27023af 100644 --- a/sachet/server/__init__.py +++ b/sachet/server/__init__.py @@ -24,6 +24,7 @@ ma = Marshmallow() import sachet.server.commands from sachet.server.users.views import users_blueprint + app.register_blueprint(users_blueprint) with app.app_context(): diff --git a/sachet/server/commands.py b/sachet/server/commands.py index e8f7fb2..a079f72 100644 --- a/sachet/server/commands.py +++ b/sachet/server/commands.py @@ -8,29 +8,45 @@ from bitmask import Bitmask db_cli = AppGroup("db") + @db_cli.command("create") def create_db(): """Create all db tables.""" db.create_all() + @db_cli.command("drop") -@click.option('--yes', is_flag=True, expose_value=False, prompt="Are you sure you want to drop all tables?") +@click.option( + "--yes", + is_flag=True, + expose_value=False, + prompt="Are you sure you want to drop all tables?", +) def drop_db(): """Drop all db tables.""" db.drop_all() + app.cli.add_command(db_cli) user_cli = AppGroup("user") + @user_cli.command("create") -@click.option("--admin", default=False, prompt="Set this user as administrator?", help="Set this user an administrator.") +@click.option( + "--admin", + default=False, + prompt="Set this user as administrator?", + help="Set this user an administrator.", +) @click.option("--username", prompt="Username", help="Sets the username.") -@click.option("--password", - prompt="Password", - hide_input=True, - help="Sets the user's password (for security, avoid setting this from the command line).") +@click.option( + "--password", + prompt="Password", + hide_input=True, + help="Sets the user's password (for security, avoid setting this from the command line).", +) def create_user(admin, username, password): """Create a user directly in the database.""" perms = Bitmask() @@ -38,10 +54,17 @@ def create_user(admin, username, password): perms.add(Permissions.ADMIN) manage.create_user(perms, username, password) + @user_cli.command("delete") @click.argument("username") -@click.option('--yes', is_flag=True, expose_value=False, prompt=f"Are you sure you want to delete this user?") +@click.option( + "--yes", + is_flag=True, + expose_value=False, + prompt=f"Are you sure you want to delete this user?", +) def delete_user(username): manage.delete_user_by_username(username) + app.cli.add_command(user_cli) diff --git a/sachet/server/config.py b/sachet/server/config.py index 3c0bb8e..bb72653 100644 --- a/sachet/server/config.py +++ b/sachet/server/config.py @@ -3,22 +3,27 @@ import yaml sqlalchemy_base = "sqlite:///sachet" + class BaseConfig: SQLALCHEMY_DATABASE_URI = sqlalchemy_base + ".db" BCRYPT_LOG_ROUNDS = 13 SQLALCHEMY_TRACK_MODIFICATIONS = False + class TestingConfig(BaseConfig): SQLALCHEMY_DATABASE_URI = sqlalchemy_base + "_test" + ".db" BCRYPT_LOG_ROUNDS = 4 + class DevelopmentConfig(BaseConfig): SQLALCHEMY_DATABASE_URI = sqlalchemy_base + "_dev" + ".db" BCRYPT_LOG_ROUNDS = 4 + class ProductionConfig(BaseConfig): pass + def overlay_config(base, config_file=None): """Reading from a YAML file, this overrides configuration options from the bases above.""" config_locations = [config_file, "/etc/sachet/config.yml", "./config.yml"] @@ -33,7 +38,9 @@ def overlay_config(base, config_file=None): break if config_path == "": - raise FileNotFoundError("Please create a configuration: copy config.yml.example to config.yml.") + raise FileNotFoundError( + "Please create a configuration: copy config.yml.example to config.yml." + ) config = yaml.safe_load(open(config_path)) @@ -41,6 +48,7 @@ def overlay_config(base, config_file=None): raise ValueError("Please set secret_key within the configuration.") from sachet.server import app + app.config.from_object(base) for k, v in config.items(): diff --git a/sachet/server/models.py b/sachet/server/models.py index b5a495d..a8a02d1 100644 --- a/sachet/server/models.py +++ b/sachet/server/models.py @@ -10,11 +10,11 @@ from enum import IntFlag class Permissions(IntFlag): CREATE = 1 - MODIFY = 1<<1 - DELETE = 1<<2 - LOCK = 1<<3 - LIST = 1<<4 - ADMIN = 1<<5 + MODIFY = 1 << 1 + DELETE = 1 << 2 + LOCK = 1 << 3 + LIST = 1 << 4 + ADMIN = 1 << 5 def patch(orig, diff): @@ -25,13 +25,13 @@ def patch(orig, diff): return diff # deep copy - new = {k:v for k, v in orig.items()} + new = {k: v for k, v in orig.items()} for key, value in diff.items(): new[key] = patch(orig.get(key, {}), diff[key]) return new - + class User(db.Model): __tablename__ = "users" @@ -48,7 +48,7 @@ class User(db.Model): Bitmask listing all permissions. See the Permissions class for all possible permissions. - + Also, see https://github.com/dogeystamp/bitmask for information on how to use this field. """ @@ -66,31 +66,25 @@ class User(db.Model): self.permissions_number = mask.value db.session.commit() - def __init__(self, username, password, permissions): permissions.AllFlags = Permissions self.permissions = permissions self.password = bcrypt.generate_password_hash( - password, app.config.get("BCRYPT_LOG_ROUNDS") + password, app.config.get("BCRYPT_LOG_ROUNDS") ).decode() self.username = username self.register_date = datetime.datetime.now() - def encode_token(self, jti=None): """Generates an authentication token""" payload = { "exp": datetime.datetime.utcnow() + datetime.timedelta(days=7), "iat": datetime.datetime.utcnow(), "sub": self.username, - "jti": jti + "jti": jti, } - return jwt.encode( - payload, - app.config.get("SECRET_KEY"), - algorithm="HS256" - ) + return jwt.encode(payload, app.config.get("SECRET_KEY"), algorithm="HS256") class PermissionField(fields.Field): @@ -133,9 +127,9 @@ class BlacklistToken(db.Model): """ __tablename__ = "blacklist_tokens" - + id = db.Column(db.Integer, primary_key=True, autoincrement=True) - token = db.Column(db.String(500), unique=True, nullable=False) + token = db.Column(db.String(500), unique=True, nullable=False) expires = db.Column(db.DateTime, nullable=False) def __init__(self, token): @@ -190,6 +184,7 @@ def auth_required(f): Passes an argument 'user' to the function, with a User object corresponding to the authenticated session. """ + @wraps(f) def decorator(*args, **kwargs): token = None @@ -198,10 +193,7 @@ def auth_required(f): try: token = auth_header.split(" ")[1] except IndexError: - resp = { - "status": "fail", - "message": "Malformed Authorization header." - } + resp = {"status": "fail", "message": "Malformed Authorization header."} return jsonify(resp), 401 if not token: diff --git a/sachet/server/users/manage.py b/sachet/server/users/manage.py index 8985381..6efa581 100644 --- a/sachet/server/users/manage.py +++ b/sachet/server/users/manage.py @@ -1,6 +1,7 @@ from sachet.server import app, db from sachet.server.models import User + def create_user(permissions, username, password): # to reduce confusion with API endpoints forbidden = {"login", "logout", "extend"} @@ -10,16 +11,13 @@ def create_user(permissions, username, password): user = User.query.filter_by(username=username).first() if not user: - user = User( - username=username, - password=password, - permissions=permissions - ) + user = User(username=username, password=password, permissions=permissions) db.session.add(user) db.session.commit() else: raise KeyError(f"User '{username}' already exists.") + def delete_user_by_username(username): user = User.query.filter_by(username=username).first() diff --git a/sachet/server/users/views.py b/sachet/server/users/views.py index a500e39..8665d06 100644 --- a/sachet/server/users/views.py +++ b/sachet/server/users/views.py @@ -1,7 +1,15 @@ import jwt from flask import Blueprint, request, jsonify from flask.views import MethodView -from sachet.server.models import auth_required, read_token, patch, Permissions, User, UserSchema, BlacklistToken +from sachet.server.models import ( + auth_required, + read_token, + patch, + Permissions, + User, + UserSchema, + BlacklistToken, +) from sachet.server import bcrypt, db from marshmallow import ValidationError @@ -9,26 +17,22 @@ user_schema = UserSchema() users_blueprint = Blueprint("users_blueprint", __name__) + class LoginAPI(MethodView): def post(self): post_data = request.get_json() user = User.query.filter_by(username=post_data.get("username")).first() if not user: - resp = { - "status": "fail", - "message": "Invalid credentials." - } + resp = {"status": "fail", "message": "Invalid credentials."} return jsonify(resp), 401 - if bcrypt.check_password_hash( - user.password, post_data.get("password", "") - ): + if bcrypt.check_password_hash(user.password, post_data.get("password", "")): token = user.encode_token() resp = { "status": "success", "message": "Logged in.", "username": user.username, - "auth_token": token + "auth_token": token, } return jsonify(resp), 200 else: @@ -40,9 +44,7 @@ class LoginAPI(MethodView): users_blueprint.add_url_rule( - "/users/login", - view_func=LoginAPI.as_view("login_api"), - methods=['POST'] + "/users/login", view_func=LoginAPI.as_view("login_api"), methods=["POST"] ) @@ -54,7 +56,10 @@ class LogoutAPI(MethodView): post_data = request.get_json() token = post_data.get("token") if not token: - return jsonify({"status": "fail", "message": "Specify a token to revoke."}), 400 + return ( + jsonify({"status": "fail", "message": "Specify a token to revoke."}), + 400, + ) res = BlacklistToken.check_blacklist(token) if res: @@ -73,13 +78,19 @@ class LogoutAPI(MethodView): db.session.commit() return jsonify({"status": "success", "message": "Token revoked."}), 200 else: - return jsonify({"status": "fail", "message": "You are not allowed to revoke this token."}), 403 + return ( + jsonify( + { + "status": "fail", + "message": "You are not allowed to revoke this token.", + } + ), + 403, + ) users_blueprint.add_url_rule( - "/users/logout", - view_func=LogoutAPI.as_view("logout_api"), - methods=['POST'] + "/users/logout", view_func=LogoutAPI.as_view("logout_api"), methods=["POST"] ) @@ -93,27 +104,28 @@ class ExtendAPI(MethodView): "status": "success", "message": "Renewed token.", "username": user.username, - "auth_token": token + "auth_token": token, } return jsonify(resp), 200 users_blueprint.add_url_rule( - "/users/extend", - view_func=ExtendAPI.as_view("extend_api"), - methods=['POST'] + "/users/extend", view_func=ExtendAPI.as_view("extend_api"), methods=["POST"] ) class UserAPI(MethodView): """User information API""" + @auth_required def get(user, self, username): info_user = User.query.filter_by(username=username).first() - if (not info_user) or (info_user != user and Permissions.ADMIN not in user.permissions): + if (not info_user) or ( + info_user != user and Permissions.ADMIN not in user.permissions + ): resp = { "status": "fail", - "message": "You are not authorized to view this page." + "message": "You are not authorized to view this page.", } return jsonify(resp), 403 @@ -126,22 +138,19 @@ class UserAPI(MethodView): if not patch_user or Permissions.ADMIN not in user.permissions: resp = { "status": "fail", - "message": "You are not authorized to access this page." + "message": "You are not authorized to access this page.", } return jsonify(resp), 403 patch_json = request.get_json() orig_json = user_schema.dump(patch_user) - + new_json = patch(orig_json, patch_json) try: deserialized = user_schema.load(new_json) except ValidationError as e: - resp = { - "status": "fail", - "message": f"Invalid patch: {str(e)}" - } + resp = {"status": "fail", "message": f"Invalid patch: {str(e)}"} return jsonify(resp), 400 for k, v in deserialized.items(): @@ -159,7 +168,7 @@ class UserAPI(MethodView): if not put_user or Permissions.ADMIN not in user.permissions: resp = { "status": "fail", - "message": "You are not authorized to access this page." + "message": "You are not authorized to access this page.", } return jsonify(resp), 403 @@ -168,10 +177,7 @@ class UserAPI(MethodView): try: deserialized = user_schema.load(new_json) except ValidationError as e: - resp = { - "status": "fail", - "message": f"Invalid data: {str(e)}" - } + resp = {"status": "fail", "message": f"Invalid data: {str(e)}"} return jsonify(resp), 400 for k, v in deserialized.items(): @@ -182,8 +188,9 @@ class UserAPI(MethodView): } return jsonify(resp), 200 + users_blueprint.add_url_rule( "/users/", view_func=UserAPI.as_view("user_api"), - methods=['GET', 'PATCH', 'PUT'] + methods=["GET", "PATCH", "PUT"], ) diff --git a/tests/conftest.py b/tests/conftest.py index a19454f..8175a4f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,7 @@ from bitmask import Bitmask user_schema = UserSchema() + @pytest.fixture def client(): """Flask application with DB already set up and ready.""" @@ -19,7 +20,7 @@ def client(): yield client db.session.remove() db.drop_all() - + @pytest.fixture def flask_app_bare(): @@ -37,23 +38,13 @@ def users(client): Returns a dictionary with all the info for each user. """ userinfo = dict( - jeff = dict( - password = "1234", - permissions = Bitmask() - ), - administrator = dict( - password = "4321", - permissions = Bitmask(Permissions.ADMIN) - ), - ) + jeff=dict(password="1234", permissions=Bitmask()), + administrator=dict(password="4321", permissions=Bitmask(Permissions.ADMIN)), + ) for user, info in userinfo.items(): info["username"] = user - manage.create_user( - info["permissions"], - info["username"], - info["password"] - ) + manage.create_user(info["permissions"], info["username"], info["password"]) return userinfo @@ -86,10 +77,10 @@ def tokens(client, users): toks = {} for user, creds in users.items(): - resp = client.post("/users/login", json={ - "username": creds["username"], - "password": creds["password"] - }) + resp = client.post( + "/users/login", + json={"username": creds["username"], "password": creds["password"]}, + ) resp_json = resp.get_json() token = resp_json.get("auth_token") assert token is not None and token != "" diff --git a/tests/test_auth.py b/tests/test_auth.py index 914183d..cb39900 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -3,81 +3,64 @@ import jwt from sachet.server import db from sachet.server.users import manage + def test_reserved_users(client): """Test that the server prevents reserved endpoints from being registered as usernames.""" for user in ["login", "logout", "extend"]: with pytest.raises(KeyError): manage.create_user(False, user, "") + def test_unauth_perms(client): """Test endpoints to see if they allow unauthenticated users.""" resp = client.get("/users/jeff") assert resp.status_code == 401 + def test_malformed_authorization(client): """Test attempting authorization incorrectly.""" # incorrect token token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" - resp = client.get( - "/users/jeff", - headers={ - "Authorization": f"bearer {token}" - } - ) + resp = client.get("/users/jeff", headers={"Authorization": f"bearer {token}"}) assert resp.status_code == 401 # token for incorrect user (but properly signed) token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.nZ86hUWPdG43W6HVSGFy6DJnDVOZhx8a73LhQ3gIxY8" - resp = client.get( - "/users/jeff", - headers={ - "Authorization": f"bearer {token}" - } - ) + resp = client.get("/users/jeff", headers={"Authorization": f"bearer {token}"}) assert resp.status_code == 401 # invalid token token = "not a.real JWT.token" - resp = client.get( - "/users/jeff", - headers={ - "Authorization": f"bearer {token}" - } - ) + resp = client.get("/users/jeff", headers={"Authorization": f"bearer {token}"}) assert resp.status_code == 401 # missing token - resp = client.get( - "/users/jeff", - headers={ - "Authorization": "bearer" - } - ) + resp = client.get("/users/jeff", headers={"Authorization": "bearer"}) assert resp.status_code == 401 + def test_login(client, users): """Test logging in.""" # wrong password - resp = client.post("/users/login", json={ - "username": "jeff", - "password": users["jeff"]["password"] + "garbage" - }) + resp = client.post( + "/users/login", + json={"username": "jeff", "password": users["jeff"]["password"] + "garbage"}, + ) assert resp.status_code == 401 # wrong user - resp = client.post("/users/login", json={ - "username": "jeffery", - "password": users["jeff"]["password"] + "garbage" - }) + resp = client.post( + "/users/login", + json={"username": "jeffery", "password": users["jeff"]["password"] + "garbage"}, + ) assert resp.status_code == 401 # logging in correctly - resp = client.post("/users/login", json={ - "username": "jeff", - "password": users["jeff"]["password"] - }) + resp = client.post( + "/users/login", json={"username": "jeff", "password": users["jeff"]["password"]} + ) assert resp.status_code == 200 resp_json = resp.get_json() assert resp_json.get("status") == "success" @@ -90,10 +73,8 @@ def test_extend(client, tokens, validate_info): """Test extending the token lifespan (get a new one with later expiry).""" # obtain new token - resp = client.post("/users/extend", - headers={ - "Authorization": f"Bearer {tokens['jeff']}" - } + resp = client.post( + "/users/extend", headers={"Authorization": f"Bearer {tokens['jeff']}"} ) assert resp.status_code == 200 resp_json = resp.get_json() @@ -103,20 +84,14 @@ def test_extend(client, tokens, validate_info): # revoke old token - resp = client.post("/users/logout", json={ - "token": tokens["jeff"] - }, - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + resp = client.post( + "/users/logout", + json={"token": tokens["jeff"]}, + headers={"Authorization": f"bearer {tokens['jeff']}"}, ) # log in with the new token - resp = client.get("/users/jeff", - headers={ - "Authorization": f"Bearer {new_token}" - } - ) + resp = client.get("/users/jeff", headers={"Authorization": f"Bearer {new_token}"}) assert resp.status_code == 200 resp_json = resp.get_json() validate_info("jeff", resp_json) @@ -126,98 +101,79 @@ def test_logout(client, tokens, validate_info): """Test logging out.""" # unauthenticated - resp = client.post("/users/logout", json={ - "token": tokens["jeff"] - }, + resp = client.post( + "/users/logout", + json={"token": tokens["jeff"]}, ) assert resp.status_code == 401 # missing token - resp = client.post("/users/logout", json={ - - }, - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + resp = client.post( + "/users/logout", json={}, headers={"Authorization": f"bearer {tokens['jeff']}"} ) assert resp.status_code == 400 # invalid token - resp = client.post("/users/logout", json={ - "token": "not.real.jwt" - }, - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + resp = client.post( + "/users/logout", + json={"token": "not.real.jwt"}, + headers={"Authorization": f"bearer {tokens['jeff']}"}, ) assert resp.status_code == 400 # wrong user's token - resp = client.post("/users/logout", json={ - "token": tokens["administrator"] - }, - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + resp = client.post( + "/users/logout", + json={"token": tokens["administrator"]}, + headers={"Authorization": f"bearer {tokens['jeff']}"}, ) assert resp.status_code == 403 # check that we can access this endpoint before logging out - resp = client.get("/users/jeff", - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + resp = client.get( + "/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"} ) assert resp.status_code == 200 validate_info("jeff", resp.get_json()) # valid logout - resp = client.post("/users/logout", json={ - "token": tokens["jeff"] - }, - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + resp = client.post( + "/users/logout", + json={"token": tokens["jeff"]}, + headers={"Authorization": f"bearer {tokens['jeff']}"}, ) assert resp.status_code == 200 # check that the logout worked - resp = client.get("/users/jeff", - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + resp = client.get( + "/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"} ) assert resp.status_code == 401 + def test_admin_revoke(client, tokens, validate_info): """Test that an admin can revoke any token from other users.""" - resp = client.post("/users/logout", json={ - "token": tokens["jeff"] - }, - headers={ - "Authorization": f"bearer {tokens['administrator']}" - } + resp = client.post( + "/users/logout", + json={"token": tokens["jeff"]}, + headers={"Authorization": f"bearer {tokens['administrator']}"}, ) assert resp.status_code == 200 # check that the logout worked - resp = client.get("/users/jeff", - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + resp = client.get( + "/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"} ) assert resp.status_code == 401 # try revoking twice - resp = client.post("/users/logout", json={ - "token": tokens["jeff"] - }, - headers={ - "Authorization": f"bearer {tokens['administrator']}" - } + resp = client.post( + "/users/logout", + json={"token": tokens["jeff"]}, + headers={"Authorization": f"bearer {tokens['administrator']}"}, ) assert resp.status_code == 400 diff --git a/tests/test_cli.py b/tests/test_cli.py index 0be7a0f..65243ac 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -5,6 +5,7 @@ from sqlalchemy import inspect from sachet.server.models import User + def test_db(flask_app_bare, cli): """Test the CLI's ability to create and drop the DB.""" # make tables diff --git a/tests/test_models.py b/tests/test_models.py index 3686d4a..9847fa6 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,74 +1,27 @@ from sachet.server.models import patch + def test_patch(): """Tests sachet/server/models.py's patch() method for dicts.""" - assert patch( - dict(), - dict() - ) == dict() + assert patch(dict(), dict()) == dict() - assert patch( - dict(key="value"), - dict() - ) == dict(key="value") + assert patch(dict(key="value"), dict()) == dict(key="value") - assert patch( - dict(key="value"), - dict(key="newvalue") - ) == dict(key="newvalue") + assert patch(dict(key="value"), dict(key="newvalue")) == dict(key="newvalue") - assert patch( - dict(key="value"), - dict(key="newvalue") - ) == dict(key="newvalue") + assert patch(dict(key="value"), dict(key="newvalue")) == dict(key="newvalue") - assert patch( - dict(key="value"), - dict(key2="other_value") - ) == dict( - key="value", - key2="other_value" + assert patch(dict(key="value"), dict(key2="other_value")) == dict( + key="value", key2="other_value" ) assert patch( - dict( - nest = dict( - key="value", - key2="other_value" - ) - ), - dict( - top_key="newvalue", - nest = dict( - key2 = "new_other_value" - ) - ) - ) == dict( - top_key="newvalue", - nest = dict( - key="value", - key2="new_other_value" - ) - ) + dict(nest=dict(key="value", key2="other_value")), + dict(top_key="newvalue", nest=dict(key2="new_other_value")), + ) == dict(top_key="newvalue", nest=dict(key="value", key2="new_other_value")) assert patch( - dict( - nest = dict( - key="value", - list=[1, 2, 3, 4, 5] - ) - ), - dict( - top_key="newvalue", - nest = dict( - list = [3, 1, 4, 1, 5] - ) - ) - ) == dict( - top_key="newvalue", - nest = dict( - key="value", - list=[3, 1, 4, 1, 5] - ) - ) + dict(nest=dict(key="value", list=[1, 2, 3, 4, 5])), + dict(top_key="newvalue", nest=dict(list=[3, 1, 4, 1, 5])), + ) == dict(top_key="newvalue", nest=dict(key="value", list=[3, 1, 4, 1, 5])) diff --git a/tests/test_userinfo.py b/tests/test_userinfo.py index 343cd61..bd9a9a8 100644 --- a/tests/test_userinfo.py +++ b/tests/test_userinfo.py @@ -5,81 +5,66 @@ from datetime import datetime user_schema = UserSchema() + def test_get(client, tokens, validate_info): """Test accessing the user information endpoint as a normal user.""" # access user info endpoint resp = client.get( - "/users/jeff", - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + "/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"} ) assert resp.status_code == 200 validate_info("jeff", resp.get_json()) - # access other user's info endpoint resp = client.get( - "/users/administrator", - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + "/users/administrator", headers={"Authorization": f"bearer {tokens['jeff']}"} ) assert resp.status_code == 403 + def test_userinfo_admin(client, tokens, validate_info): """Test accessing other user's information as an admin.""" # first test that admin can access its own info resp = client.get( "/users/administrator", - headers={ - "Authorization": f"bearer {tokens['administrator']}" - } + headers={"Authorization": f"bearer {tokens['administrator']}"}, ) assert resp.status_code == 200 validate_info("administrator", resp.get_json()) # now test accessing other user's info resp = client.get( - "/users/jeff", - headers={ - "Authorization": f"bearer {tokens['administrator']}" - } + "/users/jeff", headers={"Authorization": f"bearer {tokens['administrator']}"} ) assert resp.status_code == 200 validate_info("jeff", resp.get_json()) + def test_patch(client, users, tokens, validate_info): """Test modifying user information as an administrator.""" # try with regular user to make sure it doesn't work resp = client.patch( "/users/jeff", - json = { "permissions": ["ADMIN"] }, - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + json={"permissions": ["ADMIN"]}, + headers={"Authorization": f"bearer {tokens['jeff']}"}, ) assert resp.status_code == 403 # test malformed patch resp = client.patch( "/users/jeff", - json = "hurr durr", - headers={ - "Authorization": f"bearer {tokens['administrator']}" - } + json="hurr durr", + headers={"Authorization": f"bearer {tokens['administrator']}"}, ) assert resp.status_code == 400 resp = client.patch( "/users/jeff", - json = { "permissions": ["ADMIN"] }, - headers={ - "Authorization": f"bearer {tokens['administrator']}" - } + json={"permissions": ["ADMIN"]}, + headers={"Authorization": f"bearer {tokens['administrator']}"}, ) assert resp.status_code == 200 @@ -88,37 +73,31 @@ def test_patch(client, users, tokens, validate_info): # request new info resp = client.get( - "/users/jeff", - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + "/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"} ) assert resp.status_code == 200 validate_info("jeff", resp.get_json()) + def test_put(client, users, tokens, validate_info): """Test replacing user information as an administrator.""" # try with regular user to make sure it doesn't work resp = client.patch( "/users/jeff", - json = dict(), - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + json=dict(), + headers={"Authorization": f"bearer {tokens['jeff']}"}, ) assert resp.status_code == 403 - new_data = {k:v for k, v in users["jeff"].items()} + new_data = {k: v for k, v in users["jeff"].items()} new_data["permissions"] = Bitmask(Permissions.ADMIN) - new_data["register_date"] = datetime(2022,2,2,0,0,0) + new_data["register_date"] = datetime(2022, 2, 2, 0, 0, 0) resp = client.put( "/users/jeff", - json = user_schema.dump(new_data), - headers={ - "Authorization": f"bearer {tokens['administrator']}" - } + json=user_schema.dump(new_data), + headers={"Authorization": f"bearer {tokens['administrator']}"}, ) assert resp.status_code == 200 @@ -127,10 +106,7 @@ def test_put(client, users, tokens, validate_info): # request new info resp = client.get( - "/users/jeff", - headers={ - "Authorization": f"bearer {tokens['jeff']}" - } + "/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"} ) assert resp.status_code == 200 validate_info("jeff", resp.get_json())