tests: refactor authentication header
This commit is contained in:
parent
831017092a
commit
518898d554
@ -122,3 +122,27 @@ def tokens(client, users):
|
||||
def cli():
|
||||
"""click's testing fixture"""
|
||||
return CliRunner()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auth(tokens):
|
||||
"""Generate auth headers.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
username : str
|
||||
Username to authenticate as.
|
||||
data : dict
|
||||
Extra headers to add.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
Dictionary of all headers.
|
||||
"""
|
||||
def auth_headers(username, data={}):
|
||||
ret = {"Authorization": f"bearer {tokens[username]}"}
|
||||
ret.update(data)
|
||||
return ret
|
||||
|
||||
return auth_headers
|
||||
|
@ -69,12 +69,12 @@ def test_login(client, users):
|
||||
assert token is not None and token != ""
|
||||
|
||||
|
||||
def test_extend(client, tokens, validate_info):
|
||||
def test_extend(client, tokens, validate_info, auth):
|
||||
"""Test extending the token lifespan (get a new one with later expiry)."""
|
||||
|
||||
# obtain new token
|
||||
resp = client.post(
|
||||
"/users/extend", headers={"Authorization": f"Bearer {tokens['jeff']}"}
|
||||
"/users/extend", headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
resp_json = resp.get_json()
|
||||
@ -87,7 +87,7 @@ def test_extend(client, tokens, validate_info):
|
||||
resp = client.post(
|
||||
"/users/logout",
|
||||
json={"token": tokens["jeff"]},
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
|
||||
# log in with the new token
|
||||
@ -97,7 +97,7 @@ def test_extend(client, tokens, validate_info):
|
||||
validate_info("jeff", resp_json)
|
||||
|
||||
|
||||
def test_logout(client, tokens, validate_info):
|
||||
def test_logout(client, tokens, validate_info, auth):
|
||||
"""Test logging out."""
|
||||
|
||||
# unauthenticated
|
||||
@ -109,7 +109,7 @@ def test_logout(client, tokens, validate_info):
|
||||
|
||||
# missing token
|
||||
resp = client.post(
|
||||
"/users/logout", json={}, headers={"Authorization": f"bearer {tokens['jeff']}"}
|
||||
"/users/logout", json={}, headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
@ -117,7 +117,7 @@ def test_logout(client, tokens, validate_info):
|
||||
resp = client.post(
|
||||
"/users/logout",
|
||||
json={"token": "not.real.jwt"},
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
@ -125,13 +125,13 @@ def test_logout(client, tokens, validate_info):
|
||||
resp = client.post(
|
||||
"/users/logout",
|
||||
json={"token": tokens["administrator"]},
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
# check that we can access this endpoint before logging out
|
||||
resp = client.get(
|
||||
"/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"}
|
||||
"/users/jeff", headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
validate_info("jeff", resp.get_json())
|
||||
@ -140,32 +140,32 @@ def test_logout(client, tokens, validate_info):
|
||||
resp = client.post(
|
||||
"/users/logout",
|
||||
json={"token": tokens["jeff"]},
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# check that the logout worked
|
||||
|
||||
resp = client.get(
|
||||
"/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"}
|
||||
"/users/jeff", headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
def test_admin_revoke(client, tokens, validate_info):
|
||||
def test_admin_revoke(client, tokens, validate_info, auth):
|
||||
"""Test that an admin can revoke any token from other users."""
|
||||
|
||||
resp = client.post(
|
||||
"/users/logout",
|
||||
json={"token": tokens["jeff"]},
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# check that the logout worked
|
||||
|
||||
resp = client.get(
|
||||
"/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"}
|
||||
"/users/jeff", headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 401
|
||||
|
||||
@ -174,6 +174,6 @@ def test_admin_revoke(client, tokens, validate_info):
|
||||
resp = client.post(
|
||||
"/users/logout",
|
||||
json={"token": tokens["jeff"]},
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
@ -9,10 +9,10 @@ from werkzeug.datastructures import FileStorage
|
||||
# this might be redundant because test_storage tests the backends already
|
||||
@pytest.mark.parametrize("client", [{"SACHET_STORAGE": "filesystem"}], indirect=True)
|
||||
class TestSuite:
|
||||
def test_sharing(self, client, users, tokens, rand):
|
||||
def test_sharing(self, client, users, auth, rand):
|
||||
# create share
|
||||
resp = client.post(
|
||||
"/files", headers={"Authorization": f"bearer {tokens['jeff']}"}
|
||||
"/files", headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 201
|
||||
|
||||
@ -27,7 +27,7 @@ class TestSuite:
|
||||
# upload file to share
|
||||
resp = client.post(
|
||||
url + "/content",
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
data={
|
||||
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
|
||||
},
|
||||
@ -38,7 +38,7 @@ class TestSuite:
|
||||
# test not allowing re-upload
|
||||
resp = client.post(
|
||||
url + "/content",
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
data={
|
||||
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
|
||||
},
|
||||
@ -49,20 +49,20 @@ class TestSuite:
|
||||
# read file
|
||||
resp = client.get(
|
||||
url + "/content",
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.data == upload_data
|
||||
|
||||
# test deletion
|
||||
resp = client.delete(
|
||||
url,
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# file shouldn't exist anymore
|
||||
resp = client.get(
|
||||
url + "/content",
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
@ -4,19 +4,19 @@ from sachet.server.models import Permissions, ServerSettings
|
||||
server_settings_schema = ServerSettings.get_schema(ServerSettings)
|
||||
|
||||
|
||||
def test_default_perms(client, tokens):
|
||||
def test_default_perms(client, auth):
|
||||
"""Test the default permissions."""
|
||||
|
||||
# try with regular user to make sure it doesn't work
|
||||
resp = client.get(
|
||||
"/admin/settings",
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
resp = client.get(
|
||||
"/admin/settings",
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
@ -25,14 +25,14 @@ def test_default_perms(client, tokens):
|
||||
)
|
||||
|
||||
|
||||
def test_patch_perms(client, tokens):
|
||||
def test_patch_perms(client, auth):
|
||||
"""Test the PATCH endpoint for default server permissions."""
|
||||
|
||||
# try with regular user to make sure it doesn't work
|
||||
resp = client.patch(
|
||||
"/admin/settings",
|
||||
json={"default_permissions": ["ADMIN"]},
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
@ -40,21 +40,21 @@ def test_patch_perms(client, tokens):
|
||||
resp = client.patch(
|
||||
"/admin/settings",
|
||||
json="hurr durr",
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
resp = client.patch(
|
||||
"/admin/settings",
|
||||
json={"default_permissions": ["ADMIN"]},
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# request new info
|
||||
resp = client.get(
|
||||
"/admin/settings",
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
@ -63,14 +63,14 @@ def test_patch_perms(client, tokens):
|
||||
)
|
||||
|
||||
|
||||
def test_put_perms(client, tokens):
|
||||
def test_put_perms(client, auth):
|
||||
"""Test the PUT endpoint for default server permissions."""
|
||||
|
||||
# try with regular user to make sure it doesn't work
|
||||
resp = client.put(
|
||||
"/admin/settings",
|
||||
json={"default_permissions": ["ADMIN"]},
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
@ -78,14 +78,14 @@ def test_put_perms(client, tokens):
|
||||
resp = client.put(
|
||||
"/admin/settings",
|
||||
json="hurr durr",
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
# request current info (that we'll modify before putting back)
|
||||
resp = client.get(
|
||||
"/admin/settings",
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
@ -95,14 +95,14 @@ def test_put_perms(client, tokens):
|
||||
resp = client.put(
|
||||
"/admin/settings",
|
||||
json=data,
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# request new info
|
||||
resp = client.get(
|
||||
"/admin/settings",
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
|
@ -6,50 +6,50 @@ from datetime import datetime
|
||||
user_schema = User.get_schema(User)
|
||||
|
||||
|
||||
def test_get(client, tokens, validate_info):
|
||||
def test_get(client, auth, validate_info):
|
||||
"""Test accessing the user information endpoint as a normal user."""
|
||||
|
||||
# access user info endpoint
|
||||
resp = client.get(
|
||||
"/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"}
|
||||
"/users/jeff", headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
validate_info("jeff", resp.get_json())
|
||||
|
||||
# access other user's info endpoint
|
||||
resp = client.get(
|
||||
"/users/administrator", headers={"Authorization": f"bearer {tokens['jeff']}"}
|
||||
"/users/administrator", headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
def test_userinfo_admin(client, tokens, validate_info):
|
||||
def test_userinfo_admin(client, auth, validate_info):
|
||||
"""Test accessing other user's information as an admin."""
|
||||
|
||||
# first test that admin can access its own info
|
||||
resp = client.get(
|
||||
"/users/administrator",
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
validate_info("administrator", resp.get_json())
|
||||
|
||||
# now test accessing other user's info
|
||||
resp = client.get(
|
||||
"/users/jeff", headers={"Authorization": f"bearer {tokens['administrator']}"}
|
||||
"/users/jeff", headers=auth("administrator")
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
validate_info("jeff", resp.get_json())
|
||||
|
||||
|
||||
def test_patch(client, users, tokens, validate_info):
|
||||
def test_patch(client, users, auth, validate_info):
|
||||
"""Test modifying user information as an administrator."""
|
||||
|
||||
# try with regular user to make sure it doesn't work
|
||||
resp = client.patch(
|
||||
"/users/jeff",
|
||||
json={"permissions": ["ADMIN"]},
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
@ -57,14 +57,14 @@ def test_patch(client, users, tokens, validate_info):
|
||||
resp = client.patch(
|
||||
"/users/jeff",
|
||||
json="hurr durr",
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
resp = client.patch(
|
||||
"/users/jeff",
|
||||
json={"permissions": ["ADMIN"]},
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
@ -73,20 +73,20 @@ def test_patch(client, users, tokens, validate_info):
|
||||
|
||||
# request new info
|
||||
resp = client.get(
|
||||
"/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"}
|
||||
"/users/jeff", headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
validate_info("jeff", resp.get_json())
|
||||
|
||||
|
||||
def test_put(client, users, tokens, validate_info):
|
||||
def test_put(client, users, auth, validate_info):
|
||||
"""Test replacing user information as an administrator."""
|
||||
|
||||
# try with regular user to make sure it doesn't work
|
||||
resp = client.patch(
|
||||
"/users/jeff",
|
||||
json=dict(),
|
||||
headers={"Authorization": f"bearer {tokens['jeff']}"},
|
||||
headers=auth("jeff"),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
@ -97,7 +97,7 @@ def test_put(client, users, tokens, validate_info):
|
||||
resp = client.put(
|
||||
"/users/jeff",
|
||||
json=user_schema.dump(new_data),
|
||||
headers={"Authorization": f"bearer {tokens['administrator']}"},
|
||||
headers=auth("administrator"),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
@ -106,7 +106,7 @@ def test_put(client, users, tokens, validate_info):
|
||||
|
||||
# request new info
|
||||
resp = client.get(
|
||||
"/users/jeff", headers={"Authorization": f"bearer {tokens['jeff']}"}
|
||||
"/users/jeff", headers=auth("jeff")
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
validate_info("jeff", resp.get_json())
|
||||
|
Loading…
Reference in New Issue
Block a user