sachet-server/tests/conftest.py

274 lines
6.9 KiB
Python

import pytest
import uuid
from math import ceil
from sachet.server.users import manage
from click.testing import CliRunner
from sachet.server import app, db, storage
from sachet.server.models import Permissions, User
from werkzeug.datastructures import FileStorage
from io import BytesIO
from bitmask import Bitmask
from pathlib import Path
import random
@pytest.fixture
def rand():
    """Provide a deterministic random data generator.

    The returned ``random.Random`` instance is seeded with 0, so the values
    it produces are reproducible. Be sure to seed 0 again inside a test if
    the sequence needs to restart from the beginning.
    """
    # Passing the seed to the constructor is the same as seeding afterwards.
    return random.Random(0)
def clear_filesystem():
    """Delete every file kept by the filesystem storage backend.

    Does nothing unless ``app.config["SACHET_STORAGE"]`` equals
    ``"filesystem"``.

    Raises
    ------
    OSError
        If an entry of the storage directory is not a regular file located
        under the application's instance path. Such entries are never
        deleted; the caller is asked to remove them by hand.
    """
    if app.config["SACHET_STORAGE"] != "filesystem":
        return

    instance_root = Path(app.instance_path)
    for entry in storage._files_directory.iterdir():
        safe_to_delete = entry.is_relative_to(instance_root) and entry.is_file()
        if not safe_to_delete:
            # Refuse to touch anything outside the instance path or anything
            # that is not a plain file.
            raise OSError(f"Attempted to delete {entry}: please delete it yourself.")
        entry.unlink()
@pytest.fixture
def client(config=None):
    """Flask application with DB already set up and ready.

    Parameters
    ----------
    config : dict, optional
        Entries copied into ``app.config`` before the database is reset.
        Defaults to no overrides.

    Yields
    ------
    The Flask test client, inside an active application context.

    Notes
    -----
    Setup drops and recreates all tables and clears stored files; teardown
    clears stored files again, removes the session and drops all tables.
    Overrides written to ``app.config`` are not restored afterwards.
    """
    # ``None`` sentinel instead of a shared mutable ``{}`` default.
    overrides = {} if config is None else config

    with app.test_client() as client:
        with app.app_context():
            for key, value in overrides.items():
                app.config[key] = value

            # Start every test from an empty schema and empty storage.
            db.drop_all()
            db.create_all()
            db.session.commit()
            clear_filesystem()

            yield client

            # Teardown: leave nothing behind for the next test.
            clear_filesystem()
            db.session.remove()
            db.drop_all()
@pytest.fixture
def flask_app_bare():
    """Yield a Flask test client with no database setup performed.

    The client is yielded inside an application context; all tables are
    dropped once the test has finished.
    """
    with app.test_client() as bare_client, app.app_context():
        yield bare_client
        db.drop_all()
@pytest.fixture
def users(client):
    """Create all the test users.

    Every account is registered through ``manage.create_user``.

    Returns
    -------
    dict
        Maps each username to a dict holding that user's ``password``,
        ``permissions`` and ``username``.
    """
    accounts = {
        "jeff": {
            "password": "1234",
            "permissions": Bitmask(
                Permissions.CREATE,
                Permissions.READ,
                Permissions.DELETE,
                Permissions.MODIFY,
                Permissions.LIST,
                Permissions.LOCK,
            ),
        },
        "dave": {
            "password": "1234",
            "permissions": Bitmask(
                Permissions.CREATE,
                Permissions.READ,
                Permissions.DELETE,
                Permissions.MODIFY,
            ),
        },
        # admins don't have the other permissions by default,
        # but admins can add perms to themselves
        "no_create_user": {
            "password": "password",
            "permissions": Bitmask(
                Permissions.READ,
                Permissions.DELETE,
                Permissions.ADMIN,
                Permissions.MODIFY,
            ),
        },
        "no_read_user": {
            "password": "password",
            "permissions": Bitmask(
                Permissions.CREATE,
                Permissions.DELETE,
                Permissions.ADMIN,
                Permissions.MODIFY,
            ),
        },
        "no_modify_user": {
            "password": "password",
            "permissions": Bitmask(
                Permissions.CREATE,
                Permissions.DELETE,
                Permissions.ADMIN,
                Permissions.READ,
            ),
        },
        "no_lock_user": {
            "password": "password",
            "permissions": Bitmask(
                Permissions.CREATE,
                Permissions.MODIFY,
                Permissions.DELETE,
                Permissions.ADMIN,
                Permissions.READ,
            ),
        },
        "no_admin_user": {
            "password": "password",
            "permissions": Bitmask(
                Permissions.CREATE,
                Permissions.MODIFY,
                Permissions.DELETE,
                Permissions.LOCK,
                Permissions.READ,
            ),
        },
        "administrator": {
            "password": "4321",
            "permissions": Bitmask(Permissions.ADMIN),
        },
    }

    for name, record in accounts.items():
        # Store the username inside the record so callers get it back too.
        record["username"] = name
        manage.create_user(
            record["permissions"], record["username"], record["password"]
        )

    return accounts
@pytest.fixture
def validate_info(users):
    """Return a checker that validates user info against a test user's record.

    The returned callable takes ``user`` (a key of the ``users`` fixture) and
    ``info`` (a mapping, e.g. a deserialized response). It dumps the fixture
    record through the ``User`` schema and asserts that the ``username`` and
    ``permissions`` fields equal those in ``info``.
    """
    checked_fields = ("username", "permissions")

    def _validate(user, info):
        expected = User.get_schema(User).dump(users[user])
        for field in checked_fields:
            assert expected[field] == info[field]

    return _validate
@pytest.fixture
def tokens(client, users):
    """Log in all test users.

    Posts each user's credentials to ``/users/login`` and asserts that a
    non-empty ``auth_token`` comes back.

    Returns
    -------
    dict
        Maps each username to its auth token.
    """
    auth_tokens = {}
    for creds in users.values():
        credentials = {
            "username": creds["username"],
            "password": creds["password"],
        }
        response = client.post("/users/login", json=credentials)
        token = response.get_json().get("auth_token")
        assert token is not None and token != ""
        auth_tokens[creds["username"]] = token
    return auth_tokens
@pytest.fixture
def cli():
    """Provide click's ``CliRunner`` for use in tests."""
    runner = CliRunner()
    return runner
@pytest.fixture
def auth(tokens):
    """Return a helper that generates auth headers.

    The parameters and return value below describe the returned callable,
    ``auth_headers(username, data=None)``.

    Parameters
    ----------
    username : str
        Username to authenticate as; must be a key of the ``tokens`` fixture.
    data : dict, optional
        Extra headers to add. These are applied last, so they override the
        ``Authorization`` entry if the key clashes.

    Returns
    -------
    dict
        Dictionary of all headers.
    """

    # ``None`` sentinel instead of a shared mutable ``{}`` default.
    def auth_headers(username, data=None):
        ret = {"Authorization": f"bearer {tokens[username]}"}
        if data is not None:
            ret.update(data)
        return ret

    return auth_headers
@pytest.fixture
def upload(client):
    """Return a helper that performs a chunked upload of some data.

    The parameters and return value below describe the returned callable.

    Parameters
    ----------
    url : str
        URL to upload to.
    data : BytesIO
        Stream of data to upload.
        You can use BytesIO(data) to convert raw bytes to a stream.
    headers : dict, optional
        Headers to upload with. Defaults to no extra headers.
    chunk_size : int, optional
        Size of chunks in bytes.
    method : function
        Method like client.post or client.put to use.

    Returns
    -------
    The response to the last request that was sent. Uploading stops early at
    the first response whose status code is neither 200 nor 201, and that
    response is returned. ``None`` is returned when ``data`` is empty,
    because no request is sent in that case.
    """

    def upload(url, data, headers=None, chunk_size=int(2e6), method=client.post):
        # ``None`` sentinel instead of a shared mutable ``{}`` default.
        headers = {} if headers is None else headers

        # One buffer view serves both the size computation and the slicing.
        buf = data.getbuffer()
        data_size = len(buf)

        # Every chunk of one upload carries the same identifier.
        upload_id = str(uuid.uuid4())
        total_chunks = ceil(data_size / chunk_size)

        resp = None
        for chunk_idx in range(total_chunks):
            start = chunk_size * chunk_idx
            end = min(start + chunk_size, data_size)

            resp = method(
                url,
                headers=headers,
                data={
                    "upload": FileStorage(
                        stream=BytesIO(buf[start:end]), filename="upload"
                    ),
                    "dzuuid": upload_id,
                    "dzchunkindex": chunk_idx,
                    "dztotalchunks": total_chunks,
                },
                content_type="multipart/form-data",
            )

            # The previous check, ``not status == 200 or status == 201``,
            # parsed as ``(status != 200) or (status == 201)`` and therefore
            # also aborted on a successful 201. Stop only on a real failure.
            if resp.status_code not in (200, 201):
                break

        return resp

    return upload