added migrations

This commit is contained in:
dogeystamp 2023-04-30 17:42:45 -04:00
parent e96e3a376b
commit 32618dec69
Signed by: dogeystamp
GPG Key ID: 7225FE3592EFFA38
8 changed files with 250 additions and 39 deletions

1
migrations/README Normal file
View File

@ -0,0 +1 @@
Single-database configuration for Flask.

50
migrations/alembic.ini Normal file
View File

@ -0,0 +1,50 @@
# A generic, single database configuration.
[alembic]
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic,flask_migrate
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[logger_flask_migrate]
level = INFO
handlers =
qualname = flask_migrate
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

110
migrations/env.py Normal file
View File

@ -0,0 +1,110 @@
import logging
from logging.config import fileConfig
from flask import current_app
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')
def get_engine():
try:
# this works with Flask-SQLAlchemy<3 and Alchemical
return current_app.extensions['migrate'].db.get_engine()
except TypeError:
# this works with Flask-SQLAlchemy>=3
return current_app.extensions['migrate'].db.engine
def get_engine_url():
try:
return get_engine().url.render_as_string(hide_password=False).replace(
'%', '%%')
except AttributeError:
return str(get_engine().url).replace('%', '%%')
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option('sqlalchemy.url', get_engine_url())
target_db = current_app.extensions['migrate'].db
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def get_metadata():
if hasattr(target_db, 'metadatas'):
return target_db.metadatas[None]
return target_db.metadata
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=get_metadata(), literal_binds=True
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
# this callback is used to prevent an auto-migration from being generated
# when there are no changes to the schema
# reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
def process_revision_directives(context, revision, directives):
if getattr(config.cmd_opts, 'autogenerate', False):
script = directives[0]
if script.upgrade_ops.is_empty():
directives[:] = []
logger.info('No changes in schema detected.')
connectable = get_engine()
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=get_metadata(),
process_revision_directives=process_revision_directives,
**current_app.extensions['migrate'].configure_args
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

25
migrations/script.py.mako Normal file
View File

@ -0,0 +1,25 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,61 @@
"""empty message
Revision ID: 4cd7cdbc2d1f
Revises:
Create Date: 2023-04-30 17:42:00.329050
"""
from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
# revision identifiers, used by Alembic.
revision = '4cd7cdbc2d1f'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('blacklist_tokens',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('token', sa.String(length=500), nullable=False),
sa.Column('expires', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('token')
)
op.create_table('server_settings',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('default_permissions_number', sa.BigInteger(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('users',
sa.Column('username', sa.String(length=255), nullable=False),
sa.Column('password', sa.String(length=255), nullable=False),
sa.Column('register_date', sa.DateTime(), nullable=False),
sa.Column('permissions_number', sa.BigInteger(), nullable=False),
sa.PrimaryKeyConstraint('username'),
sa.UniqueConstraint('username')
)
op.create_table('shares',
sa.Column('share_id', sqlalchemy_utils.types.uuid.UUIDType(), nullable=False),
sa.Column('owner_name', sa.String(), nullable=True),
sa.Column('initialized', sa.Boolean(), nullable=False),
sa.Column('locked', sa.Boolean(), nullable=False),
sa.Column('create_date', sa.DateTime(), nullable=False),
sa.Column('file_name', sa.String(), nullable=False),
sa.ForeignKeyConstraint(['owner_name'], ['users.username'], ),
sa.PrimaryKeyConstraint('share_id')
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('shares')
op.drop_table('users')
op.drop_table('server_settings')
op.drop_table('blacklist_tokens')
# ### end Alembic commands ###

View File

@ -4,6 +4,7 @@ from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from flask_bcrypt import Bcrypt
from flask_migrate import Migrate
from .config import DevelopmentConfig, ProductionConfig, TestingConfig, overlay_config
app = Flask(__name__)
@ -22,6 +23,7 @@ with app.app_context():
bcrypt = Bcrypt(app)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
ma = Marshmallow()
_storage_method = app.config["SACHET_STORAGE"]

View File

@ -6,30 +6,6 @@ from flask.cli import AppGroup
from bitmask import Bitmask
db_cli = AppGroup("db")
@db_cli.command("create")
def create_db():
"""Create all db tables."""
db.create_all()
@db_cli.command("drop")
@click.option(
"--yes",
is_flag=True,
expose_value=False,
prompt="Are you sure you want to drop all tables?",
)
def drop_db():
"""Drop all db tables."""
db.drop_all()
app.cli.add_command(db_cli)
user_cli = AppGroup("user")

View File

@ -1,24 +1,10 @@
import pytest
from sachet.server.commands import create_db, drop_db, create_user, delete_user
from sachet.server import db
from sachet.server.commands import create_user, delete_user
from sqlalchemy import inspect
from sachet.server.models import User
def test_db(flask_app_bare, cli):
"""Test the CLI's ability to create and drop the DB."""
# make tables
result = cli.invoke(create_db)
assert result.exit_code == 0
assert "users" in inspect(db.engine).get_table_names()
# tear down
result = cli.invoke(drop_db, ["--yes"])
assert result.exit_code == 0
assert "users" not in inspect(db.engine).get_table_names()
def test_user(client, cli):
"""Test the CLI's ability to create then delete a user."""
# create user