Compare commits
6 Commits
bb3731cc6d
...
c0d4be8363
Author | SHA1 | Date | |
---|---|---|---|
c0d4be8363 | |||
424de4f282 | |||
fa57548e0b | |||
30899847fb | |||
f97cfbbe33 | |||
2388bac57f |
@ -21,10 +21,18 @@ function sachet_set_perms -d "setup permissions"
|
|||||||
http --session=sachet-admin patch $BASENAME/users/user permissions:='["READ", "CREATE", "LIST", "DELETE"]'
|
http --session=sachet-admin patch $BASENAME/users/user permissions:='["READ", "CREATE", "LIST", "DELETE"]'
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function sachet_anon -d "setup anonymous user's permissions"
|
||||||
|
if test -z $argv
|
||||||
|
set argv '["READ", "CREATE", "LIST", "DELETE"]'
|
||||||
|
end
|
||||||
|
http --session=sachet-admin patch $BASENAME/admin/settings default_permissions:=$argv
|
||||||
|
end
|
||||||
|
|
||||||
function sachet_upload -d "uploads a file"
|
function sachet_upload -d "uploads a file"
|
||||||
|
argparse 's/session=?' -- $argv
|
||||||
set FNAME (basename $argv)
|
set FNAME (basename $argv)
|
||||||
set URL (http --session=sachet-user post $BASENAME/files file_name=$FNAME | jq -r .url)
|
set URL (http --session=$_flag_session post $BASENAME/files file_name=$FNAME | jq -r .url)
|
||||||
http --session=sachet-user -f post $BASENAME/$URL/content upload@$argv
|
http --session=$_flag_session -f post $BASENAME/$URL/content upload@$argv
|
||||||
end
|
end
|
||||||
|
|
||||||
function sachet_upload_meme -d "uploads a random meme"
|
function sachet_upload_meme -d "uploads a random meme"
|
||||||
@ -33,19 +41,21 @@ function sachet_upload_meme -d "uploads a random meme"
|
|||||||
end
|
end
|
||||||
|
|
||||||
function sachet_list -d "lists files on a given page"
|
function sachet_list -d "lists files on a given page"
|
||||||
argparse 'P/per-page=!_validate_int' -- $argv
|
argparse 'P/per-page=!_validate_int' 's/session=?' -- $argv
|
||||||
if not set -q _flag_per_page
|
if not set -q _flag_per_page
|
||||||
set _flag_per_page 5
|
set _flag_per_page 5
|
||||||
end
|
end
|
||||||
http --session=sachet-user get localhost:5000/files per_page=$_flag_per_page page=$argv[1]
|
http --session=$_flag_session get localhost:5000/files per_page=$_flag_per_page page=$argv[1]
|
||||||
end
|
end
|
||||||
|
|
||||||
function sachet_download -d "downloads a given file id"
|
function sachet_download -d "downloads a given file id"
|
||||||
http --session=sachet-user -f -d get $BASENAME/files/$argv/content
|
argparse 's/session=?' -- $argv
|
||||||
|
http --session=$_flag_session -f -d get $BASENAME/files/$argv/content
|
||||||
end
|
end
|
||||||
|
|
||||||
function sachet_delete -d "delete given file ids"
|
function sachet_delete -d "delete given file ids"
|
||||||
|
argparse 's/session=?' -- $argv
|
||||||
for file in $argv
|
for file in $argv
|
||||||
http --session=sachet-user delete $BASENAME/files/$file
|
http --session=$_flag_session delete $BASENAME/files/$file
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
from flask import Blueprint, request, jsonify
|
from flask import Blueprint, request, jsonify
|
||||||
from flask.views import MethodView
|
from flask.views import MethodView
|
||||||
from sachet.server.models import ServerSettings, Permissions
|
from sachet.server.models import ServerSettings, get_settings, Permissions
|
||||||
from sachet.server import db
|
from sachet.server import db
|
||||||
from sachet.server.views_common import auth_required, ModelAPI
|
from sachet.server.views_common import auth_required, ModelAPI
|
||||||
|
|
||||||
@ -9,28 +9,19 @@ admin_blueprint = Blueprint("admin_blueprint", __name__)
|
|||||||
|
|
||||||
|
|
||||||
class ServerSettingsAPI(ModelAPI):
|
class ServerSettingsAPI(ModelAPI):
|
||||||
def get_settings(self):
|
|
||||||
rows = ServerSettings.query.all()
|
|
||||||
if len(rows) == 0:
|
|
||||||
settings = ServerSettings()
|
|
||||||
db.session.add(settings)
|
|
||||||
db.session.commit()
|
|
||||||
return settings
|
|
||||||
return rows[-1]
|
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.ADMIN,))
|
@auth_required(required_permissions=(Permissions.ADMIN,))
|
||||||
def get(self, auth_user=None):
|
def get(self, auth_user=None):
|
||||||
settings = self.get_settings()
|
settings = get_settings()
|
||||||
return super().get(settings)
|
return super().get(settings)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.ADMIN,))
|
@auth_required(required_permissions=(Permissions.ADMIN,))
|
||||||
def patch(self, auth_user=None):
|
def patch(self, auth_user=None):
|
||||||
settings = self.get_settings()
|
settings = get_settings()
|
||||||
return super().patch(settings)
|
return super().patch(settings)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.ADMIN,))
|
@auth_required(required_permissions=(Permissions.ADMIN,))
|
||||||
def put(self, auth_user=None):
|
def put(self, auth_user=None):
|
||||||
settings = self.get_settings()
|
settings = get_settings()
|
||||||
return super().put(settings)
|
return super().put(settings)
|
||||||
|
|
||||||
|
|
||||||
|
@ -10,30 +10,27 @@ files_blueprint = Blueprint("files_blueprint", __name__)
|
|||||||
|
|
||||||
|
|
||||||
class FilesMetadataAPI(ModelAPI):
|
class FilesMetadataAPI(ModelAPI):
|
||||||
@auth_required(required_permissions=(Permissions.READ,))
|
@auth_required(required_permissions=(Permissions.READ,), allow_anonymous=True)
|
||||||
def get(self, share_id, auth_user=None):
|
def get(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
return super().get(share)
|
return super().get(share)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.MODIFY,))
|
@auth_required(required_permissions=(Permissions.MODIFY,), allow_anonymous=True)
|
||||||
def patch(self, share_id, auth_user=None):
|
def patch(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
return super().patch(share)
|
return super().patch(share)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.MODIFY,))
|
@auth_required(required_permissions=(Permissions.MODIFY,), allow_anonymous=True)
|
||||||
def put(self, share_id, auth_user=None):
|
def put(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
return super().put(share)
|
return super().put(share)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.DELETE,))
|
@auth_required(required_permissions=(Permissions.DELETE,), allow_anonymous=True)
|
||||||
def delete(self, share_id, auth_user=None):
|
def delete(self, share_id, auth_user=None):
|
||||||
try:
|
try:
|
||||||
uuid.UUID(share_id)
|
uuid.UUID(share_id)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return jsonify(dict(
|
return jsonify(dict(status="fail", message=f"Invalid ID: '{share_id}'."))
|
||||||
status="fail",
|
|
||||||
message=f"Invalid ID: '{share_id}'."
|
|
||||||
))
|
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
return super().delete(share)
|
return super().delete(share)
|
||||||
|
|
||||||
@ -46,13 +43,14 @@ files_blueprint.add_url_rule(
|
|||||||
|
|
||||||
|
|
||||||
class FilesAPI(ModelListAPI):
|
class FilesAPI(ModelListAPI):
|
||||||
@auth_required(required_permissions=(Permissions.CREATE,))
|
@auth_required(required_permissions=(Permissions.CREATE,), allow_anonymous=True)
|
||||||
def post(self, auth_user=None):
|
def post(self, auth_user=None):
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
|
if auth_user:
|
||||||
data["owner_name"] = auth_user.username
|
data["owner_name"] = auth_user.username
|
||||||
return super().post(Share, data)
|
return super().post(Share, data)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.LIST,))
|
@auth_required(required_permissions=(Permissions.LIST,), allow_anonymous=True)
|
||||||
def get(self, auth_user=None):
|
def get(self, auth_user=None):
|
||||||
return super().get(Share)
|
return super().get(Share)
|
||||||
|
|
||||||
@ -65,7 +63,7 @@ files_blueprint.add_url_rule(
|
|||||||
|
|
||||||
|
|
||||||
class FileContentAPI(ModelAPI):
|
class FileContentAPI(ModelAPI):
|
||||||
@auth_required(required_permissions=(Permissions.CREATE,))
|
@auth_required(required_permissions=(Permissions.CREATE,), allow_anonymous=True)
|
||||||
def post(self, share_id, auth_user=None):
|
def post(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=uuid.UUID(share_id)).first()
|
share = Share.query.filter_by(share_id=uuid.UUID(share_id)).first()
|
||||||
|
|
||||||
@ -111,7 +109,7 @@ class FileContentAPI(ModelAPI):
|
|||||||
201,
|
201,
|
||||||
)
|
)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.MODIFY,))
|
@auth_required(required_permissions=(Permissions.MODIFY,), allow_anonymous=True)
|
||||||
def put(self, share_id, auth_user=None):
|
def put(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
if not share:
|
if not share:
|
||||||
@ -119,6 +117,17 @@ class FileContentAPI(ModelAPI):
|
|||||||
jsonify({"status": "fail", "message": "This share does not exist."})
|
jsonify({"status": "fail", "message": "This share does not exist."})
|
||||||
), 404
|
), 404
|
||||||
|
|
||||||
|
if auth_user != share.owner:
|
||||||
|
return (
|
||||||
|
jsonify(
|
||||||
|
{
|
||||||
|
"status": "fail",
|
||||||
|
"message": "Share must be modified by its owner.",
|
||||||
|
}
|
||||||
|
),
|
||||||
|
403,
|
||||||
|
)
|
||||||
|
|
||||||
if not share.initialized:
|
if not share.initialized:
|
||||||
return (
|
return (
|
||||||
jsonify(
|
jsonify(
|
||||||
@ -142,7 +151,7 @@ class FileContentAPI(ModelAPI):
|
|||||||
200,
|
200,
|
||||||
)
|
)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.READ,))
|
@auth_required(required_permissions=(Permissions.READ,), allow_anonymous=True)
|
||||||
def get(self, share_id, auth_user=None):
|
def get(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
if not share:
|
if not share:
|
||||||
|
@ -192,6 +192,17 @@ class ServerSettings(db.Model):
|
|||||||
return Schema()
|
return Schema()
|
||||||
|
|
||||||
|
|
||||||
|
def get_settings():
|
||||||
|
"""Return server settings, and create them if they don't exist."""
|
||||||
|
rows = ServerSettings.query.all()
|
||||||
|
if len(rows) == 0:
|
||||||
|
settings = ServerSettings()
|
||||||
|
db.session.add(settings)
|
||||||
|
db.session.commit()
|
||||||
|
return settings
|
||||||
|
return rows[-1]
|
||||||
|
|
||||||
|
|
||||||
class Share(db.Model):
|
class Share(db.Model):
|
||||||
"""Share for a single file.
|
"""Share for a single file.
|
||||||
|
|
||||||
@ -236,8 +247,9 @@ class Share(db.Model):
|
|||||||
|
|
||||||
file_name = db.Column(db.String, nullable=False)
|
file_name = db.Column(db.String, nullable=False)
|
||||||
|
|
||||||
def __init__(self, owner_name, file_name=None):
|
def __init__(self, owner_name=None, file_name=None):
|
||||||
self.owner = User.query.filter_by(username=owner_name).first()
|
self.owner = User.query.filter_by(username=owner_name).first()
|
||||||
|
if self.owner:
|
||||||
self.owner_name = self.owner.username
|
self.owner_name = self.owner.username
|
||||||
self.share_id = uuid.uuid4()
|
self.share_id = uuid.uuid4()
|
||||||
self.url = url_for("files_blueprint.files_metadata_api", share_id=self.share_id)
|
self.url = url_for("files_blueprint.files_metadata_api", share_id=self.share_id)
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
from flask import request, jsonify
|
from flask import request, jsonify
|
||||||
from flask.views import MethodView
|
from flask.views import MethodView
|
||||||
from sachet.server.models import Permissions, User, BlacklistToken
|
from sachet.server.models import Permissions, User, BlacklistToken, get_settings
|
||||||
from sachet.server import db
|
from sachet.server import db
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from marshmallow import ValidationError
|
from marshmallow import ValidationError
|
||||||
@ -8,20 +8,21 @@ from bitmask import Bitmask
|
|||||||
import jwt
|
import jwt
|
||||||
|
|
||||||
|
|
||||||
def auth_required(func=None, *, required_permissions=()):
|
# https://stackoverflow.com/questions/3888158/making-decorators-with-optional-arguments
|
||||||
"""Decorator to require authentication.
|
def auth_required(func=None, *, required_permissions=(), allow_anonymous=False):
|
||||||
|
"""Require specific authentication.
|
||||||
|
|
||||||
Passes an argument 'user' to the function, with a User object corresponding
|
Passes an argument `user` to the function, with a User object corresponding
|
||||||
to the authenticated session.
|
to the authenticated session.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
|
required_permissions : tuple of Permissions, optional
|
||||||
required_permissions : tuple of Permissions
|
|
||||||
Permissions required to access this endpoint.
|
Permissions required to access this endpoint.
|
||||||
|
allow_anonymous : bool, optional
|
||||||
|
Allow anonymous authentication. This means the `user` parameter might be None.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# see https://stackoverflow.com/questions/3888158/making-decorators-with-optional-arguments
|
|
||||||
def _decorate(f):
|
def _decorate(f):
|
||||||
@wraps(f)
|
@wraps(f)
|
||||||
def decorator(*args, **kwargs):
|
def decorator(*args, **kwargs):
|
||||||
@ -38,7 +39,28 @@ def auth_required(func=None, *, required_permissions=()):
|
|||||||
return jsonify(resp), 401
|
return jsonify(resp), 401
|
||||||
|
|
||||||
if not token:
|
if not token:
|
||||||
return jsonify({"status": "fail", "message": "Missing auth token"}), 401
|
if allow_anonymous:
|
||||||
|
server_settings = get_settings()
|
||||||
|
if (
|
||||||
|
Bitmask(AllFlags=Permissions, *required_permissions)
|
||||||
|
not in server_settings.default_permissions
|
||||||
|
):
|
||||||
|
return (
|
||||||
|
jsonify(
|
||||||
|
{
|
||||||
|
"status": "fail",
|
||||||
|
"message": "Missing permissions to access this page.",
|
||||||
|
}
|
||||||
|
),
|
||||||
|
401,
|
||||||
|
)
|
||||||
|
kwargs["auth_user"] = None
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
return (
|
||||||
|
jsonify({"status": "fail", "message": "Missing auth token"}),
|
||||||
|
401,
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data, user = User.read_token(token)
|
data, user = User.read_token(token)
|
||||||
@ -79,7 +101,6 @@ def auth_required(func=None, *, required_permissions=()):
|
|||||||
|
|
||||||
def patch(orig, diff):
|
def patch(orig, diff):
|
||||||
"""Patch the dictionary orig recursively with the dictionary diff."""
|
"""Patch the dictionary orig recursively with the dictionary diff."""
|
||||||
|
|
||||||
# if we get to a leaf node, just replace it
|
# if we get to a leaf node, just replace it
|
||||||
if not isinstance(orig, dict) or not isinstance(diff, dict):
|
if not isinstance(orig, dict) or not isinstance(diff, dict):
|
||||||
return diff
|
return diff
|
||||||
@ -245,14 +266,22 @@ class ModelListAPI(MethodView):
|
|||||||
Number of next page (if this is not the last).
|
Number of next page (if this is not the last).
|
||||||
"""
|
"""
|
||||||
json_data = request.get_json()
|
json_data = request.get_json()
|
||||||
|
try:
|
||||||
per_page = int(json_data.get("per_page", 15))
|
per_page = int(json_data.get("per_page", 15))
|
||||||
page = int(json_data.get("page", 1))
|
page = int(json_data.get("page", 1))
|
||||||
|
except ValueError as e:
|
||||||
|
return jsonify(dict(
|
||||||
|
status="fail",
|
||||||
|
message=str(e),
|
||||||
|
)), 400
|
||||||
|
|
||||||
page_data = ModelClass.query.paginate(page=page, per_page=per_page)
|
page_data = ModelClass.query.paginate(page=page, per_page=per_page)
|
||||||
data = [model.get_schema().dump(model) for model in page_data]
|
data = [model.get_schema().dump(model) for model in page_data]
|
||||||
|
|
||||||
return jsonify(dict(
|
return jsonify(
|
||||||
|
dict(
|
||||||
data=data,
|
data=data,
|
||||||
prev=page_data.prev_num,
|
prev=page_data.prev_num,
|
||||||
next=page_data.next_num,
|
next=page_data.next_num,
|
||||||
))
|
)
|
||||||
|
)
|
||||||
|
@ -75,7 +75,10 @@ def users(client):
|
|||||||
dave=dict(
|
dave=dict(
|
||||||
password="1234",
|
password="1234",
|
||||||
permissions=Bitmask(
|
permissions=Bitmask(
|
||||||
Permissions.CREATE, Permissions.READ, Permissions.DELETE
|
Permissions.CREATE,
|
||||||
|
Permissions.READ,
|
||||||
|
Permissions.DELETE,
|
||||||
|
Permissions.MODIFY,
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
# admins don't have the other permissions by default,
|
# admins don't have the other permissions by default,
|
||||||
|
210
tests/test_anonymous.py
Normal file
210
tests/test_anonymous.py
Normal file
@ -0,0 +1,210 @@
|
|||||||
|
import pytest
|
||||||
|
from io import BytesIO
|
||||||
|
from werkzeug.datastructures import FileStorage
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
"""Test anonymous authentication to endpoints."""
|
||||||
|
|
||||||
|
|
||||||
|
def test_files(client, auth, rand):
|
||||||
|
# set create perm for anon users
|
||||||
|
resp = client.patch(
|
||||||
|
"/admin/settings",
|
||||||
|
headers=auth("administrator"),
|
||||||
|
json={"default_permissions": ["CREATE"]},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
# create share
|
||||||
|
resp = client.post("/files", json={"file_name": "content.bin"})
|
||||||
|
assert resp.status_code == 201
|
||||||
|
|
||||||
|
data = resp.get_json()
|
||||||
|
url = data.get("url")
|
||||||
|
|
||||||
|
assert url is not None
|
||||||
|
assert "/files/" in url
|
||||||
|
|
||||||
|
upload_data = rand.randbytes(4000)
|
||||||
|
|
||||||
|
# upload file to share
|
||||||
|
resp = client.post(
|
||||||
|
url + "/content",
|
||||||
|
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
||||||
|
content_type="multipart/form-data",
|
||||||
|
)
|
||||||
|
assert resp.status_code == 201
|
||||||
|
|
||||||
|
# set read perm for anon users
|
||||||
|
resp = client.patch(
|
||||||
|
"/admin/settings",
|
||||||
|
headers=auth("administrator"),
|
||||||
|
json={"default_permissions": ["READ"]},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
# read file
|
||||||
|
resp = client.get(
|
||||||
|
url + "/content",
|
||||||
|
)
|
||||||
|
assert resp.data == upload_data
|
||||||
|
assert "filename=content.bin" in resp.headers["Content-Disposition"].split("; ")
|
||||||
|
|
||||||
|
# set modify perm for anon users
|
||||||
|
resp = client.patch(
|
||||||
|
"/admin/settings",
|
||||||
|
headers=auth("administrator"),
|
||||||
|
json={"default_permissions": ["MODIFY"]},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
# modify share
|
||||||
|
upload_data = rand.randbytes(4000)
|
||||||
|
resp = client.put(
|
||||||
|
url + "/content",
|
||||||
|
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
||||||
|
content_type="multipart/form-data",
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
resp = client.patch(
|
||||||
|
url,
|
||||||
|
json={"file_name": "new_bin.bin"},
|
||||||
|
)
|
||||||
|
# set read perm for anon users
|
||||||
|
resp = client.patch(
|
||||||
|
"/admin/settings",
|
||||||
|
headers=auth("administrator"),
|
||||||
|
json={"default_permissions": ["READ"]},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
resp = client.get(
|
||||||
|
url + "/content",
|
||||||
|
)
|
||||||
|
assert resp.data == upload_data
|
||||||
|
assert "filename=new_bin.bin" in resp.headers["Content-Disposition"].split("; ")
|
||||||
|
|
||||||
|
# set list perm for anon users
|
||||||
|
resp = client.patch(
|
||||||
|
"/admin/settings",
|
||||||
|
headers=auth("administrator"),
|
||||||
|
json={"default_permissions": ["LIST"]},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
# test listing file
|
||||||
|
resp = client.get(
|
||||||
|
"/files",
|
||||||
|
json={},
|
||||||
|
)
|
||||||
|
data = resp.get_json()
|
||||||
|
assert resp.status_code == 200
|
||||||
|
listed = data.get("data")
|
||||||
|
assert len(listed) == 1
|
||||||
|
assert listed[0].get("initialized") is True
|
||||||
|
|
||||||
|
# set delete perm for anon users
|
||||||
|
resp = client.patch(
|
||||||
|
"/admin/settings",
|
||||||
|
headers=auth("administrator"),
|
||||||
|
json={"default_permissions": ["DELETE"]},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
# test deletion
|
||||||
|
resp = client.delete(
|
||||||
|
url,
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
# set delete perm for anon users
|
||||||
|
resp = client.patch(
|
||||||
|
"/admin/settings",
|
||||||
|
headers=auth("administrator"),
|
||||||
|
json={"default_permissions": ["READ"]},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
# file shouldn't exist anymore
|
||||||
|
resp = client.get(
|
||||||
|
url + "/content",
|
||||||
|
)
|
||||||
|
assert resp.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_files_invalid(client, auth, rand):
|
||||||
|
# set create perm for anon users
|
||||||
|
resp = client.patch(
|
||||||
|
"/admin/settings",
|
||||||
|
headers=auth("administrator"),
|
||||||
|
json={"default_permissions": ["CREATE"]},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
# create an uninitialized share
|
||||||
|
resp = client.post("/files", json={"file_name": "content.bin"})
|
||||||
|
assert resp.status_code == 201
|
||||||
|
data = resp.get_json()
|
||||||
|
uninit_url = data.get("url")
|
||||||
|
|
||||||
|
# upload a share
|
||||||
|
resp = client.post("/files", json={"file_name": "content.bin"})
|
||||||
|
assert resp.status_code == 201
|
||||||
|
data = resp.get_json()
|
||||||
|
url = data.get("url")
|
||||||
|
upload_data = rand.randbytes(4000)
|
||||||
|
resp = client.post(
|
||||||
|
url + "/content",
|
||||||
|
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
||||||
|
content_type="multipart/form-data",
|
||||||
|
)
|
||||||
|
assert resp.status_code == 201
|
||||||
|
|
||||||
|
# disable all permissions
|
||||||
|
resp = client.patch(
|
||||||
|
"/admin/settings",
|
||||||
|
headers=auth("administrator"),
|
||||||
|
json={"default_permissions": []},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
|
||||||
|
# test initializing a share without perms
|
||||||
|
resp = client.post(
|
||||||
|
uninit_url + "/content",
|
||||||
|
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
||||||
|
content_type="multipart/form-data",
|
||||||
|
)
|
||||||
|
assert resp.status_code == 401
|
||||||
|
# test reading a share without perms
|
||||||
|
resp = client.get(url + "/content")
|
||||||
|
# test modifying an uninitialized share without perms
|
||||||
|
resp = client.put(
|
||||||
|
uninit_url + "/content",
|
||||||
|
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
||||||
|
content_type="multipart/form-data",
|
||||||
|
)
|
||||||
|
assert resp.status_code == 401
|
||||||
|
assert resp.status_code == 401
|
||||||
|
# test modifying a share without perms
|
||||||
|
resp = client.put(
|
||||||
|
url + "/content",
|
||||||
|
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
||||||
|
content_type="multipart/form-data",
|
||||||
|
)
|
||||||
|
assert resp.status_code == 401
|
||||||
|
|
||||||
|
# test deleting a share without perms
|
||||||
|
resp = client.delete(url)
|
||||||
|
assert resp.status_code == 401
|
||||||
|
# test modifying share metadata without perms
|
||||||
|
resp = client.patch(url)
|
||||||
|
resp = client.put(url)
|
||||||
|
assert resp.status_code == 401
|
||||||
|
# test reading share metadata without perms
|
||||||
|
resp = client.get(url)
|
||||||
|
assert resp.status_code == 401
|
||||||
|
|
||||||
|
# test listing shares without perms
|
||||||
|
resp = client.get("/files")
|
||||||
|
assert resp.status_code == 401
|
||||||
|
# test creating share without perms
|
||||||
|
resp = client.post("/files")
|
||||||
|
assert resp.status_code == 401
|
@ -91,9 +91,7 @@ class TestSuite:
|
|||||||
resp = client.put(
|
resp = client.put(
|
||||||
url + "/content",
|
url + "/content",
|
||||||
headers=auth("jeff"),
|
headers=auth("jeff"),
|
||||||
data={
|
data={"upload": FileStorage(stream=BytesIO(new_data), filename="upload")},
|
||||||
"upload": FileStorage(stream=BytesIO(new_data), filename="upload")
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
||||||
@ -105,7 +103,6 @@ class TestSuite:
|
|||||||
assert resp.data == new_data
|
assert resp.data == new_data
|
||||||
assert "filename=new_bin.bin" in resp.headers["Content-Disposition"].split("; ")
|
assert "filename=new_bin.bin" in resp.headers["Content-Disposition"].split("; ")
|
||||||
|
|
||||||
|
|
||||||
def test_invalid(self, client, users, auth, rand):
|
def test_invalid(self, client, users, auth, rand):
|
||||||
"""Test invalid requests."""
|
"""Test invalid requests."""
|
||||||
|
|
||||||
@ -185,6 +182,18 @@ class TestSuite:
|
|||||||
)
|
)
|
||||||
assert resp.status_code == 201
|
assert resp.status_code == 201
|
||||||
|
|
||||||
|
# test other user being unable to modify this share
|
||||||
|
resp = client.put(
|
||||||
|
url + "/content",
|
||||||
|
headers=auth("dave"),
|
||||||
|
data={
|
||||||
|
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
|
||||||
|
},
|
||||||
|
content_type="multipart/form-data",
|
||||||
|
)
|
||||||
|
assert resp.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
# test not allowing re-upload
|
# test not allowing re-upload
|
||||||
resp = client.post(
|
resp = client.post(
|
||||||
url + "/content",
|
url + "/content",
|
||||||
|
@ -45,10 +45,7 @@ def test_files(client, users, auth):
|
|||||||
per_page = 9
|
per_page = 9
|
||||||
while page is not None:
|
while page is not None:
|
||||||
resp = client.get(
|
resp = client.get(
|
||||||
"/files", headers=auth("jeff"), json=dict(
|
"/files", headers=auth("jeff"), json=dict(page=page, per_page=per_page)
|
||||||
page=page,
|
|
||||||
per_page=per_page
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
||||||
@ -78,3 +75,12 @@ def test_files(client, users, auth):
|
|||||||
|
|
||||||
end_page = paginate(forwards=True)
|
end_page = paginate(forwards=True)
|
||||||
paginate(forwards=False, page=end_page)
|
paginate(forwards=False, page=end_page)
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid(client, auth):
|
||||||
|
"""Test invalid requests to pagination."""
|
||||||
|
|
||||||
|
resp = client.get(
|
||||||
|
"/files", headers=auth("jeff"), json=dict(page="one", per_page="two")
|
||||||
|
)
|
||||||
|
assert resp.status_code == 400
|
||||||
|
Loading…
x
Reference in New Issue
Block a user