auth: added logins

users can now request a login and we can check that they are
authenticated or administrator using function decorators
This commit is contained in:
dogeystamp 2023-03-10 09:19:21 -05:00
parent 8e1b3e4b81
commit 950265e013
Signed by: dogeystamp
GPG Key ID: 7225FE3592EFFA38
8 changed files with 141 additions and 12 deletions

View File

@ -9,6 +9,7 @@ greenlet==2.0.2
itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.2
PyJWT==2.6.0
PyYAML==6.0
six==1.16.0
SQLAlchemy==2.0.5.post1

View File

@ -18,5 +18,5 @@ db = SQLAlchemy(app)
import sachet.server.commands
from sachet.server.auth.views import auth_blueprint
app.register_blueprint(auth_blueprint)
from sachet.server.users.views import users_blueprint
app.register_blueprint(users_blueprint)

View File

@ -1,7 +0,0 @@
from flask import Blueprint
auth_blueprint = Blueprint("auth_blueprint", __name__)
@auth_blueprint.route('/')
def index():
return "Hello world"

View File

@ -1,7 +1,7 @@
import click
from sachet.server import app, db
from sachet.server.models import User
from sachet.server.auth import manage
from sachet.server.users import manage
from flask.cli import AppGroup

View File

@ -1,11 +1,13 @@
from sachet.server import app, db, bcrypt
from flask import request, jsonify
from functools import wraps
import datetime
import jwt
class User(db.Model):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(255), unique=True, nullable=False)
username = db.Column(db.String(255), unique=True, nullable=False, primary_key=True)
password = db.Column(db.String(255), nullable=False)
register_date = db.Column(db.DateTime, nullable=False)
admin = db.Column(db.Boolean, nullable=False, default=False)
@ -17,3 +19,64 @@ class User(db.Model):
self.username = username
self.register_date = datetime.datetime.now()
self.admin = admin
def encode_token(self):
"""Generates an authentication token"""
payload = {
"exp": datetime.datetime.utcnow() + datetime.timedelta(days=7),
"iat": datetime.datetime.utcnow(),
"sub": self.username
}
return jwt.encode(
payload,
app.config.get("SECRET_KEY"),
algorithm="HS256"
)
def _token_decorator(require_admin, f, *args, **kwargs):
"""Generic function for checking tokens.
require_admin: require user to be administrator to authenticate
"""
token = None
auth_header = request.headers.get("Authorization")
if auth_header:
try:
token = auth_header.split(" ")[1]
except IndexError:
resp = {
"status": "fail",
"message": "Malformed Authorization header."
}
return jsonify(resp)
if not token:
return jsonify({"status": "fail", "message": "Missing auth token"}), 401
try:
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
user = User.query.filter_by(username=data.get("sub")).first()
except:
return jsonify({"status": "fail", "message": "Invalid auth token."}), 401
if not user:
return jsonify({"status": "fail", "message": "Invalid auth token."}), 401
if require_admin and not user.admin:
return jsonify({"status": "fail", "message": "You are not authorized to view this page."}), 403
return f(user, *args, **kwargs)
def token_required(f):
"""Decorator to require authentication."""
@wraps(f)
def decorator(*args, **kwargs):
return _token_decorator(False, f, *args, **kwargs)
return decorator
def admin_required(f):
"""Decorator to require authentication and admin privileges."""
@wraps(f)
def decorator(*args, **kwargs):
return _token_decorator(True, f, *args, **kwargs)
return decorator

View File

@ -2,6 +2,12 @@ from sachet.server import app, db
from sachet.server.models import User
def create_user(admin, username, password):
# to reduce confusion with API endpoints
forbidden = {"login", "logout", "extend"}
if username in forbidden:
raise KeyError(f"Username '{username}' is reserved and can not be used.")
user = User.query.filter_by(username=username).first()
if not user:
user = User(

View File

@ -0,0 +1,66 @@
from flask import Blueprint, request, jsonify
from flask.views import MethodView
from sachet.server.models import token_required, admin_required, User
from sachet.server import bcrypt
users_blueprint = Blueprint("users_blueprint", __name__)
class LoginAPI(MethodView):
def post(self):
post_data = request.get_json()
user = User.query.filter_by(username=post_data.get("username")).first()
if not user:
resp = {
"status": "fail",
"message": "Invalid credentials."
}
return jsonify(resp), 401
if bcrypt.check_password_hash(
user.password, post_data.get("password", "")
):
token = user.encode_token()
resp = {
"status": "success",
"message": "Logged in.",
"username": user.username,
"auth_token": token
}
return jsonify(resp), 200
else:
resp = {
"status": "fail",
"message": "Invalid credentials.",
}
return jsonify(resp), 401
users_blueprint.add_url_rule(
"/users/login",
view_func=LoginAPI.as_view("login_api"),
methods=['POST']
)
class UserAPI(MethodView):
"""User information API"""
@token_required
def get(user, self, username):
info_user = User.query.filter_by(username=username).first()
if (not info_user) or (info_user != user and not user.admin):
resp = {
"status": "fail",
"message": "You are not authorized to view this page."
}
return jsonify(resp), 403
return jsonify({
"username": info_user.username,
"admin": info_user.admin,
})
users_blueprint.add_url_rule(
"/users/<username>",
view_func=UserAPI.as_view("user_api"),
methods=['GET']
)