Compare commits
No commits in common. "c0d4be83636bf5375c4c46d97a8e8c2da234bbdd" and "bb3731cc6dee65de64307fe8342a1ff33b2941b7" have entirely different histories.
c0d4be8363
...
bb3731cc6d
@ -21,18 +21,10 @@ function sachet_set_perms -d "setup permissions"
|
|||||||
http --session=sachet-admin patch $BASENAME/users/user permissions:='["READ", "CREATE", "LIST", "DELETE"]'
|
http --session=sachet-admin patch $BASENAME/users/user permissions:='["READ", "CREATE", "LIST", "DELETE"]'
|
||||||
end
|
end
|
||||||
|
|
||||||
function sachet_anon -d "setup anonymous user's permissions"
|
|
||||||
if test -z $argv
|
|
||||||
set argv '["READ", "CREATE", "LIST", "DELETE"]'
|
|
||||||
end
|
|
||||||
http --session=sachet-admin patch $BASENAME/admin/settings default_permissions:=$argv
|
|
||||||
end
|
|
||||||
|
|
||||||
function sachet_upload -d "uploads a file"
|
function sachet_upload -d "uploads a file"
|
||||||
argparse 's/session=?' -- $argv
|
|
||||||
set FNAME (basename $argv)
|
set FNAME (basename $argv)
|
||||||
set URL (http --session=$_flag_session post $BASENAME/files file_name=$FNAME | jq -r .url)
|
set URL (http --session=sachet-user post $BASENAME/files file_name=$FNAME | jq -r .url)
|
||||||
http --session=$_flag_session -f post $BASENAME/$URL/content upload@$argv
|
http --session=sachet-user -f post $BASENAME/$URL/content upload@$argv
|
||||||
end
|
end
|
||||||
|
|
||||||
function sachet_upload_meme -d "uploads a random meme"
|
function sachet_upload_meme -d "uploads a random meme"
|
||||||
@ -41,21 +33,19 @@ function sachet_upload_meme -d "uploads a random meme"
|
|||||||
end
|
end
|
||||||
|
|
||||||
function sachet_list -d "lists files on a given page"
|
function sachet_list -d "lists files on a given page"
|
||||||
argparse 'P/per-page=!_validate_int' 's/session=?' -- $argv
|
argparse 'P/per-page=!_validate_int' -- $argv
|
||||||
if not set -q _flag_per_page
|
if not set -q _flag_per_page
|
||||||
set _flag_per_page 5
|
set _flag_per_page 5
|
||||||
end
|
end
|
||||||
http --session=$_flag_session get localhost:5000/files per_page=$_flag_per_page page=$argv[1]
|
http --session=sachet-user get localhost:5000/files per_page=$_flag_per_page page=$argv[1]
|
||||||
end
|
end
|
||||||
|
|
||||||
function sachet_download -d "downloads a given file id"
|
function sachet_download -d "downloads a given file id"
|
||||||
argparse 's/session=?' -- $argv
|
http --session=sachet-user -f -d get $BASENAME/files/$argv/content
|
||||||
http --session=$_flag_session -f -d get $BASENAME/files/$argv/content
|
|
||||||
end
|
end
|
||||||
|
|
||||||
function sachet_delete -d "delete given file ids"
|
function sachet_delete -d "delete given file ids"
|
||||||
argparse 's/session=?' -- $argv
|
|
||||||
for file in $argv
|
for file in $argv
|
||||||
http --session=$_flag_session delete $BASENAME/files/$file
|
http --session=sachet-user delete $BASENAME/files/$file
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
from flask import Blueprint, request, jsonify
|
from flask import Blueprint, request, jsonify
|
||||||
from flask.views import MethodView
|
from flask.views import MethodView
|
||||||
from sachet.server.models import ServerSettings, get_settings, Permissions
|
from sachet.server.models import ServerSettings, Permissions
|
||||||
from sachet.server import db
|
from sachet.server import db
|
||||||
from sachet.server.views_common import auth_required, ModelAPI
|
from sachet.server.views_common import auth_required, ModelAPI
|
||||||
|
|
||||||
@ -9,19 +9,28 @@ admin_blueprint = Blueprint("admin_blueprint", __name__)
|
|||||||
|
|
||||||
|
|
||||||
class ServerSettingsAPI(ModelAPI):
|
class ServerSettingsAPI(ModelAPI):
|
||||||
|
def get_settings(self):
|
||||||
|
rows = ServerSettings.query.all()
|
||||||
|
if len(rows) == 0:
|
||||||
|
settings = ServerSettings()
|
||||||
|
db.session.add(settings)
|
||||||
|
db.session.commit()
|
||||||
|
return settings
|
||||||
|
return rows[-1]
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.ADMIN,))
|
@auth_required(required_permissions=(Permissions.ADMIN,))
|
||||||
def get(self, auth_user=None):
|
def get(self, auth_user=None):
|
||||||
settings = get_settings()
|
settings = self.get_settings()
|
||||||
return super().get(settings)
|
return super().get(settings)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.ADMIN,))
|
@auth_required(required_permissions=(Permissions.ADMIN,))
|
||||||
def patch(self, auth_user=None):
|
def patch(self, auth_user=None):
|
||||||
settings = get_settings()
|
settings = self.get_settings()
|
||||||
return super().patch(settings)
|
return super().patch(settings)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.ADMIN,))
|
@auth_required(required_permissions=(Permissions.ADMIN,))
|
||||||
def put(self, auth_user=None):
|
def put(self, auth_user=None):
|
||||||
settings = get_settings()
|
settings = self.get_settings()
|
||||||
return super().put(settings)
|
return super().put(settings)
|
||||||
|
|
||||||
|
|
||||||
|
@ -10,27 +10,30 @@ files_blueprint = Blueprint("files_blueprint", __name__)
|
|||||||
|
|
||||||
|
|
||||||
class FilesMetadataAPI(ModelAPI):
|
class FilesMetadataAPI(ModelAPI):
|
||||||
@auth_required(required_permissions=(Permissions.READ,), allow_anonymous=True)
|
@auth_required(required_permissions=(Permissions.READ,))
|
||||||
def get(self, share_id, auth_user=None):
|
def get(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
return super().get(share)
|
return super().get(share)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.MODIFY,), allow_anonymous=True)
|
@auth_required(required_permissions=(Permissions.MODIFY,))
|
||||||
def patch(self, share_id, auth_user=None):
|
def patch(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
return super().patch(share)
|
return super().patch(share)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.MODIFY,), allow_anonymous=True)
|
@auth_required(required_permissions=(Permissions.MODIFY,))
|
||||||
def put(self, share_id, auth_user=None):
|
def put(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
return super().put(share)
|
return super().put(share)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.DELETE,), allow_anonymous=True)
|
@auth_required(required_permissions=(Permissions.DELETE,))
|
||||||
def delete(self, share_id, auth_user=None):
|
def delete(self, share_id, auth_user=None):
|
||||||
try:
|
try:
|
||||||
uuid.UUID(share_id)
|
uuid.UUID(share_id)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return jsonify(dict(status="fail", message=f"Invalid ID: '{share_id}'."))
|
return jsonify(dict(
|
||||||
|
status="fail",
|
||||||
|
message=f"Invalid ID: '{share_id}'."
|
||||||
|
))
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
return super().delete(share)
|
return super().delete(share)
|
||||||
|
|
||||||
@ -43,14 +46,13 @@ files_blueprint.add_url_rule(
|
|||||||
|
|
||||||
|
|
||||||
class FilesAPI(ModelListAPI):
|
class FilesAPI(ModelListAPI):
|
||||||
@auth_required(required_permissions=(Permissions.CREATE,), allow_anonymous=True)
|
@auth_required(required_permissions=(Permissions.CREATE,))
|
||||||
def post(self, auth_user=None):
|
def post(self, auth_user=None):
|
||||||
data = request.get_json()
|
data = request.get_json()
|
||||||
if auth_user:
|
data["owner_name"] = auth_user.username
|
||||||
data["owner_name"] = auth_user.username
|
|
||||||
return super().post(Share, data)
|
return super().post(Share, data)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.LIST,), allow_anonymous=True)
|
@auth_required(required_permissions=(Permissions.LIST,))
|
||||||
def get(self, auth_user=None):
|
def get(self, auth_user=None):
|
||||||
return super().get(Share)
|
return super().get(Share)
|
||||||
|
|
||||||
@ -63,7 +65,7 @@ files_blueprint.add_url_rule(
|
|||||||
|
|
||||||
|
|
||||||
class FileContentAPI(ModelAPI):
|
class FileContentAPI(ModelAPI):
|
||||||
@auth_required(required_permissions=(Permissions.CREATE,), allow_anonymous=True)
|
@auth_required(required_permissions=(Permissions.CREATE,))
|
||||||
def post(self, share_id, auth_user=None):
|
def post(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=uuid.UUID(share_id)).first()
|
share = Share.query.filter_by(share_id=uuid.UUID(share_id)).first()
|
||||||
|
|
||||||
@ -109,7 +111,7 @@ class FileContentAPI(ModelAPI):
|
|||||||
201,
|
201,
|
||||||
)
|
)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.MODIFY,), allow_anonymous=True)
|
@auth_required(required_permissions=(Permissions.MODIFY,))
|
||||||
def put(self, share_id, auth_user=None):
|
def put(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
if not share:
|
if not share:
|
||||||
@ -117,17 +119,6 @@ class FileContentAPI(ModelAPI):
|
|||||||
jsonify({"status": "fail", "message": "This share does not exist."})
|
jsonify({"status": "fail", "message": "This share does not exist."})
|
||||||
), 404
|
), 404
|
||||||
|
|
||||||
if auth_user != share.owner:
|
|
||||||
return (
|
|
||||||
jsonify(
|
|
||||||
{
|
|
||||||
"status": "fail",
|
|
||||||
"message": "Share must be modified by its owner.",
|
|
||||||
}
|
|
||||||
),
|
|
||||||
403,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not share.initialized:
|
if not share.initialized:
|
||||||
return (
|
return (
|
||||||
jsonify(
|
jsonify(
|
||||||
@ -151,7 +142,7 @@ class FileContentAPI(ModelAPI):
|
|||||||
200,
|
200,
|
||||||
)
|
)
|
||||||
|
|
||||||
@auth_required(required_permissions=(Permissions.READ,), allow_anonymous=True)
|
@auth_required(required_permissions=(Permissions.READ,))
|
||||||
def get(self, share_id, auth_user=None):
|
def get(self, share_id, auth_user=None):
|
||||||
share = Share.query.filter_by(share_id=share_id).first()
|
share = Share.query.filter_by(share_id=share_id).first()
|
||||||
if not share:
|
if not share:
|
||||||
|
@ -192,17 +192,6 @@ class ServerSettings(db.Model):
|
|||||||
return Schema()
|
return Schema()
|
||||||
|
|
||||||
|
|
||||||
def get_settings():
|
|
||||||
"""Return server settings, and create them if they don't exist."""
|
|
||||||
rows = ServerSettings.query.all()
|
|
||||||
if len(rows) == 0:
|
|
||||||
settings = ServerSettings()
|
|
||||||
db.session.add(settings)
|
|
||||||
db.session.commit()
|
|
||||||
return settings
|
|
||||||
return rows[-1]
|
|
||||||
|
|
||||||
|
|
||||||
class Share(db.Model):
|
class Share(db.Model):
|
||||||
"""Share for a single file.
|
"""Share for a single file.
|
||||||
|
|
||||||
@ -247,10 +236,9 @@ class Share(db.Model):
|
|||||||
|
|
||||||
file_name = db.Column(db.String, nullable=False)
|
file_name = db.Column(db.String, nullable=False)
|
||||||
|
|
||||||
def __init__(self, owner_name=None, file_name=None):
|
def __init__(self, owner_name, file_name=None):
|
||||||
self.owner = User.query.filter_by(username=owner_name).first()
|
self.owner = User.query.filter_by(username=owner_name).first()
|
||||||
if self.owner:
|
self.owner_name = self.owner.username
|
||||||
self.owner_name = self.owner.username
|
|
||||||
self.share_id = uuid.uuid4()
|
self.share_id = uuid.uuid4()
|
||||||
self.url = url_for("files_blueprint.files_metadata_api", share_id=self.share_id)
|
self.url = url_for("files_blueprint.files_metadata_api", share_id=self.share_id)
|
||||||
self.create_date = datetime.datetime.now()
|
self.create_date = datetime.datetime.now()
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
from flask import request, jsonify
|
from flask import request, jsonify
|
||||||
from flask.views import MethodView
|
from flask.views import MethodView
|
||||||
from sachet.server.models import Permissions, User, BlacklistToken, get_settings
|
from sachet.server.models import Permissions, User, BlacklistToken
|
||||||
from sachet.server import db
|
from sachet.server import db
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from marshmallow import ValidationError
|
from marshmallow import ValidationError
|
||||||
@ -8,21 +8,20 @@ from bitmask import Bitmask
|
|||||||
import jwt
|
import jwt
|
||||||
|
|
||||||
|
|
||||||
# https://stackoverflow.com/questions/3888158/making-decorators-with-optional-arguments
|
def auth_required(func=None, *, required_permissions=()):
|
||||||
def auth_required(func=None, *, required_permissions=(), allow_anonymous=False):
|
"""Decorator to require authentication.
|
||||||
"""Require specific authentication.
|
|
||||||
|
|
||||||
Passes an argument `user` to the function, with a User object corresponding
|
Passes an argument 'user' to the function, with a User object corresponding
|
||||||
to the authenticated session.
|
to the authenticated session.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
required_permissions : tuple of Permissions, optional
|
|
||||||
|
required_permissions : tuple of Permissions
|
||||||
Permissions required to access this endpoint.
|
Permissions required to access this endpoint.
|
||||||
allow_anonymous : bool, optional
|
|
||||||
Allow anonymous authentication. This means the `user` parameter might be None.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# see https://stackoverflow.com/questions/3888158/making-decorators-with-optional-arguments
|
||||||
def _decorate(f):
|
def _decorate(f):
|
||||||
@wraps(f)
|
@wraps(f)
|
||||||
def decorator(*args, **kwargs):
|
def decorator(*args, **kwargs):
|
||||||
@ -39,28 +38,7 @@ def auth_required(func=None, *, required_permissions=(), allow_anonymous=False):
|
|||||||
return jsonify(resp), 401
|
return jsonify(resp), 401
|
||||||
|
|
||||||
if not token:
|
if not token:
|
||||||
if allow_anonymous:
|
return jsonify({"status": "fail", "message": "Missing auth token"}), 401
|
||||||
server_settings = get_settings()
|
|
||||||
if (
|
|
||||||
Bitmask(AllFlags=Permissions, *required_permissions)
|
|
||||||
not in server_settings.default_permissions
|
|
||||||
):
|
|
||||||
return (
|
|
||||||
jsonify(
|
|
||||||
{
|
|
||||||
"status": "fail",
|
|
||||||
"message": "Missing permissions to access this page.",
|
|
||||||
}
|
|
||||||
),
|
|
||||||
401,
|
|
||||||
)
|
|
||||||
kwargs["auth_user"] = None
|
|
||||||
return f(*args, **kwargs)
|
|
||||||
else:
|
|
||||||
return (
|
|
||||||
jsonify({"status": "fail", "message": "Missing auth token"}),
|
|
||||||
401,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data, user = User.read_token(token)
|
data, user = User.read_token(token)
|
||||||
@ -101,6 +79,7 @@ def auth_required(func=None, *, required_permissions=(), allow_anonymous=False):
|
|||||||
|
|
||||||
def patch(orig, diff):
|
def patch(orig, diff):
|
||||||
"""Patch the dictionary orig recursively with the dictionary diff."""
|
"""Patch the dictionary orig recursively with the dictionary diff."""
|
||||||
|
|
||||||
# if we get to a leaf node, just replace it
|
# if we get to a leaf node, just replace it
|
||||||
if not isinstance(orig, dict) or not isinstance(diff, dict):
|
if not isinstance(orig, dict) or not isinstance(diff, dict):
|
||||||
return diff
|
return diff
|
||||||
@ -266,22 +245,14 @@ class ModelListAPI(MethodView):
|
|||||||
Number of next page (if this is not the last).
|
Number of next page (if this is not the last).
|
||||||
"""
|
"""
|
||||||
json_data = request.get_json()
|
json_data = request.get_json()
|
||||||
try:
|
per_page = int(json_data.get("per_page", 15))
|
||||||
per_page = int(json_data.get("per_page", 15))
|
page = int(json_data.get("page", 1))
|
||||||
page = int(json_data.get("page", 1))
|
|
||||||
except ValueError as e:
|
|
||||||
return jsonify(dict(
|
|
||||||
status="fail",
|
|
||||||
message=str(e),
|
|
||||||
)), 400
|
|
||||||
|
|
||||||
page_data = ModelClass.query.paginate(page=page, per_page=per_page)
|
page_data = ModelClass.query.paginate(page=page, per_page=per_page)
|
||||||
data = [model.get_schema().dump(model) for model in page_data]
|
data = [model.get_schema().dump(model) for model in page_data]
|
||||||
|
|
||||||
return jsonify(
|
return jsonify(dict(
|
||||||
dict(
|
data=data,
|
||||||
data=data,
|
prev=page_data.prev_num,
|
||||||
prev=page_data.prev_num,
|
next=page_data.next_num,
|
||||||
next=page_data.next_num,
|
))
|
||||||
)
|
|
||||||
)
|
|
||||||
|
@ -75,10 +75,7 @@ def users(client):
|
|||||||
dave=dict(
|
dave=dict(
|
||||||
password="1234",
|
password="1234",
|
||||||
permissions=Bitmask(
|
permissions=Bitmask(
|
||||||
Permissions.CREATE,
|
Permissions.CREATE, Permissions.READ, Permissions.DELETE
|
||||||
Permissions.READ,
|
|
||||||
Permissions.DELETE,
|
|
||||||
Permissions.MODIFY,
|
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
# admins don't have the other permissions by default,
|
# admins don't have the other permissions by default,
|
||||||
|
@ -1,210 +0,0 @@
|
|||||||
import pytest
|
|
||||||
from io import BytesIO
|
|
||||||
from werkzeug.datastructures import FileStorage
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
"""Test anonymous authentication to endpoints."""
|
|
||||||
|
|
||||||
|
|
||||||
def test_files(client, auth, rand):
|
|
||||||
# set create perm for anon users
|
|
||||||
resp = client.patch(
|
|
||||||
"/admin/settings",
|
|
||||||
headers=auth("administrator"),
|
|
||||||
json={"default_permissions": ["CREATE"]},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
|
|
||||||
# create share
|
|
||||||
resp = client.post("/files", json={"file_name": "content.bin"})
|
|
||||||
assert resp.status_code == 201
|
|
||||||
|
|
||||||
data = resp.get_json()
|
|
||||||
url = data.get("url")
|
|
||||||
|
|
||||||
assert url is not None
|
|
||||||
assert "/files/" in url
|
|
||||||
|
|
||||||
upload_data = rand.randbytes(4000)
|
|
||||||
|
|
||||||
# upload file to share
|
|
||||||
resp = client.post(
|
|
||||||
url + "/content",
|
|
||||||
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
|
||||||
content_type="multipart/form-data",
|
|
||||||
)
|
|
||||||
assert resp.status_code == 201
|
|
||||||
|
|
||||||
# set read perm for anon users
|
|
||||||
resp = client.patch(
|
|
||||||
"/admin/settings",
|
|
||||||
headers=auth("administrator"),
|
|
||||||
json={"default_permissions": ["READ"]},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
|
|
||||||
# read file
|
|
||||||
resp = client.get(
|
|
||||||
url + "/content",
|
|
||||||
)
|
|
||||||
assert resp.data == upload_data
|
|
||||||
assert "filename=content.bin" in resp.headers["Content-Disposition"].split("; ")
|
|
||||||
|
|
||||||
# set modify perm for anon users
|
|
||||||
resp = client.patch(
|
|
||||||
"/admin/settings",
|
|
||||||
headers=auth("administrator"),
|
|
||||||
json={"default_permissions": ["MODIFY"]},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
|
|
||||||
# modify share
|
|
||||||
upload_data = rand.randbytes(4000)
|
|
||||||
resp = client.put(
|
|
||||||
url + "/content",
|
|
||||||
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
|
||||||
content_type="multipart/form-data",
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
resp = client.patch(
|
|
||||||
url,
|
|
||||||
json={"file_name": "new_bin.bin"},
|
|
||||||
)
|
|
||||||
# set read perm for anon users
|
|
||||||
resp = client.patch(
|
|
||||||
"/admin/settings",
|
|
||||||
headers=auth("administrator"),
|
|
||||||
json={"default_permissions": ["READ"]},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
resp = client.get(
|
|
||||||
url + "/content",
|
|
||||||
)
|
|
||||||
assert resp.data == upload_data
|
|
||||||
assert "filename=new_bin.bin" in resp.headers["Content-Disposition"].split("; ")
|
|
||||||
|
|
||||||
# set list perm for anon users
|
|
||||||
resp = client.patch(
|
|
||||||
"/admin/settings",
|
|
||||||
headers=auth("administrator"),
|
|
||||||
json={"default_permissions": ["LIST"]},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
|
|
||||||
# test listing file
|
|
||||||
resp = client.get(
|
|
||||||
"/files",
|
|
||||||
json={},
|
|
||||||
)
|
|
||||||
data = resp.get_json()
|
|
||||||
assert resp.status_code == 200
|
|
||||||
listed = data.get("data")
|
|
||||||
assert len(listed) == 1
|
|
||||||
assert listed[0].get("initialized") is True
|
|
||||||
|
|
||||||
# set delete perm for anon users
|
|
||||||
resp = client.patch(
|
|
||||||
"/admin/settings",
|
|
||||||
headers=auth("administrator"),
|
|
||||||
json={"default_permissions": ["DELETE"]},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
|
|
||||||
# test deletion
|
|
||||||
resp = client.delete(
|
|
||||||
url,
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
|
|
||||||
# set delete perm for anon users
|
|
||||||
resp = client.patch(
|
|
||||||
"/admin/settings",
|
|
||||||
headers=auth("administrator"),
|
|
||||||
json={"default_permissions": ["READ"]},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
# file shouldn't exist anymore
|
|
||||||
resp = client.get(
|
|
||||||
url + "/content",
|
|
||||||
)
|
|
||||||
assert resp.status_code == 404
|
|
||||||
|
|
||||||
|
|
||||||
def test_files_invalid(client, auth, rand):
|
|
||||||
# set create perm for anon users
|
|
||||||
resp = client.patch(
|
|
||||||
"/admin/settings",
|
|
||||||
headers=auth("administrator"),
|
|
||||||
json={"default_permissions": ["CREATE"]},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
|
|
||||||
# create an uninitialized share
|
|
||||||
resp = client.post("/files", json={"file_name": "content.bin"})
|
|
||||||
assert resp.status_code == 201
|
|
||||||
data = resp.get_json()
|
|
||||||
uninit_url = data.get("url")
|
|
||||||
|
|
||||||
# upload a share
|
|
||||||
resp = client.post("/files", json={"file_name": "content.bin"})
|
|
||||||
assert resp.status_code == 201
|
|
||||||
data = resp.get_json()
|
|
||||||
url = data.get("url")
|
|
||||||
upload_data = rand.randbytes(4000)
|
|
||||||
resp = client.post(
|
|
||||||
url + "/content",
|
|
||||||
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
|
||||||
content_type="multipart/form-data",
|
|
||||||
)
|
|
||||||
assert resp.status_code == 201
|
|
||||||
|
|
||||||
# disable all permissions
|
|
||||||
resp = client.patch(
|
|
||||||
"/admin/settings",
|
|
||||||
headers=auth("administrator"),
|
|
||||||
json={"default_permissions": []},
|
|
||||||
)
|
|
||||||
assert resp.status_code == 200
|
|
||||||
|
|
||||||
# test initializing a share without perms
|
|
||||||
resp = client.post(
|
|
||||||
uninit_url + "/content",
|
|
||||||
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
|
||||||
content_type="multipart/form-data",
|
|
||||||
)
|
|
||||||
assert resp.status_code == 401
|
|
||||||
# test reading a share without perms
|
|
||||||
resp = client.get(url + "/content")
|
|
||||||
# test modifying an uninitialized share without perms
|
|
||||||
resp = client.put(
|
|
||||||
uninit_url + "/content",
|
|
||||||
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
|
||||||
content_type="multipart/form-data",
|
|
||||||
)
|
|
||||||
assert resp.status_code == 401
|
|
||||||
assert resp.status_code == 401
|
|
||||||
# test modifying a share without perms
|
|
||||||
resp = client.put(
|
|
||||||
url + "/content",
|
|
||||||
data={"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")},
|
|
||||||
content_type="multipart/form-data",
|
|
||||||
)
|
|
||||||
assert resp.status_code == 401
|
|
||||||
|
|
||||||
# test deleting a share without perms
|
|
||||||
resp = client.delete(url)
|
|
||||||
assert resp.status_code == 401
|
|
||||||
# test modifying share metadata without perms
|
|
||||||
resp = client.patch(url)
|
|
||||||
resp = client.put(url)
|
|
||||||
assert resp.status_code == 401
|
|
||||||
# test reading share metadata without perms
|
|
||||||
resp = client.get(url)
|
|
||||||
assert resp.status_code == 401
|
|
||||||
|
|
||||||
# test listing shares without perms
|
|
||||||
resp = client.get("/files")
|
|
||||||
assert resp.status_code == 401
|
|
||||||
# test creating share without perms
|
|
||||||
resp = client.post("/files")
|
|
||||||
assert resp.status_code == 401
|
|
@ -91,7 +91,9 @@ class TestSuite:
|
|||||||
resp = client.put(
|
resp = client.put(
|
||||||
url + "/content",
|
url + "/content",
|
||||||
headers=auth("jeff"),
|
headers=auth("jeff"),
|
||||||
data={"upload": FileStorage(stream=BytesIO(new_data), filename="upload")},
|
data={
|
||||||
|
"upload": FileStorage(stream=BytesIO(new_data), filename="upload")
|
||||||
|
},
|
||||||
)
|
)
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
||||||
@ -103,6 +105,7 @@ class TestSuite:
|
|||||||
assert resp.data == new_data
|
assert resp.data == new_data
|
||||||
assert "filename=new_bin.bin" in resp.headers["Content-Disposition"].split("; ")
|
assert "filename=new_bin.bin" in resp.headers["Content-Disposition"].split("; ")
|
||||||
|
|
||||||
|
|
||||||
def test_invalid(self, client, users, auth, rand):
|
def test_invalid(self, client, users, auth, rand):
|
||||||
"""Test invalid requests."""
|
"""Test invalid requests."""
|
||||||
|
|
||||||
@ -182,18 +185,6 @@ class TestSuite:
|
|||||||
)
|
)
|
||||||
assert resp.status_code == 201
|
assert resp.status_code == 201
|
||||||
|
|
||||||
# test other user being unable to modify this share
|
|
||||||
resp = client.put(
|
|
||||||
url + "/content",
|
|
||||||
headers=auth("dave"),
|
|
||||||
data={
|
|
||||||
"upload": FileStorage(stream=BytesIO(upload_data), filename="upload")
|
|
||||||
},
|
|
||||||
content_type="multipart/form-data",
|
|
||||||
)
|
|
||||||
assert resp.status_code == 403
|
|
||||||
|
|
||||||
|
|
||||||
# test not allowing re-upload
|
# test not allowing re-upload
|
||||||
resp = client.post(
|
resp = client.post(
|
||||||
url + "/content",
|
url + "/content",
|
||||||
|
@ -45,7 +45,10 @@ def test_files(client, users, auth):
|
|||||||
per_page = 9
|
per_page = 9
|
||||||
while page is not None:
|
while page is not None:
|
||||||
resp = client.get(
|
resp = client.get(
|
||||||
"/files", headers=auth("jeff"), json=dict(page=page, per_page=per_page)
|
"/files", headers=auth("jeff"), json=dict(
|
||||||
|
page=page,
|
||||||
|
per_page=per_page
|
||||||
|
)
|
||||||
)
|
)
|
||||||
assert resp.status_code == 200
|
assert resp.status_code == 200
|
||||||
|
|
||||||
@ -75,12 +78,3 @@ def test_files(client, users, auth):
|
|||||||
|
|
||||||
end_page = paginate(forwards=True)
|
end_page = paginate(forwards=True)
|
||||||
paginate(forwards=False, page=end_page)
|
paginate(forwards=False, page=end_page)
|
||||||
|
|
||||||
|
|
||||||
def test_invalid(client, auth):
|
|
||||||
"""Test invalid requests to pagination."""
|
|
||||||
|
|
||||||
resp = client.get(
|
|
||||||
"/files", headers=auth("jeff"), json=dict(page="one", per_page="two")
|
|
||||||
)
|
|
||||||
assert resp.status_code == 400
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user