/files: add more tests

This commit is contained in:
dogeystamp 2023-04-15 16:33:50 -04:00
parent f29fd193fb
commit ae0cc950f5
Signed by: dogeystamp
GPG Key ID: 7225FE3592EFFA38
5 changed files with 152 additions and 44 deletions

View File

@ -97,6 +97,13 @@ class ModelAPI(MethodView):
"""Generic REST API for interacting with models.""" """Generic REST API for interacting with models."""
def get(self, model): def get(self, model):
if not model:
resp = {
"status": "fail",
"message": "This resource does not exist.",
}
return jsonify(resp), 404
return jsonify(model.get_schema().dump(model)) return jsonify(model.get_schema().dump(model))
def patch(self, model): def patch(self, model):

View File

@ -63,11 +63,49 @@ def users(client):
""" """
userinfo = dict( userinfo = dict(
jeff=dict( jeff=dict(
password="1234",
permissions=Bitmask(
Permissions.CREATE,
Permissions.READ,
Permissions.DELETE,
Permissions.MODIFY,
),
),
dave=dict(
password="1234", password="1234",
permissions=Bitmask( permissions=Bitmask(
Permissions.CREATE, Permissions.READ, Permissions.DELETE Permissions.CREATE, Permissions.READ, Permissions.DELETE
), ),
), ),
# admins don't have the other permissions by default,
# but admins can add perms to themselves
no_create_user=dict(
password="password",
permissions=Bitmask(
Permissions.READ,
Permissions.DELETE,
Permissions.ADMIN,
Permissions.MODIFY,
),
),
no_read_user=dict(
password="password",
permissions=Bitmask(
Permissions.CREATE,
Permissions.DELETE,
Permissions.ADMIN,
Permissions.MODIFY,
),
),
no_modify_user=dict(
password="password",
permissions=Bitmask(
Permissions.CREATE,
Permissions.DELETE,
Permissions.ADMIN,
Permissions.READ,
),
),
administrator=dict(password="4321", permissions=Bitmask(Permissions.ADMIN)), administrator=dict(password="4321", permissions=Bitmask(Permissions.ADMIN)),
) )
@ -140,6 +178,7 @@ def auth(tokens):
dict dict
Dictionary of all headers. Dictionary of all headers.
""" """
def auth_headers(username, data={}): def auth_headers(username, data={}):
ret = {"Authorization": f"bearer {tokens[username]}"} ret = {"Authorization": f"bearer {tokens[username]}"}
ret.update(data) ret.update(data)

View File

@ -73,9 +73,7 @@ def test_extend(client, tokens, validate_info, auth):
"""Test extending the token lifespan (get a new one with later expiry).""" """Test extending the token lifespan (get a new one with later expiry)."""
# obtain new token # obtain new token
resp = client.post( resp = client.post("/users/extend", headers=auth("jeff"))
"/users/extend", headers=auth("jeff")
)
assert resp.status_code == 200 assert resp.status_code == 200
resp_json = resp.get_json() resp_json = resp.get_json()
new_token = resp_json.get("auth_token") new_token = resp_json.get("auth_token")
@ -108,9 +106,7 @@ def test_logout(client, tokens, validate_info, auth):
assert resp.status_code == 401 assert resp.status_code == 401
# missing token # missing token
resp = client.post( resp = client.post("/users/logout", json={}, headers=auth("jeff"))
"/users/logout", json={}, headers=auth("jeff")
)
assert resp.status_code == 400 assert resp.status_code == 400
# invalid token # invalid token
@ -130,9 +126,7 @@ def test_logout(client, tokens, validate_info, auth):
assert resp.status_code == 403 assert resp.status_code == 403
# check that we can access this endpoint before logging out # check that we can access this endpoint before logging out
resp = client.get( resp = client.get("/users/jeff", headers=auth("jeff"))
"/users/jeff", headers=auth("jeff")
)
assert resp.status_code == 200 assert resp.status_code == 200
validate_info("jeff", resp.get_json()) validate_info("jeff", resp.get_json())
@ -146,9 +140,7 @@ def test_logout(client, tokens, validate_info, auth):
# check that the logout worked # check that the logout worked
resp = client.get( resp = client.get("/users/jeff", headers=auth("jeff"))
"/users/jeff", headers=auth("jeff")
)
assert resp.status_code == 401 assert resp.status_code == 401
@ -164,9 +156,7 @@ def test_admin_revoke(client, tokens, validate_info, auth):
# check that the logout worked # check that the logout worked
resp = client.get( resp = client.get("/users/jeff", headers=auth("jeff"))
"/users/jeff", headers=auth("jeff")
)
assert resp.status_code == 401 assert resp.status_code == 401
# try revoking twice # try revoking twice

View File

@ -1,6 +1,7 @@
import pytest import pytest
from io import BytesIO from io import BytesIO
from werkzeug.datastructures import FileStorage from werkzeug.datastructures import FileStorage
import uuid
"""Test file share endpoints.""" """Test file share endpoints."""
@ -11,9 +12,7 @@ from werkzeug.datastructures import FileStorage
class TestSuite: class TestSuite:
def test_sharing(self, client, users, auth, rand): def test_sharing(self, client, users, auth, rand):
# create share # create share
resp = client.post( resp = client.post("/files", headers=auth("jeff"))
"/files", headers=auth("jeff")
)
assert resp.status_code == 201 assert resp.status_code == 201
data = resp.get_json() data = resp.get_json()
@ -35,17 +34,6 @@ class TestSuite:
) )
assert resp.status_code == 201 assert resp.status_code == 201
# test not allowing re-upload
resp = client.post(
url + "/content",
headers=auth("jeff"),
data={
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
},
content_type="multipart/form-data",
)
assert resp.status_code == 423
# read file # read file
resp = client.get( resp = client.get(
url + "/content", url + "/content",
@ -66,3 +54,97 @@ class TestSuite:
headers=auth("jeff"), headers=auth("jeff"),
) )
assert resp.status_code == 404 assert resp.status_code == 404
def test_invalid(self, client, users, auth, rand):
"""Test invalid requests."""
upload_data = rand.randbytes(4000)
# unauthenticated
resp = client.post("/files")
assert resp.status_code == 401
# non-existent
resp = client.get("/files/" + str(uuid.UUID(int=0)), headers=auth("jeff"))
assert resp.status_code == 404
resp = client.get(
"/files/" + str(uuid.UUID(int=0)) + "/content", headers=auth("jeff")
)
assert resp.status_code == 404
resp = client.post(
"/files/" + str(uuid.UUID(int=0)) + "/content", headers=auth("jeff")
)
assert resp.status_code == 404
resp = client.put(
"/files/" + str(uuid.UUID(int=0)) + "/content", headers=auth("jeff")
)
assert resp.status_code == 404
# no CREATE permission
resp = client.post("/files", headers=auth("no_create_user"))
assert resp.status_code == 403
# valid share creation to move on to testing content endpoint
resp = client.post("/files", headers=auth("jeff"))
assert resp.status_code == 201
data = resp.get_json()
url = data.get("url")
# test invalid methods
resp = client.put(
url + "/content",
headers=auth("jeff"),
data={
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
},
content_type="multipart/form-data",
)
assert resp.status_code == 423
resp = client.patch(
url + "/content",
headers=auth("jeff"),
data={
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
},
content_type="multipart/form-data",
)
assert resp.status_code == 405
# test other user being unable to upload to this share
resp = client.post(
url + "/content",
headers=auth("dave"),
data={
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
},
content_type="multipart/form-data",
)
assert resp.status_code == 403
# upload file to share (properly)
resp = client.post(
url + "/content",
headers=auth("jeff"),
data={
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
},
content_type="multipart/form-data",
)
assert resp.status_code == 201
# test not allowing re-upload
resp = client.post(
url + "/content",
headers=auth("jeff"),
data={
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
},
content_type="multipart/form-data",
)
assert resp.status_code == 423
# no READ permission
resp = client.get(url, headers=auth("no_read_user"))
assert resp.status_code == 403
resp = client.get(url + "/content", headers=auth("no_read_user"))
assert resp.status_code == 403

View File

@ -10,16 +10,12 @@ def test_get(client, auth, validate_info):
"""Test accessing the user information endpoint as a normal user.""" """Test accessing the user information endpoint as a normal user."""
# access user info endpoint # access user info endpoint
resp = client.get( resp = client.get("/users/jeff", headers=auth("jeff"))
"/users/jeff", headers=auth("jeff")
)
assert resp.status_code == 200 assert resp.status_code == 200
validate_info("jeff", resp.get_json()) validate_info("jeff", resp.get_json())
# access other user's info endpoint # access other user's info endpoint
resp = client.get( resp = client.get("/users/administrator", headers=auth("jeff"))
"/users/administrator", headers=auth("jeff")
)
assert resp.status_code == 403 assert resp.status_code == 403
@ -35,9 +31,7 @@ def test_userinfo_admin(client, auth, validate_info):
validate_info("administrator", resp.get_json()) validate_info("administrator", resp.get_json())
# now test accessing other user's info # now test accessing other user's info
resp = client.get( resp = client.get("/users/jeff", headers=auth("administrator"))
"/users/jeff", headers=auth("administrator")
)
assert resp.status_code == 200 assert resp.status_code == 200
validate_info("jeff", resp.get_json()) validate_info("jeff", resp.get_json())
@ -72,9 +66,7 @@ def test_patch(client, users, auth, validate_info):
users["jeff"]["permissions"] = Bitmask(Permissions.ADMIN) users["jeff"]["permissions"] = Bitmask(Permissions.ADMIN)
# request new info # request new info
resp = client.get( resp = client.get("/users/jeff", headers=auth("jeff"))
"/users/jeff", headers=auth("jeff")
)
assert resp.status_code == 200 assert resp.status_code == 200
validate_info("jeff", resp.get_json()) validate_info("jeff", resp.get_json())
@ -105,8 +97,6 @@ def test_put(client, users, auth, validate_info):
users["jeff"]["permissions"] = Bitmask(Permissions.ADMIN) users["jeff"]["permissions"] = Bitmask(Permissions.ADMIN)
# request new info # request new info
resp = client.get( resp = client.get("/users/jeff", headers=auth("jeff"))
"/users/jeff", headers=auth("jeff")
)
assert resp.status_code == 200 assert resp.status_code == 200
validate_info("jeff", resp.get_json()) validate_info("jeff", resp.get_json())