274 lines
6.9 KiB
Python
274 lines
6.9 KiB
Python
import pytest
|
|
import uuid
|
|
from math import ceil
|
|
from sachet.server.users import manage
|
|
from click.testing import CliRunner
|
|
from sachet.server import app, db, storage
|
|
from sachet.server.models import Permissions, User
|
|
from werkzeug.datastructures import FileStorage
|
|
from io import BytesIO
|
|
from bitmask import Bitmask
|
|
from pathlib import Path
|
|
import random
|
|
|
|
|
|
@pytest.fixture
|
|
def rand():
|
|
"""Deterministic random data generator.
|
|
|
|
Be sure to seed 0 with each test!
|
|
"""
|
|
r = random.Random()
|
|
r.seed(0)
|
|
return r
|
|
|
|
|
|
def clear_filesystem():
|
|
if app.config["SACHET_STORAGE"] == "filesystem":
|
|
for file in storage._files_directory.iterdir():
|
|
if file.is_relative_to(Path(app.instance_path)) and file.is_file():
|
|
file.unlink()
|
|
else:
|
|
raise OSError(f"Attempted to delete {file}: please delete it yourself.")
|
|
|
|
|
|
@pytest.fixture
|
|
def client(config={}):
|
|
"""Flask application with DB already set up and ready."""
|
|
with app.test_client() as client:
|
|
with app.app_context():
|
|
for k, v in config.items():
|
|
app.config[k] = v
|
|
|
|
db.drop_all()
|
|
db.create_all()
|
|
db.session.commit()
|
|
clear_filesystem()
|
|
yield client
|
|
clear_filesystem()
|
|
db.session.remove()
|
|
db.drop_all()
|
|
|
|
|
|
@pytest.fixture
|
|
def flask_app_bare():
|
|
"""Flask application with empty DB."""
|
|
with app.test_client() as client:
|
|
with app.app_context():
|
|
yield client
|
|
db.drop_all()
|
|
|
|
|
|
@pytest.fixture
|
|
def users(client):
|
|
"""Create all the test users.
|
|
|
|
Returns a dictionary with all the info for each user.
|
|
"""
|
|
userinfo = dict(
|
|
jeff=dict(
|
|
password="1234",
|
|
permissions=Bitmask(
|
|
Permissions.CREATE,
|
|
Permissions.READ,
|
|
Permissions.DELETE,
|
|
Permissions.MODIFY,
|
|
Permissions.LIST,
|
|
Permissions.LOCK,
|
|
),
|
|
),
|
|
dave=dict(
|
|
password="1234",
|
|
permissions=Bitmask(
|
|
Permissions.CREATE,
|
|
Permissions.READ,
|
|
Permissions.DELETE,
|
|
Permissions.MODIFY,
|
|
),
|
|
),
|
|
# admins don't have the other permissions by default,
|
|
# but admins can add perms to themselves
|
|
no_create_user=dict(
|
|
password="password",
|
|
permissions=Bitmask(
|
|
Permissions.READ,
|
|
Permissions.DELETE,
|
|
Permissions.ADMIN,
|
|
Permissions.MODIFY,
|
|
),
|
|
),
|
|
no_read_user=dict(
|
|
password="password",
|
|
permissions=Bitmask(
|
|
Permissions.CREATE,
|
|
Permissions.DELETE,
|
|
Permissions.ADMIN,
|
|
Permissions.MODIFY,
|
|
),
|
|
),
|
|
no_modify_user=dict(
|
|
password="password",
|
|
permissions=Bitmask(
|
|
Permissions.CREATE,
|
|
Permissions.DELETE,
|
|
Permissions.ADMIN,
|
|
Permissions.READ,
|
|
),
|
|
),
|
|
no_lock_user=dict(
|
|
password="password",
|
|
permissions=Bitmask(
|
|
Permissions.CREATE,
|
|
Permissions.MODIFY,
|
|
Permissions.DELETE,
|
|
Permissions.ADMIN,
|
|
Permissions.READ,
|
|
),
|
|
),
|
|
no_admin_user=dict(
|
|
password="password",
|
|
permissions=Bitmask(
|
|
Permissions.CREATE,
|
|
Permissions.MODIFY,
|
|
Permissions.DELETE,
|
|
Permissions.LOCK,
|
|
Permissions.READ,
|
|
),
|
|
),
|
|
administrator=dict(password="4321", permissions=Bitmask(Permissions.ADMIN)),
|
|
)
|
|
|
|
for user, info in userinfo.items():
|
|
info["username"] = user
|
|
manage.create_user(info["permissions"], info["username"], info["password"])
|
|
|
|
return userinfo
|
|
|
|
|
|
@pytest.fixture
|
|
def validate_info(users):
|
|
"""Given a response, deserialize and validate the information against a given user's info."""
|
|
|
|
verify_fields = [
|
|
"username",
|
|
"permissions",
|
|
]
|
|
|
|
def _validate(user, info):
|
|
schema = User.get_schema(User)
|
|
|
|
dumped = schema.dump(users[user])
|
|
|
|
for k in verify_fields:
|
|
assert dumped[k] == info[k]
|
|
|
|
return _validate
|
|
|
|
|
|
@pytest.fixture
|
|
def tokens(client, users):
|
|
"""Logs in all test users.
|
|
|
|
Returns a dictionary of auth tokens for all test users.
|
|
"""
|
|
|
|
toks = {}
|
|
|
|
for user, creds in users.items():
|
|
resp = client.post(
|
|
"/users/login",
|
|
json={"username": creds["username"], "password": creds["password"]},
|
|
)
|
|
resp_json = resp.get_json()
|
|
token = resp_json.get("auth_token")
|
|
assert token is not None and token != ""
|
|
toks[creds["username"]] = token
|
|
|
|
return toks
|
|
|
|
|
|
@pytest.fixture
|
|
def cli():
|
|
"""click's testing fixture"""
|
|
return CliRunner()
|
|
|
|
|
|
@pytest.fixture
|
|
def auth(tokens):
|
|
"""Generate auth headers.
|
|
|
|
Parameters
|
|
----------
|
|
username : str
|
|
Username to authenticate as.
|
|
data : dict
|
|
Extra headers to add.
|
|
|
|
Returns
|
|
-------
|
|
dict
|
|
Dictionary of all headers.
|
|
"""
|
|
|
|
def auth_headers(username, data={}):
|
|
ret = {"Authorization": f"bearer {tokens[username]}"}
|
|
ret.update(data)
|
|
return ret
|
|
|
|
return auth_headers
|
|
|
|
|
|
@pytest.fixture
|
|
def upload(client):
|
|
"""Perform chunked upload of some data.
|
|
|
|
Parameters
|
|
----------
|
|
url : str
|
|
URL to upload to.
|
|
data : BytesIO
|
|
Stream of data to upload.
|
|
You can use BytesIO(data) to convert raw bytes to a stream.
|
|
headers : dict, optional
|
|
Headers to upload with.
|
|
chunk_size : int, optional
|
|
Size of chunks in bytes.
|
|
method : function
|
|
Method like client.post or client.put to use.
|
|
"""
|
|
|
|
def upload(url, data, headers={}, chunk_size=int(2e6), method=client.post):
|
|
data_size = len(data.getbuffer())
|
|
|
|
buf = data.getbuffer()
|
|
|
|
upload_uuid = uuid.uuid4()
|
|
|
|
total_chunks = int(ceil(data_size / chunk_size))
|
|
|
|
resp = None
|
|
|
|
for chunk_idx in range(total_chunks):
|
|
start = chunk_size * chunk_idx
|
|
end = min(chunk_size * (chunk_idx + 1), data_size)
|
|
|
|
resp = method(
|
|
url,
|
|
headers=headers,
|
|
data={
|
|
"upload": FileStorage(
|
|
stream=BytesIO(buf[start:end]), filename="upload"
|
|
),
|
|
"dzuuid": str(upload_uuid),
|
|
"dzchunkindex": chunk_idx,
|
|
"dztotalchunks": total_chunks,
|
|
},
|
|
content_type="multipart/form-data",
|
|
)
|
|
if not resp.status_code == 200 or resp.status_code == 201:
|
|
break
|
|
|
|
return resp
|
|
|
|
return upload
|