Compare commits

...

6 Commits

Author SHA1 Message Date
c0d4be8363
tests/conftest.py: fix test for modifying another users' shares 2023-04-29 12:17:32 -04:00
424de4f282
/files: disallow modifying other users' shares 2023-04-29 12:09:32 -04:00
fa57548e0b
tests/test_anonymous.py: add more tests to prod at anonymous permissions 2023-04-29 11:57:56 -04:00
30899847fb
/files: make tests more robust 2023-04-27 21:16:10 -04:00
f97cfbbe33
/files/: add anon permissions 2023-04-27 20:32:46 -04:00
2388bac57f
reformatted code again
i should probably put Black in a commit hook or something
but nahhhh can't be that bad
2023-04-26 21:02:04 -04:00
9 changed files with 339 additions and 60 deletions

View File

@ -21,10 +21,18 @@ function sachet_set_perms -d "setup permissions"
http --session=sachet-admin patch $BASENAME/users/user permissions:='["READ", "CREATE", "LIST", "DELETE"]'
end
function sachet_anon -d "setup anonymous user's permissions"
if test -z $argv
set argv '["READ", "CREATE", "LIST", "DELETE"]'
end
http --session=sachet-admin patch $BASENAME/admin/settings default_permissions:=$argv
end
function sachet_upload -d "uploads a file"
argparse 's/session=?' -- $argv
set FNAME (basename $argv)
set URL (http --session=sachet-user post $BASENAME/files file_name=$FNAME | jq -r .url)
http --session=sachet-user -f post $BASENAME/$URL/content upload@$argv
set URL (http --session=$_flag_session post $BASENAME/files file_name=$FNAME | jq -r .url)
http --session=$_flag_session -f post $BASENAME/$URL/content upload@$argv
end
function sachet_upload_meme -d "uploads a random meme"
@ -33,19 +41,21 @@ function sachet_upload_meme -d "uploads a random meme"
end
function sachet_list -d "lists files on a given page"
argparse 'P/per-page=!_validate_int' -- $argv
argparse 'P/per-page=!_validate_int' 's/session=?' -- $argv
if not set -q _flag_per_page
set _flag_per_page 5
end
http --session=sachet-user get localhost:5000/files per_page=$_flag_per_page page=$argv[1]
http --session=$_flag_session get localhost:5000/files per_page=$_flag_per_page page=$argv[1]
end
function sachet_download -d "downloads a given file id"
http --session=sachet-user -f -d get $BASENAME/files/$argv/content
argparse 's/session=?' -- $argv
http --session=$_flag_session -f -d get $BASENAME/files/$argv/content
end
function sachet_delete -d "delete given file ids"
argparse 's/session=?' -- $argv
for file in $argv
http --session=sachet-user delete $BASENAME/files/$file
http --session=$_flag_session delete $BASENAME/files/$file
end
end

View File

@ -1,6 +1,6 @@
from flask import Blueprint, request, jsonify
from flask.views import MethodView
from sachet.server.models import ServerSettings, Permissions
from sachet.server.models import ServerSettings, get_settings, Permissions
from sachet.server import db
from sachet.server.views_common import auth_required, ModelAPI
@ -9,28 +9,19 @@ admin_blueprint = Blueprint("admin_blueprint", __name__)
class ServerSettingsAPI(ModelAPI):
def get_settings(self):
rows = ServerSettings.query.all()
if len(rows) == 0:
settings = ServerSettings()
db.session.add(settings)
db.session.commit()
return settings
return rows[-1]
@auth_required(required_permissions=(Permissions.ADMIN,))
def get(self, auth_user=None):
settings = self.get_settings()
settings = get_settings()
return super().get(settings)
@auth_required(required_permissions=(Permissions.ADMIN,))
def patch(self, auth_user=None):
settings = self.get_settings()
settings = get_settings()
return super().patch(settings)
@auth_required(required_permissions=(Permissions.ADMIN,))
def put(self, auth_user=None):
settings = self.get_settings()
settings = get_settings()
return super().put(settings)

View File

@ -10,30 +10,27 @@ files_blueprint = Blueprint("files_blueprint", __name__)
class FilesMetadataAPI(ModelAPI):
@auth_required(required_permissions=(Permissions.READ,))
@auth_required(required_permissions=(Permissions.READ,), allow_anonymous=True)
def get(self, share_id, auth_user=None):
share = Share.query.filter_by(share_id=share_id).first()
return super().get(share)
@auth_required(required_permissions=(Permissions.MODIFY,))
@auth_required(required_permissions=(Permissions.MODIFY,), allow_anonymous=True)
def patch(self, share_id, auth_user=None):
share = Share.query.filter_by(share_id=share_id).first()
return super().patch(share)
@auth_required(required_permissions=(Permissions.MODIFY,))
@auth_required(required_permissions=(Permissions.MODIFY,), allow_anonymous=True)
def put(self, share_id, auth_user=None):
share = Share.query.filter_by(share_id=share_id).first()
return super().put(share)
@auth_required(required_permissions=(Permissions.DELETE,))
@auth_required(required_permissions=(Permissions.DELETE,), allow_anonymous=True)
def delete(self, share_id, auth_user=None):
try:
uuid.UUID(share_id)
except ValueError:
return jsonify(dict(
status="fail",
message=f"Invalid ID: '{share_id}'."
))
return jsonify(dict(status="fail", message=f"Invalid ID: '{share_id}'."))
share = Share.query.filter_by(share_id=share_id).first()
return super().delete(share)
@ -46,13 +43,14 @@ files_blueprint.add_url_rule(
class FilesAPI(ModelListAPI):
@auth_required(required_permissions=(Permissions.CREATE,))
@auth_required(required_permissions=(Permissions.CREATE,), allow_anonymous=True)
def post(self, auth_user=None):
data = request.get_json()
data["owner_name"] = auth_user.username
if auth_user:
data["owner_name"] = auth_user.username
return super().post(Share, data)
@auth_required(required_permissions=(Permissions.LIST,))
@auth_required(required_permissions=(Permissions.LIST,), allow_anonymous=True)
def get(self, auth_user=None):
return super().get(Share)
@ -65,7 +63,7 @@ files_blueprint.add_url_rule(
class FileContentAPI(ModelAPI):
@auth_required(required_permissions=(Permissions.CREATE,))
@auth_required(required_permissions=(Permissions.CREATE,), allow_anonymous=True)
def post(self, share_id, auth_user=None):
share = Share.query.filter_by(share_id=uuid.UUID(share_id)).first()
@ -111,7 +109,7 @@ class FileContentAPI(ModelAPI):
201,
)
@auth_required(required_permissions=(Permissions.MODIFY,))
@auth_required(required_permissions=(Permissions.MODIFY,), allow_anonymous=True)
def put(self, share_id, auth_user=None):
share = Share.query.filter_by(share_id=share_id).first()
if not share:
@ -119,6 +117,17 @@ class FileContentAPI(ModelAPI):
jsonify({"status": "fail", "message": "This share does not exist."})
), 404
if auth_user != share.owner:
return (
jsonify(
{
"status": "fail",
"message": "Share must be modified by its owner.",
}
),
403,
)
if not share.initialized:
return (
jsonify(
@ -142,7 +151,7 @@ class FileContentAPI(ModelAPI):
200,
)
@auth_required(required_permissions=(Permissions.READ,))
@auth_required(required_permissions=(Permissions.READ,), allow_anonymous=True)
def get(self, share_id, auth_user=None):
share = Share.query.filter_by(share_id=share_id).first()
if not share:

View File

@ -192,6 +192,17 @@ class ServerSettings(db.Model):
return Schema()
def get_settings():
"""Return server settings, and create them if they don't exist."""
rows = ServerSettings.query.all()
if len(rows) == 0:
settings = ServerSettings()
db.session.add(settings)
db.session.commit()
return settings
return rows[-1]
class Share(db.Model):
"""Share for a single file.
@ -236,9 +247,10 @@ class Share(db.Model):
file_name = db.Column(db.String, nullable=False)
def __init__(self, owner_name, file_name=None):
def __init__(self, owner_name=None, file_name=None):
self.owner = User.query.filter_by(username=owner_name).first()
self.owner_name = self.owner.username
if self.owner:
self.owner_name = self.owner.username
self.share_id = uuid.uuid4()
self.url = url_for("files_blueprint.files_metadata_api", share_id=self.share_id)
self.create_date = datetime.datetime.now()

View File

@ -1,6 +1,6 @@
from flask import request, jsonify
from flask.views import MethodView
from sachet.server.models import Permissions, User, BlacklistToken
from sachet.server.models import Permissions, User, BlacklistToken, get_settings
from sachet.server import db
from functools import wraps
from marshmallow import ValidationError
@ -8,20 +8,21 @@ from bitmask import Bitmask
import jwt
def auth_required(func=None, *, required_permissions=()):
"""Decorator to require authentication.
# https://stackoverflow.com/questions/3888158/making-decorators-with-optional-arguments
def auth_required(func=None, *, required_permissions=(), allow_anonymous=False):
"""Require specific authentication.
Passes an argument 'user' to the function, with a User object corresponding
Passes an argument `user` to the function, with a User object corresponding
to the authenticated session.
Parameters
----------
required_permissions : tuple of Permissions
required_permissions : tuple of Permissions, optional
Permissions required to access this endpoint.
allow_anonymous : bool, optional
Allow anonymous authentication. This means the `user` parameter might be None.
"""
# see https://stackoverflow.com/questions/3888158/making-decorators-with-optional-arguments
def _decorate(f):
@wraps(f)
def decorator(*args, **kwargs):
@ -38,7 +39,28 @@ def auth_required(func=None, *, required_permissions=()):
return jsonify(resp), 401
if not token:
return jsonify({"status": "fail", "message": "Missing auth token"}), 401
if allow_anonymous:
server_settings = get_settings()
if (
Bitmask(AllFlags=Permissions, *required_permissions)
not in server_settings.default_permissions
):
return (
jsonify(
{
"status": "fail",
"message": "Missing permissions to access this page.",
}
),
401,
)
kwargs["auth_user"] = None
return f(*args, **kwargs)
else:
return (
jsonify({"status": "fail", "message": "Missing auth token"}),
401,
)
try:
data, user = User.read_token(token)
@ -79,7 +101,6 @@ def auth_required(func=None, *, required_permissions=()):
def patch(orig, diff):
"""Patch the dictionary orig recursively with the dictionary diff."""
# if we get to a leaf node, just replace it
if not isinstance(orig, dict) or not isinstance(diff, dict):
return diff
@ -245,14 +266,22 @@ class ModelListAPI(MethodView):
Number of next page (if this is not the last).
"""
json_data = request.get_json()
per_page = int(json_data.get("per_page", 15))
page = int(json_data.get("page", 1))
try:
per_page = int(json_data.get("per_page", 15))
page = int(json_data.get("page", 1))
except ValueError as e:
return jsonify(dict(
status="fail",
message=str(e),
)), 400
page_data = ModelClass.query.paginate(page=page, per_page=per_page)
data = [model.get_schema().dump(model) for model in page_data]
return jsonify(dict(
data=data,
prev=page_data.prev_num,
next=page_data.next_num,
))
return jsonify(
dict(
data=data,
prev=page_data.prev_num,
next=page_data.next_num,
)
)

View File

@ -75,7 +75,10 @@ def users(client):
dave=dict(
password="1234",
permissions=Bitmask(
Permissions.CREATE, Permissions.READ, Permissions.DELETE
Permissions.CREATE,
Permissions.READ,
Permissions.DELETE,
Permissions.MODIFY,
),
),
# admins don't have the other permissions by default,

210
tests/test_anonymous.py Normal file
View File

@ -0,0 +1,210 @@
import pytest
from io import BytesIO
from werkzeug.datastructures import FileStorage
import uuid
"""Test anonymous authentication to endpoints."""
def test_files(client, auth, rand):
# set create perm for anon users
resp = client.patch(
"/admin/settings",
headers=auth("administrator"),
json={"default_permissions": ["CREATE"]},
)
assert resp.status_code == 200
# create share
resp = client.post("/files", json={"file_name": "content.bin"})
assert resp.status_code == 201
data = resp.get_json()
url = data.get("url")
assert url is not None
assert "/files/" in url
upload_data = rand.randbytes(4000)
# upload file to share
resp = client.post(
url + "/content",
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
content_type="multipart/form-data",
)
assert resp.status_code == 201
# set read perm for anon users
resp = client.patch(
"/admin/settings",
headers=auth("administrator"),
json={"default_permissions": ["READ"]},
)
assert resp.status_code == 200
# read file
resp = client.get(
url + "/content",
)
assert resp.data == upload_data
assert "filename=content.bin" in resp.headers["Content-Disposition"].split("; ")
# set modify perm for anon users
resp = client.patch(
"/admin/settings",
headers=auth("administrator"),
json={"default_permissions": ["MODIFY"]},
)
assert resp.status_code == 200
# modify share
upload_data = rand.randbytes(4000)
resp = client.put(
url + "/content",
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
content_type="multipart/form-data",
)
assert resp.status_code == 200
resp = client.patch(
url,
json={"file_name": "new_bin.bin"},
)
# set read perm for anon users
resp = client.patch(
"/admin/settings",
headers=auth("administrator"),
json={"default_permissions": ["READ"]},
)
assert resp.status_code == 200
resp = client.get(
url + "/content",
)
assert resp.data == upload_data
assert "filename=new_bin.bin" in resp.headers["Content-Disposition"].split("; ")
# set list perm for anon users
resp = client.patch(
"/admin/settings",
headers=auth("administrator"),
json={"default_permissions": ["LIST"]},
)
assert resp.status_code == 200
# test listing file
resp = client.get(
"/files",
json={},
)
data = resp.get_json()
assert resp.status_code == 200
listed = data.get("data")
assert len(listed) == 1
assert listed[0].get("initialized") is True
# set delete perm for anon users
resp = client.patch(
"/admin/settings",
headers=auth("administrator"),
json={"default_permissions": ["DELETE"]},
)
assert resp.status_code == 200
# test deletion
resp = client.delete(
url,
)
assert resp.status_code == 200
# set delete perm for anon users
resp = client.patch(
"/admin/settings",
headers=auth("administrator"),
json={"default_permissions": ["READ"]},
)
assert resp.status_code == 200
# file shouldn't exist anymore
resp = client.get(
url + "/content",
)
assert resp.status_code == 404
def test_files_invalid(client, auth, rand):
# set create perm for anon users
resp = client.patch(
"/admin/settings",
headers=auth("administrator"),
json={"default_permissions": ["CREATE"]},
)
assert resp.status_code == 200
# create an uninitialized share
resp = client.post("/files", json={"file_name": "content.bin"})
assert resp.status_code == 201
data = resp.get_json()
uninit_url = data.get("url")
# upload a share
resp = client.post("/files", json={"file_name": "content.bin"})
assert resp.status_code == 201
data = resp.get_json()
url = data.get("url")
upload_data = rand.randbytes(4000)
resp = client.post(
url + "/content",
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
content_type="multipart/form-data",
)
assert resp.status_code == 201
# disable all permissions
resp = client.patch(
"/admin/settings",
headers=auth("administrator"),
json={"default_permissions": []},
)
assert resp.status_code == 200
# test initializing a share without perms
resp = client.post(
uninit_url + "/content",
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
content_type="multipart/form-data",
)
assert resp.status_code == 401
# test reading a share without perms
resp = client.get(url + "/content")
# test modifying an uninitialized share without perms
resp = client.put(
uninit_url + "/content",
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
content_type="multipart/form-data",
)
assert resp.status_code == 401
assert resp.status_code == 401
# test modifying a share without perms
resp = client.put(
url + "/content",
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
content_type="multipart/form-data",
)
assert resp.status_code == 401
# test deleting a share without perms
resp = client.delete(url)
assert resp.status_code == 401
# test modifying share metadata without perms
resp = client.patch(url)
resp = client.put(url)
assert resp.status_code == 401
# test reading share metadata without perms
resp = client.get(url)
assert resp.status_code == 401
# test listing shares without perms
resp = client.get("/files")
assert resp.status_code == 401
# test creating share without perms
resp = client.post("/files")
assert resp.status_code == 401

View File

@ -91,9 +91,7 @@ class TestSuite:
resp = client.put(
url + "/content",
headers=auth("jeff"),
data={
"upload": FileStorage(stream=BytesIO(new_data), filename="upload")
},
data={"upload": FileStorage(stream=BytesIO(new_data), filename="upload")},
)
assert resp.status_code == 200
@ -105,7 +103,6 @@ class TestSuite:
assert resp.data == new_data
assert "filename=new_bin.bin" in resp.headers["Content-Disposition"].split("; ")
def test_invalid(self, client, users, auth, rand):
"""Test invalid requests."""
@ -185,6 +182,18 @@ class TestSuite:
)
assert resp.status_code == 201
# test other user being unable to modify this share
resp = client.put(
url + "/content",
headers=auth("dave"),
data={
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
},
content_type="multipart/form-data",
)
assert resp.status_code == 403
# test not allowing re-upload
resp = client.post(
url + "/content",

View File

@ -45,10 +45,7 @@ def test_files(client, users, auth):
per_page = 9
while page is not None:
resp = client.get(
"/files", headers=auth("jeff"), json=dict(
page=page,
per_page=per_page
)
"/files", headers=auth("jeff"), json=dict(page=page, per_page=per_page)
)
assert resp.status_code == 200
@ -78,3 +75,12 @@ def test_files(client, users, auth):
end_page = paginate(forwards=True)
paginate(forwards=False, page=end_page)
def test_invalid(client, auth):
"""Test invalid requests to pagination."""
resp = client.get(
"/files", headers=auth("jeff"), json=dict(page="one", per_page="two")
)
assert resp.status_code == 400