Compare commits

..

3 Commits

Author SHA1 Message Date
29f854f3a5
deleted migrations
i can't figure out how to fix this db mess so
2023-07-15 22:37:36 -04:00
996de46127
/users/<user>: fix inability to change password via PATCH 2023-07-15 22:26:22 -04:00
2ce47e55c0
test/test_userinfo.py: add missing password change test code
how did i not see this
2023-07-15 20:25:08 -04:00
7 changed files with 163 additions and 153 deletions

View File

@ -1,68 +0,0 @@
"""empty message
Revision ID: 4cd7cdbc2d1f
Revises:
Create Date: 2023-04-30 17:42:00.329050
"""
from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
# revision identifiers, used by Alembic.
revision = "4cd7cdbc2d1f"
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"blacklist_tokens",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("token", sa.String(length=500), nullable=False),
sa.Column("expires", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("token"),
)
op.create_table(
"server_settings",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("default_permissions_number", sa.BigInteger(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"users",
sa.Column("username", sa.String(length=255), nullable=False),
sa.Column("password", sa.String(length=255), nullable=False),
sa.Column("register_date", sa.DateTime(), nullable=False),
sa.Column("permissions_number", sa.BigInteger(), nullable=False),
sa.PrimaryKeyConstraint("username"),
sa.UniqueConstraint("username"),
)
op.create_table(
"shares",
sa.Column("share_id", sqlalchemy_utils.types.uuid.UUIDType(), nullable=False),
sa.Column("owner_name", sa.String(), nullable=True),
sa.Column("initialized", sa.Boolean(), nullable=False),
sa.Column("locked", sa.Boolean(), nullable=False),
sa.Column("create_date", sa.DateTime(), nullable=False),
sa.Column("file_name", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["owner_name"],
["users.username"],
),
sa.PrimaryKeyConstraint("share_id"),
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("shares")
op.drop_table("users")
op.drop_table("server_settings")
op.drop_table("blacklist_tokens")
# ### end Alembic commands ###

View File

@ -1,72 +0,0 @@
"""empty message
Revision ID: 70ab3c81827a
Revises: 4cd7cdbc2d1f
Create Date: 2023-05-07 21:52:08.250195
"""
from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
# revision identifiers, used by Alembic.
revision = "70ab3c81827a"
down_revision = "4cd7cdbc2d1f"
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"uploads",
sa.Column("upload_id", sa.String(), nullable=False),
sa.Column("share_id", sqlalchemy_utils.types.uuid.UUIDType(), nullable=True),
sa.Column("create_date", sa.DateTime(), nullable=False),
sa.Column("total_chunks", sa.Integer(), nullable=False),
sa.Column("recv_chunks", sa.Integer(), nullable=False),
sa.Column("completed", sa.Boolean(), nullable=False),
sa.ForeignKeyConstraint(
["share_id"],
["shares.share_id"],
),
sa.PrimaryKeyConstraint("upload_id"),
)
op.create_table(
"chunks",
sa.Column("chunk_id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("create_date", sa.DateTime(), nullable=False),
sa.Column("index", sa.Integer(), nullable=False),
sa.Column("upload_id", sa.String(), nullable=True),
sa.Column("filename", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["upload_id"],
["uploads.upload_id"],
),
sa.PrimaryKeyConstraint("chunk_id"),
)
with op.batch_alter_table("shares", schema=None) as batch_op:
batch_op.alter_column(
"share_id",
existing_type=sa.NUMERIC(precision=16),
type_=sqlalchemy_utils.types.uuid.UUIDType(),
existing_nullable=False,
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("shares", schema=None) as batch_op:
batch_op.alter_column(
"share_id",
existing_type=sqlalchemy_utils.types.uuid.UUIDType(),
type_=sa.NUMERIC(precision=16),
existing_nullable=False,
)
op.drop_table("chunks")
op.drop_table("uploads")
# ### end Alembic commands ###

View File

@ -0,0 +1,82 @@
"""empty message
Revision ID: e8d2a7570f70
Revises:
Create Date: 2023-07-15 22:34:31.075577
"""
from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
# revision identifiers, used by Alembic.
revision = 'e8d2a7570f70'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('blacklist_tokens',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=500), nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_blacklist_tokens')),
sa.UniqueConstraint('token', name=op.f('uq_blacklist_tokens_token'))
)
op.create_table('server_settings',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('default_permissions_number', sa.BigInteger(), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_server_settings'))
)
op.create_table('users',
sa.Column('username', sa.String(length=255), nullable=False),
sa.Column('password_hash', sa.String(length=255), nullable=False),
sa.Column('register_date', sa.DateTime(), nullable=False),
sa.Column('permissions_number', sa.BigInteger(), nullable=False),
sa.PrimaryKeyConstraint('username', name=op.f('pk_users')),
sa.UniqueConstraint('username', name=op.f('uq_users_username'))
)
op.create_table('shares',
sa.Column('share_id', sqlalchemy_utils.types.uuid.UUIDType(), nullable=False),
sa.Column('owner_name', sa.String(), nullable=True),
sa.Column('initialized', sa.Boolean(), nullable=False),
sa.Column('locked', sa.Boolean(), nullable=False),
sa.Column('create_date', sa.DateTime(), nullable=False),
sa.Column('file_name', sa.String(), nullable=False),
sa.ForeignKeyConstraint(['owner_name'], ['users.username'], name=op.f('fk_shares_owner_name_users')),
sa.PrimaryKeyConstraint('share_id', name=op.f('pk_shares'))
)
op.create_table('uploads',
sa.Column('upload_id', sa.String(), nullable=False),
sa.Column('share_id', sqlalchemy_utils.types.uuid.UUIDType(), nullable=True),
sa.Column('create_date', sa.DateTime(), nullable=False),
sa.Column('total_chunks', sa.Integer(), nullable=False),
sa.Column('recv_chunks', sa.Integer(), nullable=False),
sa.Column('completed', sa.Boolean(), nullable=False),
sa.ForeignKeyConstraint(['share_id'], ['shares.share_id'], name=op.f('fk_uploads_share_id_shares')),
sa.PrimaryKeyConstraint('upload_id', name=op.f('pk_uploads'))
)
op.create_table('chunks',
sa.Column('chunk_id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('create_date', sa.DateTime(), nullable=False),
sa.Column('index', sa.Integer(), nullable=False),
sa.Column('upload_id', sa.String(), nullable=True),
sa.Column('filename', sa.String(), nullable=False),
sa.ForeignKeyConstraint(['upload_id'], ['uploads.upload_id'], name=op.f('fk_chunks_upload_id_uploads'), ondelete='CASCADE'),
sa.PrimaryKeyConstraint('chunk_id', name=op.f('pk_chunks'))
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('chunks')
op.drop_table('uploads')
op.drop_table('shares')
op.drop_table('users')
op.drop_table('server_settings')
op.drop_table('blacklist_tokens')
# ### end Alembic commands ###

View File

@ -6,7 +6,7 @@ from flask_marshmallow import Marshmallow
from flask_bcrypt import Bcrypt
from flask_migrate import Migrate
from .config import DevelopmentConfig, ProductionConfig, TestingConfig, overlay_config
from sqlalchemy import event
from sqlalchemy import event, MetaData
from sqlalchemy.engine import Engine
from sqlite3 import Connection as SQLite3Connection
@ -25,8 +25,18 @@ with app.app_context():
overlay_config(ProductionConfig)
bcrypt = Bcrypt(app)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
# https://stackoverflow.com/questions/62640576/
convention = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}
metadata = MetaData(naming_convention=convention)
db = SQLAlchemy(app, metadata=metadata)
migrate = Migrate(app, db, render_as_batch=True)
ma = Marshmallow()
_storage_method = app.config["SACHET_STORAGE"]

View File

@ -73,11 +73,44 @@ class PermissionProperty:
db.session.commit()
class PasswordProperty:
"""Property to hash plaintext to a password.
The hash field will have the same name as this property, suffixed with "_hash".
For example::
class User(db.Model):
password = db.Column(db.String(255), nullable=False)
password = PasswordProperty()
Reading will return the hash, while writing hashes plaintext.
"""
def __set_name__(self, owner, name):
self.name = name
def __get__(self, obj, objtype=None):
return getattr(obj, self.name + "_hash")
def __set__(self, obj, value):
hash = User.gen_hash(value)
setattr(obj, self.name + "_hash", hash.decode())
class User(db.Model):
__tablename__ = "users"
username = db.Column(db.String(255), unique=True, nullable=False, primary_key=True)
password = db.Column(db.String(255), nullable=False)
password_hash = db.Column(db.String(255), nullable=False)
password = PasswordProperty()
@staticmethod
def gen_hash(value):
return bcrypt.generate_password_hash(
value, current_app.config.get("BCRYPT_LOG_ROUNDS")
)
register_date = db.Column(db.DateTime, nullable=False)
permissions_number = db.Column(db.BigInteger, nullable=False, default=0)
@ -87,7 +120,7 @@ class User(db.Model):
permissions.AllFlags = Permissions
self.permissions = permissions
self.password = self.gen_hash(password)
self.password = password
self.username = username
self.register_date = datetime.datetime.now()
@ -96,12 +129,6 @@ class User(db.Model):
"""URL linking to this resource."""
return url_for("users_blueprint.user_api", username=self.username)
def gen_hash(self, psswd):
"""Generates a hash from a password."""
return bcrypt.generate_password_hash(
psswd, current_app.config.get("BCRYPT_LOG_ROUNDS")
).decode()
def encode_token(self, jti=None):
"""Generates an authentication token"""
payload = {
@ -114,6 +141,7 @@ class User(db.Model):
payload, current_app.config.get("SECRET_KEY"), algorithm="HS256"
)
@staticmethod
def read_token(token):
"""Read a JWT and validate it.
@ -143,7 +171,7 @@ class User(db.Model):
username = ma.auto_field()
register_date = ma.auto_field(dump_only=True)
password = ma.auto_field(load_only=True, required=False)
password = fields.Str(load_only=True, required=False)
permissions = PermissionField()
return Schema()

View File

@ -120,7 +120,7 @@ class PasswordAPI(MethodView):
403,
)
else:
auth_user.password = auth_user.gen_hash(new_psswd)
auth_user.password = new_psswd
db.session.commit()
return jsonify(
{

View File

@ -70,6 +70,25 @@ def test_patch(client, users, auth, validate_info):
assert resp.status_code == 200
validate_info("jeff", resp.get_json())
# test password change through patch
resp = client.patch(
"/users/jeff",
json=dict(password="123"),
headers=auth("administrator"),
)
assert resp.status_code == 200
# sign in with new token
resp = client.post("/users/login", json=dict(username="jeff", password="123"))
assert resp.status_code == 200
data = resp.get_json()
new_token = data.get("auth_token")
assert new_token
# test that we're logged in
resp = client.get("/users/jeff", headers=dict(Authorization=f"bearer {new_token}"))
assert resp.status_code == 200
def test_put(client, users, auth, validate_info):
"""Test replacing user information as an administrator."""
@ -102,3 +121,14 @@ def test_put(client, users, auth, validate_info):
resp = client.get("/users/jeff", headers=auth("jeff"))
assert resp.status_code == 200
validate_info("jeff", resp.get_json())
# sign in with new token
resp = client.post("/users/login", json=dict(username="jeff", password="123"))
assert resp.status_code == 200
data = resp.get_json()
new_token = data.get("auth_token")
assert new_token
# test that we're logged in
resp = client.get("/users/jeff", headers=dict(Authorization=f"bearer {new_token}"))
assert resp.status_code == 200